1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 22 /* All Rights Reserved */ 23 24 /* 25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 26 * Use is subject to license terms. 27 */ 28 29 #include <sys/types.h> 30 #include <sys/param.h> 31 #include <sys/thread.h> 32 #include <sys/sysmacros.h> 33 #include <sys/stropts.h> 34 #include <sys/stream.h> 35 #include <sys/strsubr.h> 36 #include <sys/strsun.h> 37 #include <sys/conf.h> 38 #include <sys/debug.h> 39 #include <sys/cmn_err.h> 40 #include <sys/kmem.h> 41 #include <sys/atomic.h> 42 #include <sys/errno.h> 43 #include <sys/vtrace.h> 44 #include <sys/ftrace.h> 45 #include <sys/ontrap.h> 46 #include <sys/multidata.h> 47 #include <sys/multidata_impl.h> 48 #include <sys/sdt.h> 49 #include <sys/strft.h> 50 51 #ifdef DEBUG 52 #include <sys/kmem_impl.h> 53 #endif 54 55 /* 56 * This file contains all the STREAMS utility routines that may 57 * be used by modules and drivers. 
58 */ 59 60 /* 61 * STREAMS message allocator: principles of operation 62 * 63 * The streams message allocator consists of all the routines that 64 * allocate, dup and free streams messages: allocb(), [d]esballoc[a], 65 * dupb(), freeb() and freemsg(). What follows is a high-level view 66 * of how the allocator works. 67 * 68 * Every streams message consists of one or more mblks, a dblk, and data. 69 * All mblks for all types of messages come from a common mblk_cache. 70 * The dblk and data come in several flavors, depending on how the 71 * message is allocated: 72 * 73 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of 74 * fixed-size dblk/data caches. For message sizes that are multiples of 75 * PAGESIZE, dblks are allocated separately from the buffer. 76 * The associated buffer is allocated by the constructor using kmem_alloc(). 77 * For all other message sizes, dblk and its associated data is allocated 78 * as a single contiguous chunk of memory. 79 * Objects in these caches consist of a dblk plus its associated data. 80 * allocb() determines the nearest-size cache by table lookup: 81 * the dblk_cache[] array provides the mapping from size to dblk cache. 82 * 83 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by 84 * kmem_alloc()'ing a buffer for the data and supplying that 85 * buffer to gesballoc(), described below. 86 * 87 * (3) The four flavors of [d]esballoc[a] are all implemented by a 88 * common routine, gesballoc() ("generic esballoc"). gesballoc() 89 * allocates a dblk from the global dblk_esb_cache and sets db_base, 90 * db_lim and db_frtnp to describe the caller-supplied buffer. 91 * 92 * While there are several routines to allocate messages, there is only 93 * one routine to free messages: freeb(). freeb() simply invokes the 94 * dblk's free method, dbp->db_free(), which is set at allocation time. 
95 * 96 * dupb() creates a new reference to a message by allocating a new mblk, 97 * incrementing the dblk reference count and setting the dblk's free 98 * method to dblk_decref(). The dblk's original free method is retained 99 * in db_lastfree. dblk_decref() decrements the reference count on each 100 * freeb(). If this is not the last reference it just frees the mblk; 101 * if this *is* the last reference, it restores db_free to db_lastfree, 102 * sets db_mblk to the current mblk (see below), and invokes db_lastfree. 103 * 104 * The implementation makes aggressive use of kmem object caching for 105 * maximum performance. This makes the code simple and compact, but 106 * also a bit abstruse in some places. The invariants that constitute a 107 * message's constructed state, described below, are more subtle than usual. 108 * 109 * Every dblk has an "attached mblk" as part of its constructed state. 110 * The mblk is allocated by the dblk's constructor and remains attached 111 * until the message is either dup'ed or pulled up. In the dupb() case 112 * the mblk association doesn't matter until the last free, at which time 113 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects 114 * the mblk association because it swaps the leading mblks of two messages, 115 * so it is responsible for swapping their db_mblk pointers accordingly. 116 * From a constructed-state viewpoint it doesn't matter that a dblk's 117 * attached mblk can change while the message is allocated; all that 118 * matters is that the dblk has *some* attached mblk when it's freed. 119 * 120 * The sizes of the allocb() small-message caches are not magical. 121 * They represent a good trade-off between internal and external 122 * fragmentation for current workloads. They should be reevaluated 123 * periodically, especially if allocations larger than DBLK_MAX_CACHE 124 * become common. We use 64-byte alignment so that dblks don't 125 * straddle cache lines unnecessarily. 
 */

