xref: /illumos-gate/usr/src/uts/common/io/bufmod.c (revision 4eaa471005973e11a6110b69fe990530b3b95a38)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * STREAMS Buffering module
31  *
32  * This streams module collects incoming messages from modules below
33  * it on the stream and buffers them up into a smaller number of
34  * aggregated messages.  Its main purpose is to reduce overhead by
35  * cutting down on the number of read (or getmsg) calls its client
36  * user process makes.
37  *  - only M_DATA is buffered.
38  *  - multithreading assumes configured as D_MTQPAIR
 *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
 *    allocation fails, or if the upstream flow is blocked and flag
 *    SB_NO_DROPS is clear (see below).
41  *  - in order message transmission. This is enforced for messages other
42  *    than high priority messages.
43  *  - zero length messages on the read side are not passed up the
44  *    stream but used internally for synchronization.
45  * FLAGS:
46  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
47  *   (conversion is the default for backwards compatibility
48  *    hence the negative logic).
49  * - SB_NO_HEADER - no headers in buffered data.
50  *   (adding headers is the default for backwards compatibility
51  *    hence the negative logic).
52  * - SB_DEFER_CHUNK - provides improved response time in question-answer
53  *   applications. Buffering is not enabled until the second message
54  *   is received on the read side within the sb_ticks interval.
55  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
56  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
57  *   data being immediately sent upstream.
58  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
59  *   the blocked flow condition downstream. If this flag is clear (default)
60  *   messages will be dropped if the upstream flow is blocked.
61  */
62 
63 
64 #include	<sys/types.h>
65 #include	<sys/errno.h>
66 #include	<sys/debug.h>
67 #include	<sys/stropts.h>
68 #include	<sys/time.h>
69 #include	<sys/stream.h>
70 #include	<sys/conf.h>
71 #include	<sys/ddi.h>
72 #include	<sys/sunddi.h>
73 #include	<sys/kmem.h>
74 #include	<sys/strsun.h>
75 #include	<sys/bufmod.h>
76 #include	<sys/modctl.h>
77 #include	<sys/isa_defs.h>
78 
/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read side data
 * message is received and a timeout is not active. If sb_snap is
 * zero, no truncation of the msg is done.
 *
 * One instance is allocated by sbopen() and is shared by both the
 * read and write queue of the module (both q_ptr fields point to it).
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk (NULL: no chunk open) */
	mblk_t  *sb_head;	/* pre-allocated space for the next header */
				/* (only used when headers are enabled) */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
				/* (only used in SB_NO_HEADER mode) */
	uint_t	sb_mlen;	/* sb_mp length, incl. headers and padding */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size */
	clock_t	sb_ticks;	/* timeout interval */
	timeout_id_t sb_timeoutid; /* qtimeout() id, 0 if none pending */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
				/* (copied into each header's sbh_drops) */
	uint_t	sb_snap;	/* snapshot length */
	uint_t	sb_flags;	/* flags field (SB_* values, SBIOCSFLAGS) */
	uint_t	sb_state;	/* state variable (SB_FRCVD bit) */
};
105 
/*
 * Function prototypes.
 *
 * sbioc(), which finishes transparent ioctls when their M_IOCDATA
 * reply arrives, is defined ahead of its only caller (sbwput) and so
 * is not listed here.
 */
static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static	int	sbclose(queue_t *, int, cred_t *);
static	void	sbwput(queue_t *, mblk_t *);
static	void	sbrput(queue_t *, mblk_t *);
static	void	sbrsrv(queue_t *);
static	void	sbioctl(queue_t *, mblk_t *);
static	void	sbaddmsg(queue_t *, mblk_t *);
static	void	sbtick(void *);
static	void	sbclosechunk(struct sb *);
static	void	sbsendit(queue_t *, mblk_t *);
119 
/*
 * Module identification and queue limits, shared by the read and
 * write queues.
 */
static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

/*
 * Read side: sbrput() buffers incoming messages, and sbrsrv() drains
 * messages that sbsendit() had to queue.  Open and close are hooked
 * on this side.
 */
static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

/*
 * Write side: put procedure only (no service routine).  sbwput()
 * intercepts bufmod's ioctls and passes everything else downstream.
 */
static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

/*
 * STREAMS table tying the read and write qinit structures together;
 * bufmod is not a multiplexor, so the mux entries are NULL.
 */
static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};
155 
156 
/*
 * This is the loadable module wrapper.
 */

/*
 * STREAMS module switch entry: module name, streamtab and flags.
 * The module relies on D_MTQPAIR (see the comment at the top of the
 * file) for serialization; no explicit locks are taken anywhere.
 */
