xref: /freebsd/sys/rpc/svc_vc.c (revision c6ec7d31830ab1c80edae95ad5e4b9dba10c47ac)
1 /*	$NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $	*/
2 
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31 
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
38 
39 /*
40  * svc_vc.c, Server side for Connection Oriented based RPC.
41  *
42  * Actually implements two flavors of transporter -
 * a tcp rendezvouser (a listener and connection establisher)
44  * and a record/tcp stream.
45  */
46 
47 #include <sys/param.h>
48 #include <sys/lock.h>
49 #include <sys/kernel.h>
50 #include <sys/malloc.h>
51 #include <sys/mbuf.h>
52 #include <sys/mutex.h>
53 #include <sys/proc.h>
54 #include <sys/protosw.h>
55 #include <sys/queue.h>
56 #include <sys/socket.h>
57 #include <sys/socketvar.h>
58 #include <sys/sx.h>
59 #include <sys/systm.h>
60 #include <sys/uio.h>
61 
62 #include <net/vnet.h>
63 
64 #include <netinet/tcp.h>
65 
66 #include <rpc/rpc.h>
67 
68 #include <rpc/krpc.h>
69 #include <rpc/rpc_com.h>
70 
71 #include <security/mac/mac_framework.h>
72 
73 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
74     struct sockaddr **, struct mbuf **);
75 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
76 static void svc_vc_rendezvous_destroy(SVCXPRT *);
77 static bool_t svc_vc_null(void);
78 static void svc_vc_destroy(SVCXPRT *);
79 static enum xprt_stat svc_vc_stat(SVCXPRT *);
80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
81     struct sockaddr **, struct mbuf **);
82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
83     struct sockaddr *, struct mbuf *);
84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
86     void *in);
87 static void svc_vc_backchannel_destroy(SVCXPRT *);
88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
90     struct sockaddr **, struct mbuf **);
91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
92     struct sockaddr *, struct mbuf *);
93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
94     void *in);
95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
96     struct sockaddr *raddr);
97 static int svc_vc_accept(struct socket *head, struct socket **sop);
98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
99 
/* Dispatch table for the listening (rendezvous) transport. */
static struct xp_ops svc_vc_rendezvous_ops = {
	.xp_recv =	svc_vc_rendezvous_recv,
	.xp_stat =	svc_vc_rendezvous_stat,
	/* A rendezvous transport never sends replies; use the FALSE stub. */
	.xp_reply =	(bool_t (*)(SVCXPRT *, struct rpc_msg *,
		struct sockaddr *, struct mbuf *))svc_vc_null,
	.xp_destroy =	svc_vc_rendezvous_destroy,
	.xp_control =	svc_vc_rendezvous_control
};
108 
/* Dispatch table for an accepted connection transport. */
static struct xp_ops svc_vc_ops = {
	.xp_recv =	svc_vc_recv,
	.xp_stat =	svc_vc_stat,
	.xp_reply =	svc_vc_reply,
	.xp_destroy =	svc_vc_destroy,
	.xp_control =	svc_vc_control
};
116 
/* Dispatch table for a backchannel transport on a client connection. */
static struct xp_ops svc_vc_backchannel_ops = {
	.xp_recv =	svc_vc_backchannel_recv,
	.xp_stat =	svc_vc_backchannel_stat,
	.xp_reply =	svc_vc_backchannel_reply,
	.xp_destroy =	svc_vc_backchannel_destroy,
	.xp_control =	svc_vc_backchannel_control
};
124 
125 /*
126  * Usage:
127  *	xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
128  *
129  * Creates, registers, and returns a (rpc) tcp based transporter.
130  * Once *xprt is initialized, it is registered as a transporter
131  * see (svc.h, xprt_register).  This routine returns
132  * a NULL if a problem occurred.
133  *
 * The file descriptor passed in is expected to refer to a bound, but
135  * not yet connected socket.
136  *
137  * Since streams do buffered io similar to stdio, the caller can specify
138  * how big the send and receive buffers are via the second and third parms;
139  * 0 => use the system default.
140  */
141 SVCXPRT *
142 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
143     size_t recvsize)
144 {
145 	SVCXPRT *xprt;
146 	struct sockaddr* sa;
147 	int error;
148 
149 	if (so->so_state & SS_ISCONNECTED) {
150 		error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
151 		if (error)
152 			return (NULL);
153 		xprt = svc_vc_create_conn(pool, so, sa);
154 		free(sa, M_SONAME);
155 		return (xprt);
156 	}
157 
158 	xprt = svc_xprt_alloc();
159 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
160 	xprt->xp_pool = pool;
161 	xprt->xp_socket = so;
162 	xprt->xp_p1 = NULL;
163 	xprt->xp_p2 = NULL;
164 	xprt->xp_ops = &svc_vc_rendezvous_ops;
165 
166 	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
167 	if (error) {
168 		goto cleanup_svc_vc_create;
169 	}
170 
171 	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
172 	free(sa, M_SONAME);
173 
174 	xprt_register(xprt);
175 
176 	solisten(so, SOMAXCONN, curthread);
177 
178 	SOCKBUF_LOCK(&so->so_rcv);
179 	xprt->xp_upcallset = 1;
180 	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
181 	SOCKBUF_UNLOCK(&so->so_rcv);
182 
183 	return (xprt);
184 cleanup_svc_vc_create:
185 	if (xprt)
186 		svc_xprt_free(xprt);
187 	return (NULL);
188 }
189 
190 /*
 * Create a new transport for a socket obtained via soaccept().
192  */
193 SVCXPRT *
194 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
195 {
196 	SVCXPRT *xprt = NULL;
197 	struct cf_conn *cd = NULL;
198 	struct sockaddr* sa = NULL;
199 	struct sockopt opt;
200 	int one = 1;
201 	int error;
202 
203 	bzero(&opt, sizeof(struct sockopt));
204 	opt.sopt_dir = SOPT_SET;
205 	opt.sopt_level = SOL_SOCKET;
206 	opt.sopt_name = SO_KEEPALIVE;
207 	opt.sopt_val = &one;
208 	opt.sopt_valsize = sizeof(one);
209 	error = sosetopt(so, &opt);
210 	if (error) {
211 		return (NULL);
212 	}
213 
214 	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
215 		bzero(&opt, sizeof(struct sockopt));
216 		opt.sopt_dir = SOPT_SET;
217 		opt.sopt_level = IPPROTO_TCP;
218 		opt.sopt_name = TCP_NODELAY;
219 		opt.sopt_val = &one;
220 		opt.sopt_valsize = sizeof(one);
221 		error = sosetopt(so, &opt);
222 		if (error) {
223 			return (NULL);
224 		}
225 	}
226 
227 	cd = mem_alloc(sizeof(*cd));
228 	cd->strm_stat = XPRT_IDLE;
229 
230 	xprt = svc_xprt_alloc();
231 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
232 	xprt->xp_pool = pool;
233 	xprt->xp_socket = so;
234 	xprt->xp_p1 = cd;
235 	xprt->xp_p2 = NULL;
236 	xprt->xp_ops = &svc_vc_ops;
237 
238 	/*
239 	 * See http://www.connectathon.org/talks96/nfstcp.pdf - client
240 	 * has a 5 minute timer, server has a 6 minute timer.
241 	 */
242 	xprt->xp_idletimeout = 6 * 60;
243 
244 	memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
245 
246 	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
247 	if (error)
248 		goto cleanup_svc_vc_create;
249 
250 	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
251 	free(sa, M_SONAME);
252 
253 	xprt_register(xprt);
254 
255 	SOCKBUF_LOCK(&so->so_rcv);
256 	xprt->xp_upcallset = 1;
257 	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
258 	SOCKBUF_UNLOCK(&so->so_rcv);
259 
260 	/*
261 	 * Throw the transport into the active list in case it already
262 	 * has some data buffered.
263 	 */
264 	sx_xlock(&xprt->xp_lock);
265 	xprt_active(xprt);
266 	sx_xunlock(&xprt->xp_lock);
267 
268 	return (xprt);
269 cleanup_svc_vc_create:
270 	if (xprt) {
271 		mem_free(xprt, sizeof(*xprt));
272 	}
273 	if (cd)
274 		mem_free(cd, sizeof(*cd));
275 	return (NULL);
276 }
277 
278 /*
279  * Create a new transport for a backchannel on a clnt_vc socket.
280  */
281 SVCXPRT *
282 svc_vc_create_backchannel(SVCPOOL *pool)
283 {
284 	SVCXPRT *xprt = NULL;
285 	struct cf_conn *cd = NULL;
286 
287 	cd = mem_alloc(sizeof(*cd));
288 	cd->strm_stat = XPRT_IDLE;
289 
290 	xprt = svc_xprt_alloc();
291 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
292 	xprt->xp_pool = pool;
293 	xprt->xp_socket = NULL;
294 	xprt->xp_p1 = cd;
295 	xprt->xp_p2 = NULL;
296 	xprt->xp_ops = &svc_vc_backchannel_ops;
297 	return (xprt);
298 }
299 
300 /*
301  * This does all of the accept except the final call to soaccept. The
302  * caller will call soaccept after dropping its locks (soaccept may
303  * call malloc).
304  */
int
svc_vc_accept(struct socket *head, struct socket **sop)
{
	int error = 0;
	struct socket *so;

	/* Only a listening socket can be accepted from. */
	if ((head->so_options & SO_ACCEPTCONN) == 0) {
		error = EINVAL;
		goto done;
	}
#ifdef MAC
	/* Give the MAC framework a chance to veto the accept. */
	error = mac_socket_check_accept(curthread->td_ucred, head);
	if (error != 0)
		goto done;
#endif
	ACCEPT_LOCK();
	if (TAILQ_EMPTY(&head->so_comp)) {
		ACCEPT_UNLOCK();
		/* No completed connections queued; caller retries later. */
		error = EWOULDBLOCK;
		goto done;
	}
	so = TAILQ_FIRST(&head->so_comp);
	KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
	KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));

	/*
	 * Before changing the flags on the socket, we have to bump the
	 * reference count.  Otherwise, if the protocol calls sofree(),
	 * the socket will be released due to a zero refcount.
	 * XXX might not need soref() since this is simpler than kern_accept.
	 */
	SOCK_LOCK(so);			/* soref() and so_state update */
	soref(so);			/* file descriptor reference */

	/* Detach the connection from the listener's completed queue. */
	TAILQ_REMOVE(&head->so_comp, so, so_list);
	head->so_qlen--;
	so->so_state |= (head->so_state & SS_NBIO);
	so->so_qstate &= ~SQ_COMP;
	so->so_head = NULL;

	SOCK_UNLOCK(so);
	ACCEPT_UNLOCK();

	*sop = so;

	/* connection has been removed from the listen queue */
	KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