#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * DBLK_RTFU_SHIFT() yields the bit offset of a dblk field within the
 * 32-bit word that packs db_ref, db_type, db_flags and db_struioflag
 * (see DBLK_RTFU_WORD below), so DBLK_RTFU() can initialize all four
 * fields with a single 32-bit store.  The big-endian variant measures
 * from the opposite end of the word.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

/*
 * Cache sizes per the fragmentation trade-off discussion above.
 * Sub-page sizes leave room on the page for the dblk header itself
 * (see streams_msg_init()); page-multiple sizes get a separate buffer.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

/* Size-to-cache lookup table, filled in by streams_msg_init(). */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

/*
 * kmem constructor for the fixed-size dblk caches: allocates the
 * "attached mblk" that is part of every dblk's constructed state and,
 * for page-multiple sizes, a separate data buffer (otherwise the data
 * lives immediately after the dblk in the same cache object).
 */
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		/* Page-multiple size: data buffer allocated separately. */
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		/* Data follows the dblk header in the same allocation. */
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*
 * kmem constructor for esballoc dblks: no data buffer is allocated
 * here; gesballoc() fills in db_base/db_lim from the caller-supplied
 * buffer at allocation time.
 */
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*
 * kmem constructor for driver-private bcache dblks: the data buffer
 * comes from the bcache's own buffer_cache, and db_cache points at the
 * bcache_t itself so bcache_dblk_lastfree() can find it.
 */
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*
 * Destructor counterpart of dblk_constructor(): frees the separately
 * allocated data buffer (page-multiple sizes only) and the attached mblk.
 */
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* Destructor counterpart of bcache_dblk_constructor(). */
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/*
 * Constructor for flow-trace blocks.  When stack tracing (str_ftstack)
 * is enabled, preallocate a stack buffer per trace event slot.
 * NOTE(review): the kmem_alloc() results are not checked; with
 * KM_NOSLEEP a slot may be left NULL — ftblk_destructor() and,
 * presumably, the trace consumers tolerate that.
 */
/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* Destructor counterpart of ftblk_constructor(); NULL slots tolerated. */
/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

/* An fthdr_t embeds its first ftblk; construct it the same way. */
static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static
void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

/*
 * Create all of the STREAMS message caches at startup: the common mblk
 * cache, one dblk cache per size in dblk_sizes[] (with dblk_cache[]
 * providing the size-to-cache lookup), the esballoc dblk cache, and
 * the flow-trace header/block caches.
 */
void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		/* Map every 8-byte size class up to 'size' to this cache. */
		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*
 * Allocate a message of at least 'size' bytes from the nearest-size
 * dblk cache; sizes above DBLK_MAX_CACHE go to allocb_oversize().
 * The 'pri' argument is unused here.  Returns NULL on failure
 * (allocation is KM_NOSLEEP).
 */
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		/* size == 0: satisfy from the smallest cache */
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

/* Allocate an mblk stamped with the given credential and pid. */
mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

/*
 * As allocb_cred(), but may block per allocb_wait() semantics; on
 * failure *error has been set by allocb_wait().
 */
mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL and, when cpidp is non-NULL, set
 * *cpidp to NOPID.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found
 *	1. Returns with a hold on the cred
 *	2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * Note that no new hold is taken here: clearing db_credp without a
 * crfree() transfers the dblk's existing reference to the caller.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}
/*
 * Get the label for a message. Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

/*
 * Free a single mblk.  The dblk is released through its db_free method,
 * which was installed at allocation (or dup) time.
 */
void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

/* Free every mblk on the b_cont chain of a message. */
void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		/* save b_cont before db_free() invalidates mp */
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

/*
 * Last-free method for ordinary cached dblks: release flow-trace data
 * and the credential, reset reusable state to its constructed values,
 * and return the dblk (with its attached mblk) to its kmem cache.
 */
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

