/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer; the
 *     associated buffer is allocated by the constructor using kmem_alloc().
 *     For all other message sizes, the dblk and its associated data are
 *     allocated as a single contiguous chunk of memory, so objects in these
 *     caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
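
/*
 * To make the dup/free mechanics above concrete, a minimal sketch of the
 * reference-count lifecycle as seen by a caller (illustrative only;
 * variable names are hypothetical):
 *
 *	mblk_t *mp = allocb(64, BPRI_MED);	db_ref == 1,
 *						db_free == dblk_lastfree
 *	mblk_t *dp = dupb(mp);			db_ref == 2,
 *						db_free == dblk_decref
 *	freeb(mp);				dblk_decref: ref 2 -> 1,
 *						only the mblk is freed
 *	freeb(dp);				last reference: db_free is
 *						restored to db_lastfree and
 *						the dblk + data go back to
 *						their kmem cache
 */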
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
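
/*
 * A short illustration of the DBLK_RTFU encoding above (a sketch, with
 * the behavior inferred from the macros themselves): db_ref, db_type,
 * db_flags and db_struioflag are four adjacent one-byte dblk fields, so
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 *
 * initializes all four with a single 32-bit store.  Note the
 * ((flags) | (ref - 1)) term: for a ref-2 allocation such as esballoca(),
 * it records the minimum reference count in db_flags, which is what lets
 * dblk_decref() compare the current and minimum counts by extracting
 * both from the same word.
 */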
/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;


static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
	    kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}
void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
	    mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page; the dblk should
			 * be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) +
			    sizeof (kmem_slab_t)) < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * The buffer size is a multiple of the page size,
			 * so the dblk and buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
		    DBLK_CACHE_ALIGN, dblk_constructor,
		    dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}
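
/*
 * Typical caller-side use of the allocb() family above (a minimal,
 * hedged sketch; 'len' and 'data' are hypothetical):
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOSR);		(or schedule a bufcall and retry)
 *	bcopy(data, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *
 * b_rptr and b_wptr both start at db_base, so a writer fills data in
 * by advancing b_wptr.
 */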
void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/*
	 * First-dup optimization.  The enabling assumption is that there
	 * can never be a race (in correct code) to dup the first copy
	 * of a message.  Therefore we don't need to do it atomically.
	 */
	if (dbp->db_free != dblk_decref) {
		dbp->db_free = dblk_decref;
		dbp->db_ref++;
		goto out;
	}

	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
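
/*
 * A minimal sketch of how a driver typically uses esballoc() to loan out
 * one of its own buffers (the xx_* names are hypothetical, not from this
 * file).  The frtn_t ties the loaned buffer to a driver routine that
 * reclaims it once the last reference is freed:
 *
 *	static void
 *	xx_reclaim(void *arg)
 *	{
 *		xx_buf_t *bp = arg;
 *		xx_buf_release(bp);		(driver-specific recycle)
 *	}
 *
 *	bp->xb_frtn.free_func = xx_reclaim;
 *	bp->xb_frtn.free_arg = (void *)bp;
 *	mp = esballoc(bp->xb_kaddr, bp->xb_size, BPRI_MED, &bp->xb_frtn);
 *
 * When the message is eventually freed, the free routine recorded in
 * db_frtnp runs instead of the data being returned to a kmem cache.
 */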
/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
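
/*
 * Lifecycle sketch for the bcache routines above (illustrative; the
 * cache name and sizes are made up).  A driver that wants all of its
 * message buffers to share one size and alignment can do:
 *
 *	bcache_t *bcp = bcache_create("xx_rx", 2048, 64);
 *	...
 *	mp = bcache_allocb(bcp, BPRI_MED);
 *	...
 *	bcache_destroy(bcp);
 *
 * bcache_destroy() defers the actual teardown (bcp->destroy is counted)
 * until bcache_dblk_lastfree() has returned the last outstanding dblk.
 */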
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * It may only be called from sync routines (i.e., not from put or
 * service procedures).  It is located here (rather than strsubr.c)
 * so that we don't have to expose all of the allocb() implementation
 * details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}
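
/*
 * The classic recovery pattern for allocation failure, using esbbcall()
 * above or bufcall() (defined below).  A hedged sketch; the id variable
 * is hypothetical:
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL) {
 *		xx_bufcall_id = bufcall(len, BPRI_MED,
 *		    (void (*)(void *))qenable, (void *)q);
 *		if (xx_bufcall_id == 0)
 *			... give up, or set a timeout to retry ...
 *		return;		(the service procedure will be re-run)
 *	}
 *
 * A module that schedules a bufcall must cancel it with unbufcall()
 * before the id goes out of scope (e.g., at close time).
 */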
/*
 * Allocate an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its
	 * value in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * Add the newly allocated stream event to the existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}
/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
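
/*
 * Typical use of pullupmsg(): make sure a protocol header that may be
 * split across mblks is contiguous and aligned before casting b_rptr.
 * A hedged sketch; the header type is hypothetical:
 *
 *	if (MBLKL(mp) < sizeof (xx_hdr_t) &&
 *	    !pullupmsg(mp, sizeof (xx_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (xx_hdr_t *)mp->b_rptr;
 *
 * Note that pullupmsg() modifies the message in place (mp still heads
 * the message afterwards), while msgpullup() below leaves the original
 * message untouched and returns a new one.
 */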
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */
			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
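
/*
 * A quick worked example of the trim direction in adjmsg() (a sketch;
 * the sizes are made up).  Given a three-block M_DATA message of
 * 10 + 10 + 10 bytes:
 *
 *	adjmsg(mp, 4);		trims 4 bytes from the head
 *				(b_rptr of the first block advances)
 *	adjmsg(mp, -4);		trims 4 bytes from the tail
 *				(b_wptr of the last block backs up)
 *
 * Either way msgdsize(mp) drops from 30 to 26.  A common use is
 * removing a link-level trailer with adjmsg(mp, -trailer_len).
 */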
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources, and
 * particularly to be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size) 	\
	if (DB_TYPE(mp) != M_MULTIDATA) {	\
		(size) += MBLKL(mp);		\
	} else {				\
		uint_t	pinuse;			\
						\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
		(size) += pinuse;		\
	}
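
/*
 * A worked example of the EITHER/BOTH rule described before getq()
 * above, assuming a hypothetical q_hiwat of 512: a burst of 1000
 * zero-length messages leaves q_count at 0 but drives q_mblkcnt to
 * 1000, which crosses the high water mark, so the queue is marked
 * QFULL even though no data bytes are enqueued.  The queue does not
 * leave the full state until getq()/rmvq() accounting brings q_count
 * AND q_mblkcnt back below q_hiwat.
 */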
/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat)) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
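
/*
 * Sketch of the intended getq_noenab()/qbackenable() pairing described
 * above (illustrative only; 'q' is whatever queue the caller owns):
 *
 *	if ((mp = getq_noenab(q)) != NULL) {
 *		uchar_t band = mp->b_band;
 *
 *		if (!canputnext(q)) {
 *			(void) putbq(q, mp);	(putback, nothing consumed)
 *		} else {
 *			putnext(q, mp);
 *			qbackenable(q, band);	(now safe to wake writers)
 *		}
 *	}
 *
 * Deferring the backenable avoids waking up writers only to have the
 * message put straight back on the queue.
 */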
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, with QLOCK held by other
 * utility functions, or by streams modules with no locks held and the
 * stream not frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
2066 	 */
2067 	if (backenab) {
2068 		mutex_exit(QLOCK(q));
2069 		for (bpri = q->q_nband; bpri != 0; bpri--)
2070 			if (qbf[bpri])
2071 				backenable(q, bpri);
2072 		if (qbf[0])
2073 			backenable(q, 0);
2074 	} else
2075 		mutex_exit(QLOCK(q));
2076 }
2077 
2078 /*
2079  * The real flushing takes place in flushq_common. This is done so that
2080  * a flag can be passed to specify whether or not M_PCPROTO messages
2081  * should be flushed. Currently the only place that uses this flag is the stream head.
2082  */
2083 void
2084 flushq(queue_t *q, int flag)
2085 {
2086 	flushq_common(q, flag, 0);
2087 }
2088 
2089 /*
2090  * Flush the queue of messages of the given priority band.
2091  * There is some duplication of code between flushq and flushband.
2092  * This is because we want to optimize the code as much as possible.
2093  * The assumption is that there will be more messages in the normal
2094  * (priority 0) band than in any other.
2095  *
2096  * Historical note: when merging the M_FLUSH code in strrput with this
2097  * code one difference was discovered. flushband had an extra check for
2098  * (mp->b_datap->db_type < QPCTL) in the band 0
2099  * case. That check does not match the man page for flushband and was not
2100  * in the strrput flush code, hence it was removed.
2101  */
2102 void
2103 flushband(queue_t *q, unsigned char pri, int flag)
2104 {
2105 	mblk_t *mp;
2106 	mblk_t *nmp;
2107 	mblk_t *last;
2108 	qband_t *qbp;
2109 	int band;
2110 
2111 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2112 	if (pri > q->q_nband) {
2113 		return;
2114 	}
2115 	mutex_enter(QLOCK(q));
2116 	if (pri == 0) {
2117 		mp = q->q_first;
2118 		q->q_first = NULL;
2119 		q->q_last = NULL;
2120 		q->q_count = 0;
2121 		q->q_mblkcnt = 0;
2122 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2123 			qbp->qb_first = NULL;
2124 			qbp->qb_last = NULL;
2125 			qbp->qb_count = 0;
2126 			qbp->qb_mblkcnt = 0;
2127 			qbp->qb_flag &= ~QB_FULL;
2128 		}
2129 		q->q_flag &= ~QFULL;
2130 		mutex_exit(QLOCK(q));
2131 		while (mp) {
2132 			nmp = mp->b_next;
2133 			mp->b_next = mp->b_prev = NULL;
2134 			if ((mp->b_band == 0) &&
2135 			    ((flag == FLUSHALL) ||
2136 			    datamsg(mp->b_datap->db_type)))
2137 				freemsg(mp);
2138 			else
2139 				(void) putq(q, mp);
2140 			mp = nmp;
2141 		}
2142 		mutex_enter(QLOCK(q));
2143 		if ((q->q_flag & QWANTW) &&
2144 		    (((q->q_count < q->q_lowat) &&
2145 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2146 			q->q_flag &= ~QWANTW;
2147 			mutex_exit(QLOCK(q));
2148 
2149 			backenable(q, pri);
2150 		} else
2151 			mutex_exit(QLOCK(q));
2152 	} else {	/* pri != 0 */
2153 		boolean_t flushed = B_FALSE;
2154 		band = pri;
2155 
2156 		ASSERT(MUTEX_HELD(QLOCK(q)));
2157 		qbp = q->q_bandp;
2158 		while (--band > 0)
2159 			qbp = qbp->qb_next;
2160 		mp = qbp->qb_first;
2161 		if (mp == NULL) {
2162 			mutex_exit(QLOCK(q));
2163 			return;
2164 		}
2165 		last = qbp->qb_last->b_next;
2166 		/*
2167 		 * rmvq_noenab() and freemsg() are called for each mblk that
2168 		 * meets the criteria. The loop is executed until the last
2169 		 * mblk has been processed.
2170 		 */
2171 		while (mp != last) {
2172 			ASSERT(mp->b_band == pri);
2173 			nmp = mp->b_next;
2174 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2175 				rmvq_noenab(q, mp);
2176 				freemsg(mp);
2177 				flushed = B_TRUE;
2178 			}
2179 			mp = nmp;
2180 		}
2181 		mutex_exit(QLOCK(q));
2182 
2183 		/*
2184 		 * If any mblks have been freed, we know that qbackenable()
2185 		 * will need to be called.
2186 		 */
2187 		if (flushed)
2188 			qbackenable(q, pri);
2189 	}
2190 }
2191 
2192 /*
2193  * Return 1 if the queue is not full.
If the queue is full, return 2194 * 0 (may not put message) and set QWANTW flag (caller wants to write 2195 * to the queue). 2196 */ 2197 int 2198 canput(queue_t *q) 2199 { 2200 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2201 2202 /* this is for loopback transports, they should not do a canput */ 2203 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2204 2205 /* Find next forward module that has a service procedure */ 2206 q = q->q_nfsrv; 2207 2208 if (!(q->q_flag & QFULL)) { 2209 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2210 return (1); 2211 } 2212 mutex_enter(QLOCK(q)); 2213 if (q->q_flag & QFULL) { 2214 q->q_flag |= QWANTW; 2215 mutex_exit(QLOCK(q)); 2216 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2217 return (0); 2218 } 2219 mutex_exit(QLOCK(q)); 2220 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2221 return (1); 2222 } 2223 2224 /* 2225 * This is the new canput for use with priority bands. Return 1 if the 2226 * band is not full. If the band is full, return 0 (may not put message) 2227 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2228 * write to the queue). 2229 */ 2230 int 2231 bcanput(queue_t *q, unsigned char pri) 2232 { 2233 qband_t *qbp; 2234 2235 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2236 if (!q) 2237 return (0); 2238 2239 /* Find next forward module that has a service procedure */ 2240 q = q->q_nfsrv; 2241 2242 mutex_enter(QLOCK(q)); 2243 if (pri == 0) { 2244 if (q->q_flag & QFULL) { 2245 q->q_flag |= QWANTW; 2246 mutex_exit(QLOCK(q)); 2247 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2248 "bcanput:%p %X %d", q, pri, 0); 2249 return (0); 2250 } 2251 } else { /* pri != 0 */ 2252 if (pri > q->q_nband) { 2253 /* 2254 * No band exists yet, so return success. 2255 */ 2256 mutex_exit(QLOCK(q)); 2257 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2258 "bcanput:%p %X %d", q, pri, 1); 2259 return (1); 2260 } 2261 qbp = q->q_bandp; 2262 while (--pri) 2263 qbp = qbp->qb_next; 2264 if (qbp->qb_flag & QB_FULL) { 2265 qbp->qb_flag |= QB_WANTW; 2266 mutex_exit(QLOCK(q)); 2267 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2268 "bcanput:%p %X %d", q, pri, 0); 2269 return (0); 2270 } 2271 } 2272 mutex_exit(QLOCK(q)); 2273 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2274 "bcanput:%p %X %d", q, pri, 1); 2275 return (1); 2276 } 2277 2278 /* 2279 * Put a message on a queue. 2280 * 2281 * Messages are enqueued on a priority basis. The priority classes 2282 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2283 * and B_NORMAL (type < QPCTL && band == 0). 2284 * 2285 * Add appropriate weighted data block sizes to queue count. 2286 * If queue hits high water mark then set QFULL flag. 2287 * 2288 * If QNOENAB is not set (putq is allowed to enable the queue), 2289 * enable the queue only if the message is PRIORITY, 2290 * or the QWANTR flag is set (indicating that the service procedure 2291 * is ready to read the queue. This implies that a service 2292 * procedure must NEVER put a high priority message back on its own 2293 * queue, as this would result in an infinite loop (!). 
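 *
 * As an illustrative sketch (not part of this file), a write service
 * procedure for a hypothetical module "xxx" typically honors these
 * rules as follows:
 *
 *	static int
 *	xxx_wsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);	(requeue, keep order)
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}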
2294 */ 2295 int 2296 putq(queue_t *q, mblk_t *bp) 2297 { 2298 mblk_t *tmp; 2299 qband_t *qbp = NULL; 2300 int mcls = (int)queclass(bp); 2301 kthread_id_t freezer; 2302 int bytecnt = 0, mblkcnt = 0; 2303 2304 freezer = STREAM(q)->sd_freezer; 2305 if (freezer == curthread) { 2306 ASSERT(frozenstr(q)); 2307 ASSERT(MUTEX_HELD(QLOCK(q))); 2308 } else 2309 mutex_enter(QLOCK(q)); 2310 2311 /* 2312 * Make sanity checks and if qband structure is not yet 2313 * allocated, do so. 2314 */ 2315 if (mcls == QPCTL) { 2316 if (bp->b_band != 0) 2317 bp->b_band = 0; /* force to be correct */ 2318 } else if (bp->b_band != 0) { 2319 int i; 2320 qband_t **qbpp; 2321 2322 if (bp->b_band > q->q_nband) { 2323 2324 /* 2325 * The qband structure for this priority band is 2326 * not on the queue yet, so we have to allocate 2327 * one on the fly. It would be wasteful to 2328 * associate the qband structures with every 2329 * queue when the queues are allocated. This is 2330 * because most queues will only need the normal 2331 * band of flow which can be described entirely 2332 * by the queue itself. 2333 */ 2334 qbpp = &q->q_bandp; 2335 while (*qbpp) 2336 qbpp = &(*qbpp)->qb_next; 2337 while (bp->b_band > q->q_nband) { 2338 if ((*qbpp = allocband()) == NULL) { 2339 if (freezer != curthread) 2340 mutex_exit(QLOCK(q)); 2341 return (0); 2342 } 2343 (*qbpp)->qb_hiwat = q->q_hiwat; 2344 (*qbpp)->qb_lowat = q->q_lowat; 2345 q->q_nband++; 2346 qbpp = &(*qbpp)->qb_next; 2347 } 2348 } 2349 ASSERT(MUTEX_HELD(QLOCK(q))); 2350 qbp = q->q_bandp; 2351 i = bp->b_band; 2352 while (--i) 2353 qbp = qbp->qb_next; 2354 } 2355 2356 /* 2357 * If queue is empty, add the message and initialize the pointers. 2358 * Otherwise, adjust message pointers and queue pointers based on 2359 * the type of the message and where it belongs on the queue. Some 2360 * code is duplicated to minimize the number of conditionals and 2361 * hopefully minimize the amount of time this routine takes. 2362 */ 2363 if (!q->q_first) { 2364 bp->b_next = NULL; 2365 bp->b_prev = NULL; 2366 q->q_first = bp; 2367 q->q_last = bp; 2368 if (qbp) { 2369 qbp->qb_first = bp; 2370 qbp->qb_last = bp; 2371 } 2372 } else if (!qbp) { /* bp->b_band == 0 */ 2373 2374 /* 2375 * If queue class of message is less than or equal to 2376 * that of the last one on the queue, tack on to the end. 2377 */ 2378 tmp = q->q_last; 2379 if (mcls <= (int)queclass(tmp)) { 2380 bp->b_next = NULL; 2381 bp->b_prev = tmp; 2382 tmp->b_next = bp; 2383 q->q_last = bp; 2384 } else { 2385 tmp = q->q_first; 2386 while ((int)queclass(tmp) >= mcls) 2387 tmp = tmp->b_next; 2388 2389 /* 2390 * Insert bp before tmp. 2391 */ 2392 bp->b_next = tmp; 2393 bp->b_prev = tmp->b_prev; 2394 if (tmp->b_prev) 2395 tmp->b_prev->b_next = bp; 2396 else 2397 q->q_first = bp; 2398 tmp->b_prev = bp; 2399 } 2400 } else { /* bp->b_band != 0 */ 2401 if (qbp->qb_first) { 2402 tmp = qbp->qb_last; 2403 2404 /* 2405 * Insert bp after the last message in this band. 2406 */ 2407 bp->b_next = tmp->b_next; 2408 if (tmp->b_next) 2409 tmp->b_next->b_prev = bp; 2410 else 2411 q->q_last = bp; 2412 bp->b_prev = tmp; 2413 tmp->b_next = bp; 2414 } else { 2415 tmp = q->q_last; 2416 if ((mcls < (int)queclass(tmp)) || 2417 (bp->b_band <= tmp->b_band)) { 2418 2419 /* 2420 * Tack bp on end of queue. 
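 * (This band was empty and the last message on the queue is of
 * equal or higher precedence, so bp becomes the queue tail and,
 * below, both qb_first and qb_last for its band.)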
2421 */ 2422 bp->b_next = NULL; 2423 bp->b_prev = tmp; 2424 tmp->b_next = bp; 2425 q->q_last = bp; 2426 } else { 2427 tmp = q->q_first; 2428 while (tmp->b_datap->db_type >= QPCTL) 2429 tmp = tmp->b_next; 2430 while (tmp->b_band >= bp->b_band) 2431 tmp = tmp->b_next; 2432 2433 /* 2434 * Insert bp before tmp. 2435 */ 2436 bp->b_next = tmp; 2437 bp->b_prev = tmp->b_prev; 2438 if (tmp->b_prev) 2439 tmp->b_prev->b_next = bp; 2440 else 2441 q->q_first = bp; 2442 tmp->b_prev = bp; 2443 } 2444 qbp->qb_first = bp; 2445 } 2446 qbp->qb_last = bp; 2447 } 2448 2449 /* Get message byte count for q_count accounting */ 2450 for (tmp = bp; tmp; tmp = tmp->b_cont) { 2451 ADD_MBLK_SIZE(tmp, bytecnt); 2452 mblkcnt++; 2453 } 2454 2455 if (qbp) { 2456 qbp->qb_count += bytecnt; 2457 qbp->qb_mblkcnt += mblkcnt; 2458 if ((qbp->qb_count >= qbp->qb_hiwat) || 2459 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2460 qbp->qb_flag |= QB_FULL; 2461 } 2462 } else { 2463 q->q_count += bytecnt; 2464 q->q_mblkcnt += mblkcnt; 2465 if ((q->q_count >= q->q_hiwat) || 2466 (q->q_mblkcnt >= q->q_hiwat)) { 2467 q->q_flag |= QFULL; 2468 } 2469 } 2470 2471 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2472 2473 if ((mcls > QNORM) || 2474 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2475 qenable_locked(q); 2476 ASSERT(MUTEX_HELD(QLOCK(q))); 2477 if (freezer != curthread) 2478 mutex_exit(QLOCK(q)); 2479 2480 return (1); 2481 } 2482 2483 /* 2484 * Put stuff back at beginning of Q according to priority order. 2485 * See comment on putq above for details. 2486 */ 2487 int 2488 putbq(queue_t *q, mblk_t *bp) 2489 { 2490 mblk_t *tmp; 2491 qband_t *qbp = NULL; 2492 int mcls = (int)queclass(bp); 2493 kthread_id_t freezer; 2494 int bytecnt = 0, mblkcnt = 0; 2495 2496 ASSERT(q && bp); 2497 ASSERT(bp->b_next == NULL); 2498 freezer = STREAM(q)->sd_freezer; 2499 if (freezer == curthread) { 2500 ASSERT(frozenstr(q)); 2501 ASSERT(MUTEX_HELD(QLOCK(q))); 2502 } else 2503 mutex_enter(QLOCK(q)); 2504 2505 /* 2506 * Make sanity checks and if qband structure is not yet 2507 * allocated, do so. 2508 */ 2509 if (mcls == QPCTL) { 2510 if (bp->b_band != 0) 2511 bp->b_band = 0; /* force to be correct */ 2512 } else if (bp->b_band != 0) { 2513 int i; 2514 qband_t **qbpp; 2515 2516 if (bp->b_band > q->q_nband) { 2517 qbpp = &q->q_bandp; 2518 while (*qbpp) 2519 qbpp = &(*qbpp)->qb_next; 2520 while (bp->b_band > q->q_nband) { 2521 if ((*qbpp = allocband()) == NULL) { 2522 if (freezer != curthread) 2523 mutex_exit(QLOCK(q)); 2524 return (0); 2525 } 2526 (*qbpp)->qb_hiwat = q->q_hiwat; 2527 (*qbpp)->qb_lowat = q->q_lowat; 2528 q->q_nband++; 2529 qbpp = &(*qbpp)->qb_next; 2530 } 2531 } 2532 qbp = q->q_bandp; 2533 i = bp->b_band; 2534 while (--i) 2535 qbp = qbp->qb_next; 2536 } 2537 2538 /* 2539 * If queue is empty or if message is high priority, 2540 * place on the front of the queue. 2541 */ 2542 tmp = q->q_first; 2543 if ((!tmp) || (mcls == QPCTL)) { 2544 bp->b_next = tmp; 2545 if (tmp) 2546 tmp->b_prev = bp; 2547 else 2548 q->q_last = bp; 2549 q->q_first = bp; 2550 bp->b_prev = NULL; 2551 if (qbp) { 2552 qbp->qb_first = bp; 2553 qbp->qb_last = bp; 2554 } 2555 } else if (qbp) { /* bp->b_band != 0 */ 2556 tmp = qbp->qb_first; 2557 if (tmp) { 2558 2559 /* 2560 * Insert bp before the first message in this band. 
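 * (bp is being returned to the queue, so it goes back in ahead of
 * the messages already queued in its band.)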
2561 			 */
2562 			bp->b_next = tmp;
2563 			bp->b_prev = tmp->b_prev;
2564 			if (tmp->b_prev)
2565 				tmp->b_prev->b_next = bp;
2566 			else
2567 				q->q_first = bp;
2568 			tmp->b_prev = bp;
2569 		} else {
2570 			tmp = q->q_last;
2571 			if ((mcls < (int)queclass(tmp)) ||
2572 			    (bp->b_band < tmp->b_band)) {
2573 
2574 				/*
2575 				 * Tack bp on end of queue.
2576 				 */
2577 				bp->b_next = NULL;
2578 				bp->b_prev = tmp;
2579 				tmp->b_next = bp;
2580 				q->q_last = bp;
2581 			} else {
2582 				tmp = q->q_first;
2583 				while (tmp->b_datap->db_type >= QPCTL)
2584 					tmp = tmp->b_next;
2585 				while (tmp->b_band > bp->b_band)
2586 					tmp = tmp->b_next;
2587 
2588 				/*
2589 				 * Insert bp before tmp.
2590 				 */
2591 				bp->b_next = tmp;
2592 				bp->b_prev = tmp->b_prev;
2593 				if (tmp->b_prev)
2594 					tmp->b_prev->b_next = bp;
2595 				else
2596 					q->q_first = bp;
2597 				tmp->b_prev = bp;
2598 			}
2599 			qbp->qb_last = bp;
2600 		}
2601 		qbp->qb_first = bp;
2602 	} else {	/* bp->b_band == 0 && !QPCTL */
2603 
2604 		/*
2605 		 * If the queue class or band is less than that of the last
2606 		 * message on the queue, tack bp on the end of the queue.
2607 		 */
2608 		tmp = q->q_last;
2609 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2610 			bp->b_next = NULL;
2611 			bp->b_prev = tmp;
2612 			tmp->b_next = bp;
2613 			q->q_last = bp;
2614 		} else {
2615 			tmp = q->q_first;
2616 			while (tmp->b_datap->db_type >= QPCTL)
2617 				tmp = tmp->b_next;
2618 			while (tmp->b_band > bp->b_band)
2619 				tmp = tmp->b_next;
2620 
2621 			/*
2622 			 * Insert bp before tmp.
2623 			 */
2624 			bp->b_next = tmp;
2625 			bp->b_prev = tmp->b_prev;
2626 			if (tmp->b_prev)
2627 				tmp->b_prev->b_next = bp;
2628 			else
2629 				q->q_first = bp;
2630 			tmp->b_prev = bp;
2631 		}
2632 	}
2633 
2634 	/* Get message byte count for q_count accounting */
2635 	for (tmp = bp; tmp; tmp = tmp->b_cont) {
2636 		ADD_MBLK_SIZE(tmp, bytecnt);
2637 		mblkcnt++;
2638 	}
2639 	if (qbp) {
2640 		qbp->qb_count += bytecnt;
2641 		qbp->qb_mblkcnt += mblkcnt;
2642 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2643 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2644 			qbp->qb_flag |= QB_FULL;
2645 		}
2646 	} else {
2647 		q->q_count += bytecnt;
2648 		q->q_mblkcnt += mblkcnt;
2649 		if ((q->q_count >= q->q_hiwat) ||
2650 		    (q->q_mblkcnt >= q->q_hiwat)) {
2651 			q->q_flag |= QFULL;
2652 		}
2653 	}
2654 
2655 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2656 
2657 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2658 		qenable_locked(q);
2659 	ASSERT(MUTEX_HELD(QLOCK(q)));
2660 	if (freezer != curthread)
2661 		mutex_exit(QLOCK(q));
2662 
2663 	return (1);
2664 }
2665 
2666 /*
2667  * Insert a message before an existing message on the queue. If the
2668  * existing message is NULL, the new message is placed on the end of
2669  * the queue. The queue class of the new message is ignored. However,
2670  * the priority band of the new message must adhere to the following
2671  * ordering:
2672  *
2673  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2674  *
2675  * All flow control parameters are updated.
2676  *
2677  * insq() can be called with the stream frozen, with QLOCK already held
2678  * by another utility function, or by streams modules with no locks held.
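 *
 * For example (an illustrative sketch, not part of this file), a module
 * that has located an existing message emp on q, with the stream frozen
 * or QLOCK held, can insert a message ahead of it and check for failure:
 *
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);	(out-of-order band or allocation failure)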
2679 */ 2680 int 2681 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2682 { 2683 mblk_t *tmp; 2684 qband_t *qbp = NULL; 2685 int mcls = (int)queclass(mp); 2686 kthread_id_t freezer; 2687 int bytecnt = 0, mblkcnt = 0; 2688 2689 freezer = STREAM(q)->sd_freezer; 2690 if (freezer == curthread) { 2691 ASSERT(frozenstr(q)); 2692 ASSERT(MUTEX_HELD(QLOCK(q))); 2693 } else if (MUTEX_HELD(QLOCK(q))) { 2694 /* Don't drop lock on exit */ 2695 freezer = curthread; 2696 } else 2697 mutex_enter(QLOCK(q)); 2698 2699 if (mcls == QPCTL) { 2700 if (mp->b_band != 0) 2701 mp->b_band = 0; /* force to be correct */ 2702 if (emp && emp->b_prev && 2703 (emp->b_prev->b_datap->db_type < QPCTL)) 2704 goto badord; 2705 } 2706 if (emp) { 2707 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2708 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2709 (emp->b_prev->b_band < mp->b_band))) { 2710 goto badord; 2711 } 2712 } else { 2713 tmp = q->q_last; 2714 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2715 badord: 2716 cmn_err(CE_WARN, 2717 "insq: attempt to insert message out of order " 2718 "on q %p", (void *)q); 2719 if (freezer != curthread) 2720 mutex_exit(QLOCK(q)); 2721 return (0); 2722 } 2723 } 2724 2725 if (mp->b_band != 0) { 2726 int i; 2727 qband_t **qbpp; 2728 2729 if (mp->b_band > q->q_nband) { 2730 qbpp = &q->q_bandp; 2731 while (*qbpp) 2732 qbpp = &(*qbpp)->qb_next; 2733 while (mp->b_band > q->q_nband) { 2734 if ((*qbpp = allocband()) == NULL) { 2735 if (freezer != curthread) 2736 mutex_exit(QLOCK(q)); 2737 return (0); 2738 } 2739 (*qbpp)->qb_hiwat = q->q_hiwat; 2740 (*qbpp)->qb_lowat = q->q_lowat; 2741 q->q_nband++; 2742 qbpp = &(*qbpp)->qb_next; 2743 } 2744 } 2745 qbp = q->q_bandp; 2746 i = mp->b_band; 2747 while (--i) 2748 qbp = qbp->qb_next; 2749 } 2750 2751 if ((mp->b_next = emp) != NULL) { 2752 if ((mp->b_prev = emp->b_prev) != NULL) 2753 emp->b_prev->b_next = mp; 2754 else 2755 q->q_first = mp; 2756 emp->b_prev = mp; 2757 } else { 2758 if ((mp->b_prev = q->q_last) != NULL) 2759 q->q_last->b_next = mp; 2760 else 2761 q->q_first = mp; 2762 q->q_last = mp; 2763 } 2764 2765 /* Get mblk and byte count for q_count accounting */ 2766 for (tmp = mp; tmp; tmp = tmp->b_cont) { 2767 ADD_MBLK_SIZE(tmp, bytecnt); 2768 mblkcnt++; 2769 } 2770 2771 if (qbp) { /* adjust qband pointers and count */ 2772 if (!qbp->qb_first) { 2773 qbp->qb_first = mp; 2774 qbp->qb_last = mp; 2775 } else { 2776 if (mp->b_prev == NULL || (mp->b_prev != NULL && 2777 (mp->b_prev->b_band != mp->b_band))) 2778 qbp->qb_first = mp; 2779 else if (mp->b_next == NULL || (mp->b_next != NULL && 2780 (mp->b_next->b_band != mp->b_band))) 2781 qbp->qb_last = mp; 2782 } 2783 qbp->qb_count += bytecnt; 2784 qbp->qb_mblkcnt += mblkcnt; 2785 if ((qbp->qb_count >= qbp->qb_hiwat) || 2786 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2787 qbp->qb_flag |= QB_FULL; 2788 } 2789 } else { 2790 q->q_count += bytecnt; 2791 q->q_mblkcnt += mblkcnt; 2792 if ((q->q_count >= q->q_hiwat) || 2793 (q->q_mblkcnt >= q->q_hiwat)) { 2794 q->q_flag |= QFULL; 2795 } 2796 } 2797 2798 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 2799 2800 if (canenable(q) && (q->q_flag & QWANTR)) 2801 qenable_locked(q); 2802 2803 ASSERT(MUTEX_HELD(QLOCK(q))); 2804 if (freezer != curthread) 2805 mutex_exit(QLOCK(q)); 2806 2807 return (1); 2808 } 2809 2810 /* 2811 * Create and put a control message on queue. 
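 *
 * For example (illustrative only), a module could post an M_STOP
 * high-priority control message with:
 *
 *	if (!putctl(q, M_STOP))
 *		return (ENOSR);	(hypothetical caller's error path)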
2812  */
2813 int
2814 putctl(queue_t *q, int type)
2815 {
2816 	mblk_t *bp;
2817 
2818 	if ((datamsg(type) && (type != M_DELAY)) ||
2819 	    (bp = allocb_tryhard(0)) == NULL)
2820 		return (0);
2821 	bp->b_datap->db_type = (unsigned char) type;
2822 
2823 	put(q, bp);
2824 
2825 	return (1);
2826 }
2827 
2828 /*
2829  * Control message with a single-byte parameter
2830  */
2831 int
2832 putctl1(queue_t *q, int type, int param)
2833 {
2834 	mblk_t *bp;
2835 
2836 	if ((datamsg(type) && (type != M_DELAY)) ||
2837 	    (bp = allocb_tryhard(1)) == NULL)
2838 		return (0);
2839 	bp->b_datap->db_type = (unsigned char)type;
2840 	*bp->b_wptr++ = (unsigned char)param;
2841 
2842 	put(q, bp);
2843 
2844 	return (1);
2845 }
2846 
2847 int
2848 putnextctl1(queue_t *q, int type, int param)
2849 {
2850 	mblk_t *bp;
2851 
2852 	if ((datamsg(type) && (type != M_DELAY)) ||
2853 	    ((bp = allocb_tryhard(1)) == NULL))
2854 		return (0);
2855 
2856 	bp->b_datap->db_type = (unsigned char)type;
2857 	*bp->b_wptr++ = (unsigned char)param;
2858 
2859 	putnext(q, bp);
2860 
2861 	return (1);
2862 }
2863 
2864 int
2865 putnextctl(queue_t *q, int type)
2866 {
2867 	mblk_t *bp;
2868 
2869 	if ((datamsg(type) && (type != M_DELAY)) ||
2870 	    ((bp = allocb_tryhard(0)) == NULL))
2871 		return (0);
2872 	bp->b_datap->db_type = (unsigned char)type;
2873 
2874 	putnext(q, bp);
2875 
2876 	return (1);
2877 }
2878 
2879 /*
2880  * Return the queue upstream from this one
2881  */
2882 queue_t *
2883 backq(queue_t *q)
2884 {
2885 	q = _OTHERQ(q);
2886 	if (q->q_next) {
2887 		q = q->q_next;
2888 		return (_OTHERQ(q));
2889 	}
2890 	return (NULL);
2891 }
2892 
2893 /*
2894  * Send a block back up the queue in reverse from this
2895  * one (e.g. to respond to ioctls)
2896  */
2897 void
2898 qreply(queue_t *q, mblk_t *bp)
2899 {
2900 	ASSERT(q && bp);
2901 
2902 	putnext(_OTHERQ(q), bp);
2903 }
2904 
2905 /*
2906  * Streams Queue Scheduling
2907  *
2908  * Queues are enabled through qenable() when they have messages to
2909  * process. They are serviced by queuerun(), which runs each enabled
2910  * queue's service procedure. The call to queuerun() is processor
2911  * dependent - the general principle is that it be run whenever a queue
2912  * is enabled but before returning to user level. For system calls,
2913  * the function runqueues() is called if their action causes a queue
2914  * to be enabled. For device interrupts, queuerun() should be
2915  * called before returning from the last level of interrupt. Beyond
2916  * this, no timing assumptions should be made about queue scheduling.
2917  */
2918 
2919 /*
2920  * Enable a queue: put it on list of those whose service procedures are
2921  * ready to run and set up the scheduling mechanism.
2922  * The broadcast is done outside the mutex to avoid the woken thread
2923  * contending with the mutex. This is OK because the queue has been
2924  * enqueued on the runlist and flagged safely at this point.
2925  */
2926 void
2927 qenable(queue_t *q)
2928 {
2929 	mutex_enter(QLOCK(q));
2930 	qenable_locked(q);
2931 	mutex_exit(QLOCK(q));
2932 }
2933 /*
2934  * Return number of messages on queue
2935  */
2936 int
2937 qsize(queue_t *qp)
2938 {
2939 	int count = 0;
2940 	mblk_t *mp;
2941 
2942 	mutex_enter(QLOCK(qp));
2943 	for (mp = qp->q_first; mp; mp = mp->b_next)
2944 		count++;
2945 	mutex_exit(QLOCK(qp));
2946 	return (count);
2947 }
2948 
2949 /*
2950  * noenable - set queue so that putq() will not enable it.
2951  * enableok - set queue so that putq() can enable it.
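 *
 * QNOENB only suppresses the automatic enabling done by putq(), putbq()
 * and insq() (via canenable()); an explicit qenable() still schedules
 * the queue.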
2952  */
2953 void
2954 noenable(queue_t *q)
2955 {
2956 	mutex_enter(QLOCK(q));
2957 	q->q_flag |= QNOENB;
2958 	mutex_exit(QLOCK(q));
2959 }
2960 
2961 void
2962 enableok(queue_t *q)
2963 {
2964 	mutex_enter(QLOCK(q));
2965 	q->q_flag &= ~QNOENB;
2966 	mutex_exit(QLOCK(q));
2967 }
2968 
2969 /*
2970  * Set queue fields.
2971  */
2972 int
2973 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
2974 {
2975 	qband_t *qbp = NULL;
2976 	queue_t *wrq;
2977 	int error = 0;
2978 	kthread_id_t freezer;
2979 
2980 	freezer = STREAM(q)->sd_freezer;
2981 	if (freezer == curthread) {
2982 		ASSERT(frozenstr(q));
2983 		ASSERT(MUTEX_HELD(QLOCK(q)));
2984 	} else
2985 		mutex_enter(QLOCK(q));
2986 
2987 	if (what >= QBAD) {
2988 		error = EINVAL;
2989 		goto done;
2990 	}
2991 	if (pri != 0) {
2992 		int i;
2993 		qband_t **qbpp;
2994 
2995 		if (pri > q->q_nband) {
2996 			qbpp = &q->q_bandp;
2997 			while (*qbpp)
2998 				qbpp = &(*qbpp)->qb_next;
2999 			while (pri > q->q_nband) {
3000 				if ((*qbpp = allocband()) == NULL) {
3001 					error = EAGAIN;
3002 					goto done;
3003 				}
3004 				(*qbpp)->qb_hiwat = q->q_hiwat;
3005 				(*qbpp)->qb_lowat = q->q_lowat;
3006 				q->q_nband++;
3007 				qbpp = &(*qbpp)->qb_next;
3008 			}
3009 		}
3010 		qbp = q->q_bandp;
3011 		i = pri;
3012 		while (--i)
3013 			qbp = qbp->qb_next;
3014 	}
3015 	switch (what) {
3016 
3017 	case QHIWAT:
3018 		if (qbp)
3019 			qbp->qb_hiwat = (size_t)val;
3020 		else
3021 			q->q_hiwat = (size_t)val;
3022 		break;
3023 
3024 	case QLOWAT:
3025 		if (qbp)
3026 			qbp->qb_lowat = (size_t)val;
3027 		else
3028 			q->q_lowat = (size_t)val;
3029 		break;
3030 
3031 	case QMAXPSZ:
3032 		if (qbp)
3033 			error = EINVAL;
3034 		else
3035 			q->q_maxpsz = (ssize_t)val;
3036 
3037 		/*
3038 		 * Performance concern: strwrite() looks at the module below
3039 		 * the stream head for the maxpsz each time it does a write,
3040 		 * so we now cache it at the stream head. Check to see if this
3041 		 * queue is sitting directly below the stream head.
3042 		 */
3043 		wrq = STREAM(q)->sd_wrq;
3044 		if (q != wrq->q_next)
3045 			break;
3046 
3047 		/*
3048 		 * If the stream is not frozen, drop the current QLOCK and
3049 		 * acquire the sd_wrq QLOCK, which protects sd_qn_*.
3050 		 */
3051 		if (freezer != curthread) {
3052 			mutex_exit(QLOCK(q));
3053 			mutex_enter(QLOCK(wrq));
3054 		}
3055 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3056 
3057 		if (strmsgsz != 0) {
3058 			if (val == INFPSZ)
3059 				val = strmsgsz;
3060 			else {
3061 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3062 					val = MIN(PIPE_BUF, val);
3063 				else
3064 					val = MIN(strmsgsz, val);
3065 			}
3066 		}
3067 		STREAM(q)->sd_qn_maxpsz = val;
3068 		if (freezer != curthread) {
3069 			mutex_exit(QLOCK(wrq));
3070 			mutex_enter(QLOCK(q));
3071 		}
3072 		break;
3073 
3074 	case QMINPSZ:
3075 		if (qbp)
3076 			error = EINVAL;
3077 		else
3078 			q->q_minpsz = (ssize_t)val;
3079 
3080 		/*
3081 		 * Performance concern: strwrite() looks at the module below
3082 		 * the stream head for the maxpsz each time it does a write,
3083 		 * so we now cache it at the stream head. Check to see if this
3084 		 * queue is sitting directly below the stream head.
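 * The handling mirrors the QMAXPSZ case above, minus the strmsgsz
 * clamping.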
3085 */ 3086 wrq = STREAM(q)->sd_wrq; 3087 if (q != wrq->q_next) 3088 break; 3089 3090 /* 3091 * If the stream is not frozen drop the current QLOCK and 3092 * acquire the sd_wrq QLOCK which protects sd_qn_* 3093 */ 3094 if (freezer != curthread) { 3095 mutex_exit(QLOCK(q)); 3096 mutex_enter(QLOCK(wrq)); 3097 } 3098 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3099 3100 if (freezer != curthread) { 3101 mutex_exit(QLOCK(wrq)); 3102 mutex_enter(QLOCK(q)); 3103 } 3104 break; 3105 3106 case QSTRUIOT: 3107 if (qbp) 3108 error = EINVAL; 3109 else 3110 q->q_struiot = (ushort_t)val; 3111 break; 3112 3113 case QCOUNT: 3114 case QFIRST: 3115 case QLAST: 3116 case QFLAG: 3117 error = EPERM; 3118 break; 3119 3120 default: 3121 error = EINVAL; 3122 break; 3123 } 3124 done: 3125 if (freezer != curthread) 3126 mutex_exit(QLOCK(q)); 3127 return (error); 3128 } 3129 3130 /* 3131 * Get queue fields. 3132 */ 3133 int 3134 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3135 { 3136 qband_t *qbp = NULL; 3137 int error = 0; 3138 kthread_id_t freezer; 3139 3140 freezer = STREAM(q)->sd_freezer; 3141 if (freezer == curthread) { 3142 ASSERT(frozenstr(q)); 3143 ASSERT(MUTEX_HELD(QLOCK(q))); 3144 } else 3145 mutex_enter(QLOCK(q)); 3146 if (what >= QBAD) { 3147 error = EINVAL; 3148 goto done; 3149 } 3150 if (pri != 0) { 3151 int i; 3152 qband_t **qbpp; 3153 3154 if (pri > q->q_nband) { 3155 qbpp = &q->q_bandp; 3156 while (*qbpp) 3157 qbpp = &(*qbpp)->qb_next; 3158 while (pri > q->q_nband) { 3159 if ((*qbpp = allocband()) == NULL) { 3160 error = EAGAIN; 3161 goto done; 3162 } 3163 (*qbpp)->qb_hiwat = q->q_hiwat; 3164 (*qbpp)->qb_lowat = q->q_lowat; 3165 q->q_nband++; 3166 qbpp = &(*qbpp)->qb_next; 3167 } 3168 } 3169 qbp = q->q_bandp; 3170 i = pri; 3171 while (--i) 3172 qbp = qbp->qb_next; 3173 } 3174 switch (what) { 3175 case QHIWAT: 3176 if (qbp) 3177 *(size_t *)valp = qbp->qb_hiwat; 3178 else 3179 *(size_t *)valp = q->q_hiwat; 3180 break; 3181 3182 case QLOWAT: 3183 if (qbp) 3184 *(size_t *)valp = qbp->qb_lowat; 3185 else 3186 *(size_t *)valp = q->q_lowat; 3187 break; 3188 3189 case QMAXPSZ: 3190 if (qbp) 3191 error = EINVAL; 3192 else 3193 *(ssize_t *)valp = q->q_maxpsz; 3194 break; 3195 3196 case QMINPSZ: 3197 if (qbp) 3198 error = EINVAL; 3199 else 3200 *(ssize_t *)valp = q->q_minpsz; 3201 break; 3202 3203 case QCOUNT: 3204 if (qbp) 3205 *(size_t *)valp = qbp->qb_count; 3206 else 3207 *(size_t *)valp = q->q_count; 3208 break; 3209 3210 case QFIRST: 3211 if (qbp) 3212 *(mblk_t **)valp = qbp->qb_first; 3213 else 3214 *(mblk_t **)valp = q->q_first; 3215 break; 3216 3217 case QLAST: 3218 if (qbp) 3219 *(mblk_t **)valp = qbp->qb_last; 3220 else 3221 *(mblk_t **)valp = q->q_last; 3222 break; 3223 3224 case QFLAG: 3225 if (qbp) 3226 *(uint_t *)valp = qbp->qb_flag; 3227 else 3228 *(uint_t *)valp = q->q_flag; 3229 break; 3230 3231 case QSTRUIOT: 3232 if (qbp) 3233 error = EINVAL; 3234 else 3235 *(short *)valp = q->q_struiot; 3236 break; 3237 3238 default: 3239 error = EINVAL; 3240 break; 3241 } 3242 done: 3243 if (freezer != curthread) 3244 mutex_exit(QLOCK(q)); 3245 return (error); 3246 } 3247 3248 /* 3249 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3250 * QWANTWSYNC or QWANTR or QWANTW, 3251 * 3252 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3253 * deferred wakeup will be done. Also if strpoll() in progress then a 3254 * deferred pollwakeup will be done. 
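 * (Deferred wakeups are recorded by or'ing WSLEEP/RSLEEP into sd_wakeq
 * below.)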
3255  */
3256 void
3257 strwakeq(queue_t *q, int flag)
3258 {
3259 	stdata_t *stp = STREAM(q);
3260 	pollhead_t *pl;
3261 
3262 	mutex_enter(&stp->sd_lock);
3263 	pl = &stp->sd_pollist;
3264 	if (flag & QWANTWSYNC) {
3265 		ASSERT(!(q->q_flag & QREADR));
3266 		if (stp->sd_flag & WSLEEP) {
3267 			stp->sd_flag &= ~WSLEEP;
3268 			cv_broadcast(&stp->sd_wrq->q_wait);
3269 		} else {
3270 			stp->sd_wakeq |= WSLEEP;
3271 		}
3272 
3273 		mutex_exit(&stp->sd_lock);
3274 		pollwakeup(pl, POLLWRNORM);
3275 		mutex_enter(&stp->sd_lock);
3276 
3277 		if (stp->sd_sigflags & S_WRNORM)
3278 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3279 	} else if (flag & QWANTR) {
3280 		if (stp->sd_flag & RSLEEP) {
3281 			stp->sd_flag &= ~RSLEEP;
3282 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3283 		} else {
3284 			stp->sd_wakeq |= RSLEEP;
3285 		}
3286 
3287 		mutex_exit(&stp->sd_lock);
3288 		pollwakeup(pl, POLLIN | POLLRDNORM);
3289 		mutex_enter(&stp->sd_lock);
3290 
3291 		{
3292 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3293 
3294 			if (events)
3295 				strsendsig(stp->sd_siglist, events, 0, 0);
3296 		}
3297 	} else {
3298 		if (stp->sd_flag & WSLEEP) {
3299 			stp->sd_flag &= ~WSLEEP;
3300 			cv_broadcast(&stp->sd_wrq->q_wait);
3301 		}
3302 
3303 		mutex_exit(&stp->sd_lock);
3304 		pollwakeup(pl, POLLWRNORM);
3305 		mutex_enter(&stp->sd_lock);
3306 
3307 		if (stp->sd_sigflags & S_WRNORM)
3308 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3309 	}
3310 	mutex_exit(&stp->sd_lock);
3311 }
3312 
3313 int
3314 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3315 {
3316 	stdata_t *stp = STREAM(q);
3317 	int typ = STRUIOT_STANDARD;
3318 	uio_t *uiop = &dp->d_uio;
3319 	dblk_t *dbp;
3320 	ssize_t uiocnt;
3321 	ssize_t cnt;
3322 	unsigned char *ptr;
3323 	ssize_t resid;
3324 	int error = 0;
3325 	on_trap_data_t otd;
3326 	queue_t *stwrq;
3327 
3328 	/*
3329 	 * The plumbing may change while we sample the struio type, so
3330 	 * capture the queue in a temporary variable first. It does not
3331 	 * matter if we take the type from the previous plumbing:
3332 	 * if the plumbing has changed since we captured the queue, we
3333 	 * can continue
3334 	 * processing the message the way it would have been processed
3335 	 * in the old plumbing, without any side effects but a bit of
3336 	 * extra processing for the partial ip header checksum.
3337 	 *
3338 	 * This has been done to avoid holding the sd_lock, which is
3339 	 * very hot.
3340 	 */
3341 
3342 	stwrq = stp->sd_struiowrq;
3343 	if (stwrq)
3344 		typ = stwrq->q_struiot;
3345 
3346 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3347 		dbp = mp->b_datap;
3348 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3349 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3350 		cnt = MIN(uiocnt, uiop->uio_resid);
3351 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3352 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3353 			/*
3354 			 * Either this mblk has already been processed
3355 			 * or there is no more room in this mblk (cnt == 0).
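 * (STRUIO_SPEC marks an mblk as eligible for synchronous uio copy;
 * STRUIO_DONE is set below once its payload bytes have been moved.)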
3356 			 */
3357 			continue;
3358 		}
3359 		switch (typ) {
3360 		case STRUIOT_STANDARD:
3361 			if (noblock) {
3362 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3363 					no_trap();
3364 					error = EWOULDBLOCK;
3365 					goto out;
3366 				}
3367 			}
3368 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3369 				if (noblock)
3370 					no_trap();
3371 				goto out;
3372 			}
3373 			if (noblock)
3374 				no_trap();
3375 			break;
3376 
3377 		default:
3378 			error = EIO;
3379 			goto out;
3380 		}
3381 		dbp->db_struioflag |= STRUIO_DONE;
3382 		dbp->db_cksumstuff += cnt;
3383 	}
3384 out:
3385 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3386 		/*
3387 		 * A fault has occurred and some bytes were moved to the
3388 		 * current mblk; the uio_t has already been updated by
3389 		 * the appropriate uio routine, so also update the mblk
3390 		 * to reflect this in case this same mblk chain is used
3391 		 * again (after the fault has been handled).
3392 		 */
3393 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3394 		if (uiocnt >= resid)
3395 			dbp->db_cksumstuff += resid;
3396 	}
3397 	return (error);
3398 }
3399 
3400 /*
3401  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3402  * fail. The qp->q_rwcnt keeps track of the number of successful entries so
3403  * that removeq() will not try to close the queue while a thread is inside the
3404  * queue.
3405  */
3406 static boolean_t
3407 rwnext_enter(queue_t *qp)
3408 {
3409 	mutex_enter(QLOCK(qp));
3410 	if (qp->q_flag & QWCLOSE) {
3411 		mutex_exit(QLOCK(qp));
3412 		return (B_FALSE);
3413 	}
3414 	qp->q_rwcnt++;
3415 	ASSERT(qp->q_rwcnt != 0);
3416 	mutex_exit(QLOCK(qp));
3417 	return (B_TRUE);
3418 }
3419 
3420 /*
3421  * Decrease the count of threads running in sync stream queue and wake up any
3422  * threads blocked in removeq().
3423  */
3424 static void
3425 rwnext_exit(queue_t *qp)
3426 {
3427 	mutex_enter(QLOCK(qp));
3428 	qp->q_rwcnt--;
3429 	if (qp->q_flag & QWANTRMQSYNC) {
3430 		qp->q_flag &= ~QWANTRMQSYNC;
3431 		cv_broadcast(&qp->q_wait);
3432 	}
3433 	mutex_exit(QLOCK(qp));
3434 }
3435 
3436 /*
3437  * The purpose of rwnext() is to call the rw procedure of the next
3438  * (downstream) module's queue.
3439  *
3440  * It is treated as a put entrypoint for perimeter synchronization.
3441  *
3442  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3443  * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3444  * does not matter if any regular put entrypoints have been already entered. We
3445  * can't increment one of the sq_putcounts (instead of sq_count) because
3446  * qwait_rw won't know which counter to decrement.
3447  *
3448  * It would be reasonable to add the lockless FASTPUT logic.
3449  */
3450 int
3451 rwnext(queue_t *qp, struiod_t *dp)
3452 {
3453 	queue_t *nqp;
3454 	syncq_t *sq;
3455 	uint16_t count;
3456 	uint16_t flags;
3457 	struct qinit *qi;
3458 	int (*proc)();
3459 	struct stdata *stp;
3460 	int isread;
3461 	int rval;
3462 
3463 	stp = STREAM(qp);
3464 	/*
3465 	 * Prevent q_next from changing by holding sd_lock until acquiring
3466 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3467 	 * already have sd_lock acquired. In either case sd_lock is always
3468 	 * released after acquiring SQLOCK.
3469 	 *
3470 	 * The streamhead read-side holding sd_lock when calling rwnext is
3471 	 * required to prevent a race condition where M_DATA mblks flowing
3472 	 * up the read-side of the stream could be bypassed by a rwnext()
3473 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
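 *
 * (Hence the stream head read side enters rwnext() with sd_lock already
 * held; the mutex_enter(&stp->sd_lock) below is skipped in that case.)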
3474 	 */
3475 	if ((nqp = _WR(qp)) == qp) {
3476 		isread = 0;
3477 		mutex_enter(&stp->sd_lock);
3478 		qp = nqp->q_next;
3479 	} else {
3480 		isread = 1;
3481 		if (nqp != stp->sd_wrq)
3482 			/* Not streamhead */
3483 			mutex_enter(&stp->sd_lock);
3484 		qp = _RD(nqp->q_next);
3485 	}
3486 	qi = qp->q_qinfo;
3487 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3488 		/*
3489 		 * Not a synchronous module or no r/w procedure for this
3490 		 * queue, so just return EINVAL and let the caller handle it.
3491 		 */
3492 		mutex_exit(&stp->sd_lock);
3493 		return (EINVAL);
3494 	}
3495 
3496 	if (rwnext_enter(qp) == B_FALSE) {
3497 		mutex_exit(&stp->sd_lock);
3498 		return (EINVAL);
3499 	}
3500 
3501 	sq = qp->q_syncq;
3502 	mutex_enter(SQLOCK(sq));
3503 	mutex_exit(&stp->sd_lock);
3504 	count = sq->sq_count;
3505 	flags = sq->sq_flags;
3506 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3507 
3508 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3509 		/*
3510 		 * If this queue is being closed, return.
3511 		 */
3512 		if (qp->q_flag & QWCLOSE) {
3513 			mutex_exit(SQLOCK(sq));
3514 			rwnext_exit(qp);
3515 			return (EINVAL);
3516 		}
3517 
3518 		/*
3519 		 * Wait until we can enter the inner perimeter.
3520 		 */
3521 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3522 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3523 		count = sq->sq_count;
3524 		flags = sq->sq_flags;
3525 	}
3526 
3527 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3528 	    isread == 1 && stp->sd_struiordq == NULL) {
3529 		/*
3530 		 * Stream plumbing changed while waiting for inner perimeter
3531 		 * so just return EINVAL and let the caller handle it.
3532 		 */
3533 		mutex_exit(SQLOCK(sq));
3534 		rwnext_exit(qp);
3535 		return (EINVAL);
3536 	}
3537 	if (!(flags & SQ_CIPUT))
3538 		sq->sq_flags = flags | SQ_EXCL;
3539 	sq->sq_count = count + 1;
3540 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3541 	/*
3542 	 * Note: The only message ordering guarantee that rwnext() makes is
3543 	 * for the write queue flow-control case. All others (r/w queue
3544 	 * with q_count > 0 (or q_first != 0)) are the responsibility of
3545 	 * the queue's rw procedure. This could be generalized here by
3546 	 * running the queue's service procedure, but that wouldn't be
3547 	 * the most efficient for all cases.
3548 	 */
3549 	mutex_exit(SQLOCK(sq));
3550 	if (! isread && (qp->q_flag & QFULL)) {
3551 		/*
3552 		 * Write queue may be flow controlled. If so,
3553 		 * mark the queue for wakeup when it's not.
3554 		 */
3555 		mutex_enter(QLOCK(qp));
3556 		if (qp->q_flag & QFULL) {
3557 			qp->q_flag |= QWANTWSYNC;
3558 			mutex_exit(QLOCK(qp));
3559 			rval = EWOULDBLOCK;
3560 			goto out;
3561 		}
3562 		mutex_exit(QLOCK(qp));
3563 	}
3564 
3565 	if (! isread && dp->d_mp)
3566 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3567 		    dp->d_mp->b_datap->db_base);
3568 
3569 	rval = (*proc)(qp, dp);
3570 
3571 	if (isread && dp->d_mp)
3572 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3573 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3574 out:
3575 	/*
3576 	 * The queue is protected from being freed by sq_count, so it is
3577 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3578 	 */
3579 	rwnext_exit(qp);
3580 
3581 	mutex_enter(SQLOCK(sq));
3582 	flags = sq->sq_flags;
3583 	ASSERT(sq->sq_count != 0);
3584 	sq->sq_count--;
3585 	if (flags & SQ_TAIL) {
3586 		putnext_tail(sq, qp, flags);
3587 		/*
3588 		 * The only purpose of this ASSERT is to preserve calling stack
3589 		 * in DEBUG kernel.
3590 		 */
3591 		ASSERT(flags & SQ_TAIL);
3592 		return (rval);
3593 	}
3594 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3595 	/*
3596 	 * Safe to always drop SQ_EXCL:
3597 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3598 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3599 	 *	did a qwriter(INNER) in which case nobody else
3600 	 *	is in the inner perimeter and we are exiting.
3601 	 *
3602 	 * I would like to make the following assertion:
3603 	 *
3604 	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3605 	 *	    sq->sq_count == 0);
3606 	 *
3607 	 * which indicates that if we are both putshared and exclusive,
3608 	 * we became exclusive while executing the putproc, and the only
3609 	 * claim on the syncq was the one we dropped a few lines above.
3610 	 * But other threads that enter putnext while the syncq is exclusive
3611 	 * need to make a claim as they may need to drop SQLOCK in the
3612 	 * has_writers case to avoid deadlocks. If these threads are
3613 	 * delayed or preempted, it is possible that the writer thread can
3614 	 * find out that there are other claims making the (sq_count == 0)
3615 	 * test invalid.
3616 	 */
3617 
3618 	sq->sq_flags = flags & ~SQ_EXCL;
3619 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3620 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3621 		cv_broadcast(&sq->sq_wait);
3622 	}
3623 	mutex_exit(SQLOCK(sq));
3624 	return (rval);
3625 }
3626 
3627 /*
3628  * The purpose of infonext() is to call the info procedure of the next
3629  * (downstream) module's queue.
3630  *
3631  * It is treated as a put entrypoint for perimeter synchronization.
3632  *
3633  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3634  * sync queues). If it is a CIPUT sync queue, the regular sq_count is
3635  * incremented and it does not matter if any regular put entrypoints
3636  * have already been entered.
3637  */
3638 int
3639 infonext(queue_t *qp, infod_t *idp)
3640 {
3641 	queue_t *nqp;
3642 	syncq_t *sq;
3643 	uint16_t count;
3644 	uint16_t flags;
3645 	struct qinit *qi;
3646 	int (*proc)();
3647 	struct stdata *stp;
3648 	int rval;
3649 
3650 	stp = STREAM(qp);
3651 	/*
3652 	 * Prevent q_next from changing by holding sd_lock until
3653 	 * acquiring SQLOCK.
3654 	 */
3655 	mutex_enter(&stp->sd_lock);
3656 	if ((nqp = _WR(qp)) == qp) {
3657 		qp = nqp->q_next;
3658 	} else {
3659 		qp = _RD(nqp->q_next);
3660 	}
3661 	qi = qp->q_qinfo;
3662 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3663 		mutex_exit(&stp->sd_lock);
3664 		return (EINVAL);
3665 	}
3666 	sq = qp->q_syncq;
3667 	mutex_enter(SQLOCK(sq));
3668 	mutex_exit(&stp->sd_lock);
3669 	count = sq->sq_count;
3670 	flags = sq->sq_flags;
3671 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3672 
3673 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3674 		/*
3675 		 * Wait until we can enter the inner perimeter.
3676 		 */
3677 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3678 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3679 		count = sq->sq_count;
3680 		flags = sq->sq_flags;
3681 	}
3682 
3683 	if (! (flags & SQ_CIPUT))
3684 		sq->sq_flags = flags | SQ_EXCL;
3685 	sq->sq_count = count + 1;
3686 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3687 	mutex_exit(SQLOCK(sq));
3688 
3689 	rval = (*proc)(qp, idp);
3690 
3691 	mutex_enter(SQLOCK(sq));
3692 	flags = sq->sq_flags;
3693 	ASSERT(sq->sq_count != 0);
3694 	sq->sq_count--;
3695 	if (flags & SQ_TAIL) {
3696 		putnext_tail(sq, qp, flags);
3697 		/*
3698 		 * The only purpose of this ASSERT is to preserve calling stack
3699 		 * in DEBUG kernel.
3700 */ 3701 ASSERT(flags & SQ_TAIL); 3702 return (rval); 3703 } 3704 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3705 /* 3706 * XXXX 3707 * I am not certain the next comment is correct here. I need to consider 3708 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 3709 * might cause other problems. It just might be safer to drop it if 3710 * !SQ_CIPUT because that is when we set it. 3711 */ 3712 /* 3713 * Safe to always drop SQ_EXCL: 3714 * Not SQ_CIPUT means we set SQ_EXCL above 3715 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3716 * did a qwriter(INNER) in which case nobody else 3717 * is in the inner perimeter and we are exiting. 3718 * 3719 * I would like to make the following assertion: 3720 * 3721 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3722 * sq->sq_count == 0); 3723 * 3724 * which indicates that if we are both putshared and exclusive, 3725 * we became exclusive while executing the putproc, and the only 3726 * claim on the syncq was the one we dropped a few lines above. 3727 * But other threads that enter putnext while the syncq is exclusive 3728 * need to make a claim as they may need to drop SQLOCK in the 3729 * has_writers case to avoid deadlocks. If these threads are 3730 * delayed or preempted, it is possible that the writer thread can 3731 * find out that there are other claims making the (sq_count == 0) 3732 * test invalid. 3733 */ 3734 3735 sq->sq_flags = flags & ~SQ_EXCL; 3736 mutex_exit(SQLOCK(sq)); 3737 return (rval); 3738 } 3739 3740 /* 3741 * Return nonzero if the queue is responsible for struio(), else return 0. 3742 */ 3743 int 3744 isuioq(queue_t *q) 3745 { 3746 if (q->q_flag & QREADR) 3747 return (STREAM(q)->sd_struiordq == q); 3748 else 3749 return (STREAM(q)->sd_struiowrq == q); 3750 } 3751 3752 #if defined(__sparc) 3753 int disable_putlocks = 0; 3754 #else 3755 int disable_putlocks = 1; 3756 #endif 3757 3758 /* 3759 * called by create_putlock. 3760 */ 3761 static void 3762 create_syncq_putlocks(queue_t *q) 3763 { 3764 syncq_t *sq = q->q_syncq; 3765 ciputctrl_t *cip; 3766 int i; 3767 3768 ASSERT(sq != NULL); 3769 3770 ASSERT(disable_putlocks == 0); 3771 ASSERT(n_ciputctrl >= min_n_ciputctrl); 3772 ASSERT(ciputctrl_cache != NULL); 3773 3774 if (!(sq->sq_type & SQ_CIPUT)) 3775 return; 3776 3777 for (i = 0; i <= 1; i++) { 3778 if (sq->sq_ciputctrl == NULL) { 3779 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 3780 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 3781 mutex_enter(SQLOCK(sq)); 3782 if (sq->sq_ciputctrl != NULL) { 3783 mutex_exit(SQLOCK(sq)); 3784 kmem_cache_free(ciputctrl_cache, cip); 3785 } else { 3786 ASSERT(sq->sq_nciputctrl == 0); 3787 sq->sq_nciputctrl = n_ciputctrl - 1; 3788 /* 3789 * putnext checks sq_ciputctrl without holding 3790 * SQLOCK. if it is not NULL putnext assumes 3791 * sq_nciputctrl is initialized. membar below 3792 * insures that. 3793 */ 3794 membar_producer(); 3795 sq->sq_ciputctrl = cip; 3796 mutex_exit(SQLOCK(sq)); 3797 } 3798 } 3799 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3800 if (i == 1) 3801 break; 3802 q = _OTHERQ(q); 3803 if (!(q->q_flag & QPERQ)) { 3804 ASSERT(sq == q->q_syncq); 3805 break; 3806 } 3807 ASSERT(q->q_syncq != NULL); 3808 ASSERT(sq != q->q_syncq); 3809 sq = q->q_syncq; 3810 ASSERT(sq->sq_type & SQ_CIPUT); 3811 } 3812 } 3813 3814 /* 3815 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 3816 * syncq of q. 
If stream argument is not 0 create per cpu stream_putlocks for
3817  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
3818  * starting from q and down to the driver.
3819  *
3820  * This should be called after the affected queues are part of stream
3821  * geometry. It should be called from the driver/module open routine after
3822  * the qprocson() call. It is also called from the nfs syscall, where it is
3823  * known that the stream is configured and won't change its geometry during
3824  * the create_putlock call.
3825  *
3826  * The caller normally uses a 0 value for the stream argument to speed up MT
3827  * putnext into the perimeter of q, for example because its perimeter is per
3828  * module (e.g. IP).
3829  *
3830  * The caller normally uses a non-0 value for the stream argument to hint the
3831  * system that the stream of q is a very contended global system stream
3832  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
3833  * particularly MT hot.
3834  *
3835  * The caller ensures stream plumbing won't happen while we are here and
3836  * therefore q_next can be safely used.
3837  */
3838 
3839 void
3840 create_putlocks(queue_t *q, int stream)
3841 {
3842 	ciputctrl_t	*cip;
3843 	struct stdata	*stp = STREAM(q);
3844 
3845 	q = _WR(q);
3846 	ASSERT(stp != NULL);
3847 
3848 	if (disable_putlocks != 0)
3849 		return;
3850 
3851 	if (n_ciputctrl < min_n_ciputctrl)
3852 		return;
3853 
3854 	ASSERT(ciputctrl_cache != NULL);
3855 
3856 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
3857 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
3858 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
3859 		mutex_enter(&stp->sd_lock);
3860 		if (stp->sd_ciputctrl != NULL) {
3861 			mutex_exit(&stp->sd_lock);
3862 			kmem_cache_free(ciputctrl_cache, cip);
3863 		} else {
3864 			ASSERT(stp->sd_nciputctrl == 0);
3865 			stp->sd_nciputctrl = n_ciputctrl - 1;
3866 			/*
3867 			 * putnext checks sd_ciputctrl without holding
3868 			 * sd_lock. If it is not NULL, putnext assumes
3869 			 * sd_nciputctrl is initialized. The membar below
3870 			 * ensures that.
3871 			 */
3872 			membar_producer();
3873 			stp->sd_ciputctrl = cip;
3874 			mutex_exit(&stp->sd_lock);
3875 		}
3876 	}
3877 
3878 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
3879 
3880 	while (_SAMESTR(q)) {
3881 		create_syncq_putlocks(q);
3882 		if (stream == 0)
3883 			return;
3884 		q = q->q_next;
3885 	}
3886 	ASSERT(q != NULL);
3887 	create_syncq_putlocks(q);
3888 }
3889 
3890 /*
3891  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
3892  * through a stream.
3893  *
3894  * The data currently recorded per event are a hrtime stamp, queue address,
3895  * event type, and a per-type datum. Much of the STREAMS framework is
3896  * instrumented for automatic flow tracing (when enabled). Events can be
3897  * defined and used by STREAMS modules and drivers.
3898  *
3899  * Global objects:
3900  *
3901  *	str_ftevent() - Add a flow-trace event to a dblk.
3902  *	str_ftfree() - Free flow-trace data
3903  *
3904  * Local objects:
3905  *
3906  *	fthdr_cache - pointer to the kmem cache for trace header.
3907  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
3908  */
3909 
3910 int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
3911 
3912 void
3913 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
3914 {
3915 	ftblk_t *bp = hp->tail;
3916 	ftblk_t *nbp;
3917 	ftevnt_t *ep;
3918 	int ix, nix;
3919 
3920 	ASSERT(hp != NULL);
3921 
3922 	for (;;) {
3923 		if ((ix = bp->ix) == FTBLK_EVNTS) {
3924 			/*
3925 			 * Tail doesn't have room, so need a new tail.
3926 			 *
3927 			 * To make this MT safe, first, allocate a new
3928 			 * ftblk, and initialize it. To make life a
3929 			 * little easier, reserve the first slot (mostly
3930 			 * by making ix = 1). When we are finished with
3931 			 * the initialization, CAS this pointer to the
3932 			 * tail. If this succeeds, this is the new
3933 			 * "next" block. Otherwise, another thread
3934 			 * got here first, so free the block and start
3935 			 * again.
3936 			 */
3937 			if (!(nbp = kmem_cache_alloc(ftblk_cache,
3938 			    KM_NOSLEEP))) {
3939 				/* no mem, so punt */
3940 				str_ftnever++;
3941 				/* free up all flow data? */
3942 				return;
3943 			}
3944 			nbp->nxt = NULL;
3945 			nbp->ix = 1;
3946 			/*
3947 			 * Just in case there is another thread about
3948 			 * to get the next index, we need to make sure
3949 			 * the value is there for it.
3950 			 */
3951 			membar_producer();
3952 			if (casptr(&hp->tail, bp, nbp) == bp) {
3953 				/* CAS was successful */
3954 				bp->nxt = nbp;
3955 				membar_producer();
3956 				bp = nbp;
3957 				ix = 0;
3958 				goto cas_good;
3959 			} else {
3960 				kmem_cache_free(ftblk_cache, nbp);
3961 				bp = hp->tail;
3962 				continue;
3963 			}
3964 		}
3965 		nix = ix + 1;
3966 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
3967 		cas_good:
3968 			if (curthread != hp->thread) {
3969 				hp->thread = curthread;
3970 				evnt |= FTEV_CS;
3971 			}
3972 			if (CPU->cpu_seqid != hp->cpu_seqid) {
3973 				hp->cpu_seqid = CPU->cpu_seqid;
3974 				evnt |= FTEV_PS;
3975 			}
3976 			ep = &bp->ev[ix];
3977 			break;
3978 		}
3979 	}
3980 
3981 	if (evnt & FTEV_QMASK) {
3982 		queue_t *qp = p;
3983 
3984 		/*
3985 		 * It is possible that the module info is broken
3986 		 * (as it is in logsubr.c as of this writing).
3987 		 * Instead of panicking or doing other unmentionables,
3988 		 * we shall put a dummy name in as the mid, and continue.
3989 		 */
3990 		if (qp->q_qinfo == NULL)
3991 			ep->mid = "NONAME";
3992 		else
3993 			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
3994 
3995 		if (!(qp->q_flag & QREADR))
3996 			evnt |= FTEV_ISWR;
3997 	} else {
3998 		ep->mid = (char *)p;
3999 	}
4000 
4001 	ep->ts = gethrtime();
4002 	ep->evnt = evnt;
4003 	ep->data = data;
4004 	hp->hash = (hp->hash << 9) + hp->hash;
4005 	hp->hash += (evnt << 16) | data;
4006 	hp->hash += (uintptr_t)ep->mid;
4007 }
4008 
4009 /*
4010  * Free flow-trace data.
4011  */
4012 void
4013 str_ftfree(dblk_t *dbp)
4014 {
4015 	fthdr_t *hp = dbp->db_fthdr;
4016 	ftblk_t *bp = &hp->first;
4017 	ftblk_t *nbp;
4018 
4019 	if (bp != hp->tail || bp->ix != 0) {
4020 		/*
4021 		 * Clear out the hash, have the tail point to itself, and free
4022 		 * any continuation blocks.
4023 		 */
4024 		bp = hp->first.nxt;
4025 		hp->tail = &hp->first;
4026 		hp->hash = 0;
4027 		hp->first.nxt = NULL;
4028 		hp->first.ix = 0;
4029 		while (bp != NULL) {
4030 			nbp = bp->nxt;
4031 			kmem_cache_free(ftblk_cache, bp);
4032 			bp = nbp;
4033 		}
4034 	}
4035 	kmem_cache_free(fthdr_cache, hp);
4036 	dbp->db_fthdr = NULL;
4037 }
4038 
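/*
 * Usage sketch (illustrative only, not part of the framework): a module
 * with a private trace event code, say XXX_FTEV_DRAIN, could record it
 * on a message just as the framework events above are recorded:
 *
 *	STR_FTEVENT_MSG(mp, q, XXX_FTEV_DRAIN, 0);
 *
 * Note that flow tracing stays disabled while str_ftnever is nonzero.
 */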