static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};
178 
179 
180 int
181 _init(void)
182 {
183 	return (mod_install(&modlinkage));
184 }
185 
186 int
187 _fini(void)
188 {
189 	return (mod_remove(&modlinkage));
190 }
191 
192 int
193 _info(struct modinfo *modinfop)
194 {
195 	return (mod_info(&modlinkage, modinfop));
196 }
197 
198 
199 /* ARGSUSED */
200 static int
201 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
202 {
203 	struct sb	*sbp;
204 	ASSERT(rq);
205 
206 	if (sflag != MODOPEN)
207 		return (EINVAL);
208 
209 	if (rq->q_ptr)
210 		return (0);
211 
212 	/*
213 	 * Allocate and initialize per-Stream structure.
214 	 */
215 	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
216 	sbp->sb_rq = rq;
217 	sbp->sb_ticks = -1;
218 	sbp->sb_chunk = SB_DFLT_CHUNK;
219 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
220 	sbp->sb_mlen = 0;
221 	sbp->sb_mcount = 0;
222 	sbp->sb_timeoutid = 0;
223 	sbp->sb_drops = 0;
224 	sbp->sb_snap = 0;
225 	sbp->sb_flags = 0;
226 	sbp->sb_state = 0;
227 
228 	rq->q_ptr = WR(rq)->q_ptr = sbp;
229 
230 	qprocson(rq);
231 
232 
233 	return (0);
234 }
235 
236 /* ARGSUSED1 */
237 static int
238 sbclose(queue_t *rq, int flag, cred_t *credp)
239 {
240 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
241 
242 	ASSERT(sbp);
243 
244 	qprocsoff(rq);
245 	/*
246 	 * Cancel an outstanding timeout
247 	 */
248 	if (sbp->sb_timeoutid != 0) {
249 		(void) quntimeout(rq, sbp->sb_timeoutid);
250 		sbp->sb_timeoutid = 0;
251 	}
252 	/*
253 	 * Free the current chunk.
254 	 */
255 	if (sbp->sb_mp) {
256 		freemsg(sbp->sb_mp);
257 		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
258 		sbp->sb_mlen = 0;
259 	}
260 
261 	/*
262 	 * Free the per-Stream structure.
263 	 */
264 	kmem_free((caddr_t)sbp, sizeof (struct sb));
265 	rq->q_ptr = WR(rq)->q_ptr = NULL;
266 
267 	return (0);
268 }
269 
/*
 * High and low water marks for the stream head read queue, derived
 * from the expected message size.
 *
 * The correction factor (fudge) is introduced to compensate for
 * whatever assumptions the modules below have made about how much
 * traffic is flowing through the stream, and for the fact that
 * bufmod may be snipping messages to the sb_snap length.
 *
 * Every argument is parenthesized so that expression arguments such
 * as SNIT_HIWAT(a + b, 1) expand with the intended precedence.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
278 
279 
280 static void
281 sbioc(queue_t *wq, mblk_t *mp)
282 {
283 	struct iocblk *iocp;
284 	struct sb *sbp = (struct sb *)wq->q_ptr;
285 	clock_t	ticks;
286 	mblk_t	*mop;
287 
288 	iocp = (struct iocblk *)mp->b_rptr;
289 
290 	switch (iocp->ioc_cmd) {
291 	case SBIOCGCHUNK:
292 	case SBIOCGSNAP:
293 	case SBIOCGFLAGS:
294 	case SBIOCGTIME:
295 		miocack(wq, mp, 0, 0);
296 		return;
297 
298 	case SBIOCSTIME:
299 #ifdef _SYSCALL32_IMPL
300 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
301 			struct timeval32 *t32;
302 
303 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
304 			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
305 				miocnak(wq, mp, 0, EINVAL);
306 				break;
307 			}
308 			ticks = TIMEVAL_TO_TICK(t32);
309 		} else
310 #endif /* _SYSCALL32_IMPL */
311 		{
312 			struct timeval *tb;
313 
314 			tb = (struct timeval *)mp->b_cont->b_rptr;
315 
316 			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
317 				miocnak(wq, mp, 0, EINVAL);
318 				break;
319 			}
320 			ticks = TIMEVAL_TO_TICK(tb);
321 		}
322 		sbp->sb_ticks = ticks;
323 		if (ticks == 0)
324 			sbp->sb_chunk = 0;
325 		miocack(wq, mp, 0, 0);
326 		sbclosechunk(sbp);
327 		return;
328 
329 	case SBIOCSCHUNK:
330 		/*
331 		 * set up hi/lo water marks on stream head read queue.
332 		 * unlikely to run out of resources. Fix at later date.
333 		 */
334 		if ((mop = allocb(sizeof (struct stroptions),
335 		    BPRI_MED)) != NULL) {
336 			struct stroptions *sop;
337 			uint_t chunk;
338 
339 			chunk = *(uint_t *)mp->b_cont->b_rptr;
340 			mop->b_datap->db_type = M_SETOPTS;
341 			mop->b_wptr += sizeof (struct stroptions);
342 			sop = (struct stroptions *)mop->b_rptr;
343 			sop->so_flags = SO_HIWAT | SO_LOWAT;
344 			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
345 			sop->so_lowat = SNIT_LOWAT(chunk, 1);
346 			qreply(wq, mop);
347 		}
348 
349 		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
350 		miocack(wq, mp, 0, 0);
351 		sbclosechunk(sbp);
352 		return;
353 
354 	case SBIOCSFLAGS:
355 		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
356 		miocack(wq, mp, 0, 0);
357 		return;
358 
359 	case SBIOCSSNAP:
360 		/*
361 		 * if chunking dont worry about effects of
362 		 * snipping of message size on head flow control
363 		 * since it has a relatively small bearing on the
364 		 * data rate onto the streamn head.
365 		 */
366 		if (!sbp->sb_chunk) {
367 			/*
368 			 * set up hi/lo water marks on stream head read queue.
369 			 * unlikely to run out of resources. Fix at later date.
370 			 */
371 			if ((mop = allocb(sizeof (struct stroptions),
372 			    BPRI_MED)) != NULL) {
373 				struct stroptions *sop;
374 				uint_t snap;
375 				int fudge;
376 
377 				snap = *(uint_t *)mp->b_cont->b_rptr;
378 				mop->b_datap->db_type = M_SETOPTS;
379 				mop->b_wptr += sizeof (struct stroptions);
380 				sop = (struct stroptions *)mop->b_rptr;
381 				sop->so_flags = SO_HIWAT | SO_LOWAT;
382 				fudge = snap <= 100 ?   4 :
383 				    snap <= 400 ?   2 :
384 				    1;
385 				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
386 				sop->so_lowat = SNIT_LOWAT(snap, fudge);
387 				qreply(wq, mop);
388 			}
389 		}
390 
391 		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
392 		miocack(wq, mp, 0, 0);
393 		return;
394 
395 	default:
396 		ASSERT(0);
397 		return;
398 	}
399 }
400 
401 /*
402  * Write-side put procedure.  Its main task is to detect ioctls
403  * for manipulating the buffering state and hand them to sbioctl.
404  * Other message types are passed on through.
405  */
406 static void
407 sbwput(queue_t *wq, mblk_t *mp)
408 {
409 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
410 	struct copyresp *resp;
411 
412 	if (sbp->sb_flags & SB_SEND_ON_WRITE)
413 		sbclosechunk(sbp);
414 	switch (mp->b_datap->db_type) {
415 	case M_IOCTL:
416 		sbioctl(wq, mp);
417 		break;
418 
419 	case M_IOCDATA:
420 		resp = (struct copyresp *)mp->b_rptr;
421 		if (resp->cp_rval) {
422 			/*
423 			 * Just free message on failure.
424 			 */
425 			freemsg(mp);
426 			break;
427 		}
428 
429 		switch (resp->cp_cmd) {
430 		case SBIOCSTIME:
431 		case SBIOCSCHUNK:
432 		case SBIOCSFLAGS:
433 		case SBIOCSSNAP:
434 		case SBIOCGTIME:
435 		case SBIOCGCHUNK:
436 		case SBIOCGSNAP:
437 		case SBIOCGFLAGS:
438 			sbioc(wq, mp);
439 			break;
440 
441 		default:
442 			putnext(wq, mp);
443 			break;
444 		}
445 		break;
446 
447 	default:
448 		putnext(wq, mp);
449 		break;
450 	}
451 }
452 
453 /*
454  * Read-side put procedure.  It's responsible for buffering up incoming
455  * messages and grouping them into aggregates according to the current
456  * buffering parameters.
457  */
458 static void
459 sbrput(queue_t *rq, mblk_t *mp)
460 {
461 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
462 
463 	ASSERT(sbp);
464 
465 	switch (mp->b_datap->db_type) {
466 	case M_PROTO:
467 		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
468 			sbclosechunk(sbp);
469 			sbsendit(rq, mp);
470 			break;
471 		} else {
472 			/*
473 			 * Convert M_PROTO to M_DATA.
474 			 */
475 			mp->b_datap->db_type = M_DATA;
476 		}
477 		/* FALLTHRU */
478 
479 	case M_DATA:
480 		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
481 		    !(sbp->sb_state & SB_FRCVD)) {
482 			sbclosechunk(sbp);
483 			sbsendit(rq, mp);
484 			sbp->sb_state |= SB_FRCVD;
485 		} else
486 			sbaddmsg(rq, mp);
487 
488 		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
489 			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
490 			    sbp, sbp->sb_ticks);
491 
492 		break;
493 
494 	case M_FLUSH:
495 		if (*mp->b_rptr & FLUSHR) {
496 			/*
497 			 * Reset timeout, flush the chunk currently in
498 			 * progress, and start a new chunk.
499 			 */
500 			if (sbp->sb_timeoutid) {
501 				(void) quntimeout(sbp->sb_rq,
502 				    sbp->sb_timeoutid);
503 				sbp->sb_timeoutid = 0;
504 			}
505 			if (sbp->sb_mp) {
506 				freemsg(sbp->sb_mp);
507 				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
508 				sbp->sb_mlen = 0;
509 				sbp->sb_mcount = 0;
510 			}
511 			flushq(rq, FLUSHALL);
512 		}
513 		putnext(rq, mp);
514 		break;
515 
516 	case M_CTL:
517 		/*
518 		 * Zero-length M_CTL means our timeout() popped.
519 		 */
520 		if (MBLKL(mp) == 0) {
521 			freemsg(mp);
522 			sbclosechunk(sbp);
523 		} else {
524 			sbclosechunk(sbp);
525 			sbsendit(rq, mp);
526 		}
527 		break;
528 
529 	default:
530 		if (mp->b_datap->db_type <= QPCTL) {
531 			sbclosechunk(sbp);
532 			sbsendit(rq, mp);
533 		} else {
534 			/* Note: out of band */
535 			putnext(rq, mp);
536 		}
537 		break;
538 	}
539 }
540 
541 /*
542  *  read service procedure.
543  */
544 /* ARGSUSED */
545 static void
546 sbrsrv(queue_t *rq)
547 {
548 	mblk_t	*mp;
549 
550 	/*
551 	 * High priority messages shouldn't get here but if
552 	 * one does, jam it through to avoid infinite loop.
553 	 */
554 	while ((mp = getq(rq)) != NULL) {
555 		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
556 			/* should only get here if SB_NO_SROPS */
557 			(void) putbq(rq, mp);
558 			return;
559 		}
560 		putnext(rq, mp);
561 	}
562 }
563 
564 /*
565  * Handle write-side M_IOCTL messages.
566  */
567 static void
568 sbioctl(queue_t *wq, mblk_t *mp)
569 {
570 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
571 	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
572 	struct	timeval	*t;
573 	clock_t	ticks;
574 	mblk_t	*mop;
575 	int	transparent = iocp->ioc_count;
576 	mblk_t	*datamp;
577 	int	error;
578 
579 	switch (iocp->ioc_cmd) {
580 	case SBIOCSTIME:
581 		if (iocp->ioc_count == TRANSPARENT) {
582 #ifdef _SYSCALL32_IMPL
583 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
584 				mcopyin(mp, NULL, sizeof (struct timeval32),
585 				    NULL);
586 			} else
587 #endif /* _SYSCALL32_IMPL */
588 			{
589 				mcopyin(mp, NULL, sizeof (*t), NULL);
590 			}
591 			qreply(wq, mp);
592 		} else {
593 			/*
594 			 * Verify argument length.
595 			 */
596 #ifdef _SYSCALL32_IMPL
597 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
598 				struct timeval32 *t32;
599 
600 				error = miocpullup(mp,
601 				    sizeof (struct timeval32));
602 				if (error != 0) {
603 					miocnak(wq, mp, 0, error);
604 					break;
605 				}
606 				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
607 				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
608 					miocnak(wq, mp, 0, EINVAL);
609 					break;
610 				}
611 				ticks = TIMEVAL_TO_TICK(t32);
612 			} else
613 #endif /* _SYSCALL32_IMPL */
614 			{
615 				error = miocpullup(mp, sizeof (struct timeval));
616 				if (error != 0) {
617 					miocnak(wq, mp, 0, error);
618 					break;
619 				}
620 
621 				t = (struct timeval *)mp->b_cont->b_rptr;
622 				if (t->tv_sec < 0 || t->tv_usec < 0) {
623 					miocnak(wq, mp, 0, EINVAL);
624 					break;
625 				}
626 				ticks = TIMEVAL_TO_TICK(t);
627 			}
628 			sbp->sb_ticks = ticks;
629 			if (ticks == 0)
630 				sbp->sb_chunk = 0;
631 			miocack(wq, mp, 0, 0);
632 			sbclosechunk(sbp);
633 		}
634 		break;
635 
636 	case SBIOCGTIME: {
637 		struct timeval *t;
638 
639 		/*
640 		 * Verify argument length.
641 		 */
642 		if (transparent != TRANSPARENT) {
643 #ifdef _SYSCALL32_IMPL
644 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
645 				error = miocpullup(mp,
646 				    sizeof (struct timeval32));
647 				if (error != 0) {
648 					miocnak(wq, mp, 0, error);
649 					break;
650 				}
651 			} else
652 #endif /* _SYSCALL32_IMPL */
653 			error = miocpullup(mp, sizeof (struct timeval));
654 			if (error != 0) {
655 				miocnak(wq, mp, 0, error);
656 				break;
657 			}
658 		}
659 
660 		/*
661 		 * If infinite timeout, return range error
662 		 * for the ioctl.
663 		 */
664 		if (sbp->sb_ticks < 0) {
665 			miocnak(wq, mp, 0, ERANGE);
666 			break;
667 		}
668 
669 #ifdef _SYSCALL32_IMPL
670 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
671 			struct timeval32 *t32;
672 
673 			if (transparent == TRANSPARENT) {
674 				datamp = allocb(sizeof (*t32), BPRI_MED);
675 				if (datamp == NULL) {
676 					miocnak(wq, mp, 0, EAGAIN);
677 					break;
678 				}
679 				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
680 			}
681 
682 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
683 			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
684 
685 			if (transparent == TRANSPARENT)
686 				qreply(wq, mp);
687 			else
688 				miocack(wq, mp, sizeof (*t32), 0);
689 		} else
690 #endif /* _SYSCALL32_IMPL */
691 		{
692 			if (transparent == TRANSPARENT) {
693 				datamp = allocb(sizeof (*t), BPRI_MED);
694 				if (datamp == NULL) {
695 					miocnak(wq, mp, 0, EAGAIN);
696 					break;
697 				}
698 				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
699 			}
700 
701 			t = (struct timeval *)mp->b_cont->b_rptr;
702 			TICK_TO_TIMEVAL(sbp->sb_ticks, t);
703 
704 			if (transparent == TRANSPARENT)
705 				qreply(wq, mp);
706 			else
707 				miocack(wq, mp, sizeof (*t), 0);
708 		}
709 		break;
710 	}
711 
712 	case SBIOCCTIME:
713 		sbp->sb_ticks = -1;
714 		miocack(wq, mp, 0, 0);
715 		break;
716 
717 	case SBIOCSCHUNK:
718 		if (iocp->ioc_count == TRANSPARENT) {
719 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
720 			qreply(wq, mp);
721 		} else {
722 			/*
723 			 * Verify argument length.
724 			 */
725 			error = miocpullup(mp, sizeof (uint_t));
726 			if (error != 0) {
727 				miocnak(wq, mp, 0, error);
728 				break;
729 			}
730 
731 			/*
732 			 * set up hi/lo water marks on stream head read queue.
733 			 * unlikely to run out of resources. Fix at later date.
734 			 */
735 			if ((mop = allocb(sizeof (struct stroptions),
736 			    BPRI_MED)) != NULL) {
737 				struct stroptions *sop;
738 				uint_t chunk;
739 
740 				chunk = *(uint_t *)mp->b_cont->b_rptr;
741 				mop->b_datap->db_type = M_SETOPTS;
742 				mop->b_wptr += sizeof (struct stroptions);
743 				sop = (struct stroptions *)mop->b_rptr;
744 				sop->so_flags = SO_HIWAT | SO_LOWAT;
745 				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
746 				sop->so_lowat = SNIT_LOWAT(chunk, 1);
747 				qreply(wq, mop);
748 			}
749 
750 			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
751 			miocack(wq, mp, 0, 0);
752 			sbclosechunk(sbp);
753 		}
754 		break;
755 
756 	case SBIOCGCHUNK:
757 		/*
758 		 * Verify argument length.
759 		 */
760 		if (transparent != TRANSPARENT) {
761 			error = miocpullup(mp, sizeof (uint_t));
762 			if (error != 0) {
763 				miocnak(wq, mp, 0, error);
764 				break;
765 			}
766 		}
767 
768 		if (transparent == TRANSPARENT) {
769 			datamp = allocb(sizeof (uint_t), BPRI_MED);
770 			if (datamp == NULL) {
771 				miocnak(wq, mp, 0, EAGAIN);
772 				break;
773 			}
774 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
775 		}
776 
777 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
778 
779 		if (transparent == TRANSPARENT)
780 			qreply(wq, mp);
781 		else
782 			miocack(wq, mp, sizeof (uint_t), 0);
783 		break;
784 
785 	case SBIOCSSNAP:
786 		if (iocp->ioc_count == TRANSPARENT) {
787 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
788 			qreply(wq, mp);
789 		} else {
790 			/*
791 			 * Verify argument length.
792 			 */
793 			error = miocpullup(mp, sizeof (uint_t));
794 			if (error != 0) {
795 				miocnak(wq, mp, 0, error);
796 				break;
797 			}
798 
799 			/*
800 			 * if chunking dont worry about effects of
801 			 * snipping of message size on head flow control
802 			 * since it has a relatively small bearing on the
803 			 * data rate onto the streamn head.
804 			 */
805 			if (!sbp->sb_chunk) {
806 				/*
807 				 * set up hi/lo water marks on stream
808 				 * head read queue.  unlikely to run out
809 				 * of resources. Fix at later date.
810 				 */
811 				if ((mop = allocb(sizeof (struct stroptions),
812 				    BPRI_MED)) != NULL) {
813 					struct stroptions *sop;
814 					uint_t snap;
815 					int fudge;
816 
817 					snap = *(uint_t *)mp->b_cont->b_rptr;
818 					mop->b_datap->db_type = M_SETOPTS;
819 					mop->b_wptr += sizeof (*sop);
820 					sop = (struct stroptions *)mop->b_rptr;
821 					sop->so_flags = SO_HIWAT | SO_LOWAT;
822 					fudge = (snap <= 100) ? 4 :
823 					    (snap <= 400) ? 2 : 1;
824 					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
825 					sop->so_lowat = SNIT_LOWAT(snap, fudge);
826 					qreply(wq, mop);
827 				}
828 			}
829 
830 			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
831 
832 			miocack(wq, mp, 0, 0);
833 		}
834 		break;
835 
836 	case SBIOCGSNAP:
837 		/*
838 		 * Verify argument length
839 		 */
840 		if (transparent != TRANSPARENT) {
841 			error = miocpullup(mp, sizeof (uint_t));
842 			if (error != 0) {
843 				miocnak(wq, mp, 0, error);
844 				break;
845 			}
846 		}
847 
848 		if (transparent == TRANSPARENT) {
849 			datamp = allocb(sizeof (uint_t), BPRI_MED);
850 			if (datamp == NULL) {
851 				miocnak(wq, mp, 0, EAGAIN);
852 				break;
853 			}
854 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
855 		}
856 
857 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
858 
859 		if (transparent == TRANSPARENT)
860 			qreply(wq, mp);
861 		else
862 			miocack(wq, mp, sizeof (uint_t), 0);
863 		break;
864 
865 	case SBIOCSFLAGS:
866 		/*
867 		 * set the flags.
868 		 */
869 		if (iocp->ioc_count == TRANSPARENT) {
870 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
871 			qreply(wq, mp);
872 		} else {
873 			error = miocpullup(mp, sizeof (uint_t));
874 			if (error != 0) {
875 				miocnak(wq, mp, 0, error);
876 				break;
877 			}
878 			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
879 			miocack(wq, mp, 0, 0);
880 		}
881 		break;
882 
883 	case SBIOCGFLAGS:
884 		/*
885 		 * Verify argument length
886 		 */
887 		if (transparent != TRANSPARENT) {
888 			error = miocpullup(mp, sizeof (uint_t));
889 			if (error != 0) {
890 				miocnak(wq, mp, 0, error);
891 				break;
892 			}
893 		}
894 
895 		if (transparent == TRANSPARENT) {
896 			datamp = allocb(sizeof (uint_t), BPRI_MED);
897 			if (datamp == NULL) {
898 				miocnak(wq, mp, 0, EAGAIN);
899 				break;
900 			}
901 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
902 		}
903 
904 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
905 
906 		if (transparent == TRANSPARENT)
907 			qreply(wq, mp);
908 		else
909 			miocack(wq, mp, sizeof (uint_t), 0);
910 		break;
911 
912 
913 	default:
914 		putnext(wq, mp);
915 		break;
916 	}
917 }
918 
/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 * Both arguments may be evaluated more than once.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the additional space needed to round l up to a
 * sizeof (ulong_t) boundary.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 * Parenthesized so it acts as a single operand in any expression.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
