/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved	*/

/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>		/* snprintf() */
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
58 */ 59 60 /* 61 * STREAMS message allocator: principles of operation 62 * 63 * The streams message allocator consists of all the routines that 64 * allocate, dup and free streams messages: allocb(), [d]esballoc[a], 65 * dupb(), freeb() and freemsg(). What follows is a high-level view 66 * of how the allocator works. 67 * 68 * Every streams message consists of one or more mblks, a dblk, and data. 69 * All mblks for all types of messages come from a common mblk_cache. 70 * The dblk and data come in several flavors, depending on how the 71 * message is allocated: 72 * 73 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of 74 * fixed-size dblk/data caches. For message sizes that are multiples of 75 * PAGESIZE, dblks are allocated separately from the buffer. 76 * The associated buffer is allocated by the constructor using kmem_alloc(). 77 * For all other message sizes, dblk and its associated data is allocated 78 * as a single contiguous chunk of memory. 79 * Objects in these caches consist of a dblk plus its associated data. 80 * allocb() determines the nearest-size cache by table lookup: 81 * the dblk_cache[] array provides the mapping from size to dblk cache. 82 * 83 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by 84 * kmem_alloc()'ing a buffer for the data and supplying that 85 * buffer to gesballoc(), described below. 86 * 87 * (3) The four flavors of [d]esballoc[a] are all implemented by a 88 * common routine, gesballoc() ("generic esballoc"). gesballoc() 89 * allocates a dblk from the global dblk_esb_cache and sets db_base, 90 * db_lim and db_frtnp to describe the caller-supplied buffer. 91 * 92 * While there are several routines to allocate messages, there is only 93 * one routine to free messages: freeb(). freeb() simply invokes the 94 * dblk's free method, dbp->db_free(), which is set at allocation time. 
95 * 96 * dupb() creates a new reference to a message by allocating a new mblk, 97 * incrementing the dblk reference count and setting the dblk's free 98 * method to dblk_decref(). The dblk's original free method is retained 99 * in db_lastfree. dblk_decref() decrements the reference count on each 100 * freeb(). If this is not the last reference it just frees the mblk; 101 * if this *is* the last reference, it restores db_free to db_lastfree, 102 * sets db_mblk to the current mblk (see below), and invokes db_lastfree. 103 * 104 * The implementation makes aggressive use of kmem object caching for 105 * maximum performance. This makes the code simple and compact, but 106 * also a bit abstruse in some places. The invariants that constitute a 107 * message's constructed state, described below, are more subtle than usual. 108 * 109 * Every dblk has an "attached mblk" as part of its constructed state. 110 * The mblk is allocated by the dblk's constructor and remains attached 111 * until the message is either dup'ed or pulled up. In the dupb() case 112 * the mblk association doesn't matter until the last free, at which time 113 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects 114 * the mblk association because it swaps the leading mblks of two messages, 115 * so it is responsible for swapping their db_mblk pointers accordingly. 116 * From a constructed-state viewpoint it doesn't matter that a dblk's 117 * attached mblk can change while the message is allocated; all that 118 * matters is that the dblk has *some* attached mblk when it's freed. 119 * 120 * The sizes of the allocb() small-message caches are not magical. 121 * They represent a good trade-off between internal and external 122 * fragmentation for current workloads. They should be reevaluated 123 * periodically, especially if allocations larger than DBLK_MAX_CACHE 124 * become common. We use 64-byte alignment so that dblks don't 125 * straddle cache lines unnecessarily. 
126 */ 127 #define DBLK_MAX_CACHE 73728 128 #define DBLK_CACHE_ALIGN 64 129 #define DBLK_MIN_SIZE 8 130 #define DBLK_SIZE_SHIFT 3 131 132 #ifdef _BIG_ENDIAN 133 #define DBLK_RTFU_SHIFT(field) \ 134 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field)) 135 #else 136 #define DBLK_RTFU_SHIFT(field) \ 137 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref)) 138 #endif 139 140 #define DBLK_RTFU(ref, type, flags, uioflag) \ 141 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \ 142 ((type) << DBLK_RTFU_SHIFT(db_type)) | \ 143 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \ 144 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag))) 145 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref)) 146 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref)) 147 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band)) 148 149 static size_t dblk_sizes[] = { 150 #ifdef _LP64 151 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856, 152 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624, 153 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392, 154 #else 155 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904, 156 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672, 157 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440, 158 #endif 159 DBLK_MAX_CACHE, 0 160 }; 161 162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE]; 163 static struct kmem_cache *mblk_cache; 164 static struct kmem_cache *dblk_esb_cache; 165 static struct kmem_cache *fthdr_cache; 166 static struct kmem_cache *ftblk_cache; 167 168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp); 169 static mblk_t *allocb_oversize(size_t size, int flags); 170 static int allocb_tryhard_fails; 171 static void frnop_func(void *arg); 172 frtn_t frnop = { frnop_func }; 173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp); 174 175 static boolean_t rwnext_enter(queue_t *qp); 176 static void rwnext_exit(queue_t *qp); 177 178 /* 179 * Patchable mblk/dblk kmem_cache flags. 
180 */ 181 int dblk_kmem_flags = 0; 182 int mblk_kmem_flags = 0; 183 184 static int 185 dblk_constructor(void *buf, void *cdrarg, int kmflags) 186 { 187 dblk_t *dbp = buf; 188 ssize_t msg_size = (ssize_t)cdrarg; 189 size_t index; 190 191 ASSERT(msg_size != 0); 192 193 index = (msg_size - 1) >> DBLK_SIZE_SHIFT; 194 195 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)); 196 197 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 198 return (-1); 199 if ((msg_size & PAGEOFFSET) == 0) { 200 dbp->db_base = kmem_alloc(msg_size, kmflags); 201 if (dbp->db_base == NULL) { 202 kmem_cache_free(mblk_cache, dbp->db_mblk); 203 return (-1); 204 } 205 } else { 206 dbp->db_base = (unsigned char *)&dbp[1]; 207 } 208 209 dbp->db_mblk->b_datap = dbp; 210 dbp->db_cache = dblk_cache[index]; 211 dbp->db_lim = dbp->db_base + msg_size; 212 dbp->db_free = dbp->db_lastfree = dblk_lastfree; 213 dbp->db_frtnp = NULL; 214 dbp->db_fthdr = NULL; 215 dbp->db_credp = NULL; 216 dbp->db_cpid = -1; 217 dbp->db_struioflag = 0; 218 dbp->db_struioun.cksum.flags = 0; 219 return (0); 220 } 221 222 /*ARGSUSED*/ 223 static int 224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags) 225 { 226 dblk_t *dbp = buf; 227 228 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 229 return (-1); 230 dbp->db_mblk->b_datap = dbp; 231 dbp->db_cache = dblk_esb_cache; 232 dbp->db_fthdr = NULL; 233 dbp->db_credp = NULL; 234 dbp->db_cpid = -1; 235 dbp->db_struioflag = 0; 236 dbp->db_struioun.cksum.flags = 0; 237 return (0); 238 } 239 240 static int 241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags) 242 { 243 dblk_t *dbp = buf; 244 bcache_t *bcp = cdrarg; 245 246 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL) 247 return (-1); 248 249 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags); 250 if (dbp->db_base == NULL) { 251 kmem_cache_free(mblk_cache, dbp->db_mblk); 252 return (-1); 253 } 254 255 dbp->db_mblk->b_datap = dbp; 256 dbp->db_cache = 
(void *)bcp; 257 dbp->db_lim = dbp->db_base + bcp->size; 258 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree; 259 dbp->db_frtnp = NULL; 260 dbp->db_fthdr = NULL; 261 dbp->db_credp = NULL; 262 dbp->db_cpid = -1; 263 dbp->db_struioflag = 0; 264 dbp->db_struioun.cksum.flags = 0; 265 return (0); 266 } 267 268 /*ARGSUSED*/ 269 static void 270 dblk_destructor(void *buf, void *cdrarg) 271 { 272 dblk_t *dbp = buf; 273 ssize_t msg_size = (ssize_t)cdrarg; 274 275 ASSERT(dbp->db_mblk->b_datap == dbp); 276 ASSERT(msg_size != 0); 277 ASSERT(dbp->db_struioflag == 0); 278 ASSERT(dbp->db_struioun.cksum.flags == 0); 279 280 if ((msg_size & PAGEOFFSET) == 0) { 281 kmem_free(dbp->db_base, msg_size); 282 } 283 284 kmem_cache_free(mblk_cache, dbp->db_mblk); 285 } 286 287 static void 288 bcache_dblk_destructor(void *buf, void *cdrarg) 289 { 290 dblk_t *dbp = buf; 291 bcache_t *bcp = cdrarg; 292 293 kmem_cache_free(bcp->buffer_cache, dbp->db_base); 294 295 ASSERT(dbp->db_mblk->b_datap == dbp); 296 ASSERT(dbp->db_struioflag == 0); 297 ASSERT(dbp->db_struioun.cksum.flags == 0); 298 299 kmem_cache_free(mblk_cache, dbp->db_mblk); 300 } 301 302 /* ARGSUSED */ 303 static int 304 ftblk_constructor(void *buf, void *cdrarg, int kmflags) 305 { 306 ftblk_t *fbp = buf; 307 int i; 308 309 bzero(fbp, sizeof (ftblk_t)); 310 if (str_ftstack != 0) { 311 for (i = 0; i < FTBLK_EVNTS; i++) 312 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags); 313 } 314 315 return (0); 316 } 317 318 /* ARGSUSED */ 319 static void 320 ftblk_destructor(void *buf, void *cdrarg) 321 { 322 ftblk_t *fbp = buf; 323 int i; 324 325 if (str_ftstack != 0) { 326 for (i = 0; i < FTBLK_EVNTS; i++) { 327 if (fbp->ev[i].stk != NULL) { 328 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t)); 329 fbp->ev[i].stk = NULL; 330 } 331 } 332 } 333 } 334 335 static int 336 fthdr_constructor(void *buf, void *cdrarg, int kmflags) 337 { 338 fthdr_t *fhp = buf; 339 340 return (ftblk_constructor(&fhp->first, cdrarg, kmflags)); 341 } 342 343 static 
void 344 fthdr_destructor(void *buf, void *cdrarg) 345 { 346 fthdr_t *fhp = buf; 347 348 ftblk_destructor(&fhp->first, cdrarg); 349 } 350 351 void 352 streams_msg_init(void) 353 { 354 char name[40]; 355 size_t size; 356 size_t lastsize = DBLK_MIN_SIZE; 357 size_t *sizep; 358 struct kmem_cache *cp; 359 size_t tot_size; 360 int offset; 361 362 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32, 363 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags); 364 365 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) { 366 367 if ((offset = (size & PAGEOFFSET)) != 0) { 368 /* 369 * We are in the middle of a page, dblk should 370 * be allocated on the same page 371 */ 372 tot_size = size + sizeof (dblk_t); 373 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t)) 374 < PAGESIZE); 375 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0); 376 377 } else { 378 379 /* 380 * buf size is multiple of page size, dblk and 381 * buffer are allocated separately. 382 */ 383 384 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0); 385 tot_size = sizeof (dblk_t); 386 } 387 388 (void) sprintf(name, "streams_dblk_%ld", size); 389 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN, 390 dblk_constructor, dblk_destructor, NULL, (void *)(size), 391 NULL, dblk_kmem_flags); 392 393 while (lastsize <= size) { 394 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp; 395 lastsize += DBLK_MIN_SIZE; 396 } 397 } 398 399 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t), 400 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL, 401 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags); 402 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32, 403 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0); 404 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32, 405 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0); 406 407 /* Initialize Multidata caches */ 408 mmd_init(); 409 410 /* initialize throttling queue for esballoc */ 
411 esballoc_queue_init(); 412 } 413 414 /*ARGSUSED*/ 415 mblk_t * 416 allocb(size_t size, uint_t pri) 417 { 418 dblk_t *dbp; 419 mblk_t *mp; 420 size_t index; 421 422 index = (size - 1) >> DBLK_SIZE_SHIFT; 423 424 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) { 425 if (size != 0) { 426 mp = allocb_oversize(size, KM_NOSLEEP); 427 goto out; 428 } 429 index = 0; 430 } 431 432 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) { 433 mp = NULL; 434 goto out; 435 } 436 437 mp = dbp->db_mblk; 438 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 439 mp->b_next = mp->b_prev = mp->b_cont = NULL; 440 mp->b_rptr = mp->b_wptr = dbp->db_base; 441 mp->b_queue = NULL; 442 MBLK_BAND_FLAG_WORD(mp) = 0; 443 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size); 444 out: 445 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp); 446 447 return (mp); 448 } 449 450 /* 451 * Allocate an mblk taking db_credp and db_cpid from the template. 452 * Allow the cred to be NULL. 453 */ 454 mblk_t * 455 allocb_tmpl(size_t size, const mblk_t *tmpl) 456 { 457 mblk_t *mp = allocb(size, 0); 458 459 if (mp != NULL) { 460 dblk_t *src = tmpl->b_datap; 461 dblk_t *dst = mp->b_datap; 462 cred_t *cr; 463 pid_t cpid; 464 465 cr = msg_getcred(tmpl, &cpid); 466 if (cr != NULL) 467 crhold(dst->db_credp = cr); 468 dst->db_cpid = cpid; 469 dst->db_type = src->db_type; 470 } 471 return (mp); 472 } 473 474 mblk_t * 475 allocb_cred(size_t size, cred_t *cr, pid_t cpid) 476 { 477 mblk_t *mp = allocb(size, 0); 478 479 ASSERT(cr != NULL); 480 if (mp != NULL) { 481 dblk_t *dbp = mp->b_datap; 482 483 crhold(dbp->db_credp = cr); 484 dbp->db_cpid = cpid; 485 } 486 return (mp); 487 } 488 489 mblk_t * 490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid) 491 { 492 mblk_t *mp = allocb_wait(size, 0, flags, error); 493 494 ASSERT(cr != NULL); 495 if (mp != NULL) { 496 dblk_t *dbp = mp->b_datap; 497 498 crhold(dbp->db_credp = cr); 499 dbp->db_cpid = cpid; 500 } 501 502 return (mp); 503 } 504 
505 /* 506 * Extract the db_cred (and optionally db_cpid) from a message. 507 * We find the first mblk which has a non-NULL db_cred and use that. 508 * If none found we return NULL. 509 * Does NOT get a hold on the cred. 510 */ 511 cred_t * 512 msg_getcred(const mblk_t *mp, pid_t *cpidp) 513 { 514 cred_t *cr = NULL; 515 cred_t *cr2; 516 mblk_t *mp2; 517 518 while (mp != NULL) { 519 dblk_t *dbp = mp->b_datap; 520 521 cr = dbp->db_credp; 522 if (cr == NULL) { 523 mp = mp->b_cont; 524 continue; 525 } 526 if (cpidp != NULL) 527 *cpidp = dbp->db_cpid; 528 529 #ifdef DEBUG 530 /* 531 * Normally there should at most one db_credp in a message. 532 * But if there are multiple (as in the case of some M_IOC* 533 * and some internal messages in TCP/IP bind logic) then 534 * they must be identical in the normal case. 535 * However, a socket can be shared between different uids 536 * in which case data queued in TCP would be from different 537 * creds. Thus we can only assert for the zoneid being the 538 * same. Due to Multi-level Level Ports for TX, some 539 * cred_t can have a NULL cr_zone, and we skip the comparison 540 * in that case. 541 */ 542 mp2 = mp->b_cont; 543 while (mp2 != NULL) { 544 cr2 = DB_CRED(mp2); 545 if (cr2 != NULL) { 546 DTRACE_PROBE2(msg__getcred, 547 cred_t *, cr, cred_t *, cr2); 548 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) || 549 crgetzone(cr) == NULL || 550 crgetzone(cr2) == NULL); 551 } 552 mp2 = mp2->b_cont; 553 } 554 #endif 555 return (cr); 556 } 557 if (cpidp != NULL) 558 *cpidp = NOPID; 559 return (NULL); 560 } 561 562 /* 563 * Variant of msg_getcred which, when a cred is found 564 * 1. Returns with a hold on the cred 565 * 2. Clears the first cred in the mblk. 566 * This is more efficient to use than a msg_getcred() + crhold() when 567 * the message is freed after the cred has been extracted. 
568 * 569 * The caller is responsible for ensuring that there is no other reference 570 * on the message since db_credp can not be cleared when there are other 571 * references. 572 */ 573 cred_t * 574 msg_extractcred(mblk_t *mp, pid_t *cpidp) 575 { 576 cred_t *cr = NULL; 577 cred_t *cr2; 578 mblk_t *mp2; 579 580 while (mp != NULL) { 581 dblk_t *dbp = mp->b_datap; 582 583 cr = dbp->db_credp; 584 if (cr == NULL) { 585 mp = mp->b_cont; 586 continue; 587 } 588 ASSERT(dbp->db_ref == 1); 589 dbp->db_credp = NULL; 590 if (cpidp != NULL) 591 *cpidp = dbp->db_cpid; 592 #ifdef DEBUG 593 /* 594 * Normally there should at most one db_credp in a message. 595 * But if there are multiple (as in the case of some M_IOC* 596 * and some internal messages in TCP/IP bind logic) then 597 * they must be identical in the normal case. 598 * However, a socket can be shared between different uids 599 * in which case data queued in TCP would be from different 600 * creds. Thus we can only assert for the zoneid being the 601 * same. Due to Multi-level Level Ports for TX, some 602 * cred_t can have a NULL cr_zone, and we skip the comparison 603 * in that case. 604 */ 605 mp2 = mp->b_cont; 606 while (mp2 != NULL) { 607 cr2 = DB_CRED(mp2); 608 if (cr2 != NULL) { 609 DTRACE_PROBE2(msg__extractcred, 610 cred_t *, cr, cred_t *, cr2); 611 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) || 612 crgetzone(cr) == NULL || 613 crgetzone(cr2) == NULL); 614 } 615 mp2 = mp2->b_cont; 616 } 617 #endif 618 return (cr); 619 } 620 return (NULL); 621 } 622 /* 623 * Get the label for a message. Uses the first mblk in the message 624 * which has a non-NULL db_credp. 625 * Returns NULL if there is no credp. 
626 */ 627 extern struct ts_label_s * 628 msg_getlabel(const mblk_t *mp) 629 { 630 cred_t *cr = msg_getcred(mp, NULL); 631 632 if (cr == NULL) 633 return (NULL); 634 635 return (crgetlabel(cr)); 636 } 637 638 void 639 freeb(mblk_t *mp) 640 { 641 dblk_t *dbp = mp->b_datap; 642 643 ASSERT(dbp->db_ref > 0); 644 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 645 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp); 646 647 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 648 649 dbp->db_free(mp, dbp); 650 } 651 652 void 653 freemsg(mblk_t *mp) 654 { 655 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp); 656 while (mp) { 657 dblk_t *dbp = mp->b_datap; 658 mblk_t *mp_cont = mp->b_cont; 659 660 ASSERT(dbp->db_ref > 0); 661 ASSERT(mp->b_next == NULL && mp->b_prev == NULL); 662 663 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref); 664 665 dbp->db_free(mp, dbp); 666 mp = mp_cont; 667 } 668 } 669 670 /* 671 * Reallocate a block for another use. Try hard to use the old block. 672 * If the old data is wanted (copy), leave b_wptr at the end of the data, 673 * otherwise return b_wptr = b_rptr. 674 * 675 * This routine is private and unstable. 676 */ 677 mblk_t * 678 reallocb(mblk_t *mp, size_t size, uint_t copy) 679 { 680 mblk_t *mp1; 681 unsigned char *old_rptr; 682 ptrdiff_t cur_size; 683 684 if (mp == NULL) 685 return (allocb(size, BPRI_HI)); 686 687 cur_size = mp->b_wptr - mp->b_rptr; 688 old_rptr = mp->b_rptr; 689 690 ASSERT(mp->b_datap->db_ref != 0); 691 692 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) { 693 /* 694 * If the data is wanted and it will fit where it is, no 695 * work is required. 696 */ 697 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size) 698 return (mp); 699 700 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base; 701 mp1 = mp; 702 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) { 703 /* XXX other mp state could be copied too, db_flags ... ? 
*/ 704 mp1->b_cont = mp->b_cont; 705 } else { 706 return (NULL); 707 } 708 709 if (copy) { 710 bcopy(old_rptr, mp1->b_rptr, cur_size); 711 mp1->b_wptr = mp1->b_rptr + cur_size; 712 } 713 714 if (mp != mp1) 715 freeb(mp); 716 717 return (mp1); 718 } 719 720 static void 721 dblk_lastfree(mblk_t *mp, dblk_t *dbp) 722 { 723 ASSERT(dbp->db_mblk == mp); 724 if (dbp->db_fthdr != NULL) 725 str_ftfree(dbp); 726 727 /* set credp and projid to be 'unspecified' before returning to cache */ 728 if (dbp->db_credp != NULL) { 729 crfree(dbp->db_credp); 730 dbp->db_credp = NULL; 731 } 732 dbp->db_cpid = -1; 733 734 /* Reset the struioflag and the checksum flag fields */ 735 dbp->db_struioflag = 0; 736 dbp->db_struioun.cksum.flags = 0; 737 738 /* and the COOKED and/or UIOA flag(s) */ 739 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA); 740 741 kmem_cache_free(dbp->db_cache, dbp); 742 } 743 744 static void 745 dblk_decref(mblk_t *mp, dblk_t *dbp) 746 { 747 if (dbp->db_ref != 1) { 748 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp), 749 -(1 << DBLK_RTFU_SHIFT(db_ref))); 750 /* 751 * atomic_add_32_nv() just decremented db_ref, so we no longer 752 * have a reference to the dblk, which means another thread 753 * could free it. Therefore we cannot examine the dblk to 754 * determine whether ours was the last reference. Instead, 755 * we extract the new and minimum reference counts from rtfu. 756 * Note that all we're really saying is "if (ref != refmin)". 
757 */ 758 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) != 759 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) { 760 kmem_cache_free(mblk_cache, mp); 761 return; 762 } 763 } 764 dbp->db_mblk = mp; 765 dbp->db_free = dbp->db_lastfree; 766 dbp->db_lastfree(mp, dbp); 767 } 768 769 mblk_t * 770 dupb(mblk_t *mp) 771 { 772 dblk_t *dbp = mp->b_datap; 773 mblk_t *new_mp; 774 uint32_t oldrtfu, newrtfu; 775 776 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL) 777 goto out; 778 779 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL; 780 new_mp->b_rptr = mp->b_rptr; 781 new_mp->b_wptr = mp->b_wptr; 782 new_mp->b_datap = dbp; 783 new_mp->b_queue = NULL; 784 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp); 785 786 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref); 787 788 dbp->db_free = dblk_decref; 789 do { 790 ASSERT(dbp->db_ref > 0); 791 oldrtfu = DBLK_RTFU_WORD(dbp); 792 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref)); 793 /* 794 * If db_ref is maxed out we can't dup this message anymore. 
795 */ 796 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) { 797 kmem_cache_free(mblk_cache, new_mp); 798 new_mp = NULL; 799 goto out; 800 } 801 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != 802 oldrtfu); 803 804 out: 805 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp); 806 return (new_mp); 807 } 808 809 static void 810 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp) 811 { 812 frtn_t *frp = dbp->db_frtnp; 813 814 ASSERT(dbp->db_mblk == mp); 815 frp->free_func(frp->free_arg); 816 if (dbp->db_fthdr != NULL) 817 str_ftfree(dbp); 818 819 /* set credp and projid to be 'unspecified' before returning to cache */ 820 if (dbp->db_credp != NULL) { 821 crfree(dbp->db_credp); 822 dbp->db_credp = NULL; 823 } 824 dbp->db_cpid = -1; 825 dbp->db_struioflag = 0; 826 dbp->db_struioun.cksum.flags = 0; 827 828 kmem_cache_free(dbp->db_cache, dbp); 829 } 830 831 /*ARGSUSED*/ 832 static void 833 frnop_func(void *arg) 834 { 835 } 836 837 /* 838 * Generic esballoc used to implement the four flavors: [d]esballoc[a]. 
839 */ 840 static mblk_t * 841 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp, 842 void (*lastfree)(mblk_t *, dblk_t *), int kmflags) 843 { 844 dblk_t *dbp; 845 mblk_t *mp; 846 847 ASSERT(base != NULL && frp != NULL); 848 849 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) { 850 mp = NULL; 851 goto out; 852 } 853 854 mp = dbp->db_mblk; 855 dbp->db_base = base; 856 dbp->db_lim = base + size; 857 dbp->db_free = dbp->db_lastfree = lastfree; 858 dbp->db_frtnp = frp; 859 DBLK_RTFU_WORD(dbp) = db_rtfu; 860 mp->b_next = mp->b_prev = mp->b_cont = NULL; 861 mp->b_rptr = mp->b_wptr = base; 862 mp->b_queue = NULL; 863 MBLK_BAND_FLAG_WORD(mp) = 0; 864 865 out: 866 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp); 867 return (mp); 868 } 869 870 /*ARGSUSED*/ 871 mblk_t * 872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 873 { 874 mblk_t *mp; 875 876 /* 877 * Note that this is structured to allow the common case (i.e. 878 * STREAMS flowtracing disabled) to call gesballoc() with tail 879 * call optimization. 880 */ 881 if (!str_ftnever) { 882 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 883 frp, freebs_enqueue, KM_NOSLEEP); 884 885 if (mp != NULL) 886 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 887 return (mp); 888 } 889 890 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 891 frp, freebs_enqueue, KM_NOSLEEP)); 892 } 893 894 /* 895 * Same as esballoc() but sleeps waiting for memory. 896 */ 897 /*ARGSUSED*/ 898 mblk_t * 899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 900 { 901 mblk_t *mp; 902 903 /* 904 * Note that this is structured to allow the common case (i.e. 905 * STREAMS flowtracing disabled) to call gesballoc() with tail 906 * call optimization. 
907 */ 908 if (!str_ftnever) { 909 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 910 frp, freebs_enqueue, KM_SLEEP); 911 912 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size); 913 return (mp); 914 } 915 916 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 917 frp, freebs_enqueue, KM_SLEEP)); 918 } 919 920 /*ARGSUSED*/ 921 mblk_t * 922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 923 { 924 mblk_t *mp; 925 926 /* 927 * Note that this is structured to allow the common case (i.e. 928 * STREAMS flowtracing disabled) to call gesballoc() with tail 929 * call optimization. 930 */ 931 if (!str_ftnever) { 932 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 933 frp, dblk_lastfree_desb, KM_NOSLEEP); 934 935 if (mp != NULL) 936 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size); 937 return (mp); 938 } 939 940 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0), 941 frp, dblk_lastfree_desb, KM_NOSLEEP)); 942 } 943 944 /*ARGSUSED*/ 945 mblk_t * 946 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 947 { 948 mblk_t *mp; 949 950 /* 951 * Note that this is structured to allow the common case (i.e. 952 * STREAMS flowtracing disabled) to call gesballoc() with tail 953 * call optimization. 954 */ 955 if (!str_ftnever) { 956 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 957 frp, freebs_enqueue, KM_NOSLEEP); 958 959 if (mp != NULL) 960 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size); 961 return (mp); 962 } 963 964 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 965 frp, freebs_enqueue, KM_NOSLEEP)); 966 } 967 968 /*ARGSUSED*/ 969 mblk_t * 970 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp) 971 { 972 mblk_t *mp; 973 974 /* 975 * Note that this is structured to allow the common case (i.e. 976 * STREAMS flowtracing disabled) to call gesballoc() with tail 977 * call optimization. 
978 */ 979 if (!str_ftnever) { 980 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 981 frp, dblk_lastfree_desb, KM_NOSLEEP); 982 983 if (mp != NULL) 984 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size); 985 return (mp); 986 } 987 988 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0), 989 frp, dblk_lastfree_desb, KM_NOSLEEP)); 990 } 991 992 static void 993 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp) 994 { 995 bcache_t *bcp = dbp->db_cache; 996 997 ASSERT(dbp->db_mblk == mp); 998 if (dbp->db_fthdr != NULL) 999 str_ftfree(dbp); 1000 1001 /* set credp and projid to be 'unspecified' before returning to cache */ 1002 if (dbp->db_credp != NULL) { 1003 crfree(dbp->db_credp); 1004 dbp->db_credp = NULL; 1005 } 1006 dbp->db_cpid = -1; 1007 dbp->db_struioflag = 0; 1008 dbp->db_struioun.cksum.flags = 0; 1009 1010 mutex_enter(&bcp->mutex); 1011 kmem_cache_free(bcp->dblk_cache, dbp); 1012 bcp->alloc--; 1013 1014 if (bcp->alloc == 0 && bcp->destroy != 0) { 1015 kmem_cache_destroy(bcp->dblk_cache); 1016 kmem_cache_destroy(bcp->buffer_cache); 1017 mutex_exit(&bcp->mutex); 1018 mutex_destroy(&bcp->mutex); 1019 kmem_free(bcp, sizeof (bcache_t)); 1020 } else { 1021 mutex_exit(&bcp->mutex); 1022 } 1023 } 1024 1025 bcache_t * 1026 bcache_create(char *name, size_t size, uint_t align) 1027 { 1028 bcache_t *bcp; 1029 char buffer[255]; 1030 1031 ASSERT((align & (align - 1)) == 0); 1032 1033 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL) 1034 return (NULL); 1035 1036 bcp->size = size; 1037 bcp->align = align; 1038 bcp->alloc = 0; 1039 bcp->destroy = 0; 1040 1041 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL); 1042 1043 (void) sprintf(buffer, "%s_buffer_cache", name); 1044 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL, 1045 NULL, NULL, NULL, 0); 1046 (void) sprintf(buffer, "%s_dblk_cache", name); 1047 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t), 1048 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor, 
1049 NULL, (void *)bcp, NULL, 0); 1050 1051 return (bcp); 1052 } 1053 1054 void 1055 bcache_destroy(bcache_t *bcp) 1056 { 1057 ASSERT(bcp != NULL); 1058 1059 mutex_enter(&bcp->mutex); 1060 if (bcp->alloc == 0) { 1061 kmem_cache_destroy(bcp->dblk_cache); 1062 kmem_cache_destroy(bcp->buffer_cache); 1063 mutex_exit(&bcp->mutex); 1064 mutex_destroy(&bcp->mutex); 1065 kmem_free(bcp, sizeof (bcache_t)); 1066 } else { 1067 bcp->destroy++; 1068 mutex_exit(&bcp->mutex); 1069 } 1070 } 1071 1072 /*ARGSUSED*/ 1073 mblk_t * 1074 bcache_allocb(bcache_t *bcp, uint_t pri) 1075 { 1076 dblk_t *dbp; 1077 mblk_t *mp = NULL; 1078 1079 ASSERT(bcp != NULL); 1080 1081 mutex_enter(&bcp->mutex); 1082 if (bcp->destroy != 0) { 1083 mutex_exit(&bcp->mutex); 1084 goto out; 1085 } 1086 1087 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) { 1088 mutex_exit(&bcp->mutex); 1089 goto out; 1090 } 1091 bcp->alloc++; 1092 mutex_exit(&bcp->mutex); 1093 1094 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0); 1095 1096 mp = dbp->db_mblk; 1097 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0); 1098 mp->b_next = mp->b_prev = mp->b_cont = NULL; 1099 mp->b_rptr = mp->b_wptr = dbp->db_base; 1100 mp->b_queue = NULL; 1101 MBLK_BAND_FLAG_WORD(mp) = 0; 1102 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size); 1103 out: 1104 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp); 1105 1106 return (mp); 1107 } 1108 1109 static void 1110 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp) 1111 { 1112 ASSERT(dbp->db_mblk == mp); 1113 if (dbp->db_fthdr != NULL) 1114 str_ftfree(dbp); 1115 1116 /* set credp and projid to be 'unspecified' before returning to cache */ 1117 if (dbp->db_credp != NULL) { 1118 crfree(dbp->db_credp); 1119 dbp->db_credp = NULL; 1120 } 1121 dbp->db_cpid = -1; 1122 dbp->db_struioflag = 0; 1123 dbp->db_struioun.cksum.flags = 0; 1124 1125 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base); 1126 kmem_cache_free(dbp->db_cache, dbp); 1127 } 1128 1129 static mblk_t * 1130 
allocb_oversize(size_t size, int kmflags) 1131 { 1132 mblk_t *mp; 1133 void *buf; 1134 1135 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN); 1136 if ((buf = kmem_alloc(size, kmflags)) == NULL) 1137 return (NULL); 1138 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0), 1139 &frnop, dblk_lastfree_oversize, kmflags)) == NULL) 1140 kmem_free(buf, size); 1141 1142 if (mp != NULL) 1143 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size); 1144 1145 return (mp); 1146 } 1147 1148 mblk_t * 1149 allocb_tryhard(size_t target_size) 1150 { 1151 size_t size; 1152 mblk_t *bp; 1153 1154 for (size = target_size; size < target_size + 512; 1155 size += DBLK_CACHE_ALIGN) 1156 if ((bp = allocb(size, BPRI_HI)) != NULL) 1157 return (bp); 1158 allocb_tryhard_fails++; 1159 return (NULL); 1160 } 1161 1162 /* 1163 * This routine is consolidation private for STREAMS internal use 1164 * This routine may only be called from sync routines (i.e., not 1165 * from put or service procedures). It is located here (rather 1166 * than strsubr.c) so that we don't have to expose all of the 1167 * allocb() implementation details in header files. 
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	/* map size to a dblk cache index; sizes above the max go oversize */
	index = (size -1)  >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				/* too big for the caches: sleep in kmem */
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			/* size == 0: use the smallest cache */
			index = 0;
		}

		/* KM_SLEEP: cannot fail, so no NULL check is needed */
		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		/* interruptible: wait for buffers, fail on signal/error */
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk *ioc;
	mblk_t *mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	/* zeroing also leaves ioc_error/ioc_rval/ioc_count at 0 */
	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc = (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd = cmd;
	ioc->ioc_cr = kcred;
	ioc->ioc_id = getiocseqno();
	ioc->ioc_flag = IOC_NATIVE;
	return (mp);
}

