xref: /illumos-gate/usr/src/uts/common/rpc/rpcmod.c (revision 3d393ee6c37fa10ac512ed6d36109ad616dc7c1a)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1990 Mentat Inc. */
26 
27 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
28 /*	  All Rights Reserved  	*/
29 
30 /*
31  * Kernel RPC filtering module
32  */
33 
34 #include <sys/param.h>
35 #include <sys/types.h>
36 #include <sys/stream.h>
37 #include <sys/stropts.h>
38 #include <sys/tihdr.h>
39 #include <sys/timod.h>
40 #include <sys/tiuser.h>
41 #include <sys/debug.h>
42 #include <sys/signal.h>
43 #include <sys/pcb.h>
44 #include <sys/user.h>
45 #include <sys/errno.h>
46 #include <sys/cred.h>
47 #include <sys/policy.h>
48 #include <sys/inline.h>
49 #include <sys/cmn_err.h>
50 #include <sys/kmem.h>
51 #include <sys/file.h>
52 #include <sys/sysmacros.h>
53 #include <sys/systm.h>
54 #include <sys/t_lock.h>
55 #include <sys/ddi.h>
56 #include <sys/vtrace.h>
57 #include <sys/callb.h>
58 #include <sys/strsun.h>
59 
60 #include <sys/strlog.h>
61 #include <rpc/rpc_com.h>
62 #include <inet/common.h>
63 #include <rpc/types.h>
64 #include <sys/time.h>
65 #include <rpc/xdr.h>
66 #include <rpc/auth.h>
67 #include <rpc/clnt.h>
68 #include <rpc/rpc_msg.h>
69 #include <rpc/clnt.h>
70 #include <rpc/svc.h>
71 #include <rpc/rpcsys.h>
72 #include <rpc/rpc_rdma.h>
73 
74 /*
75  * This is the loadable module wrapper.
76  */
77 #include <sys/conf.h>
78 #include <sys/modctl.h>
79 #include <sys/syscall.h>
80 
81 extern struct streamtab rpcinfo;
82 
83 static struct fmodsw fsw = {
84 	"rpcmod",
85 	&rpcinfo,
86 	D_NEW|D_MP,
87 };
88 
89 /*
90  * Module linkage information for the kernel.
91  */
92 
93 static struct modlstrmod modlstrmod = {
94 	&mod_strmodops, "rpc interface str mod", &fsw
95 };
96 
97 /*
98  * For the RPC system call.
99  */
100 static struct sysent rpcsysent = {
101 	2,
102 	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
103 	rpcsys
104 };
105 
106 static struct modlsys modlsys = {
107 	&mod_syscallops,
108 	"RPC syscall",
109 	&rpcsysent
110 };
111 
112 #ifdef _SYSCALL32_IMPL
113 static struct modlsys modlsys32 = {
114 	&mod_syscallops32,
115 	"32-bit RPC syscall",
116 	&rpcsysent
117 };
118 #endif /* _SYSCALL32_IMPL */
119 
120 static struct modlinkage modlinkage = {
121 	MODREV_1,
122 	{
123 		&modlsys,
124 #ifdef _SYSCALL32_IMPL
125 		&modlsys32,
126 #endif
127 		&modlstrmod,
128 		NULL
129 	}
130 };
131 
132 int
133 _init(void)
134 {
135 	int error = 0;
136 	callb_id_t cid;
137 	int status;
138 
139 	svc_init();
140 	clnt_init();
141 	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
142 
143 	if (error = mod_install(&modlinkage)) {
144 		/*
145 		 * Could not install module; clean up previous
146 		 * initialization work.
147 		 */
148 		clnt_fini();
149 		if (cid != NULL)
150 			(void) callb_delete(cid);
151 
152 		return (error);
153 	}
154 
155 	/*
156 	 * Load up the RDMA plugins and initialize the stats. Even if the
157 	 * plugin load fails, the counters still get initialized as long
158 	 * as rpcmod was successfully installed.
159 	 */
160 	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
161 	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
162 
163 	cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
164 	mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);
165 
166 	mt_kstat_init();
167 
168 	/*
169 	 * Get our identification into ldi.  This is used for loading
170 	 * other modules, e.g. rpcib.
171 	 */
172 	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
173 	if (status != 0) {
174 		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
175 		rpcmod_li = NULL;
176 	}
177 
178 	return (error);
179 }
180 
181 /*
182  * The unload entry point always fails because we advertise entry
183  * points into rpcmod from the rest of kRPC: rpcmod_release().
184  */
185 int
186 _fini(void)
187 {
188 	return (EBUSY);
189 }
190 
191 int
192 _info(struct modinfo *modinfop)
193 {
194 	return (mod_info(&modlinkage, modinfop));
195 }
196 
197 extern int nulldev();
198 
199 #define	RPCMOD_ID	2049
200 
201 int rmm_open(), rmm_close();
202 
203 /*
204  * To save instructions, since STREAMS ignores the return value
205  * from these functions, they are defined as void here. Kind of icky, but...
206  */
207 void rmm_rput(queue_t *, mblk_t *);
208 void rmm_wput(queue_t *, mblk_t *);
209 void rmm_rsrv(queue_t *);
210 void rmm_wsrv(queue_t *);
211 
212 int rpcmodopen(), rpcmodclose();
213 void rpcmodrput(), rpcmodwput();
214 void rpcmodrsrv(), rpcmodwsrv();
215 
216 static	void	rpcmodwput_other(queue_t *, mblk_t *);
217 static	int	mir_close(queue_t *q);
218 static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
219 		    cred_t *credp);
220 static	void	mir_rput(queue_t *q, mblk_t *mp);
221 static	void	mir_rsrv(queue_t *q);
222 static	void	mir_wput(queue_t *q, mblk_t *mp);
223 static	void	mir_wsrv(queue_t *q);
224 
225 static struct module_info rpcmod_info =
226 	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
227 
228 /*
229  * Read side has no service procedure in the CLTS case (see xprt_clts_ops).
230  */
231 static struct qinit rpcmodrinit = {
232 	(int (*)())rmm_rput,
233 	(int (*)())rmm_rsrv,
234 	rmm_open,
235 	rmm_close,
236 	nulldev,
237 	&rpcmod_info,
238 	NULL
239 };
240 
241 /*
242  * The write put procedure is simply putnext to conserve stack space.
243  * The write service procedure is not used to queue data, but instead to
244  * synchronize with flow control.
245  */
246 static struct qinit rpcmodwinit = {
247 	(int (*)())rmm_wput,
248 	(int (*)())rmm_wsrv,
249 	rmm_open,
250 	rmm_close,
251 	nulldev,
252 	&rpcmod_info,
253 	NULL
254 };
255 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
256 
257 struct xprt_style_ops {
258 	int (*xo_open)();
259 	int (*xo_close)();
260 	void (*xo_wput)();
261 	void (*xo_wsrv)();
262 	void (*xo_rput)();
263 	void (*xo_rsrv)();
264 };
265 
266 static struct xprt_style_ops xprt_clts_ops = {
267 	rpcmodopen,
268 	rpcmodclose,
269 	rpcmodwput,
270 	rpcmodwsrv,
271 	rpcmodrput,
272 	NULL
273 };
274 
275 static struct xprt_style_ops xprt_cots_ops = {
276 	mir_open,
277 	mir_close,
278 	mir_wput,
279 	mir_wsrv,
280 	mir_rput,
281 	mir_rsrv
282 };
283 
284 /*
285  * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
286  */
287 struct rpcm {
288 	void		*rm_krpc_cell;	/* Reserved for use by KRPC */
289 	struct		xprt_style_ops	*rm_ops;
290 	int		rm_type;	/* Client or server side stream */
291 #define	RM_CLOSING	0x1		/* somebody is trying to close slot */
292 	uint_t		rm_state;	/* state of the slot. see above */
293 	uint_t		rm_ref;		/* cnt of external references to slot */
294 	kmutex_t	rm_lock;	/* mutex protecting above fields */
295 	kcondvar_t	rm_cwait;	/* condition for closing */
296 	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
297 };
298 
299 struct temp_slot {
300 	void *cell;
301 	struct xprt_style_ops *ops;
302 	int type;
303 	mblk_t *info_ack;
304 	kmutex_t lock;
305 	kcondvar_t wait;
306 };
307 
308 typedef struct mir_s {
309 	void	*mir_krpc_cell;	/* Reserved for KRPC use. This field */
310 					/* must be first in the structure. */
311 	struct xprt_style_ops	*rm_ops;
312 	int	mir_type;		/* Client or server side stream */
313 
314 	mblk_t	*mir_head_mp;		/* RPC msg in progress */
315 		/*
316 		 * mir_head_mp points to the first mblk being collected in
317 		 * the current RPC message.  Record headers are removed
318 		 * before data is linked into mir_head_mp.
319 		 */
320 	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
321 		/*
322 		 * mir_tail_mp points to the last mblk in the message
323 		 * chain starting at mir_head_mp.  It is only valid
324 		 * if mir_head_mp is non-NULL and is used to add new
325 		 * data blocks to the end of chain quickly.
326 		 */
327 
328 	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
329 		/*
330 		 * mir_frag_len starts at -4 for beginning of each fragment.
331 		 * When this length is negative, it indicates the number of
332 		 * bytes that rpcmod needs to complete the record marker
333 		 * header.  When it is positive or zero, it holds the number
334 		 * of bytes that have arrived for the current fragment and
335 		 * are held in mir_head_mp.
336 		 */
337 
338 	int32_t	mir_frag_header;
339 		/*
340 		 * Fragment header as collected for the current fragment.
341 		 * It holds the last-fragment indicator and the number
342 		 * of bytes in the fragment.
343 		 */
344 
345 	unsigned int
346 		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
347 		mir_hold_inbound : 1,	/* Hold inbound messages on server */
348 					/* side until outbound flow control */
349 					/* is relieved. */
350 		mir_closing : 1,	/* The stream is being closed */
351 		mir_inrservice : 1,	/* data queued or rd srv proc running */
352 		mir_inwservice : 1,	/* data queued or wr srv proc running */
353 		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
354 		/*
355 		 * On client streams, mir_clntreq is 0 or 1; it is set
356 		 * to 1 whenever a new request is sent out (mir_wput)
357 		 * and cleared when the timer fires (mir_timer).  If
358 		 * the timer fires with this value equal to 0, then the
359 		 * stream is considered idle and KRPC is notified.
360 		 */
361 		mir_clntreq : 1,
362 		/*
363 		 * On server streams, stop accepting messages
364 		 */
365 		mir_svc_no_more_msgs : 1,
366 		mir_listen_stream : 1,	/* listen end point */
367 		mir_unused : 1,	/* no longer used */
368 		mir_timer_call : 1,
369 		mir_junk_fill_thru_bit_31 : 21;
370 
371 	int	mir_setup_complete;	/* server has initialized everything */
372 	timeout_id_t mir_timer_id;	/* Timer for idle checks */
373 	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
374 		/*
375 		 * This value is copied from clnt_idle_timeout or
376 		 * svc_idle_timeout during the appropriate ioctl.
377 		 * Kept in milliseconds
378 		 */
379 	clock_t	mir_use_timestamp;	/* updated on client with each use */
380 		/*
381 		 * This value is set to lbolt
382 		 * every time a client stream sends or receives data.
383 		 * Even if the timer message arrives, we don't shut down
384 		 * the client unless:
385 		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
386 		 * This value is kept in HZ.
387 		 */
388 
389 	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
390 		/*
391 		 * This pointer is set to &clnt_max_msg_size or
392 		 * &svc_max_msg_size during the appropriate ioctl.
393 		 */
394 	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
395 	/* Server-side fields. */
396 	int	mir_ref_cnt;		/* Reference count: server side only */
397 					/* counts the number of references */
398 					/* that a kernel RPC server thread */
399 					/* (see svc_run()) has on this rpcmod */
400 					/* slot. Effectively, it is the */
401 					/* number of unprocessed messages */
402 					/* that have been passed up to the */
403 					/* KRPC layer */
404 
405 	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
406 					/* T_DISCON_IND */
407 
408 	/*
409 	 * these fields are for both client and server, but for debugging,
410 	 * it is easier to have these last in the structure.
411 	 */
412 	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
413 	kcondvar_t	mir_condvar;	/* synchronization. */
414 	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
415 } mir_t;
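
/*
 * For illustration: the record marking protocol frames each fragment
 * with a 4-byte big-endian header.  The high bit (MIR_LASTFRAG, defined
 * below) marks the final fragment of an RPC message; the low 31 bits
 * give the fragment length.  For example, the header bytes
 *
 *	0x80 0x00 0x01 0x00
 *
 * announce a 256-byte fragment that is also the last fragment of its
 * message.  mir_frag_len starts at -4 because mir_rput() consumes the
 * header one byte at a time, counting up toward zero:
 *
 *	while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
 *		frag_len++;
 *		frag_header <<= 8;
 *		frag_header += *mp->b_rptr++;
 *	}
 */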
416 
417 void tmp_rput(queue_t *q, mblk_t *mp);
418 
419 struct xprt_style_ops tmpops = {
420 	NULL,
421 	NULL,
422 	putnext,
423 	NULL,
424 	tmp_rput,
425 	NULL
426 };
427 
428 void
429 tmp_rput(queue_t *q, mblk_t *mp)
430 {
431 	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
432 	struct T_info_ack *pptr;
433 
434 	switch (mp->b_datap->db_type) {
435 	case M_PCPROTO:
436 		pptr = (struct T_info_ack *)mp->b_rptr;
437 		switch (pptr->PRIM_type) {
438 		case T_INFO_ACK:
439 			mutex_enter(&t->lock);
440 			t->info_ack = mp;
441 			cv_signal(&t->wait);
442 			mutex_exit(&t->lock);
443 			return;
444 		default:
445 			break;
446 		}
447 	default:
448 		break;
449 	}
450 
451 	/*
452 	 * Not an info-ack, so free it. This is ok because we should
453 	 * not be receiving data until the open finishes: rpcmod
454 	 * is pushed well before the end-point is bound to an address.
455 	 */
456 	freemsg(mp);
457 }
458 
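/*
 * rmm_open() decides which personality this stream gets: it sends a
 * T_INFO_REQ downstream and waits for the T_INFO_ACK captured by
 * tmp_rput() above.  A SERV_type of T_CLTS selects the rpcmod (CLTS)
 * entry points; anything else selects the mir (COTS) entry points.
 * The temp_slot lives on the opening thread's stack, which is safe
 * because no data can arrive before the endpoint is bound, and q_ptr
 * is replaced with the real per-stream structure (struct rpcm or
 * mir_t) before the open returns.
 */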
459 int
460 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
461 {
462 	mblk_t *bp;
463 	struct temp_slot ts, *t;
464 	struct T_info_ack *pptr;
465 	int error = 0;
466 
467 	ASSERT(q != NULL);
468 	/*
469 	 * Check for re-opens.
470 	 */
471 	if (q->q_ptr) {
472 		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
473 		    "rpcmodopen_end:(%s)", "q->qptr");
474 		return (0);
475 	}
476 
477 	t = &ts;
478 	bzero(t, sizeof (*t));
479 	q->q_ptr = (void *)t;
480 	WR(q)->q_ptr = (void *)t;
481 
482 	/*
483 	 * Allocate the required messages upfront.
484 	 */
485 	if ((bp = allocb(sizeof (struct T_info_req) +
486 	    sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) {
487 		return (ENOBUFS);
488 	}
489 
490 	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
491 	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
492 
493 	t->ops = &tmpops;
494 
495 	qprocson(q);
496 	bp->b_datap->db_type = M_PCPROTO;
497 	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
498 	bp->b_wptr += sizeof (struct T_info_req);
499 	putnext(WR(q), bp);
500 
501 	mutex_enter(&t->lock);
502 	while (t->info_ack == NULL) {
503 		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
504 			error = EINTR;
505 			break;
506 		}
507 	}
508 	mutex_exit(&t->lock);
509 
510 	if (error)
511 		goto out;
512 
513 	pptr = (struct T_info_ack *)t->info_ack->b_rptr;
514 
515 	if (pptr->SERV_type == T_CLTS) {
516 		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
517 			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
518 	} else {
519 		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
520 			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
521 	}
522 
523 out:
524 	if (error)
525 		qprocsoff(q);
526 
527 	freemsg(t->info_ack);
528 	mutex_destroy(&t->lock);
529 	cv_destroy(&t->wait);
530 
531 	return (error);
532 }
533 
534 void
535 rmm_rput(queue_t *q, mblk_t  *mp)
536 {
537 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
538 }
539 
540 void
541 rmm_rsrv(queue_t *q)
542 {
543 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
544 }
545 
546 void
547 rmm_wput(queue_t *q, mblk_t *mp)
548 {
549 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
550 }
551 
552 void
553 rmm_wsrv(queue_t *q)
554 {
555 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
556 }
557 
558 int
559 rmm_close(queue_t *q, int flag, cred_t *crp)
560 {
561 	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
562 }
563 
564 /*
565  * rpcmodopen -	open routine gets called when the module gets pushed
566  *		onto the stream.
567  */
568 /*ARGSUSED*/
569 int
570 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
571 {
572 	struct rpcm *rmp;
573 
574 	extern void (*rpc_rele)(queue_t *, mblk_t *);
575 	static void rpcmod_release(queue_t *, mblk_t *);
576 
577 	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
578 
579 	/*
580 	 * Initialize entry points to release a rpcmod slot (and an input
581 	 * message if supplied) and to send an output message to the module
582 	 * below rpcmod.
583 	 */
584 	if (rpc_rele == NULL)
585 		rpc_rele = rpcmod_release;
586 
587 	/*
588 	 * Only sufficiently privileged users can use this module.  It is
589 	 * assumed that they will use it properly and NOT send bulk data
590 	 * from downstream.
591 	 */
592 	if (secpolicy_rpcmod_open(crp) != 0)
593 		return (EPERM);
594 
595 	/*
596 	 * Allocate slot data structure.
597 	 */
598 	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
599 
600 	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
601 	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
602 	rmp->rm_zoneid = rpc_zoneid();
603 	/*
604 	 * slot type will be set by kRPC client and server ioctl's
605 	 */
606 	rmp->rm_type = 0;
607 
608 	q->q_ptr = (void *)rmp;
609 	WR(q)->q_ptr = (void *)rmp;
610 
611 	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
612 	return (0);
613 }
614 
615 /*
616  * rpcmodclose - This routine gets called when the module gets popped
617  * off of the stream.
618  */
619 /*ARGSUSED*/
620 int
621 rpcmodclose(queue_t *q, int flag, cred_t *crp)
622 {
623 	struct rpcm *rmp;
624 
625 	ASSERT(q != NULL);
626 	rmp = (struct rpcm *)q->q_ptr;
627 
628 	/*
629 	 * Mark our state as closing.
630 	 */
631 	mutex_enter(&rmp->rm_lock);
632 	rmp->rm_state |= RM_CLOSING;
633 
634 	/*
635 	 * Check and see if there are any messages on the queue.  If so, send
636 	 * the messages, regardless of whether the downstream module is ready to
637 	 * accept data.
638 	 */
639 	if (rmp->rm_type == RPC_SERVER) {
640 		flushq(q, FLUSHDATA);
641 
642 		qenable(WR(q));
643 
644 		if (rmp->rm_ref) {
645 			mutex_exit(&rmp->rm_lock);
646 			/*
647 			 * call into SVC to clean the queue
648 			 */
649 			svc_queueclean(q);
650 			mutex_enter(&rmp->rm_lock);
651 
652 			/*
653 			 * Block while there are kRPC threads with a reference
654 			 * to this message.
655 			 */
656 			while (rmp->rm_ref)
657 				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
658 		}
659 
660 		mutex_exit(&rmp->rm_lock);
661 
662 		/*
663 		 * It is now safe to remove this queue from the stream. No kRPC
664 		 * threads have a reference to the stream, and none ever will,
665 		 * because RM_CLOSING is set.
666 		 */
667 		qprocsoff(q);
668 
669 		/* Notify kRPC that this stream is going away. */
670 		svc_queueclose(q);
671 	} else {
672 		mutex_exit(&rmp->rm_lock);
673 		qprocsoff(q);
674 	}
675 
676 	q->q_ptr = NULL;
677 	WR(q)->q_ptr = NULL;
678 	mutex_destroy(&rmp->rm_lock);
679 	cv_destroy(&rmp->rm_cwait);
680 	kmem_free(rmp, sizeof (*rmp));
681 	return (0);
682 }
683 
684 #ifdef	DEBUG
685 int	rpcmod_send_msg_up = 0;
686 int	rpcmod_send_uderr = 0;
687 int	rpcmod_send_dup = 0;
688 int	rpcmod_send_dup_cnt = 0;
689 #endif
690 
691 /*
692  * rpcmodrput -	Module read put procedure.  This is called from
693  *		the module or driver downstream.
694  */
695 void
696 rpcmodrput(queue_t *q, mblk_t *mp)
697 {
698 	struct rpcm *rmp;
699 	union T_primitives *pptr;
700 	int hdrsz;
701 
702 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
703 
704 	ASSERT(q != NULL);
705 	rmp = (struct rpcm *)q->q_ptr;
706 
707 	if (rmp->rm_type == 0) {
708 		freemsg(mp);
709 		return;
710 	}
711 
712 #ifdef DEBUG
713 	if (rpcmod_send_msg_up > 0) {
714 		mblk_t *nmp = copymsg(mp);
715 		if (nmp) {
716 			putnext(q, nmp);
717 			rpcmod_send_msg_up--;
718 		}
719 	}
720 	if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
721 		mblk_t *nmp;
722 		struct T_unitdata_ind *data;
723 		struct T_uderror_ind *ud;
724 		int d;
725 		data = (struct T_unitdata_ind *)mp->b_rptr;
726 		if (data->PRIM_type == T_UNITDATA_IND) {
727 			d = sizeof (*ud) - sizeof (*data);
728 			nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
729 			if (nmp) {
730 				ud = (struct T_uderror_ind *)nmp->b_rptr;
731 				ud->PRIM_type = T_UDERROR_IND;
732 				ud->DEST_length = data->SRC_length;
733 				ud->DEST_offset = data->SRC_offset + d;
734 				ud->OPT_length = data->OPT_length;
735 				ud->OPT_offset = data->OPT_offset + d;
736 				ud->ERROR_type = ENETDOWN;
737 				if (data->SRC_length) {
738 					bcopy(mp->b_rptr +
739 					    data->SRC_offset,
740 					    nmp->b_rptr +
741 					    ud->DEST_offset,
742 					    data->SRC_length);
743 				}
744 				if (data->OPT_length) {
745 					bcopy(mp->b_rptr +
746 					    data->OPT_offset,
747 					    nmp->b_rptr +
748 					    ud->OPT_offset,
749 					    data->OPT_length);
750 				}
751 				nmp->b_wptr += d;
752 				nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
753 				nmp->b_datap->db_type = M_PROTO;
754 				putnext(q, nmp);
755 				rpcmod_send_uderr--;
756 			}
757 		}
758 	}
759 #endif
760 	switch (mp->b_datap->db_type) {
761 	default:
762 		putnext(q, mp);
763 		break;
764 
765 	case M_PROTO:
766 	case M_PCPROTO:
767 		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
768 		pptr = (union T_primitives *)mp->b_rptr;
769 
770 		/*
771 		 * Forward this message to krpc if it is data.
772 		 */
773 		if (pptr->type == T_UNITDATA_IND) {
774 			mblk_t *nmp;
775 
776 			/*
777 			 * Check if the module is being popped.
778 			 */
779 			mutex_enter(&rmp->rm_lock);
780 			if (rmp->rm_state & RM_CLOSING) {
781 				mutex_exit(&rmp->rm_lock);
782 				putnext(q, mp);
783 				break;
784 			}
785 
786 			switch (rmp->rm_type) {
787 			case RPC_CLIENT:
788 				mutex_exit(&rmp->rm_lock);
789 				hdrsz = mp->b_wptr - mp->b_rptr;
790 
791 				/*
792 				 * Make sure the header is sane.
793 				 */
794 				if (hdrsz < TUNITDATAINDSZ ||
795 				    hdrsz < (pptr->unitdata_ind.OPT_length +
796 				    pptr->unitdata_ind.OPT_offset) ||
797 				    hdrsz < (pptr->unitdata_ind.SRC_length +
798 				    pptr->unitdata_ind.SRC_offset)) {
799 					freemsg(mp);
800 					return;
801 				}
802 
803 				/*
804 				 * Call clnt_clts_dispatch_notify, so that it
805 				 * can pass the message to the proper caller.
806 				 * Don't discard the header just yet since the
807 				 * client may need the sender's address.
808 				 */
809 				clnt_clts_dispatch_notify(mp, hdrsz,
810 				    rmp->rm_zoneid);
811 				return;
812 			case RPC_SERVER:
813 				/*
814 				 * rm_krpc_cell is exclusively used by the kRPC
815 				 * CLTS server
816 				 */
817 				if (rmp->rm_krpc_cell) {
818 #ifdef DEBUG
819 					/*
820 					 * Test duplicate request cache and
821 					 * rm_ref count handling by sending a
822 					 * duplicate every so often, if
823 					 * desired.
824 					 */
825 					if (rpcmod_send_dup &&
826 					    rpcmod_send_dup_cnt++ %
827 					    rpcmod_send_dup)
828 						nmp = copymsg(mp);
829 					else
830 						nmp = NULL;
831 #endif
832 					/*
833 					 * Raise the reference count on this
834 					 * module to prevent it from being
835 					 * popped before krpc generates the
836 					 * reply.
837 					 */
838 					rmp->rm_ref++;
839 					mutex_exit(&rmp->rm_lock);
840 
841 					/*
842 					 * Submit the message to krpc.
843 					 */
844 					svc_queuereq(q, mp);
845 #ifdef DEBUG
846 					/*
847 					 * Send duplicate if we created one.
848 					 */
849 					if (nmp) {
850 						mutex_enter(&rmp->rm_lock);
851 						rmp->rm_ref++;
852 						mutex_exit(&rmp->rm_lock);
853 						svc_queuereq(q, nmp);
854 					}
855 #endif
856 				} else {
857 					mutex_exit(&rmp->rm_lock);
858 					freemsg(mp);
859 				}
860 				return;
861 			default:
862 				mutex_exit(&rmp->rm_lock);
863 				freemsg(mp);
864 				return;
865 			} /* end switch(rmp->rm_type) */
866 		} else if (pptr->type == T_UDERROR_IND) {
867 			mutex_enter(&rmp->rm_lock);
868 			hdrsz = mp->b_wptr - mp->b_rptr;
869 
870 			/*
871 			 * Make sure the header is sane
872 			 */
873 			if (hdrsz < TUDERRORINDSZ ||
874 			    hdrsz < (pptr->uderror_ind.OPT_length +
875 			    pptr->uderror_ind.OPT_offset) ||
876 			    hdrsz < (pptr->uderror_ind.DEST_length +
877 			    pptr->uderror_ind.DEST_offset)) {
878 				mutex_exit(&rmp->rm_lock);
879 				freemsg(mp);
880 				return;
881 			}
882 
883 			/*
884 			 * In the case where a unit data error has been
885 			 * received, all we need to do is clear the message from
886 			 * the queue.
887 			 */
888 			mutex_exit(&rmp->rm_lock);
889 			freemsg(mp);
890 			RPCLOG(32, "rpcmodrput: unitdata error received at "
891 			    "%ld\n", gethrestime_sec());
892 			return;
893 		} /* end else if (pptr->type == T_UDERROR_IND) */
894 
895 		putnext(q, mp);
896 		break;
897 	} /* end switch (mp->b_datap->db_type) */
898 
899 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
900 	    "rpcmodrput_end:");
901 	/*
902 	 * Return codes are not looked at by the STREAMS framework.
903 	 */
904 }
905 
906 /*
907  * write put procedure
908  */
909 void
910 rpcmodwput(queue_t *q, mblk_t *mp)
911 {
912 	struct rpcm	*rmp;
913 
914 	ASSERT(q != NULL);
915 
916 	switch (mp->b_datap->db_type) {
917 		case M_PROTO:
918 		case M_PCPROTO:
919 			break;
920 		default:
921 			rpcmodwput_other(q, mp);
922 			return;
923 	}
924 
925 	/*
926 	 * Check to see if we can send the message downstream.
927 	 */
928 	if (canputnext(q)) {
929 		putnext(q, mp);
930 		return;
931 	}
932 
933 	rmp = (struct rpcm *)q->q_ptr;
934 	ASSERT(rmp != NULL);
935 
936 	/*
937 	 * The first canputnext failed.  Try again except this time with the
938 	 * lock held, so that we can check the state of the stream to see if
939 	 * it is closing.  If either of these conditions evaluates to true,
940 	 * then send the message.
941 	 */
942 	mutex_enter(&rmp->rm_lock);
943 	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
944 		mutex_exit(&rmp->rm_lock);
945 		putnext(q, mp);
946 	} else {
947 		/*
948 		 * canputnext failed again and the stream is not closing.
949 		 * Place the message on the queue and let the service
950 		 * procedure handle the message.
951 		 */
952 		mutex_exit(&rmp->rm_lock);
953 		(void) putq(q, mp);
954 	}
955 }
956 
957 static void
958 rpcmodwput_other(queue_t *q, mblk_t *mp)
959 {
960 	struct rpcm	*rmp;
961 	struct iocblk	*iocp;
962 
963 	rmp = (struct rpcm *)q->q_ptr;
964 	ASSERT(rmp != NULL);
965 
966 	switch (mp->b_datap->db_type) {
967 		case M_IOCTL:
968 			iocp = (struct iocblk *)mp->b_rptr;
969 			ASSERT(iocp != NULL);
970 			switch (iocp->ioc_cmd) {
971 				case RPC_CLIENT:
972 				case RPC_SERVER:
973 					mutex_enter(&rmp->rm_lock);
974 					rmp->rm_type = iocp->ioc_cmd;
975 					mutex_exit(&rmp->rm_lock);
976 					mp->b_datap->db_type = M_IOCACK;
977 					qreply(q, mp);
978 					return;
979 				default:
980 				/*
981 				 * pass the ioctl downstream and hope someone
982 				 * down there knows how to handle it.
983 				 */
984 					putnext(q, mp);
985 					return;
986 			}
987 		default:
988 			break;
989 	}
990 	/*
991 	 * This is something we definitely do not know how to handle, just
992 	 * pass the message downstream
993 	 */
994 	putnext(q, mp);
995 }
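
/*
 * How a stream acquires its type, sketched as a hypothetical user-level
 * equivalent (in practice kRPC issues the corresponding ioctl from
 * inside the kernel):
 *
 *	struct strioctl iocb;
 *
 *	(void) ioctl(fd, I_PUSH, "rpcmod");
 *	iocb.ic_cmd = RPC_CLIENT;	(or RPC_SERVER)
 *	iocb.ic_timout = 0;
 *	iocb.ic_len = 0;
 *	iocb.ic_dp = NULL;
 *	(void) ioctl(fd, I_STR, &iocb);
 *
 * rpcmodwput_other() records the command in rm_type and acknowledges it
 * with M_IOCACK; any other ioctl is simply passed downstream.
 */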
996 
997 /*
998  * Module write service procedure. This is called by downstream modules
999  * for back enabling during flow control.
1000  */
1001 void
1002 rpcmodwsrv(queue_t *q)
1003 {
1004 	struct rpcm	*rmp;
1005 	mblk_t		*mp = NULL;
1006 
1007 	rmp = (struct rpcm *)q->q_ptr;
1008 	ASSERT(rmp != NULL);
1009 
1010 	/*
1011 	 * Get messages that may be queued and send them down stream
1012 	 */
1013 	while ((mp = getq(q)) != NULL) {
1014 		/*
1015 		 * Optimize the service procedure for the server-side, by
1016 		 * avoiding a call to canputnext().
1017 		 */
1018 		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
1019 			putnext(q, mp);
1020 			continue;
1021 		}
1022 		(void) putbq(q, mp);
1023 		return;
1024 	}
1025 }
1026 
1027 static void
1028 rpcmod_release(queue_t *q, mblk_t *bp)
1029 {
1030 	struct rpcm *rmp;
1031 
1032 	/*
1033 	 * For now, just free the message.
1034 	 */
1035 	if (bp)
1036 		freemsg(bp);
1037 	rmp = (struct rpcm *)q->q_ptr;
1038 
1039 	mutex_enter(&rmp->rm_lock);
1040 	rmp->rm_ref--;
1041 
1042 	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
1043 		cv_broadcast(&rmp->rm_cwait);
1044 	}
1045 
1046 	mutex_exit(&rmp->rm_lock);
1047 }
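
/*
 * Reference-count life cycle: rpcmodrput() increments rm_ref before
 * handing a request to kRPC with svc_queuereq(), and kRPC releases that
 * reference by calling back through the rpc_rele hook (rpcmod_release)
 * when it is done with the message.  rpcmodclose() waits on rm_cwait
 * until rm_ref drains to zero, which is why the broadcast above is
 * performed only when RM_CLOSING is set.
 */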
1048 
1049 /*
1050  * This part of rpcmod is pushed on a connection-oriented transport for use
1051  * by RPC.  It bypasses the Stream head, implements the record
1052  * marking protocol, and dispatches incoming RPC messages.
1053  */
1054 
1055 /* Default idle timer values */
1056 #define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
1057 #define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
1058 #define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
1059 #define	MIR_LASTFRAG	0x80000000	/* Record marker */
1060 
1061 #define	DLEN(mp) (mp->b_cont ? msgdsize(mp) : (mp->b_wptr - mp->b_rptr))
1062 
1063 #define	MIR_SVC_QUIESCED(mir)	\
1064 	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)
1065 
1066 #define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
1067 	(mir_ptr)->mir_inrservice = 0;	\
1068 	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
1069 		(mir_ptr)->mir_closing)	\
1070 		cv_signal(&(mir_ptr)->mir_condvar);	\
1071 }
1072 
1073 /*
1074  * Don't block the service procedure (and mir_close) if
1075  * we are in the process of closing.
1076  */
1077 #define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
1078 	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
1079 
1080 static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
1081 static void	mir_rput_proto(queue_t *q, mblk_t *mp);
1082 static int	mir_svc_policy_notify(queue_t *q, int event);
1083 static void	mir_svc_release(queue_t *wq, mblk_t *mp);
1084 static void	mir_svc_start(queue_t *wq);
1085 static void	mir_svc_idle_start(queue_t *, mir_t *);
1086 static void	mir_svc_idle_stop(queue_t *, mir_t *);
1087 static void	mir_svc_start_close(queue_t *, mir_t *);
1088 static void	mir_clnt_idle_do_stop(queue_t *);
1089 static void	mir_clnt_idle_stop(queue_t *, mir_t *);
1090 static void	mir_clnt_idle_start(queue_t *, mir_t *);
1091 static void	mir_wput(queue_t *q, mblk_t *mp);
1092 static void	mir_wput_other(queue_t *q, mblk_t *mp);
1093 static void	mir_wsrv(queue_t *q);
1094 static	void	mir_disconnect(queue_t *, mir_t *ir);
1095 static	int	mir_check_len(queue_t *, int32_t, mblk_t *);
1096 static	void	mir_timer(void *);
1097 
1098 extern void	(*mir_rele)(queue_t *, mblk_t *);
1099 extern void	(*mir_start)(queue_t *);
1100 extern void	(*clnt_stop_idle)(queue_t *);
1101 
1102 clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
1103 clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
1104 
1105 /*
1106  * Timeout for subsequent notifications of an idle connection.  This is
1107  * typically used to clean up after a wedged orderly release.
1108  */
1109 clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
1110 
1111 extern	uint_t	*clnt_max_msg_sizep;
1112 extern	uint_t	*svc_max_msg_sizep;
1113 uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
1114 uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
1115 uint_t	mir_krpc_cell_null;
1116 
1117 static void
1118 mir_timer_stop(mir_t *mir)
1119 {
1120 	timeout_id_t tid;
1121 
1122 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
1123 
1124 	/*
1125 	 * Since the mir_mutex lock needs to be released to call
1126 	 * untimeout(), we need to make sure that no other thread
1127 	 * can start/stop the timer (changing mir_timer_id) during
1128 	 * that time.  The mir_timer_call bit and the mir_timer_cv
1129 	 * condition variable are used to synchronize this.  Setting
1130 	 * mir_timer_call also tells mir_timer() (refer to the comments
1131 	 * in mir_timer()) that it does not need to do anything.
1132 	 */
1133 	while (mir->mir_timer_call)
1134 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1135 	mir->mir_timer_call = B_TRUE;
1136 
1137 	if ((tid = mir->mir_timer_id) != 0) {
1138 		mir->mir_timer_id = 0;
1139 		mutex_exit(&mir->mir_mutex);
1140 		(void) untimeout(tid);
1141 		mutex_enter(&mir->mir_mutex);
1142 	}
1143 	mir->mir_timer_call = B_FALSE;
1144 	cv_broadcast(&mir->mir_timer_cv);
1145 }
1146 
1147 static void
1148 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
1149 {
1150 	timeout_id_t tid;
1151 
1152 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
1153 
1154 	while (mir->mir_timer_call)
1155 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1156 	mir->mir_timer_call = B_TRUE;
1157 
1158 	if ((tid = mir->mir_timer_id) != 0) {
1159 		mutex_exit(&mir->mir_mutex);
1160 		(void) untimeout(tid);
1161 		mutex_enter(&mir->mir_mutex);
1162 	}
1163 	/* Only start the timer when it is not closing. */
1164 	if (!mir->mir_closing) {
1165 		mir->mir_timer_id = timeout(mir_timer, q,
1166 		    MSEC_TO_TICK(intrvl));
1167 	}
1168 	mir->mir_timer_call = B_FALSE;
1169 	cv_broadcast(&mir->mir_timer_cv);
1170 }
1171 
1172 static int
1173 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
1174 {
1175 	mblk_t  *mp1;
1176 	uint32_t  new_xid;
1177 	uint32_t  old_xid;
1178 
1179 	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
1180 	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
1181 	/*
1182 	 * This loop is a bit tacky -- it walks the STREAMS list of
1183 	 * flow-controlled messages.
1184 	 */
1185 	if ((mp1 = q->q_first) != NULL) {
1186 		do {
1187 			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
1188 			if (new_xid == old_xid)
1189 				return (1);
1190 		} while ((mp1 = mp1->b_next) != NULL);
1191 	}
1192 	return (0);
1193 }
1194 
1195 static int
1196 mir_close(queue_t *q)
1197 {
1198 	mir_t	*mir = q->q_ptr;
1199 	mblk_t	*mp;
1200 	bool_t queue_cleaned = FALSE;
1201 
1202 	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
1203 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1204 	mutex_enter(&mir->mir_mutex);
1205 	if ((mp = mir->mir_head_mp) != NULL) {
1206 		mir->mir_head_mp = NULL;
1207 		mir->mir_tail_mp = NULL;
1208 		freemsg(mp);
1209 	}
1210 	/*
1211 	 * Set mir_closing so that we get notified when MIR_SVC_QUIESCED()
1212 	 * is TRUE, and so that mir_timer_start() won't start the timer again.
1213 	 */
1214 	mir->mir_closing = B_TRUE;
1215 	mir_timer_stop(mir);
1216 
1217 	if (mir->mir_type == RPC_SERVER) {
1218 		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */
1219 
1220 		/*
1221 		 * This will prevent more requests from arriving and
1222 		 * will force rpcmod to ignore flow control.
1223 		 */
1224 		mir_svc_start_close(WR(q), mir);
1225 
1226 		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
1227 
1228 			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
1229 			    (queue_cleaned == FALSE)) {
1230 				/*
1231 				 * call into SVC to clean the queue
1232 				 */
1233 				mutex_exit(&mir->mir_mutex);
1234 				svc_queueclean(q);
1235 				queue_cleaned = TRUE;
1236 				mutex_enter(&mir->mir_mutex);
1237 				continue;
1238 			}
1239 
1240 			/*
1241 			 * Bugid 1253810 - Force the write service
1242 			 * procedure to send its messages, regardless of
1243 			 * whether the downstream module is ready
1244 			 * to accept data.
1245 			 */
1246 			if (mir->mir_inwservice == 1)
1247 				qenable(WR(q));
1248 
1249 			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
1250 		}
1251 
1252 		mutex_exit(&mir->mir_mutex);
1253 		qprocsoff(q);
1254 
1255 		/* Notify KRPC that this stream is going away. */
1256 		svc_queueclose(q);
1257 	} else {
1258 		mutex_exit(&mir->mir_mutex);
1259 		qprocsoff(q);
1260 	}
1261 
1262 	mutex_destroy(&mir->mir_mutex);
1263 	cv_destroy(&mir->mir_condvar);
1264 	cv_destroy(&mir->mir_timer_cv);
1265 	kmem_free(mir, sizeof (mir_t));
1266 	return (0);
1267 }
1268 
1269 /*
1270  * This is server side only (RPC_SERVER).
1271  *
1272  * Exit idle mode.
1273  */
1274 static void
1275 mir_svc_idle_stop(queue_t *q, mir_t *mir)
1276 {
1277 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
1278 	ASSERT((q->q_flag & QREADR) == 0);
1279 	ASSERT(mir->mir_type == RPC_SERVER);
1280 	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
1281 
1282 	mir_timer_stop(mir);
1283 }
1284 
1285 /*
1286  * This is server side only (RPC_SERVER).
1287  *
1288  * Start idle processing, which will include setting idle timer if the
1289  * stream is not being closed.
1290  */
1291 static void
1292 mir_svc_idle_start(queue_t *q, mir_t *mir)
1293 {
1294 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
1295 	ASSERT((q->q_flag & QREADR) == 0);
1296 	ASSERT(mir->mir_type == RPC_SERVER);
1297 	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
1298 
1299 	/*
1300 	 * Don't re-start idle timer if we are closing queues.
1301 	 */
1302 	if (mir->mir_closing) {
1303 		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
1304 		    (void *)q);
1305 
1306 		/*
1307 		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
1308 		 * is true.  When it is true, and we are in the process of
1309 		 * closing the stream, signal any thread waiting in
1310 		 * mir_close().
1311 		 */
1312 		if (mir->mir_inwservice == 0)
1313 			cv_signal(&mir->mir_condvar);
1314 
1315 	} else {
1316 		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
1317 		    mir->mir_ordrel_pending ? "ordrel" : "normal");
1318 		/*
1319 		 * Normal condition, start the idle timer.  If an orderly
1320 		 * release has been sent, set the timeout to wait for the
1321 		 * client to close its side of the connection.  Otherwise,
1322 		 * use the normal idle timeout.
1323 		 */
1324 		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
1325 		    svc_ordrel_timeout : mir->mir_idle_timeout);
1326 	}
1327 }
1328 
1329 /* ARGSUSED */
1330 static int
1331 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
1332 {
1333 	mir_t	*mir;
1334 
1335 	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
1336 	/* Set variables used directly by KRPC. */
1337 	if (!mir_rele)
1338 		mir_rele = mir_svc_release;
1339 	if (!mir_start)
1340 		mir_start = mir_svc_start;
1341 	if (!clnt_stop_idle)
1342 		clnt_stop_idle = mir_clnt_idle_do_stop;
1343 	if (!clnt_max_msg_sizep)
1344 		clnt_max_msg_sizep = &clnt_max_msg_size;
1345 	if (!svc_max_msg_sizep)
1346 		svc_max_msg_sizep = &svc_max_msg_size;
1347 
1348 	/* Allocate a zero'ed out mir structure for this stream. */
1349 	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
1350 
1351 	/*
1352 	 * We set hold inbound here so that incoming messages will
1353 	 * be held on the read-side queue until the stream is completely
1354 	 * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
1355 	 * the ioctl processing, the flag is cleared and any messages that
1356 	 * arrived between the open and the ioctl are delivered to KRPC.
1357 	 *
1358 	 * Early data should never arrive on a client stream since
1359 	 * servers only respond to our requests and we do not send any
1360 	 * until after the stream is initialized.  Early data is
1361 	 * very common on a server stream where the client will start
1362 	 * sending data as soon as the connection is made (and this
1363 	 * is especially true with TCP where the protocol accepts the
1364 	 * connection before nfsd or KRPC is notified about it).
1365 	 */
1366 
1367 	mir->mir_hold_inbound = 1;
1368 
1369 	/*
1370 	 * Start the record marker looking for a 4-byte header.  When
1371 	 * this length is negative, it indicates that rpcmod is looking
1372 	 * for bytes to consume for the record marker header.  When it
1373 	 * is positive, it holds the number of bytes that have arrived
1374 	 * for the current fragment and are being held in mir_head_mp.
1375 	 */
1376 
1377 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1378 
1379 	mir->mir_zoneid = rpc_zoneid();
1380 	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
1381 	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
1382 	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
1383 
1384 	q->q_ptr = (char *)mir;
1385 	WR(q)->q_ptr = (char *)mir;
1386 
1387 	/*
1388 	 * We noenable the read-side queue because we don't want it
1389 	 * automatically enabled by putq.  We enable it explicitly
1390 	 * in mir_wsrv when appropriate. (See additional comments on
1391 	 * flow control at the beginning of mir_rsrv.)
1392 	 */
1393 	noenable(q);
1394 
1395 	qprocson(q);
1396 	return (0);
1397 }
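
/*
 * A note on the hooks initialized at the top of mir_open() (and
 * similarly in rpcmodopen()): mir_rele, mir_start, clnt_stop_idle and
 * the two max-message-size pointers are globals declared by kRPC.
 * Filling them in lazily on first open lets kRPC call back into rpcmod
 * without a static dependency on this module, and it is also the reason
 * _fini() above unconditionally returns EBUSY.
 */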
1398 
1399 /*
1400  * Read-side put routine for both the client and server side.  Does the
1401  * record marking for incoming RPC messages, and when complete, dispatches
1402  * the message to either the client or server.
1403  */
1404 static void
1405 mir_rput(queue_t *q, mblk_t *mp)
1406 {
1407 	int	excess;
1408 	int32_t	frag_len, frag_header;
1409 	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
1410 	mir_t	*mir = q->q_ptr;
1411 	boolean_t stop_timer = B_FALSE;
1412 
1413 	ASSERT(mir != NULL);
1414 
1415 	/*
1416 	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
1417 	 * with the corresponding ioctl, then don't accept
1418 	 * any inbound data.  This should never happen for streams
1419 	 * created by nfsd or client-side KRPC because they are careful
1420 	 * to set the mode of the stream before doing anything else.
1421 	 */
1422 	if (mir->mir_type == 0) {
1423 		freemsg(mp);
1424 		return;
1425 	}
1426 
1427 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1428 
1429 	switch (mp->b_datap->db_type) {
1430 	case M_DATA:
1431 		break;
1432 	case M_PROTO:
1433 	case M_PCPROTO:
1434 		if (MBLKL(mp) < sizeof (t_scalar_t)) {
1435 			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
1436 			    (int)MBLKL(mp));
1437 			freemsg(mp);
1438 			return;
1439 		}
1440 		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
1441 			mir_rput_proto(q, mp);
1442 			return;
1443 		}
1444 
1445 		/* Throw away the T_DATA_IND block and continue with data. */
1446 		mp1 = mp;
1447 		mp = mp->b_cont;
1448 		freeb(mp1);
1449 		break;
1450 	case M_SETOPTS:
1451 		/*
1452 		 * If a module on the stream is trying to set the Stream head's
1453 		 * high water mark, then set our hiwater to the requested
1454 		 * value.  We are the "stream head" for all inbound
1455 		 * data messages since messages are passed directly to KRPC.
1456 		 */
1457 		if (MBLKL(mp) >= sizeof (struct stroptions)) {
1458 			struct stroptions	*stropts;
1459 
1460 			stropts = (struct stroptions *)mp->b_rptr;
1461 			if ((stropts->so_flags & SO_HIWAT) &&
1462 			    !(stropts->so_flags & SO_BAND)) {
1463 				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
1464 			}
1465 		}
1466 		putnext(q, mp);
1467 		return;
1468 	case M_FLUSH:
1469 		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
1470 		RPCLOG(32, "on q 0x%p\n", (void *)q);
1471 		putnext(q, mp);
1472 		return;
1473 	default:
1474 		putnext(q, mp);
1475 		return;
1476 	}
1477 
1478 	mutex_enter(&mir->mir_mutex);
1479 
1480 	/*
1481 	 * If this connection is closing, don't accept any new messages.
1482 	 */
1483 	if (mir->mir_svc_no_more_msgs) {
1484 		ASSERT(mir->mir_type == RPC_SERVER);
1485 		mutex_exit(&mir->mir_mutex);
1486 		freemsg(mp);
1487 		return;
1488 	}
1489 
1490 	/* Get local copies for quicker access. */
1491 	frag_len = mir->mir_frag_len;
1492 	frag_header = mir->mir_frag_header;
1493 	head_mp = mir->mir_head_mp;
1494 	tail_mp = mir->mir_tail_mp;
1495 
1496 	/* Loop, processing each message block in the mp chain separately. */
1497 	do {
1498 		cont_mp = mp->b_cont;
1499 		mp->b_cont = NULL;
1500 
1501 		/*
1502 		 * Drop zero-length mblks to prevent unbounded kernel memory
1503 		 * consumption.
1504 		 */
1505 		if (MBLKL(mp) == 0) {
1506 			freeb(mp);
1507 			continue;
1508 		}
1509 
1510 		/*
1511 		 * If frag_len is negative, we're still in the process of
1512 		 * building frag_header -- try to complete it with this mblk.
1513 		 */
1514 		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
1515 			frag_len++;
1516 			frag_header <<= 8;
1517 			frag_header += *mp->b_rptr++;
1518 		}
1519 
1520 		if (MBLKL(mp) == 0 && frag_len < 0) {
1521 			/*
1522 			 * We consumed this mblk while trying to complete the
1523 			 * fragment header.  Free it and move on.
1524 			 */
1525 			freeb(mp);
1526 			continue;
1527 		}
1528 
1529 		ASSERT(frag_len >= 0);
1530 
1531 		/*
1532 		 * Now frag_header has the number of bytes in this fragment
1533 		 * and we're just waiting to collect them all.  Chain our
1534 		 * latest mblk onto the list and see if we now have enough
1535 		 * bytes to complete the fragment.
1536 		 */
1537 		if (head_mp == NULL) {
1538 			ASSERT(tail_mp == NULL);
1539 			head_mp = tail_mp = mp;
1540 		} else {
1541 			tail_mp->b_cont = mp;
1542 			tail_mp = mp;
1543 		}
1544 
1545 		frag_len += MBLKL(mp);
1546 		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
1547 		if (excess < 0) {
1548 			/*
1549 			 * We still haven't received enough data to complete
1550 			 * the fragment, so continue on to the next mblk.
1551 			 */
1552 			continue;
1553 		}
1554 
1555 		/*
1556 		 * We've got a complete fragment.  If there are excess bytes,
1557 		 * then they're part of the next fragment's header (of either
1558 		 * this RPC message or the next RPC message).  Split that part
1559 		 * into its own mblk so that we can safely freeb() it when
1560 		 * building frag_header above.
1561 		 */
1562 		if (excess > 0) {
1563 			if ((mp1 = dupb(mp)) == NULL &&
1564 			    (mp1 = copyb(mp)) == NULL) {
1565 				freemsg(head_mp);
1566 				freemsg(cont_mp);
1567 				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
1568 				mir->mir_frag_header = 0;
1569 				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1570 				mir->mir_head_mp = NULL;
1571 				mir->mir_tail_mp = NULL;
1572 				mir_disconnect(q, mir);	/* drops mir_mutex */
1573 				return;
1574 			}
1575 
1576 			/*
1577 			 * Relink the message chain so that the next mblk is
1578 			 * the next fragment header, followed by the rest of
1579 			 * the message chain.
1580 			 */
1581 			mp1->b_cont = cont_mp;
1582 			cont_mp = mp1;
1583 
1584 			/*
1585 			 * Data in the new mblk begins at the next fragment,
1586 			 * and data in the old mblk ends at the next fragment.
1587 			 */
1588 			mp1->b_rptr = mp1->b_wptr - excess;
1589 			mp->b_wptr -= excess;
1590 		}
1591 
1592 		/*
1593 		 * Reset frag_len and frag_header for the next fragment.
1594 		 */
1595 		frag_len = -(int32_t)sizeof (uint32_t);
1596 		if (!(frag_header & MIR_LASTFRAG)) {
1597 			/*
1598 			 * The current fragment is complete, but more
1599 			 * fragments need to be processed before we can
1600 			 * pass along the RPC message headed at head_mp.
1601 			 */
1602 			frag_header = 0;
1603 			continue;
1604 		}
1605 		frag_header = 0;
1606 
1607 		/*
1608 		 * We've got a complete RPC message; pass it to the
1609 		 * appropriate consumer.
1610 		 */
1611 		switch (mir->mir_type) {
1612 		case RPC_CLIENT:
1613 			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
1614 				/*
1615 				 * Mark this stream as active.  This marker
1616 				 * is used in mir_timer().
1617 				 */
1618 				mir->mir_clntreq = 1;
1619 				mir->mir_use_timestamp = lbolt;
1620 			} else {
1621 				freemsg(head_mp);
1622 			}
1623 			break;
1624 
1625 		case RPC_SERVER:
1626 			/*
1627 			 * Check for flow control before passing the
1628 			 * message to KRPC.
1629 			 */
1630 			if (!mir->mir_hold_inbound) {
1631 				if (mir->mir_krpc_cell) {
1632 					/*
1633 					 * If the reference count is 0
1634 					 * (not including this request),
1635 					 * then the stream is transitioning
1636 					 * from idle to non-idle.  In this case,
1637 					 * we cancel the idle timer.
1638 					 */
1639 					if (mir->mir_ref_cnt++ == 0)
1640 						stop_timer = B_TRUE;
1641 					if (mir_check_len(q,
1642 					    (int32_t)msgdsize(mp), mp))
1643 						return;
1644 					svc_queuereq(q, head_mp); /* to KRPC */
1645 				} else {
1646 					/*
1647 					 * Count # of times this happens. This
1648 					 * should never happen, but experience
1649 					 * shows otherwise.
1650 					 */
1651 					mir_krpc_cell_null++;
1652 					freemsg(head_mp);
1653 				}
1654 			} else {
1655 				/*
1656 				 * If the outbound side of the stream is
1657 				 * flow controlled, then hold this message
1658 				 * until client catches up. mir_hold_inbound
1659 				 * is set in mir_wput and cleared in mir_wsrv.
1660 				 */
1661 				(void) putq(q, head_mp);
1662 				mir->mir_inrservice = B_TRUE;
1663 			}
1664 			break;
1665 		default:
1666 			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
1667 			    mir->mir_type);
1668 			freemsg(head_mp);
1669 			break;
1670 		}
1671 
1672 		/*
1673 		 * Reset the chain since we're starting on a new RPC message.
1674 		 */
1675 		head_mp = tail_mp = NULL;
1676 	} while ((mp = cont_mp) != NULL);
1677 
1678 	/*
1679 	 * Sanity check the message length; if it's too large mir_check_len()
1680 	 * will shut down the connection, drop mir_mutex, and return non-zero.
1681 	 */
1682 	if (head_mp != NULL && mir->mir_setup_complete &&
1683 	    mir_check_len(q, frag_len, head_mp))
1684 		return;
1685 
1686 	/* Save our local copies back in the mir structure. */
1687 	mir->mir_frag_header = frag_header;
1688 	mir->mir_frag_len = frag_len;
1689 	mir->mir_head_mp = head_mp;
1690 	mir->mir_tail_mp = tail_mp;
1691 
1692 	/*
1693 	 * The timer is stopped after the whole message chain is processed.
1694 	 * The reason is that stopping the timer releases the mir_mutex
1695 	 * lock temporarily.  This means that the request can be serviced
1696 	 * while we are still processing the message chain.  This is not
1697 	 * good.  So we stop the timer here instead.
1698 	 *
1699 	 * Note that if the timer fires before we stop it, it will not
1700 	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
1701 	 * will just return.
1702 	 */
1703 	if (stop_timer) {
1704 		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
1705 		    "ref cnt going to non zero\n", (void *)WR(q));
1706 		mir_svc_idle_stop(WR(q), mir);
1707 	}
1708 	mutex_exit(&mir->mir_mutex);
1709 }
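
/*
 * A simplified byte-at-a-time model of the reassembly loop above, for
 * illustration only.  next_byte(), consume_body_byte() and
 * deliver_message() are hypothetical; the real code works on whole
 * mblks and splits an mblk when a fragment boundary falls inside it.
 *
 *	int32_t frag_len = -4;
 *	uint32_t frag_header = 0;
 *	uint32_t body_remaining = 0;
 *	int c;
 *
 *	while ((c = next_byte()) != EOF) {
 *		if (frag_len < 0) {
 *			frag_header = (frag_header << 8) | (uint8_t)c;
 *			if (++frag_len < 0)
 *				continue;
 *			body_remaining = frag_header & ~MIR_LASTFRAG;
 *		} else {
 *			consume_body_byte((uint8_t)c);
 *			body_remaining--;
 *		}
 *		if (frag_len == 0 && body_remaining == 0) {
 *			if (frag_header & MIR_LASTFRAG)
 *				deliver_message();
 *			frag_len = -4;
 *			frag_header = 0;
 *		}
 *	}
 */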
1710 
1711 static void
1712 mir_rput_proto(queue_t *q, mblk_t *mp)
1713 {
1714 	mir_t	*mir = (mir_t *)q->q_ptr;
1715 	uint32_t	type;
1716 	uint32_t reason = 0;
1717 
1718 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1719 
1720 	type = ((union T_primitives *)mp->b_rptr)->type;
1721 	switch (mir->mir_type) {
1722 	case RPC_CLIENT:
1723 		switch (type) {
1724 		case T_DISCON_IND:
1725 			reason = ((struct T_discon_ind *)
1726 			    (mp->b_rptr))->DISCON_reason;
1727 			/*FALLTHROUGH*/
1728 		case T_ORDREL_IND:
1729 			mutex_enter(&mir->mir_mutex);
1730 			if (mir->mir_head_mp) {
1731 				freemsg(mir->mir_head_mp);
1732 				mir->mir_head_mp = (mblk_t *)0;
1733 				mir->mir_tail_mp = (mblk_t *)0;
1734 			}
1735 			/*
1736 			 * We are disconnecting, but not necessarily
1737 			 * closing. By not closing, we will fail to
1738 			 * pick up a possibly changed global timeout value,
1739 			 * unless we store it now.
1740 			 */
1741 			mir->mir_idle_timeout = clnt_idle_timeout;
1742 			mir_clnt_idle_stop(WR(q), mir);
1743 
1744 			/*
1745 			 * Even though we are unconnected, we still
1746 			 * leave the idle timer going on the client. The
1747 			 * reason for this is that if we've disconnected due
1748 			 * to a server-side disconnect, reset, or connection
1749 			 * timeout, there is a possibility the client may
1750 			 * retry the RPC request. This retry needs to be done on
1751 			 * the same bound address for the server to interpret
1752 			 * it as such. However, we don't want
1753 			 * to wait forever for that possibility. If the
1754 			 * end-point stays unconnected for mir_idle_timeout
1755 			 * units of time, then that is a signal to the
1756 			 * connection manager to give up waiting for the
1757 			 * application (eg. NFS) to send a retry.
1758 			 */
1759 			mir_clnt_idle_start(WR(q), mir);
1760 			mutex_exit(&mir->mir_mutex);
1761 			clnt_dispatch_notifyall(WR(q), type, reason);
1762 			freemsg(mp);
1763 			return;
1764 		case T_ERROR_ACK:
1765 		{
1766 			struct T_error_ack	*terror;
1767 
1768 			terror = (struct T_error_ack *)mp->b_rptr;
1769 			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
1770 			    (void *)q);
1771 			RPCLOG(1, " ERROR_prim: %s,",
1772 			    rpc_tpiprim2name(terror->ERROR_prim));
1773 			RPCLOG(1, " TLI_error: %s,",
1774 			    rpc_tpierr2name(terror->TLI_error));
1775 			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
1776 			if (terror->ERROR_prim == T_DISCON_REQ)  {
1777 				clnt_dispatch_notifyall(WR(q), type, reason);
1778 				freemsg(mp);
1779 				return;
1780 			} else {
1781 				if (clnt_dispatch_notifyconn(WR(q), mp))
1782 					return;
1783 			}
1784 			break;
1785 		}
1786 		case T_OK_ACK:
1787 		{
1788 			struct T_ok_ack	*tok = (struct T_ok_ack *)mp->b_rptr;
1789 
1790 			if (tok->CORRECT_prim == T_DISCON_REQ) {
1791 				clnt_dispatch_notifyall(WR(q), type, reason);
1792 				freemsg(mp);
1793 				return;
1794 			} else {
1795 				if (clnt_dispatch_notifyconn(WR(q), mp))
1796 					return;
1797 			}
1798 			break;
1799 		}
1800 		case T_CONN_CON:
1801 		case T_INFO_ACK:
1802 		case T_OPTMGMT_ACK:
1803 			if (clnt_dispatch_notifyconn(WR(q), mp))
1804 				return;
1805 			break;
1806 		case T_BIND_ACK:
1807 			break;
1808 		default:
1809 			RPCLOG(1, "mir_rput: unexpected message %d "
1810 			    "for KRPC client\n",
1811 			    ((union T_primitives *)mp->b_rptr)->type);
1812 			break;
1813 		}
1814 		break;
1815 
1816 	case RPC_SERVER:
1817 		switch (type) {
1818 		case T_BIND_ACK:
1819 		{
1820 			struct T_bind_ack	*tbind;
1821 
1822 			/*
1823 			 * If this is a listening stream, then shut
1824 			 * off the idle timer.
1825 			 */
1826 			tbind = (struct T_bind_ack *)mp->b_rptr;
1827 			if (tbind->CONIND_number > 0) {
1828 				mutex_enter(&mir->mir_mutex);
1829 				mir_svc_idle_stop(WR(q), mir);
1830 
1831 				/*
1832 				 * mark this as a listen endpoint
1833 				 * for special handling.
1834 				 */
1835 
1836 				mir->mir_listen_stream = 1;
1837 				mutex_exit(&mir->mir_mutex);
1838 			}
1839 			break;
1840 		}
1841 		case T_DISCON_IND:
1842 		case T_ORDREL_IND:
1843 			RPCLOG(16, "mir_rput_proto: got %s indication\n",
1844 			    type == T_DISCON_IND ? "disconnect"
1845 			    : "orderly release");
1846 
1847 			/*
1848 			 * For a listen endpoint, just pass
1849 			 * on the message.
1850 			 */
1851 
1852 			if (mir->mir_listen_stream)
1853 				break;
1854 
1855 			mutex_enter(&mir->mir_mutex);
1856 
1857 			/*
1858 			 * If client wants to break off connection, record
1859 			 * that fact.
1860 			 */
1861 			mir_svc_start_close(WR(q), mir);
1862 
1863 			/*
1864 			 * If we are idle, then send the orderly release
1865 			 * or disconnect indication to nfsd.
1866 			 */
1867 			if (MIR_SVC_QUIESCED(mir)) {
1868 				mutex_exit(&mir->mir_mutex);
1869 				break;
1870 			}
1871 
1872 			RPCLOG(16, "mir_rput_proto: not idle, so "
1873 			    "disconnect/ord rel indication not passed "
1874 			    "upstream on 0x%p\n", (void *)q);
1875 
1876 			/*
1877 			 * Hold the indication until we go idle.
1878 			 * If there is already an indication stored,
1879 			 * replace it if the new one is a disconnect. The
1880 			 * reasoning is that disconnection takes less time
1881 			 * to process, and once a client decides to
1882 			 * disconnect, we should do that.
1883 			 */
1884 			if (mir->mir_svc_pend_mp) {
1885 				if (type == T_DISCON_IND) {
1886 					RPCLOG(16, "mir_rput_proto: replacing"
1887 					    " held disconnect/ord rel"
1888 					    " indication with disconnect on"
1889 					    " 0x%p\n", (void *)q);
1890 
1891 					freemsg(mir->mir_svc_pend_mp);
1892 					mir->mir_svc_pend_mp = mp;
1893 				} else {
1894 					RPCLOG(16, "mir_rput_proto: already "
1895 					    "held a disconnect/ord rel "
1896 					    "indication. freeing ord rel "
1897 					    "ind on 0x%p\n", (void *)q);
1898 					freemsg(mp);
1899 				}
1900 			} else
1901 				mir->mir_svc_pend_mp = mp;
1902 
1903 			mutex_exit(&mir->mir_mutex);
1904 			return;
1905 
1906 		default:
1907 			/* nfsd handles server-side non-data messages. */
1908 			break;
1909 		}
1910 		break;
1911 
1912 	default:
1913 		break;
1914 	}
1915 
1916 	putnext(q, mp);
1917 }
1918 
1919 /*
1920  * The server-side read queues are used to hold inbound messages while
1921  * outbound flow control is exerted.  When outbound flow control is
1922  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
1923  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
1924  *
1925  * For the server side, we have two types of messages queued. The first type
1926  * are messages that are ready to be XDR decoded and then sent to the
1927  * RPC program's dispatch routine. The second type are "raw" messages that
1928  * haven't been processed, i.e. assembled from rpc record fragments into
1929  * full requests. The only time we will see the second type of message
1930  * queued is if we have a memory allocation failure while processing
1931  * a raw message. The field mir_first_non_processed_mblk will mark the
1932  * first such raw message. So the flow for the server side is:
1933  *
1934  *	- send processed queued messages to kRPC until we run out or find
1935  *	  one that needs additional processing because we were short on memory
1936  *	  earlier
1937  *	- process a message that was deferred because of lack of
1938  *	  memory
1939  *	- continue processing messages until the queue empties or we
1940  *	  have to stop because of lack of memory
1941 	 *	- during each of the above phases, if the queue is empty and
1942  *	  there are no pending messages that were passed to the RPC
1943  *	  layer, send upstream the pending disconnect/ordrel indication if
1944  *	  there is one
1945  *
1946  * The read-side queue is also enabled by a bufcall callback if dupmsg
1947  * fails in mir_rput.
1948  */
1949 static void
1950 mir_rsrv(queue_t *q)
1951 {
1952 	mir_t	*mir;
1953 	mblk_t	*mp;
1954 	mblk_t	*cmp = NULL;
1955 	boolean_t stop_timer = B_FALSE;
1956 
1957 	mir = (mir_t *)q->q_ptr;
1958 	mutex_enter(&mir->mir_mutex);
1959 
1960 	mp = NULL;
1961 	switch (mir->mir_type) {
1962 	case RPC_SERVER:
1963 		if (mir->mir_ref_cnt == 0)
1964 			mir->mir_hold_inbound = 0;
1965 		if (mir->mir_hold_inbound) {
1966 
1967 			ASSERT(cmp == NULL);
1968 			if (q->q_first == NULL) {
1969 
1970 				MIR_CLEAR_INRSRV(mir);
1971 
1972 				if (MIR_SVC_QUIESCED(mir)) {
1973 					cmp = mir->mir_svc_pend_mp;
1974 					mir->mir_svc_pend_mp = NULL;
1975 				}
1976 			}
1977 
1978 			mutex_exit(&mir->mir_mutex);
1979 
1980 			if (cmp != NULL) {
1981 				RPCLOG(16, "mir_rsrv: line %d: sending a held "
1982 				    "disconnect/ord rel indication upstream\n",
1983 				    __LINE__);
1984 				putnext(q, cmp);
1985 			}
1986 
1987 			return;
1988 		}
1989 		while (mp = getq(q)) {
1990 			if (mir->mir_krpc_cell &&
1991 			    (mir->mir_svc_no_more_msgs == 0)) {
1992 				/*
1993 				 * If we were idle, turn off idle timer since
1994 				 * we aren't idle any more.
1995 				 */
1996 				if (mir->mir_ref_cnt++ == 0)
1997 					stop_timer = B_TRUE;
1998 				if (mir_check_len(q,
1999 				    (int32_t)msgdsize(mp), mp))
2000 					return;
2001 				svc_queuereq(q, mp);
2002 			} else {
2003 				/*
2004 				 * Count # of times this happens. It should
2005 				 * never happen, but experience shows otherwise.
2006 				 */
2007 				if (mir->mir_krpc_cell == NULL)
2008 					mir_krpc_cell_null++;
2009 				freemsg(mp);
2010 			}
2011 		}
2012 		break;
2013 	case RPC_CLIENT:
2014 		break;
2015 	default:
2016 		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
2017 
2018 		if (q->q_first == NULL)
2019 			MIR_CLEAR_INRSRV(mir);
2020 
2021 		mutex_exit(&mir->mir_mutex);
2022 
2023 		return;
2024 	}
2025 
2026 	/*
2027 	 * The timer is stopped only after all the messages are processed,
2028 	 * rather than inside the loop above.  The reason is that stopping
2029 	 * the timer releases the mir_mutex lock temporarily, which would
2030 	 * let a request be serviced while we are still processing the
2031 	 * message queue.  That is not good, so we stop the timer here.
2032 	 */
2033 	if (stop_timer)  {
2034 		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
2035 		    "cnt going to non zero\n", (void *)WR(q));
2036 		mir_svc_idle_stop(WR(q), mir);
2037 	}
2038 
2039 	if (q->q_first == NULL) {
2040 
2041 		MIR_CLEAR_INRSRV(mir);
2042 
2043 		ASSERT(cmp == NULL);
2044 		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
2045 			cmp = mir->mir_svc_pend_mp;
2046 			mir->mir_svc_pend_mp = NULL;
2047 		}
2048 
2049 		mutex_exit(&mir->mir_mutex);
2050 
2051 		if (cmp != NULL) {
2052 			RPCLOG(16, "mir_rsrv: line %d: sending a held "
2053 			    "disconnect/ord rel indication upstream\n",
2054 			    __LINE__);
2055 			putnext(q, cmp);
2056 		}
2057 
2058 		return;
2059 	}
2060 	mutex_exit(&mir->mir_mutex);
2061 }
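
/*
 * (Note: each message handed to KRPC via svc_queuereq() in mir_rsrv()
 * holds a reference on this slot; mir_ref_cnt is incremented above.
 * KRPC drops that reference by calling mir_svc_release() below after
 * the request has been replied to or dropped, which is what eventually
 * allows the idle timer to start and any held disconnect/ordrel
 * indication to be sent upstream.)
 */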
2062 
2063 static int mir_svc_policy_fails;
2064 
2065 /*
2066  * Called to send an event code to nfsd/lockd so that it initiates
2067  * connection close.
2068  */
2069 static int
2070 mir_svc_policy_notify(queue_t *q, int event)
2071 {
2072 	mblk_t	*mp;
2073 #ifdef DEBUG
2074 	mir_t *mir = (mir_t *)q->q_ptr;
2075 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2076 #endif
2077 	ASSERT(q->q_flag & QREADR);
2078 
2079 	/*
2080 	 * Create an M_DATA message with the event code and pass it to the
2081 	 * Stream head (nfsd or whoever created the stream will consume it).
2082 	 */
2083 	mp = allocb(sizeof (int), BPRI_HI);
2084 
2085 	if (!mp) {
2086 
2087 		mir_svc_policy_fails++;
2088 		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
2089 		    "%d\n", event);
2090 		return (ENOMEM);
2091 	}
2092 
2093 	U32_TO_BE32(event, mp->b_rptr);
2094 	mp->b_wptr = mp->b_rptr + sizeof (int);
2095 	putnext(q, mp);
2096 	return (0);
2097 }
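
/*
 * Illustrative sketch only, not part of the original module: the daemon
 * that created the stream (nfsd or lockd) consumes the event code sent
 * by mir_svc_policy_notify() as ordinary data at the stream head.  The
 * helper below is hypothetical; it assumes the 4-byte big-endian
 * encoding produced by U32_TO_BE32() above, where event 1 requests a
 * close (see mir_timer()) and event 2 a disconnect (see
 * mir_disconnect()).
 *
 *	#include <sys/types.h>
 *	#include <unistd.h>
 *	#include <arpa/inet.h>
 *
 *	static int
 *	svc_read_policy_event(int fd)
 *	{
 *		uint32_t event;
 *
 *		if (read(fd, &event, sizeof (event)) != sizeof (event))
 *			return (-1);
 *		return ((int)ntohl(event));
 *	}
 */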
2098 
2099 /*
2100  * Server side: start the close phase. We want to get this rpcmod slot in an
2101  * idle state before mir_close() is called.
2102  */
2103 static void
2104 mir_svc_start_close(queue_t *wq, mir_t *mir)
2105 {
2106 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
2107 	ASSERT((wq->q_flag & QREADR) == 0);
2108 	ASSERT(mir->mir_type == RPC_SERVER);
2109 
2110 
2111 	/*
2112 	 * Do not accept any more messages.
2113 	 */
2114 	mir->mir_svc_no_more_msgs = 1;
2115 
2116 	/*
2117 	 * The next two statements will make the read service procedure
2118 	 * invoke svc_queuereq() on everything stuck in the STREAMS read
2119 	 * queue.  This is not strictly necessary, because enabling the write
2120 	 * queue will have the same effect, but why not speed the process along?
2121 	 */
2122 	mir->mir_hold_inbound = 0;
2123 	qenable(RD(wq));
2124 
2125 	/*
2126 	 * Meanwhile force the write service procedure to send the
2127 	 * responses downstream, regardless of flow control.
2128 	 */
2129 	qenable(wq);
2130 }
2131 
2132 /*
2133  * This routine is called directly by KRPC after a request is completed,
2134  * whether a reply was sent or the request was dropped.
2135  */
2136 static void
2137 mir_svc_release(queue_t *wq, mblk_t *mp)
2138 {
2139 	mir_t   *mir = (mir_t *)wq->q_ptr;
2140 	mblk_t	*cmp = NULL;
2141 
2142 	ASSERT((wq->q_flag & QREADR) == 0);
2143 	if (mp)
2144 		freemsg(mp);
2145 
2146 	mutex_enter(&mir->mir_mutex);
2147 
2148 	/*
2149 	 * Pick up any held disconnect/ordrel indication if the slot is idle.
2150 	 */
2151 	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
2152 		cmp = mir->mir_svc_pend_mp;
2153 		mir->mir_svc_pend_mp = NULL;
2154 	}
2155 
2156 	if (cmp) {
2157 		RPCLOG(16, "mir_svc_release: sending a held "
2158 		    "disconnect/ord rel indication upstream on queue 0x%p\n",
2159 		    (void *)RD(wq));
2160 
2161 		mutex_exit(&mir->mir_mutex);
2162 
2163 		putnext(RD(wq), cmp);
2164 
2165 		mutex_enter(&mir->mir_mutex);
2166 	}
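	/*
	 * (As elsewhere in this module, mir_mutex is dropped around the
	 * putnext() above; we never call into another module's put
	 * procedure while holding our own lock.)
	 */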
2167 
2168 	/*
2169 	 * Start idle processing if this is the last reference.
2170 	 */
2171 	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {
2172 
2173 		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
2174 		    "because ref cnt is zero\n", (void *) wq);
2175 
2176 		mir_svc_idle_start(wq, mir);
2177 	}
2178 
2179 	mir->mir_ref_cnt--;
2180 	ASSERT(mir->mir_ref_cnt >= 0);
2181 
2182 	/*
2183 	 * Wake up the thread waiting to close.
2184 	 */
2185 
2186 	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
2187 		cv_signal(&mir->mir_condvar);
2188 
2189 	mutex_exit(&mir->mir_mutex);
2190 }
2191 
2192 /*
2193  * This routine is called by server-side KRPC when it is ready to
2194  * handle inbound messages on the stream.
2195  */
2196 static void
2197 mir_svc_start(queue_t *wq)
2198 {
2199 	mir_t   *mir = (mir_t *)wq->q_ptr;
2200 
2201 	/*
2202 	 * We no longer need to take the mir_mutex here because the
2203 	 * mir_setup_complete field has been moved out of the set of
2204 	 * fields protected by the mir_mutex.
2205 	 */
2206 
2207 	mir->mir_setup_complete = 1;
2208 	qenable(RD(wq));
2209 }
2210 
2211 /*
2212  * client side wrapper for stopping timer with normal idle timeout.
2213  */
2214 static void
2215 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
2216 {
2217 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
2218 	ASSERT((wq->q_flag & QREADR) == 0);
2219 	ASSERT(mir->mir_type == RPC_CLIENT);
2220 
2221 	mir_timer_stop(mir);
2222 }
2223 
2224 /*
2225  * client side wrapper for starting timer with normal idle timeout.
2226  */
2227 static void
2228 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
2229 {
2230 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
2231 	ASSERT((wq->q_flag & QREADR) == 0);
2232 	ASSERT(mir->mir_type == RPC_CLIENT);
2233 
2234 	mir_timer_start(wq, mir, mir->mir_idle_timeout);
2235 }
2236 
2237 /*
2238  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
2239  * end-points that aren't connected.
2240  */
2241 static void
2242 mir_clnt_idle_do_stop(queue_t *wq)
2243 {
2244 	mir_t   *mir = (mir_t *)wq->q_ptr;
2245 
2246 	RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
2247 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2248 	mutex_enter(&mir->mir_mutex);
2249 	mir_clnt_idle_stop(wq, mir);
2250 	mutex_exit(&mir->mir_mutex);
2251 }
2252 
2253 /*
2254  * Timer handler.  It handles the idle timeout and memory shortage problems.
2255  */
2256 static void
2257 mir_timer(void *arg)
2258 {
2259 	queue_t *wq = (queue_t *)arg;
2260 	mir_t *mir = (mir_t *)wq->q_ptr;
2261 	boolean_t notify;
2262 
2263 	mutex_enter(&mir->mir_mutex);
2264 
2265 	/*
2266 	 * mir_timer_call is set only while either mir_timer_[start|stop]
2267 	 * is in progress.  And mir_timer() can only run while they are
2268 	 * in progress if the timer is being stopped.  So just
2269 	 * return.
2270 	 */
2271 	if (mir->mir_timer_call) {
2272 		mutex_exit(&mir->mir_mutex);
2273 		return;
2274 	}
2275 	mir->mir_timer_id = 0;
2276 
2277 	switch (mir->mir_type) {
2278 	case RPC_CLIENT:
2279 
2280 		/*
2281 		 * For clients, the timer fires at clnt_idle_timeout
2282 		 * intervals.  If the activity marker (mir_clntreq) is
2283 		 * zero, then the stream has been idle since the last
2284 		 * timer event and we notify KRPC.  If mir_clntreq is
2285 		 * non-zero, then the stream is active and we just
2286 		 * restart the timer for another interval.  mir_clntreq
2287 		 * is set to 1 in mir_wput for every request passed
2288 		 * downstream.
2289 		 *
2290 		 * If this was a memory shortage timer, reset the idle
2291 		 * timeout regardless; mir_clntreq will not be a
2292 		 * valid indicator.
2293 		 *
2294 		 * The timer is initially started in mir_wput during
2295 		 * RPC_CLIENT ioctl processing.
2296 		 *
2297 		 * The timer interval can be changed for individual
2298 		 * streams with the ND variable "mir_idle_timeout".
2299 		 */
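		/*
		 * Worked example (illustrative): with mir_clntreq set and
		 * mir_idle_timeout at 300000 ms, if 100000 ms worth of ticks
		 * have elapsed since mir_use_timestamp, the test below
		 * succeeds and tout computes to 300000 - 100000 = 200000 ms,
		 * so the timer is simply restarted for the remainder of the
		 * idle window.
		 */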
2300 		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
2301 		    MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) {
2302 			clock_t tout;
2303 
2304 			tout = mir->mir_idle_timeout -
2305 			    TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
2306 			if (tout < 0)
2307 				tout = 1000;
2308 #if 0
2309 			printf("mir_timer[%d < %d + %d]: reset client timer "
2310 			    "to %d (ms)\n", TICK_TO_MSEC(lbolt),
2311 			    TICK_TO_MSEC(mir->mir_use_timestamp),
2312 			    mir->mir_idle_timeout, tout);
2313 #endif
2314 			mir->mir_clntreq = 0;
2315 			mir_timer_start(wq, mir, tout);
2316 			mutex_exit(&mir->mir_mutex);
2317 			return;
2318 		}
2319 #if 0
2320 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz);
2321 #endif
2322 		/*
2323 		 * We are disconnecting, but not necessarily
2324 		 * closing. By not closing, we will fail to
2325 		 * pick up a possibly changed global timeout value,
2326 		 * unless we store it now.
2327 		 */
2328 		mir->mir_idle_timeout = clnt_idle_timeout;
2329 		mir_clnt_idle_start(wq, mir);
2330 
2331 		mutex_exit(&mir->mir_mutex);
2332 		/*
2333 		 * We pass T_ORDREL_REQ as an integer value
2334 		 * to KRPC as the indication that the stream
2335 		 * is idle.  This is not a T_ORDREL_REQ message,
2336 		 * it is just a convenient value since we call
2337 		 * the same KRPC routine for T_ORDREL_INDs and
2338 		 * T_DISCON_INDs.
2339 		 */
2340 		clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
2341 		return;
2342 
2343 	case RPC_SERVER:
2344 
2345 		/*
2346 		 * For servers, the timer is only running when the stream
2347 		 * is really idle or memory is short.  The timer is started
2348 		 * by mir_wput when mir_type is set to RPC_SERVER and
2349 		 * by mir_svc_idle_start whenever the stream goes idle
2350 		 * (mir_ref_cnt == 0).  The timer is cancelled in
2351 		 * mir_rput whenever a new inbound request is passed to KRPC
2352 		 * and the stream was previously idle.
2353 		 *
2354 		 * The timer interval can be changed for individual
2355 		 * streams with the ND variable "mir_idle_timeout".
2356 		 *
2357 		 * If the stream is not idle do nothing.
2358 		 * If the stream is not idle, do nothing.
2359 		if (!MIR_SVC_QUIESCED(mir)) {
2360 			mutex_exit(&mir->mir_mutex);
2361 			return;
2362 		}
2363 
2364 		notify = !mir->mir_inrservice;
2365 		mutex_exit(&mir->mir_mutex);
2366 
2367 		/*
2368 		 * If there is no packet queued up in the read queue, the
2369 		 * stream is really idle, so notify nfsd to close it.
2370 		 */
2371 		if (notify) {
2372 			RPCLOG(16, "mir_timer: telling stream head listener "
2373 			    "to close stream (0x%p)\n", (void *) RD(wq));
2374 			(void) mir_svc_policy_notify(RD(wq), 1);
2375 		}
2376 		return;
2377 	default:
2378 		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
2379 		    mir->mir_type);
2380 		mutex_exit(&mir->mir_mutex);
2381 		return;
2382 	}
2383 }
2384 
2385 /*
2386  * Called by the RPC package to send either a call or a return, or a
2387  * transport connection request.  Adds the record marking header.
2388  */
2389 static void
2390 mir_wput(queue_t *q, mblk_t *mp)
2391 {
2392 	uint_t	frag_header;
2393 	mir_t	*mir = (mir_t *)q->q_ptr;
2394 	uchar_t	*rptr = mp->b_rptr;
2395 
2396 	if (!mir) {
2397 		freemsg(mp);
2398 		return;
2399 	}
2400 
2401 	if (mp->b_datap->db_type != M_DATA) {
2402 		mir_wput_other(q, mp);
2403 		return;
2404 	}
2405 
2406 	if (mir->mir_ordrel_pending == 1) {
2407 		freemsg(mp);
2408 		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
2409 		    (void *)q);
2410 		return;
2411 	}
2412 
2413 	frag_header = (uint_t)DLEN(mp);
2414 	frag_header |= MIR_LASTFRAG;
2415 
2416 	/* Stick in the 4 byte record marking header. */
2417 	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
2418 	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
2419 		/*
2420 		 * Since we know that M_DATA messages are created exclusively
2421 		 * by KRPC, we expect that KRPC will leave room for our header
2422 		 * and keep the data 4-byte aligned, which is normal for XDR.
2423 		 * If KRPC (or someone else) does not cooperate, then we
2424 		 * just throw away the message.
2425 		 */
2426 		RPCLOG(1, "mir_wput: KRPC did not leave space for record "
2427 		    "fragment header (%d bytes left)\n",
2428 		    (int)(rptr - mp->b_datap->db_base));
2429 		freemsg(mp);
2430 		return;
2431 	}
2432 	rptr -= sizeof (uint32_t);
2433 	*(uint32_t *)rptr = htonl(frag_header);
2434 	mp->b_rptr = rptr;
2435 
2436 	mutex_enter(&mir->mir_mutex);
2437 	if (mir->mir_type == RPC_CLIENT) {
2438 		/*
2439 		 * For the client, set mir_clntreq to indicate that the
2440 		 * connection is active.
2441 		 */
2442 		mir->mir_clntreq = 1;
2443 		mir->mir_use_timestamp = lbolt;
2444 	}
2445 
2446 	/*
2447 	 * If we haven't already queued some data and the downstream module
2448 	 * can accept more data, send it on; otherwise we queue the message
2449 	 * and take other actions depending on mir_type.
2450 	 */
2451 	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
2452 		mutex_exit(&mir->mir_mutex);
2453 
2454 		/*
2455 		 * Now we pass the RPC message downstream.
2456 		 */
2457 		putnext(q, mp);
2458 		return;
2459 	}
2460 
2461 	switch (mir->mir_type) {
2462 	case RPC_CLIENT:
2463 		/*
2464 		 * Check for a previous duplicate request on the
2465 		 * queue.  If there is one, then we throw away
2466 		 * the current message and let the previous one
2467 		 * go through.  If we can't find a duplicate, then
2468 		 * send this one.  This tap dance is an effort
2469 		 * to reduce traffic and processing requirements
2470 		 * under load conditions.
2471 		 */
2472 		if (mir_clnt_dup_request(q, mp)) {
2473 			mutex_exit(&mir->mir_mutex);
2474 			freemsg(mp);
2475 			return;
2476 		}
2477 		break;
2478 	case RPC_SERVER:
2479 		/*
2480 		 * Set mir_hold_inbound so that new inbound RPC
2481 		 * messages will be held until the client catches
2482 		 * up on the earlier replies.  This flag is cleared
2483 		 * in mir_wsrv after flow control is relieved;
2484 		 * the read-side queue is also enabled at that time.
2485 		 */
2486 		mir->mir_hold_inbound = 1;
2487 		break;
2488 	default:
2489 		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
2490 		break;
2491 	}
2492 	mir->mir_inwservice = 1;
2493 	(void) putq(q, mp);
2494 	mutex_exit(&mir->mir_mutex);
2495 }
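
/*
 * Illustrative sketch only, not part of the original module: the 4-byte
 * record mark prepended by mir_wput() above follows RFC 1831 record
 * marking.  The high-order bit marks the last fragment of a record
 * (MIR_LASTFRAG is assumed to be that 0x80000000 flag) and the low 31
 * bits carry the fragment length.  A receiver could decode it as:
 *
 *	uint32_t mark = ntohl(*(uint32_t *)mp->b_rptr);
 *	boolean_t last = (mark & 0x80000000u) != 0;
 *	uint32_t frag_len = mark & 0x7fffffffu;
 *
 * For example, a 100-byte request sent as a single fragment carries
 * the mark 0x80000064.
 */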
2496 
2497 static void
2498 mir_wput_other(queue_t *q, mblk_t *mp)
2499 {
2500 	mir_t	*mir = (mir_t *)q->q_ptr;
2501 	struct iocblk	*iocp;
2502 	uchar_t	*rptr = mp->b_rptr;
2503 	bool_t	flush_in_svc = FALSE;
2504 
2505 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2506 	switch (mp->b_datap->db_type) {
2507 	case M_IOCTL:
2508 		iocp = (struct iocblk *)rptr;
2509 		switch (iocp->ioc_cmd) {
2510 		case RPC_CLIENT:
2511 			mutex_enter(&mir->mir_mutex);
2512 			if (mir->mir_type != 0 &&
2513 			    mir->mir_type != iocp->ioc_cmd) {
2514 ioc_eperm:
2515 				mutex_exit(&mir->mir_mutex);
2516 				iocp->ioc_error = EPERM;
2517 				iocp->ioc_count = 0;
2518 				mp->b_datap->db_type = M_IOCACK;
2519 				qreply(q, mp);
2520 				return;
2521 			}
2522 
2523 			mir->mir_type = iocp->ioc_cmd;
2524 
2525 			/*
2526 			 * Clear mir_hold_inbound which was set to 1 by
2527 			 * mir_open.  This flag is not used on client
2528 			 * streams.
2529 			 */
2530 			mir->mir_hold_inbound = 0;
2531 			mir->mir_max_msg_sizep = &clnt_max_msg_size;
2532 
2533 			/*
2534 			 * Start the idle timer.  See mir_timer() for more
2535 			 * information on how client timers work.
2536 			 */
2537 			mir->mir_idle_timeout = clnt_idle_timeout;
2538 			mir_clnt_idle_start(q, mir);
2539 			mutex_exit(&mir->mir_mutex);
2540 
2541 			mp->b_datap->db_type = M_IOCACK;
2542 			qreply(q, mp);
2543 			return;
2544 		case RPC_SERVER:
2545 			mutex_enter(&mir->mir_mutex);
2546 			if (mir->mir_type != 0 &&
2547 			    mir->mir_type != iocp->ioc_cmd)
2548 				goto ioc_eperm;
2549 
2550 			/*
2551 			 * We don't clear mir_hold_inbound here because
2552 			 * mir_hold_inbound is used in the flow control
2553 			 * model. If we cleared it here, then we would commit
2554 			 * a small violation of the model, because the transport
2555 			 * might immediately block downstream flow.
2556 			 */
2557 
2558 			mir->mir_type = iocp->ioc_cmd;
2559 			mir->mir_max_msg_sizep = &svc_max_msg_size;
2560 
2561 			/*
2562 			 * Start the idle timer.  See mir_timer() for more
2563 			 * information on how server timers work.
2564 			 *
2565 			 * Note that it is important to start the idle timer
2566 			 * here so that connections time out even if we
2567 			 * never receive any data on them.
2568 			 */
2569 			mir->mir_idle_timeout = svc_idle_timeout;
2570 			RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
2571 			    "because we got RPC_SERVER ioctl\n", (void *)q);
2572 			mir_svc_idle_start(q, mir);
2573 			mutex_exit(&mir->mir_mutex);
2574 
2575 			mp->b_datap->db_type = M_IOCACK;
2576 			qreply(q, mp);
2577 			return;
2578 		default:
2579 			break;
2580 		}
2581 		break;
2582 
2583 	case M_PROTO:
2584 		if (mir->mir_type == RPC_CLIENT) {
2585 			/*
2586 			 * We are likely being called from the context of a
2587 			 * service procedure. So we need to enqueue. However
2588 			 * enqueuing may put our message behind data messages.
2589 			 * So flush the data first.
2590 			 */
2591 			flush_in_svc = TRUE;
2592 		}
2593 		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
2594 		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
2595 			break;
2596 
2597 		switch (((union T_primitives *)rptr)->type) {
2598 		case T_DATA_REQ:
2599 			/* Don't pass T_DATA_REQ messages downstream. */
2600 			freemsg(mp);
2601 			return;
2602 		case T_ORDREL_REQ:
2603 			RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
2604 			    (void *)q);
2605 			mutex_enter(&mir->mir_mutex);
2606 			if (mir->mir_type != RPC_SERVER) {
2607 				/*
2608 				 * We are likely being called from
2609 				 * clnt_dispatch_notifyall(). Sending
2610 				 * a T_ORDREL_REQ will result in
2611 				 * some kind of _IND message being sent,
2612 				 * which will trigger another call to
2613 				 * clnt_dispatch_notifyall(). To keep the stack
2614 				 * lean, queue this message.
2615 				 */
2616 				mir->mir_inwservice = 1;
2617 				(void) putq(q, mp);
2618 				mutex_exit(&mir->mir_mutex);
2619 				return;
2620 			}
2621 
2622 			/*
2623 			 * Mark the structure such that we don't accept any
2624 			 * more requests from the client. We could defer this
2625 			 * until we actually send the orderly release
2626 			 * request downstream, but all that does is delay
2627 			 * the closing of this stream.
2628 			 */
2629 			RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
2630 			    "so calling mir_svc_start_close\n", (void *)q);
2631 
2632 			mir_svc_start_close(q, mir);
2633 
2634 			/*
2635 			 * If we have sent down a T_ORDREL_REQ, don't send
2636 			 * any more.
2637 			 */
2638 			if (mir->mir_ordrel_pending) {
2639 				freemsg(mp);
2640 				mutex_exit(&mir->mir_mutex);
2641 				return;
2642 			}
2643 
2644 			/*
2645 			 * If the stream is not idle, then we hold the
2646 			 * orderly release until it becomes idle.  This
2647 			 * ensures that KRPC will be able to reply to
2648 			 * all requests that we have passed to it.
2649 			 *
2650 			 * We also queue the request if there is data already
2651 			 * queued, because we cannot allow the T_ORDREL_REQ
2652 			 * to go before data. When we had a separate reply
2653 			 * count, this was not a problem, because the
2654 			 * reply count was reconciled when mir_wsrv()
2655 			 * completed.
2656 			 */
2657 			if (!MIR_SVC_QUIESCED(mir) ||
2658 			    mir->mir_inwservice == 1) {
2659 				mir->mir_inwservice = 1;
2660 				(void) putq(q, mp);
2661 
2662 				RPCLOG(16, "mir_wput_other: queuing "
2663 				    "T_ORDREL_REQ on 0x%p\n", (void *)q);
2664 
2665 				mutex_exit(&mir->mir_mutex);
2666 				return;
2667 			}
2668 
2669 			/*
2670 			 * Mark the structure so that we know we sent
2671 			 * an orderly release request, and reset the idle timer.
2672 			 */
2673 			mir->mir_ordrel_pending = 1;
2674 
2675 			RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
2676 			    " on 0x%p because we got T_ORDREL_REQ\n",
2677 			    (void *)q);
2678 
2679 			mir_svc_idle_start(q, mir);
2680 			mutex_exit(&mir->mir_mutex);
2681 
2682 			/*
2683 			 * When we break, we will putnext the T_ORDREL_REQ.
2684 			 */
2685 			break;
2686 
2687 		case T_CONN_REQ:
2688 			mutex_enter(&mir->mir_mutex);
2689 			if (mir->mir_head_mp != NULL) {
2690 				freemsg(mir->mir_head_mp);
2691 				mir->mir_head_mp = NULL;
2692 				mir->mir_tail_mp = NULL;
2693 			}
2694 			mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2695 			/*
2696 			 * Restart timer in case mir_clnt_idle_do_stop() was
2697 			 * called.
2698 			 */
2699 			mir->mir_idle_timeout = clnt_idle_timeout;
2700 			mir_clnt_idle_stop(q, mir);
2701 			mir_clnt_idle_start(q, mir);
2702 			mutex_exit(&mir->mir_mutex);
2703 			break;
2704 
2705 		default:
2706 			/*
2707 			 * T_DISCON_REQ is one of the interesting default
2708 			 * cases here. Ideally, an M_FLUSH is done before
2709 			 * T_DISCON_REQ is done. However, that is somewhat
2710 			 * cumbersome for clnt_cots.c to do. So we queue
2711 			 * T_DISCON_REQ, and let the service procedure
2712 			 * flush all M_DATA.
2713 			 */
2714 			break;
2715 		}
2716 		/* FALLTHROUGH */
2717 	default:
2718 		if (mp->b_datap->db_type >= QPCTL) {
2719 			if (mp->b_datap->db_type == M_FLUSH) {
2720 				if (mir->mir_type == RPC_CLIENT &&
2721 				    *mp->b_rptr & FLUSHW) {
2722 					RPCLOG(32, "mir_wput_other: flushing "
2723 					    "wq 0x%p\n", (void *)q);
2724 					if (*mp->b_rptr & FLUSHBAND) {
2725 						flushband(q, *(mp->b_rptr + 1),
2726 						    FLUSHDATA);
2727 					} else {
2728 						flushq(q, FLUSHDATA);
2729 					}
2730 				} else {
2731 					RPCLOG(32, "mir_wput_other: ignoring "
2732 					    "M_FLUSH on wq 0x%p\n", (void *)q);
2733 				}
2734 			}
2735 			break;
2736 		}
2737 
2738 		mutex_enter(&mir->mir_mutex);
2739 		if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
2740 			mutex_exit(&mir->mir_mutex);
2741 			break;
2742 		}
2743 		mir->mir_inwservice = 1;
2744 		mir->mir_inwflushdata = flush_in_svc;
2745 		(void) putq(q, mp);
2746 		mutex_exit(&mir->mir_mutex);
2747 		qenable(q);
2748 
2749 		return;
2750 	}
2751 	putnext(q, mp);
2752 }
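
/*
 * Illustrative sketch only, not part of the original module: the
 * RPC_CLIENT/RPC_SERVER M_IOCTLs handled in mir_wput_other() above are
 * how the creator of the stream selects this slot's personality.  A
 * hypothetical user-level setup for a server-side endpoint might look
 * like the following (kernel consumers achieve the same thing by
 * sending the equivalent M_IOCTL directly):
 *
 *	#include <stropts.h>
 *
 *	struct strioctl iocb;
 *
 *	(void) ioctl(fd, I_PUSH, "rpcmod");
 *	iocb.ic_cmd = RPC_SERVER;
 *	iocb.ic_timout = 0;
 *	iocb.ic_len = 0;
 *	iocb.ic_dp = NULL;
 *	(void) ioctl(fd, I_STR, &iocb);
 */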
2753 
2754 static void
2755 mir_wsrv(queue_t *q)
2756 {
2757 	mblk_t	*mp;
2758 	mir_t	*mir;
2759 	bool_t flushdata;
2760 
2761 	mir = (mir_t *)q->q_ptr;
2762 	mutex_enter(&mir->mir_mutex);
2763 
2764 	flushdata = mir->mir_inwflushdata;
2765 	mir->mir_inwflushdata = 0;
2766 
2767 	while (mp = getq(q)) {
2768 		if (mp->b_datap->db_type == M_DATA) {
2769 			/*
2770 			 * Do not send any more data if we have sent
2771 			 * a T_ORDREL_REQ.
2772 			 */
2773 			if (flushdata || mir->mir_ordrel_pending == 1) {
2774 				freemsg(mp);
2775 				continue;
2776 			}
2777 
2778 			/*
2779 			 * Make sure that the stream can really handle more
2780 			 * data.
2781 			 */
2782 			if (!MIR_WCANPUTNEXT(mir, q)) {
2783 				(void) putbq(q, mp);
2784 				mutex_exit(&mir->mir_mutex);
2785 				return;
2786 			}
2787 
2788 			/*
2789 			 * Now we pass the RPC message downstream.
2790 			 */
2791 			mutex_exit(&mir->mir_mutex);
2792 			putnext(q, mp);
2793 			mutex_enter(&mir->mir_mutex);
2794 			continue;
2795 		}
2796 
2797 		/*
2798 		 * This is not an RPC message; pass it downstream
2799 		 * (ignoring flow control) if the server side is not sending a
2800 		 * T_ORDREL_REQ downstream.
2801 		 */
2802 		if (mir->mir_type != RPC_SERVER ||
2803 		    ((union T_primitives *)mp->b_rptr)->type !=
2804 		    T_ORDREL_REQ) {
2805 			mutex_exit(&mir->mir_mutex);
2806 			putnext(q, mp);
2807 			mutex_enter(&mir->mir_mutex);
2808 			continue;
2809 		}
2810 
2811 		if (mir->mir_ordrel_pending == 1) {
2812 			/*
2813 			 * Don't send two T_ORDRELs
2814 			 */
2815 			freemsg(mp);
2816 			continue;
2817 		}
2818 
2819 		/*
2820 		 * Mark the structure so that we know we sent an orderly
2821 		 * release request.  We will check to see if the slot is idle
2822 		 * at the end of this routine, and if so, reset the idle timer
2823 		 * to handle orderly release timeouts.
2824 		 */
2825 		mir->mir_ordrel_pending = 1;
2826 		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
2827 		    (void *)q);
2828 		/*
2829 		 * Send the orderly release downstream. If there are other
2830 		 * pending replies we won't be able to send them.  However,
2831 		 * the only reason we should send the orderly release is if
2832 		 * we were idle, or if an unusual event occurred.
2833 		 */
2834 		mutex_exit(&mir->mir_mutex);
2835 		putnext(q, mp);
2836 		mutex_enter(&mir->mir_mutex);
2837 	}
2838 
2839 	if (q->q_first == NULL)
2840 		/*
2841 		 * If we call mir_svc_idle_start() below, then
2842 		 * clearing mir_inwservice here will also result in
2843 		 * any thread waiting in mir_close() to be signaled.
2844 		 */
2845 		mir->mir_inwservice = 0;
2846 
2847 	if (mir->mir_type != RPC_SERVER) {
2848 		mutex_exit(&mir->mir_mutex);
2849 		return;
2850 	}
2851 
2852 	/*
2853 	 * If idle we call mir_svc_idle_start to start the timer (or wake
2854 	 * up a close). Also make sure not to start the idle timer on the
2855 	 * listener stream, as doing so can cause nfsd to send an orderly
2856 	 * release command on the listener stream.
2857 	 */
2858 	if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
2859 		RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
2860 		    "because mir slot is idle\n", (void *)q);
2861 		mir_svc_idle_start(q, mir);
2862 	}
2863 
2864 	/*
2865 	 * If outbound flow control has been relieved, then allow new
2866 	 * inbound requests to be processed.
2867 	 */
2868 	if (mir->mir_hold_inbound) {
2869 		mir->mir_hold_inbound = 0;
2870 		qenable(RD(q));
2871 	}
2872 	mutex_exit(&mir->mir_mutex);
2873 }
2874 
2875 static void
2876 mir_disconnect(queue_t *q, mir_t *mir)
2877 {
2878 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
2879 
2880 	switch (mir->mir_type) {
2881 	case RPC_CLIENT:
2882 		/*
2883 		 * We are disconnecting, but not necessarily
2884 		 * closing. By not closing, we will fail to
2885 		 * pick up a possibly changed global timeout value,
2886 		 * unless we store it now.
2887 		 */
2888 		mir->mir_idle_timeout = clnt_idle_timeout;
2889 		mir_clnt_idle_start(WR(q), mir);
2890 		mutex_exit(&mir->mir_mutex);
2891 
2892 		/*
2893 		 * T_DISCON_REQ is passed to KRPC as an integer value
2894 		 * (this is not a TPI message).  It is used as a
2895 		 * convenient value to indicate a sanity check
2896 		 * failure -- the same KRPC routine is also called
2897 		 * for T_DISCON_INDs and T_ORDREL_INDs.
2898 		 */
2899 		clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
2900 		break;
2901 
2902 	case RPC_SERVER:
2903 		mir->mir_svc_no_more_msgs = 1;
2904 		mir_svc_idle_stop(WR(q), mir);
2905 		mutex_exit(&mir->mir_mutex);
2906 		RPCLOG(16, "mir_disconnect: telling "
2907 		    "stream head listener to disconnect stream "
2908 		    "(0x%p)\n", (void *) q);
2909 		(void) mir_svc_policy_notify(q, 2);
2910 		break;
2911 
2912 	default:
2913 		mutex_exit(&mir->mir_mutex);
2914 		break;
2915 	}
2916 }
2917 
2918 /*
2919  * Sanity check the message length, and if it's too large, shut down the
2920  * connection.  Returns 1 if the connection is shut down; 0 otherwise.
2921  */
2922 static int
2923 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
2924 {
2925 	mir_t *mir = q->q_ptr;
2926 	uint_t maxsize = 0;
2927 
2928 	if (mir->mir_max_msg_sizep != NULL)
2929 		maxsize = *mir->mir_max_msg_sizep;
2930 
2931 	if (maxsize == 0 || frag_len <= (int)maxsize)
2932 		return (0);
2933 
2934 	freemsg(head_mp);
2935 	mir->mir_head_mp = NULL;
2936 	mir->mir_tail_mp = NULL;
2937 	mir->mir_frag_header = 0;
2938 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2939 	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
2940 		cmn_err(CE_NOTE,
2941 		    "KRPC: record fragment from %s of size(%d) exceeds "
2942 		    "maximum (%u). Disconnecting",
2943 		    (mir->mir_type == RPC_CLIENT) ? "server" :
2944 		    (mir->mir_type == RPC_SERVER) ? "client" :
2945 		    "test tool", frag_len, maxsize);
2946 	}
2947 
2948 	mir_disconnect(q, mir);
2949 	return (1);
2950 }
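
/*
 * Illustrative note: the limits referenced through mir_max_msg_sizep
 * are the global clnt_max_msg_size and svc_max_msg_size tunables wired
 * up in mir_wput_other().  With a limit of, say, 262144 bytes, a
 * 300000-byte record fragment arriving here frees the partial message
 * and tears the connection down via mir_disconnect().
 */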
2951