/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 * Copyright 2018 Joyent, Inc.
 */

/*
 * STREAMS Buffering module
 *
 * This streams module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages. Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls its client
 * user process makes.
 *  - only M_DATA is buffered.
 *  - multithreading assumes the module is configured as D_MTQPAIR.
 *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
 *    allocation fails.
 *  - in-order message transmission is enforced for messages other
 *    than high priority messages.
 *  - zero length messages on the read side are not passed up the
 *    stream but used internally for synchronization.
 * FLAGS:
 *  - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *    (conversion is the default for backwards compatibility
 *    hence the negative logic).
 *  - SB_NO_HEADER - no headers in buffered data.
 *    (adding headers is the default for backwards compatibility
 *    hence the negative logic).
 *  - SB_DEFER_CHUNK - provides improved response time in question-answer
 *    applications. Buffering is not enabled until the second message
 *    is received on the read side within the sb_ticks interval.
 *    This option will often be used in combination with flag SB_SEND_ON_WRITE.
 *  - SB_SEND_ON_WRITE - a write message results in any pending buffered read
 *    data being immediately sent upstream.
 *  - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
 *    the blocked flow condition downstream. If this flag is clear (default)
 *    messages will be dropped if the upstream flow is blocked.
 */
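
/*
 * For illustration, a user process typically configures bufmod from
 * userland roughly as sketched below (a minimal, hypothetical example;
 * error handling omitted; "fd" is assumed to be a stream already opened
 * on the underlying device):
 *
 *	#include <stropts.h>
 *	#include <sys/bufmod.h>
 *
 *	uint_t chunk = 8192;			// aggregate up to 8 KB per read
 *	struct timeval timeout = { 1, 0 };	// flush at least once a second
 *
 *	ioctl(fd, I_PUSH, "bufmod");
 *	ioctl(fd, SBIOCSCHUNK, &chunk);
 *	ioctl(fd, SBIOCSTIME, &timeout);
 *
 * Each read() then returns a chunk of concatenated messages, each
 * preceded by a struct sb_hdr unless SB_NO_HEADER has been set via
 * SBIOCSFLAGS.
 */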


#include <sys/types.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/stropts.h>
#include <sys/time.h>
#include <sys/stream.h>
#include <sys/conf.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/bufmod.h>
#include <sys/modctl.h>
#include <sys/isa_defs.h>

/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full. If it's zero, we deliver every packet as it arrives. (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.) Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read-side data
 * message is received and a timeout is not active. If sb_snap is
 * zero, no truncation of the message is done.
 */
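
/*
 * For concreteness, the three sb_ticks regimes (hz is the kernel
 * clock-tick rate, commonly 100; the values below are illustrative):
 *
 *	sb_ticks == -1	deliver a chunk only once it fills
 *	sb_ticks == 0	deliver each message immediately (sb_chunk forced to 0)
 *	sb_ticks == hz	deliver whatever has accumulated at least once a second
 */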
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk */
	mblk_t	*sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size */
	clock_t	sb_ticks;	/* timeout interval */
	timeout_id_t sb_timeoutid; /* qtimeout() id */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length */
	uint_t	sb_flags;	/* flags field */
	uint_t	sb_state;	/* state variable */
};

/*
 * Function prototypes.
 */
static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static	int	sbclose(queue_t *, int, cred_t *);
static	void	sbwput(queue_t *, mblk_t *);
static	void	sbrput(queue_t *, mblk_t *);
static	void	sbrsrv(queue_t *);
static	void	sbioctl(queue_t *, mblk_t *);
static	void	sbaddmsg(queue_t *, mblk_t *);
static	void	sbtick(void *);
static	void	sbclosechunk(struct sb *);
static	void	sbsendit(queue_t *, mblk_t *);

static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};


int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}


/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb	*sbp;
	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize per-Stream structure.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);

	return (0);
}

/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);
	/*
	 * Cancel an outstanding timeout
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}
	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * The correction factor is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream, and for the
 * fact that bufmod may be snipping messages to the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
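
/*
 * For example (illustrative numbers only): with a chunk size of 4096
 * bytes and a fudge factor of 1, SNIT_HIWAT yields 4 * 4096 * 1 + 512 =
 * 16896 and SNIT_LOWAT yields 2 * 4096 * 1 + 256 = 8448, so the stream
 * head can absorb roughly four chunks before flow control kicks in.
 */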


static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t	ticks;
	mblk_t	*mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * Set up hi/lo water marks on the stream head read queue.
		 * We're unlikely to run out of resources here; fix at a
		 * later date if that assumption proves wrong.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * If chunking, don't worry about the effect of snipping
		 * messages to the snapshot size on stream head flow
		 * control, since it has relatively little bearing on the
		 * data rate into the stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * Set up hi/lo water marks on the stream head
			 * read queue. We're unlikely to run out of
			 * resources here; fix at a later date if needed.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		ASSERT(0);
		return;
	}
}

/*
 * Write-side put procedure. Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free message on failure.
			 */
			freemsg(mp);
			break;
		}

		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Read-side put procedure. It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * Zero-length M_CTL means our timeout() popped.
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
}

/*
 * Read service procedure.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t	*mp;

	/*
	 * High-priority messages shouldn't get here, but if one does,
	 * jam it through to avoid an infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			return;
		}
		putnext(rq, mp);
	}
}

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)wq->q_ptr;
	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
	struct timeval	*t;
	clock_t	ticks;
	mblk_t	*mop;
	int	transparent = iocp->ioc_count;
	mblk_t	*datamp;
	int	error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp, sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
				error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If infinite timeout, return range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * Set up hi/lo water marks on the stream head read
			 * queue. We're unlikely to run out of resources
			 * here; fix at a later date if that proves wrong.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effect of
			 * snipping messages to the snapshot size on stream
			 * head flow control, since it has relatively little
			 * bearing on the data rate into the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * Set up hi/lo water marks on the stream
				 * head read queue. We're unlikely to run
				 * out of resources here; fix at a later
				 * date if that proves wrong.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * Set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
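
/*
 * A worked example of the alignment math (assuming sizeof (ulong_t)
 * is 8, as on a 64-bit kernel): for a 13-byte message,
 * RoundUpAmt(13, 8) is 8 - (13 % 8) = 3, so Align(13) == 3 and the
 * message is padded out to 16 bytes; for a 16-byte message,
 * 16 % 8 == 0, so no padding is added.
 */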

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one. Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled add an sb_hdr header and trailing alignment padding.
 *
 * To optimise performance the total number of mblks should be kept
 * to a minimum. This is achieved by using any remaining space in message N
 * for both its own padding as well as the header of message N+1 if possible.
 * If there's insufficient space we allocate one message to hold this 'wrapper'.
 * (There's likely to be space beyond message N, since allocb would have
 * rounded up the required size to one of the dblk_sizes.)
 */
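/*
 * Sketch of the resulting chunk layout when headers are enabled (the
 * sb_hdr fields are defined in <sys/bufmod.h>; the diagram is
 * illustrative):
 *
 *	+--------+------------+---------+--------+------------+---------+
 *	| sb_hdr | msg 1 data | padding | sb_hdr | msg 2 data | padding |
 *	+--------+------------+---------+--------+------------+---------+
 *
 * Each sb_hdr records the original length (sbh_origlen), the possibly
 * snapshot-truncated length (sbh_msglen), the total space the record
 * occupies in the chunk (sbh_totlen), the cumulative drop count
 * (sbh_drops) and a timestamp (sbh_timestamp), so a reader can walk
 * from one record to the next by advancing sbh_totlen bytes.
 */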
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp;
	struct timeval	t;
	struct sb_hdr	hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer to
		 * the first mblk of the last message added and call linkb
		 * on that last message, rather than performing the
		 * O(n) linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* Timestamp must be done immediately */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);

		/* We can't fit this message on the current chunk. */
		if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		/*
		 * If we closed it (just now or during a previous
		 * call) then allocate the head of a new chunk.
		 */
		if (sbp->sb_head == NULL) {
			/* Allocate leading header of new chunk */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Set the header values and join the message to the
		 * chunk. The header values are copied into the chunk
		 * after we adjust for padding below.
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		linkb(sbp->sb_head, mp);
		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * There's no chance to fit another message on the
		 * chunk -- forgo the padding and close the chunk.
		 */
		if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
			(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
			    sizeof (hp));
			sbp->sb_head->b_wptr += sizeof (hp);
			ASSERT(sbp->sb_head->b_wptr <=
			    sbp->sb_head->b_datap->db_lim);
			sbclosechunk(sbp);
			return;
		}

		/*
		 * We may add another message to this chunk -- adjust
		 * the headers for padding to be added below.
		 */
		hp.sbh_totlen += pad;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);
		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
		sbp->sb_mlen += pad;

		/*
		 * Find space for the wrapper. The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 *    begins on an 8 byte boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the
		 *    next message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data. If the dblk
		 * has been shared through dupmsg() we mustn't alter it.
		 */
		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data ? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer boundary
				 * (we don't want to disclose kernel data to
				 * users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to disclose
				 * kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg up the read side
 * to synchronize with any active module threads (open, close, wput, rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t	*rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;	/* timeout has fired */

	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward. Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t	*mp;
	queue_t	*rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear old chunk. Ready for new msgs.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}

static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}
	/*
	 * If there are messages on the q already, keep
	 * queueing them since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}