xref: /illumos-gate/usr/src/uts/common/io/bufmod.c (revision 6e6545bfaed3bab9ce836ee82d1abd8f2edba89a)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  * Copyright 2018 Joyent, Inc.
26  */
27 
28 /*
29  * STREAMS Buffering module
30  *
31  * This streams module collects incoming messages from modules below
32  * it on the stream and buffers them up into a smaller number of
33  * aggregated messages.  Its main purpose is to reduce overhead by
34  * cutting down on the number of read (or getmsg) calls its client
35  * user process makes.
36  *  - only M_DATA is buffered.
37  *  - multithreading assumes configured as D_MTQPAIR
 *  - packets are lost if flag SB_NO_HEADER is clear and buffer
 *    allocation fails, or if flag SB_NO_DROPS is clear and the
 *    upstream flow is blocked.
40  *  - in order message transmission. This is enforced for messages other
41  *    than high priority messages.
42  *  - zero length messages on the read side are not passed up the
43  *    stream but used internally for synchronization.
44  * FLAGS:
45  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
46  *   (conversion is the default for backwards compatibility
47  *    hence the negative logic).
48  * - SB_NO_HEADER - no headers in buffered data.
49  *   (adding headers is the default for backwards compatibility
50  *    hence the negative logic).
51  * - SB_DEFER_CHUNK - provides improved response time in question-answer
52  *   applications. Buffering is not enabled until the second message
53  *   is received on the read side within the sb_ticks interval.
54  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
55  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
56  *   data being immediately sent upstream.
57  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
58  *   the blocked flow condition downstream. If this flag is clear (default)
59  *   messages will be dropped if the upstream flow is blocked.
60  */
61 
62 
63 #include	<sys/types.h>
64 #include	<sys/errno.h>
65 #include	<sys/debug.h>
66 #include	<sys/stropts.h>
67 #include	<sys/time.h>
68 #include	<sys/stream.h>
69 #include	<sys/conf.h>
70 #include	<sys/ddi.h>
71 #include	<sys/sunddi.h>
72 #include	<sys/kmem.h>
73 #include	<sys/strsun.h>
74 #include	<sys/bufmod.h>
75 #include	<sys/modctl.h>
76 #include	<sys/isa_defs.h>
77 
78 /*
79  * Per-Stream state information.
80  *
81  * If sb_ticks is negative, we don't deliver chunks until they're
82  * full.  If it's zero, we deliver every packet as it arrives.  (In
83  * this case we force sb_chunk to zero, to make the implementation
84  * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read side data
86  * message is received and a timeout is not active. If sb_snap is
87  * zero, no truncation of the msg is done.
88  */
struct sb {
	queue_t	*sb_rq;		/* our rq; used for timeouts and sending up */
	mblk_t	*sb_mp;		/* partial chunk being built, NULL if none */
	mblk_t  *sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length, in bytes */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size; 0 when sb_ticks is 0 */
	clock_t	sb_ticks;	/* timeout interval; < 0 means no timeout */
	timeout_id_t sb_timeoutid; /* qtimeout() id, 0 when none is pending */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length; 0 means no truncation */
	uint_t	sb_flags;	/* SB_* option flags (see SBIOCSFLAGS) */
	uint_t	sb_state;	/* state variable (SB_FRCVD) */
};
104 
105 /*
106  * Function prototypes.
107  */
108 static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
109 static	int	sbclose(queue_t *, int, cred_t *);
110 static	void	sbwput(queue_t *, mblk_t *);
111 static	void	sbrput(queue_t *, mblk_t *);
112 static	void	sbrsrv(queue_t *);
113 static	void	sbioctl(queue_t *, mblk_t *);
114 static	void	sbaddmsg(queue_t *, mblk_t *);
115 static	void	sbtick(void *);
116 static	void	sbclosechunk(struct sb *);
117 static	void	sbsendit(queue_t *, mblk_t *);
118 
/*
 * Module identification and default queue limits.  The same structure is
 * referenced by both the read-side and the write-side qinit.
 */
static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};
127 
/*
 * Read-side queue procedures.  The put and service routines are declared
 * as returning void, hence the casts to the qinit function pointer type.
 * The open and close routines are supplied here, on the read side.
 */
static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};
137 
/*
 * Write-side queue procedures.  Only a put procedure is supplied; there
 * is no write-side service procedure.
 */
static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};
147 
/*
 * Stream table tying the read and write qinit structures together.
 * The multiplexor entries are unused.
 */
static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};
154 
155 
156 /*
157  * This is the loadable module wrapper.
158  */
159 
/*
 * Module switch entry: name, stream table and the D_MTQPAIR | D_MP
 * flags (the file header notes that multithreading assumes D_MTQPAIR).
 */