/*
 * Free method installed by dupb(): drop one reference atomically; only
 * the holder of the last reference restores db_free from db_lastfree
 * and performs the real free (see the block comment at the top of the
 * file for the attached-mblk invariant).
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it. Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference. Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			/* not the last reference: just free this mblk */
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	/* last reference: re-attach this mblk and do the real free */
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

/*
 * Create a new reference to a message: allocate a new mblk sharing the
 * existing dblk, atomically increment db_ref, and set db_free to
 * dblk_decref() so each reference frees independently.  Returns NULL
 * if no mblk is available or db_ref is already at DBLK_REFMAX.
 */
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

/*
 * Last-free method for desballoc'ed dblks: invoke the caller-supplied
 * free routine on its buffer synchronously, then clean up as in
 * dblk_lastfree().
 */
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/* No-op free routine, used via the global 'frnop' by allocb_oversize(). */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*
 * Attach a caller-supplied buffer to a new message; the frtn_t's
 * free routine is invoked (via freebs_enqueue) when the message is
 * freed.  'pri' is unused.  Returns NULL on allocation failure.
 */
/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*
 * As esballoc(), but the caller's free routine runs synchronously from
 * the final freeb() (dblk_lastfree_desb) rather than being enqueued.
 */
/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*
 * The "a" flavor of esballoc(): identical except the dblk starts with
 * an initial reference count of 2 (DBLK_RTFU(2, ...)).
 * NOTE(review): see esballoca(9F) for the contract implied by the
 * extra reference — confirm against the man page.
 */
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * The "a" flavor of desballoc(): synchronous free routine plus an
 * initial reference count of 2, matching esballoca().
 */
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*
 * Last-free method for bcache dblks: return the dblk to the bcache's
 * dblk_cache and, if a bcache_destroy() is pending and this was the
 * final outstanding allocation, complete the deferred teardown.
 */
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		/* deferred destruction requested by bcache_destroy() */
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

/*
 * Create a driver-private cache of fixed-size, fixed-alignment message
 * buffers.  'align' must be a power of two.  Returns NULL only if the
 * bcache_t itself cannot be allocated.
 */
bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

/*
 * Destroy a bcache.  If buffers are still outstanding, teardown is
 * deferred to bcache_dblk_lastfree() when the last one is returned.
 */
void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*
 * Allocate a message from a bcache.  Returns NULL if the bcache is
 * being destroyed or no buffer is available.  'pri' is unused.
 */
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

/*
 * Last-free method for oversize (> DBLK_MAX_CACHE) messages: free the
 * kmem_alloc'ed data buffer as well as returning the esb dblk.
 */
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

/*
 * Allocate a message larger than DBLK_MAX_CACHE: kmem_alloc() the data
 * buffer and attach it via gesballoc() (see case (2) in the block
 * comment at the top of the file).
 */
static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	/*
	 * Allocate the data buffer directly from kmem and wrap it with a
	 * gesballoc() dblk whose free routine (dblk_lastfree_oversize)
	 * will kmem_free() the buffer again.  Used for sizes above
	 * DBLK_MAX_CACHE; see the allocator overview at the top of
	 * this file.
	 */
	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);	/* no dblk; don't leak the buffer */

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

/*
 * Try a little harder to get a message block: retry allocb() at
 * successively larger sizes (up to 512 bytes beyond the target, in
 * DBLK_CACHE_ALIGN steps) so a nearby, less-contended size cache may
 * satisfy the request.  Returns NULL only if every attempt fails.
 */
mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size -1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		/*
		 * STR_NOSIG: block uninterruptibly (KM_SLEEP) until the
		 * allocation succeeds; *error is never set on this path.
		 */
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			/*
			 * size == 0 wrapped the index computation above;
			 * use the smallest dblk cache.
			 */
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		/*
		 * Interruptible wait: keep retrying allocb() and sleeping
		 * in strwaitbuf() until we get a block or the wait fails
		 * (e.g. a signal), in which case *error is set and NULL
		 * is returned.
		 */
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	/* zero all fields (ioc_rval, ioc_error, ioc_count start at 0) */
	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;		/* kernel credentials */
	ioc->ioc_id = getiocseqno();	/* unique ioctl sequence number */
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * test if block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 *
 * Returns a nonzero bufcall id usable with unbufcall(), or 0 if the
 * request record itself cannot be allocated.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;	/* set by the thread running it */

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls(). Since bcp_id field is returned, we save its value in
	 * the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);	/* wake the bufcall service thread */
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 *
 * If the callback is currently being executed by another thread, wait
 * until that execution finishes before returning, so the caller may
 * safely tear down the resources the callback uses.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				/* wait for the executor to finish, rescan */
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
			/*
			 * Called from within the callback itself: the
			 * executor will clean up; nothing to unlink here.
			 */
		} else {
			/* not yet running: unlink and free the request */
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			/* partial failure: undo everything dup'd so far */
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy (rather than share) blocks whose buffers are on loan to the
 * stream (STRUIO_ZC, zero-copy); otherwise just dupb() them.
 */
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

/*
 * Like dupmsg() but only for M_DATA messages, and any zero-copy
 * (loaned-buffer) blocks are copied instead of shared so the duplicate
 * holds no references to loaned memory.
 */
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/*
		 * See comments below on potential issues.
		 */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		/* carry the credentials and pid over to the copy */
		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	/*
	 * Copy the whole buffer (db_base..db_lim), padded so that the
	 * copy's rptr keeps the original's word-alignment phase.
	 */
	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	/* preserve the rptr/wptr offsets within the buffer */
	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			/* partial failure: free what was copied so far */
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1, means concat everything.
 * Returns 1 on success, 0 on failure
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		/* already a single, aligned mblk: nothing to do */
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);	/* not enough data in the message */
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	/*
	 * Exchange the dblks of mp and the new block so that callers'
	 * existing pointer 'mp' ends up heading the pulled-up data,
	 * while bp takes over the old chain to be consumed below.
	 */
	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	/* copy up to len bytes out of the old chain, freeing drained blocks */
	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;		/* this block still has data left */
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	/* note: the entire first message type is copied, not just len bytes */
	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	/* anything left over (different msg type) is dup'd, not copied */
	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		/* trimming from the tail only affects the first msg type */
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	/* only M_DATA blocks contribute to the data byte count */
	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with
QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */
#define	ADD_MBLK_SIZE(mp, size)					\
	if (DB_TYPE(mp) != M_MULTIDATA) {			\
		(size) += MBLKL(mp);				\
	} else {						\
		uint_t	pinuse;					\
								\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;				\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t	*mp;
	int	mblks = 0;
	int	bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* nothing queued: note that a reader is waiting */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * the message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * the new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the begining (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			/* normal-band accounting lives on the queue itself */
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			/* walk to the qband_t for this priority band */
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				/* bp was the only message in this band */
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		/* below the low water mark (or no low water mark at all)? */
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* walk to the qband_t for this priority band */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	/* lock is dropped; now do the wakeups recorded above */
	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held. Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		/*
		 * Messages of all bands share one list; qb_first/qb_last
		 * delimit this band's span.  Move them inward if mp is an
		 * endpoint, or clear them if mp was the band's only message
		 * (neighbor belongs to a different band).
		 */
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	/*
	 * Detach the entire message chain and reset all queue and band
	 * counters under QLOCK, then drop the lock before freeing or
	 * re-queueing the messages (freemsg/putq must not be called with
	 * QLOCK held).
	 */
	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		/*
		 * Messages that survive the flush (M_PCPROTO when
		 * pcproto_flag is clear, or control messages when only
		 * data is being flushed) are put back on the queue.
		 */
		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	/*
	 * Record, per band, whether a writer was waiting and the band has
	 * fallen below its low water mark (a qb_lowat of 0 always passes).
	 */
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag which specifies whether or not M_PCPROTO messages should be
 * flushed can be passed.  Currently the only place that uses this flag
 * is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check does
 * not match the man page for flushband and was not in the strrput flush
 * code hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		/* The band was never allocated; nothing to flush. */
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Band 0: detach the whole chain and reset all counters,
		 * then free band-0 messages matching the flush criteria and
		 * re-queue everything else (see flushq_common for the same
		 * pattern).
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		/*
		 * Backenable if a writer was waiting and we are now below
		 * the low water mark (q_lowat == 0 always passes).
		 */
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* First message past the end of this band's run */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	/* Fast path: unlocked check first, then re-check under QLOCK */
	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		/* Ask for a backenable when the queue drains */
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
 * write to the queue).
2536 */ 2537 int 2538 bcanput(queue_t *q, unsigned char pri) 2539 { 2540 qband_t *qbp; 2541 2542 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2543 if (!q) 2544 return (0); 2545 2546 /* Find next forward module that has a service procedure */ 2547 q = q->q_nfsrv; 2548 2549 mutex_enter(QLOCK(q)); 2550 if (pri == 0) { 2551 if (q->q_flag & QFULL) { 2552 q->q_flag |= QWANTW; 2553 mutex_exit(QLOCK(q)); 2554 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2555 "bcanput:%p %X %d", q, pri, 0); 2556 return (0); 2557 } 2558 } else { /* pri != 0 */ 2559 if (pri > q->q_nband) { 2560 /* 2561 * No band exists yet, so return success. 2562 */ 2563 mutex_exit(QLOCK(q)); 2564 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2565 "bcanput:%p %X %d", q, pri, 1); 2566 return (1); 2567 } 2568 qbp = q->q_bandp; 2569 while (--pri) 2570 qbp = qbp->qb_next; 2571 if (qbp->qb_flag & QB_FULL) { 2572 qbp->qb_flag |= QB_WANTW; 2573 mutex_exit(QLOCK(q)); 2574 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2575 "bcanput:%p %X %d", q, pri, 0); 2576 return (0); 2577 } 2578 } 2579 mutex_exit(QLOCK(q)); 2580 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2581 "bcanput:%p %X %d", q, pri, 1); 2582 return (1); 2583 } 2584 2585 /* 2586 * Put a message on a queue. 2587 * 2588 * Messages are enqueued on a priority basis. The priority classes 2589 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2590 * and B_NORMAL (type < QPCTL && band == 0). 2591 * 2592 * Add appropriate weighted data block sizes to queue count. 2593 * If queue hits high water mark then set QFULL flag. 2594 * 2595 * If QNOENAB is not set (putq is allowed to enable the queue), 2596 * enable the queue only if the message is PRIORITY, 2597 * or the QWANTR flag is set (indicating that the service procedure 2598 * is ready to read the queue. This implies that a service 2599 * procedure must NEVER put a high priority message back on its own 2600 * queue, as this would result in an infinite loop (!). 
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/*
	 * If the stream is frozen by this thread, QLOCK is already held
	 * and must not be dropped on the way out.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* High priority message: skip past all QPCTL msgs */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* First message in this band */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high priority messages, then any
				 * messages in higher (or equal) bands, and
				 * insert in front of the first lower-band
				 * message.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	/*
	 * Enable the queue for PRIORITY/HIGH PRIORITY messages, or when
	 * the service procedure has indicated it wants to read (QWANTR).
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Allocate qband structures up to bp's band */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band is empty; find the insertion point */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high priority messages, then any
				 * strictly-higher bands (note: putbq inserts
				 * ahead of its own band, unlike putq).
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.
However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/*
	 * If the stream is frozen by this thread, or the caller already
	 * holds QLOCK, do not acquire or drop the lock here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Validate ordering: high priority messages may only follow other
	 * high priority messages, and band ordering must be preserved.
	 */
	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	/* Allocate qband structures up to mp's band if necessary */
	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp before emp, or at the tail when emp is NULL */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp becomes the new first (last) message of the
			 * band if its neighbor is in a different band.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Create and put a control message on queue.
