1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 23 /* All Rights Reserved */ 24 25 26 /* 27 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 28 * Use is subject to license terms. 29 */ 30 31 #pragma ident "%Z%%M% %I% %E% SMI" 32 33 #include <sys/types.h> 34 #include <sys/param.h> 35 #include <sys/thread.h> 36 #include <sys/sysmacros.h> 37 #include <sys/stropts.h> 38 #include <sys/stream.h> 39 #include <sys/strsubr.h> 40 #include <sys/strsun.h> 41 #include <sys/conf.h> 42 #include <sys/debug.h> 43 #include <sys/cmn_err.h> 44 #include <sys/kmem.h> 45 #include <sys/atomic.h> 46 #include <sys/errno.h> 47 #include <sys/vtrace.h> 48 #include <sys/ftrace.h> 49 #include <sys/ontrap.h> 50 #include <sys/multidata.h> 51 #include <sys/multidata_impl.h> 52 #include <sys/sdt.h> 53 54 #ifdef DEBUG 55 #include <sys/kmem_impl.h> 56 #endif 57 58 /* 59 * This file contains all the STREAMS utility routines that may 60 * be used by modules and drivers. 61 */ 62 63 /* 64 * STREAMS message allocator: principles of operation 65 * 66 * The streams message allocator consists of all the routines that 67 * allocate, dup and free streams messages: allocb(), [d]esballoc[a], 68 * dupb(), freeb() and freemsg(). What follows is a high-level view 69 * of how the allocator works. 70 * 71 * Every streams message consists of one or more mblks, a dblk, and data. 72 * All mblks for all types of messages come from a common mblk_cache. 73 * The dblk and data come in several flavors, depending on how the 74 * message is allocated: 75 * 76 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of 77 * fixed-size dblk/data caches. For message sizes that are multiples of 78 * PAGESIZE, dblks are allocated separately from the buffer. 79 * The associated buffer is allocated by the constructor using kmem_alloc(). 80 * For all other message sizes, dblk and its associated data is allocated 81 * as a single contiguous chunk of memory. 82 * Objects in these caches consist of a dblk plus its associated data. 83 * allocb() determines the nearest-size cache by table lookup: 84 * the dblk_cache[] array provides the mapping from size to dblk cache. 85 * 86 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by 87 * kmem_alloc()'ing a buffer for the data and supplying that 88 * buffer to gesballoc(), described below. 89 * 90 * (3) The four flavors of [d]esballoc[a] are all implemented by a 91 * common routine, gesballoc() ("generic esballoc"). gesballoc() 92 * allocates a dblk from the global dblk_esb_cache and sets db_base, 93 * db_lim and db_frtnp to describe the caller-supplied buffer. 94 * 95 * While there are several routines to allocate messages, there is only 96 * one routine to free messages: freeb(). freeb() simply invokes the 97 * dblk's free method, dbp->db_free(), which is set at allocation time. 98 * 99 * dupb() creates a new reference to a message by allocating a new mblk, 100 * incrementing the dblk reference count and setting the dblk's free 101 * method to dblk_decref(). The dblk's original free method is retained 102 * in db_lastfree. dblk_decref() decrements the reference count on each 103 * freeb(). If this is not the last reference it just frees the mblk; 104 * if this *is* the last reference, it restores db_free to db_lastfree, 105 * sets db_mblk to the current mblk (see below), and invokes db_lastfree. 106 * 107 * The implementation makes aggressive use of kmem object caching for 108 * maximum performance. This makes the code simple and compact, but 109 * also a bit abstruse in some places. The invariants that constitute a 110 * message's constructed state, described below, are more subtle than usual. 111 * 112 * Every dblk has an "attached mblk" as part of its constructed state. 113 * The mblk is allocated by the dblk's constructor and remains attached 114 * until the message is either dup'ed or pulled up. In the dupb() case 115 * the mblk association doesn't matter until the last free, at which time 116 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects 117 * the mblk association because it swaps the leading mblks of two messages, 118 * so it is responsible for swapping their db_mblk pointers accordingly. 119 * From a constructed-state viewpoint it doesn't matter that a dblk's 120 * attached mblk can change while the message is allocated; all that 121 * matters is that the dblk has *some* attached mblk when it's freed. 122 * 123 * The sizes of the allocb() small-message caches are not magical. 124 * They represent a good trade-off between internal and external 125 * fragmentation for current workloads. They should be reevaluated 126 * periodically, especially if allocations larger than DBLK_MAX_CACHE 127 * become common. We use 64-byte alignment so that dblks don't 128 * straddle cache lines unnecessarily. 129 */ 130 #define DBLK_MAX_CACHE 73728 131 #define DBLK_CACHE_ALIGN 64 132 #define DBLK_MIN_SIZE 8 133 #define DBLK_SIZE_SHIFT 3 134 135 #ifdef _BIG_ENDIAN 136 #define DBLK_RTFU_SHIFT(field) \ 137 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field)) 138 #else 139 #define DBLK_RTFU_SHIFT(field) \ 140 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref)) 141 #endif 142 143 #define DBLK_RTFU(ref, type, flags, uioflag) \ 144 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \ 145 ((type) << DBLK_RTFU_SHIFT(db_type)) | \ 146 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \ 147 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag))) 148 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref)) 149 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref)) 150 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band)) 151 152 static size_t dblk_sizes[] = { 153 #ifdef _LP64 154 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920, 155 8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688, 156 40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456, 157 #else 158 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968, 159 8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736, 160 40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504, 161 #endif 162 DBLK_MAX_CACHE, 0 163 }; 164 165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE]; 166 static struct kmem_cache *mblk_cache; 167 static struct kmem_cache *dblk_esb_cache; 168 static struct kmem_cache *fthdr_cache; 169 static struct kmem_cache *ftblk_cache; 170 171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp); 172 static mblk_t *allocb_oversize(size_t size, int flags); 173 static int allocb_tryhard_fails; 174 static void frnop_func(void *arg); 175 frtn_t frnop = { frnop_func }; 176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp); 177 178 static boolean_t rwnext_enter(queue_t *qp); 179 static void rwnext_exit(queue_t *qp); 180 181 /* 182 * Patchable mblk/dblk kmem_cache flags. 183 */ 184 int dblk_kmem_flags = 0; 185 int mblk_kmem_flags = 0; 186 187 188 static int 189 dblk_constructor(void *buf, void *cdrarg, int kmflags) 190 { 191 dblk_t *dbp = buf; 192 ssize_t msg_size = (ssize_t)cdrarg; 193 size_t index; 194 195 ASSERT(msg_size != 0); 196 197 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 198 199 ASSERT(index <= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 200 201 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 202 return (-1); 203 if ((msg_size & PAGEOFFSET) == 0) { 204 dbp->db_base = kmem_alloc(msg_size, kmflags); 205 if (dbp->db_base == NULL) { 206 kmem_cache_free(mblk_cache, dbp->db_mblk); 207 return (-1); 208 } 209 } else { 210 dbp->db_base = (unsigned char *)&dbp[1]; 211 } 212 213 dbp->db_mblk->b_datap = dbp; 214 dbp->db_cache = dblk_cache[index]; 215 dbp->db_lim = dbp->db_base + msg_size; 216 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 217 dbp->db_frtnp = NULL; 218 dbp->db_fthdr = NULL; 219 dbp->db_credp = NULL; 220 dbp->db_cpid = -1; 221 dbp->db_struioflag = 0; 222 dbp->db_struioun.cksum.flags = 0; 223 return (0); 224 } 225 226 /*ARGSUSED*/ 227 static int 228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 229 { 230 dblk_t *dbp = buf; 231 232 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 233 return (-1); 234 dbp->db_mblk->b_datap = dbp; 235 dbp->db_cache = dblk_esb_cache; 236 dbp->db_fthdr = NULL; 237 dbp->db_credp = NULL; 238 dbp->db_cpid = -1; 239 dbp->db_struioflag = 0; 240 dbp->db_struioun.cksum.flags = 0; 241 return (0); 242 } 243 244 static int 245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 246 { 247 dblk_t *dbp = buf; 248 bcache_t *bcp = (bcache_t *)cdrarg; 249 250 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 251 return (-1); 252 253 if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache, 254 kmflags)) == NULL) { 255 kmem_cache_free(mblk_cache, dbp->db_mblk); 256 return (-1); 257 } 258 259 dbp->db_mblk->b_datap = dbp; 260 dbp->db_cache = (void *)bcp; 261 dbp->db_lim = dbp->db_base + bcp->size; 262 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 263 dbp->db_frtnp = NULL; 264 dbp->db_fthdr = NULL; 265 dbp->db_credp = NULL; 266 dbp->db_cpid = -1; 267 dbp->db_struioflag = 0; 268 dbp->db_struioun.cksum.flags = 0; 269 return (0); 270 } 271 272 /*ARGSUSED*/ 273 static void 274 dblk_destructor(void *buf, void *cdrarg) 275 { 276 dblk_t *dbp = buf; 277 ssize_t msg_size = (ssize_t)cdrarg; 278 279 ASSERT(dbp->db_mblk->b_datap == dbp); 280 281 ASSERT(msg_size != 0); 282 283 ASSERT(dbp->db_struioflag == 0); 284 ASSERT(dbp->db_struioun.cksum.flags == 0); 285 286 if ((msg_size & PAGEOFFSET) == 0) { 287 kmem_free(dbp->db_base, msg_size); 288 } 289 290 kmem_cache_free(mblk_cache, dbp->db_mblk); 291 } 292 293 static void 294 bcache_dblk_destructor(void *buf, void *cdrarg) 295 { 296 dblk_t *dbp = buf; 297 bcache_t *bcp = (bcache_t *)cdrarg; 298 299 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 300 301 ASSERT(dbp->db_mblk->b_datap == dbp); 302 303 ASSERT(dbp->db_struioflag == 0); 304 ASSERT(dbp->db_struioun.cksum.flags == 0); 305 306 kmem_cache_free(mblk_cache, dbp->db_mblk); 307 } 308 309 void 310 streams_msg_init(void) 311 { 312 char name[40]; 313 size_t size; 314 size_t lastsize = DBLK_MIN_SIZE; 315 size_t *sizep; 316 struct kmem_cache *cp; 317 size_t tot_size; 318 int offset; 319 320 mblk_cache = kmem_cache_create("streams_mblk", 321 sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL, 322 mblk_kmem_flags); 323 324 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) { 325 326 if ((offset = (size & PAGEOFFSET)) != 0) { 327 /* 328 * We are in the middle of a page, dblk should 329 * be allocated on the same page 330 */ 331 tot_size = size + sizeof (dblk_t); 332 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t)) 333 < PAGESIZE); 334 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0); 335 336 } else { 337 338 /* 339 * buf size is multiple of page size, dblk and 340 * buffer are allocated separately. 341 */ 342 343 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0); 344 tot_size = sizeof (dblk_t); 345 } 346 347 (void) sprintf(name, "streams_dblk_%ld", size); 348 cp = kmem_cache_create(name, tot_size, 349 DBLK_CACHE_ALIGN, dblk_constructor, 350 dblk_destructor, NULL, 351 (void *)(size), NULL, dblk_kmem_flags); 352 353 while (lastsize <= size) { 354 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp; 355 lastsize += DBLK_MIN_SIZE; 356 } 357 } 358 359 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", 360 sizeof (dblk_t), DBLK_CACHE_ALIGN, 361 dblk_esb_constructor, dblk_destructor, NULL, 362 (void *) sizeof (dblk_t), NULL, dblk_kmem_flags); 363 fthdr_cache = kmem_cache_create("streams_fthdr", 364 sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 365 ftblk_cache = kmem_cache_create("streams_ftblk", 366 sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 367 368 /* Initialize Multidata caches */ 369 mmd_init(); 370 } 371 372 /*ARGSUSED*/ 373 mblk_t * 374 allocb(size_t size, uint_t pri) 375 { 376 dblk_t *dbp; 377 mblk_t *mp; 378 size_t index; 379 380 index = (size - 1) >> DBLK_SIZE_SHIFT; 381 382 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 383 if (size != 0) { 384 mp = allocb_oversize(size, KM_NOSLEEP); 385 goto out; 386 } 387 index = 0; 388 } 389 390 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) { 391 mp = NULL; 392 goto out; 393 } 394 395 mp = dbp->db_mblk; 396 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 397 mp->b_next = mp->b_prev = mp->b_cont = NULL; 398 mp->b_rptr = mp->b_wptr = dbp->db_base; 399 mp->b_queue = NULL; 400 MBLK_BAND_FLAG_WORD(mp) = 0; 401 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size); 402 out: 403 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp); 404 405 return (mp); 406 } 407 408 mblk_t * 409 allocb_tmpl(size_t size, const mblk_t *tmpl) 410 { 411 mblk_t *mp = allocb(size, 0); 412 413 if (mp != NULL) { 414 cred_t *cr = DB_CRED(tmpl); 415 if (cr != NULL) 416 crhold(mp->b_datap->db_credp = cr); 417 DB_CPID(mp) = DB_CPID(tmpl); 418 DB_TYPE(mp) = DB_TYPE(tmpl); 419 } 420 return (mp); 421 } 422 423 mblk_t * 424 allocb_cred(size_t size, cred_t *cr) 425 { 426 mblk_t *mp = allocb(size, 0); 427 428 if (mp != NULL && cr != NULL) 429 crhold(mp->b_datap->db_credp = cr); 430 431 return (mp); 432 } 433 434 mblk_t * 435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr) 436 { 437 mblk_t *mp = allocb_wait(size, 0, flags, error); 438 439 if (mp != NULL && cr != NULL) 440 crhold(mp->b_datap->db_credp = cr); 441 442 return (mp); 443 } 444 445 void 446 freeb(mblk_t *mp) 447 { 448 dblk_t *dbp = mp->b_datap; 449 450 ASSERT(dbp->db_ref > 0); 451 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 452 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp); 453 454 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 455 456 dbp->db_free(mp, dbp); 457 } 458 459 void 460 freemsg(mblk_t *mp) 461 { 462 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp); 463 while (mp) { 464 dblk_t *dbp = mp->b_datap; 465 mblk_t *mp_cont = mp->b_cont; 466 467 ASSERT(dbp->db_ref > 0); 468 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 469 470 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 471 472 dbp->db_free(mp, dbp); 473 mp = mp_cont; 474 } 475 } 476 477 /* 478 * Reallocate a block for another use. Try hard to use the old block. 479 * If the old data is wanted (copy), leave b_wptr at the end of the data, 480 * otherwise return b_wptr = b_rptr. 481 * 482 * This routine is private and unstable. 483 */ 484 mblk_t * 485 reallocb(mblk_t *mp, size_t size, uint_t copy) 486 { 487 mblk_t *mp1; 488 unsigned char *old_rptr; 489 ptrdiff_t cur_size; 490 491 if (mp == NULL) 492 return (allocb(size, BPRI_HI)); 493 494 cur_size = mp->b_wptr - mp->b_rptr; 495 old_rptr = mp->b_rptr; 496 497 ASSERT(mp->b_datap->db_ref != 0); 498 499 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) { 500 /* 501 * If the data is wanted and it will fit where it is, no 502 * work is required. 503 */ 504 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size) 505 return (mp); 506 507 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base; 508 mp1 = mp; 509 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) { 510 /* XXX other mp state could be copied too, db_flags ... ? */ 511 mp1->b_cont = mp->b_cont; 512 } else { 513 return (NULL); 514 } 515 516 if (copy) { 517 bcopy(old_rptr, mp1->b_rptr, cur_size); 518 mp1->b_wptr = mp1->b_rptr + cur_size; 519 } 520 521 if (mp != mp1) 522 freeb(mp); 523 524 return (mp1); 525 } 526 527 static void 528 dblk_lastfree(mblk_t *mp, dblk_t *dbp) 529 { 530 ASSERT(dbp->db_mblk == mp); 531 if (dbp->db_fthdr != NULL) 532 str_ftfree(dbp); 533 534 /* set credp and projid to be 'unspecified' before returning to cache */ 535 if (dbp->db_credp != NULL) { 536 crfree(dbp->db_credp); 537 dbp->db_credp = NULL; 538 } 539 dbp->db_cpid = -1; 540 541 /* Reset the struioflag and the checksum flag fields */ 542 dbp->db_struioflag = 0; 543 dbp->db_struioun.cksum.flags = 0; 544 545 kmem_cache_free(dbp->db_cache, dbp); 546 } 547 548 static void 549 dblk_decref(mblk_t *mp, dblk_t *dbp) 550 { 551 if (dbp->db_ref != 1) { 552 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp), 553 -(1 << DBLK_RTFU_SHIFT(db_ref))); 554 /* 555 * atomic_add_32_nv() just decremented db_ref, so we no longer 556 * have a reference to the dblk, which means another thread 557 * could free it. Therefore we cannot examine the dblk to 558 * determine whether ours was the last reference. Instead, 559 * we extract the new and minimum reference counts from rtfu. 560 * Note that all we're really saying is "if (ref != refmin)". 561 */ 562 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) != 563 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) { 564 kmem_cache_free(mblk_cache, mp); 565 return; 566 } 567 } 568 dbp->db_mblk = mp; 569 dbp->db_free = dbp->db_lastfree; 570 dbp->db_lastfree(mp, dbp); 571 } 572 573 mblk_t * 574 dupb(mblk_t *mp) 575 { 576 dblk_t *dbp = mp->b_datap; 577 mblk_t *new_mp; 578 uint32_t oldrtfu, newrtfu; 579 580 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL) 581 goto out; 582 583 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL; 584 new_mp->b_rptr = mp->b_rptr; 585 new_mp->b_wptr = mp->b_wptr; 586 new_mp->b_datap = dbp; 587 new_mp->b_queue = NULL; 588 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp); 589 590 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref); 591 592 /* 593 * First-dup optimization. The enabling assumption is that there 594 * can can never be a race (in correct code) to dup the first copy 595 * of a message. Therefore we don't need to do it atomically. 596 */ 597 if (dbp->db_free != dblk_decref) { 598 dbp->db_free = dblk_decref; 599 dbp->db_ref++; 600 goto out; 601 } 602 603 do { 604 ASSERT(dbp->db_ref > 0); 605 oldrtfu = DBLK_RTFU_WORD(dbp); 606 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref)); 607 /* 608 * If db_ref is maxed out we can't dup this message anymore. 609 */ 610 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) { 611 kmem_cache_free(mblk_cache, new_mp); 612 new_mp = NULL; 613 goto out; 614 } 615 } while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu); 616 617 out: 618 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp); 619 return (new_mp); 620 } 621 622 static void 623 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp) 624 { 625 frtn_t *frp = dbp->db_frtnp; 626 627 ASSERT(dbp->db_mblk == mp); 628 frp->free_func(frp->free_arg); 629 if (dbp->db_fthdr != NULL) 630 str_ftfree(dbp); 631 632 /* set credp and projid to be 'unspecified' before returning to cache */ 633 if (dbp->db_credp != NULL) { 634 crfree(dbp->db_credp); 635 dbp->db_credp = NULL; 636 } 637 dbp->db_cpid = -1; 638 dbp->db_struioflag = 0; 639 dbp->db_struioun.cksum.flags = 0; 640 641 kmem_cache_free(dbp->db_cache, dbp); 642 } 643 644 /*ARGSUSED*/ 645 static void 646 frnop_func(void *arg) 647 { 648 } 649 650 /* 651 * Generic esballoc used to implement the four flavors: [d]esballoc[a]. 652 */ 653 static mblk_t * 654 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp, 655 void (*lastfree)(mblk_t *, dblk_t *), int kmflags) 656 { 657 dblk_t *dbp; 658 mblk_t *mp; 659 660 ASSERT(base != NULL && frp != NULL); 661 662 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) { 663 mp = NULL; 664 goto out; 665 } 666 667 mp = dbp->db_mblk; 668 dbp->db_base = base; 669 dbp->db_lim = base + size; 670 dbp->db_free = dbp->db_lastfree = lastfree; 671 dbp->db_frtnp = frp; 672 DBLK_RTFU_WORD(dbp) = db_rtfu; 673 mp->b_next = mp->b_prev = mp->b_cont = NULL; 674 mp->b_rptr = mp->b_wptr = base; 675 mp->b_queue = NULL; 676 MBLK_BAND_FLAG_WORD(mp) = 0; 677 678 out: 679 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp); 680 return (mp); 681 } 682 683 /*ARGSUSED*/ 684 mblk_t * 685 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 686 { 687 mblk_t *mp; 688 689 /* 690 * Note that this is structured to allow the common case (i.e. 691 * STREAMS flowtracing disabled) to call gesballoc() with tail 692 * call optimization. 693 */ 694 if (!str_ftnever) { 695 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 696 frp, freebs_enqueue, KM_NOSLEEP); 697 698 if (mp != NULL) 699 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 700 return (mp); 701 } 702 703 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 704 frp, freebs_enqueue, KM_NOSLEEP)); 705 } 706 707 /* 708 * Same as esballoc() but sleeps waiting for memory. 709 */ 710 /*ARGSUSED*/ 711 mblk_t * 712 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 713 { 714 mblk_t *mp; 715 716 /* 717 * Note that this is structured to allow the common case (i.e. 718 * STREAMS flowtracing disabled) to call gesballoc() with tail 719 * call optimization. 720 */ 721 if (!str_ftnever) { 722 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 723 frp, freebs_enqueue, KM_SLEEP); 724 725 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 726 return (mp); 727 } 728 729 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 730 frp, freebs_enqueue, KM_SLEEP)); 731 } 732 733 /*ARGSUSED*/ 734 mblk_t * 735 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 736 { 737 mblk_t *mp; 738 739 /* 740 * Note that this is structured to allow the common case (i.e. 741 * STREAMS flowtracing disabled) to call gesballoc() with tail 742 * call optimization. 743 */ 744 if (!str_ftnever) { 745 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 746 frp, dblk_lastfree_desb, KM_NOSLEEP); 747 748 if (mp != NULL) 749 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 750 return (mp); 751 } 752 753 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 754 frp, dblk_lastfree_desb, KM_NOSLEEP)); 755 } 756 757 /*ARGSUSED*/ 758 mblk_t * 759 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 760 { 761 mblk_t *mp; 762 763 /* 764 * Note that this is structured to allow the common case (i.e. 765 * STREAMS flowtracing disabled) to call gesballoc() with tail 766 * call optimization. 767 */ 768 if (!str_ftnever) { 769 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 770 frp, freebs_enqueue, KM_NOSLEEP); 771 772 if (mp != NULL) 773 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 774 return (mp); 775 } 776 777 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 778 frp, freebs_enqueue, KM_NOSLEEP)); 779 } 780 781 /*ARGSUSED*/ 782 mblk_t * 783 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 784 { 785 mblk_t *mp; 786 787 /* 788 * Note that this is structured to allow the common case (i.e. 789 * STREAMS flowtracing disabled) to call gesballoc() with tail 790 * call optimization. 791 */ 792 if (!str_ftnever) { 793 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 794 frp, dblk_lastfree_desb, KM_NOSLEEP); 795 796 if (mp != NULL) 797 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 798 return (mp); 799 } 800 801 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 802 frp, dblk_lastfree_desb, KM_NOSLEEP)); 803 } 804 805 static void 806 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 807 { 808 bcache_t *bcp = dbp->db_cache; 809 810 ASSERT(dbp->db_mblk == mp); 811 if (dbp->db_fthdr != NULL) 812 str_ftfree(dbp); 813 814 /* set credp and projid to be 'unspecified' before returning to cache */ 815 if (dbp->db_credp != NULL) { 816 crfree(dbp->db_credp); 817 dbp->db_credp = NULL; 818 } 819 dbp->db_cpid = -1; 820 dbp->db_struioflag = 0; 821 dbp->db_struioun.cksum.flags = 0; 822 823 mutex_enter(&bcp->mutex); 824 kmem_cache_free(bcp->dblk_cache, dbp); 825 bcp->alloc--; 826 827 if (bcp->alloc == 0 && bcp->destroy != 0) { 828 kmem_cache_destroy(bcp->dblk_cache); 829 kmem_cache_destroy(bcp->buffer_cache); 830 mutex_exit(&bcp->mutex); 831 mutex_destroy(&bcp->mutex); 832 kmem_free(bcp, sizeof (bcache_t)); 833 } else { 834 mutex_exit(&bcp->mutex); 835 } 836 } 837 838 bcache_t * 839 bcache_create(char *name, size_t size, uint_t align) 840 { 841 bcache_t *bcp; 842 char buffer[255]; 843 844 ASSERT((align & (align - 1)) == 0); 845 846 if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == 847 NULL) { 848 return (NULL); 849 } 850 851 bcp->size = size; 852 bcp->align = align; 853 bcp->alloc = 0; 854 bcp->destroy = 0; 855 856 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 857 858 (void) sprintf(buffer, "%s_buffer_cache", name); 859 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 860 NULL, NULL, NULL, 0); 861 (void) sprintf(buffer, "%s_dblk_cache", name); 862 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 863 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 864 NULL, (void *)bcp, NULL, 0); 865 866 return (bcp); 867 } 868 869 void 870 bcache_destroy(bcache_t *bcp) 871 { 872 ASSERT(bcp != NULL); 873 874 mutex_enter(&bcp->mutex); 875 if (bcp->alloc == 0) { 876 kmem_cache_destroy(bcp->dblk_cache); 877 kmem_cache_destroy(bcp->buffer_cache); 878 mutex_exit(&bcp->mutex); 879 mutex_destroy(&bcp->mutex); 880 kmem_free(bcp, sizeof (bcache_t)); 881 } else { 882 bcp->destroy++; 883 mutex_exit(&bcp->mutex); 884 } 885 } 886 887 /*ARGSUSED*/ 888 mblk_t * 889 bcache_allocb(bcache_t *bcp, uint_t pri) 890 { 891 dblk_t *dbp; 892 mblk_t *mp = NULL; 893 894 ASSERT(bcp != NULL); 895 896 mutex_enter(&bcp->mutex); 897 if (bcp->destroy != 0) { 898 mutex_exit(&bcp->mutex); 899 goto out; 900 } 901 902 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 903 mutex_exit(&bcp->mutex); 904 goto out; 905 } 906 bcp->alloc++; 907 mutex_exit(&bcp->mutex); 908 909 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 910 911 mp = dbp->db_mblk; 912 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 913 mp->b_next = mp->b_prev = mp->b_cont = NULL; 914 mp->b_rptr = mp->b_wptr = dbp->db_base; 915 mp->b_queue = NULL; 916 MBLK_BAND_FLAG_WORD(mp) = 0; 917 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 918 out: 919 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 920 921 return (mp); 922 } 923 924 static void 925 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 926 { 927 ASSERT(dbp->db_mblk == mp); 928 if (dbp->db_fthdr != NULL) 929 str_ftfree(dbp); 930 931 /* set credp and projid to be 'unspecified' before returning to cache */ 932 if (dbp->db_credp != NULL) { 933 crfree(dbp->db_credp); 934 dbp->db_credp = NULL; 935 } 936 dbp->db_cpid = -1; 937 dbp->db_struioflag = 0; 938 dbp->db_struioun.cksum.flags = 0; 939 940 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base); 941 kmem_cache_free(dbp->db_cache, dbp); 942 } 943 944 static mblk_t * 945 allocb_oversize(size_t size, int kmflags) 946 { 947 mblk_t *mp; 948 void *buf; 949 950 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN); 951 if ((buf = kmem_alloc(size, kmflags)) == NULL) 952 return (NULL); 953 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0), 954 &frnop, dblk_lastfree_oversize, kmflags)) == NULL) 955 kmem_free(buf, size); 956 957 if (mp != NULL) 958 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size); 959 960 return (mp); 961 } 962 963 mblk_t * 964 allocb_tryhard(size_t target_size) 965 { 966 size_t size; 967 mblk_t *bp; 968 969 for (size = target_size; size < target_size + 512; 970 size += DBLK_CACHE_ALIGN) 971 if ((bp = allocb(size, BPRI_HI)) != NULL) 972 return (bp); 973 allocb_tryhard_fails++; 974 return (NULL); 975 } 976 977 /* 978 * This routine is consolidation private for STREAMS internal use 979 * This routine may only be called from sync routines (i.e., not 980 * from put or service procedures). It is located here (rather 981 * than strsubr.c) so that we don't have to expose all of the 982 * allocb() implementation details in header files. 983 */ 984 mblk_t * 985 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error) 986 { 987 dblk_t *dbp; 988 mblk_t *mp; 989 size_t index; 990 991 index = (size -1) >> DBLK_SIZE_SHIFT; 992 993 if (flags & STR_NOSIG) { 994 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 995 if (size != 0) { 996 mp = allocb_oversize(size, KM_SLEEP); 997 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", 998 (uintptr_t)mp); 999 return (mp); 1000 } 1001 index = 0; 1002 } 1003 1004 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP); 1005 mp = dbp->db_mblk; 1006 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 1007 mp->b_next = mp->b_prev = mp->b_cont = NULL; 1008 mp->b_rptr = mp->b_wptr = dbp->db_base; 1009 mp->b_queue = NULL; 1010 MBLK_BAND_FLAG_WORD(mp) = 0; 1011 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size); 1012 1013 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp); 1014 1015 } else { 1016 while ((mp = allocb(size, pri)) == NULL) { 1017 if ((*error = strwaitbuf(size, BPRI_HI)) != 0) 1018 return (NULL); 1019 } 1020 } 1021 1022 return (mp); 1023 } 1024 1025 /* 1026 * Call function 'func' with 'arg' when a class zero block can 1027 * be allocated with priority 'pri'. 1028 */ 1029 bufcall_id_t 1030 esbbcall(uint_t pri, void (*func)(void *), void *arg) 1031 { 1032 return (bufcall(1, pri, func, arg)); 1033 } 1034 1035 /* 1036 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials 1037 * ioc_id, rval and error of the struct ioctl to set up an ioctl call. 1038 * This provides consistency for all internal allocators of ioctl. 1039 */ 1040 mblk_t * 1041 mkiocb(uint_t cmd) 1042 { 1043 struct iocblk *ioc; 1044 mblk_t *mp; 1045 1046 /* 1047 * Allocate enough space for any of the ioctl related messages. 1048 */ 1049 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL) 1050 return (NULL); 1051 1052 bzero(mp->b_rptr, sizeof (union ioctypes)); 1053 1054 /* 1055 * Set the mblk_t information and ptrs correctly. 1056 */ 1057 mp->b_wptr += sizeof (struct iocblk); 1058 mp->b_datap->db_type = M_IOCTL; 1059 1060 /* 1061 * Fill in the fields. 1062 */ 1063 ioc = (struct iocblk *)mp->b_rptr; 1064 ioc->ioc_cmd = cmd; 1065 ioc->ioc_cr = kcred; 1066 ioc->ioc_id = getiocseqno(); 1067 ioc->ioc_flag = IOC_NATIVE; 1068 return (mp); 1069 } 1070 1071 /* 1072 * test if block of given size can be allocated with a request of 1073 * the given priority. 1074 * 'pri' is no longer used, but is retained for compatibility. 1075 */ 1076 /* ARGSUSED */ 1077 int 1078 testb(size_t size, uint_t pri) 1079 { 1080 return ((size + sizeof (dblk_t)) <= kmem_avail()); 1081 } 1082 1083 /* 1084 * Call function 'func' with argument 'arg' when there is a reasonably 1085 * good chance that a block of size 'size' can be allocated. 1086 * 'pri' is no longer used, but is retained for compatibility. 1087 */ 1088 /* ARGSUSED */ 1089 bufcall_id_t 1090 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg) 1091 { 1092 static long bid = 1; /* always odd to save checking for zero */ 1093 bufcall_id_t bc_id; 1094 struct strbufcall *bcp; 1095 1096 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL) 1097 return (0); 1098 1099 bcp->bc_func = func; 1100 bcp->bc_arg = arg; 1101 bcp->bc_size = size; 1102 bcp->bc_next = NULL; 1103 bcp->bc_executor = NULL; 1104 1105 mutex_enter(&strbcall_lock); 1106 /* 1107 * After bcp is linked into strbcalls and strbcall_lock is dropped there 1108 * should be no references to bcp since it may be freed by 1109 * runbufcalls(). Since bcp_id field is returned, we save its value in 1110 * the local var. 1111 */ 1112 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */ 1113 1114 /* 1115 * add newly allocated stream event to existing 1116 * linked list of events. 1117 */ 1118 if (strbcalls.bc_head == NULL) { 1119 strbcalls.bc_head = strbcalls.bc_tail = bcp; 1120 } else { 1121 strbcalls.bc_tail->bc_next = bcp; 1122 strbcalls.bc_tail = bcp; 1123 } 1124 1125 cv_signal(&strbcall_cv); 1126 mutex_exit(&strbcall_lock); 1127 return (bc_id); 1128 } 1129 1130 /* 1131 * Cancel a bufcall request. 1132 */ 1133 void 1134 unbufcall(bufcall_id_t id) 1135 { 1136 strbufcall_t *bcp, *pbcp; 1137 1138 mutex_enter(&strbcall_lock); 1139 again: 1140 pbcp = NULL; 1141 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) { 1142 if (id == bcp->bc_id) 1143 break; 1144 pbcp = bcp; 1145 } 1146 if (bcp) { 1147 if (bcp->bc_executor != NULL) { 1148 if (bcp->bc_executor != curthread) { 1149 cv_wait(&bcall_cv, &strbcall_lock); 1150 goto again; 1151 } 1152 } else { 1153 if (pbcp) 1154 pbcp->bc_next = bcp->bc_next; 1155 else 1156 strbcalls.bc_head = bcp->bc_next; 1157 if (bcp == strbcalls.bc_tail) 1158 strbcalls.bc_tail = pbcp; 1159 kmem_free(bcp, sizeof (strbufcall_t)); 1160 } 1161 } 1162 mutex_exit(&strbcall_lock); 1163 } 1164 1165 /* 1166 * Duplicate a message block by block (uses dupb), returning 1167 * a pointer to the duplicate message. 1168 * Returns a non-NULL value only if the entire message 1169 * was dup'd. 1170 */ 1171 mblk_t * 1172 dupmsg(mblk_t *bp) 1173 { 1174 mblk_t *head, *nbp; 1175 1176 if (!bp || !(nbp = head = dupb(bp))) 1177 return (NULL); 1178 1179 while (bp->b_cont) { 1180 if (!(nbp->b_cont = dupb(bp->b_cont))) { 1181 freemsg(head); 1182 return (NULL); 1183 } 1184 nbp = nbp->b_cont; 1185 bp = bp->b_cont; 1186 } 1187 return (head); 1188 } 1189 1190 #define DUPB_NOLOAN(bp) \ 1191 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \ 1192 copyb((bp)) : dupb((bp))) 1193 1194 mblk_t * 1195 dupmsg_noloan(mblk_t *bp) 1196 { 1197 mblk_t *head, *nbp; 1198 1199 if (bp == NULL || DB_TYPE(bp) != M_DATA || 1200 ((nbp = head = DUPB_NOLOAN(bp)) == NULL)) 1201 return (NULL); 1202 1203 while (bp->b_cont) { 1204 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) { 1205 freemsg(head); 1206 return (NULL); 1207 } 1208 nbp = nbp->b_cont; 1209 bp = bp->b_cont; 1210 } 1211 return (head); 1212 } 1213 1214 /* 1215 * Copy data from message and data block to newly allocated message and 1216 * data block. Returns new message block pointer, or NULL if error. 1217 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy 1218 * as in the original even when db_base is not word aligned. (bug 1052877) 1219 */ 1220 mblk_t * 1221 copyb(mblk_t *bp) 1222 { 1223 mblk_t *nbp; 1224 dblk_t *dp, *ndp; 1225 uchar_t *base; 1226 size_t size; 1227 size_t unaligned; 1228 1229 ASSERT(bp->b_wptr >= bp->b_rptr); 1230 1231 dp = bp->b_datap; 1232 if (dp->db_fthdr != NULL) 1233 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0); 1234 1235 /* 1236 * Special handling for Multidata message; this should be 1237 * removed once a copy-callback routine is made available. 1238 */ 1239 if (dp->db_type == M_MULTIDATA) { 1240 cred_t *cr; 1241 1242 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL) 1243 return (NULL); 1244 1245 nbp->b_flag = bp->b_flag; 1246 nbp->b_band = bp->b_band; 1247 ndp = nbp->b_datap; 1248 1249 /* See comments below on potential issues. */ 1250 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1251 1252 ASSERT(ndp->db_type == dp->db_type); 1253 cr = dp->db_credp; 1254 if (cr != NULL) 1255 crhold(ndp->db_credp = cr); 1256 ndp->db_cpid = dp->db_cpid; 1257 return (nbp); 1258 } 1259 1260 size = dp->db_lim - dp->db_base; 1261 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t)); 1262 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL) 1263 return (NULL); 1264 nbp->b_flag = bp->b_flag; 1265 nbp->b_band = bp->b_band; 1266 ndp = nbp->b_datap; 1267 1268 /* 1269 * Well, here is a potential issue. If we are trying to 1270 * trace a flow, and we copy the message, we might lose 1271 * information about where this message might have been. 1272 * So we should inherit the FT data. On the other hand, 1273 * a user might be interested only in alloc to free data. 1274 * So I guess the real answer is to provide a tunable. 1275 */ 1276 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1277 1278 base = ndp->db_base + unaligned; 1279 bcopy(dp->db_base, ndp->db_base + unaligned, size); 1280 1281 nbp->b_rptr = base + (bp->b_rptr - dp->db_base); 1282 nbp->b_wptr = nbp->b_rptr + MBLKL(bp); 1283 1284 return (nbp); 1285 } 1286 1287 /* 1288 * Copy data from message to newly allocated message using new 1289 * data blocks. Returns a pointer to the new message, or NULL if error. 1290 */ 1291 mblk_t * 1292 copymsg(mblk_t *bp) 1293 { 1294 mblk_t *head, *nbp; 1295 1296 if (!bp || !(nbp = head = copyb(bp))) 1297 return (NULL); 1298 1299 while (bp->b_cont) { 1300 if (!(nbp->b_cont = copyb(bp->b_cont))) { 1301 freemsg(head); 1302 return (NULL); 1303 } 1304 nbp = nbp->b_cont; 1305 bp = bp->b_cont; 1306 } 1307 return (head); 1308 } 1309 1310 /* 1311 * link a message block to tail of message 1312 */ 1313 void 1314 linkb(mblk_t *mp, mblk_t *bp) 1315 { 1316 ASSERT(mp && bp); 1317 1318 for (; mp->b_cont; mp = mp->b_cont) 1319 ; 1320 mp->b_cont = bp; 1321 } 1322 1323 /* 1324 * unlink a message block from head of message 1325 * return pointer to new message. 1326 * NULL if message becomes empty. 1327 */ 1328 mblk_t * 1329 unlinkb(mblk_t *bp) 1330 { 1331 mblk_t *bp1; 1332 1333 bp1 = bp->b_cont; 1334 bp->b_cont = NULL; 1335 return (bp1); 1336 } 1337 1338 /* 1339 * remove a message block "bp" from message "mp" 1340 * 1341 * Return pointer to new message or NULL if no message remains. 1342 * Return -1 if bp is not found in message. 1343 */ 1344 mblk_t * 1345 rmvb(mblk_t *mp, mblk_t *bp) 1346 { 1347 mblk_t *tmp; 1348 mblk_t *lastp = NULL; 1349 1350 ASSERT(mp && bp); 1351 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1352 if (tmp == bp) { 1353 if (lastp) 1354 lastp->b_cont = tmp->b_cont; 1355 else 1356 mp = tmp->b_cont; 1357 tmp->b_cont = NULL; 1358 return (mp); 1359 } 1360 lastp = tmp; 1361 } 1362 return ((mblk_t *)-1); 1363 } 1364 1365 /* 1366 * Concatenate and align first len bytes of common 1367 * message type. Len == -1, means concat everything. 1368 * Returns 1 on success, 0 on failure 1369 * After the pullup, mp points to the pulled up data. 1370 */ 1371 int 1372 pullupmsg(mblk_t *mp, ssize_t len) 1373 { 1374 mblk_t *bp, *b_cont; 1375 dblk_t *dbp; 1376 ssize_t n; 1377 1378 ASSERT(mp->b_datap->db_ref > 0); 1379 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 1380 1381 /* 1382 * We won't handle Multidata message, since it contains 1383 * metadata which this function has no knowledge of; we 1384 * assert on DEBUG, and return failure otherwise. 1385 */ 1386 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1387 if (mp->b_datap->db_type == M_MULTIDATA) 1388 return (0); 1389 1390 if (len == -1) { 1391 if (mp->b_cont == NULL && str_aligned(mp->b_rptr)) 1392 return (1); 1393 len = xmsgsize(mp); 1394 } else { 1395 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr; 1396 ASSERT(first_mblk_len >= 0); 1397 /* 1398 * If the length is less than that of the first mblk, 1399 * we want to pull up the message into an aligned mblk. 1400 * Though not part of the spec, some callers assume it. 1401 */ 1402 if (len <= first_mblk_len) { 1403 if (str_aligned(mp->b_rptr)) 1404 return (1); 1405 len = first_mblk_len; 1406 } else if (xmsgsize(mp) < len) 1407 return (0); 1408 } 1409 1410 if ((bp = allocb_tmpl(len, mp)) == NULL) 1411 return (0); 1412 1413 dbp = bp->b_datap; 1414 *bp = *mp; /* swap mblks so bp heads the old msg... */ 1415 mp->b_datap = dbp; /* ... and mp heads the new message */ 1416 mp->b_datap->db_mblk = mp; 1417 bp->b_datap->db_mblk = bp; 1418 mp->b_rptr = mp->b_wptr = dbp->db_base; 1419 1420 do { 1421 ASSERT(bp->b_datap->db_ref > 0); 1422 ASSERT(bp->b_wptr >= bp->b_rptr); 1423 n = MIN(bp->b_wptr - bp->b_rptr, len); 1424 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n); 1425 mp->b_wptr += n; 1426 bp->b_rptr += n; 1427 len -= n; 1428 if (bp->b_rptr != bp->b_wptr) 1429 break; 1430 b_cont = bp->b_cont; 1431 freeb(bp); 1432 bp = b_cont; 1433 } while (len && bp); 1434 1435 mp->b_cont = bp; /* tack on whatever wasn't pulled up */ 1436 1437 return (1); 1438 } 1439 1440 /* 1441 * Concatenate and align at least the first len bytes of common message 1442 * type. Len == -1 means concatenate everything. The original message is 1443 * unaltered. Returns a pointer to a new message on success, otherwise 1444 * returns NULL. 1445 */ 1446 mblk_t * 1447 msgpullup(mblk_t *mp, ssize_t len) 1448 { 1449 mblk_t *newmp; 1450 ssize_t totlen; 1451 ssize_t n; 1452 1453 /* 1454 * We won't handle Multidata message, since it contains 1455 * metadata which this function has no knowledge of; we 1456 * assert on DEBUG, and return failure otherwise. 1457 */ 1458 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1459 if (mp->b_datap->db_type == M_MULTIDATA) 1460 return (NULL); 1461 1462 totlen = xmsgsize(mp); 1463 1464 if ((len > 0) && (len > totlen)) 1465 return (NULL); 1466 1467 /* 1468 * Copy all of the first msg type into one new mblk, then dupmsg 1469 * and link the rest onto this. 1470 */ 1471 1472 len = totlen; 1473 1474 if ((newmp = allocb_tmpl(len, mp)) == NULL) 1475 return (NULL); 1476 1477 newmp->b_flag = mp->b_flag; 1478 newmp->b_band = mp->b_band; 1479 1480 while (len > 0) { 1481 n = mp->b_wptr - mp->b_rptr; 1482 ASSERT(n >= 0); /* allow zero-length mblk_t's */ 1483 if (n > 0) 1484 bcopy(mp->b_rptr, newmp->b_wptr, n); 1485 newmp->b_wptr += n; 1486 len -= n; 1487 mp = mp->b_cont; 1488 } 1489 1490 if (mp != NULL) { 1491 newmp->b_cont = dupmsg(mp); 1492 if (newmp->b_cont == NULL) { 1493 freemsg(newmp); 1494 return (NULL); 1495 } 1496 } 1497 1498 return (newmp); 1499 } 1500 1501 /* 1502 * Trim bytes from message 1503 * len > 0, trim from head 1504 * len < 0, trim from tail 1505 * Returns 1 on success, 0 on failure. 1506 */ 1507 int 1508 adjmsg(mblk_t *mp, ssize_t len) 1509 { 1510 mblk_t *bp; 1511 mblk_t *save_bp = NULL; 1512 mblk_t *prev_bp; 1513 mblk_t *bcont; 1514 unsigned char type; 1515 ssize_t n; 1516 int fromhead; 1517 int first; 1518 1519 ASSERT(mp != NULL); 1520 /* 1521 * We won't handle Multidata message, since it contains 1522 * metadata which this function has no knowledge of; we 1523 * assert on DEBUG, and return failure otherwise. 1524 */ 1525 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1526 if (mp->b_datap->db_type == M_MULTIDATA) 1527 return (0); 1528 1529 if (len < 0) { 1530 fromhead = 0; 1531 len = -len; 1532 } else { 1533 fromhead = 1; 1534 } 1535 1536 if (xmsgsize(mp) < len) 1537 return (0); 1538 1539 1540 if (fromhead) { 1541 first = 1; 1542 while (len) { 1543 ASSERT(mp->b_wptr >= mp->b_rptr); 1544 n = MIN(mp->b_wptr - mp->b_rptr, len); 1545 mp->b_rptr += n; 1546 len -= n; 1547 1548 /* 1549 * If this is not the first zero length 1550 * message remove it 1551 */ 1552 if (!first && (mp->b_wptr == mp->b_rptr)) { 1553 bcont = mp->b_cont; 1554 freeb(mp); 1555 mp = save_bp->b_cont = bcont; 1556 } else { 1557 save_bp = mp; 1558 mp = mp->b_cont; 1559 } 1560 first = 0; 1561 } 1562 } else { 1563 type = mp->b_datap->db_type; 1564 while (len) { 1565 bp = mp; 1566 save_bp = NULL; 1567 1568 /* 1569 * Find the last message of same type 1570 */ 1571 1572 while (bp && bp->b_datap->db_type == type) { 1573 ASSERT(bp->b_wptr >= bp->b_rptr); 1574 prev_bp = save_bp; 1575 save_bp = bp; 1576 bp = bp->b_cont; 1577 } 1578 if (save_bp == NULL) 1579 break; 1580 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len); 1581 save_bp->b_wptr -= n; 1582 len -= n; 1583 1584 /* 1585 * If this is not the first message 1586 * and we have taken away everything 1587 * from this message, remove it 1588 */ 1589 1590 if ((save_bp != mp) && 1591 (save_bp->b_wptr == save_bp->b_rptr)) { 1592 bcont = save_bp->b_cont; 1593 freeb(save_bp); 1594 prev_bp->b_cont = bcont; 1595 } 1596 } 1597 } 1598 return (1); 1599 } 1600 1601 /* 1602 * get number of data bytes in message 1603 */ 1604 size_t 1605 msgdsize(mblk_t *bp) 1606 { 1607 size_t count = 0; 1608 1609 for (; bp; bp = bp->b_cont) 1610 if (bp->b_datap->db_type == M_DATA) { 1611 ASSERT(bp->b_wptr >= bp->b_rptr); 1612 count += bp->b_wptr - bp->b_rptr; 1613 } 1614 return (count); 1615 } 1616 1617 /* 1618 * Get a message off head of queue 1619 * 1620 * If queue has no buffers then mark queue 1621 * with QWANTR. (queue wants to be read by 1622 * someone when data becomes available) 1623 * 1624 * If there is something to take off then do so. 1625 * If queue falls below hi water mark turn off QFULL 1626 * flag. Decrement weighted count of queue. 1627 * Also turn off QWANTR because queue is being read. 1628 * 1629 * The queue count is maintained on a per-band basis. 1630 * Priority band 0 (normal messages) uses q_count, 1631 * q_lowat, etc. Non-zero priority bands use the 1632 * fields in their respective qband structures 1633 * (qb_count, qb_lowat, etc.) All messages appear 1634 * on the same list, linked via their b_next pointers. 1635 * q_first is the head of the list. q_count does 1636 * not reflect the size of all the messages on the 1637 * queue. It only reflects those messages in the 1638 * normal band of flow. The one exception to this 1639 * deals with high priority messages. They are in 1640 * their own conceptual "band", but are accounted 1641 * against q_count. 1642 * 1643 * If queue count is below the lo water mark and QWANTW 1644 * is set, enable the closest backq which has a service 1645 * procedure and turn off the QWANTW flag. 1646 * 1647 * getq could be built on top of rmvq, but isn't because 1648 * of performance considerations. 1649 * 1650 * A note on the use of q_count and q_mblkcnt: 1651 * q_count is the traditional byte count for messages that 1652 * have been put on a queue. Documentation tells us that 1653 * we shouldn't rely on that count, but some drivers/modules 1654 * do. What was needed, however, is a mechanism to prevent 1655 * runaway streams from consuming all of the resources, 1656 * and particularly be able to flow control zero-length 1657 * messages. q_mblkcnt is used for this purpose. It 1658 * counts the number of mblk's that are being put on 1659 * the queue. The intention here, is that each mblk should 1660 * contain one byte of data and, for the purpose of 1661 * flow-control, logically does. A queue will become 1662 * full when EITHER of these values (q_count and q_mblkcnt) 1663 * reach the highwater mark. It will clear when BOTH 1664 * of them drop below the highwater mark. And it will 1665 * backenable when BOTH of them drop below the lowwater 1666 * mark. 1667 * With this algorithm, a driver/module might be able 1668 * to find a reasonably accurate q_count, and the 1669 * framework can still try and limit resource usage. 1670 */ 1671 mblk_t * 1672 getq(queue_t *q) 1673 { 1674 mblk_t *bp; 1675 uchar_t band = 0; 1676 1677 bp = getq_noenab(q); 1678 if (bp != NULL) 1679 band = bp->b_band; 1680 1681 /* 1682 * Inlined from qbackenable(). 1683 * Quick check without holding the lock. 1684 */ 1685 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1686 return (bp); 1687 1688 qbackenable(q, band); 1689 return (bp); 1690 } 1691 1692 /* 1693 * Like getq() but does not backenable. This is used by the stream 1694 * head when a putback() is likely. The caller must call qbackenable() 1695 * after it is done with accessing the queue. 1696 */ 1697 mblk_t * 1698 getq_noenab(queue_t *q) 1699 { 1700 mblk_t *bp; 1701 mblk_t *tmp; 1702 qband_t *qbp; 1703 kthread_id_t freezer; 1704 int bytecnt = 0, mblkcnt = 0; 1705 1706 /* freezestr should allow its caller to call getq/putq */ 1707 freezer = STREAM(q)->sd_freezer; 1708 if (freezer == curthread) { 1709 ASSERT(frozenstr(q)); 1710 ASSERT(MUTEX_HELD(QLOCK(q))); 1711 } else 1712 mutex_enter(QLOCK(q)); 1713 1714 if ((bp = q->q_first) == 0) { 1715 q->q_flag |= QWANTR; 1716 } else { 1717 if ((q->q_first = bp->b_next) == NULL) 1718 q->q_last = NULL; 1719 else 1720 q->q_first->b_prev = NULL; 1721 1722 /* Get message byte count for q_count accounting */ 1723 for (tmp = bp; tmp; tmp = tmp->b_cont) { 1724 bytecnt += (tmp->b_wptr - tmp->b_rptr); 1725 mblkcnt++; 1726 } 1727 1728 if (bp->b_band == 0) { 1729 q->q_count -= bytecnt; 1730 q->q_mblkcnt -= mblkcnt; 1731 if ((q->q_count < q->q_hiwat) && 1732 (q->q_mblkcnt < q->q_hiwat)) { 1733 q->q_flag &= ~QFULL; 1734 } 1735 } else { 1736 int i; 1737 1738 ASSERT(bp->b_band <= q->q_nband); 1739 ASSERT(q->q_bandp != NULL); 1740 ASSERT(MUTEX_HELD(QLOCK(q))); 1741 qbp = q->q_bandp; 1742 i = bp->b_band; 1743 while (--i > 0) 1744 qbp = qbp->qb_next; 1745 if (qbp->qb_first == qbp->qb_last) { 1746 qbp->qb_first = NULL; 1747 qbp->qb_last = NULL; 1748 } else { 1749 qbp->qb_first = bp->b_next; 1750 } 1751 qbp->qb_count -= bytecnt; 1752 qbp->qb_mblkcnt -= mblkcnt; 1753 if ((qbp->qb_count < qbp->qb_hiwat) && 1754 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1755 qbp->qb_flag &= ~QB_FULL; 1756 } 1757 } 1758 q->q_flag &= ~QWANTR; 1759 bp->b_next = NULL; 1760 bp->b_prev = NULL; 1761 } 1762 if (freezer != curthread) 1763 mutex_exit(QLOCK(q)); 1764 1765 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL); 1766 1767 return (bp); 1768 } 1769 1770 /* 1771 * Determine if a backenable is needed after removing a message in the 1772 * specified band. 1773 * NOTE: This routine assumes that something like getq_noenab() has been 1774 * already called. 1775 * 1776 * For the read side it is ok to hold sd_lock across calling this (and the 1777 * stream head often does). 1778 * But for the write side strwakeq might be invoked and it acquires sd_lock. 1779 */ 1780 void 1781 qbackenable(queue_t *q, uchar_t band) 1782 { 1783 int backenab = 0; 1784 qband_t *qbp; 1785 kthread_id_t freezer; 1786 1787 ASSERT(q); 1788 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 1789 1790 /* 1791 * Quick check without holding the lock. 1792 * OK since after getq() has lowered the q_count these flags 1793 * would not change unless either the qbackenable() is done by 1794 * another thread (which is ok) or the queue has gotten QFULL 1795 * in which case another backenable will take place when the queue 1796 * drops below q_lowat. 1797 */ 1798 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1799 return; 1800 1801 /* freezestr should allow its caller to call getq/putq */ 1802 freezer = STREAM(q)->sd_freezer; 1803 if (freezer == curthread) { 1804 ASSERT(frozenstr(q)); 1805 ASSERT(MUTEX_HELD(QLOCK(q))); 1806 } else 1807 mutex_enter(QLOCK(q)); 1808 1809 if (band == 0) { 1810 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 1811 q->q_mblkcnt < q->q_lowat)) { 1812 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 1813 } 1814 } else { 1815 int i; 1816 1817 ASSERT((unsigned)band <= q->q_nband); 1818 ASSERT(q->q_bandp != NULL); 1819 1820 qbp = q->q_bandp; 1821 i = band; 1822 while (--i > 0) 1823 qbp = qbp->qb_next; 1824 1825 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 1826 qbp->qb_mblkcnt < qbp->qb_lowat)) { 1827 backenab = qbp->qb_flag & QB_WANTW; 1828 } 1829 } 1830 1831 if (backenab == 0) { 1832 if (freezer != curthread) 1833 mutex_exit(QLOCK(q)); 1834 return; 1835 } 1836 1837 /* Have to drop the lock across strwakeq and backenable */ 1838 if (backenab & QWANTWSYNC) 1839 q->q_flag &= ~QWANTWSYNC; 1840 if (backenab & (QWANTW|QB_WANTW)) { 1841 if (band != 0) 1842 qbp->qb_flag &= ~QB_WANTW; 1843 else { 1844 q->q_flag &= ~QWANTW; 1845 } 1846 } 1847 1848 if (freezer != curthread) 1849 mutex_exit(QLOCK(q)); 1850 1851 if (backenab & QWANTWSYNC) 1852 strwakeq(q, QWANTWSYNC); 1853 if (backenab & (QWANTW|QB_WANTW)) 1854 backenable(q, band); 1855 } 1856 1857 /* 1858 * Remove a message from a queue. The queue count and other 1859 * flow control parameters are adjusted and the back queue 1860 * enabled if necessary. 1861 * 1862 * rmvq can be called with the stream frozen, but other utility functions 1863 * holding QLOCK, and by streams modules without any locks/frozen. 1864 */ 1865 void 1866 rmvq(queue_t *q, mblk_t *mp) 1867 { 1868 ASSERT(mp != NULL); 1869 1870 rmvq_noenab(q, mp); 1871 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 1872 /* 1873 * qbackenable can handle a frozen stream but not a "random" 1874 * qlock being held. Drop lock across qbackenable. 1875 */ 1876 mutex_exit(QLOCK(q)); 1877 qbackenable(q, mp->b_band); 1878 mutex_enter(QLOCK(q)); 1879 } else { 1880 qbackenable(q, mp->b_band); 1881 } 1882 } 1883 1884 /* 1885 * Like rmvq() but without any backenabling. 1886 * This exists to handle SR_CONSOL_DATA in strrput(). 1887 */ 1888 void 1889 rmvq_noenab(queue_t *q, mblk_t *mp) 1890 { 1891 mblk_t *tmp; 1892 int i; 1893 qband_t *qbp = NULL; 1894 kthread_id_t freezer; 1895 int bytecnt = 0, mblkcnt = 0; 1896 1897 freezer = STREAM(q)->sd_freezer; 1898 if (freezer == curthread) { 1899 ASSERT(frozenstr(q)); 1900 ASSERT(MUTEX_HELD(QLOCK(q))); 1901 } else if (MUTEX_HELD(QLOCK(q))) { 1902 /* Don't drop lock on exit */ 1903 freezer = curthread; 1904 } else 1905 mutex_enter(QLOCK(q)); 1906 1907 ASSERT(mp->b_band <= q->q_nband); 1908 if (mp->b_band != 0) { /* Adjust band pointers */ 1909 ASSERT(q->q_bandp != NULL); 1910 qbp = q->q_bandp; 1911 i = mp->b_band; 1912 while (--i > 0) 1913 qbp = qbp->qb_next; 1914 if (mp == qbp->qb_first) { 1915 if (mp->b_next && mp->b_band == mp->b_next->b_band) 1916 qbp->qb_first = mp->b_next; 1917 else 1918 qbp->qb_first = NULL; 1919 } 1920 if (mp == qbp->qb_last) { 1921 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 1922 qbp->qb_last = mp->b_prev; 1923 else 1924 qbp->qb_last = NULL; 1925 } 1926 } 1927 1928 /* 1929 * Remove the message from the list. 1930 */ 1931 if (mp->b_prev) 1932 mp->b_prev->b_next = mp->b_next; 1933 else 1934 q->q_first = mp->b_next; 1935 if (mp->b_next) 1936 mp->b_next->b_prev = mp->b_prev; 1937 else 1938 q->q_last = mp->b_prev; 1939 mp->b_next = NULL; 1940 mp->b_prev = NULL; 1941 1942 /* Get the size of the message for q_count accounting */ 1943 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1944 bytecnt += (tmp->b_wptr - tmp->b_rptr); 1945 mblkcnt++; 1946 } 1947 1948 if (mp->b_band == 0) { /* Perform q_count accounting */ 1949 q->q_count -= bytecnt; 1950 q->q_mblkcnt -= mblkcnt; 1951 if ((q->q_count < q->q_hiwat) && 1952 (q->q_mblkcnt < q->q_hiwat)) { 1953 q->q_flag &= ~QFULL; 1954 } 1955 } else { /* Perform qb_count accounting */ 1956 qbp->qb_count -= bytecnt; 1957 qbp->qb_mblkcnt -= mblkcnt; 1958 if ((qbp->qb_count < qbp->qb_hiwat) && 1959 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1960 qbp->qb_flag &= ~QB_FULL; 1961 } 1962 } 1963 if (freezer != curthread) 1964 mutex_exit(QLOCK(q)); 1965 1966 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL); 1967 } 1968 1969 /* 1970 * Empty a queue. 1971 * If flag is set, remove all messages. Otherwise, remove 1972 * only non-control messages. If queue falls below its low 1973 * water mark, and QWANTW is set, enable the nearest upstream 1974 * service procedure. 1975 * 1976 * Historical note: when merging the M_FLUSH code in strrput with this 1977 * code one difference was discovered. flushq did not have a check 1978 * for q_lowat == 0 in the backenabling test. 1979 * 1980 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 1981 * if one exists on the queue. 1982 */ 1983 void 1984 flushq_common(queue_t *q, int flag, int pcproto_flag) 1985 { 1986 mblk_t *mp, *nmp; 1987 qband_t *qbp; 1988 int backenab = 0; 1989 unsigned char bpri; 1990 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 1991 1992 if (q->q_first == NULL) 1993 return; 1994 1995 mutex_enter(QLOCK(q)); 1996 mp = q->q_first; 1997 q->q_first = NULL; 1998 q->q_last = NULL; 1999 q->q_count = 0; 2000 q->q_mblkcnt = 0; 2001 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2002 qbp->qb_first = NULL; 2003 qbp->qb_last = NULL; 2004 qbp->qb_count = 0; 2005 qbp->qb_mblkcnt = 0; 2006 qbp->qb_flag &= ~QB_FULL; 2007 } 2008 q->q_flag &= ~QFULL; 2009 mutex_exit(QLOCK(q)); 2010 while (mp) { 2011 nmp = mp->b_next; 2012 mp->b_next = mp->b_prev = NULL; 2013 2014 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL); 2015 2016 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2017 (void) putq(q, mp); 2018 else if (flag || datamsg(mp->b_datap->db_type)) 2019 freemsg(mp); 2020 else 2021 (void) putq(q, mp); 2022 mp = nmp; 2023 } 2024 bpri = 1; 2025 mutex_enter(QLOCK(q)); 2026 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2027 if ((qbp->qb_flag & QB_WANTW) && 2028 (((qbp->qb_count < qbp->qb_lowat) && 2029 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2030 qbp->qb_lowat == 0)) { 2031 qbp->qb_flag &= ~QB_WANTW; 2032 backenab = 1; 2033 qbf[bpri] = 1; 2034 } else 2035 qbf[bpri] = 0; 2036 bpri++; 2037 } 2038 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2039 if ((q->q_flag & QWANTW) && 2040 (((q->q_count < q->q_lowat) && 2041 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2042 q->q_flag &= ~QWANTW; 2043 backenab = 1; 2044 qbf[0] = 1; 2045 } else 2046 qbf[0] = 0; 2047 2048 /* 2049 * If any band can now be written to, and there is a writer 2050 * for that band, then backenable the closest service procedure. 2051 */ 2052 if (backenab) { 2053 mutex_exit(QLOCK(q)); 2054 for (bpri = q->q_nband; bpri != 0; bpri--) 2055 if (qbf[bpri]) 2056 backenable(q, bpri); 2057 if (qbf[0]) 2058 backenable(q, 0); 2059 } else 2060 mutex_exit(QLOCK(q)); 2061 } 2062 2063 /* 2064 * The real flushing takes place in flushq_common. This is done so that 2065 * a flag which specifies whether or not M_PCPROTO messages should be flushed 2066 * or not. Currently the only place that uses this flag is the stream head. 2067 */ 2068 void 2069 flushq(queue_t *q, int flag) 2070 { 2071 flushq_common(q, flag, 0); 2072 } 2073 2074 /* 2075 * Flush the queue of messages of the given priority band. 2076 * There is some duplication of code between flushq and flushband. 2077 * This is because we want to optimize the code as much as possible. 2078 * The assumption is that there will be more messages in the normal 2079 * (priority 0) band than in any other. 2080 * 2081 * Historical note: when merging the M_FLUSH code in strrput with this 2082 * code one difference was discovered. flushband had an extra check for 2083 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0 2084 * case. That check does not match the man page for flushband and was not 2085 * in the strrput flush code hence it was removed. 2086 */ 2087 void 2088 flushband(queue_t *q, unsigned char pri, int flag) 2089 { 2090 mblk_t *mp; 2091 mblk_t *nmp; 2092 mblk_t *last; 2093 qband_t *qbp; 2094 int band; 2095 2096 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL)); 2097 if (pri > q->q_nband) { 2098 return; 2099 } 2100 mutex_enter(QLOCK(q)); 2101 if (pri == 0) { 2102 mp = q->q_first; 2103 q->q_first = NULL; 2104 q->q_last = NULL; 2105 q->q_count = 0; 2106 q->q_mblkcnt = 0; 2107 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2108 qbp->qb_first = NULL; 2109 qbp->qb_last = NULL; 2110 qbp->qb_count = 0; 2111 qbp->qb_mblkcnt = 0; 2112 qbp->qb_flag &= ~QB_FULL; 2113 } 2114 q->q_flag &= ~QFULL; 2115 mutex_exit(QLOCK(q)); 2116 while (mp) { 2117 nmp = mp->b_next; 2118 mp->b_next = mp->b_prev = NULL; 2119 if ((mp->b_band == 0) && 2120 ((flag == FLUSHALL) || 2121 datamsg(mp->b_datap->db_type))) 2122 freemsg(mp); 2123 else 2124 (void) putq(q, mp); 2125 mp = nmp; 2126 } 2127 mutex_enter(QLOCK(q)); 2128 if ((q->q_flag & QWANTW) && 2129 (((q->q_count < q->q_lowat) && 2130 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2131 q->q_flag &= ~QWANTW; 2132 mutex_exit(QLOCK(q)); 2133 2134 backenable(q, pri); 2135 } else 2136 mutex_exit(QLOCK(q)); 2137 } else { /* pri != 0 */ 2138 boolean_t flushed = B_FALSE; 2139 band = pri; 2140 2141 ASSERT(MUTEX_HELD(QLOCK(q))); 2142 qbp = q->q_bandp; 2143 while (--band > 0) 2144 qbp = qbp->qb_next; 2145 mp = qbp->qb_first; 2146 if (mp == NULL) { 2147 mutex_exit(QLOCK(q)); 2148 return; 2149 } 2150 last = qbp->qb_last->b_next; 2151 /* 2152 * rmvq_noenab() and freemsg() are called for each mblk that 2153 * meets the criteria. The loop is executed until the last 2154 * mblk has been processed. 2155 */ 2156 while (mp != last) { 2157 ASSERT(mp->b_band == pri); 2158 nmp = mp->b_next; 2159 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) { 2160 rmvq_noenab(q, mp); 2161 freemsg(mp); 2162 flushed = B_TRUE; 2163 } 2164 mp = nmp; 2165 } 2166 mutex_exit(QLOCK(q)); 2167 2168 /* 2169 * If any mblk(s) has been freed, we know that qbackenable() 2170 * will need to be called. 2171 */ 2172 if (flushed) 2173 qbackenable(q, pri); 2174 } 2175 } 2176 2177 /* 2178 * Return 1 if the queue is not full. If the queue is full, return 2179 * 0 (may not put message) and set QWANTW flag (caller wants to write 2180 * to the queue). 2181 */ 2182 int 2183 canput(queue_t *q) 2184 { 2185 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2186 2187 /* this is for loopback transports, they should not do a canput */ 2188 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2189 2190 /* Find next forward module that has a service procedure */ 2191 q = q->q_nfsrv; 2192 2193 if (!(q->q_flag & QFULL)) { 2194 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2195 return (1); 2196 } 2197 mutex_enter(QLOCK(q)); 2198 if (q->q_flag & QFULL) { 2199 q->q_flag |= QWANTW; 2200 mutex_exit(QLOCK(q)); 2201 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2202 return (0); 2203 } 2204 mutex_exit(QLOCK(q)); 2205 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2206 return (1); 2207 } 2208 2209 /* 2210 * This is the new canput for use with priority bands. Return 1 if the 2211 * band is not full. If the band is full, return 0 (may not put message) 2212 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2213 * write to the queue). 2214 */ 2215 int 2216 bcanput(queue_t *q, unsigned char pri) 2217 { 2218 qband_t *qbp; 2219 2220 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2221 if (!q) 2222 return (0); 2223 2224 /* Find next forward module that has a service procedure */ 2225 q = q->q_nfsrv; 2226 2227 mutex_enter(QLOCK(q)); 2228 if (pri == 0) { 2229 if (q->q_flag & QFULL) { 2230 q->q_flag |= QWANTW; 2231 mutex_exit(QLOCK(q)); 2232 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2233 "bcanput:%p %X %d", q, pri, 0); 2234 return (0); 2235 } 2236 } else { /* pri != 0 */ 2237 if (pri > q->q_nband) { 2238 /* 2239 * No band exists yet, so return success. 2240 */ 2241 mutex_exit(QLOCK(q)); 2242 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2243 "bcanput:%p %X %d", q, pri, 1); 2244 return (1); 2245 } 2246 qbp = q->q_bandp; 2247 while (--pri) 2248 qbp = qbp->qb_next; 2249 if (qbp->qb_flag & QB_FULL) { 2250 qbp->qb_flag |= QB_WANTW; 2251 mutex_exit(QLOCK(q)); 2252 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2253 "bcanput:%p %X %d", q, pri, 0); 2254 return (0); 2255 } 2256 } 2257 mutex_exit(QLOCK(q)); 2258 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2259 "bcanput:%p %X %d", q, pri, 1); 2260 return (1); 2261 } 2262 2263 /* 2264 * Put a message on a queue. 2265 * 2266 * Messages are enqueued on a priority basis. The priority classes 2267 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2268 * and B_NORMAL (type < QPCTL && band == 0). 2269 * 2270 * Add appropriate weighted data block sizes to queue count. 2271 * If queue hits high water mark then set QFULL flag. 2272 * 2273 * If QNOENAB is not set (putq is allowed to enable the queue), 2274 * enable the queue only if the message is PRIORITY, 2275 * or the QWANTR flag is set (indicating that the service procedure 2276 * is ready to read the queue. This implies that a service 2277 * procedure must NEVER put a high priority message back on its own 2278 * queue, as this would result in an infinite loop (!). 2279 */ 2280 int 2281 putq(queue_t *q, mblk_t *bp) 2282 { 2283 mblk_t *tmp; 2284 qband_t *qbp = NULL; 2285 int mcls = (int)queclass(bp); 2286 kthread_id_t freezer; 2287 int bytecnt = 0, mblkcnt = 0; 2288 2289 freezer = STREAM(q)->sd_freezer; 2290 if (freezer == curthread) { 2291 ASSERT(frozenstr(q)); 2292 ASSERT(MUTEX_HELD(QLOCK(q))); 2293 } else 2294 mutex_enter(QLOCK(q)); 2295 2296 /* 2297 * Make sanity checks and if qband structure is not yet 2298 * allocated, do so. 2299 */ 2300 if (mcls == QPCTL) { 2301 if (bp->b_band != 0) 2302 bp->b_band = 0; /* force to be correct */ 2303 } else if (bp->b_band != 0) { 2304 int i; 2305 qband_t **qbpp; 2306 2307 if (bp->b_band > q->q_nband) { 2308 2309 /* 2310 * The qband structure for this priority band is 2311 * not on the queue yet, so we have to allocate 2312 * one on the fly. It would be wasteful to 2313 * associate the qband structures with every 2314 * queue when the queues are allocated. This is 2315 * because most queues will only need the normal 2316 * band of flow which can be described entirely 2317 * by the queue itself. 2318 */ 2319 qbpp = &q->q_bandp; 2320 while (*qbpp) 2321 qbpp = &(*qbpp)->qb_next; 2322 while (bp->b_band > q->q_nband) { 2323 if ((*qbpp = allocband()) == NULL) { 2324 if (freezer != curthread) 2325 mutex_exit(QLOCK(q)); 2326 return (0); 2327 } 2328 (*qbpp)->qb_hiwat = q->q_hiwat; 2329 (*qbpp)->qb_lowat = q->q_lowat; 2330 q->q_nband++; 2331 qbpp = &(*qbpp)->qb_next; 2332 } 2333 } 2334 ASSERT(MUTEX_HELD(QLOCK(q))); 2335 qbp = q->q_bandp; 2336 i = bp->b_band; 2337 while (--i) 2338 qbp = qbp->qb_next; 2339 } 2340 2341 /* 2342 * If queue is empty, add the message and initialize the pointers. 2343 * Otherwise, adjust message pointers and queue pointers based on 2344 * the type of the message and where it belongs on the queue. Some 2345 * code is duplicated to minimize the number of conditionals and 2346 * hopefully minimize the amount of time this routine takes. 2347 */ 2348 if (!q->q_first) { 2349 bp->b_next = NULL; 2350 bp->b_prev = NULL; 2351 q->q_first = bp; 2352 q->q_last = bp; 2353 if (qbp) { 2354 qbp->qb_first = bp; 2355 qbp->qb_last = bp; 2356 } 2357 } else if (!qbp) { /* bp->b_band == 0 */ 2358 2359 /* 2360 * If queue class of message is less than or equal to 2361 * that of the last one on the queue, tack on to the end. 2362 */ 2363 tmp = q->q_last; 2364 if (mcls <= (int)queclass(tmp)) { 2365 bp->b_next = NULL; 2366 bp->b_prev = tmp; 2367 tmp->b_next = bp; 2368 q->q_last = bp; 2369 } else { 2370 tmp = q->q_first; 2371 while ((int)queclass(tmp) >= mcls) 2372 tmp = tmp->b_next; 2373 2374 /* 2375 * Insert bp before tmp. 2376 */ 2377 bp->b_next = tmp; 2378 bp->b_prev = tmp->b_prev; 2379 if (tmp->b_prev) 2380 tmp->b_prev->b_next = bp; 2381 else 2382 q->q_first = bp; 2383 tmp->b_prev = bp; 2384 } 2385 } else { /* bp->b_band != 0 */ 2386 if (qbp->qb_first) { 2387 tmp = qbp->qb_last; 2388 2389 /* 2390 * Insert bp after the last message in this band. 2391 */ 2392 bp->b_next = tmp->b_next; 2393 if (tmp->b_next) 2394 tmp->b_next->b_prev = bp; 2395 else 2396 q->q_last = bp; 2397 bp->b_prev = tmp; 2398 tmp->b_next = bp; 2399 } else { 2400 tmp = q->q_last; 2401 if ((mcls < (int)queclass(tmp)) || 2402 (bp->b_band <= tmp->b_band)) { 2403 2404 /* 2405 * Tack bp on end of queue. 2406 */ 2407 bp->b_next = NULL; 2408 bp->b_prev = tmp; 2409 tmp->b_next = bp; 2410 q->q_last = bp; 2411 } else { 2412 tmp = q->q_first; 2413 while (tmp->b_datap->db_type >= QPCTL) 2414 tmp = tmp->b_next; 2415 while (tmp->b_band >= bp->b_band) 2416 tmp = tmp->b_next; 2417 2418 /* 2419 * Insert bp before tmp. 2420 */ 2421 bp->b_next = tmp; 2422 bp->b_prev = tmp->b_prev; 2423 if (tmp->b_prev) 2424 tmp->b_prev->b_next = bp; 2425 else 2426 q->q_first = bp; 2427 tmp->b_prev = bp; 2428 } 2429 qbp->qb_first = bp; 2430 } 2431 qbp->qb_last = bp; 2432 } 2433 2434 /* Get message byte count for q_count accounting */ 2435 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2436 bytecnt += (tmp->b_wptr - tmp->b_rptr); 2437 mblkcnt++; 2438 } 2439 if (qbp) { 2440 qbp->qb_count += bytecnt; 2441 qbp->qb_mblkcnt += mblkcnt; 2442 if ((qbp->qb_count >= qbp->qb_hiwat) || 2443 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2444 qbp->qb_flag |= QB_FULL; 2445 } 2446 } else { 2447 q->q_count += bytecnt; 2448 q->q_mblkcnt += mblkcnt; 2449 if ((q->q_count >= q->q_hiwat) || 2450 (q->q_mblkcnt >= q->q_hiwat)) { 2451 q->q_flag |= QFULL; 2452 } 2453 } 2454 2455 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2456 2457 if ((mcls > QNORM) || 2458 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2459 qenable_locked(q); 2460 ASSERT(MUTEX_HELD(QLOCK(q))); 2461 if (freezer != curthread) 2462 mutex_exit(QLOCK(q)); 2463 2464 return (1); 2465 } 2466 2467 /* 2468 * Put stuff back at beginning of Q according to priority order. 2469 * See comment on putq above for details. 2470 */ 2471 int 2472 putbq(queue_t *q, mblk_t *bp) 2473 { 2474 mblk_t *tmp; 2475 qband_t *qbp = NULL; 2476 int mcls = (int)queclass(bp); 2477 kthread_id_t freezer; 2478 int bytecnt = 0, mblkcnt = 0; 2479 2480 ASSERT(q && bp); 2481 ASSERT(bp->b_next == NULL); 2482 freezer = STREAM(q)->sd_freezer; 2483 if (freezer == curthread) { 2484 ASSERT(frozenstr(q)); 2485 ASSERT(MUTEX_HELD(QLOCK(q))); 2486 } else 2487 mutex_enter(QLOCK(q)); 2488 2489 /* 2490 * Make sanity checks and if qband structure is not yet 2491 * allocated, do so. 2492 */ 2493 if (mcls == QPCTL) { 2494 if (bp->b_band != 0) 2495 bp->b_band = 0; /* force to be correct */ 2496 } else if (bp->b_band != 0) { 2497 int i; 2498 qband_t **qbpp; 2499 2500 if (bp->b_band > q->q_nband) { 2501 qbpp = &q->q_bandp; 2502 while (*qbpp) 2503 qbpp = &(*qbpp)->qb_next; 2504 while (bp->b_band > q->q_nband) { 2505 if ((*qbpp = allocband()) == NULL) { 2506 if (freezer != curthread) 2507 mutex_exit(QLOCK(q)); 2508 return (0); 2509 } 2510 (*qbpp)->qb_hiwat = q->q_hiwat; 2511 (*qbpp)->qb_lowat = q->q_lowat; 2512 q->q_nband++; 2513 qbpp = &(*qbpp)->qb_next; 2514 } 2515 } 2516 qbp = q->q_bandp; 2517 i = bp->b_band; 2518 while (--i) 2519 qbp = qbp->qb_next; 2520 } 2521 2522 /* 2523 * If queue is empty or if message is high priority, 2524 * place on the front of the queue. 2525 */ 2526 tmp = q->q_first; 2527 if ((!tmp) || (mcls == QPCTL)) { 2528 bp->b_next = tmp; 2529 if (tmp) 2530 tmp->b_prev = bp; 2531 else 2532 q->q_last = bp; 2533 q->q_first = bp; 2534 bp->b_prev = NULL; 2535 if (qbp) { 2536 qbp->qb_first = bp; 2537 qbp->qb_last = bp; 2538 } 2539 } else if (qbp) { /* bp->b_band != 0 */ 2540 tmp = qbp->qb_first; 2541 if (tmp) { 2542 2543 /* 2544 * Insert bp before the first message in this band. 2545 */ 2546 bp->b_next = tmp; 2547 bp->b_prev = tmp->b_prev; 2548 if (tmp->b_prev) 2549 tmp->b_prev->b_next = bp; 2550 else 2551 q->q_first = bp; 2552 tmp->b_prev = bp; 2553 } else { 2554 tmp = q->q_last; 2555 if ((mcls < (int)queclass(tmp)) || 2556 (bp->b_band < tmp->b_band)) { 2557 2558 /* 2559 * Tack bp on end of queue. 2560 */ 2561 bp->b_next = NULL; 2562 bp->b_prev = tmp; 2563 tmp->b_next = bp; 2564 q->q_last = bp; 2565 } else { 2566 tmp = q->q_first; 2567 while (tmp->b_datap->db_type >= QPCTL) 2568 tmp = tmp->b_next; 2569 while (tmp->b_band > bp->b_band) 2570 tmp = tmp->b_next; 2571 2572 /* 2573 * Insert bp before tmp. 2574 */ 2575 bp->b_next = tmp; 2576 bp->b_prev = tmp->b_prev; 2577 if (tmp->b_prev) 2578 tmp->b_prev->b_next = bp; 2579 else 2580 q->q_first = bp; 2581 tmp->b_prev = bp; 2582 } 2583 qbp->qb_last = bp; 2584 } 2585 qbp->qb_first = bp; 2586 } else { /* bp->b_band == 0 && !QPCTL */ 2587 2588 /* 2589 * If the queue class or band is less than that of the last 2590 * message on the queue, tack bp on the end of the queue. 2591 */ 2592 tmp = q->q_last; 2593 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2594 bp->b_next = NULL; 2595 bp->b_prev = tmp; 2596 tmp->b_next = bp; 2597 q->q_last = bp; 2598 } else { 2599 tmp = q->q_first; 2600 while (tmp->b_datap->db_type >= QPCTL) 2601 tmp = tmp->b_next; 2602 while (tmp->b_band > bp->b_band) 2603 tmp = tmp->b_next; 2604 2605 /* 2606 * Insert bp before tmp. 2607 */ 2608 bp->b_next = tmp; 2609 bp->b_prev = tmp->b_prev; 2610 if (tmp->b_prev) 2611 tmp->b_prev->b_next = bp; 2612 else 2613 q->q_first = bp; 2614 tmp->b_prev = bp; 2615 } 2616 } 2617 2618 /* Get message byte count for q_count accounting */ 2619 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2620 bytecnt += (tmp->b_wptr - tmp->b_rptr); 2621 mblkcnt++; 2622 } 2623 if (qbp) { 2624 qbp->qb_count += bytecnt; 2625 qbp->qb_mblkcnt += mblkcnt; 2626 if ((qbp->qb_count >= qbp->qb_hiwat) || 2627 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2628 qbp->qb_flag |= QB_FULL; 2629 } 2630 } else { 2631 q->q_count += bytecnt; 2632 q->q_mblkcnt += mblkcnt; 2633 if ((q->q_count >= q->q_hiwat) || 2634 (q->q_mblkcnt >= q->q_hiwat)) { 2635 q->q_flag |= QFULL; 2636 } 2637 } 2638 2639 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL); 2640 2641 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2642 qenable_locked(q); 2643 ASSERT(MUTEX_HELD(QLOCK(q))); 2644 if (freezer != curthread) 2645 mutex_exit(QLOCK(q)); 2646 2647 return (1); 2648 } 2649 2650 /* 2651 * Insert a message before an existing message on the queue. If the 2652 * existing message is NULL, the new messages is placed on the end of 2653 * the queue. The queue class of the new message is ignored. However, 2654 * the priority band of the new message must adhere to the following 2655 * ordering: 2656 * 2657 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2658 * 2659 * All flow control parameters are updated. 2660 * 2661 * insq can be called with the stream frozen, but other utility functions 2662 * holding QLOCK, and by streams modules without any locks/frozen. 2663 */ 2664 int 2665 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2666 { 2667 mblk_t *tmp; 2668 qband_t *qbp = NULL; 2669 int mcls = (int)queclass(mp); 2670 kthread_id_t freezer; 2671 int bytecnt = 0, mblkcnt = 0; 2672 2673 freezer = STREAM(q)->sd_freezer; 2674 if (freezer == curthread) { 2675 ASSERT(frozenstr(q)); 2676 ASSERT(MUTEX_HELD(QLOCK(q))); 2677 } else if (MUTEX_HELD(QLOCK(q))) { 2678 /* Don't drop lock on exit */ 2679 freezer = curthread; 2680 } else 2681 mutex_enter(QLOCK(q)); 2682 2683 if (mcls == QPCTL) { 2684 if (mp->b_band != 0) 2685 mp->b_band = 0; /* force to be correct */ 2686 if (emp && emp->b_prev && 2687 (emp->b_prev->b_datap->db_type < QPCTL)) 2688 goto badord; 2689 } 2690 if (emp) { 2691 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2692 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2693 (emp->b_prev->b_band < mp->b_band))) { 2694 goto badord; 2695 } 2696 } else { 2697 tmp = q->q_last; 2698 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2699 badord: 2700 cmn_err(CE_WARN, 2701 "insq: attempt to insert message out of order " 2702 "on q %p", (void *)q); 2703 if (freezer != curthread) 2704 mutex_exit(QLOCK(q)); 2705 return (0); 2706 } 2707 } 2708 2709 if (mp->b_band != 0) { 2710 int i; 2711 qband_t **qbpp; 2712 2713 if (mp->b_band > q->q_nband) { 2714 qbpp = &q->q_bandp; 2715 while (*qbpp) 2716 qbpp = &(*qbpp)->qb_next; 2717 while (mp->b_band > q->q_nband) { 2718 if ((*qbpp = allocband()) == NULL) { 2719 if (freezer != curthread) 2720 mutex_exit(QLOCK(q)); 2721 return (0); 2722 } 2723 (*qbpp)->qb_hiwat = q->q_hiwat; 2724 (*qbpp)->qb_lowat = q->q_lowat; 2725 q->q_nband++; 2726 qbpp = &(*qbpp)->qb_next; 2727 } 2728 } 2729 qbp = q->q_bandp; 2730 i = mp->b_band; 2731 while (--i) 2732 qbp = qbp->qb_next; 2733 } 2734 2735 if ((mp->b_next = emp) != NULL) { 2736 if ((mp->b_prev = emp->b_prev) != NULL) 2737 emp->b_prev->b_next = mp; 2738 else 2739 q->q_first = mp; 2740 emp->b_prev = mp; 2741 } else { 2742 if ((mp->b_prev = q->q_last) != NULL) 2743 q->q_last->b_next = mp; 2744 else 2745 q->q_first = mp; 2746 q->q_last = mp; 2747 } 2748 2749 /* Get mblk and byte count for q_count accounting */ 2750 for (tmp = mp; tmp; tmp = tmp->b_cont) { 2751 bytecnt += (tmp->b_wptr - tmp->b_rptr); 2752 mblkcnt++; 2753 } 2754 2755 if (qbp) { /* adjust qband pointers and count */ 2756 if (!qbp->qb_first) { 2757 qbp->qb_first = mp; 2758 qbp->qb_last = mp; 2759 } else { 2760 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2761 (mp->b_prev->b_band != mp->b_band))) 2762 qbp->qb_first = mp; 2763 else if (mp->b_next == NULL || (mp->b_next != NULL && 2764 (mp->b_next->b_band != mp->b_band))) 2765 qbp->qb_last = mp; 2766 } 2767 qbp->qb_count += bytecnt; 2768 qbp->qb_mblkcnt += mblkcnt; 2769 if ((qbp->qb_count >= qbp->qb_hiwat) || 2770 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2771 qbp->qb_flag |= QB_FULL; 2772 } 2773 } else { 2774 q->q_count += bytecnt; 2775 q->q_mblkcnt += mblkcnt; 2776 if ((q->q_count >= q->q_hiwat) || 2777 (q->q_mblkcnt >= q->q_hiwat)) { 2778 q->q_flag |= QFULL; 2779 } 2780 } 2781 2782 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2783 2784 if (canenable(q) && (q->q_flag & QWANTR)) 2785 qenable_locked(q); 2786 2787 ASSERT(MUTEX_HELD(QLOCK(q))); 2788 if (freezer != curthread) 2789 mutex_exit(QLOCK(q)); 2790 2791 return (1); 2792 } 2793 2794 /* 2795 * Create and put a control message on queue. 2796 */ 2797 int 2798 putctl(queue_t *q, int type) 2799 { 2800 mblk_t *bp; 2801 2802 if ((datamsg(type) && (type != M_DELAY)) || 2803 (bp = allocb_tryhard(0)) == NULL) 2804 return (0); 2805 bp->b_datap->db_type = (unsigned char) type; 2806 2807 put(q, bp); 2808 2809 return (1); 2810 } 2811 2812 /* 2813 * Control message with a single-byte parameter 2814 */ 2815 int 2816 putctl1(queue_t *q, int type, int param) 2817 { 2818 mblk_t *bp; 2819 2820 if ((datamsg(type) && (type != M_DELAY)) || 2821 (bp = allocb_tryhard(1)) == NULL) 2822 return (0); 2823 bp->b_datap->db_type = (unsigned char)type; 2824 *bp->b_wptr++ = (unsigned char)param; 2825 2826 put(q, bp); 2827 2828 return (1); 2829 } 2830 2831 int 2832 putnextctl1(queue_t *q, int type, int param) 2833 { 2834 mblk_t *bp; 2835 2836 if ((datamsg(type) && (type != M_DELAY)) || 2837 ((bp = allocb_tryhard(1)) == NULL)) 2838 return (0); 2839 2840 bp->b_datap->db_type = (unsigned char)type; 2841 *bp->b_wptr++ = (unsigned char)param; 2842 2843 putnext(q, bp); 2844 2845 return (1); 2846 } 2847 2848 int 2849 putnextctl(queue_t *q, int type) 2850 { 2851 mblk_t *bp; 2852 2853 if ((datamsg(type) && (type != M_DELAY)) || 2854 ((bp = allocb_tryhard(0)) == NULL)) 2855 return (0); 2856 bp->b_datap->db_type = (unsigned char)type; 2857 2858 putnext(q, bp); 2859 2860 return (1); 2861 } 2862 2863 /* 2864 * Return the queue upstream from this one 2865 */ 2866 queue_t * 2867 backq(queue_t *q) 2868 { 2869 q = _OTHERQ(q); 2870 if (q->q_next) { 2871 q = q->q_next; 2872 return (_OTHERQ(q)); 2873 } 2874 return (NULL); 2875 } 2876 2877 /* 2878 * Send a block back up the queue in reverse from this 2879 * one (e.g. to respond to ioctls) 2880 */ 2881 void 2882 qreply(queue_t *q, mblk_t *bp) 2883 { 2884 ASSERT(q && bp); 2885 2886 putnext(_OTHERQ(q), bp); 2887 } 2888 2889 /* 2890 * Streams Queue Scheduling 2891 * 2892 * Queues are enabled through qenable() when they have messages to 2893 * process. They are serviced by queuerun(), which runs each enabled 2894 * queue's service procedure. The call to queuerun() is processor 2895 * dependent - the general principle is that it be run whenever a queue 2896 * is enabled but before returning to user level. For system calls, 2897 * the function runqueues() is called if their action causes a queue 2898 * to be enabled. For device interrupts, queuerun() should be 2899 * called before returning from the last level of interrupt. Beyond 2900 * this, no timing assumptions should be made about queue scheduling. 2901 */ 2902 2903 /* 2904 * Enable a queue: put it on list of those whose service procedures are 2905 * ready to run and set up the scheduling mechanism. 2906 * The broadcast is done outside the mutex -> to avoid the woken thread 2907 * from contending with the mutex. This is OK 'cos the queue has been 2908 * enqueued on the runlist and flagged safely at this point. 2909 */ 2910 void 2911 qenable(queue_t *q) 2912 { 2913 mutex_enter(QLOCK(q)); 2914 qenable_locked(q); 2915 mutex_exit(QLOCK(q)); 2916 } 2917 /* 2918 * Return number of messages on queue 2919 */ 2920 int 2921 qsize(queue_t *qp) 2922 { 2923 int count = 0; 2924 mblk_t *mp; 2925 2926 mutex_enter(QLOCK(qp)); 2927 for (mp = qp->q_first; mp; mp = mp->b_next) 2928 count++; 2929 mutex_exit(QLOCK(qp)); 2930 return (count); 2931 } 2932 2933 /* 2934 * noenable - set queue so that putq() will not enable it. 2935 * enableok - set queue so that putq() can enable it. 2936 */ 2937 void 2938 noenable(queue_t *q) 2939 { 2940 mutex_enter(QLOCK(q)); 2941 q->q_flag |= QNOENB; 2942 mutex_exit(QLOCK(q)); 2943 } 2944 2945 void 2946 enableok(queue_t *q) 2947 { 2948 mutex_enter(QLOCK(q)); 2949 q->q_flag &= ~QNOENB; 2950 mutex_exit(QLOCK(q)); 2951 } 2952 2953 /* 2954 * Set queue fields. 2955 */ 2956 int 2957 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 2958 { 2959 qband_t *qbp = NULL; 2960 queue_t *wrq; 2961 int error = 0; 2962 kthread_id_t freezer; 2963 2964 freezer = STREAM(q)->sd_freezer; 2965 if (freezer == curthread) { 2966 ASSERT(frozenstr(q)); 2967 ASSERT(MUTEX_HELD(QLOCK(q))); 2968 } else 2969 mutex_enter(QLOCK(q)); 2970 2971 if (what >= QBAD) { 2972 error = EINVAL; 2973 goto done; 2974 } 2975 if (pri != 0) { 2976 int i; 2977 qband_t **qbpp; 2978 2979 if (pri > q->q_nband) { 2980 qbpp = &q->q_bandp; 2981 while (*qbpp) 2982 qbpp = &(*qbpp)->qb_next; 2983 while (pri > q->q_nband) { 2984 if ((*qbpp = allocband()) == NULL) { 2985 error = EAGAIN; 2986 goto done; 2987 } 2988 (*qbpp)->qb_hiwat = q->q_hiwat; 2989 (*qbpp)->qb_lowat = q->q_lowat; 2990 q->q_nband++; 2991 qbpp = &(*qbpp)->qb_next; 2992 } 2993 } 2994 qbp = q->q_bandp; 2995 i = pri; 2996 while (--i) 2997 qbp = qbp->qb_next; 2998 } 2999 switch (what) { 3000 3001 case QHIWAT: 3002 if (qbp) 3003 qbp->qb_hiwat = (size_t)val; 3004 else 3005 q->q_hiwat = (size_t)val; 3006 break; 3007 3008 case QLOWAT: 3009 if (qbp) 3010 qbp->qb_lowat = (size_t)val; 3011 else 3012 q->q_lowat = (size_t)val; 3013 break; 3014 3015 case QMAXPSZ: 3016 if (qbp) 3017 error = EINVAL; 3018 else 3019 q->q_maxpsz = (ssize_t)val; 3020 3021 /* 3022 * Performance concern, strwrite looks at the module below 3023 * the stream head for the maxpsz each time it does a write 3024 * we now cache it at the stream head. Check to see if this 3025 * queue is sitting directly below the stream head. 3026 */ 3027 wrq = STREAM(q)->sd_wrq; 3028 if (q != wrq->q_next) 3029 break; 3030 3031 /* 3032 * If the stream is not frozen drop the current QLOCK and 3033 * acquire the sd_wrq QLOCK which protects sd_qn_* 3034 */ 3035 if (freezer != curthread) { 3036 mutex_exit(QLOCK(q)); 3037 mutex_enter(QLOCK(wrq)); 3038 } 3039 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3040 3041 if (strmsgsz != 0) { 3042 if (val == INFPSZ) 3043 val = strmsgsz; 3044 else { 3045 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3046 val = MIN(PIPE_BUF, val); 3047 else 3048 val = MIN(strmsgsz, val); 3049 } 3050 } 3051 STREAM(q)->sd_qn_maxpsz = val; 3052 if (freezer != curthread) { 3053 mutex_exit(QLOCK(wrq)); 3054 mutex_enter(QLOCK(q)); 3055 } 3056 break; 3057 3058 case QMINPSZ: 3059 if (qbp) 3060 error = EINVAL; 3061 else 3062 q->q_minpsz = (ssize_t)val; 3063 3064 /* 3065 * Performance concern, strwrite looks at the module below 3066 * the stream head for the maxpsz each time it does a write 3067 * we now cache it at the stream head. Check to see if this 3068 * queue is sitting directly below the stream head. 3069 */ 3070 wrq = STREAM(q)->sd_wrq; 3071 if (q != wrq->q_next) 3072 break; 3073 3074 /* 3075 * If the stream is not frozen drop the current QLOCK and 3076 * acquire the sd_wrq QLOCK which protects sd_qn_* 3077 */ 3078 if (freezer != curthread) { 3079 mutex_exit(QLOCK(q)); 3080 mutex_enter(QLOCK(wrq)); 3081 } 3082 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3083 3084 if (freezer != curthread) { 3085 mutex_exit(QLOCK(wrq)); 3086 mutex_enter(QLOCK(q)); 3087 } 3088 break; 3089 3090 case QSTRUIOT: 3091 if (qbp) 3092 error = EINVAL; 3093 else 3094 q->q_struiot = (ushort_t)val; 3095 break; 3096 3097 case QCOUNT: 3098 case QFIRST: 3099 case QLAST: 3100 case QFLAG: 3101 error = EPERM; 3102 break; 3103 3104 default: 3105 error = EINVAL; 3106 break; 3107 } 3108 done: 3109 if (freezer != curthread) 3110 mutex_exit(QLOCK(q)); 3111 return (error); 3112 } 3113 3114 /* 3115 * Get queue fields. 3116 */ 3117 int 3118 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3119 { 3120 qband_t *qbp = NULL; 3121 int error = 0; 3122 kthread_id_t freezer; 3123 3124 freezer = STREAM(q)->sd_freezer; 3125 if (freezer == curthread) { 3126 ASSERT(frozenstr(q)); 3127 ASSERT(MUTEX_HELD(QLOCK(q))); 3128 } else 3129 mutex_enter(QLOCK(q)); 3130 if (what >= QBAD) { 3131 error = EINVAL; 3132 goto done; 3133 } 3134 if (pri != 0) { 3135 int i; 3136 qband_t **qbpp; 3137 3138 if (pri > q->q_nband) { 3139 qbpp = &q->q_bandp; 3140 while (*qbpp) 3141 qbpp = &(*qbpp)->qb_next; 3142 while (pri > q->q_nband) { 3143 if ((*qbpp = allocband()) == NULL) { 3144 error = EAGAIN; 3145 goto done; 3146 } 3147 (*qbpp)->qb_hiwat = q->q_hiwat; 3148 (*qbpp)->qb_lowat = q->q_lowat; 3149 q->q_nband++; 3150 qbpp = &(*qbpp)->qb_next; 3151 } 3152 } 3153 qbp = q->q_bandp; 3154 i = pri; 3155 while (--i) 3156 qbp = qbp->qb_next; 3157 } 3158 switch (what) { 3159 case QHIWAT: 3160 if (qbp) 3161 *(size_t *)valp = qbp->qb_hiwat; 3162 else 3163 *(size_t *)valp = q->q_hiwat; 3164 break; 3165 3166 case QLOWAT: 3167 if (qbp) 3168 *(size_t *)valp = qbp->qb_lowat; 3169 else 3170 *(size_t *)valp = q->q_lowat; 3171 break; 3172 3173 case QMAXPSZ: 3174 if (qbp) 3175 error = EINVAL; 3176 else 3177 *(ssize_t *)valp = q->q_maxpsz; 3178 break; 3179 3180 case QMINPSZ: 3181 if (qbp) 3182 error = EINVAL; 3183 else 3184 *(ssize_t *)valp = q->q_minpsz; 3185 break; 3186 3187 case QCOUNT: 3188 if (qbp) 3189 *(size_t *)valp = qbp->qb_count; 3190 else 3191 *(size_t *)valp = q->q_count; 3192 break; 3193 3194 case QFIRST: 3195 if (qbp) 3196 *(mblk_t **)valp = qbp->qb_first; 3197 else 3198 *(mblk_t **)valp = q->q_first; 3199 break; 3200 3201 case QLAST: 3202 if (qbp) 3203 *(mblk_t **)valp = qbp->qb_last; 3204 else 3205 *(mblk_t **)valp = q->q_last; 3206 break; 3207 3208 case QFLAG: 3209 if (qbp) 3210 *(uint_t *)valp = qbp->qb_flag; 3211 else 3212 *(uint_t *)valp = q->q_flag; 3213 break; 3214 3215 case QSTRUIOT: 3216 if (qbp) 3217 error = EINVAL; 3218 else 3219 *(short *)valp = q->q_struiot; 3220 break; 3221 3222 default: 3223 error = EINVAL; 3224 break; 3225 } 3226 done: 3227 if (freezer != curthread) 3228 mutex_exit(QLOCK(q)); 3229 return (error); 3230 } 3231 3232 /* 3233 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3234 * QWANTWSYNC or QWANTR or QWANTW, 3235 * 3236 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3237 * deferred wakeup will be done. Also if strpoll() in progress then a 3238 * deferred pollwakeup will be done. 3239 */ 3240 void 3241 strwakeq(queue_t *q, int flag) 3242 { 3243 stdata_t *stp = STREAM(q); 3244 pollhead_t *pl; 3245 3246 mutex_enter(&stp->sd_lock); 3247 pl = &stp->sd_pollist; 3248 if (flag & QWANTWSYNC) { 3249 ASSERT(!(q->q_flag & QREADR)); 3250 if (stp->sd_flag & WSLEEP) { 3251 stp->sd_flag &= ~WSLEEP; 3252 cv_broadcast(&stp->sd_wrq->q_wait); 3253 } else { 3254 stp->sd_wakeq |= WSLEEP; 3255 } 3256 3257 mutex_exit(&stp->sd_lock); 3258 pollwakeup(pl, POLLWRNORM); 3259 mutex_enter(&stp->sd_lock); 3260 3261 if (stp->sd_sigflags & S_WRNORM) 3262 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3263 } else if (flag & QWANTR) { 3264 if (stp->sd_flag & RSLEEP) { 3265 stp->sd_flag &= ~RSLEEP; 3266 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3267 } else { 3268 stp->sd_wakeq |= RSLEEP; 3269 } 3270 3271 mutex_exit(&stp->sd_lock); 3272 pollwakeup(pl, POLLIN | POLLRDNORM); 3273 mutex_enter(&stp->sd_lock); 3274 3275 { 3276 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3277 3278 if (events) 3279 strsendsig(stp->sd_siglist, events, 0, 0); 3280 } 3281 } else { 3282 if (stp->sd_flag & WSLEEP) { 3283 stp->sd_flag &= ~WSLEEP; 3284 cv_broadcast(&stp->sd_wrq->q_wait); 3285 } 3286 3287 mutex_exit(&stp->sd_lock); 3288 pollwakeup(pl, POLLWRNORM); 3289 mutex_enter(&stp->sd_lock); 3290 3291 if (stp->sd_sigflags & S_WRNORM) 3292 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3293 } 3294 mutex_exit(&stp->sd_lock); 3295 } 3296 3297 int 3298 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock) 3299 { 3300 stdata_t *stp = STREAM(q); 3301 int typ = STRUIOT_STANDARD; 3302 uio_t *uiop = &dp->d_uio; 3303 dblk_t *dbp; 3304 ssize_t uiocnt; 3305 ssize_t cnt; 3306 unsigned char *ptr; 3307 ssize_t resid; 3308 int error = 0; 3309 on_trap_data_t otd; 3310 queue_t *stwrq; 3311 3312 /* 3313 * Plumbing may change while taking the type so store the 3314 * queue in a temporary variable. It doesn't matter even 3315 * if the we take the type from the previous plumbing, 3316 * that's because if the plumbing has changed when we were 3317 * holding the queue in a temporary variable, we can continue 3318 * processing the message the way it would have been processed 3319 * in the old plumbing, without any side effects but a bit 3320 * extra processing for partial ip header checksum. 3321 * 3322 * This has been done to avoid holding the sd_lock which is 3323 * very hot. 3324 */ 3325 3326 stwrq = stp->sd_struiowrq; 3327 if (stwrq) 3328 typ = stwrq->q_struiot; 3329 3330 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) { 3331 dbp = mp->b_datap; 3332 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff); 3333 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3334 cnt = MIN(uiocnt, uiop->uio_resid); 3335 if (!(dbp->db_struioflag & STRUIO_SPEC) || 3336 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) { 3337 /* 3338 * Either this mblk has already been processed 3339 * or there is no more room in this mblk (?). 3340 */ 3341 continue; 3342 } 3343 switch (typ) { 3344 case STRUIOT_STANDARD: 3345 if (noblock) { 3346 if (on_trap(&otd, OT_DATA_ACCESS)) { 3347 no_trap(); 3348 error = EWOULDBLOCK; 3349 goto out; 3350 } 3351 } 3352 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) { 3353 if (noblock) 3354 no_trap(); 3355 goto out; 3356 } 3357 if (noblock) 3358 no_trap(); 3359 break; 3360 3361 default: 3362 error = EIO; 3363 goto out; 3364 } 3365 dbp->db_struioflag |= STRUIO_DONE; 3366 dbp->db_cksumstuff += cnt; 3367 } 3368 out: 3369 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) { 3370 /* 3371 * A fault has occured and some bytes were moved to the 3372 * current mblk, the uio_t has already been updated by 3373 * the appropriate uio routine, so also update the mblk 3374 * to reflect this in case this same mblk chain is used 3375 * again (after the fault has been handled). 3376 */ 3377 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3378 if (uiocnt >= resid) 3379 dbp->db_cksumstuff += resid; 3380 } 3381 return (error); 3382 } 3383 3384 /* 3385 * Try to enter queue synchronously. Any attempt to enter a closing queue will 3386 * fails. The qp->q_rwcnt keeps track of the number of successful entries so 3387 * that removeq() will not try to close the queue while a thread is inside the 3388 * queue. 3389 */ 3390 static boolean_t 3391 rwnext_enter(queue_t *qp) 3392 { 3393 mutex_enter(QLOCK(qp)); 3394 if (qp->q_flag & QWCLOSE) { 3395 mutex_exit(QLOCK(qp)); 3396 return (B_FALSE); 3397 } 3398 qp->q_rwcnt++; 3399 ASSERT(qp->q_rwcnt != 0); 3400 mutex_exit(QLOCK(qp)); 3401 return (B_TRUE); 3402 } 3403 3404 /* 3405 * Decrease the count of threads running in sync stream queue and wake up any 3406 * threads blocked in removeq(). 3407 */ 3408 static void 3409 rwnext_exit(queue_t *qp) 3410 { 3411 mutex_enter(QLOCK(qp)); 3412 qp->q_rwcnt--; 3413 if (qp->q_flag & QWANTRMQSYNC) { 3414 qp->q_flag &= ~QWANTRMQSYNC; 3415 cv_broadcast(&qp->q_wait); 3416 } 3417 mutex_exit(QLOCK(qp)); 3418 } 3419 3420 /* 3421 * The purpose of rwnext() is to call the rw procedure of the next 3422 * (downstream) modules queue. 3423 * 3424 * treated as put entrypoint for perimeter syncronization. 3425 * 3426 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3427 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does 3428 * not matter if any regular put entrypoints have been already entered. We 3429 * can't increment one of the sq_putcounts (instead of sq_count) because 3430 * qwait_rw won't know which counter to decrement. 3431 * 3432 * It would be reasonable to add the lockless FASTPUT logic. 3433 */ 3434 int 3435 rwnext(queue_t *qp, struiod_t *dp) 3436 { 3437 queue_t *nqp; 3438 syncq_t *sq; 3439 uint16_t count; 3440 uint16_t flags; 3441 struct qinit *qi; 3442 int (*proc)(); 3443 struct stdata *stp; 3444 int isread; 3445 int rval; 3446 3447 stp = STREAM(qp); 3448 /* 3449 * Prevent q_next from changing by holding sd_lock until acquiring 3450 * SQLOCK. Note that a read-side rwnext from the streamhead will 3451 * already have sd_lock acquired. In either case sd_lock is always 3452 * released after acquiring SQLOCK. 3453 * 3454 * The streamhead read-side holding sd_lock when calling rwnext is 3455 * required to prevent a race condition were M_DATA mblks flowing 3456 * up the read-side of the stream could be bypassed by a rwnext() 3457 * down-call. In this case sd_lock acts as the streamhead perimeter. 3458 */ 3459 if ((nqp = _WR(qp)) == qp) { 3460 isread = 0; 3461 mutex_enter(&stp->sd_lock); 3462 qp = nqp->q_next; 3463 } else { 3464 isread = 1; 3465 if (nqp != stp->sd_wrq) 3466 /* Not streamhead */ 3467 mutex_enter(&stp->sd_lock); 3468 qp = _RD(nqp->q_next); 3469 } 3470 qi = qp->q_qinfo; 3471 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) { 3472 /* 3473 * Not a synchronous module or no r/w procedure for this 3474 * queue, so just return EINVAL and let the caller handle it. 3475 */ 3476 mutex_exit(&stp->sd_lock); 3477 return (EINVAL); 3478 } 3479 3480 if (rwnext_enter(qp) == B_FALSE) { 3481 mutex_exit(&stp->sd_lock); 3482 return (EINVAL); 3483 } 3484 3485 sq = qp->q_syncq; 3486 mutex_enter(SQLOCK(sq)); 3487 mutex_exit(&stp->sd_lock); 3488 count = sq->sq_count; 3489 flags = sq->sq_flags; 3490 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3491 3492 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3493 /* 3494 * if this queue is being closed, return. 3495 */ 3496 if (qp->q_flag & QWCLOSE) { 3497 mutex_exit(SQLOCK(sq)); 3498 rwnext_exit(qp); 3499 return (EINVAL); 3500 } 3501 3502 /* 3503 * Wait until we can enter the inner perimeter. 3504 */ 3505 sq->sq_flags = flags | SQ_WANTWAKEUP; 3506 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3507 count = sq->sq_count; 3508 flags = sq->sq_flags; 3509 } 3510 3511 if (isread == 0 && stp->sd_struiowrq == NULL || 3512 isread == 1 && stp->sd_struiordq == NULL) { 3513 /* 3514 * Stream plumbing changed while waiting for inner perimeter 3515 * so just return EINVAL and let the caller handle it. 3516 */ 3517 mutex_exit(SQLOCK(sq)); 3518 rwnext_exit(qp); 3519 return (EINVAL); 3520 } 3521 if (!(flags & SQ_CIPUT)) 3522 sq->sq_flags = flags | SQ_EXCL; 3523 sq->sq_count = count + 1; 3524 ASSERT(sq->sq_count != 0); /* Wraparound */ 3525 /* 3526 * Note: The only message ordering guarantee that rwnext() makes is 3527 * for the write queue flow-control case. All others (r/w queue 3528 * with q_count > 0 (or q_first != 0)) are the resposibilty of 3529 * the queue's rw procedure. This could be genralized here buy 3530 * running the queue's service procedure, but that wouldn't be 3531 * the most efficent for all cases. 3532 */ 3533 mutex_exit(SQLOCK(sq)); 3534 if (! isread && (qp->q_flag & QFULL)) { 3535 /* 3536 * Write queue may be flow controlled. If so, 3537 * mark the queue for wakeup when it's not. 3538 */ 3539 mutex_enter(QLOCK(qp)); 3540 if (qp->q_flag & QFULL) { 3541 qp->q_flag |= QWANTWSYNC; 3542 mutex_exit(QLOCK(qp)); 3543 rval = EWOULDBLOCK; 3544 goto out; 3545 } 3546 mutex_exit(QLOCK(qp)); 3547 } 3548 3549 if (! isread && dp->d_mp) 3550 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr - 3551 dp->d_mp->b_datap->db_base); 3552 3553 rval = (*proc)(qp, dp); 3554 3555 if (isread && dp->d_mp) 3556 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT, 3557 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base); 3558 out: 3559 /* 3560 * The queue is protected from being freed by sq_count, so it is 3561 * safe to call rwnext_exit and reacquire SQLOCK(sq). 3562 */ 3563 rwnext_exit(qp); 3564 3565 mutex_enter(SQLOCK(sq)); 3566 flags = sq->sq_flags; 3567 ASSERT(sq->sq_count != 0); 3568 sq->sq_count--; 3569 if (flags & SQ_TAIL) { 3570 putnext_tail(sq, qp, flags); 3571 /* 3572 * The only purpose of this ASSERT is to preserve calling stack 3573 * in DEBUG kernel. 3574 */ 3575 ASSERT(flags & SQ_TAIL); 3576 return (rval); 3577 } 3578 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3579 /* 3580 * Safe to always drop SQ_EXCL: 3581 * Not SQ_CIPUT means we set SQ_EXCL above 3582 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3583 * did a qwriter(INNER) in which case nobody else 3584 * is in the inner perimeter and we are exiting. 3585 * 3586 * I would like to make the following assertion: 3587 * 3588 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3589 * sq->sq_count == 0); 3590 * 3591 * which indicates that if we are both putshared and exclusive, 3592 * we became exclusive while executing the putproc, and the only 3593 * claim on the syncq was the one we dropped a few lines above. 3594 * But other threads that enter putnext while the syncq is exclusive 3595 * need to make a claim as they may need to drop SQLOCK in the 3596 * has_writers case to avoid deadlocks. If these threads are 3597 * delayed or preempted, it is possible that the writer thread can 3598 * find out that there are other claims making the (sq_count == 0) 3599 * test invalid. 3600 */ 3601 3602 sq->sq_flags = flags & ~SQ_EXCL; 3603 if (sq->sq_flags & SQ_WANTWAKEUP) { 3604 sq->sq_flags &= ~SQ_WANTWAKEUP; 3605 cv_broadcast(&sq->sq_wait); 3606 } 3607 mutex_exit(SQLOCK(sq)); 3608 return (rval); 3609 } 3610 3611 /* 3612 * The purpose of infonext() is to call the info procedure of the next 3613 * (downstream) modules queue. 3614 * 3615 * treated as put entrypoint for perimeter syncronization. 3616 * 3617 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3618 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and 3619 * it does not matter if any regular put entrypoints have been already 3620 * entered. 3621 */ 3622 int 3623 infonext(queue_t *qp, infod_t *idp) 3624 { 3625 queue_t *nqp; 3626 syncq_t *sq; 3627 uint16_t count; 3628 uint16_t flags; 3629 struct qinit *qi; 3630 int (*proc)(); 3631 struct stdata *stp; 3632 int rval; 3633 3634 stp = STREAM(qp); 3635 /* 3636 * Prevent q_next from changing by holding sd_lock until 3637 * acquiring SQLOCK. 3638 */ 3639 mutex_enter(&stp->sd_lock); 3640 if ((nqp = _WR(qp)) == qp) { 3641 qp = nqp->q_next; 3642 } else { 3643 qp = _RD(nqp->q_next); 3644 } 3645 qi = qp->q_qinfo; 3646 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3647 mutex_exit(&stp->sd_lock); 3648 return (EINVAL); 3649 } 3650 sq = qp->q_syncq; 3651 mutex_enter(SQLOCK(sq)); 3652 mutex_exit(&stp->sd_lock); 3653 count = sq->sq_count; 3654 flags = sq->sq_flags; 3655 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3656 3657 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3658 /* 3659 * Wait until we can enter the inner perimeter. 3660 */ 3661 sq->sq_flags = flags | SQ_WANTWAKEUP; 3662 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3663 count = sq->sq_count; 3664 flags = sq->sq_flags; 3665 } 3666 3667 if (! (flags & SQ_CIPUT)) 3668 sq->sq_flags = flags | SQ_EXCL; 3669 sq->sq_count = count + 1; 3670 ASSERT(sq->sq_count != 0); /* Wraparound */ 3671 mutex_exit(SQLOCK(sq)); 3672 3673 rval = (*proc)(qp, idp); 3674 3675 mutex_enter(SQLOCK(sq)); 3676 flags = sq->sq_flags; 3677 ASSERT(sq->sq_count != 0); 3678 sq->sq_count--; 3679 if (flags & SQ_TAIL) { 3680 putnext_tail(sq, qp, flags); 3681 /* 3682 * The only purpose of this ASSERT is to preserve calling stack 3683 * in DEBUG kernel. 3684 */ 3685 ASSERT(flags & SQ_TAIL); 3686 return (rval); 3687 } 3688 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3689 /* 3690 * XXXX 3691 * I am not certain the next comment is correct here. I need to consider 3692 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3693 * might cause other problems. It just might be safer to drop it if 3694 * !SQ_CIPUT because that is when we set it. 3695 */ 3696 /* 3697 * Safe to always drop SQ_EXCL: 3698 * Not SQ_CIPUT means we set SQ_EXCL above 3699 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3700 * did a qwriter(INNER) in which case nobody else 3701 * is in the inner perimeter and we are exiting. 3702 * 3703 * I would like to make the following assertion: 3704 * 3705 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3706 * sq->sq_count == 0); 3707 * 3708 * which indicates that if we are both putshared and exclusive, 3709 * we became exclusive while executing the putproc, and the only 3710 * claim on the syncq was the one we dropped a few lines above. 3711 * But other threads that enter putnext while the syncq is exclusive 3712 * need to make a claim as they may need to drop SQLOCK in the 3713 * has_writers case to avoid deadlocks. If these threads are 3714 * delayed or preempted, it is possible that the writer thread can 3715 * find out that there are other claims making the (sq_count == 0) 3716 * test invalid. 3717 */ 3718 3719 sq->sq_flags = flags & ~SQ_EXCL; 3720 mutex_exit(SQLOCK(sq)); 3721 return (rval); 3722 } 3723 3724 /* 3725 * Return nonzero if the queue is responsible for struio(), else return 0. 3726 */ 3727 int 3728 isuioq(queue_t *q) 3729 { 3730 if (q->q_flag & QREADR) 3731 return (STREAM(q)->sd_struiordq == q); 3732 else 3733 return (STREAM(q)->sd_struiowrq == q); 3734 } 3735 3736 #if defined(__sparc) 3737 int disable_putlocks = 0; 3738 #else 3739 int disable_putlocks = 1; 3740 #endif 3741 3742 /* 3743 * called by create_putlock. 3744 */ 3745 static void 3746 create_syncq_putlocks(queue_t *q) 3747 { 3748 syncq_t *sq = q->q_syncq; 3749 ciputctrl_t *cip; 3750 int i; 3751 3752 ASSERT(sq != NULL); 3753 3754 ASSERT(disable_putlocks == 0); 3755 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3756 ASSERT(ciputctrl_cache != NULL); 3757 3758 if (!(sq->sq_type & SQ_CIPUT)) 3759 return; 3760 3761 for (i = 0; i <= 1; i++) { 3762 if (sq->sq_ciputctrl == NULL) { 3763 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3764 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3765 mutex_enter(SQLOCK(sq)); 3766 if (sq->sq_ciputctrl != NULL) { 3767 mutex_exit(SQLOCK(sq)); 3768 kmem_cache_free(ciputctrl_cache, cip); 3769 } else { 3770 ASSERT(sq->sq_nciputctrl == 0); 3771 sq->sq_nciputctrl = n_ciputctrl - 1; 3772 /* 3773 * putnext checks sq_ciputctrl without holding 3774 * SQLOCK. if it is not NULL putnext assumes 3775 * sq_nciputctrl is initialized. membar below 3776 * insures that. 3777 */ 3778 membar_producer(); 3779 sq->sq_ciputctrl = cip; 3780 mutex_exit(SQLOCK(sq)); 3781 } 3782 } 3783 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3784 if (i == 1) 3785 break; 3786 q = _OTHERQ(q); 3787 if (!(q->q_flag & QPERQ)) { 3788 ASSERT(sq == q->q_syncq); 3789 break; 3790 } 3791 ASSERT(q->q_syncq != NULL); 3792 ASSERT(sq != q->q_syncq); 3793 sq = q->q_syncq; 3794 ASSERT(sq->sq_type & SQ_CIPUT); 3795 } 3796 } 3797 3798 /* 3799 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 3800 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for 3801 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3802 * starting from q and down to the driver. 3803 * 3804 * This should be called after the affected queues are part of stream 3805 * geometry. It should be called from driver/module open routine after 3806 * qprocson() call. It is also called from nfs syscall where it is known that 3807 * stream is configured and won't change its geometry during create_putlock 3808 * call. 3809 * 3810 * caller normally uses 0 value for the stream argument to speed up MT putnext 3811 * into the perimeter of q for example because its perimeter is per module 3812 * (e.g. IP). 3813 * 3814 * caller normally uses non 0 value for the stream argument to hint the system 3815 * that the stream of q is a very contended global system stream 3816 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3817 * particularly MT hot. 3818 * 3819 * Caller insures stream plumbing won't happen while we are here and therefore 3820 * q_next can be safely used. 3821 */ 3822 3823 void 3824 create_putlocks(queue_t *q, int stream) 3825 { 3826 ciputctrl_t *cip; 3827 struct stdata *stp = STREAM(q); 3828 3829 q = _WR(q); 3830 ASSERT(stp != NULL); 3831 3832 if (disable_putlocks != 0) 3833 return; 3834 3835 if (n_ciputctrl < min_n_ciputctrl) 3836 return; 3837 3838 ASSERT(ciputctrl_cache != NULL); 3839 3840 if (stream != 0 && stp->sd_ciputctrl == NULL) { 3841 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3842 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3843 mutex_enter(&stp->sd_lock); 3844 if (stp->sd_ciputctrl != NULL) { 3845 mutex_exit(&stp->sd_lock); 3846 kmem_cache_free(ciputctrl_cache, cip); 3847 } else { 3848 ASSERT(stp->sd_nciputctrl == 0); 3849 stp->sd_nciputctrl = n_ciputctrl - 1; 3850 /* 3851 * putnext checks sd_ciputctrl without holding 3852 * sd_lock. if it is not NULL putnext assumes 3853 * sd_nciputctrl is initialized. membar below 3854 * insures that. 3855 */ 3856 membar_producer(); 3857 stp->sd_ciputctrl = cip; 3858 mutex_exit(&stp->sd_lock); 3859 } 3860 } 3861 3862 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 3863 3864 while (_SAMESTR(q)) { 3865 create_syncq_putlocks(q); 3866 if (stream == 0) 3867 return; 3868 q = q->q_next; 3869 } 3870 ASSERT(q != NULL); 3871 create_syncq_putlocks(q); 3872 } 3873 3874 /* 3875 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 3876 * through a stream. 3877 * 3878 * Data currently record per event is a hrtime stamp, queue address, event 3879 * type, and a per type datum. Much of the STREAMS framework is instrumented 3880 * for automatic flow tracing (when enabled). Events can be defined and used 3881 * by STREAMS modules and drivers. 3882 * 3883 * Global objects: 3884 * 3885 * str_ftevent() - Add a flow-trace event to a dblk. 3886 * str_ftfree() - Free flow-trace data 3887 * 3888 * Local objects: 3889 * 3890 * fthdr_cache - pointer to the kmem cache for trace header. 3891 * ftblk_cache - pointer to the kmem cache for trace data blocks. 3892 */ 3893 3894 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 3895 3896 void 3897 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 3898 { 3899 ftblk_t *bp = hp->tail; 3900 ftblk_t *nbp; 3901 ftevnt_t *ep; 3902 int ix, nix; 3903 3904 ASSERT(hp != NULL); 3905 3906 for (;;) { 3907 if ((ix = bp->ix) == FTBLK_EVNTS) { 3908 /* 3909 * Tail doesn't have room, so need a new tail. 3910 * 3911 * To make this MT safe, first, allocate a new 3912 * ftblk, and initialize it. To make life a 3913 * little easier, reserve the first slot (mostly 3914 * by making ix = 1). When we are finished with 3915 * the initialization, CAS this pointer to the 3916 * tail. If this succeeds, this is the new 3917 * "next" block. Otherwise, another thread 3918 * got here first, so free the block and start 3919 * again. 3920 */ 3921 if (!(nbp = kmem_cache_alloc(ftblk_cache, 3922 KM_NOSLEEP))) { 3923 /* no mem, so punt */ 3924 str_ftnever++; 3925 /* free up all flow data? */ 3926 return; 3927 } 3928 nbp->nxt = NULL; 3929 nbp->ix = 1; 3930 /* 3931 * Just in case there is another thread about 3932 * to get the next index, we need to make sure 3933 * the value is there for it. 3934 */ 3935 membar_producer(); 3936 if (casptr(&hp->tail, bp, nbp) == bp) { 3937 /* CAS was successful */ 3938 bp->nxt = nbp; 3939 membar_producer(); 3940 bp = nbp; 3941 ix = 0; 3942 goto cas_good; 3943 } else { 3944 kmem_cache_free(ftblk_cache, nbp); 3945 bp = hp->tail; 3946 continue; 3947 } 3948 } 3949 nix = ix + 1; 3950 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 3951 cas_good: 3952 if (curthread != hp->thread) { 3953 hp->thread = curthread; 3954 evnt |= FTEV_CS; 3955 } 3956 if (CPU->cpu_seqid != hp->cpu_seqid) { 3957 hp->cpu_seqid = CPU->cpu_seqid; 3958 evnt |= FTEV_PS; 3959 } 3960 ep = &bp->ev[ix]; 3961 break; 3962 } 3963 } 3964 3965 if (evnt & FTEV_QMASK) { 3966 queue_t *qp = p; 3967 3968 /* 3969 * It is possible that the module info is broke 3970 * (as is logsubr.c at this comment writing). 3971 * Instead of panicing or doing other unmentionables, 3972 * we shall put a dummy name as the mid, and continue. 3973 */ 3974 if (qp->q_qinfo == NULL) 3975 ep->mid = "NONAME"; 3976 else 3977 ep->mid = qp->q_qinfo->qi_minfo->mi_idname; 3978 3979 if (!(qp->q_flag & QREADR)) 3980 evnt |= FTEV_ISWR; 3981 } else { 3982 ep->mid = (char *)p; 3983 } 3984 3985 ep->ts = gethrtime(); 3986 ep->evnt = evnt; 3987 ep->data = data; 3988 hp->hash = (hp->hash << 9) + hp->hash; 3989 hp->hash += (evnt << 16) | data; 3990 hp->hash += (uintptr_t)ep->mid; 3991 } 3992 3993 /* 3994 * Free flow-trace data. 3995 */ 3996 void 3997 str_ftfree(dblk_t *dbp) 3998 { 3999 fthdr_t *hp = dbp->db_fthdr; 4000 ftblk_t *bp = &hp->first; 4001 ftblk_t *nbp; 4002 4003 if (bp != hp->tail || bp->ix != 0) { 4004 /* 4005 * Clear out the hash, have the tail point to itself, and free 4006 * any continuation blocks. 4007 */ 4008 bp = hp->first.nxt; 4009 hp->tail = &hp->first; 4010 hp->hash = 0; 4011 hp->first.nxt = NULL; 4012 hp->first.ix = 0; 4013 while (bp != NULL) { 4014 nbp = bp->nxt; 4015 kmem_cache_free(ftblk_cache, bp); 4016 bp = nbp; 4017 } 4018 } 4019 kmem_cache_free(fthdr_cache, hp); 4020 dbp->db_fthdr = NULL; 4021 } 4022