/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, the dblk is allocated separately from the buffer;
 *     the associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory.  Objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
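/*
 * Worked example (illustrative, added for clarity): on an _LP64 kernel,
 * allocb(100, BPRI_MED) computes index = (100 - 1) >> DBLK_SIZE_SHIFT,
 * i.e. 12.  streams_msg_init() fills every 8-byte step of dblk_cache[]
 * between adjacent entries of dblk_sizes[] with the next-larger cache,
 * so dblk_cache[12] points at the "streams_dblk_144" cache and the
 * 100-byte request is served from the 144-byte dblk/data cache.
 */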
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
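/*
 * Illustrative note (added for clarity): db_ref, db_type, db_flags and
 * db_struioflag are four adjacent bytes in the dblk, so DBLK_RTFU_WORD()
 * addresses them as one 32-bit word.  On a little-endian machine
 * DBLK_RTFU(1, M_DATA, 0, 0) is just (1 | (M_DATA << 8)), and a single
 * store initializes all four fields at once on the allocation fast path.
 * The "(flags) | (ref - 1)" term folds the minimum-reference encoding
 * into db_flags: the ref == 2 callers (esballoca() and desballoca())
 * get DBLK_REFMIN set, which dblk_decref() later extracts to decide
 * whether a given free is the last reference.
 */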
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
	    kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
	    mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
		    DBLK_CACHE_ALIGN, dblk_constructor,
		    dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();
}
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
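/*
 * Usage sketch (illustrative; xx_send() and its arguments are
 * hypothetical): the canonical allocate/fill/send pattern in a driver
 * or module.  Once the message has been passed on, freeing it is the
 * downstream consumer's responsibility.
 *
 *	static int
 *	xx_send(queue_t *wq, const uchar_t *src, size_t len)
 *	{
 *		mblk_t *mp;
 *
 *		if ((mp = allocb(len, BPRI_MED)) == NULL)
 *			return (ENOSR);
 *		bcopy(src, mp->b_wptr, len);
 *		mp->b_wptr += len;
 *		putnext(wq, mp);
 *		return (0);
 *	}
 */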
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED flag */
	dbp->db_flags &= ~DBLK_COOKED;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
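/*
 * Usage sketch (illustrative): a module that must keep data it has
 * already sent downstream, e.g. for possible retransmission, holds a
 * second reference instead of copying:
 *
 *	mblk_t *held;
 *
 *	if ((held = dupb(mp)) == NULL)
 *		return (B_FALSE);
 *	putnext(q, mp);
 *	...
 *	freeb(held);
 *
 * Both mblks now share one dblk with db_ref == 2; dblk_decref() releases
 * the data only on the last freeb().  While db_ref > 1 the data must be
 * treated as read-only (see dupmsg_noloan() and copyb() below).
 */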
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
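/*
 * Usage sketch (illustrative; the xx_* and rb_* names are hypothetical):
 * a NIC driver loaning a receive buffer up the stream instead of copying:
 *
 *	rbp->rb_frtn.free_func = xx_rbuf_recycle;
 *	rbp->rb_frtn.free_arg = (caddr_t)rbp;
 *	mp = esballoc(rbp->rb_kaddr, rbp->rb_size, BPRI_MED,
 *	    &rbp->rb_frtn);
 *
 * When the last reference is freed, freebs_enqueue arranges for
 * xx_rbuf_recycle(rbp) to run asynchronously, so the free routine must
 * not assume the freeing thread's context.  desballoc() below is the
 * variant whose free routine runs directly in the context of the thread
 * doing the final freeb().
 */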
/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
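/*
 * Usage sketch (illustrative; XX_BUFSZ is hypothetical): a driver that
 * allocates many equal-size messages can create a private buffer cache
 * once at attach time and allocate from it on the hot path:
 *
 *	bcp = bcache_create("xx_rx", XX_BUFSZ, 64);
 *	...
 *	mp = bcache_allocb(bcp, BPRI_MED);
 *	...
 *	bcache_destroy(bcp);
 *
 * Note that bcache_destroy() only marks the cache for destruction while
 * allocations are outstanding; the final bcache_dblk_lastfree() above
 * performs the actual teardown.
 */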
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
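/*
 * Usage sketch (illustrative; XX_IOC_SYNC and the xx fields are
 * hypothetical): a module that originates an internal ioctl sends the
 * iocblk downstream and later matches the M_IOCACK/M_IOCNAK reply
 * against the saved sequence number:
 *
 *	if ((mp = mkiocb(XX_IOC_SYNC)) == NULL)
 *		return;
 *	xx->xx_iocid = ((struct iocblk *)mp->b_rptr)->ioc_id;
 *	putnext(wq, mp);
 */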
/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its value
	 * in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}
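/*
 * Usage sketch (illustrative; the xx_* names are hypothetical): the
 * classic recovery pattern when allocb() fails in a put or service
 * procedure is to schedule a bufcall and retry once memory is likely
 * to be available:
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		xx->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    xx_reenable, (void *)q);
 *		if (xx->xx_bufcall_id == 0)
 *			... fall back to a timeout and retry ...
 *		return;
 *	}
 *
 * where xx_reenable() typically just calls qenable() on the saved
 * queue.  Any pending id must be cancelled with unbufcall() before
 * the module closes.
 */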
/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
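/*
 * Usage sketch (illustrative): dupb()/dupmsg() share data blocks, so a
 * module that wants to modify a message it may not exclusively own
 * should copy first:
 *
 *	if (DB_REF(mp) > 1) {
 *		mblk_t *nmp;
 *
 *		if ((nmp = copymsg(mp)) == NULL) {
 *			freemsg(mp);
 *			return (NULL);
 *		}
 *		freemsg(mp);
 *		mp = nmp;
 *	}
 *
 * after which every block of mp has db_ref == 1 and may be written.
 */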
/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
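/*
 * Usage sketch (illustrative; struct xx_hdr is hypothetical): the usual
 * reason to pull a message up is to make a protocol header contiguous
 * and aligned before casting b_rptr:
 *
 *	if (MBLKL(mp) < sizeof (struct xx_hdr) &&
 *	    !pullupmsg(mp, sizeof (struct xx_hdr))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (struct xx_hdr *)mp->b_rptr;
 *
 * Note that pullupmsg() swaps the data block under the leading mblk, so
 * any cached pointers into the old buffer are invalid afterwards.
 */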
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */
			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}
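/*
 * Usage sketch (illustrative; xx_hdr and XX_FCS_LEN are hypothetical):
 * strip an N-byte header from the front, or an N-byte trailer (e.g. a
 * frame checksum) from the back, of a message already known to be long
 * enough:
 *
 *	(void) adjmsg(mp, sizeof (struct xx_hdr));	front
 *	(void) adjmsg(mp, -XX_FCS_LEN);			back
 *
 * Zero-length blocks produced by the trim (other than the first) are
 * freed as a side effect.
 */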
/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here, is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reach the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
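/*
 * Usage sketch (illustrative): the canonical service procedure drains
 * its queue with getq(), deferring to downstream flow control with
 * putbq().  High priority messages (db_type >= QPCTL) bypass the
 * canputnext() check, since they must never be put back (see the
 * putq() comment at the end of this file):
 *
 *	static int
 *	xx_wsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (mp->b_datap->db_type < QPCTL &&
 *			    !canputnext(q)) {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */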
/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */
#define	ADD_MBLK_SIZE(mp, size)					\
	if (DB_TYPE(mp) != M_MULTIDATA) {			\
		(size) += MBLKL(mp);				\
	} else {						\
		uint_t	pinuse;					\
								\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;				\
	}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat)) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
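/*
 * A note on the sd_freezer checks above (descriptive, added for
 * clarity): freezestr() lets a driver freeze a stream and then examine
 * or edit its queues directly while holding QLOCK.  getq_noenab(),
 * rmvq_noenab() and qbackenable() therefore test whether curthread is
 * the freezing thread; if so, QLOCK is already held and is neither
 * re-acquired on entry nor dropped on exit.
 */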
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq() can be called with the stream frozen, by other utility
 * functions holding QLOCK, and by streams modules without any
 * locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}
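/*
 * Usage sketch (illustrative; xx_match() is hypothetical): rmvq() is
 * typically used while scanning a frozen stream for messages that
 * should be withdrawn:
 *
 *	mblk_t *mp, *nmp;
 *
 *	freezestr(q);
 *	for (mp = q->q_first; mp != NULL; mp = nmp) {
 *		nmp = mp->b_next;
 *		if (xx_match(mp)) {
 *			rmvq(q, mp);
 *			freemsg(mp);
 *		}
 *	}
 *	unfreezestr(q);
 */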
/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}
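/*
 * Usage sketch (illustrative): the standard M_FLUSH handling in a
 * module's write-side put procedure flushes its own queue and passes
 * the message along:
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		putnext(q, mp);
 *		break;
 *
 * A module supporting priority bands also tests FLUSHBAND and calls
 * flushband() (below) with the band carried in the second byte of the
 * message, mp->b_rptr[1].
 */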
2059 */ 2060 if (backenab) { 2061 mutex_exit(QLOCK(q)); 2062 for (bpri = q->q_nband; bpri != 0; bpri--) 2063 if (qbf[bpri]) 2064 backenable(q, bpri); 2065 if (qbf[0]) 2066 backenable(q, 0); 2067 } else 2068 mutex_exit(QLOCK(q)); 2069 } 2070 2071 /* 2072 * The real flushing takes place in flushq_common. This is done so that 2073 * a flag can be passed in specifying whether or not M_PCPROTO messages 2074 * should be flushed. Currently the only place that uses this flag is the stream head. 2075 */ 2076 void 2077 flushq(queue_t *q, int flag) 2078 { 2079 flushq_common(q, flag, 0); 2080 } 2081 2082 /* 2083 * Flush the queue of messages of the given priority band. 2084 * There is some duplication of code between flushq and flushband. 2085 * This is because we want to optimize the code as much as possible. 2086 * The assumption is that there will be more messages in the normal 2087 * (priority 0) band than in any other. 2088 * 2089 * Historical note: when merging the M_FLUSH code in strrput with this 2090 * code, one difference was discovered. flushband had an extra check for 2091 * (mp->b_datap->db_type < QPCTL) in the band 0 2092 * case. That check does not match the man page for flushband and was not 2093 * in the strrput flush code, hence it was removed. 2094 */ 2095 void 2096 flushband(queue_t *q, unsigned char pri, int flag) 2097 { 2098 mblk_t *mp; 2099 mblk_t *nmp; 2100 mblk_t *last; 2101 qband_t *qbp; 2102 int band; 2103 2104 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL)); 2105 if (pri > q->q_nband) { 2106 return; 2107 } 2108 mutex_enter(QLOCK(q)); 2109 if (pri == 0) { 2110 mp = q->q_first; 2111 q->q_first = NULL; 2112 q->q_last = NULL; 2113 q->q_count = 0; 2114 q->q_mblkcnt = 0; 2115 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2116 qbp->qb_first = NULL; 2117 qbp->qb_last = NULL; 2118 qbp->qb_count = 0; 2119 qbp->qb_mblkcnt = 0; 2120 qbp->qb_flag &= ~QB_FULL; 2121 } 2122 q->q_flag &= ~QFULL; 2123 mutex_exit(QLOCK(q)); 2124 while (mp) { 2125 nmp = mp->b_next; 2126 mp->b_next = mp->b_prev = NULL; 2127 if ((mp->b_band == 0) && 2128 ((flag == FLUSHALL) || 2129 datamsg(mp->b_datap->db_type))) 2130 freemsg(mp); 2131 else 2132 (void) putq(q, mp); 2133 mp = nmp; 2134 } 2135 mutex_enter(QLOCK(q)); 2136 if ((q->q_flag & QWANTW) && 2137 (((q->q_count < q->q_lowat) && 2138 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2139 q->q_flag &= ~QWANTW; 2140 mutex_exit(QLOCK(q)); 2141 2142 backenable(q, pri); 2143 } else 2144 mutex_exit(QLOCK(q)); 2145 } else { /* pri != 0 */ 2146 boolean_t flushed = B_FALSE; 2147 band = pri; 2148 2149 ASSERT(MUTEX_HELD(QLOCK(q))); 2150 qbp = q->q_bandp; 2151 while (--band > 0) 2152 qbp = qbp->qb_next; 2153 mp = qbp->qb_first; 2154 if (mp == NULL) { 2155 mutex_exit(QLOCK(q)); 2156 return; 2157 } 2158 last = qbp->qb_last->b_next; 2159 /* 2160 * rmvq_noenab() and freemsg() are called for each mblk that 2161 * meets the criteria. The loop is executed until the last 2162 * mblk has been processed. 2163 */ 2164 while (mp != last) { 2165 ASSERT(mp->b_band == pri); 2166 nmp = mp->b_next; 2167 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) { 2168 rmvq_noenab(q, mp); 2169 freemsg(mp); 2170 flushed = B_TRUE; 2171 } 2172 mp = nmp; 2173 } 2174 mutex_exit(QLOCK(q)); 2175 2176 /* 2177 * If any mblk(s) have been freed, we know that qbackenable() 2178 * will need to be called. 2179 */ 2180 if (flushed) 2181 qbackenable(q, pri); 2182 } 2183 } 2184 2185 /* 2186 * Return 1 if the queue is not full.
If the queue is full, return 2187 * 0 (may not put message) and set QWANTW flag (caller wants to write 2188 * to the queue). 2189 */ 2190 int 2191 canput(queue_t *q) 2192 { 2193 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2194 2195 /* this is for loopback transports; they should not do a canput */ 2196 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2197 2198 /* Find next forward module that has a service procedure */ 2199 q = q->q_nfsrv; 2200 2201 if (!(q->q_flag & QFULL)) { 2202 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2203 return (1); 2204 } 2205 mutex_enter(QLOCK(q)); 2206 if (q->q_flag & QFULL) { 2207 q->q_flag |= QWANTW; 2208 mutex_exit(QLOCK(q)); 2209 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2210 return (0); 2211 } 2212 mutex_exit(QLOCK(q)); 2213 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2214 return (1); 2215 } 2216 2217 /* 2218 * This is the new canput for use with priority bands. Return 1 if the 2219 * band is not full. If the band is full, return 0 (may not put message) 2220 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (caller wants to 2221 * write to the queue). 2222 */ 2223 int 2224 bcanput(queue_t *q, unsigned char pri) 2225 { 2226 qband_t *qbp; 2227 2228 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2229 if (!q) 2230 return (0); 2231 2232 /* Find next forward module that has a service procedure */ 2233 q = q->q_nfsrv; 2234 2235 mutex_enter(QLOCK(q)); 2236 if (pri == 0) { 2237 if (q->q_flag & QFULL) { 2238 q->q_flag |= QWANTW; 2239 mutex_exit(QLOCK(q)); 2240 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2241 "bcanput:%p %X %d", q, pri, 0); 2242 return (0); 2243 } 2244 } else { /* pri != 0 */ 2245 if (pri > q->q_nband) { 2246 /* 2247 * No band exists yet, so return success. 2248 */ 2249 mutex_exit(QLOCK(q)); 2250 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2251 "bcanput:%p %X %d", q, pri, 1); 2252 return (1); 2253 } 2254 qbp = q->q_bandp; 2255 while (--pri) 2256 qbp = qbp->qb_next; 2257 if (qbp->qb_flag & QB_FULL) { 2258 qbp->qb_flag |= QB_WANTW; 2259 mutex_exit(QLOCK(q)); 2260 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2261 "bcanput:%p %X %d", q, pri, 0); 2262 return (0); 2263 } 2264 } 2265 mutex_exit(QLOCK(q)); 2266 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2267 "bcanput:%p %X %d", q, pri, 1); 2268 return (1); 2269 } 2270 2271 /* 2272 * Put a message on a queue. 2273 * 2274 * Messages are enqueued on a priority basis. The priority classes 2275 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2276 * and B_NORMAL (type < QPCTL && band == 0). 2277 * 2278 * Add appropriate weighted data block sizes to queue count. 2279 * If queue hits high water mark then set QFULL flag. 2280 * 2281 * If QNOENAB is not set (putq is allowed to enable the queue), 2282 * enable the queue only if the message is PRIORITY, 2283 * or the QWANTR flag is set (indicating that the service procedure 2284 * is ready to read the queue). This implies that a service 2285 * procedure must NEVER put a high priority message back on its own 2286 * queue, as this would result in an infinite loop (!).
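* * For illustration only (a hypothetical sketch, not part of the original comment): a module typically calls putq() from its put procedure to defer work, then drains the queue in its service procedure, honoring downstream flow control with canputnext() and putbq(): * *	static int *	xxxwput(queue_t *q, mblk_t *mp) *	{ *		(void) putq(q, mp);	(may enable q, per the rules above) *		return (0); *	} * *	static int *	xxxwsrv(queue_t *q) *	{ *		mblk_t *mp; * *		while ((mp = getq(q)) != NULL) { *			if (canputnext(q)) { *				putnext(q, mp); *			} else { *				(void) putbq(q, mp); *				break; *			} *		} *		return (0); *	}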
2287 */ 2288 int 2289 putq(queue_t *q, mblk_t *bp) 2290 { 2291 mblk_t *tmp; 2292 qband_t *qbp = NULL; 2293 int mcls = (int)queclass(bp); 2294 kthread_id_t freezer; 2295 int bytecnt = 0, mblkcnt = 0; 2296 2297 freezer = STREAM(q)->sd_freezer; 2298 if (freezer == curthread) { 2299 ASSERT(frozenstr(q)); 2300 ASSERT(MUTEX_HELD(QLOCK(q))); 2301 } else 2302 mutex_enter(QLOCK(q)); 2303 2304 /* 2305 * Make sanity checks and if qband structure is not yet 2306 * allocated, do so. 2307 */ 2308 if (mcls == QPCTL) { 2309 if (bp->b_band != 0) 2310 bp->b_band = 0; /* force to be correct */ 2311 } else if (bp->b_band != 0) { 2312 int i; 2313 qband_t **qbpp; 2314 2315 if (bp->b_band > q->q_nband) { 2316 2317 /* 2318 * The qband structure for this priority band is 2319 * not on the queue yet, so we have to allocate 2320 * one on the fly. It would be wasteful to 2321 * associate the qband structures with every 2322 * queue when the queues are allocated. This is 2323 * because most queues will only need the normal 2324 * band of flow which can be described entirely 2325 * by the queue itself. 2326 */ 2327 qbpp = &q->q_bandp; 2328 while (*qbpp) 2329 qbpp = &(*qbpp)->qb_next; 2330 while (bp->b_band > q->q_nband) { 2331 if ((*qbpp = allocband()) == NULL) { 2332 if (freezer != curthread) 2333 mutex_exit(QLOCK(q)); 2334 return (0); 2335 } 2336 (*qbpp)->qb_hiwat = q->q_hiwat; 2337 (*qbpp)->qb_lowat = q->q_lowat; 2338 q->q_nband++; 2339 qbpp = &(*qbpp)->qb_next; 2340 } 2341 } 2342 ASSERT(MUTEX_HELD(QLOCK(q))); 2343 qbp = q->q_bandp; 2344 i = bp->b_band; 2345 while (--i) 2346 qbp = qbp->qb_next; 2347 } 2348 2349 /* 2350 * If queue is empty, add the message and initialize the pointers. 2351 * Otherwise, adjust message pointers and queue pointers based on 2352 * the type of the message and where it belongs on the queue. Some 2353 * code is duplicated to minimize the number of conditionals and 2354 * hopefully minimize the amount of time this routine takes. 2355 */ 2356 if (!q->q_first) { 2357 bp->b_next = NULL; 2358 bp->b_prev = NULL; 2359 q->q_first = bp; 2360 q->q_last = bp; 2361 if (qbp) { 2362 qbp->qb_first = bp; 2363 qbp->qb_last = bp; 2364 } 2365 } else if (!qbp) { /* bp->b_band == 0 */ 2366 2367 /* 2368 * If queue class of message is less than or equal to 2369 * that of the last one on the queue, tack on to the end. 2370 */ 2371 tmp = q->q_last; 2372 if (mcls <= (int)queclass(tmp)) { 2373 bp->b_next = NULL; 2374 bp->b_prev = tmp; 2375 tmp->b_next = bp; 2376 q->q_last = bp; 2377 } else { 2378 tmp = q->q_first; 2379 while ((int)queclass(tmp) >= mcls) 2380 tmp = tmp->b_next; 2381 2382 /* 2383 * Insert bp before tmp. 2384 */ 2385 bp->b_next = tmp; 2386 bp->b_prev = tmp->b_prev; 2387 if (tmp->b_prev) 2388 tmp->b_prev->b_next = bp; 2389 else 2390 q->q_first = bp; 2391 tmp->b_prev = bp; 2392 } 2393 } else { /* bp->b_band != 0 */ 2394 if (qbp->qb_first) { 2395 tmp = qbp->qb_last; 2396 2397 /* 2398 * Insert bp after the last message in this band. 2399 */ 2400 bp->b_next = tmp->b_next; 2401 if (tmp->b_next) 2402 tmp->b_next->b_prev = bp; 2403 else 2404 q->q_last = bp; 2405 bp->b_prev = tmp; 2406 tmp->b_next = bp; 2407 } else { 2408 tmp = q->q_last; 2409 if ((mcls < (int)queclass(tmp)) || 2410 (bp->b_band <= tmp->b_band)) { 2411 2412 /* 2413 * Tack bp on end of queue. 
2414 */ 2415 bp->b_next = NULL; 2416 bp->b_prev = tmp; 2417 tmp->b_next = bp; 2418 q->q_last = bp; 2419 } else { 2420 tmp = q->q_first; 2421 while (tmp->b_datap->db_type >= QPCTL) 2422 tmp = tmp->b_next; 2423 while (tmp->b_band >= bp->b_band) 2424 tmp = tmp->b_next; 2425 2426 /* 2427 * Insert bp before tmp. 2428 */ 2429 bp->b_next = tmp; 2430 bp->b_prev = tmp->b_prev; 2431 if (tmp->b_prev) 2432 tmp->b_prev->b_next = bp; 2433 else 2434 q->q_first = bp; 2435 tmp->b_prev = bp; 2436 } 2437 qbp->qb_first = bp; 2438 } 2439 qbp->qb_last = bp; 2440 } 2441 2442 /* Get message byte count for q_count accounting */ 2443 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2444 ADD_MBLK_SIZE(tmp, bytecnt); 2445 mblkcnt++; 2446 } 2447 2448 if (qbp) { 2449 qbp->qb_count += bytecnt; 2450 qbp->qb_mblkcnt += mblkcnt; 2451 if ((qbp->qb_count >= qbp->qb_hiwat) || 2452 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2453 qbp->qb_flag |= QB_FULL; 2454 } 2455 } else { 2456 q->q_count += bytecnt; 2457 q->q_mblkcnt += mblkcnt; 2458 if ((q->q_count >= q->q_hiwat) || 2459 (q->q_mblkcnt >= q->q_hiwat)) { 2460 q->q_flag |= QFULL; 2461 } 2462 } 2463 2464 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2465 2466 if ((mcls > QNORM) || 2467 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2468 qenable_locked(q); 2469 ASSERT(MUTEX_HELD(QLOCK(q))); 2470 if (freezer != curthread) 2471 mutex_exit(QLOCK(q)); 2472 2473 return (1); 2474 } 2475 2476 /* 2477 * Put stuff back at beginning of Q according to priority order. 2478 * See comment on putq above for details. 2479 */ 2480 int 2481 putbq(queue_t *q, mblk_t *bp) 2482 { 2483 mblk_t *tmp; 2484 qband_t *qbp = NULL; 2485 int mcls = (int)queclass(bp); 2486 kthread_id_t freezer; 2487 int bytecnt = 0, mblkcnt = 0; 2488 2489 ASSERT(q && bp); 2490 ASSERT(bp->b_next == NULL); 2491 freezer = STREAM(q)->sd_freezer; 2492 if (freezer == curthread) { 2493 ASSERT(frozenstr(q)); 2494 ASSERT(MUTEX_HELD(QLOCK(q))); 2495 } else 2496 mutex_enter(QLOCK(q)); 2497 2498 /* 2499 * Make sanity checks and if qband structure is not yet 2500 * allocated, do so. 2501 */ 2502 if (mcls == QPCTL) { 2503 if (bp->b_band != 0) 2504 bp->b_band = 0; /* force to be correct */ 2505 } else if (bp->b_band != 0) { 2506 int i; 2507 qband_t **qbpp; 2508 2509 if (bp->b_band > q->q_nband) { 2510 qbpp = &q->q_bandp; 2511 while (*qbpp) 2512 qbpp = &(*qbpp)->qb_next; 2513 while (bp->b_band > q->q_nband) { 2514 if ((*qbpp = allocband()) == NULL) { 2515 if (freezer != curthread) 2516 mutex_exit(QLOCK(q)); 2517 return (0); 2518 } 2519 (*qbpp)->qb_hiwat = q->q_hiwat; 2520 (*qbpp)->qb_lowat = q->q_lowat; 2521 q->q_nband++; 2522 qbpp = &(*qbpp)->qb_next; 2523 } 2524 } 2525 qbp = q->q_bandp; 2526 i = bp->b_band; 2527 while (--i) 2528 qbp = qbp->qb_next; 2529 } 2530 2531 /* 2532 * If queue is empty or if message is high priority, 2533 * place on the front of the queue. 2534 */ 2535 tmp = q->q_first; 2536 if ((!tmp) || (mcls == QPCTL)) { 2537 bp->b_next = tmp; 2538 if (tmp) 2539 tmp->b_prev = bp; 2540 else 2541 q->q_last = bp; 2542 q->q_first = bp; 2543 bp->b_prev = NULL; 2544 if (qbp) { 2545 qbp->qb_first = bp; 2546 qbp->qb_last = bp; 2547 } 2548 } else if (qbp) { /* bp->b_band != 0 */ 2549 tmp = qbp->qb_first; 2550 if (tmp) { 2551 2552 /* 2553 * Insert bp before the first message in this band. 
2554 */ 2555 bp->b_next = tmp; 2556 bp->b_prev = tmp->b_prev; 2557 if (tmp->b_prev) 2558 tmp->b_prev->b_next = bp; 2559 else 2560 q->q_first = bp; 2561 tmp->b_prev = bp; 2562 } else { 2563 tmp = q->q_last; 2564 if ((mcls < (int)queclass(tmp)) || 2565 (bp->b_band < tmp->b_band)) { 2566 2567 /* 2568 * Tack bp on end of queue. 2569 */ 2570 bp->b_next = NULL; 2571 bp->b_prev = tmp; 2572 tmp->b_next = bp; 2573 q->q_last = bp; 2574 } else { 2575 tmp = q->q_first; 2576 while (tmp->b_datap->db_type >= QPCTL) 2577 tmp = tmp->b_next; 2578 while (tmp->b_band > bp->b_band) 2579 tmp = tmp->b_next; 2580 2581 /* 2582 * Insert bp before tmp. 2583 */ 2584 bp->b_next = tmp; 2585 bp->b_prev = tmp->b_prev; 2586 if (tmp->b_prev) 2587 tmp->b_prev->b_next = bp; 2588 else 2589 q->q_first = bp; 2590 tmp->b_prev = bp; 2591 } 2592 qbp->qb_last = bp; 2593 } 2594 qbp->qb_first = bp; 2595 } else { /* bp->b_band == 0 && !QPCTL */ 2596 2597 /* 2598 * If the queue class or band is less than that of the last 2599 * message on the queue, tack bp on the end of the queue. 2600 */ 2601 tmp = q->q_last; 2602 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2603 bp->b_next = NULL; 2604 bp->b_prev = tmp; 2605 tmp->b_next = bp; 2606 q->q_last = bp; 2607 } else { 2608 tmp = q->q_first; 2609 while (tmp->b_datap->db_type >= QPCTL) 2610 tmp = tmp->b_next; 2611 while (tmp->b_band > bp->b_band) 2612 tmp = tmp->b_next; 2613 2614 /* 2615 * Insert bp before tmp. 2616 */ 2617 bp->b_next = tmp; 2618 bp->b_prev = tmp->b_prev; 2619 if (tmp->b_prev) 2620 tmp->b_prev->b_next = bp; 2621 else 2622 q->q_first = bp; 2623 tmp->b_prev = bp; 2624 } 2625 } 2626 2627 /* Get message byte count for q_count accounting */ 2628 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2629 ADD_MBLK_SIZE(tmp, bytecnt); 2630 mblkcnt++; 2631 } 2632 if (qbp) { 2633 qbp->qb_count += bytecnt; 2634 qbp->qb_mblkcnt += mblkcnt; 2635 if ((qbp->qb_count >= qbp->qb_hiwat) || 2636 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2637 qbp->qb_flag |= QB_FULL; 2638 } 2639 } else { 2640 q->q_count += bytecnt; 2641 q->q_mblkcnt += mblkcnt; 2642 if ((q->q_count >= q->q_hiwat) || 2643 (q->q_mblkcnt >= q->q_hiwat)) { 2644 q->q_flag |= QFULL; 2645 } 2646 } 2647 2648 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL); 2649 2650 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2651 qenable_locked(q); 2652 ASSERT(MUTEX_HELD(QLOCK(q))); 2653 if (freezer != curthread) 2654 mutex_exit(QLOCK(q)); 2655 2656 return (1); 2657 } 2658 2659 /* 2660 * Insert a message before an existing message on the queue. If the 2661 * existing message is NULL, the new message is placed on the end of 2662 * the queue. The queue class of the new message is ignored. However, 2663 * the priority band of the new message must adhere to the following 2664 * ordering: 2665 * 2666 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2667 * 2668 * All flow control parameters are updated. 2669 * 2670 * insq can be called with the stream frozen, by other utility functions 2671 * holding QLOCK, and by streams modules without any locks/frozen.
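* * For example (an illustration, not part of the original comment): if a queue holds messages with bands 2 2 1 0 0, a new band-1 message may legally be inserted in front of the existing band-1 message or in front of the first band-0 message; asking insq() to place it ahead of a band-2 message or behind a band-0 message violates the ordering above, so insq() logs a warning via cmn_err() and returns 0.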
2672 */ 2673 int 2674 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2675 { 2676 mblk_t *tmp; 2677 qband_t *qbp = NULL; 2678 int mcls = (int)queclass(mp); 2679 kthread_id_t freezer; 2680 int bytecnt = 0, mblkcnt = 0; 2681 2682 freezer = STREAM(q)->sd_freezer; 2683 if (freezer == curthread) { 2684 ASSERT(frozenstr(q)); 2685 ASSERT(MUTEX_HELD(QLOCK(q))); 2686 } else if (MUTEX_HELD(QLOCK(q))) { 2687 /* Don't drop lock on exit */ 2688 freezer = curthread; 2689 } else 2690 mutex_enter(QLOCK(q)); 2691 2692 if (mcls == QPCTL) { 2693 if (mp->b_band != 0) 2694 mp->b_band = 0; /* force to be correct */ 2695 if (emp && emp->b_prev && 2696 (emp->b_prev->b_datap->db_type < QPCTL)) 2697 goto badord; 2698 } 2699 if (emp) { 2700 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2701 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2702 (emp->b_prev->b_band < mp->b_band))) { 2703 goto badord; 2704 } 2705 } else { 2706 tmp = q->q_last; 2707 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2708 badord: 2709 cmn_err(CE_WARN, 2710 "insq: attempt to insert message out of order " 2711 "on q %p", (void *)q); 2712 if (freezer != curthread) 2713 mutex_exit(QLOCK(q)); 2714 return (0); 2715 } 2716 } 2717 2718 if (mp->b_band != 0) { 2719 int i; 2720 qband_t **qbpp; 2721 2722 if (mp->b_band > q->q_nband) { 2723 qbpp = &q->q_bandp; 2724 while (*qbpp) 2725 qbpp = &(*qbpp)->qb_next; 2726 while (mp->b_band > q->q_nband) { 2727 if ((*qbpp = allocband()) == NULL) { 2728 if (freezer != curthread) 2729 mutex_exit(QLOCK(q)); 2730 return (0); 2731 } 2732 (*qbpp)->qb_hiwat = q->q_hiwat; 2733 (*qbpp)->qb_lowat = q->q_lowat; 2734 q->q_nband++; 2735 qbpp = &(*qbpp)->qb_next; 2736 } 2737 } 2738 qbp = q->q_bandp; 2739 i = mp->b_band; 2740 while (--i) 2741 qbp = qbp->qb_next; 2742 } 2743 2744 if ((mp->b_next = emp) != NULL) { 2745 if ((mp->b_prev = emp->b_prev) != NULL) 2746 emp->b_prev->b_next = mp; 2747 else 2748 q->q_first = mp; 2749 emp->b_prev = mp; 2750 } else { 2751 if ((mp->b_prev = q->q_last) != NULL) 2752 q->q_last->b_next = mp; 2753 else 2754 q->q_first = mp; 2755 q->q_last = mp; 2756 } 2757 2758 /* Get mblk and byte count for q_count accounting */ 2759 for (tmp = mp; tmp; tmp = tmp->b_cont) { 2760 ADD_MBLK_SIZE(tmp, bytecnt); 2761 mblkcnt++; 2762 } 2763 2764 if (qbp) { /* adjust qband pointers and count */ 2765 if (!qbp->qb_first) { 2766 qbp->qb_first = mp; 2767 qbp->qb_last = mp; 2768 } else { 2769 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2770 (mp->b_prev->b_band != mp->b_band))) 2771 qbp->qb_first = mp; 2772 else if (mp->b_next == NULL || (mp->b_next != NULL && 2773 (mp->b_next->b_band != mp->b_band))) 2774 qbp->qb_last = mp; 2775 } 2776 qbp->qb_count += bytecnt; 2777 qbp->qb_mblkcnt += mblkcnt; 2778 if ((qbp->qb_count >= qbp->qb_hiwat) || 2779 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2780 qbp->qb_flag |= QB_FULL; 2781 } 2782 } else { 2783 q->q_count += bytecnt; 2784 q->q_mblkcnt += mblkcnt; 2785 if ((q->q_count >= q->q_hiwat) || 2786 (q->q_mblkcnt >= q->q_hiwat)) { 2787 q->q_flag |= QFULL; 2788 } 2789 } 2790 2791 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2792 2793 if (canenable(q) && (q->q_flag & QWANTR)) 2794 qenable_locked(q); 2795 2796 ASSERT(MUTEX_HELD(QLOCK(q))); 2797 if (freezer != curthread) 2798 mutex_exit(QLOCK(q)); 2799 2800 return (1); 2801 } 2802 2803 /* 2804 * Create and put a control message on queue. 
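* * For illustration (a hypothetical sketch, not part of the original comment): a driver's interrupt handler might report carrier loss or request a flush upstream with these helpers: * *	(void) putnextctl(RD(q), M_HANGUP); *	(void) putnextctl1(RD(q), M_FLUSH, FLUSHRW); * * All four variants below return 0 when the message type is invalid or allocation fails, so callers that must not lose the event should check the return value.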
2805 */ 2806 int 2807 putctl(queue_t *q, int type) 2808 { 2809 mblk_t *bp; 2810 2811 if ((datamsg(type) && (type != M_DELAY)) || 2812 (bp = allocb_tryhard(0)) == NULL) 2813 return (0); 2814 bp->b_datap->db_type = (unsigned char) type; 2815 2816 put(q, bp); 2817 2818 return (1); 2819 } 2820 2821 /* 2822 * Control message with a single-byte parameter 2823 */ 2824 int 2825 putctl1(queue_t *q, int type, int param) 2826 { 2827 mblk_t *bp; 2828 2829 if ((datamsg(type) && (type != M_DELAY)) || 2830 (bp = allocb_tryhard(1)) == NULL) 2831 return (0); 2832 bp->b_datap->db_type = (unsigned char)type; 2833 *bp->b_wptr++ = (unsigned char)param; 2834 2835 put(q, bp); 2836 2837 return (1); 2838 } 2839 2840 int 2841 putnextctl1(queue_t *q, int type, int param) 2842 { 2843 mblk_t *bp; 2844 2845 if ((datamsg(type) && (type != M_DELAY)) || 2846 ((bp = allocb_tryhard(1)) == NULL)) 2847 return (0); 2848 2849 bp->b_datap->db_type = (unsigned char)type; 2850 *bp->b_wptr++ = (unsigned char)param; 2851 2852 putnext(q, bp); 2853 2854 return (1); 2855 } 2856 2857 int 2858 putnextctl(queue_t *q, int type) 2859 { 2860 mblk_t *bp; 2861 2862 if ((datamsg(type) && (type != M_DELAY)) || 2863 ((bp = allocb_tryhard(0)) == NULL)) 2864 return (0); 2865 bp->b_datap->db_type = (unsigned char)type; 2866 2867 putnext(q, bp); 2868 2869 return (1); 2870 } 2871 2872 /* 2873 * Return the queue upstream from this one 2874 */ 2875 queue_t * 2876 backq(queue_t *q) 2877 { 2878 q = _OTHERQ(q); 2879 if (q->q_next) { 2880 q = q->q_next; 2881 return (_OTHERQ(q)); 2882 } 2883 return (NULL); 2884 } 2885 2886 /* 2887 * Send a block back up the queue in reverse from this 2888 * one (e.g. to respond to ioctls) 2889 */ 2890 void 2891 qreply(queue_t *q, mblk_t *bp) 2892 { 2893 ASSERT(q && bp); 2894 2895 putnext(_OTHERQ(q), bp); 2896 } 2897 2898 /* 2899 * Streams Queue Scheduling 2900 * 2901 * Queues are enabled through qenable() when they have messages to 2902 * process. They are serviced by queuerun(), which runs each enabled 2903 * queue's service procedure. The call to queuerun() is processor 2904 * dependent - the general principle is that it be run whenever a queue 2905 * is enabled but before returning to user level. For system calls, 2906 * the function runqueues() is called if their action causes a queue 2907 * to be enabled. For device interrupts, queuerun() should be 2908 * called before returning from the last level of interrupt. Beyond 2909 * this, no timing assumptions should be made about queue scheduling. 2910 */ 2911 2912 /* 2913 * Enable a queue: put it on list of those whose service procedures are 2914 * ready to run and set up the scheduling mechanism. 2915 * The broadcast is done outside the mutex to prevent the woken thread 2916 * from contending for the mutex. This is OK because the queue has been 2917 * enqueued on the runlist and flagged safely at this point. 2918 */ 2919 void 2920 qenable(queue_t *q) 2921 { 2922 mutex_enter(QLOCK(q)); 2923 qenable_locked(q); 2924 mutex_exit(QLOCK(q)); 2925 } 2926 /* 2927 * Return number of messages on queue 2928 */ 2929 int 2930 qsize(queue_t *qp) 2931 { 2932 int count = 0; 2933 mblk_t *mp; 2934 2935 mutex_enter(QLOCK(qp)); 2936 for (mp = qp->q_first; mp; mp = mp->b_next) 2937 count++; 2938 mutex_exit(QLOCK(qp)); 2939 return (count); 2940 } 2941 2942 /* 2943 * noenable - set queue so that putq() will not enable it. 2944 * enableok - set queue so that putq() can enable it.
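* * For illustration (a hypothetical sketch, not part of the original comment): a module that wants messages to accumulate without its service procedure being scheduled on every putq() can bracket the quiet period: * *	noenable(q);	(putq() will queue but not schedule q) *	... *	enableok(q); *	qenable(q);	(explicitly schedule the service procedure)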
2945 */ 2946 void 2947 noenable(queue_t *q) 2948 { 2949 mutex_enter(QLOCK(q)); 2950 q->q_flag |= QNOENB; 2951 mutex_exit(QLOCK(q)); 2952 } 2953 2954 void 2955 enableok(queue_t *q) 2956 { 2957 mutex_enter(QLOCK(q)); 2958 q->q_flag &= ~QNOENB; 2959 mutex_exit(QLOCK(q)); 2960 } 2961 2962 /* 2963 * Set queue fields. 2964 */ 2965 int 2966 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 2967 { 2968 qband_t *qbp = NULL; 2969 queue_t *wrq; 2970 int error = 0; 2971 kthread_id_t freezer; 2972 2973 freezer = STREAM(q)->sd_freezer; 2974 if (freezer == curthread) { 2975 ASSERT(frozenstr(q)); 2976 ASSERT(MUTEX_HELD(QLOCK(q))); 2977 } else 2978 mutex_enter(QLOCK(q)); 2979 2980 if (what >= QBAD) { 2981 error = EINVAL; 2982 goto done; 2983 } 2984 if (pri != 0) { 2985 int i; 2986 qband_t **qbpp; 2987 2988 if (pri > q->q_nband) { 2989 qbpp = &q->q_bandp; 2990 while (*qbpp) 2991 qbpp = &(*qbpp)->qb_next; 2992 while (pri > q->q_nband) { 2993 if ((*qbpp = allocband()) == NULL) { 2994 error = EAGAIN; 2995 goto done; 2996 } 2997 (*qbpp)->qb_hiwat = q->q_hiwat; 2998 (*qbpp)->qb_lowat = q->q_lowat; 2999 q->q_nband++; 3000 qbpp = &(*qbpp)->qb_next; 3001 } 3002 } 3003 qbp = q->q_bandp; 3004 i = pri; 3005 while (--i) 3006 qbp = qbp->qb_next; 3007 } 3008 switch (what) { 3009 3010 case QHIWAT: 3011 if (qbp) 3012 qbp->qb_hiwat = (size_t)val; 3013 else 3014 q->q_hiwat = (size_t)val; 3015 break; 3016 3017 case QLOWAT: 3018 if (qbp) 3019 qbp->qb_lowat = (size_t)val; 3020 else 3021 q->q_lowat = (size_t)val; 3022 break; 3023 3024 case QMAXPSZ: 3025 if (qbp) 3026 error = EINVAL; 3027 else 3028 q->q_maxpsz = (ssize_t)val; 3029 3030 /* 3031 * Performance concern: strwrite looks at the module below 3032 * the stream head for the maxpsz each time it does a write, so 3033 * we now cache it at the stream head. Check to see if this 3034 * queue is sitting directly below the stream head. 3035 */ 3036 wrq = STREAM(q)->sd_wrq; 3037 if (q != wrq->q_next) 3038 break; 3039 3040 /* 3041 * If the stream is not frozen drop the current QLOCK and 3042 * acquire the sd_wrq QLOCK which protects sd_qn_* 3043 */ 3044 if (freezer != curthread) { 3045 mutex_exit(QLOCK(q)); 3046 mutex_enter(QLOCK(wrq)); 3047 } 3048 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3049 3050 if (strmsgsz != 0) { 3051 if (val == INFPSZ) 3052 val = strmsgsz; 3053 else { 3054 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3055 val = MIN(PIPE_BUF, val); 3056 else 3057 val = MIN(strmsgsz, val); 3058 } 3059 } 3060 STREAM(q)->sd_qn_maxpsz = val; 3061 if (freezer != curthread) { 3062 mutex_exit(QLOCK(wrq)); 3063 mutex_enter(QLOCK(q)); 3064 } 3065 break; 3066 3067 case QMINPSZ: 3068 if (qbp) 3069 error = EINVAL; 3070 else 3071 q->q_minpsz = (ssize_t)val; 3072 3073 /* 3074 * Performance concern: strwrite looks at the module below 3075 * the stream head for the minpsz each time it does a write, so 3076 * we now cache it at the stream head. Check to see if this 3077 * queue is sitting directly below the stream head.
3078 */ 3079 wrq = STREAM(q)->sd_wrq; 3080 if (q != wrq->q_next) 3081 break; 3082 3083 /* 3084 * If the stream is not frozen drop the current QLOCK and 3085 * acquire the sd_wrq QLOCK which protects sd_qn_* 3086 */ 3087 if (freezer != curthread) { 3088 mutex_exit(QLOCK(q)); 3089 mutex_enter(QLOCK(wrq)); 3090 } 3091 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3092 3093 if (freezer != curthread) { 3094 mutex_exit(QLOCK(wrq)); 3095 mutex_enter(QLOCK(q)); 3096 } 3097 break; 3098 3099 case QSTRUIOT: 3100 if (qbp) 3101 error = EINVAL; 3102 else 3103 q->q_struiot = (ushort_t)val; 3104 break; 3105 3106 case QCOUNT: 3107 case QFIRST: 3108 case QLAST: 3109 case QFLAG: 3110 error = EPERM; 3111 break; 3112 3113 default: 3114 error = EINVAL; 3115 break; 3116 } 3117 done: 3118 if (freezer != curthread) 3119 mutex_exit(QLOCK(q)); 3120 return (error); 3121 } 3122 3123 /* 3124 * Get queue fields. 3125 */ 3126 int 3127 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3128 { 3129 qband_t *qbp = NULL; 3130 int error = 0; 3131 kthread_id_t freezer; 3132 3133 freezer = STREAM(q)->sd_freezer; 3134 if (freezer == curthread) { 3135 ASSERT(frozenstr(q)); 3136 ASSERT(MUTEX_HELD(QLOCK(q))); 3137 } else 3138 mutex_enter(QLOCK(q)); 3139 if (what >= QBAD) { 3140 error = EINVAL; 3141 goto done; 3142 } 3143 if (pri != 0) { 3144 int i; 3145 qband_t **qbpp; 3146 3147 if (pri > q->q_nband) { 3148 qbpp = &q->q_bandp; 3149 while (*qbpp) 3150 qbpp = &(*qbpp)->qb_next; 3151 while (pri > q->q_nband) { 3152 if ((*qbpp = allocband()) == NULL) { 3153 error = EAGAIN; 3154 goto done; 3155 } 3156 (*qbpp)->qb_hiwat = q->q_hiwat; 3157 (*qbpp)->qb_lowat = q->q_lowat; 3158 q->q_nband++; 3159 qbpp = &(*qbpp)->qb_next; 3160 } 3161 } 3162 qbp = q->q_bandp; 3163 i = pri; 3164 while (--i) 3165 qbp = qbp->qb_next; 3166 } 3167 switch (what) { 3168 case QHIWAT: 3169 if (qbp) 3170 *(size_t *)valp = qbp->qb_hiwat; 3171 else 3172 *(size_t *)valp = q->q_hiwat; 3173 break; 3174 3175 case QLOWAT: 3176 if (qbp) 3177 *(size_t *)valp = qbp->qb_lowat; 3178 else 3179 *(size_t *)valp = q->q_lowat; 3180 break; 3181 3182 case QMAXPSZ: 3183 if (qbp) 3184 error = EINVAL; 3185 else 3186 *(ssize_t *)valp = q->q_maxpsz; 3187 break; 3188 3189 case QMINPSZ: 3190 if (qbp) 3191 error = EINVAL; 3192 else 3193 *(ssize_t *)valp = q->q_minpsz; 3194 break; 3195 3196 case QCOUNT: 3197 if (qbp) 3198 *(size_t *)valp = qbp->qb_count; 3199 else 3200 *(size_t *)valp = q->q_count; 3201 break; 3202 3203 case QFIRST: 3204 if (qbp) 3205 *(mblk_t **)valp = qbp->qb_first; 3206 else 3207 *(mblk_t **)valp = q->q_first; 3208 break; 3209 3210 case QLAST: 3211 if (qbp) 3212 *(mblk_t **)valp = qbp->qb_last; 3213 else 3214 *(mblk_t **)valp = q->q_last; 3215 break; 3216 3217 case QFLAG: 3218 if (qbp) 3219 *(uint_t *)valp = qbp->qb_flag; 3220 else 3221 *(uint_t *)valp = q->q_flag; 3222 break; 3223 3224 case QSTRUIOT: 3225 if (qbp) 3226 error = EINVAL; 3227 else 3228 *(short *)valp = q->q_struiot; 3229 break; 3230 3231 default: 3232 error = EINVAL; 3233 break; 3234 } 3235 done: 3236 if (freezer != curthread) 3237 mutex_exit(QLOCK(q)); 3238 return (error); 3239 } 3240 3241 /* 3242 * This function wakes up everyone sleeping in cv_wait/sigwait/pollwait on one of 3243 * QWANTWSYNC, QWANTR, or QWANTW. 3244 * 3245 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or RSLEEPer then a 3246 * deferred wakeup will be done. Also, if strpoll() is in progress then a 3247 * deferred pollwakeup will be done.
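* * For example, qbackenable() earlier in this file calls strwakeq(q, QWANTWSYNC) once the water marks allow synchronous writers to proceed; the WSLEEP/RSLEEP checks below turn each request into either an immediate cv_broadcast() or a deferred wakeup recorded in sd_wakeq.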
3248 */ 3249 void 3250 strwakeq(queue_t *q, int flag) 3251 { 3252 stdata_t *stp = STREAM(q); 3253 pollhead_t *pl; 3254 3255 mutex_enter(&stp->sd_lock); 3256 pl = &stp->sd_pollist; 3257 if (flag & QWANTWSYNC) { 3258 ASSERT(!(q->q_flag & QREADR)); 3259 if (stp->sd_flag & WSLEEP) { 3260 stp->sd_flag &= ~WSLEEP; 3261 cv_broadcast(&stp->sd_wrq->q_wait); 3262 } else { 3263 stp->sd_wakeq |= WSLEEP; 3264 } 3265 3266 mutex_exit(&stp->sd_lock); 3267 pollwakeup(pl, POLLWRNORM); 3268 mutex_enter(&stp->sd_lock); 3269 3270 if (stp->sd_sigflags & S_WRNORM) 3271 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3272 } else if (flag & QWANTR) { 3273 if (stp->sd_flag & RSLEEP) { 3274 stp->sd_flag &= ~RSLEEP; 3275 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3276 } else { 3277 stp->sd_wakeq |= RSLEEP; 3278 } 3279 3280 mutex_exit(&stp->sd_lock); 3281 pollwakeup(pl, POLLIN | POLLRDNORM); 3282 mutex_enter(&stp->sd_lock); 3283 3284 { 3285 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3286 3287 if (events) 3288 strsendsig(stp->sd_siglist, events, 0, 0); 3289 } 3290 } else { 3291 if (stp->sd_flag & WSLEEP) { 3292 stp->sd_flag &= ~WSLEEP; 3293 cv_broadcast(&stp->sd_wrq->q_wait); 3294 } 3295 3296 mutex_exit(&stp->sd_lock); 3297 pollwakeup(pl, POLLWRNORM); 3298 mutex_enter(&stp->sd_lock); 3299 3300 if (stp->sd_sigflags & S_WRNORM) 3301 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3302 } 3303 mutex_exit(&stp->sd_lock); 3304 } 3305 3306 int 3307 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock) 3308 { 3309 stdata_t *stp = STREAM(q); 3310 int typ = STRUIOT_STANDARD; 3311 uio_t *uiop = &dp->d_uio; 3312 dblk_t *dbp; 3313 ssize_t uiocnt; 3314 ssize_t cnt; 3315 unsigned char *ptr; 3316 ssize_t resid; 3317 int error = 0; 3318 on_trap_data_t otd; 3319 queue_t *stwrq; 3320 3321 /* 3322 * Plumbing may change while we are taking the type, so store the 3323 * queue in a temporary variable. It doesn't matter even 3324 * if we take the type from the previous plumbing; 3325 * if the plumbing has changed while we were 3326 * holding the queue in a temporary variable, we can continue 3327 * processing the message the way it would have been processed 3328 * in the old plumbing, without any side effects beyond a bit of 3329 * extra processing for the partial ip header checksum. 3330 * 3331 * This has been done to avoid holding the sd_lock which is 3332 * very hot. 3333 */ 3334 3335 stwrq = stp->sd_struiowrq; 3336 if (stwrq) 3337 typ = stwrq->q_struiot; 3338 3339 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) { 3340 dbp = mp->b_datap; 3341 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff); 3342 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3343 cnt = MIN(uiocnt, uiop->uio_resid); 3344 if (!(dbp->db_struioflag & STRUIO_SPEC) || 3345 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) { 3346 /* 3347 * Either this mblk has already been processed 3348 * or there is no more room in this mblk (?).
3349 */ 3350 continue; 3351 } 3352 switch (typ) { 3353 case STRUIOT_STANDARD: 3354 if (noblock) { 3355 if (on_trap(&otd, OT_DATA_ACCESS)) { 3356 no_trap(); 3357 error = EWOULDBLOCK; 3358 goto out; 3359 } 3360 } 3361 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) { 3362 if (noblock) 3363 no_trap(); 3364 goto out; 3365 } 3366 if (noblock) 3367 no_trap(); 3368 break; 3369 3370 default: 3371 error = EIO; 3372 goto out; 3373 } 3374 dbp->db_struioflag |= STRUIO_DONE; 3375 dbp->db_cksumstuff += cnt; 3376 } 3377 out: 3378 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) { 3379 /* 3380 * A fault has occurred and some bytes were moved to the 3381 * current mblk, the uio_t has already been updated by 3382 * the appropriate uio routine, so also update the mblk 3383 * to reflect this in case this same mblk chain is used 3384 * again (after the fault has been handled). 3385 */ 3386 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3387 if (uiocnt >= resid) 3388 dbp->db_cksumstuff += resid; 3389 } 3390 return (error); 3391 } 3392 3393 /* 3394 * Try to enter queue synchronously. Any attempt to enter a closing queue will 3395 * fail. The qp->q_rwcnt keeps track of the number of successful entries so 3396 * that removeq() will not try to close the queue while a thread is inside the 3397 * queue. 3398 */ 3399 static boolean_t 3400 rwnext_enter(queue_t *qp) 3401 { 3402 mutex_enter(QLOCK(qp)); 3403 if (qp->q_flag & QWCLOSE) { 3404 mutex_exit(QLOCK(qp)); 3405 return (B_FALSE); 3406 } 3407 qp->q_rwcnt++; 3408 ASSERT(qp->q_rwcnt != 0); 3409 mutex_exit(QLOCK(qp)); 3410 return (B_TRUE); 3411 } 3412 3413 /* 3414 * Decrease the count of threads running in sync stream queue and wake up any 3415 * threads blocked in removeq(). 3416 */ 3417 static void 3418 rwnext_exit(queue_t *qp) 3419 { 3420 mutex_enter(QLOCK(qp)); 3421 qp->q_rwcnt--; 3422 if (qp->q_flag & QWANTRMQSYNC) { 3423 qp->q_flag &= ~QWANTRMQSYNC; 3424 cv_broadcast(&qp->q_wait); 3425 } 3426 mutex_exit(QLOCK(qp)); 3427 } 3428 3429 /* 3430 * The purpose of rwnext() is to call the rw procedure of the next 3431 * (downstream) module's queue. 3432 * 3433 * Treated as a put entrypoint for perimeter synchronization. 3434 * 3435 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3436 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it does 3437 * not matter if any regular put entrypoints have been already entered. We 3438 * can't increment one of the sq_putcounts (instead of sq_count) because 3439 * qwait_rw won't know which counter to decrement. 3440 * 3441 * It would be reasonable to add the lockless FASTPUT logic. 3442 */ 3443 int 3444 rwnext(queue_t *qp, struiod_t *dp) 3445 { 3446 queue_t *nqp; 3447 syncq_t *sq; 3448 uint16_t count; 3449 uint16_t flags; 3450 struct qinit *qi; 3451 int (*proc)(); 3452 struct stdata *stp; 3453 int isread; 3454 int rval; 3455 3456 stp = STREAM(qp); 3457 /* 3458 * Prevent q_next from changing by holding sd_lock until acquiring 3459 * SQLOCK. Note that a read-side rwnext from the streamhead will 3460 * already have sd_lock acquired. In either case sd_lock is always 3461 * released after acquiring SQLOCK. 3462 * 3463 * The streamhead read-side holding sd_lock when calling rwnext is 3464 * required to prevent a race condition where M_DATA mblks flowing 3465 * up the read-side of the stream could be bypassed by a rwnext() 3466 * down-call. In this case sd_lock acts as the streamhead perimeter.
3467 */ 3468 if ((nqp = _WR(qp)) == qp) { 3469 isread = 0; 3470 mutex_enter(&stp->sd_lock); 3471 qp = nqp->q_next; 3472 } else { 3473 isread = 1; 3474 if (nqp != stp->sd_wrq) 3475 /* Not streamhead */ 3476 mutex_enter(&stp->sd_lock); 3477 qp = _RD(nqp->q_next); 3478 } 3479 qi = qp->q_qinfo; 3480 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) { 3481 /* 3482 * Not a synchronous module or no r/w procedure for this 3483 * queue, so just return EINVAL and let the caller handle it. 3484 */ 3485 mutex_exit(&stp->sd_lock); 3486 return (EINVAL); 3487 } 3488 3489 if (rwnext_enter(qp) == B_FALSE) { 3490 mutex_exit(&stp->sd_lock); 3491 return (EINVAL); 3492 } 3493 3494 sq = qp->q_syncq; 3495 mutex_enter(SQLOCK(sq)); 3496 mutex_exit(&stp->sd_lock); 3497 count = sq->sq_count; 3498 flags = sq->sq_flags; 3499 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3500 3501 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3502 /* 3503 * If this queue is being closed, return. 3504 */ 3505 if (qp->q_flag & QWCLOSE) { 3506 mutex_exit(SQLOCK(sq)); 3507 rwnext_exit(qp); 3508 return (EINVAL); 3509 } 3510 3511 /* 3512 * Wait until we can enter the inner perimeter. 3513 */ 3514 sq->sq_flags = flags | SQ_WANTWAKEUP; 3515 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3516 count = sq->sq_count; 3517 flags = sq->sq_flags; 3518 } 3519 3520 if (isread == 0 && stp->sd_struiowrq == NULL || 3521 isread == 1 && stp->sd_struiordq == NULL) { 3522 /* 3523 * Stream plumbing changed while waiting for inner perimeter 3524 * so just return EINVAL and let the caller handle it. 3525 */ 3526 mutex_exit(SQLOCK(sq)); 3527 rwnext_exit(qp); 3528 return (EINVAL); 3529 } 3530 if (!(flags & SQ_CIPUT)) 3531 sq->sq_flags = flags | SQ_EXCL; 3532 sq->sq_count = count + 1; 3533 ASSERT(sq->sq_count != 0); /* Wraparound */ 3534 /* 3535 * Note: The only message ordering guarantee that rwnext() makes is 3536 * for the write queue flow-control case. All others (r/w queue 3537 * with q_count > 0 (or q_first != 0)) are the responsibility of 3538 * the queue's rw procedure. This could be generalized here by 3539 * running the queue's service procedure, but that wouldn't be 3540 * the most efficient for all cases. 3541 */ 3542 mutex_exit(SQLOCK(sq)); 3543 if (! isread && (qp->q_flag & QFULL)) { 3544 /* 3545 * Write queue may be flow controlled. If so, 3546 * mark the queue for wakeup when it's not. 3547 */ 3548 mutex_enter(QLOCK(qp)); 3549 if (qp->q_flag & QFULL) { 3550 qp->q_flag |= QWANTWSYNC; 3551 mutex_exit(QLOCK(qp)); 3552 rval = EWOULDBLOCK; 3553 goto out; 3554 } 3555 mutex_exit(QLOCK(qp)); 3556 } 3557 3558 if (! isread && dp->d_mp) 3559 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr - 3560 dp->d_mp->b_datap->db_base); 3561 3562 rval = (*proc)(qp, dp); 3563 3564 if (isread && dp->d_mp) 3565 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT, 3566 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base); 3567 out: 3568 /* 3569 * The queue is protected from being freed by sq_count, so it is 3570 * safe to call rwnext_exit and reacquire SQLOCK(sq). 3571 */ 3572 rwnext_exit(qp); 3573 3574 mutex_enter(SQLOCK(sq)); 3575 flags = sq->sq_flags; 3576 ASSERT(sq->sq_count != 0); 3577 sq->sq_count--; 3578 if (flags & SQ_TAIL) { 3579 putnext_tail(sq, qp, flags); 3580 /* 3581 * The only purpose of this ASSERT is to preserve calling stack 3582 * in DEBUG kernel.
3583 */ 3584 ASSERT(flags & SQ_TAIL); 3585 return (rval); 3586 } 3587 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3588 /* 3589 * Safe to always drop SQ_EXCL: 3590 * Not SQ_CIPUT means we set SQ_EXCL above 3591 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3592 * did a qwriter(INNER) in which case nobody else 3593 * is in the inner perimeter and we are exiting. 3594 * 3595 * I would like to make the following assertion: 3596 * 3597 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3598 * sq->sq_count == 0); 3599 * 3600 * which indicates that if we are both putshared and exclusive, 3601 * we became exclusive while executing the putproc, and the only 3602 * claim on the syncq was the one we dropped a few lines above. 3603 * But other threads that enter putnext while the syncq is exclusive 3604 * need to make a claim as they may need to drop SQLOCK in the 3605 * has_writers case to avoid deadlocks. If these threads are 3606 * delayed or preempted, it is possible that the writer thread can 3607 * find out that there are other claims making the (sq_count == 0) 3608 * test invalid. 3609 */ 3610 3611 sq->sq_flags = flags & ~SQ_EXCL; 3612 if (sq->sq_flags & SQ_WANTWAKEUP) { 3613 sq->sq_flags &= ~SQ_WANTWAKEUP; 3614 cv_broadcast(&sq->sq_wait); 3615 } 3616 mutex_exit(SQLOCK(sq)); 3617 return (rval); 3618 } 3619 3620 /* 3621 * The purpose of infonext() is to call the info procedure of the next 3622 * (downstream) module's queue. 3623 * 3624 * Treated as a put entrypoint for perimeter synchronization. 3625 * 3626 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3627 * sync queues). If it is a CIPUT sync queue, the regular sq_count is incremented and 3628 * it does not matter if any regular put entrypoints have been already 3629 * entered. 3630 */ 3631 int 3632 infonext(queue_t *qp, infod_t *idp) 3633 { 3634 queue_t *nqp; 3635 syncq_t *sq; 3636 uint16_t count; 3637 uint16_t flags; 3638 struct qinit *qi; 3639 int (*proc)(); 3640 struct stdata *stp; 3641 int rval; 3642 3643 stp = STREAM(qp); 3644 /* 3645 * Prevent q_next from changing by holding sd_lock until 3646 * acquiring SQLOCK. 3647 */ 3648 mutex_enter(&stp->sd_lock); 3649 if ((nqp = _WR(qp)) == qp) { 3650 qp = nqp->q_next; 3651 } else { 3652 qp = _RD(nqp->q_next); 3653 } 3654 qi = qp->q_qinfo; 3655 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3656 mutex_exit(&stp->sd_lock); 3657 return (EINVAL); 3658 } 3659 sq = qp->q_syncq; 3660 mutex_enter(SQLOCK(sq)); 3661 mutex_exit(&stp->sd_lock); 3662 count = sq->sq_count; 3663 flags = sq->sq_flags; 3664 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3665 3666 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3667 /* 3668 * Wait until we can enter the inner perimeter. 3669 */ 3670 sq->sq_flags = flags | SQ_WANTWAKEUP; 3671 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3672 count = sq->sq_count; 3673 flags = sq->sq_flags; 3674 } 3675 3676 if (! (flags & SQ_CIPUT)) 3677 sq->sq_flags = flags | SQ_EXCL; 3678 sq->sq_count = count + 1; 3679 ASSERT(sq->sq_count != 0); /* Wraparound */ 3680 mutex_exit(SQLOCK(sq)); 3681 3682 rval = (*proc)(qp, idp); 3683 3684 mutex_enter(SQLOCK(sq)); 3685 flags = sq->sq_flags; 3686 ASSERT(sq->sq_count != 0); 3687 sq->sq_count--; 3688 if (flags & SQ_TAIL) { 3689 putnext_tail(sq, qp, flags); 3690 /* 3691 * The only purpose of this ASSERT is to preserve calling stack 3692 * in DEBUG kernel.
3693 */ 3694 ASSERT(flags & SQ_TAIL); 3695 return (rval); 3696 } 3697 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3698 /* 3699 * XXXX 3700 * I am not certain the next comment is correct here. I need to consider 3701 * why infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3702 * might cause other problems. It just might be safer to drop it if 3703 * !SQ_CIPUT because that is when we set it. 3704 */ 3705 /* 3706 * Safe to always drop SQ_EXCL: 3707 * Not SQ_CIPUT means we set SQ_EXCL above 3708 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3709 * did a qwriter(INNER) in which case nobody else 3710 * is in the inner perimeter and we are exiting. 3711 * 3712 * I would like to make the following assertion: 3713 * 3714 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3715 * sq->sq_count == 0); 3716 * 3717 * which indicates that if we are both putshared and exclusive, 3718 * we became exclusive while executing the putproc, and the only 3719 * claim on the syncq was the one we dropped a few lines above. 3720 * But other threads that enter putnext while the syncq is exclusive 3721 * need to make a claim as they may need to drop SQLOCK in the 3722 * has_writers case to avoid deadlocks. If these threads are 3723 * delayed or preempted, it is possible that the writer thread can 3724 * find out that there are other claims making the (sq_count == 0) 3725 * test invalid. 3726 */ 3727 3728 sq->sq_flags = flags & ~SQ_EXCL; 3729 mutex_exit(SQLOCK(sq)); 3730 return (rval); 3731 } 3732 3733 /* 3734 * Return nonzero if the queue is responsible for struio(), else return 0. 3735 */ 3736 int 3737 isuioq(queue_t *q) 3738 { 3739 if (q->q_flag & QREADR) 3740 return (STREAM(q)->sd_struiordq == q); 3741 else 3742 return (STREAM(q)->sd_struiowrq == q); 3743 } 3744 3745 #if defined(__sparc) 3746 int disable_putlocks = 0; 3747 #else 3748 int disable_putlocks = 1; 3749 #endif 3750 3751 /* 3752 * Called by create_putlocks(). 3753 */ 3754 static void 3755 create_syncq_putlocks(queue_t *q) 3756 { 3757 syncq_t *sq = q->q_syncq; 3758 ciputctrl_t *cip; 3759 int i; 3760 3761 ASSERT(sq != NULL); 3762 3763 ASSERT(disable_putlocks == 0); 3764 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3765 ASSERT(ciputctrl_cache != NULL); 3766 3767 if (!(sq->sq_type & SQ_CIPUT)) 3768 return; 3769 3770 for (i = 0; i <= 1; i++) { 3771 if (sq->sq_ciputctrl == NULL) { 3772 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3773 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3774 mutex_enter(SQLOCK(sq)); 3775 if (sq->sq_ciputctrl != NULL) { 3776 mutex_exit(SQLOCK(sq)); 3777 kmem_cache_free(ciputctrl_cache, cip); 3778 } else { 3779 ASSERT(sq->sq_nciputctrl == 0); 3780 sq->sq_nciputctrl = n_ciputctrl - 1; 3781 /* 3782 * putnext checks sq_ciputctrl without holding 3783 * SQLOCK. If it is not NULL, putnext assumes 3784 * sq_nciputctrl is initialized. The membar below 3785 * ensures that. 3786 */ 3787 membar_producer(); 3788 sq->sq_ciputctrl = cip; 3789 mutex_exit(SQLOCK(sq)); 3790 } 3791 } 3792 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3793 if (i == 1) 3794 break; 3795 q = _OTHERQ(q); 3796 if (!(q->q_flag & QPERQ)) { 3797 ASSERT(sq == q->q_syncq); 3798 break; 3799 } 3800 ASSERT(q->q_syncq != NULL); 3801 ASSERT(sq != q->q_syncq); 3802 sq = q->q_syncq; 3803 ASSERT(sq->sq_type & SQ_CIPUT); 3804 } 3805 } 3806 3807 /* 3808 * If the stream argument is 0, only create per cpu sq_putlocks/sq_putcounts for 3809 * the syncq of q.
If the stream argument is not 0, create per cpu stream_putlocks for 3810 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3811 * starting from q and down to the driver. 3812 * 3813 * This should be called after the affected queues are part of the stream 3814 * geometry. It should be called from the driver/module open routine after the 3815 * qprocson() call. It is also called from the nfs syscall, where it is known that 3816 * the stream is configured and won't change its geometry during the create_putlocks 3817 * call. 3818 * 3819 * The caller normally uses a 0 value for the stream argument to speed up MT putnext 3820 * into the perimeter of q, for example because its perimeter is per module 3821 * (e.g. IP). 3822 * 3823 * The caller normally uses a non-0 value for the stream argument to hint to the system 3824 * that the stream of q is a very contended global system stream 3825 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3826 * particularly MT hot. 3827 * 3828 * The caller ensures stream plumbing won't happen while we are here and therefore 3829 * q_next can be safely used. 3830 */ 3831 3832 void 3833 create_putlocks(queue_t *q, int stream) 3834 { 3835 ciputctrl_t *cip; 3836 struct stdata *stp = STREAM(q); 3837 3838 q = _WR(q); 3839 ASSERT(stp != NULL); 3840 3841 if (disable_putlocks != 0) 3842 return; 3843 3844 if (n_ciputctrl < min_n_ciputctrl) 3845 return; 3846 3847 ASSERT(ciputctrl_cache != NULL); 3848 3849 if (stream != 0 && stp->sd_ciputctrl == NULL) { 3850 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3851 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3852 mutex_enter(&stp->sd_lock); 3853 if (stp->sd_ciputctrl != NULL) { 3854 mutex_exit(&stp->sd_lock); 3855 kmem_cache_free(ciputctrl_cache, cip); 3856 } else { 3857 ASSERT(stp->sd_nciputctrl == 0); 3858 stp->sd_nciputctrl = n_ciputctrl - 1; 3859 /* 3860 * putnext checks sd_ciputctrl without holding 3861 * sd_lock. If it is not NULL, putnext assumes 3862 * sd_nciputctrl is initialized. The membar below 3863 * ensures that. 3864 */ 3865 membar_producer(); 3866 stp->sd_ciputctrl = cip; 3867 mutex_exit(&stp->sd_lock); 3868 } 3869 } 3870 3871 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 3872 3873 while (_SAMESTR(q)) { 3874 create_syncq_putlocks(q); 3875 if (stream == 0) 3876 return; 3877 q = q->q_next; 3878 } 3879 ASSERT(q != NULL); 3880 create_syncq_putlocks(q); 3881 } 3882 3883 /* 3884 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 3885 * through a stream. 3886 * 3887 * Data currently recorded per event are an hrtime stamp, a queue address, an event 3888 * type, and a per-type datum. Much of the STREAMS framework is instrumented 3889 * for automatic flow tracing (when enabled). Events can be defined and used 3890 * by STREAMS modules and drivers. 3891 * 3892 * Global objects: 3893 * 3894 * str_ftevent() - Add a flow-trace event to a dblk. 3895 * str_ftfree() - Free flow-trace data 3896 * 3897 * Local objects: 3898 * 3899 * fthdr_cache - pointer to the kmem cache for trace header. 3900 * ftblk_cache - pointer to the kmem cache for trace data blocks. 3901 */ 3902 3903 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 3904 3905 void 3906 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 3907 { 3908 ftblk_t *bp = hp->tail; 3909 ftblk_t *nbp; 3910 ftevnt_t *ep; 3911 int ix, nix; 3912 3913 ASSERT(hp != NULL); 3914 3915 for (;;) { 3916 if ((ix = bp->ix) == FTBLK_EVNTS) { 3917 /* 3918 * Tail doesn't have room, so need a new tail.
* 3920 * To make this MT safe, first, allocate a new 3921 * ftblk, and initialize it. To make life a 3922 * little easier, reserve the first slot (mostly 3923 * by making ix = 1). When we are finished with 3924 * the initialization, CAS this pointer to the 3925 * tail. If this succeeds, this is the new 3926 * "next" block. Otherwise, another thread 3927 * got here first, so free the block and start 3928 * again. 3929 */ 3930 if (!(nbp = kmem_cache_alloc(ftblk_cache, 3931 KM_NOSLEEP))) { 3932 /* no mem, so punt */ 3933 str_ftnever++; 3934 /* free up all flow data? */ 3935 return; 3936 } 3937 nbp->nxt = NULL; 3938 nbp->ix = 1; 3939 /* 3940 * Just in case there is another thread about 3941 * to get the next index, we need to make sure 3942 * the value is there for it. 3943 */ 3944 membar_producer(); 3945 if (casptr(&hp->tail, bp, nbp) == bp) { 3946 /* CAS was successful */ 3947 bp->nxt = nbp; 3948 membar_producer(); 3949 bp = nbp; 3950 ix = 0; 3951 goto cas_good; 3952 } else { 3953 kmem_cache_free(ftblk_cache, nbp); 3954 bp = hp->tail; 3955 continue; 3956 } 3957 } 3958 nix = ix + 1; 3959 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 3960 cas_good: 3961 if (curthread != hp->thread) { 3962 hp->thread = curthread; 3963 evnt |= FTEV_CS; 3964 } 3965 if (CPU->cpu_seqid != hp->cpu_seqid) { 3966 hp->cpu_seqid = CPU->cpu_seqid; 3967 evnt |= FTEV_PS; 3968 } 3969 ep = &bp->ev[ix]; 3970 break; 3971 } 3972 } 3973 3974 if (evnt & FTEV_QMASK) { 3975 queue_t *qp = p; 3976 3977 /* 3978 * It is possible that the module info is broken 3979 * (as logsubr.c is at the time of this writing). 3980 * Instead of panicking or doing other unmentionables, 3981 * we shall put in a dummy name as the mid, and continue. 3982 */ 3983 if (qp->q_qinfo == NULL) 3984 ep->mid = "NONAME"; 3985 else 3986 ep->mid = qp->q_qinfo->qi_minfo->mi_idname; 3987 3988 if (!(qp->q_flag & QREADR)) 3989 evnt |= FTEV_ISWR; 3990 } else { 3991 ep->mid = (char *)p; 3992 } 3993 3994 ep->ts = gethrtime(); 3995 ep->evnt = evnt; 3996 ep->data = data; 3997 hp->hash = (hp->hash << 9) + hp->hash; 3998 hp->hash += (evnt << 16) | data; 3999 hp->hash += (uintptr_t)ep->mid; 4000 } 4001 4002 /* 4003 * Free flow-trace data. 4004 */ 4005 void 4006 str_ftfree(dblk_t *dbp) 4007 { 4008 fthdr_t *hp = dbp->db_fthdr; 4009 ftblk_t *bp = &hp->first; 4010 ftblk_t *nbp; 4011 4012 if (bp != hp->tail || bp->ix != 0) { 4013 /* 4014 * Clear out the hash, have the tail point to itself, and free 4015 * any continuation blocks. 4016 */ 4017 bp = hp->first.nxt; 4018 hp->tail = &hp->first; 4019 hp->hash = 0; 4020 hp->first.nxt = NULL; 4021 hp->first.ix = 0; 4022 while (bp != NULL) { 4023 nbp = bp->nxt; 4024 kmem_cache_free(ftblk_cache, bp); 4025 bp = nbp; 4026 } 4027 } 4028 kmem_cache_free(fthdr_cache, hp); 4029 dbp->db_fthdr = NULL; 4030 } 4031