/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * STREAMS Buffering module
 *
 * This streams module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages. Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls its client
 * user process makes.
 * - only M_DATA is buffered.
 * - multithreaded operation assumes the module is configured D_MTQPAIR.
 * - packets are lost only if flag SB_NO_HEADER is clear and buffer
 *   allocation fails.
 * - in-order message transmission is enforced for all messages other
 *   than high priority messages.
 * - zero length messages on the read side are not passed up the
 *   stream but used internally for synchronization.
 * FLAGS:
 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *   (conversion is the default for backwards compatibility
 *   hence the negative logic).
 * - SB_NO_HEADER - no headers in buffered data.
 *   (adding headers is the default for backwards compatibility
 *   hence the negative logic).
 * - SB_DEFER_CHUNK - provides improved response time in question-answer
 *   applications. Buffering is not enabled until the second message
 *   is received on the read side within the sb_ticks interval.
 *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
 *   data being immediately sent upstream.
 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
 *   the blocked flow condition downstream. If this flag is clear (default)
 *   messages will be dropped if the upstream flow is blocked.
 */
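
/*
 * Example (a userland sketch; the descriptor and values are
 * hypothetical): a packet-capture client typically pushes and
 * configures the module along these lines:
 *
 *	uint_t chunk = 16 * 1024;		aggregate up to 16KB
 *	struct timeval t = { 0, 50000 };	flush after 50ms
 *
 *	(void) ioctl(fd, I_PUSH, "bufmod");
 *	(void) ioctl(fd, SBIOCSCHUNK, &chunk);
 *	(void) ioctl(fd, SBIOCSTIME, &t);
 *
 * Each read() or getmsg() then returns a chunk of sb_hdr-framed
 * messages rather than one message per call.
 */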

#include <sys/types.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/stropts.h>
#include <sys/time.h>
#include <sys/stream.h>
#include <sys/conf.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/bufmod.h>
#include <sys/modctl.h>
#include <sys/isa_defs.h>

/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full. If it's zero, we deliver every packet as it arrives. (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.) Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read side data
 * message is received and a timeout is not active. If sb_snap is
 * zero, no truncation of the message is done.
 */
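
/*
 * A minimal worked example (assuming the traditional hz = 100 clock
 * rate): an SBIOCSTIME request of 500ms is converted by
 * TIMEVAL_TO_TICK() to sb_ticks = 50, so a partial chunk is flushed
 * at most 500ms after the message that started its interval.
 */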
struct sb {
	queue_t	*sb_rq;			/* our rq */
	mblk_t	*sb_mp;			/* partial chunk */
	mblk_t	*sb_head;		/* pre-allocated space for the next header */
	mblk_t	*sb_tail;		/* first mblk of last message appended */
	uint_t	sb_mlen;		/* sb_mp length */
	uint_t	sb_mcount;		/* input msg count in sb_mp */
	uint_t	sb_chunk;		/* max chunk size */
	clock_t	sb_ticks;		/* timeout interval */
	timeout_id_t sb_timeoutid;	/* qtimeout() id */
	uint_t	sb_drops;		/* cumulative # discarded msgs */
	uint_t	sb_snap;		/* snapshot length */
	uint_t	sb_flags;		/* flags field */
	uint_t	sb_state;		/* state variable */
};

/*
 * Function prototypes.
 */
static int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static int	sbclose(queue_t *, int, cred_t *);
static void	sbwput(queue_t *, mblk_t *);
static void	sbrput(queue_t *, mblk_t *);
static void	sbrsrv(queue_t *);
static void	sbioctl(queue_t *, mblk_t *);
static void	sbaddmsg(queue_t *, mblk_t *);
static void	sbtick(void *);
static void	sbclosechunk(struct sb *);
static void	sbsendit(queue_t *, mblk_t *);

static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};

/*
 * This is the loadable module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};

int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}

/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb *sbp;
	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize per-Stream structure.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);

	return (0);
}

/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);
	/*
	 * Cancel an outstanding timeout
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}
	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * The correction factor is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream and the fact
 * that bufmod may be snipping messages with the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
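
/*
 * For example, a 256-byte snapshot length gets a fudge factor of 2
 * (see the SBIOCSSNAP handling below), which yields
 * SNIT_HIWAT(256, 2) = (4 * 256 * 2) + 512 = 2560 bytes and
 * SNIT_LOWAT(256, 2) = (2 * 256 * 2) + 256 = 1280 bytes.
 */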

static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t	ticks;
	mblk_t *mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * set up hi/lo water marks on stream head read queue.
		 * unlikely to run out of resources. Fix at later date.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * If chunking, don't worry about the effects of message
		 * snipping on stream head flow control, since it has a
		 * relatively small bearing on the data rate onto the
		 * stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * set up hi/lo water marks on stream head read queue.
			 * unlikely to run out of resources. Fix at later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		ASSERT(0);
		return;
	}
}

/*
 * Write-side put procedure. Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free message on failure.
			 */
			freemsg(mp);
			break;
		}

		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Read-side put procedure. It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp =
				    sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * Zero-length M_CTL means our timeout() popped.
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
}

/*
 * Read service procedure.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t *mp;

	/*
	 * High priority messages shouldn't get here but if
	 * one does, jam it through to avoid infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			return;
		}
		putnext(rq, mp);
	}
}

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
	struct timeval *t;
	clock_t	ticks;
	mblk_t *mop;
	int transparent = iocp->ioc_count;
	mblk_t *datamp;
	int error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp,
				    sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
			error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If infinite timeout, return range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * set up hi/lo water marks on stream head read queue.
			 * unlikely to run out of resources. Fix at later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effects of
			 * message snipping on stream head flow control,
			 * since it has a relatively small bearing on the
			 * data rate onto the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * set up hi/lo water marks on stream
				 * head read queue. unlikely to run out
				 * of resources. Fix at later date.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
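
/*
 * For example, on a kernel where sizeof (ulong_t) == 8, a message of
 * 13 bytes needs Align(13) = RoundUpAmt(13, 8) = 3 bytes of padding,
 * so that the next sb_hdr starts on an 8-byte boundary.
 */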

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one. Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled add an sb_hdr header and trailing alignment padding.
 *
 * To optimise performance the total number of mblks should be kept
 * to a minimum. This is achieved by using any remaining space in message N
 * for both its own padding and the header of message N+1 if possible.
 * If there's insufficient space we allocate one message to hold this 'wrapper'.
 * (There's likely to be space beyond message N, since allocb() would have
 * rounded up the required size to one of the dblk_sizes.)
 */
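
/*
 * Sketch of a finished chunk as it reaches the user's buffer when
 * headers are enabled (sizes vary per message):
 *
 *	+--------+----//----+-----+--------+----//----+-----+
 *	| sb_hdr |  msg 1   | pad | sb_hdr |  msg 2   | pad |
 *	+--------+----//----+-----+--------+----//----+-----+
 *
 * Each sb_hdr begins on a pointer-aligned boundary; sbh_totlen covers
 * the header, the (possibly truncated) data and the trailing padding.
 */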
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp;
	struct timeval t;
	struct sb_hdr hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer to
		 * the first mblk of the last message added and call linkb()
		 * on that last message, rather than performing the O(n)
		 * linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* Timestamp must be done immediately */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);
		hp.sbh_totlen += pad;

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		if (sbp->sb_head == NULL) {
			/* Allocate leading header of new chunk */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Copy header into message
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);

		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);

		/*
		 * Join message to the chunk
		 */
		linkb(sbp->sb_head, mp);

		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * If the first message alone is too big for the chunk close
		 * the chunk now.
		 * If the next message would immediately cause the chunk to
		 * overflow we may as well close the chunk now. The next
		 * message is certain to be at least SMALLEST_MESSAGE size.
		 */
		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
			sbclosechunk(sbp);
			return;
		}

		/*
		 * Find space for the wrapper. The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 * begins on an 8 byte boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the
		 * next message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data. If the dblk
		 * has been shared through dupmsg() we mustn't alter it.
		 */

		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data ? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer boundary
				 * (we don't want to disclose kernel data to
				 * users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to disclose
				 * kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg in the read-side
 * to synchronize with any active module threads (open, close, wput, rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t	*rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;		/* timeout has fired */

	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward. Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t *mp;
	queue_t *rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear old chunk. Ready for new msgs.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}

static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}
	/*
	 * If there are messages on the q already, keep
	 * queueing them since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}