/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
 * Copyright 2022 Garrett D'Amore
 * Copyright 2024 Oxide Computer Company
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, dblk and its associated
 *     data is allocated as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
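/*
 * An illustrative sketch of the lifecycle described above.  It is not
 * part of the allocator and is compiled only under the hypothetical
 * STREAM_EXAMPLES guard.  The message is allocated, dup'ed, and freed
 * twice: the first freeb() is not the last reference and frees only the
 * mblk; the second is the last reference and runs db_lastfree to return
 * the dblk (and data) to its cache.
 */
#ifdef STREAM_EXAMPLES
static void
stream_example_lifecycle(void)
{
	mblk_t *mp, *dup;

	if ((mp = allocb(64, BPRI_MED)) == NULL)
		return;
	*mp->b_wptr++ = 0x2a;		/* append one byte of data */

	dup = dupb(mp);			/* db_ref becomes 2 on success */
	freeb(mp);			/* not last ref: frees mblk only */
	if (dup != NULL)
		freeb(dup);		/* last ref: dblk_decref -> db_lastfree */
}
#endif	/* STREAM_EXAMPLES */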
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
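/*
 * Worked example of the DBLK_RTFU packing above (hypothetical
 * STREAM_EXAMPLES guard, not built by default): a single 32-bit store
 * initializes the four adjacent byte-wide dblk fields db_ref, db_type,
 * db_flags and db_struioflag.  Note that DBLK_RTFU(2, ...) also leaves
 * (ref - 1) == 1 in db_flags, which is how the esballoca()-style
 * "extra reference" minimum (DBLK_REFMIN) is recorded.
 */
#ifdef STREAM_EXAMPLES
static void
stream_example_rtfu(dblk_t *dbp)
{
	/* equivalent to: db_ref = 1, db_type = M_DATA, no flags, no uio */
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
}
#endif	/* STREAM_EXAMPLES */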
182 */ 183 int dblk_kmem_flags = 0; 184 int mblk_kmem_flags = 0; 185 186 static int 187 dblk_constructor(void *buf, void *cdrarg, int kmflags) 188 { 189 dblk_t *dbp = buf; 190 ssize_t msg_size = (ssize_t)cdrarg; 191 size_t index; 192 193 ASSERT(msg_size != 0); 194 195 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 196 197 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 198 199 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 200 return (-1); 201 if ((msg_size & PAGEOFFSET) == 0) { 202 dbp->db_base = kmem_alloc(msg_size, kmflags); 203 if (dbp->db_base == NULL) { 204 kmem_cache_free(mblk_cache, dbp->db_mblk); 205 return (-1); 206 } 207 } else { 208 dbp->db_base = (unsigned char *)&dbp[1]; 209 } 210 211 dbp->db_mblk->b_datap = dbp; 212 dbp->db_cache = dblk_cache[index]; 213 dbp->db_lim = dbp->db_base + msg_size; 214 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 215 dbp->db_frtnp = NULL; 216 dbp->db_fthdr = NULL; 217 dbp->db_credp = NULL; 218 dbp->db_cpid = -1; 219 dbp->db_struioflag = 0; 220 dbp->db_struioun.cksum.flags = 0; 221 return (0); 222 } 223 224 /*ARGSUSED*/ 225 static int 226 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 227 { 228 dblk_t *dbp = buf; 229 230 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 231 return (-1); 232 dbp->db_mblk->b_datap = dbp; 233 dbp->db_cache = dblk_esb_cache; 234 dbp->db_fthdr = NULL; 235 dbp->db_credp = NULL; 236 dbp->db_cpid = -1; 237 dbp->db_struioflag = 0; 238 dbp->db_struioun.cksum.flags = 0; 239 return (0); 240 } 241 242 static int 243 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 244 { 245 dblk_t *dbp = buf; 246 bcache_t *bcp = cdrarg; 247 248 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 249 return (-1); 250 251 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags); 252 if (dbp->db_base == NULL) { 253 kmem_cache_free(mblk_cache, dbp->db_mblk); 254 return (-1); 255 } 256 257 dbp->db_mblk->b_datap = dbp; 258 dbp->db_cache = (void *)bcp; 259 dbp->db_lim = dbp->db_base + bcp->size; 260 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 261 dbp->db_frtnp = NULL; 262 dbp->db_fthdr = NULL; 263 dbp->db_credp = NULL; 264 dbp->db_cpid = -1; 265 dbp->db_struioflag = 0; 266 dbp->db_struioun.cksum.flags = 0; 267 return (0); 268 } 269 270 /*ARGSUSED*/ 271 static void 272 dblk_destructor(void *buf, void *cdrarg) 273 { 274 dblk_t *dbp = buf; 275 ssize_t msg_size = (ssize_t)cdrarg; 276 277 ASSERT(dbp->db_mblk->b_datap == dbp); 278 ASSERT(msg_size != 0); 279 ASSERT(dbp->db_struioflag == 0); 280 ASSERT(dbp->db_struioun.cksum.flags == 0); 281 282 if ((msg_size & PAGEOFFSET) == 0) { 283 kmem_free(dbp->db_base, msg_size); 284 } 285 286 kmem_cache_free(mblk_cache, dbp->db_mblk); 287 } 288 289 static void 290 bcache_dblk_destructor(void *buf, void *cdrarg) 291 { 292 dblk_t *dbp = buf; 293 bcache_t *bcp = cdrarg; 294 295 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 296 297 ASSERT(dbp->db_mblk->b_datap == dbp); 298 ASSERT(dbp->db_struioflag == 0); 299 ASSERT(dbp->db_struioun.cksum.flags == 0); 300 301 kmem_cache_free(mblk_cache, dbp->db_mblk); 302 } 303 304 /* ARGSUSED */ 305 static int 306 ftblk_constructor(void *buf, void *cdrarg, int kmflags) 307 { 308 ftblk_t *fbp = buf; 309 int i; 310 311 bzero(fbp, sizeof (ftblk_t)); 312 if (str_ftstack != 0) { 313 for (i = 0; i < FTBLK_EVNTS; i++) 314 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags); 315 } 316 317 return (0); 318 } 319 320 /* ARGSUSED */ 321 static void 322 ftblk_destructor(void *buf, void *cdrarg) 
323 { 324 ftblk_t *fbp = buf; 325 int i; 326 327 if (str_ftstack != 0) { 328 for (i = 0; i < FTBLK_EVNTS; i++) { 329 if (fbp->ev[i].stk != NULL) { 330 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t)); 331 fbp->ev[i].stk = NULL; 332 } 333 } 334 } 335 } 336 337 static int 338 fthdr_constructor(void *buf, void *cdrarg, int kmflags) 339 { 340 fthdr_t *fhp = buf; 341 342 return (ftblk_constructor(&fhp->first, cdrarg, kmflags)); 343 } 344 345 static void 346 fthdr_destructor(void *buf, void *cdrarg) 347 { 348 fthdr_t *fhp = buf; 349 350 ftblk_destructor(&fhp->first, cdrarg); 351 } 352 353 void 354 streams_msg_init(void) 355 { 356 char name[40]; 357 size_t size; 358 size_t lastsize = DBLK_MIN_SIZE; 359 size_t *sizep; 360 struct kmem_cache *cp; 361 size_t tot_size; 362 int offset; 363 364 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32, 365 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags); 366 367 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) { 368 369 if ((offset = (size & PAGEOFFSET)) != 0) { 370 /* 371 * We are in the middle of a page, dblk should 372 * be allocated on the same page 373 */ 374 tot_size = size + sizeof (dblk_t); 375 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t)) 376 < PAGESIZE); 377 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0); 378 379 } else { 380 381 /* 382 * buf size is multiple of page size, dblk and 383 * buffer are allocated separately. 384 */ 385 386 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0); 387 tot_size = sizeof (dblk_t); 388 } 389 390 (void) sprintf(name, "streams_dblk_%ld", size); 391 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN, 392 dblk_constructor, dblk_destructor, NULL, (void *)(size), 393 NULL, dblk_kmem_flags); 394 395 while (lastsize <= size) { 396 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp; 397 lastsize += DBLK_MIN_SIZE; 398 } 399 } 400 401 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t), 402 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL, 403 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags); 404 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32, 405 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0); 406 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32, 407 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0); 408 409 /* initialize throttling queue for esballoc */ 410 esballoc_queue_init(); 411 } 412 413 /*ARGSUSED*/ 414 mblk_t * 415 allocb(size_t size, uint_t pri) 416 { 417 dblk_t *dbp; 418 mblk_t *mp; 419 size_t index; 420 421 index = (size - 1) >> DBLK_SIZE_SHIFT; 422 423 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 424 if (size != 0) { 425 mp = allocb_oversize(size, KM_NOSLEEP); 426 goto out; 427 } 428 index = 0; 429 } 430 431 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) { 432 mp = NULL; 433 goto out; 434 } 435 436 mp = dbp->db_mblk; 437 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 438 mp->b_next = mp->b_prev = mp->b_cont = NULL; 439 mp->b_rptr = mp->b_wptr = dbp->db_base; 440 mp->b_queue = NULL; 441 MBLK_BAND_FLAG_WORD(mp) = 0; 442 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size); 443 out: 444 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp); 445 446 return (mp); 447 } 448 449 /* 450 * Allocate an mblk taking db_credp and db_cpid from the template. 451 * Allow the cred to be NULL. 
452 */ 453 mblk_t * 454 allocb_tmpl(size_t size, const mblk_t *tmpl) 455 { 456 mblk_t *mp = allocb(size, 0); 457 458 if (mp != NULL) { 459 dblk_t *src = tmpl->b_datap; 460 dblk_t *dst = mp->b_datap; 461 cred_t *cr; 462 pid_t cpid; 463 464 cr = msg_getcred(tmpl, &cpid); 465 if (cr != NULL) 466 crhold(dst->db_credp = cr); 467 dst->db_cpid = cpid; 468 dst->db_type = src->db_type; 469 } 470 return (mp); 471 } 472 473 mblk_t * 474 allocb_cred(size_t size, cred_t *cr, pid_t cpid) 475 { 476 mblk_t *mp = allocb(size, 0); 477 478 ASSERT(cr != NULL); 479 if (mp != NULL) { 480 dblk_t *dbp = mp->b_datap; 481 482 crhold(dbp->db_credp = cr); 483 dbp->db_cpid = cpid; 484 } 485 return (mp); 486 } 487 488 mblk_t * 489 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid) 490 { 491 mblk_t *mp = allocb_wait(size, 0, flags, error); 492 493 ASSERT(cr != NULL); 494 if (mp != NULL) { 495 dblk_t *dbp = mp->b_datap; 496 497 crhold(dbp->db_credp = cr); 498 dbp->db_cpid = cpid; 499 } 500 501 return (mp); 502 } 503 504 /* 505 * Extract the db_cred (and optionally db_cpid) from a message. 506 * We find the first mblk which has a non-NULL db_cred and use that. 507 * If none found we return NULL. 508 * Does NOT get a hold on the cred. 509 */ 510 cred_t * 511 msg_getcred(const mblk_t *mp, pid_t *cpidp) 512 { 513 cred_t *cr = NULL; 514 cred_t *cr2; 515 mblk_t *mp2; 516 517 while (mp != NULL) { 518 dblk_t *dbp = mp->b_datap; 519 520 cr = dbp->db_credp; 521 if (cr == NULL) { 522 mp = mp->b_cont; 523 continue; 524 } 525 if (cpidp != NULL) 526 *cpidp = dbp->db_cpid; 527 528 #ifdef DEBUG 529 /* 530 * Normally there should at most one db_credp in a message. 531 * But if there are multiple (as in the case of some M_IOC* 532 * and some internal messages in TCP/IP bind logic) then 533 * they must be identical in the normal case. 534 * However, a socket can be shared between different uids 535 * in which case data queued in TCP would be from different 536 * creds. Thus we can only assert for the zoneid being the 537 * same. Due to Multi-level Level Ports for TX, some 538 * cred_t can have a NULL cr_zone, and we skip the comparison 539 * in that case. 540 */ 541 mp2 = mp->b_cont; 542 while (mp2 != NULL) { 543 cr2 = DB_CRED(mp2); 544 if (cr2 != NULL) { 545 DTRACE_PROBE2(msg__getcred, 546 cred_t *, cr, cred_t *, cr2); 547 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) || 548 crgetzone(cr) == NULL || 549 crgetzone(cr2) == NULL); 550 } 551 mp2 = mp2->b_cont; 552 } 553 #endif 554 return (cr); 555 } 556 if (cpidp != NULL) 557 *cpidp = NOPID; 558 return (NULL); 559 } 560 561 /* 562 * Variant of msg_getcred which, when a cred is found 563 * 1. Returns with a hold on the cred 564 * 2. Clears the first cred in the mblk. 565 * This is more efficient to use than a msg_getcred() + crhold() when 566 * the message is freed after the cred has been extracted. 567 * 568 * The caller is responsible for ensuring that there is no other reference 569 * on the message since db_credp can not be cleared when there are other 570 * references. 571 */ 572 cred_t * 573 msg_extractcred(mblk_t *mp, pid_t *cpidp) 574 { 575 cred_t *cr = NULL; 576 cred_t *cr2; 577 mblk_t *mp2; 578 579 while (mp != NULL) { 580 dblk_t *dbp = mp->b_datap; 581 582 cr = dbp->db_credp; 583 if (cr == NULL) { 584 mp = mp->b_cont; 585 continue; 586 } 587 ASSERT(dbp->db_ref == 1); 588 dbp->db_credp = NULL; 589 if (cpidp != NULL) 590 *cpidp = dbp->db_cpid; 591 #ifdef DEBUG 592 /* 593 * Normally there should at most one db_credp in a message. 
594 * But if there are multiple (as in the case of some M_IOC* 595 * and some internal messages in TCP/IP bind logic) then 596 * they must be identical in the normal case. 597 * However, a socket can be shared between different uids 598 * in which case data queued in TCP would be from different 599 * creds. Thus we can only assert for the zoneid being the 600 * same. Due to Multi-level Level Ports for TX, some 601 * cred_t can have a NULL cr_zone, and we skip the comparison 602 * in that case. 603 */ 604 mp2 = mp->b_cont; 605 while (mp2 != NULL) { 606 cr2 = DB_CRED(mp2); 607 if (cr2 != NULL) { 608 DTRACE_PROBE2(msg__extractcred, 609 cred_t *, cr, cred_t *, cr2); 610 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) || 611 crgetzone(cr) == NULL || 612 crgetzone(cr2) == NULL); 613 } 614 mp2 = mp2->b_cont; 615 } 616 #endif 617 return (cr); 618 } 619 return (NULL); 620 } 621 /* 622 * Get the label for a message. Uses the first mblk in the message 623 * which has a non-NULL db_credp. 624 * Returns NULL if there is no credp. 625 */ 626 extern struct ts_label_s * 627 msg_getlabel(const mblk_t *mp) 628 { 629 cred_t *cr = msg_getcred(mp, NULL); 630 631 if (cr == NULL) 632 return (NULL); 633 634 return (crgetlabel(cr)); 635 } 636 637 void 638 freeb(mblk_t *mp) 639 { 640 dblk_t *dbp = mp->b_datap; 641 642 ASSERT(dbp->db_ref > 0); 643 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 644 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp); 645 646 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 647 648 dbp->db_free(mp, dbp); 649 } 650 651 void 652 freemsg(mblk_t *mp) 653 { 654 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp); 655 while (mp) { 656 dblk_t *dbp = mp->b_datap; 657 mblk_t *mp_cont = mp->b_cont; 658 659 ASSERT(dbp->db_ref > 0); 660 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 661 662 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 663 664 dbp->db_free(mp, dbp); 665 mp = mp_cont; 666 } 667 } 668 669 /* 670 * Reallocate a block for another use. Try hard to use the old block. 671 * If the old data is wanted (copy), leave b_wptr at the end of the data, 672 * otherwise return b_wptr = b_rptr. 673 * 674 * This routine is private and unstable. 675 */ 676 mblk_t * 677 reallocb(mblk_t *mp, size_t size, uint_t copy) 678 { 679 mblk_t *mp1; 680 unsigned char *old_rptr; 681 ptrdiff_t cur_size; 682 683 if (mp == NULL) 684 return (allocb(size, BPRI_HI)); 685 686 cur_size = mp->b_wptr - mp->b_rptr; 687 old_rptr = mp->b_rptr; 688 689 ASSERT(mp->b_datap->db_ref != 0); 690 691 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) { 692 /* 693 * If the data is wanted and it will fit where it is, no 694 * work is required. 695 */ 696 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size) 697 return (mp); 698 699 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base; 700 mp1 = mp; 701 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) { 702 /* XXX other mp state could be copied too, db_flags ... ? 
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
838 * 839 * The variants with a 'd' prefix (desballoc, desballoca) 840 * directly free the mblk when it loses its last ref, 841 * where the other variants free asynchronously. 842 * The variants with an 'a' suffix (esballoca, desballoca) 843 * add an extra ref, effectively letting the streams subsystem 844 * know that the message data should not be modified. 845 * (eg. see db_ref checks in reallocb and elsewhere) 846 * 847 * The method used by the 'a' suffix functions to keep the dblk 848 * db_ref > 1 is non-obvious. The macro DBLK_RTFU(2,...) passed to 849 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN 850 * bit in db_flags. In dblk_decref() that flag essentially means 851 * the dblk has one extra ref, so the "last ref" is one, not zero. 852 */ 853 static mblk_t * 854 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp, 855 void (*lastfree)(mblk_t *, dblk_t *), int kmflags) 856 { 857 dblk_t *dbp; 858 mblk_t *mp; 859 860 ASSERT(base != NULL && frp != NULL); 861 862 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) { 863 mp = NULL; 864 goto out; 865 } 866 867 mp = dbp->db_mblk; 868 dbp->db_base = base; 869 dbp->db_lim = base + size; 870 dbp->db_free = dbp->db_lastfree = lastfree; 871 dbp->db_frtnp = frp; 872 DBLK_RTFU_WORD(dbp) = db_rtfu; 873 mp->b_next = mp->b_prev = mp->b_cont = NULL; 874 mp->b_rptr = mp->b_wptr = base; 875 mp->b_queue = NULL; 876 MBLK_BAND_FLAG_WORD(mp) = 0; 877 878 out: 879 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp); 880 return (mp); 881 } 882 883 /*ARGSUSED*/ 884 mblk_t * 885 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 886 { 887 mblk_t *mp; 888 889 /* 890 * Note that this is structured to allow the common case (i.e. 891 * STREAMS flowtracing disabled) to call gesballoc() with tail 892 * call optimization. 893 */ 894 if (!str_ftnever) { 895 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 896 frp, freebs_enqueue, KM_NOSLEEP); 897 898 if (mp != NULL) 899 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 900 return (mp); 901 } 902 903 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 904 frp, freebs_enqueue, KM_NOSLEEP)); 905 } 906 907 /* 908 * Same as esballoc() but sleeps waiting for memory. 909 */ 910 /*ARGSUSED*/ 911 mblk_t * 912 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 913 { 914 mblk_t *mp; 915 916 /* 917 * Note that this is structured to allow the common case (i.e. 918 * STREAMS flowtracing disabled) to call gesballoc() with tail 919 * call optimization. 920 */ 921 if (!str_ftnever) { 922 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 923 frp, freebs_enqueue, KM_SLEEP); 924 925 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 926 return (mp); 927 } 928 929 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 930 frp, freebs_enqueue, KM_SLEEP)); 931 } 932 933 /*ARGSUSED*/ 934 mblk_t * 935 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 936 { 937 mblk_t *mp; 938 939 /* 940 * Note that this is structured to allow the common case (i.e. 941 * STREAMS flowtracing disabled) to call gesballoc() with tail 942 * call optimization. 
943 */ 944 if (!str_ftnever) { 945 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 946 frp, dblk_lastfree_desb, KM_NOSLEEP); 947 948 if (mp != NULL) 949 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 950 return (mp); 951 } 952 953 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 954 frp, dblk_lastfree_desb, KM_NOSLEEP)); 955 } 956 957 /*ARGSUSED*/ 958 mblk_t * 959 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 960 { 961 mblk_t *mp; 962 963 /* 964 * Note that this is structured to allow the common case (i.e. 965 * STREAMS flowtracing disabled) to call gesballoc() with tail 966 * call optimization. 967 */ 968 if (!str_ftnever) { 969 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 970 frp, freebs_enqueue, KM_NOSLEEP); 971 972 if (mp != NULL) 973 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 974 return (mp); 975 } 976 977 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 978 frp, freebs_enqueue, KM_NOSLEEP)); 979 } 980 981 /* 982 * Same as esballoca() but sleeps waiting for memory. 983 */ 984 mblk_t * 985 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 986 { 987 mblk_t *mp; 988 989 /* 990 * Note that this is structured to allow the common case (i.e. 991 * STREAMS flowtracing disabled) to call gesballoc() with tail 992 * call optimization. 993 */ 994 if (!str_ftnever) { 995 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 996 frp, freebs_enqueue, KM_SLEEP); 997 998 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 999 return (mp); 1000 } 1001 1002 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 1003 frp, freebs_enqueue, KM_SLEEP)); 1004 } 1005 1006 /*ARGSUSED*/ 1007 mblk_t * 1008 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 1009 { 1010 mblk_t *mp; 1011 1012 /* 1013 * Note that this is structured to allow the common case (i.e. 1014 * STREAMS flowtracing disabled) to call gesballoc() with tail 1015 * call optimization. 
1016 */ 1017 if (!str_ftnever) { 1018 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 1019 frp, dblk_lastfree_desb, KM_NOSLEEP); 1020 1021 if (mp != NULL) 1022 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 1023 return (mp); 1024 } 1025 1026 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 1027 frp, dblk_lastfree_desb, KM_NOSLEEP)); 1028 } 1029 1030 static void 1031 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 1032 { 1033 bcache_t *bcp = dbp->db_cache; 1034 1035 ASSERT(dbp->db_mblk == mp); 1036 if (dbp->db_fthdr != NULL) 1037 str_ftfree(dbp); 1038 1039 /* set credp and projid to be 'unspecified' before returning to cache */ 1040 if (dbp->db_credp != NULL) { 1041 crfree(dbp->db_credp); 1042 dbp->db_credp = NULL; 1043 } 1044 dbp->db_cpid = -1; 1045 dbp->db_struioflag = 0; 1046 dbp->db_struioun.cksum.flags = 0; 1047 1048 mutex_enter(&bcp->mutex); 1049 kmem_cache_free(bcp->dblk_cache, dbp); 1050 bcp->alloc--; 1051 1052 if (bcp->alloc == 0 && bcp->destroy != 0) { 1053 kmem_cache_destroy(bcp->dblk_cache); 1054 kmem_cache_destroy(bcp->buffer_cache); 1055 mutex_exit(&bcp->mutex); 1056 mutex_destroy(&bcp->mutex); 1057 kmem_free(bcp, sizeof (bcache_t)); 1058 } else { 1059 mutex_exit(&bcp->mutex); 1060 } 1061 } 1062 1063 bcache_t * 1064 bcache_create(char *name, size_t size, uint_t align) 1065 { 1066 bcache_t *bcp; 1067 char buffer[255]; 1068 1069 ASSERT((align & (align - 1)) == 0); 1070 1071 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL) 1072 return (NULL); 1073 1074 bcp->size = size; 1075 bcp->align = align; 1076 bcp->alloc = 0; 1077 bcp->destroy = 0; 1078 1079 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 1080 1081 (void) sprintf(buffer, "%s_buffer_cache", name); 1082 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 1083 NULL, NULL, NULL, 0); 1084 (void) sprintf(buffer, "%s_dblk_cache", name); 1085 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 1086 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 1087 NULL, (void *)bcp, NULL, 0); 1088 1089 return (bcp); 1090 } 1091 1092 void 1093 bcache_destroy(bcache_t *bcp) 1094 { 1095 ASSERT(bcp != NULL); 1096 1097 mutex_enter(&bcp->mutex); 1098 if (bcp->alloc == 0) { 1099 kmem_cache_destroy(bcp->dblk_cache); 1100 kmem_cache_destroy(bcp->buffer_cache); 1101 mutex_exit(&bcp->mutex); 1102 mutex_destroy(&bcp->mutex); 1103 kmem_free(bcp, sizeof (bcache_t)); 1104 } else { 1105 bcp->destroy++; 1106 mutex_exit(&bcp->mutex); 1107 } 1108 } 1109 1110 /*ARGSUSED*/ 1111 mblk_t * 1112 bcache_allocb(bcache_t *bcp, uint_t pri) 1113 { 1114 dblk_t *dbp; 1115 mblk_t *mp = NULL; 1116 1117 ASSERT(bcp != NULL); 1118 1119 mutex_enter(&bcp->mutex); 1120 if (bcp->destroy != 0) { 1121 mutex_exit(&bcp->mutex); 1122 goto out; 1123 } 1124 1125 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 1126 mutex_exit(&bcp->mutex); 1127 goto out; 1128 } 1129 bcp->alloc++; 1130 mutex_exit(&bcp->mutex); 1131 1132 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 1133 1134 mp = dbp->db_mblk; 1135 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 1136 mp->b_next = mp->b_prev = mp->b_cont = NULL; 1137 mp->b_rptr = mp->b_wptr = dbp->db_base; 1138 mp->b_queue = NULL; 1139 MBLK_BAND_FLAG_WORD(mp) = 0; 1140 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 1141 out: 1142 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 1143 1144 return (mp); 1145 } 1146 1147 static void 1148 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 1149 { 1150 ASSERT(dbp->db_mblk == mp); 1151 if 
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}
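/*
 * Sketch (hypothetical STREAM_EXAMPLES guard): the canonical
 * allocb()/bufcall() retry pattern that esbbcall() above and bufcall()
 * below exist to support.  example_retry/example_send are illustrative
 * names; a real driver would also record and later unbufcall() the id.
 */
#ifdef STREAM_EXAMPLES
static void
example_retry(void *arg)
{
	queue_t *q = arg;

	qenable(q);		/* re-run the service procedure */
}

static void
example_send(queue_t *q, size_t len)
{
	mblk_t *mp;

	if ((mp = allocb(len, BPRI_MED)) == NULL) {
		/*
		 * No memory now: ask for a callback when an allocation
		 * of this size is likely to succeed.  A zero id means
		 * even the bufcall could not be allocated; a real driver
		 * would fall back to a timeout here.
		 */
		(void) bufcall(len, BPRI_MED, example_retry, q);
		return;
	}
	mp->b_wptr += len;
	putnext(q, mp);
}
#endif	/* STREAM_EXAMPLES */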
/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its value
	 * in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
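/*
 * Sketch (hypothetical STREAM_EXAMPLES guard): issuing an internally
 * generated ioctl with mkiocb() above.  EXAMPLE_IOC is an illustrative
 * command value; the M_IOCACK/M_IOCNAK reply is matched by ioc_id in the
 * module's put procedure.
 */
#ifdef STREAM_EXAMPLES
#define	EXAMPLE_IOC	(('e' << 8) | 1)

static void
stream_example_mkiocb(queue_t *wq)
{
	mblk_t *mp;

	if ((mp = mkiocb(EXAMPLE_IOC)) == NULL)
		return;
	putnext(wq, mp);
}
#endif	/* STREAM_EXAMPLES */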
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Copy the various checksum information that came in
	 * originally.
	 */
	ndp->db_cksumstart = dp->db_cksumstart;
	ndp->db_cksumend = dp->db_cksumend;
	ndp->db_cksumstuff = dp->db_cksumstuff;
	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
	    sizeof (dp->db_struioun.data));

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}
/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  len == -1 means concatenate everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
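/*
 * Sketch (hypothetical STREAM_EXAMPLES guard): the usual reason to call
 * pullupmsg() -- making the first len bytes contiguous and aligned
 * before casting b_rptr to a header structure.  example_hdr_t is an
 * illustrative type.
 */
#ifdef STREAM_EXAMPLES
typedef struct example_hdr {
	uint32_t eh_type;
	uint32_t eh_len;
} example_hdr_t;

static int
stream_example_parse(mblk_t *mp)
{
	example_hdr_t *hp;

	if (!pullupmsg(mp, sizeof (example_hdr_t)))
		return (-1);
	hp = (example_hdr_t *)mp->b_rptr;
	return (hp->eh_type);
}
#endif	/* STREAM_EXAMPLES */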
1646 */ 1647 mblk_t * 1648 msgpullup(mblk_t *mp, ssize_t len) 1649 { 1650 mblk_t *newmp; 1651 ssize_t totlen = xmsgsize(mp); 1652 ssize_t offset = 0; 1653 1654 if (len == -1) 1655 len = totlen; 1656 1657 if (len < 0 || (len > 0 && len > totlen)) 1658 return (NULL); 1659 1660 if ((newmp = allocb_tmpl(len, mp)) == NULL) 1661 return (NULL); 1662 1663 newmp->b_flag = mp->b_flag; 1664 newmp->b_band = mp->b_band; 1665 1666 while (len > 0) { 1667 ssize_t seglen = MBLKL(mp); 1668 ssize_t n = MIN(seglen, len); 1669 1670 ASSERT3P(mp, !=, NULL); /* guaranteed by len <= totlen */ 1671 ASSERT3S(n, >=, 0); /* allow zero-length mblk_t's */ 1672 if (n > 0) 1673 bcopy(mp->b_rptr, newmp->b_wptr, n); 1674 newmp->b_wptr += n; 1675 len -= n; 1676 1677 if (n == seglen) 1678 mp = mp->b_cont; 1679 else if (len == 0) 1680 offset = n; 1681 } 1682 ASSERT3S(len, ==, 0); 1683 1684 if (mp != NULL) { 1685 newmp->b_cont = dupmsg(mp); 1686 if (newmp->b_cont == NULL) { 1687 freemsg(newmp); 1688 return (NULL); 1689 } 1690 ASSERT3S(offset, >=, 0); 1691 ASSERT3U(MBLKL(newmp->b_cont), >=, offset); 1692 newmp->b_cont->b_rptr += offset; 1693 } 1694 1695 return (newmp); 1696 } 1697 1698 /* 1699 * Trim bytes from message 1700 * len > 0, trim from head 1701 * len < 0, trim from tail 1702 * Returns 1 on success, 0 on failure. 1703 */ 1704 int 1705 adjmsg(mblk_t *mp, ssize_t len) 1706 { 1707 mblk_t *bp; 1708 mblk_t *save_bp = NULL; 1709 mblk_t *prev_bp; 1710 mblk_t *bcont; 1711 unsigned char type; 1712 ssize_t n; 1713 int fromhead; 1714 int first; 1715 1716 ASSERT(mp != NULL); 1717 1718 if (len < 0) { 1719 fromhead = 0; 1720 len = -len; 1721 } else { 1722 fromhead = 1; 1723 } 1724 1725 if (xmsgsize(mp) < len) 1726 return (0); 1727 1728 if (fromhead) { 1729 first = 1; 1730 while (len) { 1731 ASSERT(mp->b_wptr >= mp->b_rptr); 1732 n = MIN(mp->b_wptr - mp->b_rptr, len); 1733 mp->b_rptr += n; 1734 len -= n; 1735 1736 /* 1737 * If this is not the first zero length 1738 * message remove it 1739 */ 1740 if (!first && (mp->b_wptr == mp->b_rptr)) { 1741 bcont = mp->b_cont; 1742 freeb(mp); 1743 mp = save_bp->b_cont = bcont; 1744 } else { 1745 save_bp = mp; 1746 mp = mp->b_cont; 1747 } 1748 first = 0; 1749 } 1750 } else { 1751 type = mp->b_datap->db_type; 1752 while (len) { 1753 bp = mp; 1754 save_bp = NULL; 1755 1756 /* 1757 * Find the last message of same type 1758 */ 1759 while (bp && bp->b_datap->db_type == type) { 1760 ASSERT(bp->b_wptr >= bp->b_rptr); 1761 prev_bp = save_bp; 1762 save_bp = bp; 1763 bp = bp->b_cont; 1764 } 1765 if (save_bp == NULL) 1766 break; 1767 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len); 1768 save_bp->b_wptr -= n; 1769 len -= n; 1770 1771 /* 1772 * If this is not the first message 1773 * and we have taken away everything 1774 * from this message, remove it 1775 */ 1776 1777 if ((save_bp != mp) && 1778 (save_bp->b_wptr == save_bp->b_rptr)) { 1779 bcont = save_bp->b_cont; 1780 freeb(save_bp); 1781 prev_bp->b_cont = bcont; 1782 } 1783 } 1784 } 1785 return (1); 1786 } 1787 1788 /* 1789 * get number of data bytes in message 1790 */ 1791 size_t 1792 msgdsize(mblk_t *bp) 1793 { 1794 size_t count = 0; 1795 1796 for (; bp; bp = bp->b_cont) 1797 if (bp->b_datap->db_type == M_DATA) { 1798 ASSERT(bp->b_wptr >= bp->b_rptr); 1799 count += bp->b_wptr - bp->b_rptr; 1800 } 1801 return (count); 1802 } 1803 1804 /* 1805 * Get a message off head of queue 1806 * 1807 * If queue has no buffers then mark queue 1808 * with QWANTR. 
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		bytes += MBLKL(mp);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}
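/*
 * Sketch (hypothetical STREAM_EXAMPLES guard): the canonical service
 * procedure loop that getq() above is designed for.  Flow control is
 * honored by putting the message back with putbq() and waiting for a
 * later backenable.  example_rsrv is an illustrative name.
 */
#ifdef STREAM_EXAMPLES
static int
example_rsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		if (!canputnext(q)) {
			(void) putbq(q, mp);	/* re-queue; backenable later */
			break;
		}
		putnext(q, mp);
	}
	return (0);
}
#endif	/* STREAM_EXAMPLES */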
/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we are
				 * taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of
				 * the mblk represents some arbitrary point
				 * within the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head
				 * of the queue.
				 */
2005 */
2006 if (mp1 != NULL) {
2007 mp2 = mp1->b_cont;
2008 mp1->b_cont = NULL;
2009 }
2010 }
2011 /*
2012 * If mp2 is not NULL then we have part of the message
2013 * to put back onto the queue.
2014 */
2015 if (mp2 != NULL) {
2016 if ((mp2->b_next = bp->b_next) == NULL)
2017 q->q_last = mp2;
2018 else
2019 bp->b_next->b_prev = mp2;
2020 q->q_first = mp2;
2021 } else {
2022 if ((q->q_first = bp->b_next) == NULL)
2023 q->q_last = NULL;
2024 else
2025 q->q_first->b_prev = NULL;
2026 }
2027 } else {
2028 /*
2029 * Either no byte threshold was supplied, there is
2030 * not enough on the queue, or we failed to
2031 * duplicate/copy a data block. In these cases we
2032 * just take the entire first message.
2033 */
2034 dup_failed:
2035 bytecnt = mp_cont_len(bp, &mblkcnt);
2036 if ((q->q_first = bp->b_next) == NULL)
2037 q->q_last = NULL;
2038 else
2039 q->q_first->b_prev = NULL;
2040 }
2041 if (bp->b_band == 0) {
2042 q->q_count -= bytecnt;
2043 q->q_mblkcnt -= mblkcnt;
2044 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2045 (q->q_mblkcnt < q->q_hiwat))) {
2046 q->q_flag &= ~QFULL;
2047 }
2048 } else {
2049 int i;
2050 
2051 ASSERT(bp->b_band <= q->q_nband);
2052 ASSERT(q->q_bandp != NULL);
2053 ASSERT(MUTEX_HELD(QLOCK(q)));
2054 qbp = q->q_bandp;
2055 i = bp->b_band;
2056 while (--i > 0)
2057 qbp = qbp->qb_next;
2058 if (qbp->qb_first == qbp->qb_last) {
2059 qbp->qb_first = NULL;
2060 qbp->qb_last = NULL;
2061 } else {
2062 qbp->qb_first = bp->b_next;
2063 }
2064 qbp->qb_count -= bytecnt;
2065 qbp->qb_mblkcnt -= mblkcnt;
2066 if (qbp->qb_mblkcnt == 0 ||
2067 ((qbp->qb_count < qbp->qb_hiwat) &&
2068 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2069 qbp->qb_flag &= ~QB_FULL;
2070 }
2071 }
2072 q->q_flag &= ~QWANTR;
2073 bp->b_next = NULL;
2074 bp->b_prev = NULL;
2075 }
2076 if (freezer != curthread)
2077 mutex_exit(QLOCK(q));
2078 
2079 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2080 
2081 return (bp);
2082 }
2083 
2084 /*
2085 * Determine if a backenable is needed after removing a message in the
2086 * specified band.
2087 * NOTE: This routine assumes that something like getq_noenab() has
2088 * already been called.
2089 *
2090 * For the read side it is ok to hold sd_lock across calling this (and the
2091 * stream head often does).
2092 * For the write side, however, strwakeq() might be invoked and it acquires sd_lock.
2093 */
2094 void
2095 qbackenable(queue_t *q, uchar_t band)
2096 {
2097 int backenab = 0;
2098 qband_t *qbp;
2099 kthread_id_t freezer;
2100 
2101 ASSERT(q);
2102 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2103 
2104 /*
2105 * Quick check without holding the lock.
2106 * This is safe since, after getq() has lowered q_count, these flags
2107 * would not change unless either the qbackenable() is done by
2108 * another thread (which is ok) or the queue has become QFULL,
2109 * in which case another backenable will take place when the queue
2110 * drops below q_lowat.
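 * For band 0 the flags are re-examined below with QLOCK held (along
 * with the low water tests) before any wakeup is actually issued.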
2111 */
2112 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2113 return;
2114 
2115 /* freezestr should allow its caller to call getq/putq */
2116 freezer = STREAM(q)->sd_freezer;
2117 if (freezer == curthread) {
2118 ASSERT(frozenstr(q));
2119 ASSERT(MUTEX_HELD(QLOCK(q)));
2120 } else
2121 mutex_enter(QLOCK(q));
2122 
2123 if (band == 0) {
2124 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2125 q->q_mblkcnt < q->q_lowat)) {
2126 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2127 }
2128 } else {
2129 int i;
2130 
2131 ASSERT((unsigned)band <= q->q_nband);
2132 ASSERT(q->q_bandp != NULL);
2133 
2134 qbp = q->q_bandp;
2135 i = band;
2136 while (--i > 0)
2137 qbp = qbp->qb_next;
2138 
2139 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2140 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2141 backenab = qbp->qb_flag & QB_WANTW;
2142 }
2143 }
2144 
2145 if (backenab == 0) {
2146 if (freezer != curthread)
2147 mutex_exit(QLOCK(q));
2148 return;
2149 }
2150 
2151 /* Have to drop the lock across strwakeq and backenable */
2152 if (backenab & QWANTWSYNC)
2153 q->q_flag &= ~QWANTWSYNC;
2154 if (backenab & (QWANTW|QB_WANTW)) {
2155 if (band != 0)
2156 qbp->qb_flag &= ~QB_WANTW;
2157 else {
2158 q->q_flag &= ~QWANTW;
2159 }
2160 }
2161 
2162 if (freezer != curthread)
2163 mutex_exit(QLOCK(q));
2164 
2165 if (backenab & QWANTWSYNC)
2166 strwakeq(q, QWANTWSYNC);
2167 if (backenab & (QWANTW|QB_WANTW))
2168 backenable(q, band);
2169 }
2170 
2171 /*
2172 * Remove a message from a queue. The queue count and other
2173 * flow control parameters are adjusted and the back queue
2174 * enabled if necessary.
2175 *
2176 * rmvq can be called with the stream frozen, by other utility functions
2177 * holding QLOCK, and by streams modules without any locks or the stream frozen.
2178 */
2179 void
2180 rmvq(queue_t *q, mblk_t *mp)
2181 {
2182 ASSERT(mp != NULL);
2183 
2184 rmvq_noenab(q, mp);
2185 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2186 /*
2187 * qbackenable can handle a frozen stream but not a "random"
2188 * qlock being held. Drop lock across qbackenable.
2189 */
2190 mutex_exit(QLOCK(q));
2191 qbackenable(q, mp->b_band);
2192 mutex_enter(QLOCK(q));
2193 } else {
2194 qbackenable(q, mp->b_band);
2195 }
2196 }
2197 
2198 /*
2199 * Like rmvq() but without any backenabling.
2200 * This exists to handle SR_CONSOL_DATA in strrput().
2201 */
2202 void
2203 rmvq_noenab(queue_t *q, mblk_t *mp)
2204 {
2205 int i;
2206 qband_t *qbp = NULL;
2207 kthread_id_t freezer;
2208 int bytecnt = 0, mblkcnt = 0;
2209 
2210 freezer = STREAM(q)->sd_freezer;
2211 if (freezer == curthread) {
2212 ASSERT(frozenstr(q));
2213 ASSERT(MUTEX_HELD(QLOCK(q)));
2214 } else if (MUTEX_HELD(QLOCK(q))) {
2215 /* Don't drop lock on exit */
2216 freezer = curthread;
2217 } else
2218 mutex_enter(QLOCK(q));
2219 
2220 ASSERT(mp->b_band <= q->q_nband);
2221 if (mp->b_band != 0) { /* Adjust band pointers */
2222 ASSERT(q->q_bandp != NULL);
2223 qbp = q->q_bandp;
2224 i = mp->b_band;
2225 while (--i > 0)
2226 qbp = qbp->qb_next;
2227 if (mp == qbp->qb_first) {
2228 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2229 qbp->qb_first = mp->b_next;
2230 else
2231 qbp->qb_first = NULL;
2232 }
2233 if (mp == qbp->qb_last) {
2234 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2235 qbp->qb_last = mp->b_prev;
2236 else
2237 qbp->qb_last = NULL;
2238 }
2239 }
2240 
2241 /*
2242 * Remove the message from the list.
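 * The list is doubly linked via b_next/b_prev, so the unlink below is
 * O(1); q_first and q_last are patched up when mp is at either end.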
2243 */
2244 if (mp->b_prev)
2245 mp->b_prev->b_next = mp->b_next;
2246 else
2247 q->q_first = mp->b_next;
2248 if (mp->b_next)
2249 mp->b_next->b_prev = mp->b_prev;
2250 else
2251 q->q_last = mp->b_prev;
2252 mp->b_next = NULL;
2253 mp->b_prev = NULL;
2254 
2255 /* Get the size of the message for q_count accounting */
2256 bytecnt = mp_cont_len(mp, &mblkcnt);
2257 
2258 if (mp->b_band == 0) { /* Perform q_count accounting */
2259 q->q_count -= bytecnt;
2260 q->q_mblkcnt -= mblkcnt;
2261 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2262 (q->q_mblkcnt < q->q_hiwat))) {
2263 q->q_flag &= ~QFULL;
2264 }
2265 } else { /* Perform qb_count accounting */
2266 qbp->qb_count -= bytecnt;
2267 qbp->qb_mblkcnt -= mblkcnt;
2268 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2269 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2270 qbp->qb_flag &= ~QB_FULL;
2271 }
2272 }
2273 if (freezer != curthread)
2274 mutex_exit(QLOCK(q));
2275 
2276 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2277 }
2278 
2279 /*
2280 * Empty a queue.
2281 * If flag is set, remove all messages. Otherwise, remove
2282 * only non-control messages. If the queue falls below its low
2283 * water mark, and QWANTW is set, enable the nearest upstream
2284 * service procedure.
2285 *
2286 * Historical note: when merging the M_FLUSH code in strrput with this
2287 * code, one difference was discovered. flushq did not have a check
2288 * for q_lowat == 0 in the backenabling test.
2289 *
2290 * pcproto_flag specifies whether or not an M_PCPROTO message should be
2291 * flushed if one exists on the queue.
2292 */
2293 void
2294 flushq_common(queue_t *q, int flag, int pcproto_flag)
2295 {
2296 mblk_t *mp, *nmp;
2297 qband_t *qbp;
2298 int backenab = 0;
2299 unsigned char bpri;
2300 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2301 
2302 if (q->q_first == NULL)
2303 return;
2304 
2305 mutex_enter(QLOCK(q));
2306 mp = q->q_first;
2307 q->q_first = NULL;
2308 q->q_last = NULL;
2309 q->q_count = 0;
2310 q->q_mblkcnt = 0;
2311 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2312 qbp->qb_first = NULL;
2313 qbp->qb_last = NULL;
2314 qbp->qb_count = 0;
2315 qbp->qb_mblkcnt = 0;
2316 qbp->qb_flag &= ~QB_FULL;
2317 }
2318 q->q_flag &= ~QFULL;
2319 mutex_exit(QLOCK(q));
2320 while (mp) {
2321 nmp = mp->b_next;
2322 mp->b_next = mp->b_prev = NULL;
2323 
2324 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2325 
2326 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2327 (void) putq(q, mp);
2328 else if (flag || datamsg(mp->b_datap->db_type))
2329 freemsg(mp);
2330 else
2331 (void) putq(q, mp);
2332 mp = nmp;
2333 }
2334 bpri = 1;
2335 mutex_enter(QLOCK(q));
2336 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2337 if ((qbp->qb_flag & QB_WANTW) &&
2338 (((qbp->qb_count < qbp->qb_lowat) &&
2339 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2340 qbp->qb_lowat == 0)) {
2341 qbp->qb_flag &= ~QB_WANTW;
2342 backenab = 1;
2343 qbf[bpri] = 1;
2344 } else
2345 qbf[bpri] = 0;
2346 bpri++;
2347 }
2348 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2349 if ((q->q_flag & QWANTW) &&
2350 (((q->q_count < q->q_lowat) &&
2351 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2352 q->q_flag &= ~QWANTW;
2353 backenab = 1;
2354 qbf[0] = 1;
2355 } else
2356 qbf[0] = 0;
2357 
2358 /*
2359 * If any band can now be written to, and there is a writer
2360 * for that band, then backenable the closest service procedure.
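 * QLOCK is dropped before the backenable() calls below, since
 * backenable() must not be called with this queue's lock held.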
2361 */
2362 if (backenab) {
2363 mutex_exit(QLOCK(q));
2364 for (bpri = q->q_nband; bpri != 0; bpri--)
2365 if (qbf[bpri])
2366 backenable(q, bpri);
2367 if (qbf[0])
2368 backenable(q, 0);
2369 } else
2370 mutex_exit(QLOCK(q));
2371 }
2372 
2373 /*
2374 * The real flushing takes place in flushq_common. This is done so that
2375 * a flag can specify whether or not M_PCPROTO messages should be
2376 * flushed. Currently the only place that uses this flag is the stream head.
2377 */
2378 void
2379 flushq(queue_t *q, int flag)
2380 {
2381 flushq_common(q, flag, 0);
2382 }
2383 
2384 /*
2385 * Flush the queue of messages of the given priority band.
2386 * There is some duplication of code between flushq and flushband.
2387 * This is because we want to optimize the code as much as possible.
2388 * The assumption is that there will be more messages in the normal
2389 * (priority 0) band than in any other.
2390 *
2391 * Historical note: when merging the M_FLUSH code in strrput with this
2392 * code, one difference was discovered. flushband had an extra check
2393 * for (mp->b_datap->db_type < QPCTL) in the band 0
2394 * case. That check does not match the man page for flushband and was not
2395 * in the strrput flush code, hence it was removed.
2396 */
2397 void
2398 flushband(queue_t *q, unsigned char pri, int flag)
2399 {
2400 mblk_t *mp;
2401 mblk_t *nmp;
2402 mblk_t *last;
2403 qband_t *qbp;
2404 int band;
2405 
2406 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2407 if (pri > q->q_nband) {
2408 return;
2409 }
2410 mutex_enter(QLOCK(q));
2411 if (pri == 0) {
2412 mp = q->q_first;
2413 q->q_first = NULL;
2414 q->q_last = NULL;
2415 q->q_count = 0;
2416 q->q_mblkcnt = 0;
2417 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2418 qbp->qb_first = NULL;
2419 qbp->qb_last = NULL;
2420 qbp->qb_count = 0;
2421 qbp->qb_mblkcnt = 0;
2422 qbp->qb_flag &= ~QB_FULL;
2423 }
2424 q->q_flag &= ~QFULL;
2425 mutex_exit(QLOCK(q));
2426 while (mp) {
2427 nmp = mp->b_next;
2428 mp->b_next = mp->b_prev = NULL;
2429 if ((mp->b_band == 0) &&
2430 ((flag == FLUSHALL) ||
2431 datamsg(mp->b_datap->db_type)))
2432 freemsg(mp);
2433 else
2434 (void) putq(q, mp);
2435 mp = nmp;
2436 }
2437 mutex_enter(QLOCK(q));
2438 if ((q->q_flag & QWANTW) &&
2439 (((q->q_count < q->q_lowat) &&
2440 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2441 q->q_flag &= ~QWANTW;
2442 mutex_exit(QLOCK(q));
2443 
2444 backenable(q, pri);
2445 } else
2446 mutex_exit(QLOCK(q));
2447 } else { /* pri != 0 */
2448 boolean_t flushed = B_FALSE;
2449 band = pri;
2450 
2451 ASSERT(MUTEX_HELD(QLOCK(q)));
2452 qbp = q->q_bandp;
2453 while (--band > 0)
2454 qbp = qbp->qb_next;
2455 mp = qbp->qb_first;
2456 if (mp == NULL) {
2457 mutex_exit(QLOCK(q));
2458 return;
2459 }
2460 last = qbp->qb_last->b_next;
2461 /*
2462 * rmvq_noenab() and freemsg() are called for each mblk that
2463 * meets the criteria. The loop is executed until the last
2464 * mblk has been processed.
2465 */
2466 while (mp != last) {
2467 ASSERT(mp->b_band == pri);
2468 nmp = mp->b_next;
2469 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2470 rmvq_noenab(q, mp);
2471 freemsg(mp);
2472 flushed = B_TRUE;
2473 }
2474 mp = nmp;
2475 }
2476 mutex_exit(QLOCK(q));
2477 
2478 /*
2479 * If any mblk(s) have been freed, we know that qbackenable()
2480 * will need to be called.
2481 */
2482 if (flushed)
2483 qbackenable(q, pri);
2484 }
2485 }
2486 
2487 /*
2488 * Return 1 if the queue is not full.
If the queue is full, return
2489 * 0 (may not put message) and set the QWANTW flag (caller wants to write
2490 * to the queue).
2491 */
2492 int
2493 canput(queue_t *q)
2494 {
2495 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2496 
2497 /* this is for loopback transports; they should not do a canput */
2498 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2499 
2500 /* Find next forward module that has a service procedure */
2501 q = q->q_nfsrv;
2502 
2503 if (!(q->q_flag & QFULL)) {
2504 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2505 return (1);
2506 }
2507 mutex_enter(QLOCK(q));
2508 if (q->q_flag & QFULL) {
2509 q->q_flag |= QWANTW;
2510 mutex_exit(QLOCK(q));
2511 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2512 return (0);
2513 }
2514 mutex_exit(QLOCK(q));
2515 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2516 return (1);
2517 }
2518 
2519 /*
2520 * This is the new canput for use with priority bands. Return 1 if the
2521 * band is not full. If the band is full, return 0 (may not put message)
2522 * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (caller
2523 * wants to write to the queue).
2524 */
2525 int
2526 bcanput(queue_t *q, unsigned char pri)
2527 {
2528 qband_t *qbp;
2529 
2530 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2531 if (!q)
2532 return (0);
2533 
2534 /* Find next forward module that has a service procedure */
2535 q = q->q_nfsrv;
2536 
2537 mutex_enter(QLOCK(q));
2538 if (pri == 0) {
2539 if (q->q_flag & QFULL) {
2540 q->q_flag |= QWANTW;
2541 mutex_exit(QLOCK(q));
2542 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2543 "bcanput:%p %X %d", q, pri, 0);
2544 return (0);
2545 }
2546 } else { /* pri != 0 */
2547 if (pri > q->q_nband) {
2548 /*
2549 * No band exists yet, so return success.
2550 */
2551 mutex_exit(QLOCK(q));
2552 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2553 "bcanput:%p %X %d", q, pri, 1);
2554 return (1);
2555 }
2556 qbp = q->q_bandp;
2557 while (--pri)
2558 qbp = qbp->qb_next;
2559 if (qbp->qb_flag & QB_FULL) {
2560 qbp->qb_flag |= QB_WANTW;
2561 mutex_exit(QLOCK(q));
2562 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 "bcanput:%p %X %d", q, pri, 0);
2564 return (0);
2565 }
2566 }
2567 mutex_exit(QLOCK(q));
2568 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2569 "bcanput:%p %X %d", q, pri, 1);
2570 return (1);
2571 }
2572 
2573 /*
2574 * Put a message on a queue.
2575 *
2576 * Messages are enqueued on a priority basis. The priority classes
2577 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2578 * and B_NORMAL (type < QPCTL && band == 0).
2579 *
2580 * Add appropriate weighted data block sizes to queue count.
2581 * If queue hits high water mark then set QFULL flag.
2582 *
2583 * If QNOENAB is not set (putq is allowed to enable the queue),
2584 * enable the queue only if the message is PRIORITY,
2585 * or the QWANTR flag is set (indicating that the service procedure
2586 * is ready to read the queue). This implies that a service
2587 * procedure must NEVER put a high priority message back on its own
2588 * queue, as this would result in an infinite loop (!).
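 *
 * As an illustrative sketch (not code from this file), a typical
 * service procedure drains its queue with getq() and honors
 * downstream flow control for normal messages, putting them back
 * with putbq() when the next queue is full; high priority messages
 * are never put back:
 *
 *	while ((mp = getq(q)) != NULL) {
 *		if (queclass(mp) == QNORM && !canputnext(q)) {
 *			(void) putbq(q, mp);
 *			break;
 *		}
 *		putnext(q, mp);
 *	}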
2589 */ 2590 int 2591 putq(queue_t *q, mblk_t *bp) 2592 { 2593 mblk_t *tmp; 2594 qband_t *qbp = NULL; 2595 int mcls = (int)queclass(bp); 2596 kthread_id_t freezer; 2597 int bytecnt = 0, mblkcnt = 0; 2598 2599 freezer = STREAM(q)->sd_freezer; 2600 if (freezer == curthread) { 2601 ASSERT(frozenstr(q)); 2602 ASSERT(MUTEX_HELD(QLOCK(q))); 2603 } else 2604 mutex_enter(QLOCK(q)); 2605 2606 /* 2607 * Make sanity checks and if qband structure is not yet 2608 * allocated, do so. 2609 */ 2610 if (mcls == QPCTL) { 2611 if (bp->b_band != 0) 2612 bp->b_band = 0; /* force to be correct */ 2613 } else if (bp->b_band != 0) { 2614 int i; 2615 qband_t **qbpp; 2616 2617 if (bp->b_band > q->q_nband) { 2618 2619 /* 2620 * The qband structure for this priority band is 2621 * not on the queue yet, so we have to allocate 2622 * one on the fly. It would be wasteful to 2623 * associate the qband structures with every 2624 * queue when the queues are allocated. This is 2625 * because most queues will only need the normal 2626 * band of flow which can be described entirely 2627 * by the queue itself. 2628 */ 2629 qbpp = &q->q_bandp; 2630 while (*qbpp) 2631 qbpp = &(*qbpp)->qb_next; 2632 while (bp->b_band > q->q_nband) { 2633 if ((*qbpp = allocband()) == NULL) { 2634 if (freezer != curthread) 2635 mutex_exit(QLOCK(q)); 2636 return (0); 2637 } 2638 (*qbpp)->qb_hiwat = q->q_hiwat; 2639 (*qbpp)->qb_lowat = q->q_lowat; 2640 q->q_nband++; 2641 qbpp = &(*qbpp)->qb_next; 2642 } 2643 } 2644 ASSERT(MUTEX_HELD(QLOCK(q))); 2645 qbp = q->q_bandp; 2646 i = bp->b_band; 2647 while (--i) 2648 qbp = qbp->qb_next; 2649 } 2650 2651 /* 2652 * If queue is empty, add the message and initialize the pointers. 2653 * Otherwise, adjust message pointers and queue pointers based on 2654 * the type of the message and where it belongs on the queue. Some 2655 * code is duplicated to minimize the number of conditionals and 2656 * hopefully minimize the amount of time this routine takes. 2657 */ 2658 if (!q->q_first) { 2659 bp->b_next = NULL; 2660 bp->b_prev = NULL; 2661 q->q_first = bp; 2662 q->q_last = bp; 2663 if (qbp) { 2664 qbp->qb_first = bp; 2665 qbp->qb_last = bp; 2666 } 2667 } else if (!qbp) { /* bp->b_band == 0 */ 2668 2669 /* 2670 * If queue class of message is less than or equal to 2671 * that of the last one on the queue, tack on to the end. 2672 */ 2673 tmp = q->q_last; 2674 if (mcls <= (int)queclass(tmp)) { 2675 bp->b_next = NULL; 2676 bp->b_prev = tmp; 2677 tmp->b_next = bp; 2678 q->q_last = bp; 2679 } else { 2680 tmp = q->q_first; 2681 while ((int)queclass(tmp) >= mcls) 2682 tmp = tmp->b_next; 2683 2684 /* 2685 * Insert bp before tmp. 2686 */ 2687 bp->b_next = tmp; 2688 bp->b_prev = tmp->b_prev; 2689 if (tmp->b_prev) 2690 tmp->b_prev->b_next = bp; 2691 else 2692 q->q_first = bp; 2693 tmp->b_prev = bp; 2694 } 2695 } else { /* bp->b_band != 0 */ 2696 if (qbp->qb_first) { 2697 tmp = qbp->qb_last; 2698 2699 /* 2700 * Insert bp after the last message in this band. 2701 */ 2702 bp->b_next = tmp->b_next; 2703 if (tmp->b_next) 2704 tmp->b_next->b_prev = bp; 2705 else 2706 q->q_last = bp; 2707 bp->b_prev = tmp; 2708 tmp->b_next = bp; 2709 } else { 2710 tmp = q->q_last; 2711 if ((mcls < (int)queclass(tmp)) || 2712 (bp->b_band <= tmp->b_band)) { 2713 2714 /* 2715 * Tack bp on end of queue. 
2716 */ 2717 bp->b_next = NULL; 2718 bp->b_prev = tmp; 2719 tmp->b_next = bp; 2720 q->q_last = bp; 2721 } else { 2722 tmp = q->q_first; 2723 while (tmp->b_datap->db_type >= QPCTL) 2724 tmp = tmp->b_next; 2725 while (tmp->b_band >= bp->b_band) 2726 tmp = tmp->b_next; 2727 2728 /* 2729 * Insert bp before tmp. 2730 */ 2731 bp->b_next = tmp; 2732 bp->b_prev = tmp->b_prev; 2733 if (tmp->b_prev) 2734 tmp->b_prev->b_next = bp; 2735 else 2736 q->q_first = bp; 2737 tmp->b_prev = bp; 2738 } 2739 qbp->qb_first = bp; 2740 } 2741 qbp->qb_last = bp; 2742 } 2743 2744 /* Get message byte count for q_count accounting */ 2745 bytecnt = mp_cont_len(bp, &mblkcnt); 2746 2747 if (qbp) { 2748 qbp->qb_count += bytecnt; 2749 qbp->qb_mblkcnt += mblkcnt; 2750 if ((qbp->qb_count >= qbp->qb_hiwat) || 2751 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2752 qbp->qb_flag |= QB_FULL; 2753 } 2754 } else { 2755 q->q_count += bytecnt; 2756 q->q_mblkcnt += mblkcnt; 2757 if ((q->q_count >= q->q_hiwat) || 2758 (q->q_mblkcnt >= q->q_hiwat)) { 2759 q->q_flag |= QFULL; 2760 } 2761 } 2762 2763 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0); 2764 2765 if ((mcls > QNORM) || 2766 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2767 qenable_locked(q); 2768 ASSERT(MUTEX_HELD(QLOCK(q))); 2769 if (freezer != curthread) 2770 mutex_exit(QLOCK(q)); 2771 2772 return (1); 2773 } 2774 2775 /* 2776 * Put stuff back at beginning of Q according to priority order. 2777 * See comment on putq above for details. 2778 */ 2779 int 2780 putbq(queue_t *q, mblk_t *bp) 2781 { 2782 mblk_t *tmp; 2783 qband_t *qbp = NULL; 2784 int mcls = (int)queclass(bp); 2785 kthread_id_t freezer; 2786 int bytecnt = 0, mblkcnt = 0; 2787 2788 ASSERT(q && bp); 2789 ASSERT(bp->b_next == NULL); 2790 freezer = STREAM(q)->sd_freezer; 2791 if (freezer == curthread) { 2792 ASSERT(frozenstr(q)); 2793 ASSERT(MUTEX_HELD(QLOCK(q))); 2794 } else 2795 mutex_enter(QLOCK(q)); 2796 2797 /* 2798 * Make sanity checks and if qband structure is not yet 2799 * allocated, do so. 2800 */ 2801 if (mcls == QPCTL) { 2802 if (bp->b_band != 0) 2803 bp->b_band = 0; /* force to be correct */ 2804 } else if (bp->b_band != 0) { 2805 int i; 2806 qband_t **qbpp; 2807 2808 if (bp->b_band > q->q_nband) { 2809 qbpp = &q->q_bandp; 2810 while (*qbpp) 2811 qbpp = &(*qbpp)->qb_next; 2812 while (bp->b_band > q->q_nband) { 2813 if ((*qbpp = allocband()) == NULL) { 2814 if (freezer != curthread) 2815 mutex_exit(QLOCK(q)); 2816 return (0); 2817 } 2818 (*qbpp)->qb_hiwat = q->q_hiwat; 2819 (*qbpp)->qb_lowat = q->q_lowat; 2820 q->q_nband++; 2821 qbpp = &(*qbpp)->qb_next; 2822 } 2823 } 2824 qbp = q->q_bandp; 2825 i = bp->b_band; 2826 while (--i) 2827 qbp = qbp->qb_next; 2828 } 2829 2830 /* 2831 * If queue is empty or if message is high priority, 2832 * place on the front of the queue. 2833 */ 2834 tmp = q->q_first; 2835 if ((!tmp) || (mcls == QPCTL)) { 2836 bp->b_next = tmp; 2837 if (tmp) 2838 tmp->b_prev = bp; 2839 else 2840 q->q_last = bp; 2841 q->q_first = bp; 2842 bp->b_prev = NULL; 2843 if (qbp) { 2844 qbp->qb_first = bp; 2845 qbp->qb_last = bp; 2846 } 2847 } else if (qbp) { /* bp->b_band != 0 */ 2848 tmp = qbp->qb_first; 2849 if (tmp) { 2850 2851 /* 2852 * Insert bp before the first message in this band. 2853 */ 2854 bp->b_next = tmp; 2855 bp->b_prev = tmp->b_prev; 2856 if (tmp->b_prev) 2857 tmp->b_prev->b_next = bp; 2858 else 2859 q->q_first = bp; 2860 tmp->b_prev = bp; 2861 } else { 2862 tmp = q->q_last; 2863 if ((mcls < (int)queclass(tmp)) || 2864 (bp->b_band < tmp->b_band)) { 2865 2866 /* 2867 * Tack bp on end of queue. 
2868 */
2869 bp->b_next = NULL;
2870 bp->b_prev = tmp;
2871 tmp->b_next = bp;
2872 q->q_last = bp;
2873 } else {
2874 tmp = q->q_first;
2875 while (tmp->b_datap->db_type >= QPCTL)
2876 tmp = tmp->b_next;
2877 while (tmp->b_band > bp->b_band)
2878 tmp = tmp->b_next;
2879 
2880 /*
2881 * Insert bp before tmp.
2882 */
2883 bp->b_next = tmp;
2884 bp->b_prev = tmp->b_prev;
2885 if (tmp->b_prev)
2886 tmp->b_prev->b_next = bp;
2887 else
2888 q->q_first = bp;
2889 tmp->b_prev = bp;
2890 }
2891 qbp->qb_last = bp;
2892 }
2893 qbp->qb_first = bp;
2894 } else { /* bp->b_band == 0 && !QPCTL */
2895 
2896 /*
2897 * If the queue class or band is less than that of the last
2898 * message on the queue, tack bp on the end of the queue.
2899 */
2900 tmp = q->q_last;
2901 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2902 bp->b_next = NULL;
2903 bp->b_prev = tmp;
2904 tmp->b_next = bp;
2905 q->q_last = bp;
2906 } else {
2907 tmp = q->q_first;
2908 while (tmp->b_datap->db_type >= QPCTL)
2909 tmp = tmp->b_next;
2910 while (tmp->b_band > bp->b_band)
2911 tmp = tmp->b_next;
2912 
2913 /*
2914 * Insert bp before tmp.
2915 */
2916 bp->b_next = tmp;
2917 bp->b_prev = tmp->b_prev;
2918 if (tmp->b_prev)
2919 tmp->b_prev->b_next = bp;
2920 else
2921 q->q_first = bp;
2922 tmp->b_prev = bp;
2923 }
2924 }
2925 
2926 /* Get message byte count for q_count accounting */
2927 bytecnt = mp_cont_len(bp, &mblkcnt);
2928 
2929 if (qbp) {
2930 qbp->qb_count += bytecnt;
2931 qbp->qb_mblkcnt += mblkcnt;
2932 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2933 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2934 qbp->qb_flag |= QB_FULL;
2935 }
2936 } else {
2937 q->q_count += bytecnt;
2938 q->q_mblkcnt += mblkcnt;
2939 if ((q->q_count >= q->q_hiwat) ||
2940 (q->q_mblkcnt >= q->q_hiwat)) {
2941 q->q_flag |= QFULL;
2942 }
2943 }
2944 
2945 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2946 
2947 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2948 qenable_locked(q);
2949 ASSERT(MUTEX_HELD(QLOCK(q)));
2950 if (freezer != curthread)
2951 mutex_exit(QLOCK(q));
2952 
2953 return (1);
2954 }
2955 
2956 /*
2957 * Insert a message before an existing message on the queue. If the
2958 * existing message is NULL, the new message is placed on the end of
2959 * the queue. The queue class of the new message is ignored. However,
2960 * the priority band of the new message must adhere to the following
2961 * ordering:
2962 *
2963 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2964 *
2965 * All flow control parameters are updated.
2966 *
2967 * insq can be called with the stream frozen, by other utility functions
2968 * holding QLOCK, and by streams modules without any locks or the stream frozen.
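 *
 * A hedged usage sketch: a module inserting mp ahead of the first
 * message on q would normally freeze the stream first so that q_first
 * cannot change underneath it:
 *
 *	freezestr(q);
 *	(void) insq(q, q->q_first, mp);
 *	unfreezestr(q);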
2969 */ 2970 int 2971 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2972 { 2973 mblk_t *tmp; 2974 qband_t *qbp = NULL; 2975 int mcls = (int)queclass(mp); 2976 kthread_id_t freezer; 2977 int bytecnt = 0, mblkcnt = 0; 2978 2979 freezer = STREAM(q)->sd_freezer; 2980 if (freezer == curthread) { 2981 ASSERT(frozenstr(q)); 2982 ASSERT(MUTEX_HELD(QLOCK(q))); 2983 } else if (MUTEX_HELD(QLOCK(q))) { 2984 /* Don't drop lock on exit */ 2985 freezer = curthread; 2986 } else 2987 mutex_enter(QLOCK(q)); 2988 2989 if (mcls == QPCTL) { 2990 if (mp->b_band != 0) 2991 mp->b_band = 0; /* force to be correct */ 2992 if (emp && emp->b_prev && 2993 (emp->b_prev->b_datap->db_type < QPCTL)) 2994 goto badord; 2995 } 2996 if (emp) { 2997 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2998 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2999 (emp->b_prev->b_band < mp->b_band))) { 3000 goto badord; 3001 } 3002 } else { 3003 tmp = q->q_last; 3004 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 3005 badord: 3006 cmn_err(CE_WARN, 3007 "insq: attempt to insert message out of order " 3008 "on q %p", (void *)q); 3009 if (freezer != curthread) 3010 mutex_exit(QLOCK(q)); 3011 return (0); 3012 } 3013 } 3014 3015 if (mp->b_band != 0) { 3016 int i; 3017 qband_t **qbpp; 3018 3019 if (mp->b_band > q->q_nband) { 3020 qbpp = &q->q_bandp; 3021 while (*qbpp) 3022 qbpp = &(*qbpp)->qb_next; 3023 while (mp->b_band > q->q_nband) { 3024 if ((*qbpp = allocband()) == NULL) { 3025 if (freezer != curthread) 3026 mutex_exit(QLOCK(q)); 3027 return (0); 3028 } 3029 (*qbpp)->qb_hiwat = q->q_hiwat; 3030 (*qbpp)->qb_lowat = q->q_lowat; 3031 q->q_nband++; 3032 qbpp = &(*qbpp)->qb_next; 3033 } 3034 } 3035 qbp = q->q_bandp; 3036 i = mp->b_band; 3037 while (--i) 3038 qbp = qbp->qb_next; 3039 } 3040 3041 if ((mp->b_next = emp) != NULL) { 3042 if ((mp->b_prev = emp->b_prev) != NULL) 3043 emp->b_prev->b_next = mp; 3044 else 3045 q->q_first = mp; 3046 emp->b_prev = mp; 3047 } else { 3048 if ((mp->b_prev = q->q_last) != NULL) 3049 q->q_last->b_next = mp; 3050 else 3051 q->q_first = mp; 3052 q->q_last = mp; 3053 } 3054 3055 /* Get mblk and byte count for q_count accounting */ 3056 bytecnt = mp_cont_len(mp, &mblkcnt); 3057 3058 if (qbp) { /* adjust qband pointers and count */ 3059 if (!qbp->qb_first) { 3060 qbp->qb_first = mp; 3061 qbp->qb_last = mp; 3062 } else { 3063 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3064 (mp->b_prev->b_band != mp->b_band))) 3065 qbp->qb_first = mp; 3066 else if (mp->b_next == NULL || (mp->b_next != NULL && 3067 (mp->b_next->b_band != mp->b_band))) 3068 qbp->qb_last = mp; 3069 } 3070 qbp->qb_count += bytecnt; 3071 qbp->qb_mblkcnt += mblkcnt; 3072 if ((qbp->qb_count >= qbp->qb_hiwat) || 3073 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3074 qbp->qb_flag |= QB_FULL; 3075 } 3076 } else { 3077 q->q_count += bytecnt; 3078 q->q_mblkcnt += mblkcnt; 3079 if ((q->q_count >= q->q_hiwat) || 3080 (q->q_mblkcnt >= q->q_hiwat)) { 3081 q->q_flag |= QFULL; 3082 } 3083 } 3084 3085 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0); 3086 3087 if (canenable(q) && (q->q_flag & QWANTR)) 3088 qenable_locked(q); 3089 3090 ASSERT(MUTEX_HELD(QLOCK(q))); 3091 if (freezer != curthread) 3092 mutex_exit(QLOCK(q)); 3093 3094 return (1); 3095 } 3096 3097 /* 3098 * Create and put a control message on queue. 
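 *
 * As an illustrative sketch, a driver that detects a hangup might send
 * a zero-length M_HANGUP upstream from its read queue q with the
 * related putnextctl() defined below; a return of 0 means the message
 * could not be allocated:
 *
 *	if (putnextctl(q, M_HANGUP) == 0)
 *		cmn_err(CE_WARN, "hangup notification dropped");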
3099 */
3100 int
3101 putctl(queue_t *q, int type)
3102 {
3103 mblk_t *bp;
3104 
3105 if ((datamsg(type) && (type != M_DELAY)) ||
3106 (bp = allocb_tryhard(0)) == NULL)
3107 return (0);
3108 bp->b_datap->db_type = (unsigned char)type;
3109 
3110 put(q, bp);
3111 
3112 return (1);
3113 }
3114 
3115 /*
3116 * Control message with a single-byte parameter
3117 */
3118 int
3119 putctl1(queue_t *q, int type, int param)
3120 {
3121 mblk_t *bp;
3122 
3123 if ((datamsg(type) && (type != M_DELAY)) ||
3124 (bp = allocb_tryhard(1)) == NULL)
3125 return (0);
3126 bp->b_datap->db_type = (unsigned char)type;
3127 *bp->b_wptr++ = (unsigned char)param;
3128 
3129 put(q, bp);
3130 
3131 return (1);
3132 }
3133 
3134 int
3135 putnextctl1(queue_t *q, int type, int param)
3136 {
3137 mblk_t *bp;
3138 
3139 if ((datamsg(type) && (type != M_DELAY)) ||
3140 ((bp = allocb_tryhard(1)) == NULL))
3141 return (0);
3142 
3143 bp->b_datap->db_type = (unsigned char)type;
3144 *bp->b_wptr++ = (unsigned char)param;
3145 
3146 putnext(q, bp);
3147 
3148 return (1);
3149 }
3150 
3151 int
3152 putnextctl(queue_t *q, int type)
3153 {
3154 mblk_t *bp;
3155 
3156 if ((datamsg(type) && (type != M_DELAY)) ||
3157 ((bp = allocb_tryhard(0)) == NULL))
3158 return (0);
3159 bp->b_datap->db_type = (unsigned char)type;
3160 
3161 putnext(q, bp);
3162 
3163 return (1);
3164 }
3165 
3166 /*
3167 * Return the queue upstream from this one
3168 */
3169 queue_t *
3170 backq(queue_t *q)
3171 {
3172 q = _OTHERQ(q);
3173 if (q->q_next) {
3174 q = q->q_next;
3175 return (_OTHERQ(q));
3176 }
3177 return (NULL);
3178 }
3179 
3180 /*
3181 * Send a block back up the queue in reverse from this
3182 * one (e.g. to respond to ioctls)
3183 */
3184 void
3185 qreply(queue_t *q, mblk_t *bp)
3186 {
3187 ASSERT(q && bp);
3188 
3189 putnext(_OTHERQ(q), bp);
3190 }
3191 
3192 /*
3193 * Streams Queue Scheduling
3194 *
3195 * Queues are enabled through qenable() when they have messages to
3196 * process. They are serviced by queuerun(), which runs each enabled
3197 * queue's service procedure. The call to queuerun() is processor
3198 * dependent - the general principle is that it be run whenever a queue
3199 * is enabled but before returning to user level. For system calls,
3200 * the function runqueues() is called if their action causes a queue
3201 * to be enabled. For device interrupts, queuerun() should be
3202 * called before returning from the last level of interrupt. Beyond
3203 * this, no timing assumptions should be made about queue scheduling.
3204 */
3205 
3206 /*
3207 * Enable a queue: put it on list of those whose service procedures are
3208 * ready to run and set up the scheduling mechanism.
3209 * The broadcast is done outside the mutex to prevent the woken thread
3210 * from contending with the mutex. This is OK because the queue has been
3211 * enqueued on the runlist and flagged safely at this point.
3212 */
3213 void
3214 qenable(queue_t *q)
3215 {
3216 mutex_enter(QLOCK(q));
3217 qenable_locked(q);
3218 mutex_exit(QLOCK(q));
3219 }
3220 /*
3221 * Return number of messages on queue
3222 */
3223 int
3224 qsize(queue_t *qp)
3225 {
3226 int count = 0;
3227 mblk_t *mp;
3228 
3229 mutex_enter(QLOCK(qp));
3230 for (mp = qp->q_first; mp; mp = mp->b_next)
3231 count++;
3232 mutex_exit(QLOCK(qp));
3233 return (count);
3234 }
3235 
3236 /*
3237 * noenable - set queue so that putq() will not enable it.
3238 * enableok - set queue so that putq() can enable it.
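 *
 * An illustrative sketch (not code from this file): a module can batch
 * messages onto its queue without scheduling its service procedure,
 * then permit and request scheduling once the batch is complete.
 * next_batch_msg() is a hypothetical source of messages:
 *
 *	noenable(q);
 *	while ((mp = next_batch_msg()) != NULL)
 *		(void) putq(q, mp);
 *	enableok(q);
 *	qenable(q);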
3239 */
3240 void
3241 noenable(queue_t *q)
3242 {
3243 mutex_enter(QLOCK(q));
3244 q->q_flag |= QNOENB;
3245 mutex_exit(QLOCK(q));
3246 }
3247 
3248 void
3249 enableok(queue_t *q)
3250 {
3251 mutex_enter(QLOCK(q));
3252 q->q_flag &= ~QNOENB;
3253 mutex_exit(QLOCK(q));
3254 }
3255 
3256 /*
3257 * Set queue fields.
3258 */
3259 int
3260 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3261 {
3262 qband_t *qbp = NULL;
3263 queue_t *wrq;
3264 int error = 0;
3265 kthread_id_t freezer;
3266 
3267 freezer = STREAM(q)->sd_freezer;
3268 if (freezer == curthread) {
3269 ASSERT(frozenstr(q));
3270 ASSERT(MUTEX_HELD(QLOCK(q)));
3271 } else
3272 mutex_enter(QLOCK(q));
3273 
3274 if (what >= QBAD) {
3275 error = EINVAL;
3276 goto done;
3277 }
3278 if (pri != 0) {
3279 int i;
3280 qband_t **qbpp;
3281 
3282 if (pri > q->q_nband) {
3283 qbpp = &q->q_bandp;
3284 while (*qbpp)
3285 qbpp = &(*qbpp)->qb_next;
3286 while (pri > q->q_nband) {
3287 if ((*qbpp = allocband()) == NULL) {
3288 error = EAGAIN;
3289 goto done;
3290 }
3291 (*qbpp)->qb_hiwat = q->q_hiwat;
3292 (*qbpp)->qb_lowat = q->q_lowat;
3293 q->q_nband++;
3294 qbpp = &(*qbpp)->qb_next;
3295 }
3296 }
3297 qbp = q->q_bandp;
3298 i = pri;
3299 while (--i)
3300 qbp = qbp->qb_next;
3301 }
3302 switch (what) {
3303 
3304 case QHIWAT:
3305 if (qbp)
3306 qbp->qb_hiwat = (size_t)val;
3307 else
3308 q->q_hiwat = (size_t)val;
3309 break;
3310 
3311 case QLOWAT:
3312 if (qbp)
3313 qbp->qb_lowat = (size_t)val;
3314 else
3315 q->q_lowat = (size_t)val;
3316 break;
3317 
3318 case QMAXPSZ:
3319 if (qbp)
3320 error = EINVAL;
3321 else
3322 q->q_maxpsz = (ssize_t)val;
3323 
3324 /*
3325 * Performance concern: strwrite() looks at the module below
3326 * the stream head for the maxpsz each time it does a write,
3327 * so we now cache it at the stream head. Check to see if this
3328 * queue is sitting directly below the stream head.
3329 */
3330 wrq = STREAM(q)->sd_wrq;
3331 if (q != wrq->q_next)
3332 break;
3333 
3334 /*
3335 * If the stream is not frozen drop the current QLOCK and
3336 * acquire the sd_wrq QLOCK which protects sd_qn_*
3337 */
3338 if (freezer != curthread) {
3339 mutex_exit(QLOCK(q));
3340 mutex_enter(QLOCK(wrq));
3341 }
3342 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3343 
3344 if (strmsgsz != 0) {
3345 if (val == INFPSZ)
3346 val = strmsgsz;
3347 else {
3348 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3349 val = MIN(PIPE_BUF, val);
3350 else
3351 val = MIN(strmsgsz, val);
3352 }
3353 }
3354 STREAM(q)->sd_qn_maxpsz = val;
3355 if (freezer != curthread) {
3356 mutex_exit(QLOCK(wrq));
3357 mutex_enter(QLOCK(q));
3358 }
3359 break;
3360 
3361 case QMINPSZ:
3362 if (qbp)
3363 error = EINVAL;
3364 else
3365 q->q_minpsz = (ssize_t)val;
3366 
3367 /*
3368 * Performance concern: strwrite() looks at the module below
3369 * the stream head for the maxpsz each time it does a write,
3370 * so we now cache it at the stream head. Check to see if this
3371 * queue is sitting directly below the stream head.
3372 */
3373 wrq = STREAM(q)->sd_wrq;
3374 if (q != wrq->q_next)
3375 break;
3376 
3377 /*
3378 * If the stream is not frozen drop the current QLOCK and
3379 * acquire the sd_wrq QLOCK which protects sd_qn_*
3380 */
3381 if (freezer != curthread) {
3382 mutex_exit(QLOCK(q));
3383 mutex_enter(QLOCK(wrq));
3384 }
3385 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3386 
3387 if (freezer != curthread) {
3388 mutex_exit(QLOCK(wrq));
3389 mutex_enter(QLOCK(q));
3390 }
3391 break;
3392 
3393 case QSTRUIOT:
3394 if (qbp)
3395 error = EINVAL;
3396 else
3397 q->q_struiot = (ushort_t)val;
3398 break;
3399 
3400 case QCOUNT:
3401 case QFIRST:
3402 case QLAST:
3403 case QFLAG:
3404 error = EPERM;
3405 break;
3406 
3407 default:
3408 error = EINVAL;
3409 break;
3410 }
3411 done:
3412 if (freezer != curthread)
3413 mutex_exit(QLOCK(q));
3414 return (error);
3415 }
3416 
3417 /*
3418 * Get queue fields.
3419 */
3420 int
3421 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3422 {
3423 qband_t *qbp = NULL;
3424 int error = 0;
3425 kthread_id_t freezer;
3426 
3427 freezer = STREAM(q)->sd_freezer;
3428 if (freezer == curthread) {
3429 ASSERT(frozenstr(q));
3430 ASSERT(MUTEX_HELD(QLOCK(q)));
3431 } else
3432 mutex_enter(QLOCK(q));
3433 if (what >= QBAD) {
3434 error = EINVAL;
3435 goto done;
3436 }
3437 if (pri != 0) {
3438 int i;
3439 qband_t **qbpp;
3440 
3441 if (pri > q->q_nband) {
3442 qbpp = &q->q_bandp;
3443 while (*qbpp)
3444 qbpp = &(*qbpp)->qb_next;
3445 while (pri > q->q_nband) {
3446 if ((*qbpp = allocband()) == NULL) {
3447 error = EAGAIN;
3448 goto done;
3449 }
3450 (*qbpp)->qb_hiwat = q->q_hiwat;
3451 (*qbpp)->qb_lowat = q->q_lowat;
3452 q->q_nband++;
3453 qbpp = &(*qbpp)->qb_next;
3454 }
3455 }
3456 qbp = q->q_bandp;
3457 i = pri;
3458 while (--i)
3459 qbp = qbp->qb_next;
3460 }
3461 switch (what) {
3462 case QHIWAT:
3463 if (qbp)
3464 *(size_t *)valp = qbp->qb_hiwat;
3465 else
3466 *(size_t *)valp = q->q_hiwat;
3467 break;
3468 
3469 case QLOWAT:
3470 if (qbp)
3471 *(size_t *)valp = qbp->qb_lowat;
3472 else
3473 *(size_t *)valp = q->q_lowat;
3474 break;
3475 
3476 case QMAXPSZ:
3477 if (qbp)
3478 error = EINVAL;
3479 else
3480 *(ssize_t *)valp = q->q_maxpsz;
3481 break;
3482 
3483 case QMINPSZ:
3484 if (qbp)
3485 error = EINVAL;
3486 else
3487 *(ssize_t *)valp = q->q_minpsz;
3488 break;
3489 
3490 case QCOUNT:
3491 if (qbp)
3492 *(size_t *)valp = qbp->qb_count;
3493 else
3494 *(size_t *)valp = q->q_count;
3495 break;
3496 
3497 case QFIRST:
3498 if (qbp)
3499 *(mblk_t **)valp = qbp->qb_first;
3500 else
3501 *(mblk_t **)valp = q->q_first;
3502 break;
3503 
3504 case QLAST:
3505 if (qbp)
3506 *(mblk_t **)valp = qbp->qb_last;
3507 else
3508 *(mblk_t **)valp = q->q_last;
3509 break;
3510 
3511 case QFLAG:
3512 if (qbp)
3513 *(uint_t *)valp = qbp->qb_flag;
3514 else
3515 *(uint_t *)valp = q->q_flag;
3516 break;
3517 
3518 case QSTRUIOT:
3519 if (qbp)
3520 error = EINVAL;
3521 else
3522 *(short *)valp = q->q_struiot;
3523 break;
3524 
3525 default:
3526 error = EINVAL;
3527 break;
3528 }
3529 done:
3530 if (freezer != curthread)
3531 mutex_exit(QLOCK(q));
3532 return (error);
3533 }
3534 
3535 /*
3536 * This function awakes all sleepers in cv-wait/sigwait/pollwait, on one of:
3537 * QWANTWSYNC, QWANTR or QWANTW.
3538 *
3539 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or RSLEEPer
3540 * then a deferred wakeup will be done. Also, if strpoll() is in progress then
3541 * a deferred pollwakeup will be done.
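 *
 * Note that sd_lock is dropped around each pollwakeup() call below
 * (the poll framework takes its own locks); sd_sigflags is then
 * re-examined for strsendsig() after sd_lock is reacquired.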
3542 */
3543 void
3544 strwakeq(queue_t *q, int flag)
3545 {
3546 stdata_t *stp = STREAM(q);
3547 pollhead_t *pl;
3548 
3549 mutex_enter(&stp->sd_lock);
3550 pl = &stp->sd_pollist;
3551 if (flag & QWANTWSYNC) {
3552 ASSERT(!(q->q_flag & QREADR));
3553 if (stp->sd_flag & WSLEEP) {
3554 stp->sd_flag &= ~WSLEEP;
3555 cv_broadcast(&stp->sd_wrq->q_wait);
3556 } else {
3557 stp->sd_wakeq |= WSLEEP;
3558 }
3559 
3560 mutex_exit(&stp->sd_lock);
3561 pollwakeup(pl, POLLWRNORM);
3562 mutex_enter(&stp->sd_lock);
3563 
3564 if (stp->sd_sigflags & S_WRNORM)
3565 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3566 } else if (flag & QWANTR) {
3567 if (stp->sd_flag & RSLEEP) {
3568 stp->sd_flag &= ~RSLEEP;
3569 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3570 } else {
3571 stp->sd_wakeq |= RSLEEP;
3572 }
3573 
3574 mutex_exit(&stp->sd_lock);
3575 pollwakeup(pl, POLLIN | POLLRDNORM);
3576 mutex_enter(&stp->sd_lock);
3577 
3578 {
3579 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3580 
3581 if (events)
3582 strsendsig(stp->sd_siglist, events, 0, 0);
3583 }
3584 } else {
3585 if (stp->sd_flag & WSLEEP) {
3586 stp->sd_flag &= ~WSLEEP;
3587 cv_broadcast(&stp->sd_wrq->q_wait);
3588 }
3589 
3590 mutex_exit(&stp->sd_lock);
3591 pollwakeup(pl, POLLWRNORM);
3592 mutex_enter(&stp->sd_lock);
3593 
3594 if (stp->sd_sigflags & S_WRNORM)
3595 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3596 }
3597 mutex_exit(&stp->sd_lock);
3598 }
3599 
3600 int
3601 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3602 {
3603 stdata_t *stp = STREAM(q);
3604 int typ = STRUIOT_STANDARD;
3605 uio_t *uiop = &dp->d_uio;
3606 dblk_t *dbp;
3607 ssize_t uiocnt;
3608 ssize_t cnt;
3609 unsigned char *ptr;
3610 ssize_t resid;
3611 int error = 0;
3612 on_trap_data_t otd;
3613 queue_t *stwrq;
3614 
3615 /*
3616 * Plumbing may change while taking the type so store the
3617 * queue in a temporary variable. It doesn't matter even
3618 * if we take the type from the previous plumbing;
3619 * that's because if the plumbing has changed when we were
3620 * holding the queue in a temporary variable, we can continue
3621 * processing the message the way it would have been processed
3622 * in the old plumbing, without any side effects but a bit of
3623 * extra processing for the partial ip header checksum.
3624 *
3625 * This has been done to avoid holding the sd_lock which is
3626 * very hot.
3627 */
3628 
3629 stwrq = stp->sd_struiowrq;
3630 if (stwrq)
3631 typ = stwrq->q_struiot;
3632 
3633 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3634 dbp = mp->b_datap;
3635 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3636 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3637 cnt = MIN(uiocnt, uiop->uio_resid);
3638 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3639 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3640 /*
3641 * Either this mblk has already been processed
3642 * or there is no more room in this mblk (?).
3643 */
3644 continue;
3645 }
3646 switch (typ) {
3647 case STRUIOT_STANDARD:
3648 if (noblock) {
3649 if (on_trap(&otd, OT_DATA_ACCESS)) {
3650 no_trap();
3651 error = EWOULDBLOCK;
3652 goto out;
3653 }
3654 }
3655 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3656 if (noblock)
3657 no_trap();
3658 goto out;
3659 }
3660 if (noblock)
3661 no_trap();
3662 break;
3663 
3664 default:
3665 error = EIO;
3666 goto out;
3667 }
3668 dbp->db_struioflag |= STRUIO_DONE;
3669 dbp->db_cksumstuff += cnt;
3670 }
3671 out:
3672 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3673 /*
3674 * A fault has occurred and some bytes were moved to the
3675 * current mblk, the uio_t has already been updated by
3676 * the appropriate uio routine, so also update the mblk
3677 * to reflect this in case this same mblk chain is used
3678 * again (after the fault has been handled).
3679 */
3680 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3681 if (uiocnt >= resid)
3682 dbp->db_cksumstuff += resid;
3683 }
3684 return (error);
3685 }
3686 
3687 /*
3688 * Try to enter queue synchronously. Any attempt to enter a closing queue will
3689 * fail. The qp->q_rwcnt keeps track of the number of successful entries so
3690 * that removeq() will not try to close the queue while a thread is inside the
3691 * queue.
3692 */
3693 static boolean_t
3694 rwnext_enter(queue_t *qp)
3695 {
3696 mutex_enter(QLOCK(qp));
3697 if (qp->q_flag & QWCLOSE) {
3698 mutex_exit(QLOCK(qp));
3699 return (B_FALSE);
3700 }
3701 qp->q_rwcnt++;
3702 ASSERT(qp->q_rwcnt != 0);
3703 mutex_exit(QLOCK(qp));
3704 return (B_TRUE);
3705 }
3706 
3707 /*
3708 * Decrease the count of threads running in sync stream queue and wake up any
3709 * threads blocked in removeq().
3710 */
3711 static void
3712 rwnext_exit(queue_t *qp)
3713 {
3714 mutex_enter(QLOCK(qp));
3715 qp->q_rwcnt--;
3716 if (qp->q_flag & QWANTRMQSYNC) {
3717 qp->q_flag &= ~QWANTRMQSYNC;
3718 cv_broadcast(&qp->q_wait);
3719 }
3720 mutex_exit(QLOCK(qp));
3721 }
3722 
3723 /*
3724 * The purpose of rwnext() is to call the rw procedure of the next
3725 * (downstream) module's queue.
3726 *
3727 * Treated as a put entrypoint for perimeter synchronization.
3728 *
3729 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3730 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
3731 * does not matter if any regular put entrypoints have been already entered. We
3732 * can't increment one of the sq_putcounts (instead of sq_count) because
3733 * qwait_rw won't know which counter to decrement.
3734 *
3735 * It would be reasonable to add the lockless FASTPUT logic.
3736 */
3737 int
3738 rwnext(queue_t *qp, struiod_t *dp)
3739 {
3740 queue_t *nqp;
3741 syncq_t *sq;
3742 uint16_t count;
3743 uint16_t flags;
3744 struct qinit *qi;
3745 int (*proc)();
3746 struct stdata *stp;
3747 int isread;
3748 int rval;
3749 
3750 stp = STREAM(qp);
3751 /*
3752 * Prevent q_next from changing by holding sd_lock until acquiring
3753 * SQLOCK. Note that a read-side rwnext from the streamhead will
3754 * already have sd_lock acquired. In either case sd_lock is always
3755 * released after acquiring SQLOCK.
3756 *
3757 * The streamhead read-side holding sd_lock when calling rwnext is
3758 * required to prevent a race condition where M_DATA mblks flowing
3759 * up the read-side of the stream could be bypassed by a rwnext()
3760 * down-call. In this case sd_lock acts as the streamhead perimeter.
3761 */
3762 if ((nqp = _WR(qp)) == qp) {
3763 isread = 0;
3764 mutex_enter(&stp->sd_lock);
3765 qp = nqp->q_next;
3766 } else {
3767 isread = 1;
3768 if (nqp != stp->sd_wrq)
3769 /* Not streamhead */
3770 mutex_enter(&stp->sd_lock);
3771 qp = _RD(nqp->q_next);
3772 }
3773 qi = qp->q_qinfo;
3774 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3775 /*
3776 * Not a synchronous module or no r/w procedure for this
3777 * queue, so just return EINVAL and let the caller handle it.
3778 */
3779 mutex_exit(&stp->sd_lock);
3780 return (EINVAL);
3781 }
3782 
3783 if (rwnext_enter(qp) == B_FALSE) {
3784 mutex_exit(&stp->sd_lock);
3785 return (EINVAL);
3786 }
3787 
3788 sq = qp->q_syncq;
3789 mutex_enter(SQLOCK(sq));
3790 mutex_exit(&stp->sd_lock);
3791 count = sq->sq_count;
3792 flags = sq->sq_flags;
3793 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3794 
3795 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3796 /*
3797 * If this queue is being closed, return.
3798 */
3799 if (qp->q_flag & QWCLOSE) {
3800 mutex_exit(SQLOCK(sq));
3801 rwnext_exit(qp);
3802 return (EINVAL);
3803 }
3804 
3805 /*
3806 * Wait until we can enter the inner perimeter.
3807 */
3808 sq->sq_flags = flags | SQ_WANTWAKEUP;
3809 cv_wait(&sq->sq_wait, SQLOCK(sq));
3810 count = sq->sq_count;
3811 flags = sq->sq_flags;
3812 }
3813 
3814 if (isread == 0 && stp->sd_struiowrq == NULL ||
3815 isread == 1 && stp->sd_struiordq == NULL) {
3816 /*
3817 * Stream plumbing changed while waiting for inner perimeter
3818 * so just return EINVAL and let the caller handle it.
3819 */
3820 mutex_exit(SQLOCK(sq));
3821 rwnext_exit(qp);
3822 return (EINVAL);
3823 }
3824 if (!(flags & SQ_CIPUT))
3825 sq->sq_flags = flags | SQ_EXCL;
3826 sq->sq_count = count + 1;
3827 ASSERT(sq->sq_count != 0); /* Wraparound */
3828 /*
3829 * Note: The only message ordering guarantee that rwnext() makes is
3830 * for the write queue flow-control case. All others (r/w queue
3831 * with q_count > 0 (or q_first != 0)) are the responsibility of
3832 * the queue's rw procedure. This could be generalized here by
3833 * running the queue's service procedure, but that wouldn't be
3834 * the most efficient for all cases.
3835 */
3836 mutex_exit(SQLOCK(sq));
3837 if (! isread && (qp->q_flag & QFULL)) {
3838 /*
3839 * Write queue may be flow controlled. If so,
3840 * mark the queue for wakeup when it's not.
3841 */
3842 mutex_enter(QLOCK(qp));
3843 if (qp->q_flag & QFULL) {
3844 qp->q_flag |= QWANTWSYNC;
3845 mutex_exit(QLOCK(qp));
3846 rval = EWOULDBLOCK;
3847 goto out;
3848 }
3849 mutex_exit(QLOCK(qp));
3850 }
3851 
3852 if (! isread && dp->d_mp)
3853 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3854 dp->d_mp->b_datap->db_base);
3855 
3856 rval = (*proc)(qp, dp);
3857 
3858 if (isread && dp->d_mp)
3859 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3860 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3861 out:
3862 /*
3863 * The queue is protected from being freed by sq_count, so it is
3864 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3865 */
3866 rwnext_exit(qp);
3867 
3868 mutex_enter(SQLOCK(sq));
3869 flags = sq->sq_flags;
3870 ASSERT(sq->sq_count != 0);
3871 sq->sq_count--;
3872 if (flags & SQ_TAIL) {
3873 putnext_tail(sq, qp, flags);
3874 /*
3875 * The only purpose of this ASSERT is to preserve calling stack
3876 * in DEBUG kernel.
3877 */
3878 ASSERT(flags & SQ_TAIL);
3879 return (rval);
3880 }
3881 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3882 /*
3883 * Safe to always drop SQ_EXCL:
3884 * Not SQ_CIPUT means we set SQ_EXCL above
3885 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3886 * did a qwriter(INNER) in which case nobody else
3887 * is in the inner perimeter and we are exiting.
3888 *
3889 * I would like to make the following assertion:
3890 *
3891 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3892 * sq->sq_count == 0);
3893 *
3894 * which indicates that if we are both putshared and exclusive,
3895 * we became exclusive while executing the putproc, and the only
3896 * claim on the syncq was the one we dropped a few lines above.
3897 * But other threads that enter putnext while the syncq is exclusive
3898 * need to make a claim as they may need to drop SQLOCK in the
3899 * has_writers case to avoid deadlocks. If these threads are
3900 * delayed or preempted, it is possible that the writer thread can
3901 * find out that there are other claims making the (sq_count == 0)
3902 * test invalid.
3903 */
3904 
3905 sq->sq_flags = flags & ~SQ_EXCL;
3906 if (sq->sq_flags & SQ_WANTWAKEUP) {
3907 sq->sq_flags &= ~SQ_WANTWAKEUP;
3908 cv_broadcast(&sq->sq_wait);
3909 }
3910 mutex_exit(SQLOCK(sq));
3911 return (rval);
3912 }
3913 
3914 /*
3915 * The purpose of infonext() is to call the info procedure of the next
3916 * (downstream) module's queue.
3917 *
3918 * Treated as a put entrypoint for perimeter synchronization.
3919 *
3920 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3921 * sync queues). If it is a CIPUT sync queue, the regular sq_count is
3922 * incremented and it does not matter if any regular put entrypoints have
3923 * been already entered.
3924 */
3925 int
3926 infonext(queue_t *qp, infod_t *idp)
3927 {
3928 queue_t *nqp;
3929 syncq_t *sq;
3930 uint16_t count;
3931 uint16_t flags;
3932 struct qinit *qi;
3933 int (*proc)();
3934 struct stdata *stp;
3935 int rval;
3936 
3937 stp = STREAM(qp);
3938 /*
3939 * Prevent q_next from changing by holding sd_lock until
3940 * acquiring SQLOCK.
3941 */
3942 mutex_enter(&stp->sd_lock);
3943 if ((nqp = _WR(qp)) == qp) {
3944 qp = nqp->q_next;
3945 } else {
3946 qp = _RD(nqp->q_next);
3947 }
3948 qi = qp->q_qinfo;
3949 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3950 mutex_exit(&stp->sd_lock);
3951 return (EINVAL);
3952 }
3953 sq = qp->q_syncq;
3954 mutex_enter(SQLOCK(sq));
3955 mutex_exit(&stp->sd_lock);
3956 count = sq->sq_count;
3957 flags = sq->sq_flags;
3958 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3959 
3960 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3961 /*
3962 * Wait until we can enter the inner perimeter.
3963 */
3964 sq->sq_flags = flags | SQ_WANTWAKEUP;
3965 cv_wait(&sq->sq_wait, SQLOCK(sq));
3966 count = sq->sq_count;
3967 flags = sq->sq_flags;
3968 }
3969 
3970 if (! (flags & SQ_CIPUT))
3971 sq->sq_flags = flags | SQ_EXCL;
3972 sq->sq_count = count + 1;
3973 ASSERT(sq->sq_count != 0); /* Wraparound */
3974 mutex_exit(SQLOCK(sq));
3975 
3976 rval = (*proc)(qp, idp);
3977 
3978 mutex_enter(SQLOCK(sq));
3979 flags = sq->sq_flags;
3980 ASSERT(sq->sq_count != 0);
3981 sq->sq_count--;
3982 if (flags & SQ_TAIL) {
3983 putnext_tail(sq, qp, flags);
3984 /*
3985 * The only purpose of this ASSERT is to preserve calling stack
3986 * in DEBUG kernel.
3987 */
3988 ASSERT(flags & SQ_TAIL);
3989 return (rval);
3990 }
3991 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3992 /*
3993 * XXXX
3994 * I am not certain the next comment is correct here. I need to consider
3995 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3996 * might cause other problems. It just might be safer to drop it if
3997 * !SQ_CIPUT because that is when we set it.
3998 */
3999 /*
4000 * Safe to always drop SQ_EXCL:
4001 * Not SQ_CIPUT means we set SQ_EXCL above
4002 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4003 * did a qwriter(INNER) in which case nobody else
4004 * is in the inner perimeter and we are exiting.
4005 *
4006 * I would like to make the following assertion:
4007 *
4008 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4009 * sq->sq_count == 0);
4010 *
4011 * which indicates that if we are both putshared and exclusive,
4012 * we became exclusive while executing the putproc, and the only
4013 * claim on the syncq was the one we dropped a few lines above.
4014 * But other threads that enter putnext while the syncq is exclusive
4015 * need to make a claim as they may need to drop SQLOCK in the
4016 * has_writers case to avoid deadlocks. If these threads are
4017 * delayed or preempted, it is possible that the writer thread can
4018 * find out that there are other claims making the (sq_count == 0)
4019 * test invalid.
4020 */
4021 
4022 sq->sq_flags = flags & ~SQ_EXCL;
4023 mutex_exit(SQLOCK(sq));
4024 return (rval);
4025 }
4026 
4027 /*
4028 * Return nonzero if the queue is responsible for struio(), else return 0.
4029 */
4030 int
4031 isuioq(queue_t *q)
4032 {
4033 if (q->q_flag & QREADR)
4034 return (STREAM(q)->sd_struiordq == q);
4035 else
4036 return (STREAM(q)->sd_struiowrq == q);
4037 }
4038 
4039 #if defined(__sparc)
4040 int disable_putlocks = 0;
4041 #else
4042 int disable_putlocks = 1;
4043 #endif
4044 
4045 /*
4046 * Called by create_putlocks().
4047 */
4048 static void
4049 create_syncq_putlocks(queue_t *q)
4050 {
4051 syncq_t *sq = q->q_syncq;
4052 ciputctrl_t *cip;
4053 int i;
4054 
4055 ASSERT(sq != NULL);
4056 
4057 ASSERT(disable_putlocks == 0);
4058 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4059 ASSERT(ciputctrl_cache != NULL);
4060 
4061 if (!(sq->sq_type & SQ_CIPUT))
4062 return;
4063 
4064 for (i = 0; i <= 1; i++) {
4065 if (sq->sq_ciputctrl == NULL) {
4066 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4067 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4068 mutex_enter(SQLOCK(sq));
4069 if (sq->sq_ciputctrl != NULL) {
4070 mutex_exit(SQLOCK(sq));
4071 kmem_cache_free(ciputctrl_cache, cip);
4072 } else {
4073 ASSERT(sq->sq_nciputctrl == 0);
4074 sq->sq_nciputctrl = n_ciputctrl - 1;
4075 /*
4076 * putnext checks sq_ciputctrl without holding
4077 * SQLOCK. If it is not NULL, putnext assumes
4078 * sq_nciputctrl is initialized. The membar
4079 * below ensures that.
4080 */
4081 membar_producer();
4082 sq->sq_ciputctrl = cip;
4083 mutex_exit(SQLOCK(sq));
4084 }
4085 }
4086 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4087 if (i == 1)
4088 break;
4089 q = _OTHERQ(q);
4090 if (!(q->q_flag & QPERQ)) {
4091 ASSERT(sq == q->q_syncq);
4092 break;
4093 }
4094 ASSERT(q->q_syncq != NULL);
4095 ASSERT(sq != q->q_syncq);
4096 sq = q->q_syncq;
4097 ASSERT(sq->sq_type & SQ_CIPUT);
4098 }
4099 }
4100 
4101 /*
4102 * If the stream argument is 0, only create per-cpu sq_putlocks/sq_putcounts
4103 * for the syncq of q.
If the stream argument is not 0, create per-cpu stream_putlocks for
4104 * the stream of q and per-cpu sq_putlocks/sq_putcounts for all syncqs
4105 * starting from q and down to the driver.
4106 *
4107 * This should be called after the affected queues are part of the stream
4108 * geometry. It should be called from a driver/module open routine after the
4109 * qprocson() call. It is also called from the nfs syscall where it is known
4110 * that the stream is configured and won't change its geometry during the
4111 * create_putlocks() call.
4112 *
4113 * The caller normally uses a 0 value for the stream argument to speed up MT
4114 * putnext into the perimeter of q, for example because its perimeter is per
4115 * module (e.g. IP).
4116 *
4117 * The caller normally uses a non-zero value for the stream argument to hint
4118 * to the system that the stream of q is a very contended global system stream
4119 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4120 * particularly MT hot.
4121 *
4122 * The caller ensures that stream plumbing won't happen while we are here and
4123 * therefore q_next can be safely used.
4124 */
4125 
4126 void
4127 create_putlocks(queue_t *q, int stream)
4128 {
4129 ciputctrl_t *cip;
4130 struct stdata *stp = STREAM(q);
4131 
4132 q = _WR(q);
4133 ASSERT(stp != NULL);
4134 
4135 if (disable_putlocks != 0)
4136 return;
4137 
4138 if (n_ciputctrl < min_n_ciputctrl)
4139 return;
4140 
4141 ASSERT(ciputctrl_cache != NULL);
4142 
4143 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4144 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4145 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4146 mutex_enter(&stp->sd_lock);
4147 if (stp->sd_ciputctrl != NULL) {
4148 mutex_exit(&stp->sd_lock);
4149 kmem_cache_free(ciputctrl_cache, cip);
4150 } else {
4151 ASSERT(stp->sd_nciputctrl == 0);
4152 stp->sd_nciputctrl = n_ciputctrl - 1;
4153 /*
4154 * putnext checks sd_ciputctrl without holding
4155 * sd_lock. If it is not NULL, putnext assumes
4156 * sd_nciputctrl is initialized. The membar below
4157 * ensures that.
4158 */
4159 membar_producer();
4160 stp->sd_ciputctrl = cip;
4161 mutex_exit(&stp->sd_lock);
4162 }
4163 }
4164 
4165 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4166 
4167 while (_SAMESTR(q)) {
4168 create_syncq_putlocks(q);
4169 if (stream == 0)
4170 return;
4171 q = q->q_next;
4172 }
4173 ASSERT(q != NULL);
4174 create_syncq_putlocks(q);
4175 }
4176 
4177 /*
4178 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4179 * through a stream.
4180 *
4181 * The data currently recorded per event are a timestamp, module/driver name,
4182 * downstream module/driver name, optional call stack, event type and a
4183 * per-type datum. Much of the STREAMS framework is instrumented for automatic
4184 * flow tracing (when enabled). Events can be defined and used by STREAMS
4185 * modules and drivers.
4186 *
4187 * Global objects:
4188 *
4189 * str_ftevent() - Add a flow-trace event to a dblk.
4190 * str_ftfree() - Free flow-trace data.
4191 *
4192 * Local objects:
4193 *
4194 * fthdr_cache - pointer to the kmem cache for trace headers.
4195 * ftblk_cache - pointer to the kmem cache for trace data blocks.
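 *
 * Tunables (defined below):
 *
 *	str_ftnever - nonzero means never do flow tracing (the default);
 *	str_ftevent() also bumps it when an allocation fails, shutting
 *	tracing off.
 *	str_ftstack - nonzero records a call stack with each event.
 *
 * As a sketch (an assumption about usage, not documented here), tracing
 * could be enabled on a debug system by clearing str_ftnever, e.g. via
 * /etc/system:
 *
 *	set str_ftnever = 0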
4196 */ 4197 4198 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4199 int str_ftstack = 0; /* Don't record event call stacks */ 4200 4201 void 4202 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4203 { 4204 ftblk_t *bp = hp->tail; 4205 ftblk_t *nbp; 4206 ftevnt_t *ep; 4207 int ix, nix; 4208 4209 ASSERT(hp != NULL); 4210 4211 for (;;) { 4212 if ((ix = bp->ix) == FTBLK_EVNTS) { 4213 /* 4214 * Tail doesn't have room, so need a new tail. 4215 * 4216 * To make this MT safe, first, allocate a new 4217 * ftblk, and initialize it. To make life a 4218 * little easier, reserve the first slot (mostly 4219 * by making ix = 1). When we are finished with 4220 * the initialization, CAS this pointer to the 4221 * tail. If this succeeds, this is the new 4222 * "next" block. Otherwise, another thread 4223 * got here first, so free the block and start 4224 * again. 4225 */ 4226 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4227 if (nbp == NULL) { 4228 /* no mem, so punt */ 4229 str_ftnever++; 4230 /* free up all flow data? */ 4231 return; 4232 } 4233 nbp->nxt = NULL; 4234 nbp->ix = 1; 4235 /* 4236 * Just in case there is another thread about 4237 * to get the next index, we need to make sure 4238 * the value is there for it. 4239 */ 4240 membar_producer(); 4241 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) { 4242 /* CAS was successful */ 4243 bp->nxt = nbp; 4244 membar_producer(); 4245 bp = nbp; 4246 ix = 0; 4247 goto cas_good; 4248 } else { 4249 kmem_cache_free(ftblk_cache, nbp); 4250 bp = hp->tail; 4251 continue; 4252 } 4253 } 4254 nix = ix + 1; 4255 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) { 4256 cas_good: 4257 if (curthread != hp->thread) { 4258 hp->thread = curthread; 4259 evnt |= FTEV_CS; 4260 } 4261 if (CPU->cpu_seqid != hp->cpu_seqid) { 4262 hp->cpu_seqid = CPU->cpu_seqid; 4263 evnt |= FTEV_PS; 4264 } 4265 ep = &bp->ev[ix]; 4266 break; 4267 } 4268 } 4269 4270 if (evnt & FTEV_QMASK) { 4271 queue_t *qp = p; 4272 4273 if (!(qp->q_flag & QREADR)) 4274 evnt |= FTEV_ISWR; 4275 4276 ep->mid = Q2NAME(qp); 4277 4278 /* 4279 * We only record the next queue name for FTEV_PUTNEXT since 4280 * that's the only time we *really* need it, and the putnext() 4281 * code ensures that qp->q_next won't vanish. (We could use 4282 * claimstr()/releasestr() but at a performance cost.) 4283 */ 4284 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4285 ep->midnext = Q2NAME(qp->q_next); 4286 else 4287 ep->midnext = NULL; 4288 } else { 4289 ep->mid = p; 4290 ep->midnext = NULL; 4291 } 4292 4293 if (ep->stk != NULL) 4294 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4295 4296 ep->ts = gethrtime(); 4297 ep->evnt = evnt; 4298 ep->data = data; 4299 hp->hash = (hp->hash << 9) + hp->hash; 4300 hp->hash += (evnt << 16) | data; 4301 hp->hash += (uintptr_t)ep->mid; 4302 } 4303 4304 /* 4305 * Free flow-trace data. 4306 */ 4307 void 4308 str_ftfree(dblk_t *dbp) 4309 { 4310 fthdr_t *hp = dbp->db_fthdr; 4311 ftblk_t *bp = &hp->first; 4312 ftblk_t *nbp; 4313 4314 if (bp != hp->tail || bp->ix != 0) { 4315 /* 4316 * Clear out the hash, have the tail point to itself, and free 4317 * any continuation blocks. 4318 */ 4319 bp = hp->first.nxt; 4320 hp->tail = &hp->first; 4321 hp->hash = 0; 4322 hp->first.nxt = NULL; 4323 hp->first.ix = 0; 4324 while (bp != NULL) { 4325 nbp = bp->nxt; 4326 kmem_cache_free(ftblk_cache, bp); 4327 bp = nbp; 4328 } 4329 } 4330 kmem_cache_free(fthdr_cache, hp); 4331 dbp->db_fthdr = NULL; 4332 } 4333