/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
 * Copyright 2022 Garrett D'Amore
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory.  Objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
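
/*
 * Illustrative sketch (not part of the allocator): how the reference
 * counting described above plays out for a hypothetical caller.  The
 * mblk returned by dupb() shares the dblk and data with the original,
 * so the data is freed only by the second freeb().  NULL checks are
 * omitted for brevity.
 *
 *	mblk_t *mp, *dup;
 *
 *	mp = allocb(128, BPRI_MED);		(db_ref is 1)
 *	*mp->b_wptr++ = 0x5a;
 *	dup = dupb(mp);				(db_ref is now 2)
 *	freeb(dup);				(dblk_decref: frees mblk only)
 *	freeb(mp);				(last ref: db_lastfree runs)
 */
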
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
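
/*
 * Illustrative sketch (not part of this file's interfaces): typical
 * driver use of allocb().  allocb() never sleeps, so the caller must
 * handle a NULL return; "my_hdr_t" and the surrounding error handling
 * are hypothetical.
 *
 *	mblk_t *mp;
 *
 *	if ((mp = allocb(sizeof (my_hdr_t), BPRI_MED)) == NULL)
 *		return (ENOSR);
 *	bcopy(&hdr, mp->b_wptr, sizeof (my_hdr_t));
 *	mp->b_wptr += sizeof (my_hdr_t);
 *	putnext(q, mp);
 */
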
/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert for the zoneid being the
		 * same.  Due to Multi-Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}

/*
 * Get the label for a message.  Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * The variants with a 'd' prefix (desballoc, desballoca)
 * directly free the mblk when it loses its last ref,
 * where the other variants free asynchronously.
 * The variants with an 'a' suffix (esballoca, desballoca)
 * add an extra ref, effectively letting the streams subsystem
 * know that the message data should not be modified.
 * (eg. see db_ref checks in reallocb and elsewhere)
 *
 * The method used by the 'a' suffix functions to keep the dblk
 * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
 * bit in db_flags.  In dblk_decref() that flag essentially means
 * the dblk has one extra ref, so the "last ref" is one, not zero.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoca() but sleeps waiting for memory.
 */
mblk_t *
esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
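
/*
 * Illustrative sketch (not part of this file's interfaces): lending a
 * driver-owned buffer to a stream with desballoc().  The frtn_t (and
 * the buffer it describes) must remain valid until the free routine
 * fires; "xs_buf_t" and its members are hypothetical.
 *
 *	static void
 *	xs_buf_free(void *arg)
 *	{
 *		xs_buf_t *bp = arg;	(return the buffer to the pool)
 *		...
 *	}
 *
 *	bp->xb_frtn.free_func = xs_buf_free;
 *	bp->xb_frtn.free_arg = (caddr_t)bp;
 *	mp = desballoc(bp->xb_data, bp->xb_len, BPRI_MED, &bp->xb_frtn);
 *	if (mp == NULL)
 *		...			(buffer remains driver-owned)
 */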

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}
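
/*
 * Illustrative sketch (not part of this file's interfaces): the classic
 * recovery pattern when allocb() fails in a service routine, using
 * bufcall() to schedule a retry once memory is likely available;
 * "xs_reenable" is hypothetical.
 *
 *	static void
 *	xs_reenable(void *arg)
 *	{
 *		qenable((queue_t *)arg);	(rerun the service routine)
 *	}
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		if (bufcall(size, BPRI_MED, xs_reenable, q) == 0)
 *			(void) timeout(xs_reenable, q, drv_usectohz(100000));
 *		return;
 *	}
 */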

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls().  Since bcp_id field is returned, we save its value in
	 * the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
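
/*
 * Illustrative sketch (not part of this file's interfaces): dupmsg()
 * shares the underlying data blocks, so a caller that intends to modify
 * the payload of the duplicate should use copymsg() instead, or check
 * the reference count first:
 *
 *	mblk_t *cp;
 *
 *	if (DB_REF(mp) > 1)
 *		cp = copymsg(mp);	(private, writable copy)
 *	else
 *		cp = dupmsg(mp);	(cheap shared copy; read-only use)
 */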

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Copy the various checksum information that came in
	 * originally.
	 */
	ndp->db_cksumstart = dp->db_cksumstart;
	ndp->db_cksumend = dp->db_cksumend;
	ndp->db_cksumstuff = dp->db_cksumstuff;
	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
	    sizeof (dp->db_struioun.data));

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1, means concat everything.
 * Returns 1 on success, 0 on failure
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}
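
/*
 * Illustrative sketch (not part of this file's interfaces): ensuring a
 * protocol header is contiguous before casting b_rptr; "my_hdr_t" is
 * hypothetical.  On failure the message is left intact.
 *
 *	if (MBLKL(mp) < sizeof (my_hdr_t) &&
 *	    !pullupmsg(mp, sizeof (my_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (my_hdr_t *)mp->b_rptr;
 */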

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}
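
/*
 * Illustrative sketch (not part of this file's interfaces): trimming a
 * fixed-size header from the front of a message and a trailer from the
 * back with adjmsg(); "my_hdr_t" and "my_trl_t" are hypothetical.
 *
 *	if (!adjmsg(mp, sizeof (my_hdr_t)) ||		(trim from head)
 *	    !adjmsg(mp, -(ssize_t)sizeof (my_trl_t))) {	(trim from tail)
 *		freemsg(mp);
 *		return;
 *	}
 */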

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue.  Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do.  What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages.  q_mblkcnt is used for this purpose.  It
 * counts the number of mblk's that are being put on
 * the queue.  The intention here is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does.  A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reaches the highwater mark.  It will clear when BOTH
 * of them drop below the highwater mark.  And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont).  If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		bytes += MBLKL(mp);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return.  If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we are
				 * taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block.  In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}
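
/*
 * Illustrative sketch (not part of this file's interfaces): pairing
 * getq_noenab() with qbackenable(), mirroring what getq() does
 * internally; "must_defer" is hypothetical.
 *
 *	mblk_t *mp;
 *
 *	if ((mp = getq_noenab(q, 0)) != NULL) {
 *		if (must_defer) {
 *			(void) putbq(q, mp);	(no backenable wanted)
 *		} else {
 *			qbackenable(q, mp->b_band);
 *			...			(consume mp)
 *		}
 *	}
 */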
2105 	 */
2106 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2107 		return;
2108
2109 	/* freezestr should allow its caller to call getq/putq */
2110 	freezer = STREAM(q)->sd_freezer;
2111 	if (freezer == curthread) {
2112 		ASSERT(frozenstr(q));
2113 		ASSERT(MUTEX_HELD(QLOCK(q)));
2114 	} else
2115 		mutex_enter(QLOCK(q));
2116
2117 	if (band == 0) {
2118 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2119 		    q->q_mblkcnt < q->q_lowat)) {
2120 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2121 		}
2122 	} else {
2123 		int i;
2124
2125 		ASSERT((unsigned)band <= q->q_nband);
2126 		ASSERT(q->q_bandp != NULL);
2127
2128 		qbp = q->q_bandp;
2129 		i = band;
2130 		while (--i > 0)
2131 			qbp = qbp->qb_next;
2132
2133 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2134 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2135 			backenab = qbp->qb_flag & QB_WANTW;
2136 		}
2137 	}
2138
2139 	if (backenab == 0) {
2140 		if (freezer != curthread)
2141 			mutex_exit(QLOCK(q));
2142 		return;
2143 	}
2144
2145 	/* Have to drop the lock across strwakeq and backenable */
2146 	if (backenab & QWANTWSYNC)
2147 		q->q_flag &= ~QWANTWSYNC;
2148 	if (backenab & (QWANTW|QB_WANTW)) {
2149 		if (band != 0)
2150 			qbp->qb_flag &= ~QB_WANTW;
2151 		else {
2152 			q->q_flag &= ~QWANTW;
2153 		}
2154 	}
2155
2156 	if (freezer != curthread)
2157 		mutex_exit(QLOCK(q));
2158
2159 	if (backenab & QWANTWSYNC)
2160 		strwakeq(q, QWANTWSYNC);
2161 	if (backenab & (QWANTW|QB_WANTW))
2162 		backenable(q, band);
2163 }
2164
2165 /*
2166  * Remove a message from a queue.  The queue count and other
2167  * flow control parameters are adjusted and the back queue
2168  * enabled if necessary.
2169  *
2170  * rmvq can be called with the stream frozen, by other utility functions
2171  * holding QLOCK, or by streams modules with no locks held and the stream
2171  * not frozen.
2172  */
2173 void
2174 rmvq(queue_t *q, mblk_t *mp)
2175 {
2176 	ASSERT(mp != NULL);
2177
2178 	rmvq_noenab(q, mp);
2179 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2180 		/*
2181 		 * qbackenable can handle a frozen stream but not a "random"
2182 		 * qlock being held.  Drop lock across qbackenable.
2183 		 */
2184 		mutex_exit(QLOCK(q));
2185 		qbackenable(q, mp->b_band);
2186 		mutex_enter(QLOCK(q));
2187 	} else {
2188 		qbackenable(q, mp->b_band);
2189 	}
2190 }
2191
2192 /*
2193  * Like rmvq() but without any backenabling.
2194  * This exists to handle SR_CONSOL_DATA in strrput().
2195  */
2196 void
2197 rmvq_noenab(queue_t *q, mblk_t *mp)
2198 {
2199 	int i;
2200 	qband_t *qbp = NULL;
2201 	kthread_id_t freezer;
2202 	int bytecnt = 0, mblkcnt = 0;
2203
2204 	freezer = STREAM(q)->sd_freezer;
2205 	if (freezer == curthread) {
2206 		ASSERT(frozenstr(q));
2207 		ASSERT(MUTEX_HELD(QLOCK(q)));
2208 	} else if (MUTEX_HELD(QLOCK(q))) {
2209 		/* Don't drop lock on exit */
2210 		freezer = curthread;
2211 	} else
2212 		mutex_enter(QLOCK(q));
2213
2214 	ASSERT(mp->b_band <= q->q_nband);
2215 	if (mp->b_band != 0) {		/* Adjust band pointers */
2216 		ASSERT(q->q_bandp != NULL);
2217 		qbp = q->q_bandp;
2218 		i = mp->b_band;
2219 		while (--i > 0)
2220 			qbp = qbp->qb_next;
2221 		if (mp == qbp->qb_first) {
2222 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2223 				qbp->qb_first = mp->b_next;
2224 			else
2225 				qbp->qb_first = NULL;
2226 		}
2227 		if (mp == qbp->qb_last) {
2228 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2229 				qbp->qb_last = mp->b_prev;
2230 			else
2231 				qbp->qb_last = NULL;
2232 		}
2233 	}
2234
2235 	/*
2236 	 * Remove the message from the list.
2237 */ 2238 if (mp->b_prev) 2239 mp->b_prev->b_next = mp->b_next; 2240 else 2241 q->q_first = mp->b_next; 2242 if (mp->b_next) 2243 mp->b_next->b_prev = mp->b_prev; 2244 else 2245 q->q_last = mp->b_prev; 2246 mp->b_next = NULL; 2247 mp->b_prev = NULL; 2248 2249 /* Get the size of the message for q_count accounting */ 2250 bytecnt = mp_cont_len(mp, &mblkcnt); 2251 2252 if (mp->b_band == 0) { /* Perform q_count accounting */ 2253 q->q_count -= bytecnt; 2254 q->q_mblkcnt -= mblkcnt; 2255 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2256 (q->q_mblkcnt < q->q_hiwat))) { 2257 q->q_flag &= ~QFULL; 2258 } 2259 } else { /* Perform qb_count accounting */ 2260 qbp->qb_count -= bytecnt; 2261 qbp->qb_mblkcnt -= mblkcnt; 2262 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) && 2263 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2264 qbp->qb_flag &= ~QB_FULL; 2265 } 2266 } 2267 if (freezer != curthread) 2268 mutex_exit(QLOCK(q)); 2269 2270 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0); 2271 } 2272 2273 /* 2274 * Empty a queue. 2275 * If flag is set, remove all messages. Otherwise, remove 2276 * only non-control messages. If queue falls below its low 2277 * water mark, and QWANTW is set, enable the nearest upstream 2278 * service procedure. 2279 * 2280 * Historical note: when merging the M_FLUSH code in strrput with this 2281 * code one difference was discovered. flushq did not have a check 2282 * for q_lowat == 0 in the backenabling test. 2283 * 2284 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2285 * if one exists on the queue. 2286 */ 2287 void 2288 flushq_common(queue_t *q, int flag, int pcproto_flag) 2289 { 2290 mblk_t *mp, *nmp; 2291 qband_t *qbp; 2292 int backenab = 0; 2293 unsigned char bpri; 2294 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2295 2296 if (q->q_first == NULL) 2297 return; 2298 2299 mutex_enter(QLOCK(q)); 2300 mp = q->q_first; 2301 q->q_first = NULL; 2302 q->q_last = NULL; 2303 q->q_count = 0; 2304 q->q_mblkcnt = 0; 2305 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2306 qbp->qb_first = NULL; 2307 qbp->qb_last = NULL; 2308 qbp->qb_count = 0; 2309 qbp->qb_mblkcnt = 0; 2310 qbp->qb_flag &= ~QB_FULL; 2311 } 2312 q->q_flag &= ~QFULL; 2313 mutex_exit(QLOCK(q)); 2314 while (mp) { 2315 nmp = mp->b_next; 2316 mp->b_next = mp->b_prev = NULL; 2317 2318 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0); 2319 2320 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2321 (void) putq(q, mp); 2322 else if (flag || datamsg(mp->b_datap->db_type)) 2323 freemsg(mp); 2324 else 2325 (void) putq(q, mp); 2326 mp = nmp; 2327 } 2328 bpri = 1; 2329 mutex_enter(QLOCK(q)); 2330 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2331 if ((qbp->qb_flag & QB_WANTW) && 2332 (((qbp->qb_count < qbp->qb_lowat) && 2333 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2334 qbp->qb_lowat == 0)) { 2335 qbp->qb_flag &= ~QB_WANTW; 2336 backenab = 1; 2337 qbf[bpri] = 1; 2338 } else 2339 qbf[bpri] = 0; 2340 bpri++; 2341 } 2342 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2343 if ((q->q_flag & QWANTW) && 2344 (((q->q_count < q->q_lowat) && 2345 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2346 q->q_flag &= ~QWANTW; 2347 backenab = 1; 2348 qbf[0] = 1; 2349 } else 2350 qbf[0] = 0; 2351 2352 /* 2353 * If any band can now be written to, and there is a writer 2354 * for that band, then backenable the closest service procedure. 
2355 	 */
2356 	if (backenab) {
2357 		mutex_exit(QLOCK(q));
2358 		for (bpri = q->q_nband; bpri != 0; bpri--)
2359 			if (qbf[bpri])
2360 				backenable(q, bpri);
2361 		if (qbf[0])
2362 			backenable(q, 0);
2363 	} else
2364 		mutex_exit(QLOCK(q));
2365 }
2366
2367 /*
2368  * The real flushing takes place in flushq_common.  This is done so that
2369  * a flag can specify whether or not M_PCPROTO messages should be flushed.
2370  * Currently the only place that uses this flag is the stream head.
2371  */
2372 void
2373 flushq(queue_t *q, int flag)
2374 {
2375 	flushq_common(q, flag, 0);
2376 }
2377
2378 /*
2379  * Flush the queue of messages of the given priority band.
2380  * There is some duplication of code between flushq and flushband.
2381  * This is because we want to optimize the code as much as possible.
2382  * The assumption is that there will be more messages in the normal
2383  * (priority 0) band than in any other.
2384  *
2385  * Historical note: when merging the M_FLUSH code in strrput with this
2386  * code one difference was discovered. flushband had an extra check for
2387  * (mp->b_datap->db_type < QPCTL) in the band 0 case that flushq did not
2388  * have. That check does not match the man page for flushband and was not
2389  * in the strrput flush code, hence it was removed.
2390  */
2391 void
2392 flushband(queue_t *q, unsigned char pri, int flag)
2393 {
2394 	mblk_t *mp;
2395 	mblk_t *nmp;
2396 	mblk_t *last;
2397 	qband_t *qbp;
2398 	int band;
2399
2400 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2401 	if (pri > q->q_nband) {
2402 		return;
2403 	}
2404 	mutex_enter(QLOCK(q));
2405 	if (pri == 0) {
2406 		mp = q->q_first;
2407 		q->q_first = NULL;
2408 		q->q_last = NULL;
2409 		q->q_count = 0;
2410 		q->q_mblkcnt = 0;
2411 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2412 			qbp->qb_first = NULL;
2413 			qbp->qb_last = NULL;
2414 			qbp->qb_count = 0;
2415 			qbp->qb_mblkcnt = 0;
2416 			qbp->qb_flag &= ~QB_FULL;
2417 		}
2418 		q->q_flag &= ~QFULL;
2419 		mutex_exit(QLOCK(q));
2420 		while (mp) {
2421 			nmp = mp->b_next;
2422 			mp->b_next = mp->b_prev = NULL;
2423 			if ((mp->b_band == 0) &&
2424 			    ((flag == FLUSHALL) ||
2425 			    datamsg(mp->b_datap->db_type)))
2426 				freemsg(mp);
2427 			else
2428 				(void) putq(q, mp);
2429 			mp = nmp;
2430 		}
2431 		mutex_enter(QLOCK(q));
2432 		if ((q->q_flag & QWANTW) &&
2433 		    (((q->q_count < q->q_lowat) &&
2434 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2435 			q->q_flag &= ~QWANTW;
2436 			mutex_exit(QLOCK(q));
2437
2438 			backenable(q, pri);
2439 		} else
2440 			mutex_exit(QLOCK(q));
2441 	} else {	/* pri != 0 */
2442 		boolean_t flushed = B_FALSE;
2443 		band = pri;
2444
2445 		ASSERT(MUTEX_HELD(QLOCK(q)));
2446 		qbp = q->q_bandp;
2447 		while (--band > 0)
2448 			qbp = qbp->qb_next;
2449 		mp = qbp->qb_first;
2450 		if (mp == NULL) {
2451 			mutex_exit(QLOCK(q));
2452 			return;
2453 		}
2454 		last = qbp->qb_last->b_next;
2455 		/*
2456 		 * rmvq_noenab() and freemsg() are called for each mblk that
2457 		 * meets the criteria.  The loop is executed until the last
2458 		 * mblk has been processed.
2459 		 */
2460 		while (mp != last) {
2461 			ASSERT(mp->b_band == pri);
2462 			nmp = mp->b_next;
2463 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2464 				rmvq_noenab(q, mp);
2465 				freemsg(mp);
2466 				flushed = B_TRUE;
2467 			}
2468 			mp = nmp;
2469 		}
2470 		mutex_exit(QLOCK(q));
2471
2472 		/*
2473 		 * If any mblks have been freed, we know that qbackenable()
2474 		 * will need to be called.
2475 		 */
2476 		if (flushed)
2477 			qbackenable(q, pri);
2478 	}
2479 }
2480
2481 /*
2482  * Return 1 if the queue is not full.  If the queue is full, return 0
2483  * (may not put the message) and set the QWANTW flag (the caller wants
2484  * to write to the queue).
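 *
 * A put procedure commonly honors this check before forwarding a message
 * downstream; an illustrative sketch (using the canputnext() variant,
 * which checks q->q_next directly) might look like:
 *
 *	if (canputnext(q))
 *		putnext(q, mp);
 *	else
 *		(void) putq(q, mp);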
2485  */
2486 int
2487 canput(queue_t *q)
2488 {
2489 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2490
2491 	/* This is for loopback transports; they should not do a canput */
2492 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2493
2494 	/* Find next forward module that has a service procedure */
2495 	q = q->q_nfsrv;
2496
2497 	if (!(q->q_flag & QFULL)) {
2498 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2499 		return (1);
2500 	}
2501 	mutex_enter(QLOCK(q));
2502 	if (q->q_flag & QFULL) {
2503 		q->q_flag |= QWANTW;
2504 		mutex_exit(QLOCK(q));
2505 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2506 		return (0);
2507 	}
2508 	mutex_exit(QLOCK(q));
2509 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2510 	return (1);
2511 }
2512
2513 /*
2514  * This is the new canput for use with priority bands.  Return 1 if the
2515  * band is not full.  If the band is full, return 0 (may not put message)
2516  * and set the QWANTW (QB_WANTW) flag for the zero (non-zero) band (the
2517  * caller wants to write to the queue).
2518  */
2519 int
2520 bcanput(queue_t *q, unsigned char pri)
2521 {
2522 	qband_t *qbp;
2523
2524 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2525 	if (!q)
2526 		return (0);
2527
2528 	/* Find next forward module that has a service procedure */
2529 	q = q->q_nfsrv;
2530
2531 	mutex_enter(QLOCK(q));
2532 	if (pri == 0) {
2533 		if (q->q_flag & QFULL) {
2534 			q->q_flag |= QWANTW;
2535 			mutex_exit(QLOCK(q));
2536 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2537 			    "bcanput:%p %X %d", q, pri, 0);
2538 			return (0);
2539 		}
2540 	} else {	/* pri != 0 */
2541 		if (pri > q->q_nband) {
2542 			/*
2543 			 * No band exists yet, so return success.
2544 			 */
2545 			mutex_exit(QLOCK(q));
2546 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2547 			    "bcanput:%p %X %d", q, pri, 1);
2548 			return (1);
2549 		}
2550 		qbp = q->q_bandp;
2551 		while (--pri)
2552 			qbp = qbp->qb_next;
2553 		if (qbp->qb_flag & QB_FULL) {
2554 			qbp->qb_flag |= QB_WANTW;
2555 			mutex_exit(QLOCK(q));
2556 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2557 			    "bcanput:%p %X %d", q, pri, 0);
2558 			return (0);
2559 		}
2560 	}
2561 	mutex_exit(QLOCK(q));
2562 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 	    "bcanput:%p %X %d", q, pri, 1);
2564 	return (1);
2565 }
2566
2567 /*
2568  * Put a message on a queue.
2569  *
2570  * Messages are enqueued on a priority basis.  The priority classes
2571  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2572  * and B_NORMAL (type < QPCTL && band == 0).
2573  *
2574  * Add appropriate weighted data block sizes to queue count.
2575  * If queue hits high water mark then set QFULL flag.
2576  *
2577  * If QNOENAB is not set (putq is allowed to enable the queue),
2578  * enable the queue only if the message is PRIORITY,
2579  * or the QWANTR flag is set (indicating that the service procedure
2580  * is ready to read the queue).  This implies that a service
2581  * procedure must NEVER put a high priority message back on its own
2582  * queue, as this would result in an infinite loop (!).
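 *
 * Illustrative sketch only: a typical service procedure drains its queue
 * with getq() and, when the downstream queue is flow controlled, uses
 * putbq() to preserve message ordering:
 *
 *	while ((mp = getq(q)) != NULL) {
 *		if (!canputnext(q)) {
 *			(void) putbq(q, mp);
 *			break;
 *		}
 *		putnext(q, mp);
 *	}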
2583 */ 2584 int 2585 putq(queue_t *q, mblk_t *bp) 2586 { 2587 mblk_t *tmp; 2588 qband_t *qbp = NULL; 2589 int mcls = (int)queclass(bp); 2590 kthread_id_t freezer; 2591 int bytecnt = 0, mblkcnt = 0; 2592 2593 freezer = STREAM(q)->sd_freezer; 2594 if (freezer == curthread) { 2595 ASSERT(frozenstr(q)); 2596 ASSERT(MUTEX_HELD(QLOCK(q))); 2597 } else 2598 mutex_enter(QLOCK(q)); 2599 2600 /* 2601 * Make sanity checks and if qband structure is not yet 2602 * allocated, do so. 2603 */ 2604 if (mcls == QPCTL) { 2605 if (bp->b_band != 0) 2606 bp->b_band = 0; /* force to be correct */ 2607 } else if (bp->b_band != 0) { 2608 int i; 2609 qband_t **qbpp; 2610 2611 if (bp->b_band > q->q_nband) { 2612 2613 /* 2614 * The qband structure for this priority band is 2615 * not on the queue yet, so we have to allocate 2616 * one on the fly. It would be wasteful to 2617 * associate the qband structures with every 2618 * queue when the queues are allocated. This is 2619 * because most queues will only need the normal 2620 * band of flow which can be described entirely 2621 * by the queue itself. 2622 */ 2623 qbpp = &q->q_bandp; 2624 while (*qbpp) 2625 qbpp = &(*qbpp)->qb_next; 2626 while (bp->b_band > q->q_nband) { 2627 if ((*qbpp = allocband()) == NULL) { 2628 if (freezer != curthread) 2629 mutex_exit(QLOCK(q)); 2630 return (0); 2631 } 2632 (*qbpp)->qb_hiwat = q->q_hiwat; 2633 (*qbpp)->qb_lowat = q->q_lowat; 2634 q->q_nband++; 2635 qbpp = &(*qbpp)->qb_next; 2636 } 2637 } 2638 ASSERT(MUTEX_HELD(QLOCK(q))); 2639 qbp = q->q_bandp; 2640 i = bp->b_band; 2641 while (--i) 2642 qbp = qbp->qb_next; 2643 } 2644 2645 /* 2646 * If queue is empty, add the message and initialize the pointers. 2647 * Otherwise, adjust message pointers and queue pointers based on 2648 * the type of the message and where it belongs on the queue. Some 2649 * code is duplicated to minimize the number of conditionals and 2650 * hopefully minimize the amount of time this routine takes. 2651 */ 2652 if (!q->q_first) { 2653 bp->b_next = NULL; 2654 bp->b_prev = NULL; 2655 q->q_first = bp; 2656 q->q_last = bp; 2657 if (qbp) { 2658 qbp->qb_first = bp; 2659 qbp->qb_last = bp; 2660 } 2661 } else if (!qbp) { /* bp->b_band == 0 */ 2662 2663 /* 2664 * If queue class of message is less than or equal to 2665 * that of the last one on the queue, tack on to the end. 2666 */ 2667 tmp = q->q_last; 2668 if (mcls <= (int)queclass(tmp)) { 2669 bp->b_next = NULL; 2670 bp->b_prev = tmp; 2671 tmp->b_next = bp; 2672 q->q_last = bp; 2673 } else { 2674 tmp = q->q_first; 2675 while ((int)queclass(tmp) >= mcls) 2676 tmp = tmp->b_next; 2677 2678 /* 2679 * Insert bp before tmp. 2680 */ 2681 bp->b_next = tmp; 2682 bp->b_prev = tmp->b_prev; 2683 if (tmp->b_prev) 2684 tmp->b_prev->b_next = bp; 2685 else 2686 q->q_first = bp; 2687 tmp->b_prev = bp; 2688 } 2689 } else { /* bp->b_band != 0 */ 2690 if (qbp->qb_first) { 2691 tmp = qbp->qb_last; 2692 2693 /* 2694 * Insert bp after the last message in this band. 2695 */ 2696 bp->b_next = tmp->b_next; 2697 if (tmp->b_next) 2698 tmp->b_next->b_prev = bp; 2699 else 2700 q->q_last = bp; 2701 bp->b_prev = tmp; 2702 tmp->b_next = bp; 2703 } else { 2704 tmp = q->q_last; 2705 if ((mcls < (int)queclass(tmp)) || 2706 (bp->b_band <= tmp->b_band)) { 2707 2708 /* 2709 * Tack bp on end of queue. 
2710 */ 2711 bp->b_next = NULL; 2712 bp->b_prev = tmp; 2713 tmp->b_next = bp; 2714 q->q_last = bp; 2715 } else { 2716 tmp = q->q_first; 2717 while (tmp->b_datap->db_type >= QPCTL) 2718 tmp = tmp->b_next; 2719 while (tmp->b_band >= bp->b_band) 2720 tmp = tmp->b_next; 2721 2722 /* 2723 * Insert bp before tmp. 2724 */ 2725 bp->b_next = tmp; 2726 bp->b_prev = tmp->b_prev; 2727 if (tmp->b_prev) 2728 tmp->b_prev->b_next = bp; 2729 else 2730 q->q_first = bp; 2731 tmp->b_prev = bp; 2732 } 2733 qbp->qb_first = bp; 2734 } 2735 qbp->qb_last = bp; 2736 } 2737 2738 /* Get message byte count for q_count accounting */ 2739 bytecnt = mp_cont_len(bp, &mblkcnt); 2740 2741 if (qbp) { 2742 qbp->qb_count += bytecnt; 2743 qbp->qb_mblkcnt += mblkcnt; 2744 if ((qbp->qb_count >= qbp->qb_hiwat) || 2745 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2746 qbp->qb_flag |= QB_FULL; 2747 } 2748 } else { 2749 q->q_count += bytecnt; 2750 q->q_mblkcnt += mblkcnt; 2751 if ((q->q_count >= q->q_hiwat) || 2752 (q->q_mblkcnt >= q->q_hiwat)) { 2753 q->q_flag |= QFULL; 2754 } 2755 } 2756 2757 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0); 2758 2759 if ((mcls > QNORM) || 2760 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2761 qenable_locked(q); 2762 ASSERT(MUTEX_HELD(QLOCK(q))); 2763 if (freezer != curthread) 2764 mutex_exit(QLOCK(q)); 2765 2766 return (1); 2767 } 2768 2769 /* 2770 * Put stuff back at beginning of Q according to priority order. 2771 * See comment on putq above for details. 2772 */ 2773 int 2774 putbq(queue_t *q, mblk_t *bp) 2775 { 2776 mblk_t *tmp; 2777 qband_t *qbp = NULL; 2778 int mcls = (int)queclass(bp); 2779 kthread_id_t freezer; 2780 int bytecnt = 0, mblkcnt = 0; 2781 2782 ASSERT(q && bp); 2783 ASSERT(bp->b_next == NULL); 2784 freezer = STREAM(q)->sd_freezer; 2785 if (freezer == curthread) { 2786 ASSERT(frozenstr(q)); 2787 ASSERT(MUTEX_HELD(QLOCK(q))); 2788 } else 2789 mutex_enter(QLOCK(q)); 2790 2791 /* 2792 * Make sanity checks and if qband structure is not yet 2793 * allocated, do so. 2794 */ 2795 if (mcls == QPCTL) { 2796 if (bp->b_band != 0) 2797 bp->b_band = 0; /* force to be correct */ 2798 } else if (bp->b_band != 0) { 2799 int i; 2800 qband_t **qbpp; 2801 2802 if (bp->b_band > q->q_nband) { 2803 qbpp = &q->q_bandp; 2804 while (*qbpp) 2805 qbpp = &(*qbpp)->qb_next; 2806 while (bp->b_band > q->q_nband) { 2807 if ((*qbpp = allocband()) == NULL) { 2808 if (freezer != curthread) 2809 mutex_exit(QLOCK(q)); 2810 return (0); 2811 } 2812 (*qbpp)->qb_hiwat = q->q_hiwat; 2813 (*qbpp)->qb_lowat = q->q_lowat; 2814 q->q_nband++; 2815 qbpp = &(*qbpp)->qb_next; 2816 } 2817 } 2818 qbp = q->q_bandp; 2819 i = bp->b_band; 2820 while (--i) 2821 qbp = qbp->qb_next; 2822 } 2823 2824 /* 2825 * If queue is empty or if message is high priority, 2826 * place on the front of the queue. 2827 */ 2828 tmp = q->q_first; 2829 if ((!tmp) || (mcls == QPCTL)) { 2830 bp->b_next = tmp; 2831 if (tmp) 2832 tmp->b_prev = bp; 2833 else 2834 q->q_last = bp; 2835 q->q_first = bp; 2836 bp->b_prev = NULL; 2837 if (qbp) { 2838 qbp->qb_first = bp; 2839 qbp->qb_last = bp; 2840 } 2841 } else if (qbp) { /* bp->b_band != 0 */ 2842 tmp = qbp->qb_first; 2843 if (tmp) { 2844 2845 /* 2846 * Insert bp before the first message in this band. 2847 */ 2848 bp->b_next = tmp; 2849 bp->b_prev = tmp->b_prev; 2850 if (tmp->b_prev) 2851 tmp->b_prev->b_next = bp; 2852 else 2853 q->q_first = bp; 2854 tmp->b_prev = bp; 2855 } else { 2856 tmp = q->q_last; 2857 if ((mcls < (int)queclass(tmp)) || 2858 (bp->b_band < tmp->b_band)) { 2859 2860 /* 2861 * Tack bp on end of queue. 
2862 				 */
2863 				bp->b_next = NULL;
2864 				bp->b_prev = tmp;
2865 				tmp->b_next = bp;
2866 				q->q_last = bp;
2867 			} else {
2868 				tmp = q->q_first;
2869 				while (tmp->b_datap->db_type >= QPCTL)
2870 					tmp = tmp->b_next;
2871 				while (tmp->b_band > bp->b_band)
2872 					tmp = tmp->b_next;
2873
2874 				/*
2875 				 * Insert bp before tmp.
2876 				 */
2877 				bp->b_next = tmp;
2878 				bp->b_prev = tmp->b_prev;
2879 				if (tmp->b_prev)
2880 					tmp->b_prev->b_next = bp;
2881 				else
2882 					q->q_first = bp;
2883 				tmp->b_prev = bp;
2884 			}
2885 			qbp->qb_last = bp;
2886 		}
2887 		qbp->qb_first = bp;
2888 	} else {	/* bp->b_band == 0 && !QPCTL */
2889
2890 		/*
2891 		 * If the queue class or band is less than that of the last
2892 		 * message on the queue, tack bp on the end of the queue.
2893 		 */
2894 		tmp = q->q_last;
2895 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2896 			bp->b_next = NULL;
2897 			bp->b_prev = tmp;
2898 			tmp->b_next = bp;
2899 			q->q_last = bp;
2900 		} else {
2901 			tmp = q->q_first;
2902 			while (tmp->b_datap->db_type >= QPCTL)
2903 				tmp = tmp->b_next;
2904 			while (tmp->b_band > bp->b_band)
2905 				tmp = tmp->b_next;
2906
2907 			/*
2908 			 * Insert bp before tmp.
2909 			 */
2910 			bp->b_next = tmp;
2911 			bp->b_prev = tmp->b_prev;
2912 			if (tmp->b_prev)
2913 				tmp->b_prev->b_next = bp;
2914 			else
2915 				q->q_first = bp;
2916 			tmp->b_prev = bp;
2917 		}
2918 	}
2919
2920 	/* Get message byte count for q_count accounting */
2921 	bytecnt = mp_cont_len(bp, &mblkcnt);
2922
2923 	if (qbp) {
2924 		qbp->qb_count += bytecnt;
2925 		qbp->qb_mblkcnt += mblkcnt;
2926 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2927 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2928 			qbp->qb_flag |= QB_FULL;
2929 		}
2930 	} else {
2931 		q->q_count += bytecnt;
2932 		q->q_mblkcnt += mblkcnt;
2933 		if ((q->q_count >= q->q_hiwat) ||
2934 		    (q->q_mblkcnt >= q->q_hiwat)) {
2935 			q->q_flag |= QFULL;
2936 		}
2937 	}
2938
2939 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);
2940
2941 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2942 		qenable_locked(q);
2943 	ASSERT(MUTEX_HELD(QLOCK(q)));
2944 	if (freezer != curthread)
2945 		mutex_exit(QLOCK(q));
2946
2947 	return (1);
2948 }
2949
2950 /*
2951  * Insert a message before an existing message on the queue.  If the
2952  * existing message is NULL, the new message is placed on the end of
2953  * the queue.  The queue class of the new message is ignored.  However,
2954  * the priority band of the new message must adhere to the following
2955  * ordering:
2956  *
2957  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2958  *
2959  * All flow control parameters are updated.
2960  *
2961  * insq can be called with the stream frozen, by other utility functions
2962  * holding QLOCK, or by streams modules with no locks held and the stream
2962  * not frozen.
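 *
 * For example, if a queue holds messages in bands 2, 2, 1, 0, the
 * ordering above allows a new band-1 message to be inserted before the
 * existing band-1 message (2 >= 1 >= 1) or before the band-0 message
 * (1 >= 1 >= 0), but not ahead of either band-2 message.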
2963 */ 2964 int 2965 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2966 { 2967 mblk_t *tmp; 2968 qband_t *qbp = NULL; 2969 int mcls = (int)queclass(mp); 2970 kthread_id_t freezer; 2971 int bytecnt = 0, mblkcnt = 0; 2972 2973 freezer = STREAM(q)->sd_freezer; 2974 if (freezer == curthread) { 2975 ASSERT(frozenstr(q)); 2976 ASSERT(MUTEX_HELD(QLOCK(q))); 2977 } else if (MUTEX_HELD(QLOCK(q))) { 2978 /* Don't drop lock on exit */ 2979 freezer = curthread; 2980 } else 2981 mutex_enter(QLOCK(q)); 2982 2983 if (mcls == QPCTL) { 2984 if (mp->b_band != 0) 2985 mp->b_band = 0; /* force to be correct */ 2986 if (emp && emp->b_prev && 2987 (emp->b_prev->b_datap->db_type < QPCTL)) 2988 goto badord; 2989 } 2990 if (emp) { 2991 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 2992 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 2993 (emp->b_prev->b_band < mp->b_band))) { 2994 goto badord; 2995 } 2996 } else { 2997 tmp = q->q_last; 2998 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 2999 badord: 3000 cmn_err(CE_WARN, 3001 "insq: attempt to insert message out of order " 3002 "on q %p", (void *)q); 3003 if (freezer != curthread) 3004 mutex_exit(QLOCK(q)); 3005 return (0); 3006 } 3007 } 3008 3009 if (mp->b_band != 0) { 3010 int i; 3011 qband_t **qbpp; 3012 3013 if (mp->b_band > q->q_nband) { 3014 qbpp = &q->q_bandp; 3015 while (*qbpp) 3016 qbpp = &(*qbpp)->qb_next; 3017 while (mp->b_band > q->q_nband) { 3018 if ((*qbpp = allocband()) == NULL) { 3019 if (freezer != curthread) 3020 mutex_exit(QLOCK(q)); 3021 return (0); 3022 } 3023 (*qbpp)->qb_hiwat = q->q_hiwat; 3024 (*qbpp)->qb_lowat = q->q_lowat; 3025 q->q_nband++; 3026 qbpp = &(*qbpp)->qb_next; 3027 } 3028 } 3029 qbp = q->q_bandp; 3030 i = mp->b_band; 3031 while (--i) 3032 qbp = qbp->qb_next; 3033 } 3034 3035 if ((mp->b_next = emp) != NULL) { 3036 if ((mp->b_prev = emp->b_prev) != NULL) 3037 emp->b_prev->b_next = mp; 3038 else 3039 q->q_first = mp; 3040 emp->b_prev = mp; 3041 } else { 3042 if ((mp->b_prev = q->q_last) != NULL) 3043 q->q_last->b_next = mp; 3044 else 3045 q->q_first = mp; 3046 q->q_last = mp; 3047 } 3048 3049 /* Get mblk and byte count for q_count accounting */ 3050 bytecnt = mp_cont_len(mp, &mblkcnt); 3051 3052 if (qbp) { /* adjust qband pointers and count */ 3053 if (!qbp->qb_first) { 3054 qbp->qb_first = mp; 3055 qbp->qb_last = mp; 3056 } else { 3057 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3058 (mp->b_prev->b_band != mp->b_band))) 3059 qbp->qb_first = mp; 3060 else if (mp->b_next == NULL || (mp->b_next != NULL && 3061 (mp->b_next->b_band != mp->b_band))) 3062 qbp->qb_last = mp; 3063 } 3064 qbp->qb_count += bytecnt; 3065 qbp->qb_mblkcnt += mblkcnt; 3066 if ((qbp->qb_count >= qbp->qb_hiwat) || 3067 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3068 qbp->qb_flag |= QB_FULL; 3069 } 3070 } else { 3071 q->q_count += bytecnt; 3072 q->q_mblkcnt += mblkcnt; 3073 if ((q->q_count >= q->q_hiwat) || 3074 (q->q_mblkcnt >= q->q_hiwat)) { 3075 q->q_flag |= QFULL; 3076 } 3077 } 3078 3079 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0); 3080 3081 if (canenable(q) && (q->q_flag & QWANTR)) 3082 qenable_locked(q); 3083 3084 ASSERT(MUTEX_HELD(QLOCK(q))); 3085 if (freezer != curthread) 3086 mutex_exit(QLOCK(q)); 3087 3088 return (1); 3089 } 3090 3091 /* 3092 * Create and put a control message on queue. 
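 * For example (illustrative sketch), a module might send an M_FLUSH with a
 * one-byte FLUSHR parameter downstream via the putnextctl1() variant defined
 * below; callers must check the return value, since message allocation can
 * fail:
 *
 *	if (!putnextctl1(q, M_FLUSH, FLUSHR))
 *		return (ENOSR);		(or otherwise recover)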
3093  */
3094 int
3095 putctl(queue_t *q, int type)
3096 {
3097 	mblk_t *bp;
3098
3099 	if ((datamsg(type) && (type != M_DELAY)) ||
3100 	    (bp = allocb_tryhard(0)) == NULL)
3101 		return (0);
3102 	bp->b_datap->db_type = (unsigned char) type;
3103
3104 	put(q, bp);
3105
3106 	return (1);
3107 }
3108
3109 /*
3110  * Control message with a single-byte parameter
3111  */
3112 int
3113 putctl1(queue_t *q, int type, int param)
3114 {
3115 	mblk_t *bp;
3116
3117 	if ((datamsg(type) && (type != M_DELAY)) ||
3118 	    (bp = allocb_tryhard(1)) == NULL)
3119 		return (0);
3120 	bp->b_datap->db_type = (unsigned char)type;
3121 	*bp->b_wptr++ = (unsigned char)param;
3122
3123 	put(q, bp);
3124
3125 	return (1);
3126 }
3127
3128 int
3129 putnextctl1(queue_t *q, int type, int param)
3130 {
3131 	mblk_t *bp;
3132
3133 	if ((datamsg(type) && (type != M_DELAY)) ||
3134 	    ((bp = allocb_tryhard(1)) == NULL))
3135 		return (0);
3136
3137 	bp->b_datap->db_type = (unsigned char)type;
3138 	*bp->b_wptr++ = (unsigned char)param;
3139
3140 	putnext(q, bp);
3141
3142 	return (1);
3143 }
3144
3145 int
3146 putnextctl(queue_t *q, int type)
3147 {
3148 	mblk_t *bp;
3149
3150 	if ((datamsg(type) && (type != M_DELAY)) ||
3151 	    ((bp = allocb_tryhard(0)) == NULL))
3152 		return (0);
3153 	bp->b_datap->db_type = (unsigned char)type;
3154
3155 	putnext(q, bp);
3156
3157 	return (1);
3158 }
3159
3160 /*
3161  * Return the queue upstream from this one
3162  */
3163 queue_t *
3164 backq(queue_t *q)
3165 {
3166 	q = _OTHERQ(q);
3167 	if (q->q_next) {
3168 		q = q->q_next;
3169 		return (_OTHERQ(q));
3170 	}
3171 	return (NULL);
3172 }
3173
3174 /*
3175  * Send a block back up the queue in reverse from this
3176  * one (e.g. to respond to ioctls)
3177  */
3178 void
3179 qreply(queue_t *q, mblk_t *bp)
3180 {
3181 	ASSERT(q && bp);
3182
3183 	putnext(_OTHERQ(q), bp);
3184 }
3185
3186 /*
3187  * Streams Queue Scheduling
3188  *
3189  * Queues are enabled through qenable() when they have messages to
3190  * process.  They are serviced by queuerun(), which runs each enabled
3191  * queue's service procedure.  The call to queuerun() is processor
3192  * dependent - the general principle is that it be run whenever a queue
3193  * is enabled but before returning to user level.  For system calls,
3194  * the function runqueues() is called if their action causes a queue
3195  * to be enabled.  For device interrupts, queuerun() should be
3196  * called before returning from the last level of interrupt.  Beyond
3197  * this, no timing assumptions should be made about queue scheduling.
3198  */
3199
3200 /*
3201  * Enable a queue: put it on the list of those whose service procedures
3202  * are ready to run and set up the scheduling mechanism.
3203  * The broadcast is done outside the mutex to avoid having the woken
3204  * thread contend with the mutex.  This is OK because the queue has been
3205  * enqueued on the runlist and flagged safely at this point.
3206  */
3207 void
3208 qenable(queue_t *q)
3209 {
3210 	mutex_enter(QLOCK(q));
3211 	qenable_locked(q);
3212 	mutex_exit(QLOCK(q));
3213 }
3214
3215 /*
3216  * Return number of messages on queue
3217  */
3218 int
3219 qsize(queue_t *qp)
3220 {
3221 	int count = 0;
3222 	mblk_t *mp;
3223
3224 	mutex_enter(QLOCK(qp));
3225 	for (mp = qp->q_first; mp; mp = mp->b_next)
3226 		count++;
3227 	mutex_exit(QLOCK(qp));
3228 	return (count);
3229 }
3230
3231 /*
3232  * noenable - set queue so that putq() will not enable it.
3232  * enableok - set queue so that putq() can enable it.
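 *
 * Illustrative sketch only: a module that wants to accumulate messages
 * without putq() scheduling its service procedure can bracket the window
 * with these calls and then schedule explicitly:
 *
 *	noenable(q);
 *	...				(void) putq(q, mp) calls accumulate
 *	enableok(q);
 *	qenable(q);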
3233  */
3234 void
3235 noenable(queue_t *q)
3236 {
3237 	mutex_enter(QLOCK(q));
3238 	q->q_flag |= QNOENB;
3239 	mutex_exit(QLOCK(q));
3240 }
3241
3242 void
3243 enableok(queue_t *q)
3244 {
3245 	mutex_enter(QLOCK(q));
3246 	q->q_flag &= ~QNOENB;
3247 	mutex_exit(QLOCK(q));
3248 }
3249
3250 /*
3251  * Set queue fields.
3252  */
3253 int
3254 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3255 {
3256 	qband_t *qbp = NULL;
3257 	queue_t *wrq;
3258 	int error = 0;
3259 	kthread_id_t freezer;
3260
3261 	freezer = STREAM(q)->sd_freezer;
3262 	if (freezer == curthread) {
3263 		ASSERT(frozenstr(q));
3264 		ASSERT(MUTEX_HELD(QLOCK(q)));
3265 	} else
3266 		mutex_enter(QLOCK(q));
3267
3268 	if (what >= QBAD) {
3269 		error = EINVAL;
3270 		goto done;
3271 	}
3272 	if (pri != 0) {
3273 		int i;
3274 		qband_t **qbpp;
3275
3276 		if (pri > q->q_nband) {
3277 			qbpp = &q->q_bandp;
3278 			while (*qbpp)
3279 				qbpp = &(*qbpp)->qb_next;
3280 			while (pri > q->q_nband) {
3281 				if ((*qbpp = allocband()) == NULL) {
3282 					error = EAGAIN;
3283 					goto done;
3284 				}
3285 				(*qbpp)->qb_hiwat = q->q_hiwat;
3286 				(*qbpp)->qb_lowat = q->q_lowat;
3287 				q->q_nband++;
3288 				qbpp = &(*qbpp)->qb_next;
3289 			}
3290 		}
3291 		qbp = q->q_bandp;
3292 		i = pri;
3293 		while (--i)
3294 			qbp = qbp->qb_next;
3295 	}
3296 	switch (what) {
3297
3298 	case QHIWAT:
3299 		if (qbp)
3300 			qbp->qb_hiwat = (size_t)val;
3301 		else
3302 			q->q_hiwat = (size_t)val;
3303 		break;
3304
3305 	case QLOWAT:
3306 		if (qbp)
3307 			qbp->qb_lowat = (size_t)val;
3308 		else
3309 			q->q_lowat = (size_t)val;
3310 		break;
3311
3312 	case QMAXPSZ:
3313 		if (qbp)
3314 			error = EINVAL;
3315 		else
3316 			q->q_maxpsz = (ssize_t)val;
3317
3318 		/*
3319 		 * Performance concern: strwrite looks at the module below
3320 		 * the stream head for the maxpsz each time it does a write,
3321 		 * so we now cache it at the stream head.  Check to see if
3322 		 * this queue is sitting directly below the stream head.
3323 		 */
3324 		wrq = STREAM(q)->sd_wrq;
3325 		if (q != wrq->q_next)
3326 			break;
3327
3328 		/*
3329 		 * If the stream is not frozen, drop the current QLOCK and
3330 		 * acquire the sd_wrq QLOCK, which protects sd_qn_*.
3331 		 */
3332 		if (freezer != curthread) {
3333 			mutex_exit(QLOCK(q));
3334 			mutex_enter(QLOCK(wrq));
3335 		}
3336 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3337
3338 		if (strmsgsz != 0) {
3339 			if (val == INFPSZ)
3340 				val = strmsgsz;
3341 			else {
3342 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3343 					val = MIN(PIPE_BUF, val);
3344 				else
3345 					val = MIN(strmsgsz, val);
3346 			}
3347 		}
3348 		STREAM(q)->sd_qn_maxpsz = val;
3349 		if (freezer != curthread) {
3350 			mutex_exit(QLOCK(wrq));
3351 			mutex_enter(QLOCK(q));
3352 		}
3353 		break;
3354
3355 	case QMINPSZ:
3356 		if (qbp)
3357 			error = EINVAL;
3358 		else
3359 			q->q_minpsz = (ssize_t)val;
3360
3361 		/*
3362 		 * Performance concern: strwrite looks at the module below
3363 		 * the stream head for the maxpsz each time it does a write,
3364 		 * so we now cache it at the stream head.  Check to see if
3365 		 * this queue is sitting directly below the stream head.
3366 */ 3367 wrq = STREAM(q)->sd_wrq; 3368 if (q != wrq->q_next) 3369 break; 3370 3371 /* 3372 * If the stream is not frozen drop the current QLOCK and 3373 * acquire the sd_wrq QLOCK which protects sd_qn_* 3374 */ 3375 if (freezer != curthread) { 3376 mutex_exit(QLOCK(q)); 3377 mutex_enter(QLOCK(wrq)); 3378 } 3379 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3380 3381 if (freezer != curthread) { 3382 mutex_exit(QLOCK(wrq)); 3383 mutex_enter(QLOCK(q)); 3384 } 3385 break; 3386 3387 case QSTRUIOT: 3388 if (qbp) 3389 error = EINVAL; 3390 else 3391 q->q_struiot = (ushort_t)val; 3392 break; 3393 3394 case QCOUNT: 3395 case QFIRST: 3396 case QLAST: 3397 case QFLAG: 3398 error = EPERM; 3399 break; 3400 3401 default: 3402 error = EINVAL; 3403 break; 3404 } 3405 done: 3406 if (freezer != curthread) 3407 mutex_exit(QLOCK(q)); 3408 return (error); 3409 } 3410 3411 /* 3412 * Get queue fields. 3413 */ 3414 int 3415 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3416 { 3417 qband_t *qbp = NULL; 3418 int error = 0; 3419 kthread_id_t freezer; 3420 3421 freezer = STREAM(q)->sd_freezer; 3422 if (freezer == curthread) { 3423 ASSERT(frozenstr(q)); 3424 ASSERT(MUTEX_HELD(QLOCK(q))); 3425 } else 3426 mutex_enter(QLOCK(q)); 3427 if (what >= QBAD) { 3428 error = EINVAL; 3429 goto done; 3430 } 3431 if (pri != 0) { 3432 int i; 3433 qband_t **qbpp; 3434 3435 if (pri > q->q_nband) { 3436 qbpp = &q->q_bandp; 3437 while (*qbpp) 3438 qbpp = &(*qbpp)->qb_next; 3439 while (pri > q->q_nband) { 3440 if ((*qbpp = allocband()) == NULL) { 3441 error = EAGAIN; 3442 goto done; 3443 } 3444 (*qbpp)->qb_hiwat = q->q_hiwat; 3445 (*qbpp)->qb_lowat = q->q_lowat; 3446 q->q_nband++; 3447 qbpp = &(*qbpp)->qb_next; 3448 } 3449 } 3450 qbp = q->q_bandp; 3451 i = pri; 3452 while (--i) 3453 qbp = qbp->qb_next; 3454 } 3455 switch (what) { 3456 case QHIWAT: 3457 if (qbp) 3458 *(size_t *)valp = qbp->qb_hiwat; 3459 else 3460 *(size_t *)valp = q->q_hiwat; 3461 break; 3462 3463 case QLOWAT: 3464 if (qbp) 3465 *(size_t *)valp = qbp->qb_lowat; 3466 else 3467 *(size_t *)valp = q->q_lowat; 3468 break; 3469 3470 case QMAXPSZ: 3471 if (qbp) 3472 error = EINVAL; 3473 else 3474 *(ssize_t *)valp = q->q_maxpsz; 3475 break; 3476 3477 case QMINPSZ: 3478 if (qbp) 3479 error = EINVAL; 3480 else 3481 *(ssize_t *)valp = q->q_minpsz; 3482 break; 3483 3484 case QCOUNT: 3485 if (qbp) 3486 *(size_t *)valp = qbp->qb_count; 3487 else 3488 *(size_t *)valp = q->q_count; 3489 break; 3490 3491 case QFIRST: 3492 if (qbp) 3493 *(mblk_t **)valp = qbp->qb_first; 3494 else 3495 *(mblk_t **)valp = q->q_first; 3496 break; 3497 3498 case QLAST: 3499 if (qbp) 3500 *(mblk_t **)valp = qbp->qb_last; 3501 else 3502 *(mblk_t **)valp = q->q_last; 3503 break; 3504 3505 case QFLAG: 3506 if (qbp) 3507 *(uint_t *)valp = qbp->qb_flag; 3508 else 3509 *(uint_t *)valp = q->q_flag; 3510 break; 3511 3512 case QSTRUIOT: 3513 if (qbp) 3514 error = EINVAL; 3515 else 3516 *(short *)valp = q->q_struiot; 3517 break; 3518 3519 default: 3520 error = EINVAL; 3521 break; 3522 } 3523 done: 3524 if (freezer != curthread) 3525 mutex_exit(QLOCK(q)); 3526 return (error); 3527 } 3528 3529 /* 3530 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3531 * QWANTWSYNC or QWANTR or QWANTW, 3532 * 3533 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3534 * deferred wakeup will be done. Also if strpoll() in progress then a 3535 * deferred pollwakeup will be done. 
3536  */
3537 void
3538 strwakeq(queue_t *q, int flag)
3539 {
3540 	stdata_t *stp = STREAM(q);
3541 	pollhead_t *pl;
3542
3543 	mutex_enter(&stp->sd_lock);
3544 	pl = &stp->sd_pollist;
3545 	if (flag & QWANTWSYNC) {
3546 		ASSERT(!(q->q_flag & QREADR));
3547 		if (stp->sd_flag & WSLEEP) {
3548 			stp->sd_flag &= ~WSLEEP;
3549 			cv_broadcast(&stp->sd_wrq->q_wait);
3550 		} else {
3551 			stp->sd_wakeq |= WSLEEP;
3552 		}
3553
3554 		mutex_exit(&stp->sd_lock);
3555 		pollwakeup(pl, POLLWRNORM);
3556 		mutex_enter(&stp->sd_lock);
3557
3558 		if (stp->sd_sigflags & S_WRNORM)
3559 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3560 	} else if (flag & QWANTR) {
3561 		if (stp->sd_flag & RSLEEP) {
3562 			stp->sd_flag &= ~RSLEEP;
3563 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3564 		} else {
3565 			stp->sd_wakeq |= RSLEEP;
3566 		}
3567
3568 		mutex_exit(&stp->sd_lock);
3569 		pollwakeup(pl, POLLIN | POLLRDNORM);
3570 		mutex_enter(&stp->sd_lock);
3571
3572 		{
3573 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3574
3575 			if (events)
3576 				strsendsig(stp->sd_siglist, events, 0, 0);
3577 		}
3578 	} else {
3579 		if (stp->sd_flag & WSLEEP) {
3580 			stp->sd_flag &= ~WSLEEP;
3581 			cv_broadcast(&stp->sd_wrq->q_wait);
3582 		}
3583
3584 		mutex_exit(&stp->sd_lock);
3585 		pollwakeup(pl, POLLWRNORM);
3586 		mutex_enter(&stp->sd_lock);
3587
3588 		if (stp->sd_sigflags & S_WRNORM)
3589 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3590 	}
3591 	mutex_exit(&stp->sd_lock);
3592 }
3593
3594 int
3595 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3596 {
3597 	stdata_t *stp = STREAM(q);
3598 	int typ = STRUIOT_STANDARD;
3599 	uio_t *uiop = &dp->d_uio;
3600 	dblk_t *dbp;
3601 	ssize_t uiocnt;
3602 	ssize_t cnt;
3603 	unsigned char *ptr;
3604 	ssize_t resid;
3605 	int error = 0;
3606 	on_trap_data_t otd;
3607 	queue_t *stwrq;
3608
3609 	/*
3610 	 * Plumbing may change while we are taking the type, so store the
3611 	 * queue in a temporary variable.  It doesn't matter even if we
3612 	 * take the type from the previous plumbing: if the plumbing has
3613 	 * changed while we were holding the queue in a temporary variable,
3614 	 * we can continue processing the message the way it would have
3615 	 * been processed in the old plumbing, with no side effects beyond
3616 	 * a bit of extra processing for the partial IP header checksum.
3617 	 *
3618 	 * This has been done to avoid holding the sd_lock, which is
3619 	 * very hot.
3620 	 */
3621
3622
3623 	stwrq = stp->sd_struiowrq;
3624 	if (stwrq)
3625 		typ = stwrq->q_struiot;
3626
3627 	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3628 		dbp = mp->b_datap;
3629 		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3630 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3631 		cnt = MIN(uiocnt, uiop->uio_resid);
3632 		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3633 		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3634 			/*
3635 			 * Either this mblk has already been processed
3636 			 * or there is no more room in this mblk (?).
3637 			 */
3638 			continue;
3639 		}
3640 		switch (typ) {
3641 		case STRUIOT_STANDARD:
3642 			if (noblock) {
3643 				if (on_trap(&otd, OT_DATA_ACCESS)) {
3644 					no_trap();
3645 					error = EWOULDBLOCK;
3646 					goto out;
3647 				}
3648 			}
3649 			if ((error = uiomove(ptr, cnt, UIO_WRITE, uiop)) != 0) {
3650 				if (noblock)
3651 					no_trap();
3652 				goto out;
3653 			}
3654 			if (noblock)
3655 				no_trap();
3656 			break;
3657
3658 		default:
3659 			error = EIO;
3660 			goto out;
3661 		}
3662 		dbp->db_struioflag |= STRUIO_DONE;
3663 		dbp->db_cksumstuff += cnt;
3664 	}
3665 out:
3666 	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3667 		/*
3668 		 * A fault has occurred and some bytes were moved to the
3669 		 * current mblk; the uio_t has already been updated by
3670 		 * the appropriate uio routine, so also update the mblk
3671 		 * to reflect this in case this same mblk chain is used
3672 		 * again (after the fault has been handled).
3673 		 */
3674 		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3675 		if (uiocnt >= resid)
3676 			dbp->db_cksumstuff += resid;
3677 	}
3678 	return (error);
3679 }
3680
3681 /*
3682  * Try to enter a queue synchronously.  Any attempt to enter a closing queue
3683  * will fail.  The qp->q_rwcnt keeps track of the number of successful entries
3684  * so that removeq() will not try to close the queue while a thread is inside
3685  * the queue.
3686  */
3687 static boolean_t
3688 rwnext_enter(queue_t *qp)
3689 {
3690 	mutex_enter(QLOCK(qp));
3691 	if (qp->q_flag & QWCLOSE) {
3692 		mutex_exit(QLOCK(qp));
3693 		return (B_FALSE);
3694 	}
3695 	qp->q_rwcnt++;
3696 	ASSERT(qp->q_rwcnt != 0);
3697 	mutex_exit(QLOCK(qp));
3698 	return (B_TRUE);
3699 }
3700
3701 /*
3702  * Decrease the count of threads running in a sync stream queue and wake up
3703  * any threads blocked in removeq().
3704  */
3705 static void
3706 rwnext_exit(queue_t *qp)
3707 {
3708 	mutex_enter(QLOCK(qp));
3709 	qp->q_rwcnt--;
3710 	if (qp->q_flag & QWANTRMQSYNC) {
3711 		qp->q_flag &= ~QWANTRMQSYNC;
3712 		cv_broadcast(&qp->q_wait);
3713 	}
3714 	mutex_exit(QLOCK(qp));
3715 }
3716
3717 /*
3718  * The purpose of rwnext() is to call the rw procedure of the next
3719  * (downstream) module's queue.
3720  *
3721  * It is treated as a put entrypoint for perimeter synchronization.
3722  *
3723  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3724  * sync queues).  If it is a CIPUT sync queue, sq_count is incremented and it
3725  * does not matter if any regular put entrypoints have been already entered.
3726  * We can't increment one of the sq_putcounts (instead of sq_count) because
3727  * qwait_rw won't know which counter to decrement.
3728  *
3729  * It would be reasonable to add the lockless FASTPUT logic.
3730  */
3731 int
3732 rwnext(queue_t *qp, struiod_t *dp)
3733 {
3734 	queue_t *nqp;
3735 	syncq_t *sq;
3736 	uint16_t count;
3737 	uint16_t flags;
3738 	struct qinit *qi;
3739 	int (*proc)();
3740 	struct stdata *stp;
3741 	int isread;
3742 	int rval;
3743
3744 	stp = STREAM(qp);
3745 	/*
3746 	 * Prevent q_next from changing by holding sd_lock until acquiring
3747 	 * SQLOCK.  Note that a read-side rwnext from the streamhead will
3748 	 * already have sd_lock acquired.  In either case sd_lock is always
3749 	 * released after acquiring SQLOCK.
3750 	 *
3751 	 * The streamhead read-side holding sd_lock when calling rwnext is
3752 	 * required to prevent a race condition where M_DATA mblks flowing
3753 	 * up the read-side of the stream could be bypassed by a rwnext()
3754 	 * down-call.  In this case sd_lock acts as the streamhead perimeter.
3755 	 */
3756 	if ((nqp = _WR(qp)) == qp) {
3757 		isread = 0;
3758 		mutex_enter(&stp->sd_lock);
3759 		qp = nqp->q_next;
3760 	} else {
3761 		isread = 1;
3762 		if (nqp != stp->sd_wrq)
3763 			/* Not streamhead */
3764 			mutex_enter(&stp->sd_lock);
3765 		qp = _RD(nqp->q_next);
3766 	}
3767 	qi = qp->q_qinfo;
3768 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3769 		/*
3770 		 * Not a synchronous module or no r/w procedure for this
3771 		 * queue, so just return EINVAL and let the caller handle it.
3772 		 */
3773 		mutex_exit(&stp->sd_lock);
3774 		return (EINVAL);
3775 	}
3776
3777 	if (rwnext_enter(qp) == B_FALSE) {
3778 		mutex_exit(&stp->sd_lock);
3779 		return (EINVAL);
3780 	}
3781
3782 	sq = qp->q_syncq;
3783 	mutex_enter(SQLOCK(sq));
3784 	mutex_exit(&stp->sd_lock);
3785 	count = sq->sq_count;
3786 	flags = sq->sq_flags;
3787 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3788
3789 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3790 		/*
3791 		 * If this queue is being closed, return.
3792 		 */
3793 		if (qp->q_flag & QWCLOSE) {
3794 			mutex_exit(SQLOCK(sq));
3795 			rwnext_exit(qp);
3796 			return (EINVAL);
3797 		}
3798
3799 		/*
3800 		 * Wait until we can enter the inner perimeter.
3801 		 */
3802 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3803 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3804 		count = sq->sq_count;
3805 		flags = sq->sq_flags;
3806 	}
3807
3808 	if (isread == 0 && stp->sd_struiowrq == NULL ||
3809 	    isread == 1 && stp->sd_struiordq == NULL) {
3810 		/*
3811 		 * Stream plumbing changed while waiting for inner perimeter
3812 		 * so just return EINVAL and let the caller handle it.
3813 		 */
3814 		mutex_exit(SQLOCK(sq));
3815 		rwnext_exit(qp);
3816 		return (EINVAL);
3817 	}
3818 	if (!(flags & SQ_CIPUT))
3819 		sq->sq_flags = flags | SQ_EXCL;
3820 	sq->sq_count = count + 1;
3821 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3822 	/*
3823 	 * Note: The only message ordering guarantee that rwnext() makes is
3824 	 * for the write queue flow-control case.  All others (r/w queue
3825 	 * with q_count > 0 (or q_first != 0)) are the responsibility of
3826 	 * the queue's rw procedure.  This could be generalized here by
3827 	 * running the queue's service procedure, but that wouldn't be
3828 	 * the most efficient for all cases.
3829 	 */
3830 	mutex_exit(SQLOCK(sq));
3831 	if (! isread && (qp->q_flag & QFULL)) {
3832 		/*
3833 		 * Write queue may be flow controlled.  If so,
3834 		 * mark the queue for wakeup when it's not.
3835 		 */
3836 		mutex_enter(QLOCK(qp));
3837 		if (qp->q_flag & QFULL) {
3838 			qp->q_flag |= QWANTWSYNC;
3839 			mutex_exit(QLOCK(qp));
3840 			rval = EWOULDBLOCK;
3841 			goto out;
3842 		}
3843 		mutex_exit(QLOCK(qp));
3844 	}
3845
3846 	if (! isread && dp->d_mp)
3847 		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3848 		    dp->d_mp->b_datap->db_base);
3849
3850 	rval = (*proc)(qp, dp);
3851
3852 	if (isread && dp->d_mp)
3853 		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3854 		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3855 out:
3856 	/*
3857 	 * The queue is protected from being freed by sq_count, so it is
3858 	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3859 	 */
3860 	rwnext_exit(qp);
3861
3862 	mutex_enter(SQLOCK(sq));
3863 	flags = sq->sq_flags;
3864 	ASSERT(sq->sq_count != 0);
3865 	sq->sq_count--;
3866 	if (flags & SQ_TAIL) {
3867 		putnext_tail(sq, qp, flags);
3868 		/*
3869 		 * The only purpose of this ASSERT is to preserve calling stack
3870 		 * in DEBUG kernel.
3871 		 */
3872 		ASSERT(flags & SQ_TAIL);
3873 		return (rval);
3874 	}
3875 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3876 	/*
3877 	 * Safe to always drop SQ_EXCL:
3878 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3879 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3880 	 *	did a qwriter(INNER) in which case nobody else
3881 	 *	is in the inner perimeter and we are exiting.
3882 	 *
3883 	 * I would like to make the following assertion:
3884 	 *
3885 	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3886 	 *	    sq->sq_count == 0);
3887 	 *
3888 	 * which indicates that if we are both putshared and exclusive,
3889 	 * we became exclusive while executing the putproc, and the only
3890 	 * claim on the syncq was the one we dropped a few lines above.
3891 	 * But other threads that enter putnext while the syncq is exclusive
3892 	 * need to make a claim as they may need to drop SQLOCK in the
3893 	 * has_writers case to avoid deadlocks.  If these threads are
3894 	 * delayed or preempted, it is possible that the writer thread can
3895 	 * find out that there are other claims making the (sq_count == 0)
3896 	 * test invalid.
3897 	 */
3898
3899 	sq->sq_flags = flags & ~SQ_EXCL;
3900 	if (sq->sq_flags & SQ_WANTWAKEUP) {
3901 		sq->sq_flags &= ~SQ_WANTWAKEUP;
3902 		cv_broadcast(&sq->sq_wait);
3903 	}
3904 	mutex_exit(SQLOCK(sq));
3905 	return (rval);
3906 }
3907
3908 /*
3909  * The purpose of infonext() is to call the info procedure of the next
3910  * (downstream) module's queue.
3911  *
3912  * It is treated as a put entrypoint for perimeter synchronization.
3913  *
3914  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3915  * sync queues).  If it is a CIPUT sync queue, the regular sq_count is
3916  * incremented and it does not matter if any regular put entrypoints have
3917  * been already entered.
3918  */
3919 int
3920 infonext(queue_t *qp, infod_t *idp)
3921 {
3922 	queue_t *nqp;
3923 	syncq_t *sq;
3924 	uint16_t count;
3925 	uint16_t flags;
3926 	struct qinit *qi;
3927 	int (*proc)();
3928 	struct stdata *stp;
3929 	int rval;
3930
3931 	stp = STREAM(qp);
3932 	/*
3933 	 * Prevent q_next from changing by holding sd_lock until
3934 	 * acquiring SQLOCK.
3935 	 */
3936 	mutex_enter(&stp->sd_lock);
3937 	if ((nqp = _WR(qp)) == qp) {
3938 		qp = nqp->q_next;
3939 	} else {
3940 		qp = _RD(nqp->q_next);
3941 	}
3942 	qi = qp->q_qinfo;
3943 	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3944 		mutex_exit(&stp->sd_lock);
3945 		return (EINVAL);
3946 	}
3947 	sq = qp->q_syncq;
3948 	mutex_enter(SQLOCK(sq));
3949 	mutex_exit(&stp->sd_lock);
3950 	count = sq->sq_count;
3951 	flags = sq->sq_flags;
3952 	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3953
3954 	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3955 		/*
3956 		 * Wait until we can enter the inner perimeter.
3957 		 */
3958 		sq->sq_flags = flags | SQ_WANTWAKEUP;
3959 		cv_wait(&sq->sq_wait, SQLOCK(sq));
3960 		count = sq->sq_count;
3961 		flags = sq->sq_flags;
3962 	}
3963
3964 	if (! (flags & SQ_CIPUT))
3965 		sq->sq_flags = flags | SQ_EXCL;
3966 	sq->sq_count = count + 1;
3967 	ASSERT(sq->sq_count != 0);		/* Wraparound */
3968 	mutex_exit(SQLOCK(sq));
3969
3970 	rval = (*proc)(qp, idp);
3971
3972 	mutex_enter(SQLOCK(sq));
3973 	flags = sq->sq_flags;
3974 	ASSERT(sq->sq_count != 0);
3975 	sq->sq_count--;
3976 	if (flags & SQ_TAIL) {
3977 		putnext_tail(sq, qp, flags);
3978 		/*
3979 		 * The only purpose of this ASSERT is to preserve calling stack
3980 		 * in DEBUG kernel.
3981 		 */
3982 		ASSERT(flags & SQ_TAIL);
3983 		return (rval);
3984 	}
3985 	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3986 	/*
3987 	 * XXXX
3988 	 * I am not certain the next comment is correct here.  I need to
3989 	 * consider why the infonext is called, and if dropping SQ_EXCL unless
3990 	 * non-CIPUT might cause other problems.  It just might be safer to
3991 	 * drop it if !SQ_CIPUT because that is when we set it.
3992 	 */
3993 	/*
3994 	 * Safe to always drop SQ_EXCL:
3995 	 *	Not SQ_CIPUT means we set SQ_EXCL above
3996 	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3997 	 *	did a qwriter(INNER) in which case nobody else
3998 	 *	is in the inner perimeter and we are exiting.
3999 	 *
4000 	 * I would like to make the following assertion:
4001 	 *
4002 	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4003 	 *	    sq->sq_count == 0);
4004 	 *
4005 	 * which indicates that if we are both putshared and exclusive,
4006 	 * we became exclusive while executing the putproc, and the only
4007 	 * claim on the syncq was the one we dropped a few lines above.
4008 	 * But other threads that enter putnext while the syncq is exclusive
4009 	 * need to make a claim as they may need to drop SQLOCK in the
4010 	 * has_writers case to avoid deadlocks.  If these threads are
4011 	 * delayed or preempted, it is possible that the writer thread can
4012 	 * find out that there are other claims making the (sq_count == 0)
4013 	 * test invalid.
4014 	 */
4015
4016 	sq->sq_flags = flags & ~SQ_EXCL;
4017 	mutex_exit(SQLOCK(sq));
4018 	return (rval);
4019 }
4020
4021 /*
4022  * Return nonzero if the queue is responsible for struio(), else return 0.
4023  */
4024 int
4025 isuioq(queue_t *q)
4026 {
4027 	if (q->q_flag & QREADR)
4028 		return (STREAM(q)->sd_struiordq == q);
4029 	else
4030 		return (STREAM(q)->sd_struiowrq == q);
4031 }
4032
4033 #if defined(__sparc)
4034 int disable_putlocks = 0;
4035 #else
4036 int disable_putlocks = 1;
4037 #endif
4038
4039 /*
4040  * Called by create_putlocks().
4041  */
4042 static void
4043 create_syncq_putlocks(queue_t *q)
4044 {
4045 	syncq_t *sq = q->q_syncq;
4046 	ciputctrl_t *cip;
4047 	int i;
4048
4049 	ASSERT(sq != NULL);
4050
4051 	ASSERT(disable_putlocks == 0);
4052 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4053 	ASSERT(ciputctrl_cache != NULL);
4054
4055 	if (!(sq->sq_type & SQ_CIPUT))
4056 		return;
4057
4058 	for (i = 0; i <= 1; i++) {
4059 		if (sq->sq_ciputctrl == NULL) {
4060 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4061 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4062 			mutex_enter(SQLOCK(sq));
4063 			if (sq->sq_ciputctrl != NULL) {
4064 				mutex_exit(SQLOCK(sq));
4065 				kmem_cache_free(ciputctrl_cache, cip);
4066 			} else {
4067 				ASSERT(sq->sq_nciputctrl == 0);
4068 				sq->sq_nciputctrl = n_ciputctrl - 1;
4069 				/*
4070 				 * putnext checks sq_ciputctrl without holding
4071 				 * SQLOCK.  If it is not NULL, putnext assumes
4072 				 * sq_nciputctrl is initialized.  The membar
4073 				 * below ensures that.
4074 				 */
4075 				membar_producer();
4076 				sq->sq_ciputctrl = cip;
4077 				mutex_exit(SQLOCK(sq));
4078 			}
4079 		}
4080 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4081 		if (i == 1)
4082 			break;
4083 		q = _OTHERQ(q);
4084 		if (!(q->q_flag & QPERQ)) {
4085 			ASSERT(sq == q->q_syncq);
4086 			break;
4087 		}
4088 		ASSERT(q->q_syncq != NULL);
4089 		ASSERT(sq != q->q_syncq);
4090 		sq = q->q_syncq;
4091 		ASSERT(sq->sq_type & SQ_CIPUT);
4092 	}
4093 }
4094
4095 /*
4096  * If stream argument is 0, only create per cpu sq_putlocks/sq_putcounts for
4097  * the syncq of q.
 * If stream argument is not 0, create per cpu stream_putlocks for
4098  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncqs
4099  * starting from q and down to the driver.
4100  *
4101  * This should be called after the affected queues are part of stream
4102  * geometry.  It should be called from a driver/module open routine after
4103  * the qprocson() call.  It is also called from the nfs syscall code, where
4104  * it is known that the stream is configured and won't change its geometry
4105  * during the create_putlocks() call.
4106  *
4107  * A caller normally uses a 0 value for the stream argument to speed up MT
4108  * putnext into the perimeter of q, for example because its perimeter is per
4109  * module (e.g. IP).
4110  *
4111  * A caller normally uses a non-0 value for the stream argument to hint to
4112  * the system that the stream of q is a very contended global system stream
4113  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4114  * particularly MT hot.
4115  *
4116  * The caller ensures that stream plumbing won't happen while we are here,
4117  * and therefore q_next can be safely used.
4118  */
4119
4120 void
4121 create_putlocks(queue_t *q, int stream)
4122 {
4123 	ciputctrl_t *cip;
4124 	struct stdata *stp = STREAM(q);
4125
4126 	q = _WR(q);
4127 	ASSERT(stp != NULL);
4128
4129 	if (disable_putlocks != 0)
4130 		return;
4131
4132 	if (n_ciputctrl < min_n_ciputctrl)
4133 		return;
4134
4135 	ASSERT(ciputctrl_cache != NULL);
4136
4137 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4138 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4139 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4140 		mutex_enter(&stp->sd_lock);
4141 		if (stp->sd_ciputctrl != NULL) {
4142 			mutex_exit(&stp->sd_lock);
4143 			kmem_cache_free(ciputctrl_cache, cip);
4144 		} else {
4145 			ASSERT(stp->sd_nciputctrl == 0);
4146 			stp->sd_nciputctrl = n_ciputctrl - 1;
4147 			/*
4148 			 * putnext checks sd_ciputctrl without holding
4149 			 * sd_lock.  If it is not NULL, putnext assumes
4150 			 * sd_nciputctrl is initialized.  The membar below
4151 			 * ensures that.
4152 			 */
4153 			membar_producer();
4154 			stp->sd_ciputctrl = cip;
4155 			mutex_exit(&stp->sd_lock);
4156 		}
4157 	}
4158
4159 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4160
4161 	while (_SAMESTR(q)) {
4162 		create_syncq_putlocks(q);
4163 		if (stream == 0)
4164 			return;
4165 		q = q->q_next;
4166 	}
4167 	ASSERT(q != NULL);
4168 	create_syncq_putlocks(q);
4169 }
4170
4171 /*
4172  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4173  * through a stream.
4174  *
4175  * Data currently recorded per event are a timestamp, the module/driver name,
4176  * the downstream module/driver name, an optional call stack, the event type,
4177  * and a per-type datum.  Much of the STREAMS framework is instrumented for
4178  * automatic flow tracing (when enabled).  Events can be defined and used by
4179  * STREAMS modules and drivers.
4180  *
4181  * Global objects:
4182  *
4183  *	str_ftevent() - Add a flow-trace event to a dblk.
4184  *	str_ftfree() - Free flow-trace data
4185  *
4186  * Local objects:
4187  *
4188  *	fthdr_cache - pointer to the kmem cache for trace header.
4189  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
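 *
 * Note that tracing is compiled in but off by default: str_ftnever (defined
 * below) is preset to 1 and must be cleared before any events are recorded
 * (patching the variable in a kernel debugger is one illustrative way to do
 * so), and str_ftstack must additionally be set for per-event call stacks
 * to be captured.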
4190 */ 4191 4192 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4193 int str_ftstack = 0; /* Don't record event call stacks */ 4194 4195 void 4196 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4197 { 4198 ftblk_t *bp = hp->tail; 4199 ftblk_t *nbp; 4200 ftevnt_t *ep; 4201 int ix, nix; 4202 4203 ASSERT(hp != NULL); 4204 4205 for (;;) { 4206 if ((ix = bp->ix) == FTBLK_EVNTS) { 4207 /* 4208 * Tail doesn't have room, so need a new tail. 4209 * 4210 * To make this MT safe, first, allocate a new 4211 * ftblk, and initialize it. To make life a 4212 * little easier, reserve the first slot (mostly 4213 * by making ix = 1). When we are finished with 4214 * the initialization, CAS this pointer to the 4215 * tail. If this succeeds, this is the new 4216 * "next" block. Otherwise, another thread 4217 * got here first, so free the block and start 4218 * again. 4219 */ 4220 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4221 if (nbp == NULL) { 4222 /* no mem, so punt */ 4223 str_ftnever++; 4224 /* free up all flow data? */ 4225 return; 4226 } 4227 nbp->nxt = NULL; 4228 nbp->ix = 1; 4229 /* 4230 * Just in case there is another thread about 4231 * to get the next index, we need to make sure 4232 * the value is there for it. 4233 */ 4234 membar_producer(); 4235 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) { 4236 /* CAS was successful */ 4237 bp->nxt = nbp; 4238 membar_producer(); 4239 bp = nbp; 4240 ix = 0; 4241 goto cas_good; 4242 } else { 4243 kmem_cache_free(ftblk_cache, nbp); 4244 bp = hp->tail; 4245 continue; 4246 } 4247 } 4248 nix = ix + 1; 4249 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) { 4250 cas_good: 4251 if (curthread != hp->thread) { 4252 hp->thread = curthread; 4253 evnt |= FTEV_CS; 4254 } 4255 if (CPU->cpu_seqid != hp->cpu_seqid) { 4256 hp->cpu_seqid = CPU->cpu_seqid; 4257 evnt |= FTEV_PS; 4258 } 4259 ep = &bp->ev[ix]; 4260 break; 4261 } 4262 } 4263 4264 if (evnt & FTEV_QMASK) { 4265 queue_t *qp = p; 4266 4267 if (!(qp->q_flag & QREADR)) 4268 evnt |= FTEV_ISWR; 4269 4270 ep->mid = Q2NAME(qp); 4271 4272 /* 4273 * We only record the next queue name for FTEV_PUTNEXT since 4274 * that's the only time we *really* need it, and the putnext() 4275 * code ensures that qp->q_next won't vanish. (We could use 4276 * claimstr()/releasestr() but at a performance cost.) 4277 */ 4278 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4279 ep->midnext = Q2NAME(qp->q_next); 4280 else 4281 ep->midnext = NULL; 4282 } else { 4283 ep->mid = p; 4284 ep->midnext = NULL; 4285 } 4286 4287 if (ep->stk != NULL) 4288 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4289 4290 ep->ts = gethrtime(); 4291 ep->evnt = evnt; 4292 ep->data = data; 4293 hp->hash = (hp->hash << 9) + hp->hash; 4294 hp->hash += (evnt << 16) | data; 4295 hp->hash += (uintptr_t)ep->mid; 4296 } 4297 4298 /* 4299 * Free flow-trace data. 4300 */ 4301 void 4302 str_ftfree(dblk_t *dbp) 4303 { 4304 fthdr_t *hp = dbp->db_fthdr; 4305 ftblk_t *bp = &hp->first; 4306 ftblk_t *nbp; 4307 4308 if (bp != hp->tail || bp->ix != 0) { 4309 /* 4310 * Clear out the hash, have the tail point to itself, and free 4311 * any continuation blocks. 4312 */ 4313 bp = hp->first.nxt; 4314 hp->tail = &hp->first; 4315 hp->hash = 0; 4316 hp->first.nxt = NULL; 4317 hp->first.ix = 0; 4318 while (bp != NULL) { 4319 nbp = bp->nxt; 4320 kmem_cache_free(ftblk_cache, bp); 4321 bp = nbp; 4322 } 4323 } 4324 kmem_cache_free(fthdr_cache, hp); 4325 dbp->db_fthdr = NULL; 4326 } 4327