/*
 * test if block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 * Returns 0 if the request could not be recorded.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bc_id field is returned, we save its
	 * value in the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.  If the request is currently being run by
 * another thread's executor, wait for it to finish before returning.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				/* executing elsewhere: wait and re-scan */
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			/* unlink from the pending list and free it */
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			/* partial dup: undo everything */
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Dup an mblk, but copy (rather than loan) zero-copy (STRUIO_ZC)
 * blocks so the borrowed buffer is never shared further.
 */
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

/*
 * Like dupmsg(), but only for M_DATA messages; zero-copy blocks are
 * copied instead of dup'd (see DUPB_NOLOAN above).
 */
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block. Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t *nbp;
	dblk_t *dp, *ndp;
	uchar_t *base;
	size_t size;
	size_t unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential flow-trace issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	/* preserve the word-alignment phase of db_base (bug 1052877) */
	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
1475 */ 1476 mblk_t * 1477 copymsg(mblk_t *bp) 1478 { 1479 mblk_t *head, *nbp; 1480 1481 if (!bp || !(nbp = head = copyb(bp))) 1482 return (NULL); 1483 1484 while (bp->b_cont) { 1485 if (!(nbp->b_cont = copyb(bp->b_cont))) { 1486 freemsg(head); 1487 return (NULL); 1488 } 1489 nbp = nbp->b_cont; 1490 bp = bp->b_cont; 1491 } 1492 return (head); 1493 } 1494 1495 /* 1496 * link a message block to tail of message 1497 */ 1498 void 1499 linkb(mblk_t *mp, mblk_t *bp) 1500 { 1501 ASSERT(mp && bp); 1502 1503 for (; mp->b_cont; mp = mp->b_cont) 1504 ; 1505 mp->b_cont = bp; 1506 } 1507 1508 /* 1509 * unlink a message block from head of message 1510 * return pointer to new message. 1511 * NULL if message becomes empty. 1512 */ 1513 mblk_t * 1514 unlinkb(mblk_t *bp) 1515 { 1516 mblk_t *bp1; 1517 1518 bp1 = bp->b_cont; 1519 bp->b_cont = NULL; 1520 return (bp1); 1521 } 1522 1523 /* 1524 * remove a message block "bp" from message "mp" 1525 * 1526 * Return pointer to new message or NULL if no message remains. 1527 * Return -1 if bp is not found in message. 1528 */ 1529 mblk_t * 1530 rmvb(mblk_t *mp, mblk_t *bp) 1531 { 1532 mblk_t *tmp; 1533 mblk_t *lastp = NULL; 1534 1535 ASSERT(mp && bp); 1536 for (tmp = mp; tmp; tmp = tmp->b_cont) { 1537 if (tmp == bp) { 1538 if (lastp) 1539 lastp->b_cont = tmp->b_cont; 1540 else 1541 mp = tmp->b_cont; 1542 tmp->b_cont = NULL; 1543 return (mp); 1544 } 1545 lastp = tmp; 1546 } 1547 return ((mblk_t *)-1); 1548 } 1549 1550 /* 1551 * Concatenate and align first len bytes of common 1552 * message type. Len == -1, means concat everything. 1553 * Returns 1 on success, 0 on failure 1554 * After the pullup, mp points to the pulled up data. 
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		/* already a single, aligned block: nothing to do */
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	/*
	 * The caller's mp must stay valid, so swap dblks: bp takes over
	 * the old message's contents and mp heads the fresh buffer.
	 */
	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	/* copy up to len bytes, freeing fully-consumed old blocks */
	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t *newmp;
	ssize_t totlen;
	ssize_t n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	/* anything left over (e.g. a different message type) is dup'd */
	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);

	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		/* trim from tail: only blocks of the leading type count */
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */
			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
			    (save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with
 *	QWANTR. (queue wants to be read by
 *	someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q, 0);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size) 	\
	if (DB_TYPE(mp) != M_MULTIDATA) {		\
		(size) += MBLKL(mp);			\
	} else {					\
		uint_t	pinuse;				\
							\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
		(size) += pinuse;			\
	}