933 
934 /*
935  * Process a read-side M_DATA message.
936  *
937  * If the currently accumulating chunk doesn't have enough room
938  * for the message, close off the chunk, pass it upward, and start
939  * a new one.  Then add the message to the current chunk, taking
940  * account of the possibility that the message's size exceeds the
941  * chunk size.
942  *
943  * If headers are enabled add an sb_hdr header and trailing alignment padding.
944  *
945  * To optimise performance the total number of msgbs should be kept
946  * to a minimum. This is achieved by using any remaining space in message N
947  * for both its own padding as well as the header of message N+1 if possible.
948  * If there's insufficient space we allocate one message to hold this 'wrapper'.
949  * (there's likely to be space beyond message N, since allocb would have
950  * rounded up the required size to one of the dblk_sizes).
951  *
952  */
953 static void
954 sbaddmsg(queue_t *rq, mblk_t *mp)
955 {
956 	struct sb	*sbp;
957 	struct timeval	t;
958 	struct sb_hdr	hp;
959 	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
960 	mblk_t *last;		/* last mblk of current message */
961 	size_t wrapperlen;	/* length of header + padding */
962 	size_t origlen;		/* data length before truncation */
963 	size_t pad;		/* bytes required to align header */
964 
965 	sbp = (struct sb *)rq->q_ptr;
966 
967 	origlen = msgdsize(mp);
968 
969 	/*
970 	 * Truncate the message.
971 	 */
972 	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
973 			(adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
974 		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
975 	else
976 		hp.sbh_totlen = hp.sbh_msglen = origlen;
977 
978 	if (sbp->sb_flags & SB_NO_HEADER) {
979 
980 		/*
981 		 * Would the inclusion of this message overflow the current
982 		 * chunk? If so close the chunk off and start a new one.
983 		 */
984 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
985 			sbclosechunk(sbp);
986 		/*
987 		 * First message too big for chunk - just send it up.
988 		 * This will always be true when we're not chunking.
989 		 */
990 		if (hp.sbh_totlen > sbp->sb_chunk) {
991 			sbsendit(rq, mp);
992 			return;
993 		}
994 
995 		/*
996 		 * We now know that the msg will fit in the chunk.
997 		 * Link it onto the end of the chunk.
998 		 * Since linkb() walks the entire chain, we keep a pointer to
999 		 * the first mblk of the last msgb added and call linkb on that
1000 		 * that last message, rather than performing the
1001 		 * O(n) linkb() operation on the whole chain.
1002 		 * sb_head isn't needed in this SB_NO_HEADER mode.
1003 		 */
1004 		if (sbp->sb_mp)
1005 			linkb(sbp->sb_tail, mp);
1006 		else
1007 			sbp->sb_mp = mp;
1008 
1009 		sbp->sb_tail = mp;
1010 		sbp->sb_mlen += hp.sbh_totlen;
1011 		sbp->sb_mcount++;
1012 	} else {
1013 		/* Timestamp must be done immediately */
1014 		uniqtime(&t);
1015 		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1016 
1017 		pad = Align(hp.sbh_totlen);
1018 		hp.sbh_totlen += sizeof (hp);
1019 		hp.sbh_totlen += pad;
1020 
1021 		/*
1022 		 * Would the inclusion of this message overflow the current
1023 		 * chunk? If so close the chunk off and start a new one.
1024 		 */
1025 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
1026 				sbclosechunk(sbp);
1027 
1028 		if (sbp->sb_head == NULL) {
1029 			/* Allocate leading header of new chunk */
1030 			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1031 			if (sbp->sb_head == NULL) {
1032 				/*
1033 				 * Memory allocation failure.
1034 				 * This will need to be revisited
1035 				 * since using certain flag combinations
1036 				 * can result in messages being dropped
1037 				 * silently.
1038 				 */
1039 				freemsg(mp);
1040 				sbp->sb_drops++;
1041 				return;
1042 			}
1043 			sbp->sb_mp = sbp->sb_head;
1044 		}
1045 
1046 		/*
1047 		 * Copy header into message
1048 		 */
1049 		hp.sbh_drops = sbp->sb_drops;
1050 		hp.sbh_origlen = origlen;
1051 		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1052 		sbp->sb_head->b_wptr += sizeof (hp);
1053 
1054 		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1055 
1056 		/*
1057 		 * Join message to the chunk
1058 		 */
1059 		linkb(sbp->sb_head, mp);
1060 
1061 		sbp->sb_mcount++;
1062 		sbp->sb_mlen += hp.sbh_totlen;
1063 
1064 		/*
1065 		 * If the first message alone is too big for the chunk close
1066 		 * the chunk now.
1067 		 * If the next message would immediately cause the chunk to
1068 		 * overflow we may as well close the chunk now. The next
1069 		 * message is certain to be at least SMALLEST_MESSAGE size.
1070 		 */
1071 		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
1072 			sbclosechunk(sbp);
1073 			return;
1074 		}
1075 
1076 		/*
1077 		 * Find space for the wrapper. The wrapper consists of:
1078 		 *
1079 		 * 1) Padding for this message (this is to ensure each header
1080 		 * begins on an 8 byte boundary in the userland buffer).
1081 		 *
1082 		 * 2) Space for the next message's header, in case the next
1083 		 * next message will fit in this chunk.
1084 		 *
1085 		 * It may be possible to append the wrapper to the last mblk
1086 		 * of the message, but only if we 'own' the data. If the dblk
1087 		 * has been shared through dupmsg() we mustn't alter it.
1088 		 */
1089 
1090 		wrapperlen = (sizeof (hp) + pad);
1091 
1092 		/* Is there space for the wrapper beyond the message's data ? */
1093 		for (last = mp; last->b_cont; last = last->b_cont)
1094 			;
1095 
1096 		if ((wrapperlen <= MBLKTAIL(last)) &&
1097 			(last->b_datap->db_ref == 1)) {
1098 			if (pad > 0) {
1099 				/*
1100 				 * Pad with zeroes to the next pointer boundary
1101 				 * (we don't want to disclose kernel data to
1102 				 * users), then advance wptr.
1103 				 */
1104 				(void) memset(last->b_wptr, 0, pad);
1105 				last->b_wptr += pad;
1106 			}
1107 			/* Remember where to write the header information */
1108 			sbp->sb_head = last;
1109 		} else {
1110 			/* Have to allocate additional space for the wrapper */
1111 			wrapper = allocb(wrapperlen, BPRI_MED);
1112 			if (wrapper == NULL) {
1113 				sbclosechunk(sbp);
1114 				return;
1115 			}
1116 			if (pad > 0) {
1117 				/*
1118 				 * Pad with zeroes (we don't want to disclose
1119 				 * kernel data to users).
1120 				 */
1121 				(void) memset(wrapper->b_wptr, 0, pad);
1122 				wrapper->b_wptr += pad;
1123 			}
1124 			/* Link the wrapper msg onto the end of the chunk */
1125 			linkb(mp, wrapper);
1126 			/* Remember to write the next header in this wrapper */
1127 			sbp->sb_head = wrapper;
1128 		}
1129 	}
1130 }
1131 
1132 /*
1133  * Called from timeout().
1134  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1135  * to synchronize with any active module threads (open, close, wput, rput).
1136  */
1137 static void
1138 sbtick(void *arg)
1139 {
1140 	struct sb *sbp = arg;
1141 	queue_t	*rq;
1142 
1143 	ASSERT(sbp);
1144 
1145 	rq = sbp->sb_rq;
1146 	sbp->sb_timeoutid = 0;		/* timeout has fired */
1147 
1148 	if (putctl(rq, M_CTL) == 0)	/* failure */
1149 		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1150 }
1151 
1152 /*
1153  * Close off the currently accumulating chunk and pass
1154  * it upward.  Takes care of resetting timers as well.
1155  *
1156  * This routine is called both directly and as a result
1157  * of the chunk timeout expiring.
1158  */
1159 static void
1160 sbclosechunk(struct sb *sbp)
1161 {
1162 	mblk_t	*mp;
1163 	queue_t	*rq;
1164 
1165 	ASSERT(sbp);
1166 
1167 	if (sbp->sb_timeoutid) {
1168 		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1169 		sbp->sb_timeoutid = 0;
1170 	}
1171 
1172 	mp = sbp->sb_mp;
1173 	rq = sbp->sb_rq;
1174 
1175 	/*
1176 	 * If there's currently a chunk in progress, close it off
1177 	 * and try to send it up.
1178 	 */
1179 	if (mp) {
1180 		sbsendit(rq, mp);
1181 	}
1182 
1183 	/*
1184 	 * Clear old chunk.  Ready for new msgs.
1185 	 */
1186 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1187 	sbp->sb_mlen = 0;
1188 	sbp->sb_mcount = 0;
1189 	if (sbp->sb_flags & SB_DEFER_CHUNK)
1190 		sbp->sb_state &= ~SB_FRCVD;
1191 
1192 }
1193 
1194 static void
1195 sbsendit(queue_t *rq, mblk_t *mp)
1196 {
1197 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
1198 
1199 	if (!canputnext(rq)) {
1200 		if (sbp->sb_flags & SB_NO_DROPS)
1201 			(void) putq(rq, mp);
1202 		else {
1203 			freemsg(mp);
1204 			sbp->sb_drops += sbp->sb_mcount;
1205 		}
1206 		return;
1207 	}
1208 	/*
1209 	 * If there are messages on the q already, keep
1210 	 * queueing them since they need to be processed in order.
1211 	 */
1212 	if (qsize(rq) > 0) {
1213 		/* should only get here if SB_NO_DROPS */
1214 		(void) putq(rq, mp);
1215 	}
1216 	else
1217 		putnext(rq, mp);
1218 }
1219