done:
	return (error);
}
355 
/*ARGSUSED*/
static bool_t
svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct socket *so = NULL;
	struct sockaddr *sa = NULL;
	int error;
	SVCXPRT *new_xprt;

	/*
	 * The socket upcall calls xprt_active() which will eventually
	 * cause the server to call us here. We attempt to accept a
	 * connection from the socket and turn it into a new
	 * transport. If the accept fails, we have drained all pending
	 * connections so we call xprt_inactive().
	 */
	sx_xlock(&xprt->xp_lock);

	error = svc_vc_accept(xprt->xp_socket, &so);

	if (error == EWOULDBLOCK) {
		/*
		 * We must re-test for new connections after taking
		 * the lock to protect us in the case where a new
		 * connection arrives after our call to accept fails
		 * with EWOULDBLOCK. The pool lock protects us from
		 * racing the upcall after our TAILQ_EMPTY() call
		 * returns false.
		 */
		ACCEPT_LOCK();
		mtx_lock(&xprt->xp_pool->sp_lock);
		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
			xprt_inactive_locked(xprt);
		mtx_unlock(&xprt->xp_pool->sp_lock);
		ACCEPT_UNLOCK();
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	if (error) {
		/* Hard accept failure: detach the upcall and go idle. */
		SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
		if (xprt->xp_upcallset) {
			xprt->xp_upcallset = 0;
			soupcall_clear(xprt->xp_socket, SO_RCV);
		}
		SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
		xprt_inactive(xprt);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	sx_xunlock(&xprt->xp_lock);

	sa = 0;
	error = soaccept(so, &sa);

	if (error) {
		/*
		 * XXX not sure if I need to call sofree or soclose here.
		 */
		if (sa)
			free(sa, M_SONAME);
		return (FALSE);
	}

	/*
	 * svc_vc_create_conn will call xprt_register - we don't need
	 * to do anything with the new connection except dereference it.
	 */
	new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
	if (!new_xprt) {
		soclose(so);
	} else {
		SVC_RELEASE(new_xprt);
	}

	free(sa, M_SONAME);

	return (FALSE); /* there is never an rpc msg to be processed */
}
437 
/*ARGSUSED*/
/*
 * A rendezvous transport never has a request of its own pending;
 * incoming connections are consumed entirely by
 * svc_vc_rendezvous_recv(), so always report idle.
 */
static enum xprt_stat
svc_vc_rendezvous_stat(SVCXPRT *xprt)
{

	return (XPRT_IDLE);
}
445 
/*
 * Teardown shared by the rendezvous and connection transports: detach
 * the receive upcall, close the socket and release the transport.
 */
static void
svc_vc_destroy_common(SVCXPRT *xprt)
{
	/* Clear the upcall first so no further activations can occur. */
	SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
	if (xprt->xp_upcallset) {
		xprt->xp_upcallset = 0;
		soupcall_clear(xprt->xp_socket, SO_RCV);
	}
	SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);

	sx_destroy(&xprt->xp_lock);
	/*
	 * NOTE(review): xp_socket was already dereferenced unconditionally
	 * above, so this NULL check looks redundant for current callers —
	 * confirm whether a socketless caller is possible.
	 */
	if (xprt->xp_socket)
		(void)soclose(xprt->xp_socket);

	if (xprt->xp_netid)
		(void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
	svc_xprt_free(xprt);
}
464 
static void
svc_vc_rendezvous_destroy(SVCXPRT *xprt)
{

	/* A rendezvous transport carries no per-connection state (xp_p1). */
	svc_vc_destroy_common(xprt);
}
471 
472 static void
473 svc_vc_destroy(SVCXPRT *xprt)
474 {
475 	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
476 
477 	svc_vc_destroy_common(xprt);
478 
479 	if (cd->mreq)
480 		m_freem(cd->mreq);
481 	if (cd->mpending)
482 		m_freem(cd->mpending);
483 	mem_free(cd, sizeof(*cd));
484 }
485 
static void
svc_vc_backchannel_destroy(SVCXPRT *xprt)
{
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
	struct mbuf *m, *m2;

	/*
	 * No socket or upcall to tear down; just release the transport.
	 * NOTE(review): unlike svc_vc_destroy_common(), xp_lock is not
	 * sx_destroy()ed here — confirm whether this leaks the lock.
	 */
	svc_xprt_free(xprt);
	/* Free the queue of buffered requests, chained via m_nextpkt. */
	m = cd->mreq;
	while (m != NULL) {
		m2 = m;
		m = m->m_nextpkt;
		m_freem(m2);
	}
	mem_free(cd, sizeof(*cd));
}
501 
/*ARGSUSED*/
/* No control operations are supported for connection transports. */
static bool_t
svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
{
	return (FALSE);
}
508 
/* No control operations are supported for rendezvous transports. */
static bool_t
svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
{

	return (FALSE);
}
515 
/* No control operations are supported for backchannel transports. */
static bool_t
svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
{

	return (FALSE);
}
522 
523 static enum xprt_stat
524 svc_vc_stat(SVCXPRT *xprt)
525 {
526 	struct cf_conn *cd;
527 	struct mbuf *m;
528 	size_t n;
529 
530 	cd = (struct cf_conn *)(xprt->xp_p1);
531 
532 	if (cd->strm_stat == XPRT_DIED)
533 		return (XPRT_DIED);
534 
535 	/*
536 	 * Return XPRT_MOREREQS if we have buffered data and we are
537 	 * mid-record or if we have enough data for a record
538 	 * marker. Since this is only a hint, we read mpending and
539 	 * resid outside the lock. We do need to take the lock if we
540 	 * have to traverse the mbuf chain.
541 	 */
542 	if (cd->mpending) {
543 		if (cd->resid)
544 			return (XPRT_MOREREQS);
545 		n = 0;
546 		sx_xlock(&xprt->xp_lock);
547 		m = cd->mpending;
548 		while (m && n < sizeof(uint32_t)) {
549 			n += m->m_len;
550 			m = m->m_next;
551 		}
552 		sx_xunlock(&xprt->xp_lock);
553 		if (n >= sizeof(uint32_t))
554 			return (XPRT_MOREREQS);
555 	}
556 
557 	if (soreadable(xprt->xp_socket))
558 		return (XPRT_MOREREQS);
559 
560 	return (XPRT_IDLE);
561 }
562 
563 static enum xprt_stat
564 svc_vc_backchannel_stat(SVCXPRT *xprt)
565 {
566 	struct cf_conn *cd;
567 
568 	cd = (struct cf_conn *)(xprt->xp_p1);
569 
570 	if (cd->mreq != NULL)
571 		return (XPRT_MOREREQS);
572 
573 	return (XPRT_IDLE);
574 }
575 
/*
 * Receive the next RPC call from a connection transport.  A record is
 * reassembled from the record-marking stream buffered in cd->mpending,
 * reading more data from the socket when the buffered data runs out.
 * On success the decoded call header is left in *msg and the argument
 * data in *mp; returns FALSE when no complete record is available yet
 * or the connection has died.
 */
static bool_t
svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct uio uio;
	struct mbuf *m;
	XDR xdrs;
	int error, rcvflag;

	/*
	 * Serialise access to the socket and our own record parsing
	 * state.
	 */
	sx_xlock(&xprt->xp_lock);

	for (;;) {
		/*
		 * If we have an mbuf chain in cd->mpending, try to parse a
		 * record from it, leaving the result in cd->mreq. If we don't
		 * have a complete record, leave the partial result in
		 * cd->mreq and try to read more from the socket.
		 */
		if (cd->mpending) {
			/*
			 * If cd->resid is non-zero, we have part of the
			 * record already, otherwise we are expecting a record
			 * marker.
			 */
			if (!cd->resid) {
				/*
				 * See if there is enough data buffered to
				 * make up a record marker. Make sure we can
				 * handle the case where the record marker is
				 * split across more than one mbuf.
				 */
				size_t n = 0;
				uint32_t header;

				m = cd->mpending;
				while (n < sizeof(uint32_t) && m) {
					n += m->m_len;
					m = m->m_next;
				}
				if (n < sizeof(uint32_t))
					goto readmore;
				m_copydata(cd->mpending, 0, sizeof(header),
				    (char *)&header);
				header = ntohl(header);
				/* Top bit marks the last fragment of a record. */
				cd->eor = (header & 0x80000000) != 0;
				cd->resid = header & 0x7fffffff;
				m_adj(cd->mpending, sizeof(uint32_t));
			}

			/*
			 * Start pulling off mbufs from cd->mpending
			 * until we either have a complete record or
			 * we run out of data. We use m_split to pull
			 * data - it will pull as much as possible and
			 * split the last mbuf if necessary.
			 */
			while (cd->mpending && cd->resid) {
				m = cd->mpending;
				if (cd->mpending->m_next
				    || cd->mpending->m_len > cd->resid)
					cd->mpending = m_split(cd->mpending,
					    cd->resid, M_WAITOK);
				else
					cd->mpending = NULL;
				/* Append the pulled data to the record in mreq. */
				if (cd->mreq)
					m_last(cd->mreq)->m_next = m;
				else
					cd->mreq = m;
				while (m) {
					cd->resid -= m->m_len;
					m = m->m_next;
				}
			}

			/*
			 * If cd->resid is zero now, we have managed to
			 * receive a record fragment from the stream. Check
			 * for the end-of-record mark to see if we need more.
			 */
			if (cd->resid == 0) {
				if (!cd->eor)
					continue;

				/*
				 * Success - we have a complete record in
				 * cd->mreq.
				 */
				xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
				cd->mreq = NULL;
				sx_xunlock(&xprt->xp_lock);

				if (! xdr_callmsg(&xdrs, msg)) {
					XDR_DESTROY(&xdrs);
					return (FALSE);
				}

				*addrp = NULL;
				*mp = xdrmbuf_getall(&xdrs);
				XDR_DESTROY(&xdrs);

				return (TRUE);
			}
		}

	readmore:
		/*
		 * The socket upcall calls xprt_active() which will eventually
		 * cause the server to call us here. We attempt to
		 * read as much as possible from the socket and put
		 * the result in cd->mpending. If the read fails,
		 * we have drained both cd->mpending and the socket so
		 * we can call xprt_inactive().
		 */
		/* Non-blocking read of whatever is available (up to ~1GB). */
		uio.uio_resid = 1000000000;
		uio.uio_td = curthread;
		m = NULL;
		rcvflag = MSG_DONTWAIT;
		error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
		    &rcvflag);

		if (error == EWOULDBLOCK) {
			/*
			 * We must re-test for readability after
			 * taking the lock to protect us in the case
			 * where a new packet arrives on the socket
			 * after our call to soreceive fails with
			 * EWOULDBLOCK. The pool lock protects us from
			 * racing the upcall after our soreadable()
			 * call returns false.
			 */
			mtx_lock(&xprt->xp_pool->sp_lock);
			if (!soreadable(xprt->xp_socket))
				xprt_inactive_locked(xprt);
			mtx_unlock(&xprt->xp_pool->sp_lock);
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		if (error) {
			/* Receive error - mark the connection dead. */
			SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
			if (xprt->xp_upcallset) {
				xprt->xp_upcallset = 0;
				soupcall_clear(xprt->xp_socket, SO_RCV);
			}
			SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
			xprt_inactive(xprt);
			cd->strm_stat = XPRT_DIED;
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		if (!m) {
			/*
			 * EOF - the other end has closed the socket.
			 */
			xprt_inactive(xprt);
			cd->strm_stat = XPRT_DIED;
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		/* Append the new data and retry the record parse. */
		if (cd->mpending)
			m_last(cd->mpending)->m_next = m;
		else
			cd->mpending = m;
	}
}
748 
/*
 * Dequeue the next buffered backchannel request (presumably queued by
 * the client-side connection code via cd->mreq — see clnt_vc) and
 * decode its call header.
 */
static bool_t
svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct ct_data *ct;
	struct mbuf *m;
	XDR xdrs;

	sx_xlock(&xprt->xp_lock);
	ct = (struct ct_data *)xprt->xp_p2;
	if (ct == NULL) {
		/* The client connection has gone away. */
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}
	mtx_lock(&ct->ct_lock);
	m = cd->mreq;
	if (m == NULL) {
		/* Queue empty - go idle until the next upcall. */
		xprt_inactive(xprt);
		mtx_unlock(&ct->ct_lock);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}
	/* Unlink the head request; requests are chained via m_nextpkt. */
	cd->mreq = m->m_nextpkt;
	mtx_unlock(&ct->ct_lock);
	sx_xunlock(&xprt->xp_lock);

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	if (! xdr_callmsg(&xdrs, msg)) {
		XDR_DESTROY(&xdrs);
		return (FALSE);
	}
	*addrp = NULL;
	*mp = xdrmbuf_getall(&xdrs);
	XDR_DESTROY(&xdrs);
	return (TRUE);
}
786 
787 static bool_t
788 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
789     struct sockaddr *addr, struct mbuf *m)
790 {
791 	XDR xdrs;
792 	struct mbuf *mrep;
793 	bool_t stat = TRUE;
794 	int error;
795 
796 	/*
797 	 * Leave space for record mark.
798 	 */
799 	MGETHDR(mrep, M_WAITOK, MT_DATA);
800 	mrep->m_len = 0;
801 	mrep->m_data += sizeof(uint32_t);
802 
803 	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
804 
805 	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
806 	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
807 		if (!xdr_replymsg(&xdrs, msg))
808 			stat = FALSE;
809 		else
810 			xdrmbuf_append(&xdrs, m);
811 	} else {
812 		stat = xdr_replymsg(&xdrs, msg);
813 	}
814 
815 	if (stat) {
816 		m_fixhdr(mrep);
817 
818 		/*
819 		 * Prepend a record marker containing the reply length.
820 		 */
821 		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
822 		*mtod(mrep, uint32_t *) =
823 			htonl(0x80000000 | (mrep->m_pkthdr.len
824 				- sizeof(uint32_t)));
825 		error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
826 		    0, curthread);
827 		if (!error) {
828 			stat = TRUE;
829 		}
830 	} else {
831 		m_freem(mrep);
832 	}
833 
834 	XDR_DESTROY(&xdrs);
835 	xprt->xp_p2 = NULL;
836 
837 	return (stat);
838 }
839 
840 static bool_t
841 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
842     struct sockaddr *addr, struct mbuf *m)
843 {
844 	struct ct_data *ct;
845 	XDR xdrs;
846 	struct mbuf *mrep;
847 	bool_t stat = TRUE;
848 	int error;
849 
850 	/*
851 	 * Leave space for record mark.
852 	 */
853 	MGETHDR(mrep, M_WAITOK, MT_DATA);
854 	mrep->m_len = 0;
855 	mrep->m_data += sizeof(uint32_t);
856 
857 	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
858 
859 	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
860 	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
861 		if (!xdr_replymsg(&xdrs, msg))
862 			stat = FALSE;
863 		else
864 			xdrmbuf_append(&xdrs, m);
865 	} else {
866 		stat = xdr_replymsg(&xdrs, msg);
867 	}
868 
869 	if (stat) {
870 		m_fixhdr(mrep);
871 
872 		/*
873 		 * Prepend a record marker containing the reply length.
874 		 */
875 		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
876 		*mtod(mrep, uint32_t *) =
877 			htonl(0x80000000 | (mrep->m_pkthdr.len
878 				- sizeof(uint32_t)));
879 		sx_xlock(&xprt->xp_lock);
880 		ct = (struct ct_data *)xprt->xp_p2;
881 		if (ct != NULL)
882 			error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
883 			    0, curthread);
884 		else
885 			error = EPIPE;
886 		sx_xunlock(&xprt->xp_lock);
887 		if (!error) {
888 			stat = TRUE;
889 		}
890 	} else {
891 		m_freem(mrep);
892 	}
893 
894 	XDR_DESTROY(&xdrs);
895 
896 	return (stat);
897 }
898 
/*
 * Stub xp_reply handler for rendezvous transports, which never send
 * replies themselves.
 */
static bool_t
svc_vc_null()
{

	return (FALSE);
}
905 
/*
 * Socket receive upcall: data (or a new connection) has arrived, so
 * mark the transport active for the service threads.
 */
static int
svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
	SVCXPRT *xprt = (SVCXPRT *) arg;

	xprt_active(xprt);
	return (SU_OK);
}
914 
915 #if 0
916 /*
917  * Get the effective UID of the sending process. Used by rpcbind, keyserv
918  * and rpc.yppasswdd on AF_LOCAL.
919  */
920 int
921 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
922 	int sock, ret;
923 	gid_t egid;
924 	uid_t euid;
925 	struct sockaddr *sa;
926 
927 	sock = transp->xp_fd;
928 	sa = (struct sockaddr *)transp->xp_rtaddr;
929 	if (sa->sa_family == AF_LOCAL) {
930 		ret = getpeereid(sock, &euid, &egid);
931 		if (ret == 0)
932 			*uid = euid;
933 		return (ret);
934 	} else
935 		return (-1);
936 }
937 #endif
938