/*
 * Returns the number of bytes in a message (a message is defined as a
 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
 * also return the number of distinct mblks in the message.
 */
int
mp_cont_len(mblk_t *bp, int *mblkcnt)
{
	mblk_t *mp;
	int mblks = 0;
	int bytes = 0;

	for (mp = bp; mp != NULL; mp = mp->b_cont) {
		ADD_MBLK_SIZE(mp, bytes);
		mblks++;
	}

	if (mblkcnt != NULL)
		*mblkcnt = mblks;

	return (bytes);
}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify
 * the maximum number of bytes to return. If the current amount on the
 * queue is less than this then the entire message will be returned.
 * A value of 0 returns the entire message and is equivalent to the old
 * default behaviour prior to the addition of the rbytes argument.
 */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* nothing queued: note that a reader is waiting */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * the message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * the new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the begining (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			/* normal band: adjust q_count/q_mblkcnt accounting */
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		/* backenable only when BOTH counts are below the low water */
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* walk to the qband structure for this band */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held. Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			/* advance qb_first only within the same band */
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	/*
	 * Detach the entire message chain and zero all counts under QLOCK,
	 * then process the detached chain with the lock dropped (freemsg()
	 * and putq() must not be called with QLOCK held).
	 */
	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);

		/*
		 * Messages that survive the flush (retained M_PCPROTO, or
		 * non-data messages when only data is being flushed) are
		 * put back on the queue; everything else is freed.
		 */
		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	/*
	 * Re-acquire QLOCK and record, per band (index 0 is the normal
	 * band), whether a blocked writer can now be backenabled.
	 */
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check for
 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
 * case. That check does not match the man page for flushband and was not
 * in the strrput flush code hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Band 0: detach the whole chain and zero all counts, then
		 * (with QLOCK dropped) free band-0 messages that match the
		 * flush criteria and re-queue everything else.
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		/* Backenable a blocked writer if we fell below low water. */
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk to the qband structure for this band. */
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* Sentinel: first message after the end of this band. */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	/* Unlocked fast path; re-checked under QLOCK below before blocking. */
	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
 * write to the queue).
