/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, dblk and its associated
 *     data is allocated as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
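/*
 * Illustrative sketch (not part of this file's API surface): the
 * allocate/dup/free life cycle described above, as seen by a caller.
 *
 *	mblk_t *mp  = allocb(64, BPRI_MED);	// dblk from a small cache
 *	mblk_t *dup = dupb(mp);			// db_ref 1 -> 2; db_free is
 *						// now dblk_decref()
 *	freeb(dup);				// not the last ref: frees
 *						// only the mblk
 *	freeb(mp);				// last ref: db_lastfree runs
 *						// and the dblk goes back to
 *						// its cache
 */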
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
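/*
 * Worked example (illustrative): DBLK_RTFU packs db_ref, db_type,
 * db_flags and db_struioflag -- four adjacent one-byte dblk fields --
 * into one 32-bit word so they can be initialized with a single store
 * and adjusted with a single atomic op.  On a little-endian machine,
 *
 *	DBLK_RTFU(1, M_DATA, 0, 0)
 *
 * shifts 1 by DBLK_RTFU_SHIFT(db_ref) (0 bits) and M_DATA by
 * DBLK_RTFU_SHIFT(db_type) (8 bits), i.e. it is equivalent to setting
 * db_ref = 1, db_type = M_DATA, db_flags = 0 and db_struioflag = 0 in
 * one go.  The (ref - 1) term folded into the db_flags byte records
 * the minimum reference count, which is what lets dblk_decref() below
 * compare the new count against DBLK_REFMIN without touching the dblk
 * again.
 */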
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}
/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
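/*
 * Illustrative sketch (hypothetical caller, not part of this file):
 * the usual allocb() pattern in a driver.  The new mblk's b_rptr and
 * b_wptr both start at db_base; the writer copies data in at b_wptr
 * and advances it.
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOSR);
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	putnext(q, mp);
 */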
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
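/*
 * Illustrative sketch (hypothetical caller): dupb() shares the dblk,
 * so both mblks see the same data bytes; use copyb() when a private,
 * writable copy is needed.
 *
 *	mblk_t *dup = dupb(mp);		// db_ref 1 -> 2, db_free is now
 *					// dblk_decref()
 *	if (dup == NULL)
 *		return;
 *	dup->b_rptr += hdrlen;		// pointers are private per mblk...
 *	freeb(dup);			// ...but the data underneath is
 *					// still shared with 'mp'
 */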
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
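/*
 * Illustrative sketch (hypothetical driver code, not part of this
 * file): wrapping a driver-owned buffer with desballoc().  The frtn_t
 * must remain valid until the free routine has run; embedding it in
 * the buffer descriptor is the usual idiom.  xx_buf_t and
 * xx_buf_recycle() are invented names.
 *
 *	typedef struct xx_buf {
 *		frtn_t	xb_frtn;	// must outlive the mblk
 *		uchar_t	*xb_kaddr;	// buffer (e.g. DMA memory)
 *		size_t	xb_len;
 *	} xx_buf_t;
 *
 *	static void
 *	xx_buf_recycle(void *arg)
 *	{
 *		xx_buf_t *xbp = arg;
 *		// return xbp to the driver's free list
 *	}
 *
 *	static mblk_t *
 *	xx_buf_to_mblk(xx_buf_t *xbp)
 *	{
 *		xbp->xb_frtn.free_func = xx_buf_recycle;
 *		xbp->xb_frtn.free_arg = (caddr_t)xbp;
 *		return (desballoc(xbp->xb_kaddr, xbp->xb_len,
 *		    BPRI_MED, &xbp->xb_frtn));
 *	}
 */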
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
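/*
 * Illustrative sketch (hypothetical caller): the bcache interface
 * gives a driver its own aligned, fixed-size dblk/buffer pool, e.g.
 * for receive buffers with strict alignment requirements.
 *
 *	bcache_t *bcp;
 *	mblk_t *mp;
 *
 *	bcp = bcache_create("xx_rx", 2048, 64); // 2KB bufs, 64-byte aligned
 *	if (bcp == NULL)
 *		return (ENOMEM);
 *	mp = bcache_allocb(bcp, BPRI_MED);	// like allocb(), from bcp
 *	...
 *	freemsg(mp);		// bcache_dblk_lastfree() returns it to bcp
 *	bcache_destroy(bcp);	// deferred until all buffers come home
 */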
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk	*ioc;
	mblk_t		*mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
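/*
 * Illustrative sketch (hypothetical module code): a driver-originated
 * ioctl built with mkiocb().  XX_STATUS, xxp and xx_wq are invented
 * names; the M_IOCACK/M_IOCNAK reply arrives at the module's read-side
 * put procedure and is matched by ioc_id.
 *
 *	mblk_t *mp;
 *
 *	if ((mp = mkiocb(XX_STATUS)) == NULL)
 *		return (ENOSR);
 *	xxp->xx_ioc_id = ((struct iocblk *)mp->b_rptr)->ioc_id;
 *	putnext(xxp->xx_wq, mp);
 */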
/*
 * test if block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its value
	 * in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}
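/*
 * Illustrative sketch (hypothetical module code): the classic
 * allocb()/bufcall() recovery pattern in a service procedure.  When
 * allocb() fails, put the message back and schedule a callback for
 * when memory is likely to be available; xx_bufcall_id is an invented
 * per-instance field.
 *
 *	if ((bp = allocb(size, BPRI_MED)) == NULL) {
 *		(void) putbq(q, mp);		// retry from xxsrv() later
 *		xxp->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    (void (*)(void *))qenable, q);
 *		return;
 *	}
 *
 * The id must be canceled with unbufcall() in the module's close
 * routine to avoid a callback into a detached instance.
 */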
/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t	*nbp;
	dblk_t	*dp, *ndp;
	uchar_t *base;
	size_t	size;
	size_t	unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}
/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
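/*
 * Illustrative sketch (hypothetical caller): the usual reason for
 * pullupmsg() is to make a protocol header contiguous and aligned
 * before casting b_rptr; struct xx_hdr is an invented type.
 *
 *	if (MBLKL(mp) < sizeof (struct xx_hdr) &&
 *	    !pullupmsg(mp, sizeof (struct xx_hdr))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (struct xx_hdr *)mp->b_rptr;
 *
 * Note that pullupmsg() modifies 'mp' in place (it swaps in a new
 * dblk), whereas msgpullup() below leaves the original untouched.
 */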
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t	*newmp;
	ssize_t	totlen;
	ssize_t	n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources, and
 *   particularly to be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reaches the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
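/*
 * Worked example (illustrative) of the dual q_count/q_mblkcnt test:
 * with q_hiwat = 8192, enqueueing 8192 zero-length mblks drives
 * q_mblkcnt to the highwater mark and marks the queue full even
 * though q_count is still 0; conversely, a single 8192-byte mblk
 * trips the byte count with q_mblkcnt = 1.  Either counter alone can
 * make the queue full, and (as the tests in getq_noenab() and
 * rmvq_noenab() below show) both must drop back below q_hiwat before
 * QFULL is cleared, and below q_lowat before a backenable happens.
 */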
/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size) 	\
	if (DB_TYPE(mp) != M_MULTIDATA) {		\
		(size) += MBLKL(mp);			\
	} else {					\
		uint_t	pinuse;				\
							\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;			\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t	*mp;
	int	mblks = 0;
	int	bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we are
				 * taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block.  In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by other utility functions
 * while holding QLOCK, or by streams modules without any locks held and
 * the stream unfrozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}
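/*
 * Illustrative sketch (hypothetical caller): removing a particular
 * message from the middle of a queue.  Since rmvq() copes with QLOCK
 * already being held, the scan and the removal can share one lock
 * acquisition; note that rmvq() drops and re-enters QLOCK internally
 * to backenable, so the scan must not continue after the call.
 *
 *	mblk_t *mp;
 *
 *	mutex_enter(QLOCK(q));
 *	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
 *		if (is_stale(mp)) {		// invented predicate
 *			rmvq(q, mp);
 *			break;
 *		}
 *	}
 *	mutex_exit(QLOCK(q));
 *	if (mp != NULL)
 *		freemsg(mp);
 */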
2060 */ 2061 void 2062 rmvq_noenab(queue_t *q, mblk_t *mp) 2063 { 2064 int i; 2065 qband_t *qbp = NULL; 2066 kthread_id_t freezer; 2067 int bytecnt = 0, mblkcnt = 0; 2068 2069 freezer = STREAM(q)->sd_freezer; 2070 if (freezer == curthread) { 2071 ASSERT(frozenstr(q)); 2072 ASSERT(MUTEX_HELD(QLOCK(q))); 2073 } else if (MUTEX_HELD(QLOCK(q))) { 2074 /* Don't drop lock on exit */ 2075 freezer = curthread; 2076 } else 2077 mutex_enter(QLOCK(q)); 2078 2079 ASSERT(mp->b_band <= q->q_nband); 2080 if (mp->b_band != 0) { /* Adjust band pointers */ 2081 ASSERT(q->q_bandp != NULL); 2082 qbp = q->q_bandp; 2083 i = mp->b_band; 2084 while (--i > 0) 2085 qbp = qbp->qb_next; 2086 if (mp == qbp->qb_first) { 2087 if (mp->b_next && mp->b_band == mp->b_next->b_band) 2088 qbp->qb_first = mp->b_next; 2089 else 2090 qbp->qb_first = NULL; 2091 } 2092 if (mp == qbp->qb_last) { 2093 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 2094 qbp->qb_last = mp->b_prev; 2095 else 2096 qbp->qb_last = NULL; 2097 } 2098 } 2099 2100 /* 2101 * Remove the message from the list. 2102 */ 2103 if (mp->b_prev) 2104 mp->b_prev->b_next = mp->b_next; 2105 else 2106 q->q_first = mp->b_next; 2107 if (mp->b_next) 2108 mp->b_next->b_prev = mp->b_prev; 2109 else 2110 q->q_last = mp->b_prev; 2111 mp->b_next = NULL; 2112 mp->b_prev = NULL; 2113 2114 /* Get the size of the message for q_count accounting */ 2115 bytecnt = mp_cont_len(mp, &mblkcnt); 2116 2117 if (mp->b_band == 0) { /* Perform q_count accounting */ 2118 q->q_count -= bytecnt; 2119 q->q_mblkcnt -= mblkcnt; 2120 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2121 (q->q_mblkcnt < q->q_hiwat))) { 2122 q->q_flag &= ~QFULL; 2123 } 2124 } else { /* Perform qb_count accounting */ 2125 qbp->qb_count -= bytecnt; 2126 qbp->qb_mblkcnt -= mblkcnt; 2127 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) && 2128 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2129 qbp->qb_flag &= ~QB_FULL; 2130 } 2131 } 2132 if (freezer != curthread) 2133 mutex_exit(QLOCK(q)); 2134 2135 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL); 2136 } 2137 2138 /* 2139 * Empty a queue. 2140 * If flag is set, remove all messages. Otherwise, remove 2141 * only non-control messages. If queue falls below its low 2142 * water mark, and QWANTW is set, enable the nearest upstream 2143 * service procedure. 2144 * 2145 * Historical note: when merging the M_FLUSH code in strrput with this 2146 * code one difference was discovered. flushq did not have a check 2147 * for q_lowat == 0 in the backenabling test. 2148 * 2149 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2150 * if one exists on the queue. 
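 *
 * For reference, a sketch of the usual caller: the M_FLUSH idiom (see
 * flushq(9F)) as it might appear in a driver's write-side put procedure:
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		if (*mp->b_rptr & FLUSHR) {
 *			flushq(RD(q), FLUSHDATA);
 *			*mp->b_rptr &= ~FLUSHW;
 *			qreply(q, mp);
 *		} else
 *			freemsg(mp);
 *		break;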
2151	 */
2152 void
2153 flushq_common(queue_t *q, int flag, int pcproto_flag)
2154 {
2155	mblk_t *mp, *nmp;
2156	qband_t *qbp;
2157	int backenab = 0;
2158	unsigned char bpri;
2159	unsigned char qbf[NBAND];	/* band flushing backenable flags */
2160
2161	if (q->q_first == NULL)
2162		return;
2163
2164	mutex_enter(QLOCK(q));
2165	mp = q->q_first;
2166	q->q_first = NULL;
2167	q->q_last = NULL;
2168	q->q_count = 0;
2169	q->q_mblkcnt = 0;
2170	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2171		qbp->qb_first = NULL;
2172		qbp->qb_last = NULL;
2173		qbp->qb_count = 0;
2174		qbp->qb_mblkcnt = 0;
2175		qbp->qb_flag &= ~QB_FULL;
2176	}
2177	q->q_flag &= ~QFULL;
2178	mutex_exit(QLOCK(q));
2179	while (mp) {
2180		nmp = mp->b_next;
2181		mp->b_next = mp->b_prev = NULL;
2182
2183		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2184
2185		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2186			(void) putq(q, mp);
2187		else if (flag || datamsg(mp->b_datap->db_type))
2188			freemsg(mp);
2189		else
2190			(void) putq(q, mp);
2191		mp = nmp;
2192	}
2193	bpri = 1;
2194	mutex_enter(QLOCK(q));
2195	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2196		if ((qbp->qb_flag & QB_WANTW) &&
2197		    (((qbp->qb_count < qbp->qb_lowat) &&
2198		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2199		    qbp->qb_lowat == 0)) {
2200			qbp->qb_flag &= ~QB_WANTW;
2201			backenab = 1;
2202			qbf[bpri] = 1;
2203		} else
2204			qbf[bpri] = 0;
2205		bpri++;
2206	}
2207	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2208	if ((q->q_flag & QWANTW) &&
2209	    (((q->q_count < q->q_lowat) &&
2210	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2211		q->q_flag &= ~QWANTW;
2212		backenab = 1;
2213		qbf[0] = 1;
2214	} else
2215		qbf[0] = 0;
2216
2217	/*
2218	 * If any band can now be written to, and there is a writer
2219	 * for that band, then backenable the closest service procedure.
2220	 */
2221	if (backenab) {
2222		mutex_exit(QLOCK(q));
2223		for (bpri = q->q_nband; bpri != 0; bpri--)
2224			if (qbf[bpri])
2225				backenable(q, bpri);
2226		if (qbf[0])
2227			backenable(q, 0);
2228	} else
2229		mutex_exit(QLOCK(q));
2230 }
2231
2232 /*
2233  * The real flushing takes place in flushq_common. This is done so that
2234  * a flag can specify whether or not M_PCPROTO messages should be
2235  * flushed. Currently the only place that uses this flag is the stream head.
2236  */
2237 void
2238 flushq(queue_t *q, int flag)
2239 {
2240	flushq_common(q, flag, 0);
2241 }
2242
2243 /*
2244  * Flush the queue of messages of the given priority band.
2245  * There is some duplication of code between flushq and flushband.
2246  * This is because we want to optimize the code as much as possible.
2247  * The assumption is that there will be more messages in the normal
2248  * (priority 0) band than in any other.
2249  *
2250  * Historical note: when merging the M_FLUSH code in strrput with this
2251  * code one difference was discovered. flushband had an extra check
2252  * for (mp->b_datap->db_type < QPCTL) in the band 0
2253  * case. That check does not match the man page for flushband and was not
2254  * in the strrput flush code, hence it was removed.
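 *
 * Sketch of typical use (cf. flushband(9F)): when an M_FLUSH message
 * carries FLUSHBAND, the byte after the flag octet names the band:
 *
 *	if (*mp->b_rptr & FLUSHR) {
 *		if (*mp->b_rptr & FLUSHBAND)
 *			flushband(RD(q), *(mp->b_rptr + 1), FLUSHDATA);
 *		else
 *			flushq(RD(q), FLUSHDATA);
 *	}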
2255	 */
2256 void
2257 flushband(queue_t *q, unsigned char pri, int flag)
2258 {
2259	mblk_t *mp;
2260	mblk_t *nmp;
2261	mblk_t *last;
2262	qband_t *qbp;
2263	int band;
2264
2265	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2266	if (pri > q->q_nband) {
2267		return;
2268	}
2269	mutex_enter(QLOCK(q));
2270	if (pri == 0) {
2271		mp = q->q_first;
2272		q->q_first = NULL;
2273		q->q_last = NULL;
2274		q->q_count = 0;
2275		q->q_mblkcnt = 0;
2276		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2277			qbp->qb_first = NULL;
2278			qbp->qb_last = NULL;
2279			qbp->qb_count = 0;
2280			qbp->qb_mblkcnt = 0;
2281			qbp->qb_flag &= ~QB_FULL;
2282		}
2283		q->q_flag &= ~QFULL;
2284		mutex_exit(QLOCK(q));
2285		while (mp) {
2286			nmp = mp->b_next;
2287			mp->b_next = mp->b_prev = NULL;
2288			if ((mp->b_band == 0) &&
2289			    ((flag == FLUSHALL) ||
2290			    datamsg(mp->b_datap->db_type)))
2291				freemsg(mp);
2292			else
2293				(void) putq(q, mp);
2294			mp = nmp;
2295		}
2296		mutex_enter(QLOCK(q));
2297		if ((q->q_flag & QWANTW) &&
2298		    (((q->q_count < q->q_lowat) &&
2299		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2300			q->q_flag &= ~QWANTW;
2301			mutex_exit(QLOCK(q));
2302
2303			backenable(q, pri);
2304		} else
2305			mutex_exit(QLOCK(q));
2306	} else {	/* pri != 0 */
2307		boolean_t flushed = B_FALSE;
2308		band = pri;
2309
2310		ASSERT(MUTEX_HELD(QLOCK(q)));
2311		qbp = q->q_bandp;
2312		while (--band > 0)
2313			qbp = qbp->qb_next;
2314		mp = qbp->qb_first;
2315		if (mp == NULL) {
2316			mutex_exit(QLOCK(q));
2317			return;
2318		}
2319		last = qbp->qb_last->b_next;
2320		/*
2321		 * rmvq_noenab() and freemsg() are called for each mblk that
2322		 * meets the criteria. The loop is executed until the last
2323		 * mblk has been processed.
2324		 */
2325		while (mp != last) {
2326			ASSERT(mp->b_band == pri);
2327			nmp = mp->b_next;
2328			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2329				rmvq_noenab(q, mp);
2330				freemsg(mp);
2331				flushed = B_TRUE;
2332			}
2333			mp = nmp;
2334		}
2335		mutex_exit(QLOCK(q));
2336
2337		/*
2338		 * If any mblks have been freed, we know that qbackenable()
2339		 * will need to be called.
2340		 */
2341		if (flushed)
2342			qbackenable(q, pri);
2343	}
2344 }
2345
2346 /*
2347  * Return 1 if the queue is not full. If the queue is full, return
2348  * 0 (may not put message) and set QWANTW flag (caller wants to write
2349  * to the queue).
2350  */
2351 int
2352 canput(queue_t *q)
2353 {
2354	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2355
2356	/* this is for loopback transports; they should not do a canput */
2357	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2358
2359	/* Find next forward module that has a service procedure */
2360	q = q->q_nfsrv;
2361
2362	if (!(q->q_flag & QFULL)) {
2363		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2364		return (1);
2365	}
2366	mutex_enter(QLOCK(q));
2367	if (q->q_flag & QFULL) {
2368		q->q_flag |= QWANTW;
2369		mutex_exit(QLOCK(q));
2370		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2371		return (0);
2372	}
2373	mutex_exit(QLOCK(q));
2374	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2375	return (1);
2376 }
2377
2378 /*
2379  * This is the new canput for use with priority bands. Return 1 if the
2380  * band is not full. If the band is full, return 0 (may not put message)
2381  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2382  * write to the queue).
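 *
 * A minimal sketch (not from a real module) of the intended calling
 * pattern, a write service procedure honoring per-band flow control:
 *
 *	while ((mp = getq(q)) != NULL) {
 *		if (mp->b_datap->db_type < QPCTL &&
 *		    !bcanput(q->q_next, mp->b_band)) {
 *			(void) putbq(q, mp);
 *			return (0);
 *		}
 *		putnext(q, mp);
 *	}
 *	return (0);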
2383	 */
2384 int
2385 bcanput(queue_t *q, unsigned char pri)
2386 {
2387	qband_t *qbp;
2388
2389	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2390	if (!q)
2391		return (0);
2392
2393	/* Find next forward module that has a service procedure */
2394	q = q->q_nfsrv;
2395
2396	mutex_enter(QLOCK(q));
2397	if (pri == 0) {
2398		if (q->q_flag & QFULL) {
2399			q->q_flag |= QWANTW;
2400			mutex_exit(QLOCK(q));
2401			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2402			    "bcanput:%p %X %d", q, pri, 0);
2403			return (0);
2404		}
2405	} else {	/* pri != 0 */
2406		if (pri > q->q_nband) {
2407			/*
2408			 * No band exists yet, so return success.
2409			 */
2410			mutex_exit(QLOCK(q));
2411			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2412			    "bcanput:%p %X %d", q, pri, 1);
2413			return (1);
2414		}
2415		qbp = q->q_bandp;
2416		while (--pri)
2417			qbp = qbp->qb_next;
2418		if (qbp->qb_flag & QB_FULL) {
2419			qbp->qb_flag |= QB_WANTW;
2420			mutex_exit(QLOCK(q));
2421			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2422			    "bcanput:%p %X %d", q, pri, 0);
2423			return (0);
2424		}
2425	}
2426	mutex_exit(QLOCK(q));
2427	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2428	    "bcanput:%p %X %d", q, pri, 1);
2429	return (1);
2430 }
2431
2432 /*
2433  * Put a message on a queue.
2434  *
2435  * Messages are enqueued on a priority basis. The priority classes
2436  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2437  * and B_NORMAL (type < QPCTL && band == 0).
2438  *
2439  * Add appropriate weighted data block sizes to queue count.
2440  * If queue hits high water mark then set QFULL flag.
2441  *
2442  * If QNOENAB is not set (putq is allowed to enable the queue),
2443  * enable the queue only if the message is PRIORITY,
2444  * or the QWANTR flag is set (indicating that the service procedure
2445  * is ready to read the queue). This implies that a service
2446  * procedure must NEVER put a high priority message back on its own
2447  * queue, as this would result in an infinite loop (!).
2448  */
2449 int
2450 putq(queue_t *q, mblk_t *bp)
2451 {
2452	mblk_t *tmp;
2453	qband_t *qbp = NULL;
2454	int mcls = (int)queclass(bp);
2455	kthread_id_t freezer;
2456	int bytecnt = 0, mblkcnt = 0;
2457
2458	freezer = STREAM(q)->sd_freezer;
2459	if (freezer == curthread) {
2460		ASSERT(frozenstr(q));
2461		ASSERT(MUTEX_HELD(QLOCK(q)));
2462	} else
2463		mutex_enter(QLOCK(q));
2464
2465	/*
2466	 * Make sanity checks and if qband structure is not yet
2467	 * allocated, do so.
2468	 */
2469	if (mcls == QPCTL) {
2470		if (bp->b_band != 0)
2471			bp->b_band = 0;		/* force to be correct */
2472	} else if (bp->b_band != 0) {
2473		int i;
2474		qband_t **qbpp;
2475
2476		if (bp->b_band > q->q_nband) {
2477
2478			/*
2479			 * The qband structure for this priority band is
2480			 * not on the queue yet, so we have to allocate
2481			 * one on the fly. It would be wasteful to
2482			 * associate the qband structures with every
2483			 * queue when the queues are allocated. This is
2484			 * because most queues will only need the normal
2485			 * band of flow which can be described entirely
2486			 * by the queue itself.
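			 *
			 * For example (illustrative): the first putq() of a
			 * message with b_band == 2 onto a queue whose q_nband
			 * is still 0 allocates qband structures for bands 1
			 * and 2 in the loop below, each inheriting q_hiwat and
			 * q_lowat from the queue, and leaves q_nband at 2.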
2487 */ 2488 qbpp = &q->q_bandp; 2489 while (*qbpp) 2490 qbpp = &(*qbpp)->qb_next; 2491 while (bp->b_band > q->q_nband) { 2492 if ((*qbpp = allocband()) == NULL) { 2493 if (freezer != curthread) 2494 mutex_exit(QLOCK(q)); 2495 return (0); 2496 } 2497 (*qbpp)->qb_hiwat = q->q_hiwat; 2498 (*qbpp)->qb_lowat = q->q_lowat; 2499 q->q_nband++; 2500 qbpp = &(*qbpp)->qb_next; 2501 } 2502 } 2503 ASSERT(MUTEX_HELD(QLOCK(q))); 2504 qbp = q->q_bandp; 2505 i = bp->b_band; 2506 while (--i) 2507 qbp = qbp->qb_next; 2508 } 2509 2510 /* 2511 * If queue is empty, add the message and initialize the pointers. 2512 * Otherwise, adjust message pointers and queue pointers based on 2513 * the type of the message and where it belongs on the queue. Some 2514 * code is duplicated to minimize the number of conditionals and 2515 * hopefully minimize the amount of time this routine takes. 2516 */ 2517 if (!q->q_first) { 2518 bp->b_next = NULL; 2519 bp->b_prev = NULL; 2520 q->q_first = bp; 2521 q->q_last = bp; 2522 if (qbp) { 2523 qbp->qb_first = bp; 2524 qbp->qb_last = bp; 2525 } 2526 } else if (!qbp) { /* bp->b_band == 0 */ 2527 2528 /* 2529 * If queue class of message is less than or equal to 2530 * that of the last one on the queue, tack on to the end. 2531 */ 2532 tmp = q->q_last; 2533 if (mcls <= (int)queclass(tmp)) { 2534 bp->b_next = NULL; 2535 bp->b_prev = tmp; 2536 tmp->b_next = bp; 2537 q->q_last = bp; 2538 } else { 2539 tmp = q->q_first; 2540 while ((int)queclass(tmp) >= mcls) 2541 tmp = tmp->b_next; 2542 2543 /* 2544 * Insert bp before tmp. 2545 */ 2546 bp->b_next = tmp; 2547 bp->b_prev = tmp->b_prev; 2548 if (tmp->b_prev) 2549 tmp->b_prev->b_next = bp; 2550 else 2551 q->q_first = bp; 2552 tmp->b_prev = bp; 2553 } 2554 } else { /* bp->b_band != 0 */ 2555 if (qbp->qb_first) { 2556 tmp = qbp->qb_last; 2557 2558 /* 2559 * Insert bp after the last message in this band. 2560 */ 2561 bp->b_next = tmp->b_next; 2562 if (tmp->b_next) 2563 tmp->b_next->b_prev = bp; 2564 else 2565 q->q_last = bp; 2566 bp->b_prev = tmp; 2567 tmp->b_next = bp; 2568 } else { 2569 tmp = q->q_last; 2570 if ((mcls < (int)queclass(tmp)) || 2571 (bp->b_band <= tmp->b_band)) { 2572 2573 /* 2574 * Tack bp on end of queue. 2575 */ 2576 bp->b_next = NULL; 2577 bp->b_prev = tmp; 2578 tmp->b_next = bp; 2579 q->q_last = bp; 2580 } else { 2581 tmp = q->q_first; 2582 while (tmp->b_datap->db_type >= QPCTL) 2583 tmp = tmp->b_next; 2584 while (tmp->b_band >= bp->b_band) 2585 tmp = tmp->b_next; 2586 2587 /* 2588 * Insert bp before tmp. 
2589 */ 2590 bp->b_next = tmp; 2591 bp->b_prev = tmp->b_prev; 2592 if (tmp->b_prev) 2593 tmp->b_prev->b_next = bp; 2594 else 2595 q->q_first = bp; 2596 tmp->b_prev = bp; 2597 } 2598 qbp->qb_first = bp; 2599 } 2600 qbp->qb_last = bp; 2601 } 2602 2603 /* Get message byte count for q_count accounting */ 2604 bytecnt = mp_cont_len(bp, &mblkcnt); 2605 2606 if (qbp) { 2607 qbp->qb_count += bytecnt; 2608 qbp->qb_mblkcnt += mblkcnt; 2609 if ((qbp->qb_count >= qbp->qb_hiwat) || 2610 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2611 qbp->qb_flag |= QB_FULL; 2612 } 2613 } else { 2614 q->q_count += bytecnt; 2615 q->q_mblkcnt += mblkcnt; 2616 if ((q->q_count >= q->q_hiwat) || 2617 (q->q_mblkcnt >= q->q_hiwat)) { 2618 q->q_flag |= QFULL; 2619 } 2620 } 2621 2622 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2623 2624 if ((mcls > QNORM) || 2625 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2626 qenable_locked(q); 2627 ASSERT(MUTEX_HELD(QLOCK(q))); 2628 if (freezer != curthread) 2629 mutex_exit(QLOCK(q)); 2630 2631 return (1); 2632 } 2633 2634 /* 2635 * Put stuff back at beginning of Q according to priority order. 2636 * See comment on putq above for details. 2637 */ 2638 int 2639 putbq(queue_t *q, mblk_t *bp) 2640 { 2641 mblk_t *tmp; 2642 qband_t *qbp = NULL; 2643 int mcls = (int)queclass(bp); 2644 kthread_id_t freezer; 2645 int bytecnt = 0, mblkcnt = 0; 2646 2647 ASSERT(q && bp); 2648 ASSERT(bp->b_next == NULL); 2649 freezer = STREAM(q)->sd_freezer; 2650 if (freezer == curthread) { 2651 ASSERT(frozenstr(q)); 2652 ASSERT(MUTEX_HELD(QLOCK(q))); 2653 } else 2654 mutex_enter(QLOCK(q)); 2655 2656 /* 2657 * Make sanity checks and if qband structure is not yet 2658 * allocated, do so. 2659 */ 2660 if (mcls == QPCTL) { 2661 if (bp->b_band != 0) 2662 bp->b_band = 0; /* force to be correct */ 2663 } else if (bp->b_band != 0) { 2664 int i; 2665 qband_t **qbpp; 2666 2667 if (bp->b_band > q->q_nband) { 2668 qbpp = &q->q_bandp; 2669 while (*qbpp) 2670 qbpp = &(*qbpp)->qb_next; 2671 while (bp->b_band > q->q_nband) { 2672 if ((*qbpp = allocband()) == NULL) { 2673 if (freezer != curthread) 2674 mutex_exit(QLOCK(q)); 2675 return (0); 2676 } 2677 (*qbpp)->qb_hiwat = q->q_hiwat; 2678 (*qbpp)->qb_lowat = q->q_lowat; 2679 q->q_nband++; 2680 qbpp = &(*qbpp)->qb_next; 2681 } 2682 } 2683 qbp = q->q_bandp; 2684 i = bp->b_band; 2685 while (--i) 2686 qbp = qbp->qb_next; 2687 } 2688 2689 /* 2690 * If queue is empty or if message is high priority, 2691 * place on the front of the queue. 2692 */ 2693 tmp = q->q_first; 2694 if ((!tmp) || (mcls == QPCTL)) { 2695 bp->b_next = tmp; 2696 if (tmp) 2697 tmp->b_prev = bp; 2698 else 2699 q->q_last = bp; 2700 q->q_first = bp; 2701 bp->b_prev = NULL; 2702 if (qbp) { 2703 qbp->qb_first = bp; 2704 qbp->qb_last = bp; 2705 } 2706 } else if (qbp) { /* bp->b_band != 0 */ 2707 tmp = qbp->qb_first; 2708 if (tmp) { 2709 2710 /* 2711 * Insert bp before the first message in this band. 2712 */ 2713 bp->b_next = tmp; 2714 bp->b_prev = tmp->b_prev; 2715 if (tmp->b_prev) 2716 tmp->b_prev->b_next = bp; 2717 else 2718 q->q_first = bp; 2719 tmp->b_prev = bp; 2720 } else { 2721 tmp = q->q_last; 2722 if ((mcls < (int)queclass(tmp)) || 2723 (bp->b_band < tmp->b_band)) { 2724 2725 /* 2726 * Tack bp on end of queue. 2727 */ 2728 bp->b_next = NULL; 2729 bp->b_prev = tmp; 2730 tmp->b_next = bp; 2731 q->q_last = bp; 2732 } else { 2733 tmp = q->q_first; 2734 while (tmp->b_datap->db_type >= QPCTL) 2735 tmp = tmp->b_next; 2736 while (tmp->b_band > bp->b_band) 2737 tmp = tmp->b_next; 2738 2739 /* 2740 * Insert bp before tmp. 
2741				 */
2742				bp->b_next = tmp;
2743				bp->b_prev = tmp->b_prev;
2744				if (tmp->b_prev)
2745					tmp->b_prev->b_next = bp;
2746				else
2747					q->q_first = bp;
2748				tmp->b_prev = bp;
2749			}
2750			qbp->qb_last = bp;
2751		}
2752		qbp->qb_first = bp;
2753	} else {	/* bp->b_band == 0 && !QPCTL */
2754
2755		/*
2756		 * If the queue class or band is less than that of the last
2757		 * message on the queue, tack bp on the end of the queue.
2758		 */
2759		tmp = q->q_last;
2760		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2761			bp->b_next = NULL;
2762			bp->b_prev = tmp;
2763			tmp->b_next = bp;
2764			q->q_last = bp;
2765		} else {
2766			tmp = q->q_first;
2767			while (tmp->b_datap->db_type >= QPCTL)
2768				tmp = tmp->b_next;
2769			while (tmp->b_band > bp->b_band)
2770				tmp = tmp->b_next;
2771
2772			/*
2773			 * Insert bp before tmp.
2774			 */
2775			bp->b_next = tmp;
2776			bp->b_prev = tmp->b_prev;
2777			if (tmp->b_prev)
2778				tmp->b_prev->b_next = bp;
2779			else
2780				q->q_first = bp;
2781			tmp->b_prev = bp;
2782		}
2783	}
2784
2785	/* Get message byte count for q_count accounting */
2786	bytecnt = mp_cont_len(bp, &mblkcnt);
2787
2788	if (qbp) {
2789		qbp->qb_count += bytecnt;
2790		qbp->qb_mblkcnt += mblkcnt;
2791		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2792		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2793			qbp->qb_flag |= QB_FULL;
2794		}
2795	} else {
2796		q->q_count += bytecnt;
2797		q->q_mblkcnt += mblkcnt;
2798		if ((q->q_count >= q->q_hiwat) ||
2799		    (q->q_mblkcnt >= q->q_hiwat)) {
2800			q->q_flag |= QFULL;
2801		}
2802	}
2803
2804	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2805
2806	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2807		qenable_locked(q);
2808	ASSERT(MUTEX_HELD(QLOCK(q)));
2809	if (freezer != curthread)
2810		mutex_exit(QLOCK(q));
2811
2812	return (1);
2813 }
2814
2815 /*
2816  * Insert a message before an existing message on the queue. If the
2817  * existing message is NULL, the new message is placed at the end of
2818  * the queue. The queue class of the new message is ignored. However,
2819  * the priority band of the new message must adhere to the following
2820  * ordering:
2821  *
2822  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2823  *
2824  * All flow control parameters are updated.
2825  *
2826  * insq can be called with the stream frozen, by other utility functions
2827  * holding QLOCK, or by streams modules holding no locks with the stream
2828  * not frozen.
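 *
 * Hypothetical usage sketch: insert an ordinary (band 0) message mp
 * ahead of the first ordinary message, behind any high-priority or
 * banded messages already queued; insq() returns 0 if it judges the
 * insertion out of order or cannot allocate a qband:
 *
 *	mblk_t *emp;
 *
 *	mutex_enter(QLOCK(q));
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (emp->b_datap->db_type < QPCTL && emp->b_band == 0)
 *			break;
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);
 *	mutex_exit(QLOCK(q));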
2828 */ 2829 int 2830 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2831 { 2832 mblk_t *tmp; 2833 qband_t *qbp = NULL; 2834 int mcls = (int)queclass(mp); 2835 kthread_id_t freezer; 2836 int bytecnt = 0, mblkcnt = 0; 2837 2838 freezer = STREAM(q)->sd_freezer; 2839 if (freezer == curthread) { 2840 ASSERT(frozenstr(q)); 2841 ASSERT(MUTEX_HELD(QLOCK(q))); 2842 } else if (MUTEX_HELD(QLOCK(q))) { 2843 /* Don't drop lock on exit */ 2844 freezer = curthread; 2845 } else 2846 mutex_enter(QLOCK(q)); 2847 2848 if (mcls == QPCTL) { 2849 if (mp->b_band != 0) 2850 mp->b_band = 0; /* force to be correct */ 2851 if (emp && emp->b_prev && 2852 (emp->b_prev->b_datap->db_type < QPCTL)) 2853 goto badord; 2854 } 2855 if (emp) { 2856 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2857 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2858 (emp->b_prev->b_band < mp->b_band))) { 2859 goto badord; 2860 } 2861 } else { 2862 tmp = q->q_last; 2863 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2864 badord: 2865 cmn_err(CE_WARN, 2866 "insq: attempt to insert message out of order " 2867 "on q %p", (void *)q); 2868 if (freezer != curthread) 2869 mutex_exit(QLOCK(q)); 2870 return (0); 2871 } 2872 } 2873 2874 if (mp->b_band != 0) { 2875 int i; 2876 qband_t **qbpp; 2877 2878 if (mp->b_band > q->q_nband) { 2879 qbpp = &q->q_bandp; 2880 while (*qbpp) 2881 qbpp = &(*qbpp)->qb_next; 2882 while (mp->b_band > q->q_nband) { 2883 if ((*qbpp = allocband()) == NULL) { 2884 if (freezer != curthread) 2885 mutex_exit(QLOCK(q)); 2886 return (0); 2887 } 2888 (*qbpp)->qb_hiwat = q->q_hiwat; 2889 (*qbpp)->qb_lowat = q->q_lowat; 2890 q->q_nband++; 2891 qbpp = &(*qbpp)->qb_next; 2892 } 2893 } 2894 qbp = q->q_bandp; 2895 i = mp->b_band; 2896 while (--i) 2897 qbp = qbp->qb_next; 2898 } 2899 2900 if ((mp->b_next = emp) != NULL) { 2901 if ((mp->b_prev = emp->b_prev) != NULL) 2902 emp->b_prev->b_next = mp; 2903 else 2904 q->q_first = mp; 2905 emp->b_prev = mp; 2906 } else { 2907 if ((mp->b_prev = q->q_last) != NULL) 2908 q->q_last->b_next = mp; 2909 else 2910 q->q_first = mp; 2911 q->q_last = mp; 2912 } 2913 2914 /* Get mblk and byte count for q_count accounting */ 2915 bytecnt = mp_cont_len(mp, &mblkcnt); 2916 2917 if (qbp) { /* adjust qband pointers and count */ 2918 if (!qbp->qb_first) { 2919 qbp->qb_first = mp; 2920 qbp->qb_last = mp; 2921 } else { 2922 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2923 (mp->b_prev->b_band != mp->b_band))) 2924 qbp->qb_first = mp; 2925 else if (mp->b_next == NULL || (mp->b_next != NULL && 2926 (mp->b_next->b_band != mp->b_band))) 2927 qbp->qb_last = mp; 2928 } 2929 qbp->qb_count += bytecnt; 2930 qbp->qb_mblkcnt += mblkcnt; 2931 if ((qbp->qb_count >= qbp->qb_hiwat) || 2932 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2933 qbp->qb_flag |= QB_FULL; 2934 } 2935 } else { 2936 q->q_count += bytecnt; 2937 q->q_mblkcnt += mblkcnt; 2938 if ((q->q_count >= q->q_hiwat) || 2939 (q->q_mblkcnt >= q->q_hiwat)) { 2940 q->q_flag |= QFULL; 2941 } 2942 } 2943 2944 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2945 2946 if (canenable(q) && (q->q_flag & QWANTR)) 2947 qenable_locked(q); 2948 2949 ASSERT(MUTEX_HELD(QLOCK(q))); 2950 if (freezer != curthread) 2951 mutex_exit(QLOCK(q)); 2952 2953 return (1); 2954 } 2955 2956 /* 2957 * Create and put a control message on queue. 
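 *
 * Sketch of a typical caller (hypothetical; cf. putnextctl(9F)): a
 * module reporting a fatal condition to the stream head from its read
 * side; a zero return means allocb_tryhard() failed and the caller must
 * arrange its own recovery (e.g. via bufcall(9F)):
 *
 *	if (!putnextctl1(q, M_ERROR, EPROTO))
 *		recover();	(hypothetical recovery path)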
2958	 */
2959 int
2960 putctl(queue_t *q, int type)
2961 {
2962	mblk_t *bp;
2963
2964	if ((datamsg(type) && (type != M_DELAY)) ||
2965	    (bp = allocb_tryhard(0)) == NULL)
2966		return (0);
2967	bp->b_datap->db_type = (unsigned char) type;
2968
2969	put(q, bp);
2970
2971	return (1);
2972 }
2973
2974 /*
2975  * Control message with a single-byte parameter
2976  */
2977 int
2978 putctl1(queue_t *q, int type, int param)
2979 {
2980	mblk_t *bp;
2981
2982	if ((datamsg(type) && (type != M_DELAY)) ||
2983	    (bp = allocb_tryhard(1)) == NULL)
2984		return (0);
2985	bp->b_datap->db_type = (unsigned char)type;
2986	*bp->b_wptr++ = (unsigned char)param;
2987
2988	put(q, bp);
2989
2990	return (1);
2991 }
2992
2993 int
2994 putnextctl1(queue_t *q, int type, int param)
2995 {
2996	mblk_t *bp;
2997
2998	if ((datamsg(type) && (type != M_DELAY)) ||
2999	    ((bp = allocb_tryhard(1)) == NULL))
3000		return (0);
3001
3002	bp->b_datap->db_type = (unsigned char)type;
3003	*bp->b_wptr++ = (unsigned char)param;
3004
3005	putnext(q, bp);
3006
3007	return (1);
3008 }
3009
3010 int
3011 putnextctl(queue_t *q, int type)
3012 {
3013	mblk_t *bp;
3014
3015	if ((datamsg(type) && (type != M_DELAY)) ||
3016	    ((bp = allocb_tryhard(0)) == NULL))
3017		return (0);
3018	bp->b_datap->db_type = (unsigned char)type;
3019
3020	putnext(q, bp);
3021
3022	return (1);
3023 }
3024
3025 /*
3026  * Return the queue upstream from this one
3027  */
3028 queue_t *
3029 backq(queue_t *q)
3030 {
3031	q = _OTHERQ(q);
3032	if (q->q_next) {
3033		q = q->q_next;
3034		return (_OTHERQ(q));
3035	}
3036	return (NULL);
3037 }
3038
3039 /*
3040  * Send a block back up the queue in reverse from this
3041  * one (e.g. to respond to ioctls)
3042  */
3043 void
3044 qreply(queue_t *q, mblk_t *bp)
3045 {
3046	ASSERT(q && bp);
3047
3048	putnext(_OTHERQ(q), bp);
3049 }
3050
3051 /*
3052  * Streams Queue Scheduling
3053  *
3054  * Queues are enabled through qenable() when they have messages to
3055  * process. They are serviced by queuerun(), which runs each enabled
3056  * queue's service procedure. The call to queuerun() is processor
3057  * dependent - the general principle is that it be run whenever a queue
3058  * is enabled but before returning to user level. For system calls,
3059  * the function runqueues() is called if their action causes a queue
3060  * to be enabled. For device interrupts, queuerun() should be
3061  * called before returning from the last level of interrupt. Beyond
3062  * this, no timing assumptions should be made about queue scheduling.
3063  */
3064
3065 /*
3066  * Enable a queue: put it on list of those whose service procedures are
3067  * ready to run and set up the scheduling mechanism.
3068  * The broadcast is done outside the mutex to prevent the woken thread
3069  * from contending with the mutex. This is OK because the queue has been
3070  * enqueued on the runlist and flagged safely at this point.
3071  */
3072 void
3073 qenable(queue_t *q)
3074 {
3075	mutex_enter(QLOCK(q));
3076	qenable_locked(q);
3077	mutex_exit(QLOCK(q));
3078 }
3079 /*
3080  * Return number of messages on queue
3081  */
3082 int
3083 qsize(queue_t *qp)
3084 {
3085	int count = 0;
3086	mblk_t *mp;
3087
3088	mutex_enter(QLOCK(qp));
3089	for (mp = qp->q_first; mp; mp = mp->b_next)
3090		count++;
3091	mutex_exit(QLOCK(qp));
3092	return (count);
3093 }
3094
3095 /*
3096  * noenable - set queue so that putq() will not enable it.
3097  * enableok - set queue so that putq() can enable it.
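 *
 * Illustrative pairing (device_busy() is hypothetical): defer scheduling
 * while a resource is unavailable, then allow it again and kick the
 * service procedure by hand:
 *
 *	if (device_busy(dev))
 *		noenable(q);	(putq() still queues, never schedules)
 *	...
 *	enableok(q);
 *	if (q->q_first != NULL)
 *		qenable(q);	(run the deferred work ourselves)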
3098	 */
3099 void
3100 noenable(queue_t *q)
3101 {
3102	mutex_enter(QLOCK(q));
3103	q->q_flag |= QNOENB;
3104	mutex_exit(QLOCK(q));
3105 }
3106
3107 void
3108 enableok(queue_t *q)
3109 {
3110	mutex_enter(QLOCK(q));
3111	q->q_flag &= ~QNOENB;
3112	mutex_exit(QLOCK(q));
3113 }
3114
3115 /*
3116  * Set queue fields.
3117  */
3118 int
3119 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3120 {
3121	qband_t *qbp = NULL;
3122	queue_t *wrq;
3123	int error = 0;
3124	kthread_id_t freezer;
3125
3126	freezer = STREAM(q)->sd_freezer;
3127	if (freezer == curthread) {
3128		ASSERT(frozenstr(q));
3129		ASSERT(MUTEX_HELD(QLOCK(q)));
3130	} else
3131		mutex_enter(QLOCK(q));
3132
3133	if (what >= QBAD) {
3134		error = EINVAL;
3135		goto done;
3136	}
3137	if (pri != 0) {
3138		int i;
3139		qband_t **qbpp;
3140
3141		if (pri > q->q_nband) {
3142			qbpp = &q->q_bandp;
3143			while (*qbpp)
3144				qbpp = &(*qbpp)->qb_next;
3145			while (pri > q->q_nband) {
3146				if ((*qbpp = allocband()) == NULL) {
3147					error = EAGAIN;
3148					goto done;
3149				}
3150				(*qbpp)->qb_hiwat = q->q_hiwat;
3151				(*qbpp)->qb_lowat = q->q_lowat;
3152				q->q_nband++;
3153				qbpp = &(*qbpp)->qb_next;
3154			}
3155		}
3156		qbp = q->q_bandp;
3157		i = pri;
3158		while (--i)
3159			qbp = qbp->qb_next;
3160	}
3161	switch (what) {
3162
3163	case QHIWAT:
3164		if (qbp)
3165			qbp->qb_hiwat = (size_t)val;
3166		else
3167			q->q_hiwat = (size_t)val;
3168		break;
3169
3170	case QLOWAT:
3171		if (qbp)
3172			qbp->qb_lowat = (size_t)val;
3173		else
3174			q->q_lowat = (size_t)val;
3175		break;
3176
3177	case QMAXPSZ:
3178		if (qbp)
3179			error = EINVAL;
3180		else
3181			q->q_maxpsz = (ssize_t)val;
3182
3183		/*
3184		 * Performance concern: strwrite looks at the module below
3185		 * the stream head for the maxpsz each time it does a write,
3186		 * so we now cache it at the stream head. Check to see if this
3187		 * queue is sitting directly below the stream head.
3188		 */
3189		wrq = STREAM(q)->sd_wrq;
3190		if (q != wrq->q_next)
3191			break;
3192
3193		/*
3194		 * If the stream is not frozen drop the current QLOCK and
3195		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3196		 */
3197		if (freezer != curthread) {
3198			mutex_exit(QLOCK(q));
3199			mutex_enter(QLOCK(wrq));
3200		}
3201		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3202
3203		if (strmsgsz != 0) {
3204			if (val == INFPSZ)
3205				val = strmsgsz;
3206			else {
3207				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3208					val = MIN(PIPE_BUF, val);
3209				else
3210					val = MIN(strmsgsz, val);
3211			}
3212		}
3213		STREAM(q)->sd_qn_maxpsz = val;
3214		if (freezer != curthread) {
3215			mutex_exit(QLOCK(wrq));
3216			mutex_enter(QLOCK(q));
3217		}
3218		break;
3219
3220	case QMINPSZ:
3221		if (qbp)
3222			error = EINVAL;
3223		else
3224			q->q_minpsz = (ssize_t)val;
3225
3226		/*
3227		 * Performance concern: strwrite looks at the module below
3228		 * the stream head for the maxpsz each time it does a write,
3229		 * so we now cache it at the stream head. Check to see if this
3230		 * queue is sitting directly below the stream head.
3231		 */
3232		wrq = STREAM(q)->sd_wrq;
3233		if (q != wrq->q_next)
3234			break;
3235
3236		/*
3237		 * If the stream is not frozen drop the current QLOCK and
3238		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3239		 */
3240		if (freezer != curthread) {
3241			mutex_exit(QLOCK(q));
3242			mutex_enter(QLOCK(wrq));
3243		}
3244		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3245
3246		if (freezer != curthread) {
3247			mutex_exit(QLOCK(wrq));
3248			mutex_enter(QLOCK(q));
3249		}
3250		break;
3251
3252	case QSTRUIOT:
3253		if (qbp)
3254			error = EINVAL;
3255		else
3256			q->q_struiot = (ushort_t)val;
3257		break;
3258
3259	case QCOUNT:
3260	case QFIRST:
3261	case QLAST:
3262	case QFLAG:
3263		error = EPERM;
3264		break;
3265
3266	default:
3267		error = EINVAL;
3268		break;
3269	}
3270 done:
3271	if (freezer != curthread)
3272		mutex_exit(QLOCK(q));
3273	return (error);
3274 }
3275
3276 /*
3277  * Get queue fields.
3278  */
3279 int
3280 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3281 {
3282	qband_t *qbp = NULL;
3283	int error = 0;
3284	kthread_id_t freezer;
3285
3286	freezer = STREAM(q)->sd_freezer;
3287	if (freezer == curthread) {
3288		ASSERT(frozenstr(q));
3289		ASSERT(MUTEX_HELD(QLOCK(q)));
3290	} else
3291		mutex_enter(QLOCK(q));
3292	if (what >= QBAD) {
3293		error = EINVAL;
3294		goto done;
3295	}
3296	if (pri != 0) {
3297		int i;
3298		qband_t **qbpp;
3299
3300		if (pri > q->q_nband) {
3301			qbpp = &q->q_bandp;
3302			while (*qbpp)
3303				qbpp = &(*qbpp)->qb_next;
3304			while (pri > q->q_nband) {
3305				if ((*qbpp = allocband()) == NULL) {
3306					error = EAGAIN;
3307					goto done;
3308				}
3309				(*qbpp)->qb_hiwat = q->q_hiwat;
3310				(*qbpp)->qb_lowat = q->q_lowat;
3311				q->q_nband++;
3312				qbpp = &(*qbpp)->qb_next;
3313			}
3314		}
3315		qbp = q->q_bandp;
3316		i = pri;
3317		while (--i)
3318			qbp = qbp->qb_next;
3319	}
3320	switch (what) {
3321	case QHIWAT:
3322		if (qbp)
3323			*(size_t *)valp = qbp->qb_hiwat;
3324		else
3325			*(size_t *)valp = q->q_hiwat;
3326		break;
3327
3328	case QLOWAT:
3329		if (qbp)
3330			*(size_t *)valp = qbp->qb_lowat;
3331		else
3332			*(size_t *)valp = q->q_lowat;
3333		break;
3334
3335	case QMAXPSZ:
3336		if (qbp)
3337			error = EINVAL;
3338		else
3339			*(ssize_t *)valp = q->q_maxpsz;
3340		break;
3341
3342	case QMINPSZ:
3343		if (qbp)
3344			error = EINVAL;
3345		else
3346			*(ssize_t *)valp = q->q_minpsz;
3347		break;
3348
3349	case QCOUNT:
3350		if (qbp)
3351			*(size_t *)valp = qbp->qb_count;
3352		else
3353			*(size_t *)valp = q->q_count;
3354		break;
3355
3356	case QFIRST:
3357		if (qbp)
3358			*(mblk_t **)valp = qbp->qb_first;
3359		else
3360			*(mblk_t **)valp = q->q_first;
3361		break;
3362
3363	case QLAST:
3364		if (qbp)
3365			*(mblk_t **)valp = qbp->qb_last;
3366		else
3367			*(mblk_t **)valp = q->q_last;
3368		break;
3369
3370	case QFLAG:
3371		if (qbp)
3372			*(uint_t *)valp = qbp->qb_flag;
3373		else
3374			*(uint_t *)valp = q->q_flag;
3375		break;
3376
3377	case QSTRUIOT:
3378		if (qbp)
3379			error = EINVAL;
3380		else
3381			*(short *)valp = q->q_struiot;
3382		break;
3383
3384	default:
3385		error = EINVAL;
3386		break;
3387	}
3388 done:
3389	if (freezer != curthread)
3390		mutex_exit(QLOCK(q));
3391	return (error);
3392 }
3393
3394 /*
3395  * Wake up all threads blocked in cv_wait/sigwait/pollwait on one of:
3396  * QWANTWSYNC, QWANTR or QWANTW.
3397  *
3398  * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
3399  * RSLEEPer then a deferred wakeup will be done. Also, if strpoll() is in
3400  * progress then a deferred pollwakeup will be done.
3401	 */
3402 void
3403 strwakeq(queue_t *q, int flag)
3404 {
3405	stdata_t *stp = STREAM(q);
3406	pollhead_t *pl;
3407
3408	mutex_enter(&stp->sd_lock);
3409	pl = &stp->sd_pollist;
3410	if (flag & QWANTWSYNC) {
3411		ASSERT(!(q->q_flag & QREADR));
3412		if (stp->sd_flag & WSLEEP) {
3413			stp->sd_flag &= ~WSLEEP;
3414			cv_broadcast(&stp->sd_wrq->q_wait);
3415		} else {
3416			stp->sd_wakeq |= WSLEEP;
3417		}
3418
3419		mutex_exit(&stp->sd_lock);
3420		pollwakeup(pl, POLLWRNORM);
3421		mutex_enter(&stp->sd_lock);
3422
3423		if (stp->sd_sigflags & S_WRNORM)
3424			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3425	} else if (flag & QWANTR) {
3426		if (stp->sd_flag & RSLEEP) {
3427			stp->sd_flag &= ~RSLEEP;
3428			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3429		} else {
3430			stp->sd_wakeq |= RSLEEP;
3431		}
3432
3433		mutex_exit(&stp->sd_lock);
3434		pollwakeup(pl, POLLIN | POLLRDNORM);
3435		mutex_enter(&stp->sd_lock);
3436
3437		{
3438			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3439
3440			if (events)
3441				strsendsig(stp->sd_siglist, events, 0, 0);
3442		}
3443	} else {
3444		if (stp->sd_flag & WSLEEP) {
3445			stp->sd_flag &= ~WSLEEP;
3446			cv_broadcast(&stp->sd_wrq->q_wait);
3447		}
3448
3449		mutex_exit(&stp->sd_lock);
3450		pollwakeup(pl, POLLWRNORM);
3451		mutex_enter(&stp->sd_lock);
3452
3453		if (stp->sd_sigflags & S_WRNORM)
3454			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3455	}
3456	mutex_exit(&stp->sd_lock);
3457 }
3458
3459 int
3460 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3461 {
3462	stdata_t *stp = STREAM(q);
3463	int typ = STRUIOT_STANDARD;
3464	uio_t *uiop = &dp->d_uio;
3465	dblk_t *dbp;
3466	ssize_t uiocnt;
3467	ssize_t cnt;
3468	unsigned char *ptr;
3469	ssize_t resid;
3470	int error = 0;
3471	on_trap_data_t otd;
3472	queue_t *stwrq;
3473
3474	/*
3475	 * Plumbing may change while we are picking up the struio type, so
3476	 * store the queue in a temporary variable. It does not matter even
3477	 * if we pick up the type from the previous plumbing: if the plumbing
3478	 * has changed while we were holding the queue in the temporary
3479	 * variable, we can continue processing the message the way it would
3480	 * have been processed in the old plumbing, with no side effects
3481	 * beyond a bit of extra processing for the partial ip header
3482	 * checksum.
3483	 *
3484	 * This has been done to avoid holding the sd_lock, which is
3485	 * very hot.
3486	 */
3487
3488	stwrq = stp->sd_struiowrq;
3489	if (stwrq)
3490		typ = stwrq->q_struiot;
3491
3492	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3493		dbp = mp->b_datap;
3494		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3495		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3496		cnt = MIN(uiocnt, uiop->uio_resid);
3497		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3498		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3499			/*
3500			 * Either this mblk has already been processed
3501			 * or there is no more room in this mblk (?).
3502			 */
3503			continue;
3504		}
3505		switch (typ) {
3506		case STRUIOT_STANDARD:
3507			if (noblock) {
3508				if (on_trap(&otd, OT_DATA_ACCESS)) {
3509					no_trap();
3510					error = EWOULDBLOCK;
3511					goto out;
3512				}
3513			}
3514			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3515				if (noblock)
3516					no_trap();
3517				goto out;
3518			}
3519			if (noblock)
3520				no_trap();
3521			break;
3522
3523		default:
3524			error = EIO;
3525			goto out;
3526		}
3527		dbp->db_struioflag |= STRUIO_DONE;
3528		dbp->db_cksumstuff += cnt;
3529	}
3530 out:
3531	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3532		/*
3533		 * A fault has occurred and some bytes were moved to the
3534		 * current mblk, the uio_t has already been updated by
3535		 * the appropriate uio routine, so also update the mblk
3536		 * to reflect this in case this same mblk chain is used
3537		 * again (after the fault has been handled).
3538		 */
3539		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3540		if (uiocnt >= resid)
3541			dbp->db_cksumstuff += resid;
3542	}
3543	return (error);
3544 }
3545
3546 /*
3547  * Try to enter queue synchronously. Any attempt to enter a closing queue
3548  * will fail. The qp->q_rwcnt keeps track of the number of successful entries
3549  * so that removeq() will not try to close the queue while a thread is inside
3550  * the queue.
3551  */
3552 static boolean_t
3553 rwnext_enter(queue_t *qp)
3554 {
3555	mutex_enter(QLOCK(qp));
3556	if (qp->q_flag & QWCLOSE) {
3557		mutex_exit(QLOCK(qp));
3558		return (B_FALSE);
3559	}
3560	qp->q_rwcnt++;
3561	ASSERT(qp->q_rwcnt != 0);
3562	mutex_exit(QLOCK(qp));
3563	return (B_TRUE);
3564 }
3565
3566 /*
3567  * Decrease the count of threads running in sync stream queue and wake up any
3568  * threads blocked in removeq().
3569  */
3570 static void
3571 rwnext_exit(queue_t *qp)
3572 {
3573	mutex_enter(QLOCK(qp));
3574	qp->q_rwcnt--;
3575	if (qp->q_flag & QWANTRMQSYNC) {
3576		qp->q_flag &= ~QWANTRMQSYNC;
3577		cv_broadcast(&qp->q_wait);
3578	}
3579	mutex_exit(QLOCK(qp));
3580 }
3581
3582 /*
3583  * The purpose of rwnext() is to call the rw procedure of the next
3584  * (downstream) module's queue.
3585  *
3586  * Treated as a put entrypoint for perimeter synchronization.
3587  *
3588  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3589  * sync queues). If it is a CIPUT sync queue sq_count is incremented and it
3590  * does not matter if any regular put entrypoints have already been entered.
3591  * We can't increment one of the sq_putcounts (instead of sq_count) because
3592  * qwait_rw won't know which counter to decrement.
3593  *
3594  * It would be reasonable to add the lockless FASTPUT logic.
3595  */
3596 int
3597 rwnext(queue_t *qp, struiod_t *dp)
3598 {
3599	queue_t *nqp;
3600	syncq_t *sq;
3601	uint16_t count;
3602	uint16_t flags;
3603	struct qinit *qi;
3604	int (*proc)();
3605	struct stdata *stp;
3606	int isread;
3607	int rval;
3608
3609	stp = STREAM(qp);
3610	/*
3611	 * Prevent q_next from changing by holding sd_lock until acquiring
3612	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3613	 * already have sd_lock acquired. In either case sd_lock is always
3614	 * released after acquiring SQLOCK.
3615	 *
3616	 * The streamhead read-side holding sd_lock when calling rwnext is
3617	 * required to prevent a race condition where M_DATA mblks flowing
3618	 * up the read-side of the stream could be bypassed by a rwnext()
3619	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3620	 */
3621	if ((nqp = _WR(qp)) == qp) {
3622		isread = 0;
3623		mutex_enter(&stp->sd_lock);
3624		qp = nqp->q_next;
3625	} else {
3626		isread = 1;
3627		if (nqp != stp->sd_wrq)
3628			/* Not streamhead */
3629			mutex_enter(&stp->sd_lock);
3630		qp = _RD(nqp->q_next);
3631	}
3632	qi = qp->q_qinfo;
3633	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3634		/*
3635		 * Not a synchronous module or no r/w procedure for this
3636		 * queue, so just return EINVAL and let the caller handle it.
3637		 */
3638		mutex_exit(&stp->sd_lock);
3639		return (EINVAL);
3640	}
3641
3642	if (rwnext_enter(qp) == B_FALSE) {
3643		mutex_exit(&stp->sd_lock);
3644		return (EINVAL);
3645	}
3646
3647	sq = qp->q_syncq;
3648	mutex_enter(SQLOCK(sq));
3649	mutex_exit(&stp->sd_lock);
3650	count = sq->sq_count;
3651	flags = sq->sq_flags;
3652	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3653
3654	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3655		/*
3656		 * if this queue is being closed, return.
3657		 */
3658		if (qp->q_flag & QWCLOSE) {
3659			mutex_exit(SQLOCK(sq));
3660			rwnext_exit(qp);
3661			return (EINVAL);
3662		}
3663
3664		/*
3665		 * Wait until we can enter the inner perimeter.
3666		 */
3667		sq->sq_flags = flags | SQ_WANTWAKEUP;
3668		cv_wait(&sq->sq_wait, SQLOCK(sq));
3669		count = sq->sq_count;
3670		flags = sq->sq_flags;
3671	}
3672
3673	if (isread == 0 && stp->sd_struiowrq == NULL ||
3674	    isread == 1 && stp->sd_struiordq == NULL) {
3675		/*
3676		 * Stream plumbing changed while waiting for inner perimeter
3677		 * so just return EINVAL and let the caller handle it.
3678		 */
3679		mutex_exit(SQLOCK(sq));
3680		rwnext_exit(qp);
3681		return (EINVAL);
3682	}
3683	if (!(flags & SQ_CIPUT))
3684		sq->sq_flags = flags | SQ_EXCL;
3685	sq->sq_count = count + 1;
3686	ASSERT(sq->sq_count != 0);		/* Wraparound */
3687	/*
3688	 * Note: The only message ordering guarantee that rwnext() makes is
3689	 * for the write queue flow-control case. All others (r/w queue
3690	 * with q_count > 0 (or q_first != 0)) are the responsibility of
3691	 * the queue's rw procedure. This could be generalized here by
3692	 * running the queue's service procedure, but that wouldn't be
3693	 * the most efficient for all cases.
3694	 */
3695	mutex_exit(SQLOCK(sq));
3696	if (! isread && (qp->q_flag & QFULL)) {
3697		/*
3698		 * Write queue may be flow controlled. If so,
3699		 * mark the queue for wakeup when it's not.
3700		 */
3701		mutex_enter(QLOCK(qp));
3702		if (qp->q_flag & QFULL) {
3703			qp->q_flag |= QWANTWSYNC;
3704			mutex_exit(QLOCK(qp));
3705			rval = EWOULDBLOCK;
3706			goto out;
3707		}
3708		mutex_exit(QLOCK(qp));
3709	}
3710
3711	if (! isread && dp->d_mp)
3712		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3713		    dp->d_mp->b_datap->db_base);
3714
3715	rval = (*proc)(qp, dp);
3716
3717	if (isread && dp->d_mp)
3718		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3719		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3720 out:
3721	/*
3722	 * The queue is protected from being freed by sq_count, so it is
3723	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3724	 */
3725	rwnext_exit(qp);
3726
3727	mutex_enter(SQLOCK(sq));
3728	flags = sq->sq_flags;
3729	ASSERT(sq->sq_count != 0);
3730	sq->sq_count--;
3731	if (flags & SQ_TAIL) {
3732		putnext_tail(sq, qp, flags);
3733		/*
3734		 * The only purpose of this ASSERT is to preserve calling stack
3735		 * in DEBUG kernel.
3736		 */
3737		ASSERT(flags & SQ_TAIL);
3738		return (rval);
3739	}
3740	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3741	/*
3742	 * Safe to always drop SQ_EXCL:
3743	 *	Not SQ_CIPUT means we set SQ_EXCL above
3744	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3745	 *	did a qwriter(INNER) in which case nobody else
3746	 *	is in the inner perimeter and we are exiting.
3747	 *
3748	 * I would like to make the following assertion:
3749	 *
3750	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3751	 *	    sq->sq_count == 0);
3752	 *
3753	 * which indicates that if we are both putshared and exclusive,
3754	 * we became exclusive while executing the putproc, and the only
3755	 * claim on the syncq was the one we dropped a few lines above.
3756	 * But other threads that enter putnext while the syncq is exclusive
3757	 * need to make a claim as they may need to drop SQLOCK in the
3758	 * has_writers case to avoid deadlocks. If these threads are
3759	 * delayed or preempted, it is possible that the writer thread can
3760	 * find out that there are other claims making the (sq_count == 0)
3761	 * test invalid.
3762	 */
3763
3764	sq->sq_flags = flags & ~SQ_EXCL;
3765	if (sq->sq_flags & SQ_WANTWAKEUP) {
3766		sq->sq_flags &= ~SQ_WANTWAKEUP;
3767		cv_broadcast(&sq->sq_wait);
3768	}
3769	mutex_exit(SQLOCK(sq));
3770	return (rval);
3771 }
3772
3773 /*
3774  * The purpose of infonext() is to call the info procedure of the next
3775  * (downstream) module's queue.
3776  *
3777  * Treated as a put entrypoint for perimeter synchronization.
3778  *
3779  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3780  * sync queues). If it is a CIPUT sync queue regular sq_count is incremented
3781  * and it does not matter if any regular put entrypoints have already been
3782  * entered.
3783  */
3784 int
3785 infonext(queue_t *qp, infod_t *idp)
3786 {
3787	queue_t *nqp;
3788	syncq_t *sq;
3789	uint16_t count;
3790	uint16_t flags;
3791	struct qinit *qi;
3792	int (*proc)();
3793	struct stdata *stp;
3794	int rval;
3795
3796	stp = STREAM(qp);
3797	/*
3798	 * Prevent q_next from changing by holding sd_lock until
3799	 * acquiring SQLOCK.
3800	 */
3801	mutex_enter(&stp->sd_lock);
3802	if ((nqp = _WR(qp)) == qp) {
3803		qp = nqp->q_next;
3804	} else {
3805		qp = _RD(nqp->q_next);
3806	}
3807	qi = qp->q_qinfo;
3808	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3809		mutex_exit(&stp->sd_lock);
3810		return (EINVAL);
3811	}
3812	sq = qp->q_syncq;
3813	mutex_enter(SQLOCK(sq));
3814	mutex_exit(&stp->sd_lock);
3815	count = sq->sq_count;
3816	flags = sq->sq_flags;
3817	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3818
3819	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3820		/*
3821		 * Wait until we can enter the inner perimeter.
3822		 */
3823		sq->sq_flags = flags | SQ_WANTWAKEUP;
3824		cv_wait(&sq->sq_wait, SQLOCK(sq));
3825		count = sq->sq_count;
3826		flags = sq->sq_flags;
3827	}
3828
3829	if (! (flags & SQ_CIPUT))
3830		sq->sq_flags = flags | SQ_EXCL;
3831	sq->sq_count = count + 1;
3832	ASSERT(sq->sq_count != 0);		/* Wraparound */
3833	mutex_exit(SQLOCK(sq));
3834
3835	rval = (*proc)(qp, idp);
3836
3837	mutex_enter(SQLOCK(sq));
3838	flags = sq->sq_flags;
3839	ASSERT(sq->sq_count != 0);
3840	sq->sq_count--;
3841	if (flags & SQ_TAIL) {
3842		putnext_tail(sq, qp, flags);
3843		/*
3844		 * The only purpose of this ASSERT is to preserve calling stack
3845		 * in DEBUG kernel.
3846		 */
3847		ASSERT(flags & SQ_TAIL);
3848		return (rval);
3849	}
3850	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3851	/*
3852	 * XXXX
3853	 * I am not certain the next comment is correct here. I need to consider
3854	 * why infonext is called, and whether always dropping SQ_EXCL, rather
3855	 * than only when !SQ_CIPUT, might cause other problems. It just might be
3856	 * safer to drop it if !SQ_CIPUT because that is when we set it.
3857	 */
3858	/*
3859	 * Safe to always drop SQ_EXCL:
3860	 *	Not SQ_CIPUT means we set SQ_EXCL above
3861	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3862	 *	did a qwriter(INNER) in which case nobody else
3863	 *	is in the inner perimeter and we are exiting.
3864	 *
3865	 * I would like to make the following assertion:
3866	 *
3867	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3868	 *	    sq->sq_count == 0);
3869	 *
3870	 * which indicates that if we are both putshared and exclusive,
3871	 * we became exclusive while executing the putproc, and the only
3872	 * claim on the syncq was the one we dropped a few lines above.
3873	 * But other threads that enter putnext while the syncq is exclusive
3874	 * need to make a claim as they may need to drop SQLOCK in the
3875	 * has_writers case to avoid deadlocks. If these threads are
3876	 * delayed or preempted, it is possible that the writer thread can
3877	 * find out that there are other claims making the (sq_count == 0)
3878	 * test invalid.
3879	 */
3880
3881	sq->sq_flags = flags & ~SQ_EXCL;
3882	mutex_exit(SQLOCK(sq));
3883	return (rval);
3884 }
3885
3886 /*
3887  * Return nonzero if the queue is responsible for struio(), else return 0.
3888  */
3889 int
3890 isuioq(queue_t *q)
3891 {
3892	if (q->q_flag & QREADR)
3893		return (STREAM(q)->sd_struiordq == q);
3894	else
3895		return (STREAM(q)->sd_struiowrq == q);
3896 }
3897
3898 #if defined(__sparc)
3899 int disable_putlocks = 0;
3900 #else
3901 int disable_putlocks = 1;
3902 #endif
3903
3904 /*
3905  * Called by create_putlocks().
3906  */
3907 static void
3908 create_syncq_putlocks(queue_t *q)
3909 {
3910	syncq_t *sq = q->q_syncq;
3911	ciputctrl_t *cip;
3912	int i;
3913
3914	ASSERT(sq != NULL);
3915
3916	ASSERT(disable_putlocks == 0);
3917	ASSERT(n_ciputctrl >= min_n_ciputctrl);
3918	ASSERT(ciputctrl_cache != NULL);
3919
3920	if (!(sq->sq_type & SQ_CIPUT))
3921		return;
3922
3923	for (i = 0; i <= 1; i++) {
3924		if (sq->sq_ciputctrl == NULL) {
3925			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3926			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3927			mutex_enter(SQLOCK(sq));
3928			if (sq->sq_ciputctrl != NULL) {
3929				mutex_exit(SQLOCK(sq));
3930				kmem_cache_free(ciputctrl_cache, cip);
3931			} else {
3932				ASSERT(sq->sq_nciputctrl == 0);
3933				sq->sq_nciputctrl = n_ciputctrl - 1;
3934				/*
3935				 * putnext checks sq_ciputctrl without holding
3936				 * SQLOCK. if it is not NULL putnext assumes
3937				 * sq_nciputctrl is initialized. membar below
3938				 * insures that.
3939				 */
3940				membar_producer();
3941				sq->sq_ciputctrl = cip;
3942				mutex_exit(SQLOCK(sq));
3943			}
3944		}
3945		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
3946		if (i == 1)
3947			break;
3948		q = _OTHERQ(q);
3949		if (!(q->q_flag & QPERQ)) {
3950			ASSERT(sq == q->q_syncq);
3951			break;
3952		}
3953		ASSERT(q->q_syncq != NULL);
3954		ASSERT(sq != q->q_syncq);
3955		sq = q->q_syncq;
3956		ASSERT(sq->sq_type & SQ_CIPUT);
3957	}
3958 }
3959
3960 /*
3961  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
3962  * syncq of q.
If stream argument is not 0 create per cpu stream_putlocks for 3963 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3964 * starting from q and down to the driver. 3965 * 3966 * This should be called after the affected queues are part of stream 3967 * geometry. It should be called from driver/module open routine after 3968 * qprocson() call. It is also called from nfs syscall where it is known that 3969 * stream is configured and won't change its geometry during create_putlock 3970 * call. 3971 * 3972 * caller normally uses 0 value for the stream argument to speed up MT putnext 3973 * into the perimeter of q for example because its perimeter is per module 3974 * (e.g. IP). 3975 * 3976 * caller normally uses non 0 value for the stream argument to hint the system 3977 * that the stream of q is a very contended global system stream 3978 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3979 * particularly MT hot. 3980 * 3981 * Caller insures stream plumbing won't happen while we are here and therefore 3982 * q_next can be safely used. 3983 */ 3984 3985 void 3986 create_putlocks(queue_t *q, int stream) 3987 { 3988 ciputctrl_t *cip; 3989 struct stdata *stp = STREAM(q); 3990 3991 q = _WR(q); 3992 ASSERT(stp != NULL); 3993 3994 if (disable_putlocks != 0) 3995 return; 3996 3997 if (n_ciputctrl < min_n_ciputctrl) 3998 return; 3999 4000 ASSERT(ciputctrl_cache != NULL); 4001 4002 if (stream != 0 && stp->sd_ciputctrl == NULL) { 4003 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 4004 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 4005 mutex_enter(&stp->sd_lock); 4006 if (stp->sd_ciputctrl != NULL) { 4007 mutex_exit(&stp->sd_lock); 4008 kmem_cache_free(ciputctrl_cache, cip); 4009 } else { 4010 ASSERT(stp->sd_nciputctrl == 0); 4011 stp->sd_nciputctrl = n_ciputctrl - 1; 4012 /* 4013 * putnext checks sd_ciputctrl without holding 4014 * sd_lock. if it is not NULL putnext assumes 4015 * sd_nciputctrl is initialized. membar below 4016 * insures that. 4017 */ 4018 membar_producer(); 4019 stp->sd_ciputctrl = cip; 4020 mutex_exit(&stp->sd_lock); 4021 } 4022 } 4023 4024 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 4025 4026 while (_SAMESTR(q)) { 4027 create_syncq_putlocks(q); 4028 if (stream == 0) 4029 return; 4030 q = q->q_next; 4031 } 4032 ASSERT(q != NULL); 4033 create_syncq_putlocks(q); 4034 } 4035 4036 /* 4037 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 4038 * through a stream. 4039 * 4040 * Data currently record per-event is a timestamp, module/driver name, 4041 * downstream module/driver name, optional callstack, event type and a per 4042 * type datum. Much of the STREAMS framework is instrumented for automatic 4043 * flow tracing (when enabled). Events can be defined and used by STREAMS 4044 * modules and drivers. 4045 * 4046 * Global objects: 4047 * 4048 * str_ftevent() - Add a flow-trace event to a dblk. 4049 * str_ftfree() - Free flow-trace data 4050 * 4051 * Local objects: 4052 * 4053 * fthdr_cache - pointer to the kmem cache for trace header. 4054 * ftblk_cache - pointer to the kmem cache for trace data blocks. 
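 *
 * The tracing code is compiled in but gated off by default by the
 * str_ftnever/str_ftstack knobs below. A sketch of how a developer
 * might turn it on (assuming the usual /etc/system mechanics for
 * setting kernel variables):
 *
 *	set str_ftnever = 0	(record flow-trace events)
 *	set str_ftstack = 1	(also capture call stacks; expensive)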
4055 */ 4056 4057 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4058 int str_ftstack = 0; /* Don't record event call stacks */ 4059 4060 void 4061 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4062 { 4063 ftblk_t *bp = hp->tail; 4064 ftblk_t *nbp; 4065 ftevnt_t *ep; 4066 int ix, nix; 4067 4068 ASSERT(hp != NULL); 4069 4070 for (;;) { 4071 if ((ix = bp->ix) == FTBLK_EVNTS) { 4072 /* 4073 * Tail doesn't have room, so need a new tail. 4074 * 4075 * To make this MT safe, first, allocate a new 4076 * ftblk, and initialize it. To make life a 4077 * little easier, reserve the first slot (mostly 4078 * by making ix = 1). When we are finished with 4079 * the initialization, CAS this pointer to the 4080 * tail. If this succeeds, this is the new 4081 * "next" block. Otherwise, another thread 4082 * got here first, so free the block and start 4083 * again. 4084 */ 4085 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4086 if (nbp == NULL) { 4087 /* no mem, so punt */ 4088 str_ftnever++; 4089 /* free up all flow data? */ 4090 return; 4091 } 4092 nbp->nxt = NULL; 4093 nbp->ix = 1; 4094 /* 4095 * Just in case there is another thread about 4096 * to get the next index, we need to make sure 4097 * the value is there for it. 4098 */ 4099 membar_producer(); 4100 if (casptr(&hp->tail, bp, nbp) == bp) { 4101 /* CAS was successful */ 4102 bp->nxt = nbp; 4103 membar_producer(); 4104 bp = nbp; 4105 ix = 0; 4106 goto cas_good; 4107 } else { 4108 kmem_cache_free(ftblk_cache, nbp); 4109 bp = hp->tail; 4110 continue; 4111 } 4112 } 4113 nix = ix + 1; 4114 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 4115 cas_good: 4116 if (curthread != hp->thread) { 4117 hp->thread = curthread; 4118 evnt |= FTEV_CS; 4119 } 4120 if (CPU->cpu_seqid != hp->cpu_seqid) { 4121 hp->cpu_seqid = CPU->cpu_seqid; 4122 evnt |= FTEV_PS; 4123 } 4124 ep = &bp->ev[ix]; 4125 break; 4126 } 4127 } 4128 4129 if (evnt & FTEV_QMASK) { 4130 queue_t *qp = p; 4131 4132 if (!(qp->q_flag & QREADR)) 4133 evnt |= FTEV_ISWR; 4134 4135 ep->mid = Q2NAME(qp); 4136 4137 /* 4138 * We only record the next queue name for FTEV_PUTNEXT since 4139 * that's the only time we *really* need it, and the putnext() 4140 * code ensures that qp->q_next won't vanish. (We could use 4141 * claimstr()/releasestr() but at a performance cost.) 4142 */ 4143 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4144 ep->midnext = Q2NAME(qp->q_next); 4145 else 4146 ep->midnext = NULL; 4147 } else { 4148 ep->mid = p; 4149 ep->midnext = NULL; 4150 } 4151 4152 if (ep->stk != NULL) 4153 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4154 4155 ep->ts = gethrtime(); 4156 ep->evnt = evnt; 4157 ep->data = data; 4158 hp->hash = (hp->hash << 9) + hp->hash; 4159 hp->hash += (evnt << 16) | data; 4160 hp->hash += (uintptr_t)ep->mid; 4161 } 4162 4163 /* 4164 * Free flow-trace data. 4165 */ 4166 void 4167 str_ftfree(dblk_t *dbp) 4168 { 4169 fthdr_t *hp = dbp->db_fthdr; 4170 ftblk_t *bp = &hp->first; 4171 ftblk_t *nbp; 4172 4173 if (bp != hp->tail || bp->ix != 0) { 4174 /* 4175 * Clear out the hash, have the tail point to itself, and free 4176 * any continuation blocks. 4177 */ 4178 bp = hp->first.nxt; 4179 hp->tail = &hp->first; 4180 hp->hash = 0; 4181 hp->first.nxt = NULL; 4182 hp->first.ix = 0; 4183 while (bp != NULL) { 4184 nbp = bp->nxt; 4185 kmem_cache_free(ftblk_cache, bp); 4186 bp = nbp; 4187 } 4188 } 4189 kmem_cache_free(fthdr_cache, hp); 4190 dbp->db_fthdr = NULL; 4191 } 4192