1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 /* 30 * STREAMS Buffering module 31 * 32 * This streams module collects incoming messages from modules below 33 * it on the stream and buffers them up into a smaller number of 34 * aggregated messages. Its main purpose is to reduce overhead by 35 * cutting down on the number of read (or getmsg) calls its client 36 * user process makes. 37 * - only M_DATA is buffered. 38 * - multithreading assumes configured as D_MTQPAIR 39 * - packets are lost only if flag SB_NO_HEADER is clear and buffer 40 * allocation fails. 41 * - in order message transmission. This is enforced for messages other 42 * than high priority messages. 43 * - zero length messages on the read side are not passed up the 44 * stream but used internally for synchronization. 45 * FLAGS: 46 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA. 47 * (conversion is the default for backwards compatibility 48 * hence the negative logic). 49 * - SB_NO_HEADER - no headers in buffered data. 50 * (adding headers is the default for backwards compatibility 51 * hence the negative logic). 52 * - SB_DEFER_CHUNK - provides improved response time in question-answer 53 * applications. Buffering is not enabled until the second message 54 * is received on the read side within the sb_ticks interval. 55 * This option will often be used in combination with flag SB_SEND_ON_WRITE. 56 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read 57 * data being immediately sent upstream. 58 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates 59 * the blocked flow condition downstream. If this flag is clear (default) 60 * messages will be dropped if the upstream flow is blocked. 61 */ 62 63 64 #include <sys/types.h> 65 #include <sys/errno.h> 66 #include <sys/debug.h> 67 #include <sys/stropts.h> 68 #include <sys/time.h> 69 #include <sys/stream.h> 70 #include <sys/conf.h> 71 #include <sys/ddi.h> 72 #include <sys/sunddi.h> 73 #include <sys/kmem.h> 74 #include <sys/strsun.h> 75 #include <sys/bufmod.h> 76 #include <sys/modctl.h> 77 #include <sys/isa_defs.h> 78 79 /* 80 * Per-Stream state information. 81 * 82 * If sb_ticks is negative, we don't deliver chunks until they're 83 * full. If it's zero, we deliver every packet as it arrives. (In 84 * this case we force sb_chunk to zero, to make the implementation 85 * easier.) Otherwise, sb_ticks gives the number of ticks in a 86 * buffering interval. The interval begins when the a read side data 87 * message is received and a timeout is not active. If sb_snap is 88 * zero, no truncation of the msg is done. 89 */ 90 struct sb { 91 queue_t *sb_rq; /* our rq */ 92 mblk_t *sb_mp; /* partial chunk */ 93 mblk_t *sb_head; /* pre-allocated space for the next header */ 94 mblk_t *sb_tail; /* first mblk of last message appended */ 95 uint_t sb_mlen; /* sb_mp length */ 96 uint_t sb_mcount; /* input msg count in sb_mp */ 97 uint_t sb_chunk; /* max chunk size */ 98 clock_t sb_ticks; /* timeout interval */ 99 timeout_id_t sb_timeoutid; /* qtimeout() id */ 100 uint_t sb_drops; /* cumulative # discarded msgs */ 101 uint_t sb_snap; /* snapshot length */ 102 uint_t sb_flags; /* flags field */ 103 uint_t sb_state; /* state variable */ 104 }; 105 106 /* 107 * Function prototypes. 108 */ 109 static int sbopen(queue_t *, dev_t *, int, int, cred_t *); 110 static int sbclose(queue_t *, int, cred_t *); 111 static void sbwput(queue_t *, mblk_t *); 112 static void sbrput(queue_t *, mblk_t *); 113 static void sbrsrv(queue_t *); 114 static void sbioctl(queue_t *, mblk_t *); 115 static void sbaddmsg(queue_t *, mblk_t *); 116 static void sbtick(void *); 117 static void sbclosechunk(struct sb *); 118 static void sbsendit(queue_t *, mblk_t *); 119 120 static struct module_info sb_minfo = { 121 21, /* mi_idnum */ 122 "bufmod", /* mi_idname */ 123 0, /* mi_minpsz */ 124 INFPSZ, /* mi_maxpsz */ 125 1, /* mi_hiwat */ 126 0 /* mi_lowat */ 127 }; 128 129 static struct qinit sb_rinit = { 130 (int (*)())sbrput, /* qi_putp */ 131 (int (*)())sbrsrv, /* qi_srvp */ 132 sbopen, /* qi_qopen */ 133 sbclose, /* qi_qclose */ 134 NULL, /* qi_qadmin */ 135 &sb_minfo, /* qi_minfo */ 136 NULL /* qi_mstat */ 137 }; 138 139 static struct qinit sb_winit = { 140 (int (*)())sbwput, /* qi_putp */ 141 NULL, /* qi_srvp */ 142 NULL, /* qi_qopen */ 143 NULL, /* qi_qclose */ 144 NULL, /* qi_qadmin */ 145 &sb_minfo, /* qi_minfo */ 146 NULL /* qi_mstat */ 147 }; 148 149 static struct streamtab sb_info = { 150 &sb_rinit, /* st_rdinit */ 151 &sb_winit, /* st_wrinit */ 152 NULL, /* st_muxrinit */ 153 NULL /* st_muxwinit */ 154 }; 155 156 157 /* 158 * This is the loadable module wrapper. 159 */ 160 161 static struct fmodsw fsw = { 162 "bufmod", 163 &sb_info, 164 D_MTQPAIR | D_MP 165 }; 166 167 /* 168 * Module linkage information for the kernel. 169 */ 170 171 static struct modlstrmod modlstrmod = { 172 &mod_strmodops, "streams buffer mod", &fsw 173 }; 174 175 static struct modlinkage modlinkage = { 176 MODREV_1, &modlstrmod, NULL 177 }; 178 179 180 int 181 _init(void) 182 { 183 return (mod_install(&modlinkage)); 184 } 185 186 int 187 _fini(void) 188 { 189 return (mod_remove(&modlinkage)); 190 } 191 192 int 193 _info(struct modinfo *modinfop) 194 { 195 return (mod_info(&modlinkage, modinfop)); 196 } 197 198 199 /* ARGSUSED */ 200 static int 201 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp) 202 { 203 struct sb *sbp; 204 ASSERT(rq); 205 206 if (sflag != MODOPEN) 207 return (EINVAL); 208 209 if (rq->q_ptr) 210 return (0); 211 212 /* 213 * Allocate and initialize per-Stream structure. 214 */ 215 sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP); 216 sbp->sb_rq = rq; 217 sbp->sb_ticks = -1; 218 sbp->sb_chunk = SB_DFLT_CHUNK; 219 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL; 220 sbp->sb_mlen = 0; 221 sbp->sb_mcount = 0; 222 sbp->sb_timeoutid = 0; 223 sbp->sb_drops = 0; 224 sbp->sb_snap = 0; 225 sbp->sb_flags = 0; 226 sbp->sb_state = 0; 227 228 rq->q_ptr = WR(rq)->q_ptr = sbp; 229 230 qprocson(rq); 231 232 233 return (0); 234 } 235 236 /* ARGSUSED1 */ 237 static int 238 sbclose(queue_t *rq, int flag, cred_t *credp) 239 { 240 struct sb *sbp = (struct sb *)rq->q_ptr; 241 242 ASSERT(sbp); 243 244 qprocsoff(rq); 245 /* 246 * Cancel an outstanding timeout 247 */ 248 if (sbp->sb_timeoutid != 0) { 249 (void) quntimeout(rq, sbp->sb_timeoutid); 250 sbp->sb_timeoutid = 0; 251 } 252 /* 253 * Free the current chunk. 254 */ 255 if (sbp->sb_mp) { 256 freemsg(sbp->sb_mp); 257 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL; 258 sbp->sb_mlen = 0; 259 } 260 261 /* 262 * Free the per-Stream structure. 263 */ 264 kmem_free((caddr_t)sbp, sizeof (struct sb)); 265 rq->q_ptr = WR(rq)->q_ptr = NULL; 266 267 return (0); 268 } 269 270 /* 271 * the correction factor is introduced to compensate for 272 * whatever assumptions the modules below have made about 273 * how much traffic is flowing through the stream and the fact 274 * that bufmod may be snipping messages with the sb_snap length. 275 */ 276 #define SNIT_HIWAT(msgsize, fudge) ((4 * msgsize * fudge) + 512) 277 #define SNIT_LOWAT(msgsize, fudge) ((2 * msgsize * fudge) + 256) 278 279 280 static void 281 sbioc(queue_t *wq, mblk_t *mp) 282 { 283 struct iocblk *iocp; 284 struct sb *sbp = (struct sb *)wq->q_ptr; 285 clock_t ticks; 286 mblk_t *mop; 287 288 iocp = (struct iocblk *)mp->b_rptr; 289 290 switch (iocp->ioc_cmd) { 291 case SBIOCGCHUNK: 292 case SBIOCGSNAP: 293 case SBIOCGFLAGS: 294 case SBIOCGTIME: 295 miocack(wq, mp, 0, 0); 296 return; 297 298 case SBIOCSTIME: 299 #ifdef _SYSCALL32_IMPL 300 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) { 301 struct timeval32 *t32; 302 303 t32 = (struct timeval32 *)mp->b_cont->b_rptr; 304 if (t32->tv_sec < 0 || t32->tv_usec < 0) { 305 miocnak(wq, mp, 0, EINVAL); 306 break; 307 } 308 ticks = TIMEVAL_TO_TICK(t32); 309 } else 310 #endif /* _SYSCALL32_IMPL */ 311 { 312 struct timeval *tb; 313 314 tb = (struct timeval *)mp->b_cont->b_rptr; 315 316 if (tb->tv_sec < 0 || tb->tv_usec < 0) { 317 miocnak(wq, mp, 0, EINVAL); 318 break; 319 } 320 ticks = TIMEVAL_TO_TICK(tb); 321 } 322 sbp->sb_ticks = ticks; 323 if (ticks == 0) 324 sbp->sb_chunk = 0; 325 miocack(wq, mp, 0, 0); 326 sbclosechunk(sbp); 327 return; 328 329 case SBIOCSCHUNK: 330 /* 331 * set up hi/lo water marks on stream head read queue. 332 * unlikely to run out of resources. Fix at later date. 333 */ 334 if ((mop = allocb(sizeof (struct stroptions), 335 BPRI_MED)) != NULL) { 336 struct stroptions *sop; 337 uint_t chunk; 338 339 chunk = *(uint_t *)mp->b_cont->b_rptr; 340 mop->b_datap->db_type = M_SETOPTS; 341 mop->b_wptr += sizeof (struct stroptions); 342 sop = (struct stroptions *)mop->b_rptr; 343 sop->so_flags = SO_HIWAT | SO_LOWAT; 344 sop->so_hiwat = SNIT_HIWAT(chunk, 1); 345 sop->so_lowat = SNIT_LOWAT(chunk, 1); 346 qreply(wq, mop); 347 } 348 349 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr; 350 miocack(wq, mp, 0, 0); 351 sbclosechunk(sbp); 352 return; 353 354 case SBIOCSFLAGS: 355 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr; 356 miocack(wq, mp, 0, 0); 357 return; 358 359 case SBIOCSSNAP: 360 /* 361 * if chunking dont worry about effects of 362 * snipping of message size on head flow control 363 * since it has a relatively small bearing on the 364 * data rate onto the streamn head. 365 */ 366 if (!sbp->sb_chunk) { 367 /* 368 * set up hi/lo water marks on stream head read queue. 369 * unlikely to run out of resources. Fix at later date. 370 */ 371 if ((mop = allocb(sizeof (struct stroptions), 372 BPRI_MED)) != NULL) { 373 struct stroptions *sop; 374 uint_t snap; 375 int fudge; 376 377 snap = *(uint_t *)mp->b_cont->b_rptr; 378 mop->b_datap->db_type = M_SETOPTS; 379 mop->b_wptr += sizeof (struct stroptions); 380 sop = (struct stroptions *)mop->b_rptr; 381 sop->so_flags = SO_HIWAT | SO_LOWAT; 382 fudge = snap <= 100 ? 4 : 383 snap <= 400 ? 2 : 384 1; 385 sop->so_hiwat = SNIT_HIWAT(snap, fudge); 386 sop->so_lowat = SNIT_LOWAT(snap, fudge); 387 qreply(wq, mop); 388 } 389 } 390 391 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr; 392 miocack(wq, mp, 0, 0); 393 return; 394 395 default: 396 ASSERT(0); 397 return; 398 } 399 } 400 401 /* 402 * Write-side put procedure. Its main task is to detect ioctls 403 * for manipulating the buffering state and hand them to sbioctl. 404 * Other message types are passed on through. 405 */ 406 static void 407 sbwput(queue_t *wq, mblk_t *mp) 408 { 409 struct sb *sbp = (struct sb *)wq->q_ptr; 410 struct copyresp *resp; 411 412 if (sbp->sb_flags & SB_SEND_ON_WRITE) 413 sbclosechunk(sbp); 414 switch (mp->b_datap->db_type) { 415 case M_IOCTL: 416 sbioctl(wq, mp); 417 break; 418 419 case M_IOCDATA: 420 resp = (struct copyresp *)mp->b_rptr; 421 if (resp->cp_rval) { 422 /* 423 * Just free message on failure. 424 */ 425 freemsg(mp); 426 break; 427 } 428 429 switch (resp->cp_cmd) { 430 case SBIOCSTIME: 431 case SBIOCSCHUNK: 432 case SBIOCSFLAGS: 433 case SBIOCSSNAP: 434 case SBIOCGTIME: 435 case SBIOCGCHUNK: 436 case SBIOCGSNAP: 437 case SBIOCGFLAGS: 438 sbioc(wq, mp); 439 break; 440 441 default: 442 putnext(wq, mp); 443 break; 444 } 445 break; 446 447 default: 448 putnext(wq, mp); 449 break; 450 } 451 } 452 453 /* 454 * Read-side put procedure. It's responsible for buffering up incoming 455 * messages and grouping them into aggregates according to the current 456 * buffering parameters. 457 */ 458 static void 459 sbrput(queue_t *rq, mblk_t *mp) 460 { 461 struct sb *sbp = (struct sb *)rq->q_ptr; 462 463 ASSERT(sbp); 464 465 switch (mp->b_datap->db_type) { 466 case M_PROTO: 467 if (sbp->sb_flags & SB_NO_PROTO_CVT) { 468 sbclosechunk(sbp); 469 sbsendit(rq, mp); 470 break; 471 } else { 472 /* 473 * Convert M_PROTO to M_DATA. 474 */ 475 mp->b_datap->db_type = M_DATA; 476 } 477 /* FALLTHRU */ 478 479 case M_DATA: 480 if ((sbp->sb_flags & SB_DEFER_CHUNK) && 481 !(sbp->sb_state & SB_FRCVD)) { 482 sbclosechunk(sbp); 483 sbsendit(rq, mp); 484 sbp->sb_state |= SB_FRCVD; 485 } else 486 sbaddmsg(rq, mp); 487 488 if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid)) 489 sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick, 490 sbp, sbp->sb_ticks); 491 492 break; 493 494 case M_FLUSH: 495 if (*mp->b_rptr & FLUSHR) { 496 /* 497 * Reset timeout, flush the chunk currently in 498 * progress, and start a new chunk. 499 */ 500 if (sbp->sb_timeoutid) { 501 (void) quntimeout(sbp->sb_rq, 502 sbp->sb_timeoutid); 503 sbp->sb_timeoutid = 0; 504 } 505 if (sbp->sb_mp) { 506 freemsg(sbp->sb_mp); 507 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL; 508 sbp->sb_mlen = 0; 509 sbp->sb_mcount = 0; 510 } 511 flushq(rq, FLUSHALL); 512 } 513 putnext(rq, mp); 514 break; 515 516 case M_CTL: 517 /* 518 * Zero-length M_CTL means our timeout() popped. 519 */ 520 if (MBLKL(mp) == 0) { 521 freemsg(mp); 522 sbclosechunk(sbp); 523 } else { 524 sbclosechunk(sbp); 525 sbsendit(rq, mp); 526 } 527 break; 528 529 default: 530 if (mp->b_datap->db_type <= QPCTL) { 531 sbclosechunk(sbp); 532 sbsendit(rq, mp); 533 } else { 534 /* Note: out of band */ 535 putnext(rq, mp); 536 } 537 break; 538 } 539 } 540 541 /* 542 * read service procedure. 543 */ 544 /* ARGSUSED */ 545 static void 546 sbrsrv(queue_t *rq) 547 { 548 mblk_t *mp; 549 550 /* 551 * High priority messages shouldn't get here but if 552 * one does, jam it through to avoid infinite loop. 553 */ 554 while ((mp = getq(rq)) != NULL) { 555 if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) { 556 /* should only get here if SB_NO_SROPS */ 557 (void) putbq(rq, mp); 558 return; 559 } 560 putnext(rq, mp); 561 } 562 } 563 564 /* 565 * Handle write-side M_IOCTL messages. 566 */ 567 static void 568 sbioctl(queue_t *wq, mblk_t *mp) 569 { 570 struct sb *sbp = (struct sb *)wq->q_ptr; 571 struct iocblk *iocp = (struct iocblk *)mp->b_rptr; 572 struct timeval *t; 573 clock_t ticks; 574 mblk_t *mop; 575 int transparent = iocp->ioc_count; 576 mblk_t *datamp; 577 int error; 578 579 switch (iocp->ioc_cmd) { 580 case SBIOCSTIME: 581 if (iocp->ioc_count == TRANSPARENT) { 582 #ifdef _SYSCALL32_IMPL 583 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) { 584 mcopyin(mp, NULL, sizeof (struct timeval32), 585 NULL); 586 } else 587 #endif /* _SYSCALL32_IMPL */ 588 { 589 mcopyin(mp, NULL, sizeof (*t), NULL); 590 } 591 qreply(wq, mp); 592 } else { 593 /* 594 * Verify argument length. 595 */ 596 #ifdef _SYSCALL32_IMPL 597 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) { 598 struct timeval32 *t32; 599 600 error = miocpullup(mp, 601 sizeof (struct timeval32)); 602 if (error != 0) { 603 miocnak(wq, mp, 0, error); 604 break; 605 } 606 t32 = (struct timeval32 *)mp->b_cont->b_rptr; 607 if (t32->tv_sec < 0 || t32->tv_usec < 0) { 608 miocnak(wq, mp, 0, EINVAL); 609 break; 610 } 611 ticks = TIMEVAL_TO_TICK(t32); 612 } else 613 #endif /* _SYSCALL32_IMPL */ 614 { 615 error = miocpullup(mp, sizeof (struct timeval)); 616 if (error != 0) { 617 miocnak(wq, mp, 0, error); 618 break; 619 } 620 621 t = (struct timeval *)mp->b_cont->b_rptr; 622 if (t->tv_sec < 0 || t->tv_usec < 0) { 623 miocnak(wq, mp, 0, EINVAL); 624 break; 625 } 626 ticks = TIMEVAL_TO_TICK(t); 627 } 628 sbp->sb_ticks = ticks; 629 if (ticks == 0) 630 sbp->sb_chunk = 0; 631 miocack(wq, mp, 0, 0); 632 sbclosechunk(sbp); 633 } 634 break; 635 636 case SBIOCGTIME: { 637 struct timeval *t; 638 639 /* 640 * Verify argument length. 641 */ 642 if (transparent != TRANSPARENT) { 643 #ifdef _SYSCALL32_IMPL 644 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) { 645 error = miocpullup(mp, 646 sizeof (struct timeval32)); 647 if (error != 0) { 648 miocnak(wq, mp, 0, error); 649 break; 650 } 651 } else 652 #endif /* _SYSCALL32_IMPL */ 653 error = miocpullup(mp, sizeof (struct timeval)); 654 if (error != 0) { 655 miocnak(wq, mp, 0, error); 656 break; 657 } 658 } 659 660 /* 661 * If infinite timeout, return range error 662 * for the ioctl. 663 */ 664 if (sbp->sb_ticks < 0) { 665 miocnak(wq, mp, 0, ERANGE); 666 break; 667 } 668 669 #ifdef _SYSCALL32_IMPL 670 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) { 671 struct timeval32 *t32; 672 673 if (transparent == TRANSPARENT) { 674 datamp = allocb(sizeof (*t32), BPRI_MED); 675 if (datamp == NULL) { 676 miocnak(wq, mp, 0, EAGAIN); 677 break; 678 } 679 mcopyout(mp, NULL, sizeof (*t32), NULL, datamp); 680 } 681 682 t32 = (struct timeval32 *)mp->b_cont->b_rptr; 683 TICK_TO_TIMEVAL32(sbp->sb_ticks, t32); 684 685 if (transparent == TRANSPARENT) 686 qreply(wq, mp); 687 else 688 miocack(wq, mp, sizeof (*t32), 0); 689 } else 690 #endif /* _SYSCALL32_IMPL */ 691 { 692 if (transparent == TRANSPARENT) { 693 datamp = allocb(sizeof (*t), BPRI_MED); 694 if (datamp == NULL) { 695 miocnak(wq, mp, 0, EAGAIN); 696 break; 697 } 698 mcopyout(mp, NULL, sizeof (*t), NULL, datamp); 699 } 700 701 t = (struct timeval *)mp->b_cont->b_rptr; 702 TICK_TO_TIMEVAL(sbp->sb_ticks, t); 703 704 if (transparent == TRANSPARENT) 705 qreply(wq, mp); 706 else 707 miocack(wq, mp, sizeof (*t), 0); 708 } 709 break; 710 } 711 712 case SBIOCCTIME: 713 sbp->sb_ticks = -1; 714 miocack(wq, mp, 0, 0); 715 break; 716 717 case SBIOCSCHUNK: 718 if (iocp->ioc_count == TRANSPARENT) { 719 mcopyin(mp, NULL, sizeof (uint_t), NULL); 720 qreply(wq, mp); 721 } else { 722 /* 723 * Verify argument length. 724 */ 725 error = miocpullup(mp, sizeof (uint_t)); 726 if (error != 0) { 727 miocnak(wq, mp, 0, error); 728 break; 729 } 730 731 /* 732 * set up hi/lo water marks on stream head read queue. 733 * unlikely to run out of resources. Fix at later date. 734 */ 735 if ((mop = allocb(sizeof (struct stroptions), 736 BPRI_MED)) != NULL) { 737 struct stroptions *sop; 738 uint_t chunk; 739 740 chunk = *(uint_t *)mp->b_cont->b_rptr; 741 mop->b_datap->db_type = M_SETOPTS; 742 mop->b_wptr += sizeof (struct stroptions); 743 sop = (struct stroptions *)mop->b_rptr; 744 sop->so_flags = SO_HIWAT | SO_LOWAT; 745 sop->so_hiwat = SNIT_HIWAT(chunk, 1); 746 sop->so_lowat = SNIT_LOWAT(chunk, 1); 747 qreply(wq, mop); 748 } 749 750 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr; 751 miocack(wq, mp, 0, 0); 752 sbclosechunk(sbp); 753 } 754 break; 755 756 case SBIOCGCHUNK: 757 /* 758 * Verify argument length. 759 */ 760 if (transparent != TRANSPARENT) { 761 error = miocpullup(mp, sizeof (uint_t)); 762 if (error != 0) { 763 miocnak(wq, mp, 0, error); 764 break; 765 } 766 } 767 768 if (transparent == TRANSPARENT) { 769 datamp = allocb(sizeof (uint_t), BPRI_MED); 770 if (datamp == NULL) { 771 miocnak(wq, mp, 0, EAGAIN); 772 break; 773 } 774 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp); 775 } 776 777 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk; 778 779 if (transparent == TRANSPARENT) 780 qreply(wq, mp); 781 else 782 miocack(wq, mp, sizeof (uint_t), 0); 783 break; 784 785 case SBIOCSSNAP: 786 if (iocp->ioc_count == TRANSPARENT) { 787 mcopyin(mp, NULL, sizeof (uint_t), NULL); 788 qreply(wq, mp); 789 } else { 790 /* 791 * Verify argument length. 792 */ 793 error = miocpullup(mp, sizeof (uint_t)); 794 if (error != 0) { 795 miocnak(wq, mp, 0, error); 796 break; 797 } 798 799 /* 800 * if chunking dont worry about effects of 801 * snipping of message size on head flow control 802 * since it has a relatively small bearing on the 803 * data rate onto the streamn head. 804 */ 805 if (!sbp->sb_chunk) { 806 /* 807 * set up hi/lo water marks on stream 808 * head read queue. unlikely to run out 809 * of resources. Fix at later date. 810 */ 811 if ((mop = allocb(sizeof (struct stroptions), 812 BPRI_MED)) != NULL) { 813 struct stroptions *sop; 814 uint_t snap; 815 int fudge; 816 817 snap = *(uint_t *)mp->b_cont->b_rptr; 818 mop->b_datap->db_type = M_SETOPTS; 819 mop->b_wptr += sizeof (*sop); 820 sop = (struct stroptions *)mop->b_rptr; 821 sop->so_flags = SO_HIWAT | SO_LOWAT; 822 fudge = (snap <= 100) ? 4 : 823 (snap <= 400) ? 2 : 1; 824 sop->so_hiwat = SNIT_HIWAT(snap, fudge); 825 sop->so_lowat = SNIT_LOWAT(snap, fudge); 826 qreply(wq, mop); 827 } 828 } 829 830 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr; 831 832 miocack(wq, mp, 0, 0); 833 } 834 break; 835 836 case SBIOCGSNAP: 837 /* 838 * Verify argument length 839 */ 840 if (transparent != TRANSPARENT) { 841 error = miocpullup(mp, sizeof (uint_t)); 842 if (error != 0) { 843 miocnak(wq, mp, 0, error); 844 break; 845 } 846 } 847 848 if (transparent == TRANSPARENT) { 849 datamp = allocb(sizeof (uint_t), BPRI_MED); 850 if (datamp == NULL) { 851 miocnak(wq, mp, 0, EAGAIN); 852 break; 853 } 854 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp); 855 } 856 857 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap; 858 859 if (transparent == TRANSPARENT) 860 qreply(wq, mp); 861 else 862 miocack(wq, mp, sizeof (uint_t), 0); 863 break; 864 865 case SBIOCSFLAGS: 866 /* 867 * set the flags. 868 */ 869 if (iocp->ioc_count == TRANSPARENT) { 870 mcopyin(mp, NULL, sizeof (uint_t), NULL); 871 qreply(wq, mp); 872 } else { 873 error = miocpullup(mp, sizeof (uint_t)); 874 if (error != 0) { 875 miocnak(wq, mp, 0, error); 876 break; 877 } 878 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr; 879 miocack(wq, mp, 0, 0); 880 } 881 break; 882 883 case SBIOCGFLAGS: 884 /* 885 * Verify argument length 886 */ 887 if (transparent != TRANSPARENT) { 888 error = miocpullup(mp, sizeof (uint_t)); 889 if (error != 0) { 890 miocnak(wq, mp, 0, error); 891 break; 892 } 893 } 894 895 if (transparent == TRANSPARENT) { 896 datamp = allocb(sizeof (uint_t), BPRI_MED); 897 if (datamp == NULL) { 898 miocnak(wq, mp, 0, EAGAIN); 899 break; 900 } 901 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp); 902 } 903 904 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags; 905 906 if (transparent == TRANSPARENT) 907 qreply(wq, mp); 908 else 909 miocack(wq, mp, sizeof (uint_t), 0); 910 break; 911 912 913 default: 914 putnext(wq, mp); 915 break; 916 } 917 } 918 919 /* 920 * Given a length l, calculate the amount of extra storage 921 * required to round it up to the next multiple of the alignment a. 922 */ 923 #define RoundUpAmt(l, a) ((l) % (a) ? (a) - ((l) % (a)) : 0) 924 /* 925 * Calculate additional amount of space required for alignment. 926 */ 927 #define Align(l) RoundUpAmt(l, sizeof (ulong_t)) 928 /* 929 * Smallest possible message size when headers are enabled. 930 * This is used to calculate whether a chunk is nearly full. 931 */ 932 #define SMALLEST_MESSAGE sizeof (struct sb_hdr) + _POINTER_ALIGNMENT 933 934 /* 935 * Process a read-side M_DATA message. 936 * 937 * If the currently accumulating chunk doesn't have enough room 938 * for the message, close off the chunk, pass it upward, and start 939 * a new one. Then add the message to the current chunk, taking 940 * account of the possibility that the message's size exceeds the 941 * chunk size. 942 * 943 * If headers are enabled add an sb_hdr header and trailing alignment padding. 944 * 945 * To optimise performance the total number of msgbs should be kept 946 * to a minimum. This is achieved by using any remaining space in message N 947 * for both its own padding as well as the header of message N+1 if possible. 948 * If there's insufficient space we allocate one message to hold this 'wrapper'. 949 * (there's likely to be space beyond message N, since allocb would have 950 * rounded up the required size to one of the dblk_sizes). 951 * 952 */ 953 static void 954 sbaddmsg(queue_t *rq, mblk_t *mp) 955 { 956 struct sb *sbp; 957 struct timeval t; 958 struct sb_hdr hp; 959 mblk_t *wrapper; /* padding for msg N, header for msg N+1 */ 960 mblk_t *last; /* last mblk of current message */ 961 size_t wrapperlen; /* length of header + padding */ 962 size_t origlen; /* data length before truncation */ 963 size_t pad; /* bytes required to align header */ 964 965 sbp = (struct sb *)rq->q_ptr; 966 967 origlen = msgdsize(mp); 968 969 /* 970 * Truncate the message. 971 */ 972 if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) && 973 (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1)) 974 hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap; 975 else 976 hp.sbh_totlen = hp.sbh_msglen = origlen; 977 978 if (sbp->sb_flags & SB_NO_HEADER) { 979 980 /* 981 * Would the inclusion of this message overflow the current 982 * chunk? If so close the chunk off and start a new one. 983 */ 984 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk) 985 sbclosechunk(sbp); 986 /* 987 * First message too big for chunk - just send it up. 988 * This will always be true when we're not chunking. 989 */ 990 if (hp.sbh_totlen > sbp->sb_chunk) { 991 sbsendit(rq, mp); 992 return; 993 } 994 995 /* 996 * We now know that the msg will fit in the chunk. 997 * Link it onto the end of the chunk. 998 * Since linkb() walks the entire chain, we keep a pointer to 999 * the first mblk of the last msgb added and call linkb on that 1000 * that last message, rather than performing the 1001 * O(n) linkb() operation on the whole chain. 1002 * sb_head isn't needed in this SB_NO_HEADER mode. 1003 */ 1004 if (sbp->sb_mp) 1005 linkb(sbp->sb_tail, mp); 1006 else 1007 sbp->sb_mp = mp; 1008 1009 sbp->sb_tail = mp; 1010 sbp->sb_mlen += hp.sbh_totlen; 1011 sbp->sb_mcount++; 1012 } else { 1013 /* Timestamp must be done immediately */ 1014 uniqtime(&t); 1015 TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t); 1016 1017 pad = Align(hp.sbh_totlen); 1018 hp.sbh_totlen += sizeof (hp); 1019 hp.sbh_totlen += pad; 1020 1021 /* 1022 * Would the inclusion of this message overflow the current 1023 * chunk? If so close the chunk off and start a new one. 1024 */ 1025 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk) 1026 sbclosechunk(sbp); 1027 1028 if (sbp->sb_head == NULL) { 1029 /* Allocate leading header of new chunk */ 1030 sbp->sb_head = allocb(sizeof (hp), BPRI_MED); 1031 if (sbp->sb_head == NULL) { 1032 /* 1033 * Memory allocation failure. 1034 * This will need to be revisited 1035 * since using certain flag combinations 1036 * can result in messages being dropped 1037 * silently. 1038 */ 1039 freemsg(mp); 1040 sbp->sb_drops++; 1041 return; 1042 } 1043 sbp->sb_mp = sbp->sb_head; 1044 } 1045 1046 /* 1047 * Copy header into message 1048 */ 1049 hp.sbh_drops = sbp->sb_drops; 1050 hp.sbh_origlen = origlen; 1051 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp)); 1052 sbp->sb_head->b_wptr += sizeof (hp); 1053 1054 ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim); 1055 1056 /* 1057 * Join message to the chunk 1058 */ 1059 linkb(sbp->sb_head, mp); 1060 1061 sbp->sb_mcount++; 1062 sbp->sb_mlen += hp.sbh_totlen; 1063 1064 /* 1065 * If the first message alone is too big for the chunk close 1066 * the chunk now. 1067 * If the next message would immediately cause the chunk to 1068 * overflow we may as well close the chunk now. The next 1069 * message is certain to be at least SMALLEST_MESSAGE size. 1070 */ 1071 if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) { 1072 sbclosechunk(sbp); 1073 return; 1074 } 1075 1076 /* 1077 * Find space for the wrapper. The wrapper consists of: 1078 * 1079 * 1) Padding for this message (this is to ensure each header 1080 * begins on an 8 byte boundary in the userland buffer). 1081 * 1082 * 2) Space for the next message's header, in case the next 1083 * next message will fit in this chunk. 1084 * 1085 * It may be possible to append the wrapper to the last mblk 1086 * of the message, but only if we 'own' the data. If the dblk 1087 * has been shared through dupmsg() we mustn't alter it. 1088 */ 1089 1090 wrapperlen = (sizeof (hp) + pad); 1091 1092 /* Is there space for the wrapper beyond the message's data ? */ 1093 for (last = mp; last->b_cont; last = last->b_cont) 1094 ; 1095 1096 if ((wrapperlen <= MBLKTAIL(last)) && 1097 (last->b_datap->db_ref == 1)) { 1098 if (pad > 0) { 1099 /* 1100 * Pad with zeroes to the next pointer boundary 1101 * (we don't want to disclose kernel data to 1102 * users), then advance wptr. 1103 */ 1104 (void) memset(last->b_wptr, 0, pad); 1105 last->b_wptr += pad; 1106 } 1107 /* Remember where to write the header information */ 1108 sbp->sb_head = last; 1109 } else { 1110 /* Have to allocate additional space for the wrapper */ 1111 wrapper = allocb(wrapperlen, BPRI_MED); 1112 if (wrapper == NULL) { 1113 sbclosechunk(sbp); 1114 return; 1115 } 1116 if (pad > 0) { 1117 /* 1118 * Pad with zeroes (we don't want to disclose 1119 * kernel data to users). 1120 */ 1121 (void) memset(wrapper->b_wptr, 0, pad); 1122 wrapper->b_wptr += pad; 1123 } 1124 /* Link the wrapper msg onto the end of the chunk */ 1125 linkb(mp, wrapper); 1126 /* Remember to write the next header in this wrapper */ 1127 sbp->sb_head = wrapper; 1128 } 1129 } 1130 } 1131 1132 /* 1133 * Called from timeout(). 1134 * Signal a timeout by passing a zero-length M_CTL msg in the read-side 1135 * to synchronize with any active module threads (open, close, wput, rput). 1136 */ 1137 static void 1138 sbtick(void *arg) 1139 { 1140 struct sb *sbp = arg; 1141 queue_t *rq; 1142 1143 ASSERT(sbp); 1144 1145 rq = sbp->sb_rq; 1146 sbp->sb_timeoutid = 0; /* timeout has fired */ 1147 1148 if (putctl(rq, M_CTL) == 0) /* failure */ 1149 sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks); 1150 } 1151 1152 /* 1153 * Close off the currently accumulating chunk and pass 1154 * it upward. Takes care of resetting timers as well. 1155 * 1156 * This routine is called both directly and as a result 1157 * of the chunk timeout expiring. 1158 */ 1159 static void 1160 sbclosechunk(struct sb *sbp) 1161 { 1162 mblk_t *mp; 1163 queue_t *rq; 1164 1165 ASSERT(sbp); 1166 1167 if (sbp->sb_timeoutid) { 1168 (void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid); 1169 sbp->sb_timeoutid = 0; 1170 } 1171 1172 mp = sbp->sb_mp; 1173 rq = sbp->sb_rq; 1174 1175 /* 1176 * If there's currently a chunk in progress, close it off 1177 * and try to send it up. 1178 */ 1179 if (mp) { 1180 sbsendit(rq, mp); 1181 } 1182 1183 /* 1184 * Clear old chunk. Ready for new msgs. 1185 */ 1186 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL; 1187 sbp->sb_mlen = 0; 1188 sbp->sb_mcount = 0; 1189 if (sbp->sb_flags & SB_DEFER_CHUNK) 1190 sbp->sb_state &= ~SB_FRCVD; 1191 1192 } 1193 1194 static void 1195 sbsendit(queue_t *rq, mblk_t *mp) 1196 { 1197 struct sb *sbp = (struct sb *)rq->q_ptr; 1198 1199 if (!canputnext(rq)) { 1200 if (sbp->sb_flags & SB_NO_DROPS) 1201 (void) putq(rq, mp); 1202 else { 1203 freemsg(mp); 1204 sbp->sb_drops += sbp->sb_mcount; 1205 } 1206 return; 1207 } 1208 /* 1209 * If there are messages on the q already, keep 1210 * queueing them since they need to be processed in order. 1211 */ 1212 if (qsize(rq) > 0) { 1213 /* should only get here if SB_NO_DROPS */ 1214 (void) putq(rq, mp); 1215 } 1216 else 1217 putnext(rq, mp); 1218 } 1219