2537 */ 2538 int 2539 bcanput(queue_t *q, unsigned char pri) 2540 { 2541 qband_t *qbp; 2542 2543 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri); 2544 if (!q) 2545 return (0); 2546 2547 /* Find next forward module that has a service procedure */ 2548 q = q->q_nfsrv; 2549 2550 mutex_enter(QLOCK(q)); 2551 if (pri == 0) { 2552 if (q->q_flag & QFULL) { 2553 q->q_flag |= QWANTW; 2554 mutex_exit(QLOCK(q)); 2555 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2556 "bcanput:%p %X %d", q, pri, 0); 2557 return (0); 2558 } 2559 } else { /* pri != 0 */ 2560 if (pri > q->q_nband) { 2561 /* 2562 * No band exists yet, so return success. 2563 */ 2564 mutex_exit(QLOCK(q)); 2565 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2566 "bcanput:%p %X %d", q, pri, 1); 2567 return (1); 2568 } 2569 qbp = q->q_bandp; 2570 while (--pri) 2571 qbp = qbp->qb_next; 2572 if (qbp->qb_flag & QB_FULL) { 2573 qbp->qb_flag |= QB_WANTW; 2574 mutex_exit(QLOCK(q)); 2575 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2576 "bcanput:%p %X %d", q, pri, 0); 2577 return (0); 2578 } 2579 } 2580 mutex_exit(QLOCK(q)); 2581 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT, 2582 "bcanput:%p %X %d", q, pri, 1); 2583 return (1); 2584 } 2585 2586 /* 2587 * Put a message on a queue. 2588 * 2589 * Messages are enqueued on a priority basis. The priority classes 2590 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0), 2591 * and B_NORMAL (type < QPCTL && band == 0). 2592 * 2593 * Add appropriate weighted data block sizes to queue count. 2594 * If queue hits high water mark then set QFULL flag. 2595 * 2596 * If QNOENAB is not set (putq is allowed to enable the queue), 2597 * enable the queue only if the message is PRIORITY, 2598 * or the QWANTR flag is set (indicating that the service procedure 2599 * is ready to read the queue. This implies that a service 2600 * procedure must NEVER put a high priority message back on its own 2601 * queue, as this would result in an infinite loop (!). 
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	/* Skip locking if this thread has the stream frozen. */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk to the qband for this message's band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip past higher-class messages from the front. */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* Band currently empty: find the insertion point. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high-priority messages, then messages
				 * of equal or higher band, and insert there.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	/*
	 * Enable the queue for high-priority messages, or (if enabling is
	 * allowed) when a reader is waiting or the message is banded.
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	/* Skip locking if this thread has the stream frozen. */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Allocate missing qband structures on the fly. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band currently empty: find the insertion point. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high-priority messages, then messages
				 * of strictly higher band, and insert there.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.