3111 */ 3112 int 3113 putctl(queue_t *q, int type) 3114 { 3115 mblk_t *bp; 3116 3117 if ((datamsg(type) && (type != M_DELAY)) || 3118 (bp = allocb_tryhard(0)) == NULL) 3119 return (0); 3120 bp->b_datap->db_type = (unsigned char) type; 3121 3122 put(q, bp); 3123 3124 return (1); 3125 } 3126 3127 /* 3128 * Control message with a single-byte parameter 3129 */ 3130 int 3131 putctl1(queue_t *q, int type, int param) 3132 { 3133 mblk_t *bp; 3134 3135 if ((datamsg(type) && (type != M_DELAY)) || 3136 (bp = allocb_tryhard(1)) == NULL) 3137 return (0); 3138 bp->b_datap->db_type = (unsigned char)type; 3139 *bp->b_wptr++ = (unsigned char)param; 3140 3141 put(q, bp); 3142 3143 return (1); 3144 } 3145 3146 int 3147 putnextctl1(queue_t *q, int type, int param) 3148 { 3149 mblk_t *bp; 3150 3151 if ((datamsg(type) && (type != M_DELAY)) || 3152 ((bp = allocb_tryhard(1)) == NULL)) 3153 return (0); 3154 3155 bp->b_datap->db_type = (unsigned char)type; 3156 *bp->b_wptr++ = (unsigned char)param; 3157 3158 putnext(q, bp); 3159 3160 return (1); 3161 } 3162 3163 int 3164 putnextctl(queue_t *q, int type) 3165 { 3166 mblk_t *bp; 3167 3168 if ((datamsg(type) && (type != M_DELAY)) || 3169 ((bp = allocb_tryhard(0)) == NULL)) 3170 return (0); 3171 bp->b_datap->db_type = (unsigned char)type; 3172 3173 putnext(q, bp); 3174 3175 return (1); 3176 } 3177 3178 /* 3179 * Return the queue upstream from this one 3180 */ 3181 queue_t * 3182 backq(queue_t *q) 3183 { 3184 q = _OTHERQ(q); 3185 if (q->q_next) { 3186 q = q->q_next; 3187 return (_OTHERQ(q)); 3188 } 3189 return (NULL); 3190 } 3191 3192 /* 3193 * Send a block back up the queue in reverse from this 3194 * one (e.g. to respond to ioctls) 3195 */ 3196 void 3197 qreply(queue_t *q, mblk_t *bp) 3198 { 3199 ASSERT(q && bp); 3200 3201 putnext(_OTHERQ(q), bp); 3202 } 3203 3204 /* 3205 * Streams Queue Scheduling 3206 * 3207 * Queues are enabled through qenable() when they have messages to 3208 * process. 
They are serviced by queuerun(), which runs each enabled 3209 * queue's service procedure. The call to queuerun() is processor 3210 * dependent - the general principle is that it be run whenever a queue 3211 * is enabled but before returning to user level. For system calls, 3212 * the function runqueues() is called if their action causes a queue 3213 * to be enabled. For device interrupts, queuerun() should be 3214 * called before returning from the last level of interrupt. Beyond 3215 * this, no timing assumptions should be made about queue scheduling. 3216 */ 3217 3218 /* 3219 * Enable a queue: put it on list of those whose service procedures are 3220 * ready to run and set up the scheduling mechanism. 3221 * The broadcast is done outside the mutex -> to avoid the woken thread 3222 * from contending with the mutex. This is OK 'cos the queue has been 3223 * enqueued on the runlist and flagged safely at this point. 3224 */ 3225 void 3226 qenable(queue_t *q) 3227 { 3228 mutex_enter(QLOCK(q)); 3229 qenable_locked(q); 3230 mutex_exit(QLOCK(q)); 3231 } 3232 /* 3233 * Return number of messages on queue 3234 */ 3235 int 3236 qsize(queue_t *qp) 3237 { 3238 int count = 0; 3239 mblk_t *mp; 3240 3241 mutex_enter(QLOCK(qp)); 3242 for (mp = qp->q_first; mp; mp = mp->b_next) 3243 count++; 3244 mutex_exit(QLOCK(qp)); 3245 return (count); 3246 } 3247 3248 /* 3249 * noenable - set queue so that putq() will not enable it. 3250 * enableok - set queue so that putq() can enable it. 3251 */ 3252 void 3253 noenable(queue_t *q) 3254 { 3255 mutex_enter(QLOCK(q)); 3256 q->q_flag |= QNOENB; 3257 mutex_exit(QLOCK(q)); 3258 } 3259 3260 void 3261 enableok(queue_t *q) 3262 { 3263 mutex_enter(QLOCK(q)); 3264 q->q_flag &= ~QNOENB; 3265 mutex_exit(QLOCK(q)); 3266 } 3267 3268 /* 3269 * Set queue fields. 
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	/*
	 * If the stream is frozen by this thread, QLOCK is already held
	 * and must not be dropped on the way out.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		/* Allocate qband structures up to `pri' if necessary */
		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		/*
		 * Clamp the cached value to the system-wide maximum message
		 * size (and to PIPE_BUF for pipes/FIFOs).
		 */
		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		/* Restore the original QLOCK before returning */
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		/* These fields are read-only; see strqget() */
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t *qbp = NULL;
	int error = 0;
	kthread_id_t freezer;

	/*
	 * If the stream is frozen by this thread, QLOCK is already held
	 * and must not be dropped on the way out.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		/* Allocate qband structures up to `pri' if necessary */
		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Function awakes all in cvwait/sigwait/pollwait, on one of:
 *	QWANTWSYNC or QWANTR or QWANTW,
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
 *	deferred wakeup will be done.  Also if strpoll() in progress then a
 *	deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t *stp = STREAM(q);
	pollhead_t *pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			/* No sleeper yet; defer the wakeup */
			stp->sd_wakeq |= WSLEEP;
		}

		/* sd_lock must be dropped across pollwakeup() */
		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if we take the type from the previous plumbing; that's
	 * because if the plumbing has changed while we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		/* Copy destination starts at the checksum-stuff offset. */
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			/*
			 * In the noblock case, guard the uiomove() with
			 * on_trap() so that a page fault becomes EWOULDBLOCK
			 * instead of sleeping; no_trap() must be paired with
			 * every on_trap() arm on all exit paths.
			 */
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter queue synchronously. Any attempt to enter a closing queue
 * will fail. The qp->q_rwcnt keeps track of the number of successful entries
 * so that removeq() will not try to close the queue while a thread is inside
 * the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);	/* guard against counter wraparound */
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in sync stream queue and wake up any
 * threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		/* removeq() is waiting for the last thread to leave. */
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
 * not matter if any regular put entrypoints have been already entered. We
 * can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		isread;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		/* Target queue is closing. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case. All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure. This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
 * it does not matter if any regular put entrypoints have been already
 * entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		/* Not a synchronous module or no info procedure. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here.  I need to
	 * consider why the infonext is called, and if dropping SQ_EXCL unless
	 * non-CIPUT might cause other problems.  It just might be safer to
	 * drop it if !SQ_CIPUT because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * called by create_putlock.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t	*sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	/*
	 * Process this queue's syncq and, if the other side of the queue
	 * pair has its own syncq (QPERQ), that one as well.
	 */
	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				/* Lost the race; another thread set it up. */
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK. if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized. membar below
				 * ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			/* Both sides share one syncq; nothing more to do. */
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
 * starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry. It should be called from driver/module open routine after
 * qprocson() call. It is also called from nfs syscall where it is known that
 * stream is configured and won't change its geometry during create_putlock
 * call.
 *
 * caller normally uses 0 value for the stream argument to speed up MT putnext
 * into the perimeter of q for example because its perimeter is per module
 * (e.g. IP).
 *
 * caller normally uses non 0 value for the stream argument to hint the system
 * that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * Caller ensures stream plumbing won't happen while we are here and therefore
 * q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t	*cip;
	struct stdata	*stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			/* Lost the race; free our copy. */
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock. if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized. membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	/* Walk downstream to the driver when a whole stream was requested. */
	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
