/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory.  Objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
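
/*
 * Illustrative sketch of the lifecycle described above (not part of
 * the interfaces in this file; sizes and values are arbitrary):
 *
 *	mblk_t *mp, *dup;
 *
 *	if ((mp = allocb(64, BPRI_MED)) == NULL)
 *		return;
 *	*mp->b_wptr++ = 0x2a;	(append one byte of data)
 *	dup = dupb(mp);		(second mblk, same dblk; db_ref == 2)
 *	if (dup != NULL)
 *		freeb(dup);	(dblk_decref(): drops the ref, data stays)
 *	freemsg(mp);		(last ref: db_lastfree returns the dblk)
 */
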
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
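
/*
 * The DBLK_RTFU macros exploit the fact that db_ref, db_type, db_flags
 * and db_struioflag are four consecutive bytes of the dblk, so all four
 * fields can be initialized with a single 32-bit store.  A sketch of
 * what the allocators below actually do:
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 *
 * is equivalent to, but cheaper than:
 *
 *	dbp->db_ref = 1;
 *	dbp->db_type = M_DATA;
 *	dbp->db_flags = 0;	(the (ref - 1) term is 0 when ref == 1)
 *	dbp->db_struioflag = 0;
 *
 * The (ref - 1) term ORed into the db_flags byte records the minimum
 * reference count (DBLK_REFMIN) that dblk_decref() later compares
 * against, which matters for the ref == 2 esballoca()/desballoca() case.
 */
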
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;


static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(
	    bcp->buffer_cache, kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(msg_size != 0);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
	    mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) +
			    sizeof (kmem_slab_t)) < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
		    DBLK_CACHE_ALIGN, dblk_constructor,
		    dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}
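
/*
 * Worked example of the size-to-cache mapping established above, using
 * the _LP64 dblk_sizes[] table: allocb(100, pri) computes
 * index = (100 - 1) >> DBLK_SIZE_SHIFT = 12.  streams_msg_init()
 * pointed dblk_cache[12] (the 97..104-byte bucket) at the
 * "streams_dblk_144" cache, the smallest cache that can hold it, so
 * the message ends up with 144 bytes of buffer (db_lim - db_base == 144)
 * even though only 100 were requested.
 */
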
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
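
/*
 * A note on dupb() semantics (illustrative sketch): the two mblks share
 * one data buffer, so the read/write pointers are per-reference but the
 * bytes are not.
 *
 *	mblk_t *dup = dupb(mp);		(db_ref is now 2)
 *	if (dup != NULL) {
 *		dup->b_rptr += 4;	(private to dup)
 *		*mp->b_rptr = 0;	(visible through dup as well)
 *		freeb(dup);		(drops the ref; buffer stays live)
 *	}
 */
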
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
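
/*
 * Typical esballoc() usage (a hedged sketch; the "xx" driver, its
 * descriptor layout and xx_buf_free() are hypothetical): a driver loans
 * out one of its own buffers and reclaims it via the free routine once
 * the last reference goes away.  The frtn_t must stay valid until
 * free_func runs, so it normally lives in the buffer descriptor itself.
 *
 *	static void
 *	xx_buf_free(char *arg)
 *	{
 *		xx_desc_t *dp = (xx_desc_t *)arg;
 *		(recycle dp->xd_buf here)
 *	}
 *
 *	dp->xd_frtn.free_func = xx_buf_free;
 *	dp->xd_frtn.free_arg = (char *)dp;
 *	mp = esballoc(dp->xd_buf, dp->xd_len, BPRI_MED, &dp->xd_frtn);
 *	if (mp == NULL)
 *		(fall back to allocb() plus bcopy())
 */
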
/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
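
/*
 * Sketch of the bcache_t interface above (the "xx" names are
 * hypothetical): a driver that wants a private pool of fixed-size
 * buffers creates a bcache and allocates messages from it; freemsg()
 * hands the dblk and buffer back to the pool via bcache_dblk_lastfree().
 *
 *	bcache_t *bcp;
 *	mblk_t *mp;
 *
 *	if ((bcp = bcache_create("xx_2k", 2048, 64)) == NULL)
 *		return (ENOMEM);
 *	if ((mp = bcache_allocb(bcp, BPRI_MED)) != NULL) {
 *		(use mp like any other message)
 *		freemsg(mp);
 *	}
 *	bcache_destroy(bcp);	(deferred while allocations are outstanding)
 */
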
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}
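
/*
 * The classic consumer of bufcall()/esbbcall() (defined below) is a put
 * or service procedure that cannot get memory right now.  A hedged
 * sketch; the "xx" softstate and xx_reenable() are hypothetical:
 *
 *	static void
 *	xx_reenable(void *arg)
 *	{
 *		qenable((queue_t *)arg);
 *	}
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		xxp->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    xx_reenable, (void *)q);
 *		return (0);	(retry from the service procedure later)
 *	}
 *
 * The id must be canceled with unbufcall() on close to avoid a callback
 * into a detached instance.
 */
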
/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its value
	 * in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}
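
/*
 * Example of the chain primitives above (illustrative; "hdr" and "mp"
 * are arbitrary messages): detach the payload that follows the first
 * mblk of mp, then append it to another message.
 *
 *	mblk_t *payload = unlinkb(mp);	(mp->b_cont is now NULL)
 *	if (payload != NULL)
 *		linkb(hdr, payload);	(walks hdr to its tail, then links)
 */
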
/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
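
/*
 * Typical pullupmsg() usage (sketch; xx_hdr_t is hypothetical): make
 * sure a header can be read through b_rptr as one contiguous object
 * before casting.
 *
 *	if (MBLKL(mp) < sizeof (xx_hdr_t)) {
 *		if (!pullupmsg(mp, sizeof (xx_hdr_t))) {
 *			freemsg(mp);
 *			return;
 *		}
 *	}
 *	hp = (xx_hdr_t *)mp->b_rptr;
 */
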
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */
			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
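
/*
 * Example use of msgdsize()/adjmsg() (illustrative): strip a 4-byte
 * trailer, e.g. a checksum, from the end of an all-M_DATA message after
 * verifying the message is long enough.
 *
 *	if (msgdsize(mp) >= 4 && adjmsg(mp, -4)) {
 *		(trailer removed; data byte count reduced by 4)
 *	} else {
 *		(malformed message)
 *	}
 */
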
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here, is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
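
/*
 * getq() is the heart of the canonical service procedure loop (sketch):
 * drain the queue until it is empty or the next module can't accept
 * more, putting the current message back and relying on back-enabling
 * in the latter case.
 *
 *	static int
 *	xxsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (canputnext(q)) {
 *				putnext(q, mp);
 *			} else {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *		}
 *		return (0);
 *	}
 */
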
/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */
#define	ADD_MBLK_SIZE(mp, size)					\
	if (DB_TYPE(mp) != M_MULTIDATA) {			\
		(size) += MBLKL(mp);				\
	} else {						\
		uint_t	pinuse;					\
								\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;				\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head
				 * of the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block.  In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
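
/*
 * Callers of getq_noenab() are expected to pair it with qbackenable()
 * (below) once they know the message will not be put back.  A sketch of
 * the stream-head style usage (must_defer is a stand-in condition):
 *
 *	mp = getq_noenab(q, 0);
 *	if (mp != NULL) {
 *		uchar_t band = mp->b_band;
 *
 *		if (must_defer) {
 *			(void) putbq(q, mp);	(no backenable needed)
 *		} else {
 *			(consume mp)
 *			qbackenable(q, band);
 *		}
 *	}
 */
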
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
2025 */
2026 void
2027 rmvq_noenab(queue_t *q, mblk_t *mp)
2028 {
2029 int i;
2030 qband_t *qbp = NULL;
2031 kthread_id_t freezer;
2032 int bytecnt = 0, mblkcnt = 0;
2033
2034 freezer = STREAM(q)->sd_freezer;
2035 if (freezer == curthread) {
2036 ASSERT(frozenstr(q));
2037 ASSERT(MUTEX_HELD(QLOCK(q)));
2038 } else if (MUTEX_HELD(QLOCK(q))) {
2039 /* Don't drop lock on exit */
2040 freezer = curthread;
2041 } else
2042 mutex_enter(QLOCK(q));
2043
2044 ASSERT(mp->b_band <= q->q_nband);
2045 if (mp->b_band != 0) { /* Adjust band pointers */
2046 ASSERT(q->q_bandp != NULL);
2047 qbp = q->q_bandp;
2048 i = mp->b_band;
2049 while (--i > 0)
2050 qbp = qbp->qb_next;
2051 if (mp == qbp->qb_first) {
2052 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2053 qbp->qb_first = mp->b_next;
2054 else
2055 qbp->qb_first = NULL;
2056 }
2057 if (mp == qbp->qb_last) {
2058 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2059 qbp->qb_last = mp->b_prev;
2060 else
2061 qbp->qb_last = NULL;
2062 }
2063 }
2064
2065 /*
2066 * Remove the message from the list.
2067 */
2068 if (mp->b_prev)
2069 mp->b_prev->b_next = mp->b_next;
2070 else
2071 q->q_first = mp->b_next;
2072 if (mp->b_next)
2073 mp->b_next->b_prev = mp->b_prev;
2074 else
2075 q->q_last = mp->b_prev;
2076 mp->b_next = NULL;
2077 mp->b_prev = NULL;
2078
2079 /* Get the size of the message for q_count accounting */
2080 bytecnt = mp_cont_len(mp, &mblkcnt);
2081
2082 if (mp->b_band == 0) { /* Perform q_count accounting */
2083 q->q_count -= bytecnt;
2084 q->q_mblkcnt -= mblkcnt;
2085 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2086 (q->q_mblkcnt < q->q_hiwat))) {
2087 q->q_flag &= ~QFULL;
2088 }
2089 } else { /* Perform qb_count accounting */
2090 qbp->qb_count -= bytecnt;
2091 qbp->qb_mblkcnt -= mblkcnt;
2092 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2093 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2094 qbp->qb_flag &= ~QB_FULL;
2095 }
2096 }
2097 if (freezer != curthread)
2098 mutex_exit(QLOCK(q));
2099
2100 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2101 }
2102
2103 /*
2104 * Empty a queue.
2105 * If flag is set, remove all messages. Otherwise, remove
2106 * only non-control messages. If the queue falls below its low
2107 * water mark and QWANTW is set, enable the nearest upstream
2108 * service procedure.
2109 *
2110 * Historical note: when merging the M_FLUSH code in strrput with this
2111 * code one difference was discovered. flushq did not have a check
2112 * for q_lowat == 0 in the backenabling test.
2113 *
2114 * pcproto_flag specifies whether an M_PCPROTO message, if one exists on
2115 * the queue, should be preserved (non-zero) or flushed (zero).
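*
* For example, a minimal sketch (hypothetical caller, not from this
* file) that flushes everything except a pending M_PCPROTO, roughly the
* way the stream head uses this routine, would be:
*
*	flushq_common(q, FLUSHALL, 1);
*
* while flushq() below always passes a pcproto_flag of 0.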
2116 */
2117 void
2118 flushq_common(queue_t *q, int flag, int pcproto_flag)
2119 {
2120 mblk_t *mp, *nmp;
2121 qband_t *qbp;
2122 int backenab = 0;
2123 unsigned char bpri;
2124 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2125
2126 if (q->q_first == NULL)
2127 return;
2128
2129 mutex_enter(QLOCK(q));
2130 mp = q->q_first;
2131 q->q_first = NULL;
2132 q->q_last = NULL;
2133 q->q_count = 0;
2134 q->q_mblkcnt = 0;
2135 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2136 qbp->qb_first = NULL;
2137 qbp->qb_last = NULL;
2138 qbp->qb_count = 0;
2139 qbp->qb_mblkcnt = 0;
2140 qbp->qb_flag &= ~QB_FULL;
2141 }
2142 q->q_flag &= ~QFULL;
2143 mutex_exit(QLOCK(q));
2144 while (mp) {
2145 nmp = mp->b_next;
2146 mp->b_next = mp->b_prev = NULL;
2147
2148 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2149
2150 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2151 (void) putq(q, mp);
2152 else if (flag || datamsg(mp->b_datap->db_type))
2153 freemsg(mp);
2154 else
2155 (void) putq(q, mp);
2156 mp = nmp;
2157 }
2158 bpri = 1;
2159 mutex_enter(QLOCK(q));
2160 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2161 if ((qbp->qb_flag & QB_WANTW) &&
2162 (((qbp->qb_count < qbp->qb_lowat) &&
2163 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2164 qbp->qb_lowat == 0)) {
2165 qbp->qb_flag &= ~QB_WANTW;
2166 backenab = 1;
2167 qbf[bpri] = 1;
2168 } else
2169 qbf[bpri] = 0;
2170 bpri++;
2171 }
2172 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2173 if ((q->q_flag & QWANTW) &&
2174 (((q->q_count < q->q_lowat) &&
2175 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2176 q->q_flag &= ~QWANTW;
2177 backenab = 1;
2178 qbf[0] = 1;
2179 } else
2180 qbf[0] = 0;
2181
2182 /*
2183 * If any band can now be written to, and there is a writer
2184 * for that band, then backenable the closest service procedure.
2185 */
2186 if (backenab) {
2187 mutex_exit(QLOCK(q));
2188 for (bpri = q->q_nband; bpri != 0; bpri--)
2189 if (qbf[bpri])
2190 backenable(q, bpri);
2191 if (qbf[0])
2192 backenable(q, 0);
2193 } else
2194 mutex_exit(QLOCK(q));
2195 }
2196
2197 /*
2198 * The real flushing takes place in flushq_common. This wrapper exists so
2199 * that a flag can specify whether or not M_PCPROTO messages should be
2200 * preserved. Currently the only place that uses this flag is the stream head.
2201 */
2202 void
2203 flushq(queue_t *q, int flag)
2204 {
2205 flushq_common(q, flag, 0);
2206 }
2207
2208 /*
2209 * Flush the queue of messages of the given priority band.
2210 * There is some duplication of code between flushq and flushband.
2211 * This is because we want to optimize the code as much as possible.
2212 * The assumption is that there will be more messages in the normal
2213 * (priority 0) band than in any other.
2214 *
2215 * Historical note: when merging the M_FLUSH code in strrput with this
2216 * code one difference was discovered. flushband had an extra check
2217 * for (mp->b_datap->db_type < QPCTL) in the band 0 case. That check
2218 * does not match the man page for flushband and was not in the strrput
2219 * flush code, hence it was removed.
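*
* As an illustration, a minimal sketch of the conventional M_FLUSH
* handling in a write-side put procedure (hypothetical module code, not
* from this file) that drives flushband()/flushq():
*
*	case M_FLUSH:
*		if (*mp->b_rptr & FLUSHW) {
*			if (*mp->b_rptr & FLUSHBAND)
*				flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
*			else
*				flushq(q, FLUSHDATA);
*		}
*		putnext(q, mp);
*		break;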
2220 */ 2221 void 2222 flushband(queue_t *q, unsigned char pri, int flag) 2223 { 2224 mblk_t *mp; 2225 mblk_t *nmp; 2226 mblk_t *last; 2227 qband_t *qbp; 2228 int band; 2229 2230 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL)); 2231 if (pri > q->q_nband) { 2232 return; 2233 } 2234 mutex_enter(QLOCK(q)); 2235 if (pri == 0) { 2236 mp = q->q_first; 2237 q->q_first = NULL; 2238 q->q_last = NULL; 2239 q->q_count = 0; 2240 q->q_mblkcnt = 0; 2241 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2242 qbp->qb_first = NULL; 2243 qbp->qb_last = NULL; 2244 qbp->qb_count = 0; 2245 qbp->qb_mblkcnt = 0; 2246 qbp->qb_flag &= ~QB_FULL; 2247 } 2248 q->q_flag &= ~QFULL; 2249 mutex_exit(QLOCK(q)); 2250 while (mp) { 2251 nmp = mp->b_next; 2252 mp->b_next = mp->b_prev = NULL; 2253 if ((mp->b_band == 0) && 2254 ((flag == FLUSHALL) || 2255 datamsg(mp->b_datap->db_type))) 2256 freemsg(mp); 2257 else 2258 (void) putq(q, mp); 2259 mp = nmp; 2260 } 2261 mutex_enter(QLOCK(q)); 2262 if ((q->q_flag & QWANTW) && 2263 (((q->q_count < q->q_lowat) && 2264 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2265 q->q_flag &= ~QWANTW; 2266 mutex_exit(QLOCK(q)); 2267 2268 backenable(q, pri); 2269 } else 2270 mutex_exit(QLOCK(q)); 2271 } else { /* pri != 0 */ 2272 boolean_t flushed = B_FALSE; 2273 band = pri; 2274 2275 ASSERT(MUTEX_HELD(QLOCK(q))); 2276 qbp = q->q_bandp; 2277 while (--band > 0) 2278 qbp = qbp->qb_next; 2279 mp = qbp->qb_first; 2280 if (mp == NULL) { 2281 mutex_exit(QLOCK(q)); 2282 return; 2283 } 2284 last = qbp->qb_last->b_next; 2285 /* 2286 * rmvq_noenab() and freemsg() are called for each mblk that 2287 * meets the criteria. The loop is executed until the last 2288 * mblk has been processed. 2289 */ 2290 while (mp != last) { 2291 ASSERT(mp->b_band == pri); 2292 nmp = mp->b_next; 2293 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) { 2294 rmvq_noenab(q, mp); 2295 freemsg(mp); 2296 flushed = B_TRUE; 2297 } 2298 mp = nmp; 2299 } 2300 mutex_exit(QLOCK(q)); 2301 2302 /* 2303 * If any mblk(s) has been freed, we know that qbackenable() 2304 * will need to be called. 2305 */ 2306 if (flushed) 2307 qbackenable(q, pri); 2308 } 2309 } 2310 2311 /* 2312 * Return 1 if the queue is not full. If the queue is full, return 2313 * 0 (may not put message) and set QWANTW flag (caller wants to write 2314 * to the queue). 2315 */ 2316 int 2317 canput(queue_t *q) 2318 { 2319 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2320 2321 /* this is for loopback transports, they should not do a canput */ 2322 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2323 2324 /* Find next forward module that has a service procedure */ 2325 q = q->q_nfsrv; 2326 2327 if (!(q->q_flag & QFULL)) { 2328 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2329 return (1); 2330 } 2331 mutex_enter(QLOCK(q)); 2332 if (q->q_flag & QFULL) { 2333 q->q_flag |= QWANTW; 2334 mutex_exit(QLOCK(q)); 2335 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2336 return (0); 2337 } 2338 mutex_exit(QLOCK(q)); 2339 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2340 return (1); 2341 } 2342 2343 /* 2344 * This is the new canput for use with priority bands. Return 1 if the 2345 * band is not full. If the band is full, return 0 (may not put message) 2346 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2347 * write to the queue). 
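*
* A minimal sketch of the conventional service procedure loop
* (hypothetical module code, not from this file; assumes a local
* mblk_t *mp) that applies the band-aware test before forwarding:
*
*	while ((mp = getq(q)) != NULL) {
*		if (mp->b_datap->db_type >= QPCTL ||
*		    bcanput(q->q_next, mp->b_band)) {
*			putnext(q, mp);
*		} else {
*			(void) putbq(q, mp);
*			break;
*		}
*	}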
2348 */
2349 int
2350 bcanput(queue_t *q, unsigned char pri)
2351 {
2352 qband_t *qbp;
2353
2354 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2355 if (!q)
2356 return (0);
2357
2358 /* Find next forward module that has a service procedure */
2359 q = q->q_nfsrv;
2360
2361 mutex_enter(QLOCK(q));
2362 if (pri == 0) {
2363 if (q->q_flag & QFULL) {
2364 q->q_flag |= QWANTW;
2365 mutex_exit(QLOCK(q));
2366 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2367 "bcanput:%p %X %d", q, pri, 0);
2368 return (0);
2369 }
2370 } else { /* pri != 0 */
2371 if (pri > q->q_nband) {
2372 /*
2373 * No band exists yet, so return success.
2374 */
2375 mutex_exit(QLOCK(q));
2376 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2377 "bcanput:%p %X %d", q, pri, 1);
2378 return (1);
2379 }
2380 qbp = q->q_bandp;
2381 while (--pri)
2382 qbp = qbp->qb_next;
2383 if (qbp->qb_flag & QB_FULL) {
2384 qbp->qb_flag |= QB_WANTW;
2385 mutex_exit(QLOCK(q));
2386 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2387 "bcanput:%p %X %d", q, pri, 0);
2388 return (0);
2389 }
2390 }
2391 mutex_exit(QLOCK(q));
2392 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2393 "bcanput:%p %X %d", q, pri, 1);
2394 return (1);
2395 }
2396
2397 /*
2398 * Put a message on a queue.
2399 *
2400 * Messages are enqueued on a priority basis. The priority classes
2401 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2402 * and B_NORMAL (type < QPCTL && band == 0).
2403 *
2404 * Add appropriate weighted data block sizes to the queue count.
2405 * If the queue hits its high water mark then set the QFULL flag.
2406 *
2407 * If QNOENAB is not set (putq is allowed to enable the queue),
2408 * enable the queue only if the message is PRIORITY,
2409 * or the QWANTR flag is set (indicating that the service procedure
2410 * is ready to read the queue). This implies that a service
2411 * procedure must NEVER put a high priority message back on its own
2412 * queue, as this would result in an infinite loop (!).
2413 */
2414 int
2415 putq(queue_t *q, mblk_t *bp)
2416 {
2417 mblk_t *tmp;
2418 qband_t *qbp = NULL;
2419 int mcls = (int)queclass(bp);
2420 kthread_id_t freezer;
2421 int bytecnt = 0, mblkcnt = 0;
2422
2423 freezer = STREAM(q)->sd_freezer;
2424 if (freezer == curthread) {
2425 ASSERT(frozenstr(q));
2426 ASSERT(MUTEX_HELD(QLOCK(q)));
2427 } else
2428 mutex_enter(QLOCK(q));
2429
2430 /*
2431 * Make sanity checks and, if the qband structure is not yet
2432 * allocated, do so.
2433 */
2434 if (mcls == QPCTL) {
2435 if (bp->b_band != 0)
2436 bp->b_band = 0; /* force to be correct */
2437 } else if (bp->b_band != 0) {
2438 int i;
2439 qband_t **qbpp;
2440
2441 if (bp->b_band > q->q_nband) {
2442
2443 /*
2444 * The qband structure for this priority band is
2445 * not on the queue yet, so we have to allocate
2446 * one on the fly. It would be wasteful to
2447 * associate the qband structures with every
2448 * queue when the queues are allocated. This is
2449 * because most queues will only need the normal
2450 * band of flow which can be described entirely
2451 * by the queue itself.
2452 */ 2453 qbpp = &q->q_bandp; 2454 while (*qbpp) 2455 qbpp = &(*qbpp)->qb_next; 2456 while (bp->b_band > q->q_nband) { 2457 if ((*qbpp = allocband()) == NULL) { 2458 if (freezer != curthread) 2459 mutex_exit(QLOCK(q)); 2460 return (0); 2461 } 2462 (*qbpp)->qb_hiwat = q->q_hiwat; 2463 (*qbpp)->qb_lowat = q->q_lowat; 2464 q->q_nband++; 2465 qbpp = &(*qbpp)->qb_next; 2466 } 2467 } 2468 ASSERT(MUTEX_HELD(QLOCK(q))); 2469 qbp = q->q_bandp; 2470 i = bp->b_band; 2471 while (--i) 2472 qbp = qbp->qb_next; 2473 } 2474 2475 /* 2476 * If queue is empty, add the message and initialize the pointers. 2477 * Otherwise, adjust message pointers and queue pointers based on 2478 * the type of the message and where it belongs on the queue. Some 2479 * code is duplicated to minimize the number of conditionals and 2480 * hopefully minimize the amount of time this routine takes. 2481 */ 2482 if (!q->q_first) { 2483 bp->b_next = NULL; 2484 bp->b_prev = NULL; 2485 q->q_first = bp; 2486 q->q_last = bp; 2487 if (qbp) { 2488 qbp->qb_first = bp; 2489 qbp->qb_last = bp; 2490 } 2491 } else if (!qbp) { /* bp->b_band == 0 */ 2492 2493 /* 2494 * If queue class of message is less than or equal to 2495 * that of the last one on the queue, tack on to the end. 2496 */ 2497 tmp = q->q_last; 2498 if (mcls <= (int)queclass(tmp)) { 2499 bp->b_next = NULL; 2500 bp->b_prev = tmp; 2501 tmp->b_next = bp; 2502 q->q_last = bp; 2503 } else { 2504 tmp = q->q_first; 2505 while ((int)queclass(tmp) >= mcls) 2506 tmp = tmp->b_next; 2507 2508 /* 2509 * Insert bp before tmp. 2510 */ 2511 bp->b_next = tmp; 2512 bp->b_prev = tmp->b_prev; 2513 if (tmp->b_prev) 2514 tmp->b_prev->b_next = bp; 2515 else 2516 q->q_first = bp; 2517 tmp->b_prev = bp; 2518 } 2519 } else { /* bp->b_band != 0 */ 2520 if (qbp->qb_first) { 2521 tmp = qbp->qb_last; 2522 2523 /* 2524 * Insert bp after the last message in this band. 2525 */ 2526 bp->b_next = tmp->b_next; 2527 if (tmp->b_next) 2528 tmp->b_next->b_prev = bp; 2529 else 2530 q->q_last = bp; 2531 bp->b_prev = tmp; 2532 tmp->b_next = bp; 2533 } else { 2534 tmp = q->q_last; 2535 if ((mcls < (int)queclass(tmp)) || 2536 (bp->b_band <= tmp->b_band)) { 2537 2538 /* 2539 * Tack bp on end of queue. 2540 */ 2541 bp->b_next = NULL; 2542 bp->b_prev = tmp; 2543 tmp->b_next = bp; 2544 q->q_last = bp; 2545 } else { 2546 tmp = q->q_first; 2547 while (tmp->b_datap->db_type >= QPCTL) 2548 tmp = tmp->b_next; 2549 while (tmp->b_band >= bp->b_band) 2550 tmp = tmp->b_next; 2551 2552 /* 2553 * Insert bp before tmp. 
2554 */ 2555 bp->b_next = tmp; 2556 bp->b_prev = tmp->b_prev; 2557 if (tmp->b_prev) 2558 tmp->b_prev->b_next = bp; 2559 else 2560 q->q_first = bp; 2561 tmp->b_prev = bp; 2562 } 2563 qbp->qb_first = bp; 2564 } 2565 qbp->qb_last = bp; 2566 } 2567 2568 /* Get message byte count for q_count accounting */ 2569 bytecnt = mp_cont_len(bp, &mblkcnt); 2570 2571 if (qbp) { 2572 qbp->qb_count += bytecnt; 2573 qbp->qb_mblkcnt += mblkcnt; 2574 if ((qbp->qb_count >= qbp->qb_hiwat) || 2575 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2576 qbp->qb_flag |= QB_FULL; 2577 } 2578 } else { 2579 q->q_count += bytecnt; 2580 q->q_mblkcnt += mblkcnt; 2581 if ((q->q_count >= q->q_hiwat) || 2582 (q->q_mblkcnt >= q->q_hiwat)) { 2583 q->q_flag |= QFULL; 2584 } 2585 } 2586 2587 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2588 2589 if ((mcls > QNORM) || 2590 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2591 qenable_locked(q); 2592 ASSERT(MUTEX_HELD(QLOCK(q))); 2593 if (freezer != curthread) 2594 mutex_exit(QLOCK(q)); 2595 2596 return (1); 2597 } 2598 2599 /* 2600 * Put stuff back at beginning of Q according to priority order. 2601 * See comment on putq above for details. 2602 */ 2603 int 2604 putbq(queue_t *q, mblk_t *bp) 2605 { 2606 mblk_t *tmp; 2607 qband_t *qbp = NULL; 2608 int mcls = (int)queclass(bp); 2609 kthread_id_t freezer; 2610 int bytecnt = 0, mblkcnt = 0; 2611 2612 ASSERT(q && bp); 2613 ASSERT(bp->b_next == NULL); 2614 freezer = STREAM(q)->sd_freezer; 2615 if (freezer == curthread) { 2616 ASSERT(frozenstr(q)); 2617 ASSERT(MUTEX_HELD(QLOCK(q))); 2618 } else 2619 mutex_enter(QLOCK(q)); 2620 2621 /* 2622 * Make sanity checks and if qband structure is not yet 2623 * allocated, do so. 2624 */ 2625 if (mcls == QPCTL) { 2626 if (bp->b_band != 0) 2627 bp->b_band = 0; /* force to be correct */ 2628 } else if (bp->b_band != 0) { 2629 int i; 2630 qband_t **qbpp; 2631 2632 if (bp->b_band > q->q_nband) { 2633 qbpp = &q->q_bandp; 2634 while (*qbpp) 2635 qbpp = &(*qbpp)->qb_next; 2636 while (bp->b_band > q->q_nband) { 2637 if ((*qbpp = allocband()) == NULL) { 2638 if (freezer != curthread) 2639 mutex_exit(QLOCK(q)); 2640 return (0); 2641 } 2642 (*qbpp)->qb_hiwat = q->q_hiwat; 2643 (*qbpp)->qb_lowat = q->q_lowat; 2644 q->q_nband++; 2645 qbpp = &(*qbpp)->qb_next; 2646 } 2647 } 2648 qbp = q->q_bandp; 2649 i = bp->b_band; 2650 while (--i) 2651 qbp = qbp->qb_next; 2652 } 2653 2654 /* 2655 * If queue is empty or if message is high priority, 2656 * place on the front of the queue. 2657 */ 2658 tmp = q->q_first; 2659 if ((!tmp) || (mcls == QPCTL)) { 2660 bp->b_next = tmp; 2661 if (tmp) 2662 tmp->b_prev = bp; 2663 else 2664 q->q_last = bp; 2665 q->q_first = bp; 2666 bp->b_prev = NULL; 2667 if (qbp) { 2668 qbp->qb_first = bp; 2669 qbp->qb_last = bp; 2670 } 2671 } else if (qbp) { /* bp->b_band != 0 */ 2672 tmp = qbp->qb_first; 2673 if (tmp) { 2674 2675 /* 2676 * Insert bp before the first message in this band. 2677 */ 2678 bp->b_next = tmp; 2679 bp->b_prev = tmp->b_prev; 2680 if (tmp->b_prev) 2681 tmp->b_prev->b_next = bp; 2682 else 2683 q->q_first = bp; 2684 tmp->b_prev = bp; 2685 } else { 2686 tmp = q->q_last; 2687 if ((mcls < (int)queclass(tmp)) || 2688 (bp->b_band < tmp->b_band)) { 2689 2690 /* 2691 * Tack bp on end of queue. 2692 */ 2693 bp->b_next = NULL; 2694 bp->b_prev = tmp; 2695 tmp->b_next = bp; 2696 q->q_last = bp; 2697 } else { 2698 tmp = q->q_first; 2699 while (tmp->b_datap->db_type >= QPCTL) 2700 tmp = tmp->b_next; 2701 while (tmp->b_band > bp->b_band) 2702 tmp = tmp->b_next; 2703 2704 /* 2705 * Insert bp before tmp. 
2706 */
2707 bp->b_next = tmp;
2708 bp->b_prev = tmp->b_prev;
2709 if (tmp->b_prev)
2710 tmp->b_prev->b_next = bp;
2711 else
2712 q->q_first = bp;
2713 tmp->b_prev = bp;
2714 }
2715 qbp->qb_last = bp;
2716 }
2717 qbp->qb_first = bp;
2718 } else { /* bp->b_band == 0 && !QPCTL */
2719
2720 /*
2721 * If the queue class or band is less than that of the last
2722 * message on the queue, tack bp on the end of the queue.
2723 */
2724 tmp = q->q_last;
2725 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2726 bp->b_next = NULL;
2727 bp->b_prev = tmp;
2728 tmp->b_next = bp;
2729 q->q_last = bp;
2730 } else {
2731 tmp = q->q_first;
2732 while (tmp->b_datap->db_type >= QPCTL)
2733 tmp = tmp->b_next;
2734 while (tmp->b_band > bp->b_band)
2735 tmp = tmp->b_next;
2736
2737 /*
2738 * Insert bp before tmp.
2739 */
2740 bp->b_next = tmp;
2741 bp->b_prev = tmp->b_prev;
2742 if (tmp->b_prev)
2743 tmp->b_prev->b_next = bp;
2744 else
2745 q->q_first = bp;
2746 tmp->b_prev = bp;
2747 }
2748 }
2749
2750 /* Get message byte count for q_count accounting */
2751 bytecnt = mp_cont_len(bp, &mblkcnt);
2752
2753 if (qbp) {
2754 qbp->qb_count += bytecnt;
2755 qbp->qb_mblkcnt += mblkcnt;
2756 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2757 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2758 qbp->qb_flag |= QB_FULL;
2759 }
2760 } else {
2761 q->q_count += bytecnt;
2762 q->q_mblkcnt += mblkcnt;
2763 if ((q->q_count >= q->q_hiwat) ||
2764 (q->q_mblkcnt >= q->q_hiwat)) {
2765 q->q_flag |= QFULL;
2766 }
2767 }
2768
2769 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2770
2771 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2772 qenable_locked(q);
2773 ASSERT(MUTEX_HELD(QLOCK(q)));
2774 if (freezer != curthread)
2775 mutex_exit(QLOCK(q));
2776
2777 return (1);
2778 }
2779
2780 /*
2781 * Insert a message before an existing message on the queue. If the
2782 * existing message is NULL, the new message is placed on the end of
2783 * the queue. The queue class of the new message is ignored. However,
2784 * the priority band of the new message must adhere to the following
2785 * ordering:
2786 *
2787 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2788 *
2789 * All flow control parameters are updated.
2790 *
2791 * insq can be called with the stream frozen, by other utility functions
2792 * while holding QLOCK, and by streams modules with no locks held.
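*
* As an illustration, a minimal sketch (hypothetical module code, not
* from this file) that inserts nmp ahead of the first message of a
* strictly lower band, mirroring the search putq() itself performs, so
* the ordering above is preserved:
*
*	mblk_t *emp;
*
*	mutex_enter(QLOCK(q));
*	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
*		if (emp->b_datap->db_type < QPCTL &&
*		    emp->b_band < nmp->b_band)
*			break;
*	(void) insq(q, emp, nmp);	/* emp == NULL appends to the tail */
*	mutex_exit(QLOCK(q));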
2793 */ 2794 int 2795 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2796 { 2797 mblk_t *tmp; 2798 qband_t *qbp = NULL; 2799 int mcls = (int)queclass(mp); 2800 kthread_id_t freezer; 2801 int bytecnt = 0, mblkcnt = 0; 2802 2803 freezer = STREAM(q)->sd_freezer; 2804 if (freezer == curthread) { 2805 ASSERT(frozenstr(q)); 2806 ASSERT(MUTEX_HELD(QLOCK(q))); 2807 } else if (MUTEX_HELD(QLOCK(q))) { 2808 /* Don't drop lock on exit */ 2809 freezer = curthread; 2810 } else 2811 mutex_enter(QLOCK(q)); 2812 2813 if (mcls == QPCTL) { 2814 if (mp->b_band != 0) 2815 mp->b_band = 0; /* force to be correct */ 2816 if (emp && emp->b_prev && 2817 (emp->b_prev->b_datap->db_type < QPCTL)) 2818 goto badord; 2819 } 2820 if (emp) { 2821 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2822 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2823 (emp->b_prev->b_band < mp->b_band))) { 2824 goto badord; 2825 } 2826 } else { 2827 tmp = q->q_last; 2828 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2829 badord: 2830 cmn_err(CE_WARN, 2831 "insq: attempt to insert message out of order " 2832 "on q %p", (void *)q); 2833 if (freezer != curthread) 2834 mutex_exit(QLOCK(q)); 2835 return (0); 2836 } 2837 } 2838 2839 if (mp->b_band != 0) { 2840 int i; 2841 qband_t **qbpp; 2842 2843 if (mp->b_band > q->q_nband) { 2844 qbpp = &q->q_bandp; 2845 while (*qbpp) 2846 qbpp = &(*qbpp)->qb_next; 2847 while (mp->b_band > q->q_nband) { 2848 if ((*qbpp = allocband()) == NULL) { 2849 if (freezer != curthread) 2850 mutex_exit(QLOCK(q)); 2851 return (0); 2852 } 2853 (*qbpp)->qb_hiwat = q->q_hiwat; 2854 (*qbpp)->qb_lowat = q->q_lowat; 2855 q->q_nband++; 2856 qbpp = &(*qbpp)->qb_next; 2857 } 2858 } 2859 qbp = q->q_bandp; 2860 i = mp->b_band; 2861 while (--i) 2862 qbp = qbp->qb_next; 2863 } 2864 2865 if ((mp->b_next = emp) != NULL) { 2866 if ((mp->b_prev = emp->b_prev) != NULL) 2867 emp->b_prev->b_next = mp; 2868 else 2869 q->q_first = mp; 2870 emp->b_prev = mp; 2871 } else { 2872 if ((mp->b_prev = q->q_last) != NULL) 2873 q->q_last->b_next = mp; 2874 else 2875 q->q_first = mp; 2876 q->q_last = mp; 2877 } 2878 2879 /* Get mblk and byte count for q_count accounting */ 2880 bytecnt = mp_cont_len(mp, &mblkcnt); 2881 2882 if (qbp) { /* adjust qband pointers and count */ 2883 if (!qbp->qb_first) { 2884 qbp->qb_first = mp; 2885 qbp->qb_last = mp; 2886 } else { 2887 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2888 (mp->b_prev->b_band != mp->b_band))) 2889 qbp->qb_first = mp; 2890 else if (mp->b_next == NULL || (mp->b_next != NULL && 2891 (mp->b_next->b_band != mp->b_band))) 2892 qbp->qb_last = mp; 2893 } 2894 qbp->qb_count += bytecnt; 2895 qbp->qb_mblkcnt += mblkcnt; 2896 if ((qbp->qb_count >= qbp->qb_hiwat) || 2897 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2898 qbp->qb_flag |= QB_FULL; 2899 } 2900 } else { 2901 q->q_count += bytecnt; 2902 q->q_mblkcnt += mblkcnt; 2903 if ((q->q_count >= q->q_hiwat) || 2904 (q->q_mblkcnt >= q->q_hiwat)) { 2905 q->q_flag |= QFULL; 2906 } 2907 } 2908 2909 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2910 2911 if (canenable(q) && (q->q_flag & QWANTR)) 2912 qenable_locked(q); 2913 2914 ASSERT(MUTEX_HELD(QLOCK(q))); 2915 if (freezer != curthread) 2916 mutex_exit(QLOCK(q)); 2917 2918 return (1); 2919 } 2920 2921 /* 2922 * Create and put a control message on queue. 
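*
* For example (a sketch of typical driver usage, not from this file,
* where rq is the driver's hypothetical read queue), an interrupt
* handler that detects a fatal error might send M_ERROR upstream with:
*
*	(void) putnextctl1(rq, M_ERROR, EPROTO);
*
* The zero-length variants, putctl() and putnextctl(), are used the same
* way for types such as M_HANGUP that carry no parameter byte.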
2923 */
2924 int
2925 putctl(queue_t *q, int type)
2926 {
2927 mblk_t *bp;
2928
2929 if ((datamsg(type) && (type != M_DELAY)) ||
2930 (bp = allocb_tryhard(0)) == NULL)
2931 return (0);
2932 bp->b_datap->db_type = (unsigned char)type;
2933
2934 put(q, bp);
2935
2936 return (1);
2937 }
2938
2939 /*
2940 * Control message with a single-byte parameter
2941 */
2942 int
2943 putctl1(queue_t *q, int type, int param)
2944 {
2945 mblk_t *bp;
2946
2947 if ((datamsg(type) && (type != M_DELAY)) ||
2948 (bp = allocb_tryhard(1)) == NULL)
2949 return (0);
2950 bp->b_datap->db_type = (unsigned char)type;
2951 *bp->b_wptr++ = (unsigned char)param;
2952
2953 put(q, bp);
2954
2955 return (1);
2956 }
2957
2958 int
2959 putnextctl1(queue_t *q, int type, int param)
2960 {
2961 mblk_t *bp;
2962
2963 if ((datamsg(type) && (type != M_DELAY)) ||
2964 ((bp = allocb_tryhard(1)) == NULL))
2965 return (0);
2966
2967 bp->b_datap->db_type = (unsigned char)type;
2968 *bp->b_wptr++ = (unsigned char)param;
2969
2970 putnext(q, bp);
2971
2972 return (1);
2973 }
2974
2975 int
2976 putnextctl(queue_t *q, int type)
2977 {
2978 mblk_t *bp;
2979
2980 if ((datamsg(type) && (type != M_DELAY)) ||
2981 ((bp = allocb_tryhard(0)) == NULL))
2982 return (0);
2983 bp->b_datap->db_type = (unsigned char)type;
2984
2985 putnext(q, bp);
2986
2987 return (1);
2988 }
2989
2990 /*
2991 * Return the queue upstream from this one
2992 */
2993 queue_t *
2994 backq(queue_t *q)
2995 {
2996 q = _OTHERQ(q);
2997 if (q->q_next) {
2998 q = q->q_next;
2999 return (_OTHERQ(q));
3000 }
3001 return (NULL);
3002 }
3003
3004 /*
3005 * Send a block back up the queue in reverse from this
3006 * one (e.g. to respond to ioctls)
3007 */
3008 void
3009 qreply(queue_t *q, mblk_t *bp)
3010 {
3011 ASSERT(q && bp);
3012
3013 putnext(_OTHERQ(q), bp);
3014 }
3015
3016 /*
3017 * Streams Queue Scheduling
3018 *
3019 * Queues are enabled through qenable() when they have messages to
3020 * process. They are serviced by queuerun(), which runs each enabled
3021 * queue's service procedure. The call to queuerun() is processor
3022 * dependent - the general principle is that it be run whenever a queue
3023 * is enabled but before returning to user level. For system calls,
3024 * the function runqueues() is called if their action causes a queue
3025 * to be enabled. For device interrupts, queuerun() should be
3026 * called before returning from the last level of interrupt. Beyond
3027 * this, no timing assumptions should be made about queue scheduling.
3028 */
3029
3030 /*
3031 * Enable a queue: put it on the list of those whose service procedures are
3032 * ready to run and set up the scheduling mechanism.
3033 * The broadcast is done outside the mutex to prevent the woken thread
3034 * from contending for the mutex. This is OK because the queue has been
3035 * enqueued on the runlist and flagged safely at this point.
3036 */
3037 void
3038 qenable(queue_t *q)
3039 {
3040 mutex_enter(QLOCK(q));
3041 qenable_locked(q);
3042 mutex_exit(QLOCK(q));
3043 }
3044 /*
3045 * Return number of messages on queue
3046 */
3047 int
3048 qsize(queue_t *qp)
3049 {
3050 int count = 0;
3051 mblk_t *mp;
3052
3053 mutex_enter(QLOCK(qp));
3054 for (mp = qp->q_first; mp; mp = mp->b_next)
3055 count++;
3056 mutex_exit(QLOCK(qp));
3057 return (count);
3058 }
3059
3060 /*
3061 * noenable - set queue so that putq() will not enable it.
3062 * enableok - set queue so that putq() can enable it.
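*
* A typical use (a minimal sketch of hypothetical module open code, not
* from this file) is to hold off the service procedure while the module
* finishes initializing, and then let any deferred work proceed:
*
*	noenable(q);
*	/* ... complete private initialization ... */
*	enableok(q);
*	qenable(q);	/* catch up on anything putq() declined to schedule */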
3063 */
3064 void
3065 noenable(queue_t *q)
3066 {
3067 mutex_enter(QLOCK(q));
3068 q->q_flag |= QNOENB;
3069 mutex_exit(QLOCK(q));
3070 }
3071
3072 void
3073 enableok(queue_t *q)
3074 {
3075 mutex_enter(QLOCK(q));
3076 q->q_flag &= ~QNOENB;
3077 mutex_exit(QLOCK(q));
3078 }
3079
3080 /*
3081 * Set queue fields.
3082 */
3083 int
3084 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3085 {
3086 qband_t *qbp = NULL;
3087 queue_t *wrq;
3088 int error = 0;
3089 kthread_id_t freezer;
3090
3091 freezer = STREAM(q)->sd_freezer;
3092 if (freezer == curthread) {
3093 ASSERT(frozenstr(q));
3094 ASSERT(MUTEX_HELD(QLOCK(q)));
3095 } else
3096 mutex_enter(QLOCK(q));
3097
3098 if (what >= QBAD) {
3099 error = EINVAL;
3100 goto done;
3101 }
3102 if (pri != 0) {
3103 int i;
3104 qband_t **qbpp;
3105
3106 if (pri > q->q_nband) {
3107 qbpp = &q->q_bandp;
3108 while (*qbpp)
3109 qbpp = &(*qbpp)->qb_next;
3110 while (pri > q->q_nband) {
3111 if ((*qbpp = allocband()) == NULL) {
3112 error = EAGAIN;
3113 goto done;
3114 }
3115 (*qbpp)->qb_hiwat = q->q_hiwat;
3116 (*qbpp)->qb_lowat = q->q_lowat;
3117 q->q_nband++;
3118 qbpp = &(*qbpp)->qb_next;
3119 }
3120 }
3121 qbp = q->q_bandp;
3122 i = pri;
3123 while (--i)
3124 qbp = qbp->qb_next;
3125 }
3126 switch (what) {
3127
3128 case QHIWAT:
3129 if (qbp)
3130 qbp->qb_hiwat = (size_t)val;
3131 else
3132 q->q_hiwat = (size_t)val;
3133 break;
3134
3135 case QLOWAT:
3136 if (qbp)
3137 qbp->qb_lowat = (size_t)val;
3138 else
3139 q->q_lowat = (size_t)val;
3140 break;
3141
3142 case QMAXPSZ:
3143 if (qbp)
3144 error = EINVAL;
3145 else
3146 q->q_maxpsz = (ssize_t)val;
3147
3148 /*
3149 * Performance concern: strwrite looks at the module below
3150 * the stream head for the maxpsz each time it does a write,
3151 * so we now cache it at the stream head. Check to see if this
3152 * queue is sitting directly below the stream head.
3153 */
3154 wrq = STREAM(q)->sd_wrq;
3155 if (q != wrq->q_next)
3156 break;
3157
3158 /*
3159 * If the stream is not frozen drop the current QLOCK and
3160 * acquire the sd_wrq QLOCK which protects sd_qn_*
3161 */
3162 if (freezer != curthread) {
3163 mutex_exit(QLOCK(q));
3164 mutex_enter(QLOCK(wrq));
3165 }
3166 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3167
3168 if (strmsgsz != 0) {
3169 if (val == INFPSZ)
3170 val = strmsgsz;
3171 else {
3172 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3173 val = MIN(PIPE_BUF, val);
3174 else
3175 val = MIN(strmsgsz, val);
3176 }
3177 }
3178 STREAM(q)->sd_qn_maxpsz = val;
3179 if (freezer != curthread) {
3180 mutex_exit(QLOCK(wrq));
3181 mutex_enter(QLOCK(q));
3182 }
3183 break;
3184
3185 case QMINPSZ:
3186 if (qbp)
3187 error = EINVAL;
3188 else
3189 q->q_minpsz = (ssize_t)val;
3190
3191 /*
3192 * Performance concern: strwrite looks at the module below
3193 * the stream head for the maxpsz each time it does a write,
3194 * so we now cache it at the stream head. Check to see if this
3195 * queue is sitting directly below the stream head.
3196 */ 3197 wrq = STREAM(q)->sd_wrq; 3198 if (q != wrq->q_next) 3199 break; 3200 3201 /* 3202 * If the stream is not frozen drop the current QLOCK and 3203 * acquire the sd_wrq QLOCK which protects sd_qn_* 3204 */ 3205 if (freezer != curthread) { 3206 mutex_exit(QLOCK(q)); 3207 mutex_enter(QLOCK(wrq)); 3208 } 3209 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3210 3211 if (freezer != curthread) { 3212 mutex_exit(QLOCK(wrq)); 3213 mutex_enter(QLOCK(q)); 3214 } 3215 break; 3216 3217 case QSTRUIOT: 3218 if (qbp) 3219 error = EINVAL; 3220 else 3221 q->q_struiot = (ushort_t)val; 3222 break; 3223 3224 case QCOUNT: 3225 case QFIRST: 3226 case QLAST: 3227 case QFLAG: 3228 error = EPERM; 3229 break; 3230 3231 default: 3232 error = EINVAL; 3233 break; 3234 } 3235 done: 3236 if (freezer != curthread) 3237 mutex_exit(QLOCK(q)); 3238 return (error); 3239 } 3240 3241 /* 3242 * Get queue fields. 3243 */ 3244 int 3245 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3246 { 3247 qband_t *qbp = NULL; 3248 int error = 0; 3249 kthread_id_t freezer; 3250 3251 freezer = STREAM(q)->sd_freezer; 3252 if (freezer == curthread) { 3253 ASSERT(frozenstr(q)); 3254 ASSERT(MUTEX_HELD(QLOCK(q))); 3255 } else 3256 mutex_enter(QLOCK(q)); 3257 if (what >= QBAD) { 3258 error = EINVAL; 3259 goto done; 3260 } 3261 if (pri != 0) { 3262 int i; 3263 qband_t **qbpp; 3264 3265 if (pri > q->q_nband) { 3266 qbpp = &q->q_bandp; 3267 while (*qbpp) 3268 qbpp = &(*qbpp)->qb_next; 3269 while (pri > q->q_nband) { 3270 if ((*qbpp = allocband()) == NULL) { 3271 error = EAGAIN; 3272 goto done; 3273 } 3274 (*qbpp)->qb_hiwat = q->q_hiwat; 3275 (*qbpp)->qb_lowat = q->q_lowat; 3276 q->q_nband++; 3277 qbpp = &(*qbpp)->qb_next; 3278 } 3279 } 3280 qbp = q->q_bandp; 3281 i = pri; 3282 while (--i) 3283 qbp = qbp->qb_next; 3284 } 3285 switch (what) { 3286 case QHIWAT: 3287 if (qbp) 3288 *(size_t *)valp = qbp->qb_hiwat; 3289 else 3290 *(size_t *)valp = q->q_hiwat; 3291 break; 3292 3293 case QLOWAT: 3294 if (qbp) 3295 *(size_t *)valp = qbp->qb_lowat; 3296 else 3297 *(size_t *)valp = q->q_lowat; 3298 break; 3299 3300 case QMAXPSZ: 3301 if (qbp) 3302 error = EINVAL; 3303 else 3304 *(ssize_t *)valp = q->q_maxpsz; 3305 break; 3306 3307 case QMINPSZ: 3308 if (qbp) 3309 error = EINVAL; 3310 else 3311 *(ssize_t *)valp = q->q_minpsz; 3312 break; 3313 3314 case QCOUNT: 3315 if (qbp) 3316 *(size_t *)valp = qbp->qb_count; 3317 else 3318 *(size_t *)valp = q->q_count; 3319 break; 3320 3321 case QFIRST: 3322 if (qbp) 3323 *(mblk_t **)valp = qbp->qb_first; 3324 else 3325 *(mblk_t **)valp = q->q_first; 3326 break; 3327 3328 case QLAST: 3329 if (qbp) 3330 *(mblk_t **)valp = qbp->qb_last; 3331 else 3332 *(mblk_t **)valp = q->q_last; 3333 break; 3334 3335 case QFLAG: 3336 if (qbp) 3337 *(uint_t *)valp = qbp->qb_flag; 3338 else 3339 *(uint_t *)valp = q->q_flag; 3340 break; 3341 3342 case QSTRUIOT: 3343 if (qbp) 3344 error = EINVAL; 3345 else 3346 *(short *)valp = q->q_struiot; 3347 break; 3348 3349 default: 3350 error = EINVAL; 3351 break; 3352 } 3353 done: 3354 if (freezer != curthread) 3355 mutex_exit(QLOCK(q)); 3356 return (error); 3357 } 3358 3359 /* 3360 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3361 * QWANTWSYNC or QWANTR or QWANTW, 3362 * 3363 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3364 * deferred wakeup will be done. Also if strpoll() in progress then a 3365 * deferred pollwakeup will be done. 
3366 */
3367 void
3368 strwakeq(queue_t *q, int flag)
3369 {
3370 stdata_t *stp = STREAM(q);
3371 pollhead_t *pl;
3372
3373 mutex_enter(&stp->sd_lock);
3374 pl = &stp->sd_pollist;
3375 if (flag & QWANTWSYNC) {
3376 ASSERT(!(q->q_flag & QREADR));
3377 if (stp->sd_flag & WSLEEP) {
3378 stp->sd_flag &= ~WSLEEP;
3379 cv_broadcast(&stp->sd_wrq->q_wait);
3380 } else {
3381 stp->sd_wakeq |= WSLEEP;
3382 }
3383
3384 mutex_exit(&stp->sd_lock);
3385 pollwakeup(pl, POLLWRNORM);
3386 mutex_enter(&stp->sd_lock);
3387
3388 if (stp->sd_sigflags & S_WRNORM)
3389 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3390 } else if (flag & QWANTR) {
3391 if (stp->sd_flag & RSLEEP) {
3392 stp->sd_flag &= ~RSLEEP;
3393 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3394 } else {
3395 stp->sd_wakeq |= RSLEEP;
3396 }
3397
3398 mutex_exit(&stp->sd_lock);
3399 pollwakeup(pl, POLLIN | POLLRDNORM);
3400 mutex_enter(&stp->sd_lock);
3401
3402 {
3403 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3404
3405 if (events)
3406 strsendsig(stp->sd_siglist, events, 0, 0);
3407 }
3408 } else {
3409 if (stp->sd_flag & WSLEEP) {
3410 stp->sd_flag &= ~WSLEEP;
3411 cv_broadcast(&stp->sd_wrq->q_wait);
3412 }
3413
3414 mutex_exit(&stp->sd_lock);
3415 pollwakeup(pl, POLLWRNORM);
3416 mutex_enter(&stp->sd_lock);
3417
3418 if (stp->sd_sigflags & S_WRNORM)
3419 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3420 }
3421 mutex_exit(&stp->sd_lock);
3422 }
3423
3424 int
3425 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3426 {
3427 stdata_t *stp = STREAM(q);
3428 int typ = STRUIOT_STANDARD;
3429 uio_t *uiop = &dp->d_uio;
3430 dblk_t *dbp;
3431 ssize_t uiocnt;
3432 ssize_t cnt;
3433 unsigned char *ptr;
3434 ssize_t resid;
3435 int error = 0;
3436 on_trap_data_t otd;
3437 queue_t *stwrq;
3438
3439 /*
3440 * Plumbing may change while we are taking the type, so store the
3441 * queue in a temporary variable. It doesn't matter even
3442 * if we take the type from the previous plumbing;
3443 * that's because, if the plumbing has changed while we were
3444 * holding the queue in a temporary variable, we can continue
3445 * processing the message the way it would have been processed
3446 * in the old plumbing, without any side effects but a bit of
3447 * extra processing for the partial IP header checksum.
3448 *
3449 * This has been done to avoid holding the sd_lock, which is
3450 * very hot.
3451 */
3452
3453 stwrq = stp->sd_struiowrq;
3454 if (stwrq)
3455 typ = stwrq->q_struiot;
3456
3457 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3458 dbp = mp->b_datap;
3459 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3460 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3461 cnt = MIN(uiocnt, uiop->uio_resid);
3462 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3463 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3464 /*
3465 * Either this mblk has already been processed
3466 * or there is no more room in this mblk (?).
3467 */
3468 continue;
3469 }
3470 switch (typ) {
3471 case STRUIOT_STANDARD:
3472 if (noblock) {
3473 if (on_trap(&otd, OT_DATA_ACCESS)) {
3474 no_trap();
3475 error = EWOULDBLOCK;
3476 goto out;
3477 }
3478 }
3479 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3480 if (noblock)
3481 no_trap();
3482 goto out;
3483 }
3484 if (noblock)
3485 no_trap();
3486 break;
3487
3488 default:
3489 error = EIO;
3490 goto out;
3491 }
3492 dbp->db_struioflag |= STRUIO_DONE;
3493 dbp->db_cksumstuff += cnt;
3494 }
3495 out:
3496 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3497 /*
3498 * A fault has occurred and some bytes were moved to the
3499 * current mblk; the uio_t has already been updated by
3500 * the appropriate uio routine, so also update the mblk
3501 * to reflect this in case this same mblk chain is used
3502 * again (after the fault has been handled).
3503 */
3504 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3505 if (uiocnt >= resid)
3506 dbp->db_cksumstuff += resid;
3507 }
3508 return (error);
3509 }
3510
3511 /*
3512 * Try to enter the queue synchronously. Any attempt to enter a closing
3513 * queue will fail. qp->q_rwcnt keeps track of the number of successful
3514 * entries so that removeq() will not try to close the queue while a
3515 * thread is inside the queue.
3516 */
3517 static boolean_t
3518 rwnext_enter(queue_t *qp)
3519 {
3520 mutex_enter(QLOCK(qp));
3521 if (qp->q_flag & QWCLOSE) {
3522 mutex_exit(QLOCK(qp));
3523 return (B_FALSE);
3524 }
3525 qp->q_rwcnt++;
3526 ASSERT(qp->q_rwcnt != 0);
3527 mutex_exit(QLOCK(qp));
3528 return (B_TRUE);
3529 }
3530
3531 /*
3532 * Decrease the count of threads running in a sync stream queue and wake up
3533 * any threads blocked in removeq().
3534 */
3535 static void
3536 rwnext_exit(queue_t *qp)
3537 {
3538 mutex_enter(QLOCK(qp));
3539 qp->q_rwcnt--;
3540 if (qp->q_flag & QWANTRMQSYNC) {
3541 qp->q_flag &= ~QWANTRMQSYNC;
3542 cv_broadcast(&qp->q_wait);
3543 }
3544 mutex_exit(QLOCK(qp));
3545 }
3546
3547 /*
3548 * The purpose of rwnext() is to call the rw procedure of the next
3549 * (downstream) module's queue.
3550 *
3551 * It is treated as a put entrypoint for perimeter synchronization.
3552 *
3553 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3554 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3555 * does not matter if any regular put entrypoints have already been
3556 * entered. We can't increment one of the sq_putcounts (instead of sq_count)
3557 * because qwait_rw won't know which counter to decrement.
3558 *
3559 * It would be reasonable to add the lockless FASTPUT logic.
3560 */
3561 int
3562 rwnext(queue_t *qp, struiod_t *dp)
3563 {
3564 queue_t *nqp;
3565 syncq_t *sq;
3566 uint16_t count;
3567 uint16_t flags;
3568 struct qinit *qi;
3569 int (*proc)();
3570 struct stdata *stp;
3571 int isread;
3572 int rval;
3573
3574 stp = STREAM(qp);
3575 /*
3576 * Prevent q_next from changing by holding sd_lock until acquiring
3577 * SQLOCK. Note that a read-side rwnext from the streamhead will
3578 * already have sd_lock acquired. In either case sd_lock is always
3579 * released after acquiring SQLOCK.
3580 *
3581 * The streamhead read-side holding sd_lock when calling rwnext is
3582 * required to prevent a race condition where M_DATA mblks flowing
3583 * up the read-side of the stream could be bypassed by a rwnext()
3584 * down-call. In this case sd_lock acts as the streamhead perimeter.
3585 */
3586 if ((nqp = _WR(qp)) == qp) {
3587 isread = 0;
3588 mutex_enter(&stp->sd_lock);
3589 qp = nqp->q_next;
3590 } else {
3591 isread = 1;
3592 if (nqp != stp->sd_wrq)
3593 /* Not streamhead */
3594 mutex_enter(&stp->sd_lock);
3595 qp = _RD(nqp->q_next);
3596 }
3597 qi = qp->q_qinfo;
3598 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3599 /*
3600 * Not a synchronous module or no r/w procedure for this
3601 * queue, so just return EINVAL and let the caller handle it.
3602 */
3603 mutex_exit(&stp->sd_lock);
3604 return (EINVAL);
3605 }
3606
3607 if (rwnext_enter(qp) == B_FALSE) {
3608 mutex_exit(&stp->sd_lock);
3609 return (EINVAL);
3610 }
3611
3612 sq = qp->q_syncq;
3613 mutex_enter(SQLOCK(sq));
3614 mutex_exit(&stp->sd_lock);
3615 count = sq->sq_count;
3616 flags = sq->sq_flags;
3617 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3618
3619 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3620 /*
3621 * If this queue is being closed, return.
3622 */
3623 if (qp->q_flag & QWCLOSE) {
3624 mutex_exit(SQLOCK(sq));
3625 rwnext_exit(qp);
3626 return (EINVAL);
3627 }
3628
3629 /*
3630 * Wait until we can enter the inner perimeter.
3631 */
3632 sq->sq_flags = flags | SQ_WANTWAKEUP;
3633 cv_wait(&sq->sq_wait, SQLOCK(sq));
3634 count = sq->sq_count;
3635 flags = sq->sq_flags;
3636 }
3637
3638 if (isread == 0 && stp->sd_struiowrq == NULL ||
3639 isread == 1 && stp->sd_struiordq == NULL) {
3640 /*
3641 * Stream plumbing changed while waiting for inner perimeter
3642 * so just return EINVAL and let the caller handle it.
3643 */
3644 mutex_exit(SQLOCK(sq));
3645 rwnext_exit(qp);
3646 return (EINVAL);
3647 }
3648 if (!(flags & SQ_CIPUT))
3649 sq->sq_flags = flags | SQ_EXCL;
3650 sq->sq_count = count + 1;
3651 ASSERT(sq->sq_count != 0); /* Wraparound */
3652 /*
3653 * Note: The only message ordering guarantee that rwnext() makes is
3654 * for the write queue flow-control case. All others (r/w queue
3655 * with q_count > 0 (or q_first != 0)) are the responsibility of
3656 * the queue's rw procedure. This could be generalized here by
3657 * running the queue's service procedure, but that wouldn't be
3658 * the most efficient for all cases.
3659 */
3660 mutex_exit(SQLOCK(sq));
3661 if (! isread && (qp->q_flag & QFULL)) {
3662 /*
3663 * Write queue may be flow controlled. If so,
3664 * mark the queue for wakeup when it's not.
3665 */
3666 mutex_enter(QLOCK(qp));
3667 if (qp->q_flag & QFULL) {
3668 qp->q_flag |= QWANTWSYNC;
3669 mutex_exit(QLOCK(qp));
3670 rval = EWOULDBLOCK;
3671 goto out;
3672 }
3673 mutex_exit(QLOCK(qp));
3674 }
3675
3676 if (! isread && dp->d_mp)
3677 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3678 dp->d_mp->b_datap->db_base);
3679
3680 rval = (*proc)(qp, dp);
3681
3682 if (isread && dp->d_mp)
3683 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3684 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3685 out:
3686 /*
3687 * The queue is protected from being freed by sq_count, so it is
3688 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3689 */
3690 rwnext_exit(qp);
3691
3692 mutex_enter(SQLOCK(sq));
3693 flags = sq->sq_flags;
3694 ASSERT(sq->sq_count != 0);
3695 sq->sq_count--;
3696 if (flags & SQ_TAIL) {
3697 putnext_tail(sq, qp, flags);
3698 /*
3699 * The only purpose of this ASSERT is to preserve calling stack
3700 * in DEBUG kernel.
3701 */
3702 ASSERT(flags & SQ_TAIL);
3703 return (rval);
3704 }
3705 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3706 /*
3707 * Safe to always drop SQ_EXCL:
3708 * Not SQ_CIPUT means we set SQ_EXCL above
3709 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3710 * did a qwriter(INNER) in which case nobody else
3711 * is in the inner perimeter and we are exiting.
3712 *
3713 * I would like to make the following assertion:
3714 *
3715 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3716 * sq->sq_count == 0);
3717 *
3718 * which indicates that if we are both putshared and exclusive,
3719 * we became exclusive while executing the putproc, and the only
3720 * claim on the syncq was the one we dropped a few lines above.
3721 * But other threads that enter putnext while the syncq is exclusive
3722 * need to make a claim as they may need to drop SQLOCK in the
3723 * has_writers case to avoid deadlocks. If these threads are
3724 * delayed or preempted, it is possible that the writer thread can
3725 * find out that there are other claims making the (sq_count == 0)
3726 * test invalid.
3727 */
3728
3729 sq->sq_flags = flags & ~SQ_EXCL;
3730 if (sq->sq_flags & SQ_WANTWAKEUP) {
3731 sq->sq_flags &= ~SQ_WANTWAKEUP;
3732 cv_broadcast(&sq->sq_wait);
3733 }
3734 mutex_exit(SQLOCK(sq));
3735 return (rval);
3736 }
3737
3738 /*
3739 * The purpose of infonext() is to call the info procedure of the next
3740 * (downstream) module's queue.
3741 *
3742 * It is treated as a put entrypoint for perimeter synchronization.
3743 *
3744 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3745 * sync queues). If it is a CIPUT sync queue, the regular sq_count is
3746 * incremented and it does not matter if any regular put entrypoints have
3747 * already been entered.
3748 */
3749 int
3750 infonext(queue_t *qp, infod_t *idp)
3751 {
3752 queue_t *nqp;
3753 syncq_t *sq;
3754 uint16_t count;
3755 uint16_t flags;
3756 struct qinit *qi;
3757 int (*proc)();
3758 struct stdata *stp;
3759 int rval;
3760
3761 stp = STREAM(qp);
3762 /*
3763 * Prevent q_next from changing by holding sd_lock until
3764 * acquiring SQLOCK.
3765 */
3766 mutex_enter(&stp->sd_lock);
3767 if ((nqp = _WR(qp)) == qp) {
3768 qp = nqp->q_next;
3769 } else {
3770 qp = _RD(nqp->q_next);
3771 }
3772 qi = qp->q_qinfo;
3773 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3774 mutex_exit(&stp->sd_lock);
3775 return (EINVAL);
3776 }
3777 sq = qp->q_syncq;
3778 mutex_enter(SQLOCK(sq));
3779 mutex_exit(&stp->sd_lock);
3780 count = sq->sq_count;
3781 flags = sq->sq_flags;
3782 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3783
3784 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3785 /*
3786 * Wait until we can enter the inner perimeter.
3787 */
3788 sq->sq_flags = flags | SQ_WANTWAKEUP;
3789 cv_wait(&sq->sq_wait, SQLOCK(sq));
3790 count = sq->sq_count;
3791 flags = sq->sq_flags;
3792 }
3793
3794 if (! (flags & SQ_CIPUT))
3795 sq->sq_flags = flags | SQ_EXCL;
3796 sq->sq_count = count + 1;
3797 ASSERT(sq->sq_count != 0); /* Wraparound */
3798 mutex_exit(SQLOCK(sq));
3799
3800 rval = (*proc)(qp, idp);
3801
3802 mutex_enter(SQLOCK(sq));
3803 flags = sq->sq_flags;
3804 ASSERT(sq->sq_count != 0);
3805 sq->sq_count--;
3806 if (flags & SQ_TAIL) {
3807 putnext_tail(sq, qp, flags);
3808 /*
3809 * The only purpose of this ASSERT is to preserve calling stack
3810 * in DEBUG kernel.
3811 */ 3812 ASSERT(flags & SQ_TAIL); 3813 return (rval); 3814 } 3815 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3816 /* 3817 * XXXX 3818 * I am not certain the next comment is correct here. I need to consider 3819 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3820 * might cause other problems. It just might be safer to drop it if 3821 * !SQ_CIPUT because that is when we set it. 3822 */ 3823 /* 3824 * Safe to always drop SQ_EXCL: 3825 * Not SQ_CIPUT means we set SQ_EXCL above 3826 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3827 * did a qwriter(INNER) in which case nobody else 3828 * is in the inner perimeter and we are exiting. 3829 * 3830 * I would like to make the following assertion: 3831 * 3832 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3833 * sq->sq_count == 0); 3834 * 3835 * which indicates that if we are both putshared and exclusive, 3836 * we became exclusive while executing the putproc, and the only 3837 * claim on the syncq was the one we dropped a few lines above. 3838 * But other threads that enter putnext while the syncq is exclusive 3839 * need to make a claim as they may need to drop SQLOCK in the 3840 * has_writers case to avoid deadlocks. If these threads are 3841 * delayed or preempted, it is possible that the writer thread can 3842 * find out that there are other claims making the (sq_count == 0) 3843 * test invalid. 3844 */ 3845 3846 sq->sq_flags = flags & ~SQ_EXCL; 3847 mutex_exit(SQLOCK(sq)); 3848 return (rval); 3849 } 3850 3851 /* 3852 * Return nonzero if the queue is responsible for struio(), else return 0. 3853 */ 3854 int 3855 isuioq(queue_t *q) 3856 { 3857 if (q->q_flag & QREADR) 3858 return (STREAM(q)->sd_struiordq == q); 3859 else 3860 return (STREAM(q)->sd_struiowrq == q); 3861 } 3862 3863 #if defined(__sparc) 3864 int disable_putlocks = 0; 3865 #else 3866 int disable_putlocks = 1; 3867 #endif 3868 3869 /* 3870 * called by create_putlock. 3871 */ 3872 static void 3873 create_syncq_putlocks(queue_t *q) 3874 { 3875 syncq_t *sq = q->q_syncq; 3876 ciputctrl_t *cip; 3877 int i; 3878 3879 ASSERT(sq != NULL); 3880 3881 ASSERT(disable_putlocks == 0); 3882 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3883 ASSERT(ciputctrl_cache != NULL); 3884 3885 if (!(sq->sq_type & SQ_CIPUT)) 3886 return; 3887 3888 for (i = 0; i <= 1; i++) { 3889 if (sq->sq_ciputctrl == NULL) { 3890 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3891 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3892 mutex_enter(SQLOCK(sq)); 3893 if (sq->sq_ciputctrl != NULL) { 3894 mutex_exit(SQLOCK(sq)); 3895 kmem_cache_free(ciputctrl_cache, cip); 3896 } else { 3897 ASSERT(sq->sq_nciputctrl == 0); 3898 sq->sq_nciputctrl = n_ciputctrl - 1; 3899 /* 3900 * putnext checks sq_ciputctrl without holding 3901 * SQLOCK. if it is not NULL putnext assumes 3902 * sq_nciputctrl is initialized. membar below 3903 * insures that. 3904 */ 3905 membar_producer(); 3906 sq->sq_ciputctrl = cip; 3907 mutex_exit(SQLOCK(sq)); 3908 } 3909 } 3910 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3911 if (i == 1) 3912 break; 3913 q = _OTHERQ(q); 3914 if (!(q->q_flag & QPERQ)) { 3915 ASSERT(sq == q->q_syncq); 3916 break; 3917 } 3918 ASSERT(q->q_syncq != NULL); 3919 ASSERT(sq != q->q_syncq); 3920 sq = q->q_syncq; 3921 ASSERT(sq->sq_type & SQ_CIPUT); 3922 } 3923 } 3924 3925 /* 3926 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 3927 * syncq of q. 
If stream argument is not 0 create per cpu stream_putlocks for 3928 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3929 * starting from q and down to the driver. 3930 * 3931 * This should be called after the affected queues are part of stream 3932 * geometry. It should be called from driver/module open routine after 3933 * qprocson() call. It is also called from nfs syscall where it is known that 3934 * stream is configured and won't change its geometry during create_putlock 3935 * call. 3936 * 3937 * caller normally uses 0 value for the stream argument to speed up MT putnext 3938 * into the perimeter of q for example because its perimeter is per module 3939 * (e.g. IP). 3940 * 3941 * caller normally uses non 0 value for the stream argument to hint the system 3942 * that the stream of q is a very contended global system stream 3943 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3944 * particularly MT hot. 3945 * 3946 * Caller insures stream plumbing won't happen while we are here and therefore 3947 * q_next can be safely used. 3948 */ 3949 3950 void 3951 create_putlocks(queue_t *q, int stream) 3952 { 3953 ciputctrl_t *cip; 3954 struct stdata *stp = STREAM(q); 3955 3956 q = _WR(q); 3957 ASSERT(stp != NULL); 3958 3959 if (disable_putlocks != 0) 3960 return; 3961 3962 if (n_ciputctrl < min_n_ciputctrl) 3963 return; 3964 3965 ASSERT(ciputctrl_cache != NULL); 3966 3967 if (stream != 0 && stp->sd_ciputctrl == NULL) { 3968 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3969 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3970 mutex_enter(&stp->sd_lock); 3971 if (stp->sd_ciputctrl != NULL) { 3972 mutex_exit(&stp->sd_lock); 3973 kmem_cache_free(ciputctrl_cache, cip); 3974 } else { 3975 ASSERT(stp->sd_nciputctrl == 0); 3976 stp->sd_nciputctrl = n_ciputctrl - 1; 3977 /* 3978 * putnext checks sd_ciputctrl without holding 3979 * sd_lock. if it is not NULL putnext assumes 3980 * sd_nciputctrl is initialized. membar below 3981 * insures that. 3982 */ 3983 membar_producer(); 3984 stp->sd_ciputctrl = cip; 3985 mutex_exit(&stp->sd_lock); 3986 } 3987 } 3988 3989 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 3990 3991 while (_SAMESTR(q)) { 3992 create_syncq_putlocks(q); 3993 if (stream == 0) 3994 return; 3995 q = q->q_next; 3996 } 3997 ASSERT(q != NULL); 3998 create_syncq_putlocks(q); 3999 } 4000 4001 /* 4002 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 4003 * through a stream. 4004 * 4005 * Data currently record per event is a hrtime stamp, queue address, event 4006 * type, and a per type datum. Much of the STREAMS framework is instrumented 4007 * for automatic flow tracing (when enabled). Events can be defined and used 4008 * by STREAMS modules and drivers. 4009 * 4010 * Global objects: 4011 * 4012 * str_ftevent() - Add a flow-trace event to a dblk. 4013 * str_ftfree() - Free flow-trace data 4014 * 4015 * Local objects: 4016 * 4017 * fthdr_cache - pointer to the kmem cache for trace header. 4018 * ftblk_cache - pointer to the kmem cache for trace data blocks. 4019 */ 4020 4021 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4022 4023 void 4024 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4025 { 4026 ftblk_t *bp = hp->tail; 4027 ftblk_t *nbp; 4028 ftevnt_t *ep; 4029 int ix, nix; 4030 4031 ASSERT(hp != NULL); 4032 4033 for (;;) { 4034 if ((ix = bp->ix) == FTBLK_EVNTS) { 4035 /* 4036 * Tail doesn't have room, so need a new tail. 
4037 *
4038 * To make this MT safe, first, allocate a new
4039 * ftblk, and initialize it. To make life a
4040 * little easier, reserve the first slot (mostly
4041 * by making ix = 1). When we are finished with
4042 * the initialization, CAS this pointer to the
4043 * tail. If this succeeds, this is the new
4044 * "next" block. Otherwise, another thread
4045 * got here first, so free the block and start
4046 * again.
4047 */
4048 if (!(nbp = kmem_cache_alloc(ftblk_cache,
4049 KM_NOSLEEP))) {
4050 /* no mem, so punt */
4051 str_ftnever++;
4052 /* free up all flow data? */
4053 return;
4054 }
4055 nbp->nxt = NULL;
4056 nbp->ix = 1;
4057 /*
4058 * Just in case there is another thread about
4059 * to get the next index, we need to make sure
4060 * the value is there for it.
4061 */
4062 membar_producer();
4063 if (casptr(&hp->tail, bp, nbp) == bp) {
4064 /* CAS was successful */
4065 bp->nxt = nbp;
4066 membar_producer();
4067 bp = nbp;
4068 ix = 0;
4069 goto cas_good;
4070 } else {
4071 kmem_cache_free(ftblk_cache, nbp);
4072 bp = hp->tail;
4073 continue;
4074 }
4075 }
4076 nix = ix + 1;
4077 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4078 cas_good:
4079 if (curthread != hp->thread) {
4080 hp->thread = curthread;
4081 evnt |= FTEV_CS;
4082 }
4083 if (CPU->cpu_seqid != hp->cpu_seqid) {
4084 hp->cpu_seqid = CPU->cpu_seqid;
4085 evnt |= FTEV_PS;
4086 }
4087 ep = &bp->ev[ix];
4088 break;
4089 }
4090 }
4091
4092 if (evnt & FTEV_QMASK) {
4093 queue_t *qp = p;
4094
4095 /*
4096 * It is possible that the module info is broken
4097 * (as logsubr.c's is at the time of this writing).
4098 * Instead of panicking or doing other unmentionables,
4099 * we shall put a dummy name in as the mid, and continue.
4100 */
4101 if (qp->q_qinfo == NULL)
4102 ep->mid = "NONAME";
4103 else
4104 ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
4105
4106 if (!(qp->q_flag & QREADR))
4107 evnt |= FTEV_ISWR;
4108 } else {
4109 ep->mid = (char *)p;
4110 }
4111
4112 ep->ts = gethrtime();
4113 ep->evnt = evnt;
4114 ep->data = data;
4115 hp->hash = (hp->hash << 9) + hp->hash;
4116 hp->hash += (evnt << 16) | data;
4117 hp->hash += (uintptr_t)ep->mid;
4118 }
4119
4120 /*
4121 * Free flow-trace data.
4122 */
4123 void
4124 str_ftfree(dblk_t *dbp)
4125 {
4126 fthdr_t *hp = dbp->db_fthdr;
4127 ftblk_t *bp = &hp->first;
4128 ftblk_t *nbp;
4129
4130 if (bp != hp->tail || bp->ix != 0) {
4131 /*
4132 * Clear out the hash, have the tail point to itself, and free
4133 * any continuation blocks.
4134 */
4135 bp = hp->first.nxt;
4136 hp->tail = &hp->first;
4137 hp->hash = 0;
4138 hp->first.nxt = NULL;
4139 hp->first.ix = 0;
4140 while (bp != NULL) {
4141 nbp = bp->nxt;
4142 kmem_cache_free(ftblk_cache, bp);
4143 bp = nbp;
4144 }
4145 }
4146 kmem_cache_free(fthdr_cache, hp);
4147 dbp->db_fthdr = NULL;
4148 }
4149
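/*
 * Usage note (an illustrative sketch, not a new interface): framework
 * code records a flow-trace event for a message with the
 * STR_FTEVENT_MSG() macro seen throughout this file, e.g.
 *
 *	STR_FTEVENT_MSG(mp, q, FTEV_PUTQ, NULL);
 *
 * which has effect only when flow tracing is active and the message's
 * dblk carries flow-trace data (a db_fthdr); in that case it lands in
 * str_ftevent() above.
 */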