/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/*	All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
 * Copyright 2022 Garrett D'Amore
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory.  Objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
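
/*
 * Example (illustrative only; not part of the original source): a driver
 * that owns a buffer -- say, one mapped for DMA -- can wrap it in a
 * message via the esballoc() family and reclaim it from its free routine.
 * The xx_* names below are hypothetical:
 *
 *	static void
 *	xx_buf_free(void *arg)
 *	{
 *		...return the buffer described by arg to the driver's pool...
 *	}
 *
 *	frtn_t *frp = &bp->xxb_frtn;
 *	frp->free_func = xx_buf_free;
 *	frp->free_arg = (caddr_t)bp;
 *	mp = desballoc(bp->xxb_kaddr, bp->xxb_len, BPRI_MED, frp);
 *
 * When the last reference to such a message is freed, the allocator
 * invokes frp->free_func(frp->free_arg) instead of freeing the data.
 */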
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
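
/*
 * Illustrative note (not from the original source): DBLK_RTFU() packs the
 * adjacent single-byte fields db_ref, db_type, db_flags and db_struioflag
 * into the single 32-bit word that DBLK_RTFU_WORD() overlays on them, so
 * allocation can initialize all four with one store.  For example,
 *
 *	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 *
 * sets db_ref = 1 and db_type = M_DATA while clearing db_flags and
 * db_struioflag.  Note the "(flags) | (ref - 1)" term: a ref of 2 also
 * sets the low DBLK_REFMIN bit in db_flags, which is how the esballoca()
 * and desballoca() variants record their permanently-held extra reference.
 */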
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}
/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page; the dblk
			 * should be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * Buffer size is a multiple of the page size;
			 * the dblk and buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
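
/*
 * Example (illustrative only; not part of the original source): the
 * canonical way to build an M_DATA message from a local buffer,
 * remembering that allocb() can fail and that b_wptr must be advanced
 * past the bytes written:
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL)
 *		return (ENOMEM);	(or schedule a bufcall, see below)
 *	bcopy(src, mp->b_wptr, len);
 *	mp->b_wptr += len;
 */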
/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none is found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found,
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}

/*
 * Get the label for a message.  Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
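
/*
 * Illustrative note (not from the original source): after dupb(), two
 * mblks share one dblk and one data buffer, so the data must be treated
 * as read-only while db_ref > 1:
 *
 *	mblk_t *mp2 = dupb(mp);	(mp->b_datap->db_ref is now 2)
 *	...
 *	freeb(mp2);		(dblk_decref: just frees the mblk)
 *	freeb(mp);		(last reference: db_lastfree frees the dblk)
 */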
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * The variants with a 'd' prefix (desballoc, desballoca)
 * directly free the mblk when it loses its last ref,
 * whereas the other variants free it asynchronously.
 * The variants with an 'a' suffix (esballoca, desballoca)
 * add an extra ref, effectively letting the streams subsystem
 * know that the message data should not be modified.
 * (E.g., see the db_ref checks in reallocb() and elsewhere.)
 *
 * The method used by the 'a' suffix functions to keep the dblk
 * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2, ...) passed to
 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
 * bit in db_flags.  In dblk_decref() that flag essentially means
 * the dblk has one extra ref, so the "last ref" is one, not zero.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}
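
/*
 * Example (illustrative only; not part of the original source): a driver
 * with fixed-size, alignment-sensitive buffers can build a private
 * allocator on top of bcache.  The "xx_rx" name and the sizes below are
 * hypothetical:
 *
 *	bcache_t *bcp = bcache_create("xx_rx", 2048, 64);
 *	...
 *	mblk_t *mp = bcache_allocb(bcp, BPRI_MED);
 *	...
 *	bcache_destroy(bcp);	(deferred until all dblks are freed)
 *
 * bcache_destroy() only marks the cache for destruction while allocations
 * remain outstanding; the last bcache_dblk_lastfree() tears it down.
 */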
void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}
/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}
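
/*
 * Example (illustrative only; not part of the original source): a module
 * sending an internal ioctl downstream, with XX_CMD standing in for a
 * real command value:
 *
 *	mblk_t *mp;
 *
 *	if ((mp = mkiocb(XX_CMD)) == NULL)
 *		return (ENOMEM);
 *	putnext(q, mp);
 *
 * The reply arrives later as an M_IOCACK or M_IOCNAK message whose
 * ioc_id matches the one mkiocb() assigned.
 */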
/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its
	 * value in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}
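
/*
 * Example (illustrative only; not part of the original source): the
 * classic allocb() failure recovery.  When an allocation fails, schedule
 * a callback for when memory becomes available instead of spinning
 * (xxp and xx_retry are hypothetical):
 *
 *	if ((mp = allocb(len, BPRI_MED)) == NULL) {
 *		xxp->xx_bufcall_id = bufcall(len, BPRI_MED, xx_retry, xxp);
 *		return;
 *	}
 *
 * The returned id must be saved so the request can be cancelled with
 * unbufcall() at close time.
 */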
/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Copy the various checksum information that came in
	 * originally.
	 */
	ndp->db_cksumstart = dp->db_cksumstart;
	ndp->db_cksumend = dp->db_cksumend;
	ndp->db_cksumstuff = dp->db_cksumstuff;
	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
	    sizeof (dp->db_struioun.data));

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}
/*
 * Concatenate and align the first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}
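
/*
 * Example (illustrative only; not part of the original source):
 * guaranteeing that a protocol header is contiguous and aligned before
 * casting b_rptr (xx_hdr_t is a hypothetical header type):
 *
 *	if (MBLKL(mp) < sizeof (xx_hdr_t) &&
 *	    !pullupmsg(mp, sizeof (xx_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (xx_hdr_t *)mp->b_rptr;
 *
 * msgpullup() does the same job non-destructively, returning a new
 * message and leaving the original untouched.
 */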
/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this mblk is now empty and is not the
			 * first one, remove it.
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
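
/*
 * Example (illustrative only; not part of the original source): stripping
 * a 4-byte trailer, say a CRC, from the end of a message and then
 * recomputing its payload size:
 *
 *	if (!adjmsg(mp, -4)) {
 *		freemsg(mp);
 *		return;
 *	}
 *	len = msgdsize(mp);
 */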
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}
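
/*
 * Example (illustrative only; not part of the original source): the
 * canonical service procedure loop that getq()'s backenabling is designed
 * for.  If the next queue is flow-controlled, the message is put back and
 * the loop exits; this queue is then backenabled automatically when the
 * congestion clears (xxrsrv is a hypothetical module routine):
 *
 *	int
 *	xxrsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (canputnext(q)) {
 *				putnext(q, mp);
 *			} else {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *		}
 *		return (0);
 *	}
 */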
/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		bytes += MBLKL(mp);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify
 * the maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of
				 * the mblk represents some arbitrary point
				 * within the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head
				 * of the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block.  In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue. The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held. Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages. Otherwise, remove
 * only non-control messages. If the queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code, one difference was discovered. flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not an M_PCPROTO message should be
 * flushed if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common. This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code, one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case. That check does not
 * match the man page for flushband and was not in the strrput flush code,
 * hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria. The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblks have been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

/*
 * Return 1 if the queue is not full. If the queue is full, return
 * 0 (may not put message) and set the QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands. Return 1 if the
 * band is not full. If the band is full, return 0 (may not put message)
 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (the
 * caller wants to write to the queue).
 */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
	return (1);
}

/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis. The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If the queue hits its high water mark then set the QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue). This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
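 *
 * As a hedged illustration only (hypothetical module code, not part of
 * this file): the canonical service procedure pattern that putq(), getq()
 * and putbq() are designed around forwards high priority messages
 * unconditionally and puts ordinary ones back when flow controlled:
 *
 *	while ((mp = getq(q)) != NULL) {
 *		if (queclass(mp) == QPCTL || canputnext(q)) {
 *			putnext(q, mp);
 *		} else {
 *			(void) putbq(q, mp);
 *			break;
 *		}
 *	}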
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Perform sanity checks and, if the qband structure is not yet
	 * allocated, allocate it.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly. It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated. This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If the queue is empty, add the message and initialize the
	 * pointers. Otherwise, adjust message pointers and queue pointers
	 * based on the type of the message and where it belongs on the
	 * queue. Some code is duplicated to minimize the number of
	 * conditionals and hopefully minimize the amount of time this
	 * routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Perform sanity checks and, if the qband structure is not yet
	 * allocated, allocate it.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If the queue is empty or if the message is high priority,
	 * place it on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue. If the
 * existing message is NULL, the new message is placed on the end of
 * the queue. The queue class of the new message is ignored. However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
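 *
 * A sketch of a legal call (illustrative only; the walk and the freeze
 * are hypothetical, not required by insq itself): a module injecting a
 * band-1 message mp ahead of the first ordinary message, assuming such
 * a message exists, might do
 *
 *	freezestr(q);
 *	for (emp = q->q_first; emp != NULL; emp = emp->b_next)
 *		if (emp->b_datap->db_type < QPCTL && emp->b_band == 0)
 *			break;
 *	(void) insq(q, emp, mp);
 *	unfreezestr(q);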
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Create and put a control message on queue.
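 *
 * For illustration (the call is standard DDI usage, the surrounding
 * context is hypothetical): a driver that detects carrier loss might
 * tell the stream head to hang up with
 *
 *	(void) putnextctl(rq, M_HANGUP);
 *
 * where rq is its read queue; a return of 0 means the message could not
 * be allocated.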
 */
int
putctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(0)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	put(q, bp);

	return (1);
}

/*
 * Control message with a single-byte parameter
 */
int
putctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(1)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	put(q, bp);

	return (1);
}

int
putnextctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(1)) == NULL))
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	putnext(q, bp);

	return (1);
}

int
putnextctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    ((bp = allocb_tryhard(0)) == NULL))
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	putnext(q, bp);

	return (1);
}

/*
 * Return the queue upstream from this one
 */
queue_t *
backq(queue_t *q)
{
	q = _OTHERQ(q);
	if (q->q_next) {
		q = q->q_next;
		return (_OTHERQ(q));
	}
	return (NULL);
}

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process. They are serviced by queuerun(), which runs each enabled
 * queue's service procedure. The call to queuerun() is processor
 * dependent - the general principle is that it be run whenever a queue
 * is enabled but before returning to user level. For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled. For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt. Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on the list of those whose service procedures
 * are ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex to prevent the woken thread
 * from contending with the mutex. This is OK because the queue has been
 * enqueued on the runlist and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}

/*
 * Return number of messages on queue
 */
int
qsize(queue_t *qp)
{
	int count = 0;
	mblk_t *mp;

	mutex_enter(QLOCK(qp));
	for (mp = qp->q_first; mp; mp = mp->b_next)
		count++;
	mutex_exit(QLOCK(qp));
	return (count);
}

/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
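 *
 * A hypothetical sketch of the usual pattern (not from any particular
 * module): an open routine that wants putq() to queue messages without
 * scheduling the service procedure until setup has finished might do
 *
 *	noenable(WR(q));
 *	qprocson(q);
 *	... finish initialization ...
 *	enableok(WR(q));
 *	qenable(WR(q));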
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}

void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}

/*
 * Set queue fields.
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head. Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern: strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write,
		 * so we now cache it at the stream head. Check to see if
		 * this queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t *qbp = NULL;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * This function wakes up everything sleeping in cv_wait/sigwait/pollwait
 * on one of QWANTWSYNC, QWANTR or QWANTW.
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
 * RSLEEPer then a deferred wakeup will be done. Also, if strpoll() is in
 * progress then a deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t *stp = STREAM(q);
	pollhead_t *pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while we are taking the type, so store the
	 * queue in a temporary variable. It doesn't matter even if we
	 * take the type from the previous plumbing; that's because, if
	 * the plumbing has changed while we were holding the queue in a
	 * temporary variable, we can continue processing the message the
	 * way it would have been processed in the old plumbing, without
	 * any side effects but a bit of extra processing for the partial
	 * ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter the queue synchronously. Any attempt to enter a closing
 * queue will fail. The qp->q_rwcnt keeps track of the number of successful
 * entries so that removeq() will not try to close the queue while a thread
 * is inside the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in a sync stream queue and wake up
 * any threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
 * does not matter if any regular put entrypoints have been already entered.
 * We can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int isread;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case. All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure. This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the calling
		 * stack in a DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks. If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is a CIPUT sync queue, the regular sq_count is
 * incremented and it does not matter if any regular put entrypoints have
 * been already entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve the calling
		 * stack in a DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here. I need to
	 * consider why infonext is called, and whether dropping SQ_EXCL
	 * in cases other than the !SQ_CIPUT case might cause other
	 * problems. It just might be safer to drop it only if !SQ_CIPUT,
	 * because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	    sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks. If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * called by create_putlock.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK. if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized. membar below
				 * ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If the stream argument is 0, only create per cpu sq_putlocks/sq_putcounts
 * for the syncq of q.
 * If the stream argument is not 0, create per cpu stream_putlocks for the
 * stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's starting
 * from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry. It should be called from the driver/module open routine after
 * the qprocson() call. It is also called from the nfs syscall where it is
 * known that the stream is configured and won't change its geometry during
 * the create_putlock call.
 *
 * The caller normally passes 0 for the stream argument to speed up MT
 * putnext into the perimeter of q, for example because its perimeter is per
 * module (e.g. IP).
 *
 * The caller normally passes a nonzero value for the stream argument to
 * hint to the system that the stream of q is a very contended global system
 * stream (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * The caller ensures that stream plumbing won't happen while we are here
 * and therefore q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock. if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized. membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event are a timestamp, module/driver
 * name, downstream module/driver name, an optional call stack, the event
 * type and a per-type datum. Much of the STREAMS framework is instrumented
 * for automatic flow tracing (when enabled). Events can be defined and
 * used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
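 *
 * Practical note (an administrative detail, not something this file
 * enforces): tracing is compiled in but off by default, so events are
 * only recorded once the str_ftnever tunable below is cleared, e.g.
 * with "set str_ftnever = 0" in /etc/system or via a kernel debugger.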
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
int str_ftstack = 0;	/* Don't record event call stacks */

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp = hp->tail;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it. To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1). When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail. If this succeeds, this is the new
			 * "next" block. Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
			if (nbp == NULL) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;

		ep->mid = Q2NAME(qp);

		/*
		 * We only record the next queue name for FTEV_PUTNEXT since
		 * that's the only time we *really* need it, and the putnext()
		 * code ensures that qp->q_next won't vanish. (We could use
		 * claimstr()/releasestr() but at a performance cost.)
		 */
		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
			ep->midnext = Q2NAME(qp->q_next);
		else
			ep->midnext = NULL;
	} else {
		ep->mid = p;
		ep->midnext = NULL;
	}

	if (ep->stk != NULL)
		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}