/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg(). What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches. For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc(). For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory. Objects in these caches consist of a dblk plus its
 *     associated data. allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc"). gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb(). freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref(). The dblk's original free method is retained
 * in db_lastfree. dblk_decref() decrements the reference count on each
 * freeb(). If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance. This makes the code simple and compact, but
 * also a bit abstruse in some places. The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up. In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads. They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common. We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
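
/*
 * Illustrative sketch (not part of this file's interfaces): a module
 * that needs a message simply calls allocb() and lets freeb()/freemsg()
 * dispatch through the dblk's free method. Assuming a hypothetical
 * payload 'src' of 'len' bytes and a write queue 'q':
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOSR);		... or schedule a bufcall ...
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 *	putnext(q, mp);
 */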
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
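
/*
 * Worked example (illustrative only): the byte-wide dblk fields db_ref,
 * db_type, db_flags and db_struioflag are adjacent, which is what
 * DBLK_RTFU_WORD() relies on. On a little-endian machine the shifts
 * come out as db_ref = 0, db_type = 8, db_flags = 16, db_struioflag = 24,
 * so DBLK_RTFU(1, M_DATA, 0, 0) is a compile-time constant that
 * initializes all four fields in a single 32-bit store. The
 * (flags) | (ref - 1) term is a trick: for ref == 1 it adds nothing,
 * while for ref == 2 (the esballoca flavors) it sets the low db_flags
 * bit, which dblk_decref() later reads back as the minimum reference
 * count (DBLK_REFMIN).
 */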

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;


static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
	    kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(msg_size != 0);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
	    sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
	    mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
		    DBLK_CACHE_ALIGN, dblk_constructor,
		    dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}
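
/*
 * Usage note (illustrative sketch, not part of the interfaces here):
 * freeb() releases a single mblk, while freemsg() below walks b_cont
 * and releases a whole message. A module splitting a header off a
 * received message 'mp' might do:
 *
 *	mblk_t *body = mp->b_cont;
 *	mp->b_cont = NULL;
 *	freeb(mp);		... release the header block only ...
 *	... process 'body', eventually freemsg(body) ...
 */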

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use. Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED flag */
	dbp->db_flags &= ~DBLK_COOKED;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it. Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference. Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}
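
/*
 * Usage note (illustrative sketch): after dupb(), below, two mblks share
 * one dblk, so the data must be treated as read-only until the extra
 * reference goes away. A module that wants to modify shared data first
 * checks the reference count (DB_REF() from <sys/strsun.h>) and copies:
 *
 *	if (DB_REF(mp) > 1) {
 *		mblk_t *nmp = copymsg(mp);
 *		if (nmp == NULL)
 *			return;		... allocation failed ...
 *		freemsg(mp);
 *		mp = nmp;
 *	}
 *	... now safe to modify mp's data ...
 */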

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/*
	 * First-dup optimization. The enabling assumption is that there
	 * can never be a race (in correct code) to dup the first copy
	 * of a message. Therefore we don't need to do it atomically.
	 */
	if (dbp->db_free != dblk_decref) {
		dbp->db_free = dblk_decref;
		dbp->db_ref++;
		goto out;
	}

	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
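
/*
 * Usage sketch (illustrative; the 'xx' names are hypothetical): a
 * DMA-capable driver typically wraps its own buffer with esballoc() so
 * the data is not copied, supplying a free routine that reclaims the
 * buffer when the last reference is freed:
 *
 *	static void
 *	xx_buf_free(void *arg)
 *	{
 *		xx_buf_t *bp = arg;
 *		... return bp to the driver's buffer pool ...
 *	}
 *
 *	bp->frtn.free_func = xx_buf_free;
 *	bp->frtn.free_arg = (caddr_t)bp;
 *	mp = esballoc(bp->buf, bp->len, BPRI_MED, &bp->frtn);
 *
 * The buffer must remain valid until the free routine has run.
 */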

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
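
/*
 * Usage sketch (illustrative; the 'xx' names are hypothetical): a driver
 * that wants a private pool of fixed-size, aligned message buffers
 * creates a bcache once and allocates from it on the data path:
 *
 *	xx->xx_bcache = bcache_create("xx", XX_BUFSIZE, 64);
 *	...
 *	mp = bcache_allocb(xx->xx_bcache, BPRI_MED);
 *	...
 *	bcache_destroy(xx->xx_bcache);
 *
 * Note that bcache_destroy() is deferred until all outstanding dblks
 * have been freed (see bcache_dblk_lastfree() above), and that
 * bcache_allocb() can return NULL, so callers need the same fallback
 * logic they would use with allocb().
 */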

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures). It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk	*ioc;
	mblk_t		*mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls(). Since the bc_id field is returned, we save its value
	 * in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
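
/*
 * Usage sketch (illustrative; the 'xx' names are hypothetical): the
 * classic recovery pattern when allocb() fails in a service procedure
 * is to stop processing and schedule a bufcall() that re-enables the
 * queue once an allocation of that size is likely to succeed:
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		xx->xx_bufcall_id = bufcall(size, BPRI_MED,
 *		    (void (*)(void *))qenable, q);
 *		if (xx->xx_bufcall_id == 0)
 *			... fall back, e.g. timeout(9F) and retry ...
 *		return;
 *	}
 *
 * An outstanding id must be canceled with unbufcall(), typically in
 * the module's close routine.
 */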

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block. Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue. If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data. On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks. Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type. Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
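
/*
 * Usage sketch (illustrative): a module that needs the first 'hlen'
 * header bytes contiguous and aligned before casting b_rptr to a header
 * structure typically does:
 *
 *	if (MBLKL(mp) < hlen || !OK_32PTR(mp->b_rptr)) {
 *		if (!pullupmsg(mp, hlen)) {
 *			freemsg(mp);
 *			return;		... allocation failure ...
 *		}
 *	}
 *	hdr = (struct xx_hdr *)mp->b_rptr;	... 'xx_hdr' hypothetical ...
 *
 * msgpullup(), below, is the non-destructive variant: it leaves 'mp'
 * untouched and returns a new message.
 */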

/*
 * Concatenate and align at least the first len bytes of common message
 * type. Len == -1 means concatenate everything. The original message is
 * unaltered. Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */

			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
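
/*
 * Usage sketch (illustrative): adjmsg() trims from the front with a
 * positive length and from the tail with a negative one. For example,
 * to strip an 'hlen'-byte header and a 'clen'-byte trailer from a
 * message after validating the length with msgdsize():
 *
 *	if (msgdsize(mp) < hlen + clen ||
 *	    !adjmsg(mp, hlen) || !adjmsg(mp, -clen)) {
 *		freemsg(mp);
 *		return;
 *	}
 */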

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag. Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc. Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.) All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list. q_count does
 * not reflect the size of all the messages on the
 * queue. It only reflects those messages in the
 * normal band of flow. The one exception to this
 * deals with high priority messages. They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue. Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do. What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages. q_mblkcnt is used for this purpose. It
 *   counts the number of mblk's that are being put on
 *   the queue. The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does. A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark. It will clear when BOTH
 *   of them drop below the highwater mark. And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size)		\
	if (DB_TYPE(mp) != M_MULTIDATA) {	\
		(size) += MBLKL(mp);		\
	} else {				\
		uint_t	pinuse;			\
						\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;		\
	}
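
/*
 * Worked example of the dual-watermark rule described above (numbers
 * are illustrative): with q_hiwat = 8192, a sender streaming zero-length
 * messages never raises q_count, but each message adds one to q_mblkcnt,
 * so after 8192 such messages the queue still goes QFULL and flow
 * control engages. Conversely, a single 9000-byte message trips the
 * byte count alone. The queue only backenables once BOTH counters have
 * dropped below the low water mark again.
 */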

/*
 * Like getq() but does not backenable. This is used by the stream
 * head when a putback() is likely. The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat)) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
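
/*
 * Usage sketch (illustrative): the pattern that motivates getq_noenab()
 * is "take, maybe put back, then backenable once":
 *
 *	bp = getq_noenab(q);
 *	if (bp != NULL && !consumed)
 *		putbq(q, bp);		... put it back, no enabling yet ...
 *	...
 *	qbackenable(q, band);		... single backenable afterwards ...
 *
 * This avoids a spurious backenable when the message is immediately
 * returned to the queue.
 */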

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue. The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held. Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
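
/*
 * Usage sketch (illustrative): the canonical consumer of flushq(),
 * defined below, is an M_FLUSH handler in a module's put procedure:
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW) {
 *			flushq(WR(q), FLUSHDATA);
 *			*mp->b_rptr &= ~FLUSHW;
 *		}
 *		if (*mp->b_rptr & FLUSHR)
 *			qreply(q, mp);
 *		else
 *			freemsg(mp);
 *		break;
 */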
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can be passed specifying whether or not M_PCPROTO messages
 * should be flushed.  Currently the only place that uses this flag is
 * the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check does
 * not match the man page for flushband and was not in the strrput flush
 * code, hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

/*
 * Return 1 if the queue is not full.
 * If the queue is full, return 0 (may not put message) and set the
 * QWANTW flag (the caller wants to write to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (the
 * caller wants to write to the queue).
 */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
	return (1);
}

/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
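 *
 * As an illustration only (a sketch, not code from this file; the "xx"
 * names are hypothetical), a module's put and service procedures built
 * around putq() typically look like:
 *
 *	static int
 *	xxwput(queue_t *q, mblk_t *mp)
 *	{
 *		if (mp->b_datap->db_type >= QPCTL) {
 *			putnext(q, mp);
 *			return (0);
 *		}
 *		(void) putq(q, mp);	-- putq() enables q when appropriate
 *		return (0);
 *	}
 *
 *	static int
 *	xxwsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);
 *				return (0);
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 *
 * Note that only ordinary messages are put back with putbq() here,
 * consistent with the warning above.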
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Perform sanity checks and, if the qband structure is not yet
	 * allocated, allocate it.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If the queue is empty, add the message and initialize the
	 * pointers.  Otherwise, adjust message pointers and queue pointers
	 * based on the type of the message and where it belongs on the
	 * queue.  Some code is duplicated to minimize the number of
	 * conditionals and hopefully minimize the amount of time this
	 * routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put a message back at the beginning of the queue, in priority-band
 * order.  See the comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Perform sanity checks and, if the qband structure is not yet
	 * allocated, allocate it.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If the queue is empty or the message is high priority,
	 * place it on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed at the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq() can be called with the stream frozen, by another utility routine
 * while holding QLOCK, or by a streams module with no locks held and the
 * stream not frozen.
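 *
 * As an illustration only (a sketch, not code from this file): a module
 * wishing to insert mp ahead of the first message of a stream it has
 * frozen might do
 *
 *	freezestr(q);
 *	(void) insq(q, q->q_first, mp);
 *	unfreezestr(q);
 *
 * subject to the band ordering rule above; insq() warns and returns 0
 * if the ordering would be violated.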
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Create and put a control message on a queue.
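 *
 * For example (illustrative only), a driver that detects carrier loss
 * could send an M_HANGUP up its read side with
 *
 *	(void) putnextctl(RD(q), M_HANGUP);
 *
 * These routines return 0 if the message type is a data type (other
 * than M_DELAY) or the allocation fails, and 1 on success.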
 */
int
putctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(0)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char) type;

	put(q, bp);

	return (1);
}

/*
 * Control message with a single-byte parameter
 */
int
putctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(1)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	put(q, bp);

	return (1);
}

int
putnextctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(1)) == NULL))
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	putnext(q, bp);

	return (1);
}

int
putnextctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(0)) == NULL))
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	putnext(q, bp);

	return (1);
}

/*
 * Return the queue upstream from this one
 */
queue_t *
backq(queue_t *q)
{
	q = _OTHERQ(q);
	if (q->q_next) {
		q = q->q_next;
		return (_OTHERQ(q));
	}
	return (NULL);
}

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent - the general principle is that it be run whenever a queue
 * is enabled but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on the list of those whose service procedures
 * are ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex to keep the woken thread
 * from contending with the mutex.  This is OK because the queue has
 * been enqueued on the run list and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
/*
 * Return number of messages on queue
 */
int
qsize(queue_t *qp)
{
	int count = 0;
	mblk_t *mp;

	mutex_enter(QLOCK(qp));
	for (mp = qp->q_first; mp; mp = mp->b_next)
		count++;
	mutex_exit(QLOCK(qp));
	return (count);
}

/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
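 *
 * Illustrative sketch (not code from this file): a module that wants to
 * accumulate messages with putq() and process them later can bracket the
 * quiet period with
 *
 *	noenable(q);
 *	...
 *	enableok(q);
 *	qenable(q);
 *
 * since putq() will not have scheduled the service procedure while
 * QNOENB was set.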
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}

void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}

/*
 * Set queue fields.
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t *qbp = NULL;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Wake up everyone blocked in cv_wait/sigwait/pollwait on one of
 * QWANTWSYNC, QWANTR or QWANTW.
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
 * RSLEEPer then a deferred wakeup will be done.  Likewise, if strpoll()
 * is in progress then a deferred pollwakeup will be done.
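 *
 * (A deferred cv wakeup is recorded by setting the corresponding bit in
 * sd_wakeq rather than broadcasting immediately.)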
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t *stp = STREAM(q);
	pollhead_t *pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while we are reading the struio type, so
	 * take the queue into a temporary variable first.  Even if we end
	 * up with the type from the previous plumbing it does not matter:
	 * if the plumbing changed while we held the queue in the temporary
	 * variable, we can continue processing the message the way it
	 * would have been processed under the old plumbing, with no side
	 * effects other than a little extra work for the partial ip header
	 * checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk.
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter the queue synchronously.  Any attempt to enter a closing
 * queue will fail.  The qp->q_rwcnt keeps track of the number of
 * successful entries so that removeq() will not try to close the queue
 * while a thread is inside the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in a sync stream queue and wake
 * up any threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, sq_count is incremented
 * and it does not matter if any regular put entrypoints have already
 * been entered.  We can't increment one of the sq_putcounts (instead of
 * sq_count) because qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int isread;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK.  Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired.  In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call.  In this case sd_lock acts as the streamhead perimeter.
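 *
 * (Lock handoff summary: sd_lock is taken first to stabilize q_next,
 * then SQLOCK is acquired to make a claim on the syncq, and only then
 * is sd_lock dropped.)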
 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case.  All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure.  This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled.  If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the
		 * calling stack in DEBUG kernels.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, the regular sq_count is
 * incremented and it does not matter if any regular put entrypoints
 * have already been entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the
		 * calling stack in DEBUG kernels.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here.  I need to
	 * consider why infonext is called, and whether dropping SQ_EXCL
	 * unless non-CIPUT might cause other problems.  It just might be
	 * safer to drop it if !SQ_CIPUT, because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * Called by create_putlocks().
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK.  If it is not NULL, putnext assumes
				 * sq_nciputctrl is initialized.  The membar
				 * below ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If the stream argument is 0, create per-cpu sq_putlocks/sq_putcounts
 * only for the syncq of q.
 * If the stream argument is not 0, create per-cpu stream_putlocks for
 * the stream of q and per-cpu sq_putlocks/sq_putcounts for all syncqs
 * starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry.  It should be called from the driver/module open routine
 * after the qprocson() call.  It is also called from the nfs syscall,
 * where it is known that the stream is configured and won't change its
 * geometry during the create_putlocks call.
 *
 * A caller normally uses a 0 value for the stream argument to speed up
 * MT putnext into the perimeter of q, for example because its perimeter
 * is per module (e.g. IP).
 *
 * A caller normally uses a non-0 value for the stream argument to hint
 * to the system that the stream of q is a very contended global system
 * stream (e.g. NFS/UDP) and that the part of the stream from q to the
 * driver is particularly MT hot.
 *
 * The caller ensures stream plumbing won't happen while we are here and
 * therefore q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock.  If it is not NULL, putnext assumes
			 * sd_nciputctrl is initialized.  The membar
			 * below ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently recorded per event are a hrtime stamp, queue address,
 * event type, and a per-type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can be
 * defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp = hp->tail;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			if (!(nbp = kmem_cache_alloc(ftblk_cache,
			    KM_NOSLEEP))) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (casptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		/*
		 * It is possible that the module info is broken
		 * (as it is in logsubr.c at the time of this writing).
		 * Instead of panicking or doing other unmentionables,
		 * we shall put a dummy name as the mid, and continue.
		 */
		if (qp->q_qinfo == NULL)
			ep->mid = "NONAME";
		else
			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;
	} else {
		ep->mid = (char *)p;
	}

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}
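
/*
 * Usage note (an illustrative sketch, not code from this file): flow
 * tracing is compiled in but disabled by default via str_ftnever above;
 * it can be enabled by clearing str_ftnever (e.g. from /etc/system).
 * A module that wants to record its own event for a message would
 * normally go through the STR_FTEVENT_MSG()/STR_FTEVENT_MBLK() macros
 * from <sys/strft.h>, but the underlying call amounts to (roughly)
 *
 *	if (mp->b_datap->db_fthdr != NULL)
 *		str_ftevent(mp->b_datap->db_fthdr, q, FTEV_PUTQ, 0);
 *
 * where the event code here (FTEV_PUTQ) is just an example; modules may
 * define their own codes in the FTEV_* space.
 */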