1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 22 /* All Rights Reserved */ 23 24 25 /* 26 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 27 * Use is subject to license terms. 28 */ 29 30 #pragma ident "%Z%%M% %I% %E% SMI" 31 32 #include <sys/types.h> 33 #include <sys/param.h> 34 #include <sys/thread.h> 35 #include <sys/sysmacros.h> 36 #include <sys/stropts.h> 37 #include <sys/stream.h> 38 #include <sys/strsubr.h> 39 #include <sys/strsun.h> 40 #include <sys/conf.h> 41 #include <sys/debug.h> 42 #include <sys/cmn_err.h> 43 #include <sys/kmem.h> 44 #include <sys/atomic.h> 45 #include <sys/errno.h> 46 #include <sys/vtrace.h> 47 #include <sys/ftrace.h> 48 #include <sys/ontrap.h> 49 #include <sys/multidata.h> 50 #include <sys/multidata_impl.h> 51 #include <sys/sdt.h> 52 #include <sys/strft.h> 53 54 #ifdef DEBUG 55 #include <sys/kmem_impl.h> 56 #endif 57 58 /* 59 * This file contains all the STREAMS utility routines that may 60 * be used by modules and drivers. 61 */ 62 63 /* 64 * STREAMS message allocator: principles of operation 65 * 66 * The streams message allocator consists of all the routines that 67 * allocate, dup and free streams messages: allocb(), [d]esballoc[a], 68 * dupb(), freeb() and freemsg(). What follows is a high-level view 69 * of how the allocator works. 70 * 71 * Every streams message consists of one or more mblks, a dblk, and data. 72 * All mblks for all types of messages come from a common mblk_cache. 73 * The dblk and data come in several flavors, depending on how the 74 * message is allocated: 75 * 76 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of 77 * fixed-size dblk/data caches. For message sizes that are multiples of 78 * PAGESIZE, dblks are allocated separately from the buffer. 79 * The associated buffer is allocated by the constructor using kmem_alloc(). 80 * For all other message sizes, dblk and its associated data is allocated 81 * as a single contiguous chunk of memory. 82 * Objects in these caches consist of a dblk plus its associated data. 83 * allocb() determines the nearest-size cache by table lookup: 84 * the dblk_cache[] array provides the mapping from size to dblk cache. 85 * 86 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by 87 * kmem_alloc()'ing a buffer for the data and supplying that 88 * buffer to gesballoc(), described below. 89 * 90 * (3) The four flavors of [d]esballoc[a] are all implemented by a 91 * common routine, gesballoc() ("generic esballoc"). gesballoc() 92 * allocates a dblk from the global dblk_esb_cache and sets db_base, 93 * db_lim and db_frtnp to describe the caller-supplied buffer. 
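 *
 *     As an illustration of case (3) from a caller's point of view (the
 *     names xx_buf, xx_free, xx_frtn and XX_BUFSZ below are made up for
 *     this sketch and do not appear elsewhere in this file), a driver
 *     that owns a buffer can wrap it in a message roughly as follows:
 *
 *         static void xx_free(void *);                 (reclaims xx_buf)
 *         static frtn_t xx_frtn = { xx_free, (caddr_t)xx_buf };
 *
 *         mblk_t *mp = esballoc(xx_buf, XX_BUFSZ, BPRI_MED, &xx_frtn);
 *
 *     When the last reference to such a message is freed, the allocator
 *     eventually calls the registered free routine instead of kmem_free(),
 *     since the data buffer belongs to the caller, not to the allocator.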
94 * 95 * While there are several routines to allocate messages, there is only 96 * one routine to free messages: freeb(). freeb() simply invokes the 97 * dblk's free method, dbp->db_free(), which is set at allocation time. 98 * 99 * dupb() creates a new reference to a message by allocating a new mblk, 100 * incrementing the dblk reference count and setting the dblk's free 101 * method to dblk_decref(). The dblk's original free method is retained 102 * in db_lastfree. dblk_decref() decrements the reference count on each 103 * freeb(). If this is not the last reference it just frees the mblk; 104 * if this *is* the last reference, it restores db_free to db_lastfree, 105 * sets db_mblk to the current mblk (see below), and invokes db_lastfree. 106 * 107 * The implementation makes aggressive use of kmem object caching for 108 * maximum performance. This makes the code simple and compact, but 109 * also a bit abstruse in some places. The invariants that constitute a 110 * message's constructed state, described below, are more subtle than usual. 111 * 112 * Every dblk has an "attached mblk" as part of its constructed state. 113 * The mblk is allocated by the dblk's constructor and remains attached 114 * until the message is either dup'ed or pulled up. In the dupb() case 115 * the mblk association doesn't matter until the last free, at which time 116 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects 117 * the mblk association because it swaps the leading mblks of two messages, 118 * so it is responsible for swapping their db_mblk pointers accordingly. 119 * From a constructed-state viewpoint it doesn't matter that a dblk's 120 * attached mblk can change while the message is allocated; all that 121 * matters is that the dblk has *some* attached mblk when it's freed. 122 * 123 * The sizes of the allocb() small-message caches are not magical. 124 * They represent a good trade-off between internal and external 125 * fragmentation for current workloads. They should be reevaluated 126 * periodically, especially if allocations larger than DBLK_MAX_CACHE 127 * become common. We use 64-byte alignment so that dblks don't 128 * straddle cache lines unnecessarily. 
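 *
 * As a concrete example of the table lookup described above (using the
 * _LP64 dblk_sizes[] that follows): allocb(100, BPRI_MED) computes
 * index = (100 - 1) >> DBLK_SIZE_SHIFT = 12, and dblk_cache[12] was set
 * up by streams_msg_init() to point at the cache for the smallest listed
 * size that can hold 100 bytes, i.e. "streams_dblk_144"; the resulting
 * mblk therefore has MBLKSIZE(mp) == 144 even though only 100 bytes
 * were requested.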
129 */ 130 #define DBLK_MAX_CACHE 73728 131 #define DBLK_CACHE_ALIGN 64 132 #define DBLK_MIN_SIZE 8 133 #define DBLK_SIZE_SHIFT 3 134 135 #ifdef _BIG_ENDIAN 136 #define DBLK_RTFU_SHIFT(field) \ 137 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field)) 138 #else 139 #define DBLK_RTFU_SHIFT(field) \ 140 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref)) 141 #endif 142 143 #define DBLK_RTFU(ref, type, flags, uioflag) \ 144 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \ 145 ((type) << DBLK_RTFU_SHIFT(db_type)) | \ 146 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \ 147 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag))) 148 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref)) 149 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref)) 150 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band)) 151 152 static size_t dblk_sizes[] = { 153 #ifdef _LP64 154 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920, 155 8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688, 156 40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456, 157 #else 158 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968, 159 8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736, 160 40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504, 161 #endif 162 DBLK_MAX_CACHE, 0 163 }; 164 165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE]; 166 static struct kmem_cache *mblk_cache; 167 static struct kmem_cache *dblk_esb_cache; 168 static struct kmem_cache *fthdr_cache; 169 static struct kmem_cache *ftblk_cache; 170 171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp); 172 static mblk_t *allocb_oversize(size_t size, int flags); 173 static int allocb_tryhard_fails; 174 static void frnop_func(void *arg); 175 frtn_t frnop = { frnop_func }; 176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp); 177 178 static boolean_t rwnext_enter(queue_t *qp); 179 static void rwnext_exit(queue_t *qp); 180 181 /* 182 * Patchable mblk/dblk kmem_cache flags. 
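 *
 * These are ordinary global ints, so kmem debugging can be requested for
 * just the mblk/dblk caches by patching them before the caches are
 * created, for example from /etc/system (the value 0x1 is illustrative;
 * it corresponds to KMF_AUDIT in <sys/kmem_impl.h>):
 *
 *	set dblk_kmem_flags = 0x1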
183 */ 184 int dblk_kmem_flags = 0; 185 int mblk_kmem_flags = 0; 186 187 188 static int 189 dblk_constructor(void *buf, void *cdrarg, int kmflags) 190 { 191 dblk_t *dbp = buf; 192 ssize_t msg_size = (ssize_t)cdrarg; 193 size_t index; 194 195 ASSERT(msg_size != 0); 196 197 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 198 199 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 200 201 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 202 return (-1); 203 if ((msg_size & PAGEOFFSET) == 0) { 204 dbp->db_base = kmem_alloc(msg_size, kmflags); 205 if (dbp->db_base == NULL) { 206 kmem_cache_free(mblk_cache, dbp->db_mblk); 207 return (-1); 208 } 209 } else { 210 dbp->db_base = (unsigned char *)&dbp[1]; 211 } 212 213 dbp->db_mblk->b_datap = dbp; 214 dbp->db_cache = dblk_cache[index]; 215 dbp->db_lim = dbp->db_base + msg_size; 216 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 217 dbp->db_frtnp = NULL; 218 dbp->db_fthdr = NULL; 219 dbp->db_credp = NULL; 220 dbp->db_cpid = -1; 221 dbp->db_struioflag = 0; 222 dbp->db_struioun.cksum.flags = 0; 223 return (0); 224 } 225 226 /*ARGSUSED*/ 227 static int 228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 229 { 230 dblk_t *dbp = buf; 231 232 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 233 return (-1); 234 dbp->db_mblk->b_datap = dbp; 235 dbp->db_cache = dblk_esb_cache; 236 dbp->db_fthdr = NULL; 237 dbp->db_credp = NULL; 238 dbp->db_cpid = -1; 239 dbp->db_struioflag = 0; 240 dbp->db_struioun.cksum.flags = 0; 241 return (0); 242 } 243 244 static int 245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 246 { 247 dblk_t *dbp = buf; 248 bcache_t *bcp = (bcache_t *)cdrarg; 249 250 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 251 return (-1); 252 253 if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache, 254 kmflags)) == NULL) { 255 kmem_cache_free(mblk_cache, dbp->db_mblk); 256 return (-1); 257 } 258 259 dbp->db_mblk->b_datap = dbp; 260 dbp->db_cache = (void *)bcp; 261 dbp->db_lim = dbp->db_base + bcp->size; 262 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 263 dbp->db_frtnp = NULL; 264 dbp->db_fthdr = NULL; 265 dbp->db_credp = NULL; 266 dbp->db_cpid = -1; 267 dbp->db_struioflag = 0; 268 dbp->db_struioun.cksum.flags = 0; 269 return (0); 270 } 271 272 /*ARGSUSED*/ 273 static void 274 dblk_destructor(void *buf, void *cdrarg) 275 { 276 dblk_t *dbp = buf; 277 ssize_t msg_size = (ssize_t)cdrarg; 278 279 ASSERT(dbp->db_mblk->b_datap == dbp); 280 281 ASSERT(msg_size != 0); 282 283 ASSERT(dbp->db_struioflag == 0); 284 ASSERT(dbp->db_struioun.cksum.flags == 0); 285 286 if ((msg_size & PAGEOFFSET) == 0) { 287 kmem_free(dbp->db_base, msg_size); 288 } 289 290 kmem_cache_free(mblk_cache, dbp->db_mblk); 291 } 292 293 static void 294 bcache_dblk_destructor(void *buf, void *cdrarg) 295 { 296 dblk_t *dbp = buf; 297 bcache_t *bcp = (bcache_t *)cdrarg; 298 299 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 300 301 ASSERT(dbp->db_mblk->b_datap == dbp); 302 303 ASSERT(dbp->db_struioflag == 0); 304 ASSERT(dbp->db_struioun.cksum.flags == 0); 305 306 kmem_cache_free(mblk_cache, dbp->db_mblk); 307 } 308 309 void 310 streams_msg_init(void) 311 { 312 char name[40]; 313 size_t size; 314 size_t lastsize = DBLK_MIN_SIZE; 315 size_t *sizep; 316 struct kmem_cache *cp; 317 size_t tot_size; 318 int offset; 319 320 mblk_cache = kmem_cache_create("streams_mblk", 321 sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL, 322 mblk_kmem_flags); 323 324 for (sizep = dblk_sizes; (size = 
*sizep) != 0; sizep++) { 325 326 if ((offset = (size & PAGEOFFSET)) != 0) { 327 /* 328 * We are in the middle of a page, dblk should 329 * be allocated on the same page 330 */ 331 tot_size = size + sizeof (dblk_t); 332 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t)) 333 < PAGESIZE); 334 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0); 335 336 } else { 337 338 /* 339 * buf size is multiple of page size, dblk and 340 * buffer are allocated separately. 341 */ 342 343 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0); 344 tot_size = sizeof (dblk_t); 345 } 346 347 (void) sprintf(name, "streams_dblk_%ld", size); 348 cp = kmem_cache_create(name, tot_size, 349 DBLK_CACHE_ALIGN, dblk_constructor, 350 dblk_destructor, NULL, 351 (void *)(size), NULL, dblk_kmem_flags); 352 353 while (lastsize <= size) { 354 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp; 355 lastsize += DBLK_MIN_SIZE; 356 } 357 } 358 359 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", 360 sizeof (dblk_t), DBLK_CACHE_ALIGN, 361 dblk_esb_constructor, dblk_destructor, NULL, 362 (void *) sizeof (dblk_t), NULL, dblk_kmem_flags); 363 fthdr_cache = kmem_cache_create("streams_fthdr", 364 sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 365 ftblk_cache = kmem_cache_create("streams_ftblk", 366 sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 367 368 /* Initialize Multidata caches */ 369 mmd_init(); 370 } 371 372 /*ARGSUSED*/ 373 mblk_t * 374 allocb(size_t size, uint_t pri) 375 { 376 dblk_t *dbp; 377 mblk_t *mp; 378 size_t index; 379 380 index = (size - 1) >> DBLK_SIZE_SHIFT; 381 382 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 383 if (size != 0) { 384 mp = allocb_oversize(size, KM_NOSLEEP); 385 goto out; 386 } 387 index = 0; 388 } 389 390 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) { 391 mp = NULL; 392 goto out; 393 } 394 395 mp = dbp->db_mblk; 396 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 397 mp->b_next = mp->b_prev = mp->b_cont = NULL; 398 mp->b_rptr = mp->b_wptr = dbp->db_base; 399 mp->b_queue = NULL; 400 MBLK_BAND_FLAG_WORD(mp) = 0; 401 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size); 402 out: 403 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp); 404 405 return (mp); 406 } 407 408 mblk_t * 409 allocb_tmpl(size_t size, const mblk_t *tmpl) 410 { 411 mblk_t *mp = allocb(size, 0); 412 413 if (mp != NULL) { 414 cred_t *cr = DB_CRED(tmpl); 415 if (cr != NULL) 416 crhold(mp->b_datap->db_credp = cr); 417 DB_CPID(mp) = DB_CPID(tmpl); 418 DB_TYPE(mp) = DB_TYPE(tmpl); 419 } 420 return (mp); 421 } 422 423 mblk_t * 424 allocb_cred(size_t size, cred_t *cr) 425 { 426 mblk_t *mp = allocb(size, 0); 427 428 if (mp != NULL && cr != NULL) 429 crhold(mp->b_datap->db_credp = cr); 430 431 return (mp); 432 } 433 434 mblk_t * 435 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr) 436 { 437 mblk_t *mp = allocb_wait(size, 0, flags, error); 438 439 if (mp != NULL && cr != NULL) 440 crhold(mp->b_datap->db_credp = cr); 441 442 return (mp); 443 } 444 445 void 446 freeb(mblk_t *mp) 447 { 448 dblk_t *dbp = mp->b_datap; 449 450 ASSERT(dbp->db_ref > 0); 451 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 452 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp); 453 454 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 455 456 dbp->db_free(mp, dbp); 457 } 458 459 void 460 freemsg(mblk_t *mp) 461 { 462 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp); 463 while (mp) { 464 dblk_t *dbp = mp->b_datap; 465 mblk_t *mp_cont = mp->b_cont; 466 467 ASSERT(dbp->db_ref > 0); 468 ASSERT(mp->b_next == NULL && 
mp->b_prev == NULL); 469 470 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 471 472 dbp->db_free(mp, dbp); 473 mp = mp_cont; 474 } 475 } 476 477 /* 478 * Reallocate a block for another use. Try hard to use the old block. 479 * If the old data is wanted (copy), leave b_wptr at the end of the data, 480 * otherwise return b_wptr = b_rptr. 481 * 482 * This routine is private and unstable. 483 */ 484 mblk_t * 485 reallocb(mblk_t *mp, size_t size, uint_t copy) 486 { 487 mblk_t *mp1; 488 unsigned char *old_rptr; 489 ptrdiff_t cur_size; 490 491 if (mp == NULL) 492 return (allocb(size, BPRI_HI)); 493 494 cur_size = mp->b_wptr - mp->b_rptr; 495 old_rptr = mp->b_rptr; 496 497 ASSERT(mp->b_datap->db_ref != 0); 498 499 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) { 500 /* 501 * If the data is wanted and it will fit where it is, no 502 * work is required. 503 */ 504 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size) 505 return (mp); 506 507 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base; 508 mp1 = mp; 509 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) { 510 /* XXX other mp state could be copied too, db_flags ... ? */ 511 mp1->b_cont = mp->b_cont; 512 } else { 513 return (NULL); 514 } 515 516 if (copy) { 517 bcopy(old_rptr, mp1->b_rptr, cur_size); 518 mp1->b_wptr = mp1->b_rptr + cur_size; 519 } 520 521 if (mp != mp1) 522 freeb(mp); 523 524 return (mp1); 525 } 526 527 static void 528 dblk_lastfree(mblk_t *mp, dblk_t *dbp) 529 { 530 ASSERT(dbp->db_mblk == mp); 531 if (dbp->db_fthdr != NULL) 532 str_ftfree(dbp); 533 534 /* set credp and projid to be 'unspecified' before returning to cache */ 535 if (dbp->db_credp != NULL) { 536 crfree(dbp->db_credp); 537 dbp->db_credp = NULL; 538 } 539 dbp->db_cpid = -1; 540 541 /* Reset the struioflag and the checksum flag fields */ 542 dbp->db_struioflag = 0; 543 dbp->db_struioun.cksum.flags = 0; 544 545 /* and the COOKED flag */ 546 dbp->db_flags &= ~DBLK_COOKED; 547 548 kmem_cache_free(dbp->db_cache, dbp); 549 } 550 551 static void 552 dblk_decref(mblk_t *mp, dblk_t *dbp) 553 { 554 if (dbp->db_ref != 1) { 555 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp), 556 -(1 << DBLK_RTFU_SHIFT(db_ref))); 557 /* 558 * atomic_add_32_nv() just decremented db_ref, so we no longer 559 * have a reference to the dblk, which means another thread 560 * could free it. Therefore we cannot examine the dblk to 561 * determine whether ours was the last reference. Instead, 562 * we extract the new and minimum reference counts from rtfu. 563 * Note that all we're really saying is "if (ref != refmin)". 564 */ 565 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) != 566 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) { 567 kmem_cache_free(mblk_cache, mp); 568 return; 569 } 570 } 571 dbp->db_mblk = mp; 572 dbp->db_free = dbp->db_lastfree; 573 dbp->db_lastfree(mp, dbp); 574 } 575 576 mblk_t * 577 dupb(mblk_t *mp) 578 { 579 dblk_t *dbp = mp->b_datap; 580 mblk_t *new_mp; 581 uint32_t oldrtfu, newrtfu; 582 583 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL) 584 goto out; 585 586 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL; 587 new_mp->b_rptr = mp->b_rptr; 588 new_mp->b_wptr = mp->b_wptr; 589 new_mp->b_datap = dbp; 590 new_mp->b_queue = NULL; 591 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp); 592 593 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref); 594 595 /* 596 * First-dup optimization. The enabling assumption is that there 597 * can can never be a race (in correct code) to dup the first copy 598 * of a message. 
Therefore we don't need to do it atomically. 599 */ 600 if (dbp->db_free != dblk_decref) { 601 dbp->db_free = dblk_decref; 602 dbp->db_ref++; 603 goto out; 604 } 605 606 do { 607 ASSERT(dbp->db_ref > 0); 608 oldrtfu = DBLK_RTFU_WORD(dbp); 609 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref)); 610 /* 611 * If db_ref is maxed out we can't dup this message anymore. 612 */ 613 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) { 614 kmem_cache_free(mblk_cache, new_mp); 615 new_mp = NULL; 616 goto out; 617 } 618 } while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu); 619 620 out: 621 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp); 622 return (new_mp); 623 } 624 625 static void 626 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp) 627 { 628 frtn_t *frp = dbp->db_frtnp; 629 630 ASSERT(dbp->db_mblk == mp); 631 frp->free_func(frp->free_arg); 632 if (dbp->db_fthdr != NULL) 633 str_ftfree(dbp); 634 635 /* set credp and projid to be 'unspecified' before returning to cache */ 636 if (dbp->db_credp != NULL) { 637 crfree(dbp->db_credp); 638 dbp->db_credp = NULL; 639 } 640 dbp->db_cpid = -1; 641 dbp->db_struioflag = 0; 642 dbp->db_struioun.cksum.flags = 0; 643 644 kmem_cache_free(dbp->db_cache, dbp); 645 } 646 647 /*ARGSUSED*/ 648 static void 649 frnop_func(void *arg) 650 { 651 } 652 653 /* 654 * Generic esballoc used to implement the four flavors: [d]esballoc[a]. 655 */ 656 static mblk_t * 657 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp, 658 void (*lastfree)(mblk_t *, dblk_t *), int kmflags) 659 { 660 dblk_t *dbp; 661 mblk_t *mp; 662 663 ASSERT(base != NULL && frp != NULL); 664 665 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) { 666 mp = NULL; 667 goto out; 668 } 669 670 mp = dbp->db_mblk; 671 dbp->db_base = base; 672 dbp->db_lim = base + size; 673 dbp->db_free = dbp->db_lastfree = lastfree; 674 dbp->db_frtnp = frp; 675 DBLK_RTFU_WORD(dbp) = db_rtfu; 676 mp->b_next = mp->b_prev = mp->b_cont = NULL; 677 mp->b_rptr = mp->b_wptr = base; 678 mp->b_queue = NULL; 679 MBLK_BAND_FLAG_WORD(mp) = 0; 680 681 out: 682 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp); 683 return (mp); 684 } 685 686 /*ARGSUSED*/ 687 mblk_t * 688 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 689 { 690 mblk_t *mp; 691 692 /* 693 * Note that this is structured to allow the common case (i.e. 694 * STREAMS flowtracing disabled) to call gesballoc() with tail 695 * call optimization. 696 */ 697 if (!str_ftnever) { 698 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 699 frp, freebs_enqueue, KM_NOSLEEP); 700 701 if (mp != NULL) 702 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 703 return (mp); 704 } 705 706 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 707 frp, freebs_enqueue, KM_NOSLEEP)); 708 } 709 710 /* 711 * Same as esballoc() but sleeps waiting for memory. 712 */ 713 /*ARGSUSED*/ 714 mblk_t * 715 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 716 { 717 mblk_t *mp; 718 719 /* 720 * Note that this is structured to allow the common case (i.e. 721 * STREAMS flowtracing disabled) to call gesballoc() with tail 722 * call optimization. 
723 */ 724 if (!str_ftnever) { 725 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 726 frp, freebs_enqueue, KM_SLEEP); 727 728 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 729 return (mp); 730 } 731 732 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 733 frp, freebs_enqueue, KM_SLEEP)); 734 } 735 736 /*ARGSUSED*/ 737 mblk_t * 738 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 739 { 740 mblk_t *mp; 741 742 /* 743 * Note that this is structured to allow the common case (i.e. 744 * STREAMS flowtracing disabled) to call gesballoc() with tail 745 * call optimization. 746 */ 747 if (!str_ftnever) { 748 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 749 frp, dblk_lastfree_desb, KM_NOSLEEP); 750 751 if (mp != NULL) 752 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 753 return (mp); 754 } 755 756 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 757 frp, dblk_lastfree_desb, KM_NOSLEEP)); 758 } 759 760 /*ARGSUSED*/ 761 mblk_t * 762 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 763 { 764 mblk_t *mp; 765 766 /* 767 * Note that this is structured to allow the common case (i.e. 768 * STREAMS flowtracing disabled) to call gesballoc() with tail 769 * call optimization. 770 */ 771 if (!str_ftnever) { 772 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 773 frp, freebs_enqueue, KM_NOSLEEP); 774 775 if (mp != NULL) 776 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 777 return (mp); 778 } 779 780 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 781 frp, freebs_enqueue, KM_NOSLEEP)); 782 } 783 784 /*ARGSUSED*/ 785 mblk_t * 786 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 787 { 788 mblk_t *mp; 789 790 /* 791 * Note that this is structured to allow the common case (i.e. 792 * STREAMS flowtracing disabled) to call gesballoc() with tail 793 * call optimization. 
794 */ 795 if (!str_ftnever) { 796 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 797 frp, dblk_lastfree_desb, KM_NOSLEEP); 798 799 if (mp != NULL) 800 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 801 return (mp); 802 } 803 804 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 805 frp, dblk_lastfree_desb, KM_NOSLEEP)); 806 } 807 808 static void 809 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 810 { 811 bcache_t *bcp = dbp->db_cache; 812 813 ASSERT(dbp->db_mblk == mp); 814 if (dbp->db_fthdr != NULL) 815 str_ftfree(dbp); 816 817 /* set credp and projid to be 'unspecified' before returning to cache */ 818 if (dbp->db_credp != NULL) { 819 crfree(dbp->db_credp); 820 dbp->db_credp = NULL; 821 } 822 dbp->db_cpid = -1; 823 dbp->db_struioflag = 0; 824 dbp->db_struioun.cksum.flags = 0; 825 826 mutex_enter(&bcp->mutex); 827 kmem_cache_free(bcp->dblk_cache, dbp); 828 bcp->alloc--; 829 830 if (bcp->alloc == 0 && bcp->destroy != 0) { 831 kmem_cache_destroy(bcp->dblk_cache); 832 kmem_cache_destroy(bcp->buffer_cache); 833 mutex_exit(&bcp->mutex); 834 mutex_destroy(&bcp->mutex); 835 kmem_free(bcp, sizeof (bcache_t)); 836 } else { 837 mutex_exit(&bcp->mutex); 838 } 839 } 840 841 bcache_t * 842 bcache_create(char *name, size_t size, uint_t align) 843 { 844 bcache_t *bcp; 845 char buffer[255]; 846 847 ASSERT((align & (align - 1)) == 0); 848 849 if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == 850 NULL) { 851 return (NULL); 852 } 853 854 bcp->size = size; 855 bcp->align = align; 856 bcp->alloc = 0; 857 bcp->destroy = 0; 858 859 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 860 861 (void) sprintf(buffer, "%s_buffer_cache", name); 862 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 863 NULL, NULL, NULL, 0); 864 (void) sprintf(buffer, "%s_dblk_cache", name); 865 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 866 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 867 NULL, (void *)bcp, NULL, 0); 868 869 return (bcp); 870 } 871 872 void 873 bcache_destroy(bcache_t *bcp) 874 { 875 ASSERT(bcp != NULL); 876 877 mutex_enter(&bcp->mutex); 878 if (bcp->alloc == 0) { 879 kmem_cache_destroy(bcp->dblk_cache); 880 kmem_cache_destroy(bcp->buffer_cache); 881 mutex_exit(&bcp->mutex); 882 mutex_destroy(&bcp->mutex); 883 kmem_free(bcp, sizeof (bcache_t)); 884 } else { 885 bcp->destroy++; 886 mutex_exit(&bcp->mutex); 887 } 888 } 889 890 /*ARGSUSED*/ 891 mblk_t * 892 bcache_allocb(bcache_t *bcp, uint_t pri) 893 { 894 dblk_t *dbp; 895 mblk_t *mp = NULL; 896 897 ASSERT(bcp != NULL); 898 899 mutex_enter(&bcp->mutex); 900 if (bcp->destroy != 0) { 901 mutex_exit(&bcp->mutex); 902 goto out; 903 } 904 905 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 906 mutex_exit(&bcp->mutex); 907 goto out; 908 } 909 bcp->alloc++; 910 mutex_exit(&bcp->mutex); 911 912 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 913 914 mp = dbp->db_mblk; 915 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 916 mp->b_next = mp->b_prev = mp->b_cont = NULL; 917 mp->b_rptr = mp->b_wptr = dbp->db_base; 918 mp->b_queue = NULL; 919 MBLK_BAND_FLAG_WORD(mp) = 0; 920 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 921 out: 922 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 923 924 return (mp); 925 } 926 927 static void 928 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 929 { 930 ASSERT(dbp->db_mblk == mp); 931 if (dbp->db_fthdr != NULL) 932 str_ftfree(dbp); 933 934 /* set credp and projid to be 'unspecified' before returning to 
cache */ 935 if (dbp->db_credp != NULL) { 936 crfree(dbp->db_credp); 937 dbp->db_credp = NULL; 938 } 939 dbp->db_cpid = -1; 940 dbp->db_struioflag = 0; 941 dbp->db_struioun.cksum.flags = 0; 942 943 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base); 944 kmem_cache_free(dbp->db_cache, dbp); 945 } 946 947 static mblk_t * 948 allocb_oversize(size_t size, int kmflags) 949 { 950 mblk_t *mp; 951 void *buf; 952 953 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN); 954 if ((buf = kmem_alloc(size, kmflags)) == NULL) 955 return (NULL); 956 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0), 957 &frnop, dblk_lastfree_oversize, kmflags)) == NULL) 958 kmem_free(buf, size); 959 960 if (mp != NULL) 961 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size); 962 963 return (mp); 964 } 965 966 mblk_t * 967 allocb_tryhard(size_t target_size) 968 { 969 size_t size; 970 mblk_t *bp; 971 972 for (size = target_size; size < target_size + 512; 973 size += DBLK_CACHE_ALIGN) 974 if ((bp = allocb(size, BPRI_HI)) != NULL) 975 return (bp); 976 allocb_tryhard_fails++; 977 return (NULL); 978 } 979 980 /* 981 * This routine is consolidation private for STREAMS internal use 982 * This routine may only be called from sync routines (i.e., not 983 * from put or service procedures). It is located here (rather 984 * than strsubr.c) so that we don't have to expose all of the 985 * allocb() implementation details in header files. 986 */ 987 mblk_t * 988 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error) 989 { 990 dblk_t *dbp; 991 mblk_t *mp; 992 size_t index; 993 994 index = (size -1) >> DBLK_SIZE_SHIFT; 995 996 if (flags & STR_NOSIG) { 997 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 998 if (size != 0) { 999 mp = allocb_oversize(size, KM_SLEEP); 1000 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", 1001 (uintptr_t)mp); 1002 return (mp); 1003 } 1004 index = 0; 1005 } 1006 1007 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP); 1008 mp = dbp->db_mblk; 1009 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 1010 mp->b_next = mp->b_prev = mp->b_cont = NULL; 1011 mp->b_rptr = mp->b_wptr = dbp->db_base; 1012 mp->b_queue = NULL; 1013 MBLK_BAND_FLAG_WORD(mp) = 0; 1014 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size); 1015 1016 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp); 1017 1018 } else { 1019 while ((mp = allocb(size, pri)) == NULL) { 1020 if ((*error = strwaitbuf(size, BPRI_HI)) != 0) 1021 return (NULL); 1022 } 1023 } 1024 1025 return (mp); 1026 } 1027 1028 /* 1029 * Call function 'func' with 'arg' when a class zero block can 1030 * be allocated with priority 'pri'. 1031 */ 1032 bufcall_id_t 1033 esbbcall(uint_t pri, void (*func)(void *), void *arg) 1034 { 1035 return (bufcall(1, pri, func, arg)); 1036 } 1037 1038 /* 1039 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials 1040 * ioc_id, rval and error of the struct ioctl to set up an ioctl call. 1041 * This provides consistency for all internal allocators of ioctl. 1042 */ 1043 mblk_t * 1044 mkiocb(uint_t cmd) 1045 { 1046 struct iocblk *ioc; 1047 mblk_t *mp; 1048 1049 /* 1050 * Allocate enough space for any of the ioctl related messages. 1051 */ 1052 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL) 1053 return (NULL); 1054 1055 bzero(mp->b_rptr, sizeof (union ioctypes)); 1056 1057 /* 1058 * Set the mblk_t information and ptrs correctly. 1059 */ 1060 mp->b_wptr += sizeof (struct iocblk); 1061 mp->b_datap->db_type = M_IOCTL; 1062 1063 /* 1064 * Fill in the fields. 
1065 */ 1066 ioc = (struct iocblk *)mp->b_rptr; 1067 ioc->ioc_cmd = cmd; 1068 ioc->ioc_cr = kcred; 1069 ioc->ioc_id = getiocseqno(); 1070 ioc->ioc_flag = IOC_NATIVE; 1071 return (mp); 1072 } 1073 1074 /* 1075 * test if block of given size can be allocated with a request of 1076 * the given priority. 1077 * 'pri' is no longer used, but is retained for compatibility. 1078 */ 1079 /* ARGSUSED */ 1080 int 1081 testb(size_t size, uint_t pri) 1082 { 1083 return ((size + sizeof (dblk_t)) <= kmem_avail()); 1084 } 1085 1086 /* 1087 * Call function 'func' with argument 'arg' when there is a reasonably 1088 * good chance that a block of size 'size' can be allocated. 1089 * 'pri' is no longer used, but is retained for compatibility. 1090 */ 1091 /* ARGSUSED */ 1092 bufcall_id_t 1093 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg) 1094 { 1095 static long bid = 1; /* always odd to save checking for zero */ 1096 bufcall_id_t bc_id; 1097 struct strbufcall *bcp; 1098 1099 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL) 1100 return (0); 1101 1102 bcp->bc_func = func; 1103 bcp->bc_arg = arg; 1104 bcp->bc_size = size; 1105 bcp->bc_next = NULL; 1106 bcp->bc_executor = NULL; 1107 1108 mutex_enter(&strbcall_lock); 1109 /* 1110 * After bcp is linked into strbcalls and strbcall_lock is dropped there 1111 * should be no references to bcp since it may be freed by 1112 * runbufcalls(). Since bcp_id field is returned, we save its value in 1113 * the local var. 1114 */ 1115 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */ 1116 1117 /* 1118 * add newly allocated stream event to existing 1119 * linked list of events. 1120 */ 1121 if (strbcalls.bc_head == NULL) { 1122 strbcalls.bc_head = strbcalls.bc_tail = bcp; 1123 } else { 1124 strbcalls.bc_tail->bc_next = bcp; 1125 strbcalls.bc_tail = bcp; 1126 } 1127 1128 cv_signal(&strbcall_cv); 1129 mutex_exit(&strbcall_lock); 1130 return (bc_id); 1131 } 1132 1133 /* 1134 * Cancel a bufcall request. 1135 */ 1136 void 1137 unbufcall(bufcall_id_t id) 1138 { 1139 strbufcall_t *bcp, *pbcp; 1140 1141 mutex_enter(&strbcall_lock); 1142 again: 1143 pbcp = NULL; 1144 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) { 1145 if (id == bcp->bc_id) 1146 break; 1147 pbcp = bcp; 1148 } 1149 if (bcp) { 1150 if (bcp->bc_executor != NULL) { 1151 if (bcp->bc_executor != curthread) { 1152 cv_wait(&bcall_cv, &strbcall_lock); 1153 goto again; 1154 } 1155 } else { 1156 if (pbcp) 1157 pbcp->bc_next = bcp->bc_next; 1158 else 1159 strbcalls.bc_head = bcp->bc_next; 1160 if (bcp == strbcalls.bc_tail) 1161 strbcalls.bc_tail = pbcp; 1162 kmem_free(bcp, sizeof (strbufcall_t)); 1163 } 1164 } 1165 mutex_exit(&strbcall_lock); 1166 } 1167 1168 /* 1169 * Duplicate a message block by block (uses dupb), returning 1170 * a pointer to the duplicate message. 1171 * Returns a non-NULL value only if the entire message 1172 * was dup'd. 1173 */ 1174 mblk_t * 1175 dupmsg(mblk_t *bp) 1176 { 1177 mblk_t *head, *nbp; 1178 1179 if (!bp || !(nbp = head = dupb(bp))) 1180 return (NULL); 1181 1182 while (bp->b_cont) { 1183 if (!(nbp->b_cont = dupb(bp->b_cont))) { 1184 freemsg(head); 1185 return (NULL); 1186 } 1187 nbp = nbp->b_cont; 1188 bp = bp->b_cont; 1189 } 1190 return (head); 1191 } 1192 1193 #define DUPB_NOLOAN(bp) \ 1194 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? 
\ 1195 copyb((bp)) : dupb((bp))) 1196 1197 mblk_t * 1198 dupmsg_noloan(mblk_t *bp) 1199 { 1200 mblk_t *head, *nbp; 1201 1202 if (bp == NULL || DB_TYPE(bp) != M_DATA || 1203 ((nbp = head = DUPB_NOLOAN(bp)) == NULL)) 1204 return (NULL); 1205 1206 while (bp->b_cont) { 1207 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) { 1208 freemsg(head); 1209 return (NULL); 1210 } 1211 nbp = nbp->b_cont; 1212 bp = bp->b_cont; 1213 } 1214 return (head); 1215 } 1216 1217 /* 1218 * Copy data from message and data block to newly allocated message and 1219 * data block. Returns new message block pointer, or NULL if error. 1220 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy 1221 * as in the original even when db_base is not word aligned. (bug 1052877) 1222 */ 1223 mblk_t * 1224 copyb(mblk_t *bp) 1225 { 1226 mblk_t *nbp; 1227 dblk_t *dp, *ndp; 1228 uchar_t *base; 1229 size_t size; 1230 size_t unaligned; 1231 1232 ASSERT(bp->b_wptr >= bp->b_rptr); 1233 1234 dp = bp->b_datap; 1235 if (dp->db_fthdr != NULL) 1236 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0); 1237 1238 /* 1239 * Special handling for Multidata message; this should be 1240 * removed once a copy-callback routine is made available. 1241 */ 1242 if (dp->db_type == M_MULTIDATA) { 1243 cred_t *cr; 1244 1245 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL) 1246 return (NULL); 1247 1248 nbp->b_flag = bp->b_flag; 1249 nbp->b_band = bp->b_band; 1250 ndp = nbp->b_datap; 1251 1252 /* See comments below on potential issues. */ 1253 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1254 1255 ASSERT(ndp->db_type == dp->db_type); 1256 cr = dp->db_credp; 1257 if (cr != NULL) 1258 crhold(ndp->db_credp = cr); 1259 ndp->db_cpid = dp->db_cpid; 1260 return (nbp); 1261 } 1262 1263 size = dp->db_lim - dp->db_base; 1264 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t)); 1265 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL) 1266 return (NULL); 1267 nbp->b_flag = bp->b_flag; 1268 nbp->b_band = bp->b_band; 1269 ndp = nbp->b_datap; 1270 1271 /* 1272 * Well, here is a potential issue. If we are trying to 1273 * trace a flow, and we copy the message, we might lose 1274 * information about where this message might have been. 1275 * So we should inherit the FT data. On the other hand, 1276 * a user might be interested only in alloc to free data. 1277 * So I guess the real answer is to provide a tunable. 1278 */ 1279 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1280 1281 base = ndp->db_base + unaligned; 1282 bcopy(dp->db_base, ndp->db_base + unaligned, size); 1283 1284 nbp->b_rptr = base + (bp->b_rptr - dp->db_base); 1285 nbp->b_wptr = nbp->b_rptr + MBLKL(bp); 1286 1287 return (nbp); 1288 } 1289 1290 /* 1291 * Copy data from message to newly allocated message using new 1292 * data blocks. Returns a pointer to the new message, or NULL if error. 1293 */ 1294 mblk_t * 1295 copymsg(mblk_t *bp) 1296 { 1297 mblk_t *head, *nbp; 1298 1299 if (!bp || !(nbp = head = copyb(bp))) 1300 return (NULL); 1301 1302 while (bp->b_cont) { 1303 if (!(nbp->b_cont = copyb(bp->b_cont))) { 1304 freemsg(head); 1305 return (NULL); 1306 } 1307 nbp = nbp->b_cont; 1308 bp = bp->b_cont; 1309 } 1310 return (head); 1311 } 1312 1313 /* 1314 * link a message block to tail of message 1315 */ 1316 void 1317 linkb(mblk_t *mp, mblk_t *bp) 1318 { 1319 ASSERT(mp && bp); 1320 1321 for (; mp->b_cont; mp = mp->b_cont) 1322 ; 1323 mp->b_cont = bp; 1324 } 1325 1326 /* 1327 * unlink a message block from head of message 1328 * return pointer to new message. 
1329 * NULL if message becomes empty. 1330 */ 1331 mblk_t * 1332 unlinkb(mblk_t *bp) 1333 { 1334 mblk_t *bp1; 1335 1336 bp1 = bp->b_cont; 1337 bp->b_cont = NULL; 1338 return (bp1); 1339 } 1340 1341 /* 1342 * remove a message block "bp" from message "mp" 1343 * 1344 * Return pointer to new message or NULL if no message remains. 1345 * Return -1 if bp is not found in message. 1346 */ 1347 mblk_t * 1348 rmvb(mblk_t *mp, mblk_t *bp) 1349 { 1350 mblk_t *tmp; 1351 mblk_t *lastp = NULL; 1352 1353 ASSERT(mp && bp); 1354 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1355 if (tmp == bp) { 1356 if (lastp) 1357 lastp->b_cont = tmp->b_cont; 1358 else 1359 mp = tmp->b_cont; 1360 tmp->b_cont = NULL; 1361 return (mp); 1362 } 1363 lastp = tmp; 1364 } 1365 return ((mblk_t *)-1); 1366 } 1367 1368 /* 1369 * Concatenate and align first len bytes of common 1370 * message type. Len == -1, means concat everything. 1371 * Returns 1 on success, 0 on failure 1372 * After the pullup, mp points to the pulled up data. 1373 */ 1374 int 1375 pullupmsg(mblk_t *mp, ssize_t len) 1376 { 1377 mblk_t *bp, *b_cont; 1378 dblk_t *dbp; 1379 ssize_t n; 1380 uint32_t start, stuff, end, value, flags; 1381 1382 ASSERT(mp->b_datap->db_ref > 0); 1383 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 1384 1385 /* 1386 * We won't handle Multidata message, since it contains 1387 * metadata which this function has no knowledge of; we 1388 * assert on DEBUG, and return failure otherwise. 1389 */ 1390 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1391 if (mp->b_datap->db_type == M_MULTIDATA) 1392 return (0); 1393 1394 if (len == -1) { 1395 if (mp->b_cont == NULL && str_aligned(mp->b_rptr)) 1396 return (1); 1397 len = xmsgsize(mp); 1398 } else { 1399 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr; 1400 ASSERT(first_mblk_len >= 0); 1401 /* 1402 * If the length is less than that of the first mblk, 1403 * we want to pull up the message into an aligned mblk. 1404 * Though not part of the spec, some callers assume it. 1405 */ 1406 if (len <= first_mblk_len) { 1407 if (str_aligned(mp->b_rptr)) 1408 return (1); 1409 len = first_mblk_len; 1410 } else if (xmsgsize(mp) < len) 1411 return (0); 1412 } 1413 1414 if ((bp = allocb_tmpl(len, mp)) == NULL) 1415 return (0); 1416 1417 dbp = bp->b_datap; 1418 *bp = *mp; /* swap mblks so bp heads the old msg... */ 1419 mp->b_datap = dbp; /* ... and mp heads the new message */ 1420 mp->b_datap->db_mblk = mp; 1421 bp->b_datap->db_mblk = bp; 1422 mp->b_rptr = mp->b_wptr = dbp->db_base; 1423 1424 /* 1425 * Need to preserve checksum information by copying them 1426 * to mp which heads the pulluped message. 1427 */ 1428 hcksum_retrieve(bp, NULL, NULL, &start, &stuff, &end, &value, &flags); 1429 (void) hcksum_assoc(mp, NULL, NULL, start, stuff, end, value, flags, 0); 1430 1431 do { 1432 ASSERT(bp->b_datap->db_ref > 0); 1433 ASSERT(bp->b_wptr >= bp->b_rptr); 1434 n = MIN(bp->b_wptr - bp->b_rptr, len); 1435 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n); 1436 mp->b_wptr += n; 1437 bp->b_rptr += n; 1438 len -= n; 1439 if (bp->b_rptr != bp->b_wptr) 1440 break; 1441 b_cont = bp->b_cont; 1442 freeb(bp); 1443 bp = b_cont; 1444 } while (len && bp); 1445 1446 mp->b_cont = bp; /* tack on whatever wasn't pulled up */ 1447 1448 return (1); 1449 } 1450 1451 /* 1452 * Concatenate and align at least the first len bytes of common message 1453 * type. Len == -1 means concatenate everything. The original message is 1454 * unaltered. Returns a pointer to a new message on success, otherwise 1455 * returns NULL. 
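 *
 * A usage sketch (struct xx_hdr and the enclosing module are hypothetical,
 * not part of this file): a reader that needs at least a full header
 * contiguous at b_rptr can use either routine, depending on whether the
 * original message may be modified:
 *
 *	(in place, destructive)
 *	if (MBLKL(mp) < sizeof (struct xx_hdr) &&
 *	    !pullupmsg(mp, sizeof (struct xx_hdr))) {
 *		freemsg(mp);
 *		return;
 *	}
 *
 *	(copying; the original message is left untouched)
 *	if ((nmp = msgpullup(mp, sizeof (struct xx_hdr))) == NULL)
 *		return;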
1456 */ 1457 mblk_t * 1458 msgpullup(mblk_t *mp, ssize_t len) 1459 { 1460 mblk_t *newmp; 1461 ssize_t totlen; 1462 ssize_t n; 1463 uint32_t start, stuff, end, value, flags; 1464 1465 /* 1466 * We won't handle Multidata message, since it contains 1467 * metadata which this function has no knowledge of; we 1468 * assert on DEBUG, and return failure otherwise. 1469 */ 1470 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1471 if (mp->b_datap->db_type == M_MULTIDATA) 1472 return (NULL); 1473 1474 totlen = xmsgsize(mp); 1475 1476 if ((len > 0) && (len > totlen)) 1477 return (NULL); 1478 1479 /* 1480 * Copy all of the first msg type into one new mblk, then dupmsg 1481 * and link the rest onto this. 1482 */ 1483 1484 len = totlen; 1485 1486 if ((newmp = allocb_tmpl(len, mp)) == NULL) 1487 return (NULL); 1488 1489 newmp->b_flag = mp->b_flag; 1490 newmp->b_band = mp->b_band; 1491 1492 /* 1493 * Need to preserve checksum information by copying them 1494 * to newmp which heads the pulluped message. 1495 */ 1496 hcksum_retrieve(mp, NULL, NULL, &start, &stuff, &end, &value, &flags); 1497 (void) hcksum_assoc(newmp, NULL, NULL, start, stuff, end, 1498 value, flags, 0); 1499 1500 while (len > 0) { 1501 n = mp->b_wptr - mp->b_rptr; 1502 ASSERT(n >= 0); /* allow zero-length mblk_t's */ 1503 if (n > 0) 1504 bcopy(mp->b_rptr, newmp->b_wptr, n); 1505 newmp->b_wptr += n; 1506 len -= n; 1507 mp = mp->b_cont; 1508 } 1509 1510 if (mp != NULL) { 1511 newmp->b_cont = dupmsg(mp); 1512 if (newmp->b_cont == NULL) { 1513 freemsg(newmp); 1514 return (NULL); 1515 } 1516 } 1517 1518 return (newmp); 1519 } 1520 1521 /* 1522 * Trim bytes from message 1523 * len > 0, trim from head 1524 * len < 0, trim from tail 1525 * Returns 1 on success, 0 on failure. 1526 */ 1527 int 1528 adjmsg(mblk_t *mp, ssize_t len) 1529 { 1530 mblk_t *bp; 1531 mblk_t *save_bp = NULL; 1532 mblk_t *prev_bp; 1533 mblk_t *bcont; 1534 unsigned char type; 1535 ssize_t n; 1536 int fromhead; 1537 int first; 1538 1539 ASSERT(mp != NULL); 1540 /* 1541 * We won't handle Multidata message, since it contains 1542 * metadata which this function has no knowledge of; we 1543 * assert on DEBUG, and return failure otherwise. 
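 *
 * (Usage sketch, with illustrative lengths: a caller that knows the
 * message is long enough can strip an hlen-byte header with
 * adjmsg(mp, hlen) or a 4-byte trailer with adjmsg(mp, -4); a return
 * value of 0 means the message was shorter than the requested trim and
 * has not been modified.)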
1544 */ 1545 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1546 if (mp->b_datap->db_type == M_MULTIDATA) 1547 return (0); 1548 1549 if (len < 0) { 1550 fromhead = 0; 1551 len = -len; 1552 } else { 1553 fromhead = 1; 1554 } 1555 1556 if (xmsgsize(mp) < len) 1557 return (0); 1558 1559 1560 if (fromhead) { 1561 first = 1; 1562 while (len) { 1563 ASSERT(mp->b_wptr >= mp->b_rptr); 1564 n = MIN(mp->b_wptr - mp->b_rptr, len); 1565 mp->b_rptr += n; 1566 len -= n; 1567 1568 /* 1569 * If this is not the first zero length 1570 * message remove it 1571 */ 1572 if (!first && (mp->b_wptr == mp->b_rptr)) { 1573 bcont = mp->b_cont; 1574 freeb(mp); 1575 mp = save_bp->b_cont = bcont; 1576 } else { 1577 save_bp = mp; 1578 mp = mp->b_cont; 1579 } 1580 first = 0; 1581 } 1582 } else { 1583 type = mp->b_datap->db_type; 1584 while (len) { 1585 bp = mp; 1586 save_bp = NULL; 1587 1588 /* 1589 * Find the last message of same type 1590 */ 1591 1592 while (bp && bp->b_datap->db_type == type) { 1593 ASSERT(bp->b_wptr >= bp->b_rptr); 1594 prev_bp = save_bp; 1595 save_bp = bp; 1596 bp = bp->b_cont; 1597 } 1598 if (save_bp == NULL) 1599 break; 1600 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len); 1601 save_bp->b_wptr -= n; 1602 len -= n; 1603 1604 /* 1605 * If this is not the first message 1606 * and we have taken away everything 1607 * from this message, remove it 1608 */ 1609 1610 if ((save_bp != mp) && 1611 (save_bp->b_wptr == save_bp->b_rptr)) { 1612 bcont = save_bp->b_cont; 1613 freeb(save_bp); 1614 prev_bp->b_cont = bcont; 1615 } 1616 } 1617 } 1618 return (1); 1619 } 1620 1621 /* 1622 * get number of data bytes in message 1623 */ 1624 size_t 1625 msgdsize(mblk_t *bp) 1626 { 1627 size_t count = 0; 1628 1629 for (; bp; bp = bp->b_cont) 1630 if (bp->b_datap->db_type == M_DATA) { 1631 ASSERT(bp->b_wptr >= bp->b_rptr); 1632 count += bp->b_wptr - bp->b_rptr; 1633 } 1634 return (count); 1635 } 1636 1637 /* 1638 * Get a message off head of queue 1639 * 1640 * If queue has no buffers then mark queue 1641 * with QWANTR. (queue wants to be read by 1642 * someone when data becomes available) 1643 * 1644 * If there is something to take off then do so. 1645 * If queue falls below hi water mark turn off QFULL 1646 * flag. Decrement weighted count of queue. 1647 * Also turn off QWANTR because queue is being read. 1648 * 1649 * The queue count is maintained on a per-band basis. 1650 * Priority band 0 (normal messages) uses q_count, 1651 * q_lowat, etc. Non-zero priority bands use the 1652 * fields in their respective qband structures 1653 * (qb_count, qb_lowat, etc.) All messages appear 1654 * on the same list, linked via their b_next pointers. 1655 * q_first is the head of the list. q_count does 1656 * not reflect the size of all the messages on the 1657 * queue. It only reflects those messages in the 1658 * normal band of flow. The one exception to this 1659 * deals with high priority messages. They are in 1660 * their own conceptual "band", but are accounted 1661 * against q_count. 1662 * 1663 * If queue count is below the lo water mark and QWANTW 1664 * is set, enable the closest backq which has a service 1665 * procedure and turn off the QWANTW flag. 1666 * 1667 * getq could be built on top of rmvq, but isn't because 1668 * of performance considerations. 1669 * 1670 * A note on the use of q_count and q_mblkcnt: 1671 * q_count is the traditional byte count for messages that 1672 * have been put on a queue. Documentation tells us that 1673 * we shouldn't rely on that count, but some drivers/modules 1674 * do. 
What was needed, however, is a mechanism to prevent 1675 * runaway streams from consuming all of the resources, 1676 * and particularly be able to flow control zero-length 1677 * messages. q_mblkcnt is used for this purpose. It 1678 * counts the number of mblk's that are being put on 1679 * the queue. The intention here, is that each mblk should 1680 * contain one byte of data and, for the purpose of 1681 * flow-control, logically does. A queue will become 1682 * full when EITHER of these values (q_count and q_mblkcnt) 1683 * reach the highwater mark. It will clear when BOTH 1684 * of them drop below the highwater mark. And it will 1685 * backenable when BOTH of them drop below the lowwater 1686 * mark. 1687 * With this algorithm, a driver/module might be able 1688 * to find a reasonably accurate q_count, and the 1689 * framework can still try and limit resource usage. 1690 */ 1691 mblk_t * 1692 getq(queue_t *q) 1693 { 1694 mblk_t *bp; 1695 uchar_t band = 0; 1696 1697 bp = getq_noenab(q); 1698 if (bp != NULL) 1699 band = bp->b_band; 1700 1701 /* 1702 * Inlined from qbackenable(). 1703 * Quick check without holding the lock. 1704 */ 1705 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1706 return (bp); 1707 1708 qbackenable(q, band); 1709 return (bp); 1710 } 1711 1712 /* 1713 * Calculate number of data bytes in a single data message block taking 1714 * multidata messages into account. 1715 */ 1716 1717 #define ADD_MBLK_SIZE(mp, size) \ 1718 if (DB_TYPE(mp) != M_MULTIDATA) { \ 1719 (size) += MBLKL(mp); \ 1720 } else { \ 1721 uint_t pinuse; \ 1722 \ 1723 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \ 1724 (size) += pinuse; \ 1725 } 1726 1727 /* 1728 * Like getq() but does not backenable. This is used by the stream 1729 * head when a putback() is likely. The caller must call qbackenable() 1730 * after it is done with accessing the queue. 
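 *
 * A minimal sketch of the intended calling pattern (consume_msg() is a
 * hypothetical placeholder; the stream head's actual usage is more
 * involved):
 *
 *	if ((mp = getq_noenab(q)) != NULL) {
 *		uchar_t band = mp->b_band;
 *
 *		... consume_msg(mp), or (void) putbq(q, mp) to put it back ...
 *
 *		qbackenable(q, band);
 *	}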
1731 */ 1732 mblk_t * 1733 getq_noenab(queue_t *q) 1734 { 1735 mblk_t *bp; 1736 mblk_t *tmp; 1737 qband_t *qbp; 1738 kthread_id_t freezer; 1739 int bytecnt = 0, mblkcnt = 0; 1740 1741 /* freezestr should allow its caller to call getq/putq */ 1742 freezer = STREAM(q)->sd_freezer; 1743 if (freezer == curthread) { 1744 ASSERT(frozenstr(q)); 1745 ASSERT(MUTEX_HELD(QLOCK(q))); 1746 } else 1747 mutex_enter(QLOCK(q)); 1748 1749 if ((bp = q->q_first) == 0) { 1750 q->q_flag |= QWANTR; 1751 } else { 1752 if ((q->q_first = bp->b_next) == NULL) 1753 q->q_last = NULL; 1754 else 1755 q->q_first->b_prev = NULL; 1756 1757 /* Get message byte count for q_count accounting */ 1758 for (tmp = bp; tmp; tmp = tmp->b_cont) { 1759 ADD_MBLK_SIZE(tmp, bytecnt); 1760 mblkcnt++; 1761 } 1762 1763 if (bp->b_band == 0) { 1764 q->q_count -= bytecnt; 1765 q->q_mblkcnt -= mblkcnt; 1766 if ((q->q_count < q->q_hiwat) && 1767 (q->q_mblkcnt < q->q_hiwat)) { 1768 q->q_flag &= ~QFULL; 1769 } 1770 } else { 1771 int i; 1772 1773 ASSERT(bp->b_band <= q->q_nband); 1774 ASSERT(q->q_bandp != NULL); 1775 ASSERT(MUTEX_HELD(QLOCK(q))); 1776 qbp = q->q_bandp; 1777 i = bp->b_band; 1778 while (--i > 0) 1779 qbp = qbp->qb_next; 1780 if (qbp->qb_first == qbp->qb_last) { 1781 qbp->qb_first = NULL; 1782 qbp->qb_last = NULL; 1783 } else { 1784 qbp->qb_first = bp->b_next; 1785 } 1786 qbp->qb_count -= bytecnt; 1787 qbp->qb_mblkcnt -= mblkcnt; 1788 if ((qbp->qb_count < qbp->qb_hiwat) && 1789 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1790 qbp->qb_flag &= ~QB_FULL; 1791 } 1792 } 1793 q->q_flag &= ~QWANTR; 1794 bp->b_next = NULL; 1795 bp->b_prev = NULL; 1796 } 1797 if (freezer != curthread) 1798 mutex_exit(QLOCK(q)); 1799 1800 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL); 1801 1802 return (bp); 1803 } 1804 1805 /* 1806 * Determine if a backenable is needed after removing a message in the 1807 * specified band. 1808 * NOTE: This routine assumes that something like getq_noenab() has been 1809 * already called. 1810 * 1811 * For the read side it is ok to hold sd_lock across calling this (and the 1812 * stream head often does). 1813 * But for the write side strwakeq might be invoked and it acquires sd_lock. 1814 */ 1815 void 1816 qbackenable(queue_t *q, uchar_t band) 1817 { 1818 int backenab = 0; 1819 qband_t *qbp; 1820 kthread_id_t freezer; 1821 1822 ASSERT(q); 1823 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 1824 1825 /* 1826 * Quick check without holding the lock. 1827 * OK since after getq() has lowered the q_count these flags 1828 * would not change unless either the qbackenable() is done by 1829 * another thread (which is ok) or the queue has gotten QFULL 1830 * in which case another backenable will take place when the queue 1831 * drops below q_lowat. 
1832 */ 1833 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1834 return; 1835 1836 /* freezestr should allow its caller to call getq/putq */ 1837 freezer = STREAM(q)->sd_freezer; 1838 if (freezer == curthread) { 1839 ASSERT(frozenstr(q)); 1840 ASSERT(MUTEX_HELD(QLOCK(q))); 1841 } else 1842 mutex_enter(QLOCK(q)); 1843 1844 if (band == 0) { 1845 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 1846 q->q_mblkcnt < q->q_lowat)) { 1847 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 1848 } 1849 } else { 1850 int i; 1851 1852 ASSERT((unsigned)band <= q->q_nband); 1853 ASSERT(q->q_bandp != NULL); 1854 1855 qbp = q->q_bandp; 1856 i = band; 1857 while (--i > 0) 1858 qbp = qbp->qb_next; 1859 1860 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 1861 qbp->qb_mblkcnt < qbp->qb_lowat)) { 1862 backenab = qbp->qb_flag & QB_WANTW; 1863 } 1864 } 1865 1866 if (backenab == 0) { 1867 if (freezer != curthread) 1868 mutex_exit(QLOCK(q)); 1869 return; 1870 } 1871 1872 /* Have to drop the lock across strwakeq and backenable */ 1873 if (backenab & QWANTWSYNC) 1874 q->q_flag &= ~QWANTWSYNC; 1875 if (backenab & (QWANTW|QB_WANTW)) { 1876 if (band != 0) 1877 qbp->qb_flag &= ~QB_WANTW; 1878 else { 1879 q->q_flag &= ~QWANTW; 1880 } 1881 } 1882 1883 if (freezer != curthread) 1884 mutex_exit(QLOCK(q)); 1885 1886 if (backenab & QWANTWSYNC) 1887 strwakeq(q, QWANTWSYNC); 1888 if (backenab & (QWANTW|QB_WANTW)) 1889 backenable(q, band); 1890 } 1891 1892 /* 1893 * Remove a message from a queue. The queue count and other 1894 * flow control parameters are adjusted and the back queue 1895 * enabled if necessary. 1896 * 1897 * rmvq can be called with the stream frozen, but other utility functions 1898 * holding QLOCK, and by streams modules without any locks/frozen. 1899 */ 1900 void 1901 rmvq(queue_t *q, mblk_t *mp) 1902 { 1903 ASSERT(mp != NULL); 1904 1905 rmvq_noenab(q, mp); 1906 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 1907 /* 1908 * qbackenable can handle a frozen stream but not a "random" 1909 * qlock being held. Drop lock across qbackenable. 1910 */ 1911 mutex_exit(QLOCK(q)); 1912 qbackenable(q, mp->b_band); 1913 mutex_enter(QLOCK(q)); 1914 } else { 1915 qbackenable(q, mp->b_band); 1916 } 1917 } 1918 1919 /* 1920 * Like rmvq() but without any backenabling. 1921 * This exists to handle SR_CONSOL_DATA in strrput(). 1922 */ 1923 void 1924 rmvq_noenab(queue_t *q, mblk_t *mp) 1925 { 1926 mblk_t *tmp; 1927 int i; 1928 qband_t *qbp = NULL; 1929 kthread_id_t freezer; 1930 int bytecnt = 0, mblkcnt = 0; 1931 1932 freezer = STREAM(q)->sd_freezer; 1933 if (freezer == curthread) { 1934 ASSERT(frozenstr(q)); 1935 ASSERT(MUTEX_HELD(QLOCK(q))); 1936 } else if (MUTEX_HELD(QLOCK(q))) { 1937 /* Don't drop lock on exit */ 1938 freezer = curthread; 1939 } else 1940 mutex_enter(QLOCK(q)); 1941 1942 ASSERT(mp->b_band <= q->q_nband); 1943 if (mp->b_band != 0) { /* Adjust band pointers */ 1944 ASSERT(q->q_bandp != NULL); 1945 qbp = q->q_bandp; 1946 i = mp->b_band; 1947 while (--i > 0) 1948 qbp = qbp->qb_next; 1949 if (mp == qbp->qb_first) { 1950 if (mp->b_next && mp->b_band == mp->b_next->b_band) 1951 qbp->qb_first = mp->b_next; 1952 else 1953 qbp->qb_first = NULL; 1954 } 1955 if (mp == qbp->qb_last) { 1956 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 1957 qbp->qb_last = mp->b_prev; 1958 else 1959 qbp->qb_last = NULL; 1960 } 1961 } 1962 1963 /* 1964 * Remove the message from the list. 
1965 */ 1966 if (mp->b_prev) 1967 mp->b_prev->b_next = mp->b_next; 1968 else 1969 q->q_first = mp->b_next; 1970 if (mp->b_next) 1971 mp->b_next->b_prev = mp->b_prev; 1972 else 1973 q->q_last = mp->b_prev; 1974 mp->b_next = NULL; 1975 mp->b_prev = NULL; 1976 1977 /* Get the size of the message for q_count accounting */ 1978 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1979 ADD_MBLK_SIZE(tmp, bytecnt); 1980 mblkcnt++; 1981 } 1982 1983 if (mp->b_band == 0) { /* Perform q_count accounting */ 1984 q->q_count -= bytecnt; 1985 q->q_mblkcnt -= mblkcnt; 1986 if ((q->q_count < q->q_hiwat) && 1987 (q->q_mblkcnt < q->q_hiwat)) { 1988 q->q_flag &= ~QFULL; 1989 } 1990 } else { /* Perform qb_count accounting */ 1991 qbp->qb_count -= bytecnt; 1992 qbp->qb_mblkcnt -= mblkcnt; 1993 if ((qbp->qb_count < qbp->qb_hiwat) && 1994 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1995 qbp->qb_flag &= ~QB_FULL; 1996 } 1997 } 1998 if (freezer != curthread) 1999 mutex_exit(QLOCK(q)); 2000 2001 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL); 2002 } 2003 2004 /* 2005 * Empty a queue. 2006 * If flag is set, remove all messages. Otherwise, remove 2007 * only non-control messages. If queue falls below its low 2008 * water mark, and QWANTW is set, enable the nearest upstream 2009 * service procedure. 2010 * 2011 * Historical note: when merging the M_FLUSH code in strrput with this 2012 * code one difference was discovered. flushq did not have a check 2013 * for q_lowat == 0 in the backenabling test. 2014 * 2015 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2016 * if one exists on the queue. 2017 */ 2018 void 2019 flushq_common(queue_t *q, int flag, int pcproto_flag) 2020 { 2021 mblk_t *mp, *nmp; 2022 qband_t *qbp; 2023 int backenab = 0; 2024 unsigned char bpri; 2025 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2026 2027 if (q->q_first == NULL) 2028 return; 2029 2030 mutex_enter(QLOCK(q)); 2031 mp = q->q_first; 2032 q->q_first = NULL; 2033 q->q_last = NULL; 2034 q->q_count = 0; 2035 q->q_mblkcnt = 0; 2036 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2037 qbp->qb_first = NULL; 2038 qbp->qb_last = NULL; 2039 qbp->qb_count = 0; 2040 qbp->qb_mblkcnt = 0; 2041 qbp->qb_flag &= ~QB_FULL; 2042 } 2043 q->q_flag &= ~QFULL; 2044 mutex_exit(QLOCK(q)); 2045 while (mp) { 2046 nmp = mp->b_next; 2047 mp->b_next = mp->b_prev = NULL; 2048 2049 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL); 2050 2051 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2052 (void) putq(q, mp); 2053 else if (flag || datamsg(mp->b_datap->db_type)) 2054 freemsg(mp); 2055 else 2056 (void) putq(q, mp); 2057 mp = nmp; 2058 } 2059 bpri = 1; 2060 mutex_enter(QLOCK(q)); 2061 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2062 if ((qbp->qb_flag & QB_WANTW) && 2063 (((qbp->qb_count < qbp->qb_lowat) && 2064 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2065 qbp->qb_lowat == 0)) { 2066 qbp->qb_flag &= ~QB_WANTW; 2067 backenab = 1; 2068 qbf[bpri] = 1; 2069 } else 2070 qbf[bpri] = 0; 2071 bpri++; 2072 } 2073 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2074 if ((q->q_flag & QWANTW) && 2075 (((q->q_count < q->q_lowat) && 2076 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2077 q->q_flag &= ~QWANTW; 2078 backenab = 1; 2079 qbf[0] = 1; 2080 } else 2081 qbf[0] = 0; 2082 2083 /* 2084 * If any band can now be written to, and there is a writer 2085 * for that band, then backenable the closest service procedure. 
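 *
 * (For context, the usual outside caller of this flushing code is a
 * driver's M_FLUSH handling in its write put procedure, roughly as
 * sketched below; this is the standard STREAMS idiom, not code from
 * this file:
 *
 *	if (*mp->b_rptr & FLUSHW) {
 *		flushq(q, FLUSHDATA);
 *		*mp->b_rptr &= ~FLUSHW;
 *	}
 *	if (*mp->b_rptr & FLUSHR) {
 *		flushq(RD(q), FLUSHDATA);
 *		qreply(q, mp);
 *	} else {
 *		freemsg(mp);
 *	}
 *
 * A module, not being at the end of the stream, would flush its own
 * queues the same way but pass the M_FLUSH message on with putnext()
 * instead of turning it around with qreply().)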
2086  */
2087 	if (backenab) {
2088 		mutex_exit(QLOCK(q));
2089 		for (bpri = q->q_nband; bpri != 0; bpri--)
2090 			if (qbf[bpri])
2091 				backenable(q, bpri);
2092 		if (qbf[0])
2093 			backenable(q, 0);
2094 	} else
2095 		mutex_exit(QLOCK(q));
2096 }
2097 
2098 /*
2099  * The real flushing takes place in flushq_common, which takes a flag
2100  * that specifies whether or not M_PCPROTO messages should be flushed.
2101  * Currently the only place that uses this flag is the stream head.
2102  */
2103 void
2104 flushq(queue_t *q, int flag)
2105 {
2106 	flushq_common(q, flag, 0);
2107 }
2108 
2109 /*
2110  * Flush the queue of messages of the given priority band.
2111  * There is some duplication of code between flushq and flushband.
2112  * This is because we want to optimize the code as much as possible.
2113  * The assumption is that there will be more messages in the normal
2114  * (priority 0) band than in any other.
2115  *
2116  * Historical note: when merging the M_FLUSH code in strrput with this
2117  * code, one difference was discovered. flushband had an extra check for
2118  * (mp->b_datap->db_type < QPCTL) in the band 0 case that the strrput
2119  * flush code did not. That check does not match the man page for
2120  * flushband and was not in the strrput flush code, hence it was removed.
2121  */
2122 void
2123 flushband(queue_t *q, unsigned char pri, int flag)
2124 {
2125 	mblk_t *mp;
2126 	mblk_t *nmp;
2127 	mblk_t *last;
2128 	qband_t *qbp;
2129 	int band;
2130 
2131 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2132 	if (pri > q->q_nband) {
2133 		return;
2134 	}
2135 	mutex_enter(QLOCK(q));
2136 	if (pri == 0) {
2137 		mp = q->q_first;
2138 		q->q_first = NULL;
2139 		q->q_last = NULL;
2140 		q->q_count = 0;
2141 		q->q_mblkcnt = 0;
2142 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2143 			qbp->qb_first = NULL;
2144 			qbp->qb_last = NULL;
2145 			qbp->qb_count = 0;
2146 			qbp->qb_mblkcnt = 0;
2147 			qbp->qb_flag &= ~QB_FULL;
2148 		}
2149 		q->q_flag &= ~QFULL;
2150 		mutex_exit(QLOCK(q));
2151 		while (mp) {
2152 			nmp = mp->b_next;
2153 			mp->b_next = mp->b_prev = NULL;
2154 			if ((mp->b_band == 0) &&
2155 			    ((flag == FLUSHALL) ||
2156 			    datamsg(mp->b_datap->db_type)))
2157 				freemsg(mp);
2158 			else
2159 				(void) putq(q, mp);
2160 			mp = nmp;
2161 		}
2162 		mutex_enter(QLOCK(q));
2163 		if ((q->q_flag & QWANTW) &&
2164 		    (((q->q_count < q->q_lowat) &&
2165 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2166 			q->q_flag &= ~QWANTW;
2167 			mutex_exit(QLOCK(q));
2168 
2169 			backenable(q, pri);
2170 		} else
2171 			mutex_exit(QLOCK(q));
2172 	} else {	/* pri != 0 */
2173 		boolean_t flushed = B_FALSE;
2174 		band = pri;
2175 
2176 		ASSERT(MUTEX_HELD(QLOCK(q)));
2177 		qbp = q->q_bandp;
2178 		while (--band > 0)
2179 			qbp = qbp->qb_next;
2180 		mp = qbp->qb_first;
2181 		if (mp == NULL) {
2182 			mutex_exit(QLOCK(q));
2183 			return;
2184 		}
2185 		last = qbp->qb_last->b_next;
2186 		/*
2187 		 * rmvq_noenab() and freemsg() are called for each mblk that
2188 		 * meets the criteria. The loop is executed until the last
2189 		 * mblk has been processed.
2190 		 */
2191 		while (mp != last) {
2192 			ASSERT(mp->b_band == pri);
2193 			nmp = mp->b_next;
2194 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2195 				rmvq_noenab(q, mp);
2196 				freemsg(mp);
2197 				flushed = B_TRUE;
2198 			}
2199 			mp = nmp;
2200 		}
2201 		mutex_exit(QLOCK(q));
2202 
2203 		/*
2204 		 * If any mblk(s) have been freed, we know that qbackenable()
2205 		 * will need to be called.
2206 		 */
2207 		if (flushed)
2208 			qbackenable(q, pri);
2209 	}
2210 }
2211 
2212 /*
2213  * Return 1 if the queue is not full.
If the queue is full, return 2214 * 0 (may not put message) and set QWANTW flag (caller wants to write 2215 * to the queue). 2216 */ 2217 int 2218 canput(queue_t *q) 2219 { 2220 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2221 2222 /* this is for loopback transports, they should not do a canput */ 2223 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2224 2225 /* Find next forward module that has a service procedure */ 2226 q = q->q_nfsrv; 2227 2228 if (!(q->q_flag & QFULL)) { 2229 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2230 return (1); 2231 } 2232 mutex_enter(QLOCK(q)); 2233 if (q->q_flag & QFULL) { 2234 q->q_flag |= QWANTW; 2235 mutex_exit(QLOCK(q)); 2236 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2237 return (0); 2238 } 2239 mutex_exit(QLOCK(q)); 2240 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2241 return (1); 2242 } 2243 2244 /* 2245 * This is the new canput for use with priority bands. Return 1 if the 2246 * band is not full. If the band is full, return 0 (may not put message) 2247 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2248 * write to the queue). 2249 */ 2250 int 2251 bcanput(queue_t *q, unsigned char pri) 2252 { 2253 qband_t *qbp; 2254 2255 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2256 if (!q) 2257 return (0); 2258 2259 /* Find next forward module that has a service procedure */ 2260 q = q->q_nfsrv; 2261 2262 mutex_enter(QLOCK(q)); 2263 if (pri == 0) { 2264 if (q->q_flag & QFULL) { 2265 q->q_flag |= QWANTW; 2266 mutex_exit(QLOCK(q)); 2267 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2268 "bcanput:%p %X %d", q, pri, 0); 2269 return (0); 2270 } 2271 } else { /* pri != 0 */ 2272 if (pri > q->q_nband) { 2273 /* 2274 * No band exists yet, so return success. 2275 */ 2276 mutex_exit(QLOCK(q)); 2277 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2278 "bcanput:%p %X %d", q, pri, 1); 2279 return (1); 2280 } 2281 qbp = q->q_bandp; 2282 while (--pri) 2283 qbp = qbp->qb_next; 2284 if (qbp->qb_flag & QB_FULL) { 2285 qbp->qb_flag |= QB_WANTW; 2286 mutex_exit(QLOCK(q)); 2287 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2288 "bcanput:%p %X %d", q, pri, 0); 2289 return (0); 2290 } 2291 } 2292 mutex_exit(QLOCK(q)); 2293 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2294 "bcanput:%p %X %d", q, pri, 1); 2295 return (1); 2296 } 2297 2298 /* 2299 * Put a message on a queue. 2300 * 2301 * Messages are enqueued on a priority basis. The priority classes 2302 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2303 * and B_NORMAL (type < QPCTL && band == 0). 2304 * 2305 * Add appropriate weighted data block sizes to queue count. 2306 * If queue hits high water mark then set QFULL flag. 2307 * 2308 * If QNOENAB is not set (putq is allowed to enable the queue), 2309 * enable the queue only if the message is PRIORITY, 2310 * or the QWANTR flag is set (indicating that the service procedure 2311 * is ready to read the queue. This implies that a service 2312 * procedure must NEVER put a high priority message back on its own 2313 * queue, as this would result in an infinite loop (!). 
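 *
 * As an informal illustration (not part of this file), a module's
 * write-side put and service procedures typically cooperate with
 * putq()/putbq() roughly as follows; the names xxxwput/xxxwsrv are
 * hypothetical and all error handling is omitted. Note the explicit
 * high-priority check in the service procedure, which avoids putting
 * a high priority message back on the queue as warned above:
 *
 *	static int
 *	xxxwput(queue_t *q, mblk_t *mp)
 *	{
 *		if (mp->b_datap->db_type >= QPCTL || canputnext(q))
 *			putnext(q, mp);
 *		else
 *			(void) putq(q, mp);
 *		return (0);
 *	}
 *
 *	static int
 *	xxxwsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (mp->b_datap->db_type >= QPCTL ||
 *			    canputnext(q)) {
 *				putnext(q, mp);
 *			} else {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *		}
 *		return (0);
 *	}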
2314 */ 2315 int 2316 putq(queue_t *q, mblk_t *bp) 2317 { 2318 mblk_t *tmp; 2319 qband_t *qbp = NULL; 2320 int mcls = (int)queclass(bp); 2321 kthread_id_t freezer; 2322 int bytecnt = 0, mblkcnt = 0; 2323 2324 freezer = STREAM(q)->sd_freezer; 2325 if (freezer == curthread) { 2326 ASSERT(frozenstr(q)); 2327 ASSERT(MUTEX_HELD(QLOCK(q))); 2328 } else 2329 mutex_enter(QLOCK(q)); 2330 2331 /* 2332 * Make sanity checks and if qband structure is not yet 2333 * allocated, do so. 2334 */ 2335 if (mcls == QPCTL) { 2336 if (bp->b_band != 0) 2337 bp->b_band = 0; /* force to be correct */ 2338 } else if (bp->b_band != 0) { 2339 int i; 2340 qband_t **qbpp; 2341 2342 if (bp->b_band > q->q_nband) { 2343 2344 /* 2345 * The qband structure for this priority band is 2346 * not on the queue yet, so we have to allocate 2347 * one on the fly. It would be wasteful to 2348 * associate the qband structures with every 2349 * queue when the queues are allocated. This is 2350 * because most queues will only need the normal 2351 * band of flow which can be described entirely 2352 * by the queue itself. 2353 */ 2354 qbpp = &q->q_bandp; 2355 while (*qbpp) 2356 qbpp = &(*qbpp)->qb_next; 2357 while (bp->b_band > q->q_nband) { 2358 if ((*qbpp = allocband()) == NULL) { 2359 if (freezer != curthread) 2360 mutex_exit(QLOCK(q)); 2361 return (0); 2362 } 2363 (*qbpp)->qb_hiwat = q->q_hiwat; 2364 (*qbpp)->qb_lowat = q->q_lowat; 2365 q->q_nband++; 2366 qbpp = &(*qbpp)->qb_next; 2367 } 2368 } 2369 ASSERT(MUTEX_HELD(QLOCK(q))); 2370 qbp = q->q_bandp; 2371 i = bp->b_band; 2372 while (--i) 2373 qbp = qbp->qb_next; 2374 } 2375 2376 /* 2377 * If queue is empty, add the message and initialize the pointers. 2378 * Otherwise, adjust message pointers and queue pointers based on 2379 * the type of the message and where it belongs on the queue. Some 2380 * code is duplicated to minimize the number of conditionals and 2381 * hopefully minimize the amount of time this routine takes. 2382 */ 2383 if (!q->q_first) { 2384 bp->b_next = NULL; 2385 bp->b_prev = NULL; 2386 q->q_first = bp; 2387 q->q_last = bp; 2388 if (qbp) { 2389 qbp->qb_first = bp; 2390 qbp->qb_last = bp; 2391 } 2392 } else if (!qbp) { /* bp->b_band == 0 */ 2393 2394 /* 2395 * If queue class of message is less than or equal to 2396 * that of the last one on the queue, tack on to the end. 2397 */ 2398 tmp = q->q_last; 2399 if (mcls <= (int)queclass(tmp)) { 2400 bp->b_next = NULL; 2401 bp->b_prev = tmp; 2402 tmp->b_next = bp; 2403 q->q_last = bp; 2404 } else { 2405 tmp = q->q_first; 2406 while ((int)queclass(tmp) >= mcls) 2407 tmp = tmp->b_next; 2408 2409 /* 2410 * Insert bp before tmp. 2411 */ 2412 bp->b_next = tmp; 2413 bp->b_prev = tmp->b_prev; 2414 if (tmp->b_prev) 2415 tmp->b_prev->b_next = bp; 2416 else 2417 q->q_first = bp; 2418 tmp->b_prev = bp; 2419 } 2420 } else { /* bp->b_band != 0 */ 2421 if (qbp->qb_first) { 2422 tmp = qbp->qb_last; 2423 2424 /* 2425 * Insert bp after the last message in this band. 2426 */ 2427 bp->b_next = tmp->b_next; 2428 if (tmp->b_next) 2429 tmp->b_next->b_prev = bp; 2430 else 2431 q->q_last = bp; 2432 bp->b_prev = tmp; 2433 tmp->b_next = bp; 2434 } else { 2435 tmp = q->q_last; 2436 if ((mcls < (int)queclass(tmp)) || 2437 (bp->b_band <= tmp->b_band)) { 2438 2439 /* 2440 * Tack bp on end of queue. 
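 * The band is currently empty and bp sorts at or below the last
 * message on the queue, so it simply goes at the tail. Otherwise the
 * search below skips the high-priority messages and any higher bands
 * to find the insertion point.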
2441 */ 2442 bp->b_next = NULL; 2443 bp->b_prev = tmp; 2444 tmp->b_next = bp; 2445 q->q_last = bp; 2446 } else { 2447 tmp = q->q_first; 2448 while (tmp->b_datap->db_type >= QPCTL) 2449 tmp = tmp->b_next; 2450 while (tmp->b_band >= bp->b_band) 2451 tmp = tmp->b_next; 2452 2453 /* 2454 * Insert bp before tmp. 2455 */ 2456 bp->b_next = tmp; 2457 bp->b_prev = tmp->b_prev; 2458 if (tmp->b_prev) 2459 tmp->b_prev->b_next = bp; 2460 else 2461 q->q_first = bp; 2462 tmp->b_prev = bp; 2463 } 2464 qbp->qb_first = bp; 2465 } 2466 qbp->qb_last = bp; 2467 } 2468 2469 /* Get message byte count for q_count accounting */ 2470 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2471 ADD_MBLK_SIZE(tmp, bytecnt); 2472 mblkcnt++; 2473 } 2474 2475 if (qbp) { 2476 qbp->qb_count += bytecnt; 2477 qbp->qb_mblkcnt += mblkcnt; 2478 if ((qbp->qb_count >= qbp->qb_hiwat) || 2479 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2480 qbp->qb_flag |= QB_FULL; 2481 } 2482 } else { 2483 q->q_count += bytecnt; 2484 q->q_mblkcnt += mblkcnt; 2485 if ((q->q_count >= q->q_hiwat) || 2486 (q->q_mblkcnt >= q->q_hiwat)) { 2487 q->q_flag |= QFULL; 2488 } 2489 } 2490 2491 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2492 2493 if ((mcls > QNORM) || 2494 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2495 qenable_locked(q); 2496 ASSERT(MUTEX_HELD(QLOCK(q))); 2497 if (freezer != curthread) 2498 mutex_exit(QLOCK(q)); 2499 2500 return (1); 2501 } 2502 2503 /* 2504 * Put stuff back at beginning of Q according to priority order. 2505 * See comment on putq above for details. 2506 */ 2507 int 2508 putbq(queue_t *q, mblk_t *bp) 2509 { 2510 mblk_t *tmp; 2511 qband_t *qbp = NULL; 2512 int mcls = (int)queclass(bp); 2513 kthread_id_t freezer; 2514 int bytecnt = 0, mblkcnt = 0; 2515 2516 ASSERT(q && bp); 2517 ASSERT(bp->b_next == NULL); 2518 freezer = STREAM(q)->sd_freezer; 2519 if (freezer == curthread) { 2520 ASSERT(frozenstr(q)); 2521 ASSERT(MUTEX_HELD(QLOCK(q))); 2522 } else 2523 mutex_enter(QLOCK(q)); 2524 2525 /* 2526 * Make sanity checks and if qband structure is not yet 2527 * allocated, do so. 2528 */ 2529 if (mcls == QPCTL) { 2530 if (bp->b_band != 0) 2531 bp->b_band = 0; /* force to be correct */ 2532 } else if (bp->b_band != 0) { 2533 int i; 2534 qband_t **qbpp; 2535 2536 if (bp->b_band > q->q_nband) { 2537 qbpp = &q->q_bandp; 2538 while (*qbpp) 2539 qbpp = &(*qbpp)->qb_next; 2540 while (bp->b_band > q->q_nband) { 2541 if ((*qbpp = allocband()) == NULL) { 2542 if (freezer != curthread) 2543 mutex_exit(QLOCK(q)); 2544 return (0); 2545 } 2546 (*qbpp)->qb_hiwat = q->q_hiwat; 2547 (*qbpp)->qb_lowat = q->q_lowat; 2548 q->q_nband++; 2549 qbpp = &(*qbpp)->qb_next; 2550 } 2551 } 2552 qbp = q->q_bandp; 2553 i = bp->b_band; 2554 while (--i) 2555 qbp = qbp->qb_next; 2556 } 2557 2558 /* 2559 * If queue is empty or if message is high priority, 2560 * place on the front of the queue. 2561 */ 2562 tmp = q->q_first; 2563 if ((!tmp) || (mcls == QPCTL)) { 2564 bp->b_next = tmp; 2565 if (tmp) 2566 tmp->b_prev = bp; 2567 else 2568 q->q_last = bp; 2569 q->q_first = bp; 2570 bp->b_prev = NULL; 2571 if (qbp) { 2572 qbp->qb_first = bp; 2573 qbp->qb_last = bp; 2574 } 2575 } else if (qbp) { /* bp->b_band != 0 */ 2576 tmp = qbp->qb_first; 2577 if (tmp) { 2578 2579 /* 2580 * Insert bp before the first message in this band. 
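 * (bp becomes the new first message of its band; qb_first is updated
 * just below, while qb_last is left unchanged in this case.)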
2581 */ 2582 bp->b_next = tmp; 2583 bp->b_prev = tmp->b_prev; 2584 if (tmp->b_prev) 2585 tmp->b_prev->b_next = bp; 2586 else 2587 q->q_first = bp; 2588 tmp->b_prev = bp; 2589 } else { 2590 tmp = q->q_last; 2591 if ((mcls < (int)queclass(tmp)) || 2592 (bp->b_band < tmp->b_band)) { 2593 2594 /* 2595 * Tack bp on end of queue. 2596 */ 2597 bp->b_next = NULL; 2598 bp->b_prev = tmp; 2599 tmp->b_next = bp; 2600 q->q_last = bp; 2601 } else { 2602 tmp = q->q_first; 2603 while (tmp->b_datap->db_type >= QPCTL) 2604 tmp = tmp->b_next; 2605 while (tmp->b_band > bp->b_band) 2606 tmp = tmp->b_next; 2607 2608 /* 2609 * Insert bp before tmp. 2610 */ 2611 bp->b_next = tmp; 2612 bp->b_prev = tmp->b_prev; 2613 if (tmp->b_prev) 2614 tmp->b_prev->b_next = bp; 2615 else 2616 q->q_first = bp; 2617 tmp->b_prev = bp; 2618 } 2619 qbp->qb_last = bp; 2620 } 2621 qbp->qb_first = bp; 2622 } else { /* bp->b_band == 0 && !QPCTL */ 2623 2624 /* 2625 * If the queue class or band is less than that of the last 2626 * message on the queue, tack bp on the end of the queue. 2627 */ 2628 tmp = q->q_last; 2629 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2630 bp->b_next = NULL; 2631 bp->b_prev = tmp; 2632 tmp->b_next = bp; 2633 q->q_last = bp; 2634 } else { 2635 tmp = q->q_first; 2636 while (tmp->b_datap->db_type >= QPCTL) 2637 tmp = tmp->b_next; 2638 while (tmp->b_band > bp->b_band) 2639 tmp = tmp->b_next; 2640 2641 /* 2642 * Insert bp before tmp. 2643 */ 2644 bp->b_next = tmp; 2645 bp->b_prev = tmp->b_prev; 2646 if (tmp->b_prev) 2647 tmp->b_prev->b_next = bp; 2648 else 2649 q->q_first = bp; 2650 tmp->b_prev = bp; 2651 } 2652 } 2653 2654 /* Get message byte count for q_count accounting */ 2655 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2656 ADD_MBLK_SIZE(tmp, bytecnt); 2657 mblkcnt++; 2658 } 2659 if (qbp) { 2660 qbp->qb_count += bytecnt; 2661 qbp->qb_mblkcnt += mblkcnt; 2662 if ((qbp->qb_count >= qbp->qb_hiwat) || 2663 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2664 qbp->qb_flag |= QB_FULL; 2665 } 2666 } else { 2667 q->q_count += bytecnt; 2668 q->q_mblkcnt += mblkcnt; 2669 if ((q->q_count >= q->q_hiwat) || 2670 (q->q_mblkcnt >= q->q_hiwat)) { 2671 q->q_flag |= QFULL; 2672 } 2673 } 2674 2675 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL); 2676 2677 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2678 qenable_locked(q); 2679 ASSERT(MUTEX_HELD(QLOCK(q))); 2680 if (freezer != curthread) 2681 mutex_exit(QLOCK(q)); 2682 2683 return (1); 2684 } 2685 2686 /* 2687 * Insert a message before an existing message on the queue. If the 2688 * existing message is NULL, the new messages is placed on the end of 2689 * the queue. The queue class of the new message is ignored. However, 2690 * the priority band of the new message must adhere to the following 2691 * ordering: 2692 * 2693 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2694 * 2695 * All flow control parameters are updated. 2696 * 2697 * insq can be called with the stream frozen, but other utility functions 2698 * holding QLOCK, and by streams modules without any locks/frozen. 
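 *
 * As an informal illustration (not part of this file), a caller that
 * does not already hold QLOCK typically freezes the stream, locates
 * the insertion point and then calls insq(); the choice of M_DATA as
 * the search key and the freeing of mp on failure are arbitrary, and
 * mp->b_band is assumed to respect the ordering rule above:
 *
 *	mblk_t *emp;
 *	int queued;
 *
 *	freezestr(q);
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (emp->b_datap->db_type == M_DATA)
 *			break;
 *	queued = insq(q, emp, mp);
 *	unfreezestr(q);
 *	if (!queued)
 *		freemsg(mp);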
2699 */ 2700 int 2701 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2702 { 2703 mblk_t *tmp; 2704 qband_t *qbp = NULL; 2705 int mcls = (int)queclass(mp); 2706 kthread_id_t freezer; 2707 int bytecnt = 0, mblkcnt = 0; 2708 2709 freezer = STREAM(q)->sd_freezer; 2710 if (freezer == curthread) { 2711 ASSERT(frozenstr(q)); 2712 ASSERT(MUTEX_HELD(QLOCK(q))); 2713 } else if (MUTEX_HELD(QLOCK(q))) { 2714 /* Don't drop lock on exit */ 2715 freezer = curthread; 2716 } else 2717 mutex_enter(QLOCK(q)); 2718 2719 if (mcls == QPCTL) { 2720 if (mp->b_band != 0) 2721 mp->b_band = 0; /* force to be correct */ 2722 if (emp && emp->b_prev && 2723 (emp->b_prev->b_datap->db_type < QPCTL)) 2724 goto badord; 2725 } 2726 if (emp) { 2727 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2728 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2729 (emp->b_prev->b_band < mp->b_band))) { 2730 goto badord; 2731 } 2732 } else { 2733 tmp = q->q_last; 2734 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2735 badord: 2736 cmn_err(CE_WARN, 2737 "insq: attempt to insert message out of order " 2738 "on q %p", (void *)q); 2739 if (freezer != curthread) 2740 mutex_exit(QLOCK(q)); 2741 return (0); 2742 } 2743 } 2744 2745 if (mp->b_band != 0) { 2746 int i; 2747 qband_t **qbpp; 2748 2749 if (mp->b_band > q->q_nband) { 2750 qbpp = &q->q_bandp; 2751 while (*qbpp) 2752 qbpp = &(*qbpp)->qb_next; 2753 while (mp->b_band > q->q_nband) { 2754 if ((*qbpp = allocband()) == NULL) { 2755 if (freezer != curthread) 2756 mutex_exit(QLOCK(q)); 2757 return (0); 2758 } 2759 (*qbpp)->qb_hiwat = q->q_hiwat; 2760 (*qbpp)->qb_lowat = q->q_lowat; 2761 q->q_nband++; 2762 qbpp = &(*qbpp)->qb_next; 2763 } 2764 } 2765 qbp = q->q_bandp; 2766 i = mp->b_band; 2767 while (--i) 2768 qbp = qbp->qb_next; 2769 } 2770 2771 if ((mp->b_next = emp) != NULL) { 2772 if ((mp->b_prev = emp->b_prev) != NULL) 2773 emp->b_prev->b_next = mp; 2774 else 2775 q->q_first = mp; 2776 emp->b_prev = mp; 2777 } else { 2778 if ((mp->b_prev = q->q_last) != NULL) 2779 q->q_last->b_next = mp; 2780 else 2781 q->q_first = mp; 2782 q->q_last = mp; 2783 } 2784 2785 /* Get mblk and byte count for q_count accounting */ 2786 for (tmp = mp; tmp; tmp = tmp->b_cont) { 2787 ADD_MBLK_SIZE(tmp, bytecnt); 2788 mblkcnt++; 2789 } 2790 2791 if (qbp) { /* adjust qband pointers and count */ 2792 if (!qbp->qb_first) { 2793 qbp->qb_first = mp; 2794 qbp->qb_last = mp; 2795 } else { 2796 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2797 (mp->b_prev->b_band != mp->b_band))) 2798 qbp->qb_first = mp; 2799 else if (mp->b_next == NULL || (mp->b_next != NULL && 2800 (mp->b_next->b_band != mp->b_band))) 2801 qbp->qb_last = mp; 2802 } 2803 qbp->qb_count += bytecnt; 2804 qbp->qb_mblkcnt += mblkcnt; 2805 if ((qbp->qb_count >= qbp->qb_hiwat) || 2806 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2807 qbp->qb_flag |= QB_FULL; 2808 } 2809 } else { 2810 q->q_count += bytecnt; 2811 q->q_mblkcnt += mblkcnt; 2812 if ((q->q_count >= q->q_hiwat) || 2813 (q->q_mblkcnt >= q->q_hiwat)) { 2814 q->q_flag |= QFULL; 2815 } 2816 } 2817 2818 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2819 2820 if (canenable(q) && (q->q_flag & QWANTR)) 2821 qenable_locked(q); 2822 2823 ASSERT(MUTEX_HELD(QLOCK(q))); 2824 if (freezer != curthread) 2825 mutex_exit(QLOCK(q)); 2826 2827 return (1); 2828 } 2829 2830 /* 2831 * Create and put a control message on queue. 
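 * These routines return 0 if the requested type is a data type (other
 * than M_DELAY) or if no message block can be allocated; otherwise the
 * message is sent and 1 is returned. Typical (illustrative) uses are a
 * driver posting a hangup or flush upstream from its read queue rdq,
 * e.g. (void) putnextctl(rdq, M_HANGUP) or
 * (void) putnextctl1(rdq, M_FLUSH, FLUSHR). The putnextctl*() variants
 * below put the message to the next queue, while putctl()/putctl1()
 * put it to the queue that is passed in.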
2832 */ 2833 int 2834 putctl(queue_t *q, int type) 2835 { 2836 mblk_t *bp; 2837 2838 if ((datamsg(type) && (type != M_DELAY)) || 2839 (bp = allocb_tryhard(0)) == NULL) 2840 return (0); 2841 bp->b_datap->db_type = (unsigned char) type; 2842 2843 put(q, bp); 2844 2845 return (1); 2846 } 2847 2848 /* 2849 * Control message with a single-byte parameter 2850 */ 2851 int 2852 putctl1(queue_t *q, int type, int param) 2853 { 2854 mblk_t *bp; 2855 2856 if ((datamsg(type) && (type != M_DELAY)) || 2857 (bp = allocb_tryhard(1)) == NULL) 2858 return (0); 2859 bp->b_datap->db_type = (unsigned char)type; 2860 *bp->b_wptr++ = (unsigned char)param; 2861 2862 put(q, bp); 2863 2864 return (1); 2865 } 2866 2867 int 2868 putnextctl1(queue_t *q, int type, int param) 2869 { 2870 mblk_t *bp; 2871 2872 if ((datamsg(type) && (type != M_DELAY)) || 2873 ((bp = allocb_tryhard(1)) == NULL)) 2874 return (0); 2875 2876 bp->b_datap->db_type = (unsigned char)type; 2877 *bp->b_wptr++ = (unsigned char)param; 2878 2879 putnext(q, bp); 2880 2881 return (1); 2882 } 2883 2884 int 2885 putnextctl(queue_t *q, int type) 2886 { 2887 mblk_t *bp; 2888 2889 if ((datamsg(type) && (type != M_DELAY)) || 2890 ((bp = allocb_tryhard(0)) == NULL)) 2891 return (0); 2892 bp->b_datap->db_type = (unsigned char)type; 2893 2894 putnext(q, bp); 2895 2896 return (1); 2897 } 2898 2899 /* 2900 * Return the queue upstream from this one 2901 */ 2902 queue_t * 2903 backq(queue_t *q) 2904 { 2905 q = _OTHERQ(q); 2906 if (q->q_next) { 2907 q = q->q_next; 2908 return (_OTHERQ(q)); 2909 } 2910 return (NULL); 2911 } 2912 2913 /* 2914 * Send a block back up the queue in reverse from this 2915 * one (e.g. to respond to ioctls) 2916 */ 2917 void 2918 qreply(queue_t *q, mblk_t *bp) 2919 { 2920 ASSERT(q && bp); 2921 2922 putnext(_OTHERQ(q), bp); 2923 } 2924 2925 /* 2926 * Streams Queue Scheduling 2927 * 2928 * Queues are enabled through qenable() when they have messages to 2929 * process. They are serviced by queuerun(), which runs each enabled 2930 * queue's service procedure. The call to queuerun() is processor 2931 * dependent - the general principle is that it be run whenever a queue 2932 * is enabled but before returning to user level. For system calls, 2933 * the function runqueues() is called if their action causes a queue 2934 * to be enabled. For device interrupts, queuerun() should be 2935 * called before returning from the last level of interrupt. Beyond 2936 * this, no timing assumptions should be made about queue scheduling. 2937 */ 2938 2939 /* 2940 * Enable a queue: put it on list of those whose service procedures are 2941 * ready to run and set up the scheduling mechanism. 2942 * The broadcast is done outside the mutex -> to avoid the woken thread 2943 * from contending with the mutex. This is OK 'cos the queue has been 2944 * enqueued on the runlist and flagged safely at this point. 2945 */ 2946 void 2947 qenable(queue_t *q) 2948 { 2949 mutex_enter(QLOCK(q)); 2950 qenable_locked(q); 2951 mutex_exit(QLOCK(q)); 2952 } 2953 /* 2954 * Return number of messages on queue 2955 */ 2956 int 2957 qsize(queue_t *qp) 2958 { 2959 int count = 0; 2960 mblk_t *mp; 2961 2962 mutex_enter(QLOCK(qp)); 2963 for (mp = qp->q_first; mp; mp = mp->b_next) 2964 count++; 2965 mutex_exit(QLOCK(qp)); 2966 return (count); 2967 } 2968 2969 /* 2970 * noenable - set queue so that putq() will not enable it. 2971 * enableok - set queue so that putq() can enable it. 
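 *
 * Both simply toggle QNOENB under QLOCK; putq(), putbq() and insq()
 * honor it through the canenable() test before calling
 * qenable_locked().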
2972 */ 2973 void 2974 noenable(queue_t *q) 2975 { 2976 mutex_enter(QLOCK(q)); 2977 q->q_flag |= QNOENB; 2978 mutex_exit(QLOCK(q)); 2979 } 2980 2981 void 2982 enableok(queue_t *q) 2983 { 2984 mutex_enter(QLOCK(q)); 2985 q->q_flag &= ~QNOENB; 2986 mutex_exit(QLOCK(q)); 2987 } 2988 2989 /* 2990 * Set queue fields. 2991 */ 2992 int 2993 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 2994 { 2995 qband_t *qbp = NULL; 2996 queue_t *wrq; 2997 int error = 0; 2998 kthread_id_t freezer; 2999 3000 freezer = STREAM(q)->sd_freezer; 3001 if (freezer == curthread) { 3002 ASSERT(frozenstr(q)); 3003 ASSERT(MUTEX_HELD(QLOCK(q))); 3004 } else 3005 mutex_enter(QLOCK(q)); 3006 3007 if (what >= QBAD) { 3008 error = EINVAL; 3009 goto done; 3010 } 3011 if (pri != 0) { 3012 int i; 3013 qband_t **qbpp; 3014 3015 if (pri > q->q_nband) { 3016 qbpp = &q->q_bandp; 3017 while (*qbpp) 3018 qbpp = &(*qbpp)->qb_next; 3019 while (pri > q->q_nband) { 3020 if ((*qbpp = allocband()) == NULL) { 3021 error = EAGAIN; 3022 goto done; 3023 } 3024 (*qbpp)->qb_hiwat = q->q_hiwat; 3025 (*qbpp)->qb_lowat = q->q_lowat; 3026 q->q_nband++; 3027 qbpp = &(*qbpp)->qb_next; 3028 } 3029 } 3030 qbp = q->q_bandp; 3031 i = pri; 3032 while (--i) 3033 qbp = qbp->qb_next; 3034 } 3035 switch (what) { 3036 3037 case QHIWAT: 3038 if (qbp) 3039 qbp->qb_hiwat = (size_t)val; 3040 else 3041 q->q_hiwat = (size_t)val; 3042 break; 3043 3044 case QLOWAT: 3045 if (qbp) 3046 qbp->qb_lowat = (size_t)val; 3047 else 3048 q->q_lowat = (size_t)val; 3049 break; 3050 3051 case QMAXPSZ: 3052 if (qbp) 3053 error = EINVAL; 3054 else 3055 q->q_maxpsz = (ssize_t)val; 3056 3057 /* 3058 * Performance concern, strwrite looks at the module below 3059 * the stream head for the maxpsz each time it does a write 3060 * we now cache it at the stream head. Check to see if this 3061 * queue is sitting directly below the stream head. 3062 */ 3063 wrq = STREAM(q)->sd_wrq; 3064 if (q != wrq->q_next) 3065 break; 3066 3067 /* 3068 * If the stream is not frozen drop the current QLOCK and 3069 * acquire the sd_wrq QLOCK which protects sd_qn_* 3070 */ 3071 if (freezer != curthread) { 3072 mutex_exit(QLOCK(q)); 3073 mutex_enter(QLOCK(wrq)); 3074 } 3075 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3076 3077 if (strmsgsz != 0) { 3078 if (val == INFPSZ) 3079 val = strmsgsz; 3080 else { 3081 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3082 val = MIN(PIPE_BUF, val); 3083 else 3084 val = MIN(strmsgsz, val); 3085 } 3086 } 3087 STREAM(q)->sd_qn_maxpsz = val; 3088 if (freezer != curthread) { 3089 mutex_exit(QLOCK(wrq)); 3090 mutex_enter(QLOCK(q)); 3091 } 3092 break; 3093 3094 case QMINPSZ: 3095 if (qbp) 3096 error = EINVAL; 3097 else 3098 q->q_minpsz = (ssize_t)val; 3099 3100 /* 3101 * Performance concern, strwrite looks at the module below 3102 * the stream head for the maxpsz each time it does a write 3103 * we now cache it at the stream head. Check to see if this 3104 * queue is sitting directly below the stream head. 
3105 */ 3106 wrq = STREAM(q)->sd_wrq; 3107 if (q != wrq->q_next) 3108 break; 3109 3110 /* 3111 * If the stream is not frozen drop the current QLOCK and 3112 * acquire the sd_wrq QLOCK which protects sd_qn_* 3113 */ 3114 if (freezer != curthread) { 3115 mutex_exit(QLOCK(q)); 3116 mutex_enter(QLOCK(wrq)); 3117 } 3118 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3119 3120 if (freezer != curthread) { 3121 mutex_exit(QLOCK(wrq)); 3122 mutex_enter(QLOCK(q)); 3123 } 3124 break; 3125 3126 case QSTRUIOT: 3127 if (qbp) 3128 error = EINVAL; 3129 else 3130 q->q_struiot = (ushort_t)val; 3131 break; 3132 3133 case QCOUNT: 3134 case QFIRST: 3135 case QLAST: 3136 case QFLAG: 3137 error = EPERM; 3138 break; 3139 3140 default: 3141 error = EINVAL; 3142 break; 3143 } 3144 done: 3145 if (freezer != curthread) 3146 mutex_exit(QLOCK(q)); 3147 return (error); 3148 } 3149 3150 /* 3151 * Get queue fields. 3152 */ 3153 int 3154 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3155 { 3156 qband_t *qbp = NULL; 3157 int error = 0; 3158 kthread_id_t freezer; 3159 3160 freezer = STREAM(q)->sd_freezer; 3161 if (freezer == curthread) { 3162 ASSERT(frozenstr(q)); 3163 ASSERT(MUTEX_HELD(QLOCK(q))); 3164 } else 3165 mutex_enter(QLOCK(q)); 3166 if (what >= QBAD) { 3167 error = EINVAL; 3168 goto done; 3169 } 3170 if (pri != 0) { 3171 int i; 3172 qband_t **qbpp; 3173 3174 if (pri > q->q_nband) { 3175 qbpp = &q->q_bandp; 3176 while (*qbpp) 3177 qbpp = &(*qbpp)->qb_next; 3178 while (pri > q->q_nband) { 3179 if ((*qbpp = allocband()) == NULL) { 3180 error = EAGAIN; 3181 goto done; 3182 } 3183 (*qbpp)->qb_hiwat = q->q_hiwat; 3184 (*qbpp)->qb_lowat = q->q_lowat; 3185 q->q_nband++; 3186 qbpp = &(*qbpp)->qb_next; 3187 } 3188 } 3189 qbp = q->q_bandp; 3190 i = pri; 3191 while (--i) 3192 qbp = qbp->qb_next; 3193 } 3194 switch (what) { 3195 case QHIWAT: 3196 if (qbp) 3197 *(size_t *)valp = qbp->qb_hiwat; 3198 else 3199 *(size_t *)valp = q->q_hiwat; 3200 break; 3201 3202 case QLOWAT: 3203 if (qbp) 3204 *(size_t *)valp = qbp->qb_lowat; 3205 else 3206 *(size_t *)valp = q->q_lowat; 3207 break; 3208 3209 case QMAXPSZ: 3210 if (qbp) 3211 error = EINVAL; 3212 else 3213 *(ssize_t *)valp = q->q_maxpsz; 3214 break; 3215 3216 case QMINPSZ: 3217 if (qbp) 3218 error = EINVAL; 3219 else 3220 *(ssize_t *)valp = q->q_minpsz; 3221 break; 3222 3223 case QCOUNT: 3224 if (qbp) 3225 *(size_t *)valp = qbp->qb_count; 3226 else 3227 *(size_t *)valp = q->q_count; 3228 break; 3229 3230 case QFIRST: 3231 if (qbp) 3232 *(mblk_t **)valp = qbp->qb_first; 3233 else 3234 *(mblk_t **)valp = q->q_first; 3235 break; 3236 3237 case QLAST: 3238 if (qbp) 3239 *(mblk_t **)valp = qbp->qb_last; 3240 else 3241 *(mblk_t **)valp = q->q_last; 3242 break; 3243 3244 case QFLAG: 3245 if (qbp) 3246 *(uint_t *)valp = qbp->qb_flag; 3247 else 3248 *(uint_t *)valp = q->q_flag; 3249 break; 3250 3251 case QSTRUIOT: 3252 if (qbp) 3253 error = EINVAL; 3254 else 3255 *(short *)valp = q->q_struiot; 3256 break; 3257 3258 default: 3259 error = EINVAL; 3260 break; 3261 } 3262 done: 3263 if (freezer != curthread) 3264 mutex_exit(QLOCK(q)); 3265 return (error); 3266 } 3267 3268 /* 3269 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3270 * QWANTWSYNC or QWANTR or QWANTW, 3271 * 3272 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3273 * deferred wakeup will be done. Also if strpoll() in progress then a 3274 * deferred pollwakeup will be done. 
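 * (The deferral is recorded by setting the corresponding bit in
 * sd_wakeq.)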
3275 */ 3276 void 3277 strwakeq(queue_t *q, int flag) 3278 { 3279 stdata_t *stp = STREAM(q); 3280 pollhead_t *pl; 3281 3282 mutex_enter(&stp->sd_lock); 3283 pl = &stp->sd_pollist; 3284 if (flag & QWANTWSYNC) { 3285 ASSERT(!(q->q_flag & QREADR)); 3286 if (stp->sd_flag & WSLEEP) { 3287 stp->sd_flag &= ~WSLEEP; 3288 cv_broadcast(&stp->sd_wrq->q_wait); 3289 } else { 3290 stp->sd_wakeq |= WSLEEP; 3291 } 3292 3293 mutex_exit(&stp->sd_lock); 3294 pollwakeup(pl, POLLWRNORM); 3295 mutex_enter(&stp->sd_lock); 3296 3297 if (stp->sd_sigflags & S_WRNORM) 3298 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3299 } else if (flag & QWANTR) { 3300 if (stp->sd_flag & RSLEEP) { 3301 stp->sd_flag &= ~RSLEEP; 3302 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3303 } else { 3304 stp->sd_wakeq |= RSLEEP; 3305 } 3306 3307 mutex_exit(&stp->sd_lock); 3308 pollwakeup(pl, POLLIN | POLLRDNORM); 3309 mutex_enter(&stp->sd_lock); 3310 3311 { 3312 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3313 3314 if (events) 3315 strsendsig(stp->sd_siglist, events, 0, 0); 3316 } 3317 } else { 3318 if (stp->sd_flag & WSLEEP) { 3319 stp->sd_flag &= ~WSLEEP; 3320 cv_broadcast(&stp->sd_wrq->q_wait); 3321 } 3322 3323 mutex_exit(&stp->sd_lock); 3324 pollwakeup(pl, POLLWRNORM); 3325 mutex_enter(&stp->sd_lock); 3326 3327 if (stp->sd_sigflags & S_WRNORM) 3328 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3329 } 3330 mutex_exit(&stp->sd_lock); 3331 } 3332 3333 int 3334 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock) 3335 { 3336 stdata_t *stp = STREAM(q); 3337 int typ = STRUIOT_STANDARD; 3338 uio_t *uiop = &dp->d_uio; 3339 dblk_t *dbp; 3340 ssize_t uiocnt; 3341 ssize_t cnt; 3342 unsigned char *ptr; 3343 ssize_t resid; 3344 int error = 0; 3345 on_trap_data_t otd; 3346 queue_t *stwrq; 3347 3348 /* 3349 * Plumbing may change while taking the type so store the 3350 * queue in a temporary variable. It doesn't matter even 3351 * if the we take the type from the previous plumbing, 3352 * that's because if the plumbing has changed when we were 3353 * holding the queue in a temporary variable, we can continue 3354 * processing the message the way it would have been processed 3355 * in the old plumbing, without any side effects but a bit 3356 * extra processing for partial ip header checksum. 3357 * 3358 * This has been done to avoid holding the sd_lock which is 3359 * very hot. 3360 */ 3361 3362 stwrq = stp->sd_struiowrq; 3363 if (stwrq) 3364 typ = stwrq->q_struiot; 3365 3366 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) { 3367 dbp = mp->b_datap; 3368 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff); 3369 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3370 cnt = MIN(uiocnt, uiop->uio_resid); 3371 if (!(dbp->db_struioflag & STRUIO_SPEC) || 3372 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) { 3373 /* 3374 * Either this mblk has already been processed 3375 * or there is no more room in this mblk (?). 
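 * (STRUIO_SPEC marks dblks that are eligible for this synchronous
 * uio copy; STRUIO_DONE marks dblks whose db_cksumstuff..db_cksumend
 * region has already been copied.)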
3376  */
3377 			continue;
3378 		}
3379 		switch (typ) {
3380 		case STRUIOT_STANDARD:
3381 			if (noblock) {
3382 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3383 					no_trap();
3384 					error = EWOULDBLOCK;
3385 					goto out;
3386 				}
3387 			}
3388 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3389 				if (noblock)
3390 					no_trap();
3391 				goto out;
3392 			}
3393 			if (noblock)
3394 				no_trap();
3395 			break;
3396 
3397 		default:
3398 			error = EIO;
3399 			goto out;
3400 		}
3401 		dbp->db_struioflag |= STRUIO_DONE;
3402 		dbp->db_cksumstuff += cnt;
3403 	}
3404 out:
3405 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3406 		/*
3407 		 * A fault has occurred and some bytes were moved to the
3408 		 * current mblk; the uio_t has already been updated by
3409 		 * the appropriate uio routine, so also update the mblk
3410 		 * to reflect this in case this same mblk chain is used
3411 		 * again (after the fault has been handled).
3412 		 */
3413 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3414 		if (uiocnt >= resid)
3415 			dbp->db_cksumstuff += resid;
3416 	}
3417 	return (error);
3418 }
3419 
3420 /*
3421  * Try to enter queue synchronously. Any attempt to enter a closing queue
3422  * will fail. The qp->q_rwcnt keeps track of the number of successful entries
3423  * so that removeq() will not try to close the queue while a thread is inside
3424  * the queue.
3425  */
3426 static boolean_t
3427 rwnext_enter(queue_t *qp)
3428 {
3429 	mutex_enter(QLOCK(qp));
3430 	if (qp->q_flag & QWCLOSE) {
3431 		mutex_exit(QLOCK(qp));
3432 		return (B_FALSE);
3433 	}
3434 	qp->q_rwcnt++;
3435 	ASSERT(qp->q_rwcnt != 0);
3436 	mutex_exit(QLOCK(qp));
3437 	return (B_TRUE);
3438 }
3439 
3440 /*
3441  * Decrease the count of threads running in the sync stream queue and wake up
3442  * any threads blocked in removeq().
3443  */
3444 static void
3445 rwnext_exit(queue_t *qp)
3446 {
3447 	mutex_enter(QLOCK(qp));
3448 	qp->q_rwcnt--;
3449 	if (qp->q_flag & QWANTRMQSYNC) {
3450 		qp->q_flag &= ~QWANTRMQSYNC;
3451 		cv_broadcast(&qp->q_wait);
3452 	}
3453 	mutex_exit(QLOCK(qp));
3454 }
3455 
3456 /*
3457  * The purpose of rwnext() is to call the rw procedure of the next
3458  * (downstream) module's queue.
3459  *
3460  * It is treated as a put entrypoint for perimeter synchronization.
3461  *
3462  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3463  * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3464  * does not matter if any regular put entrypoints have already been entered.
3465  * We can't increment one of the sq_putcounts (instead of sq_count) because
3466  * qwait_rw won't know which counter to decrement.
3467  *
3468  * It would be reasonable to add the lockless FASTPUT logic.
3469  */
3470 int
3471 rwnext(queue_t *qp, struiod_t *dp)
3472 {
3473 	queue_t *nqp;
3474 	syncq_t *sq;
3475 	uint16_t count;
3476 	uint16_t flags;
3477 	struct qinit *qi;
3478 	int (*proc)();
3479 	struct stdata *stp;
3480 	int isread;
3481 	int rval;
3482 
3483 	stp = STREAM(qp);
3484 	/*
3485 	 * Prevent q_next from changing by holding sd_lock until acquiring
3486 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3487 	 * already have sd_lock acquired. In either case sd_lock is always
3488 	 * released after acquiring SQLOCK.
3489 	 *
3490 	 * The streamhead read-side holding sd_lock when calling rwnext is
3491 	 * required to prevent a race condition where M_DATA mblks flowing
3492 	 * up the read-side of the stream could be bypassed by a rwnext()
3493 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
3494 */ 3495 if ((nqp = _WR(qp)) == qp) { 3496 isread = 0; 3497 mutex_enter(&stp->sd_lock); 3498 qp = nqp->q_next; 3499 } else { 3500 isread = 1; 3501 if (nqp != stp->sd_wrq) 3502 /* Not streamhead */ 3503 mutex_enter(&stp->sd_lock); 3504 qp = _RD(nqp->q_next); 3505 } 3506 qi = qp->q_qinfo; 3507 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) { 3508 /* 3509 * Not a synchronous module or no r/w procedure for this 3510 * queue, so just return EINVAL and let the caller handle it. 3511 */ 3512 mutex_exit(&stp->sd_lock); 3513 return (EINVAL); 3514 } 3515 3516 if (rwnext_enter(qp) == B_FALSE) { 3517 mutex_exit(&stp->sd_lock); 3518 return (EINVAL); 3519 } 3520 3521 sq = qp->q_syncq; 3522 mutex_enter(SQLOCK(sq)); 3523 mutex_exit(&stp->sd_lock); 3524 count = sq->sq_count; 3525 flags = sq->sq_flags; 3526 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3527 3528 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3529 /* 3530 * if this queue is being closed, return. 3531 */ 3532 if (qp->q_flag & QWCLOSE) { 3533 mutex_exit(SQLOCK(sq)); 3534 rwnext_exit(qp); 3535 return (EINVAL); 3536 } 3537 3538 /* 3539 * Wait until we can enter the inner perimeter. 3540 */ 3541 sq->sq_flags = flags | SQ_WANTWAKEUP; 3542 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3543 count = sq->sq_count; 3544 flags = sq->sq_flags; 3545 } 3546 3547 if (isread == 0 && stp->sd_struiowrq == NULL || 3548 isread == 1 && stp->sd_struiordq == NULL) { 3549 /* 3550 * Stream plumbing changed while waiting for inner perimeter 3551 * so just return EINVAL and let the caller handle it. 3552 */ 3553 mutex_exit(SQLOCK(sq)); 3554 rwnext_exit(qp); 3555 return (EINVAL); 3556 } 3557 if (!(flags & SQ_CIPUT)) 3558 sq->sq_flags = flags | SQ_EXCL; 3559 sq->sq_count = count + 1; 3560 ASSERT(sq->sq_count != 0); /* Wraparound */ 3561 /* 3562 * Note: The only message ordering guarantee that rwnext() makes is 3563 * for the write queue flow-control case. All others (r/w queue 3564 * with q_count > 0 (or q_first != 0)) are the resposibilty of 3565 * the queue's rw procedure. This could be genralized here buy 3566 * running the queue's service procedure, but that wouldn't be 3567 * the most efficent for all cases. 3568 */ 3569 mutex_exit(SQLOCK(sq)); 3570 if (! isread && (qp->q_flag & QFULL)) { 3571 /* 3572 * Write queue may be flow controlled. If so, 3573 * mark the queue for wakeup when it's not. 3574 */ 3575 mutex_enter(QLOCK(qp)); 3576 if (qp->q_flag & QFULL) { 3577 qp->q_flag |= QWANTWSYNC; 3578 mutex_exit(QLOCK(qp)); 3579 rval = EWOULDBLOCK; 3580 goto out; 3581 } 3582 mutex_exit(QLOCK(qp)); 3583 } 3584 3585 if (! isread && dp->d_mp) 3586 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr - 3587 dp->d_mp->b_datap->db_base); 3588 3589 rval = (*proc)(qp, dp); 3590 3591 if (isread && dp->d_mp) 3592 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT, 3593 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base); 3594 out: 3595 /* 3596 * The queue is protected from being freed by sq_count, so it is 3597 * safe to call rwnext_exit and reacquire SQLOCK(sq). 3598 */ 3599 rwnext_exit(qp); 3600 3601 mutex_enter(SQLOCK(sq)); 3602 flags = sq->sq_flags; 3603 ASSERT(sq->sq_count != 0); 3604 sq->sq_count--; 3605 if (flags & SQ_TAIL) { 3606 putnext_tail(sq, qp, flags); 3607 /* 3608 * The only purpose of this ASSERT is to preserve calling stack 3609 * in DEBUG kernel. 
3610 */ 3611 ASSERT(flags & SQ_TAIL); 3612 return (rval); 3613 } 3614 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3615 /* 3616 * Safe to always drop SQ_EXCL: 3617 * Not SQ_CIPUT means we set SQ_EXCL above 3618 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3619 * did a qwriter(INNER) in which case nobody else 3620 * is in the inner perimeter and we are exiting. 3621 * 3622 * I would like to make the following assertion: 3623 * 3624 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3625 * sq->sq_count == 0); 3626 * 3627 * which indicates that if we are both putshared and exclusive, 3628 * we became exclusive while executing the putproc, and the only 3629 * claim on the syncq was the one we dropped a few lines above. 3630 * But other threads that enter putnext while the syncq is exclusive 3631 * need to make a claim as they may need to drop SQLOCK in the 3632 * has_writers case to avoid deadlocks. If these threads are 3633 * delayed or preempted, it is possible that the writer thread can 3634 * find out that there are other claims making the (sq_count == 0) 3635 * test invalid. 3636 */ 3637 3638 sq->sq_flags = flags & ~SQ_EXCL; 3639 if (sq->sq_flags & SQ_WANTWAKEUP) { 3640 sq->sq_flags &= ~SQ_WANTWAKEUP; 3641 cv_broadcast(&sq->sq_wait); 3642 } 3643 mutex_exit(SQLOCK(sq)); 3644 return (rval); 3645 } 3646 3647 /* 3648 * The purpose of infonext() is to call the info procedure of the next 3649 * (downstream) modules queue. 3650 * 3651 * treated as put entrypoint for perimeter syncronization. 3652 * 3653 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3654 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and 3655 * it does not matter if any regular put entrypoints have been already 3656 * entered. 3657 */ 3658 int 3659 infonext(queue_t *qp, infod_t *idp) 3660 { 3661 queue_t *nqp; 3662 syncq_t *sq; 3663 uint16_t count; 3664 uint16_t flags; 3665 struct qinit *qi; 3666 int (*proc)(); 3667 struct stdata *stp; 3668 int rval; 3669 3670 stp = STREAM(qp); 3671 /* 3672 * Prevent q_next from changing by holding sd_lock until 3673 * acquiring SQLOCK. 3674 */ 3675 mutex_enter(&stp->sd_lock); 3676 if ((nqp = _WR(qp)) == qp) { 3677 qp = nqp->q_next; 3678 } else { 3679 qp = _RD(nqp->q_next); 3680 } 3681 qi = qp->q_qinfo; 3682 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3683 mutex_exit(&stp->sd_lock); 3684 return (EINVAL); 3685 } 3686 sq = qp->q_syncq; 3687 mutex_enter(SQLOCK(sq)); 3688 mutex_exit(&stp->sd_lock); 3689 count = sq->sq_count; 3690 flags = sq->sq_flags; 3691 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3692 3693 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3694 /* 3695 * Wait until we can enter the inner perimeter. 3696 */ 3697 sq->sq_flags = flags | SQ_WANTWAKEUP; 3698 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3699 count = sq->sq_count; 3700 flags = sq->sq_flags; 3701 } 3702 3703 if (! (flags & SQ_CIPUT)) 3704 sq->sq_flags = flags | SQ_EXCL; 3705 sq->sq_count = count + 1; 3706 ASSERT(sq->sq_count != 0); /* Wraparound */ 3707 mutex_exit(SQLOCK(sq)); 3708 3709 rval = (*proc)(qp, idp); 3710 3711 mutex_enter(SQLOCK(sq)); 3712 flags = sq->sq_flags; 3713 ASSERT(sq->sq_count != 0); 3714 sq->sq_count--; 3715 if (flags & SQ_TAIL) { 3716 putnext_tail(sq, qp, flags); 3717 /* 3718 * The only purpose of this ASSERT is to preserve calling stack 3719 * in DEBUG kernel. 
3720 */ 3721 ASSERT(flags & SQ_TAIL); 3722 return (rval); 3723 } 3724 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3725 /* 3726 * XXXX 3727 * I am not certain the next comment is correct here. I need to consider 3728 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3729 * might cause other problems. It just might be safer to drop it if 3730 * !SQ_CIPUT because that is when we set it. 3731 */ 3732 /* 3733 * Safe to always drop SQ_EXCL: 3734 * Not SQ_CIPUT means we set SQ_EXCL above 3735 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3736 * did a qwriter(INNER) in which case nobody else 3737 * is in the inner perimeter and we are exiting. 3738 * 3739 * I would like to make the following assertion: 3740 * 3741 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3742 * sq->sq_count == 0); 3743 * 3744 * which indicates that if we are both putshared and exclusive, 3745 * we became exclusive while executing the putproc, and the only 3746 * claim on the syncq was the one we dropped a few lines above. 3747 * But other threads that enter putnext while the syncq is exclusive 3748 * need to make a claim as they may need to drop SQLOCK in the 3749 * has_writers case to avoid deadlocks. If these threads are 3750 * delayed or preempted, it is possible that the writer thread can 3751 * find out that there are other claims making the (sq_count == 0) 3752 * test invalid. 3753 */ 3754 3755 sq->sq_flags = flags & ~SQ_EXCL; 3756 mutex_exit(SQLOCK(sq)); 3757 return (rval); 3758 } 3759 3760 /* 3761 * Return nonzero if the queue is responsible for struio(), else return 0. 3762 */ 3763 int 3764 isuioq(queue_t *q) 3765 { 3766 if (q->q_flag & QREADR) 3767 return (STREAM(q)->sd_struiordq == q); 3768 else 3769 return (STREAM(q)->sd_struiowrq == q); 3770 } 3771 3772 #if defined(__sparc) 3773 int disable_putlocks = 0; 3774 #else 3775 int disable_putlocks = 1; 3776 #endif 3777 3778 /* 3779 * called by create_putlock. 3780 */ 3781 static void 3782 create_syncq_putlocks(queue_t *q) 3783 { 3784 syncq_t *sq = q->q_syncq; 3785 ciputctrl_t *cip; 3786 int i; 3787 3788 ASSERT(sq != NULL); 3789 3790 ASSERT(disable_putlocks == 0); 3791 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3792 ASSERT(ciputctrl_cache != NULL); 3793 3794 if (!(sq->sq_type & SQ_CIPUT)) 3795 return; 3796 3797 for (i = 0; i <= 1; i++) { 3798 if (sq->sq_ciputctrl == NULL) { 3799 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3800 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3801 mutex_enter(SQLOCK(sq)); 3802 if (sq->sq_ciputctrl != NULL) { 3803 mutex_exit(SQLOCK(sq)); 3804 kmem_cache_free(ciputctrl_cache, cip); 3805 } else { 3806 ASSERT(sq->sq_nciputctrl == 0); 3807 sq->sq_nciputctrl = n_ciputctrl - 1; 3808 /* 3809 * putnext checks sq_ciputctrl without holding 3810 * SQLOCK. if it is not NULL putnext assumes 3811 * sq_nciputctrl is initialized. membar below 3812 * insures that. 3813 */ 3814 membar_producer(); 3815 sq->sq_ciputctrl = cip; 3816 mutex_exit(SQLOCK(sq)); 3817 } 3818 } 3819 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3820 if (i == 1) 3821 break; 3822 q = _OTHERQ(q); 3823 if (!(q->q_flag & QPERQ)) { 3824 ASSERT(sq == q->q_syncq); 3825 break; 3826 } 3827 ASSERT(q->q_syncq != NULL); 3828 ASSERT(sq != q->q_syncq); 3829 sq = q->q_syncq; 3830 ASSERT(sq->sq_type & SQ_CIPUT); 3831 } 3832 } 3833 3834 /* 3835 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 3836 * syncq of q. 
If stream argument is not 0 create per cpu stream_putlocks for 3837 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3838 * starting from q and down to the driver. 3839 * 3840 * This should be called after the affected queues are part of stream 3841 * geometry. It should be called from driver/module open routine after 3842 * qprocson() call. It is also called from nfs syscall where it is known that 3843 * stream is configured and won't change its geometry during create_putlock 3844 * call. 3845 * 3846 * caller normally uses 0 value for the stream argument to speed up MT putnext 3847 * into the perimeter of q for example because its perimeter is per module 3848 * (e.g. IP). 3849 * 3850 * caller normally uses non 0 value for the stream argument to hint the system 3851 * that the stream of q is a very contended global system stream 3852 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3853 * particularly MT hot. 3854 * 3855 * Caller insures stream plumbing won't happen while we are here and therefore 3856 * q_next can be safely used. 3857 */ 3858 3859 void 3860 create_putlocks(queue_t *q, int stream) 3861 { 3862 ciputctrl_t *cip; 3863 struct stdata *stp = STREAM(q); 3864 3865 q = _WR(q); 3866 ASSERT(stp != NULL); 3867 3868 if (disable_putlocks != 0) 3869 return; 3870 3871 if (n_ciputctrl < min_n_ciputctrl) 3872 return; 3873 3874 ASSERT(ciputctrl_cache != NULL); 3875 3876 if (stream != 0 && stp->sd_ciputctrl == NULL) { 3877 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3878 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3879 mutex_enter(&stp->sd_lock); 3880 if (stp->sd_ciputctrl != NULL) { 3881 mutex_exit(&stp->sd_lock); 3882 kmem_cache_free(ciputctrl_cache, cip); 3883 } else { 3884 ASSERT(stp->sd_nciputctrl == 0); 3885 stp->sd_nciputctrl = n_ciputctrl - 1; 3886 /* 3887 * putnext checks sd_ciputctrl without holding 3888 * sd_lock. if it is not NULL putnext assumes 3889 * sd_nciputctrl is initialized. membar below 3890 * insures that. 3891 */ 3892 membar_producer(); 3893 stp->sd_ciputctrl = cip; 3894 mutex_exit(&stp->sd_lock); 3895 } 3896 } 3897 3898 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 3899 3900 while (_SAMESTR(q)) { 3901 create_syncq_putlocks(q); 3902 if (stream == 0) 3903 return; 3904 q = q->q_next; 3905 } 3906 ASSERT(q != NULL); 3907 create_syncq_putlocks(q); 3908 } 3909 3910 /* 3911 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 3912 * through a stream. 3913 * 3914 * Data currently record per event is a hrtime stamp, queue address, event 3915 * type, and a per type datum. Much of the STREAMS framework is instrumented 3916 * for automatic flow tracing (when enabled). Events can be defined and used 3917 * by STREAMS modules and drivers. 3918 * 3919 * Global objects: 3920 * 3921 * str_ftevent() - Add a flow-trace event to a dblk. 3922 * str_ftfree() - Free flow-trace data 3923 * 3924 * Local objects: 3925 * 3926 * fthdr_cache - pointer to the kmem cache for trace header. 3927 * ftblk_cache - pointer to the kmem cache for trace data blocks. 3928 */ 3929 3930 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 3931 3932 void 3933 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 3934 { 3935 ftblk_t *bp = hp->tail; 3936 ftblk_t *nbp; 3937 ftevnt_t *ep; 3938 int ix, nix; 3939 3940 ASSERT(hp != NULL); 3941 3942 for (;;) { 3943 if ((ix = bp->ix) == FTBLK_EVNTS) { 3944 /* 3945 * Tail doesn't have room, so need a new tail. 
3946 * 3947 * To make this MT safe, first, allocate a new 3948 * ftblk, and initialize it. To make life a 3949 * little easier, reserve the first slot (mostly 3950 * by making ix = 1). When we are finished with 3951 * the initialization, CAS this pointer to the 3952 * tail. If this succeeds, this is the new 3953 * "next" block. Otherwise, another thread 3954 * got here first, so free the block and start 3955 * again. 3956 */ 3957 if (!(nbp = kmem_cache_alloc(ftblk_cache, 3958 KM_NOSLEEP))) { 3959 /* no mem, so punt */ 3960 str_ftnever++; 3961 /* free up all flow data? */ 3962 return; 3963 } 3964 nbp->nxt = NULL; 3965 nbp->ix = 1; 3966 /* 3967 * Just in case there is another thread about 3968 * to get the next index, we need to make sure 3969 * the value is there for it. 3970 */ 3971 membar_producer(); 3972 if (casptr(&hp->tail, bp, nbp) == bp) { 3973 /* CAS was successful */ 3974 bp->nxt = nbp; 3975 membar_producer(); 3976 bp = nbp; 3977 ix = 0; 3978 goto cas_good; 3979 } else { 3980 kmem_cache_free(ftblk_cache, nbp); 3981 bp = hp->tail; 3982 continue; 3983 } 3984 } 3985 nix = ix + 1; 3986 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 3987 cas_good: 3988 if (curthread != hp->thread) { 3989 hp->thread = curthread; 3990 evnt |= FTEV_CS; 3991 } 3992 if (CPU->cpu_seqid != hp->cpu_seqid) { 3993 hp->cpu_seqid = CPU->cpu_seqid; 3994 evnt |= FTEV_PS; 3995 } 3996 ep = &bp->ev[ix]; 3997 break; 3998 } 3999 } 4000 4001 if (evnt & FTEV_QMASK) { 4002 queue_t *qp = p; 4003 4004 /* 4005 * It is possible that the module info is broke 4006 * (as is logsubr.c at this comment writing). 4007 * Instead of panicing or doing other unmentionables, 4008 * we shall put a dummy name as the mid, and continue. 4009 */ 4010 if (qp->q_qinfo == NULL) 4011 ep->mid = "NONAME"; 4012 else 4013 ep->mid = qp->q_qinfo->qi_minfo->mi_idname; 4014 4015 if (!(qp->q_flag & QREADR)) 4016 evnt |= FTEV_ISWR; 4017 } else { 4018 ep->mid = (char *)p; 4019 } 4020 4021 ep->ts = gethrtime(); 4022 ep->evnt = evnt; 4023 ep->data = data; 4024 hp->hash = (hp->hash << 9) + hp->hash; 4025 hp->hash += (evnt << 16) | data; 4026 hp->hash += (uintptr_t)ep->mid; 4027 } 4028 4029 /* 4030 * Free flow-trace data. 4031 */ 4032 void 4033 str_ftfree(dblk_t *dbp) 4034 { 4035 fthdr_t *hp = dbp->db_fthdr; 4036 ftblk_t *bp = &hp->first; 4037 ftblk_t *nbp; 4038 4039 if (bp != hp->tail || bp->ix != 0) { 4040 /* 4041 * Clear out the hash, have the tail point to itself, and free 4042 * any continuation blocks. 4043 */ 4044 bp = hp->first.nxt; 4045 hp->tail = &hp->first; 4046 hp->hash = 0; 4047 hp->first.nxt = NULL; 4048 hp->first.ix = 0; 4049 while (bp != NULL) { 4050 nbp = bp->nxt; 4051 kmem_cache_free(ftblk_cache, bp); 4052 bp = nbp; 4053 } 4054 } 4055 kmem_cache_free(fthdr_cache, hp); 4056 dbp->db_fthdr = NULL; 4057 } 4058
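/*
 * str_ftevent() is normally reached through the STR_FTEVENT_MSG() and
 * STR_FTEVENT_MBLK() macros used throughout this file (FTEV_PUTQ,
 * FTEV_RMVQ, FTEV_INSQ, FTEV_PUTBQ, FTEV_FLUSHQ, FTEV_RWNEXT, ...);
 * tracing is off by default since str_ftnever is initialized to 1 above.
 */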