/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  Objects in these caches consist of a
 *     dblk plus its associated data.  allocb() determines the nearest-size
 *     cache by table lookup: the dblk_cache[] array provides the mapping
 *     from size to dblk cache.  For message sizes that are multiples of
 *     PAGESIZE, the dblk is allocated separately from the buffer, and the
 *     buffer is allocated by the constructor using kmem_alloc().  For all
 *     other message sizes, the dblk and its associated data are allocated
 *     as a single contiguous chunk of memory.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
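/*
 * A minimal sketch of how a caller might exercise the three flavors
 * above.  This is illustrative only -- the example_* names and the
 * STREAMS_EXAMPLES guard are hypothetical, not part of this file's
 * logic -- but it uses only interfaces defined here or in <sys/stream.h>.
 * Note that the frtn_t handed to esballoc() must outlive the message,
 * which is why it is embedded in the buffer it describes.
 */
#ifdef STREAMS_EXAMPLES
typedef struct example_ebuf {
	frtn_t	eb_frtn;	/* must outlive the message it describes */
	uchar_t	eb_data[4096];
} example_ebuf_t;

static void
example_ebuf_free(caddr_t arg)
{
	kmem_free(arg, sizeof (example_ebuf_t));
}

static void
example_alloc_flavors(void)
{
	mblk_t *small, *big, *ext;
	example_ebuf_t *ebp;

	small = allocb(512, BPRI_MED);		/* flavor (1): dblk cache */
	big = allocb(100 * 1024, BPRI_MED);	/* flavor (2): oversize */

	/* flavor (3): caller-supplied buffer described by an esb dblk */
	ebp = kmem_alloc(sizeof (example_ebuf_t), KM_SLEEP);
	ebp->eb_frtn.free_func = example_ebuf_free;
	ebp->eb_frtn.free_arg = (caddr_t)ebp;
	ext = esballoc(ebp->eb_data, sizeof (ebp->eb_data), BPRI_MED,
	    &ebp->eb_frtn);
	if (ext == NULL)
		kmem_free(ebp, sizeof (example_ebuf_t));

	freemsg(small);		/* freemsg(NULL) is safe for failed allocs */
	freemsg(big);
	freemsg(ext);
}
#endif	/* STREAMS_EXAMPLES */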
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
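/*
 * For illustration, the size-to-cache mapping that allocb() performs:
 * sizes are quantized in DBLK_MIN_SIZE (8-byte) steps, so a request of
 * `size' bytes indexes dblk_cache[(size - 1) >> DBLK_SIZE_SHIFT].  A
 * sketch of that lookup, mirroring allocb() below (the example_* name
 * and STREAMS_EXAMPLES guard are hypothetical):
 */
#ifdef STREAMS_EXAMPLES
static struct kmem_cache *
example_size_to_cache(size_t size)
{
	size_t index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0)
			return (NULL);	/* oversize: allocb_oversize() path */
		index = 0;		/* a zero-length message is legal */
	}
	return (dblk_cache[index]);
}
#endif	/* STREAMS_EXAMPLES */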
/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}
/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
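/*
 * Typical allocb() usage, sketched (the example_* name and the
 * STREAMS_EXAMPLES guard are hypothetical): a fresh mblk has
 * b_rptr == b_wptr == db_base, so a producer copies its payload in and
 * advances b_wptr by the number of bytes written.
 */
#ifdef STREAMS_EXAMPLES
static mblk_t *
example_allocb_fill(const void *payload, size_t len)
{
	mblk_t *mp;

	if ((mp = allocb(len, BPRI_MED)) == NULL)
		return (NULL);
	bcopy(payload, mp->b_wptr, len);
	mp->b_wptr += len;
	ASSERT(MBLKL(mp) == len);	/* MBLKL == b_wptr - b_rptr */
	return (mp);
}
#endif	/* STREAMS_EXAMPLES */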
/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert that the zoneid is the
		 * same.  Due to Multi-Level Ports for TX, some cred_t can
		 * have a NULL cr_zone, and we skip the comparison in that
		 * case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}
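/*
 * Since msg_getcred() does not take a hold, a caller that needs the
 * cred beyond the life of the message must crhold() it itself, as in
 * this sketch (the example_* name and the STREAMS_EXAMPLES guard are
 * hypothetical):
 */
#ifdef STREAMS_EXAMPLES
static cred_t *
example_msg_cred_hold(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = msg_getcred(mp, cpidp);

	if (cr != NULL)
		crhold(cr);	/* caller is responsible for crfree() */
	return (cr);
}
#endif	/* STREAMS_EXAMPLES */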
/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert that the zoneid is the
		 * same.  Due to Multi-Level Ports for TX, some cred_t can
		 * have a NULL cr_zone, and we skip the comparison in that
		 * case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}

/*
 * Get the label for a message.  Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
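/*
 * dupb() reference sharing in action, sketched (the example_* name and
 * the STREAMS_EXAMPLES guard are hypothetical): the dup'ed mblk points
 * at the same dblk, so the data is shared and db_ref counts the
 * references.  A writer must therefore check DB_REF() and take a
 * private copy (e.g. with copyb()) before modifying possibly-shared
 * data.
 */
#ifdef STREAMS_EXAMPLES
static void
example_dupb_sharing(mblk_t *mp)
{
	mblk_t *dup = dupb(mp);

	if (dup != NULL) {
		ASSERT(dup->b_datap == mp->b_datap);	/* shared dblk */
		ASSERT(DB_REF(mp) >= 2);
		freeb(dup);	/* dblk_decref(): not the last reference */
	}
}
#endif	/* STREAMS_EXAMPLES */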
/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
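/*
 * The operational difference between the e- and d- flavors is where the
 * free routine runs: esballoc() messages are retired through
 * freebs_enqueue() (deferred to the esballoc throttling queue), while
 * desballoc() messages call the free routine synchronously from the
 * last freeb(), via dblk_lastfree_desb() above.  The classic
 * desballoc() use is a driver recycling its own receive buffers,
 * sketched here (all example_* names and the STREAMS_EXAMPLES guard
 * are hypothetical):
 */
#ifdef STREAMS_EXAMPLES
typedef struct example_rxbuf {
	frtn_t	rb_frtn;	/* must remain valid until the last free */
	uchar_t	rb_data[2048];
} example_rxbuf_t;

static void
example_rxbuf_recycle(caddr_t arg)
{
	/* runs in the context of whoever does the last freeb()/freemsg() */
	kmem_free(arg, sizeof (example_rxbuf_t));
}

static mblk_t *
example_desballoc_rxbuf(void)
{
	example_rxbuf_t *rbp;
	mblk_t *mp;

	rbp = kmem_alloc(sizeof (example_rxbuf_t), KM_SLEEP);
	rbp->rb_frtn.free_func = example_rxbuf_recycle;
	rbp->rb_frtn.free_arg = (caddr_t)rbp;
	mp = desballoc(rbp->rb_data, sizeof (rbp->rb_data), BPRI_MED,
	    &rbp->rb_frtn);
	if (mp == NULL)
		kmem_free(rbp, sizeof (example_rxbuf_t));
	return (mp);
}
#endif	/* STREAMS_EXAMPLES */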
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}
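/*
 * Sketch of the bcache lifecycle (the example_* name and the
 * STREAMS_EXAMPLES guard are hypothetical).  Note that bcache_destroy()
 * only marks the cache for destruction while messages are outstanding;
 * the actual teardown happens in bcache_dblk_lastfree() when the last
 * outstanding message is freed.
 */
#ifdef STREAMS_EXAMPLES
static void
example_bcache_use(void)
{
	bcache_t *bcp;
	mblk_t *mp;

	if ((bcp = bcache_create("example", 1024, 64)) == NULL)
		return;
	mp = bcache_allocb(bcp, BPRI_MED);
	bcache_destroy(bcp);	/* deferred while mp is outstanding */
	freemsg(mp);		/* last free completes the teardown */
}
#endif	/* STREAMS_EXAMPLES */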
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * It may only be called from sync routines (i.e., not from put or
 * service procedures).  It is located here (rather than strsubr.c)
 * so that we don't have to expose all of the allocb() implementation
 * details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}
/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its value
	 * in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * Add the newly allocated stream event to the existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}
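/*
 * The classic bufcall() recovery pattern, sketched (the example_*
 * names, the q_ptr layout and the STREAMS_EXAMPLES guard are
 * hypothetical): a service procedure that cannot allocate a buffer
 * schedules itself to be re-enabled when memory becomes available, and
 * records the id so that close() can unbufcall() it.
 */
#ifdef STREAMS_EXAMPLES
typedef struct example_state {
	bufcall_id_t	es_bufcall_id;
} example_state_t;

static void
example_reenable(void *arg)
{
	queue_t *q = arg;

	((example_state_t *)q->q_ptr)->es_bufcall_id = 0;
	qenable(q);		/* re-run the service procedure */
}

static int
example_srv(queue_t *q)
{
	example_state_t *esp = q->q_ptr;
	mblk_t *mp;

	if ((mp = allocb(1024, BPRI_MED)) == NULL) {
		esp->es_bufcall_id = bufcall(1024, BPRI_MED,
		    example_reenable, q);
		return (0);
	}
	freemsg(mp);		/* a real module would putnext() it */
	return (0);
}
#endif	/* STREAMS_EXAMPLES */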
/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
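/*
 * Choosing between dupmsg() and copymsg(), sketched (the example_*
 * name and the STREAMS_EXAMPLES guard are hypothetical): dupmsg() is
 * cheap and shares every dblk, so writes through either chain are
 * visible to both; copymsg() pays for a full data copy but yields
 * private, writable dblks.  For brevity this sketch only inspects the
 * first dblk of the chain.
 */
#ifdef STREAMS_EXAMPLES
static mblk_t *
example_writable(mblk_t *mp)
{
	if (DB_REF(mp) > 1) {
		mblk_t *nmp = copymsg(mp);	/* private copy */

		if (nmp != NULL)
			freemsg(mp);		/* drop the shared one */
		return (nmp);
	}
	return (mp);				/* already exclusive */
}
#endif	/* STREAMS_EXAMPLES */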
/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Link a message block to the tail of a message.
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * Unlink a message block from the head of a message.
 * Return a pointer to the new message, or NULL if the
 * message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * Remove a message block "bp" from message "mp".
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}
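/*
 * linkb()/unlinkb() at work, sketched (the example_* name and the
 * STREAMS_EXAMPLES guard are hypothetical): append a payload chain to
 * a header mblk, then detach it again.
 */
#ifdef STREAMS_EXAMPLES
static void
example_link_unlink(mblk_t *hdr, mblk_t *payload)
{
	mblk_t *rest;

	linkb(hdr, payload);	/* walks to the tail of hdr and appends */
	ASSERT(hdr->b_cont == payload);	/* hdr is a single mblk here */

	rest = unlinkb(hdr);	/* detach everything after the head */
	ASSERT(rest == payload && hdr->b_cont == NULL);
	freeb(hdr);
	freemsg(rest);
}
#endif	/* STREAMS_EXAMPLES */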
/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}
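/*
 * The canonical pullupmsg() use, sketched (the example_* names and the
 * STREAMS_EXAMPLES guard are hypothetical): guarantee that at least a
 * header's worth of data is contiguous and aligned before casting
 * b_rptr to a structure pointer.
 */
#ifdef STREAMS_EXAMPLES
typedef struct example_hdr {
	uint32_t	eh_type;
	uint32_t	eh_len;
} example_hdr_t;

static example_hdr_t *
example_get_hdr(mblk_t *mp)
{
	if (MBLKL(mp) < sizeof (example_hdr_t) ||
	    !str_aligned(mp->b_rptr)) {
		if (!pullupmsg(mp, sizeof (example_hdr_t)))
			return (NULL);
	}
	return ((example_hdr_t *)mp->b_rptr);
}
#endif	/* STREAMS_EXAMPLES */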
/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * Get the number of data bytes in a message.
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
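/*
 * adjmsg() trimming, sketched (the example_* name and the
 * STREAMS_EXAMPLES guard are hypothetical): strip a fixed-size header
 * from the front and a trailer from the back of an all-M_DATA message.
 */
#ifdef STREAMS_EXAMPLES
static int
example_strip(mblk_t *mp, size_t hdrlen, size_t trailerlen)
{
	/* assumes every mblk in the chain is M_DATA */
	if (msgdsize(mp) < hdrlen + trailerlen)
		return (0);
	if (!adjmsg(mp, (ssize_t)hdrlen))		/* len > 0: head */
		return (0);
	return (adjmsg(mp, -(ssize_t)trailerlen));	/* len < 0: tail */
}
#endif	/* STREAMS_EXAMPLES */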
/*
 * Get a message off the head of a queue.
 *
 * If the queue has no buffers then mark the queue
 * with QWANTR.  (The queue wants to be read by
 * someone when data becomes available.)
 *
 * If there is something to take off then do so.
 * If the queue falls below the hi water mark turn off the QFULL
 * flag.  Decrement the weighted count of the queue.
 * Also turn off QWANTR because the queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If the queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources, and
 * particularly to be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate the number of data bytes in a single data message block,
 * taking multidata messages into account.
 */
#define	ADD_MBLK_SIZE(mp, size)			\
	if (DB_TYPE(mp) != M_MULTIDATA) {	\
		(size) += MBLKL(mp);		\
	} else {				\
		uint_t	pinuse;			\
						\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
		(size) += pinuse;		\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}
/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block.  In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
2043 */ 2044 dup_failed: 2045 bytecnt = mp_cont_len(bp, &mblkcnt); 2046 if ((q->q_first = bp->b_next) == NULL) 2047 q->q_last = NULL; 2048 else 2049 q->q_first->b_prev = NULL; 2050 } 2051 if (bp->b_band == 0) { 2052 q->q_count -= bytecnt; 2053 q->q_mblkcnt -= mblkcnt; 2054 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2055 (q->q_mblkcnt < q->q_hiwat))) { 2056 q->q_flag &= ~QFULL; 2057 } 2058 } else { 2059 int i; 2060 2061 ASSERT(bp->b_band <= q->q_nband); 2062 ASSERT(q->q_bandp != NULL); 2063 ASSERT(MUTEX_HELD(QLOCK(q))); 2064 qbp = q->q_bandp; 2065 i = bp->b_band; 2066 while (--i > 0) 2067 qbp = qbp->qb_next; 2068 if (qbp->qb_first == qbp->qb_last) { 2069 qbp->qb_first = NULL; 2070 qbp->qb_last = NULL; 2071 } else { 2072 qbp->qb_first = bp->b_next; 2073 } 2074 qbp->qb_count -= bytecnt; 2075 qbp->qb_mblkcnt -= mblkcnt; 2076 if (qbp->qb_mblkcnt == 0 || 2077 ((qbp->qb_count < qbp->qb_hiwat) && 2078 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2079 qbp->qb_flag &= ~QB_FULL; 2080 } 2081 } 2082 q->q_flag &= ~QWANTR; 2083 bp->b_next = NULL; 2084 bp->b_prev = NULL; 2085 } 2086 if (freezer != curthread) 2087 mutex_exit(QLOCK(q)); 2088 2089 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL); 2090 2091 return (bp); 2092 } 2093 2094 /* 2095 * Determine if a backenable is needed after removing a message in the 2096 * specified band. 2097 * NOTE: This routine assumes that something like getq_noenab() has been 2098 * already called. 2099 * 2100 * For the read side it is ok to hold sd_lock across calling this (and the 2101 * stream head often does). 2102 * But for the write side strwakeq might be invoked and it acquires sd_lock. 2103 */ 2104 void 2105 qbackenable(queue_t *q, uchar_t band) 2106 { 2107 int backenab = 0; 2108 qband_t *qbp; 2109 kthread_id_t freezer; 2110 2111 ASSERT(q); 2112 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 2113 2114 /* 2115 * Quick check without holding the lock. 2116 * OK since after getq() has lowered the q_count these flags 2117 * would not change unless either the qbackenable() is done by 2118 * another thread (which is ok) or the queue has gotten QFULL 2119 * in which case another backenable will take place when the queue 2120 * drops below q_lowat. 
2121 */ 2122 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 2123 return; 2124 2125 /* freezestr should allow its caller to call getq/putq */ 2126 freezer = STREAM(q)->sd_freezer; 2127 if (freezer == curthread) { 2128 ASSERT(frozenstr(q)); 2129 ASSERT(MUTEX_HELD(QLOCK(q))); 2130 } else 2131 mutex_enter(QLOCK(q)); 2132 2133 if (band == 0) { 2134 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 2135 q->q_mblkcnt < q->q_lowat)) { 2136 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 2137 } 2138 } else { 2139 int i; 2140 2141 ASSERT((unsigned)band <= q->q_nband); 2142 ASSERT(q->q_bandp != NULL); 2143 2144 qbp = q->q_bandp; 2145 i = band; 2146 while (--i > 0) 2147 qbp = qbp->qb_next; 2148 2149 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 2150 qbp->qb_mblkcnt < qbp->qb_lowat)) { 2151 backenab = qbp->qb_flag & QB_WANTW; 2152 } 2153 } 2154 2155 if (backenab == 0) { 2156 if (freezer != curthread) 2157 mutex_exit(QLOCK(q)); 2158 return; 2159 } 2160 2161 /* Have to drop the lock across strwakeq and backenable */ 2162 if (backenab & QWANTWSYNC) 2163 q->q_flag &= ~QWANTWSYNC; 2164 if (backenab & (QWANTW|QB_WANTW)) { 2165 if (band != 0) 2166 qbp->qb_flag &= ~QB_WANTW; 2167 else { 2168 q->q_flag &= ~QWANTW; 2169 } 2170 } 2171 2172 if (freezer != curthread) 2173 mutex_exit(QLOCK(q)); 2174 2175 if (backenab & QWANTWSYNC) 2176 strwakeq(q, QWANTWSYNC); 2177 if (backenab & (QWANTW|QB_WANTW)) 2178 backenable(q, band); 2179 } 2180 2181 /* 2182 * Remove a message from a queue. The queue count and other 2183 * flow control parameters are adjusted and the back queue 2184 * enabled if necessary. 2185 * 2186 * rmvq can be called with the stream frozen, but other utility functions 2187 * holding QLOCK, and by streams modules without any locks/frozen. 2188 */ 2189 void 2190 rmvq(queue_t *q, mblk_t *mp) 2191 { 2192 ASSERT(mp != NULL); 2193 2194 rmvq_noenab(q, mp); 2195 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 2196 /* 2197 * qbackenable can handle a frozen stream but not a "random" 2198 * qlock being held. Drop lock across qbackenable. 2199 */ 2200 mutex_exit(QLOCK(q)); 2201 qbackenable(q, mp->b_band); 2202 mutex_enter(QLOCK(q)); 2203 } else { 2204 qbackenable(q, mp->b_band); 2205 } 2206 } 2207 2208 /* 2209 * Like rmvq() but without any backenabling. 2210 * This exists to handle SR_CONSOL_DATA in strrput(). 2211 */ 2212 void 2213 rmvq_noenab(queue_t *q, mblk_t *mp) 2214 { 2215 int i; 2216 qband_t *qbp = NULL; 2217 kthread_id_t freezer; 2218 int bytecnt = 0, mblkcnt = 0; 2219 2220 freezer = STREAM(q)->sd_freezer; 2221 if (freezer == curthread) { 2222 ASSERT(frozenstr(q)); 2223 ASSERT(MUTEX_HELD(QLOCK(q))); 2224 } else if (MUTEX_HELD(QLOCK(q))) { 2225 /* Don't drop lock on exit */ 2226 freezer = curthread; 2227 } else 2228 mutex_enter(QLOCK(q)); 2229 2230 ASSERT(mp->b_band <= q->q_nband); 2231 if (mp->b_band != 0) { /* Adjust band pointers */ 2232 ASSERT(q->q_bandp != NULL); 2233 qbp = q->q_bandp; 2234 i = mp->b_band; 2235 while (--i > 0) 2236 qbp = qbp->qb_next; 2237 if (mp == qbp->qb_first) { 2238 if (mp->b_next && mp->b_band == mp->b_next->b_band) 2239 qbp->qb_first = mp->b_next; 2240 else 2241 qbp->qb_first = NULL; 2242 } 2243 if (mp == qbp->qb_last) { 2244 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 2245 qbp->qb_last = mp->b_prev; 2246 else 2247 qbp->qb_last = NULL; 2248 } 2249 } 2250 2251 /* 2252 * Remove the message from the list. 
2253 */ 2254 if (mp->b_prev) 2255 mp->b_prev->b_next = mp->b_next; 2256 else 2257 q->q_first = mp->b_next; 2258 if (mp->b_next) 2259 mp->b_next->b_prev = mp->b_prev; 2260 else 2261 q->q_last = mp->b_prev; 2262 mp->b_next = NULL; 2263 mp->b_prev = NULL; 2264 2265 /* Get the size of the message for q_count accounting */ 2266 bytecnt = mp_cont_len(mp, &mblkcnt); 2267 2268 if (mp->b_band == 0) { /* Perform q_count accounting */ 2269 q->q_count -= bytecnt; 2270 q->q_mblkcnt -= mblkcnt; 2271 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2272 (q->q_mblkcnt < q->q_hiwat))) { 2273 q->q_flag &= ~QFULL; 2274 } 2275 } else { /* Perform qb_count accounting */ 2276 qbp->qb_count -= bytecnt; 2277 qbp->qb_mblkcnt -= mblkcnt; 2278 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) && 2279 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2280 qbp->qb_flag &= ~QB_FULL; 2281 } 2282 } 2283 if (freezer != curthread) 2284 mutex_exit(QLOCK(q)); 2285 2286 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL); 2287 } 2288 2289 /* 2290 * Empty a queue. 2291 * If flag is set, remove all messages. Otherwise, remove 2292 * only non-control messages. If queue falls below its low 2293 * water mark, and QWANTW is set, enable the nearest upstream 2294 * service procedure. 2295 * 2296 * Historical note: when merging the M_FLUSH code in strrput with this 2297 * code one difference was discovered. flushq did not have a check 2298 * for q_lowat == 0 in the backenabling test. 2299 * 2300 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2301 * if one exists on the queue. 2302 */ 2303 void 2304 flushq_common(queue_t *q, int flag, int pcproto_flag) 2305 { 2306 mblk_t *mp, *nmp; 2307 qband_t *qbp; 2308 int backenab = 0; 2309 unsigned char bpri; 2310 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2311 2312 if (q->q_first == NULL) 2313 return; 2314 2315 mutex_enter(QLOCK(q)); 2316 mp = q->q_first; 2317 q->q_first = NULL; 2318 q->q_last = NULL; 2319 q->q_count = 0; 2320 q->q_mblkcnt = 0; 2321 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2322 qbp->qb_first = NULL; 2323 qbp->qb_last = NULL; 2324 qbp->qb_count = 0; 2325 qbp->qb_mblkcnt = 0; 2326 qbp->qb_flag &= ~QB_FULL; 2327 } 2328 q->q_flag &= ~QFULL; 2329 mutex_exit(QLOCK(q)); 2330 while (mp) { 2331 nmp = mp->b_next; 2332 mp->b_next = mp->b_prev = NULL; 2333 2334 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL); 2335 2336 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2337 (void) putq(q, mp); 2338 else if (flag || datamsg(mp->b_datap->db_type)) 2339 freemsg(mp); 2340 else 2341 (void) putq(q, mp); 2342 mp = nmp; 2343 } 2344 bpri = 1; 2345 mutex_enter(QLOCK(q)); 2346 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2347 if ((qbp->qb_flag & QB_WANTW) && 2348 (((qbp->qb_count < qbp->qb_lowat) && 2349 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2350 qbp->qb_lowat == 0)) { 2351 qbp->qb_flag &= ~QB_WANTW; 2352 backenab = 1; 2353 qbf[bpri] = 1; 2354 } else 2355 qbf[bpri] = 0; 2356 bpri++; 2357 } 2358 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2359 if ((q->q_flag & QWANTW) && 2360 (((q->q_count < q->q_lowat) && 2361 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2362 q->q_flag &= ~QWANTW; 2363 backenab = 1; 2364 qbf[0] = 1; 2365 } else 2366 qbf[0] = 0; 2367 2368 /* 2369 * If any band can now be written to, and there is a writer 2370 * for that band, then backenable the closest service procedure. 
2371 	 */
2372 	if (backenab) {
2373 		mutex_exit(QLOCK(q));
2374 		for (bpri = q->q_nband; bpri != 0; bpri--)
2375 			if (qbf[bpri])
2376 				backenable(q, bpri);
2377 		if (qbf[0])
2378 			backenable(q, 0);
2379 	} else
2380 		mutex_exit(QLOCK(q));
2381 }
2382 
2383 /*
2384  * The real flushing takes place in flushq_common. This is done so that
2385  * a flag can specify whether or not M_PCPROTO messages should be
2386  * flushed. Currently the only place that uses this flag is the stream head.
2387  */
2388 void
2389 flushq(queue_t *q, int flag)
2390 {
2391 	flushq_common(q, flag, 0);
2392 }
2393 
2394 /*
2395  * Flush the queue of messages of the given priority band.
2396  * There is some duplication of code between flushq and flushband.
2397  * This is because we want to optimize the code as much as possible.
2398  * The assumption is that there will be more messages in the normal
2399  * (priority 0) band than in any other.
2400  *
2401  * Historical note: when merging the M_FLUSH code in strrput with this
2402  * code one difference was discovered. flushband had an extra check
2403  * for (mp->b_datap->db_type < QPCTL) in the band 0
2404  * case. That check does not match the man page for flushband and was not
2405  * in the strrput flush code, hence it was removed.
2406  */
2407 void
2408 flushband(queue_t *q, unsigned char pri, int flag)
2409 {
2410 	mblk_t *mp;
2411 	mblk_t *nmp;
2412 	mblk_t *last;
2413 	qband_t *qbp;
2414 	int band;
2415 
2416 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2417 	if (pri > q->q_nband) {
2418 		return;
2419 	}
2420 	mutex_enter(QLOCK(q));
2421 	if (pri == 0) {
2422 		mp = q->q_first;
2423 		q->q_first = NULL;
2424 		q->q_last = NULL;
2425 		q->q_count = 0;
2426 		q->q_mblkcnt = 0;
2427 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2428 			qbp->qb_first = NULL;
2429 			qbp->qb_last = NULL;
2430 			qbp->qb_count = 0;
2431 			qbp->qb_mblkcnt = 0;
2432 			qbp->qb_flag &= ~QB_FULL;
2433 		}
2434 		q->q_flag &= ~QFULL;
2435 		mutex_exit(QLOCK(q));
2436 		while (mp) {
2437 			nmp = mp->b_next;
2438 			mp->b_next = mp->b_prev = NULL;
2439 			if ((mp->b_band == 0) &&
2440 			    ((flag == FLUSHALL) ||
2441 			    datamsg(mp->b_datap->db_type)))
2442 				freemsg(mp);
2443 			else
2444 				(void) putq(q, mp);
2445 			mp = nmp;
2446 		}
2447 		mutex_enter(QLOCK(q));
2448 		if ((q->q_flag & QWANTW) &&
2449 		    (((q->q_count < q->q_lowat) &&
2450 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2451 			q->q_flag &= ~QWANTW;
2452 			mutex_exit(QLOCK(q));
2453 
2454 			backenable(q, pri);
2455 		} else
2456 			mutex_exit(QLOCK(q));
2457 	} else {	/* pri != 0 */
2458 		boolean_t flushed = B_FALSE;
2459 		band = pri;
2460 
2461 		ASSERT(MUTEX_HELD(QLOCK(q)));
2462 		qbp = q->q_bandp;
2463 		while (--band > 0)
2464 			qbp = qbp->qb_next;
2465 		mp = qbp->qb_first;
2466 		if (mp == NULL) {
2467 			mutex_exit(QLOCK(q));
2468 			return;
2469 		}
2470 		last = qbp->qb_last->b_next;
2471 		/*
2472 		 * rmvq_noenab() and freemsg() are called for each mblk that
2473 		 * meets the criteria. The loop is executed until the last
2474 		 * mblk has been processed.
2475 		 */
2476 		while (mp != last) {
2477 			ASSERT(mp->b_band == pri);
2478 			nmp = mp->b_next;
2479 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2480 				rmvq_noenab(q, mp);
2481 				freemsg(mp);
2482 				flushed = B_TRUE;
2483 			}
2484 			mp = nmp;
2485 		}
2486 		mutex_exit(QLOCK(q));
2487 
2488 		/*
2489 		 * If any mblk(s) have been freed, we know that qbackenable()
2490 		 * will need to be called.
2491 		 */
2492 		if (flushed)
2493 			qbackenable(q, pri);
2494 	}
2495 }
2496 
2497 /*
2498  * Return 1 if the queue is not full.
If the queue is full, return 2499 * 0 (may not put message) and set QWANTW flag (caller wants to write 2500 * to the queue). 2501 */ 2502 int 2503 canput(queue_t *q) 2504 { 2505 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2506 2507 /* this is for loopback transports, they should not do a canput */ 2508 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2509 2510 /* Find next forward module that has a service procedure */ 2511 q = q->q_nfsrv; 2512 2513 if (!(q->q_flag & QFULL)) { 2514 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2515 return (1); 2516 } 2517 mutex_enter(QLOCK(q)); 2518 if (q->q_flag & QFULL) { 2519 q->q_flag |= QWANTW; 2520 mutex_exit(QLOCK(q)); 2521 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2522 return (0); 2523 } 2524 mutex_exit(QLOCK(q)); 2525 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2526 return (1); 2527 } 2528 2529 /* 2530 * This is the new canput for use with priority bands. Return 1 if the 2531 * band is not full. If the band is full, return 0 (may not put message) 2532 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2533 * write to the queue). 2534 */ 2535 int 2536 bcanput(queue_t *q, unsigned char pri) 2537 { 2538 qband_t *qbp; 2539 2540 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2541 if (!q) 2542 return (0); 2543 2544 /* Find next forward module that has a service procedure */ 2545 q = q->q_nfsrv; 2546 2547 mutex_enter(QLOCK(q)); 2548 if (pri == 0) { 2549 if (q->q_flag & QFULL) { 2550 q->q_flag |= QWANTW; 2551 mutex_exit(QLOCK(q)); 2552 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2553 "bcanput:%p %X %d", q, pri, 0); 2554 return (0); 2555 } 2556 } else { /* pri != 0 */ 2557 if (pri > q->q_nband) { 2558 /* 2559 * No band exists yet, so return success. 2560 */ 2561 mutex_exit(QLOCK(q)); 2562 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2563 "bcanput:%p %X %d", q, pri, 1); 2564 return (1); 2565 } 2566 qbp = q->q_bandp; 2567 while (--pri) 2568 qbp = qbp->qb_next; 2569 if (qbp->qb_flag & QB_FULL) { 2570 qbp->qb_flag |= QB_WANTW; 2571 mutex_exit(QLOCK(q)); 2572 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2573 "bcanput:%p %X %d", q, pri, 0); 2574 return (0); 2575 } 2576 } 2577 mutex_exit(QLOCK(q)); 2578 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2579 "bcanput:%p %X %d", q, pri, 1); 2580 return (1); 2581 } 2582 2583 /* 2584 * Put a message on a queue. 2585 * 2586 * Messages are enqueued on a priority basis. The priority classes 2587 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2588 * and B_NORMAL (type < QPCTL && band == 0). 2589 * 2590 * Add appropriate weighted data block sizes to queue count. 2591 * If queue hits high water mark then set QFULL flag. 2592 * 2593 * If QNOENAB is not set (putq is allowed to enable the queue), 2594 * enable the queue only if the message is PRIORITY, 2595 * or the QWANTR flag is set (indicating that the service procedure 2596 * is ready to read the queue. This implies that a service 2597 * procedure must NEVER put a high priority message back on its own 2598 * queue, as this would result in an infinite loop (!). 
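 *
 * A minimal sketch (hypothetical module code) of the canonical service
 * procedure pattern implied by the above; a real module would also let
 * high-priority messages bypass the canputnext() check:
 *
 *	int
 *	xxxrsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (canputnext(q)) {
 *				putnext(q, mp);
 *			} else {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *		}
 *		return (0);
 *	}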
2599 */ 2600 int 2601 putq(queue_t *q, mblk_t *bp) 2602 { 2603 mblk_t *tmp; 2604 qband_t *qbp = NULL; 2605 int mcls = (int)queclass(bp); 2606 kthread_id_t freezer; 2607 int bytecnt = 0, mblkcnt = 0; 2608 2609 freezer = STREAM(q)->sd_freezer; 2610 if (freezer == curthread) { 2611 ASSERT(frozenstr(q)); 2612 ASSERT(MUTEX_HELD(QLOCK(q))); 2613 } else 2614 mutex_enter(QLOCK(q)); 2615 2616 /* 2617 * Make sanity checks and if qband structure is not yet 2618 * allocated, do so. 2619 */ 2620 if (mcls == QPCTL) { 2621 if (bp->b_band != 0) 2622 bp->b_band = 0; /* force to be correct */ 2623 } else if (bp->b_band != 0) { 2624 int i; 2625 qband_t **qbpp; 2626 2627 if (bp->b_band > q->q_nband) { 2628 2629 /* 2630 * The qband structure for this priority band is 2631 * not on the queue yet, so we have to allocate 2632 * one on the fly. It would be wasteful to 2633 * associate the qband structures with every 2634 * queue when the queues are allocated. This is 2635 * because most queues will only need the normal 2636 * band of flow which can be described entirely 2637 * by the queue itself. 2638 */ 2639 qbpp = &q->q_bandp; 2640 while (*qbpp) 2641 qbpp = &(*qbpp)->qb_next; 2642 while (bp->b_band > q->q_nband) { 2643 if ((*qbpp = allocband()) == NULL) { 2644 if (freezer != curthread) 2645 mutex_exit(QLOCK(q)); 2646 return (0); 2647 } 2648 (*qbpp)->qb_hiwat = q->q_hiwat; 2649 (*qbpp)->qb_lowat = q->q_lowat; 2650 q->q_nband++; 2651 qbpp = &(*qbpp)->qb_next; 2652 } 2653 } 2654 ASSERT(MUTEX_HELD(QLOCK(q))); 2655 qbp = q->q_bandp; 2656 i = bp->b_band; 2657 while (--i) 2658 qbp = qbp->qb_next; 2659 } 2660 2661 /* 2662 * If queue is empty, add the message and initialize the pointers. 2663 * Otherwise, adjust message pointers and queue pointers based on 2664 * the type of the message and where it belongs on the queue. Some 2665 * code is duplicated to minimize the number of conditionals and 2666 * hopefully minimize the amount of time this routine takes. 2667 */ 2668 if (!q->q_first) { 2669 bp->b_next = NULL; 2670 bp->b_prev = NULL; 2671 q->q_first = bp; 2672 q->q_last = bp; 2673 if (qbp) { 2674 qbp->qb_first = bp; 2675 qbp->qb_last = bp; 2676 } 2677 } else if (!qbp) { /* bp->b_band == 0 */ 2678 2679 /* 2680 * If queue class of message is less than or equal to 2681 * that of the last one on the queue, tack on to the end. 2682 */ 2683 tmp = q->q_last; 2684 if (mcls <= (int)queclass(tmp)) { 2685 bp->b_next = NULL; 2686 bp->b_prev = tmp; 2687 tmp->b_next = bp; 2688 q->q_last = bp; 2689 } else { 2690 tmp = q->q_first; 2691 while ((int)queclass(tmp) >= mcls) 2692 tmp = tmp->b_next; 2693 2694 /* 2695 * Insert bp before tmp. 2696 */ 2697 bp->b_next = tmp; 2698 bp->b_prev = tmp->b_prev; 2699 if (tmp->b_prev) 2700 tmp->b_prev->b_next = bp; 2701 else 2702 q->q_first = bp; 2703 tmp->b_prev = bp; 2704 } 2705 } else { /* bp->b_band != 0 */ 2706 if (qbp->qb_first) { 2707 tmp = qbp->qb_last; 2708 2709 /* 2710 * Insert bp after the last message in this band. 2711 */ 2712 bp->b_next = tmp->b_next; 2713 if (tmp->b_next) 2714 tmp->b_next->b_prev = bp; 2715 else 2716 q->q_last = bp; 2717 bp->b_prev = tmp; 2718 tmp->b_next = bp; 2719 } else { 2720 tmp = q->q_last; 2721 if ((mcls < (int)queclass(tmp)) || 2722 (bp->b_band <= tmp->b_band)) { 2723 2724 /* 2725 * Tack bp on end of queue. 
2726 */ 2727 bp->b_next = NULL; 2728 bp->b_prev = tmp; 2729 tmp->b_next = bp; 2730 q->q_last = bp; 2731 } else { 2732 tmp = q->q_first; 2733 while (tmp->b_datap->db_type >= QPCTL) 2734 tmp = tmp->b_next; 2735 while (tmp->b_band >= bp->b_band) 2736 tmp = tmp->b_next; 2737 2738 /* 2739 * Insert bp before tmp. 2740 */ 2741 bp->b_next = tmp; 2742 bp->b_prev = tmp->b_prev; 2743 if (tmp->b_prev) 2744 tmp->b_prev->b_next = bp; 2745 else 2746 q->q_first = bp; 2747 tmp->b_prev = bp; 2748 } 2749 qbp->qb_first = bp; 2750 } 2751 qbp->qb_last = bp; 2752 } 2753 2754 /* Get message byte count for q_count accounting */ 2755 bytecnt = mp_cont_len(bp, &mblkcnt); 2756 2757 if (qbp) { 2758 qbp->qb_count += bytecnt; 2759 qbp->qb_mblkcnt += mblkcnt; 2760 if ((qbp->qb_count >= qbp->qb_hiwat) || 2761 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2762 qbp->qb_flag |= QB_FULL; 2763 } 2764 } else { 2765 q->q_count += bytecnt; 2766 q->q_mblkcnt += mblkcnt; 2767 if ((q->q_count >= q->q_hiwat) || 2768 (q->q_mblkcnt >= q->q_hiwat)) { 2769 q->q_flag |= QFULL; 2770 } 2771 } 2772 2773 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL); 2774 2775 if ((mcls > QNORM) || 2776 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2777 qenable_locked(q); 2778 ASSERT(MUTEX_HELD(QLOCK(q))); 2779 if (freezer != curthread) 2780 mutex_exit(QLOCK(q)); 2781 2782 return (1); 2783 } 2784 2785 /* 2786 * Put stuff back at beginning of Q according to priority order. 2787 * See comment on putq above for details. 2788 */ 2789 int 2790 putbq(queue_t *q, mblk_t *bp) 2791 { 2792 mblk_t *tmp; 2793 qband_t *qbp = NULL; 2794 int mcls = (int)queclass(bp); 2795 kthread_id_t freezer; 2796 int bytecnt = 0, mblkcnt = 0; 2797 2798 ASSERT(q && bp); 2799 ASSERT(bp->b_next == NULL); 2800 freezer = STREAM(q)->sd_freezer; 2801 if (freezer == curthread) { 2802 ASSERT(frozenstr(q)); 2803 ASSERT(MUTEX_HELD(QLOCK(q))); 2804 } else 2805 mutex_enter(QLOCK(q)); 2806 2807 /* 2808 * Make sanity checks and if qband structure is not yet 2809 * allocated, do so. 2810 */ 2811 if (mcls == QPCTL) { 2812 if (bp->b_band != 0) 2813 bp->b_band = 0; /* force to be correct */ 2814 } else if (bp->b_band != 0) { 2815 int i; 2816 qband_t **qbpp; 2817 2818 if (bp->b_band > q->q_nband) { 2819 qbpp = &q->q_bandp; 2820 while (*qbpp) 2821 qbpp = &(*qbpp)->qb_next; 2822 while (bp->b_band > q->q_nband) { 2823 if ((*qbpp = allocband()) == NULL) { 2824 if (freezer != curthread) 2825 mutex_exit(QLOCK(q)); 2826 return (0); 2827 } 2828 (*qbpp)->qb_hiwat = q->q_hiwat; 2829 (*qbpp)->qb_lowat = q->q_lowat; 2830 q->q_nband++; 2831 qbpp = &(*qbpp)->qb_next; 2832 } 2833 } 2834 qbp = q->q_bandp; 2835 i = bp->b_band; 2836 while (--i) 2837 qbp = qbp->qb_next; 2838 } 2839 2840 /* 2841 * If queue is empty or if message is high priority, 2842 * place on the front of the queue. 2843 */ 2844 tmp = q->q_first; 2845 if ((!tmp) || (mcls == QPCTL)) { 2846 bp->b_next = tmp; 2847 if (tmp) 2848 tmp->b_prev = bp; 2849 else 2850 q->q_last = bp; 2851 q->q_first = bp; 2852 bp->b_prev = NULL; 2853 if (qbp) { 2854 qbp->qb_first = bp; 2855 qbp->qb_last = bp; 2856 } 2857 } else if (qbp) { /* bp->b_band != 0 */ 2858 tmp = qbp->qb_first; 2859 if (tmp) { 2860 2861 /* 2862 * Insert bp before the first message in this band. 2863 */ 2864 bp->b_next = tmp; 2865 bp->b_prev = tmp->b_prev; 2866 if (tmp->b_prev) 2867 tmp->b_prev->b_next = bp; 2868 else 2869 q->q_first = bp; 2870 tmp->b_prev = bp; 2871 } else { 2872 tmp = q->q_last; 2873 if ((mcls < (int)queclass(tmp)) || 2874 (bp->b_band < tmp->b_band)) { 2875 2876 /* 2877 * Tack bp on end of queue. 
2878 */ 2879 bp->b_next = NULL; 2880 bp->b_prev = tmp; 2881 tmp->b_next = bp; 2882 q->q_last = bp; 2883 } else { 2884 tmp = q->q_first; 2885 while (tmp->b_datap->db_type >= QPCTL) 2886 tmp = tmp->b_next; 2887 while (tmp->b_band > bp->b_band) 2888 tmp = tmp->b_next; 2889 2890 /* 2891 * Insert bp before tmp. 2892 */ 2893 bp->b_next = tmp; 2894 bp->b_prev = tmp->b_prev; 2895 if (tmp->b_prev) 2896 tmp->b_prev->b_next = bp; 2897 else 2898 q->q_first = bp; 2899 tmp->b_prev = bp; 2900 } 2901 qbp->qb_last = bp; 2902 } 2903 qbp->qb_first = bp; 2904 } else { /* bp->b_band == 0 && !QPCTL */ 2905 2906 /* 2907 * If the queue class or band is less than that of the last 2908 * message on the queue, tack bp on the end of the queue. 2909 */ 2910 tmp = q->q_last; 2911 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2912 bp->b_next = NULL; 2913 bp->b_prev = tmp; 2914 tmp->b_next = bp; 2915 q->q_last = bp; 2916 } else { 2917 tmp = q->q_first; 2918 while (tmp->b_datap->db_type >= QPCTL) 2919 tmp = tmp->b_next; 2920 while (tmp->b_band > bp->b_band) 2921 tmp = tmp->b_next; 2922 2923 /* 2924 * Insert bp before tmp. 2925 */ 2926 bp->b_next = tmp; 2927 bp->b_prev = tmp->b_prev; 2928 if (tmp->b_prev) 2929 tmp->b_prev->b_next = bp; 2930 else 2931 q->q_first = bp; 2932 tmp->b_prev = bp; 2933 } 2934 } 2935 2936 /* Get message byte count for q_count accounting */ 2937 bytecnt = mp_cont_len(bp, &mblkcnt); 2938 2939 if (qbp) { 2940 qbp->qb_count += bytecnt; 2941 qbp->qb_mblkcnt += mblkcnt; 2942 if ((qbp->qb_count >= qbp->qb_hiwat) || 2943 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2944 qbp->qb_flag |= QB_FULL; 2945 } 2946 } else { 2947 q->q_count += bytecnt; 2948 q->q_mblkcnt += mblkcnt; 2949 if ((q->q_count >= q->q_hiwat) || 2950 (q->q_mblkcnt >= q->q_hiwat)) { 2951 q->q_flag |= QFULL; 2952 } 2953 } 2954 2955 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL); 2956 2957 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2958 qenable_locked(q); 2959 ASSERT(MUTEX_HELD(QLOCK(q))); 2960 if (freezer != curthread) 2961 mutex_exit(QLOCK(q)); 2962 2963 return (1); 2964 } 2965 2966 /* 2967 * Insert a message before an existing message on the queue. If the 2968 * existing message is NULL, the new messages is placed on the end of 2969 * the queue. The queue class of the new message is ignored. However, 2970 * the priority band of the new message must adhere to the following 2971 * ordering: 2972 * 2973 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2974 * 2975 * All flow control parameters are updated. 2976 * 2977 * insq can be called with the stream frozen, but other utility functions 2978 * holding QLOCK, and by streams modules without any locks/frozen. 
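 *
 * A minimal sketch (hypothetical caller): passing the current first
 * message as emp places mp at the very front of the queue:
 *
 *	if (!insq(q, q->q_first, mp))
 *		... a qband allocation failed, or the band ordering
 *		... rule above was violated; mp was not enqueued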
2979 */ 2980 int 2981 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2982 { 2983 mblk_t *tmp; 2984 qband_t *qbp = NULL; 2985 int mcls = (int)queclass(mp); 2986 kthread_id_t freezer; 2987 int bytecnt = 0, mblkcnt = 0; 2988 2989 freezer = STREAM(q)->sd_freezer; 2990 if (freezer == curthread) { 2991 ASSERT(frozenstr(q)); 2992 ASSERT(MUTEX_HELD(QLOCK(q))); 2993 } else if (MUTEX_HELD(QLOCK(q))) { 2994 /* Don't drop lock on exit */ 2995 freezer = curthread; 2996 } else 2997 mutex_enter(QLOCK(q)); 2998 2999 if (mcls == QPCTL) { 3000 if (mp->b_band != 0) 3001 mp->b_band = 0; /* force to be correct */ 3002 if (emp && emp->b_prev && 3003 (emp->b_prev->b_datap->db_type < QPCTL)) 3004 goto badord; 3005 } 3006 if (emp) { 3007 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 3008 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 3009 (emp->b_prev->b_band < mp->b_band))) { 3010 goto badord; 3011 } 3012 } else { 3013 tmp = q->q_last; 3014 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 3015 badord: 3016 cmn_err(CE_WARN, 3017 "insq: attempt to insert message out of order " 3018 "on q %p", (void *)q); 3019 if (freezer != curthread) 3020 mutex_exit(QLOCK(q)); 3021 return (0); 3022 } 3023 } 3024 3025 if (mp->b_band != 0) { 3026 int i; 3027 qband_t **qbpp; 3028 3029 if (mp->b_band > q->q_nband) { 3030 qbpp = &q->q_bandp; 3031 while (*qbpp) 3032 qbpp = &(*qbpp)->qb_next; 3033 while (mp->b_band > q->q_nband) { 3034 if ((*qbpp = allocband()) == NULL) { 3035 if (freezer != curthread) 3036 mutex_exit(QLOCK(q)); 3037 return (0); 3038 } 3039 (*qbpp)->qb_hiwat = q->q_hiwat; 3040 (*qbpp)->qb_lowat = q->q_lowat; 3041 q->q_nband++; 3042 qbpp = &(*qbpp)->qb_next; 3043 } 3044 } 3045 qbp = q->q_bandp; 3046 i = mp->b_band; 3047 while (--i) 3048 qbp = qbp->qb_next; 3049 } 3050 3051 if ((mp->b_next = emp) != NULL) { 3052 if ((mp->b_prev = emp->b_prev) != NULL) 3053 emp->b_prev->b_next = mp; 3054 else 3055 q->q_first = mp; 3056 emp->b_prev = mp; 3057 } else { 3058 if ((mp->b_prev = q->q_last) != NULL) 3059 q->q_last->b_next = mp; 3060 else 3061 q->q_first = mp; 3062 q->q_last = mp; 3063 } 3064 3065 /* Get mblk and byte count for q_count accounting */ 3066 bytecnt = mp_cont_len(mp, &mblkcnt); 3067 3068 if (qbp) { /* adjust qband pointers and count */ 3069 if (!qbp->qb_first) { 3070 qbp->qb_first = mp; 3071 qbp->qb_last = mp; 3072 } else { 3073 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3074 (mp->b_prev->b_band != mp->b_band))) 3075 qbp->qb_first = mp; 3076 else if (mp->b_next == NULL || (mp->b_next != NULL && 3077 (mp->b_next->b_band != mp->b_band))) 3078 qbp->qb_last = mp; 3079 } 3080 qbp->qb_count += bytecnt; 3081 qbp->qb_mblkcnt += mblkcnt; 3082 if ((qbp->qb_count >= qbp->qb_hiwat) || 3083 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3084 qbp->qb_flag |= QB_FULL; 3085 } 3086 } else { 3087 q->q_count += bytecnt; 3088 q->q_mblkcnt += mblkcnt; 3089 if ((q->q_count >= q->q_hiwat) || 3090 (q->q_mblkcnt >= q->q_hiwat)) { 3091 q->q_flag |= QFULL; 3092 } 3093 } 3094 3095 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL); 3096 3097 if (canenable(q) && (q->q_flag & QWANTR)) 3098 qenable_locked(q); 3099 3100 ASSERT(MUTEX_HELD(QLOCK(q))); 3101 if (freezer != curthread) 3102 mutex_exit(QLOCK(q)); 3103 3104 return (1); 3105 } 3106 3107 /* 3108 * Create and put a control message on queue. 
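 *
 * For example, a driver might (hypothetically) report a protocol error
 * upstream from its read queue using the one-byte variant defined below:
 *
 *	if (!putnextctl1(q, M_ERROR, EPROTO))
 *		... allocb_tryhard() failed; retry or defer ...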
3109 */ 3110 int 3111 putctl(queue_t *q, int type) 3112 { 3113 mblk_t *bp; 3114 3115 if ((datamsg(type) && (type != M_DELAY)) || 3116 (bp = allocb_tryhard(0)) == NULL) 3117 return (0); 3118 bp->b_datap->db_type = (unsigned char) type; 3119 3120 put(q, bp); 3121 3122 return (1); 3123 } 3124 3125 /* 3126 * Control message with a single-byte parameter 3127 */ 3128 int 3129 putctl1(queue_t *q, int type, int param) 3130 { 3131 mblk_t *bp; 3132 3133 if ((datamsg(type) && (type != M_DELAY)) || 3134 (bp = allocb_tryhard(1)) == NULL) 3135 return (0); 3136 bp->b_datap->db_type = (unsigned char)type; 3137 *bp->b_wptr++ = (unsigned char)param; 3138 3139 put(q, bp); 3140 3141 return (1); 3142 } 3143 3144 int 3145 putnextctl1(queue_t *q, int type, int param) 3146 { 3147 mblk_t *bp; 3148 3149 if ((datamsg(type) && (type != M_DELAY)) || 3150 ((bp = allocb_tryhard(1)) == NULL)) 3151 return (0); 3152 3153 bp->b_datap->db_type = (unsigned char)type; 3154 *bp->b_wptr++ = (unsigned char)param; 3155 3156 putnext(q, bp); 3157 3158 return (1); 3159 } 3160 3161 int 3162 putnextctl(queue_t *q, int type) 3163 { 3164 mblk_t *bp; 3165 3166 if ((datamsg(type) && (type != M_DELAY)) || 3167 ((bp = allocb_tryhard(0)) == NULL)) 3168 return (0); 3169 bp->b_datap->db_type = (unsigned char)type; 3170 3171 putnext(q, bp); 3172 3173 return (1); 3174 } 3175 3176 /* 3177 * Return the queue upstream from this one 3178 */ 3179 queue_t * 3180 backq(queue_t *q) 3181 { 3182 q = _OTHERQ(q); 3183 if (q->q_next) { 3184 q = q->q_next; 3185 return (_OTHERQ(q)); 3186 } 3187 return (NULL); 3188 } 3189 3190 /* 3191 * Send a block back up the queue in reverse from this 3192 * one (e.g. to respond to ioctls) 3193 */ 3194 void 3195 qreply(queue_t *q, mblk_t *bp) 3196 { 3197 ASSERT(q && bp); 3198 3199 putnext(_OTHERQ(q), bp); 3200 } 3201 3202 /* 3203 * Streams Queue Scheduling 3204 * 3205 * Queues are enabled through qenable() when they have messages to 3206 * process. They are serviced by queuerun(), which runs each enabled 3207 * queue's service procedure. The call to queuerun() is processor 3208 * dependent - the general principle is that it be run whenever a queue 3209 * is enabled but before returning to user level. For system calls, 3210 * the function runqueues() is called if their action causes a queue 3211 * to be enabled. For device interrupts, queuerun() should be 3212 * called before returning from the last level of interrupt. Beyond 3213 * this, no timing assumptions should be made about queue scheduling. 3214 */ 3215 3216 /* 3217 * Enable a queue: put it on list of those whose service procedures are 3218 * ready to run and set up the scheduling mechanism. 3219 * The broadcast is done outside the mutex -> to avoid the woken thread 3220 * from contending with the mutex. This is OK 'cos the queue has been 3221 * enqueued on the runlist and flagged safely at this point. 3222 */ 3223 void 3224 qenable(queue_t *q) 3225 { 3226 mutex_enter(QLOCK(q)); 3227 qenable_locked(q); 3228 mutex_exit(QLOCK(q)); 3229 } 3230 /* 3231 * Return number of messages on queue 3232 */ 3233 int 3234 qsize(queue_t *qp) 3235 { 3236 int count = 0; 3237 mblk_t *mp; 3238 3239 mutex_enter(QLOCK(qp)); 3240 for (mp = qp->q_first; mp; mp = mp->b_next) 3241 count++; 3242 mutex_exit(QLOCK(qp)); 3243 return (count); 3244 } 3245 3246 /* 3247 * noenable - set queue so that putq() will not enable it. 3248 * enableok - set queue so that putq() can enable it. 
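 *
 * A (hypothetical) module that wants to drain its queue without having
 * the service procedure rescheduled underneath it might use the pair as
 * follows, with the final qenable() catching up on anything deferred:
 *
 *	noenable(q);
 *	... drain or reconfigure the queue ...
 *	enableok(q);
 *	qenable(q);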
3249 */ 3250 void 3251 noenable(queue_t *q) 3252 { 3253 mutex_enter(QLOCK(q)); 3254 q->q_flag |= QNOENB; 3255 mutex_exit(QLOCK(q)); 3256 } 3257 3258 void 3259 enableok(queue_t *q) 3260 { 3261 mutex_enter(QLOCK(q)); 3262 q->q_flag &= ~QNOENB; 3263 mutex_exit(QLOCK(q)); 3264 } 3265 3266 /* 3267 * Set queue fields. 3268 */ 3269 int 3270 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 3271 { 3272 qband_t *qbp = NULL; 3273 queue_t *wrq; 3274 int error = 0; 3275 kthread_id_t freezer; 3276 3277 freezer = STREAM(q)->sd_freezer; 3278 if (freezer == curthread) { 3279 ASSERT(frozenstr(q)); 3280 ASSERT(MUTEX_HELD(QLOCK(q))); 3281 } else 3282 mutex_enter(QLOCK(q)); 3283 3284 if (what >= QBAD) { 3285 error = EINVAL; 3286 goto done; 3287 } 3288 if (pri != 0) { 3289 int i; 3290 qband_t **qbpp; 3291 3292 if (pri > q->q_nband) { 3293 qbpp = &q->q_bandp; 3294 while (*qbpp) 3295 qbpp = &(*qbpp)->qb_next; 3296 while (pri > q->q_nband) { 3297 if ((*qbpp = allocband()) == NULL) { 3298 error = EAGAIN; 3299 goto done; 3300 } 3301 (*qbpp)->qb_hiwat = q->q_hiwat; 3302 (*qbpp)->qb_lowat = q->q_lowat; 3303 q->q_nband++; 3304 qbpp = &(*qbpp)->qb_next; 3305 } 3306 } 3307 qbp = q->q_bandp; 3308 i = pri; 3309 while (--i) 3310 qbp = qbp->qb_next; 3311 } 3312 switch (what) { 3313 3314 case QHIWAT: 3315 if (qbp) 3316 qbp->qb_hiwat = (size_t)val; 3317 else 3318 q->q_hiwat = (size_t)val; 3319 break; 3320 3321 case QLOWAT: 3322 if (qbp) 3323 qbp->qb_lowat = (size_t)val; 3324 else 3325 q->q_lowat = (size_t)val; 3326 break; 3327 3328 case QMAXPSZ: 3329 if (qbp) 3330 error = EINVAL; 3331 else 3332 q->q_maxpsz = (ssize_t)val; 3333 3334 /* 3335 * Performance concern, strwrite looks at the module below 3336 * the stream head for the maxpsz each time it does a write 3337 * we now cache it at the stream head. Check to see if this 3338 * queue is sitting directly below the stream head. 3339 */ 3340 wrq = STREAM(q)->sd_wrq; 3341 if (q != wrq->q_next) 3342 break; 3343 3344 /* 3345 * If the stream is not frozen drop the current QLOCK and 3346 * acquire the sd_wrq QLOCK which protects sd_qn_* 3347 */ 3348 if (freezer != curthread) { 3349 mutex_exit(QLOCK(q)); 3350 mutex_enter(QLOCK(wrq)); 3351 } 3352 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3353 3354 if (strmsgsz != 0) { 3355 if (val == INFPSZ) 3356 val = strmsgsz; 3357 else { 3358 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3359 val = MIN(PIPE_BUF, val); 3360 else 3361 val = MIN(strmsgsz, val); 3362 } 3363 } 3364 STREAM(q)->sd_qn_maxpsz = val; 3365 if (freezer != curthread) { 3366 mutex_exit(QLOCK(wrq)); 3367 mutex_enter(QLOCK(q)); 3368 } 3369 break; 3370 3371 case QMINPSZ: 3372 if (qbp) 3373 error = EINVAL; 3374 else 3375 q->q_minpsz = (ssize_t)val; 3376 3377 /* 3378 * Performance concern, strwrite looks at the module below 3379 * the stream head for the maxpsz each time it does a write 3380 * we now cache it at the stream head. Check to see if this 3381 * queue is sitting directly below the stream head. 
3382 */ 3383 wrq = STREAM(q)->sd_wrq; 3384 if (q != wrq->q_next) 3385 break; 3386 3387 /* 3388 * If the stream is not frozen drop the current QLOCK and 3389 * acquire the sd_wrq QLOCK which protects sd_qn_* 3390 */ 3391 if (freezer != curthread) { 3392 mutex_exit(QLOCK(q)); 3393 mutex_enter(QLOCK(wrq)); 3394 } 3395 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3396 3397 if (freezer != curthread) { 3398 mutex_exit(QLOCK(wrq)); 3399 mutex_enter(QLOCK(q)); 3400 } 3401 break; 3402 3403 case QSTRUIOT: 3404 if (qbp) 3405 error = EINVAL; 3406 else 3407 q->q_struiot = (ushort_t)val; 3408 break; 3409 3410 case QCOUNT: 3411 case QFIRST: 3412 case QLAST: 3413 case QFLAG: 3414 error = EPERM; 3415 break; 3416 3417 default: 3418 error = EINVAL; 3419 break; 3420 } 3421 done: 3422 if (freezer != curthread) 3423 mutex_exit(QLOCK(q)); 3424 return (error); 3425 } 3426 3427 /* 3428 * Get queue fields. 3429 */ 3430 int 3431 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3432 { 3433 qband_t *qbp = NULL; 3434 int error = 0; 3435 kthread_id_t freezer; 3436 3437 freezer = STREAM(q)->sd_freezer; 3438 if (freezer == curthread) { 3439 ASSERT(frozenstr(q)); 3440 ASSERT(MUTEX_HELD(QLOCK(q))); 3441 } else 3442 mutex_enter(QLOCK(q)); 3443 if (what >= QBAD) { 3444 error = EINVAL; 3445 goto done; 3446 } 3447 if (pri != 0) { 3448 int i; 3449 qband_t **qbpp; 3450 3451 if (pri > q->q_nband) { 3452 qbpp = &q->q_bandp; 3453 while (*qbpp) 3454 qbpp = &(*qbpp)->qb_next; 3455 while (pri > q->q_nband) { 3456 if ((*qbpp = allocband()) == NULL) { 3457 error = EAGAIN; 3458 goto done; 3459 } 3460 (*qbpp)->qb_hiwat = q->q_hiwat; 3461 (*qbpp)->qb_lowat = q->q_lowat; 3462 q->q_nband++; 3463 qbpp = &(*qbpp)->qb_next; 3464 } 3465 } 3466 qbp = q->q_bandp; 3467 i = pri; 3468 while (--i) 3469 qbp = qbp->qb_next; 3470 } 3471 switch (what) { 3472 case QHIWAT: 3473 if (qbp) 3474 *(size_t *)valp = qbp->qb_hiwat; 3475 else 3476 *(size_t *)valp = q->q_hiwat; 3477 break; 3478 3479 case QLOWAT: 3480 if (qbp) 3481 *(size_t *)valp = qbp->qb_lowat; 3482 else 3483 *(size_t *)valp = q->q_lowat; 3484 break; 3485 3486 case QMAXPSZ: 3487 if (qbp) 3488 error = EINVAL; 3489 else 3490 *(ssize_t *)valp = q->q_maxpsz; 3491 break; 3492 3493 case QMINPSZ: 3494 if (qbp) 3495 error = EINVAL; 3496 else 3497 *(ssize_t *)valp = q->q_minpsz; 3498 break; 3499 3500 case QCOUNT: 3501 if (qbp) 3502 *(size_t *)valp = qbp->qb_count; 3503 else 3504 *(size_t *)valp = q->q_count; 3505 break; 3506 3507 case QFIRST: 3508 if (qbp) 3509 *(mblk_t **)valp = qbp->qb_first; 3510 else 3511 *(mblk_t **)valp = q->q_first; 3512 break; 3513 3514 case QLAST: 3515 if (qbp) 3516 *(mblk_t **)valp = qbp->qb_last; 3517 else 3518 *(mblk_t **)valp = q->q_last; 3519 break; 3520 3521 case QFLAG: 3522 if (qbp) 3523 *(uint_t *)valp = qbp->qb_flag; 3524 else 3525 *(uint_t *)valp = q->q_flag; 3526 break; 3527 3528 case QSTRUIOT: 3529 if (qbp) 3530 error = EINVAL; 3531 else 3532 *(short *)valp = q->q_struiot; 3533 break; 3534 3535 default: 3536 error = EINVAL; 3537 break; 3538 } 3539 done: 3540 if (freezer != curthread) 3541 mutex_exit(QLOCK(q)); 3542 return (error); 3543 } 3544 3545 /* 3546 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3547 * QWANTWSYNC or QWANTR or QWANTW, 3548 * 3549 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3550 * deferred wakeup will be done. Also if strpoll() in progress then a 3551 * deferred pollwakeup will be done. 
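 *
 * For example, the write-side backenable path in qbackenable() above
 * wakes synchronous-stream writers with:
 *
 *	strwakeq(q, QWANTWSYNC);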
3552  */
3553 void
3554 strwakeq(queue_t *q, int flag)
3555 {
3556 	stdata_t	*stp = STREAM(q);
3557 	pollhead_t	*pl;
3558 
3559 	mutex_enter(&stp->sd_lock);
3560 	pl = &stp->sd_pollist;
3561 	if (flag & QWANTWSYNC) {
3562 		ASSERT(!(q->q_flag & QREADR));
3563 		if (stp->sd_flag & WSLEEP) {
3564 			stp->sd_flag &= ~WSLEEP;
3565 			cv_broadcast(&stp->sd_wrq->q_wait);
3566 		} else {
3567 			stp->sd_wakeq |= WSLEEP;
3568 		}
3569 
3570 		mutex_exit(&stp->sd_lock);
3571 		pollwakeup(pl, POLLWRNORM);
3572 		mutex_enter(&stp->sd_lock);
3573 
3574 		if (stp->sd_sigflags & S_WRNORM)
3575 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3576 	} else if (flag & QWANTR) {
3577 		if (stp->sd_flag & RSLEEP) {
3578 			stp->sd_flag &= ~RSLEEP;
3579 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3580 		} else {
3581 			stp->sd_wakeq |= RSLEEP;
3582 		}
3583 
3584 		mutex_exit(&stp->sd_lock);
3585 		pollwakeup(pl, POLLIN | POLLRDNORM);
3586 		mutex_enter(&stp->sd_lock);
3587 
3588 		{
3589 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3590 
3591 			if (events)
3592 				strsendsig(stp->sd_siglist, events, 0, 0);
3593 		}
3594 	} else {
3595 		if (stp->sd_flag & WSLEEP) {
3596 			stp->sd_flag &= ~WSLEEP;
3597 			cv_broadcast(&stp->sd_wrq->q_wait);
3598 		}
3599 
3600 		mutex_exit(&stp->sd_lock);
3601 		pollwakeup(pl, POLLWRNORM);
3602 		mutex_enter(&stp->sd_lock);
3603 
3604 		if (stp->sd_sigflags & S_WRNORM)
3605 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3606 	}
3607 	mutex_exit(&stp->sd_lock);
3608 }
3609 
3610 int
3611 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3612 {
3613 	stdata_t *stp = STREAM(q);
3614 	int typ  = STRUIOT_STANDARD;
3615 	uio_t	 *uiop = &dp->d_uio;
3616 	dblk_t	 *dbp;
3617 	ssize_t	 uiocnt;
3618 	ssize_t	 cnt;
3619 	unsigned char *ptr;
3620 	ssize_t	 resid;
3621 	int	 error = 0;
3622 	on_trap_data_t otd;
3623 	queue_t	*stwrq;
3624 
3625 	/*
3626 	 * Plumbing may change while we are taking the type, so store the
3627 	 * queue in a temporary variable. It does not matter if we take
3628 	 * the type from the previous plumbing: if the plumbing changes
3629 	 * while we are holding the queue in the temporary variable, we
3630 	 * can still continue processing the message the way it would have
3631 	 * been processed in the old plumbing, with no side effects other
3632 	 * than a bit of extra processing for the partial IP header
3633 	 * checksum.
3634 	 *
3635 	 * This has been done to avoid holding the sd_lock which is
3636 	 * very hot.
3637 	 */
3638 
3639 	stwrq = stp->sd_struiowrq;
3640 	if (stwrq)
3641 		typ = stwrq->q_struiot;
3642 
3643 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3644 		dbp = mp->b_datap;
3645 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3646 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3647 		cnt = MIN(uiocnt, uiop->uio_resid);
3648 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3649 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3650 			/*
3651 			 * Either this mblk has already been processed
3652 			 * or there is no more room in this mblk (?).
3653 			 */
3654 			continue;
3655 		}
3656 		switch (typ) {
3657 		case STRUIOT_STANDARD:
3658 			if (noblock) {
3659 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3660 					no_trap();
3661 					error = EWOULDBLOCK;
3662 					goto out;
3663 				}
3664 			}
3665 			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3666 				if (noblock)
3667 					no_trap();
3668 				goto out;
3669 			}
3670 			if (noblock)
3671 				no_trap();
3672 			break;
3673 
3674 		default:
3675 			error = EIO;
3676 			goto out;
3677 		}
3678 		dbp->db_struioflag |= STRUIO_DONE;
3679 		dbp->db_cksumstuff += cnt;
3680 	}
3681 out:
3682 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3683 		/*
3684 		 * A fault has occurred and some bytes were moved to the
3685 		 * current mblk, the uio_t has already been updated by
3686 		 * the appropriate uio routine, so also update the mblk
3687 		 * to reflect this in case this same mblk chain is used
3688 		 * again (after the fault has been handled).
3689 		 */
3690 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3691 		if (uiocnt >= resid)
3692 			dbp->db_cksumstuff += resid;
3693 	}
3694 	return (error);
3695 }
3696 
3697 /*
3698  * Try to enter queue synchronously. Any attempt to enter a closing queue
3699  * will fail. The qp->q_rwcnt keeps track of the number of successful
3700  * entries so that removeq() will not try to close the queue while a
3701  * thread is inside the queue.
3702  */
3703 static boolean_t
3704 rwnext_enter(queue_t *qp)
3705 {
3706 	mutex_enter(QLOCK(qp));
3707 	if (qp->q_flag & QWCLOSE) {
3708 		mutex_exit(QLOCK(qp));
3709 		return (B_FALSE);
3710 	}
3711 	qp->q_rwcnt++;
3712 	ASSERT(qp->q_rwcnt != 0);
3713 	mutex_exit(QLOCK(qp));
3714 	return (B_TRUE);
3715 }
3716 
3717 /*
3718  * Decrease the count of threads running in sync stream queue and wake up any
3719  * threads blocked in removeq().
3720  */
3721 static void
3722 rwnext_exit(queue_t *qp)
3723 {
3724 	mutex_enter(QLOCK(qp));
3725 	qp->q_rwcnt--;
3726 	if (qp->q_flag & QWANTRMQSYNC) {
3727 		qp->q_flag &= ~QWANTRMQSYNC;
3728 		cv_broadcast(&qp->q_wait);
3729 	}
3730 	mutex_exit(QLOCK(qp));
3731 }
3732 
3733 /*
3734  * The purpose of rwnext() is to call the rw procedure of the next
3735  * (downstream) module's queue.
3736  *
3737  * Treated as a put entrypoint for perimeter synchronization.
3738  *
3739  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3740  * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3741  * does not matter if any regular put entrypoints have already been entered.
3742  * We can't increment one of the sq_putcounts (instead of sq_count) because
3743  * qwait_rw won't know which counter to decrement.
3744  *
3745  * It would be reasonable to add the lockless FASTPUT logic.
3746  */
3747 int
3748 rwnext(queue_t *qp, struiod_t *dp)
3749 {
3750 	queue_t		*nqp;
3751 	syncq_t		*sq;
3752 	uint16_t	count;
3753 	uint16_t	flags;
3754 	struct qinit	*qi;
3755 	int		(*proc)();
3756 	struct stdata	*stp;
3757 	int		isread;
3758 	int		rval;
3759 
3760 	stp = STREAM(qp);
3761 	/*
3762 	 * Prevent q_next from changing by holding sd_lock until acquiring
3763 	 * SQLOCK. Note that a read-side rwnext from the streamhead will
3764 	 * already have sd_lock acquired. In either case sd_lock is always
3765 	 * released after acquiring SQLOCK.
3766 	 *
3767 	 * The streamhead read-side holding sd_lock when calling rwnext is
3768 	 * required to prevent a race condition where M_DATA mblks flowing
3769 	 * up the read-side of the stream could be bypassed by a rwnext()
3770 	 * down-call. In this case sd_lock acts as the streamhead perimeter.
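	 *
	 * A minimal sketch of a (hypothetical) caller, assuming the
	 * struiod_t has been set up by the stream head:
	 *
	 *	error = rwnext(qp, dp);
	 *	if (error == EINVAL)
	 *		... not a sync stream; fall back to putnext() path ...
	 *	else if (error == EWOULDBLOCK)
	 *		... write side flow-controlled; a QWANTWSYNC wakeup
	 *		... is now pending ...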
3771 	 */
3772 	if ((nqp = _WR(qp)) == qp) {
3773 		isread = 0;
3774 		mutex_enter(&stp->sd_lock);
3775 		qp = nqp->q_next;
3776 	} else {
3777 		isread = 1;
3778 		if (nqp != stp->sd_wrq)
3779 			/* Not streamhead */
3780 			mutex_enter(&stp->sd_lock);
3781 		qp = _RD(nqp->q_next);
3782 	}
3783 	qi = qp->q_qinfo;
3784 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3785 		/*
3786 		 * Not a synchronous module or no r/w procedure for this
3787 		 * queue, so just return EINVAL and let the caller handle it.
3788 		 */
3789 		mutex_exit(&stp->sd_lock);
3790 		return (EINVAL);
3791 	}
3792 
3793 	if (rwnext_enter(qp) == B_FALSE) {
3794 		mutex_exit(&stp->sd_lock);
3795 		return (EINVAL);
3796 	}
3797 
3798 	sq = qp->q_syncq;
3799 	mutex_enter(SQLOCK(sq));
3800 	mutex_exit(&stp->sd_lock);
3801 	count = sq->sq_count;
3802 	flags = sq->sq_flags;
3803 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3804 
3805 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3806 		/*
3807 		 * If this queue is being closed, return.
3808 		 */
3809 		if (qp->q_flag & QWCLOSE) {
3810 			mutex_exit(SQLOCK(sq));
3811 			rwnext_exit(qp);
3812 			return (EINVAL);
3813 		}
3814 
3815 		/*
3816 		 * Wait until we can enter the inner perimeter.
3817 		 */
3818 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3819 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3820 		count = sq->sq_count;
3821 		flags = sq->sq_flags;
3822 	}
3823 
3824 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3825 	    isread == 1 && stp->sd_struiordq == NULL) {
3826 		/*
3827 		 * Stream plumbing changed while waiting for inner perimeter
3828 		 * so just return EINVAL and let the caller handle it.
3829 		 */
3830 		mutex_exit(SQLOCK(sq));
3831 		rwnext_exit(qp);
3832 		return (EINVAL);
3833 	}
3834 	if (!(flags & SQ_CIPUT))
3835 		sq->sq_flags = flags | SQ_EXCL;
3836 	sq->sq_count = count + 1;
3837 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3838 	/*
3839 	 * Note: The only message ordering guarantee that rwnext() makes is
3840 	 * for the write queue flow-control case. All others (r/w queue
3841 	 * with q_count > 0 (or q_first != 0)) are the responsibility of
3842 	 * the queue's rw procedure. This could be generalized here by
3843 	 * running the queue's service procedure, but that wouldn't be
3844 	 * the most efficient for all cases.
3845 	 */
3846 	mutex_exit(SQLOCK(sq));
3847 	if (! isread && (qp->q_flag & QFULL)) {
3848 		/*
3849 		 * Write queue may be flow controlled. If so,
3850 		 * mark the queue for wakeup when it's not.
3851 		 */
3852 		mutex_enter(QLOCK(qp));
3853 		if (qp->q_flag & QFULL) {
3854 			qp->q_flag |= QWANTWSYNC;
3855 			mutex_exit(QLOCK(qp));
3856 			rval = EWOULDBLOCK;
3857 			goto out;
3858 		}
3859 		mutex_exit(QLOCK(qp));
3860 	}
3861 
3862 	if (! isread && dp->d_mp)
3863 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3864 		    dp->d_mp->b_datap->db_base);
3865 
3866 	rval = (*proc)(qp, dp);
3867 
3868 	if (isread && dp->d_mp)
3869 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3870 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3871 out:
3872 	/*
3873 	 * The queue is protected from being freed by sq_count, so it is
3874 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3875 	 */
3876 	rwnext_exit(qp);
3877 
3878 	mutex_enter(SQLOCK(sq));
3879 	flags = sq->sq_flags;
3880 	ASSERT(sq->sq_count != 0);
3881 	sq->sq_count--;
3882 	if (flags & SQ_TAIL) {
3883 		putnext_tail(sq, qp, flags);
3884 		/*
3885 		 * The only purpose of this ASSERT is to preserve calling stack
3886 		 * in DEBUG kernel.
3887 */ 3888 ASSERT(flags & SQ_TAIL); 3889 return (rval); 3890 } 3891 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3892 /* 3893 * Safe to always drop SQ_EXCL: 3894 * Not SQ_CIPUT means we set SQ_EXCL above 3895 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3896 * did a qwriter(INNER) in which case nobody else 3897 * is in the inner perimeter and we are exiting. 3898 * 3899 * I would like to make the following assertion: 3900 * 3901 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3902 * sq->sq_count == 0); 3903 * 3904 * which indicates that if we are both putshared and exclusive, 3905 * we became exclusive while executing the putproc, and the only 3906 * claim on the syncq was the one we dropped a few lines above. 3907 * But other threads that enter putnext while the syncq is exclusive 3908 * need to make a claim as they may need to drop SQLOCK in the 3909 * has_writers case to avoid deadlocks. If these threads are 3910 * delayed or preempted, it is possible that the writer thread can 3911 * find out that there are other claims making the (sq_count == 0) 3912 * test invalid. 3913 */ 3914 3915 sq->sq_flags = flags & ~SQ_EXCL; 3916 if (sq->sq_flags & SQ_WANTWAKEUP) { 3917 sq->sq_flags &= ~SQ_WANTWAKEUP; 3918 cv_broadcast(&sq->sq_wait); 3919 } 3920 mutex_exit(SQLOCK(sq)); 3921 return (rval); 3922 } 3923 3924 /* 3925 * The purpose of infonext() is to call the info procedure of the next 3926 * (downstream) modules queue. 3927 * 3928 * treated as put entrypoint for perimeter syncronization. 3929 * 3930 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3931 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and 3932 * it does not matter if any regular put entrypoints have been already 3933 * entered. 3934 */ 3935 int 3936 infonext(queue_t *qp, infod_t *idp) 3937 { 3938 queue_t *nqp; 3939 syncq_t *sq; 3940 uint16_t count; 3941 uint16_t flags; 3942 struct qinit *qi; 3943 int (*proc)(); 3944 struct stdata *stp; 3945 int rval; 3946 3947 stp = STREAM(qp); 3948 /* 3949 * Prevent q_next from changing by holding sd_lock until 3950 * acquiring SQLOCK. 3951 */ 3952 mutex_enter(&stp->sd_lock); 3953 if ((nqp = _WR(qp)) == qp) { 3954 qp = nqp->q_next; 3955 } else { 3956 qp = _RD(nqp->q_next); 3957 } 3958 qi = qp->q_qinfo; 3959 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3960 mutex_exit(&stp->sd_lock); 3961 return (EINVAL); 3962 } 3963 sq = qp->q_syncq; 3964 mutex_enter(SQLOCK(sq)); 3965 mutex_exit(&stp->sd_lock); 3966 count = sq->sq_count; 3967 flags = sq->sq_flags; 3968 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3969 3970 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3971 /* 3972 * Wait until we can enter the inner perimeter. 3973 */ 3974 sq->sq_flags = flags | SQ_WANTWAKEUP; 3975 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3976 count = sq->sq_count; 3977 flags = sq->sq_flags; 3978 } 3979 3980 if (! (flags & SQ_CIPUT)) 3981 sq->sq_flags = flags | SQ_EXCL; 3982 sq->sq_count = count + 1; 3983 ASSERT(sq->sq_count != 0); /* Wraparound */ 3984 mutex_exit(SQLOCK(sq)); 3985 3986 rval = (*proc)(qp, idp); 3987 3988 mutex_enter(SQLOCK(sq)); 3989 flags = sq->sq_flags; 3990 ASSERT(sq->sq_count != 0); 3991 sq->sq_count--; 3992 if (flags & SQ_TAIL) { 3993 putnext_tail(sq, qp, flags); 3994 /* 3995 * The only purpose of this ASSERT is to preserve calling stack 3996 * in DEBUG kernel. 
3997 */ 3998 ASSERT(flags & SQ_TAIL); 3999 return (rval); 4000 } 4001 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 4002 /* 4003 * XXXX 4004 * I am not certain the next comment is correct here. I need to consider 4005 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 4006 * might cause other problems. It just might be safer to drop it if 4007 * !SQ_CIPUT because that is when we set it. 4008 */ 4009 /* 4010 * Safe to always drop SQ_EXCL: 4011 * Not SQ_CIPUT means we set SQ_EXCL above 4012 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 4013 * did a qwriter(INNER) in which case nobody else 4014 * is in the inner perimeter and we are exiting. 4015 * 4016 * I would like to make the following assertion: 4017 * 4018 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 4019 * sq->sq_count == 0); 4020 * 4021 * which indicates that if we are both putshared and exclusive, 4022 * we became exclusive while executing the putproc, and the only 4023 * claim on the syncq was the one we dropped a few lines above. 4024 * But other threads that enter putnext while the syncq is exclusive 4025 * need to make a claim as they may need to drop SQLOCK in the 4026 * has_writers case to avoid deadlocks. If these threads are 4027 * delayed or preempted, it is possible that the writer thread can 4028 * find out that there are other claims making the (sq_count == 0) 4029 * test invalid. 4030 */ 4031 4032 sq->sq_flags = flags & ~SQ_EXCL; 4033 mutex_exit(SQLOCK(sq)); 4034 return (rval); 4035 } 4036 4037 /* 4038 * Return nonzero if the queue is responsible for struio(), else return 0. 4039 */ 4040 int 4041 isuioq(queue_t *q) 4042 { 4043 if (q->q_flag & QREADR) 4044 return (STREAM(q)->sd_struiordq == q); 4045 else 4046 return (STREAM(q)->sd_struiowrq == q); 4047 } 4048 4049 #if defined(__sparc) 4050 int disable_putlocks = 0; 4051 #else 4052 int disable_putlocks = 1; 4053 #endif 4054 4055 /* 4056 * called by create_putlock. 4057 */ 4058 static void 4059 create_syncq_putlocks(queue_t *q) 4060 { 4061 syncq_t *sq = q->q_syncq; 4062 ciputctrl_t *cip; 4063 int i; 4064 4065 ASSERT(sq != NULL); 4066 4067 ASSERT(disable_putlocks == 0); 4068 ASSERT(n_ciputctrl >= min_n_ciputctrl); 4069 ASSERT(ciputctrl_cache != NULL); 4070 4071 if (!(sq->sq_type & SQ_CIPUT)) 4072 return; 4073 4074 for (i = 0; i <= 1; i++) { 4075 if (sq->sq_ciputctrl == NULL) { 4076 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 4077 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 4078 mutex_enter(SQLOCK(sq)); 4079 if (sq->sq_ciputctrl != NULL) { 4080 mutex_exit(SQLOCK(sq)); 4081 kmem_cache_free(ciputctrl_cache, cip); 4082 } else { 4083 ASSERT(sq->sq_nciputctrl == 0); 4084 sq->sq_nciputctrl = n_ciputctrl - 1; 4085 /* 4086 * putnext checks sq_ciputctrl without holding 4087 * SQLOCK. if it is not NULL putnext assumes 4088 * sq_nciputctrl is initialized. membar below 4089 * insures that. 4090 */ 4091 membar_producer(); 4092 sq->sq_ciputctrl = cip; 4093 mutex_exit(SQLOCK(sq)); 4094 } 4095 } 4096 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 4097 if (i == 1) 4098 break; 4099 q = _OTHERQ(q); 4100 if (!(q->q_flag & QPERQ)) { 4101 ASSERT(sq == q->q_syncq); 4102 break; 4103 } 4104 ASSERT(q->q_syncq != NULL); 4105 ASSERT(sq != q->q_syncq); 4106 sq = q->q_syncq; 4107 ASSERT(sq->sq_type & SQ_CIPUT); 4108 } 4109 } 4110 4111 /* 4112 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 4113 * syncq of q. 
If the stream argument is not 0, create per cpu stream_putlocks for
4114  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4115  * starting from q and down to the driver.
4116  *
4117  * This should be called after the affected queues are part of stream
4118  * geometry. It should be called from driver/module open routine after
4119  * qprocson() call. It is also called from the nfs syscall where it is
4120  * known that the stream is configured and won't change its geometry
4121  * during the create_putlock call.
4122  *
4123  * The caller normally uses a 0 value for the stream argument to speed up
4124  * MT putnext into the perimeter of q, for example because its perimeter
4125  * is per module (e.g. IP).
4126  *
4127  * The caller normally uses a non-0 value for the stream argument to hint
4128  * to the system that the stream of q is a very contended global system
4129  * stream (e.g. NFS/UDP) and the part of the stream from q to the driver is
4130  * particularly MT hot.
4131  *
4132  * The caller ensures stream plumbing won't happen while we are here and
4133  * therefore q_next can be safely used.
4134  */
4135 
4136 void
4137 create_putlocks(queue_t *q, int stream)
4138 {
4139 	ciputctrl_t	*cip;
4140 	struct stdata	*stp = STREAM(q);
4141 
4142 	q = _WR(q);
4143 	ASSERT(stp != NULL);
4144 
4145 	if (disable_putlocks != 0)
4146 		return;
4147 
4148 	if (n_ciputctrl < min_n_ciputctrl)
4149 		return;
4150 
4151 	ASSERT(ciputctrl_cache != NULL);
4152 
4153 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4154 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4155 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4156 		mutex_enter(&stp->sd_lock);
4157 		if (stp->sd_ciputctrl != NULL) {
4158 			mutex_exit(&stp->sd_lock);
4159 			kmem_cache_free(ciputctrl_cache, cip);
4160 		} else {
4161 			ASSERT(stp->sd_nciputctrl == 0);
4162 			stp->sd_nciputctrl = n_ciputctrl - 1;
4163 			/*
4164 			 * putnext checks sd_ciputctrl without holding
4165 			 * sd_lock. If it is not NULL, putnext assumes
4166 			 * sd_nciputctrl is initialized. The membar below
4167 			 * ensures that.
4168 			 */
4169 			membar_producer();
4170 			stp->sd_ciputctrl = cip;
4171 			mutex_exit(&stp->sd_lock);
4172 		}
4173 	}
4174 
4175 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4176 
4177 	while (_SAMESTR(q)) {
4178 		create_syncq_putlocks(q);
4179 		if (stream == 0)
4180 			return;
4181 		q = q->q_next;
4182 	}
4183 	ASSERT(q != NULL);
4184 	create_syncq_putlocks(q);
4185 }
4186 
4187 /*
4188  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4189  * through a stream.
4190  *
4191  * Data currently recorded per event are a timestamp, module/driver name,
4192  * downstream module/driver name, optional callstack, event type, and a
4193  * per-type datum. Much of the STREAMS framework is instrumented for
4194  * automatic flow tracing (when enabled). Events can be defined and used
4195  * by STREAMS modules and drivers.
4196  *
4197  * Global objects:
4198  *
4199  *	str_ftevent() - Add a flow-trace event to a dblk.
4200  *	str_ftfree() - Free flow-trace data
4201  *
4202  * Local objects:
4203  *
4204  *	fthdr_cache - pointer to the kmem cache for trace header.
4205  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
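 *
 * For example, a module could (hypothetically, with str_ftnever cleared
 * so tracing is active) record a private event as a message passes
 * through its put procedure; FTEV_XXXPUT is an assumed module-private
 * event code, not one defined by the framework:
 *
 *	STR_FTEVENT_MSG(mp, q, FTEV_XXXPUT, mp->b_band);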
4206 */ 4207 4208 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4209 int str_ftstack = 0; /* Don't record event call stacks */ 4210 4211 void 4212 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4213 { 4214 ftblk_t *bp = hp->tail; 4215 ftblk_t *nbp; 4216 ftevnt_t *ep; 4217 int ix, nix; 4218 4219 ASSERT(hp != NULL); 4220 4221 for (;;) { 4222 if ((ix = bp->ix) == FTBLK_EVNTS) { 4223 /* 4224 * Tail doesn't have room, so need a new tail. 4225 * 4226 * To make this MT safe, first, allocate a new 4227 * ftblk, and initialize it. To make life a 4228 * little easier, reserve the first slot (mostly 4229 * by making ix = 1). When we are finished with 4230 * the initialization, CAS this pointer to the 4231 * tail. If this succeeds, this is the new 4232 * "next" block. Otherwise, another thread 4233 * got here first, so free the block and start 4234 * again. 4235 */ 4236 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4237 if (nbp == NULL) { 4238 /* no mem, so punt */ 4239 str_ftnever++; 4240 /* free up all flow data? */ 4241 return; 4242 } 4243 nbp->nxt = NULL; 4244 nbp->ix = 1; 4245 /* 4246 * Just in case there is another thread about 4247 * to get the next index, we need to make sure 4248 * the value is there for it. 4249 */ 4250 membar_producer(); 4251 if (casptr(&hp->tail, bp, nbp) == bp) { 4252 /* CAS was successful */ 4253 bp->nxt = nbp; 4254 membar_producer(); 4255 bp = nbp; 4256 ix = 0; 4257 goto cas_good; 4258 } else { 4259 kmem_cache_free(ftblk_cache, nbp); 4260 bp = hp->tail; 4261 continue; 4262 } 4263 } 4264 nix = ix + 1; 4265 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 4266 cas_good: 4267 if (curthread != hp->thread) { 4268 hp->thread = curthread; 4269 evnt |= FTEV_CS; 4270 } 4271 if (CPU->cpu_seqid != hp->cpu_seqid) { 4272 hp->cpu_seqid = CPU->cpu_seqid; 4273 evnt |= FTEV_PS; 4274 } 4275 ep = &bp->ev[ix]; 4276 break; 4277 } 4278 } 4279 4280 if (evnt & FTEV_QMASK) { 4281 queue_t *qp = p; 4282 4283 if (!(qp->q_flag & QREADR)) 4284 evnt |= FTEV_ISWR; 4285 4286 ep->mid = Q2NAME(qp); 4287 4288 /* 4289 * We only record the next queue name for FTEV_PUTNEXT since 4290 * that's the only time we *really* need it, and the putnext() 4291 * code ensures that qp->q_next won't vanish. (We could use 4292 * claimstr()/releasestr() but at a performance cost.) 4293 */ 4294 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4295 ep->midnext = Q2NAME(qp->q_next); 4296 else 4297 ep->midnext = NULL; 4298 } else { 4299 ep->mid = p; 4300 ep->midnext = NULL; 4301 } 4302 4303 if (ep->stk != NULL) 4304 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4305 4306 ep->ts = gethrtime(); 4307 ep->evnt = evnt; 4308 ep->data = data; 4309 hp->hash = (hp->hash << 9) + hp->hash; 4310 hp->hash += (evnt << 16) | data; 4311 hp->hash += (uintptr_t)ep->mid; 4312 } 4313 4314 /* 4315 * Free flow-trace data. 4316 */ 4317 void 4318 str_ftfree(dblk_t *dbp) 4319 { 4320 fthdr_t *hp = dbp->db_fthdr; 4321 ftblk_t *bp = &hp->first; 4322 ftblk_t *nbp; 4323 4324 if (bp != hp->tail || bp->ix != 0) { 4325 /* 4326 * Clear out the hash, have the tail point to itself, and free 4327 * any continuation blocks. 4328 */ 4329 bp = hp->first.nxt; 4330 hp->tail = &hp->first; 4331 hp->hash = 0; 4332 hp->first.nxt = NULL; 4333 hp->first.ix = 0; 4334 while (bp != NULL) { 4335 nbp = bp->nxt; 4336 kmem_cache_free(ftblk_cache, bp); 4337 bp = nbp; 4338 } 4339 } 4340 kmem_cache_free(fthdr_cache, hp); 4341 dbp->db_fthdr = NULL; 4342 } 4343