static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};
165 
166 /*
167  * Module linkage information for the kernel.
168  */
169 
/* Linkage for a STREAMS module: ops vector, description, fmodsw entry. */
static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};
173 
/* Top-level linkage handed to mod_install(), mod_remove() and mod_info(). */
static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};
177 
178 
179 int
180 _init(void)
181 {
182 	return (mod_install(&modlinkage));
183 }
184 
185 int
186 _fini(void)
187 {
188 	return (mod_remove(&modlinkage));
189 }
190 
191 int
192 _info(struct modinfo *modinfop)
193 {
194 	return (mod_info(&modlinkage, modinfop));
195 }
196 
197 
198 /* ARGSUSED */
199 static int
200 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
201 {
202 	struct sb	*sbp;
203 	ASSERT(rq);
204 
205 	if (sflag != MODOPEN)
206 		return (EINVAL);
207 
208 	if (rq->q_ptr)
209 		return (0);
210 
211 	/*
212 	 * Allocate and initialize per-Stream structure.
213 	 */
214 	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
215 	sbp->sb_rq = rq;
216 	sbp->sb_ticks = -1;
217 	sbp->sb_chunk = SB_DFLT_CHUNK;
218 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
219 	sbp->sb_mlen = 0;
220 	sbp->sb_mcount = 0;
221 	sbp->sb_timeoutid = 0;
222 	sbp->sb_drops = 0;
223 	sbp->sb_snap = 0;
224 	sbp->sb_flags = 0;
225 	sbp->sb_state = 0;
226 
227 	rq->q_ptr = WR(rq)->q_ptr = sbp;
228 
229 	qprocson(rq);
230 
231 
232 	return (0);
233 }
234 
235 /* ARGSUSED1 */
236 static int
237 sbclose(queue_t *rq, int flag, cred_t *credp)
238 {
239 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
240 
241 	ASSERT(sbp);
242 
243 	qprocsoff(rq);
244 	/*
245 	 * Cancel an outstanding timeout
246 	 */
247 	if (sbp->sb_timeoutid != 0) {
248 		(void) quntimeout(rq, sbp->sb_timeoutid);
249 		sbp->sb_timeoutid = 0;
250 	}
251 	/*
252 	 * Free the current chunk.
253 	 */
254 	if (sbp->sb_mp) {
255 		freemsg(sbp->sb_mp);
256 		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
257 		sbp->sb_mlen = 0;
258 	}
259 
260 	/*
261 	 * Free the per-Stream structure.
262 	 */
263 	kmem_free((caddr_t)sbp, sizeof (struct sb));
264 	rq->q_ptr = WR(rq)->q_ptr = NULL;
265 
266 	return (0);
267 }
268 
/*
 * Stream head read-queue water marks derived from a message size.
 *
 * The correction factor (fudge) is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream and the fact
 * that bufmod may be snipping messages with the sb_snap length.
 *
 * Both arguments are parenthesized so that expressions such as
 * SNIT_HIWAT(a + b, f) expand as intended.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
277 
278 
279 static void
280 sbioc(queue_t *wq, mblk_t *mp)
281 {
282 	struct iocblk *iocp;
283 	struct sb *sbp = (struct sb *)wq->q_ptr;
284 	clock_t	ticks;
285 	mblk_t	*mop;
286 
287 	iocp = (struct iocblk *)mp->b_rptr;
288 
289 	switch (iocp->ioc_cmd) {
290 	case SBIOCGCHUNK:
291 	case SBIOCGSNAP:
292 	case SBIOCGFLAGS:
293 	case SBIOCGTIME:
294 		miocack(wq, mp, 0, 0);
295 		return;
296 
297 	case SBIOCSTIME:
298 #ifdef _SYSCALL32_IMPL
299 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
300 			struct timeval32 *t32;
301 
302 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
303 			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
304 				miocnak(wq, mp, 0, EINVAL);
305 				break;
306 			}
307 			ticks = TIMEVAL_TO_TICK(t32);
308 		} else
309 #endif /* _SYSCALL32_IMPL */
310 		{
311 			struct timeval *tb;
312 
313 			tb = (struct timeval *)mp->b_cont->b_rptr;
314 
315 			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
316 				miocnak(wq, mp, 0, EINVAL);
317 				break;
318 			}
319 			ticks = TIMEVAL_TO_TICK(tb);
320 		}
321 		sbp->sb_ticks = ticks;
322 		if (ticks == 0)
323 			sbp->sb_chunk = 0;
324 		miocack(wq, mp, 0, 0);
325 		sbclosechunk(sbp);
326 		return;
327 
328 	case SBIOCSCHUNK:
329 		/*
330 		 * set up hi/lo water marks on stream head read queue.
331 		 * unlikely to run out of resources. Fix at later date.
332 		 */
333 		if ((mop = allocb(sizeof (struct stroptions),
334 		    BPRI_MED)) != NULL) {
335 			struct stroptions *sop;
336 			uint_t chunk;
337 
338 			chunk = *(uint_t *)mp->b_cont->b_rptr;
339 			mop->b_datap->db_type = M_SETOPTS;
340 			mop->b_wptr += sizeof (struct stroptions);
341 			sop = (struct stroptions *)mop->b_rptr;
342 			sop->so_flags = SO_HIWAT | SO_LOWAT;
343 			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
344 			sop->so_lowat = SNIT_LOWAT(chunk, 1);
345 			qreply(wq, mop);
346 		}
347 
348 		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
349 		miocack(wq, mp, 0, 0);
350 		sbclosechunk(sbp);
351 		return;
352 
353 	case SBIOCSFLAGS:
354 		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
355 		miocack(wq, mp, 0, 0);
356 		return;
357 
358 	case SBIOCSSNAP:
359 		/*
360 		 * if chunking dont worry about effects of
361 		 * snipping of message size on head flow control
362 		 * since it has a relatively small bearing on the
363 		 * data rate onto the streamn head.
364 		 */
365 		if (!sbp->sb_chunk) {
366 			/*
367 			 * set up hi/lo water marks on stream head read queue.
368 			 * unlikely to run out of resources. Fix at later date.
369 			 */
370 			if ((mop = allocb(sizeof (struct stroptions),
371 			    BPRI_MED)) != NULL) {
372 				struct stroptions *sop;
373 				uint_t snap;
374 				int fudge;
375 
376 				snap = *(uint_t *)mp->b_cont->b_rptr;
377 				mop->b_datap->db_type = M_SETOPTS;
378 				mop->b_wptr += sizeof (struct stroptions);
379 				sop = (struct stroptions *)mop->b_rptr;
380 				sop->so_flags = SO_HIWAT | SO_LOWAT;
381 				fudge = snap <= 100 ?   4 :
382 				    snap <= 400 ?   2 :
383 				    1;
384 				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
385 				sop->so_lowat = SNIT_LOWAT(snap, fudge);
386 				qreply(wq, mop);
387 			}
388 		}
389 
390 		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
391 		miocack(wq, mp, 0, 0);
392 		return;
393 
394 	default:
395 		ASSERT(0);
396 		return;
397 	}
398 }
399 
400 /*
401  * Write-side put procedure.  Its main task is to detect ioctls
402  * for manipulating the buffering state and hand them to sbioctl.
403  * Other message types are passed on through.
404  */
405 static void
406 sbwput(queue_t *wq, mblk_t *mp)
407 {
408 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
409 	struct copyresp *resp;
410 
411 	if (sbp->sb_flags & SB_SEND_ON_WRITE)
412 		sbclosechunk(sbp);
413 	switch (mp->b_datap->db_type) {
414 	case M_IOCTL:
415 		sbioctl(wq, mp);
416 		break;
417 
418 	case M_IOCDATA:
419 		resp = (struct copyresp *)mp->b_rptr;
420 		if (resp->cp_rval) {
421 			/*
422 			 * Just free message on failure.
423 			 */
424 			freemsg(mp);
425 			break;
426 		}
427 
428 		switch (resp->cp_cmd) {
429 		case SBIOCSTIME:
430 		case SBIOCSCHUNK:
431 		case SBIOCSFLAGS:
432 		case SBIOCSSNAP:
433 		case SBIOCGTIME:
434 		case SBIOCGCHUNK:
435 		case SBIOCGSNAP:
436 		case SBIOCGFLAGS:
437 			sbioc(wq, mp);
438 			break;
439 
440 		default:
441 			putnext(wq, mp);
442 			break;
443 		}
444 		break;
445 
446 	default:
447 		putnext(wq, mp);
448 		break;
449 	}
450 }
451 
452 /*
453  * Read-side put procedure.  It's responsible for buffering up incoming
454  * messages and grouping them into aggregates according to the current
455  * buffering parameters.
456  */
457 static void
458 sbrput(queue_t *rq, mblk_t *mp)
459 {
460 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
461 
462 	ASSERT(sbp);
463 
464 	switch (mp->b_datap->db_type) {
465 	case M_PROTO:
466 		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
467 			sbclosechunk(sbp);
468 			sbsendit(rq, mp);
469 			break;
470 		} else {
471 			/*
472 			 * Convert M_PROTO to M_DATA.
473 			 */
474 			mp->b_datap->db_type = M_DATA;
475 		}
476 		/* FALLTHRU */
477 
478 	case M_DATA:
479 		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
480 		    !(sbp->sb_state & SB_FRCVD)) {
481 			sbclosechunk(sbp);
482 			sbsendit(rq, mp);
483 			sbp->sb_state |= SB_FRCVD;
484 		} else
485 			sbaddmsg(rq, mp);
486 
487 		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
488 			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
489 			    sbp, sbp->sb_ticks);
490 
491 		break;
492 
493 	case M_FLUSH:
494 		if (*mp->b_rptr & FLUSHR) {
495 			/*
496 			 * Reset timeout, flush the chunk currently in
497 			 * progress, and start a new chunk.
498 			 */
499 			if (sbp->sb_timeoutid) {
500 				(void) quntimeout(sbp->sb_rq,
501 				    sbp->sb_timeoutid);
502 				sbp->sb_timeoutid = 0;
503 			}
504 			if (sbp->sb_mp) {
505 				freemsg(sbp->sb_mp);
506 				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
507 				sbp->sb_mlen = 0;
508 				sbp->sb_mcount = 0;
509 			}
510 			flushq(rq, FLUSHALL);
511 		}
512 		putnext(rq, mp);
513 		break;
514 
515 	case M_CTL:
516 		/*
517 		 * Zero-length M_CTL means our timeout() popped.
518 		 */
519 		if (MBLKL(mp) == 0) {
520 			freemsg(mp);
521 			sbclosechunk(sbp);
522 		} else {
523 			sbclosechunk(sbp);
524 			sbsendit(rq, mp);
525 		}
526 		break;
527 
528 	default:
529 		if (mp->b_datap->db_type <= QPCTL) {
530 			sbclosechunk(sbp);
531 			sbsendit(rq, mp);
532 		} else {
533 			/* Note: out of band */
534 			putnext(rq, mp);
535 		}
536 		break;
537 	}
538 }
539 
540 /*
541  *  read service procedure.
542  */
543 /* ARGSUSED */
544 static void
545 sbrsrv(queue_t *rq)
546 {
547 	mblk_t	*mp;
548 
549 	/*
550 	 * High priority messages shouldn't get here but if
551 	 * one does, jam it through to avoid infinite loop.
552 	 */
553 	while ((mp = getq(rq)) != NULL) {
554 		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
555 			/* should only get here if SB_NO_SROPS */
556 			(void) putbq(rq, mp);
557 			return;
558 		}
559 		putnext(rq, mp);
560 	}
561 }
562 
563 /*
564  * Handle write-side M_IOCTL messages.
565  */
566 static void
567 sbioctl(queue_t *wq, mblk_t *mp)
568 {
569 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
570 	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
571 	struct	timeval	*t;
572 	clock_t	ticks;
573 	mblk_t	*mop;
574 	int	transparent = iocp->ioc_count;
575 	mblk_t	*datamp;
576 	int	error;
577 
578 	switch (iocp->ioc_cmd) {
579 	case SBIOCSTIME:
580 		if (iocp->ioc_count == TRANSPARENT) {
581 #ifdef _SYSCALL32_IMPL
582 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
583 				mcopyin(mp, NULL, sizeof (struct timeval32),
584 				    NULL);
585 			} else
586 #endif /* _SYSCALL32_IMPL */
587 			{
588 				mcopyin(mp, NULL, sizeof (*t), NULL);
589 			}
590 			qreply(wq, mp);
591 		} else {
592 			/*
593 			 * Verify argument length.
594 			 */
595 #ifdef _SYSCALL32_IMPL
596 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
597 				struct timeval32 *t32;
598 
599 				error = miocpullup(mp,
600 				    sizeof (struct timeval32));
601 				if (error != 0) {
602 					miocnak(wq, mp, 0, error);
603 					break;
604 				}
605 				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
606 				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
607 					miocnak(wq, mp, 0, EINVAL);
608 					break;
609 				}
610 				ticks = TIMEVAL_TO_TICK(t32);
611 			} else
612 #endif /* _SYSCALL32_IMPL */
613 			{
614 				error = miocpullup(mp, sizeof (struct timeval));
615 				if (error != 0) {
616 					miocnak(wq, mp, 0, error);
617 					break;
618 				}
619 
620 				t = (struct timeval *)mp->b_cont->b_rptr;
621 				if (t->tv_sec < 0 || t->tv_usec < 0) {
622 					miocnak(wq, mp, 0, EINVAL);
623 					break;
624 				}
625 				ticks = TIMEVAL_TO_TICK(t);
626 			}
627 			sbp->sb_ticks = ticks;
628 			if (ticks == 0)
629 				sbp->sb_chunk = 0;
630 			miocack(wq, mp, 0, 0);
631 			sbclosechunk(sbp);
632 		}
633 		break;
634 
635 	case SBIOCGTIME: {
636 		struct timeval *t;
637 
638 		/*
639 		 * Verify argument length.
640 		 */
641 		if (transparent != TRANSPARENT) {
642 #ifdef _SYSCALL32_IMPL
643 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
644 				error = miocpullup(mp,
645 				    sizeof (struct timeval32));
646 				if (error != 0) {
647 					miocnak(wq, mp, 0, error);
648 					break;
649 				}
650 			} else
651 #endif /* _SYSCALL32_IMPL */
652 			error = miocpullup(mp, sizeof (struct timeval));
653 			if (error != 0) {
654 				miocnak(wq, mp, 0, error);
655 				break;
656 			}
657 		}
658 
659 		/*
660 		 * If infinite timeout, return range error
661 		 * for the ioctl.
662 		 */
663 		if (sbp->sb_ticks < 0) {
664 			miocnak(wq, mp, 0, ERANGE);
665 			break;
666 		}
667 
668 #ifdef _SYSCALL32_IMPL
669 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
670 			struct timeval32 *t32;
671 
672 			if (transparent == TRANSPARENT) {
673 				datamp = allocb(sizeof (*t32), BPRI_MED);
674 				if (datamp == NULL) {
675 					miocnak(wq, mp, 0, EAGAIN);
676 					break;
677 				}
678 				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
679 			}
680 
681 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
682 			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
683 
684 			if (transparent == TRANSPARENT)
685 				qreply(wq, mp);
686 			else
687 				miocack(wq, mp, sizeof (*t32), 0);
688 		} else
689 #endif /* _SYSCALL32_IMPL */
690 		{
691 			if (transparent == TRANSPARENT) {
692 				datamp = allocb(sizeof (*t), BPRI_MED);
693 				if (datamp == NULL) {
694 					miocnak(wq, mp, 0, EAGAIN);
695 					break;
696 				}
697 				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
698 			}
699 
700 			t = (struct timeval *)mp->b_cont->b_rptr;
701 			TICK_TO_TIMEVAL(sbp->sb_ticks, t);
702 
703 			if (transparent == TRANSPARENT)
704 				qreply(wq, mp);
705 			else
706 				miocack(wq, mp, sizeof (*t), 0);
707 		}
708 		break;
709 	}
710 
711 	case SBIOCCTIME:
712 		sbp->sb_ticks = -1;
713 		miocack(wq, mp, 0, 0);
714 		break;
715 
716 	case SBIOCSCHUNK:
717 		if (iocp->ioc_count == TRANSPARENT) {
718 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
719 			qreply(wq, mp);
720 		} else {
721 			/*
722 			 * Verify argument length.
723 			 */
724 			error = miocpullup(mp, sizeof (uint_t));
725 			if (error != 0) {
726 				miocnak(wq, mp, 0, error);
727 				break;
728 			}
729 
730 			/*
731 			 * set up hi/lo water marks on stream head read queue.
732 			 * unlikely to run out of resources. Fix at later date.
733 			 */
734 			if ((mop = allocb(sizeof (struct stroptions),
735 			    BPRI_MED)) != NULL) {
736 				struct stroptions *sop;
737 				uint_t chunk;
738 
739 				chunk = *(uint_t *)mp->b_cont->b_rptr;
740 				mop->b_datap->db_type = M_SETOPTS;
741 				mop->b_wptr += sizeof (struct stroptions);
742 				sop = (struct stroptions *)mop->b_rptr;
743 				sop->so_flags = SO_HIWAT | SO_LOWAT;
744 				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
745 				sop->so_lowat = SNIT_LOWAT(chunk, 1);
746 				qreply(wq, mop);
747 			}
748 
749 			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
750 			miocack(wq, mp, 0, 0);
751 			sbclosechunk(sbp);
752 		}
753 		break;
754 
755 	case SBIOCGCHUNK:
756 		/*
757 		 * Verify argument length.
758 		 */
759 		if (transparent != TRANSPARENT) {
760 			error = miocpullup(mp, sizeof (uint_t));
761 			if (error != 0) {
762 				miocnak(wq, mp, 0, error);
763 				break;
764 			}
765 		}
766 
767 		if (transparent == TRANSPARENT) {
768 			datamp = allocb(sizeof (uint_t), BPRI_MED);
769 			if (datamp == NULL) {
770 				miocnak(wq, mp, 0, EAGAIN);
771 				break;
772 			}
773 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
774 		}
775 
776 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
777 
778 		if (transparent == TRANSPARENT)
779 			qreply(wq, mp);
780 		else
781 			miocack(wq, mp, sizeof (uint_t), 0);
782 		break;
783 
784 	case SBIOCSSNAP:
785 		if (iocp->ioc_count == TRANSPARENT) {
786 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
787 			qreply(wq, mp);
788 		} else {
789 			/*
790 			 * Verify argument length.
791 			 */
792 			error = miocpullup(mp, sizeof (uint_t));
793 			if (error != 0) {
794 				miocnak(wq, mp, 0, error);
795 				break;
796 			}
797 
798 			/*
799 			 * if chunking dont worry about effects of
800 			 * snipping of message size on head flow control
801 			 * since it has a relatively small bearing on the
802 			 * data rate onto the streamn head.
803 			 */
804 			if (!sbp->sb_chunk) {
805 				/*
806 				 * set up hi/lo water marks on stream
807 				 * head read queue.  unlikely to run out
808 				 * of resources. Fix at later date.
809 				 */
810 				if ((mop = allocb(sizeof (struct stroptions),
811 				    BPRI_MED)) != NULL) {
812 					struct stroptions *sop;
813 					uint_t snap;
814 					int fudge;
815 
816 					snap = *(uint_t *)mp->b_cont->b_rptr;
817 					mop->b_datap->db_type = M_SETOPTS;
818 					mop->b_wptr += sizeof (*sop);
819 					sop = (struct stroptions *)mop->b_rptr;
820 					sop->so_flags = SO_HIWAT | SO_LOWAT;
821 					fudge = (snap <= 100) ? 4 :
822 					    (snap <= 400) ? 2 : 1;
823 					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
824 					sop->so_lowat = SNIT_LOWAT(snap, fudge);
825 					qreply(wq, mop);
826 				}
827 			}
828 
829 			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
830 
831 			miocack(wq, mp, 0, 0);
832 		}
833 		break;
834 
835 	case SBIOCGSNAP:
836 		/*
837 		 * Verify argument length
838 		 */
839 		if (transparent != TRANSPARENT) {
840 			error = miocpullup(mp, sizeof (uint_t));
841 			if (error != 0) {
842 				miocnak(wq, mp, 0, error);
843 				break;
844 			}
845 		}
846 
847 		if (transparent == TRANSPARENT) {
848 			datamp = allocb(sizeof (uint_t), BPRI_MED);
849 			if (datamp == NULL) {
850 				miocnak(wq, mp, 0, EAGAIN);
851 				break;
852 			}
853 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
854 		}
855 
856 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
857 
858 		if (transparent == TRANSPARENT)
859 			qreply(wq, mp);
860 		else
861 			miocack(wq, mp, sizeof (uint_t), 0);
862 		break;
863 
864 	case SBIOCSFLAGS:
865 		/*
866 		 * set the flags.
867 		 */
868 		if (iocp->ioc_count == TRANSPARENT) {
869 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
870 			qreply(wq, mp);
871 		} else {
872 			error = miocpullup(mp, sizeof (uint_t));
873 			if (error != 0) {
874 				miocnak(wq, mp, 0, error);
875 				break;
876 			}
877 			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
878 			miocack(wq, mp, 0, 0);
879 		}
880 		break;
881 
882 	case SBIOCGFLAGS:
883 		/*
884 		 * Verify argument length
885 		 */
886 		if (transparent != TRANSPARENT) {
887 			error = miocpullup(mp, sizeof (uint_t));
888 			if (error != 0) {
889 				miocnak(wq, mp, 0, error);
890 				break;
891 			}
892 		}
893 
894 		if (transparent == TRANSPARENT) {
895 			datamp = allocb(sizeof (uint_t), BPRI_MED);
896 			if (datamp == NULL) {
897 				miocnak(wq, mp, 0, EAGAIN);
898 				break;
899 			}
900 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
901 		}
902 
903 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
904 
905 		if (transparent == TRANSPARENT)
906 			qreply(wq, mp);
907 		else
908 			miocack(wq, mp, sizeof (uint_t), 0);
909 		break;
910 
911 
912 	default:
913 		putnext(wq, mp);
914 		break;
915 	}
916 }
917 
/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 * The expansion is parenthesized so that it behaves as a single
 * value inside any larger expression.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
