xref: /freebsd/sys/rpc/svc_vc.c (revision 7a1c0d963366a31363d3705697a083dd8efee077)
1 /*	$NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $	*/
2 
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31 
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
38 
39 /*
40  * svc_vc.c, Server side for Connection Oriented based RPC.
41  *
42  * Actually implements two flavors of transporter -
 * a tcp rendezvouser (a listener and connection establisher)
44  * and a record/tcp stream.
45  */
46 
47 #include <sys/param.h>
48 #include <sys/lock.h>
49 #include <sys/kernel.h>
50 #include <sys/malloc.h>
51 #include <sys/mbuf.h>
52 #include <sys/mutex.h>
53 #include <sys/proc.h>
54 #include <sys/protosw.h>
55 #include <sys/queue.h>
56 #include <sys/socket.h>
57 #include <sys/socketvar.h>
58 #include <sys/sx.h>
59 #include <sys/systm.h>
60 #include <sys/uio.h>
61 
62 #include <net/vnet.h>
63 
64 #include <netinet/tcp.h>
65 
66 #include <rpc/rpc.h>
67 
68 #include <rpc/rpc_com.h>
69 
70 #include <security/mac/mac_framework.h>
71 
72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
73     struct sockaddr **, struct mbuf **);
74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
75 static void svc_vc_rendezvous_destroy(SVCXPRT *);
76 static bool_t svc_vc_null(void);
77 static void svc_vc_destroy(SVCXPRT *);
78 static enum xprt_stat svc_vc_stat(SVCXPRT *);
79 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
80     struct sockaddr **, struct mbuf **);
81 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
82     struct sockaddr *, struct mbuf *);
83 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
84 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
85     void *in);
86 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
87     struct sockaddr *raddr);
88 static int svc_vc_accept(struct socket *head, struct socket **sop);
89 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
90 
91 static struct xp_ops svc_vc_rendezvous_ops = {
92 	.xp_recv =	svc_vc_rendezvous_recv,
93 	.xp_stat =	svc_vc_rendezvous_stat,
94 	.xp_reply =	(bool_t (*)(SVCXPRT *, struct rpc_msg *,
95 		struct sockaddr *, struct mbuf *))svc_vc_null,
96 	.xp_destroy =	svc_vc_rendezvous_destroy,
97 	.xp_control =	svc_vc_rendezvous_control
98 };
99 
100 static struct xp_ops svc_vc_ops = {
101 	.xp_recv =	svc_vc_recv,
102 	.xp_stat =	svc_vc_stat,
103 	.xp_reply =	svc_vc_reply,
104 	.xp_destroy =	svc_vc_destroy,
105 	.xp_control =	svc_vc_control
106 };
107 
108 struct cf_conn {  /* kept in xprt->xp_p1 for actual connection */
109 	enum xprt_stat strm_stat;
110 	struct mbuf *mpending;	/* unparsed data read from the socket */
111 	struct mbuf *mreq;	/* current record being built from mpending */
112 	uint32_t resid;		/* number of bytes needed for fragment */
113 	bool_t eor;		/* reading last fragment of current record */
114 };
115 
116 /*
117  * Usage:
118  *	xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
119  *
120  * Creates, registers, and returns a (rpc) tcp based transporter.
121  * Once *xprt is initialized, it is registered as a transporter
122  * see (svc.h, xprt_register).  This routine returns
123  * a NULL if a problem occurred.
124  *
125  * The filedescriptor passed in is expected to refer to a bound, but
126  * not yet connected socket.
127  *
128  * Since streams do buffered io similar to stdio, the caller can specify
129  * how big the send and receive buffers are via the second and third parms;
130  * 0 => use the system default.
131  */
132 SVCXPRT *
133 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
134     size_t recvsize)
135 {
136 	SVCXPRT *xprt;
137 	struct sockaddr* sa;
138 	int error;
139 
140 	if (so->so_state & SS_ISCONNECTED) {
141 		error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
142 		if (error)
143 			return (NULL);
144 		xprt = svc_vc_create_conn(pool, so, sa);
145 		free(sa, M_SONAME);
146 		return (xprt);
147 	}
148 
149 	xprt = svc_xprt_alloc();
150 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
151 	xprt->xp_pool = pool;
152 	xprt->xp_socket = so;
153 	xprt->xp_p1 = NULL;
154 	xprt->xp_p2 = NULL;
155 	xprt->xp_ops = &svc_vc_rendezvous_ops;
156 
157 	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
158 	if (error) {
159 		goto cleanup_svc_vc_create;
160 	}
161 
162 	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
163 	free(sa, M_SONAME);
164 
165 	xprt_register(xprt);
166 
167 	solisten(so, SOMAXCONN, curthread);
168 
169 	SOCKBUF_LOCK(&so->so_rcv);
170 	xprt->xp_upcallset = 1;
171 	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
172 	SOCKBUF_UNLOCK(&so->so_rcv);
173 
174 	return (xprt);
175 cleanup_svc_vc_create:
176 	if (xprt)
177 		svc_xprt_free(xprt);
178 	return (NULL);
179 }
180 
181 /*
182  * Create a new transport for a socket optained via soaccept().
183  */
184 SVCXPRT *
185 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
186 {
187 	SVCXPRT *xprt = NULL;
188 	struct cf_conn *cd = NULL;
189 	struct sockaddr* sa = NULL;
190 	struct sockopt opt;
191 	int one = 1;
192 	int error;
193 
194 	bzero(&opt, sizeof(struct sockopt));
195 	opt.sopt_dir = SOPT_SET;
196 	opt.sopt_level = SOL_SOCKET;
197 	opt.sopt_name = SO_KEEPALIVE;
198 	opt.sopt_val = &one;
199 	opt.sopt_valsize = sizeof(one);
200 	error = sosetopt(so, &opt);
201 	if (error) {
202 		return (NULL);
203 	}
204 
205 	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
206 		bzero(&opt, sizeof(struct sockopt));
207 		opt.sopt_dir = SOPT_SET;
208 		opt.sopt_level = IPPROTO_TCP;
209 		opt.sopt_name = TCP_NODELAY;
210 		opt.sopt_val = &one;
211 		opt.sopt_valsize = sizeof(one);
212 		error = sosetopt(so, &opt);
213 		if (error) {
214 			return (NULL);
215 		}
216 	}
217 
218 	cd = mem_alloc(sizeof(*cd));
219 	cd->strm_stat = XPRT_IDLE;
220 
221 	xprt = svc_xprt_alloc();
222 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
223 	xprt->xp_pool = pool;
224 	xprt->xp_socket = so;
225 	xprt->xp_p1 = cd;
226 	xprt->xp_p2 = NULL;
227 	xprt->xp_ops = &svc_vc_ops;
228 
229 	/*
230 	 * See http://www.connectathon.org/talks96/nfstcp.pdf - client
231 	 * has a 5 minute timer, server has a 6 minute timer.
232 	 */
233 	xprt->xp_idletimeout = 6 * 60;
234 
235 	memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
236 
237 	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
238 	if (error)
239 		goto cleanup_svc_vc_create;
240 
241 	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
242 	free(sa, M_SONAME);
243 
244 	xprt_register(xprt);
245 
246 	SOCKBUF_LOCK(&so->so_rcv);
247 	xprt->xp_upcallset = 1;
248 	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
249 	SOCKBUF_UNLOCK(&so->so_rcv);
250 
251 	/*
252 	 * Throw the transport into the active list in case it already
253 	 * has some data buffered.
254 	 */
255 	sx_xlock(&xprt->xp_lock);
256 	xprt_active(xprt);
257 	sx_xunlock(&xprt->xp_lock);
258 
259 	return (xprt);
260 cleanup_svc_vc_create:
261 	if (xprt) {
262 		mem_free(xprt, sizeof(*xprt));
263 	}
264 	if (cd)
265 		mem_free(cd, sizeof(*cd));
266 	return (NULL);
267 }
268 
269 /*
270  * This does all of the accept except the final call to soaccept. The
271  * caller will call soaccept after dropping its locks (soaccept may
272  * call malloc).
273  */
274 int
275 svc_vc_accept(struct socket *head, struct socket **sop)
276 {
277 	int error = 0;
278 	struct socket *so;
279 
280 	if ((head->so_options & SO_ACCEPTCONN) == 0) {
281 		error = EINVAL;
282 		goto done;
283 	}
284 #ifdef MAC
285 	error = mac_socket_check_accept(curthread->td_ucred, head);
286 	if (error != 0)
287 		goto done;
288 #endif
289 	ACCEPT_LOCK();
290 	if (TAILQ_EMPTY(&head->so_comp)) {
291 		ACCEPT_UNLOCK();
292 		error = EWOULDBLOCK;
293 		goto done;
294 	}
295 	so = TAILQ_FIRST(&head->so_comp);
296 	KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
297 	KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
298 
299 	/*
300 	 * Before changing the flags on the socket, we have to bump the
301 	 * reference count.  Otherwise, if the protocol calls sofree(),
302 	 * the socket will be released due to a zero refcount.
303 	 * XXX might not need soref() since this is simpler than kern_accept.
304 	 */
305 	SOCK_LOCK(so);			/* soref() and so_state update */
306 	soref(so);			/* file descriptor reference */
307 
308 	TAILQ_REMOVE(&head->so_comp, so, so_list);
309 	head->so_qlen--;
310 	so->so_state |= (head->so_state & SS_NBIO);
311 	so->so_qstate &= ~SQ_COMP;
312 	so->so_head = NULL;
313 
314 	SOCK_UNLOCK(so);
315 	ACCEPT_UNLOCK();
316 
317 	*sop = so;
318 
319 	/* connection has been removed from the listen queue */
320 	KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
321 done:
322 	return (error);
323 }
324 
325 /*ARGSUSED*/
326 static bool_t
327 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
328     struct sockaddr **addrp, struct mbuf **mp)
329 {
330 	struct socket *so = NULL;
331 	struct sockaddr *sa = NULL;
332 	int error;
333 	SVCXPRT *new_xprt;
334 
335 	/*
336 	 * The socket upcall calls xprt_active() which will eventually
337 	 * cause the server to call us here. We attempt to accept a
338 	 * connection from the socket and turn it into a new
339 	 * transport. If the accept fails, we have drained all pending
340 	 * connections so we call xprt_inactive().
341 	 */
342 	sx_xlock(&xprt->xp_lock);
343 
344 	error = svc_vc_accept(xprt->xp_socket, &so);
345 
346 	if (error == EWOULDBLOCK) {
347 		/*
348 		 * We must re-test for new connections after taking
349 		 * the lock to protect us in the case where a new
350 		 * connection arrives after our call to accept fails
351 		 * with EWOULDBLOCK. The pool lock protects us from
352 		 * racing the upcall after our TAILQ_EMPTY() call
353 		 * returns false.
354 		 */
355 		ACCEPT_LOCK();
356 		mtx_lock(&xprt->xp_pool->sp_lock);
357 		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
358 			xprt_inactive_locked(xprt);
359 		mtx_unlock(&xprt->xp_pool->sp_lock);
360 		ACCEPT_UNLOCK();
361 		sx_xunlock(&xprt->xp_lock);
362 		return (FALSE);
363 	}
364 
365 	if (error) {
366 		SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
367 		if (xprt->xp_upcallset) {
368 			xprt->xp_upcallset = 0;
369 			soupcall_clear(xprt->xp_socket, SO_RCV);
370 		}
371 		SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
372 		xprt_inactive(xprt);
373 		sx_xunlock(&xprt->xp_lock);
374 		return (FALSE);
375 	}
376 
377 	sx_xunlock(&xprt->xp_lock);
378 
379 	sa = 0;
380 	error = soaccept(so, &sa);
381 
382 	if (error) {
383 		/*
384 		 * XXX not sure if I need to call sofree or soclose here.
385 		 */
386 		if (sa)
387 			free(sa, M_SONAME);
388 		return (FALSE);
389 	}
390 
391 	/*
392 	 * svc_vc_create_conn will call xprt_register - we don't need
393 	 * to do anything with the new connection except derefence it.
394 	 */
395 	new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
396 	if (!new_xprt) {
397 		soclose(so);
398 	} else {
399 		SVC_RELEASE(new_xprt);
400 	}
401 
402 	free(sa, M_SONAME);
403 
404 	return (FALSE); /* there is never an rpc msg to be processed */
405 }
406 
407 /*ARGSUSED*/
408 static enum xprt_stat
409 svc_vc_rendezvous_stat(SVCXPRT *xprt)
410 {
411 
412 	return (XPRT_IDLE);
413 }
414 
415 static void
416 svc_vc_destroy_common(SVCXPRT *xprt)
417 {
418 	SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
419 	if (xprt->xp_upcallset) {
420 		xprt->xp_upcallset = 0;
421 		soupcall_clear(xprt->xp_socket, SO_RCV);
422 	}
423 	SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
424 
425 	sx_destroy(&xprt->xp_lock);
426 	if (xprt->xp_socket)
427 		(void)soclose(xprt->xp_socket);
428 
429 	if (xprt->xp_netid)
430 		(void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
431 	svc_xprt_free(xprt);
432 }
433 
434 static void
435 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
436 {
437 
438 	svc_vc_destroy_common(xprt);
439 }
440 
441 static void
442 svc_vc_destroy(SVCXPRT *xprt)
443 {
444 	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
445 
446 	svc_vc_destroy_common(xprt);
447 
448 	if (cd->mreq)
449 		m_freem(cd->mreq);
450 	if (cd->mpending)
451 		m_freem(cd->mpending);
452 	mem_free(cd, sizeof(*cd));
453 }
454 
455 /*ARGSUSED*/
456 static bool_t
457 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
458 {
459 	return (FALSE);
460 }
461 
462 static bool_t
463 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
464 {
465 
466 	return (FALSE);
467 }
468 
469 static enum xprt_stat
470 svc_vc_stat(SVCXPRT *xprt)
471 {
472 	struct cf_conn *cd;
473 	struct mbuf *m;
474 	size_t n;
475 
476 	cd = (struct cf_conn *)(xprt->xp_p1);
477 
478 	if (cd->strm_stat == XPRT_DIED)
479 		return (XPRT_DIED);
480 
481 	/*
482 	 * Return XPRT_MOREREQS if we have buffered data and we are
483 	 * mid-record or if we have enough data for a record
484 	 * marker. Since this is only a hint, we read mpending and
485 	 * resid outside the lock. We do need to take the lock if we
486 	 * have to traverse the mbuf chain.
487 	 */
488 	if (cd->mpending) {
489 		if (cd->resid)
490 			return (XPRT_MOREREQS);
491 		n = 0;
492 		sx_xlock(&xprt->xp_lock);
493 		m = cd->mpending;
494 		while (m && n < sizeof(uint32_t)) {
495 			n += m->m_len;
496 			m = m->m_next;
497 		}
498 		sx_xunlock(&xprt->xp_lock);
499 		if (n >= sizeof(uint32_t))
500 			return (XPRT_MOREREQS);
501 	}
502 
503 	if (soreadable(xprt->xp_socket))
504 		return (XPRT_MOREREQS);
505 
506 	return (XPRT_IDLE);
507 }
508 
509 static bool_t
510 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
511     struct sockaddr **addrp, struct mbuf **mp)
512 {
513 	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
514 	struct uio uio;
515 	struct mbuf *m;
516 	XDR xdrs;
517 	int error, rcvflag;
518 
519 	/*
520 	 * Serialise access to the socket and our own record parsing
521 	 * state.
522 	 */
523 	sx_xlock(&xprt->xp_lock);
524 
525 	for (;;) {
526 		/*
527 		 * If we have an mbuf chain in cd->mpending, try to parse a
528 		 * record from it, leaving the result in cd->mreq. If we don't
529 		 * have a complete record, leave the partial result in
530 		 * cd->mreq and try to read more from the socket.
531 		 */
532 		if (cd->mpending) {
533 			/*
534 			 * If cd->resid is non-zero, we have part of the
535 			 * record already, otherwise we are expecting a record
536 			 * marker.
537 			 */
538 			if (!cd->resid) {
539 				/*
540 				 * See if there is enough data buffered to
541 				 * make up a record marker. Make sure we can
542 				 * handle the case where the record marker is
543 				 * split across more than one mbuf.
544 				 */
545 				size_t n = 0;
546 				uint32_t header;
547 
548 				m = cd->mpending;
549 				while (n < sizeof(uint32_t) && m) {
550 					n += m->m_len;
551 					m = m->m_next;
552 				}
553 				if (n < sizeof(uint32_t))
554 					goto readmore;
555 				m_copydata(cd->mpending, 0, sizeof(header),
556 				    (char *)&header);
557 				header = ntohl(header);
558 				cd->eor = (header & 0x80000000) != 0;
559 				cd->resid = header & 0x7fffffff;
560 				m_adj(cd->mpending, sizeof(uint32_t));
561 			}
562 
563 			/*
564 			 * Start pulling off mbufs from cd->mpending
565 			 * until we either have a complete record or
566 			 * we run out of data. We use m_split to pull
567 			 * data - it will pull as much as possible and
568 			 * split the last mbuf if necessary.
569 			 */
570 			while (cd->mpending && cd->resid) {
571 				m = cd->mpending;
572 				if (cd->mpending->m_next
573 				    || cd->mpending->m_len > cd->resid)
574 					cd->mpending = m_split(cd->mpending,
575 					    cd->resid, M_WAIT);
576 				else
577 					cd->mpending = NULL;
578 				if (cd->mreq)
579 					m_last(cd->mreq)->m_next = m;
580 				else
581 					cd->mreq = m;
582 				while (m) {
583 					cd->resid -= m->m_len;
584 					m = m->m_next;
585 				}
586 			}
587 
588 			/*
589 			 * If cd->resid is zero now, we have managed to
590 			 * receive a record fragment from the stream. Check
591 			 * for the end-of-record mark to see if we need more.
592 			 */
593 			if (cd->resid == 0) {
594 				if (!cd->eor)
595 					continue;
596 
597 				/*
598 				 * Success - we have a complete record in
599 				 * cd->mreq.
600 				 */
601 				xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
602 				cd->mreq = NULL;
603 				sx_xunlock(&xprt->xp_lock);
604 
605 				if (! xdr_callmsg(&xdrs, msg)) {
606 					XDR_DESTROY(&xdrs);
607 					return (FALSE);
608 				}
609 
610 				*addrp = NULL;
611 				*mp = xdrmbuf_getall(&xdrs);
612 				XDR_DESTROY(&xdrs);
613 
614 				return (TRUE);
615 			}
616 		}
617 
618 	readmore:
619 		/*
620 		 * The socket upcall calls xprt_active() which will eventually
621 		 * cause the server to call us here. We attempt to
622 		 * read as much as possible from the socket and put
623 		 * the result in cd->mpending. If the read fails,
624 		 * we have drained both cd->mpending and the socket so
625 		 * we can call xprt_inactive().
626 		 */
627 		uio.uio_resid = 1000000000;
628 		uio.uio_td = curthread;
629 		m = NULL;
630 		rcvflag = MSG_DONTWAIT;
631 		error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
632 		    &rcvflag);
633 
634 		if (error == EWOULDBLOCK) {
635 			/*
636 			 * We must re-test for readability after
637 			 * taking the lock to protect us in the case
638 			 * where a new packet arrives on the socket
639 			 * after our call to soreceive fails with
640 			 * EWOULDBLOCK. The pool lock protects us from
641 			 * racing the upcall after our soreadable()
642 			 * call returns false.
643 			 */
644 			mtx_lock(&xprt->xp_pool->sp_lock);
645 			if (!soreadable(xprt->xp_socket))
646 				xprt_inactive_locked(xprt);
647 			mtx_unlock(&xprt->xp_pool->sp_lock);
648 			sx_xunlock(&xprt->xp_lock);
649 			return (FALSE);
650 		}
651 
652 		if (error) {
653 			SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
654 			if (xprt->xp_upcallset) {
655 				xprt->xp_upcallset = 0;
656 				soupcall_clear(xprt->xp_socket, SO_RCV);
657 			}
658 			SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
659 			xprt_inactive(xprt);
660 			cd->strm_stat = XPRT_DIED;
661 			sx_xunlock(&xprt->xp_lock);
662 			return (FALSE);
663 		}
664 
665 		if (!m) {
666 			/*
667 			 * EOF - the other end has closed the socket.
668 			 */
669 			xprt_inactive(xprt);
670 			cd->strm_stat = XPRT_DIED;
671 			sx_xunlock(&xprt->xp_lock);
672 			return (FALSE);
673 		}
674 
675 		if (cd->mpending)
676 			m_last(cd->mpending)->m_next = m;
677 		else
678 			cd->mpending = m;
679 	}
680 }
681 
682 static bool_t
683 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
684     struct sockaddr *addr, struct mbuf *m)
685 {
686 	XDR xdrs;
687 	struct mbuf *mrep;
688 	bool_t stat = TRUE;
689 	int error;
690 
691 	/*
692 	 * Leave space for record mark.
693 	 */
694 	MGETHDR(mrep, M_WAIT, MT_DATA);
695 	mrep->m_len = 0;
696 	mrep->m_data += sizeof(uint32_t);
697 
698 	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
699 
700 	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
701 	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
702 		if (!xdr_replymsg(&xdrs, msg))
703 			stat = FALSE;
704 		else
705 			xdrmbuf_append(&xdrs, m);
706 	} else {
707 		stat = xdr_replymsg(&xdrs, msg);
708 	}
709 
710 	if (stat) {
711 		m_fixhdr(mrep);
712 
713 		/*
714 		 * Prepend a record marker containing the reply length.
715 		 */
716 		M_PREPEND(mrep, sizeof(uint32_t), M_WAIT);
717 		*mtod(mrep, uint32_t *) =
718 			htonl(0x80000000 | (mrep->m_pkthdr.len
719 				- sizeof(uint32_t)));
720 		error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
721 		    0, curthread);
722 		if (!error) {
723 			stat = TRUE;
724 		}
725 	} else {
726 		m_freem(mrep);
727 	}
728 
729 	XDR_DESTROY(&xdrs);
730 	xprt->xp_p2 = NULL;
731 
732 	return (stat);
733 }
734 
735 static bool_t
736 svc_vc_null()
737 {
738 
739 	return (FALSE);
740 }
741 
742 static int
743 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
744 {
745 	SVCXPRT *xprt = (SVCXPRT *) arg;
746 
747 	xprt_active(xprt);
748 	return (SU_OK);
749 }
750 
#if 0
/*
 * Compiled out.
 *
 * Get the effective UID of the sending process. Used by rpcbind, keyserv
 * and rpc.yppasswdd on AF_LOCAL.
 *
 * Stores the peer's effective uid in *uid and returns 0 on success.
 * Returns -1 if the peer address is not AF_LOCAL, otherwise the
 * result of getpeereid().
 */
int
__rpc_get_local_uid(SVCXPRT *transp, uid_t *uid)
{
	struct sockaddr *peer;
	uid_t euid;
	gid_t egid;
	int rc;

	peer = (struct sockaddr *)transp->xp_rtaddr;
	if (peer->sa_family != AF_LOCAL)
		return (-1);

	rc = getpeereid(transp->xp_fd, &euid, &egid);
	if (rc == 0)
		*uid = euid;
	return (rc);
}
#endif
774