4192 * 4193 * Data currently record per-event is a timestamp, module/driver name, 4194 * downstream module/driver name, optional callstack, event type and a per 4195 * type datum. Much of the STREAMS framework is instrumented for automatic 4196 * flow tracing (when enabled). Events can be defined and used by STREAMS 4197 * modules and drivers. 4198 * 4199 * Global objects: 4200 * 4201 * str_ftevent() - Add a flow-trace event to a dblk. 4202 * str_ftfree() - Free flow-trace data 4203 * 4204 * Local objects: 4205 * 4206 * fthdr_cache - pointer to the kmem cache for trace header. 4207 * ftblk_cache - pointer to the kmem cache for trace data blocks. 4208 */ 4209 4210 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4211 int str_ftstack = 0; /* Don't record event call stacks */ 4212 4213 void 4214 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4215 { 4216 ftblk_t *bp = hp->tail; 4217 ftblk_t *nbp; 4218 ftevnt_t *ep; 4219 int ix, nix; 4220 4221 ASSERT(hp != NULL); 4222 4223 for (;;) { 4224 if ((ix = bp->ix) == FTBLK_EVNTS) { 4225 /* 4226 * Tail doesn't have room, so need a new tail. 4227 * 4228 * To make this MT safe, first, allocate a new 4229 * ftblk, and initialize it. To make life a 4230 * little easier, reserve the first slot (mostly 4231 * by making ix = 1). When we are finished with 4232 * the initialization, CAS this pointer to the 4233 * tail. If this succeeds, this is the new 4234 * "next" block. Otherwise, another thread 4235 * got here first, so free the block and start 4236 * again. 4237 */ 4238 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4239 if (nbp == NULL) { 4240 /* no mem, so punt */ 4241 str_ftnever++; 4242 /* free up all flow data? */ 4243 return; 4244 } 4245 nbp->nxt = NULL; 4246 nbp->ix = 1; 4247 /* 4248 * Just in case there is another thread about 4249 * to get the next index, we need to make sure 4250 * the value is there for it. 
4251 */ 4252 membar_producer(); 4253 if (casptr(&hp->tail, bp, nbp) == bp) { 4254 /* CAS was successful */ 4255 bp->nxt = nbp; 4256 membar_producer(); 4257 bp = nbp; 4258 ix = 0; 4259 goto cas_good; 4260 } else { 4261 kmem_cache_free(ftblk_cache, nbp); 4262 bp = hp->tail; 4263 continue; 4264 } 4265 } 4266 nix = ix + 1; 4267 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) { 4268 cas_good: 4269 if (curthread != hp->thread) { 4270 hp->thread = curthread; 4271 evnt |= FTEV_CS; 4272 } 4273 if (CPU->cpu_seqid != hp->cpu_seqid) { 4274 hp->cpu_seqid = CPU->cpu_seqid; 4275 evnt |= FTEV_PS; 4276 } 4277 ep = &bp->ev[ix]; 4278 break; 4279 } 4280 } 4281 4282 if (evnt & FTEV_QMASK) { 4283 queue_t *qp = p; 4284 4285 if (!(qp->q_flag & QREADR)) 4286 evnt |= FTEV_ISWR; 4287 4288 ep->mid = Q2NAME(qp); 4289 4290 /* 4291 * We only record the next queue name for FTEV_PUTNEXT since 4292 * that's the only time we *really* need it, and the putnext() 4293 * code ensures that qp->q_next won't vanish. (We could use 4294 * claimstr()/releasestr() but at a performance cost.) 4295 */ 4296 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4297 ep->midnext = Q2NAME(qp->q_next); 4298 else 4299 ep->midnext = NULL; 4300 } else { 4301 ep->mid = p; 4302 ep->midnext = NULL; 4303 } 4304 4305 if (ep->stk != NULL) 4306 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4307 4308 ep->ts = gethrtime(); 4309 ep->evnt = evnt; 4310 ep->data = data; 4311 hp->hash = (hp->hash << 9) + hp->hash; 4312 hp->hash += (evnt << 16) | data; 4313 hp->hash += (uintptr_t)ep->mid; 4314 } 4315 4316 /* 4317 * Free flow-trace data. 4318 */ 4319 void 4320 str_ftfree(dblk_t *dbp) 4321 { 4322 fthdr_t *hp = dbp->db_fthdr; 4323 ftblk_t *bp = &hp->first; 4324 ftblk_t *nbp; 4325 4326 if (bp != hp->tail || bp->ix != 0) { 4327 /* 4328 * Clear out the hash, have the tail point to itself, and free 4329 * any continuation blocks. 
4330 */ 4331 bp = hp->first.nxt; 4332 hp->tail = &hp->first; 4333 hp->hash = 0; 4334 hp->first.nxt = NULL; 4335 hp->first.ix = 0; 4336 while (bp != NULL) { 4337 nbp = bp->nxt; 4338 kmem_cache_free(ftblk_cache, bp); 4339 bp = nbp; 4340 } 4341 } 4342 kmem_cache_free(fthdr_cache, hp); 4343 dbp->db_fthdr = NULL; 4344 } 4345