/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
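 *
 *     As an illustrative sketch only (names here are hypothetical, not
 *     part of this file), a driver lending out its own buffer might do:
 *
 *         buf = kmem_alloc(bufsz, KM_SLEEP);
 *         myfrtn.free_func = my_free_func;   (called on last free)
 *         myfrtn.free_arg  = (caddr_t)buf;
 *         mp = esballoc((uchar_t *)buf, bufsz, BPRI_MED, &myfrtn);
 *
 *     The data is not copied, and my_free_func() reclaims buf once the
 *     last reference to the message has been freed.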
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
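 *
 * As a worked example (illustrative): allocb(100) computes cache index
 * (100 - 1) >> DBLK_SIZE_SHIFT == 12, and streams_msg_init() has filled
 * dblk_cache[12] with the smallest cache from dblk_sizes[] whose data
 * size is >= 100, so the request is rounded up rather than failed.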
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
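 * These are passed as the cflags argument to kmem_cache_create() below,
 * so kmem debugging can be enabled for just the STREAMS caches; the
 * values are consumed only at cache-creation time (descriptive note).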
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, so the dblk
			 * should be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is a multiple of page size; the dblk
			 * and buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
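 *
 * Illustrative sketch (hypothetical names, not part of this file): a
 * module building a reply to a received message reqmp might use
 *
 *	mblk_t *reply = allocb_tmpl(replylen, reqmp);
 *
 * so the reply carries the same db_type and message credentials as the
 * request without the caller touching db_credp directly.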
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}

/*
 * Get the label for a message. Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * The variants with a 'd' prefix (desballoc, desballoca)
 * directly free the mblk when it loses its last ref,
 * where the other variants free asynchronously.
 * The variants with an 'a' suffix (esballoca, desballoca)
 * add an extra ref, effectively letting the streams subsystem
 * know that the message data should not be modified.
 * (e.g., see the db_ref checks in reallocb and elsewhere)
 *
 * The method used by the 'a' suffix functions to keep the dblk
 * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2, ...) passed to
 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
 * bit in db_flags.  In dblk_decref() that flag essentially means
 * the dblk has one extra ref, so the "last ref" is one, not zero.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
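 *
 * Sketch of the contract (descriptive, not normative): with STR_NOSIG
 * set in flags, the allocation sleeps in kmem (KM_SLEEP) and always
 * returns a block; without it, each failed allocb() attempt waits in
 * strwaitbuf(), which may fail (e.g. on a signal), in which case *error
 * is set and NULL is returned.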
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is
	 * dropped there should be no references to bcp since it may be
	 * freed by runbufcalls().  Since the bc_id field is returned, we
	 * save its value in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Copy the various checksum information that came in
	 * originally.
	 */
	ndp->db_cksumstart = dp->db_cksumstart;
	ndp->db_cksumend = dp->db_cksumend;
	ndp->db_cksumstuff = dp->db_cksumstuff;
	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
	    sizeof (dp->db_struioun.data));

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align the first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
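 *
 * Illustrative use (hypothetical names, not from this file): a module
 * needing a contiguous, aligned 20-byte header in the first mblk might do
 *
 *	if (!pullupmsg(mp, 20))
 *		return (0);
 *	hdr = (struct myhdr *)mp->b_rptr;
 *
 * pullupmsg() fails (returns 0) if the message holds fewer than 20
 * bytes or the replacement allocation fails.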
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
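	 * Note (descriptive, not a behavior change): len only gates
	 * feasibility above; the copy below always coalesces the whole
	 * leading same-type span (totlen bytes), so the result may be
	 * larger than requested.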
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, was a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size)					\
	if (DB_TYPE(mp) != M_MULTIDATA) {			\
		(size) += MBLKL(mp);				\
	} else {						\
		uint_t	pinuse;					\
								\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
		(size) += pinuse;				\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify
 * the maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
2044 */ 2045 if (mp1 != NULL) { 2046 mp2 = mp1->b_cont; 2047 mp1->b_cont = NULL; 2048 } 2049 } 2050 /* 2051 * If mp2 is not NULL then we have part of the message 2052 * to put back onto the queue. 2053 */ 2054 if (mp2 != NULL) { 2055 if ((mp2->b_next = bp->b_next) == NULL) 2056 q->q_last = mp2; 2057 else 2058 bp->b_next->b_prev = mp2; 2059 q->q_first = mp2; 2060 } else { 2061 if ((q->q_first = bp->b_next) == NULL) 2062 q->q_last = NULL; 2063 else 2064 q->q_first->b_prev = NULL; 2065 } 2066 } else { 2067 /* 2068 * Either no byte threshold was supplied, there is 2069 * not enough on the queue or we failed to 2070 * duplicate/copy a data block. In these cases we 2071 * just take the entire first message. 2072 */ 2073 dup_failed: 2074 bytecnt = mp_cont_len(bp, &mblkcnt); 2075 if ((q->q_first = bp->b_next) == NULL) 2076 q->q_last = NULL; 2077 else 2078 q->q_first->b_prev = NULL; 2079 } 2080 if (bp->b_band == 0) { 2081 q->q_count -= bytecnt; 2082 q->q_mblkcnt -= mblkcnt; 2083 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2084 (q->q_mblkcnt < q->q_hiwat))) { 2085 q->q_flag &= ~QFULL; 2086 } 2087 } else { 2088 int i; 2089 2090 ASSERT(bp->b_band <= q->q_nband); 2091 ASSERT(q->q_bandp != NULL); 2092 ASSERT(MUTEX_HELD(QLOCK(q))); 2093 qbp = q->q_bandp; 2094 i = bp->b_band; 2095 while (--i > 0) 2096 qbp = qbp->qb_next; 2097 if (qbp->qb_first == qbp->qb_last) { 2098 qbp->qb_first = NULL; 2099 qbp->qb_last = NULL; 2100 } else { 2101 qbp->qb_first = bp->b_next; 2102 } 2103 qbp->qb_count -= bytecnt; 2104 qbp->qb_mblkcnt -= mblkcnt; 2105 if (qbp->qb_mblkcnt == 0 || 2106 ((qbp->qb_count < qbp->qb_hiwat) && 2107 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2108 qbp->qb_flag &= ~QB_FULL; 2109 } 2110 } 2111 q->q_flag &= ~QWANTR; 2112 bp->b_next = NULL; 2113 bp->b_prev = NULL; 2114 } 2115 if (freezer != curthread) 2116 mutex_exit(QLOCK(q)); 2117 2118 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0); 2119 2120 return (bp); 2121 } 2122 2123 /* 2124 * Determine if a backenable is needed after removing a message in the 2125 * specified band. 2126 * NOTE: This routine assumes that something like getq_noenab() has been 2127 * already called. 2128 * 2129 * For the read side it is ok to hold sd_lock across calling this (and the 2130 * stream head often does). 2131 * But for the write side strwakeq might be invoked and it acquires sd_lock. 2132 */ 2133 void 2134 qbackenable(queue_t *q, uchar_t band) 2135 { 2136 int backenab = 0; 2137 qband_t *qbp; 2138 kthread_id_t freezer; 2139 2140 ASSERT(q); 2141 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 2142 2143 /* 2144 * Quick check without holding the lock. 2145 * OK since after getq() has lowered the q_count these flags 2146 * would not change unless either the qbackenable() is done by 2147 * another thread (which is ok) or the queue has gotten QFULL 2148 * in which case another backenable will take place when the queue 2149 * drops below q_lowat. 
2150 */ 2151 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 2152 return; 2153 2154 /* freezestr should allow its caller to call getq/putq */ 2155 freezer = STREAM(q)->sd_freezer; 2156 if (freezer == curthread) { 2157 ASSERT(frozenstr(q)); 2158 ASSERT(MUTEX_HELD(QLOCK(q))); 2159 } else 2160 mutex_enter(QLOCK(q)); 2161 2162 if (band == 0) { 2163 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 2164 q->q_mblkcnt < q->q_lowat)) { 2165 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 2166 } 2167 } else { 2168 int i; 2169 2170 ASSERT((unsigned)band <= q->q_nband); 2171 ASSERT(q->q_bandp != NULL); 2172 2173 qbp = q->q_bandp; 2174 i = band; 2175 while (--i > 0) 2176 qbp = qbp->qb_next; 2177 2178 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 2179 qbp->qb_mblkcnt < qbp->qb_lowat)) { 2180 backenab = qbp->qb_flag & QB_WANTW; 2181 } 2182 } 2183 2184 if (backenab == 0) { 2185 if (freezer != curthread) 2186 mutex_exit(QLOCK(q)); 2187 return; 2188 } 2189 2190 /* Have to drop the lock across strwakeq and backenable */ 2191 if (backenab & QWANTWSYNC) 2192 q->q_flag &= ~QWANTWSYNC; 2193 if (backenab & (QWANTW|QB_WANTW)) { 2194 if (band != 0) 2195 qbp->qb_flag &= ~QB_WANTW; 2196 else { 2197 q->q_flag &= ~QWANTW; 2198 } 2199 } 2200 2201 if (freezer != curthread) 2202 mutex_exit(QLOCK(q)); 2203 2204 if (backenab & QWANTWSYNC) 2205 strwakeq(q, QWANTWSYNC); 2206 if (backenab & (QWANTW|QB_WANTW)) 2207 backenable(q, band); 2208 } 2209 2210 /* 2211 * Remove a message from a queue. The queue count and other 2212 * flow control parameters are adjusted and the back queue 2213 * enabled if necessary. 2214 * 2215 * rmvq can be called with the stream frozen, but other utility functions 2216 * holding QLOCK, and by streams modules without any locks/frozen. 2217 */ 2218 void 2219 rmvq(queue_t *q, mblk_t *mp) 2220 { 2221 ASSERT(mp != NULL); 2222 2223 rmvq_noenab(q, mp); 2224 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 2225 /* 2226 * qbackenable can handle a frozen stream but not a "random" 2227 * qlock being held. Drop lock across qbackenable. 2228 */ 2229 mutex_exit(QLOCK(q)); 2230 qbackenable(q, mp->b_band); 2231 mutex_enter(QLOCK(q)); 2232 } else { 2233 qbackenable(q, mp->b_band); 2234 } 2235 } 2236 2237 /* 2238 * Like rmvq() but without any backenabling. 2239 * This exists to handle SR_CONSOL_DATA in strrput(). 2240 */ 2241 void 2242 rmvq_noenab(queue_t *q, mblk_t *mp) 2243 { 2244 int i; 2245 qband_t *qbp = NULL; 2246 kthread_id_t freezer; 2247 int bytecnt = 0, mblkcnt = 0; 2248 2249 freezer = STREAM(q)->sd_freezer; 2250 if (freezer == curthread) { 2251 ASSERT(frozenstr(q)); 2252 ASSERT(MUTEX_HELD(QLOCK(q))); 2253 } else if (MUTEX_HELD(QLOCK(q))) { 2254 /* Don't drop lock on exit */ 2255 freezer = curthread; 2256 } else 2257 mutex_enter(QLOCK(q)); 2258 2259 ASSERT(mp->b_band <= q->q_nband); 2260 if (mp->b_band != 0) { /* Adjust band pointers */ 2261 ASSERT(q->q_bandp != NULL); 2262 qbp = q->q_bandp; 2263 i = mp->b_band; 2264 while (--i > 0) 2265 qbp = qbp->qb_next; 2266 if (mp == qbp->qb_first) { 2267 if (mp->b_next && mp->b_band == mp->b_next->b_band) 2268 qbp->qb_first = mp->b_next; 2269 else 2270 qbp->qb_first = NULL; 2271 } 2272 if (mp == qbp->qb_last) { 2273 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 2274 qbp->qb_last = mp->b_prev; 2275 else 2276 qbp->qb_last = NULL; 2277 } 2278 } 2279 2280 /* 2281 * Remove the message from the list. 
2282 */ 2283 if (mp->b_prev) 2284 mp->b_prev->b_next = mp->b_next; 2285 else 2286 q->q_first = mp->b_next; 2287 if (mp->b_next) 2288 mp->b_next->b_prev = mp->b_prev; 2289 else 2290 q->q_last = mp->b_prev; 2291 mp->b_next = NULL; 2292 mp->b_prev = NULL; 2293 2294 /* Get the size of the message for q_count accounting */ 2295 bytecnt = mp_cont_len(mp, &mblkcnt); 2296 2297 if (mp->b_band == 0) { /* Perform q_count accounting */ 2298 q->q_count -= bytecnt; 2299 q->q_mblkcnt -= mblkcnt; 2300 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2301 (q->q_mblkcnt < q->q_hiwat))) { 2302 q->q_flag &= ~QFULL; 2303 } 2304 } else { /* Perform qb_count accounting */ 2305 qbp->qb_count -= bytecnt; 2306 qbp->qb_mblkcnt -= mblkcnt; 2307 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) && 2308 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2309 qbp->qb_flag &= ~QB_FULL; 2310 } 2311 } 2312 if (freezer != curthread) 2313 mutex_exit(QLOCK(q)); 2314 2315 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0); 2316 } 2317 2318 /* 2319 * Empty a queue. 2320 * If flag is set, remove all messages. Otherwise, remove 2321 * only non-control messages. If queue falls below its low 2322 * water mark, and QWANTW is set, enable the nearest upstream 2323 * service procedure. 2324 * 2325 * Historical note: when merging the M_FLUSH code in strrput with this 2326 * code one difference was discovered. flushq did not have a check 2327 * for q_lowat == 0 in the backenabling test. 2328 * 2329 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2330 * if one exists on the queue. 2331 */ 2332 void 2333 flushq_common(queue_t *q, int flag, int pcproto_flag) 2334 { 2335 mblk_t *mp, *nmp; 2336 qband_t *qbp; 2337 int backenab = 0; 2338 unsigned char bpri; 2339 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2340 2341 if (q->q_first == NULL) 2342 return; 2343 2344 mutex_enter(QLOCK(q)); 2345 mp = q->q_first; 2346 q->q_first = NULL; 2347 q->q_last = NULL; 2348 q->q_count = 0; 2349 q->q_mblkcnt = 0; 2350 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2351 qbp->qb_first = NULL; 2352 qbp->qb_last = NULL; 2353 qbp->qb_count = 0; 2354 qbp->qb_mblkcnt = 0; 2355 qbp->qb_flag &= ~QB_FULL; 2356 } 2357 q->q_flag &= ~QFULL; 2358 mutex_exit(QLOCK(q)); 2359 while (mp) { 2360 nmp = mp->b_next; 2361 mp->b_next = mp->b_prev = NULL; 2362 2363 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0); 2364 2365 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2366 (void) putq(q, mp); 2367 else if (flag || datamsg(mp->b_datap->db_type)) 2368 freemsg(mp); 2369 else 2370 (void) putq(q, mp); 2371 mp = nmp; 2372 } 2373 bpri = 1; 2374 mutex_enter(QLOCK(q)); 2375 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2376 if ((qbp->qb_flag & QB_WANTW) && 2377 (((qbp->qb_count < qbp->qb_lowat) && 2378 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2379 qbp->qb_lowat == 0)) { 2380 qbp->qb_flag &= ~QB_WANTW; 2381 backenab = 1; 2382 qbf[bpri] = 1; 2383 } else 2384 qbf[bpri] = 0; 2385 bpri++; 2386 } 2387 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2388 if ((q->q_flag & QWANTW) && 2389 (((q->q_count < q->q_lowat) && 2390 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2391 q->q_flag &= ~QWANTW; 2392 backenab = 1; 2393 qbf[0] = 1; 2394 } else 2395 qbf[0] = 0; 2396 2397 /* 2398 * If any band can now be written to, and there is a writer 2399 * for that band, then backenable the closest service procedure. 
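 *
 * For context: the classic external caller of flushq() below is a
 * module's M_FLUSH handler. A hedged sketch of the conventional
 * read-side handling, not code from this file:
 *
 *	case M_FLUSH:
 *		if (*mp->b_rptr & FLUSHR)
 *			flushq(q, FLUSHDATA);
 *		if (*mp->b_rptr & FLUSHW) {
 *			*mp->b_rptr &= ~FLUSHR;
 *			qreply(q, mp);
 *		} else {
 *			freemsg(mp);
 *		}
 *		break;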
2400 */
2401 if (backenab) {
2402 mutex_exit(QLOCK(q));
2403 for (bpri = q->q_nband; bpri != 0; bpri--)
2404 if (qbf[bpri])
2405 backenable(q, bpri);
2406 if (qbf[0])
2407 backenable(q, 0);
2408 } else
2409 mutex_exit(QLOCK(q));
2410 }
2411
2412 /*
2413 * The real flushing takes place in flushq_common. This is done so that
2414 * a flag can specify whether or not M_PCPROTO messages should be
2415 * flushed. Currently the only place that uses this flag is the stream head.
2416 */
2417 void
2418 flushq(queue_t *q, int flag)
2419 {
2420 flushq_common(q, flag, 0);
2421 }
2422
2423 /*
2424 * Flush the queue of messages of the given priority band.
2425 * There is some duplication of code between flushq and flushband.
2426 * This is because we want to optimize the code as much as possible.
2427 * The assumption is that there will be more messages in the normal
2428 * (priority 0) band than in any other.
2429 *
2430 * Historical note: when merging the M_FLUSH code in strrput with this
2431 * code one difference was discovered. flushband had an extra check
2432 * for (mp->b_datap->db_type < QPCTL) in the band 0
2433 * case. That check does not match the man page for flushband and was not
2434 * in the strrput flush code, hence it was removed.
2435 */
2436 void
2437 flushband(queue_t *q, unsigned char pri, int flag)
2438 {
2439 mblk_t *mp;
2440 mblk_t *nmp;
2441 mblk_t *last;
2442 qband_t *qbp;
2443 int band;
2444
2445 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2446 if (pri > q->q_nband) {
2447 return;
2448 }
2449 mutex_enter(QLOCK(q));
2450 if (pri == 0) {
2451 mp = q->q_first;
2452 q->q_first = NULL;
2453 q->q_last = NULL;
2454 q->q_count = 0;
2455 q->q_mblkcnt = 0;
2456 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2457 qbp->qb_first = NULL;
2458 qbp->qb_last = NULL;
2459 qbp->qb_count = 0;
2460 qbp->qb_mblkcnt = 0;
2461 qbp->qb_flag &= ~QB_FULL;
2462 }
2463 q->q_flag &= ~QFULL;
2464 mutex_exit(QLOCK(q));
2465 while (mp) {
2466 nmp = mp->b_next;
2467 mp->b_next = mp->b_prev = NULL;
2468 if ((mp->b_band == 0) &&
2469 ((flag == FLUSHALL) ||
2470 datamsg(mp->b_datap->db_type)))
2471 freemsg(mp);
2472 else
2473 (void) putq(q, mp);
2474 mp = nmp;
2475 }
2476 mutex_enter(QLOCK(q));
2477 if ((q->q_flag & QWANTW) &&
2478 (((q->q_count < q->q_lowat) &&
2479 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2480 q->q_flag &= ~QWANTW;
2481 mutex_exit(QLOCK(q));
2482
2483 backenable(q, pri);
2484 } else
2485 mutex_exit(QLOCK(q));
2486 } else { /* pri != 0 */
2487 boolean_t flushed = B_FALSE;
2488 band = pri;
2489
2490 ASSERT(MUTEX_HELD(QLOCK(q)));
2491 qbp = q->q_bandp;
2492 while (--band > 0)
2493 qbp = qbp->qb_next;
2494 mp = qbp->qb_first;
2495 if (mp == NULL) {
2496 mutex_exit(QLOCK(q));
2497 return;
2498 }
2499 last = qbp->qb_last->b_next;
2500 /*
2501 * rmvq_noenab() and freemsg() are called for each mblk that
2502 * meets the criteria. The loop is executed until the last
2503 * mblk has been processed.
2504 */
2505 while (mp != last) {
2506 ASSERT(mp->b_band == pri);
2507 nmp = mp->b_next;
2508 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2509 rmvq_noenab(q, mp);
2510 freemsg(mp);
2511 flushed = B_TRUE;
2512 }
2513 mp = nmp;
2514 }
2515 mutex_exit(QLOCK(q));
2516
2517 /*
2518 * If any mblks have been freed, we know that qbackenable()
2519 * will need to be called.
2520 */
2521 if (flushed)
2522 qbackenable(q, pri);
2523 }
2524 }
2525
2526 /*
2527 * Return 1 if the queue is not full.
If the queue is full, return
2528 * 0 (may not put message) and set the QWANTW flag (caller wants to write
2529 * to the queue).
2530 */
2531 int
2532 canput(queue_t *q)
2533 {
2534 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2535
2536 /* this is for loopback transports; they should not do a canput */
2537 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2538
2539 /* Find next forward module that has a service procedure */
2540 q = q->q_nfsrv;
2541
2542 if (!(q->q_flag & QFULL)) {
2543 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2544 return (1);
2545 }
2546 mutex_enter(QLOCK(q));
2547 if (q->q_flag & QFULL) {
2548 q->q_flag |= QWANTW;
2549 mutex_exit(QLOCK(q));
2550 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2551 return (0);
2552 }
2553 mutex_exit(QLOCK(q));
2554 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2555 return (1);
2556 }
2557
2558 /*
2559 * This is the new canput for use with priority bands. Return 1 if the
2560 * band is not full. If the band is full, return 0 (may not put message)
2561 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (caller
2562 * wants to write to the queue).
2563 */
2564 int
2565 bcanput(queue_t *q, unsigned char pri)
2566 {
2567 qband_t *qbp;
2568
2569 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2570 if (!q)
2571 return (0);
2572
2573 /* Find next forward module that has a service procedure */
2574 q = q->q_nfsrv;
2575
2576 mutex_enter(QLOCK(q));
2577 if (pri == 0) {
2578 if (q->q_flag & QFULL) {
2579 q->q_flag |= QWANTW;
2580 mutex_exit(QLOCK(q));
2581 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582 "bcanput:%p %X %d", q, pri, 0);
2583 return (0);
2584 }
2585 } else { /* pri != 0 */
2586 if (pri > q->q_nband) {
2587 /*
2588 * No band exists yet, so return success.
2589 */
2590 mutex_exit(QLOCK(q));
2591 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2592 "bcanput:%p %X %d", q, pri, 1);
2593 return (1);
2594 }
2595 qbp = q->q_bandp;
2596 while (--pri)
2597 qbp = qbp->qb_next;
2598 if (qbp->qb_flag & QB_FULL) {
2599 qbp->qb_flag |= QB_WANTW;
2600 mutex_exit(QLOCK(q));
2601 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2602 "bcanput:%p %X %d", q, pri, 0);
2603 return (0);
2604 }
2605 }
2606 mutex_exit(QLOCK(q));
2607 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2608 "bcanput:%p %X %d", q, pri, 1);
2609 return (1);
2610 }
2611
2612 /*
2613 * Put a message on a queue.
2614 *
2615 * Messages are enqueued on a priority basis. The priority classes
2616 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2617 * and NORMAL (type < QPCTL && band == 0).
2618 *
2619 * Add appropriate weighted data block sizes to the queue count.
2620 * If the queue hits its high water mark then set the QFULL flag.
2621 *
2622 * If QNOENAB is not set (putq is allowed to enable the queue),
2623 * enable the queue only if the message is PRIORITY,
2624 * or the QWANTR flag is set (indicating that the service procedure
2625 * is ready to read the queue). This implies that a service
2626 * procedure must NEVER put a high priority message back on its own
2627 * queue, as this would result in an infinite loop (!).
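 *
 * For reference, a conventional put procedure (a hedged sketch of the
 * common pattern; mymod_wput is hypothetical) defers to its service
 * procedure exactly when the downstream queue is full:
 *
 *	static int
 *	mymod_wput(queue_t *q, mblk_t *mp)
 *	{
 *		if (DB_TYPE(mp) >= QPCTL || canputnext(q))
 *			putnext(q, mp);
 *		else
 *			(void) putq(q, mp);	/* drained by srv later */
 *		return (0);
 *	}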
2628 */ 2629 int 2630 putq(queue_t *q, mblk_t *bp) 2631 { 2632 mblk_t *tmp; 2633 qband_t *qbp = NULL; 2634 int mcls = (int)queclass(bp); 2635 kthread_id_t freezer; 2636 int bytecnt = 0, mblkcnt = 0; 2637 2638 freezer = STREAM(q)->sd_freezer; 2639 if (freezer == curthread) { 2640 ASSERT(frozenstr(q)); 2641 ASSERT(MUTEX_HELD(QLOCK(q))); 2642 } else 2643 mutex_enter(QLOCK(q)); 2644 2645 /* 2646 * Make sanity checks and if qband structure is not yet 2647 * allocated, do so. 2648 */ 2649 if (mcls == QPCTL) { 2650 if (bp->b_band != 0) 2651 bp->b_band = 0; /* force to be correct */ 2652 } else if (bp->b_band != 0) { 2653 int i; 2654 qband_t **qbpp; 2655 2656 if (bp->b_band > q->q_nband) { 2657 2658 /* 2659 * The qband structure for this priority band is 2660 * not on the queue yet, so we have to allocate 2661 * one on the fly. It would be wasteful to 2662 * associate the qband structures with every 2663 * queue when the queues are allocated. This is 2664 * because most queues will only need the normal 2665 * band of flow which can be described entirely 2666 * by the queue itself. 2667 */ 2668 qbpp = &q->q_bandp; 2669 while (*qbpp) 2670 qbpp = &(*qbpp)->qb_next; 2671 while (bp->b_band > q->q_nband) { 2672 if ((*qbpp = allocband()) == NULL) { 2673 if (freezer != curthread) 2674 mutex_exit(QLOCK(q)); 2675 return (0); 2676 } 2677 (*qbpp)->qb_hiwat = q->q_hiwat; 2678 (*qbpp)->qb_lowat = q->q_lowat; 2679 q->q_nband++; 2680 qbpp = &(*qbpp)->qb_next; 2681 } 2682 } 2683 ASSERT(MUTEX_HELD(QLOCK(q))); 2684 qbp = q->q_bandp; 2685 i = bp->b_band; 2686 while (--i) 2687 qbp = qbp->qb_next; 2688 } 2689 2690 /* 2691 * If queue is empty, add the message and initialize the pointers. 2692 * Otherwise, adjust message pointers and queue pointers based on 2693 * the type of the message and where it belongs on the queue. Some 2694 * code is duplicated to minimize the number of conditionals and 2695 * hopefully minimize the amount of time this routine takes. 2696 */ 2697 if (!q->q_first) { 2698 bp->b_next = NULL; 2699 bp->b_prev = NULL; 2700 q->q_first = bp; 2701 q->q_last = bp; 2702 if (qbp) { 2703 qbp->qb_first = bp; 2704 qbp->qb_last = bp; 2705 } 2706 } else if (!qbp) { /* bp->b_band == 0 */ 2707 2708 /* 2709 * If queue class of message is less than or equal to 2710 * that of the last one on the queue, tack on to the end. 2711 */ 2712 tmp = q->q_last; 2713 if (mcls <= (int)queclass(tmp)) { 2714 bp->b_next = NULL; 2715 bp->b_prev = tmp; 2716 tmp->b_next = bp; 2717 q->q_last = bp; 2718 } else { 2719 tmp = q->q_first; 2720 while ((int)queclass(tmp) >= mcls) 2721 tmp = tmp->b_next; 2722 2723 /* 2724 * Insert bp before tmp. 2725 */ 2726 bp->b_next = tmp; 2727 bp->b_prev = tmp->b_prev; 2728 if (tmp->b_prev) 2729 tmp->b_prev->b_next = bp; 2730 else 2731 q->q_first = bp; 2732 tmp->b_prev = bp; 2733 } 2734 } else { /* bp->b_band != 0 */ 2735 if (qbp->qb_first) { 2736 tmp = qbp->qb_last; 2737 2738 /* 2739 * Insert bp after the last message in this band. 2740 */ 2741 bp->b_next = tmp->b_next; 2742 if (tmp->b_next) 2743 tmp->b_next->b_prev = bp; 2744 else 2745 q->q_last = bp; 2746 bp->b_prev = tmp; 2747 tmp->b_next = bp; 2748 } else { 2749 tmp = q->q_last; 2750 if ((mcls < (int)queclass(tmp)) || 2751 (bp->b_band <= tmp->b_band)) { 2752 2753 /* 2754 * Tack bp on end of queue. 
2755 */ 2756 bp->b_next = NULL; 2757 bp->b_prev = tmp; 2758 tmp->b_next = bp; 2759 q->q_last = bp; 2760 } else { 2761 tmp = q->q_first; 2762 while (tmp->b_datap->db_type >= QPCTL) 2763 tmp = tmp->b_next; 2764 while (tmp->b_band >= bp->b_band) 2765 tmp = tmp->b_next; 2766 2767 /* 2768 * Insert bp before tmp. 2769 */ 2770 bp->b_next = tmp; 2771 bp->b_prev = tmp->b_prev; 2772 if (tmp->b_prev) 2773 tmp->b_prev->b_next = bp; 2774 else 2775 q->q_first = bp; 2776 tmp->b_prev = bp; 2777 } 2778 qbp->qb_first = bp; 2779 } 2780 qbp->qb_last = bp; 2781 } 2782 2783 /* Get message byte count for q_count accounting */ 2784 bytecnt = mp_cont_len(bp, &mblkcnt); 2785 2786 if (qbp) { 2787 qbp->qb_count += bytecnt; 2788 qbp->qb_mblkcnt += mblkcnt; 2789 if ((qbp->qb_count >= qbp->qb_hiwat) || 2790 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2791 qbp->qb_flag |= QB_FULL; 2792 } 2793 } else { 2794 q->q_count += bytecnt; 2795 q->q_mblkcnt += mblkcnt; 2796 if ((q->q_count >= q->q_hiwat) || 2797 (q->q_mblkcnt >= q->q_hiwat)) { 2798 q->q_flag |= QFULL; 2799 } 2800 } 2801 2802 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0); 2803 2804 if ((mcls > QNORM) || 2805 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2806 qenable_locked(q); 2807 ASSERT(MUTEX_HELD(QLOCK(q))); 2808 if (freezer != curthread) 2809 mutex_exit(QLOCK(q)); 2810 2811 return (1); 2812 } 2813 2814 /* 2815 * Put stuff back at beginning of Q according to priority order. 2816 * See comment on putq above for details. 2817 */ 2818 int 2819 putbq(queue_t *q, mblk_t *bp) 2820 { 2821 mblk_t *tmp; 2822 qband_t *qbp = NULL; 2823 int mcls = (int)queclass(bp); 2824 kthread_id_t freezer; 2825 int bytecnt = 0, mblkcnt = 0; 2826 2827 ASSERT(q && bp); 2828 ASSERT(bp->b_next == NULL); 2829 freezer = STREAM(q)->sd_freezer; 2830 if (freezer == curthread) { 2831 ASSERT(frozenstr(q)); 2832 ASSERT(MUTEX_HELD(QLOCK(q))); 2833 } else 2834 mutex_enter(QLOCK(q)); 2835 2836 /* 2837 * Make sanity checks and if qband structure is not yet 2838 * allocated, do so. 2839 */ 2840 if (mcls == QPCTL) { 2841 if (bp->b_band != 0) 2842 bp->b_band = 0; /* force to be correct */ 2843 } else if (bp->b_band != 0) { 2844 int i; 2845 qband_t **qbpp; 2846 2847 if (bp->b_band > q->q_nband) { 2848 qbpp = &q->q_bandp; 2849 while (*qbpp) 2850 qbpp = &(*qbpp)->qb_next; 2851 while (bp->b_band > q->q_nband) { 2852 if ((*qbpp = allocband()) == NULL) { 2853 if (freezer != curthread) 2854 mutex_exit(QLOCK(q)); 2855 return (0); 2856 } 2857 (*qbpp)->qb_hiwat = q->q_hiwat; 2858 (*qbpp)->qb_lowat = q->q_lowat; 2859 q->q_nband++; 2860 qbpp = &(*qbpp)->qb_next; 2861 } 2862 } 2863 qbp = q->q_bandp; 2864 i = bp->b_band; 2865 while (--i) 2866 qbp = qbp->qb_next; 2867 } 2868 2869 /* 2870 * If queue is empty or if message is high priority, 2871 * place on the front of the queue. 2872 */ 2873 tmp = q->q_first; 2874 if ((!tmp) || (mcls == QPCTL)) { 2875 bp->b_next = tmp; 2876 if (tmp) 2877 tmp->b_prev = bp; 2878 else 2879 q->q_last = bp; 2880 q->q_first = bp; 2881 bp->b_prev = NULL; 2882 if (qbp) { 2883 qbp->qb_first = bp; 2884 qbp->qb_last = bp; 2885 } 2886 } else if (qbp) { /* bp->b_band != 0 */ 2887 tmp = qbp->qb_first; 2888 if (tmp) { 2889 2890 /* 2891 * Insert bp before the first message in this band. 2892 */ 2893 bp->b_next = tmp; 2894 bp->b_prev = tmp->b_prev; 2895 if (tmp->b_prev) 2896 tmp->b_prev->b_next = bp; 2897 else 2898 q->q_first = bp; 2899 tmp->b_prev = bp; 2900 } else { 2901 tmp = q->q_last; 2902 if ((mcls < (int)queclass(tmp)) || 2903 (bp->b_band < tmp->b_band)) { 2904 2905 /* 2906 * Tack bp on end of queue. 
2907 */
2908 bp->b_next = NULL;
2909 bp->b_prev = tmp;
2910 tmp->b_next = bp;
2911 q->q_last = bp;
2912 } else {
2913 tmp = q->q_first;
2914 while (tmp->b_datap->db_type >= QPCTL)
2915 tmp = tmp->b_next;
2916 while (tmp->b_band > bp->b_band)
2917 tmp = tmp->b_next;
2918
2919 /*
2920 * Insert bp before tmp.
2921 */
2922 bp->b_next = tmp;
2923 bp->b_prev = tmp->b_prev;
2924 if (tmp->b_prev)
2925 tmp->b_prev->b_next = bp;
2926 else
2927 q->q_first = bp;
2928 tmp->b_prev = bp;
2929 }
2930 qbp->qb_last = bp;
2931 }
2932 qbp->qb_first = bp;
2933 } else { /* bp->b_band == 0 && !QPCTL */
2934
2935 /*
2936 * If the queue class or band is less than that of the last
2937 * message on the queue, tack bp on the end of the queue.
2938 */
2939 tmp = q->q_last;
2940 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2941 bp->b_next = NULL;
2942 bp->b_prev = tmp;
2943 tmp->b_next = bp;
2944 q->q_last = bp;
2945 } else {
2946 tmp = q->q_first;
2947 while (tmp->b_datap->db_type >= QPCTL)
2948 tmp = tmp->b_next;
2949 while (tmp->b_band > bp->b_band)
2950 tmp = tmp->b_next;
2951
2952 /*
2953 * Insert bp before tmp.
2954 */
2955 bp->b_next = tmp;
2956 bp->b_prev = tmp->b_prev;
2957 if (tmp->b_prev)
2958 tmp->b_prev->b_next = bp;
2959 else
2960 q->q_first = bp;
2961 tmp->b_prev = bp;
2962 }
2963 }
2964
2965 /* Get message byte count for q_count accounting */
2966 bytecnt = mp_cont_len(bp, &mblkcnt);
2967
2968 if (qbp) {
2969 qbp->qb_count += bytecnt;
2970 qbp->qb_mblkcnt += mblkcnt;
2971 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2972 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2973 qbp->qb_flag |= QB_FULL;
2974 }
2975 } else {
2976 q->q_count += bytecnt;
2977 q->q_mblkcnt += mblkcnt;
2978 if ((q->q_count >= q->q_hiwat) ||
2979 (q->q_mblkcnt >= q->q_hiwat)) {
2980 q->q_flag |= QFULL;
2981 }
2982 }
2983
2984 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2985
2986 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2987 qenable_locked(q);
2988 ASSERT(MUTEX_HELD(QLOCK(q)));
2989 if (freezer != curthread)
2990 mutex_exit(QLOCK(q));
2991
2992 return (1);
2993 }
2994
2995 /*
2996 * Insert a message before an existing message on the queue. If the
2997 * existing message is NULL, the new message is placed on the end of
2998 * the queue. The queue class of the new message is ignored. However,
2999 * the priority band of the new message must adhere to the following
3000 * ordering:
3001 *
3002 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
3003 *
3004 * All flow control parameters are updated.
3005 *
3006 * insq can be called with the stream frozen, by other utility functions
3007 * holding QLOCK, and by streams modules without any locks held or the stream frozen.
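 *
 * A minimal sketch of a legal use (illustrative only): to place mp
 * ahead of all messages of strictly lower band, a caller could locate
 * the insertion point and let insq() verify the ordering:
 *
 *	mblk_t *emp;
 *
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (DB_TYPE(emp) < QPCTL && emp->b_band < mp->b_band)
 *			break;
 *	if (!insq(q, emp, mp))
 *		freemsg(mp);	/* out of order or allocation failure */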
3008 */ 3009 int 3010 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 3011 { 3012 mblk_t *tmp; 3013 qband_t *qbp = NULL; 3014 int mcls = (int)queclass(mp); 3015 kthread_id_t freezer; 3016 int bytecnt = 0, mblkcnt = 0; 3017 3018 freezer = STREAM(q)->sd_freezer; 3019 if (freezer == curthread) { 3020 ASSERT(frozenstr(q)); 3021 ASSERT(MUTEX_HELD(QLOCK(q))); 3022 } else if (MUTEX_HELD(QLOCK(q))) { 3023 /* Don't drop lock on exit */ 3024 freezer = curthread; 3025 } else 3026 mutex_enter(QLOCK(q)); 3027 3028 if (mcls == QPCTL) { 3029 if (mp->b_band != 0) 3030 mp->b_band = 0; /* force to be correct */ 3031 if (emp && emp->b_prev && 3032 (emp->b_prev->b_datap->db_type < QPCTL)) 3033 goto badord; 3034 } 3035 if (emp) { 3036 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 3037 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 3038 (emp->b_prev->b_band < mp->b_band))) { 3039 goto badord; 3040 } 3041 } else { 3042 tmp = q->q_last; 3043 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 3044 badord: 3045 cmn_err(CE_WARN, 3046 "insq: attempt to insert message out of order " 3047 "on q %p", (void *)q); 3048 if (freezer != curthread) 3049 mutex_exit(QLOCK(q)); 3050 return (0); 3051 } 3052 } 3053 3054 if (mp->b_band != 0) { 3055 int i; 3056 qband_t **qbpp; 3057 3058 if (mp->b_band > q->q_nband) { 3059 qbpp = &q->q_bandp; 3060 while (*qbpp) 3061 qbpp = &(*qbpp)->qb_next; 3062 while (mp->b_band > q->q_nband) { 3063 if ((*qbpp = allocband()) == NULL) { 3064 if (freezer != curthread) 3065 mutex_exit(QLOCK(q)); 3066 return (0); 3067 } 3068 (*qbpp)->qb_hiwat = q->q_hiwat; 3069 (*qbpp)->qb_lowat = q->q_lowat; 3070 q->q_nband++; 3071 qbpp = &(*qbpp)->qb_next; 3072 } 3073 } 3074 qbp = q->q_bandp; 3075 i = mp->b_band; 3076 while (--i) 3077 qbp = qbp->qb_next; 3078 } 3079 3080 if ((mp->b_next = emp) != NULL) { 3081 if ((mp->b_prev = emp->b_prev) != NULL) 3082 emp->b_prev->b_next = mp; 3083 else 3084 q->q_first = mp; 3085 emp->b_prev = mp; 3086 } else { 3087 if ((mp->b_prev = q->q_last) != NULL) 3088 q->q_last->b_next = mp; 3089 else 3090 q->q_first = mp; 3091 q->q_last = mp; 3092 } 3093 3094 /* Get mblk and byte count for q_count accounting */ 3095 bytecnt = mp_cont_len(mp, &mblkcnt); 3096 3097 if (qbp) { /* adjust qband pointers and count */ 3098 if (!qbp->qb_first) { 3099 qbp->qb_first = mp; 3100 qbp->qb_last = mp; 3101 } else { 3102 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3103 (mp->b_prev->b_band != mp->b_band))) 3104 qbp->qb_first = mp; 3105 else if (mp->b_next == NULL || (mp->b_next != NULL && 3106 (mp->b_next->b_band != mp->b_band))) 3107 qbp->qb_last = mp; 3108 } 3109 qbp->qb_count += bytecnt; 3110 qbp->qb_mblkcnt += mblkcnt; 3111 if ((qbp->qb_count >= qbp->qb_hiwat) || 3112 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3113 qbp->qb_flag |= QB_FULL; 3114 } 3115 } else { 3116 q->q_count += bytecnt; 3117 q->q_mblkcnt += mblkcnt; 3118 if ((q->q_count >= q->q_hiwat) || 3119 (q->q_mblkcnt >= q->q_hiwat)) { 3120 q->q_flag |= QFULL; 3121 } 3122 } 3123 3124 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0); 3125 3126 if (canenable(q) && (q->q_flag & QWANTR)) 3127 qenable_locked(q); 3128 3129 ASSERT(MUTEX_HELD(QLOCK(q))); 3130 if (freezer != curthread) 3131 mutex_exit(QLOCK(q)); 3132 3133 return (1); 3134 } 3135 3136 /* 3137 * Create and put a control message on queue. 
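 *
 * For example, a module might flush its downstream neighbour with
 * (void) putnextctl1(q, M_FLUSH, FLUSHR). Note that putctl() and
 * friends return 0, and send nothing, when the message cannot be
 * allocated, so callers that must not lose the control message
 * should check the return value. (Illustrative usage, not code
 * from this file.)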
3138 */ 3139 int 3140 putctl(queue_t *q, int type) 3141 { 3142 mblk_t *bp; 3143 3144 if ((datamsg(type) && (type != M_DELAY)) || 3145 (bp = allocb_tryhard(0)) == NULL) 3146 return (0); 3147 bp->b_datap->db_type = (unsigned char) type; 3148 3149 put(q, bp); 3150 3151 return (1); 3152 } 3153 3154 /* 3155 * Control message with a single-byte parameter 3156 */ 3157 int 3158 putctl1(queue_t *q, int type, int param) 3159 { 3160 mblk_t *bp; 3161 3162 if ((datamsg(type) && (type != M_DELAY)) || 3163 (bp = allocb_tryhard(1)) == NULL) 3164 return (0); 3165 bp->b_datap->db_type = (unsigned char)type; 3166 *bp->b_wptr++ = (unsigned char)param; 3167 3168 put(q, bp); 3169 3170 return (1); 3171 } 3172 3173 int 3174 putnextctl1(queue_t *q, int type, int param) 3175 { 3176 mblk_t *bp; 3177 3178 if ((datamsg(type) && (type != M_DELAY)) || 3179 ((bp = allocb_tryhard(1)) == NULL)) 3180 return (0); 3181 3182 bp->b_datap->db_type = (unsigned char)type; 3183 *bp->b_wptr++ = (unsigned char)param; 3184 3185 putnext(q, bp); 3186 3187 return (1); 3188 } 3189 3190 int 3191 putnextctl(queue_t *q, int type) 3192 { 3193 mblk_t *bp; 3194 3195 if ((datamsg(type) && (type != M_DELAY)) || 3196 ((bp = allocb_tryhard(0)) == NULL)) 3197 return (0); 3198 bp->b_datap->db_type = (unsigned char)type; 3199 3200 putnext(q, bp); 3201 3202 return (1); 3203 } 3204 3205 /* 3206 * Return the queue upstream from this one 3207 */ 3208 queue_t * 3209 backq(queue_t *q) 3210 { 3211 q = _OTHERQ(q); 3212 if (q->q_next) { 3213 q = q->q_next; 3214 return (_OTHERQ(q)); 3215 } 3216 return (NULL); 3217 } 3218 3219 /* 3220 * Send a block back up the queue in reverse from this 3221 * one (e.g. to respond to ioctls) 3222 */ 3223 void 3224 qreply(queue_t *q, mblk_t *bp) 3225 { 3226 ASSERT(q && bp); 3227 3228 putnext(_OTHERQ(q), bp); 3229 } 3230 3231 /* 3232 * Streams Queue Scheduling 3233 * 3234 * Queues are enabled through qenable() when they have messages to 3235 * process. They are serviced by queuerun(), which runs each enabled 3236 * queue's service procedure. The call to queuerun() is processor 3237 * dependent - the general principle is that it be run whenever a queue 3238 * is enabled but before returning to user level. For system calls, 3239 * the function runqueues() is called if their action causes a queue 3240 * to be enabled. For device interrupts, queuerun() should be 3241 * called before returning from the last level of interrupt. Beyond 3242 * this, no timing assumptions should be made about queue scheduling. 3243 */ 3244 3245 /* 3246 * Enable a queue: put it on list of those whose service procedures are 3247 * ready to run and set up the scheduling mechanism. 3248 * The broadcast is done outside the mutex -> to avoid the woken thread 3249 * from contending with the mutex. This is OK 'cos the queue has been 3250 * enqueued on the runlist and flagged safely at this point. 3251 */ 3252 void 3253 qenable(queue_t *q) 3254 { 3255 mutex_enter(QLOCK(q)); 3256 qenable_locked(q); 3257 mutex_exit(QLOCK(q)); 3258 } 3259 /* 3260 * Return number of messages on queue 3261 */ 3262 int 3263 qsize(queue_t *qp) 3264 { 3265 int count = 0; 3266 mblk_t *mp; 3267 3268 mutex_enter(QLOCK(qp)); 3269 for (mp = qp->q_first; mp; mp = mp->b_next) 3270 count++; 3271 mutex_exit(QLOCK(qp)); 3272 return (count); 3273 } 3274 3275 /* 3276 * noenable - set queue so that putq() will not enable it. 3277 * enableok - set queue so that putq() can enable it. 
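 *
 * A hedged usage sketch (not from this file): a module that wants to
 * batch several messages before its service procedure runs might do:
 *
 *	noenable(q);
 *	(void) putq(q, mp1);
 *	(void) putq(q, mp2);	(queued, but srv not scheduled)
 *	enableok(q);
 *	qenable(q);		(schedule the service procedure once)
 *
 * Note that putq() still enables the queue for high-priority messages
 * regardless of QNOENB; see the mcls > QNORM test in putq() above.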
3278 */
3279 void
3280 noenable(queue_t *q)
3281 {
3282 mutex_enter(QLOCK(q));
3283 q->q_flag |= QNOENB;
3284 mutex_exit(QLOCK(q));
3285 }
3286
3287 void
3288 enableok(queue_t *q)
3289 {
3290 mutex_enter(QLOCK(q));
3291 q->q_flag &= ~QNOENB;
3292 mutex_exit(QLOCK(q));
3293 }
3294
3295 /*
3296 * Set queue fields.
3297 */
3298 int
3299 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3300 {
3301 qband_t *qbp = NULL;
3302 queue_t *wrq;
3303 int error = 0;
3304 kthread_id_t freezer;
3305
3306 freezer = STREAM(q)->sd_freezer;
3307 if (freezer == curthread) {
3308 ASSERT(frozenstr(q));
3309 ASSERT(MUTEX_HELD(QLOCK(q)));
3310 } else
3311 mutex_enter(QLOCK(q));
3312
3313 if (what >= QBAD) {
3314 error = EINVAL;
3315 goto done;
3316 }
3317 if (pri != 0) {
3318 int i;
3319 qband_t **qbpp;
3320
3321 if (pri > q->q_nband) {
3322 qbpp = &q->q_bandp;
3323 while (*qbpp)
3324 qbpp = &(*qbpp)->qb_next;
3325 while (pri > q->q_nband) {
3326 if ((*qbpp = allocband()) == NULL) {
3327 error = EAGAIN;
3328 goto done;
3329 }
3330 (*qbpp)->qb_hiwat = q->q_hiwat;
3331 (*qbpp)->qb_lowat = q->q_lowat;
3332 q->q_nband++;
3333 qbpp = &(*qbpp)->qb_next;
3334 }
3335 }
3336 qbp = q->q_bandp;
3337 i = pri;
3338 while (--i)
3339 qbp = qbp->qb_next;
3340 }
3341 switch (what) {
3342
3343 case QHIWAT:
3344 if (qbp)
3345 qbp->qb_hiwat = (size_t)val;
3346 else
3347 q->q_hiwat = (size_t)val;
3348 break;
3349
3350 case QLOWAT:
3351 if (qbp)
3352 qbp->qb_lowat = (size_t)val;
3353 else
3354 q->q_lowat = (size_t)val;
3355 break;
3356
3357 case QMAXPSZ:
3358 if (qbp)
3359 error = EINVAL;
3360 else
3361 q->q_maxpsz = (ssize_t)val;
3362
3363 /*
3364 * Performance concern: strwrite looks at the module below
3365 * the stream head for the maxpsz each time it does a write,
3366 * so we now cache it at the stream head. Check to see if this
3367 * queue is sitting directly below the stream head.
3368 */
3369 wrq = STREAM(q)->sd_wrq;
3370 if (q != wrq->q_next)
3371 break;
3372
3373 /*
3374 * If the stream is not frozen drop the current QLOCK and
3375 * acquire the sd_wrq QLOCK which protects sd_qn_*
3376 */
3377 if (freezer != curthread) {
3378 mutex_exit(QLOCK(q));
3379 mutex_enter(QLOCK(wrq));
3380 }
3381 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3382
3383 if (strmsgsz != 0) {
3384 if (val == INFPSZ)
3385 val = strmsgsz;
3386 else {
3387 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3388 val = MIN(PIPE_BUF, val);
3389 else
3390 val = MIN(strmsgsz, val);
3391 }
3392 }
3393 STREAM(q)->sd_qn_maxpsz = val;
3394 if (freezer != curthread) {
3395 mutex_exit(QLOCK(wrq));
3396 mutex_enter(QLOCK(q));
3397 }
3398 break;
3399
3400 case QMINPSZ:
3401 if (qbp)
3402 error = EINVAL;
3403 else
3404 q->q_minpsz = (ssize_t)val;
3405
3406 /*
3407 * Performance concern: strwrite looks at the module below
3408 * the stream head for the maxpsz each time it does a write,
3409 * so we now cache it at the stream head. Check to see if this
3410 * queue is sitting directly below the stream head.
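 *
 * (For reference, these limits are normally tuned through this
 * interface rather than by poking the queue directly; e.g. a module
 * might raise its high-water mark with
 * (void) strqset(q, QHIWAT, 0, (intptr_t)newhiwat),
 * where newhiwat is hypothetical. Illustrative only.)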
3411 */ 3412 wrq = STREAM(q)->sd_wrq; 3413 if (q != wrq->q_next) 3414 break; 3415 3416 /* 3417 * If the stream is not frozen drop the current QLOCK and 3418 * acquire the sd_wrq QLOCK which protects sd_qn_* 3419 */ 3420 if (freezer != curthread) { 3421 mutex_exit(QLOCK(q)); 3422 mutex_enter(QLOCK(wrq)); 3423 } 3424 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3425 3426 if (freezer != curthread) { 3427 mutex_exit(QLOCK(wrq)); 3428 mutex_enter(QLOCK(q)); 3429 } 3430 break; 3431 3432 case QSTRUIOT: 3433 if (qbp) 3434 error = EINVAL; 3435 else 3436 q->q_struiot = (ushort_t)val; 3437 break; 3438 3439 case QCOUNT: 3440 case QFIRST: 3441 case QLAST: 3442 case QFLAG: 3443 error = EPERM; 3444 break; 3445 3446 default: 3447 error = EINVAL; 3448 break; 3449 } 3450 done: 3451 if (freezer != curthread) 3452 mutex_exit(QLOCK(q)); 3453 return (error); 3454 } 3455 3456 /* 3457 * Get queue fields. 3458 */ 3459 int 3460 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3461 { 3462 qband_t *qbp = NULL; 3463 int error = 0; 3464 kthread_id_t freezer; 3465 3466 freezer = STREAM(q)->sd_freezer; 3467 if (freezer == curthread) { 3468 ASSERT(frozenstr(q)); 3469 ASSERT(MUTEX_HELD(QLOCK(q))); 3470 } else 3471 mutex_enter(QLOCK(q)); 3472 if (what >= QBAD) { 3473 error = EINVAL; 3474 goto done; 3475 } 3476 if (pri != 0) { 3477 int i; 3478 qband_t **qbpp; 3479 3480 if (pri > q->q_nband) { 3481 qbpp = &q->q_bandp; 3482 while (*qbpp) 3483 qbpp = &(*qbpp)->qb_next; 3484 while (pri > q->q_nband) { 3485 if ((*qbpp = allocband()) == NULL) { 3486 error = EAGAIN; 3487 goto done; 3488 } 3489 (*qbpp)->qb_hiwat = q->q_hiwat; 3490 (*qbpp)->qb_lowat = q->q_lowat; 3491 q->q_nband++; 3492 qbpp = &(*qbpp)->qb_next; 3493 } 3494 } 3495 qbp = q->q_bandp; 3496 i = pri; 3497 while (--i) 3498 qbp = qbp->qb_next; 3499 } 3500 switch (what) { 3501 case QHIWAT: 3502 if (qbp) 3503 *(size_t *)valp = qbp->qb_hiwat; 3504 else 3505 *(size_t *)valp = q->q_hiwat; 3506 break; 3507 3508 case QLOWAT: 3509 if (qbp) 3510 *(size_t *)valp = qbp->qb_lowat; 3511 else 3512 *(size_t *)valp = q->q_lowat; 3513 break; 3514 3515 case QMAXPSZ: 3516 if (qbp) 3517 error = EINVAL; 3518 else 3519 *(ssize_t *)valp = q->q_maxpsz; 3520 break; 3521 3522 case QMINPSZ: 3523 if (qbp) 3524 error = EINVAL; 3525 else 3526 *(ssize_t *)valp = q->q_minpsz; 3527 break; 3528 3529 case QCOUNT: 3530 if (qbp) 3531 *(size_t *)valp = qbp->qb_count; 3532 else 3533 *(size_t *)valp = q->q_count; 3534 break; 3535 3536 case QFIRST: 3537 if (qbp) 3538 *(mblk_t **)valp = qbp->qb_first; 3539 else 3540 *(mblk_t **)valp = q->q_first; 3541 break; 3542 3543 case QLAST: 3544 if (qbp) 3545 *(mblk_t **)valp = qbp->qb_last; 3546 else 3547 *(mblk_t **)valp = q->q_last; 3548 break; 3549 3550 case QFLAG: 3551 if (qbp) 3552 *(uint_t *)valp = qbp->qb_flag; 3553 else 3554 *(uint_t *)valp = q->q_flag; 3555 break; 3556 3557 case QSTRUIOT: 3558 if (qbp) 3559 error = EINVAL; 3560 else 3561 *(short *)valp = q->q_struiot; 3562 break; 3563 3564 default: 3565 error = EINVAL; 3566 break; 3567 } 3568 done: 3569 if (freezer != curthread) 3570 mutex_exit(QLOCK(q)); 3571 return (error); 3572 } 3573 3574 /* 3575 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3576 * QWANTWSYNC or QWANTR or QWANTW, 3577 * 3578 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3579 * deferred wakeup will be done. Also if strpoll() in progress then a 3580 * deferred pollwakeup will be done. 
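 *
 * (For example, qbackenable() above calls strwakeq(q, QWANTWSYNC) when
 * a write queue that a synchronous writer was blocked on drains below
 * its low-water mark.)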
3581 */
3582 void
3583 strwakeq(queue_t *q, int flag)
3584 {
3585 stdata_t *stp = STREAM(q);
3586 pollhead_t *pl;
3587
3588 mutex_enter(&stp->sd_lock);
3589 pl = &stp->sd_pollist;
3590 if (flag & QWANTWSYNC) {
3591 ASSERT(!(q->q_flag & QREADR));
3592 if (stp->sd_flag & WSLEEP) {
3593 stp->sd_flag &= ~WSLEEP;
3594 cv_broadcast(&stp->sd_wrq->q_wait);
3595 } else {
3596 stp->sd_wakeq |= WSLEEP;
3597 }
3598
3599 mutex_exit(&stp->sd_lock);
3600 pollwakeup(pl, POLLWRNORM);
3601 mutex_enter(&stp->sd_lock);
3602
3603 if (stp->sd_sigflags & S_WRNORM)
3604 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3605 } else if (flag & QWANTR) {
3606 if (stp->sd_flag & RSLEEP) {
3607 stp->sd_flag &= ~RSLEEP;
3608 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3609 } else {
3610 stp->sd_wakeq |= RSLEEP;
3611 }
3612
3613 mutex_exit(&stp->sd_lock);
3614 pollwakeup(pl, POLLIN | POLLRDNORM);
3615 mutex_enter(&stp->sd_lock);
3616
3617 {
3618 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3619
3620 if (events)
3621 strsendsig(stp->sd_siglist, events, 0, 0);
3622 }
3623 } else {
3624 if (stp->sd_flag & WSLEEP) {
3625 stp->sd_flag &= ~WSLEEP;
3626 cv_broadcast(&stp->sd_wrq->q_wait);
3627 }
3628
3629 mutex_exit(&stp->sd_lock);
3630 pollwakeup(pl, POLLWRNORM);
3631 mutex_enter(&stp->sd_lock);
3632
3633 if (stp->sd_sigflags & S_WRNORM)
3634 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3635 }
3636 mutex_exit(&stp->sd_lock);
3637 }
3638
3639 int
3640 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3641 {
3642 stdata_t *stp = STREAM(q);
3643 int typ = STRUIOT_STANDARD;
3644 uio_t *uiop = &dp->d_uio;
3645 dblk_t *dbp;
3646 ssize_t uiocnt;
3647 ssize_t cnt;
3648 unsigned char *ptr;
3649 ssize_t resid;
3650 int error = 0;
3651 on_trap_data_t otd;
3652 queue_t *stwrq;
3653
3654 /*
3655 * Plumbing may change while we are taking the type, so store
3656 * the queue in a temporary variable. It doesn't matter even if
3657 * we take the type from the previous plumbing: if the plumbing
3658 * changed while we were holding the queue in a temporary
3659 * variable, we can continue processing the message the way it
3660 * would have been processed in the old plumbing, with no side
3661 * effects other than a bit of extra processing for the partial
3662 * IP header checksum.
3663 *
3664 * This has been done to avoid holding sd_lock, which is
3665 * very hot.
3666 */
3667
3668 stwrq = stp->sd_struiowrq;
3669 if (stwrq)
3670 typ = stwrq->q_struiot;
3671
3672 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3673 dbp = mp->b_datap;
3674 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3675 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3676 cnt = MIN(uiocnt, uiop->uio_resid);
3677 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3678 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3679 /*
3680 * Either this mblk has already been processed
3681 * or there is no more room in this mblk (?).
3682 */
3683 continue;
3684 }
3685 switch (typ) {
3686 case STRUIOT_STANDARD:
3687 if (noblock) {
3688 if (on_trap(&otd, OT_DATA_ACCESS)) {
3689 no_trap();
3690 error = EWOULDBLOCK;
3691 goto out;
3692 }
3693 }
3694 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3695 if (noblock)
3696 no_trap();
3697 goto out;
3698 }
3699 if (noblock)
3700 no_trap();
3701 break;
3702
3703 default:
3704 error = EIO;
3705 goto out;
3706 }
3707 dbp->db_struioflag |= STRUIO_DONE;
3708 dbp->db_cksumstuff += cnt;
3709 }
3710 out:
3711 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3712 /*
3713 * A fault has occurred and some bytes were moved to the
3714 * current mblk; the uio_t has already been updated by
3715 * the appropriate uio routine, so also update the mblk
3716 * to reflect this in case this same mblk chain is used
3717 * again (after the fault has been handled).
3718 */
3719 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3720 if (uiocnt >= resid)
3721 dbp->db_cksumstuff += resid;
3722 }
3723 return (error);
3724 }
3725
3726 /*
3727 * Try to enter the queue synchronously. Any attempt to enter a closing
3728 * queue will fail. The qp->q_rwcnt keeps track of the number of successful
3729 * entries so that removeq() will not try to close the queue while a thread
3730 * is inside the queue.
3731 */
3732 static boolean_t
3733 rwnext_enter(queue_t *qp)
3734 {
3735 mutex_enter(QLOCK(qp));
3736 if (qp->q_flag & QWCLOSE) {
3737 mutex_exit(QLOCK(qp));
3738 return (B_FALSE);
3739 }
3740 qp->q_rwcnt++;
3741 ASSERT(qp->q_rwcnt != 0);
3742 mutex_exit(QLOCK(qp));
3743 return (B_TRUE);
3744 }
3745
3746 /*
3747 * Decrease the count of threads running in a sync stream queue and wake up
3748 * any threads blocked in removeq().
3749 */
3750 static void
3751 rwnext_exit(queue_t *qp)
3752 {
3753 mutex_enter(QLOCK(qp));
3754 qp->q_rwcnt--;
3755 if (qp->q_flag & QWANTRMQSYNC) {
3756 qp->q_flag &= ~QWANTRMQSYNC;
3757 cv_broadcast(&qp->q_wait);
3758 }
3759 mutex_exit(QLOCK(qp));
3760 }
3761
3762 /*
3763 * The purpose of rwnext() is to call the rw procedure of the next
3764 * (downstream) module's queue.
3765 *
3766 * Treated as a put entrypoint for perimeter synchronization.
3767 *
3768 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3769 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3770 * does not matter if any regular put entrypoints have already been entered.
3771 * We can't increment one of the sq_putcounts (instead of sq_count) because
3772 * qwait_rw won't know which counter to decrement.
3773 *
3774 * It would be reasonable to add the lockless FASTPUT logic.
3775 */
3776 int
3777 rwnext(queue_t *qp, struiod_t *dp)
3778 {
3779 queue_t *nqp;
3780 syncq_t *sq;
3781 uint16_t count;
3782 uint16_t flags;
3783 struct qinit *qi;
3784 int (*proc)();
3785 struct stdata *stp;
3786 int isread;
3787 int rval;
3788
3789 stp = STREAM(qp);
3790 /*
3791 * Prevent q_next from changing by holding sd_lock until acquiring
3792 * SQLOCK. Note that a read-side rwnext from the streamhead will
3793 * already have sd_lock acquired. In either case sd_lock is always
3794 * released after acquiring SQLOCK.
3795 *
3796 * The streamhead read-side holding sd_lock when calling rwnext is
3797 * required to prevent a race condition where M_DATA mblks flowing
3798 * up the read-side of the stream could be bypassed by a rwnext()
3799 * down-call. In this case sd_lock acts as the streamhead perimeter.
3800 */
3801 if ((nqp = _WR(qp)) == qp) {
3802 isread = 0;
3803 mutex_enter(&stp->sd_lock);
3804 qp = nqp->q_next;
3805 } else {
3806 isread = 1;
3807 if (nqp != stp->sd_wrq)
3808 /* Not streamhead */
3809 mutex_enter(&stp->sd_lock);
3810 qp = _RD(nqp->q_next);
3811 }
3812 qi = qp->q_qinfo;
3813 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3814 /*
3815 * Not a synchronous module or no r/w procedure for this
3816 * queue, so just return EINVAL and let the caller handle it.
3817 */
3818 mutex_exit(&stp->sd_lock);
3819 return (EINVAL);
3820 }
3821
3822 if (rwnext_enter(qp) == B_FALSE) {
3823 mutex_exit(&stp->sd_lock);
3824 return (EINVAL);
3825 }
3826
3827 sq = qp->q_syncq;
3828 mutex_enter(SQLOCK(sq));
3829 mutex_exit(&stp->sd_lock);
3830 count = sq->sq_count;
3831 flags = sq->sq_flags;
3832 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3833
3834 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3835 /*
3836 * if this queue is being closed, return.
3837 */
3838 if (qp->q_flag & QWCLOSE) {
3839 mutex_exit(SQLOCK(sq));
3840 rwnext_exit(qp);
3841 return (EINVAL);
3842 }
3843
3844 /*
3845 * Wait until we can enter the inner perimeter.
3846 */
3847 sq->sq_flags = flags | SQ_WANTWAKEUP;
3848 cv_wait(&sq->sq_wait, SQLOCK(sq));
3849 count = sq->sq_count;
3850 flags = sq->sq_flags;
3851 }
3852
3853 if (isread == 0 && stp->sd_struiowrq == NULL ||
3854 isread == 1 && stp->sd_struiordq == NULL) {
3855 /*
3856 * Stream plumbing changed while waiting for inner perimeter
3857 * so just return EINVAL and let the caller handle it.
3858 */
3859 mutex_exit(SQLOCK(sq));
3860 rwnext_exit(qp);
3861 return (EINVAL);
3862 }
3863 if (!(flags & SQ_CIPUT))
3864 sq->sq_flags = flags | SQ_EXCL;
3865 sq->sq_count = count + 1;
3866 ASSERT(sq->sq_count != 0); /* Wraparound */
3867 /*
3868 * Note: The only message ordering guarantee that rwnext() makes is
3869 * for the write queue flow-control case. All others (r/w queue
3870 * with q_count > 0 (or q_first != 0)) are the responsibility of
3871 * the queue's rw procedure. This could be generalized here by
3872 * running the queue's service procedure, but that wouldn't be
3873 * the most efficient for all cases.
3874 */
3875 mutex_exit(SQLOCK(sq));
3876 if (! isread && (qp->q_flag & QFULL)) {
3877 /*
3878 * Write queue may be flow controlled. If so,
3879 * mark the queue for wakeup when it's not.
3880 */
3881 mutex_enter(QLOCK(qp));
3882 if (qp->q_flag & QFULL) {
3883 qp->q_flag |= QWANTWSYNC;
3884 mutex_exit(QLOCK(qp));
3885 rval = EWOULDBLOCK;
3886 goto out;
3887 }
3888 mutex_exit(QLOCK(qp));
3889 }
3890
3891 if (! isread && dp->d_mp)
3892 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3893 dp->d_mp->b_datap->db_base);
3894
3895 rval = (*proc)(qp, dp);
3896
3897 if (isread && dp->d_mp)
3898 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3899 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3900 out:
3901 /*
3902 * The queue is protected from being freed by sq_count, so it is
3903 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3904 */
3905 rwnext_exit(qp);
3906
3907 mutex_enter(SQLOCK(sq));
3908 flags = sq->sq_flags;
3909 ASSERT(sq->sq_count != 0);
3910 sq->sq_count--;
3911 if (flags & SQ_TAIL) {
3912 putnext_tail(sq, qp, flags);
3913 /*
3914 * The only purpose of this ASSERT is to preserve calling stack
3915 * in DEBUG kernel.
3916 */ 3917 ASSERT(flags & SQ_TAIL); 3918 return (rval); 3919 } 3920 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3921 /* 3922 * Safe to always drop SQ_EXCL: 3923 * Not SQ_CIPUT means we set SQ_EXCL above 3924 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3925 * did a qwriter(INNER) in which case nobody else 3926 * is in the inner perimeter and we are exiting. 3927 * 3928 * I would like to make the following assertion: 3929 * 3930 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3931 * sq->sq_count == 0); 3932 * 3933 * which indicates that if we are both putshared and exclusive, 3934 * we became exclusive while executing the putproc, and the only 3935 * claim on the syncq was the one we dropped a few lines above. 3936 * But other threads that enter putnext while the syncq is exclusive 3937 * need to make a claim as they may need to drop SQLOCK in the 3938 * has_writers case to avoid deadlocks. If these threads are 3939 * delayed or preempted, it is possible that the writer thread can 3940 * find out that there are other claims making the (sq_count == 0) 3941 * test invalid. 3942 */ 3943 3944 sq->sq_flags = flags & ~SQ_EXCL; 3945 if (sq->sq_flags & SQ_WANTWAKEUP) { 3946 sq->sq_flags &= ~SQ_WANTWAKEUP; 3947 cv_broadcast(&sq->sq_wait); 3948 } 3949 mutex_exit(SQLOCK(sq)); 3950 return (rval); 3951 } 3952 3953 /* 3954 * The purpose of infonext() is to call the info procedure of the next 3955 * (downstream) modules queue. 3956 * 3957 * treated as put entrypoint for perimeter syncronization. 3958 * 3959 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3960 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and 3961 * it does not matter if any regular put entrypoints have been already 3962 * entered. 3963 */ 3964 int 3965 infonext(queue_t *qp, infod_t *idp) 3966 { 3967 queue_t *nqp; 3968 syncq_t *sq; 3969 uint16_t count; 3970 uint16_t flags; 3971 struct qinit *qi; 3972 int (*proc)(); 3973 struct stdata *stp; 3974 int rval; 3975 3976 stp = STREAM(qp); 3977 /* 3978 * Prevent q_next from changing by holding sd_lock until 3979 * acquiring SQLOCK. 3980 */ 3981 mutex_enter(&stp->sd_lock); 3982 if ((nqp = _WR(qp)) == qp) { 3983 qp = nqp->q_next; 3984 } else { 3985 qp = _RD(nqp->q_next); 3986 } 3987 qi = qp->q_qinfo; 3988 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3989 mutex_exit(&stp->sd_lock); 3990 return (EINVAL); 3991 } 3992 sq = qp->q_syncq; 3993 mutex_enter(SQLOCK(sq)); 3994 mutex_exit(&stp->sd_lock); 3995 count = sq->sq_count; 3996 flags = sq->sq_flags; 3997 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3998 3999 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 4000 /* 4001 * Wait until we can enter the inner perimeter. 4002 */ 4003 sq->sq_flags = flags | SQ_WANTWAKEUP; 4004 cv_wait(&sq->sq_wait, SQLOCK(sq)); 4005 count = sq->sq_count; 4006 flags = sq->sq_flags; 4007 } 4008 4009 if (! (flags & SQ_CIPUT)) 4010 sq->sq_flags = flags | SQ_EXCL; 4011 sq->sq_count = count + 1; 4012 ASSERT(sq->sq_count != 0); /* Wraparound */ 4013 mutex_exit(SQLOCK(sq)); 4014 4015 rval = (*proc)(qp, idp); 4016 4017 mutex_enter(SQLOCK(sq)); 4018 flags = sq->sq_flags; 4019 ASSERT(sq->sq_count != 0); 4020 sq->sq_count--; 4021 if (flags & SQ_TAIL) { 4022 putnext_tail(sq, qp, flags); 4023 /* 4024 * The only purpose of this ASSERT is to preserve calling stack 4025 * in DEBUG kernel. 
4026 */ 4027 ASSERT(flags & SQ_TAIL); 4028 return (rval); 4029 } 4030 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 4031 /* 4032 * XXXX 4033 * I am not certain the next comment is correct here. I need to consider 4034 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 4035 * might cause other problems. It just might be safer to drop it if 4036 * !SQ_CIPUT because that is when we set it. 4037 */ 4038 /* 4039 * Safe to always drop SQ_EXCL: 4040 * Not SQ_CIPUT means we set SQ_EXCL above 4041 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 4042 * did a qwriter(INNER) in which case nobody else 4043 * is in the inner perimeter and we are exiting. 4044 * 4045 * I would like to make the following assertion: 4046 * 4047 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 4048 * sq->sq_count == 0); 4049 * 4050 * which indicates that if we are both putshared and exclusive, 4051 * we became exclusive while executing the putproc, and the only 4052 * claim on the syncq was the one we dropped a few lines above. 4053 * But other threads that enter putnext while the syncq is exclusive 4054 * need to make a claim as they may need to drop SQLOCK in the 4055 * has_writers case to avoid deadlocks. If these threads are 4056 * delayed or preempted, it is possible that the writer thread can 4057 * find out that there are other claims making the (sq_count == 0) 4058 * test invalid. 4059 */ 4060 4061 sq->sq_flags = flags & ~SQ_EXCL; 4062 mutex_exit(SQLOCK(sq)); 4063 return (rval); 4064 } 4065 4066 /* 4067 * Return nonzero if the queue is responsible for struio(), else return 0. 4068 */ 4069 int 4070 isuioq(queue_t *q) 4071 { 4072 if (q->q_flag & QREADR) 4073 return (STREAM(q)->sd_struiordq == q); 4074 else 4075 return (STREAM(q)->sd_struiowrq == q); 4076 } 4077 4078 #if defined(__sparc) 4079 int disable_putlocks = 0; 4080 #else 4081 int disable_putlocks = 1; 4082 #endif 4083 4084 /* 4085 * called by create_putlock. 4086 */ 4087 static void 4088 create_syncq_putlocks(queue_t *q) 4089 { 4090 syncq_t *sq = q->q_syncq; 4091 ciputctrl_t *cip; 4092 int i; 4093 4094 ASSERT(sq != NULL); 4095 4096 ASSERT(disable_putlocks == 0); 4097 ASSERT(n_ciputctrl >= min_n_ciputctrl); 4098 ASSERT(ciputctrl_cache != NULL); 4099 4100 if (!(sq->sq_type & SQ_CIPUT)) 4101 return; 4102 4103 for (i = 0; i <= 1; i++) { 4104 if (sq->sq_ciputctrl == NULL) { 4105 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 4106 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 4107 mutex_enter(SQLOCK(sq)); 4108 if (sq->sq_ciputctrl != NULL) { 4109 mutex_exit(SQLOCK(sq)); 4110 kmem_cache_free(ciputctrl_cache, cip); 4111 } else { 4112 ASSERT(sq->sq_nciputctrl == 0); 4113 sq->sq_nciputctrl = n_ciputctrl - 1; 4114 /* 4115 * putnext checks sq_ciputctrl without holding 4116 * SQLOCK. if it is not NULL putnext assumes 4117 * sq_nciputctrl is initialized. membar below 4118 * insures that. 4119 */ 4120 membar_producer(); 4121 sq->sq_ciputctrl = cip; 4122 mutex_exit(SQLOCK(sq)); 4123 } 4124 } 4125 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 4126 if (i == 1) 4127 break; 4128 q = _OTHERQ(q); 4129 if (!(q->q_flag & QPERQ)) { 4130 ASSERT(sq == q->q_syncq); 4131 break; 4132 } 4133 ASSERT(q->q_syncq != NULL); 4134 ASSERT(sq != q->q_syncq); 4135 sq = q->q_syncq; 4136 ASSERT(sq->sq_type & SQ_CIPUT); 4137 } 4138 } 4139 4140 /* 4141 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 4142 * syncq of q. 
If stream argument is not 0 create per cpu stream_putlocks for
4143 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4144 * starting from q and down to the driver.
4145 *
4146 * This should be called after the affected queues are part of stream
4147 * geometry. It should be called from driver/module open routine after
4148 * qprocson() call. It is also called from nfs syscall where it is known that
4149 * stream is configured and won't change its geometry during create_putlock
4150 * call.
4151 *
4152 * The caller normally uses a 0 value for the stream argument to speed up MT
4153 * putnext into the perimeter of q, for example because its perimeter is per
4154 * module (e.g. IP).
4155 *
4156 * The caller normally uses a non-zero value for the stream argument to hint
4157 * the system that the stream of q is a very contended global system stream
4158 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4159 * particularly MT hot.
4160 *
4161 * The caller ensures stream plumbing won't happen while we are here and
4162 * therefore q_next can be safely used.
4163 */
4164
4165 void
4166 create_putlocks(queue_t *q, int stream)
4167 {
4168 ciputctrl_t *cip;
4169 struct stdata *stp = STREAM(q);
4170
4171 q = _WR(q);
4172 ASSERT(stp != NULL);
4173
4174 if (disable_putlocks != 0)
4175 return;
4176
4177 if (n_ciputctrl < min_n_ciputctrl)
4178 return;
4179
4180 ASSERT(ciputctrl_cache != NULL);
4181
4182 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4183 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4184 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4185 mutex_enter(&stp->sd_lock);
4186 if (stp->sd_ciputctrl != NULL) {
4187 mutex_exit(&stp->sd_lock);
4188 kmem_cache_free(ciputctrl_cache, cip);
4189 } else {
4190 ASSERT(stp->sd_nciputctrl == 0);
4191 stp->sd_nciputctrl = n_ciputctrl - 1;
4192 /*
4193 * putnext checks sd_ciputctrl without holding
4194 * sd_lock. If it is not NULL, putnext assumes
4195 * sd_nciputctrl is initialized. The membar
4196 * below ensures that.
4197 */
4198 membar_producer();
4199 stp->sd_ciputctrl = cip;
4200 mutex_exit(&stp->sd_lock);
4201 }
4202 }
4203
4204 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4205
4206 while (_SAMESTR(q)) {
4207 create_syncq_putlocks(q);
4208 if (stream == 0)
4209 return;
4210 q = q->q_next;
4211 }
4212 ASSERT(q != NULL);
4213 create_syncq_putlocks(q);
4214 }
4215
4216 /*
4217 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4218 * through a stream.
4219 *
4220 * Data currently recorded per event are a timestamp, module/driver name,
4221 * downstream module/driver name, optional callstack, event type and a per-
4222 * type datum. Much of the STREAMS framework is instrumented for automatic
4223 * flow tracing (when enabled). Events can be defined and used by STREAMS
4224 * modules and drivers.
4225 *
4226 * Global objects:
4227 *
4228 * str_ftevent() - Add a flow-trace event to a dblk.
4229 * str_ftfree() - Free flow-trace data
4230 *
4231 * Local objects:
4232 *
4233 * fthdr_cache - pointer to the kmem cache for trace header.
4234 * ftblk_cache - pointer to the kmem cache for trace data blocks.
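 *
 * Tracing is compiled in but gated off by default through str_ftnever
 * (below). Presumably the usual way to enable it for debugging is to
 * clear that flag before the dblks of interest are allocated, e.g.
 * with an /etc/system entry (illustrative tuning, not documented in
 * this file):
 *
 *	set str_ftnever = 0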
4235 */ 4236 4237 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4238 int str_ftstack = 0; /* Don't record event call stacks */ 4239 4240 void 4241 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4242 { 4243 ftblk_t *bp = hp->tail; 4244 ftblk_t *nbp; 4245 ftevnt_t *ep; 4246 int ix, nix; 4247 4248 ASSERT(hp != NULL); 4249 4250 for (;;) { 4251 if ((ix = bp->ix) == FTBLK_EVNTS) { 4252 /* 4253 * Tail doesn't have room, so need a new tail. 4254 * 4255 * To make this MT safe, first, allocate a new 4256 * ftblk, and initialize it. To make life a 4257 * little easier, reserve the first slot (mostly 4258 * by making ix = 1). When we are finished with 4259 * the initialization, CAS this pointer to the 4260 * tail. If this succeeds, this is the new 4261 * "next" block. Otherwise, another thread 4262 * got here first, so free the block and start 4263 * again. 4264 */ 4265 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4266 if (nbp == NULL) { 4267 /* no mem, so punt */ 4268 str_ftnever++; 4269 /* free up all flow data? */ 4270 return; 4271 } 4272 nbp->nxt = NULL; 4273 nbp->ix = 1; 4274 /* 4275 * Just in case there is another thread about 4276 * to get the next index, we need to make sure 4277 * the value is there for it. 4278 */ 4279 membar_producer(); 4280 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) { 4281 /* CAS was successful */ 4282 bp->nxt = nbp; 4283 membar_producer(); 4284 bp = nbp; 4285 ix = 0; 4286 goto cas_good; 4287 } else { 4288 kmem_cache_free(ftblk_cache, nbp); 4289 bp = hp->tail; 4290 continue; 4291 } 4292 } 4293 nix = ix + 1; 4294 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) { 4295 cas_good: 4296 if (curthread != hp->thread) { 4297 hp->thread = curthread; 4298 evnt |= FTEV_CS; 4299 } 4300 if (CPU->cpu_seqid != hp->cpu_seqid) { 4301 hp->cpu_seqid = CPU->cpu_seqid; 4302 evnt |= FTEV_PS; 4303 } 4304 ep = &bp->ev[ix]; 4305 break; 4306 } 4307 } 4308 4309 if (evnt & FTEV_QMASK) { 4310 queue_t *qp = p; 4311 4312 if (!(qp->q_flag & QREADR)) 4313 evnt |= FTEV_ISWR; 4314 4315 ep->mid = Q2NAME(qp); 4316 4317 /* 4318 * We only record the next queue name for FTEV_PUTNEXT since 4319 * that's the only time we *really* need it, and the putnext() 4320 * code ensures that qp->q_next won't vanish. (We could use 4321 * claimstr()/releasestr() but at a performance cost.) 4322 */ 4323 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4324 ep->midnext = Q2NAME(qp->q_next); 4325 else 4326 ep->midnext = NULL; 4327 } else { 4328 ep->mid = p; 4329 ep->midnext = NULL; 4330 } 4331 4332 if (ep->stk != NULL) 4333 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4334 4335 ep->ts = gethrtime(); 4336 ep->evnt = evnt; 4337 ep->data = data; 4338 hp->hash = (hp->hash << 9) + hp->hash; 4339 hp->hash += (evnt << 16) | data; 4340 hp->hash += (uintptr_t)ep->mid; 4341 } 4342 4343 /* 4344 * Free flow-trace data. 4345 */ 4346 void 4347 str_ftfree(dblk_t *dbp) 4348 { 4349 fthdr_t *hp = dbp->db_fthdr; 4350 ftblk_t *bp = &hp->first; 4351 ftblk_t *nbp; 4352 4353 if (bp != hp->tail || bp->ix != 0) { 4354 /* 4355 * Clear out the hash, have the tail point to itself, and free 4356 * any continuation blocks. 4357 */ 4358 bp = hp->first.nxt; 4359 hp->tail = &hp->first; 4360 hp->hash = 0; 4361 hp->first.nxt = NULL; 4362 hp->first.ix = 0; 4363 while (bp != NULL) { 4364 nbp = bp->nxt; 4365 kmem_cache_free(ftblk_cache, bp); 4366 bp = nbp; 4367 } 4368 } 4369 kmem_cache_free(fthdr_cache, hp); 4370 dbp->db_fthdr = NULL; 4371 } 4372