However, 2973 * the priority band of the new message must adhere to the following 2974 * ordering: 2975 * 2976 * emp->b_prev->b_band >= mp->b_band >= emp->b_band. 2977 * 2978 * All flow control parameters are updated. 2979 * 2980 * insq can be called with the stream frozen, but other utility functions 2981 * holding QLOCK, and by streams modules without any locks/frozen. 2982 */ 2983 int 2984 insq(queue_t *q, mblk_t *emp, mblk_t *mp) 2985 { 2986 mblk_t *tmp; 2987 qband_t *qbp = NULL; 2988 int mcls = (int)queclass(mp); 2989 kthread_id_t freezer; 2990 int bytecnt = 0, mblkcnt = 0; 2991 2992 freezer = STREAM(q)->sd_freezer; 2993 if (freezer == curthread) { 2994 ASSERT(frozenstr(q)); 2995 ASSERT(MUTEX_HELD(QLOCK(q))); 2996 } else if (MUTEX_HELD(QLOCK(q))) { 2997 /* Don't drop lock on exit */ 2998 freezer = curthread; 2999 } else 3000 mutex_enter(QLOCK(q)); 3001 3002 if (mcls == QPCTL) { 3003 if (mp->b_band != 0) 3004 mp->b_band = 0; /* force to be correct */ 3005 if (emp && emp->b_prev && 3006 (emp->b_prev->b_datap->db_type < QPCTL)) 3007 goto badord; 3008 } 3009 if (emp) { 3010 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) || 3011 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) && 3012 (emp->b_prev->b_band < mp->b_band))) { 3013 goto badord; 3014 } 3015 } else { 3016 tmp = q->q_last; 3017 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) { 3018 badord: 3019 cmn_err(CE_WARN, 3020 "insq: attempt to insert message out of order " 3021 "on q %p", (void *)q); 3022 if (freezer != curthread) 3023 mutex_exit(QLOCK(q)); 3024 return (0); 3025 } 3026 } 3027 3028 if (mp->b_band != 0) { 3029 int i; 3030 qband_t **qbpp; 3031 3032 if (mp->b_band > q->q_nband) { 3033 qbpp = &q->q_bandp; 3034 while (*qbpp) 3035 qbpp = &(*qbpp)->qb_next; 3036 while (mp->b_band > q->q_nband) { 3037 if ((*qbpp = allocband()) == NULL) { 3038 if (freezer != curthread) 3039 mutex_exit(QLOCK(q)); 3040 return (0); 3041 } 3042 (*qbpp)->qb_hiwat = q->q_hiwat; 3043 (*qbpp)->qb_lowat = 
q->q_lowat; 3044 q->q_nband++; 3045 qbpp = &(*qbpp)->qb_next; 3046 } 3047 } 3048 qbp = q->q_bandp; 3049 i = mp->b_band; 3050 while (--i) 3051 qbp = qbp->qb_next; 3052 } 3053 3054 if ((mp->b_next = emp) != NULL) { 3055 if ((mp->b_prev = emp->b_prev) != NULL) 3056 emp->b_prev->b_next = mp; 3057 else 3058 q->q_first = mp; 3059 emp->b_prev = mp; 3060 } else { 3061 if ((mp->b_prev = q->q_last) != NULL) 3062 q->q_last->b_next = mp; 3063 else 3064 q->q_first = mp; 3065 q->q_last = mp; 3066 } 3067 3068 /* Get mblk and byte count for q_count accounting */ 3069 bytecnt = mp_cont_len(mp, &mblkcnt); 3070 3071 if (qbp) { /* adjust qband pointers and count */ 3072 if (!qbp->qb_first) { 3073 qbp->qb_first = mp; 3074 qbp->qb_last = mp; 3075 } else { 3076 if (mp->b_prev == NULL || (mp->b_prev != NULL && 3077 (mp->b_prev->b_band != mp->b_band))) 3078 qbp->qb_first = mp; 3079 else if (mp->b_next == NULL || (mp->b_next != NULL && 3080 (mp->b_next->b_band != mp->b_band))) 3081 qbp->qb_last = mp; 3082 } 3083 qbp->qb_count += bytecnt; 3084 qbp->qb_mblkcnt += mblkcnt; 3085 if ((qbp->qb_count >= qbp->qb_hiwat) || 3086 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) { 3087 qbp->qb_flag |= QB_FULL; 3088 } 3089 } else { 3090 q->q_count += bytecnt; 3091 q->q_mblkcnt += mblkcnt; 3092 if ((q->q_count >= q->q_hiwat) || 3093 (q->q_mblkcnt >= q->q_hiwat)) { 3094 q->q_flag |= QFULL; 3095 } 3096 } 3097 3098 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0); 3099 3100 if (canenable(q) && (q->q_flag & QWANTR)) 3101 qenable_locked(q); 3102 3103 ASSERT(MUTEX_HELD(QLOCK(q))); 3104 if (freezer != curthread) 3105 mutex_exit(QLOCK(q)); 3106 3107 return (1); 3108 } 3109 3110 /* 3111 * Create and put a control message on queue. 
3112 */ 3113 int 3114 putctl(queue_t *q, int type) 3115 { 3116 mblk_t *bp; 3117 3118 if ((datamsg(type) && (type != M_DELAY)) || 3119 (bp = allocb_tryhard(0)) == NULL) 3120 return (0); 3121 bp->b_datap->db_type = (unsigned char) type; 3122 3123 put(q, bp); 3124 3125 return (1); 3126 } 3127 3128 /* 3129 * Control message with a single-byte parameter 3130 */ 3131 int 3132 putctl1(queue_t *q, int type, int param) 3133 { 3134 mblk_t *bp; 3135 3136 if ((datamsg(type) && (type != M_DELAY)) || 3137 (bp = allocb_tryhard(1)) == NULL) 3138 return (0); 3139 bp->b_datap->db_type = (unsigned char)type; 3140 *bp->b_wptr++ = (unsigned char)param; 3141 3142 put(q, bp); 3143 3144 return (1); 3145 } 3146 3147 int 3148 putnextctl1(queue_t *q, int type, int param) 3149 { 3150 mblk_t *bp; 3151 3152 if ((datamsg(type) && (type != M_DELAY)) || 3153 ((bp = allocb_tryhard(1)) == NULL)) 3154 return (0); 3155 3156 bp->b_datap->db_type = (unsigned char)type; 3157 *bp->b_wptr++ = (unsigned char)param; 3158 3159 putnext(q, bp); 3160 3161 return (1); 3162 } 3163 3164 int 3165 putnextctl(queue_t *q, int type) 3166 { 3167 mblk_t *bp; 3168 3169 if ((datamsg(type) && (type != M_DELAY)) || 3170 ((bp = allocb_tryhard(0)) == NULL)) 3171 return (0); 3172 bp->b_datap->db_type = (unsigned char)type; 3173 3174 putnext(q, bp); 3175 3176 return (1); 3177 } 3178 3179 /* 3180 * Return the queue upstream from this one 3181 */ 3182 queue_t * 3183 backq(queue_t *q) 3184 { 3185 q = _OTHERQ(q); 3186 if (q->q_next) { 3187 q = q->q_next; 3188 return (_OTHERQ(q)); 3189 } 3190 return (NULL); 3191 } 3192 3193 /* 3194 * Send a block back up the queue in reverse from this 3195 * one (e.g. to respond to ioctls) 3196 */ 3197 void 3198 qreply(queue_t *q, mblk_t *bp) 3199 { 3200 ASSERT(q && bp); 3201 3202 putnext(_OTHERQ(q), bp); 3203 } 3204 3205 /* 3206 * Streams Queue Scheduling 3207 * 3208 * Queues are enabled through qenable() when they have messages to 3209 * process. 
They are serviced by queuerun(), which runs each enabled 3210 * queue's service procedure. The call to queuerun() is processor 3211 * dependent - the general principle is that it be run whenever a queue 3212 * is enabled but before returning to user level. For system calls, 3213 * the function runqueues() is called if their action causes a queue 3214 * to be enabled. For device interrupts, queuerun() should be 3215 * called before returning from the last level of interrupt. Beyond 3216 * this, no timing assumptions should be made about queue scheduling. 3217 */ 3218 3219 /* 3220 * Enable a queue: put it on list of those whose service procedures are 3221 * ready to run and set up the scheduling mechanism. 3222 * The broadcast is done outside the mutex -> to avoid the woken thread 3223 * from contending with the mutex. This is OK 'cos the queue has been 3224 * enqueued on the runlist and flagged safely at this point. 3225 */ 3226 void 3227 qenable(queue_t *q) 3228 { 3229 mutex_enter(QLOCK(q)); 3230 qenable_locked(q); 3231 mutex_exit(QLOCK(q)); 3232 } 3233 /* 3234 * Return number of messages on queue 3235 */ 3236 int 3237 qsize(queue_t *qp) 3238 { 3239 int count = 0; 3240 mblk_t *mp; 3241 3242 mutex_enter(QLOCK(qp)); 3243 for (mp = qp->q_first; mp; mp = mp->b_next) 3244 count++; 3245 mutex_exit(QLOCK(qp)); 3246 return (count); 3247 } 3248 3249 /* 3250 * noenable - set queue so that putq() will not enable it. 3251 * enableok - set queue so that putq() can enable it. 3252 */ 3253 void 3254 noenable(queue_t *q) 3255 { 3256 mutex_enter(QLOCK(q)); 3257 q->q_flag |= QNOENB; 3258 mutex_exit(QLOCK(q)); 3259 } 3260 3261 void 3262 enableok(queue_t *q) 3263 { 3264 mutex_enter(QLOCK(q)); 3265 q->q_flag &= ~QNOENB; 3266 mutex_exit(QLOCK(q)); 3267 } 3268 3269 /* 3270 * Set queue fields. 
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t *wrq;
	int error = 0;
	kthread_id_t freezer;

	/* Skip locking if this thread has the stream frozen. */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		/* Allocate any missing qband structures up to pri. */
		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		/* Clamp the cached value by strmsgsz (and PIPE_BUF for FIFOs). */
		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		/* Restore the original QLOCK before falling out of the switch. */
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		/* These fields are read-only through this interface. */
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
3432 */ 3433 int 3434 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp) 3435 { 3436 qband_t *qbp = NULL; 3437 int error = 0; 3438 kthread_id_t freezer; 3439 3440 freezer = STREAM(q)->sd_freezer; 3441 if (freezer == curthread) { 3442 ASSERT(frozenstr(q)); 3443 ASSERT(MUTEX_HELD(QLOCK(q))); 3444 } else 3445 mutex_enter(QLOCK(q)); 3446 if (what >= QBAD) { 3447 error = EINVAL; 3448 goto done; 3449 } 3450 if (pri != 0) { 3451 int i; 3452 qband_t **qbpp; 3453 3454 if (pri > q->q_nband) { 3455 qbpp = &q->q_bandp; 3456 while (*qbpp) 3457 qbpp = &(*qbpp)->qb_next; 3458 while (pri > q->q_nband) { 3459 if ((*qbpp = allocband()) == NULL) { 3460 error = EAGAIN; 3461 goto done; 3462 } 3463 (*qbpp)->qb_hiwat = q->q_hiwat; 3464 (*qbpp)->qb_lowat = q->q_lowat; 3465 q->q_nband++; 3466 qbpp = &(*qbpp)->qb_next; 3467 } 3468 } 3469 qbp = q->q_bandp; 3470 i = pri; 3471 while (--i) 3472 qbp = qbp->qb_next; 3473 } 3474 switch (what) { 3475 case QHIWAT: 3476 if (qbp) 3477 *(size_t *)valp = qbp->qb_hiwat; 3478 else 3479 *(size_t *)valp = q->q_hiwat; 3480 break; 3481 3482 case QLOWAT: 3483 if (qbp) 3484 *(size_t *)valp = qbp->qb_lowat; 3485 else 3486 *(size_t *)valp = q->q_lowat; 3487 break; 3488 3489 case QMAXPSZ: 3490 if (qbp) 3491 error = EINVAL; 3492 else 3493 *(ssize_t *)valp = q->q_maxpsz; 3494 break; 3495 3496 case QMINPSZ: 3497 if (qbp) 3498 error = EINVAL; 3499 else 3500 *(ssize_t *)valp = q->q_minpsz; 3501 break; 3502 3503 case QCOUNT: 3504 if (qbp) 3505 *(size_t *)valp = qbp->qb_count; 3506 else 3507 *(size_t *)valp = q->q_count; 3508 break; 3509 3510 case QFIRST: 3511 if (qbp) 3512 *(mblk_t **)valp = qbp->qb_first; 3513 else 3514 *(mblk_t **)valp = q->q_first; 3515 break; 3516 3517 case QLAST: 3518 if (qbp) 3519 *(mblk_t **)valp = qbp->qb_last; 3520 else 3521 *(mblk_t **)valp = q->q_last; 3522 break; 3523 3524 case QFLAG: 3525 if (qbp) 3526 *(uint_t *)valp = qbp->qb_flag; 3527 else 3528 *(uint_t *)valp = q->q_flag; 3529 break; 3530 3531 case QSTRUIOT: 3532 if 
(qbp) 3533 error = EINVAL; 3534 else 3535 *(short *)valp = q->q_struiot; 3536 break; 3537 3538 default: 3539 error = EINVAL; 3540 break; 3541 } 3542 done: 3543 if (freezer != curthread) 3544 mutex_exit(QLOCK(q)); 3545 return (error); 3546 } 3547 3548 /* 3549 * Function awakes all in cvwait/sigwait/pollwait, on one of: 3550 * QWANTWSYNC or QWANTR or QWANTW, 3551 * 3552 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a 3553 * deferred wakeup will be done. Also if strpoll() in progress then a 3554 * deferred pollwakeup will be done. 3555 */ 3556 void 3557 strwakeq(queue_t *q, int flag) 3558 { 3559 stdata_t *stp = STREAM(q); 3560 pollhead_t *pl; 3561 3562 mutex_enter(&stp->sd_lock); 3563 pl = &stp->sd_pollist; 3564 if (flag & QWANTWSYNC) { 3565 ASSERT(!(q->q_flag & QREADR)); 3566 if (stp->sd_flag & WSLEEP) { 3567 stp->sd_flag &= ~WSLEEP; 3568 cv_broadcast(&stp->sd_wrq->q_wait); 3569 } else { 3570 stp->sd_wakeq |= WSLEEP; 3571 } 3572 3573 mutex_exit(&stp->sd_lock); 3574 pollwakeup(pl, POLLWRNORM); 3575 mutex_enter(&stp->sd_lock); 3576 3577 if (stp->sd_sigflags & S_WRNORM) 3578 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3579 } else if (flag & QWANTR) { 3580 if (stp->sd_flag & RSLEEP) { 3581 stp->sd_flag &= ~RSLEEP; 3582 cv_broadcast(&_RD(stp->sd_wrq)->q_wait); 3583 } else { 3584 stp->sd_wakeq |= RSLEEP; 3585 } 3586 3587 mutex_exit(&stp->sd_lock); 3588 pollwakeup(pl, POLLIN | POLLRDNORM); 3589 mutex_enter(&stp->sd_lock); 3590 3591 { 3592 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM); 3593 3594 if (events) 3595 strsendsig(stp->sd_siglist, events, 0, 0); 3596 } 3597 } else { 3598 if (stp->sd_flag & WSLEEP) { 3599 stp->sd_flag &= ~WSLEEP; 3600 cv_broadcast(&stp->sd_wrq->q_wait); 3601 } 3602 3603 mutex_exit(&stp->sd_lock); 3604 pollwakeup(pl, POLLWRNORM); 3605 mutex_enter(&stp->sd_lock); 3606 3607 if (stp->sd_sigflags & S_WRNORM) 3608 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0); 3609 } 3610 mutex_exit(&stp->sd_lock); 3611 } 3612 3613 int 
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ = STRUIOT_STANDARD;
	uio_t *uiop = &dp->d_uio;
	dblk_t *dbp;
	ssize_t uiocnt;
	ssize_t cnt;
	unsigned char *ptr;
	ssize_t resid;
	int error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	/* Copy user data into each eligible mblk until the uio is drained. */
	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				/*
				 * Catch page faults instead of sleeping on
				 * them; a fault is reported as EWOULDBLOCK.
				 */
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter queue synchronously. Any attempt to enter a closing queue
 * will fail. The qp->q_rwcnt keeps track of the number of successful
 * entries so that removeq() will not try to close the queue while a thread
 * is inside the queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);	/* Wraparound */
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in sync stream queue and wake up any
 * threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		/* removeq() is waiting for the last thread to leave. */
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
 * not matter if any regular put entrypoints have been already entered. We
 * can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int isread;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		/* Queue is closing; cannot enter. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case. All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure. This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks. If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
 * it does not matter if any regular put entrypoints have been already
 * entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t *nqp;
	syncq_t *sq;
	uint16_t count;
	uint16_t flags;
	struct qinit *qi;
	int (*proc)();
	struct stdata *stp;
	int rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || !
(proc = qi->qi_infop)) {
		/* Not a synchronous module or no info procedure. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here. I need to
	 * consider why the infonext is called, and if dropping SQ_EXCL unless
	 * non-CIPUT might cause other problems. It just might be safer to
	 * drop it if !SQ_CIPUT because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks. If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

/* Per-cpu putlocks default on for sparc, off elsewhere. */
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * called by create_putlock.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	/* Per-cpu put counters only apply to concurrent-put syncqs. */
	if (!(sq->sq_type & SQ_CIPUT))
		return;

	/* Handle this queue's syncq and (if distinct) its partner's. */
	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				/* Another thread won the race; discard ours. */
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK. if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized. membar below
				 * ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			/* Both queues share one syncq; nothing more to do. */
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
 * starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry. It should be called from driver/module open routine after
 * qprocson() call. It is also called from nfs syscall where it is known that
 * stream is configured and won't change its geometry during create_putlock
 * call.
 *
 * caller normally uses 0 value for the stream argument to speed up MT putnext
 * into the perimeter of q for example because its perimeter is per module
 * (e.g. IP).
 *
 * caller normally uses non 0 value for the stream argument to hint the system
 * that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * Caller ensures stream plumbing won't happen while we are here and therefore
 * q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t *cip;
	struct stdata *stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	/* Putlocks may be administratively disabled. */
	if (disable_putlocks != 0)
		return;

	/* Not worthwhile on configurations with few cpus. */
	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			/* Another thread won the race; discard ours. */
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock. if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized. membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	/* Walk downstream creating syncq putlocks for each module. */
	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
