xref: /illumos-gate/usr/src/uts/common/io/bufmod.c (revision 20a7641f9918de8574b8b3b47dbe35c4bfc78df1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  * Copyright 2018 Joyent, Inc.
26  */
27 
28 /*
29  * STREAMS Buffering module
30  *
31  * This streams module collects incoming messages from modules below
32  * it on the stream and buffers them up into a smaller number of
33  * aggregated messages.  Its main purpose is to reduce overhead by
34  * cutting down on the number of read (or getmsg) calls its client
35  * user process makes.
36  *  - only M_DATA is buffered.
37  *  - multithreading assumes configured as D_MTQPAIR
38  *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
39  *    allocation fails.
40  *  - in order message transmission. This is enforced for messages other
41  *    than high priority messages.
42  *  - zero length messages on the read side are not passed up the
43  *    stream but used internally for synchronization.
44  * FLAGS:
45  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
46  *   (conversion is the default for backwards compatibility
47  *    hence the negative logic).
48  * - SB_NO_HEADER - no headers in buffered data.
49  *   (adding headers is the default for backwards compatibility
50  *    hence the negative logic).
51  * - SB_DEFER_CHUNK - provides improved response time in question-answer
52  *   applications. Buffering is not enabled until the second message
53  *   is received on the read side within the sb_ticks interval.
54  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
55  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
56  *   data being immediately sent upstream.
57  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
58  *   the blocked flow condition downstream. If this flag is clear (default)
59  *   messages will be dropped if the upstream flow is blocked.
60  */
61 
62 
63 #include	<sys/types.h>
64 #include	<sys/errno.h>
65 #include	<sys/debug.h>
66 #include	<sys/stropts.h>
67 #include	<sys/time.h>
68 #include	<sys/stream.h>
69 #include	<sys/conf.h>
70 #include	<sys/ddi.h>
71 #include	<sys/sunddi.h>
72 #include	<sys/kmem.h>
73 #include	<sys/strsun.h>
74 #include	<sys/bufmod.h>
75 #include	<sys/modctl.h>
76 #include	<sys/isa_defs.h>
77 
78 /*
79  * Per-Stream state information.
80  *
81  * If sb_ticks is negative, we don't deliver chunks until they're
82  * full.  If it's zero, we deliver every packet as it arrives.  (In
83  * this case we force sb_chunk to zero, to make the implementation
84  * easier.)  Otherwise, sb_ticks gives the number of ticks in a
85  * buffering interval. The interval begins when the a read side data
86  * message is received and a timeout is not active. If sb_snap is
87  * zero, no truncation of the msg is done.
88  */
89 struct sb {
90 	queue_t	*sb_rq;		/* our rq */
91 	mblk_t	*sb_mp;		/* partial chunk */
92 	mblk_t  *sb_head;	/* pre-allocated space for the next header */
93 	mblk_t	*sb_tail;	/* first mblk of last message appended */
94 	uint_t	sb_mlen;	/* sb_mp length */
95 	uint_t	sb_mcount;	/* input msg count in sb_mp */
96 	uint_t	sb_chunk;	/* max chunk size */
97 	clock_t	sb_ticks;	/* timeout interval */
98 	timeout_id_t sb_timeoutid; /* qtimeout() id */
99 	uint_t	sb_drops;	/* cumulative # discarded msgs */
100 	uint_t	sb_snap;	/* snapshot length */
101 	uint_t	sb_flags;	/* flags field */
102 	uint_t	sb_state;	/* state variable */
103 };
104 
105 /*
106  * Function prototypes.
107  */
108 static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
109 static	int	sbclose(queue_t *, int, cred_t *);
110 static	int	sbwput(queue_t *, mblk_t *);
111 static	int	sbrput(queue_t *, mblk_t *);
112 static	int	sbrsrv(queue_t *);
113 static	void	sbioctl(queue_t *, mblk_t *);
114 static	void	sbaddmsg(queue_t *, mblk_t *);
115 static	void	sbtick(void *);
116 static	void	sbclosechunk(struct sb *);
117 static	void	sbsendit(queue_t *, mblk_t *);
118 
119 static struct module_info	sb_minfo = {
120 	21,		/* mi_idnum */
121 	"bufmod",	/* mi_idname */
122 	0,		/* mi_minpsz */
123 	INFPSZ,		/* mi_maxpsz */
124 	1,		/* mi_hiwat */
125 	0		/* mi_lowat */
126 };
127 
128 static struct qinit	sb_rinit = {
129 	sbrput,			/* qi_putp */
130 	sbrsrv,			/* qi_srvp */
131 	sbopen,			/* qi_qopen */
132 	sbclose,		/* qi_qclose */
133 	NULL,			/* qi_qadmin */
134 	&sb_minfo,		/* qi_minfo */
135 	NULL			/* qi_mstat */
136 };
137 
138 static struct qinit	sb_winit = {
139 	sbwput,			/* qi_putp */
140 	NULL,			/* qi_srvp */
141 	NULL,			/* qi_qopen */
142 	NULL,			/* qi_qclose */
143 	NULL,			/* qi_qadmin */
144 	&sb_minfo,		/* qi_minfo */
145 	NULL			/* qi_mstat */
146 };
147 
148 static struct streamtab	sb_info = {
149 	&sb_rinit,	/* st_rdinit */
150 	&sb_winit,	/* st_wrinit */
151 	NULL,		/* st_muxrinit */
152 	NULL		/* st_muxwinit */
153 };
154 
155 
156 /*
157  * This is the loadable module wrapper.
158  */
159 
160 static struct fmodsw fsw = {
161 	"bufmod",
162 	&sb_info,
163 	D_MTQPAIR | D_MP
164 };
165 
166 /*
167  * Module linkage information for the kernel.
168  */
169 
170 static struct modlstrmod modlstrmod = {
171 	&mod_strmodops, "streams buffer mod", &fsw
172 };
173 
174 static struct modlinkage modlinkage = {
175 	MODREV_1, &modlstrmod, NULL
176 };
177 
178 
179 int
180 _init(void)
181 {
182 	return (mod_install(&modlinkage));
183 }
184 
185 int
186 _fini(void)
187 {
188 	return (mod_remove(&modlinkage));
189 }
190 
191 int
192 _info(struct modinfo *modinfop)
193 {
194 	return (mod_info(&modlinkage, modinfop));
195 }
196 
197 
198 /* ARGSUSED */
199 static int
200 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
201 {
202 	struct sb	*sbp;
203 	ASSERT(rq);
204 
205 	if (sflag != MODOPEN)
206 		return (EINVAL);
207 
208 	if (rq->q_ptr)
209 		return (0);
210 
211 	/*
212 	 * Allocate and initialize per-Stream structure.
213 	 */
214 	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
215 	sbp->sb_rq = rq;
216 	sbp->sb_ticks = -1;
217 	sbp->sb_chunk = SB_DFLT_CHUNK;
218 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
219 	sbp->sb_mlen = 0;
220 	sbp->sb_mcount = 0;
221 	sbp->sb_timeoutid = 0;
222 	sbp->sb_drops = 0;
223 	sbp->sb_snap = 0;
224 	sbp->sb_flags = 0;
225 	sbp->sb_state = 0;
226 
227 	rq->q_ptr = WR(rq)->q_ptr = sbp;
228 
229 	qprocson(rq);
230 
231 
232 	return (0);
233 }
234 
235 /* ARGSUSED1 */
236 static int
237 sbclose(queue_t *rq, int flag, cred_t *credp)
238 {
239 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
240 
241 	ASSERT(sbp);
242 
243 	qprocsoff(rq);
244 	/*
245 	 * Cancel an outstanding timeout
246 	 */
247 	if (sbp->sb_timeoutid != 0) {
248 		(void) quntimeout(rq, sbp->sb_timeoutid);
249 		sbp->sb_timeoutid = 0;
250 	}
251 	/*
252 	 * Free the current chunk.
253 	 */
254 	if (sbp->sb_mp) {
255 		freemsg(sbp->sb_mp);
256 		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
257 		sbp->sb_mlen = 0;
258 	}
259 
260 	/*
261 	 * Free the per-Stream structure.
262 	 */
263 	kmem_free((caddr_t)sbp, sizeof (struct sb));
264 	rq->q_ptr = WR(rq)->q_ptr = NULL;
265 
266 	return (0);
267 }
268 
269 /*
270  * the correction factor is introduced to compensate for
271  * whatever assumptions the modules below have made about
272  * how much traffic is flowing through the stream and the fact
273  * that bufmod may be snipping messages with the sb_snap length.
274  */
275 #define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
276 #define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
277 
278 
279 static void
280 sbioc(queue_t *wq, mblk_t *mp)
281 {
282 	struct iocblk *iocp;
283 	struct sb *sbp = (struct sb *)wq->q_ptr;
284 	clock_t	ticks;
285 	mblk_t	*mop;
286 
287 	iocp = (struct iocblk *)mp->b_rptr;
288 
289 	switch (iocp->ioc_cmd) {
290 	case SBIOCGCHUNK:
291 	case SBIOCGSNAP:
292 	case SBIOCGFLAGS:
293 	case SBIOCGTIME:
294 		miocack(wq, mp, 0, 0);
295 		return;
296 
297 	case SBIOCSTIME:
298 #ifdef _SYSCALL32_IMPL
299 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
300 			struct timeval32 *t32;
301 
302 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
303 			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
304 				miocnak(wq, mp, 0, EINVAL);
305 				break;
306 			}
307 			ticks = TIMEVAL_TO_TICK(t32);
308 		} else
309 #endif /* _SYSCALL32_IMPL */
310 		{
311 			struct timeval *tb;
312 
313 			tb = (struct timeval *)mp->b_cont->b_rptr;
314 
315 			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
316 				miocnak(wq, mp, 0, EINVAL);
317 				break;
318 			}
319 			ticks = TIMEVAL_TO_TICK(tb);
320 		}
321 		sbp->sb_ticks = ticks;
322 		if (ticks == 0)
323 			sbp->sb_chunk = 0;
324 		miocack(wq, mp, 0, 0);
325 		sbclosechunk(sbp);
326 		return;
327 
328 	case SBIOCSCHUNK:
329 		/*
330 		 * set up hi/lo water marks on stream head read queue.
331 		 * unlikely to run out of resources. Fix at later date.
332 		 */
333 		if ((mop = allocb(sizeof (struct stroptions),
334 		    BPRI_MED)) != NULL) {
335 			struct stroptions *sop;
336 			uint_t chunk;
337 
338 			chunk = *(uint_t *)mp->b_cont->b_rptr;
339 			mop->b_datap->db_type = M_SETOPTS;
340 			mop->b_wptr += sizeof (struct stroptions);
341 			sop = (struct stroptions *)mop->b_rptr;
342 			sop->so_flags = SO_HIWAT | SO_LOWAT;
343 			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
344 			sop->so_lowat = SNIT_LOWAT(chunk, 1);
345 			qreply(wq, mop);
346 		}
347 
348 		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
349 		miocack(wq, mp, 0, 0);
350 		sbclosechunk(sbp);
351 		return;
352 
353 	case SBIOCSFLAGS:
354 		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
355 		miocack(wq, mp, 0, 0);
356 		return;
357 
358 	case SBIOCSSNAP:
359 		/*
360 		 * if chunking dont worry about effects of
361 		 * snipping of message size on head flow control
362 		 * since it has a relatively small bearing on the
363 		 * data rate onto the streamn head.
364 		 */
365 		if (!sbp->sb_chunk) {
366 			/*
367 			 * set up hi/lo water marks on stream head read queue.
368 			 * unlikely to run out of resources. Fix at later date.
369 			 */
370 			if ((mop = allocb(sizeof (struct stroptions),
371 			    BPRI_MED)) != NULL) {
372 				struct stroptions *sop;
373 				uint_t snap;
374 				int fudge;
375 
376 				snap = *(uint_t *)mp->b_cont->b_rptr;
377 				mop->b_datap->db_type = M_SETOPTS;
378 				mop->b_wptr += sizeof (struct stroptions);
379 				sop = (struct stroptions *)mop->b_rptr;
380 				sop->so_flags = SO_HIWAT | SO_LOWAT;
381 				fudge = snap <= 100 ?   4 :
382 				    snap <= 400 ?   2 :
383 				    1;
384 				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
385 				sop->so_lowat = SNIT_LOWAT(snap, fudge);
386 				qreply(wq, mop);
387 			}
388 		}
389 
390 		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
391 		miocack(wq, mp, 0, 0);
392 		return;
393 
394 	default:
395 		ASSERT(0);
396 		return;
397 	}
398 }
399 
400 /*
401  * Write-side put procedure.  Its main task is to detect ioctls
402  * for manipulating the buffering state and hand them to sbioctl.
403  * Other message types are passed on through.
404  */
405 static int
406 sbwput(queue_t *wq, mblk_t *mp)
407 {
408 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
409 	struct copyresp *resp;
410 
411 	if (sbp->sb_flags & SB_SEND_ON_WRITE)
412 		sbclosechunk(sbp);
413 	switch (mp->b_datap->db_type) {
414 	case M_IOCTL:
415 		sbioctl(wq, mp);
416 		break;
417 
418 	case M_IOCDATA:
419 		resp = (struct copyresp *)mp->b_rptr;
420 		if (resp->cp_rval) {
421 			/*
422 			 * Just free message on failure.
423 			 */
424 			freemsg(mp);
425 			break;
426 		}
427 
428 		switch (resp->cp_cmd) {
429 		case SBIOCSTIME:
430 		case SBIOCSCHUNK:
431 		case SBIOCSFLAGS:
432 		case SBIOCSSNAP:
433 		case SBIOCGTIME:
434 		case SBIOCGCHUNK:
435 		case SBIOCGSNAP:
436 		case SBIOCGFLAGS:
437 			sbioc(wq, mp);
438 			break;
439 
440 		default:
441 			putnext(wq, mp);
442 			break;
443 		}
444 		break;
445 
446 	default:
447 		putnext(wq, mp);
448 		break;
449 	}
450 	return (0);
451 }
452 
453 /*
454  * Read-side put procedure.  It's responsible for buffering up incoming
455  * messages and grouping them into aggregates according to the current
456  * buffering parameters.
457  */
458 static int
459 sbrput(queue_t *rq, mblk_t *mp)
460 {
461 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
462 
463 	ASSERT(sbp);
464 
465 	switch (mp->b_datap->db_type) {
466 	case M_PROTO:
467 		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
468 			sbclosechunk(sbp);
469 			sbsendit(rq, mp);
470 			break;
471 		} else {
472 			/*
473 			 * Convert M_PROTO to M_DATA.
474 			 */
475 			mp->b_datap->db_type = M_DATA;
476 		}
477 		/* FALLTHRU */
478 
479 	case M_DATA:
480 		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
481 		    !(sbp->sb_state & SB_FRCVD)) {
482 			sbclosechunk(sbp);
483 			sbsendit(rq, mp);
484 			sbp->sb_state |= SB_FRCVD;
485 		} else
486 			sbaddmsg(rq, mp);
487 
488 		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
489 			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
490 			    sbp, sbp->sb_ticks);
491 
492 		break;
493 
494 	case M_FLUSH:
495 		if (*mp->b_rptr & FLUSHR) {
496 			/*
497 			 * Reset timeout, flush the chunk currently in
498 			 * progress, and start a new chunk.
499 			 */
500 			if (sbp->sb_timeoutid) {
501 				(void) quntimeout(sbp->sb_rq,
502 				    sbp->sb_timeoutid);
503 				sbp->sb_timeoutid = 0;
504 			}
505 			if (sbp->sb_mp) {
506 				freemsg(sbp->sb_mp);
507 				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
508 				sbp->sb_mlen = 0;
509 				sbp->sb_mcount = 0;
510 			}
511 			flushq(rq, FLUSHALL);
512 		}
513 		putnext(rq, mp);
514 		break;
515 
516 	case M_CTL:
517 		/*
518 		 * Zero-length M_CTL means our timeout() popped.
519 		 */
520 		if (MBLKL(mp) == 0) {
521 			freemsg(mp);
522 			sbclosechunk(sbp);
523 		} else {
524 			sbclosechunk(sbp);
525 			sbsendit(rq, mp);
526 		}
527 		break;
528 
529 	default:
530 		if (mp->b_datap->db_type <= QPCTL) {
531 			sbclosechunk(sbp);
532 			sbsendit(rq, mp);
533 		} else {
534 			/* Note: out of band */
535 			putnext(rq, mp);
536 		}
537 		break;
538 	}
539 	return (0);
540 }
541 
542 /*
543  *  read service procedure.
544  */
545 /* ARGSUSED */
546 static int
547 sbrsrv(queue_t *rq)
548 {
549 	mblk_t	*mp;
550 
551 	/*
552 	 * High priority messages shouldn't get here but if
553 	 * one does, jam it through to avoid infinite loop.
554 	 */
555 	while ((mp = getq(rq)) != NULL) {
556 		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
557 			/* should only get here if SB_NO_SROPS */
558 			(void) putbq(rq, mp);
559 			return (0);
560 		}
561 		putnext(rq, mp);
562 	}
563 	return (0);
564 }
565 
566 /*
567  * Handle write-side M_IOCTL messages.
568  */
569 static void
570 sbioctl(queue_t *wq, mblk_t *mp)
571 {
572 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
573 	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
574 	struct	timeval	*t;
575 	clock_t	ticks;
576 	mblk_t	*mop;
577 	int	transparent = iocp->ioc_count;
578 	mblk_t	*datamp;
579 	int	error;
580 
581 	switch (iocp->ioc_cmd) {
582 	case SBIOCSTIME:
583 		if (iocp->ioc_count == TRANSPARENT) {
584 #ifdef _SYSCALL32_IMPL
585 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
586 				mcopyin(mp, NULL, sizeof (struct timeval32),
587 				    NULL);
588 			} else
589 #endif /* _SYSCALL32_IMPL */
590 			{
591 				mcopyin(mp, NULL, sizeof (*t), NULL);
592 			}
593 			qreply(wq, mp);
594 		} else {
595 			/*
596 			 * Verify argument length.
597 			 */
598 #ifdef _SYSCALL32_IMPL
599 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
600 				struct timeval32 *t32;
601 
602 				error = miocpullup(mp,
603 				    sizeof (struct timeval32));
604 				if (error != 0) {
605 					miocnak(wq, mp, 0, error);
606 					break;
607 				}
608 				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
609 				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
610 					miocnak(wq, mp, 0, EINVAL);
611 					break;
612 				}
613 				ticks = TIMEVAL_TO_TICK(t32);
614 			} else
615 #endif /* _SYSCALL32_IMPL */
616 			{
617 				error = miocpullup(mp, sizeof (struct timeval));
618 				if (error != 0) {
619 					miocnak(wq, mp, 0, error);
620 					break;
621 				}
622 
623 				t = (struct timeval *)mp->b_cont->b_rptr;
624 				if (t->tv_sec < 0 || t->tv_usec < 0) {
625 					miocnak(wq, mp, 0, EINVAL);
626 					break;
627 				}
628 				ticks = TIMEVAL_TO_TICK(t);
629 			}
630 			sbp->sb_ticks = ticks;
631 			if (ticks == 0)
632 				sbp->sb_chunk = 0;
633 			miocack(wq, mp, 0, 0);
634 			sbclosechunk(sbp);
635 		}
636 		break;
637 
638 	case SBIOCGTIME: {
639 		struct timeval *t;
640 
641 		/*
642 		 * Verify argument length.
643 		 */
644 		if (transparent != TRANSPARENT) {
645 #ifdef _SYSCALL32_IMPL
646 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
647 				error = miocpullup(mp,
648 				    sizeof (struct timeval32));
649 				if (error != 0) {
650 					miocnak(wq, mp, 0, error);
651 					break;
652 				}
653 			} else
654 #endif /* _SYSCALL32_IMPL */
655 			error = miocpullup(mp, sizeof (struct timeval));
656 			if (error != 0) {
657 				miocnak(wq, mp, 0, error);
658 				break;
659 			}
660 		}
661 
662 		/*
663 		 * If infinite timeout, return range error
664 		 * for the ioctl.
665 		 */
666 		if (sbp->sb_ticks < 0) {
667 			miocnak(wq, mp, 0, ERANGE);
668 			break;
669 		}
670 
671 #ifdef _SYSCALL32_IMPL
672 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
673 			struct timeval32 *t32;
674 
675 			if (transparent == TRANSPARENT) {
676 				datamp = allocb(sizeof (*t32), BPRI_MED);
677 				if (datamp == NULL) {
678 					miocnak(wq, mp, 0, EAGAIN);
679 					break;
680 				}
681 				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
682 			}
683 
684 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
685 			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
686 
687 			if (transparent == TRANSPARENT)
688 				qreply(wq, mp);
689 			else
690 				miocack(wq, mp, sizeof (*t32), 0);
691 		} else
692 #endif /* _SYSCALL32_IMPL */
693 		{
694 			if (transparent == TRANSPARENT) {
695 				datamp = allocb(sizeof (*t), BPRI_MED);
696 				if (datamp == NULL) {
697 					miocnak(wq, mp, 0, EAGAIN);
698 					break;
699 				}
700 				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
701 			}
702 
703 			t = (struct timeval *)mp->b_cont->b_rptr;
704 			TICK_TO_TIMEVAL(sbp->sb_ticks, t);
705 
706 			if (transparent == TRANSPARENT)
707 				qreply(wq, mp);
708 			else
709 				miocack(wq, mp, sizeof (*t), 0);
710 		}
711 		break;
712 	}
713 
714 	case SBIOCCTIME:
715 		sbp->sb_ticks = -1;
716 		miocack(wq, mp, 0, 0);
717 		break;
718 
719 	case SBIOCSCHUNK:
720 		if (iocp->ioc_count == TRANSPARENT) {
721 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
722 			qreply(wq, mp);
723 		} else {
724 			/*
725 			 * Verify argument length.
726 			 */
727 			error = miocpullup(mp, sizeof (uint_t));
728 			if (error != 0) {
729 				miocnak(wq, mp, 0, error);
730 				break;
731 			}
732 
733 			/*
734 			 * set up hi/lo water marks on stream head read queue.
735 			 * unlikely to run out of resources. Fix at later date.
736 			 */
737 			if ((mop = allocb(sizeof (struct stroptions),
738 			    BPRI_MED)) != NULL) {
739 				struct stroptions *sop;
740 				uint_t chunk;
741 
742 				chunk = *(uint_t *)mp->b_cont->b_rptr;
743 				mop->b_datap->db_type = M_SETOPTS;
744 				mop->b_wptr += sizeof (struct stroptions);
745 				sop = (struct stroptions *)mop->b_rptr;
746 				sop->so_flags = SO_HIWAT | SO_LOWAT;
747 				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
748 				sop->so_lowat = SNIT_LOWAT(chunk, 1);
749 				qreply(wq, mop);
750 			}
751 
752 			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
753 			miocack(wq, mp, 0, 0);
754 			sbclosechunk(sbp);
755 		}
756 		break;
757 
758 	case SBIOCGCHUNK:
759 		/*
760 		 * Verify argument length.
761 		 */
762 		if (transparent != TRANSPARENT) {
763 			error = miocpullup(mp, sizeof (uint_t));
764 			if (error != 0) {
765 				miocnak(wq, mp, 0, error);
766 				break;
767 			}
768 		}
769 
770 		if (transparent == TRANSPARENT) {
771 			datamp = allocb(sizeof (uint_t), BPRI_MED);
772 			if (datamp == NULL) {
773 				miocnak(wq, mp, 0, EAGAIN);
774 				break;
775 			}
776 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
777 		}
778 
779 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
780 
781 		if (transparent == TRANSPARENT)
782 			qreply(wq, mp);
783 		else
784 			miocack(wq, mp, sizeof (uint_t), 0);
785 		break;
786 
787 	case SBIOCSSNAP:
788 		if (iocp->ioc_count == TRANSPARENT) {
789 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
790 			qreply(wq, mp);
791 		} else {
792 			/*
793 			 * Verify argument length.
794 			 */
795 			error = miocpullup(mp, sizeof (uint_t));
796 			if (error != 0) {
797 				miocnak(wq, mp, 0, error);
798 				break;
799 			}
800 
801 			/*
802 			 * if chunking dont worry about effects of
803 			 * snipping of message size on head flow control
804 			 * since it has a relatively small bearing on the
805 			 * data rate onto the streamn head.
806 			 */
807 			if (!sbp->sb_chunk) {
808 				/*
809 				 * set up hi/lo water marks on stream
810 				 * head read queue.  unlikely to run out
811 				 * of resources. Fix at later date.
812 				 */
813 				if ((mop = allocb(sizeof (struct stroptions),
814 				    BPRI_MED)) != NULL) {
815 					struct stroptions *sop;
816 					uint_t snap;
817 					int fudge;
818 
819 					snap = *(uint_t *)mp->b_cont->b_rptr;
820 					mop->b_datap->db_type = M_SETOPTS;
821 					mop->b_wptr += sizeof (*sop);
822 					sop = (struct stroptions *)mop->b_rptr;
823 					sop->so_flags = SO_HIWAT | SO_LOWAT;
824 					fudge = (snap <= 100) ? 4 :
825 					    (snap <= 400) ? 2 : 1;
826 					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
827 					sop->so_lowat = SNIT_LOWAT(snap, fudge);
828 					qreply(wq, mop);
829 				}
830 			}
831 
832 			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
833 
834 			miocack(wq, mp, 0, 0);
835 		}
836 		break;
837 
838 	case SBIOCGSNAP:
839 		/*
840 		 * Verify argument length
841 		 */
842 		if (transparent != TRANSPARENT) {
843 			error = miocpullup(mp, sizeof (uint_t));
844 			if (error != 0) {
845 				miocnak(wq, mp, 0, error);
846 				break;
847 			}
848 		}
849 
850 		if (transparent == TRANSPARENT) {
851 			datamp = allocb(sizeof (uint_t), BPRI_MED);
852 			if (datamp == NULL) {
853 				miocnak(wq, mp, 0, EAGAIN);
854 				break;
855 			}
856 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
857 		}
858 
859 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
860 
861 		if (transparent == TRANSPARENT)
862 			qreply(wq, mp);
863 		else
864 			miocack(wq, mp, sizeof (uint_t), 0);
865 		break;
866 
867 	case SBIOCSFLAGS:
868 		/*
869 		 * set the flags.
870 		 */
871 		if (iocp->ioc_count == TRANSPARENT) {
872 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
873 			qreply(wq, mp);
874 		} else {
875 			error = miocpullup(mp, sizeof (uint_t));
876 			if (error != 0) {
877 				miocnak(wq, mp, 0, error);
878 				break;
879 			}
880 			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
881 			miocack(wq, mp, 0, 0);
882 		}
883 		break;
884 
885 	case SBIOCGFLAGS:
886 		/*
887 		 * Verify argument length
888 		 */
889 		if (transparent != TRANSPARENT) {
890 			error = miocpullup(mp, sizeof (uint_t));
891 			if (error != 0) {
892 				miocnak(wq, mp, 0, error);
893 				break;
894 			}
895 		}
896 
897 		if (transparent == TRANSPARENT) {
898 			datamp = allocb(sizeof (uint_t), BPRI_MED);
899 			if (datamp == NULL) {
900 				miocnak(wq, mp, 0, EAGAIN);
901 				break;
902 			}
903 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
904 		}
905 
906 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
907 
908 		if (transparent == TRANSPARENT)
909 			qreply(wq, mp);
910 		else
911 			miocack(wq, mp, sizeof (uint_t), 0);
912 		break;
913 
914 
915 	default:
916 		putnext(wq, mp);
917 		break;
918 	}
919 }
920 
921 /*
922  * Given a length l, calculate the amount of extra storage
923  * required to round it up to the next multiple of the alignment a.
924  */
925 #define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
926 /*
927  * Calculate additional amount of space required for alignment.
928  */
929 #define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
930 /*
931  * Smallest possible message size when headers are enabled.
932  * This is used to calculate whether a chunk is nearly full.
933  */
934 #define	SMALLEST_MESSAGE	sizeof (struct sb_hdr) + _POINTER_ALIGNMENT
935 
936 /*
937  * Process a read-side M_DATA message.
938  *
939  * If the currently accumulating chunk doesn't have enough room
940  * for the message, close off the chunk, pass it upward, and start
941  * a new one.  Then add the message to the current chunk, taking
942  * account of the possibility that the message's size exceeds the
943  * chunk size.
944  *
945  * If headers are enabled add an sb_hdr header and trailing alignment padding.
946  *
947  * To optimise performance the total number of msgbs should be kept
948  * to a minimum. This is achieved by using any remaining space in message N
949  * for both its own padding as well as the header of message N+1 if possible.
950  * If there's insufficient space we allocate one message to hold this 'wrapper'.
951  * (there's likely to be space beyond message N, since allocb would have
952  * rounded up the required size to one of the dblk_sizes).
953  *
954  */
955 static void
956 sbaddmsg(queue_t *rq, mblk_t *mp)
957 {
958 	struct sb	*sbp;
959 	struct timeval	t;
960 	struct sb_hdr	hp;
961 	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
962 	mblk_t *last;		/* last mblk of current message */
963 	size_t wrapperlen;	/* length of header + padding */
964 	size_t origlen;		/* data length before truncation */
965 	size_t pad;		/* bytes required to align header */
966 
967 	sbp = (struct sb *)rq->q_ptr;
968 
969 	origlen = msgdsize(mp);
970 
971 	/*
972 	 * Truncate the message.
973 	 */
974 	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
975 	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
976 		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
977 	else
978 		hp.sbh_totlen = hp.sbh_msglen = origlen;
979 
980 	if (sbp->sb_flags & SB_NO_HEADER) {
981 
982 		/*
983 		 * Would the inclusion of this message overflow the current
984 		 * chunk? If so close the chunk off and start a new one.
985 		 */
986 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
987 			sbclosechunk(sbp);
988 		/*
989 		 * First message too big for chunk - just send it up.
990 		 * This will always be true when we're not chunking.
991 		 */
992 		if (hp.sbh_totlen > sbp->sb_chunk) {
993 			sbsendit(rq, mp);
994 			return;
995 		}
996 
997 		/*
998 		 * We now know that the msg will fit in the chunk.
999 		 * Link it onto the end of the chunk.
1000 		 * Since linkb() walks the entire chain, we keep a pointer to
1001 		 * the first mblk of the last msgb added and call linkb on that
1002 		 * that last message, rather than performing the
1003 		 * O(n) linkb() operation on the whole chain.
1004 		 * sb_head isn't needed in this SB_NO_HEADER mode.
1005 		 */
1006 		if (sbp->sb_mp)
1007 			linkb(sbp->sb_tail, mp);
1008 		else
1009 			sbp->sb_mp = mp;
1010 
1011 		sbp->sb_tail = mp;
1012 		sbp->sb_mlen += hp.sbh_totlen;
1013 		sbp->sb_mcount++;
1014 	} else {
1015 		/* Timestamp must be done immediately */
1016 		uniqtime(&t);
1017 		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1018 
1019 		pad = Align(hp.sbh_totlen);
1020 		hp.sbh_totlen += sizeof (hp);
1021 
1022 		/* We can't fit this message on the current chunk. */
1023 		if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
1024 			sbclosechunk(sbp);
1025 
1026 		/*
1027 		 * If we closed it (just now or during a previous
1028 		 * call) then allocate the head of a new chunk.
1029 		 */
1030 		if (sbp->sb_head == NULL) {
1031 			/* Allocate leading header of new chunk */
1032 			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1033 			if (sbp->sb_head == NULL) {
1034 				/*
1035 				 * Memory allocation failure.
1036 				 * This will need to be revisited
1037 				 * since using certain flag combinations
1038 				 * can result in messages being dropped
1039 				 * silently.
1040 				 */
1041 				freemsg(mp);
1042 				sbp->sb_drops++;
1043 				return;
1044 			}
1045 			sbp->sb_mp = sbp->sb_head;
1046 		}
1047 
1048 		/*
1049 		 * Set the header values and join the message to the
1050 		 * chunk. The header values are copied into the chunk
1051 		 * after we adjust for padding below.
1052 		 */
1053 		hp.sbh_drops = sbp->sb_drops;
1054 		hp.sbh_origlen = origlen;
1055 		linkb(sbp->sb_head, mp);
1056 		sbp->sb_mcount++;
1057 		sbp->sb_mlen += hp.sbh_totlen;
1058 
1059 		/*
1060 		 * There's no chance to fit another message on the
1061 		 * chunk -- forgo the padding and close the chunk.
1062 		 */
1063 		if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
1064 			(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
1065 			    sizeof (hp));
1066 			sbp->sb_head->b_wptr += sizeof (hp);
1067 			ASSERT(sbp->sb_head->b_wptr <=
1068 			    sbp->sb_head->b_datap->db_lim);
1069 			sbclosechunk(sbp);
1070 			return;
1071 		}
1072 
1073 		/*
1074 		 * We may add another message to this chunk -- adjust
1075 		 * the headers for padding to be added below.
1076 		 */
1077 		hp.sbh_totlen += pad;
1078 		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1079 		sbp->sb_head->b_wptr += sizeof (hp);
1080 		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1081 		sbp->sb_mlen += pad;
1082 
1083 		/*
1084 		 * Find space for the wrapper. The wrapper consists of:
1085 		 *
1086 		 * 1) Padding for this message (this is to ensure each header
1087 		 * begins on an 8 byte boundary in the userland buffer).
1088 		 *
1089 		 * 2) Space for the next message's header, in case the next
1090 		 * next message will fit in this chunk.
1091 		 *
1092 		 * It may be possible to append the wrapper to the last mblk
1093 		 * of the message, but only if we 'own' the data. If the dblk
1094 		 * has been shared through dupmsg() we mustn't alter it.
1095 		 */
1096 		wrapperlen = (sizeof (hp) + pad);
1097 
1098 		/* Is there space for the wrapper beyond the message's data ? */
1099 		for (last = mp; last->b_cont; last = last->b_cont)
1100 			;
1101 
1102 		if ((wrapperlen <= MBLKTAIL(last)) &&
1103 		    (last->b_datap->db_ref == 1)) {
1104 			if (pad > 0) {
1105 				/*
1106 				 * Pad with zeroes to the next pointer boundary
1107 				 * (we don't want to disclose kernel data to
1108 				 * users), then advance wptr.
1109 				 */
1110 				(void) memset(last->b_wptr, 0, pad);
1111 				last->b_wptr += pad;
1112 			}
1113 			/* Remember where to write the header information */
1114 			sbp->sb_head = last;
1115 		} else {
1116 			/* Have to allocate additional space for the wrapper */
1117 			wrapper = allocb(wrapperlen, BPRI_MED);
1118 			if (wrapper == NULL) {
1119 				sbclosechunk(sbp);
1120 				return;
1121 			}
1122 			if (pad > 0) {
1123 				/*
1124 				 * Pad with zeroes (we don't want to disclose
1125 				 * kernel data to users).
1126 				 */
1127 				(void) memset(wrapper->b_wptr, 0, pad);
1128 				wrapper->b_wptr += pad;
1129 			}
1130 			/* Link the wrapper msg onto the end of the chunk */
1131 			linkb(mp, wrapper);
1132 			/* Remember to write the next header in this wrapper */
1133 			sbp->sb_head = wrapper;
1134 		}
1135 	}
1136 }
1137 
1138 /*
1139  * Called from timeout().
1140  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1141  * to synchronize with any active module threads (open, close, wput, rput).
1142  */
1143 static void
1144 sbtick(void *arg)
1145 {
1146 	struct sb *sbp = arg;
1147 	queue_t	*rq;
1148 
1149 	ASSERT(sbp);
1150 
1151 	rq = sbp->sb_rq;
1152 	sbp->sb_timeoutid = 0;		/* timeout has fired */
1153 
1154 	if (putctl(rq, M_CTL) == 0)	/* failure */
1155 		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1156 }
1157 
1158 /*
1159  * Close off the currently accumulating chunk and pass
1160  * it upward.  Takes care of resetting timers as well.
1161  *
1162  * This routine is called both directly and as a result
1163  * of the chunk timeout expiring.
1164  */
1165 static void
1166 sbclosechunk(struct sb *sbp)
1167 {
1168 	mblk_t	*mp;
1169 	queue_t	*rq;
1170 
1171 	ASSERT(sbp);
1172 
1173 	if (sbp->sb_timeoutid) {
1174 		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1175 		sbp->sb_timeoutid = 0;
1176 	}
1177 
1178 	mp = sbp->sb_mp;
1179 	rq = sbp->sb_rq;
1180 
1181 	/*
1182 	 * If there's currently a chunk in progress, close it off
1183 	 * and try to send it up.
1184 	 */
1185 	if (mp) {
1186 		sbsendit(rq, mp);
1187 	}
1188 
1189 	/*
1190 	 * Clear old chunk.  Ready for new msgs.
1191 	 */
1192 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1193 	sbp->sb_mlen = 0;
1194 	sbp->sb_mcount = 0;
1195 	if (sbp->sb_flags & SB_DEFER_CHUNK)
1196 		sbp->sb_state &= ~SB_FRCVD;
1197 
1198 }
1199 
1200 static void
1201 sbsendit(queue_t *rq, mblk_t *mp)
1202 {
1203 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
1204 
1205 	if (!canputnext(rq)) {
1206 		if (sbp->sb_flags & SB_NO_DROPS)
1207 			(void) putq(rq, mp);
1208 		else {
1209 			freemsg(mp);
1210 			sbp->sb_drops += sbp->sb_mcount;
1211 		}
1212 		return;
1213 	}
1214 	/*
1215 	 * If there are messages on the q already, keep
1216 	 * queueing them since they need to be processed in order.
1217 	 */
1218 	if (qsize(rq) > 0) {
1219 		/* should only get here if SB_NO_DROPS */
1220 		(void) putq(rq, mp);
1221 	}
1222 	else
1223 		putnext(rq, mp);
1224 }
1225