/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M% %I% %E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, dblk and its associated
 *     data is allocated as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
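/*
 * Illustrative sketch (not part of this file's interfaces): how a module
 * might use the allocator routines described above.  The function name
 * example_echo and the guard macro STREAMS_EXAMPLES are hypothetical;
 * the guard is never defined, so this compiles to nothing.
 */
#ifdef	STREAMS_EXAMPLES
static void
example_echo(queue_t *q, const uchar_t *src, size_t len)
{
	mblk_t *mp, *dp;

	if ((mp = allocb(len, BPRI_MED)) == NULL)
		return;				/* see bufcall() for recovery */
	bcopy(src, mp->b_wptr, len);
	mp->b_wptr += len;			/* data spans [b_rptr, b_wptr) */

	/*
	 * dupb() shares the dblk: both mblks now reference the same data,
	 * and the buffer is reclaimed only on the last freeb()/freemsg().
	 */
	if ((dp = dupb(mp)) != NULL)
		freemsg(dp);			/* drops one reference */
	putnext(q, mp);				/* pass the surviving ref on */
}
#endif	/* STREAMS_EXAMPLES */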
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
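/*
 * Worked example (a sketch, guarded out; STREAMS_EXAMPLES is hypothetical
 * and never defined): DBLK_RTFU() packs db_ref, db_type, db_flags and
 * db_struioflag into the single 32-bit word that DBLK_RTFU_WORD() overlays
 * on those four adjacent byte-sized fields, so the allocation paths below
 * can initialize all four with one store.  On a little-endian machine,
 * DBLK_RTFU(1, M_DATA, 0, 0) is just (1 << 0) | (M_DATA << 8).
 */
#ifdef	STREAMS_EXAMPLES
static void
example_rtfu_check(dblk_t *dbp)
{
	/* one 32-bit store initializes ref, type, flags and struioflag */
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	ASSERT(dbp->db_ref == 1);
	ASSERT(dbp->db_type == M_DATA);
}
#endif	/* STREAMS_EXAMPLES */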
/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;


static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
	    kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(msg_size != 0);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}
void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
	    mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
		    DBLK_CACHE_ALIGN, dblk_constructor,
		    dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}
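/*
 * A minimal sketch (guarded out; STREAMS_EXAMPLES is hypothetical) of the
 * size-to-cache mapping streams_msg_init() just built: every size from 1
 * to DBLK_MAX_CACHE rounds up to the nearest dblk_sizes[] entry via the
 * dblk_cache[] table, using the same lookup allocb() performs below.
 */
#ifdef	STREAMS_EXAMPLES
static struct kmem_cache *
example_cache_for(size_t size)
{
	ASSERT(size >= 1 && size <= DBLK_MAX_CACHE);
	/* 8-byte granularity: (size - 1) >> 3 indexes the mapping table */
	return (dblk_cache[(size - 1) >> DBLK_SIZE_SHIFT]);
}
#endif	/* STREAMS_EXAMPLES */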
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}
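/*
 * Illustrative sketch of the reallocb() contract (hypothetical caller,
 * guarded out): grow a block while preserving its contents.
 */
#ifdef	STREAMS_EXAMPLES
static mblk_t *
example_grow(mblk_t *mp, size_t newsize)
{
	mblk_t *nmp;

	/* copy != 0 keeps the old data; on failure the old mp survives */
	if ((nmp = reallocb(mp, newsize, 1)) == NULL)
		freemsg(mp);		/* this caller chooses to give up */
	return (nmp);
}
#endif	/* STREAMS_EXAMPLES */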
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED flag */
	dbp->db_flags &= ~DBLK_COOKED;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
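/*
 * Illustrative sketch (guarded out; example_buf, example_buf_free and
 * example_wrap are hypothetical): the usual esballoc() pattern of loaning
 * a driver-owned buffer into a message, with a free routine that runs
 * when the last reference is freed.
 */
#ifdef	STREAMS_EXAMPLES
static void example_buf_free(void *arg);

static uchar_t example_buf[2048];		/* driver-owned buffer */
static frtn_t example_frtn = { example_buf_free, (caddr_t)example_buf };

static void
example_buf_free(void *arg)
{
	/* arg == example_buf; mark the buffer available for reuse here */
}

static mblk_t *
example_wrap(void)
{
	/*
	 * Loan example_buf into a message; example_buf_free() runs at the
	 * last freeb()/freemsg().  The frtn_t must stay valid until then.
	 */
	return (esballoc(example_buf, sizeof (example_buf), BPRI_MED,
	    &example_frtn));
}
#endif	/* STREAMS_EXAMPLES */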
/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
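/*
 * Illustrative sketch (guarded out; the names are hypothetical): the
 * bcache_t lifecycle.  A driver needing many same-sized, specially
 * aligned buffers creates a bcache once, allocates from it in the data
 * path, and destroys it at detach; destruction is deferred until all
 * outstanding blocks have been freed, as bcache_dblk_lastfree() shows.
 */
#ifdef	STREAMS_EXAMPLES
static void
example_bcache(void)
{
	bcache_t *bcp;
	mblk_t *mp;

	/* alignment must be a power of two */
	if ((bcp = bcache_create("example", 1514, 64)) == NULL)
		return;
	if ((mp = bcache_allocb(bcp, BPRI_MED)) != NULL)
		freemsg(mp);		/* returns the dblk to the bcache */
	bcache_destroy(bcp);		/* deferred if blocks are still out */
}
#endif	/* STREAMS_EXAMPLES */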
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}
/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since bcp_id field is returned, we save its value in
	 * the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}
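/*
 * Illustrative sketch (guarded out; example_srv and example_bufcall_id
 * are hypothetical): the classic bufcall() recovery idiom.  When allocb()
 * fails in a service procedure, schedule qenable() to run once a block of
 * that size is likely to be available, and putbq() the message for later.
 */
#ifdef	STREAMS_EXAMPLES
static bufcall_id_t example_bufcall_id;

static int
example_srv(queue_t *q)
{
	mblk_t *mp, *bp;

	while ((mp = getq(q)) != NULL) {
		if ((bp = allocb(64, BPRI_MED)) == NULL) {
			example_bufcall_id = bufcall(64, BPRI_MED,
			    (void (*)(void *))qenable, q);
			(void) putbq(q, mp);	/* retry when re-enabled */
			return (0);
		}
		/* ... build bp, consume mp ... */
		freeb(bp);
		freemsg(mp);
	}
	return (0);
}
#endif	/* STREAMS_EXAMPLES */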
/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}
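/*
 * Worked example of the alignment preservation above (a sketch, guarded
 * out, assuming the ordinary allocb()-backed path where the new db_base
 * is word aligned): because the copy lands at ndp->db_base + unaligned,
 * the new b_rptr keeps the same word phase as the original.
 */
#ifdef	STREAMS_EXAMPLES
static void
example_copyb_phase(mblk_t *bp)
{
	mblk_t *nbp;

	if ((nbp = copyb(bp)) != NULL) {
		/* rptr word phase is preserved even if db_base was odd */
		ASSERT(P2PHASE((uintptr_t)nbp->b_rptr, sizeof (uint_t)) ==
		    P2PHASE((uintptr_t)bp->b_rptr, sizeof (uint_t)));
		freemsg(nbp);
	}
}
#endif	/* STREAMS_EXAMPLES */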
/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}
/*
 * Concatenate and align first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
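/*
 * Illustrative sketch (guarded out; example_hdr_t, example_parse and the
 * magic value are hypothetical): the usual reason to call pullupmsg(),
 * namely making the first len bytes of a message contiguous and aligned
 * before overlaying a header structure on b_rptr.
 */
#ifdef	STREAMS_EXAMPLES
typedef struct {
	uint32_t magic;
	uint32_t len;
} example_hdr_t;

static int
example_parse(mblk_t *mp)
{
	example_hdr_t *hp;

	if (MBLKL(mp) < sizeof (example_hdr_t) &&
	    !pullupmsg(mp, sizeof (example_hdr_t)))
		return (0);	/* couldn't make the header contiguous */
	hp = (example_hdr_t *)mp->b_rptr;
	return (hp->magic == 0x53545245);	/* hypothetical check */
}
#endif	/* STREAMS_EXAMPLES */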
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */

			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
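/*
 * Illustrative sketch (guarded out; names hypothetical): trimming with
 * adjmsg().  Positive lengths trim from the head, negative from the
 * tail; msgdsize() reports the M_DATA bytes remaining.  This assumes an
 * all-M_DATA message whose head and tail mblks cover hdrlen and padlen.
 */
#ifdef	STREAMS_EXAMPLES
static void
example_trim(mblk_t *mp, size_t hdrlen, size_t padlen)
{
	size_t before = msgdsize(mp);

	if (adjmsg(mp, (ssize_t)hdrlen) &&	/* strip header from head */
	    adjmsg(mp, -(ssize_t)padlen))	/* strip trailer from tail */
		ASSERT(msgdsize(mp) == before - hdrlen - padlen);
}
#endif	/* STREAMS_EXAMPLES */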
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here, is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reach the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
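/*
 * Illustrative sketch (guarded out; example_wsrv is hypothetical): the
 * canonical service procedure shape the accounting above supports -
 * drain with getq(), respect downstream flow control with canputnext(),
 * and put back with putbq() (never a high priority message, per the
 * putq() comment at the end of this file).
 */
#ifdef	STREAMS_EXAMPLES
static int
example_wsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		if (mp->b_datap->db_type < QPCTL && !canputnext(q)) {
			(void) putbq(q, mp);	/* stay flow-controlled */
			break;
		}
		putnext(q, mp);
	}
	return (0);
}
#endif	/* STREAMS_EXAMPLES */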
/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size) 	\
	if (DB_TYPE(mp) != M_MULTIDATA) {		\
		(size) += MBLKL(mp);			\
	} else {					\
		uint_t	pinuse;				\
							\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;			\
	}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}
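/*
 * Illustrative sketch (guarded out; example_wput is hypothetical): the
 * standard M_FLUSH handling in a module's put procedure, which is where
 * flushq() is typically called from.
 */
#ifdef	STREAMS_EXAMPLES
static int
example_wput(queue_t *q, mblk_t *mp)
{
	if (DB_TYPE(mp) == M_FLUSH) {
		if (*mp->b_rptr & FLUSHW)
			flushq(q, FLUSHDATA);	/* drop queued data msgs */
		putnext(q, mp);			/* pass M_FLUSH along */
		return (0);
	}
	(void) putq(q, mp);
	return (0);
}
#endif	/* STREAMS_EXAMPLES */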
/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check does not
 * match the man page for flushband and was not in the strrput flush code,
 * hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL ||
			    datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblks have been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

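/*
 * Example: flushq() and flushband() are most often called from a module's
 * M_FLUSH handling.  An illustrative sketch of the canonical write-side
 * put procedure fragment (the surrounding module is hypothetical):
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHBAND) {
 *			if (*mp->b_rptr & FLUSHW)
 *				flushband(q, *(mp->b_rptr + 1), FLUSHDATA);
 *		} else if (*mp->b_rptr & FLUSHW) {
 *			flushq(q, FLUSHDATA);
 *		}
 *		putnext(q, mp);
 *		break;
 */
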
/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band
 * (the caller wants to write to the queue).
 */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
	return (1);
}

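/*
 * Example: canput()/bcanput() are meant to be asked before putnext(),
 * most commonly from a service procedure.  An illustrative sketch of the
 * canonical flow-controlled service procedure (the function name is
 * hypothetical); the message is put back with putbq() and the loop
 * abandoned when the downstream band is full, relying on the
 * QWANTW/QB_WANTW backenable to rerun the service procedure later:
 *
 *	static int
 *	xxx_wsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (mp->b_datap->db_type < QPCTL &&
 *			    !bcanput(q->q_next, mp->b_band)) {
 *				(void) putbq(q, mp);
 *				return (0);
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */
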
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

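/*
 * Example: putq() returns 0 when a qband_t for a new priority band cannot
 * be allocated, so code that enqueues banded messages should check the
 * return value.  An illustrative sketch of the usual recovery:
 *
 *	if (!putq(q, mp))
 *		freemsg(mp);
 *
 * (Dropping the message, or scheduling a retry with qbufcall(), are the
 * usual choices.)
 */
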
/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) ||
		    (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq() can be called with the stream frozen, by other utility functions
 * while holding QLOCK, or by STREAMS modules with no locks held and the
 * stream not frozen.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

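/*
 * Example: insq() with the stream frozen, inserting mp ahead of the first
 * queued message that should follow it.  An illustrative sketch; the
 * goes_before() predicate is hypothetical, and a NULL emp appends to the
 * end of the queue:
 *
 *	freezestr(q);
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (goes_before(mp, emp))
 *			break;
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);
 *	unfreezestr(q);
 */
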
/*
 * Create and put a control message on queue.
 */
int
putctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(0)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	put(q, bp);

	return (1);
}

/*
 * Control message with a single-byte parameter
 */
int
putctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(1)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	put(q, bp);

	return (1);
}

int
putnextctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(1)) == NULL))
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	putnext(q, bp);

	return (1);
}

int
putnextctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(0)) == NULL))
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	putnext(q, bp);

	return (1);
}

/*
 * Return the queue upstream from this one
 */
queue_t *
backq(queue_t *q)
{
	q = _OTHERQ(q);
	if (q->q_next) {
		q = q->q_next;
		return (_OTHERQ(q));
	}
	return (NULL);
}

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent; the general principle is that it be run whenever a queue
 * is enabled, but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on list of those whose service procedures are
 * ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex to keep the woken thread
 * from contending for the mutex.  This is OK because the queue has been
 * enqueued on the run list and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}

/*
 * Return number of messages on queue
 */
int
qsize(queue_t *qp)
{
	int count = 0;
	mblk_t *mp;

	mutex_enter(QLOCK(qp));
	for (mp = qp->q_first; mp; mp = mp->b_next)
		count++;
	mutex_exit(QLOCK(qp));
	return (count);
}

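/*
 * Example: the putctl family generates zero- and one-byte control
 * messages, and qreply() turns a message around at the other queue of
 * the pair.  Illustrative fragments from a hypothetical module: the
 * first sends M_ERROR upstream from the read side, the second
 * acknowledges an ioctl from the write put procedure:
 *
 *	(void) putnextctl1(RD(q), M_ERROR, EPROTO);
 *
 *	mp->b_datap->db_type = M_IOCACK;
 *	((struct iocblk *)mp->b_rptr)->ioc_count = 0;
 *	((struct iocblk *)mp->b_rptr)->ioc_error = 0;
 *	qreply(q, mp);
 */
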
/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}

void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}

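/*
 * Example: noenable()/enableok() bracket a window in which putq() must
 * not schedule the service procedure.  An illustrative sketch; once the
 * window closes, any work that accumulated is kicked off explicitly with
 * qenable():
 *
 *	noenable(q);
 *	...	(putq(q, mp) calls here will queue but not schedule q)
 *	enableok(q);
 *	if (q->q_first != NULL)
 *		qenable(q);
 */
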
/*
 * Set queue fields.
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite() looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite() looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t *qbp = NULL;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

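/*
 * Example: strqset()/strqget() from a hypothetical module open routine,
 * raising the high water mark of priority band 1 and reading back that
 * band's current byte count (the values are illustrative):
 *
 *	size_t cnt;
 *
 *	(void) strqset(q, QHIWAT, 1, 16 * 1024);
 *	(void) strqget(q, QCOUNT, 1, &cnt);
 */
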
/*
 * This function awakes all waiters in cvwait/sigwait/pollwait, on one of:
 * QWANTWSYNC, QWANTR or QWANTW.
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
 * RSLEEPer then a deferred wakeup will be done.  Also, if strpoll() is in
 * progress, then a deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t *stp = STREAM(q);
	pollhead_t *pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while we are taking the type, so store the
	 * queue in a temporary variable.  It doesn't matter if we take the
	 * type from the previous plumbing: if the plumbing changed while we
	 * were holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed under
	 * the old plumbing, with no side effects beyond a bit of extra
	 * processing for the partial IP header checksum.
	 *
	 * This has been done to avoid holding the sd_lock, which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter the queue synchronously.  Any attempt to enter a closing
 * queue will fail.  The qp->q_rwcnt keeps track of the number of successful
 * entries so that removeq() will not try to close the queue while a thread
 * is inside the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in a sync stream queue and wake up
 * any threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * It is treated as a put entry point for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, sq_count is incremented and
 * it does not matter if any regular put entrypoints have been already
 * entered.  We can't increment one of the sq_putcounts (instead of
 * sq_count) because qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int isread;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK.  Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired.  In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call.  In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * If this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case.  All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure.  This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled.  If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the calling
		 * stack in a DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * It is treated as a put entry point for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, the regular sq_count is
 * incremented and it does not matter if any regular put entrypoints have
 * been already entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the calling
		 * stack in a DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here.  I need to
	 * consider why infonext() is called, and whether dropping SQ_EXCL
	 * unless non-CIPUT might cause other problems.  It just might be
	 * safer to drop it if !SQ_CIPUT because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * Called by create_putlocks().
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK.  If it is not NULL putnext assumes
				 * sq_nciputctrl is initialized.  The membar
				 * below ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If the stream argument is 0, create per-CPU sq_putlocks/sq_putcounts only
 * for the syncq of q.  If the stream argument is not 0, create per-CPU
 * stream_putlocks for the stream of q and per-CPU sq_putlocks/sq_putcounts
 * for all syncqs starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry.  It should be called from the driver/module open routine after
 * the qprocson() call.  It is also called from the NFS syscall, where it is
 * known that the stream is configured and won't change its geometry during
 * the create_putlocks() call.
 *
 * The caller normally uses a 0 value for the stream argument to speed up
 * MT putnext into the perimeter of q, for example because its perimeter is
 * per module (e.g. IP).
 *
 * The caller normally uses a nonzero value for the stream argument to hint
 * to the system that the stream of q is a very contended global system
 * stream (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * The caller ensures stream plumbing won't happen while we are here and
 * therefore q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock.  If it is not NULL putnext assumes
			 * sd_nciputctrl is initialized.  The membar
			 * below ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

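/*
 * Example: a hypothetical driver open routine requesting per-CPU putlocks
 * once its queue pair is part of the stream, per the rules above (after
 * qprocson(), with stream == 0 for the per-module perimeter case):
 *
 *	static int
 *	xxx_open(queue_t *q, dev_t *devp, int oflag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);
 *		return (0);
 *	}
 */
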
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently recorded per event are a hrtime stamp, a queue address,
 * an event type, and a per-type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can be
 * defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp = hp->tail;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			if (!(nbp = kmem_cache_alloc(ftblk_cache,
			    KM_NOSLEEP))) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (casptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		/*
		 * It is possible that the module info is broken
		 * (as is logsubr.c at this comment's writing).
		 * Instead of panicking or doing other unmentionables,
		 * we shall put a dummy name as the mid, and continue.
		 */
		if (qp->q_qinfo == NULL)
			ep->mid = "NONAME";
		else
			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;
	} else {
		ep->mid = (char *)p;
	}

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}

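/*
 * Example: the framework records its own events with the STR_FTEVENT_MSG
 * macro, as putq() and rmvq_noenab() do above.  A module wanting a trace
 * point of its own can do the same at a place of interest; the placement
 * inside a hypothetical put procedure, and the reuse of an existing event
 * code, are illustrative only (nothing is recorded unless flow tracing is
 * enabled):
 *
 *	STR_FTEVENT_MSG(mp, q, FTEV_PUTQ, NULL);
 */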