1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 22 /* All Rights Reserved */ 23 24 25 /* 26 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 27 * Use is subject to license terms. 28 */ 29 30 #pragma ident "%Z%%M% %I% %E% SMI" 31 32 #include <sys/types.h> 33 #include <sys/param.h> 34 #include <sys/thread.h> 35 #include <sys/sysmacros.h> 36 #include <sys/stropts.h> 37 #include <sys/stream.h> 38 #include <sys/strsubr.h> 39 #include <sys/strsun.h> 40 #include <sys/conf.h> 41 #include <sys/debug.h> 42 #include <sys/cmn_err.h> 43 #include <sys/kmem.h> 44 #include <sys/atomic.h> 45 #include <sys/errno.h> 46 #include <sys/vtrace.h> 47 #include <sys/ftrace.h> 48 #include <sys/ontrap.h> 49 #include <sys/multidata.h> 50 #include <sys/multidata_impl.h> 51 #include <sys/sdt.h> 52 #include <sys/strft.h> 53 54 #ifdef DEBUG 55 #include <sys/kmem_impl.h> 56 #endif 57 58 /* 59 * This file contains all the STREAMS utility routines that may 60 * be used by modules and drivers. 61 */ 62 63 /* 64 * STREAMS message allocator: principles of operation 65 * 66 * The streams message allocator consists of all the routines that 67 * allocate, dup and free streams messages: allocb(), [d]esballoc[a], 68 * dupb(), freeb() and freemsg(). What follows is a high-level view 69 * of how the allocator works. 70 * 71 * Every streams message consists of one or more mblks, a dblk, and data. 72 * All mblks for all types of messages come from a common mblk_cache. 73 * The dblk and data come in several flavors, depending on how the 74 * message is allocated: 75 * 76 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of 77 * fixed-size dblk/data caches. For message sizes that are multiples of 78 * PAGESIZE, dblks are allocated separately from the buffer. 79 * The associated buffer is allocated by the constructor using kmem_alloc(). 80 * For all other message sizes, dblk and its associated data is allocated 81 * as a single contiguous chunk of memory. 82 * Objects in these caches consist of a dblk plus its associated data. 83 * allocb() determines the nearest-size cache by table lookup: 84 * the dblk_cache[] array provides the mapping from size to dblk cache. 85 * 86 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by 87 * kmem_alloc()'ing a buffer for the data and supplying that 88 * buffer to gesballoc(), described below. 89 * 90 * (3) The four flavors of [d]esballoc[a] are all implemented by a 91 * common routine, gesballoc() ("generic esballoc"). gesballoc() 92 * allocates a dblk from the global dblk_esb_cache and sets db_base, 93 * db_lim and db_frtnp to describe the caller-supplied buffer. 94 * 95 * While there are several routines to allocate messages, there is only 96 * one routine to free messages: freeb(). freeb() simply invokes the 97 * dblk's free method, dbp->db_free(), which is set at allocation time. 98 * 99 * dupb() creates a new reference to a message by allocating a new mblk, 100 * incrementing the dblk reference count and setting the dblk's free 101 * method to dblk_decref(). The dblk's original free method is retained 102 * in db_lastfree. dblk_decref() decrements the reference count on each 103 * freeb(). If this is not the last reference it just frees the mblk; 104 * if this *is* the last reference, it restores db_free to db_lastfree, 105 * sets db_mblk to the current mblk (see below), and invokes db_lastfree. 106 * 107 * The implementation makes aggressive use of kmem object caching for 108 * maximum performance. This makes the code simple and compact, but 109 * also a bit abstruse in some places. The invariants that constitute a 110 * message's constructed state, described below, are more subtle than usual. 111 * 112 * Every dblk has an "attached mblk" as part of its constructed state. 113 * The mblk is allocated by the dblk's constructor and remains attached 114 * until the message is either dup'ed or pulled up. In the dupb() case 115 * the mblk association doesn't matter until the last free, at which time 116 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects 117 * the mblk association because it swaps the leading mblks of two messages, 118 * so it is responsible for swapping their db_mblk pointers accordingly. 119 * From a constructed-state viewpoint it doesn't matter that a dblk's 120 * attached mblk can change while the message is allocated; all that 121 * matters is that the dblk has *some* attached mblk when it's freed. 122 * 123 * The sizes of the allocb() small-message caches are not magical. 124 * They represent a good trade-off between internal and external 125 * fragmentation for current workloads. They should be reevaluated 126 * periodically, especially if allocations larger than DBLK_MAX_CACHE 127 * become common. We use 64-byte alignment so that dblks don't 128 * straddle cache lines unnecessarily. 129 */ 130 #define DBLK_MAX_CACHE 73728 131 #define DBLK_CACHE_ALIGN 64 132 #define DBLK_MIN_SIZE 8 133 #define DBLK_SIZE_SHIFT 3 134 135 #ifdef _BIG_ENDIAN 136 #define DBLK_RTFU_SHIFT(field) \ 137 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field)) 138 #else 139 #define DBLK_RTFU_SHIFT(field) \ 140 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref)) 141 #endif 142 143 #define DBLK_RTFU(ref, type, flags, uioflag) \ 144 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \ 145 ((type) << DBLK_RTFU_SHIFT(db_type)) | \ 146 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \ 147 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag))) 148 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref)) 149 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref)) 150 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band)) 151 152 static size_t dblk_sizes[] = { 153 #ifdef _LP64 154 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920, 155 8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688, 156 40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456, 157 #else 158 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968, 159 8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736, 160 40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504, 161 #endif 162 DBLK_MAX_CACHE, 0 163 }; 164 165 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE]; 166 static struct kmem_cache *mblk_cache; 167 static struct kmem_cache *dblk_esb_cache; 168 static struct kmem_cache *fthdr_cache; 169 static struct kmem_cache *ftblk_cache; 170 171 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp); 172 static mblk_t *allocb_oversize(size_t size, int flags); 173 static int allocb_tryhard_fails; 174 static void frnop_func(void *arg); 175 frtn_t frnop = { frnop_func }; 176 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp); 177 178 static boolean_t rwnext_enter(queue_t *qp); 179 static void rwnext_exit(queue_t *qp); 180 181 /* 182 * Patchable mblk/dblk kmem_cache flags. 183 */ 184 int dblk_kmem_flags = 0; 185 int mblk_kmem_flags = 0; 186 187 188 static int 189 dblk_constructor(void *buf, void *cdrarg, int kmflags) 190 { 191 dblk_t *dbp = buf; 192 ssize_t msg_size = (ssize_t)cdrarg; 193 size_t index; 194 195 ASSERT(msg_size != 0); 196 197 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 198 199 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 200 201 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 202 return (-1); 203 if ((msg_size & PAGEOFFSET) == 0) { 204 dbp->db_base = kmem_alloc(msg_size, kmflags); 205 if (dbp->db_base == NULL) { 206 kmem_cache_free(mblk_cache, dbp->db_mblk); 207 return (-1); 208 } 209 } else { 210 dbp->db_base = (unsigned char *)&dbp[1]; 211 } 212 213 dbp->db_mblk->b_datap = dbp; 214 dbp->db_cache = dblk_cache[index]; 215 dbp->db_lim = dbp->db_base + msg_size; 216 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 217 dbp->db_frtnp = NULL; 218 dbp->db_fthdr = NULL; 219 dbp->db_credp = NULL; 220 dbp->db_cpid = -1; 221 dbp->db_struioflag = 0; 222 dbp->db_struioun.cksum.flags = 0; 223 return (0); 224 } 225 226 /*ARGSUSED*/ 227 static int 228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 229 { 230 dblk_t *dbp = buf; 231 232 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 233 return (-1); 234 dbp->db_mblk->b_datap = dbp; 235 dbp->db_cache = dblk_esb_cache; 236 dbp->db_fthdr = NULL; 237 dbp->db_credp = NULL; 238 dbp->db_cpid = -1; 239 dbp->db_struioflag = 0; 240 dbp->db_struioun.cksum.flags = 0; 241 return (0); 242 } 243 244 static int 245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 246 { 247 dblk_t *dbp = buf; 248 bcache_t *bcp = (bcache_t *)cdrarg; 249 250 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 251 return (-1); 252 253 if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache, 254 kmflags)) == NULL) { 255 kmem_cache_free(mblk_cache, dbp->db_mblk); 256 return (-1); 257 } 258 259 dbp->db_mblk->b_datap = dbp; 260 dbp->db_cache = (void *)bcp; 261 dbp->db_lim = dbp->db_base + bcp->size; 262 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 263 dbp->db_frtnp = NULL; 264 dbp->db_fthdr = NULL; 265 dbp->db_credp = NULL; 266 dbp->db_cpid = -1; 267 dbp->db_struioflag = 0; 268 dbp->db_struioun.cksum.flags = 0; 269 return (0); 270 } 271 272 /*ARGSUSED*/ 273 static void 274 dblk_destructor(void *buf, void *cdrarg) 275 { 276 dblk_t *dbp = buf; 277 ssize_t msg_size = (ssize_t)cdrarg; 278 279 ASSERT(dbp->db_mblk->b_datap == dbp); 280 281 ASSERT(msg_size != 0); 282 283 ASSERT(dbp->db_struioflag == 0); 284 ASSERT(dbp->db_struioun.cksum.flags == 0); 285 286 if ((msg_size & PAGEOFFSET) == 0) { 287 kmem_free(dbp->db_base, msg_size); 288 } 289 290 kmem_cache_free(mblk_cache, dbp->db_mblk); 291 } 292 293 static void 294 bcache_dblk_destructor(void *buf, void *cdrarg) 295 { 296 dblk_t *dbp = buf; 297 bcache_t *bcp = (bcache_t *)cdrarg; 298 299 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 300 301 ASSERT(dbp->db_mblk->b_datap == dbp); 302 303 ASSERT(dbp->db_struioflag == 0); 304 ASSERT(dbp->db_struioun.cksum.flags == 0); 305 306 kmem_cache_free(mblk_cache, dbp->db_mblk); 307 } 308 309 void 310 streams_msg_init(void) 311 { 312 char name[40]; 313 size_t size; 314 size_t lastsize = DBLK_MIN_SIZE; 315 size_t *sizep; 316 struct kmem_cache *cp; 317 size_t tot_size; 318 int offset; 319 320 mblk_cache = kmem_cache_create("streams_mblk", 321 sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL, 322 mblk_kmem_flags); 323 324 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) { 325 326 if ((offset = (size & PAGEOFFSET)) != 0) { 327 /* 328 * We are in the middle of a page, dblk should 329 * be allocated on the same page 330 */ 331 tot_size = size + sizeof (dblk_t); 332 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t)) 333 < PAGESIZE); 334 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0); 335 336 } else { 337 338 /* 339 * buf size is multiple of page size, dblk and 340 * buffer are allocated separately. 341 */ 342 343 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0); 344 tot_size = sizeof (dblk_t); 345 } 346 347 (void) sprintf(name, "streams_dblk_%ld", size); 348 cp = kmem_cache_create(name, tot_size, 349 DBLK_CACHE_ALIGN, dblk_constructor, 350 dblk_destructor, NULL, 351 (void *)(size), NULL, dblk_kmem_flags); 352 353 while (lastsize <= size) { 354 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp; 355 lastsize += DBLK_MIN_SIZE; 356 } 357 } 358 359 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", 360 sizeof (dblk_t), DBLK_CACHE_ALIGN, 361 dblk_esb_constructor, dblk_destructor, NULL, 362 (void *) sizeof (dblk_t), NULL, dblk_kmem_flags); 363 fthdr_cache = kmem_cache_create("streams_fthdr", 364 sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 365 ftblk_cache = kmem_cache_create("streams_ftblk", 366 sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0); 367 368 /* Initialize Multidata caches */ 369 mmd_init(); 370 371 /* initialize throttling queue for esballoc */ 372 esballoc_queue_init(); 373 } 374 375 /*ARGSUSED*/ 376 mblk_t * 377 allocb(size_t size, uint_t pri) 378 { 379 dblk_t *dbp; 380 mblk_t *mp; 381 size_t index; 382 383 index = (size - 1) >> DBLK_SIZE_SHIFT; 384 385 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 386 if (size != 0) { 387 mp = allocb_oversize(size, KM_NOSLEEP); 388 goto out; 389 } 390 index = 0; 391 } 392 393 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) { 394 mp = NULL; 395 goto out; 396 } 397 398 mp = dbp->db_mblk; 399 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 400 mp->b_next = mp->b_prev = mp->b_cont = NULL; 401 mp->b_rptr = mp->b_wptr = dbp->db_base; 402 mp->b_queue = NULL; 403 MBLK_BAND_FLAG_WORD(mp) = 0; 404 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size); 405 out: 406 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp); 407 408 return (mp); 409 } 410 411 mblk_t * 412 allocb_tmpl(size_t size, const mblk_t *tmpl) 413 { 414 mblk_t *mp = allocb(size, 0); 415 416 if (mp != NULL) { 417 cred_t *cr = DB_CRED(tmpl); 418 if (cr != NULL) 419 crhold(mp->b_datap->db_credp = cr); 420 DB_CPID(mp) = DB_CPID(tmpl); 421 DB_TYPE(mp) = DB_TYPE(tmpl); 422 } 423 return (mp); 424 } 425 426 mblk_t * 427 allocb_cred(size_t size, cred_t *cr) 428 { 429 mblk_t *mp = allocb(size, 0); 430 431 if (mp != NULL && cr != NULL) 432 crhold(mp->b_datap->db_credp = cr); 433 434 return (mp); 435 } 436 437 mblk_t * 438 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr) 439 { 440 mblk_t *mp = allocb_wait(size, 0, flags, error); 441 442 if (mp != NULL && cr != NULL) 443 crhold(mp->b_datap->db_credp = cr); 444 445 return (mp); 446 } 447 448 void 449 freeb(mblk_t *mp) 450 { 451 dblk_t *dbp = mp->b_datap; 452 453 ASSERT(dbp->db_ref > 0); 454 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 455 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp); 456 457 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 458 459 dbp->db_free(mp, dbp); 460 } 461 462 void 463 freemsg(mblk_t *mp) 464 { 465 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp); 466 while (mp) { 467 dblk_t *dbp = mp->b_datap; 468 mblk_t *mp_cont = mp->b_cont; 469 470 ASSERT(dbp->db_ref > 0); 471 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 472 473 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 474 475 dbp->db_free(mp, dbp); 476 mp = mp_cont; 477 } 478 } 479 480 /* 481 * Reallocate a block for another use. Try hard to use the old block. 482 * If the old data is wanted (copy), leave b_wptr at the end of the data, 483 * otherwise return b_wptr = b_rptr. 484 * 485 * This routine is private and unstable. 486 */ 487 mblk_t * 488 reallocb(mblk_t *mp, size_t size, uint_t copy) 489 { 490 mblk_t *mp1; 491 unsigned char *old_rptr; 492 ptrdiff_t cur_size; 493 494 if (mp == NULL) 495 return (allocb(size, BPRI_HI)); 496 497 cur_size = mp->b_wptr - mp->b_rptr; 498 old_rptr = mp->b_rptr; 499 500 ASSERT(mp->b_datap->db_ref != 0); 501 502 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) { 503 /* 504 * If the data is wanted and it will fit where it is, no 505 * work is required. 506 */ 507 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size) 508 return (mp); 509 510 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base; 511 mp1 = mp; 512 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) { 513 /* XXX other mp state could be copied too, db_flags ... ? */ 514 mp1->b_cont = mp->b_cont; 515 } else { 516 return (NULL); 517 } 518 519 if (copy) { 520 bcopy(old_rptr, mp1->b_rptr, cur_size); 521 mp1->b_wptr = mp1->b_rptr + cur_size; 522 } 523 524 if (mp != mp1) 525 freeb(mp); 526 527 return (mp1); 528 } 529 530 static void 531 dblk_lastfree(mblk_t *mp, dblk_t *dbp) 532 { 533 ASSERT(dbp->db_mblk == mp); 534 if (dbp->db_fthdr != NULL) 535 str_ftfree(dbp); 536 537 /* set credp and projid to be 'unspecified' before returning to cache */ 538 if (dbp->db_credp != NULL) { 539 crfree(dbp->db_credp); 540 dbp->db_credp = NULL; 541 } 542 dbp->db_cpid = -1; 543 544 /* Reset the struioflag and the checksum flag fields */ 545 dbp->db_struioflag = 0; 546 dbp->db_struioun.cksum.flags = 0; 547 548 /* and the COOKED flag */ 549 dbp->db_flags &= ~DBLK_COOKED; 550 551 kmem_cache_free(dbp->db_cache, dbp); 552 } 553 554 static void 555 dblk_decref(mblk_t *mp, dblk_t *dbp) 556 { 557 if (dbp->db_ref != 1) { 558 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp), 559 -(1 << DBLK_RTFU_SHIFT(db_ref))); 560 /* 561 * atomic_add_32_nv() just decremented db_ref, so we no longer 562 * have a reference to the dblk, which means another thread 563 * could free it. Therefore we cannot examine the dblk to 564 * determine whether ours was the last reference. Instead, 565 * we extract the new and minimum reference counts from rtfu. 566 * Note that all we're really saying is "if (ref != refmin)". 567 */ 568 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) != 569 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) { 570 kmem_cache_free(mblk_cache, mp); 571 return; 572 } 573 } 574 dbp->db_mblk = mp; 575 dbp->db_free = dbp->db_lastfree; 576 dbp->db_lastfree(mp, dbp); 577 } 578 579 mblk_t * 580 dupb(mblk_t *mp) 581 { 582 dblk_t *dbp = mp->b_datap; 583 mblk_t *new_mp; 584 uint32_t oldrtfu, newrtfu; 585 586 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL) 587 goto out; 588 589 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL; 590 new_mp->b_rptr = mp->b_rptr; 591 new_mp->b_wptr = mp->b_wptr; 592 new_mp->b_datap = dbp; 593 new_mp->b_queue = NULL; 594 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp); 595 596 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref); 597 598 dbp->db_free = dblk_decref; 599 do { 600 ASSERT(dbp->db_ref > 0); 601 oldrtfu = DBLK_RTFU_WORD(dbp); 602 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref)); 603 /* 604 * If db_ref is maxed out we can't dup this message anymore. 605 */ 606 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) { 607 kmem_cache_free(mblk_cache, new_mp); 608 new_mp = NULL; 609 goto out; 610 } 611 } while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu); 612 613 out: 614 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp); 615 return (new_mp); 616 } 617 618 static void 619 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp) 620 { 621 frtn_t *frp = dbp->db_frtnp; 622 623 ASSERT(dbp->db_mblk == mp); 624 frp->free_func(frp->free_arg); 625 if (dbp->db_fthdr != NULL) 626 str_ftfree(dbp); 627 628 /* set credp and projid to be 'unspecified' before returning to cache */ 629 if (dbp->db_credp != NULL) { 630 crfree(dbp->db_credp); 631 dbp->db_credp = NULL; 632 } 633 dbp->db_cpid = -1; 634 dbp->db_struioflag = 0; 635 dbp->db_struioun.cksum.flags = 0; 636 637 kmem_cache_free(dbp->db_cache, dbp); 638 } 639 640 /*ARGSUSED*/ 641 static void 642 frnop_func(void *arg) 643 { 644 } 645 646 /* 647 * Generic esballoc used to implement the four flavors: [d]esballoc[a]. 648 */ 649 static mblk_t * 650 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp, 651 void (*lastfree)(mblk_t *, dblk_t *), int kmflags) 652 { 653 dblk_t *dbp; 654 mblk_t *mp; 655 656 ASSERT(base != NULL && frp != NULL); 657 658 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) { 659 mp = NULL; 660 goto out; 661 } 662 663 mp = dbp->db_mblk; 664 dbp->db_base = base; 665 dbp->db_lim = base + size; 666 dbp->db_free = dbp->db_lastfree = lastfree; 667 dbp->db_frtnp = frp; 668 DBLK_RTFU_WORD(dbp) = db_rtfu; 669 mp->b_next = mp->b_prev = mp->b_cont = NULL; 670 mp->b_rptr = mp->b_wptr = base; 671 mp->b_queue = NULL; 672 MBLK_BAND_FLAG_WORD(mp) = 0; 673 674 out: 675 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp); 676 return (mp); 677 } 678 679 /*ARGSUSED*/ 680 mblk_t * 681 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 682 { 683 mblk_t *mp; 684 685 /* 686 * Note that this is structured to allow the common case (i.e. 687 * STREAMS flowtracing disabled) to call gesballoc() with tail 688 * call optimization. 689 */ 690 if (!str_ftnever) { 691 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 692 frp, freebs_enqueue, KM_NOSLEEP); 693 694 if (mp != NULL) 695 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 696 return (mp); 697 } 698 699 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 700 frp, freebs_enqueue, KM_NOSLEEP)); 701 } 702 703 /* 704 * Same as esballoc() but sleeps waiting for memory. 705 */ 706 /*ARGSUSED*/ 707 mblk_t * 708 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 709 { 710 mblk_t *mp; 711 712 /* 713 * Note that this is structured to allow the common case (i.e. 714 * STREAMS flowtracing disabled) to call gesballoc() with tail 715 * call optimization. 716 */ 717 if (!str_ftnever) { 718 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 719 frp, freebs_enqueue, KM_SLEEP); 720 721 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 722 return (mp); 723 } 724 725 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 726 frp, freebs_enqueue, KM_SLEEP)); 727 } 728 729 /*ARGSUSED*/ 730 mblk_t * 731 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 732 { 733 mblk_t *mp; 734 735 /* 736 * Note that this is structured to allow the common case (i.e. 737 * STREAMS flowtracing disabled) to call gesballoc() with tail 738 * call optimization. 739 */ 740 if (!str_ftnever) { 741 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 742 frp, dblk_lastfree_desb, KM_NOSLEEP); 743 744 if (mp != NULL) 745 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 746 return (mp); 747 } 748 749 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 750 frp, dblk_lastfree_desb, KM_NOSLEEP)); 751 } 752 753 /*ARGSUSED*/ 754 mblk_t * 755 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 756 { 757 mblk_t *mp; 758 759 /* 760 * Note that this is structured to allow the common case (i.e. 761 * STREAMS flowtracing disabled) to call gesballoc() with tail 762 * call optimization. 763 */ 764 if (!str_ftnever) { 765 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 766 frp, freebs_enqueue, KM_NOSLEEP); 767 768 if (mp != NULL) 769 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 770 return (mp); 771 } 772 773 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 774 frp, freebs_enqueue, KM_NOSLEEP)); 775 } 776 777 /*ARGSUSED*/ 778 mblk_t * 779 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 780 { 781 mblk_t *mp; 782 783 /* 784 * Note that this is structured to allow the common case (i.e. 785 * STREAMS flowtracing disabled) to call gesballoc() with tail 786 * call optimization. 787 */ 788 if (!str_ftnever) { 789 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 790 frp, dblk_lastfree_desb, KM_NOSLEEP); 791 792 if (mp != NULL) 793 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 794 return (mp); 795 } 796 797 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 798 frp, dblk_lastfree_desb, KM_NOSLEEP)); 799 } 800 801 static void 802 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 803 { 804 bcache_t *bcp = dbp->db_cache; 805 806 ASSERT(dbp->db_mblk == mp); 807 if (dbp->db_fthdr != NULL) 808 str_ftfree(dbp); 809 810 /* set credp and projid to be 'unspecified' before returning to cache */ 811 if (dbp->db_credp != NULL) { 812 crfree(dbp->db_credp); 813 dbp->db_credp = NULL; 814 } 815 dbp->db_cpid = -1; 816 dbp->db_struioflag = 0; 817 dbp->db_struioun.cksum.flags = 0; 818 819 mutex_enter(&bcp->mutex); 820 kmem_cache_free(bcp->dblk_cache, dbp); 821 bcp->alloc--; 822 823 if (bcp->alloc == 0 && bcp->destroy != 0) { 824 kmem_cache_destroy(bcp->dblk_cache); 825 kmem_cache_destroy(bcp->buffer_cache); 826 mutex_exit(&bcp->mutex); 827 mutex_destroy(&bcp->mutex); 828 kmem_free(bcp, sizeof (bcache_t)); 829 } else { 830 mutex_exit(&bcp->mutex); 831 } 832 } 833 834 bcache_t * 835 bcache_create(char *name, size_t size, uint_t align) 836 { 837 bcache_t *bcp; 838 char buffer[255]; 839 840 ASSERT((align & (align - 1)) == 0); 841 842 if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == 843 NULL) { 844 return (NULL); 845 } 846 847 bcp->size = size; 848 bcp->align = align; 849 bcp->alloc = 0; 850 bcp->destroy = 0; 851 852 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 853 854 (void) sprintf(buffer, "%s_buffer_cache", name); 855 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 856 NULL, NULL, NULL, 0); 857 (void) sprintf(buffer, "%s_dblk_cache", name); 858 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 859 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 860 NULL, (void *)bcp, NULL, 0); 861 862 return (bcp); 863 } 864 865 void 866 bcache_destroy(bcache_t *bcp) 867 { 868 ASSERT(bcp != NULL); 869 870 mutex_enter(&bcp->mutex); 871 if (bcp->alloc == 0) { 872 kmem_cache_destroy(bcp->dblk_cache); 873 kmem_cache_destroy(bcp->buffer_cache); 874 mutex_exit(&bcp->mutex); 875 mutex_destroy(&bcp->mutex); 876 kmem_free(bcp, sizeof (bcache_t)); 877 } else { 878 bcp->destroy++; 879 mutex_exit(&bcp->mutex); 880 } 881 } 882 883 /*ARGSUSED*/ 884 mblk_t * 885 bcache_allocb(bcache_t *bcp, uint_t pri) 886 { 887 dblk_t *dbp; 888 mblk_t *mp = NULL; 889 890 ASSERT(bcp != NULL); 891 892 mutex_enter(&bcp->mutex); 893 if (bcp->destroy != 0) { 894 mutex_exit(&bcp->mutex); 895 goto out; 896 } 897 898 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 899 mutex_exit(&bcp->mutex); 900 goto out; 901 } 902 bcp->alloc++; 903 mutex_exit(&bcp->mutex); 904 905 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 906 907 mp = dbp->db_mblk; 908 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 909 mp->b_next = mp->b_prev = mp->b_cont = NULL; 910 mp->b_rptr = mp->b_wptr = dbp->db_base; 911 mp->b_queue = NULL; 912 MBLK_BAND_FLAG_WORD(mp) = 0; 913 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 914 out: 915 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 916 917 return (mp); 918 } 919 920 static void 921 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 922 { 923 ASSERT(dbp->db_mblk == mp); 924 if (dbp->db_fthdr != NULL) 925 str_ftfree(dbp); 926 927 /* set credp and projid to be 'unspecified' before returning to cache */ 928 if (dbp->db_credp != NULL) { 929 crfree(dbp->db_credp); 930 dbp->db_credp = NULL; 931 } 932 dbp->db_cpid = -1; 933 dbp->db_struioflag = 0; 934 dbp->db_struioun.cksum.flags = 0; 935 936 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base); 937 kmem_cache_free(dbp->db_cache, dbp); 938 } 939 940 static mblk_t * 941 allocb_oversize(size_t size, int kmflags) 942 { 943 mblk_t *mp; 944 void *buf; 945 946 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN); 947 if ((buf = kmem_alloc(size, kmflags)) == NULL) 948 return (NULL); 949 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0), 950 &frnop, dblk_lastfree_oversize, kmflags)) == NULL) 951 kmem_free(buf, size); 952 953 if (mp != NULL) 954 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size); 955 956 return (mp); 957 } 958 959 mblk_t * 960 allocb_tryhard(size_t target_size) 961 { 962 size_t size; 963 mblk_t *bp; 964 965 for (size = target_size; size < target_size + 512; 966 size += DBLK_CACHE_ALIGN) 967 if ((bp = allocb(size, BPRI_HI)) != NULL) 968 return (bp); 969 allocb_tryhard_fails++; 970 return (NULL); 971 } 972 973 /* 974 * This routine is consolidation private for STREAMS internal use 975 * This routine may only be called from sync routines (i.e., not 976 * from put or service procedures). It is located here (rather 977 * than strsubr.c) so that we don't have to expose all of the 978 * allocb() implementation details in header files. 979 */ 980 mblk_t * 981 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error) 982 { 983 dblk_t *dbp; 984 mblk_t *mp; 985 size_t index; 986 987 index = (size -1) >> DBLK_SIZE_SHIFT; 988 989 if (flags & STR_NOSIG) { 990 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 991 if (size != 0) { 992 mp = allocb_oversize(size, KM_SLEEP); 993 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", 994 (uintptr_t)mp); 995 return (mp); 996 } 997 index = 0; 998 } 999 1000 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP); 1001 mp = dbp->db_mblk; 1002 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 1003 mp->b_next = mp->b_prev = mp->b_cont = NULL; 1004 mp->b_rptr = mp->b_wptr = dbp->db_base; 1005 mp->b_queue = NULL; 1006 MBLK_BAND_FLAG_WORD(mp) = 0; 1007 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size); 1008 1009 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp); 1010 1011 } else { 1012 while ((mp = allocb(size, pri)) == NULL) { 1013 if ((*error = strwaitbuf(size, BPRI_HI)) != 0) 1014 return (NULL); 1015 } 1016 } 1017 1018 return (mp); 1019 } 1020 1021 /* 1022 * Call function 'func' with 'arg' when a class zero block can 1023 * be allocated with priority 'pri'. 1024 */ 1025 bufcall_id_t 1026 esbbcall(uint_t pri, void (*func)(void *), void *arg) 1027 { 1028 return (bufcall(1, pri, func, arg)); 1029 } 1030 1031 /* 1032 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials 1033 * ioc_id, rval and error of the struct ioctl to set up an ioctl call. 1034 * This provides consistency for all internal allocators of ioctl. 1035 */ 1036 mblk_t * 1037 mkiocb(uint_t cmd) 1038 { 1039 struct iocblk *ioc; 1040 mblk_t *mp; 1041 1042 /* 1043 * Allocate enough space for any of the ioctl related messages. 1044 */ 1045 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL) 1046 return (NULL); 1047 1048 bzero(mp->b_rptr, sizeof (union ioctypes)); 1049 1050 /* 1051 * Set the mblk_t information and ptrs correctly. 1052 */ 1053 mp->b_wptr += sizeof (struct iocblk); 1054 mp->b_datap->db_type = M_IOCTL; 1055 1056 /* 1057 * Fill in the fields. 1058 */ 1059 ioc = (struct iocblk *)mp->b_rptr; 1060 ioc->ioc_cmd = cmd; 1061 ioc->ioc_cr = kcred; 1062 ioc->ioc_id = getiocseqno(); 1063 ioc->ioc_flag = IOC_NATIVE; 1064 return (mp); 1065 } 1066 1067 /* 1068 * test if block of given size can be allocated with a request of 1069 * the given priority. 1070 * 'pri' is no longer used, but is retained for compatibility. 1071 */ 1072 /* ARGSUSED */ 1073 int 1074 testb(size_t size, uint_t pri) 1075 { 1076 return ((size + sizeof (dblk_t)) <= kmem_avail()); 1077 } 1078 1079 /* 1080 * Call function 'func' with argument 'arg' when there is a reasonably 1081 * good chance that a block of size 'size' can be allocated. 1082 * 'pri' is no longer used, but is retained for compatibility. 1083 */ 1084 /* ARGSUSED */ 1085 bufcall_id_t 1086 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg) 1087 { 1088 static long bid = 1; /* always odd to save checking for zero */ 1089 bufcall_id_t bc_id; 1090 struct strbufcall *bcp; 1091 1092 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL) 1093 return (0); 1094 1095 bcp->bc_func = func; 1096 bcp->bc_arg = arg; 1097 bcp->bc_size = size; 1098 bcp->bc_next = NULL; 1099 bcp->bc_executor = NULL; 1100 1101 mutex_enter(&strbcall_lock); 1102 /* 1103 * After bcp is linked into strbcalls and strbcall_lock is dropped there 1104 * should be no references to bcp since it may be freed by 1105 * runbufcalls(). Since bcp_id field is returned, we save its value in 1106 * the local var. 1107 */ 1108 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */ 1109 1110 /* 1111 * add newly allocated stream event to existing 1112 * linked list of events. 1113 */ 1114 if (strbcalls.bc_head == NULL) { 1115 strbcalls.bc_head = strbcalls.bc_tail = bcp; 1116 } else { 1117 strbcalls.bc_tail->bc_next = bcp; 1118 strbcalls.bc_tail = bcp; 1119 } 1120 1121 cv_signal(&strbcall_cv); 1122 mutex_exit(&strbcall_lock); 1123 return (bc_id); 1124 } 1125 1126 /* 1127 * Cancel a bufcall request. 1128 */ 1129 void 1130 unbufcall(bufcall_id_t id) 1131 { 1132 strbufcall_t *bcp, *pbcp; 1133 1134 mutex_enter(&strbcall_lock); 1135 again: 1136 pbcp = NULL; 1137 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) { 1138 if (id == bcp->bc_id) 1139 break; 1140 pbcp = bcp; 1141 } 1142 if (bcp) { 1143 if (bcp->bc_executor != NULL) { 1144 if (bcp->bc_executor != curthread) { 1145 cv_wait(&bcall_cv, &strbcall_lock); 1146 goto again; 1147 } 1148 } else { 1149 if (pbcp) 1150 pbcp->bc_next = bcp->bc_next; 1151 else 1152 strbcalls.bc_head = bcp->bc_next; 1153 if (bcp == strbcalls.bc_tail) 1154 strbcalls.bc_tail = pbcp; 1155 kmem_free(bcp, sizeof (strbufcall_t)); 1156 } 1157 } 1158 mutex_exit(&strbcall_lock); 1159 } 1160 1161 /* 1162 * Duplicate a message block by block (uses dupb), returning 1163 * a pointer to the duplicate message. 1164 * Returns a non-NULL value only if the entire message 1165 * was dup'd. 1166 */ 1167 mblk_t * 1168 dupmsg(mblk_t *bp) 1169 { 1170 mblk_t *head, *nbp; 1171 1172 if (!bp || !(nbp = head = dupb(bp))) 1173 return (NULL); 1174 1175 while (bp->b_cont) { 1176 if (!(nbp->b_cont = dupb(bp->b_cont))) { 1177 freemsg(head); 1178 return (NULL); 1179 } 1180 nbp = nbp->b_cont; 1181 bp = bp->b_cont; 1182 } 1183 return (head); 1184 } 1185 1186 #define DUPB_NOLOAN(bp) \ 1187 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \ 1188 copyb((bp)) : dupb((bp))) 1189 1190 mblk_t * 1191 dupmsg_noloan(mblk_t *bp) 1192 { 1193 mblk_t *head, *nbp; 1194 1195 if (bp == NULL || DB_TYPE(bp) != M_DATA || 1196 ((nbp = head = DUPB_NOLOAN(bp)) == NULL)) 1197 return (NULL); 1198 1199 while (bp->b_cont) { 1200 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) { 1201 freemsg(head); 1202 return (NULL); 1203 } 1204 nbp = nbp->b_cont; 1205 bp = bp->b_cont; 1206 } 1207 return (head); 1208 } 1209 1210 /* 1211 * Copy data from message and data block to newly allocated message and 1212 * data block. Returns new message block pointer, or NULL if error. 1213 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy 1214 * as in the original even when db_base is not word aligned. (bug 1052877) 1215 */ 1216 mblk_t * 1217 copyb(mblk_t *bp) 1218 { 1219 mblk_t *nbp; 1220 dblk_t *dp, *ndp; 1221 uchar_t *base; 1222 size_t size; 1223 size_t unaligned; 1224 1225 ASSERT(bp->b_wptr >= bp->b_rptr); 1226 1227 dp = bp->b_datap; 1228 if (dp->db_fthdr != NULL) 1229 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0); 1230 1231 /* 1232 * Special handling for Multidata message; this should be 1233 * removed once a copy-callback routine is made available. 1234 */ 1235 if (dp->db_type == M_MULTIDATA) { 1236 cred_t *cr; 1237 1238 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL) 1239 return (NULL); 1240 1241 nbp->b_flag = bp->b_flag; 1242 nbp->b_band = bp->b_band; 1243 ndp = nbp->b_datap; 1244 1245 /* See comments below on potential issues. */ 1246 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1247 1248 ASSERT(ndp->db_type == dp->db_type); 1249 cr = dp->db_credp; 1250 if (cr != NULL) 1251 crhold(ndp->db_credp = cr); 1252 ndp->db_cpid = dp->db_cpid; 1253 return (nbp); 1254 } 1255 1256 size = dp->db_lim - dp->db_base; 1257 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t)); 1258 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL) 1259 return (NULL); 1260 nbp->b_flag = bp->b_flag; 1261 nbp->b_band = bp->b_band; 1262 ndp = nbp->b_datap; 1263 1264 /* 1265 * Well, here is a potential issue. If we are trying to 1266 * trace a flow, and we copy the message, we might lose 1267 * information about where this message might have been. 1268 * So we should inherit the FT data. On the other hand, 1269 * a user might be interested only in alloc to free data. 1270 * So I guess the real answer is to provide a tunable. 1271 */ 1272 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1); 1273 1274 base = ndp->db_base + unaligned; 1275 bcopy(dp->db_base, ndp->db_base + unaligned, size); 1276 1277 nbp->b_rptr = base + (bp->b_rptr - dp->db_base); 1278 nbp->b_wptr = nbp->b_rptr + MBLKL(bp); 1279 1280 return (nbp); 1281 } 1282 1283 /* 1284 * Copy data from message to newly allocated message using new 1285 * data blocks. Returns a pointer to the new message, or NULL if error. 1286 */ 1287 mblk_t * 1288 copymsg(mblk_t *bp) 1289 { 1290 mblk_t *head, *nbp; 1291 1292 if (!bp || !(nbp = head = copyb(bp))) 1293 return (NULL); 1294 1295 while (bp->b_cont) { 1296 if (!(nbp->b_cont = copyb(bp->b_cont))) { 1297 freemsg(head); 1298 return (NULL); 1299 } 1300 nbp = nbp->b_cont; 1301 bp = bp->b_cont; 1302 } 1303 return (head); 1304 } 1305 1306 /* 1307 * link a message block to tail of message 1308 */ 1309 void 1310 linkb(mblk_t *mp, mblk_t *bp) 1311 { 1312 ASSERT(mp && bp); 1313 1314 for (; mp->b_cont; mp = mp->b_cont) 1315 ; 1316 mp->b_cont = bp; 1317 } 1318 1319 /* 1320 * unlink a message block from head of message 1321 * return pointer to new message. 1322 * NULL if message becomes empty. 1323 */ 1324 mblk_t * 1325 unlinkb(mblk_t *bp) 1326 { 1327 mblk_t *bp1; 1328 1329 bp1 = bp->b_cont; 1330 bp->b_cont = NULL; 1331 return (bp1); 1332 } 1333 1334 /* 1335 * remove a message block "bp" from message "mp" 1336 * 1337 * Return pointer to new message or NULL if no message remains. 1338 * Return -1 if bp is not found in message. 1339 */ 1340 mblk_t * 1341 rmvb(mblk_t *mp, mblk_t *bp) 1342 { 1343 mblk_t *tmp; 1344 mblk_t *lastp = NULL; 1345 1346 ASSERT(mp && bp); 1347 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1348 if (tmp == bp) { 1349 if (lastp) 1350 lastp->b_cont = tmp->b_cont; 1351 else 1352 mp = tmp->b_cont; 1353 tmp->b_cont = NULL; 1354 return (mp); 1355 } 1356 lastp = tmp; 1357 } 1358 return ((mblk_t *)-1); 1359 } 1360 1361 /* 1362 * Concatenate and align first len bytes of common 1363 * message type. Len == -1, means concat everything. 1364 * Returns 1 on success, 0 on failure 1365 * After the pullup, mp points to the pulled up data. 1366 */ 1367 int 1368 pullupmsg(mblk_t *mp, ssize_t len) 1369 { 1370 mblk_t *bp, *b_cont; 1371 dblk_t *dbp; 1372 ssize_t n; 1373 1374 ASSERT(mp->b_datap->db_ref > 0); 1375 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 1376 1377 /* 1378 * We won't handle Multidata message, since it contains 1379 * metadata which this function has no knowledge of; we 1380 * assert on DEBUG, and return failure otherwise. 1381 */ 1382 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1383 if (mp->b_datap->db_type == M_MULTIDATA) 1384 return (0); 1385 1386 if (len == -1) { 1387 if (mp->b_cont == NULL && str_aligned(mp->b_rptr)) 1388 return (1); 1389 len = xmsgsize(mp); 1390 } else { 1391 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr; 1392 ASSERT(first_mblk_len >= 0); 1393 /* 1394 * If the length is less than that of the first mblk, 1395 * we want to pull up the message into an aligned mblk. 1396 * Though not part of the spec, some callers assume it. 1397 */ 1398 if (len <= first_mblk_len) { 1399 if (str_aligned(mp->b_rptr)) 1400 return (1); 1401 len = first_mblk_len; 1402 } else if (xmsgsize(mp) < len) 1403 return (0); 1404 } 1405 1406 if ((bp = allocb_tmpl(len, mp)) == NULL) 1407 return (0); 1408 1409 dbp = bp->b_datap; 1410 *bp = *mp; /* swap mblks so bp heads the old msg... */ 1411 mp->b_datap = dbp; /* ... and mp heads the new message */ 1412 mp->b_datap->db_mblk = mp; 1413 bp->b_datap->db_mblk = bp; 1414 mp->b_rptr = mp->b_wptr = dbp->db_base; 1415 1416 do { 1417 ASSERT(bp->b_datap->db_ref > 0); 1418 ASSERT(bp->b_wptr >= bp->b_rptr); 1419 n = MIN(bp->b_wptr - bp->b_rptr, len); 1420 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n); 1421 mp->b_wptr += n; 1422 bp->b_rptr += n; 1423 len -= n; 1424 if (bp->b_rptr != bp->b_wptr) 1425 break; 1426 b_cont = bp->b_cont; 1427 freeb(bp); 1428 bp = b_cont; 1429 } while (len && bp); 1430 1431 mp->b_cont = bp; /* tack on whatever wasn't pulled up */ 1432 1433 return (1); 1434 } 1435 1436 /* 1437 * Concatenate and align at least the first len bytes of common message 1438 * type. Len == -1 means concatenate everything. The original message is 1439 * unaltered. Returns a pointer to a new message on success, otherwise 1440 * returns NULL. 1441 */ 1442 mblk_t * 1443 msgpullup(mblk_t *mp, ssize_t len) 1444 { 1445 mblk_t *newmp; 1446 ssize_t totlen; 1447 ssize_t n; 1448 1449 /* 1450 * We won't handle Multidata message, since it contains 1451 * metadata which this function has no knowledge of; we 1452 * assert on DEBUG, and return failure otherwise. 1453 */ 1454 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1455 if (mp->b_datap->db_type == M_MULTIDATA) 1456 return (NULL); 1457 1458 totlen = xmsgsize(mp); 1459 1460 if ((len > 0) && (len > totlen)) 1461 return (NULL); 1462 1463 /* 1464 * Copy all of the first msg type into one new mblk, then dupmsg 1465 * and link the rest onto this. 1466 */ 1467 1468 len = totlen; 1469 1470 if ((newmp = allocb_tmpl(len, mp)) == NULL) 1471 return (NULL); 1472 1473 newmp->b_flag = mp->b_flag; 1474 newmp->b_band = mp->b_band; 1475 1476 while (len > 0) { 1477 n = mp->b_wptr - mp->b_rptr; 1478 ASSERT(n >= 0); /* allow zero-length mblk_t's */ 1479 if (n > 0) 1480 bcopy(mp->b_rptr, newmp->b_wptr, n); 1481 newmp->b_wptr += n; 1482 len -= n; 1483 mp = mp->b_cont; 1484 } 1485 1486 if (mp != NULL) { 1487 newmp->b_cont = dupmsg(mp); 1488 if (newmp->b_cont == NULL) { 1489 freemsg(newmp); 1490 return (NULL); 1491 } 1492 } 1493 1494 return (newmp); 1495 } 1496 1497 /* 1498 * Trim bytes from message 1499 * len > 0, trim from head 1500 * len < 0, trim from tail 1501 * Returns 1 on success, 0 on failure. 1502 */ 1503 int 1504 adjmsg(mblk_t *mp, ssize_t len) 1505 { 1506 mblk_t *bp; 1507 mblk_t *save_bp = NULL; 1508 mblk_t *prev_bp; 1509 mblk_t *bcont; 1510 unsigned char type; 1511 ssize_t n; 1512 int fromhead; 1513 int first; 1514 1515 ASSERT(mp != NULL); 1516 /* 1517 * We won't handle Multidata message, since it contains 1518 * metadata which this function has no knowledge of; we 1519 * assert on DEBUG, and return failure otherwise. 1520 */ 1521 ASSERT(mp->b_datap->db_type != M_MULTIDATA); 1522 if (mp->b_datap->db_type == M_MULTIDATA) 1523 return (0); 1524 1525 if (len < 0) { 1526 fromhead = 0; 1527 len = -len; 1528 } else { 1529 fromhead = 1; 1530 } 1531 1532 if (xmsgsize(mp) < len) 1533 return (0); 1534 1535 1536 if (fromhead) { 1537 first = 1; 1538 while (len) { 1539 ASSERT(mp->b_wptr >= mp->b_rptr); 1540 n = MIN(mp->b_wptr - mp->b_rptr, len); 1541 mp->b_rptr += n; 1542 len -= n; 1543 1544 /* 1545 * If this is not the first zero length 1546 * message remove it 1547 */ 1548 if (!first && (mp->b_wptr == mp->b_rptr)) { 1549 bcont = mp->b_cont; 1550 freeb(mp); 1551 mp = save_bp->b_cont = bcont; 1552 } else { 1553 save_bp = mp; 1554 mp = mp->b_cont; 1555 } 1556 first = 0; 1557 } 1558 } else { 1559 type = mp->b_datap->db_type; 1560 while (len) { 1561 bp = mp; 1562 save_bp = NULL; 1563 1564 /* 1565 * Find the last message of same type 1566 */ 1567 1568 while (bp && bp->b_datap->db_type == type) { 1569 ASSERT(bp->b_wptr >= bp->b_rptr); 1570 prev_bp = save_bp; 1571 save_bp = bp; 1572 bp = bp->b_cont; 1573 } 1574 if (save_bp == NULL) 1575 break; 1576 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len); 1577 save_bp->b_wptr -= n; 1578 len -= n; 1579 1580 /* 1581 * If this is not the first message 1582 * and we have taken away everything 1583 * from this message, remove it 1584 */ 1585 1586 if ((save_bp != mp) && 1587 (save_bp->b_wptr == save_bp->b_rptr)) { 1588 bcont = save_bp->b_cont; 1589 freeb(save_bp); 1590 prev_bp->b_cont = bcont; 1591 } 1592 } 1593 } 1594 return (1); 1595 } 1596 1597 /* 1598 * get number of data bytes in message 1599 */ 1600 size_t 1601 msgdsize(mblk_t *bp) 1602 { 1603 size_t count = 0; 1604 1605 for (; bp; bp = bp->b_cont) 1606 if (bp->b_datap->db_type == M_DATA) { 1607 ASSERT(bp->b_wptr >= bp->b_rptr); 1608 count += bp->b_wptr - bp->b_rptr; 1609 } 1610 return (count); 1611 } 1612 1613 /* 1614 * Get a message off head of queue 1615 * 1616 * If queue has no buffers then mark queue 1617 * with QWANTR. (queue wants to be read by 1618 * someone when data becomes available) 1619 * 1620 * If there is something to take off then do so. 1621 * If queue falls below hi water mark turn off QFULL 1622 * flag. Decrement weighted count of queue. 1623 * Also turn off QWANTR because queue is being read. 1624 * 1625 * The queue count is maintained on a per-band basis. 1626 * Priority band 0 (normal messages) uses q_count, 1627 * q_lowat, etc. Non-zero priority bands use the 1628 * fields in their respective qband structures 1629 * (qb_count, qb_lowat, etc.) All messages appear 1630 * on the same list, linked via their b_next pointers. 1631 * q_first is the head of the list. q_count does 1632 * not reflect the size of all the messages on the 1633 * queue. It only reflects those messages in the 1634 * normal band of flow. The one exception to this 1635 * deals with high priority messages. They are in 1636 * their own conceptual "band", but are accounted 1637 * against q_count. 1638 * 1639 * If queue count is below the lo water mark and QWANTW 1640 * is set, enable the closest backq which has a service 1641 * procedure and turn off the QWANTW flag. 1642 * 1643 * getq could be built on top of rmvq, but isn't because 1644 * of performance considerations. 1645 * 1646 * A note on the use of q_count and q_mblkcnt: 1647 * q_count is the traditional byte count for messages that 1648 * have been put on a queue. Documentation tells us that 1649 * we shouldn't rely on that count, but some drivers/modules 1650 * do. What was needed, however, is a mechanism to prevent 1651 * runaway streams from consuming all of the resources, 1652 * and particularly be able to flow control zero-length 1653 * messages. q_mblkcnt is used for this purpose. It 1654 * counts the number of mblk's that are being put on 1655 * the queue. The intention here, is that each mblk should 1656 * contain one byte of data and, for the purpose of 1657 * flow-control, logically does. A queue will become 1658 * full when EITHER of these values (q_count and q_mblkcnt) 1659 * reach the highwater mark. It will clear when BOTH 1660 * of them drop below the highwater mark. And it will 1661 * backenable when BOTH of them drop below the lowwater 1662 * mark. 1663 * With this algorithm, a driver/module might be able 1664 * to find a reasonably accurate q_count, and the 1665 * framework can still try and limit resource usage. 1666 */ 1667 mblk_t * 1668 getq(queue_t *q) 1669 { 1670 mblk_t *bp; 1671 uchar_t band = 0; 1672 1673 bp = getq_noenab(q); 1674 if (bp != NULL) 1675 band = bp->b_band; 1676 1677 /* 1678 * Inlined from qbackenable(). 1679 * Quick check without holding the lock. 1680 */ 1681 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1682 return (bp); 1683 1684 qbackenable(q, band); 1685 return (bp); 1686 } 1687 1688 /* 1689 * Calculate number of data bytes in a single data message block taking 1690 * multidata messages into account. 1691 */ 1692 1693 #define ADD_MBLK_SIZE(mp, size) \ 1694 if (DB_TYPE(mp) != M_MULTIDATA) { \ 1695 (size) += MBLKL(mp); \ 1696 } else { \ 1697 uint_t pinuse; \ 1698 \ 1699 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \ 1700 (size) += pinuse; \ 1701 } 1702 1703 /* 1704 * Like getq() but does not backenable. This is used by the stream 1705 * head when a putback() is likely. The caller must call qbackenable() 1706 * after it is done with accessing the queue. 1707 */ 1708 mblk_t * 1709 getq_noenab(queue_t *q) 1710 { 1711 mblk_t *bp; 1712 mblk_t *tmp; 1713 qband_t *qbp; 1714 kthread_id_t freezer; 1715 int bytecnt = 0, mblkcnt = 0; 1716 1717 /* freezestr should allow its caller to call getq/putq */ 1718 freezer = STREAM(q)->sd_freezer; 1719 if (freezer == curthread) { 1720 ASSERT(frozenstr(q)); 1721 ASSERT(MUTEX_HELD(QLOCK(q))); 1722 } else 1723 mutex_enter(QLOCK(q)); 1724 1725 if ((bp = q->q_first) == 0) { 1726 q->q_flag |= QWANTR; 1727 } else { 1728 if ((q->q_first = bp->b_next) == NULL) 1729 q->q_last = NULL; 1730 else 1731 q->q_first->b_prev = NULL; 1732 1733 /* Get message byte count for q_count accounting */ 1734 for (tmp = bp; tmp; tmp = tmp->b_cont) { 1735 ADD_MBLK_SIZE(tmp, bytecnt); 1736 mblkcnt++; 1737 } 1738 1739 if (bp->b_band == 0) { 1740 q->q_count -= bytecnt; 1741 q->q_mblkcnt -= mblkcnt; 1742 if ((q->q_count < q->q_hiwat) && 1743 (q->q_mblkcnt < q->q_hiwat)) { 1744 q->q_flag &= ~QFULL; 1745 } 1746 } else { 1747 int i; 1748 1749 ASSERT(bp->b_band <= q->q_nband); 1750 ASSERT(q->q_bandp != NULL); 1751 ASSERT(MUTEX_HELD(QLOCK(q))); 1752 qbp = q->q_bandp; 1753 i = bp->b_band; 1754 while (--i > 0) 1755 qbp = qbp->qb_next; 1756 if (qbp->qb_first == qbp->qb_last) { 1757 qbp->qb_first = NULL; 1758 qbp->qb_last = NULL; 1759 } else { 1760 qbp->qb_first = bp->b_next; 1761 } 1762 qbp->qb_count -= bytecnt; 1763 qbp->qb_mblkcnt -= mblkcnt; 1764 if ((qbp->qb_count < qbp->qb_hiwat) && 1765 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1766 qbp->qb_flag &= ~QB_FULL; 1767 } 1768 } 1769 q->q_flag &= ~QWANTR; 1770 bp->b_next = NULL; 1771 bp->b_prev = NULL; 1772 } 1773 if (freezer != curthread) 1774 mutex_exit(QLOCK(q)); 1775 1776 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL); 1777 1778 return (bp); 1779 } 1780 1781 /* 1782 * Determine if a backenable is needed after removing a message in the 1783 * specified band. 1784 * NOTE: This routine assumes that something like getq_noenab() has been 1785 * already called. 1786 * 1787 * For the read side it is ok to hold sd_lock across calling this (and the 1788 * stream head often does). 1789 * But for the write side strwakeq might be invoked and it acquires sd_lock. 1790 */ 1791 void 1792 qbackenable(queue_t *q, uchar_t band) 1793 { 1794 int backenab = 0; 1795 qband_t *qbp; 1796 kthread_id_t freezer; 1797 1798 ASSERT(q); 1799 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 1800 1801 /* 1802 * Quick check without holding the lock. 1803 * OK since after getq() has lowered the q_count these flags 1804 * would not change unless either the qbackenable() is done by 1805 * another thread (which is ok) or the queue has gotten QFULL 1806 * in which case another backenable will take place when the queue 1807 * drops below q_lowat. 1808 */ 1809 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1810 return; 1811 1812 /* freezestr should allow its caller to call getq/putq */ 1813 freezer = STREAM(q)->sd_freezer; 1814 if (freezer == curthread) { 1815 ASSERT(frozenstr(q)); 1816 ASSERT(MUTEX_HELD(QLOCK(q))); 1817 } else 1818 mutex_enter(QLOCK(q)); 1819 1820 if (band == 0) { 1821 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 1822 q->q_mblkcnt < q->q_lowat)) { 1823 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 1824 } 1825 } else { 1826 int i; 1827 1828 ASSERT((unsigned)band <= q->q_nband); 1829 ASSERT(q->q_bandp != NULL); 1830 1831 qbp = q->q_bandp; 1832 i = band; 1833 while (--i > 0) 1834 qbp = qbp->qb_next; 1835 1836 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 1837 qbp->qb_mblkcnt < qbp->qb_lowat)) { 1838 backenab = qbp->qb_flag & QB_WANTW; 1839 } 1840 } 1841 1842 if (backenab == 0) { 1843 if (freezer != curthread) 1844 mutex_exit(QLOCK(q)); 1845 return; 1846 } 1847 1848 /* Have to drop the lock across strwakeq and backenable */ 1849 if (backenab & QWANTWSYNC) 1850 q->q_flag &= ~QWANTWSYNC; 1851 if (backenab & (QWANTW|QB_WANTW)) { 1852 if (band != 0) 1853 qbp->qb_flag &= ~QB_WANTW; 1854 else { 1855 q->q_flag &= ~QWANTW; 1856 } 1857 } 1858 1859 if (freezer != curthread) 1860 mutex_exit(QLOCK(q)); 1861 1862 if (backenab & QWANTWSYNC) 1863 strwakeq(q, QWANTWSYNC); 1864 if (backenab & (QWANTW|QB_WANTW)) 1865 backenable(q, band); 1866 } 1867 1868 /* 1869 * Remove a message from a queue. The queue count and other 1870 * flow control parameters are adjusted and the back queue 1871 * enabled if necessary. 1872 * 1873 * rmvq can be called with the stream frozen, but other utility functions 1874 * holding QLOCK, and by streams modules without any locks/frozen. 1875 */ 1876 void 1877 rmvq(queue_t *q, mblk_t *mp) 1878 { 1879 ASSERT(mp != NULL); 1880 1881 rmvq_noenab(q, mp); 1882 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 1883 /* 1884 * qbackenable can handle a frozen stream but not a "random" 1885 * qlock being held. Drop lock across qbackenable. 1886 */ 1887 mutex_exit(QLOCK(q)); 1888 qbackenable(q, mp->b_band); 1889 mutex_enter(QLOCK(q)); 1890 } else { 1891 qbackenable(q, mp->b_band); 1892 } 1893 } 1894 1895 /* 1896 * Like rmvq() but without any backenabling. 1897 * This exists to handle SR_CONSOL_DATA in strrput(). 1898 */ 1899 void 1900 rmvq_noenab(queue_t *q, mblk_t *mp) 1901 { 1902 mblk_t *tmp; 1903 int i; 1904 qband_t *qbp = NULL; 1905 kthread_id_t freezer; 1906 int bytecnt = 0, mblkcnt = 0; 1907 1908 freezer = STREAM(q)->sd_freezer; 1909 if (freezer == curthread) { 1910 ASSERT(frozenstr(q)); 1911 ASSERT(MUTEX_HELD(QLOCK(q))); 1912 } else if (MUTEX_HELD(QLOCK(q))) { 1913 /* Don't drop lock on exit */ 1914 freezer = curthread; 1915 } else 1916 mutex_enter(QLOCK(q)); 1917 1918 ASSERT(mp->b_band <= q->q_nband); 1919 if (mp->b_band != 0) { /* Adjust band pointers */ 1920 ASSERT(q->q_bandp != NULL); 1921 qbp = q->q_bandp; 1922 i = mp->b_band; 1923 while (--i > 0) 1924 qbp = qbp->qb_next; 1925 if (mp == qbp->qb_first) { 1926 if (mp->b_next && mp->b_band == mp->b_next->b_band) 1927 qbp->qb_first = mp->b_next; 1928 else 1929 qbp->qb_first = NULL; 1930 } 1931 if (mp == qbp->qb_last) { 1932 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 1933 qbp->qb_last = mp->b_prev; 1934 else 1935 qbp->qb_last = NULL; 1936 } 1937 } 1938 1939 /* 1940 * Remove the message from the list. 1941 */ 1942 if (mp->b_prev) 1943 mp->b_prev->b_next = mp->b_next; 1944 else 1945 q->q_first = mp->b_next; 1946 if (mp->b_next) 1947 mp->b_next->b_prev = mp->b_prev; 1948 else 1949 q->q_last = mp->b_prev; 1950 mp->b_next = NULL; 1951 mp->b_prev = NULL; 1952 1953 /* Get the size of the message for q_count accounting */ 1954 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1955 ADD_MBLK_SIZE(tmp, bytecnt); 1956 mblkcnt++; 1957 } 1958 1959 if (mp->b_band == 0) { /* Perform q_count accounting */ 1960 q->q_count -= bytecnt; 1961 q->q_mblkcnt -= mblkcnt; 1962 if ((q->q_count < q->q_hiwat) && 1963 (q->q_mblkcnt < q->q_hiwat)) { 1964 q->q_flag &= ~QFULL; 1965 } 1966 } else { /* Perform qb_count accounting */ 1967 qbp->qb_count -= bytecnt; 1968 qbp->qb_mblkcnt -= mblkcnt; 1969 if ((qbp->qb_count < qbp->qb_hiwat) && 1970 (qbp->qb_mblkcnt < qbp->qb_hiwat)) { 1971 qbp->qb_flag &= ~QB_FULL; 1972 } 1973 } 1974 if (freezer != curthread) 1975 mutex_exit(QLOCK(q)); 1976 1977 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL); 1978 } 1979 1980 /* 1981 * Empty a queue. 1982 * If flag is set, remove all messages. Otherwise, remove 1983 * only non-control messages. If queue falls below its low 1984 * water mark, and QWANTW is set, enable the nearest upstream 1985 * service procedure. 1986 * 1987 * Historical note: when merging the M_FLUSH code in strrput with this 1988 * code one difference was discovered. flushq did not have a check 1989 * for q_lowat == 0 in the backenabling test. 1990 * 1991 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 1992 * if one exists on the queue. 1993 */ 1994 void 1995 flushq_common(queue_t *q, int flag, int pcproto_flag) 1996 { 1997 mblk_t *mp, *nmp; 1998 qband_t *qbp; 1999 int backenab = 0; 2000 unsigned char bpri; 2001 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2002 2003 if (q->q_first == NULL) 2004 return; 2005 2006 mutex_enter(QLOCK(q)); 2007 mp = q->q_first; 2008 q->q_first = NULL; 2009 q->q_last = NULL; 2010 q->q_count = 0; 2011 q->q_mblkcnt = 0; 2012 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2013 qbp->qb_first = NULL; 2014 qbp->qb_last = NULL; 2015 qbp->qb_count = 0; 2016 qbp->qb_mblkcnt = 0; 2017 qbp->qb_flag &= ~QB_FULL; 2018 } 2019 q->q_flag &= ~QFULL; 2020 mutex_exit(QLOCK(q)); 2021 while (mp) { 2022 nmp = mp->b_next; 2023 mp->b_next = mp->b_prev = NULL; 2024 2025 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL); 2026 2027 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2028 (void) putq(q, mp); 2029 else if (flag || datamsg(mp->b_datap->db_type)) 2030 freemsg(mp); 2031 else 2032 (void) putq(q, mp); 2033 mp = nmp; 2034 } 2035 bpri = 1; 2036 mutex_enter(QLOCK(q)); 2037 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2038 if ((qbp->qb_flag & QB_WANTW) && 2039 (((qbp->qb_count < qbp->qb_lowat) && 2040 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2041 qbp->qb_lowat == 0)) { 2042 qbp->qb_flag &= ~QB_WANTW; 2043 backenab = 1; 2044 qbf[bpri] = 1; 2045 } else 2046 qbf[bpri] = 0; 2047 bpri++; 2048 } 2049 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2050 if ((q->q_flag & QWANTW) && 2051 (((q->q_count < q->q_lowat) && 2052 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2053 q->q_flag &= ~QWANTW; 2054 backenab = 1; 2055 qbf[0] = 1; 2056 } else 2057 qbf[0] = 0; 2058 2059 /* 2060 * If any band can now be written to, and there is a writer 2061 * for that band, then backenable the closest service procedure. 2062 */ 2063 if (backenab) { 2064 mutex_exit(QLOCK(q)); 2065 for (bpri = q->q_nband; bpri != 0; bpri--) 2066 if (qbf[bpri]) 2067 backenable(q, bpri); 2068 if (qbf[0]) 2069 backenable(q, 0); 2070 } else 2071 mutex_exit(QLOCK(q)); 2072 } 2073 2074 /* 2075 * The real flushing takes place in flushq_common. This is done so that 2076 * a flag which specifies whether or not M_PCPROTO messages should be flushed 2077 * or not. Currently the only place that uses this flag is the stream head. 2078 */ 2079 void 2080 flushq(queue_t *q, int flag) 2081 { 2082 flushq_common(q, flag, 0); 2083 } 2084 2085 /* 2086 * Flush the queue of messages of the given priority band. 2087 * There is some duplication of code between flushq and flushband. 2088 * This is because we want to optimize the code as much as possible. 2089 * The assumption is that there will be more messages in the normal 2090 * (priority 0) band than in any other. 2091 * 2092 * Historical note: when merging the M_FLUSH code in strrput with this 2093 * code one difference was discovered. flushband had an extra check for 2094 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0 2095 * case. That check does not match the man page for flushband and was not 2096 * in the strrput flush code hence it was removed. 2097 */ 2098 void 2099 flushband(queue_t *q, unsigned char pri, int flag) 2100 { 2101 mblk_t *mp; 2102 mblk_t *nmp; 2103 mblk_t *last; 2104 qband_t *qbp; 2105 int band; 2106 2107 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL)); 2108 if (pri > q->q_nband) { 2109 return; 2110 } 2111 mutex_enter(QLOCK(q)); 2112 if (pri == 0) { 2113 mp = q->q_first; 2114 q->q_first = NULL; 2115 q->q_last = NULL; 2116 q->q_count = 0; 2117 q->q_mblkcnt = 0; 2118 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2119 qbp->qb_first = NULL; 2120 qbp->qb_last = NULL; 2121 qbp->qb_count = 0; 2122 qbp->qb_mblkcnt = 0; 2123 qbp->qb_flag &= ~QB_FULL; 2124 } 2125 q->q_flag &= ~QFULL; 2126 mutex_exit(QLOCK(q)); 2127 while (mp) { 2128 nmp = mp->b_next; 2129 mp->b_next = mp->b_prev = NULL; 2130 if ((mp->b_band == 0) && 2131 ((flag == FLUSHALL) || 2132 datamsg(mp->b_datap->db_type))) 2133 freemsg(mp); 2134 else 2135 (void) putq(q, mp); 2136 mp = nmp; 2137 } 2138 mutex_enter(QLOCK(q)); 2139 if ((q->q_flag & QWANTW) && 2140 (((q->q_count < q->q_lowat) && 2141 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2142 q->q_flag &= ~QWANTW; 2143 mutex_exit(QLOCK(q)); 2144 2145 backenable(q, pri); 2146 } else 2147 mutex_exit(QLOCK(q)); 2148 } else { /* pri != 0 */ 2149 boolean_t flushed = B_FALSE; 2150 band = pri; 2151 2152 ASSERT(MUTEX_HELD(QLOCK(q))); 2153 qbp = q->q_bandp; 2154 while (--band > 0) 2155 qbp = qbp->qb_next; 2156 mp = qbp->qb_first; 2157 if (mp == NULL) { 2158 mutex_exit(QLOCK(q)); 2159 return; 2160 } 2161 last = qbp->qb_last->b_next; 2162 /* 2163 * rmvq_noenab() and freemsg() are called for each mblk that 2164 * meets the criteria. The loop is executed until the last 2165 * mblk has been processed. 2166 */ 2167 while (mp != last) { 2168 ASSERT(mp->b_band == pri); 2169 nmp = mp->b_next; 2170 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) { 2171 rmvq_noenab(q, mp); 2172 freemsg(mp); 2173 flushed = B_TRUE; 2174 } 2175 mp = nmp; 2176 } 2177 mutex_exit(QLOCK(q)); 2178 2179 /* 2180 * If any mblk(s) has been freed, we know that qbackenable() 2181 * will need to be called. 2182 */ 2183 if (flushed) 2184 qbackenable(q, pri); 2185 } 2186 } 2187 2188 /* 2189 * Return 1 if the queue is not full. If the queue is full, return 2190 * 0 (may not put message) and set QWANTW flag (caller wants to write 2191 * to the queue). 2192 */ 2193 int 2194 canput(queue_t *q) 2195 { 2196 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2197 2198 /* this is for loopback transports, they should not do a canput */ 2199 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2200 2201 /* Find next forward module that has a service procedure */ 2202 q = q->q_nfsrv; 2203 2204 if (!(q->q_flag & QFULL)) { 2205 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2206 return (1); 2207 } 2208 mutex_enter(QLOCK(q)); 2209 if (q->q_flag & QFULL) { 2210 q->q_flag |= QWANTW; 2211 mutex_exit(QLOCK(q)); 2212 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2213 return (0); 2214 } 2215 mutex_exit(QLOCK(q)); 2216 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2217 return (1); 2218 } 2219 2220 /* 2221 * This is the new canput for use with priority bands. Return 1 if the 2222 * band is not full. If the band is full, return 0 (may not put message) 2223 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2224 * write to the queue). 2225 */ 2226 int 2227 bcanput(queue_t *q, unsigned char pri) 2228 { 2229 qband_t *qbp; 2230 2231 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2232 if (!q) 2233 return (0); 2234 2235 /* Find next forward module that has a service procedure */ 2236 q = q->q_nfsrv; 2237 2238 mutex_enter(QLOCK(q)); 2239 if (pri == 0) { 2240 if (q->q_flag & QFULL) { 2241 q->q_flag |= QWANTW; 2242 mutex_exit(QLOCK(q)); 2243 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2244 "bcanput:%p %X %d", q, pri, 0); 2245 return (0); 2246 } 2247 } else { /* pri != 0 */ 2248 if (pri > q->q_nband) { 2249 /* 2250 * No band exists yet, so return success. 2251 */ 2252 mutex_exit(QLOCK(q)); 2253 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2254 "bcanput:%p %X %d", q, pri, 1); 2255 return (1); 2256 } 2257 qbp = q->q_bandp; 2258 while (--pri) 2259 qbp = qbp->qb_next; 2260 if (qbp->qb_flag & QB_FULL) { 2261 qbp->qb_flag |= QB_WANTW; 2262 mutex_exit(QLOCK(q)); 2263 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2264 "bcanput:%p %X %d", q, pri, 0); 2265 return (0); 2266 } 2267 } 2268 mutex_exit(QLOCK(q)); 2269 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2270 "bcanput:%p %X %d", q, pri, 1); 2271 return (1); 2272 } 2273 2274 /* 2275 * Put a message on a queue. 2276 * 2277 * Messages are enqueued on a priority basis. The priority classes 2278 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2279 * and B_NORMAL (type < QPCTL && band == 0). 2280 * 2281 * Add appropriate weighted data block sizes to queue count. 2282 * If queue hits high water mark then set QFULL flag. 2283 * 2284 * If QNOENAB is not set (putq is allowed to enable the queue), 2285 * enable the queue only if the message is PRIORITY, 2286 * or the QWANTR flag is set (indicating that the service procedure 2287 * is ready to read the queue. This implies that a service 2288 * procedure must NEVER put a high priority message back on its own 2289 * queue, as this would result in an infinite loop (!). 2290 */ 2291 int 2292 putq(queue_t *q, mblk_t *bp) 2293 { 2294 mblk_t *tmp; 2295 qband_t *qbp = NULL; 2296 int mcls = (int)queclass(bp); 2297 kthread_id_t freezer; 2298 int bytecnt = 0, mblkcnt = 0; 2299 2300 freezer = STREAM(q)->sd_freezer; 2301 if (freezer == curthread) { 2302 ASSERT(frozenstr(q)); 2303 ASSERT(MUTEX_HELD(QLOCK(q))); 2304 } else 2305 mutex_enter(QLOCK(q)); 2306 2307 /* 2308 * Make sanity checks and if qband structure is not yet 2309 * allocated, do so. 2310 */ 2311 if (mcls == QPCTL) { 2312 if (bp->b_band != 0) 2313 bp->b_band = 0; /* force to be correct */ 2314 } else if (bp->b_band != 0) { 2315 int i; 2316 qband_t **qbpp; 2317 2318 if (bp->b_band > q->q_nband) { 2319 2320 /* 2321 * The qband structure for this priority band is 2322 * not on the queue yet, so we have to allocate 2323 * one on the fly. It would be wasteful to 2324 * associate the qband structures with every 2325 * queue when the queues are allocated. This is 2326 * because most queues will only need the normal 2327 * band of flow which can be described entirely 2328 * by the queue itself. 2329 */ 2330 qbpp = &q->q_bandp; 2331 while (*qbpp) 2332 qbpp = &(*qbpp)->qb_next; 2333 while (bp->b_band > q->q_nband) { 2334 if ((*qbpp = allocband()) == NULL) { 2335 if (freezer != curthread) 2336 mutex_exit(QLOCK(q)); 2337 return (0); 2338 } 2339 (*qbpp)->qb_hiwat = q->q_hiwat; 2340 (*qbpp)->qb_lowat = q->q_lowat; 2341 q->q_nband++; 2342 qbpp = &(*qbpp)->qb_next; 2343 } 2344 } 2345 ASSERT(MUTEX_HELD(QLOCK(q))); 2346 qbp = q->q_bandp; 2347 i = bp->b_band; 2348 while (--i) 2349 qbp = qbp->qb_next; 2350 } 2351 2352 /* 2353 * If queue is empty, add the message and initialize the pointers. 2354 * Otherwise, adjust message pointers and queue pointers based on 2355 * the type of the message and where it belongs on the queue. Some 2356 * code is duplicated to minimize the number of conditionals and 2357 * hopefully minimize the amount of time this routine takes. 2358 */ 2359 if (!q->q_first) { 2360 bp->b_next = NULL; 2361 bp->b_prev = NULL; 2362 q->q_first = bp; 2363 q->q_last = bp; 2364 if (qbp) { 2365 qbp->qb_first = bp; 2366 qbp->qb_last = bp; 2367 } 2368 } else if (!qbp) { /* bp->b_band == 0 */ 2369 2370 /* 2371 * If queue class of message is less than or equal to 2372 * that of the last one on the queue, tack on to the end. 2373 */ 2374 tmp = q->q_last; 2375 if (mcls <= (int)queclass(tmp)) { 2376 bp->b_next = NULL; 2377 bp->b_prev = tmp; 2378 tmp->b_next = bp; 2379 q->q_last = bp; 2380 } else { 2381 tmp = q->q_first; 2382 while ((int)queclass(tmp) >= mcls) 2383 tmp = tmp->b_next; 2384 2385 /* 2386 * Insert bp before tmp. 2387 */ 2388 bp->b_next = tmp; 2389 bp->b_prev = tmp->b_prev; 2390 if (tmp->b_prev) 2391 tmp->b_prev->b_next = bp; 2392 else 2393 q->q_first = bp; 2394 tmp->b_prev = bp; 2395 } 2396 } else { /* bp->b_band != 0 */ 2397 if (qbp->qb_first) { 2398 tmp = qbp->qb_last; 2399 2400 /* 2401 * Insert bp after the last message in this band. 2402 */ 2403 bp->b_next = tmp->b_next; 2404 if (tmp->b_next) 2405 tmp->b_next->b_prev = bp; 2406 else 2407 q->q_last = bp; 2408 bp->b_prev = tmp; 2409 tmp->b_next = bp; 2410 } else { 2411 tmp = q->q_last; 2412 if ((mcls < (int)queclass(tmp)) || 2413 (bp->b_band <= tmp->b_band)) { 2414 2415 /* 2416 * Tack bp on end of queue. 2417 */ 2418 bp->b_next = NULL; 2419 bp->b_prev = tmp; 2420 tmp->b_next = bp; 2421 q->q_last = bp; 2422 } else { 2423 tmp = q->q_first; 2424 while (tmp->b_datap->db_type >= QPCTL) 2425 tmp = tmp->b_next; 2426 while (tmp->b_band >= bp->b_band) 2427 tmp = tmp->b_next; 2428 2429 /* 2430 * Insert bp before tmp. 2431 */ 2432 bp->b_next = tmp; 2433 bp->b_prev = tmp->b_prev; 2434 if (tmp->b_prev) 2435 tmp->b_prev->b_next = bp; 2436 else 2437 q->q_first = bp; 2438 tmp->b_prev = bp; 2439 } 2440 qbp->qb_first = bp; 2441 } 2442 qbp->qb_last = bp; 2443 } 2444 2445 /* Get message byte count for q_count accounting */ 2446 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2447 ADD_MBLK_SIZE(tmp, bytecnt); 2448 mblkcnt++; 2449 } 2450 2451 if (qbp) { 2452 qbp->qb_count += bytecnt; 2453 qbp->qb_mblkcnt += mblkcnt; 2454 if ((qbp->qb_count >= qbp->qb_hiwat) || 2455 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2456 qbp->qb_flag |= QB_FULL; 2457 } 2458 } else { 2459 q->q_count += bytecnt; 2460 q->q_mblkcnt += mblkcnt; 2461 if ((q->q_count >= q->q_hiwat) || 2462 (q->q_mblkcnt >= q->q_hiwat)) { 2463 q->q_flag |= QFULL; 2464 } 2465 } 2466 2467 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2468 2469 if ((mcls > QNORM) || 2470 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2471 qenable_locked(q); 2472 ASSERT(MUTEX_HELD(QLOCK(q))); 2473 if (freezer != curthread) 2474 mutex_exit(QLOCK(q)); 2475 2476 return (1); 2477 } 2478 2479 /* 2480 * Put stuff back at beginning of Q according to priority order. 2481 * See comment on putq above for details. 2482 */ 2483 int 2484 putbq(queue_t *q, mblk_t *bp) 2485 { 2486 mblk_t *tmp; 2487 qband_t *qbp = NULL; 2488 int mcls = (int)queclass(bp); 2489 kthread_id_t freezer; 2490 int bytecnt = 0, mblkcnt = 0; 2491 2492 ASSERT(q && bp); 2493 ASSERT(bp->b_next == NULL); 2494 freezer = STREAM(q)->sd_freezer; 2495 if (freezer == curthread) { 2496 ASSERT(frozenstr(q)); 2497 ASSERT(MUTEX_HELD(QLOCK(q))); 2498 } else 2499 mutex_enter(QLOCK(q)); 2500 2501 /* 2502 * Make sanity checks and if qband structure is not yet 2503 * allocated, do so. 2504 */ 2505 if (mcls == QPCTL) { 2506 if (bp->b_band != 0) 2507 bp->b_band = 0; /* force to be correct */ 2508 } else if (bp->b_band != 0) { 2509 int i; 2510 qband_t **qbpp; 2511 2512 if (bp->b_band > q->q_nband) { 2513 qbpp = &q->q_bandp; 2514 while (*qbpp) 2515 qbpp = &(*qbpp)->qb_next; 2516 while (bp->b_band > q->q_nband) { 2517 if ((*qbpp = allocband()) == NULL) { 2518 if (freezer != curthread) 2519 mutex_exit(QLOCK(q)); 2520 return (0); 2521 } 2522 (*qbpp)->qb_hiwat = q->q_hiwat; 2523 (*qbpp)->qb_lowat = q->q_lowat; 2524 q->q_nband++; 2525 qbpp = &(*qbpp)->qb_next; 2526 } 2527 } 2528 qbp = q->q_bandp; 2529 i = bp->b_band; 2530 while (--i) 2531 qbp = qbp->qb_next; 2532 } 2533 2534 /* 2535 * If queue is empty or if message is high priority, 2536 * place on the front of the queue. 2537 */ 2538 tmp = q->q_first; 2539 if ((!tmp) || (mcls == QPCTL)) { 2540 bp->b_next = tmp; 2541 if (tmp) 2542 tmp->b_prev = bp; 2543 else 2544 q->q_last = bp; 2545 q->q_first = bp; 2546 bp->b_prev = NULL; 2547 if (qbp) { 2548 qbp->qb_first = bp; 2549 qbp->qb_last = bp; 2550 } 2551 } else if (qbp) { /* bp->b_band != 0 */ 2552 tmp = qbp->qb_first; 2553 if (tmp) { 2554 2555 /* 2556 * Insert bp before the first message in this band. 2557 */ 2558 bp->b_next = tmp; 2559 bp->b_prev = tmp->b_prev; 2560 if (tmp->b_prev) 2561 tmp->b_prev->b_next = bp; 2562 else 2563 q->q_first = bp; 2564 tmp->b_prev = bp; 2565 } else { 2566 tmp = q->q_last; 2567 if ((mcls < (int)queclass(tmp)) || 2568 (bp->b_band < tmp->b_band)) { 2569 2570 /* 2571 * Tack bp on end of queue. 2572 */ 2573 bp->b_next = NULL; 2574 bp->b_prev = tmp; 2575 tmp->b_next = bp; 2576 q->q_last = bp; 2577 } else { 2578 tmp = q->q_first; 2579 while (tmp->b_datap->db_type >= QPCTL) 2580 tmp = tmp->b_next; 2581 while (tmp->b_band > bp->b_band) 2582 tmp = tmp->b_next; 2583 2584 /* 2585 * Insert bp before tmp. 2586 */ 2587 bp->b_next = tmp; 2588 bp->b_prev = tmp->b_prev; 2589 if (tmp->b_prev) 2590 tmp->b_prev->b_next = bp; 2591 else 2592 q->q_first = bp; 2593 tmp->b_prev = bp; 2594 } 2595 qbp->qb_last = bp; 2596 } 2597 qbp->qb_first = bp; 2598 } else { /* bp->b_band == 0 && !QPCTL */ 2599 2600 /* 2601 * If the queue class or band is less than that of the last 2602 * message on the queue, tack bp on the end of the queue. 2603 */ 2604 tmp = q->q_last; 2605 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2606 bp->b_next = NULL; 2607 bp->b_prev = tmp; 2608 tmp->b_next = bp; 2609 q->q_last = bp; 2610 } else { 2611 tmp = q->q_first; 2612 while (tmp->b_datap->db_type >= QPCTL) 2613 tmp = tmp->b_next; 2614 while (tmp->b_band > bp->b_band) 2615 tmp = tmp->b_next; 2616 2617 /* 2618 * Insert bp before tmp. 2619 */ 2620 bp->b_next = tmp; 2621 bp->b_prev = tmp->b_prev; 2622 if (tmp->b_prev) 2623 tmp->b_prev->b_next = bp; 2624 else 2625 q->q_first = bp; 2626 tmp->b_prev = bp; 2627 } 2628 } 2629 2630 /* Get message byte count for q_count accounting */ 2631 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2632 ADD_MBLK_SIZE(tmp, bytecnt); 2633 mblkcnt++; 2634 } 2635 if (qbp) { 2636 qbp->qb_count += bytecnt; 2637 qbp->qb_mblkcnt += mblkcnt; 2638 if ((qbp->qb_count >= qbp->qb_hiwat) || 2639 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2640 qbp->qb_flag |= QB_FULL; 2641 } 2642 } else { 2643 q->q_count += bytecnt; 2644 q->q_mblkcnt += mblkcnt; 2645 if ((q->q_count >= q->q_hiwat) || 2646 (q->q_mblkcnt >= q->q_hiwat)) { 2647 q->q_flag |= QFULL; 2648 } 2649 } 2650 2651 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL); 2652 2653 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2654 qenable_locked(q); 2655 ASSERT(MUTEX_HELD(QLOCK(q))); 2656 if (freezer != curthread) 2657 mutex_exit(QLOCK(q)); 2658 2659 return (1); 2660 } 2661 2662 /* 2663 * Insert a message before an existing message on the queue. If the 2664 * existing message is NULL, the new messages is placed on the end of 2665 * the queue. The queue class of the new message is ignored. However, 2666 * the priority band of the new message must adhere to the following 2667 * ordering: 2668 * 2669 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2670 * 2671 * All flow control parameters are updated. 2672 * 2673 * insq can be called with the stream frozen, but other utility functions 2674 * holding QLOCK, and by streams modules without any locks/frozen. 2675 */ 2676 int 2677 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2678 { 2679 mblk_t *tmp; 2680 qband_t *qbp = NULL; 2681 int mcls = (int)queclass(mp); 2682 kthread_id_t freezer; 2683 int bytecnt = 0, mblkcnt = 0; 2684 2685 freezer = STREAM(q)->sd_freezer; 2686 if (freezer == curthread) { 2687 ASSERT(frozenstr(q)); 2688 ASSERT(MUTEX_HELD(QLOCK(q))); 2689 } else if (MUTEX_HELD(QLOCK(q))) { 2690 /* Don't drop lock on exit */ 2691 freezer = curthread; 2692 } else 2693 mutex_enter(QLOCK(q)); 2694 2695 if (mcls == QPCTL) { 2696 if (mp->b_band != 0) 2697 mp->b_band = 0; /* force to be correct */ 2698 if (emp && emp->b_prev && 2699 (emp->b_prev->b_datap->db_type < QPCTL)) 2700 goto badord; 2701 } 2702 if (emp) { 2703 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2704 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2705 (emp->b_prev->b_band < mp->b_band))) { 2706 goto badord; 2707 } 2708 } else { 2709 tmp = q->q_last; 2710 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2711 badord: 2712 cmn_err(CE_WARN, 2713 "insq: attempt to insert message out of order " 2714 "on q %p", (void *)q); 2715 if (freezer != curthread) 2716 mutex_exit(QLOCK(q)); 2717 return (0); 2718 } 2719 } 2720 2721 if (mp->b_band != 0) { 2722 int i; 2723 qband_t **qbpp; 2724 2725 if (mp->b_band > q->q_nband) { 2726 qbpp = &q->q_bandp; 2727 while (*qbpp) 2728 qbpp = &(*qbpp)->qb_next; 2729 while (mp->b_band > q->q_nband) { 2730 if ((*qbpp = allocband()) == NULL) { 2731 if (freezer != curthread) 2732 mutex_exit(QLOCK(q)); 2733 return (0); 2734 } 2735 (*qbpp)->qb_hiwat = q->q_hiwat; 2736 (*qbpp)->qb_lowat = q->q_lowat; 2737 q->q_nband++; 2738 qbpp = &(*qbpp)->qb_next; 2739 } 2740 } 2741 qbp = q->q_bandp; 2742 i = mp->b_band; 2743 while (--i) 2744 qbp = qbp->qb_next; 2745 } 2746 2747 if ((mp->b_next = emp) != NULL) { 2748 if ((mp->b_prev = emp->b_prev) != NULL) 2749 emp->b_prev->b_next = mp; 2750 else 2751 q->q_first = mp; 2752 emp->b_prev = mp; 2753 } else { 2754 if ((mp->b_prev = q->q_last) != NULL) 2755 q->q_last->b_next = mp; 2756 else 2757 q->q_first = mp; 2758 q->q_last = mp; 2759 } 2760 2761 /* Get mblk and byte count for q_count accounting */ 2762 for (tmp = mp; tmp; tmp = tmp->b_cont) { 2763 ADD_MBLK_SIZE(tmp, bytecnt); 2764 mblkcnt++; 2765 } 2766 2767 if (qbp) { /* adjust qband pointers and count */ 2768 if (!qbp->qb_first) { 2769 qbp->qb_first = mp; 2770 qbp->qb_last = mp; 2771 } else { 2772 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2773 (mp->b_prev->b_band != mp->b_band))) 2774 qbp->qb_first = mp; 2775 else if (mp->b_next == NULL || (mp->b_next != NULL && 2776 (mp->b_next->b_band != mp->b_band))) 2777 qbp->qb_last = mp; 2778 } 2779 qbp->qb_count += bytecnt; 2780 qbp->qb_mblkcnt += mblkcnt; 2781 if ((qbp->qb_count >= qbp->qb_hiwat) || 2782 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2783 qbp->qb_flag |= QB_FULL; 2784 } 2785 } else { 2786 q->q_count += bytecnt; 2787 q->q_mblkcnt += mblkcnt; 2788 if ((q->q_count >= q->q_hiwat) || 2789 (q->q_mblkcnt >= q->q_hiwat)) { 2790 q->q_flag |= QFULL; 2791 } 2792 } 2793 2794 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2795 2796 if (canenable(q) && (q->q_flag & QWANTR)) 2797 qenable_locked(q); 2798 2799 ASSERT(MUTEX_HELD(QLOCK(q))); 2800 if (freezer != curthread) 2801 mutex_exit(QLOCK(q)); 2802 2803 return (1); 2804 } 2805 2806 /* 2807 * Create and put a control message on queue. 2808 */ 2809 int 2810 putctl(queue_t *q, int type) 2811 { 2812 mblk_t *bp; 2813 2814 if ((datamsg(type) && (type != M_DELAY)) || 2815 (bp = allocb_tryhard(0)) == NULL) 2816 return (0); 2817 bp->b_datap->db_type = (unsigned char) type; 2818 2819 put(q, bp); 2820 2821 return (1); 2822 } 2823 2824 /* 2825 * Control message with a single-byte parameter 2826 */ 2827 int 2828 putctl1(queue_t *q, int type, int param) 2829 { 2830 mblk_t *bp; 2831 2832 if ((datamsg(type) && (type != M_DELAY)) || 2833 (bp = allocb_tryhard(1)) == NULL) 2834 return (0); 2835 bp->b_datap->db_type = (unsigned char)type; 2836 *bp->b_wptr++ = (unsigned char)param; 2837 2838 put(q, bp); 2839 2840 return (1); 2841 } 2842 2843 int 2844 putnextctl1(queue_t *q, int type, int param) 2845 { 2846 mblk_t *bp; 2847 2848 if ((datamsg(type) && (type != M_DELAY)) || 2849 ((bp = allocb_tryhard(1)) == NULL)) 2850 return (0); 2851 2852 bp->b_datap->db_type = (unsigned char)type; 2853 *bp->b_wptr++ = (unsigned char)param; 2854 2855 putnext(q, bp); 2856 2857 return (1); 2858 } 2859 2860 int 2861 putnextctl(queue_t *q, int type) 2862 { 2863 mblk_t *bp; 2864 2865 if ((datamsg(type) && (type != M_DELAY)) || 2866 ((bp = allocb_tryhard(0)) == NULL)) 2867 return (0); 2868 bp->b_datap->db_type = (unsigned char)type; 2869 2870 putnext(q, bp); 2871 2872 return (1); 2873 } 2874 2875 /* 2876 * Return the queue upstream from this one 2877 */ 2878 queue_t * 2879 backq(queue_t *q) 2880 { 2881 q = _OTHERQ(q); 2882 if (q->q_next) { 2883 q = q->q_next; 2884 return (_OTHERQ(q)); 2885 } 2886 return (NULL); 2887 } 2888 2889 /* 2890 * Send a block back up the queue in reverse from this 2891 * one (e.g. to respond to ioctls) 2892 */ 2893 void 2894 qreply(queue_t *q, mblk_t *bp) 2895 { 2896 ASSERT(q && bp); 2897 2898 putnext(_OTHERQ(q), bp); 2899 } 2900 2901 /* 2902 * Streams Queue Scheduling 2903 * 2904 * Queues are enabled through qenable() when they have messages to 2905 * process. They are serviced by queuerun(), which runs each enabled 2906 * queue's service procedure. The call to queuerun() is processor 2907 * dependent - the general principle is that it be run whenever a queue 2908 * is enabled but before returning to user level. For system calls, 2909 * the function runqueues() is called if their action causes a queue 2910 * to be enabled. For device interrupts, queuerun() should be 2911 * called before returning from the last level of interrupt. Beyond 2912 * this, no timing assumptions should be made about queue scheduling. 2913 */ 2914 2915 /* 2916 * Enable a queue: put it on list of those whose service procedures are 2917 * ready to run and set up the scheduling mechanism. 2918 * The broadcast is done outside the mutex -> to avoid the woken thread 2919 * from contending with the mutex. This is OK 'cos the queue has been 2920 * enqueued on the runlist and flagged safely at this point. 2921 */ 2922 void 2923 qenable(queue_t *q) 2924 { 2925 mutex_enter(QLOCK(q)); 2926 qenable_locked(q); 2927 mutex_exit(QLOCK(q)); 2928 } 2929 /* 2930 * Return number of messages on queue 2931 */ 2932 int 2933 qsize(queue_t *qp) 2934 { 2935 int count = 0; 2936 mblk_t *mp; 2937 2938 mutex_enter(QLOCK(qp)); 2939 for (mp = qp->q_first; mp; mp = mp->b_next) 2940 count++; 2941 mutex_exit(QLOCK(qp)); 2942 return (count); 2943 } 2944 2945 /* 2946 * noenable - set queue so that putq() will not enable it. 2947 * enableok - set queue so that putq() can enable it. 2948 */ 2949 void 2950 noenable(queue_t *q) 2951 { 2952 mutex_enter(QLOCK(q)); 2953 q->q_flag |= QNOENB; 2954 mutex_exit(QLOCK(q)); 2955 } 2956 2957 void 2958 enableok(queue_t *q) 2959 { 2960 mutex_enter(QLOCK(q)); 2961 q->q_flag &= ~QNOENB; 2962 mutex_exit(QLOCK(q)); 2963 } 2964 2965 /* 2966 * Set queue fields. 2967 */ 2968 int 2969 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 2970 { 2971 qband_t *qbp = NULL; 2972 queue_t *wrq; 2973 int error = 0; 2974 kthread_id_t freezer; 2975 2976 freezer = STREAM(q)->sd_freezer; 2977 if (freezer == curthread) { 2978 ASSERT(frozenstr(q)); 2979 ASSERT(MUTEX_HELD(QLOCK(q))); 2980 } else 2981 mutex_enter(QLOCK(q)); 2982 2983 if (what >= QBAD) { 2984 error = EINVAL; 2985 goto done; 2986 } 2987 if (pri != 0) { 2988 int i; 2989 qband_t **qbpp; 2990 2991 if (pri > q->q_nband) { 2992 qbpp = &q->q_bandp; 2993 while (*qbpp) 2994 qbpp = &(*qbpp)->qb_next; 2995 while (pri > q->q_nband) { 2996 if ((*qbpp = allocband()) == NULL) { 2997 error = EAGAIN; 2998 goto done; 2999 } 3000 (*qbpp)->qb_hiwat = q->q_hiwat; 3001 (*qbpp)->qb_lowat = q->q_lowat; 3002 q->q_nband++; 3003 qbpp = &(*qbpp)->qb_next; 3004 } 3005 } 3006 qbp = q->q_bandp; 3007 i = pri; 3008 while (--i) 3009 qbp = qbp->qb_next; 3010 } 3011 switch (what) { 3012 3013 case QHIWAT: 3014 if (qbp) 3015 qbp->qb_hiwat = (size_t)val; 3016 else 3017 q->q_hiwat = (size_t)val; 3018 break; 3019 3020 case QLOWAT: 3021 if (qbp) 3022 qbp->qb_lowat = (size_t)val; 3023 else 3024 q->q_lowat = (size_t)val; 3025 break; 3026 3027 case QMAXPSZ: 3028 if (qbp) 3029 error = EINVAL; 3030 else 3031 q->q_maxpsz = (ssize_t)val; 3032 3033 /* 3034 * Performance concern, strwrite looks at the module below 3035 * the stream head for the maxpsz each time it does a write 3036 * we now cache it at the stream head. Check to see if this 3037 * queue is sitting directly below the stream head. 3038 */ 3039 wrq = STREAM(q)->sd_wrq; 3040 if (q != wrq->q_next) 3041 break; 3042 3043 /* 3044 * If the stream is not frozen drop the current QLOCK and 3045 * acquire the sd_wrq QLOCK which protects sd_qn_* 3046 */ 3047 if (freezer != curthread) { 3048 mutex_exit(QLOCK(q)); 3049 mutex_enter(QLOCK(wrq)); 3050 } 3051 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3052 3053 if (strmsgsz != 0) { 3054 if (val == INFPSZ) 3055 val = strmsgsz; 3056 else { 3057 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3058 val = MIN(PIPE_BUF, val); 3059 else 3060 val = MIN(strmsgsz, val); 3061 } 3062 } 3063 STREAM(q)->sd_qn_maxpsz = val; 3064 if (freezer != curthread) { 3065 mutex_exit(QLOCK(wrq)); 3066 mutex_enter(QLOCK(q)); 3067 } 3068 break; 3069 3070 case QMINPSZ: 3071 if (qbp) 3072 error = EINVAL; 3073 else 3074 q->q_minpsz = (ssize_t)val; 3075 3076 /* 3077 * Performance concern, strwrite looks at the module below 3078 * the stream head for the maxpsz each time it does a write 3079 * we now cache it at the stream head. Check to see if this 3080 * queue is sitting directly below the stream head. 3081 */ 3082 wrq = STREAM(q)->sd_wrq; 3083 if (q != wrq->q_next) 3084 break; 3085 3086 /* 3087 * If the stream is not frozen drop the current QLOCK and 3088 * acquire the sd_wrq QLOCK which protects sd_qn_* 3089 */ 3090 if (freezer != curthread) { 3091 mutex_exit(QLOCK(q)); 3092 mutex_enter(QLOCK(wrq)); 3093 } 3094 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3095 3096 if (freezer != curthread) { 3097 mutex_exit(QLOCK(wrq)); 3098 mutex_enter(QLOCK(q)); 3099 } 3100 break; 3101 3102 case QSTRUIOT: 3103 if (qbp) 3104 error = EINVAL; 3105 else 3106 q->q_struiot = (ushort_t)val; 3107 break; 3108 3109 case QCOUNT: 3110 case QFIRST: 3111 case QLAST: 3112 case QFLAG: 3113 error = EPERM; 3114 break; 3115 3116 default: 3117 error = EINVAL; 3118 break; 3119 } 3120 done: 3121 if (freezer != curthread) 3122 mutex_exit(QLOCK(q)); 3123 return (error); 3124 } 3125 3126 /* 3127 * Get queue fields. 3128 */ 3129 int 3130 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3131 { 3132 qband_t *qbp = NULL; 3133 int error = 0; 3134 kthread_id_t freezer; 3135 3136 freezer = STREAM(q)->sd_freezer; 3137 if (freezer == curthread) { 3138 ASSERT(frozenstr(q)); 3139 ASSERT(MUTEX_HELD(QLOCK(q))); 3140 } else 3141 mutex_enter(QLOCK(q)); 3142 if (what >= QBAD) { 3143 error = EINVAL; 3144 goto done; 3145 } 3146 if (pri != 0) { 3147 int i; 3148 qband_t **qbpp; 3149 3150 if (pri > q->q_nband) { 3151 qbpp = &q->q_bandp; 3152 while (*qbpp) 3153 qbpp = &(*qbpp)->qb_next; 3154 while (pri > q->q_nband) { 3155 if ((*qbpp = allocband()) == NULL) { 3156 error = EAGAIN; 3157 goto done; 3158 } 3159 (*qbpp)->qb_hiwat = q->q_hiwat; 3160 (*qbpp)->qb_lowat = q->q_lowat; 3161 q->q_nband++; 3162 qbpp = &(*qbpp)->qb_next; 3163 } 3164 } 3165 qbp = q->q_bandp; 3166 i = pri; 3167 while (--i) 3168 qbp = qbp->qb_next; 3169 } 3170 switch (what) { 3171 case QHIWAT: 3172 if (qbp) 3173 *(size_t *)valp = qbp->qb_hiwat; 3174 else 3175 *(size_t *)valp = q->q_hiwat; 3176 break; 3177 3178 case QLOWAT: 3179 if (qbp) 3180 *(size_t *)valp = qbp->qb_lowat; 3181 else 3182 *(size_t *)valp = q->q_lowat; 3183 break; 3184 3185 case QMAXPSZ: 3186 if (qbp) 3187 error = EINVAL; 3188 else 3189 *(ssize_t *)valp = q->q_maxpsz; 3190 break; 3191 3192 case QMINPSZ: 3193 if (qbp) 3194 error = EINVAL; 3195 else 3196 *(ssize_t *)valp = q->q_minpsz; 3197 break; 3198 3199 case QCOUNT: 3200 if (qbp) 3201 *(size_t *)valp = qbp->qb_count; 3202 else 3203 *(size_t *)valp = q->q_count; 3204 break; 3205 3206 case QFIRST: 3207 if (qbp) 3208 *(mblk_t **)valp = qbp->qb_first; 3209 else 3210 *(mblk_t **)valp = q->q_first; 3211 break; 3212 3213 case QLAST: 3214 if (qbp) 3215 *(mblk_t **)valp = qbp->qb_last; 3216 else 3217 *(mblk_t **)valp = q->q_last; 3218 break; 3219 3220 case QFLAG: 3221 if (qbp) 3222 *(uint_t *)valp = qbp->qb_flag; 3223 else 3224 *(uint_t *)valp = q->q_flag; 3225 break; 3226 3227 case QSTRUIOT: 3228 if (qbp) 3229 error = EINVAL; 3230 else 3231 *(short *)valp = q->q_struiot; 3232 break; 3233 3234 default: 3235 error = EINVAL; 3236 break; 3237 } 3238 done: 3239 if (freezer != curthread) 3240 mutex_exit(QLOCK(q)); 3241 return (error); 3242 } 3243 3244 /* 3245 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3246 * QWANTWSYNC or QWANTR or QWANTW, 3247 * 3248 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3249 * deferred wakeup will be done. Also if strpoll() in progress then a 3250 * deferred pollwakeup will be done. 3251 */ 3252 void 3253 strwakeq(queue_t *q, int flag) 3254 { 3255 stdata_t *stp = STREAM(q); 3256 pollhead_t *pl; 3257 3258 mutex_enter(&stp->sd_lock); 3259 pl = &stp->sd_pollist; 3260 if (flag & QWANTWSYNC) { 3261 ASSERT(!(q->q_flag & QREADR)); 3262 if (stp->sd_flag & WSLEEP) { 3263 stp->sd_flag &= ~WSLEEP; 3264 cv_broadcast(&stp->sd_wrq->q_wait); 3265 } else { 3266 stp->sd_wakeq |= WSLEEP; 3267 } 3268 3269 mutex_exit(&stp->sd_lock); 3270 pollwakeup(pl, POLLWRNORM); 3271 mutex_enter(&stp->sd_lock); 3272 3273 if (stp->sd_sigflags & S_WRNORM) 3274 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3275 } else if (flag & QWANTR) { 3276 if (stp->sd_flag & RSLEEP) { 3277 stp->sd_flag &= ~RSLEEP; 3278 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3279 } else { 3280 stp->sd_wakeq |= RSLEEP; 3281 } 3282 3283 mutex_exit(&stp->sd_lock); 3284 pollwakeup(pl, POLLIN | POLLRDNORM); 3285 mutex_enter(&stp->sd_lock); 3286 3287 { 3288 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3289 3290 if (events) 3291 strsendsig(stp->sd_siglist, events, 0, 0); 3292 } 3293 } else { 3294 if (stp->sd_flag & WSLEEP) { 3295 stp->sd_flag &= ~WSLEEP; 3296 cv_broadcast(&stp->sd_wrq->q_wait); 3297 } 3298 3299 mutex_exit(&stp->sd_lock); 3300 pollwakeup(pl, POLLWRNORM); 3301 mutex_enter(&stp->sd_lock); 3302 3303 if (stp->sd_sigflags & S_WRNORM) 3304 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3305 } 3306 mutex_exit(&stp->sd_lock); 3307 } 3308 3309 int 3310 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock) 3311 { 3312 stdata_t *stp = STREAM(q); 3313 int typ = STRUIOT_STANDARD; 3314 uio_t *uiop = &dp->d_uio; 3315 dblk_t *dbp; 3316 ssize_t uiocnt; 3317 ssize_t cnt; 3318 unsigned char *ptr; 3319 ssize_t resid; 3320 int error = 0; 3321 on_trap_data_t otd; 3322 queue_t *stwrq; 3323 3324 /* 3325 * Plumbing may change while taking the type so store the 3326 * queue in a temporary variable. It doesn't matter even 3327 * if the we take the type from the previous plumbing, 3328 * that's because if the plumbing has changed when we were 3329 * holding the queue in a temporary variable, we can continue 3330 * processing the message the way it would have been processed 3331 * in the old plumbing, without any side effects but a bit 3332 * extra processing for partial ip header checksum. 3333 * 3334 * This has been done to avoid holding the sd_lock which is 3335 * very hot. 3336 */ 3337 3338 stwrq = stp->sd_struiowrq; 3339 if (stwrq) 3340 typ = stwrq->q_struiot; 3341 3342 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) { 3343 dbp = mp->b_datap; 3344 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff); 3345 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3346 cnt = MIN(uiocnt, uiop->uio_resid); 3347 if (!(dbp->db_struioflag & STRUIO_SPEC) || 3348 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) { 3349 /* 3350 * Either this mblk has already been processed 3351 * or there is no more room in this mblk (?). 3352 */ 3353 continue; 3354 } 3355 switch (typ) { 3356 case STRUIOT_STANDARD: 3357 if (noblock) { 3358 if (on_trap(&otd, OT_DATA_ACCESS)) { 3359 no_trap(); 3360 error = EWOULDBLOCK; 3361 goto out; 3362 } 3363 } 3364 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) { 3365 if (noblock) 3366 no_trap(); 3367 goto out; 3368 } 3369 if (noblock) 3370 no_trap(); 3371 break; 3372 3373 default: 3374 error = EIO; 3375 goto out; 3376 } 3377 dbp->db_struioflag |= STRUIO_DONE; 3378 dbp->db_cksumstuff += cnt; 3379 } 3380 out: 3381 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) { 3382 /* 3383 * A fault has occured and some bytes were moved to the 3384 * current mblk, the uio_t has already been updated by 3385 * the appropriate uio routine, so also update the mblk 3386 * to reflect this in case this same mblk chain is used 3387 * again (after the fault has been handled). 3388 */ 3389 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3390 if (uiocnt >= resid) 3391 dbp->db_cksumstuff += resid; 3392 } 3393 return (error); 3394 } 3395 3396 /* 3397 * Try to enter queue synchronously. Any attempt to enter a closing queue will 3398 * fails. The qp->q_rwcnt keeps track of the number of successful entries so 3399 * that removeq() will not try to close the queue while a thread is inside the 3400 * queue. 3401 */ 3402 static boolean_t 3403 rwnext_enter(queue_t *qp) 3404 { 3405 mutex_enter(QLOCK(qp)); 3406 if (qp->q_flag & QWCLOSE) { 3407 mutex_exit(QLOCK(qp)); 3408 return (B_FALSE); 3409 } 3410 qp->q_rwcnt++; 3411 ASSERT(qp->q_rwcnt != 0); 3412 mutex_exit(QLOCK(qp)); 3413 return (B_TRUE); 3414 } 3415 3416 /* 3417 * Decrease the count of threads running in sync stream queue and wake up any 3418 * threads blocked in removeq(). 3419 */ 3420 static void 3421 rwnext_exit(queue_t *qp) 3422 { 3423 mutex_enter(QLOCK(qp)); 3424 qp->q_rwcnt--; 3425 if (qp->q_flag & QWANTRMQSYNC) { 3426 qp->q_flag &= ~QWANTRMQSYNC; 3427 cv_broadcast(&qp->q_wait); 3428 } 3429 mutex_exit(QLOCK(qp)); 3430 } 3431 3432 /* 3433 * The purpose of rwnext() is to call the rw procedure of the next 3434 * (downstream) modules queue. 3435 * 3436 * treated as put entrypoint for perimeter syncronization. 3437 * 3438 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3439 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does 3440 * not matter if any regular put entrypoints have been already entered. We 3441 * can't increment one of the sq_putcounts (instead of sq_count) because 3442 * qwait_rw won't know which counter to decrement. 3443 * 3444 * It would be reasonable to add the lockless FASTPUT logic. 3445 */ 3446 int 3447 rwnext(queue_t *qp, struiod_t *dp) 3448 { 3449 queue_t *nqp; 3450 syncq_t *sq; 3451 uint16_t count; 3452 uint16_t flags; 3453 struct qinit *qi; 3454 int (*proc)(); 3455 struct stdata *stp; 3456 int isread; 3457 int rval; 3458 3459 stp = STREAM(qp); 3460 /* 3461 * Prevent q_next from changing by holding sd_lock until acquiring 3462 * SQLOCK. Note that a read-side rwnext from the streamhead will 3463 * already have sd_lock acquired. In either case sd_lock is always 3464 * released after acquiring SQLOCK. 3465 * 3466 * The streamhead read-side holding sd_lock when calling rwnext is 3467 * required to prevent a race condition were M_DATA mblks flowing 3468 * up the read-side of the stream could be bypassed by a rwnext() 3469 * down-call. In this case sd_lock acts as the streamhead perimeter. 3470 */ 3471 if ((nqp = _WR(qp)) == qp) { 3472 isread = 0; 3473 mutex_enter(&stp->sd_lock); 3474 qp = nqp->q_next; 3475 } else { 3476 isread = 1; 3477 if (nqp != stp->sd_wrq) 3478 /* Not streamhead */ 3479 mutex_enter(&stp->sd_lock); 3480 qp = _RD(nqp->q_next); 3481 } 3482 qi = qp->q_qinfo; 3483 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) { 3484 /* 3485 * Not a synchronous module or no r/w procedure for this 3486 * queue, so just return EINVAL and let the caller handle it. 3487 */ 3488 mutex_exit(&stp->sd_lock); 3489 return (EINVAL); 3490 } 3491 3492 if (rwnext_enter(qp) == B_FALSE) { 3493 mutex_exit(&stp->sd_lock); 3494 return (EINVAL); 3495 } 3496 3497 sq = qp->q_syncq; 3498 mutex_enter(SQLOCK(sq)); 3499 mutex_exit(&stp->sd_lock); 3500 count = sq->sq_count; 3501 flags = sq->sq_flags; 3502 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3503 3504 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3505 /* 3506 * if this queue is being closed, return. 3507 */ 3508 if (qp->q_flag & QWCLOSE) { 3509 mutex_exit(SQLOCK(sq)); 3510 rwnext_exit(qp); 3511 return (EINVAL); 3512 } 3513 3514 /* 3515 * Wait until we can enter the inner perimeter. 3516 */ 3517 sq->sq_flags = flags | SQ_WANTWAKEUP; 3518 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3519 count = sq->sq_count; 3520 flags = sq->sq_flags; 3521 } 3522 3523 if (isread == 0 && stp->sd_struiowrq == NULL || 3524 isread == 1 && stp->sd_struiordq == NULL) { 3525 /* 3526 * Stream plumbing changed while waiting for inner perimeter 3527 * so just return EINVAL and let the caller handle it. 3528 */ 3529 mutex_exit(SQLOCK(sq)); 3530 rwnext_exit(qp); 3531 return (EINVAL); 3532 } 3533 if (!(flags & SQ_CIPUT)) 3534 sq->sq_flags = flags | SQ_EXCL; 3535 sq->sq_count = count + 1; 3536 ASSERT(sq->sq_count != 0); /* Wraparound */ 3537 /* 3538 * Note: The only message ordering guarantee that rwnext() makes is 3539 * for the write queue flow-control case. All others (r/w queue 3540 * with q_count > 0 (or q_first != 0)) are the resposibilty of 3541 * the queue's rw procedure. This could be genralized here buy 3542 * running the queue's service procedure, but that wouldn't be 3543 * the most efficent for all cases. 3544 */ 3545 mutex_exit(SQLOCK(sq)); 3546 if (! isread && (qp->q_flag & QFULL)) { 3547 /* 3548 * Write queue may be flow controlled. If so, 3549 * mark the queue for wakeup when it's not. 3550 */ 3551 mutex_enter(QLOCK(qp)); 3552 if (qp->q_flag & QFULL) { 3553 qp->q_flag |= QWANTWSYNC; 3554 mutex_exit(QLOCK(qp)); 3555 rval = EWOULDBLOCK; 3556 goto out; 3557 } 3558 mutex_exit(QLOCK(qp)); 3559 } 3560 3561 if (! isread && dp->d_mp) 3562 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr - 3563 dp->d_mp->b_datap->db_base); 3564 3565 rval = (*proc)(qp, dp); 3566 3567 if (isread && dp->d_mp) 3568 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT, 3569 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base); 3570 out: 3571 /* 3572 * The queue is protected from being freed by sq_count, so it is 3573 * safe to call rwnext_exit and reacquire SQLOCK(sq). 3574 */ 3575 rwnext_exit(qp); 3576 3577 mutex_enter(SQLOCK(sq)); 3578 flags = sq->sq_flags; 3579 ASSERT(sq->sq_count != 0); 3580 sq->sq_count--; 3581 if (flags & SQ_TAIL) { 3582 putnext_tail(sq, qp, flags); 3583 /* 3584 * The only purpose of this ASSERT is to preserve calling stack 3585 * in DEBUG kernel. 3586 */ 3587 ASSERT(flags & SQ_TAIL); 3588 return (rval); 3589 } 3590 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3591 /* 3592 * Safe to always drop SQ_EXCL: 3593 * Not SQ_CIPUT means we set SQ_EXCL above 3594 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3595 * did a qwriter(INNER) in which case nobody else 3596 * is in the inner perimeter and we are exiting. 3597 * 3598 * I would like to make the following assertion: 3599 * 3600 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3601 * sq->sq_count == 0); 3602 * 3603 * which indicates that if we are both putshared and exclusive, 3604 * we became exclusive while executing the putproc, and the only 3605 * claim on the syncq was the one we dropped a few lines above. 3606 * But other threads that enter putnext while the syncq is exclusive 3607 * need to make a claim as they may need to drop SQLOCK in the 3608 * has_writers case to avoid deadlocks. If these threads are 3609 * delayed or preempted, it is possible that the writer thread can 3610 * find out that there are other claims making the (sq_count == 0) 3611 * test invalid. 3612 */ 3613 3614 sq->sq_flags = flags & ~SQ_EXCL; 3615 if (sq->sq_flags & SQ_WANTWAKEUP) { 3616 sq->sq_flags &= ~SQ_WANTWAKEUP; 3617 cv_broadcast(&sq->sq_wait); 3618 } 3619 mutex_exit(SQLOCK(sq)); 3620 return (rval); 3621 } 3622 3623 /* 3624 * The purpose of infonext() is to call the info procedure of the next 3625 * (downstream) modules queue. 3626 * 3627 * treated as put entrypoint for perimeter syncronization. 3628 * 3629 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3630 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and 3631 * it does not matter if any regular put entrypoints have been already 3632 * entered. 3633 */ 3634 int 3635 infonext(queue_t *qp, infod_t *idp) 3636 { 3637 queue_t *nqp; 3638 syncq_t *sq; 3639 uint16_t count; 3640 uint16_t flags; 3641 struct qinit *qi; 3642 int (*proc)(); 3643 struct stdata *stp; 3644 int rval; 3645 3646 stp = STREAM(qp); 3647 /* 3648 * Prevent q_next from changing by holding sd_lock until 3649 * acquiring SQLOCK. 3650 */ 3651 mutex_enter(&stp->sd_lock); 3652 if ((nqp = _WR(qp)) == qp) { 3653 qp = nqp->q_next; 3654 } else { 3655 qp = _RD(nqp->q_next); 3656 } 3657 qi = qp->q_qinfo; 3658 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3659 mutex_exit(&stp->sd_lock); 3660 return (EINVAL); 3661 } 3662 sq = qp->q_syncq; 3663 mutex_enter(SQLOCK(sq)); 3664 mutex_exit(&stp->sd_lock); 3665 count = sq->sq_count; 3666 flags = sq->sq_flags; 3667 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3668 3669 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3670 /* 3671 * Wait until we can enter the inner perimeter. 3672 */ 3673 sq->sq_flags = flags | SQ_WANTWAKEUP; 3674 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3675 count = sq->sq_count; 3676 flags = sq->sq_flags; 3677 } 3678 3679 if (! (flags & SQ_CIPUT)) 3680 sq->sq_flags = flags | SQ_EXCL; 3681 sq->sq_count = count + 1; 3682 ASSERT(sq->sq_count != 0); /* Wraparound */ 3683 mutex_exit(SQLOCK(sq)); 3684 3685 rval = (*proc)(qp, idp); 3686 3687 mutex_enter(SQLOCK(sq)); 3688 flags = sq->sq_flags; 3689 ASSERT(sq->sq_count != 0); 3690 sq->sq_count--; 3691 if (flags & SQ_TAIL) { 3692 putnext_tail(sq, qp, flags); 3693 /* 3694 * The only purpose of this ASSERT is to preserve calling stack 3695 * in DEBUG kernel. 3696 */ 3697 ASSERT(flags & SQ_TAIL); 3698 return (rval); 3699 } 3700 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3701 /* 3702 * XXXX 3703 * I am not certain the next comment is correct here. I need to consider 3704 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3705 * might cause other problems. It just might be safer to drop it if 3706 * !SQ_CIPUT because that is when we set it. 3707 */ 3708 /* 3709 * Safe to always drop SQ_EXCL: 3710 * Not SQ_CIPUT means we set SQ_EXCL above 3711 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3712 * did a qwriter(INNER) in which case nobody else 3713 * is in the inner perimeter and we are exiting. 3714 * 3715 * I would like to make the following assertion: 3716 * 3717 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3718 * sq->sq_count == 0); 3719 * 3720 * which indicates that if we are both putshared and exclusive, 3721 * we became exclusive while executing the putproc, and the only 3722 * claim on the syncq was the one we dropped a few lines above. 3723 * But other threads that enter putnext while the syncq is exclusive 3724 * need to make a claim as they may need to drop SQLOCK in the 3725 * has_writers case to avoid deadlocks. If these threads are 3726 * delayed or preempted, it is possible that the writer thread can 3727 * find out that there are other claims making the (sq_count == 0) 3728 * test invalid. 3729 */ 3730 3731 sq->sq_flags = flags & ~SQ_EXCL; 3732 mutex_exit(SQLOCK(sq)); 3733 return (rval); 3734 } 3735 3736 /* 3737 * Return nonzero if the queue is responsible for struio(), else return 0. 3738 */ 3739 int 3740 isuioq(queue_t *q) 3741 { 3742 if (q->q_flag & QREADR) 3743 return (STREAM(q)->sd_struiordq == q); 3744 else 3745 return (STREAM(q)->sd_struiowrq == q); 3746 } 3747 3748 #if defined(__sparc) 3749 int disable_putlocks = 0; 3750 #else 3751 int disable_putlocks = 1; 3752 #endif 3753 3754 /* 3755 * called by create_putlock. 3756 */ 3757 static void 3758 create_syncq_putlocks(queue_t *q) 3759 { 3760 syncq_t *sq = q->q_syncq; 3761 ciputctrl_t *cip; 3762 int i; 3763 3764 ASSERT(sq != NULL); 3765 3766 ASSERT(disable_putlocks == 0); 3767 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3768 ASSERT(ciputctrl_cache != NULL); 3769 3770 if (!(sq->sq_type & SQ_CIPUT)) 3771 return; 3772 3773 for (i = 0; i <= 1; i++) { 3774 if (sq->sq_ciputctrl == NULL) { 3775 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3776 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3777 mutex_enter(SQLOCK(sq)); 3778 if (sq->sq_ciputctrl != NULL) { 3779 mutex_exit(SQLOCK(sq)); 3780 kmem_cache_free(ciputctrl_cache, cip); 3781 } else { 3782 ASSERT(sq->sq_nciputctrl == 0); 3783 sq->sq_nciputctrl = n_ciputctrl - 1; 3784 /* 3785 * putnext checks sq_ciputctrl without holding 3786 * SQLOCK. if it is not NULL putnext assumes 3787 * sq_nciputctrl is initialized. membar below 3788 * insures that. 3789 */ 3790 membar_producer(); 3791 sq->sq_ciputctrl = cip; 3792 mutex_exit(SQLOCK(sq)); 3793 } 3794 } 3795 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3796 if (i == 1) 3797 break; 3798 q = _OTHERQ(q); 3799 if (!(q->q_flag & QPERQ)) { 3800 ASSERT(sq == q->q_syncq); 3801 break; 3802 } 3803 ASSERT(q->q_syncq != NULL); 3804 ASSERT(sq != q->q_syncq); 3805 sq = q->q_syncq; 3806 ASSERT(sq->sq_type & SQ_CIPUT); 3807 } 3808 } 3809 3810 /* 3811 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 3812 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for 3813 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 3814 * starting from q and down to the driver. 3815 * 3816 * This should be called after the affected queues are part of stream 3817 * geometry. It should be called from driver/module open routine after 3818 * qprocson() call. It is also called from nfs syscall where it is known that 3819 * stream is configured and won't change its geometry during create_putlock 3820 * call. 3821 * 3822 * caller normally uses 0 value for the stream argument to speed up MT putnext 3823 * into the perimeter of q for example because its perimeter is per module 3824 * (e.g. IP). 3825 * 3826 * caller normally uses non 0 value for the stream argument to hint the system 3827 * that the stream of q is a very contended global system stream 3828 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 3829 * particularly MT hot. 3830 * 3831 * Caller insures stream plumbing won't happen while we are here and therefore 3832 * q_next can be safely used. 3833 */ 3834 3835 void 3836 create_putlocks(queue_t *q, int stream) 3837 { 3838 ciputctrl_t *cip; 3839 struct stdata *stp = STREAM(q); 3840 3841 q = _WR(q); 3842 ASSERT(stp != NULL); 3843 3844 if (disable_putlocks != 0) 3845 return; 3846 3847 if (n_ciputctrl < min_n_ciputctrl) 3848 return; 3849 3850 ASSERT(ciputctrl_cache != NULL); 3851 3852 if (stream != 0 && stp->sd_ciputctrl == NULL) { 3853 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3854 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3855 mutex_enter(&stp->sd_lock); 3856 if (stp->sd_ciputctrl != NULL) { 3857 mutex_exit(&stp->sd_lock); 3858 kmem_cache_free(ciputctrl_cache, cip); 3859 } else { 3860 ASSERT(stp->sd_nciputctrl == 0); 3861 stp->sd_nciputctrl = n_ciputctrl - 1; 3862 /* 3863 * putnext checks sd_ciputctrl without holding 3864 * sd_lock. if it is not NULL putnext assumes 3865 * sd_nciputctrl is initialized. membar below 3866 * insures that. 3867 */ 3868 membar_producer(); 3869 stp->sd_ciputctrl = cip; 3870 mutex_exit(&stp->sd_lock); 3871 } 3872 } 3873 3874 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 3875 3876 while (_SAMESTR(q)) { 3877 create_syncq_putlocks(q); 3878 if (stream == 0) 3879 return; 3880 q = q->q_next; 3881 } 3882 ASSERT(q != NULL); 3883 create_syncq_putlocks(q); 3884 } 3885 3886 /* 3887 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 3888 * through a stream. 3889 * 3890 * Data currently record per event is a hrtime stamp, queue address, event 3891 * type, and a per type datum. Much of the STREAMS framework is instrumented 3892 * for automatic flow tracing (when enabled). Events can be defined and used 3893 * by STREAMS modules and drivers. 3894 * 3895 * Global objects: 3896 * 3897 * str_ftevent() - Add a flow-trace event to a dblk. 3898 * str_ftfree() - Free flow-trace data 3899 * 3900 * Local objects: 3901 * 3902 * fthdr_cache - pointer to the kmem cache for trace header. 3903 * ftblk_cache - pointer to the kmem cache for trace data blocks. 3904 */ 3905 3906 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 3907 3908 void 3909 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 3910 { 3911 ftblk_t *bp = hp->tail; 3912 ftblk_t *nbp; 3913 ftevnt_t *ep; 3914 int ix, nix; 3915 3916 ASSERT(hp != NULL); 3917 3918 for (;;) { 3919 if ((ix = bp->ix) == FTBLK_EVNTS) { 3920 /* 3921 * Tail doesn't have room, so need a new tail. 3922 * 3923 * To make this MT safe, first, allocate a new 3924 * ftblk, and initialize it. To make life a 3925 * little easier, reserve the first slot (mostly 3926 * by making ix = 1). When we are finished with 3927 * the initialization, CAS this pointer to the 3928 * tail. If this succeeds, this is the new 3929 * "next" block. Otherwise, another thread 3930 * got here first, so free the block and start 3931 * again. 3932 */ 3933 if (!(nbp = kmem_cache_alloc(ftblk_cache, 3934 KM_NOSLEEP))) { 3935 /* no mem, so punt */ 3936 str_ftnever++; 3937 /* free up all flow data? */ 3938 return; 3939 } 3940 nbp->nxt = NULL; 3941 nbp->ix = 1; 3942 /* 3943 * Just in case there is another thread about 3944 * to get the next index, we need to make sure 3945 * the value is there for it. 3946 */ 3947 membar_producer(); 3948 if (casptr(&hp->tail, bp, nbp) == bp) { 3949 /* CAS was successful */ 3950 bp->nxt = nbp; 3951 membar_producer(); 3952 bp = nbp; 3953 ix = 0; 3954 goto cas_good; 3955 } else { 3956 kmem_cache_free(ftblk_cache, nbp); 3957 bp = hp->tail; 3958 continue; 3959 } 3960 } 3961 nix = ix + 1; 3962 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 3963 cas_good: 3964 if (curthread != hp->thread) { 3965 hp->thread = curthread; 3966 evnt |= FTEV_CS; 3967 } 3968 if (CPU->cpu_seqid != hp->cpu_seqid) { 3969 hp->cpu_seqid = CPU->cpu_seqid; 3970 evnt |= FTEV_PS; 3971 } 3972 ep = &bp->ev[ix]; 3973 break; 3974 } 3975 } 3976 3977 if (evnt & FTEV_QMASK) { 3978 queue_t *qp = p; 3979 3980 /* 3981 * It is possible that the module info is broke 3982 * (as is logsubr.c at this comment writing). 3983 * Instead of panicing or doing other unmentionables, 3984 * we shall put a dummy name as the mid, and continue. 3985 */ 3986 if (qp->q_qinfo == NULL) 3987 ep->mid = "NONAME"; 3988 else 3989 ep->mid = qp->q_qinfo->qi_minfo->mi_idname; 3990 3991 if (!(qp->q_flag & QREADR)) 3992 evnt |= FTEV_ISWR; 3993 } else { 3994 ep->mid = (char *)p; 3995 } 3996 3997 ep->ts = gethrtime(); 3998 ep->evnt = evnt; 3999 ep->data = data; 4000 hp->hash = (hp->hash << 9) + hp->hash; 4001 hp->hash += (evnt << 16) | data; 4002 hp->hash += (uintptr_t)ep->mid; 4003 } 4004 4005 /* 4006 * Free flow-trace data. 4007 */ 4008 void 4009 str_ftfree(dblk_t *dbp) 4010 { 4011 fthdr_t *hp = dbp->db_fthdr; 4012 ftblk_t *bp = &hp->first; 4013 ftblk_t *nbp; 4014 4015 if (bp != hp->tail || bp->ix != 0) { 4016 /* 4017 * Clear out the hash, have the tail point to itself, and free 4018 * any continuation blocks. 4019 */ 4020 bp = hp->first.nxt; 4021 hp->tail = &hp->first; 4022 hp->hash = 0; 4023 hp->first.nxt = NULL; 4024 hp->first.ix = 0; 4025 while (bp != NULL) { 4026 nbp = bp->nxt; 4027 kmem_cache_free(ftblk_cache, bp); 4028 bp = nbp; 4029 } 4030 } 4031 kmem_cache_free(fthdr_cache, hp); 4032 dbp->db_fthdr = NULL; 4033 } 4034