932 
933 /*
934  * Process a read-side M_DATA message.
935  *
936  * If the currently accumulating chunk doesn't have enough room
937  * for the message, close off the chunk, pass it upward, and start
938  * a new one.  Then add the message to the current chunk, taking
939  * account of the possibility that the message's size exceeds the
940  * chunk size.
941  *
942  * If headers are enabled add an sb_hdr header and trailing alignment padding.
943  *
944  * To optimise performance the total number of msgbs should be kept
945  * to a minimum. This is achieved by using any remaining space in message N
946  * for both its own padding as well as the header of message N+1 if possible.
947  * If there's insufficient space we allocate one message to hold this 'wrapper'.
948  * (there's likely to be space beyond message N, since allocb would have
949  * rounded up the required size to one of the dblk_sizes).
950  *
951  */
952 static void
953 sbaddmsg(queue_t *rq, mblk_t *mp)
954 {
955 	struct sb	*sbp;
956 	struct timeval	t;
957 	struct sb_hdr	hp;
958 	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
959 	mblk_t *last;		/* last mblk of current message */
960 	size_t wrapperlen;	/* length of header + padding */
961 	size_t origlen;		/* data length before truncation */
962 	size_t pad;		/* bytes required to align header */
963 
964 	sbp = (struct sb *)rq->q_ptr;
965 
966 	origlen = msgdsize(mp);
967 
968 	/*
969 	 * Truncate the message.
970 	 */
971 	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
972 			(adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
973 		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
974 	else
975 		hp.sbh_totlen = hp.sbh_msglen = origlen;
976 
977 	if (sbp->sb_flags & SB_NO_HEADER) {
978 
979 		/*
980 		 * Would the inclusion of this message overflow the current
981 		 * chunk? If so close the chunk off and start a new one.
982 		 */
983 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
984 			sbclosechunk(sbp);
985 		/*
986 		 * First message too big for chunk - just send it up.
987 		 * This will always be true when we're not chunking.
988 		 */
989 		if (hp.sbh_totlen > sbp->sb_chunk) {
990 			sbsendit(rq, mp);
991 			return;
992 		}
993 
994 		/*
995 		 * We now know that the msg will fit in the chunk.
996 		 * Link it onto the end of the chunk.
997 		 * Since linkb() walks the entire chain, we keep a pointer to
998 		 * the first mblk of the last msgb added and call linkb on that
999 		 * that last message, rather than performing the
1000 		 * O(n) linkb() operation on the whole chain.
1001 		 * sb_head isn't needed in this SB_NO_HEADER mode.
1002 		 */
1003 		if (sbp->sb_mp)
1004 			linkb(sbp->sb_tail, mp);
1005 		else
1006 			sbp->sb_mp = mp;
1007 
1008 		sbp->sb_tail = mp;
1009 		sbp->sb_mlen += hp.sbh_totlen;
1010 		sbp->sb_mcount++;
1011 	} else {
1012 		/* Timestamp must be done immediately */
1013 		uniqtime(&t);
1014 		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1015 
1016 		pad = Align(hp.sbh_totlen);
1017 		hp.sbh_totlen += sizeof (hp);
1018 
1019 		/* We can't fit this message on the current chunk. */
1020 		if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
1021 			sbclosechunk(sbp);
1022 
1023 		/*
1024 		 * If we closed it (just now or during a previous
1025 		 * call) then allocate the head of a new chunk.
1026 		 */
1027 		if (sbp->sb_head == NULL) {
1028 			/* Allocate leading header of new chunk */
1029 			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1030 			if (sbp->sb_head == NULL) {
1031 				/*
1032 				 * Memory allocation failure.
1033 				 * This will need to be revisited
1034 				 * since using certain flag combinations
1035 				 * can result in messages being dropped
1036 				 * silently.
1037 				 */
1038 				freemsg(mp);
1039 				sbp->sb_drops++;
1040 				return;
1041 			}
1042 			sbp->sb_mp = sbp->sb_head;
1043 		}
1044 
1045 		/*
1046 		 * Set the header values and join the message to the
1047 		 * chunk. The header values are copied into the chunk
1048 		 * after we adjust for padding below.
1049 		 */
1050 		hp.sbh_drops = sbp->sb_drops;
1051 		hp.sbh_origlen = origlen;
1052 		linkb(sbp->sb_head, mp);
1053 		sbp->sb_mcount++;
1054 		sbp->sb_mlen += hp.sbh_totlen;
1055 
1056 		/*
1057 		 * There's no chance to fit another message on the
1058 		 * chunk -- forgo the padding and close the chunk.
1059 		 */
1060 		if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
1061 			(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
1062 			    sizeof (hp));
1063 			sbp->sb_head->b_wptr += sizeof (hp);
1064 			ASSERT(sbp->sb_head->b_wptr <=
1065 			    sbp->sb_head->b_datap->db_lim);
1066 			sbclosechunk(sbp);
1067 			return;
1068 		}
1069 
1070 		/*
1071 		 * We may add another message to this chunk -- adjust
1072 		 * the headers for padding to be added below.
1073 		 */
1074 		hp.sbh_totlen += pad;
1075 		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1076 		sbp->sb_head->b_wptr += sizeof (hp);
1077 		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1078 		sbp->sb_mlen += pad;
1079 
1080 		/*
1081 		 * Find space for the wrapper. The wrapper consists of:
1082 		 *
1083 		 * 1) Padding for this message (this is to ensure each header
1084 		 * begins on an 8 byte boundary in the userland buffer).
1085 		 *
1086 		 * 2) Space for the next message's header, in case the next
1087 		 * next message will fit in this chunk.
1088 		 *
1089 		 * It may be possible to append the wrapper to the last mblk
1090 		 * of the message, but only if we 'own' the data. If the dblk
1091 		 * has been shared through dupmsg() we mustn't alter it.
1092 		 */
1093 		wrapperlen = (sizeof (hp) + pad);
1094 
1095 		/* Is there space for the wrapper beyond the message's data ? */
1096 		for (last = mp; last->b_cont; last = last->b_cont)
1097 			;
1098 
1099 		if ((wrapperlen <= MBLKTAIL(last)) &&
1100 			(last->b_datap->db_ref == 1)) {
1101 			if (pad > 0) {
1102 				/*
1103 				 * Pad with zeroes to the next pointer boundary
1104 				 * (we don't want to disclose kernel data to
1105 				 * users), then advance wptr.
1106 				 */
1107 				(void) memset(last->b_wptr, 0, pad);
1108 				last->b_wptr += pad;
1109 			}
1110 			/* Remember where to write the header information */
1111 			sbp->sb_head = last;
1112 		} else {
1113 			/* Have to allocate additional space for the wrapper */
1114 			wrapper = allocb(wrapperlen, BPRI_MED);
1115 			if (wrapper == NULL) {
1116 				sbclosechunk(sbp);
1117 				return;
1118 			}
1119 			if (pad > 0) {
1120 				/*
1121 				 * Pad with zeroes (we don't want to disclose
1122 				 * kernel data to users).
1123 				 */
1124 				(void) memset(wrapper->b_wptr, 0, pad);
1125 				wrapper->b_wptr += pad;
1126 			}
1127 			/* Link the wrapper msg onto the end of the chunk */
1128 			linkb(mp, wrapper);
1129 			/* Remember to write the next header in this wrapper */
1130 			sbp->sb_head = wrapper;
1131 		}
1132 	}
1133 }
1134 
1135 /*
1136  * Called from timeout().
1137  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1138  * to synchronize with any active module threads (open, close, wput, rput).
1139  */
1140 static void
1141 sbtick(void *arg)
1142 {
1143 	struct sb *sbp = arg;
1144 	queue_t	*rq;
1145 
1146 	ASSERT(sbp);
1147 
1148 	rq = sbp->sb_rq;
1149 	sbp->sb_timeoutid = 0;		/* timeout has fired */
1150 
1151 	if (putctl(rq, M_CTL) == 0)	/* failure */
1152 		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1153 }
1154 
1155 /*
1156  * Close off the currently accumulating chunk and pass
1157  * it upward.  Takes care of resetting timers as well.
1158  *
1159  * This routine is called both directly and as a result
1160  * of the chunk timeout expiring.
1161  */
1162 static void
1163 sbclosechunk(struct sb *sbp)
1164 {
1165 	mblk_t	*mp;
1166 	queue_t	*rq;
1167 
1168 	ASSERT(sbp);
1169 
1170 	if (sbp->sb_timeoutid) {
1171 		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1172 		sbp->sb_timeoutid = 0;
1173 	}
1174 
1175 	mp = sbp->sb_mp;
1176 	rq = sbp->sb_rq;
1177 
1178 	/*
1179 	 * If there's currently a chunk in progress, close it off
1180 	 * and try to send it up.
1181 	 */
1182 	if (mp) {
1183 		sbsendit(rq, mp);
1184 	}
1185 
1186 	/*
1187 	 * Clear old chunk.  Ready for new msgs.
1188 	 */
1189 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1190 	sbp->sb_mlen = 0;
1191 	sbp->sb_mcount = 0;
1192 	if (sbp->sb_flags & SB_DEFER_CHUNK)
1193 		sbp->sb_state &= ~SB_FRCVD;
1194 
1195 }
1196 
1197 static void
1198 sbsendit(queue_t *rq, mblk_t *mp)
1199 {
1200 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
1201 
1202 	if (!canputnext(rq)) {
1203 		if (sbp->sb_flags & SB_NO_DROPS)
1204 			(void) putq(rq, mp);
1205 		else {
1206 			freemsg(mp);
1207 			sbp->sb_drops += sbp->sb_mcount;
1208 		}
1209 		return;
1210 	}
1211 	/*
1212 	 * If there are messages on the q already, keep
1213 	 * queueing them since they need to be processed in order.
1214 	 */
1215 	if (qsize(rq) > 0) {
1216 		/* should only get here if SB_NO_DROPS */
1217 		(void) putq(rq, mp);
1218 	}
1219 	else
1220 		putnext(rq, mp);
1221 }
1222