/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/*	All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches.  For message sizes that are multiples
 *     of PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory.  Objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from
 *     size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
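 *
 *     For instance (an illustrative sketch, not code from this file;
 *     the xx_* names are hypothetical), a driver that wants zero-copy
 *     transmit of a buffer it owns might set up a free routine and
 *     call desballoc():
 *
 *		static void xx_free(void *arg);
 *		static frtn_t xx_frtn = { xx_free, NULL };
 *
 *		mp = desballoc(xx_buf, xx_buflen, BPRI_MED, &xx_frtn);
 *
 *     When the last reference to the message is freed, the allocator
 *     calls xx_free() instead of freeing the buffer itself.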
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
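 *
 * As a worked example of the table below: on an _LP64 kernel,
 * allocb(100, BPRI_MED) computes index (100 - 1) >> DBLK_SIZE_SHIFT,
 * i.e. 12, and dblk_cache[12] points at the "streams_dblk_144" cache,
 * the smallest cache whose data area holds at least 100 bytes.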
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
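 * These are passed to kmem_cache_create() below and could be patched
 * in /etc/system; for example (an illustrative setting, not a
 * recommendation), "set dblk_kmem_flags = 0x1" would request KMF_AUDIT
 * auditing for the dblk caches.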
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;

static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	ftblk_t *fbp = buf;
	int i;

	bzero(fbp, sizeof (ftblk_t));
	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++)
			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
	}

	return (0);
}

/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
	ftblk_t *fbp = buf;
	int i;

	if (str_ftstack != 0) {
		for (i = 0; i < FTBLK_EVNTS; i++) {
			if (fbp->ev[i].stk != NULL) {
				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
				fbp->ev[i].stk = NULL;
			}
		}
	}
}

static int
fthdr_constructor(void *buf, void *cdrarg, int kmflags)
{
	fthdr_t *fhp = buf;

	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
}

static void
fthdr_destructor(void *buf, void *cdrarg)
{
	fthdr_t *fhp = buf;

	ftblk_destructor(&fhp->first, cdrarg);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, so the dblk
			 * should be allocated on the same page.
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
			    < PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * The buffer size is a multiple of the page size,
			 * so the dblk and buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
		    NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL.
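 *
 * A typical use (an illustrative sketch, not code from this file) is
 * building a reply that inherits the message type and credentials of
 * the request it answers:
 *
 *	if ((nmp = allocb_tmpl(replylen, mp)) == NULL)
 *		return (NULL);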
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		dblk_t *src = tmpl->b_datap;
		dblk_t *dst = mp->b_datap;
		cred_t *cr;
		pid_t cpid;

		cr = msg_getcred(tmpl, &cpid);
		if (cr != NULL)
			crhold(dst->db_credp = cr);
		dst->db_cpid = cpid;
		dst->db_type = src->db_type;
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb(size, 0);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}
	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	ASSERT(cr != NULL);
	if (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		crhold(dbp->db_credp = cr);
		dbp->db_cpid = cpid;
	}

	return (mp);
}

/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none is found we return NULL.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert that the zoneid is the
		 * same.  Due to Multi-Level Ports for TX, some cred_t can
		 * have a NULL cr_zone, and we skip the comparison in that
		 * case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}

/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp cannot be cleared when there are other
 * references.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should be at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds.  Thus we can only assert that the zoneid is the
		 * same.  Due to Multi-Level Ports for TX, some cred_t can
		 * have a NULL cr_zone, and we skip the comparison in that
		 * case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}

/*
 * Get the label for a message.  Uses the first mblk in the message
 * which has a non-NULL db_credp.
 * Returns NULL if there is no credp.
 */
extern struct ts_label_s *
msg_getlabel(const mblk_t *mp)
{
	cred_t *cr = msg_getcred(mp, NULL);

	if (cr == NULL)
		return (NULL);

	return (crgetlabel(cr));
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t *mp1;
	unsigned char *old_rptr;
	ptrdiff_t cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/*
		 * XXX other mp state could be copied too, db_flags ... ?
		 */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
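 *
 * The wrappers below differ only in the initial reference count they
 * encode via DBLK_RTFU() (1 for esballoc/desballoc, 2 for the 'a'
 * flavors) and in the last-free routine they pass: the esballoc
 * flavors free through freebs_enqueue(), while the desballoc flavors
 * call the caller's free routine directly via dblk_lastfree_desb().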
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size - 1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
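 *
 * A common recovery pattern (an illustrative sketch; the xx_* names
 * are hypothetical) when esballoc() fails in a put or service
 * procedure is:
 *
 *	if ((mp = esballoc(base, len, BPRI_MED, &xx_frtn)) == NULL) {
 *		xx->xx_bufcall_id = esbbcall(BPRI_MED, xx_retry, xx);
 *		return;
 *	}
 *
 * where xx_retry() typically re-runs the blocked operation, e.g. by
 * calling qenable() on the stalled queue.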
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * Test if a block of the given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped,
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its
	 * value in a local variable.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * Add the newly allocated stream event to the existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
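 * A module's close routine typically cancels any callback it still
 * has outstanding (a sketch; xx_bufcall_id is a hypothetical
 * per-instance field):
 *
 *	if (xx->xx_bufcall_id != 0) {
 *		unbufcall(xx->xx_bufcall_id);
 *		xx->xx_bufcall_id = 0;
 *	}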
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Copy the various checksum information that came in
	 * originally.
	 */
	ndp->db_cksumstart = dp->db_cksumstart;
	ndp->db_cksumend = dp->db_cksumend;
	ndp->db_cksumstuff = dp->db_cksumstuff;
	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
	    sizeof (dp->db_struioun.data));

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc-to-free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align the first len bytes of common
 * message type.  Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *	len > 0, trim from head
 *	len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
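 *
 * For example (an illustrative sketch, not code from this file),
 * adjmsg(mp, sizeof (struct ether_header)) strips a link-layer header
 * from the front of a message, while adjmsg(mp, -4) would trim a
 * 4-byte trailer from the end.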
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata messages, since they contain
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
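 *
 * (For example, a message put to band 2 is counted in the qb_count and
 * qb_mblkcnt of the second qband structure, while an M_PCPROTO
 * message, though logically of the highest priority, is accounted
 * against q_count.)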
1837 * 1838 * If queue count is below the lo water mark and QWANTW 1839 * is set, enable the closest backq which has a service 1840 * procedure and turn off the QWANTW flag. 1841 * 1842 * getq could be built on top of rmvq, but isn't because 1843 * of performance considerations. 1844 * 1845 * A note on the use of q_count and q_mblkcnt: 1846 * q_count is the traditional byte count for messages that 1847 * have been put on a queue. Documentation tells us that 1848 * we shouldn't rely on that count, but some drivers/modules 1849 * do. What was needed, however, is a mechanism to prevent 1850 * runaway streams from consuming all of the resources, 1851 * and particularly be able to flow control zero-length 1852 * messages. q_mblkcnt is used for this purpose. It 1853 * counts the number of mblk's that are being put on 1854 * the queue. The intention here, is that each mblk should 1855 * contain one byte of data and, for the purpose of 1856 * flow-control, logically does. A queue will become 1857 * full when EITHER of these values (q_count and q_mblkcnt) 1858 * reach the highwater mark. It will clear when BOTH 1859 * of them drop below the highwater mark. And it will 1860 * backenable when BOTH of them drop below the lowwater 1861 * mark. 1862 * With this algorithm, a driver/module might be able 1863 * to find a reasonably accurate q_count, and the 1864 * framework can still try and limit resource usage. 1865 */ 1866 mblk_t * 1867 getq(queue_t *q) 1868 { 1869 mblk_t *bp; 1870 uchar_t band = 0; 1871 1872 bp = getq_noenab(q, 0); 1873 if (bp != NULL) 1874 band = bp->b_band; 1875 1876 /* 1877 * Inlined from qbackenable(). 1878 * Quick check without holding the lock. 1879 */ 1880 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 1881 return (bp); 1882 1883 qbackenable(q, band); 1884 return (bp); 1885 } 1886 1887 /* 1888 * Calculate number of data bytes in a single data message block taking 1889 * multidata messages into account. 1890 */ 1891 1892 #define ADD_MBLK_SIZE(mp, size) \ 1893 if (DB_TYPE(mp) != M_MULTIDATA) { \ 1894 (size) += MBLKL(mp); \ 1895 } else { \ 1896 uint_t pinuse; \ 1897 \ 1898 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \ 1899 (size) += pinuse; \ 1900 } 1901 1902 /* 1903 * Returns the number of bytes in a message (a message is defined as a 1904 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we 1905 * also return the number of distinct mblks in the message. 1906 */ 1907 int 1908 mp_cont_len(mblk_t *bp, int *mblkcnt) 1909 { 1910 mblk_t *mp; 1911 int mblks = 0; 1912 int bytes = 0; 1913 1914 for (mp = bp; mp != NULL; mp = mp->b_cont) { 1915 ADD_MBLK_SIZE(mp, bytes); 1916 mblks++; 1917 } 1918 1919 if (mblkcnt != NULL) 1920 *mblkcnt = mblks; 1921 1922 return (bytes); 1923 } 1924 1925 /* 1926 * Like getq() but does not backenable. This is used by the stream 1927 * head when a putback() is likely. The caller must call qbackenable() 1928 * after it is done with accessing the queue. 1929 * The rbytes arguments to getq_noneab() allows callers to specify a 1930 * the maximum number of bytes to return. If the current amount on the 1931 * queue is less than this then the entire message will be returned. 1932 * A value of 0 returns the entire message and is equivalent to the old 1933 * default behaviour prior to the addition of the rbytes argument. 
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained
			 *    within whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue.  Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking.  However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with.  If mp1 is NULL, we are taking the
				 * whole message.  No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
2037 */ 2038 if (mp2 != NULL) { 2039 if ((mp2->b_next = bp->b_next) == NULL) 2040 q->q_last = mp2; 2041 else 2042 bp->b_next->b_prev = mp2; 2043 q->q_first = mp2; 2044 } else { 2045 if ((q->q_first = bp->b_next) == NULL) 2046 q->q_last = NULL; 2047 else 2048 q->q_first->b_prev = NULL; 2049 } 2050 } else { 2051 /* 2052 * Either no byte threshold was supplied, there is 2053 * not enough on the queue or we failed to 2054 * duplicate/copy a data block. In these cases we 2055 * just take the entire first message. 2056 */ 2057 dup_failed: 2058 bytecnt = mp_cont_len(bp, &mblkcnt); 2059 if ((q->q_first = bp->b_next) == NULL) 2060 q->q_last = NULL; 2061 else 2062 q->q_first->b_prev = NULL; 2063 } 2064 if (bp->b_band == 0) { 2065 q->q_count -= bytecnt; 2066 q->q_mblkcnt -= mblkcnt; 2067 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2068 (q->q_mblkcnt < q->q_hiwat))) { 2069 q->q_flag &= ~QFULL; 2070 } 2071 } else { 2072 int i; 2073 2074 ASSERT(bp->b_band <= q->q_nband); 2075 ASSERT(q->q_bandp != NULL); 2076 ASSERT(MUTEX_HELD(QLOCK(q))); 2077 qbp = q->q_bandp; 2078 i = bp->b_band; 2079 while (--i > 0) 2080 qbp = qbp->qb_next; 2081 if (qbp->qb_first == qbp->qb_last) { 2082 qbp->qb_first = NULL; 2083 qbp->qb_last = NULL; 2084 } else { 2085 qbp->qb_first = bp->b_next; 2086 } 2087 qbp->qb_count -= bytecnt; 2088 qbp->qb_mblkcnt -= mblkcnt; 2089 if (qbp->qb_mblkcnt == 0 || 2090 ((qbp->qb_count < qbp->qb_hiwat) && 2091 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2092 qbp->qb_flag &= ~QB_FULL; 2093 } 2094 } 2095 q->q_flag &= ~QWANTR; 2096 bp->b_next = NULL; 2097 bp->b_prev = NULL; 2098 } 2099 if (freezer != curthread) 2100 mutex_exit(QLOCK(q)); 2101 2102 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0); 2103 2104 return (bp); 2105 } 2106 2107 /* 2108 * Determine if a backenable is needed after removing a message in the 2109 * specified band. 2110 * NOTE: This routine assumes that something like getq_noenab() has been 2111 * already called. 2112 * 2113 * For the read side it is ok to hold sd_lock across calling this (and the 2114 * stream head often does). 2115 * But for the write side strwakeq might be invoked and it acquires sd_lock. 2116 */ 2117 void 2118 qbackenable(queue_t *q, uchar_t band) 2119 { 2120 int backenab = 0; 2121 qband_t *qbp; 2122 kthread_id_t freezer; 2123 2124 ASSERT(q); 2125 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock)); 2126 2127 /* 2128 * Quick check without holding the lock. 2129 * OK since after getq() has lowered the q_count these flags 2130 * would not change unless either the qbackenable() is done by 2131 * another thread (which is ok) or the queue has gotten QFULL 2132 * in which case another backenable will take place when the queue 2133 * drops below q_lowat. 
2134 */ 2135 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0) 2136 return; 2137 2138 /* freezestr should allow its caller to call getq/putq */ 2139 freezer = STREAM(q)->sd_freezer; 2140 if (freezer == curthread) { 2141 ASSERT(frozenstr(q)); 2142 ASSERT(MUTEX_HELD(QLOCK(q))); 2143 } else 2144 mutex_enter(QLOCK(q)); 2145 2146 if (band == 0) { 2147 if (q->q_lowat == 0 || (q->q_count < q->q_lowat && 2148 q->q_mblkcnt < q->q_lowat)) { 2149 backenab = q->q_flag & (QWANTW|QWANTWSYNC); 2150 } 2151 } else { 2152 int i; 2153 2154 ASSERT((unsigned)band <= q->q_nband); 2155 ASSERT(q->q_bandp != NULL); 2156 2157 qbp = q->q_bandp; 2158 i = band; 2159 while (--i > 0) 2160 qbp = qbp->qb_next; 2161 2162 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat && 2163 qbp->qb_mblkcnt < qbp->qb_lowat)) { 2164 backenab = qbp->qb_flag & QB_WANTW; 2165 } 2166 } 2167 2168 if (backenab == 0) { 2169 if (freezer != curthread) 2170 mutex_exit(QLOCK(q)); 2171 return; 2172 } 2173 2174 /* Have to drop the lock across strwakeq and backenable */ 2175 if (backenab & QWANTWSYNC) 2176 q->q_flag &= ~QWANTWSYNC; 2177 if (backenab & (QWANTW|QB_WANTW)) { 2178 if (band != 0) 2179 qbp->qb_flag &= ~QB_WANTW; 2180 else { 2181 q->q_flag &= ~QWANTW; 2182 } 2183 } 2184 2185 if (freezer != curthread) 2186 mutex_exit(QLOCK(q)); 2187 2188 if (backenab & QWANTWSYNC) 2189 strwakeq(q, QWANTWSYNC); 2190 if (backenab & (QWANTW|QB_WANTW)) 2191 backenable(q, band); 2192 } 2193 2194 /* 2195 * Remove a message from a queue. The queue count and other 2196 * flow control parameters are adjusted and the back queue 2197 * enabled if necessary. 2198 * 2199 * rmvq can be called with the stream frozen, by other utility functions 2200 * holding QLOCK, and by streams modules holding no locks on an unfrozen stream. 2201 */ 2202 void 2203 rmvq(queue_t *q, mblk_t *mp) 2204 { 2205 ASSERT(mp != NULL); 2206 2207 rmvq_noenab(q, mp); 2208 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) { 2209 /* 2210 * qbackenable can handle a frozen stream but not a "random" 2211 * qlock being held. Drop lock across qbackenable. 2212 */ 2213 mutex_exit(QLOCK(q)); 2214 qbackenable(q, mp->b_band); 2215 mutex_enter(QLOCK(q)); 2216 } else { 2217 qbackenable(q, mp->b_band); 2218 } 2219 } 2220 2221 /* 2222 * Like rmvq() but without any backenabling. 2223 * This exists to handle SR_CONSOL_DATA in strrput(). 2224 */ 2225 void 2226 rmvq_noenab(queue_t *q, mblk_t *mp) 2227 { 2228 int i; 2229 qband_t *qbp = NULL; 2230 kthread_id_t freezer; 2231 int bytecnt = 0, mblkcnt = 0; 2232 2233 freezer = STREAM(q)->sd_freezer; 2234 if (freezer == curthread) { 2235 ASSERT(frozenstr(q)); 2236 ASSERT(MUTEX_HELD(QLOCK(q))); 2237 } else if (MUTEX_HELD(QLOCK(q))) { 2238 /* Don't drop lock on exit */ 2239 freezer = curthread; 2240 } else 2241 mutex_enter(QLOCK(q)); 2242 2243 ASSERT(mp->b_band <= q->q_nband); 2244 if (mp->b_band != 0) { /* Adjust band pointers */ 2245 ASSERT(q->q_bandp != NULL); 2246 qbp = q->q_bandp; 2247 i = mp->b_band; 2248 while (--i > 0) 2249 qbp = qbp->qb_next; 2250 if (mp == qbp->qb_first) { 2251 if (mp->b_next && mp->b_band == mp->b_next->b_band) 2252 qbp->qb_first = mp->b_next; 2253 else 2254 qbp->qb_first = NULL; 2255 } 2256 if (mp == qbp->qb_last) { 2257 if (mp->b_prev && mp->b_band == mp->b_prev->b_band) 2258 qbp->qb_last = mp->b_prev; 2259 else 2260 qbp->qb_last = NULL; 2261 } 2262 } 2263 2264 /* 2265 * Remove the message from the list.
2266 */ 2267 if (mp->b_prev) 2268 mp->b_prev->b_next = mp->b_next; 2269 else 2270 q->q_first = mp->b_next; 2271 if (mp->b_next) 2272 mp->b_next->b_prev = mp->b_prev; 2273 else 2274 q->q_last = mp->b_prev; 2275 mp->b_next = NULL; 2276 mp->b_prev = NULL; 2277 2278 /* Get the size of the message for q_count accounting */ 2279 bytecnt = mp_cont_len(mp, &mblkcnt); 2280 2281 if (mp->b_band == 0) { /* Perform q_count accounting */ 2282 q->q_count -= bytecnt; 2283 q->q_mblkcnt -= mblkcnt; 2284 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) && 2285 (q->q_mblkcnt < q->q_hiwat))) { 2286 q->q_flag &= ~QFULL; 2287 } 2288 } else { /* Perform qb_count accounting */ 2289 qbp->qb_count -= bytecnt; 2290 qbp->qb_mblkcnt -= mblkcnt; 2291 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) && 2292 (qbp->qb_mblkcnt < qbp->qb_hiwat))) { 2293 qbp->qb_flag &= ~QB_FULL; 2294 } 2295 } 2296 if (freezer != curthread) 2297 mutex_exit(QLOCK(q)); 2298 2299 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0); 2300 } 2301 2302 /* 2303 * Empty a queue. 2304 * If flag is set, remove all messages. Otherwise, remove 2305 * only non-control messages. If queue falls below its low 2306 * water mark, and QWANTW is set, enable the nearest upstream 2307 * service procedure. 2308 * 2309 * Historical note: when merging the M_FLUSH code in strrput with this 2310 * code one difference was discovered. flushq did not have a check 2311 * for q_lowat == 0 in the backenabling test. 2312 * 2313 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed 2314 * if one exists on the queue. 2315 */ 2316 void 2317 flushq_common(queue_t *q, int flag, int pcproto_flag) 2318 { 2319 mblk_t *mp, *nmp; 2320 qband_t *qbp; 2321 int backenab = 0; 2322 unsigned char bpri; 2323 unsigned char qbf[NBAND]; /* band flushing backenable flags */ 2324 2325 if (q->q_first == NULL) 2326 return; 2327 2328 mutex_enter(QLOCK(q)); 2329 mp = q->q_first; 2330 q->q_first = NULL; 2331 q->q_last = NULL; 2332 q->q_count = 0; 2333 q->q_mblkcnt = 0; 2334 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2335 qbp->qb_first = NULL; 2336 qbp->qb_last = NULL; 2337 qbp->qb_count = 0; 2338 qbp->qb_mblkcnt = 0; 2339 qbp->qb_flag &= ~QB_FULL; 2340 } 2341 q->q_flag &= ~QFULL; 2342 mutex_exit(QLOCK(q)); 2343 while (mp) { 2344 nmp = mp->b_next; 2345 mp->b_next = mp->b_prev = NULL; 2346 2347 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0); 2348 2349 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO)) 2350 (void) putq(q, mp); 2351 else if (flag || datamsg(mp->b_datap->db_type)) 2352 freemsg(mp); 2353 else 2354 (void) putq(q, mp); 2355 mp = nmp; 2356 } 2357 bpri = 1; 2358 mutex_enter(QLOCK(q)); 2359 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2360 if ((qbp->qb_flag & QB_WANTW) && 2361 (((qbp->qb_count < qbp->qb_lowat) && 2362 (qbp->qb_mblkcnt < qbp->qb_lowat)) || 2363 qbp->qb_lowat == 0)) { 2364 qbp->qb_flag &= ~QB_WANTW; 2365 backenab = 1; 2366 qbf[bpri] = 1; 2367 } else 2368 qbf[bpri] = 0; 2369 bpri++; 2370 } 2371 ASSERT(bpri == (unsigned char)(q->q_nband + 1)); 2372 if ((q->q_flag & QWANTW) && 2373 (((q->q_count < q->q_lowat) && 2374 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2375 q->q_flag &= ~QWANTW; 2376 backenab = 1; 2377 qbf[0] = 1; 2378 } else 2379 qbf[0] = 0; 2380 2381 /* 2382 * If any band can now be written to, and there is a writer 2383 * for that band, then backenable the closest service procedure. 
2384 */ 2385 if (backenab) { 2386 mutex_exit(QLOCK(q)); 2387 for (bpri = q->q_nband; bpri != 0; bpri--) 2388 if (qbf[bpri]) 2389 backenable(q, bpri); 2390 if (qbf[0]) 2391 backenable(q, 0); 2392 } else 2393 mutex_exit(QLOCK(q)); 2394 } 2395 2396 /* 2397 * The real flushing takes place in flushq_common. This wrapper exists so 2398 * that a flag can specify whether or not M_PCPROTO messages should be 2399 * flushed. Currently the only place that uses this flag is the stream head. 2400 */ 2401 void 2402 flushq(queue_t *q, int flag) 2403 { 2404 flushq_common(q, flag, 0); 2405 } 2406 2407 /* 2408 * Flush the queue of messages of the given priority band. 2409 * There is some duplication of code between flushq and flushband. 2410 * This is because we want to optimize the code as much as possible. 2411 * The assumption is that there will be more messages in the normal 2412 * (priority 0) band than in any other. 2413 * 2414 * Historical note: when merging the M_FLUSH code in strrput with this 2415 * code one difference was discovered. flushband had an extra check for 2416 * (mp->b_datap->db_type < QPCTL) in the band 0 2417 * case. That check does not match the man page for flushband and was not 2418 * in the strrput flush code, hence it was removed. 2419 */ 2420 void 2421 flushband(queue_t *q, unsigned char pri, int flag) 2422 { 2423 mblk_t *mp; 2424 mblk_t *nmp; 2425 mblk_t *last; 2426 qband_t *qbp; 2427 int band; 2428 2429 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL)); 2430 if (pri > q->q_nband) { 2431 return; 2432 } 2433 mutex_enter(QLOCK(q)); 2434 if (pri == 0) { 2435 mp = q->q_first; 2436 q->q_first = NULL; 2437 q->q_last = NULL; 2438 q->q_count = 0; 2439 q->q_mblkcnt = 0; 2440 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) { 2441 qbp->qb_first = NULL; 2442 qbp->qb_last = NULL; 2443 qbp->qb_count = 0; 2444 qbp->qb_mblkcnt = 0; 2445 qbp->qb_flag &= ~QB_FULL; 2446 } 2447 q->q_flag &= ~QFULL; 2448 mutex_exit(QLOCK(q)); 2449 while (mp) { 2450 nmp = mp->b_next; 2451 mp->b_next = mp->b_prev = NULL; 2452 if ((mp->b_band == 0) && 2453 ((flag == FLUSHALL) || 2454 datamsg(mp->b_datap->db_type))) 2455 freemsg(mp); 2456 else 2457 (void) putq(q, mp); 2458 mp = nmp; 2459 } 2460 mutex_enter(QLOCK(q)); 2461 if ((q->q_flag & QWANTW) && 2462 (((q->q_count < q->q_lowat) && 2463 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) { 2464 q->q_flag &= ~QWANTW; 2465 mutex_exit(QLOCK(q)); 2466 2467 backenable(q, pri); 2468 } else 2469 mutex_exit(QLOCK(q)); 2470 } else { /* pri != 0 */ 2471 boolean_t flushed = B_FALSE; 2472 band = pri; 2473 2474 ASSERT(MUTEX_HELD(QLOCK(q))); 2475 qbp = q->q_bandp; 2476 while (--band > 0) 2477 qbp = qbp->qb_next; 2478 mp = qbp->qb_first; 2479 if (mp == NULL) { 2480 mutex_exit(QLOCK(q)); 2481 return; 2482 } 2483 last = qbp->qb_last->b_next; 2484 /* 2485 * rmvq_noenab() and freemsg() are called for each mblk that 2486 * meets the criteria. The loop is executed until the last 2487 * mblk has been processed. 2488 */ 2489 while (mp != last) { 2490 ASSERT(mp->b_band == pri); 2491 nmp = mp->b_next; 2492 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) { 2493 rmvq_noenab(q, mp); 2494 freemsg(mp); 2495 flushed = B_TRUE; 2496 } 2497 mp = nmp; 2498 } 2499 mutex_exit(QLOCK(q)); 2500 2501 /* 2502 * If any mblk(s) have been freed, we know that qbackenable() 2503 * will need to be called. 2504 */ 2505 if (flushed) 2506 qbackenable(q, pri); 2507 } 2508 } 2509 2510 /* 2511 * Return 1 if the queue is not full.
If the queue is full, return 2512 * 0 (may not put message) and set QWANTW flag (caller wants to write 2513 * to the queue). 2514 */ 2515 int 2516 canput(queue_t *q) 2517 { 2518 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q); 2519 2520 /* this is for loopback transports; they should not do a canput */ 2521 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv)); 2522 2523 /* Find next forward module that has a service procedure */ 2524 q = q->q_nfsrv; 2525 2526 if (!(q->q_flag & QFULL)) { 2527 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2528 return (1); 2529 } 2530 mutex_enter(QLOCK(q)); 2531 if (q->q_flag & QFULL) { 2532 q->q_flag |= QWANTW; 2533 mutex_exit(QLOCK(q)); 2534 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0); 2535 return (0); 2536 } 2537 mutex_exit(QLOCK(q)); 2538 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1); 2539 return (1); 2540 } 2541 2542 /* 2543 * This is the new canput for use with priority bands. Return 1 if the 2544 * band is not full. If the band is full, return 0 (may not put message) 2545 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to 2546 * write to the queue). 2547 */ 2548 int 2549 bcanput(queue_t *q, unsigned char pri) 2550 { 2551 qband_t *qbp; 2552 2553 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2554 if (!q) 2555 return (0); 2556 2557 /* Find next forward module that has a service procedure */ 2558 q = q->q_nfsrv; 2559 2560 mutex_enter(QLOCK(q)); 2561 if (pri == 0) { 2562 if (q->q_flag & QFULL) { 2563 q->q_flag |= QWANTW; 2564 mutex_exit(QLOCK(q)); 2565 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2566 "bcanput:%p %X %d", q, pri, 0); 2567 return (0); 2568 } 2569 } else { /* pri != 0 */ 2570 if (pri > q->q_nband) { 2571 /* 2572 * No band exists yet, so return success. 2573 */ 2574 mutex_exit(QLOCK(q)); 2575 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2576 "bcanput:%p %X %d", q, pri, 1); 2577 return (1); 2578 } 2579 qbp = q->q_bandp; 2580 while (--pri) 2581 qbp = qbp->qb_next; 2582 if (qbp->qb_flag & QB_FULL) { 2583 qbp->qb_flag |= QB_WANTW; 2584 mutex_exit(QLOCK(q)); 2585 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2586 "bcanput:%p %X %d", q, pri, 0); 2587 return (0); 2588 } 2589 } 2590 mutex_exit(QLOCK(q)); 2591 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2592 "bcanput:%p %X %d", q, pri, 1); 2593 return (1); 2594 } 2595 2596 /* 2597 * Put a message on a queue. 2598 * 2599 * Messages are enqueued on a priority basis. The priority classes 2600 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2601 * and B_NORMAL (type < QPCTL && band == 0). 2602 * 2603 * Add appropriate weighted data block sizes to queue count. 2604 * If queue hits high water mark then set QFULL flag. 2605 * 2606 * If QNOENAB is not set (putq is allowed to enable the queue), 2607 * enable the queue only if the message is PRIORITY, 2608 * or the QWANTR flag is set (indicating that the service procedure 2609 * is ready to read the queue). This implies that a service 2610 * procedure must NEVER put a high priority message back on its own 2611 * queue, as this would result in an infinite loop (!).
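 *
 * A sketch of the conventional put/service pairing, for illustration
 * only (xxxput/xxxsrv are hypothetical names, and the sketch assumes
 * only normal-priority band-0 M_DATA messages):
 *
 *	static int
 *	xxxput(queue_t *q, mblk_t *mp)
 *	{
 *		(void) putq(q, mp);	(defer work to the service procedure)
 *		return (0);
 *	}
 *
 *	static int
 *	xxxsrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}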
2612 */ 2613 int 2614 putq(queue_t *q, mblk_t *bp) 2615 { 2616 mblk_t *tmp; 2617 qband_t *qbp = NULL; 2618 int mcls = (int)queclass(bp); 2619 kthread_id_t freezer; 2620 int bytecnt = 0, mblkcnt = 0; 2621 2622 freezer = STREAM(q)->sd_freezer; 2623 if (freezer == curthread) { 2624 ASSERT(frozenstr(q)); 2625 ASSERT(MUTEX_HELD(QLOCK(q))); 2626 } else 2627 mutex_enter(QLOCK(q)); 2628 2629 /* 2630 * Make sanity checks and if qband structure is not yet 2631 * allocated, do so. 2632 */ 2633 if (mcls == QPCTL) { 2634 if (bp->b_band != 0) 2635 bp->b_band = 0; /* force to be correct */ 2636 } else if (bp->b_band != 0) { 2637 int i; 2638 qband_t **qbpp; 2639 2640 if (bp->b_band > q->q_nband) { 2641 2642 /* 2643 * The qband structure for this priority band is 2644 * not on the queue yet, so we have to allocate 2645 * one on the fly. It would be wasteful to 2646 * associate the qband structures with every 2647 * queue when the queues are allocated. This is 2648 * because most queues will only need the normal 2649 * band of flow which can be described entirely 2650 * by the queue itself. 2651 */ 2652 qbpp = &q->q_bandp; 2653 while (*qbpp) 2654 qbpp = &(*qbpp)->qb_next; 2655 while (bp->b_band > q->q_nband) { 2656 if ((*qbpp = allocband()) == NULL) { 2657 if (freezer != curthread) 2658 mutex_exit(QLOCK(q)); 2659 return (0); 2660 } 2661 (*qbpp)->qb_hiwat = q->q_hiwat; 2662 (*qbpp)->qb_lowat = q->q_lowat; 2663 q->q_nband++; 2664 qbpp = &(*qbpp)->qb_next; 2665 } 2666 } 2667 ASSERT(MUTEX_HELD(QLOCK(q))); 2668 qbp = q->q_bandp; 2669 i = bp->b_band; 2670 while (--i) 2671 qbp = qbp->qb_next; 2672 } 2673 2674 /* 2675 * If queue is empty, add the message and initialize the pointers. 2676 * Otherwise, adjust message pointers and queue pointers based on 2677 * the type of the message and where it belongs on the queue. Some 2678 * code is duplicated to minimize the number of conditionals and 2679 * hopefully minimize the amount of time this routine takes. 2680 */ 2681 if (!q->q_first) { 2682 bp->b_next = NULL; 2683 bp->b_prev = NULL; 2684 q->q_first = bp; 2685 q->q_last = bp; 2686 if (qbp) { 2687 qbp->qb_first = bp; 2688 qbp->qb_last = bp; 2689 } 2690 } else if (!qbp) { /* bp->b_band == 0 */ 2691 2692 /* 2693 * If queue class of message is less than or equal to 2694 * that of the last one on the queue, tack on to the end. 2695 */ 2696 tmp = q->q_last; 2697 if (mcls <= (int)queclass(tmp)) { 2698 bp->b_next = NULL; 2699 bp->b_prev = tmp; 2700 tmp->b_next = bp; 2701 q->q_last = bp; 2702 } else { 2703 tmp = q->q_first; 2704 while ((int)queclass(tmp) >= mcls) 2705 tmp = tmp->b_next; 2706 2707 /* 2708 * Insert bp before tmp. 2709 */ 2710 bp->b_next = tmp; 2711 bp->b_prev = tmp->b_prev; 2712 if (tmp->b_prev) 2713 tmp->b_prev->b_next = bp; 2714 else 2715 q->q_first = bp; 2716 tmp->b_prev = bp; 2717 } 2718 } else { /* bp->b_band != 0 */ 2719 if (qbp->qb_first) { 2720 tmp = qbp->qb_last; 2721 2722 /* 2723 * Insert bp after the last message in this band. 2724 */ 2725 bp->b_next = tmp->b_next; 2726 if (tmp->b_next) 2727 tmp->b_next->b_prev = bp; 2728 else 2729 q->q_last = bp; 2730 bp->b_prev = tmp; 2731 tmp->b_next = bp; 2732 } else { 2733 tmp = q->q_last; 2734 if ((mcls < (int)queclass(tmp)) || 2735 (bp->b_band <= tmp->b_band)) { 2736 2737 /* 2738 * Tack bp on end of queue. 
2739 */ 2740 bp->b_next = NULL; 2741 bp->b_prev = tmp; 2742 tmp->b_next = bp; 2743 q->q_last = bp; 2744 } else { 2745 tmp = q->q_first; 2746 while (tmp->b_datap->db_type >= QPCTL) 2747 tmp = tmp->b_next; 2748 while (tmp->b_band >= bp->b_band) 2749 tmp = tmp->b_next; 2750 2751 /* 2752 * Insert bp before tmp. 2753 */ 2754 bp->b_next = tmp; 2755 bp->b_prev = tmp->b_prev; 2756 if (tmp->b_prev) 2757 tmp->b_prev->b_next = bp; 2758 else 2759 q->q_first = bp; 2760 tmp->b_prev = bp; 2761 } 2762 qbp->qb_first = bp; 2763 } 2764 qbp->qb_last = bp; 2765 } 2766 2767 /* Get message byte count for q_count accounting */ 2768 bytecnt = mp_cont_len(bp, &mblkcnt); 2769 2770 if (qbp) { 2771 qbp->qb_count += bytecnt; 2772 qbp->qb_mblkcnt += mblkcnt; 2773 if ((qbp->qb_count >= qbp->qb_hiwat) || 2774 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2775 qbp->qb_flag |= QB_FULL; 2776 } 2777 } else { 2778 q->q_count += bytecnt; 2779 q->q_mblkcnt += mblkcnt; 2780 if ((q->q_count >= q->q_hiwat) || 2781 (q->q_mblkcnt >= q->q_hiwat)) { 2782 q->q_flag |= QFULL; 2783 } 2784 } 2785 2786 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0); 2787 2788 if ((mcls > QNORM) || 2789 (canenable(q) && (q->q_flag & QWANTR || bp->b_band))) 2790 qenable_locked(q); 2791 ASSERT(MUTEX_HELD(QLOCK(q))); 2792 if (freezer != curthread) 2793 mutex_exit(QLOCK(q)); 2794 2795 return (1); 2796 } 2797 2798 /* 2799 * Put stuff back at beginning of Q according to priority order. 2800 * See comment on putq above for details. 2801 */ 2802 int 2803 putbq(queue_t *q, mblk_t *bp) 2804 { 2805 mblk_t *tmp; 2806 qband_t *qbp = NULL; 2807 int mcls = (int)queclass(bp); 2808 kthread_id_t freezer; 2809 int bytecnt = 0, mblkcnt = 0; 2810 2811 ASSERT(q && bp); 2812 ASSERT(bp->b_next == NULL); 2813 freezer = STREAM(q)->sd_freezer; 2814 if (freezer == curthread) { 2815 ASSERT(frozenstr(q)); 2816 ASSERT(MUTEX_HELD(QLOCK(q))); 2817 } else 2818 mutex_enter(QLOCK(q)); 2819 2820 /* 2821 * Make sanity checks and if qband structure is not yet 2822 * allocated, do so. 2823 */ 2824 if (mcls == QPCTL) { 2825 if (bp->b_band != 0) 2826 bp->b_band = 0; /* force to be correct */ 2827 } else if (bp->b_band != 0) { 2828 int i; 2829 qband_t **qbpp; 2830 2831 if (bp->b_band > q->q_nband) { 2832 qbpp = &q->q_bandp; 2833 while (*qbpp) 2834 qbpp = &(*qbpp)->qb_next; 2835 while (bp->b_band > q->q_nband) { 2836 if ((*qbpp = allocband()) == NULL) { 2837 if (freezer != curthread) 2838 mutex_exit(QLOCK(q)); 2839 return (0); 2840 } 2841 (*qbpp)->qb_hiwat = q->q_hiwat; 2842 (*qbpp)->qb_lowat = q->q_lowat; 2843 q->q_nband++; 2844 qbpp = &(*qbpp)->qb_next; 2845 } 2846 } 2847 qbp = q->q_bandp; 2848 i = bp->b_band; 2849 while (--i) 2850 qbp = qbp->qb_next; 2851 } 2852 2853 /* 2854 * If queue is empty or if message is high priority, 2855 * place on the front of the queue. 2856 */ 2857 tmp = q->q_first; 2858 if ((!tmp) || (mcls == QPCTL)) { 2859 bp->b_next = tmp; 2860 if (tmp) 2861 tmp->b_prev = bp; 2862 else 2863 q->q_last = bp; 2864 q->q_first = bp; 2865 bp->b_prev = NULL; 2866 if (qbp) { 2867 qbp->qb_first = bp; 2868 qbp->qb_last = bp; 2869 } 2870 } else if (qbp) { /* bp->b_band != 0 */ 2871 tmp = qbp->qb_first; 2872 if (tmp) { 2873 2874 /* 2875 * Insert bp before the first message in this band. 2876 */ 2877 bp->b_next = tmp; 2878 bp->b_prev = tmp->b_prev; 2879 if (tmp->b_prev) 2880 tmp->b_prev->b_next = bp; 2881 else 2882 q->q_first = bp; 2883 tmp->b_prev = bp; 2884 } else { 2885 tmp = q->q_last; 2886 if ((mcls < (int)queclass(tmp)) || 2887 (bp->b_band < tmp->b_band)) { 2888 2889 /* 2890 * Tack bp on end of queue. 
2891 */ 2892 bp->b_next = NULL; 2893 bp->b_prev = tmp; 2894 tmp->b_next = bp; 2895 q->q_last = bp; 2896 } else { 2897 tmp = q->q_first; 2898 while (tmp->b_datap->db_type >= QPCTL) 2899 tmp = tmp->b_next; 2900 while (tmp->b_band > bp->b_band) 2901 tmp = tmp->b_next; 2902 2903 /* 2904 * Insert bp before tmp. 2905 */ 2906 bp->b_next = tmp; 2907 bp->b_prev = tmp->b_prev; 2908 if (tmp->b_prev) 2909 tmp->b_prev->b_next = bp; 2910 else 2911 q->q_first = bp; 2912 tmp->b_prev = bp; 2913 } 2914 qbp->qb_last = bp; 2915 } 2916 qbp->qb_first = bp; 2917 } else { /* bp->b_band == 0 && !QPCTL */ 2918 2919 /* 2920 * If the queue class or band is less than that of the last 2921 * message on the queue, tack bp on the end of the queue. 2922 */ 2923 tmp = q->q_last; 2924 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) { 2925 bp->b_next = NULL; 2926 bp->b_prev = tmp; 2927 tmp->b_next = bp; 2928 q->q_last = bp; 2929 } else { 2930 tmp = q->q_first; 2931 while (tmp->b_datap->db_type >= QPCTL) 2932 tmp = tmp->b_next; 2933 while (tmp->b_band > bp->b_band) 2934 tmp = tmp->b_next; 2935 2936 /* 2937 * Insert bp before tmp. 2938 */ 2939 bp->b_next = tmp; 2940 bp->b_prev = tmp->b_prev; 2941 if (tmp->b_prev) 2942 tmp->b_prev->b_next = bp; 2943 else 2944 q->q_first = bp; 2945 tmp->b_prev = bp; 2946 } 2947 } 2948 2949 /* Get message byte count for q_count accounting */ 2950 bytecnt = mp_cont_len(bp, &mblkcnt); 2951 2952 if (qbp) { 2953 qbp->qb_count += bytecnt; 2954 qbp->qb_mblkcnt += mblkcnt; 2955 if ((qbp->qb_count >= qbp->qb_hiwat) || 2956 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 2957 qbp->qb_flag |= QB_FULL; 2958 } 2959 } else { 2960 q->q_count += bytecnt; 2961 q->q_mblkcnt += mblkcnt; 2962 if ((q->q_count >= q->q_hiwat) || 2963 (q->q_mblkcnt >= q->q_hiwat)) { 2964 q->q_flag |= QFULL; 2965 } 2966 } 2967 2968 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0); 2969 2970 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR))) 2971 qenable_locked(q); 2972 ASSERT(MUTEX_HELD(QLOCK(q))); 2973 if (freezer != curthread) 2974 mutex_exit(QLOCK(q)); 2975 2976 return (1); 2977 } 2978 2979 /* 2980 * Insert a message before an existing message on the queue. If the 2981 * existing message is NULL, the new message is placed on the end of 2982 * the queue. The queue class of the new message is ignored. However, 2983 * the priority band of the new message must adhere to the following 2984 * ordering: 2985 * 2986 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2987 * 2988 * All flow control parameters are updated. 2989 * 2990 * insq can be called with the stream frozen, by other utility functions 2991 * holding QLOCK, and by streams modules holding no locks on an unfrozen stream.
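 *
 * A minimal usage sketch (hypothetical caller; the error handling is
 * illustrative): insert mp ahead of the first queued message, noting
 * that insq() does not drop a QLOCK it did not itself take:
 *
 *	int ok;
 *
 *	mutex_enter(QLOCK(q));
 *	ok = insq(q, q->q_first, mp);
 *	mutex_exit(QLOCK(q));
 *	if (!ok)
 *		freemsg(mp);	(out of order, or allocband() failed)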
2992 */ 2993 int 2994 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2995 { 2996 mblk_t *tmp; 2997 qband_t *qbp = NULL; 2998 int mcls = (int)queclass(mp); 2999 kthread_id_t freezer; 3000 int bytecnt = 0, mblkcnt = 0; 3001 3002 freezer = STREAM(q)->sd_freezer; 3003 if (freezer == curthread) { 3004 ASSERT(frozenstr(q)); 3005 ASSERT(MUTEX_HELD(QLOCK(q))); 3006 } else if (MUTEX_HELD(QLOCK(q))) { 3007 /* Don't drop lock on exit */ 3008 freezer = curthread; 3009 } else 3010 mutex_enter(QLOCK(q)); 3011 3012 if (mcls == QPCTL) { 3013 if (mp->b_band != 0) 3014 mp->b_band = 0; /* force to be correct */ 3015 if (emp && emp->b_prev && 3016 (emp->b_prev->b_datap->db_type < QPCTL)) 3017 goto badord; 3018 } 3019 if (emp) { 3020 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 3021 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 3022 (emp->b_prev->b_band < mp->b_band))) { 3023 goto badord; 3024 } 3025 } else { 3026 tmp = q->q_last; 3027 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 3028 badord: 3029 cmn_err(CE_WARN, 3030 "insq: attempt to insert message out of order " 3031 "on q %p", (void *)q); 3032 if (freezer != curthread) 3033 mutex_exit(QLOCK(q)); 3034 return (0); 3035 } 3036 } 3037 3038 if (mp->b_band != 0) { 3039 int i; 3040 qband_t **qbpp; 3041 3042 if (mp->b_band > q->q_nband) { 3043 qbpp = &q->q_bandp; 3044 while (*qbpp) 3045 qbpp = &(*qbpp)->qb_next; 3046 while (mp->b_band > q->q_nband) { 3047 if ((*qbpp = allocband()) == NULL) { 3048 if (freezer != curthread) 3049 mutex_exit(QLOCK(q)); 3050 return (0); 3051 } 3052 (*qbpp)->qb_hiwat = q->q_hiwat; 3053 (*qbpp)->qb_lowat = q->q_lowat; 3054 q->q_nband++; 3055 qbpp = &(*qbpp)->qb_next; 3056 } 3057 } 3058 qbp = q->q_bandp; 3059 i = mp->b_band; 3060 while (--i) 3061 qbp = qbp->qb_next; 3062 } 3063 3064 if ((mp->b_next = emp) != NULL) { 3065 if ((mp->b_prev = emp->b_prev) != NULL) 3066 emp->b_prev->b_next = mp; 3067 else 3068 q->q_first = mp; 3069 emp->b_prev = mp; 3070 } else { 3071 if ((mp->b_prev = q->q_last) != NULL) 3072 q->q_last->b_next = mp; 3073 else 3074 q->q_first = mp; 3075 q->q_last = mp; 3076 } 3077 3078 /* Get mblk and byte count for q_count accounting */ 3079 bytecnt = mp_cont_len(mp, &mblkcnt); 3080 3081 if (qbp) { /* adjust qband pointers and count */ 3082 if (!qbp->qb_first) { 3083 qbp->qb_first = mp; 3084 qbp->qb_last = mp; 3085 } else { 3086 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3087 (mp->b_prev->b_band != mp->b_band))) 3088 qbp->qb_first = mp; 3089 else if (mp->b_next == NULL || (mp->b_next != NULL && 3090 (mp->b_next->b_band != mp->b_band))) 3091 qbp->qb_last = mp; 3092 } 3093 qbp->qb_count += bytecnt; 3094 qbp->qb_mblkcnt += mblkcnt; 3095 if ((qbp->qb_count >= qbp->qb_hiwat) || 3096 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3097 qbp->qb_flag |= QB_FULL; 3098 } 3099 } else { 3100 q->q_count += bytecnt; 3101 q->q_mblkcnt += mblkcnt; 3102 if ((q->q_count >= q->q_hiwat) || 3103 (q->q_mblkcnt >= q->q_hiwat)) { 3104 q->q_flag |= QFULL; 3105 } 3106 } 3107 3108 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0); 3109 3110 if (canenable(q) && (q->q_flag & QWANTR)) 3111 qenable_locked(q); 3112 3113 ASSERT(MUTEX_HELD(QLOCK(q))); 3114 if (freezer != curthread) 3115 mutex_exit(QLOCK(q)); 3116 3117 return (1); 3118 } 3119 3120 /* 3121 * Create and put a control message on queue. 
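 *
 * For illustration, hedged sketches of the putctl family as a driver
 * might use it (rq/wq are hypothetical read/write queues; each call
 * returns 0 if the type is a data type or allocb_tryhard() fails):
 *
 *	if (putnextctl(rq, M_HANGUP) == 0)
 *		(allocation failed; recover)
 *
 *	if (putnextctl1(wq, M_FLUSH, FLUSHR) == 0)
 *		(allocation failed; recover)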
3122 */ 3123 int 3124 putctl(queue_t *q, int type) 3125 { 3126 mblk_t *bp; 3127 3128 if ((datamsg(type) && (type != M_DELAY)) || 3129 (bp = allocb_tryhard(0)) == NULL) 3130 return (0); 3131 bp->b_datap->db_type = (unsigned char) type; 3132 3133 put(q, bp); 3134 3135 return (1); 3136 } 3137 3138 /* 3139 * Control message with a single-byte parameter 3140 */ 3141 int 3142 putctl1(queue_t *q, int type, int param) 3143 { 3144 mblk_t *bp; 3145 3146 if ((datamsg(type) && (type != M_DELAY)) || 3147 (bp = allocb_tryhard(1)) == NULL) 3148 return (0); 3149 bp->b_datap->db_type = (unsigned char)type; 3150 *bp->b_wptr++ = (unsigned char)param; 3151 3152 put(q, bp); 3153 3154 return (1); 3155 } 3156 3157 int 3158 putnextctl1(queue_t *q, int type, int param) 3159 { 3160 mblk_t *bp; 3161 3162 if ((datamsg(type) && (type != M_DELAY)) || 3163 ((bp = allocb_tryhard(1)) == NULL)) 3164 return (0); 3165 3166 bp->b_datap->db_type = (unsigned char)type; 3167 *bp->b_wptr++ = (unsigned char)param; 3168 3169 putnext(q, bp); 3170 3171 return (1); 3172 } 3173 3174 int 3175 putnextctl(queue_t *q, int type) 3176 { 3177 mblk_t *bp; 3178 3179 if ((datamsg(type) && (type != M_DELAY)) || 3180 ((bp = allocb_tryhard(0)) == NULL)) 3181 return (0); 3182 bp->b_datap->db_type = (unsigned char)type; 3183 3184 putnext(q, bp); 3185 3186 return (1); 3187 } 3188 3189 /* 3190 * Return the queue upstream from this one 3191 */ 3192 queue_t * 3193 backq(queue_t *q) 3194 { 3195 q = _OTHERQ(q); 3196 if (q->q_next) { 3197 q = q->q_next; 3198 return (_OTHERQ(q)); 3199 } 3200 return (NULL); 3201 } 3202 3203 /* 3204 * Send a block back up the queue in reverse from this 3205 * one (e.g. to respond to ioctls) 3206 */ 3207 void 3208 qreply(queue_t *q, mblk_t *bp) 3209 { 3210 ASSERT(q && bp); 3211 3212 putnext(_OTHERQ(q), bp); 3213 } 3214 3215 /* 3216 * Streams Queue Scheduling 3217 * 3218 * Queues are enabled through qenable() when they have messages to 3219 * process. They are serviced by queuerun(), which runs each enabled 3220 * queue's service procedure. The call to queuerun() is processor 3221 * dependent - the general principle is that it be run whenever a queue 3222 * is enabled but before returning to user level. For system calls, 3223 * the function runqueues() is called if their action causes a queue 3224 * to be enabled. For device interrupts, queuerun() should be 3225 * called before returning from the last level of interrupt. Beyond 3226 * this, no timing assumptions should be made about queue scheduling. 3227 */ 3228 3229 /* 3230 * Enable a queue: put it on list of those whose service procedures are 3231 * ready to run and set up the scheduling mechanism. 3232 * The broadcast is done outside the mutex to avoid the woken thread 3233 * contending with the mutex. This is OK because the queue has been 3234 * enqueued on the runlist and flagged safely at this point. 3235 */ 3236 void 3237 qenable(queue_t *q) 3238 { 3239 mutex_enter(QLOCK(q)); 3240 qenable_locked(q); 3241 mutex_exit(QLOCK(q)); 3242 } 3243 /* 3244 * Return number of messages on queue 3245 */ 3246 int 3247 qsize(queue_t *qp) 3248 { 3249 int count = 0; 3250 mblk_t *mp; 3251 3252 mutex_enter(QLOCK(qp)); 3253 for (mp = qp->q_first; mp; mp = mp->b_next) 3254 count++; 3255 mutex_exit(QLOCK(qp)); 3256 return (count); 3257 } 3258 3259 /* 3260 * noenable - set queue so that putq() will not enable it. 3261 * enableok - set queue so that putq() can enable it.
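 *
 * A common pattern, sketched here as a hypothetical example: a service
 * procedure that wants to park a message on its queue without being
 * rescheduled by the putbq() can bracket it with noenable()/enableok():
 *
 *	noenable(q);
 *	(void) putbq(q, mp);
 *	(arrange for a later, explicit qenable(q))
 *	enableok(q);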
3262 */ 3263 void 3264 noenable(queue_t *q) 3265 { 3266 mutex_enter(QLOCK(q)); 3267 q->q_flag |= QNOENB; 3268 mutex_exit(QLOCK(q)); 3269 } 3270 3271 void 3272 enableok(queue_t *q) 3273 { 3274 mutex_enter(QLOCK(q)); 3275 q->q_flag &= ~QNOENB; 3276 mutex_exit(QLOCK(q)); 3277 } 3278 3279 /* 3280 * Set queue fields. 3281 */ 3282 int 3283 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val) 3284 { 3285 qband_t *qbp = NULL; 3286 queue_t *wrq; 3287 int error = 0; 3288 kthread_id_t freezer; 3289 3290 freezer = STREAM(q)->sd_freezer; 3291 if (freezer == curthread) { 3292 ASSERT(frozenstr(q)); 3293 ASSERT(MUTEX_HELD(QLOCK(q))); 3294 } else 3295 mutex_enter(QLOCK(q)); 3296 3297 if (what >= QBAD) { 3298 error = EINVAL; 3299 goto done; 3300 } 3301 if (pri != 0) { 3302 int i; 3303 qband_t **qbpp; 3304 3305 if (pri > q->q_nband) { 3306 qbpp = &q->q_bandp; 3307 while (*qbpp) 3308 qbpp = &(*qbpp)->qb_next; 3309 while (pri > q->q_nband) { 3310 if ((*qbpp = allocband()) == NULL) { 3311 error = EAGAIN; 3312 goto done; 3313 } 3314 (*qbpp)->qb_hiwat = q->q_hiwat; 3315 (*qbpp)->qb_lowat = q->q_lowat; 3316 q->q_nband++; 3317 qbpp = &(*qbpp)->qb_next; 3318 } 3319 } 3320 qbp = q->q_bandp; 3321 i = pri; 3322 while (--i) 3323 qbp = qbp->qb_next; 3324 } 3325 switch (what) { 3326 3327 case QHIWAT: 3328 if (qbp) 3329 qbp->qb_hiwat = (size_t)val; 3330 else 3331 q->q_hiwat = (size_t)val; 3332 break; 3333 3334 case QLOWAT: 3335 if (qbp) 3336 qbp->qb_lowat = (size_t)val; 3337 else 3338 q->q_lowat = (size_t)val; 3339 break; 3340 3341 case QMAXPSZ: 3342 if (qbp) 3343 error = EINVAL; 3344 else 3345 q->q_maxpsz = (ssize_t)val; 3346 3347 /* 3348 * Performance concern: strwrite looks at the module below 3349 * the stream head for the maxpsz each time it does a write, 3350 * so we now cache it at the stream head. Check to see if this 3351 * queue is sitting directly below the stream head. 3352 */ 3353 wrq = STREAM(q)->sd_wrq; 3354 if (q != wrq->q_next) 3355 break; 3356 3357 /* 3358 * If the stream is not frozen drop the current QLOCK and 3359 * acquire the sd_wrq QLOCK which protects sd_qn_* 3360 */ 3361 if (freezer != curthread) { 3362 mutex_exit(QLOCK(q)); 3363 mutex_enter(QLOCK(wrq)); 3364 } 3365 ASSERT(MUTEX_HELD(QLOCK(wrq))); 3366 3367 if (strmsgsz != 0) { 3368 if (val == INFPSZ) 3369 val = strmsgsz; 3370 else { 3371 if (STREAM(q)->sd_vnode->v_type == VFIFO) 3372 val = MIN(PIPE_BUF, val); 3373 else 3374 val = MIN(strmsgsz, val); 3375 } 3376 } 3377 STREAM(q)->sd_qn_maxpsz = val; 3378 if (freezer != curthread) { 3379 mutex_exit(QLOCK(wrq)); 3380 mutex_enter(QLOCK(q)); 3381 } 3382 break; 3383 3384 case QMINPSZ: 3385 if (qbp) 3386 error = EINVAL; 3387 else 3388 q->q_minpsz = (ssize_t)val; 3389 3390 /* 3391 * Performance concern: strwrite looks at the module below 3392 * the stream head for the minpsz each time it does a write, 3393 * so we now cache it at the stream head. Check to see if this 3394 * queue is sitting directly below the stream head.
3395 */ 3396 wrq = STREAM(q)->sd_wrq; 3397 if (q != wrq->q_next) 3398 break; 3399 3400 /* 3401 * If the stream is not frozen drop the current QLOCK and 3402 * acquire the sd_wrq QLOCK which protects sd_qn_* 3403 */ 3404 if (freezer != curthread) { 3405 mutex_exit(QLOCK(q)); 3406 mutex_enter(QLOCK(wrq)); 3407 } 3408 STREAM(q)->sd_qn_minpsz = (ssize_t)val; 3409 3410 if (freezer != curthread) { 3411 mutex_exit(QLOCK(wrq)); 3412 mutex_enter(QLOCK(q)); 3413 } 3414 break; 3415 3416 case QSTRUIOT: 3417 if (qbp) 3418 error = EINVAL; 3419 else 3420 q->q_struiot = (ushort_t)val; 3421 break; 3422 3423 case QCOUNT: 3424 case QFIRST: 3425 case QLAST: 3426 case QFLAG: 3427 error = EPERM; 3428 break; 3429 3430 default: 3431 error = EINVAL; 3432 break; 3433 } 3434 done: 3435 if (freezer != curthread) 3436 mutex_exit(QLOCK(q)); 3437 return (error); 3438 } 3439 3440 /* 3441 * Get queue fields. 3442 */ 3443 int 3444 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3445 { 3446 qband_t *qbp = NULL; 3447 int error = 0; 3448 kthread_id_t freezer; 3449 3450 freezer = STREAM(q)->sd_freezer; 3451 if (freezer == curthread) { 3452 ASSERT(frozenstr(q)); 3453 ASSERT(MUTEX_HELD(QLOCK(q))); 3454 } else 3455 mutex_enter(QLOCK(q)); 3456 if (what >= QBAD) { 3457 error = EINVAL; 3458 goto done; 3459 } 3460 if (pri != 0) { 3461 int i; 3462 qband_t **qbpp; 3463 3464 if (pri > q->q_nband) { 3465 qbpp = &q->q_bandp; 3466 while (*qbpp) 3467 qbpp = &(*qbpp)->qb_next; 3468 while (pri > q->q_nband) { 3469 if ((*qbpp = allocband()) == NULL) { 3470 error = EAGAIN; 3471 goto done; 3472 } 3473 (*qbpp)->qb_hiwat = q->q_hiwat; 3474 (*qbpp)->qb_lowat = q->q_lowat; 3475 q->q_nband++; 3476 qbpp = &(*qbpp)->qb_next; 3477 } 3478 } 3479 qbp = q->q_bandp; 3480 i = pri; 3481 while (--i) 3482 qbp = qbp->qb_next; 3483 } 3484 switch (what) { 3485 case QHIWAT: 3486 if (qbp) 3487 *(size_t *)valp = qbp->qb_hiwat; 3488 else 3489 *(size_t *)valp = q->q_hiwat; 3490 break; 3491 3492 case QLOWAT: 3493 if (qbp) 3494 *(size_t *)valp = qbp->qb_lowat; 3495 else 3496 *(size_t *)valp = q->q_lowat; 3497 break; 3498 3499 case QMAXPSZ: 3500 if (qbp) 3501 error = EINVAL; 3502 else 3503 *(ssize_t *)valp = q->q_maxpsz; 3504 break; 3505 3506 case QMINPSZ: 3507 if (qbp) 3508 error = EINVAL; 3509 else 3510 *(ssize_t *)valp = q->q_minpsz; 3511 break; 3512 3513 case QCOUNT: 3514 if (qbp) 3515 *(size_t *)valp = qbp->qb_count; 3516 else 3517 *(size_t *)valp = q->q_count; 3518 break; 3519 3520 case QFIRST: 3521 if (qbp) 3522 *(mblk_t **)valp = qbp->qb_first; 3523 else 3524 *(mblk_t **)valp = q->q_first; 3525 break; 3526 3527 case QLAST: 3528 if (qbp) 3529 *(mblk_t **)valp = qbp->qb_last; 3530 else 3531 *(mblk_t **)valp = q->q_last; 3532 break; 3533 3534 case QFLAG: 3535 if (qbp) 3536 *(uint_t *)valp = qbp->qb_flag; 3537 else 3538 *(uint_t *)valp = q->q_flag; 3539 break; 3540 3541 case QSTRUIOT: 3542 if (qbp) 3543 error = EINVAL; 3544 else 3545 *(short *)valp = q->q_struiot; 3546 break; 3547 3548 default: 3549 error = EINVAL; 3550 break; 3551 } 3552 done: 3553 if (freezer != curthread) 3554 mutex_exit(QLOCK(q)); 3555 return (error); 3556 } 3557 3558 /* 3559 * This function wakes up everything sleeping in cv_wait/sigwait/pollwait 3560 * on one of QWANTWSYNC, QWANTR or QWANTW. 3561 * 3562 * Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or RSLEEPer 3563 * then a deferred wakeup will be done. Also if strpoll() is in progress then a 3564 * deferred pollwakeup will be done.
3565 */ 3566 void 3567 strwakeq(queue_t *q, int flag) 3568 { 3569 stdata_t *stp = STREAM(q); 3570 pollhead_t *pl; 3571 3572 mutex_enter(&stp->sd_lock); 3573 pl = &stp->sd_pollist; 3574 if (flag & QWANTWSYNC) { 3575 ASSERT(!(q->q_flag & QREADR)); 3576 if (stp->sd_flag & WSLEEP) { 3577 stp->sd_flag &= ~WSLEEP; 3578 cv_broadcast(&stp->sd_wrq->q_wait); 3579 } else { 3580 stp->sd_wakeq |= WSLEEP; 3581 } 3582 3583 mutex_exit(&stp->sd_lock); 3584 pollwakeup(pl, POLLWRNORM); 3585 mutex_enter(&stp->sd_lock); 3586 3587 if (stp->sd_sigflags & S_WRNORM) 3588 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3589 } else if (flag & QWANTR) { 3590 if (stp->sd_flag & RSLEEP) { 3591 stp->sd_flag &= ~RSLEEP; 3592 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3593 } else { 3594 stp->sd_wakeq |= RSLEEP; 3595 } 3596 3597 mutex_exit(&stp->sd_lock); 3598 pollwakeup(pl, POLLIN | POLLRDNORM); 3599 mutex_enter(&stp->sd_lock); 3600 3601 { 3602 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3603 3604 if (events) 3605 strsendsig(stp->sd_siglist, events, 0, 0); 3606 } 3607 } else { 3608 if (stp->sd_flag & WSLEEP) { 3609 stp->sd_flag &= ~WSLEEP; 3610 cv_broadcast(&stp->sd_wrq->q_wait); 3611 } 3612 3613 mutex_exit(&stp->sd_lock); 3614 pollwakeup(pl, POLLWRNORM); 3615 mutex_enter(&stp->sd_lock); 3616 3617 if (stp->sd_sigflags & S_WRNORM) 3618 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3619 } 3620 mutex_exit(&stp->sd_lock); 3621 } 3622 3623 int 3624 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock) 3625 { 3626 stdata_t *stp = STREAM(q); 3627 int typ = STRUIOT_STANDARD; 3628 uio_t *uiop = &dp->d_uio; 3629 dblk_t *dbp; 3630 ssize_t uiocnt; 3631 ssize_t cnt; 3632 unsigned char *ptr; 3633 ssize_t resid; 3634 int error = 0; 3635 on_trap_data_t otd; 3636 queue_t *stwrq; 3637 3638 /* 3639 * Plumbing may change while we are taking the type, so store the 3640 * queue in a temporary variable. It doesn't matter even 3641 * if we take the type from the previous plumbing: 3642 * if the plumbing has changed while we were 3643 * holding the queue in a temporary variable, we can continue 3644 * processing the message the way it would have been processed 3645 * in the old plumbing, with no side effects other than a bit of 3646 * extra processing for the partial IP header checksum. 3647 * 3648 * This has been done to avoid holding sd_lock, which is 3649 * very hot. 3650 */ 3651 3652 stwrq = stp->sd_struiowrq; 3653 if (stwrq) 3654 typ = stwrq->q_struiot; 3655 3656 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) { 3657 dbp = mp->b_datap; 3658 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff); 3659 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3660 cnt = MIN(uiocnt, uiop->uio_resid); 3661 if (!(dbp->db_struioflag & STRUIO_SPEC) || 3662 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) { 3663 /* 3664 * Either this mblk has already been processed 3665 * or there is no more room in this mblk (?).
3666 */ 3667 continue; 3668 } 3669 switch (typ) { 3670 case STRUIOT_STANDARD: 3671 if (noblock) { 3672 if (on_trap(&otd, OT_DATA_ACCESS)) { 3673 no_trap(); 3674 error = EWOULDBLOCK; 3675 goto out; 3676 } 3677 } 3678 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) { 3679 if (noblock) 3680 no_trap(); 3681 goto out; 3682 } 3683 if (noblock) 3684 no_trap(); 3685 break; 3686 3687 default: 3688 error = EIO; 3689 goto out; 3690 } 3691 dbp->db_struioflag |= STRUIO_DONE; 3692 dbp->db_cksumstuff += cnt; 3693 } 3694 out: 3695 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) { 3696 /* 3697 * A fault has occurred and some bytes were moved to the 3698 * current mblk, the uio_t has already been updated by 3699 * the appropriate uio routine, so also update the mblk 3700 * to reflect this in case this same mblk chain is used 3701 * again (after the fault has been handled). 3702 */ 3703 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff; 3704 if (uiocnt >= resid) 3705 dbp->db_cksumstuff += resid; 3706 } 3707 return (error); 3708 } 3709 3710 /* 3711 * Try to enter queue synchronously. Any attempt to enter a closing queue will 3712 * fail. The qp->q_rwcnt keeps track of the number of successful entries so 3713 * that removeq() will not try to close the queue while a thread is inside the 3714 * queue. 3715 */ 3716 static boolean_t 3717 rwnext_enter(queue_t *qp) 3718 { 3719 mutex_enter(QLOCK(qp)); 3720 if (qp->q_flag & QWCLOSE) { 3721 mutex_exit(QLOCK(qp)); 3722 return (B_FALSE); 3723 } 3724 qp->q_rwcnt++; 3725 ASSERT(qp->q_rwcnt != 0); 3726 mutex_exit(QLOCK(qp)); 3727 return (B_TRUE); 3728 } 3729 3730 /* 3731 * Decrease the count of threads running in sync stream queue and wake up any 3732 * threads blocked in removeq(). 3733 */ 3734 static void 3735 rwnext_exit(queue_t *qp) 3736 { 3737 mutex_enter(QLOCK(qp)); 3738 qp->q_rwcnt--; 3739 if (qp->q_flag & QWANTRMQSYNC) { 3740 qp->q_flag &= ~QWANTRMQSYNC; 3741 cv_broadcast(&qp->q_wait); 3742 } 3743 mutex_exit(QLOCK(qp)); 3744 } 3745 3746 /* 3747 * The purpose of rwnext() is to call the rw procedure of the next 3748 * (downstream) module's queue. 3749 * 3750 * Treated as a put entrypoint for perimeter synchronization. 3751 * 3752 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3753 * sync queues). If it is a CIPUT sync queue, sq_count is incremented and it does 3754 * not matter if any regular put entrypoints have already been entered. We 3755 * can't increment one of the sq_putcounts (instead of sq_count) because 3756 * qwait_rw won't know which counter to decrement. 3757 * 3758 * It would be reasonable to add the lockless FASTPUT logic. 3759 */ 3760 int 3761 rwnext(queue_t *qp, struiod_t *dp) 3762 { 3763 queue_t *nqp; 3764 syncq_t *sq; 3765 uint16_t count; 3766 uint16_t flags; 3767 struct qinit *qi; 3768 int (*proc)(); 3769 struct stdata *stp; 3770 int isread; 3771 int rval; 3772 3773 stp = STREAM(qp); 3774 /* 3775 * Prevent q_next from changing by holding sd_lock until acquiring 3776 * SQLOCK. Note that a read-side rwnext from the streamhead will 3777 * already have sd_lock acquired. In either case sd_lock is always 3778 * released after acquiring SQLOCK. 3779 * 3780 * The streamhead read-side holding sd_lock when calling rwnext is 3781 * required to prevent a race condition where M_DATA mblks flowing 3782 * up the read-side of the stream could be bypassed by a rwnext() 3783 * down-call. In this case sd_lock acts as the streamhead perimeter.
3784 */ 3785 if ((nqp = _WR(qp)) == qp) { 3786 isread = 0; 3787 mutex_enter(&stp->sd_lock); 3788 qp = nqp->q_next; 3789 } else { 3790 isread = 1; 3791 if (nqp != stp->sd_wrq) 3792 /* Not streamhead */ 3793 mutex_enter(&stp->sd_lock); 3794 qp = _RD(nqp->q_next); 3795 } 3796 qi = qp->q_qinfo; 3797 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) { 3798 /* 3799 * Not a synchronous module or no r/w procedure for this 3800 * queue, so just return EINVAL and let the caller handle it. 3801 */ 3802 mutex_exit(&stp->sd_lock); 3803 return (EINVAL); 3804 } 3805 3806 if (rwnext_enter(qp) == B_FALSE) { 3807 mutex_exit(&stp->sd_lock); 3808 return (EINVAL); 3809 } 3810 3811 sq = qp->q_syncq; 3812 mutex_enter(SQLOCK(sq)); 3813 mutex_exit(&stp->sd_lock); 3814 count = sq->sq_count; 3815 flags = sq->sq_flags; 3816 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3817 3818 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3819 /* 3820 * if this queue is being closed, return. 3821 */ 3822 if (qp->q_flag & QWCLOSE) { 3823 mutex_exit(SQLOCK(sq)); 3824 rwnext_exit(qp); 3825 return (EINVAL); 3826 } 3827 3828 /* 3829 * Wait until we can enter the inner perimeter. 3830 */ 3831 sq->sq_flags = flags | SQ_WANTWAKEUP; 3832 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3833 count = sq->sq_count; 3834 flags = sq->sq_flags; 3835 } 3836 3837 if (isread == 0 && stp->sd_struiowrq == NULL || 3838 isread == 1 && stp->sd_struiordq == NULL) { 3839 /* 3840 * Stream plumbing changed while waiting for inner perimeter 3841 * so just return EINVAL and let the caller handle it. 3842 */ 3843 mutex_exit(SQLOCK(sq)); 3844 rwnext_exit(qp); 3845 return (EINVAL); 3846 } 3847 if (!(flags & SQ_CIPUT)) 3848 sq->sq_flags = flags | SQ_EXCL; 3849 sq->sq_count = count + 1; 3850 ASSERT(sq->sq_count != 0); /* Wraparound */ 3851 /* 3852 * Note: The only message ordering guarantee that rwnext() makes is 3853 * for the write queue flow-control case. All others (r/w queue 3854 * with q_count > 0 (or q_first != 0)) are the responsibility of 3855 * the queue's rw procedure. This could be generalized here by 3856 * running the queue's service procedure, but that wouldn't be 3857 * the most efficient for all cases. 3858 */ 3859 mutex_exit(SQLOCK(sq)); 3860 if (! isread && (qp->q_flag & QFULL)) { 3861 /* 3862 * Write queue may be flow controlled. If so, 3863 * mark the queue for wakeup when it's not. 3864 */ 3865 mutex_enter(QLOCK(qp)); 3866 if (qp->q_flag & QFULL) { 3867 qp->q_flag |= QWANTWSYNC; 3868 mutex_exit(QLOCK(qp)); 3869 rval = EWOULDBLOCK; 3870 goto out; 3871 } 3872 mutex_exit(QLOCK(qp)); 3873 } 3874 3875 if (! isread && dp->d_mp) 3876 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr - 3877 dp->d_mp->b_datap->db_base); 3878 3879 rval = (*proc)(qp, dp); 3880 3881 if (isread && dp->d_mp) 3882 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT, 3883 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base); 3884 out: 3885 /* 3886 * The queue is protected from being freed by sq_count, so it is 3887 * safe to call rwnext_exit and reacquire SQLOCK(sq). 3888 */ 3889 rwnext_exit(qp); 3890 3891 mutex_enter(SQLOCK(sq)); 3892 flags = sq->sq_flags; 3893 ASSERT(sq->sq_count != 0); 3894 sq->sq_count--; 3895 if (flags & SQ_TAIL) { 3896 putnext_tail(sq, qp, flags); 3897 /* 3898 * The only purpose of this ASSERT is to preserve calling stack 3899 * in DEBUG kernel.
3900 */ 3901 ASSERT(flags & SQ_TAIL); 3902 return (rval); 3903 } 3904 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 3905 /* 3906 * Safe to always drop SQ_EXCL: 3907 * Not SQ_CIPUT means we set SQ_EXCL above 3908 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 3909 * did a qwriter(INNER) in which case nobody else 3910 * is in the inner perimeter and we are exiting. 3911 * 3912 * I would like to make the following assertion: 3913 * 3914 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 3915 * sq->sq_count == 0); 3916 * 3917 * which indicates that if we are both putshared and exclusive, 3918 * we became exclusive while executing the putproc, and the only 3919 * claim on the syncq was the one we dropped a few lines above. 3920 * But other threads that enter putnext while the syncq is exclusive 3921 * need to make a claim as they may need to drop SQLOCK in the 3922 * has_writers case to avoid deadlocks. If these threads are 3923 * delayed or preempted, it is possible that the writer thread can 3924 * find out that there are other claims making the (sq_count == 0) 3925 * test invalid. 3926 */ 3927 3928 sq->sq_flags = flags & ~SQ_EXCL; 3929 if (sq->sq_flags & SQ_WANTWAKEUP) { 3930 sq->sq_flags &= ~SQ_WANTWAKEUP; 3931 cv_broadcast(&sq->sq_wait); 3932 } 3933 mutex_exit(SQLOCK(sq)); 3934 return (rval); 3935 } 3936 3937 /* 3938 * The purpose of infonext() is to call the info procedure of the next 3939 * (downstream) module's queue. 3940 * 3941 * Treated as a put entrypoint for perimeter synchronization. 3942 * 3943 * There's no need to grab sq_putlocks here (which only exist for CIPUT 3944 * sync queues). If it is a CIPUT sync queue, the regular sq_count is incremented 3945 * and it does not matter if any regular put entrypoints have already been 3946 * entered. 3947 */ 3948 int 3949 infonext(queue_t *qp, infod_t *idp) 3950 { 3951 queue_t *nqp; 3952 syncq_t *sq; 3953 uint16_t count; 3954 uint16_t flags; 3955 struct qinit *qi; 3956 int (*proc)(); 3957 struct stdata *stp; 3958 int rval; 3959 3960 stp = STREAM(qp); 3961 /* 3962 * Prevent q_next from changing by holding sd_lock until 3963 * acquiring SQLOCK. 3964 */ 3965 mutex_enter(&stp->sd_lock); 3966 if ((nqp = _WR(qp)) == qp) { 3967 qp = nqp->q_next; 3968 } else { 3969 qp = _RD(nqp->q_next); 3970 } 3971 qi = qp->q_qinfo; 3972 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) { 3973 mutex_exit(&stp->sd_lock); 3974 return (EINVAL); 3975 } 3976 sq = qp->q_syncq; 3977 mutex_enter(SQLOCK(sq)); 3978 mutex_exit(&stp->sd_lock); 3979 count = sq->sq_count; 3980 flags = sq->sq_flags; 3981 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT)); 3982 3983 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) { 3984 /* 3985 * Wait until we can enter the inner perimeter. 3986 */ 3987 sq->sq_flags = flags | SQ_WANTWAKEUP; 3988 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3989 count = sq->sq_count; 3990 flags = sq->sq_flags; 3991 } 3992 3993 if (! (flags & SQ_CIPUT)) 3994 sq->sq_flags = flags | SQ_EXCL; 3995 sq->sq_count = count + 1; 3996 ASSERT(sq->sq_count != 0); /* Wraparound */ 3997 mutex_exit(SQLOCK(sq)); 3998 3999 rval = (*proc)(qp, idp); 4000 4001 mutex_enter(SQLOCK(sq)); 4002 flags = sq->sq_flags; 4003 ASSERT(sq->sq_count != 0); 4004 sq->sq_count--; 4005 if (flags & SQ_TAIL) { 4006 putnext_tail(sq, qp, flags); 4007 /* 4008 * The only purpose of this ASSERT is to preserve calling stack 4009 * in DEBUG kernel.
4010 */ 4011 ASSERT(flags & SQ_TAIL); 4012 return (rval); 4013 } 4014 ASSERT(flags & (SQ_EXCL|SQ_CIPUT)); 4015 /* 4016 * XXXX 4017 * I am not certain the next comment is correct here. I need to consider 4018 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT 4019 * might cause other problems. It just might be safer to drop it if 4020 * !SQ_CIPUT because that is when we set it. 4021 */ 4022 /* 4023 * Safe to always drop SQ_EXCL: 4024 * Not SQ_CIPUT means we set SQ_EXCL above 4025 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure 4026 * did a qwriter(INNER) in which case nobody else 4027 * is in the inner perimeter and we are exiting. 4028 * 4029 * I would like to make the following assertion: 4030 * 4031 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) || 4032 * sq->sq_count == 0); 4033 * 4034 * which indicates that if we are both putshared and exclusive, 4035 * we became exclusive while executing the putproc, and the only 4036 * claim on the syncq was the one we dropped a few lines above. 4037 * But other threads that enter putnext while the syncq is exclusive 4038 * need to make a claim as they may need to drop SQLOCK in the 4039 * has_writers case to avoid deadlocks. If these threads are 4040 * delayed or preempted, it is possible that the writer thread can 4041 * find out that there are other claims making the (sq_count == 0) 4042 * test invalid. 4043 */ 4044 4045 sq->sq_flags = flags & ~SQ_EXCL; 4046 mutex_exit(SQLOCK(sq)); 4047 return (rval); 4048 } 4049 4050 /* 4051 * Return nonzero if the queue is responsible for struio(), else return 0. 4052 */ 4053 int 4054 isuioq(queue_t *q) 4055 { 4056 if (q->q_flag & QREADR) 4057 return (STREAM(q)->sd_struiordq == q); 4058 else 4059 return (STREAM(q)->sd_struiowrq == q); 4060 } 4061 4062 #if defined(__sparc) 4063 int disable_putlocks = 0; 4064 #else 4065 int disable_putlocks = 1; 4066 #endif 4067 4068 /* 4069 * called by create_putlock. 4070 */ 4071 static void 4072 create_syncq_putlocks(queue_t *q) 4073 { 4074 syncq_t *sq = q->q_syncq; 4075 ciputctrl_t *cip; 4076 int i; 4077 4078 ASSERT(sq != NULL); 4079 4080 ASSERT(disable_putlocks == 0); 4081 ASSERT(n_ciputctrl >= min_n_ciputctrl); 4082 ASSERT(ciputctrl_cache != NULL); 4083 4084 if (!(sq->sq_type & SQ_CIPUT)) 4085 return; 4086 4087 for (i = 0; i <= 1; i++) { 4088 if (sq->sq_ciputctrl == NULL) { 4089 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 4090 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 4091 mutex_enter(SQLOCK(sq)); 4092 if (sq->sq_ciputctrl != NULL) { 4093 mutex_exit(SQLOCK(sq)); 4094 kmem_cache_free(ciputctrl_cache, cip); 4095 } else { 4096 ASSERT(sq->sq_nciputctrl == 0); 4097 sq->sq_nciputctrl = n_ciputctrl - 1; 4098 /* 4099 * putnext checks sq_ciputctrl without holding 4100 * SQLOCK. if it is not NULL putnext assumes 4101 * sq_nciputctrl is initialized. membar below 4102 * insures that. 4103 */ 4104 membar_producer(); 4105 sq->sq_ciputctrl = cip; 4106 mutex_exit(SQLOCK(sq)); 4107 } 4108 } 4109 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 4110 if (i == 1) 4111 break; 4112 q = _OTHERQ(q); 4113 if (!(q->q_flag & QPERQ)) { 4114 ASSERT(sq == q->q_syncq); 4115 break; 4116 } 4117 ASSERT(q->q_syncq != NULL); 4118 ASSERT(sq != q->q_syncq); 4119 sq = q->q_syncq; 4120 ASSERT(sq->sq_type & SQ_CIPUT); 4121 } 4122 } 4123 4124 /* 4125 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for 4126 * syncq of q. 
If the stream argument is not 0, create per cpu stream_putlocks for 4127 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's 4128 * starting from q and down to the driver. 4129 * 4130 * This should be called after the affected queues are part of stream 4131 * geometry. It should be called from driver/module open routine after 4132 * qprocson() call. It is also called from the nfs syscall where it is known 4133 * that the stream is configured and won't change its geometry during the 4134 * create_putlock call. 4135 * 4136 * The caller normally uses a 0 value for the stream argument to speed up MT 4137 * putnext into the perimeter of q, for example because its perimeter is per 4138 * module (e.g. IP). 4139 * 4140 * The caller normally uses a non-zero value for the stream argument to hint 4141 * the system that the stream of q is a very contended global system stream 4142 * (e.g. NFS/UDP) and the part of the stream from q to the driver is 4143 * particularly MT hot. 4144 * 4145 * The caller ensures stream plumbing won't happen while we are here and 4146 * therefore q_next can be safely used. 4147 */ 4148 4149 void 4150 create_putlocks(queue_t *q, int stream) 4151 { 4152 ciputctrl_t *cip; 4153 struct stdata *stp = STREAM(q); 4154 4155 q = _WR(q); 4156 ASSERT(stp != NULL); 4157 4158 if (disable_putlocks != 0) 4159 return; 4160 4161 if (n_ciputctrl < min_n_ciputctrl) 4162 return; 4163 4164 ASSERT(ciputctrl_cache != NULL); 4165 4166 if (stream != 0 && stp->sd_ciputctrl == NULL) { 4167 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP); 4168 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0); 4169 mutex_enter(&stp->sd_lock); 4170 if (stp->sd_ciputctrl != NULL) { 4171 mutex_exit(&stp->sd_lock); 4172 kmem_cache_free(ciputctrl_cache, cip); 4173 } else { 4174 ASSERT(stp->sd_nciputctrl == 0); 4175 stp->sd_nciputctrl = n_ciputctrl - 1; 4176 /* 4177 * putnext checks sd_ciputctrl without holding 4178 * sd_lock. If it is not NULL, putnext assumes 4179 * sd_nciputctrl is initialized. The membar below 4180 * ensures that. 4181 */ 4182 membar_producer(); 4183 stp->sd_ciputctrl = cip; 4184 mutex_exit(&stp->sd_lock); 4185 } 4186 } 4187 4188 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1); 4189 4190 while (_SAMESTR(q)) { 4191 create_syncq_putlocks(q); 4192 if (stream == 0) 4193 return; 4194 q = q->q_next; 4195 } 4196 ASSERT(q != NULL); 4197 create_syncq_putlocks(q); 4198 } 4199 4200 /* 4201 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows 4202 * through a stream. 4203 * 4204 * Data currently recorded per event are a timestamp, module/driver name, 4205 * downstream module/driver name, optional callstack, event type and a 4206 * per-type datum. Much of the STREAMS framework is instrumented for automatic 4207 * flow tracing (when enabled). Events can be defined and used by STREAMS 4208 * modules and drivers. 4209 * 4210 * Global objects: 4211 * 4212 * str_ftevent() - Add a flow-trace event to a dblk. 4213 * str_ftfree() - Free flow-trace data 4214 * 4215 * Local objects: 4216 * 4217 * fthdr_cache - pointer to the kmem cache for trace header. 4218 * ftblk_cache - pointer to the kmem cache for trace data blocks.
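 *
 * Tracing is compiled in but disabled by default via str_ftnever below.
 * As an assumption-laden example only, one might enable it on a test
 * system by clearing that tunable, e.g. from /etc/system:
 *
 *	set str_ftnever = 0
 *
 * (Whether a given release exposes the tunable this way should be
 * verified against its documentation.)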
4219 */ 4220 4221 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4222 int str_ftstack = 0; /* Don't record event call stacks */ 4223 4224 void 4225 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4226 { 4227 ftblk_t *bp = hp->tail; 4228 ftblk_t *nbp; 4229 ftevnt_t *ep; 4230 int ix, nix; 4231 4232 ASSERT(hp != NULL); 4233 4234 for (;;) { 4235 if ((ix = bp->ix) == FTBLK_EVNTS) { 4236 /* 4237 * Tail doesn't have room, so need a new tail. 4238 * 4239 * To make this MT safe, first, allocate a new 4240 * ftblk, and initialize it. To make life a 4241 * little easier, reserve the first slot (mostly 4242 * by making ix = 1). When we are finished with 4243 * the initialization, CAS this pointer to the 4244 * tail. If this succeeds, this is the new 4245 * "next" block. Otherwise, another thread 4246 * got here first, so free the block and start 4247 * again. 4248 */ 4249 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4250 if (nbp == NULL) { 4251 /* no mem, so punt */ 4252 str_ftnever++; 4253 /* free up all flow data? */ 4254 return; 4255 } 4256 nbp->nxt = NULL; 4257 nbp->ix = 1; 4258 /* 4259 * Just in case there is another thread about 4260 * to get the next index, we need to make sure 4261 * the value is there for it. 4262 */ 4263 membar_producer(); 4264 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) { 4265 /* CAS was successful */ 4266 bp->nxt = nbp; 4267 membar_producer(); 4268 bp = nbp; 4269 ix = 0; 4270 goto cas_good; 4271 } else { 4272 kmem_cache_free(ftblk_cache, nbp); 4273 bp = hp->tail; 4274 continue; 4275 } 4276 } 4277 nix = ix + 1; 4278 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) { 4279 cas_good: 4280 if (curthread != hp->thread) { 4281 hp->thread = curthread; 4282 evnt |= FTEV_CS; 4283 } 4284 if (CPU->cpu_seqid != hp->cpu_seqid) { 4285 hp->cpu_seqid = CPU->cpu_seqid; 4286 evnt |= FTEV_PS; 4287 } 4288 ep = &bp->ev[ix]; 4289 break; 4290 } 4291 } 4292 4293 if (evnt & FTEV_QMASK) { 4294 queue_t *qp = p; 4295 4296 if (!(qp->q_flag & QREADR)) 4297 evnt |= FTEV_ISWR; 4298 4299 ep->mid = Q2NAME(qp); 4300 4301 /* 4302 * We only record the next queue name for FTEV_PUTNEXT since 4303 * that's the only time we *really* need it, and the putnext() 4304 * code ensures that qp->q_next won't vanish. (We could use 4305 * claimstr()/releasestr() but at a performance cost.) 4306 */ 4307 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4308 ep->midnext = Q2NAME(qp->q_next); 4309 else 4310 ep->midnext = NULL; 4311 } else { 4312 ep->mid = p; 4313 ep->midnext = NULL; 4314 } 4315 4316 if (ep->stk != NULL) 4317 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4318 4319 ep->ts = gethrtime(); 4320 ep->evnt = evnt; 4321 ep->data = data; 4322 hp->hash = (hp->hash << 9) + hp->hash; 4323 hp->hash += (evnt << 16) | data; 4324 hp->hash += (uintptr_t)ep->mid; 4325 } 4326 4327 /* 4328 * Free flow-trace data. 4329 */ 4330 void 4331 str_ftfree(dblk_t *dbp) 4332 { 4333 fthdr_t *hp = dbp->db_fthdr; 4334 ftblk_t *bp = &hp->first; 4335 ftblk_t *nbp; 4336 4337 if (bp != hp->tail || bp->ix != 0) { 4338 /* 4339 * Clear out the hash, have the tail point to itself, and free 4340 * any continuation blocks. 4341 */ 4342 bp = hp->first.nxt; 4343 hp->tail = &hp->first; 4344 hp->hash = 0; 4345 hp->first.nxt = NULL; 4346 hp->first.ix = 0; 4347 while (bp != NULL) { 4348 nbp = bp->nxt; 4349 kmem_cache_free(ftblk_cache, bp); 4350 bp = nbp; 4351 } 4352 } 4353 kmem_cache_free(fthdr_cache, hp); 4354 dbp->db_fthdr = NULL; 4355 } 4356
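/*
 * Illustrative (hypothetical) use of str_ftevent() by a module wanting
 * to log a custom event against an mblk's dblk; FTEV_PUTQ is reused
 * here purely as an example event code, and the guards are assumptions:
 * a flow-trace header must exist, and tracing must not be disabled:
 *
 *	if (str_ftnever == 0 && mp->b_datap->db_fthdr != NULL)
 *		str_ftevent(mp->b_datap->db_fthdr, q, FTEV_PUTQ, 0);
 */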