/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 * Copyright 2018 Joyent, Inc.
 */

/*
 * STREAMS Buffering module
 *
 * This STREAMS module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages.  Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls its client
 * user process makes.
 *  - only M_DATA is buffered.
 *  - multithreading assumes the module is configured D_MTQPAIR.
 *  - packets are lost only if the SB_NO_HEADER flag is clear and
 *    buffer allocation fails.
 *  - in-order message transmission.  This is enforced for messages
 *    other than high priority messages.
 *  - zero-length messages on the read side are not passed up the
 *    stream but are used internally for synchronization.
 * FLAGS:
 *  - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *    (conversion is the default for backwards compatibility
 *    hence the negative logic).
 *  - SB_NO_HEADER - no headers in buffered data.
 *    (adding headers is the default for backwards compatibility
 *    hence the negative logic).
 *  - SB_DEFER_CHUNK - provides improved response time in question-answer
 *    applications.  Buffering is not enabled until the second message
 *    is received on the read side within the sb_ticks interval.
 *    This option is often used in combination with SB_SEND_ON_WRITE.
 *  - SB_SEND_ON_WRITE - a write message results in any pending buffered
 *    read data being immediately sent upstream.
 *  - SB_NO_DROPS - bufmod behaves transparently in flow control and
 *    propagates the blocked flow condition downstream.  If this flag is
 *    clear (default), messages are dropped if the upstream flow is blocked.
 */


#include	<sys/types.h>
#include	<sys/errno.h>
#include	<sys/debug.h>
#include	<sys/stropts.h>
#include	<sys/time.h>
#include	<sys/stream.h>
#include	<sys/conf.h>
#include	<sys/ddi.h>
#include	<sys/sunddi.h>
#include	<sys/kmem.h>
#include	<sys/strsun.h>
#include	<sys/bufmod.h>
#include	<sys/modctl.h>
#include	<sys/isa_defs.h>
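/*
 * Illustrative userland sketch (not part of this module): a packet
 * capture program typically pushes bufmod onto its stream and then
 * configures it with I_STR ioctls.  The chunk size below is only an
 * example value.
 *
 *	struct strioctl si;
 *	uint_t chunk = 16384;
 *
 *	ioctl(fd, I_PUSH, "bufmod");
 *	si.ic_cmd = SBIOCSCHUNK;
 *	si.ic_timout = -1;
 *	si.ic_len = sizeof (chunk);
 *	si.ic_dp = (char *)&chunk;
 *	ioctl(fd, I_STR, &si);
 */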
/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval.  The interval begins when a read-side data
 * message is received and a timeout is not active.  If sb_snap is
 * zero, no truncation of the message is done.
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk */
	mblk_t	*sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size */
	clock_t	sb_ticks;	/* timeout interval */
	timeout_id_t sb_timeoutid; /* qtimeout() id */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length */
	uint_t	sb_flags;	/* flags field */
	uint_t	sb_state;	/* state variable */
};

/*
 * Function prototypes.
 */
static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static	int	sbclose(queue_t *, int, cred_t *);
static	void	sbwput(queue_t *, mblk_t *);
static	void	sbrput(queue_t *, mblk_t *);
static	void	sbrsrv(queue_t *);
static	void	sbioctl(queue_t *, mblk_t *);
static	void	sbaddmsg(queue_t *, mblk_t *);
static	void	sbtick(void *);
static	void	sbclosechunk(struct sb *);
static	void	sbsendit(queue_t *, mblk_t *);

static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};


int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}
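/*
 * Module open routine.  The initial state buffers indefinitely
 * (sb_ticks == -1 means no timeout is set) with the default chunk
 * size; an existing q_ptr means the module is already open on this
 * stream.
 */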
/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb *sbp;
	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize per-Stream structure.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);

	return (0);
}

/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);
	/*
	 * Cancel an outstanding timeout.
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}
	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * The correction factor is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream and the fact
 * that bufmod may be snipping messages to the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
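/*
 * For example, with a fudge factor of 1 and a 4096-byte chunk,
 * SNIT_HIWAT yields 4 * 4096 + 512 = 16896 and SNIT_LOWAT yields
 * 2 * 4096 + 256 = 8448 bytes for the stream head water marks.
 */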
static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t	ticks;
	mblk_t	*mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * Set up hi/lo water marks on the stream head read queue.
		 * Unlikely to run out of resources; fix at a later date.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * If chunking, don't worry about the effect of snipping
		 * messages on stream head flow control, since it has
		 * relatively little bearing on the data rate onto the
		 * stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * Set up hi/lo water marks on the stream head read
			 * queue.  Unlikely to run out of resources; fix at
			 * a later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		ASSERT(0);
		return;
	}
}
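/*
 * Note that sbioc() above handles only the M_IOCDATA phase of
 * transparent ioctls; the initial M_IOCTL phase, including I_STR
 * ioctls that carry their argument directly, is handled by
 * sbioctl() below.
 */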
/*
 * Write-side put procedure.  Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free message on failure.
			 */
			freemsg(mp);
			break;
		}

		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Read-side put procedure.  It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp =
				    sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * Zero-length M_CTL means our qtimeout() popped.
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
}
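/*
 * With SB_DEFER_CHUNK (see sbrput() above), the first data message
 * after an idle period is passed straight up and SB_FRCVD is set;
 * only messages that follow within the sb_ticks interval are
 * aggregated into chunks.  This keeps latency low for
 * question-answer traffic.
 */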
/*
 * Read service procedure.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t	*mp;

	/*
	 * High priority messages shouldn't get here but if
	 * one does, jam it through to avoid an infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			return;
		}
		putnext(rq, mp);
	}
}

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
	struct timeval *t;
	clock_t	ticks;
	mblk_t	*mop;
	int	transparent = iocp->ioc_count;
	mblk_t	*datamp;
	int	error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp,
				    sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
			error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If infinite timeout, return range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL,
				    datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;
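	/*
	 * The "set" ioctls below can arrive either as transparent
	 * ioctls (two phases, via mcopyin()/M_IOCDATA) or as I_STR
	 * ioctls with the argument already linked at b_cont.
	 */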
	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * Set up hi/lo water marks on the stream head read
			 * queue.  Unlikely to run out of resources; fix at
			 * a later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effect of
			 * snipping messages on stream head flow control,
			 * since it has relatively little bearing on the
			 * data rate onto the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * Set up hi/lo water marks on the stream
				 * head read queue.  Unlikely to run out
				 * of resources; fix at a later date.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * Set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	sizeof (struct sb_hdr) + _POINTER_ALIGNMENT
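/*
 * For example, with an 8-byte ulong_t, Align(13) == 3: a 13-byte
 * message is padded with 3 trailing bytes so that the next sb_hdr
 * begins on an aligned boundary.
 */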
/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one.  Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled, add an sb_hdr header and trailing alignment
 * padding.
 *
 * To optimise performance the total number of mblks should be kept
 * to a minimum.  This is achieved by using any remaining space in
 * message N for both its own padding as well as the header of message
 * N+1 if possible.  If there's insufficient space we allocate one
 * message to hold this 'wrapper'.  (There's likely to be space beyond
 * message N, since allocb would have rounded up the required size to
 * one of the dblk_sizes.)
 */
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp;
	struct timeval t;
	struct sb_hdr hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;
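	/*
	 * Two paths from here: with SB_NO_HEADER set, messages are
	 * simply concatenated onto the chunk; otherwise each message
	 * is preceded by an sb_hdr and padded to a pointer-aligned
	 * boundary (sbh_totlen covers header, data and padding).
	 */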
	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk?  If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer to
		 * the first mblk of the last msgb added and call linkb() on
		 * that last message, rather than performing the
		 * O(n) linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* Timestamp must be done immediately */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);

		/* We can't fit this message on the current chunk. */
		if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		/*
		 * If we closed it (just now or during a previous
		 * call) then allocate the head of a new chunk.
		 */
		if (sbp->sb_head == NULL) {
			/* Allocate leading header of new chunk */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Set the header values and join the message to the
		 * chunk.  The header values are copied into the chunk
		 * after we adjust for padding below.
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		linkb(sbp->sb_head, mp);
		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * There's no chance to fit another message on the
		 * chunk -- forgo the padding and close the chunk.
		 */
		if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
			(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
			    sizeof (hp));
			sbp->sb_head->b_wptr += sizeof (hp);
			ASSERT(sbp->sb_head->b_wptr <=
			    sbp->sb_head->b_datap->db_lim);
			sbclosechunk(sbp);
			return;
		}

		/*
		 * We may add another message to this chunk -- adjust
		 * the headers for padding to be added below.
		 */
		hp.sbh_totlen += pad;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);
		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
		sbp->sb_mlen += pad;

		/*
		 * Find space for the wrapper.  The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 * begins on an 8 byte boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the next
		 * message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data.  If the dblk
		 * has been shared through dupmsg() we mustn't alter it.
		 */
		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer boundary
				 * (we don't want to disclose kernel data to
				 * users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to disclose
				 * kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Called from qtimeout().
 * Signal a timeout by passing a zero-length M_CTL msg up the read side
 * to synchronize with any active module threads (open, close, wput, rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t	*rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;		/* timeout has fired */

	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward.  Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t	*mp;
	queue_t	*rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear old chunk.  Ready for new msgs.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}
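/*
 * Pass a message upstream, honoring flow control.  If the stream
 * head is flow-blocked, the message is either queued for the read
 * service procedure (SB_NO_DROPS) or dropped and counted against
 * sb_drops.
 */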
static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}
	/*
	 * If there are messages on the q already, keep
	 * queueing them since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}