4193 * 4194 * Data currently record per-event is a timestamp, module/driver name, 4195 * downstream module/driver name, optional callstack, event type and a per 4196 * type datum. Much of the STREAMS framework is instrumented for automatic 4197 * flow tracing (when enabled). Events can be defined and used by STREAMS 4198 * modules and drivers. 4199 * 4200 * Global objects: 4201 * 4202 * str_ftevent() - Add a flow-trace event to a dblk. 4203 * str_ftfree() - Free flow-trace data 4204 * 4205 * Local objects: 4206 * 4207 * fthdr_cache - pointer to the kmem cache for trace header. 4208 * ftblk_cache - pointer to the kmem cache for trace data blocks. 4209 */ 4210 4211 int str_ftnever = 1; /* Don't do STREAMS flow tracing */ 4212 int str_ftstack = 0; /* Don't record event call stacks */ 4213 4214 void 4215 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data) 4216 { 4217 ftblk_t *bp = hp->tail; 4218 ftblk_t *nbp; 4219 ftevnt_t *ep; 4220 int ix, nix; 4221 4222 ASSERT(hp != NULL); 4223 4224 for (;;) { 4225 if ((ix = bp->ix) == FTBLK_EVNTS) { 4226 /* 4227 * Tail doesn't have room, so need a new tail. 4228 * 4229 * To make this MT safe, first, allocate a new 4230 * ftblk, and initialize it. To make life a 4231 * little easier, reserve the first slot (mostly 4232 * by making ix = 1). When we are finished with 4233 * the initialization, CAS this pointer to the 4234 * tail. If this succeeds, this is the new 4235 * "next" block. Otherwise, another thread 4236 * got here first, so free the block and start 4237 * again. 4238 */ 4239 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP); 4240 if (nbp == NULL) { 4241 /* no mem, so punt */ 4242 str_ftnever++; 4243 /* free up all flow data? */ 4244 return; 4245 } 4246 nbp->nxt = NULL; 4247 nbp->ix = 1; 4248 /* 4249 * Just in case there is another thread about 4250 * to get the next index, we need to make sure 4251 * the value is there for it. 
4252 */ 4253 membar_producer(); 4254 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) { 4255 /* CAS was successful */ 4256 bp->nxt = nbp; 4257 membar_producer(); 4258 bp = nbp; 4259 ix = 0; 4260 goto cas_good; 4261 } else { 4262 kmem_cache_free(ftblk_cache, nbp); 4263 bp = hp->tail; 4264 continue; 4265 } 4266 } 4267 nix = ix + 1; 4268 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) { 4269 cas_good: 4270 if (curthread != hp->thread) { 4271 hp->thread = curthread; 4272 evnt |= FTEV_CS; 4273 } 4274 if (CPU->cpu_seqid != hp->cpu_seqid) { 4275 hp->cpu_seqid = CPU->cpu_seqid; 4276 evnt |= FTEV_PS; 4277 } 4278 ep = &bp->ev[ix]; 4279 break; 4280 } 4281 } 4282 4283 if (evnt & FTEV_QMASK) { 4284 queue_t *qp = p; 4285 4286 if (!(qp->q_flag & QREADR)) 4287 evnt |= FTEV_ISWR; 4288 4289 ep->mid = Q2NAME(qp); 4290 4291 /* 4292 * We only record the next queue name for FTEV_PUTNEXT since 4293 * that's the only time we *really* need it, and the putnext() 4294 * code ensures that qp->q_next won't vanish. (We could use 4295 * claimstr()/releasestr() but at a performance cost.) 4296 */ 4297 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL) 4298 ep->midnext = Q2NAME(qp->q_next); 4299 else 4300 ep->midnext = NULL; 4301 } else { 4302 ep->mid = p; 4303 ep->midnext = NULL; 4304 } 4305 4306 if (ep->stk != NULL) 4307 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH); 4308 4309 ep->ts = gethrtime(); 4310 ep->evnt = evnt; 4311 ep->data = data; 4312 hp->hash = (hp->hash << 9) + hp->hash; 4313 hp->hash += (evnt << 16) | data; 4314 hp->hash += (uintptr_t)ep->mid; 4315 } 4316 4317 /* 4318 * Free flow-trace data. 4319 */ 4320 void 4321 str_ftfree(dblk_t *dbp) 4322 { 4323 fthdr_t *hp = dbp->db_fthdr; 4324 ftblk_t *bp = &hp->first; 4325 ftblk_t *nbp; 4326 4327 if (bp != hp->tail || bp->ix != 0) { 4328 /* 4329 * Clear out the hash, have the tail point to itself, and free 4330 * any continuation blocks. 
4331 */ 4332 bp = hp->first.nxt; 4333 hp->tail = &hp->first; 4334 hp->hash = 0; 4335 hp->first.nxt = NULL; 4336 hp->first.ix = 0; 4337 while (bp != NULL) { 4338 nbp = bp->nxt; 4339 kmem_cache_free(ftblk_cache, bp); 4340 bp = nbp; 4341 } 4342 } 4343 kmem_cache_free(fthdr_cache, hp); 4344 dbp->db_fthdr = NULL; 4345 } 4346