/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/


/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer;
 *     the associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory, so objects in these caches consist of a dblk plus its data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
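/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): the basic allocate/fill/free cycle described above.
 */
#ifdef STREAMS_EXAMPLES
static mblk_t *
example_build_msg(const unsigned char *src, size_t len)
{
        mblk_t *mp;

        /* allocb() draws from the nearest-size dblk cache for 'len' */
        if ((mp = allocb(len, BPRI_MED)) == NULL)
                return (NULL);

        /* data is appended at b_wptr; b_rptr marks the first valid byte */
        bcopy(src, mp->b_wptr, len);
        mp->b_wptr += len;

        return (mp);    /* the consumer eventually calls freemsg() */
}
#endif  /* STREAMS_EXAMPLES */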
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
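/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): DBLK_RTFU() packs the adjacent db_ref, db_type,
 * db_flags and db_struioflag bytes into one word, so a newly allocated
 * dblk can be initialized with a single 32-bit store.
 */
#ifdef STREAMS_EXAMPLES
static void
example_rtfu_init(dblk_t *dbp)
{
        /*
         * Same effect as db_ref = 1, db_type = M_DATA, db_flags = 0 and
         * db_struioflag = 0, but done with one store.
         */
        DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
}
#endif  /* STREAMS_EXAMPLES */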
183 */ 184 int dblk_kmem_flags = 0; 185 int mblk_kmem_flags = 0; 186 187 188 static int 189 dblk_constructor(void *buf, void *cdrarg, int kmflags) 190 { 191 dblk_t *dbp = buf; 192 ssize_t msg_size = (ssize_t)cdrarg; 193 size_t index; 194 195 ASSERT(msg_size != 0); 196 197 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 198 199 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 200 201 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 202 return (-1); 203 if ((msg_size & PAGEOFFSET) == 0) { 204 dbp->db_base = kmem_alloc(msg_size, kmflags); 205 if (dbp->db_base == NULL) { 206 kmem_cache_free(mblk_cache, dbp->db_mblk); 207 return (-1); 208 } 209 } else { 210 dbp->db_base = (unsigned char *)&dbp[1]; 211 } 212 213 dbp->db_mblk->b_datap = dbp; 214 dbp->db_cache = dblk_cache[index]; 215 dbp->db_lim = dbp->db_base + msg_size; 216 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 217 dbp->db_frtnp = NULL; 218 dbp->db_fthdr = NULL; 219 dbp->db_credp = NULL; 220 dbp->db_cpid = -1; 221 dbp->db_struioflag = 0; 222 dbp->db_struioun.cksum.flags = 0; 223 return (0); 224 } 225 226 /*ARGSUSED*/ 227 static int 228 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 229 { 230 dblk_t *dbp = buf; 231 232 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 233 return (-1); 234 dbp->db_mblk->b_datap = dbp; 235 dbp->db_cache = dblk_esb_cache; 236 dbp->db_fthdr = NULL; 237 dbp->db_credp = NULL; 238 dbp->db_cpid = -1; 239 dbp->db_struioflag = 0; 240 dbp->db_struioun.cksum.flags = 0; 241 return (0); 242 } 243 244 static int 245 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 246 { 247 dblk_t *dbp = buf; 248 bcache_t *bcp = (bcache_t *)cdrarg; 249 250 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 251 return (-1); 252 253 if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache, 254 kmflags)) == NULL) { 255 kmem_cache_free(mblk_cache, dbp->db_mblk); 256 return (-1); 257 } 258 259 dbp->db_mblk->b_datap = dbp; 260 dbp->db_cache = (void *)bcp; 261 dbp->db_lim = dbp->db_base + bcp->size; 262 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 263 dbp->db_frtnp = NULL; 264 dbp->db_fthdr = NULL; 265 dbp->db_credp = NULL; 266 dbp->db_cpid = -1; 267 dbp->db_struioflag = 0; 268 dbp->db_struioun.cksum.flags = 0; 269 return (0); 270 } 271 272 /*ARGSUSED*/ 273 static void 274 dblk_destructor(void *buf, void *cdrarg) 275 { 276 dblk_t *dbp = buf; 277 ssize_t msg_size = (ssize_t)cdrarg; 278 279 ASSERT(dbp->db_mblk->b_datap == dbp); 280 281 ASSERT(msg_size != 0); 282 283 ASSERT(dbp->db_struioflag == 0); 284 ASSERT(dbp->db_struioun.cksum.flags == 0); 285 286 if ((msg_size & PAGEOFFSET) == 0) { 287 kmem_free(dbp->db_base, msg_size); 288 } 289 290 kmem_cache_free(mblk_cache, dbp->db_mblk); 291 } 292 293 static void 294 bcache_dblk_destructor(void *buf, void *cdrarg) 295 { 296 dblk_t *dbp = buf; 297 bcache_t *bcp = (bcache_t *)cdrarg; 298 299 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 300 301 ASSERT(dbp->db_mblk->b_datap == dbp); 302 303 ASSERT(dbp->db_struioflag == 0); 304 ASSERT(dbp->db_struioun.cksum.flags == 0); 305 306 kmem_cache_free(mblk_cache, dbp->db_mblk); 307 } 308 309 void 310 streams_msg_init(void) 311 { 312 char name[40]; 313 size_t size; 314 size_t lastsize = DBLK_MIN_SIZE; 315 size_t *sizep; 316 struct kmem_cache *cp; 317 size_t tot_size; 318 int offset; 319 320 mblk_cache = kmem_cache_create("streams_mblk", 321 sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL, 322 mblk_kmem_flags); 323 324 for (sizep = dblk_sizes; (size = 
void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) +
			    sizeof (kmem_slab_t)) < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is a multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL,
		    (void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
	    sizeof (dblk_t), DBLK_CACHE_ALIGN,
	    dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
	    sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
	    sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}
void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED flag */
	dbp->db_flags &= ~DBLK_COOKED;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}
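/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): dupb() produces a second mblk that references the same
 * dblk and data, assuming mp starts out as the only reference.
 */
#ifdef STREAMS_EXAMPLES
static void
example_dup_and_free(mblk_t *mp)
{
        mblk_t *dup;

        if ((dup = dupb(mp)) == NULL)
                return;
        ASSERT(dup->b_datap == mp->b_datap);    /* shared dblk, db_ref now 2 */
        freeb(dup);     /* not the last reference: only the mblk is freed */
        freeb(mp);      /* last reference: db_lastfree reclaims dblk and data */
}
#endif  /* STREAMS_EXAMPLES */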
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/*
	 * First-dup optimization.  The enabling assumption is that there
	 * can never be a race (in correct code) to dup the first copy
	 * of a message.  Therefore we don't need to do it atomically.
	 */
	if (dbp->db_free != dblk_decref) {
		dbp->db_free = dblk_decref;
		dbp->db_ref++;
		goto out;
	}

	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
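/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): wrapping a driver-owned buffer with esballoc().  When
 * the last reference to the message is freed, the supplied free routine
 * is invoked to dispose of the buffer.  A real driver would embed the
 * frtn_t alongside each buffer; the single static frtn_t below is a
 * simplification and would be racy if used concurrently.
 */
#ifdef STREAMS_EXAMPLES
#define	EXAMPLE_BUF_SIZE	2048

static void
example_buf_done(void *arg)
{
        kmem_free(arg, EXAMPLE_BUF_SIZE);
}

static mblk_t *
example_wrap_buf(void)
{
        static frtn_t frtn = { example_buf_done };
        unsigned char *buf;
        mblk_t *mp;

        if ((buf = kmem_alloc(EXAMPLE_BUF_SIZE, KM_NOSLEEP)) == NULL)
                return (NULL);
        frtn.free_arg = (caddr_t)buf;
        if ((mp = esballoc(buf, EXAMPLE_BUF_SIZE, BPRI_MED, &frtn)) == NULL)
                kmem_free(buf, EXAMPLE_BUF_SIZE);
        return (mp);
}
#endif  /* STREAMS_EXAMPLES */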
723 */ 724 if (!str_ftnever) { 725 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 726 frp, freebs_enqueue, KM_SLEEP); 727 728 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 729 return (mp); 730 } 731 732 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 733 frp, freebs_enqueue, KM_SLEEP)); 734 } 735 736 /*ARGSUSED*/ 737 mblk_t * 738 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 739 { 740 mblk_t *mp; 741 742 /* 743 * Note that this is structured to allow the common case (i.e. 744 * STREAMS flowtracing disabled) to call gesballoc() with tail 745 * call optimization. 746 */ 747 if (!str_ftnever) { 748 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 749 frp, dblk_lastfree_desb, KM_NOSLEEP); 750 751 if (mp != NULL) 752 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 753 return (mp); 754 } 755 756 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 757 frp, dblk_lastfree_desb, KM_NOSLEEP)); 758 } 759 760 /*ARGSUSED*/ 761 mblk_t * 762 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 763 { 764 mblk_t *mp; 765 766 /* 767 * Note that this is structured to allow the common case (i.e. 768 * STREAMS flowtracing disabled) to call gesballoc() with tail 769 * call optimization. 770 */ 771 if (!str_ftnever) { 772 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 773 frp, freebs_enqueue, KM_NOSLEEP); 774 775 if (mp != NULL) 776 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 777 return (mp); 778 } 779 780 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 781 frp, freebs_enqueue, KM_NOSLEEP)); 782 } 783 784 /*ARGSUSED*/ 785 mblk_t * 786 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 787 { 788 mblk_t *mp; 789 790 /* 791 * Note that this is structured to allow the common case (i.e. 792 * STREAMS flowtracing disabled) to call gesballoc() with tail 793 * call optimization. 
794 */ 795 if (!str_ftnever) { 796 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 797 frp, dblk_lastfree_desb, KM_NOSLEEP); 798 799 if (mp != NULL) 800 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 801 return (mp); 802 } 803 804 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 805 frp, dblk_lastfree_desb, KM_NOSLEEP)); 806 } 807 808 static void 809 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 810 { 811 bcache_t *bcp = dbp->db_cache; 812 813 ASSERT(dbp->db_mblk == mp); 814 if (dbp->db_fthdr != NULL) 815 str_ftfree(dbp); 816 817 /* set credp and projid to be 'unspecified' before returning to cache */ 818 if (dbp->db_credp != NULL) { 819 crfree(dbp->db_credp); 820 dbp->db_credp = NULL; 821 } 822 dbp->db_cpid = -1; 823 dbp->db_struioflag = 0; 824 dbp->db_struioun.cksum.flags = 0; 825 826 mutex_enter(&bcp->mutex); 827 kmem_cache_free(bcp->dblk_cache, dbp); 828 bcp->alloc--; 829 830 if (bcp->alloc == 0 && bcp->destroy != 0) { 831 kmem_cache_destroy(bcp->dblk_cache); 832 kmem_cache_destroy(bcp->buffer_cache); 833 mutex_exit(&bcp->mutex); 834 mutex_destroy(&bcp->mutex); 835 kmem_free(bcp, sizeof (bcache_t)); 836 } else { 837 mutex_exit(&bcp->mutex); 838 } 839 } 840 841 bcache_t * 842 bcache_create(char *name, size_t size, uint_t align) 843 { 844 bcache_t *bcp; 845 char buffer[255]; 846 847 ASSERT((align & (align - 1)) == 0); 848 849 if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == 850 NULL) { 851 return (NULL); 852 } 853 854 bcp->size = size; 855 bcp->align = align; 856 bcp->alloc = 0; 857 bcp->destroy = 0; 858 859 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 860 861 (void) sprintf(buffer, "%s_buffer_cache", name); 862 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 863 NULL, NULL, NULL, 0); 864 (void) sprintf(buffer, "%s_dblk_cache", name); 865 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 866 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 867 NULL, (void *)bcp, NULL, 0); 868 869 return (bcp); 870 } 871 872 void 873 bcache_destroy(bcache_t *bcp) 874 { 875 ASSERT(bcp != NULL); 876 877 mutex_enter(&bcp->mutex); 878 if (bcp->alloc == 0) { 879 kmem_cache_destroy(bcp->dblk_cache); 880 kmem_cache_destroy(bcp->buffer_cache); 881 mutex_exit(&bcp->mutex); 882 mutex_destroy(&bcp->mutex); 883 kmem_free(bcp, sizeof (bcache_t)); 884 } else { 885 bcp->destroy++; 886 mutex_exit(&bcp->mutex); 887 } 888 } 889 890 /*ARGSUSED*/ 891 mblk_t * 892 bcache_allocb(bcache_t *bcp, uint_t pri) 893 { 894 dblk_t *dbp; 895 mblk_t *mp = NULL; 896 897 ASSERT(bcp != NULL); 898 899 mutex_enter(&bcp->mutex); 900 if (bcp->destroy != 0) { 901 mutex_exit(&bcp->mutex); 902 goto out; 903 } 904 905 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 906 mutex_exit(&bcp->mutex); 907 goto out; 908 } 909 bcp->alloc++; 910 mutex_exit(&bcp->mutex); 911 912 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 913 914 mp = dbp->db_mblk; 915 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 916 mp->b_next = mp->b_prev = mp->b_cont = NULL; 917 mp->b_rptr = mp->b_wptr = dbp->db_base; 918 mp->b_queue = NULL; 919 MBLK_BAND_FLAG_WORD(mp) = 0; 920 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 921 out: 922 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 923 924 return (mp); 925 } 926 927 static void 928 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 929 { 930 ASSERT(dbp->db_mblk == mp); 931 if (dbp->db_fthdr != NULL) 932 str_ftfree(dbp); 933 934 /* set credp and projid to be 'unspecified' before returning to 
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * It may only be called from sync routines (i.e., not from put or
 * service procedures).  It is located here (rather than strsubr.c)
 * so that we don't have to expose all of the allocb() implementation
 * details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
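/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): building an internal ioctl with mkiocb() and sending
 * it downstream; the reply arrives later as M_IOCACK or M_IOCNAK.
 */
#ifdef STREAMS_EXAMPLES
static int
example_send_ioctl(queue_t *wq, uint_t cmd)
{
        mblk_t *mp;

        if ((mp = mkiocb(cmd)) == NULL)
                return (ENOMEM);
        putnext(wq, mp);
        return (0);
}
#endif  /* STREAMS_EXAMPLES */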
/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its
	 * value in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}
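/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): the canonical bufcall() recovery pattern.  If allocb()
 * fails in a put or service procedure, schedule qenable() so the service
 * procedure runs again once a buffer of that size is likely to be
 * available.  The id must be saved so close can unbufcall() it.
 */
#ifdef STREAMS_EXAMPLES
static bufcall_id_t example_bcid;

static mblk_t *
example_alloc_or_reschedule(queue_t *q, size_t size)
{
        mblk_t *mp;

        if ((mp = allocb(size, BPRI_MED)) == NULL) {
                example_bcid = bufcall(size, BPRI_MED,
                    (void (*)(void *))qenable, q);
        }
        return (mp);
}
#endif  /* STREAMS_EXAMPLES */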
/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
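/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): unlike dupmsg(), which shares data blocks, copymsg()
 * yields a private copy that can be modified without disturbing the
 * original message.
 */
#ifdef STREAMS_EXAMPLES
static mblk_t *
example_private_copy(mblk_t *mp)
{
        mblk_t *nmp;

        if ((nmp = copymsg(mp)) != NULL && MBLKL(nmp) > 0)
                *nmp->b_rptr ^= 0xff;   /* mp's data is untouched */
        return (nmp);
}
#endif  /* STREAMS_EXAMPLES */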
/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
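/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): the usual reason to call pullupmsg() is to make a
 * protocol header contiguous before dereferencing it; example_hdr_t is
 * an assumption.
 */
#ifdef STREAMS_EXAMPLES
typedef struct example_hdr {
        uint32_t eh_type;
        uint32_t eh_len;
} example_hdr_t;

static example_hdr_t *
example_get_hdr(mblk_t *mp)
{
        if (MBLKL(mp) < sizeof (example_hdr_t) &&
            !pullupmsg(mp, sizeof (example_hdr_t)))
                return (NULL);
        return ((example_hdr_t *)mp->b_rptr);
}
#endif  /* STREAMS_EXAMPLES */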
/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */

			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
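/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): trimming with adjmsg(); the 14-byte header and 4-byte
 * trailer lengths are assumptions.
 */
#ifdef STREAMS_EXAMPLES
static int
example_strip(mblk_t *mp)
{
        if (!adjmsg(mp, 14))            /* drop a header from the front */
                return (0);
        return (adjmsg(mp, -4));        /* drop a trailer from the back */
}
#endif  /* STREAMS_EXAMPLES */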
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
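/*
 * Illustrative sketch (hypothetical, compiled out; not part of the
 * original file): the canonical service-procedure loop built on getq().
 * canputnext() and putbq() are standard STREAMS entry points defined
 * elsewhere; high-priority handling is omitted for brevity.
 */
#ifdef STREAMS_EXAMPLES
static int
example_wsrv(queue_t *q)
{
        mblk_t *mp;

        while ((mp = getq(q)) != NULL) {
                if (canputnext(q)) {
                        putnext(q, mp);
                } else {
                        (void) putbq(q, mp);    /* rerun when backenabled */
                        break;
                }
        }
        return (0);
}
#endif  /* STREAMS_EXAMPLES */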
/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */
#define	ADD_MBLK_SIZE(mp, size) 	\
	if (DB_TYPE(mp) != M_MULTIDATA) {		\
		(size) += MBLKL(mp);			\
	} else {					\
		uint_t	pinuse;				\
							\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;			\
	}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat)) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held.  Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not an M_PCPROTO message should be
 * flushed if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check does
 * not match the man page for flushband and was not in the strrput flush
 * code, hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}
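
/*
 * Illustrative sketch (not part of this file): the canonical M_FLUSH
 * handling in a hypothetical driver's write-side put procedure, which
 * uses flushq() to empty the affected queues and qreply() to bounce the
 * message back upstream:
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHW)
 *			flushq(q, FLUSHDATA);
 *		if (*mp->b_rptr & FLUSHR) {
 *			flushq(RD(q), FLUSHDATA);
 *			*mp->b_rptr &= ~FLUSHW;
 *			qreply(q, mp);
 *		} else
 *			freemsg(mp);
 *		break;
 */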

/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
 * write to the queue).
 */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
	return (1);
}

/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
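 *
 * Illustrative sketch (not part of this file): the canonical shape of a
 * hypothetical module's service procedure, which drains the queue with
 * getq() and uses putbq() to defer a normal-priority message when the
 * downstream queue is flow-controlled ("xx" names are made up):
 *
 *	static int
 *	xxsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (queclass(mp) == QNORM && !canputnext(q)) {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}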
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put a message back at the beginning of a queue according to priority
 * order.  See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed at the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, with another utility routine
 * holding QLOCK, or by streams modules with no locks held and the stream
 * not frozen.
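 *
 * Illustrative sketch (not part of this file): a hypothetical module
 * freezing its stream with freezestr() so it can scan the queue and
 * insert a band-0 marker message mp ahead of the first ordinary band-0
 * message, which satisfies the ordering rule above:
 *
 *	freezestr(q);
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (queclass(emp) != QPCTL && emp->b_band == 0)
 *			break;
 *	if (!insq(q, emp, mp))
 *		... out of order or allocband() failed, recover ...
 *	unfreezestr(q);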
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Create and put a control message on a queue.
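 *
 * Illustrative sketch (not part of this file): putctl() and the
 * putnextctl()/putnextctl1() variants below are the usual way for a
 * hypothetical module to generate zero- or one-byte control messages,
 * e.g. asking the modules below it to flush their read queues:
 *
 *	if (putnextctl1(WR(q), M_FLUSH, FLUSHR) == 0)
 *		... message could not be allocated, recover ...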
 */
int
putctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(0)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char) type;

	put(q, bp);

	return (1);
}

/*
 * Control message with a single-byte parameter
 */
int
putctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(1)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	put(q, bp);

	return (1);
}

int
putnextctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(1)) == NULL))
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	putnext(q, bp);

	return (1);
}

int
putnextctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(0)) == NULL))
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	putnext(q, bp);

	return (1);
}

/*
 * Return the queue upstream from this one
 */
queue_t *
backq(queue_t *q)
{
	q = _OTHERQ(q);
	if (q->q_next) {
		q = q->q_next;
		return (_OTHERQ(q));
	}
	return (NULL);
}

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent - the general principle is that it be run whenever a queue
 * is enabled but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on the list of those whose service procedures
 * are ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex to prevent the woken thread
 * from contending with the mutex.  This is OK because the queue has been
 * enqueued on the runlist and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}

/*
 * Return number of messages on queue
 */
int
qsize(queue_t *qp)
{
	int count = 0;
	mblk_t *mp;

	mutex_enter(QLOCK(qp));
	for (mp = qp->q_first; mp; mp = mp->b_next)
		count++;
	mutex_exit(QLOCK(qp));
	return (count);
}

/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
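 *
 * Illustrative sketch (not part of this file): a hypothetical module
 * queueing a batch of related messages atomically, keeping its service
 * procedure from being scheduled until the whole batch is on the queue:
 *
 *	noenable(q);
 *	(void) putq(q, mp1);
 *	(void) putq(q, mp2);
 *	enableok(q);
 *	qenable(q);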
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}

void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}

/*
 * Set queue fields.
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head.  Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t *qbp = NULL;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}
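
/*
 * Illustrative sketch (not part of this file): a hypothetical module
 * raising its write-side flow-control limit with strqset() after
 * reading the current high-water mark with strqget(); the 65536 value
 * is made up:
 *
 *	size_t hiwat;
 *
 *	if (strqget(WR(q), QHIWAT, 0, &hiwat) == 0 && hiwat < 65536)
 *		(void) strqset(WR(q), QHIWAT, 0, 65536);
 */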

/*
 * Wake up any threads in cvwait/sigwait/pollwait, for one of:
 * QWANTWSYNC, QWANTR or QWANTW.
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
 * RSLEEPer then a deferred wakeup will be done.  Also, if strpoll() is
 * in progress then a deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t *stp = STREAM(q);
	pollhead_t *pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable.  It doesn't matter even
	 * if we take the type from the previous plumbing; that's
	 * because if the plumbing has changed while we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk; the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter the queue synchronously.  Any attempt to enter a closing
 * queue will fail.  The qp->q_rwcnt keeps track of the number of successful
 * entries so that removeq() will not try to close the queue while a thread
 * is inside the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in a sync stream queue and wake up
 * any threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, sq_count is incremented and
 * it does not matter whether any regular put entrypoints have already been
 * entered.  We can't increment one of the sq_putcounts (instead of
 * sq_count) because qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int isread;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK.  Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired.  In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call.  In this case sd_lock acts as the streamhead perimeter.
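	 *
	 * Note that rwnext_enter(), called below while sd_lock is still
	 * held, records a claim on the queue (q_rwcnt) that keeps
	 * removeq() from closing the queue once sd_lock has been dropped.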
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case.  All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure.  This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled.  If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, the regular sq_count is
 * incremented and it does not matter whether any regular put entrypoints
 * have already been entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here.  I need to
	 * consider why the infonext is called, and if dropping SQ_EXCL
	 * unless non-CIPUT might cause other problems.  It just might be
	 * safer to drop it if !SQ_CIPUT because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * called by create_putlocks.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK.  if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized.  membar below
				 * ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}
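
/*
 * Illustrative sketch (not part of this file): a hypothetical driver open
 * routine requesting per-cpu putlocks once its queues are part of the
 * stream geometry, as the comment below prescribes ("xx" names are
 * made up):
 *
 *	static int
 *	xxopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 *	{
 *		...
 *		qprocson(q);
 *		create_putlocks(q, 0);
 *		return (0);
 *	}
 */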

/*
 * If the stream argument is 0, only create per-cpu sq_putlocks/sq_putcounts
 * for the syncq of q.  If the stream argument is not 0, create per-cpu
 * stream_putlocks for the stream of q and per-cpu sq_putlocks/sq_putcounts
 * for all syncq's starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry.  It should be called from driver/module open routine after
 * qprocson() call.  It is also called from nfs syscall where it is known
 * that stream is configured and won't change its geometry during the
 * create_putlocks call.
 *
 * Caller normally uses a 0 value for the stream argument to speed up MT
 * putnext into the perimeter of q, for example because its perimeter is
 * per module (e.g. IP).
 *
 * Caller normally uses a non-0 value for the stream argument to hint the
 * system that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * Caller ensures stream plumbing won't happen while we are here and
 * therefore q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock.  if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized.  membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently recorded per event is a hrtime stamp, queue address, event
 * type, and a per type datum.  Much of the STREAMS framework is instrumented
 * for automatic flow tracing (when enabled).  Events can be defined and used
 * by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp = hp->tail;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			if (!(nbp = kmem_cache_alloc(ftblk_cache,
			    KM_NOSLEEP))) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (casptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		/*
		 * It is possible that the module info is broken
		 * (as is logsubr.c at this comment's writing).
		 * Instead of panicking or doing other unmentionables,
		 * we shall put a dummy name as the mid, and continue.
		 */
		if (qp->q_qinfo == NULL)
			ep->mid = "NONAME";
		else
			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;
	} else {
		ep->mid = (char *)p;
	}

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}
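
/*
 * Illustrative sketch (not part of this file): once flow tracing has been
 * enabled (str_ftnever cleared to 0, e.g. via /etc/system or a debugger),
 * a hypothetical module can record its own events on a message with the
 * STR_FTEVENT_MSG() macro used throughout this file; FTEV_XXRESCHED is a
 * made-up event code and retries a made-up datum:
 *
 *	STR_FTEVENT_MSG(mp, q, FTEV_XXRESCHED, retries);
 */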