xref: /freebsd/sys/rpc/clnt_dg.c (revision cacdd70cc751fb68dec4b86c5e5b8c969b6e26ef)
1 /*	$NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $	*/
2 
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31 /*
32  * Copyright (c) 1986-1991 by Sun Microsystems Inc.
33  */
34 
35 #if defined(LIBC_SCCS) && !defined(lint)
36 #ident	"@(#)clnt_dg.c	1.23	94/04/22 SMI"
37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
38 #endif
39 #include <sys/cdefs.h>
40 __FBSDID("$FreeBSD$");
41 
42 /*
43  * Implements a connectionless client side RPC.
44  */
45 
46 #include <sys/param.h>
47 #include <sys/systm.h>
48 #include <sys/kernel.h>
49 #include <sys/lock.h>
50 #include <sys/malloc.h>
51 #include <sys/mbuf.h>
52 #include <sys/mutex.h>
53 #include <sys/pcpu.h>
54 #include <sys/proc.h>
55 #include <sys/socket.h>
56 #include <sys/socketvar.h>
57 #include <sys/time.h>
58 #include <sys/uio.h>
59 
60 #include <rpc/rpc.h>
61 #include <rpc/rpc_com.h>
62 
63 
/*
 * Ceiling, in seconds, for the retransmit back-off: clnt_dg_call() keeps
 * doubling its retransmit interval only while that interval is below
 * RPC_MAX_BACKOFF * hz ticks.  Builds that define _FREEFALL_CONFIG
 * (FreeBSD.org systems) use one second, which disables the exponential
 * back-off.
 */
#ifndef _FREEFALL_CONFIG
#define	RPC_MAX_BACKOFF		30	/* seconds */
#else
#define	RPC_MAX_BACKOFF		1	/* second */
#endif
72 
73 static bool_t time_not_ok(struct timeval *);
74 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
75     rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
76 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
77 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
78 static void clnt_dg_abort(CLIENT *);
79 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
80 static void clnt_dg_destroy(CLIENT *);
81 static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
82 
83 static struct clnt_ops clnt_dg_ops = {
84 	.cl_call =	clnt_dg_call,
85 	.cl_abort =	clnt_dg_abort,
86 	.cl_geterr =	clnt_dg_geterr,
87 	.cl_freeres =	clnt_dg_freeres,
88 	.cl_destroy =	clnt_dg_destroy,
89 	.cl_control =	clnt_dg_control
90 };
91 
92 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
93 
94 /*
95  * A pending RPC request which awaits a reply. Requests which have
96  * received their reply will have cr_xid set to zero and cr_mrep to
97  * the mbuf chain of the reply.
98  */
99 struct cu_request {
100 	TAILQ_ENTRY(cu_request) cr_link;
101 	CLIENT			*cr_client;	/* owner */
102 	uint32_t		cr_xid;		/* XID of request */
103 	struct mbuf		*cr_mrep;	/* reply received by upcall */
104 	int			cr_error;	/* any error from upcall */
105 };
106 
107 TAILQ_HEAD(cu_request_list, cu_request);
108 
109 #define MCALL_MSG_SIZE 24
110 
111 /*
112  * This structure is pointed to by the socket's so_upcallarg
113  * member. It is separate from the client private data to facilitate
114  * multiple clients sharing the same socket. The cs_lock mutex is used
115  * to protect all fields of this structure, the socket's receive
116  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
117  * structures is installed on the socket.
118  */
119 struct cu_socket {
120 	struct mtx		cs_lock;
121 	int			cs_refs;	/* Count of clients */
122 	struct cu_request_list	cs_pending;	/* Requests awaiting replies */
123 
124 };
125 
126 /*
127  * Private data kept per client handle
128  */
129 struct cu_data {
130 	int			cu_threads;	/* # threads in clnt_vc_call */
131 	bool_t			cu_closing;	/* TRUE if we are destroying */
132 	struct socket		*cu_socket;	/* connection socket */
133 	bool_t			cu_closeit;	/* opened by library */
134 	struct sockaddr_storage	cu_raddr;	/* remote address */
135 	int			cu_rlen;
136 	struct timeval		cu_wait;	/* retransmit interval */
137 	struct timeval		cu_total;	/* total time for the call */
138 	struct rpc_err		cu_error;
139 	uint32_t		cu_xid;
140 	char			cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
141 	size_t			cu_mcalllen;
142 	size_t			cu_sendsz;	/* send size */
143 	size_t			cu_recvsz;	/* recv size */
144 	int			cu_async;
145 	int			cu_connect;	/* Use connect(). */
146 	int			cu_connected;	/* Have done connect(). */
147 	const char		*cu_waitchan;
148 	int			cu_waitflag;
149 };
150 
151 /*
152  * Connection less client creation returns with client handle parameters.
153  * Default options are set, which the user can change using clnt_control().
154  * fd should be open and bound.
155  * NB: The rpch->cl_auth is initialized to null authentication.
156  * 	Caller may wish to set this something more useful.
157  *
158  * sendsz and recvsz are the maximum allowable packet sizes that can be
159  * sent and received. Normally they are the same, but they can be
160  * changed to improve the program efficiency and buffer allocation.
161  * If they are 0, use the transport default.
162  *
163  * If svcaddr is NULL, returns NULL.
164  */
165 CLIENT *
166 clnt_dg_create(
167 	struct socket *so,
168 	struct sockaddr *svcaddr,	/* servers address */
169 	rpcprog_t program,		/* program number */
170 	rpcvers_t version,		/* version number */
171 	size_t sendsz,			/* buffer recv size */
172 	size_t recvsz)			/* buffer send size */
173 {
174 	CLIENT *cl = NULL;		/* client handle */
175 	struct cu_data *cu = NULL;	/* private data */
176 	struct cu_socket *cs = NULL;
177 	struct timeval now;
178 	struct rpc_msg call_msg;
179 	struct __rpc_sockinfo si;
180 	XDR xdrs;
181 
182 	if (svcaddr == NULL) {
183 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
184 		return (NULL);
185 	}
186 
187 	if (!__rpc_socket2sockinfo(so, &si)) {
188 		rpc_createerr.cf_stat = RPC_TLIERROR;
189 		rpc_createerr.cf_error.re_errno = 0;
190 		return (NULL);
191 	}
192 
193 	/*
194 	 * Find the receive and the send size
195 	 */
196 	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
197 	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
198 	if ((sendsz == 0) || (recvsz == 0)) {
199 		rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
200 		rpc_createerr.cf_error.re_errno = 0;
201 		return (NULL);
202 	}
203 
204 	cl = mem_alloc(sizeof (CLIENT));
205 
206 	/*
207 	 * Should be multiple of 4 for XDR.
208 	 */
209 	sendsz = ((sendsz + 3) / 4) * 4;
210 	recvsz = ((recvsz + 3) / 4) * 4;
211 	cu = mem_alloc(sizeof (*cu));
212 	cu->cu_threads = 0;
213 	cu->cu_closing = FALSE;
214 	(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
215 	cu->cu_rlen = svcaddr->sa_len;
216 	/* Other values can also be set through clnt_control() */
217 	cu->cu_wait.tv_sec = 3;	/* heuristically chosen */
218 	cu->cu_wait.tv_usec = 0;
219 	cu->cu_total.tv_sec = -1;
220 	cu->cu_total.tv_usec = -1;
221 	cu->cu_sendsz = sendsz;
222 	cu->cu_recvsz = recvsz;
223 	cu->cu_async = FALSE;
224 	cu->cu_connect = FALSE;
225 	cu->cu_connected = FALSE;
226 	cu->cu_waitchan = "rpcrecv";
227 	cu->cu_waitflag = 0;
228 	(void) getmicrotime(&now);
229 	cu->cu_xid = __RPC_GETXID(&now);
230 	call_msg.rm_xid = cu->cu_xid;
231 	call_msg.rm_call.cb_prog = program;
232 	call_msg.rm_call.cb_vers = version;
233 	xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
234 	if (! xdr_callhdr(&xdrs, &call_msg)) {
235 		rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
236 		rpc_createerr.cf_error.re_errno = 0;
237 		goto err2;
238 	}
239 	cu->cu_mcalllen = XDR_GETPOS(&xdrs);;
240 
241 	/*
242 	 * By default, closeit is always FALSE. It is users responsibility
243 	 * to do a close on it, else the user may use clnt_control
244 	 * to let clnt_destroy do it for him/her.
245 	 */
246 	cu->cu_closeit = FALSE;
247 	cu->cu_socket = so;
248 	soreserve(so, 256*1024, 256*1024);
249 
250 	SOCKBUF_LOCK(&so->so_rcv);
251 recheck_socket:
252 	if (so->so_upcall) {
253 		if (so->so_upcall != clnt_dg_soupcall) {
254 			SOCKBUF_UNLOCK(&so->so_rcv);
255 			printf("clnt_dg_create(): socket already has an incompatible upcall\n");
256 			goto err2;
257 		}
258 		cs = (struct cu_socket *) so->so_upcallarg;
259 		mtx_lock(&cs->cs_lock);
260 		cs->cs_refs++;
261 		mtx_unlock(&cs->cs_lock);
262 	} else {
263 		/*
264 		 * We are the first on this socket - allocate the
265 		 * structure and install it in the socket.
266 		 */
267 		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
268 		cs = mem_alloc(sizeof(*cs));
269 		SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
270 		if (so->so_upcall) {
271 			/*
272 			 * We have lost a race with some other client.
273 			 */
274 			mem_free(cs, sizeof(*cs));
275 			goto recheck_socket;
276 		}
277 		mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
278 		cs->cs_refs = 1;
279 		TAILQ_INIT(&cs->cs_pending);
280 		so->so_upcallarg = cs;
281 		so->so_upcall = clnt_dg_soupcall;
282 		so->so_rcv.sb_flags |= SB_UPCALL;
283 	}
284 	SOCKBUF_UNLOCK(&so->so_rcv);
285 
286 	cl->cl_refs = 1;
287 	cl->cl_ops = &clnt_dg_ops;
288 	cl->cl_private = (caddr_t)(void *)cu;
289 	cl->cl_auth = authnone_create();
290 	cl->cl_tp = NULL;
291 	cl->cl_netid = NULL;
292 	return (cl);
293 err2:
294 	if (cl) {
295 		mem_free(cl, sizeof (CLIENT));
296 		if (cu)
297 			mem_free(cu, sizeof (*cu));
298 	}
299 	return (NULL);
300 }
301 
302 static enum clnt_stat
303 clnt_dg_call(
304 	CLIENT		*cl,		/* client handle */
305 	struct rpc_callextra *ext,	/* call metadata */
306 	rpcproc_t	proc,		/* procedure number */
307 	xdrproc_t	xargs,		/* xdr routine for args */
308 	void		*argsp,		/* pointer to args */
309 	xdrproc_t	xresults,	/* xdr routine for results */
310 	void		*resultsp,	/* pointer to results */
311 	struct timeval	utimeout)	/* seconds to wait before giving up */
312 {
313 	struct cu_data *cu = (struct cu_data *)cl->cl_private;
314 	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
315 	AUTH *auth;
316 	XDR xdrs;
317 	struct rpc_msg reply_msg;
318 	bool_t ok;
319 	int retrans;			/* number of re-transmits so far */
320 	int nrefreshes = 2;		/* number of times to refresh cred */
321 	struct timeval *tvp;
322 	int timeout;
323 	int retransmit_time;
324 	int next_sendtime, starttime, time_waited, tv;
325 	struct sockaddr *sa;
326 	socklen_t salen;
327 	uint32_t xid;
328 	struct mbuf *mreq = NULL;
329 	struct cu_request *cr;
330 	int error;
331 
332 	cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
333 
334 	mtx_lock(&cs->cs_lock);
335 
336 	if (cu->cu_closing) {
337 		mtx_unlock(&cs->cs_lock);
338 		free(cr, M_RPC);
339 		return (RPC_CANTSEND);
340 	}
341 	cu->cu_threads++;
342 
343 	if (ext)
344 		auth = ext->rc_auth;
345 	else
346 		auth = cl->cl_auth;
347 
348 	cr->cr_client = cl;
349 	cr->cr_mrep = NULL;
350 	cr->cr_error = 0;
351 
352 	if (cu->cu_total.tv_usec == -1) {
353 		tvp = &utimeout; /* use supplied timeout */
354 	} else {
355 		tvp = &cu->cu_total; /* use default timeout */
356 	}
357 	if (tvp->tv_sec || tvp->tv_usec)
358 		timeout = tvtohz(tvp);
359 	else
360 		timeout = 0;
361 
362 	if (cu->cu_connect && !cu->cu_connected) {
363 		mtx_unlock(&cs->cs_lock);
364 		error = soconnect(cu->cu_socket,
365 		    (struct sockaddr *)&cu->cu_raddr, curthread);
366 		mtx_lock(&cs->cs_lock);
367 		if (error) {
368 			cu->cu_error.re_errno = error;
369 			cu->cu_error.re_status = RPC_CANTSEND;
370 			goto out;
371 		}
372 		cu->cu_connected = 1;
373 	}
374 	if (cu->cu_connected) {
375 		sa = NULL;
376 		salen = 0;
377 	} else {
378 		sa = (struct sockaddr *)&cu->cu_raddr;
379 		salen = cu->cu_rlen;
380 	}
381 	time_waited = 0;
382 	retrans = 0;
383 	retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
384 
385 	starttime = ticks;
386 
387 call_again:
388 	mtx_assert(&cs->cs_lock, MA_OWNED);
389 
390 	cu->cu_xid++;
391 	xid = cu->cu_xid;
392 
393 send_again:
394 	mtx_unlock(&cs->cs_lock);
395 
396 	MGETHDR(mreq, M_WAIT, MT_DATA);
397 	MCLGET(mreq, M_WAIT);
398 	mreq->m_len = 0;
399 	m_append(mreq, cu->cu_mcalllen, cu->cu_mcallc);
400 
401 	/*
402 	 * The XID is the first thing in the request.
403 	 */
404 	*mtod(mreq, uint32_t *) = htonl(xid);
405 
406 	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
407 
408 	if (cu->cu_async == TRUE && xargs == NULL)
409 		goto get_reply;
410 
411 	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
412 	    (! AUTH_MARSHALL(auth, &xdrs)) ||
413 	    (! (*xargs)(&xdrs, argsp))) {
414 		cu->cu_error.re_status = RPC_CANTENCODEARGS;
415 		mtx_lock(&cs->cs_lock);
416 		goto out;
417 	}
418 	m_fixhdr(mreq);
419 
420 	cr->cr_xid = xid;
421 	mtx_lock(&cs->cs_lock);
422 	TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
423 	mtx_unlock(&cs->cs_lock);
424 
425 	/*
426 	 * sosend consumes mreq.
427 	 */
428 	error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
429 	mreq = NULL;
430 
431 	/*
432 	 * sub-optimal code appears here because we have
433 	 * some clock time to spare while the packets are in flight.
434 	 * (We assume that this is actually only executed once.)
435 	 */
436 	reply_msg.acpted_rply.ar_verf = _null_auth;
437 	reply_msg.acpted_rply.ar_results.where = resultsp;
438 	reply_msg.acpted_rply.ar_results.proc = xresults;
439 
440 	mtx_lock(&cs->cs_lock);
441 	if (error) {
442 		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
443 		cu->cu_error.re_errno = error;
444 		cu->cu_error.re_status = RPC_CANTSEND;
445 		goto out;
446 	}
447 
448 	/*
449 	 * Check to see if we got an upcall while waiting for the
450 	 * lock.
451 	 */
452 	if (cr->cr_error) {
453 		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
454 		cu->cu_error.re_errno = cr->cr_error;
455 		cu->cu_error.re_status = RPC_CANTRECV;
456 		goto out;
457 	}
458 	if (cr->cr_mrep) {
459 		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
460 		goto got_reply;
461 	}
462 
463 	/*
464 	 * Hack to provide rpc-based message passing
465 	 */
466 	if (timeout == 0) {
467 		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
468 		cu->cu_error.re_status = RPC_TIMEDOUT;
469 		goto out;
470 	}
471 
472 get_reply:
473 	for (;;) {
474 		/* Decide how long to wait. */
475 		if (next_sendtime < timeout)
476 			tv = next_sendtime;
477 		else
478 			tv = timeout;
479 		tv -= time_waited;
480 
481 		if (tv > 0) {
482 			if (cu->cu_closing)
483 				error = 0;
484 			else
485 				error = msleep(cr, &cs->cs_lock,
486 				    cu->cu_waitflag, cu->cu_waitchan, tv);
487 		} else {
488 			error = EWOULDBLOCK;
489 		}
490 
491 		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
492 
493 		if (!error) {
494 			/*
495 			 * We were woken up by the upcall.  If the
496 			 * upcall had a receive error, report that,
497 			 * otherwise we have a reply.
498 			 */
499 			if (cr->cr_error) {
500 				cu->cu_error.re_errno = cr->cr_error;
501 				cu->cu_error.re_status = RPC_CANTRECV;
502 				goto out;
503 			}
504 			break;
505 		}
506 
507 		/*
508 		 * The sleep returned an error so our request is still
509 		 * on the list. If we got EWOULDBLOCK, we may want to
510 		 * re-send the request.
511 		 */
512 		if (error != EWOULDBLOCK) {
513 			cu->cu_error.re_errno = error;
514 			if (error == EINTR)
515 				cu->cu_error.re_status = RPC_INTR;
516 			else
517 				cu->cu_error.re_status = RPC_CANTRECV;
518 			goto out;
519 		}
520 
521 		time_waited = ticks - starttime;
522 
523 		/* Check for timeout. */
524 		if (time_waited > timeout) {
525 			cu->cu_error.re_errno = EWOULDBLOCK;
526 			cu->cu_error.re_status = RPC_TIMEDOUT;
527 			goto out;
528 		}
529 
530 		/* Retransmit if necessary. */
531 		if (time_waited >= next_sendtime) {
532 			if (ext && ext->rc_feedback) {
533 				mtx_unlock(&cs->cs_lock);
534 				if (retrans == 0)
535 					ext->rc_feedback(FEEDBACK_REXMIT1,
536 					    proc, ext->rc_feedback_arg);
537 				else
538 					ext->rc_feedback(FEEDBACK_REXMIT2,
539 					    proc, ext->rc_feedback_arg);
540 				mtx_lock(&cs->cs_lock);
541 			}
542 			if (cu->cu_closing) {
543 				cu->cu_error.re_errno = ESHUTDOWN;
544 				cu->cu_error.re_status = RPC_CANTRECV;
545 				goto out;
546 			}
547 			retrans++;
548 			/* update retransmit_time */
549 			if (retransmit_time < RPC_MAX_BACKOFF * hz)
550 				retransmit_time = 2 * retransmit_time;
551 			next_sendtime += retransmit_time;
552 			goto send_again;
553 		}
554 		TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
555 	}
556 
557 got_reply:
558 	/*
559 	 * Now decode and validate the response. We need to drop the
560 	 * lock since xdr_replymsg may end up sleeping in malloc.
561 	 */
562 	mtx_unlock(&cs->cs_lock);
563 
564 	if (ext && ext->rc_feedback)
565 		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
566 
567 	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
568 	ok = xdr_replymsg(&xdrs, &reply_msg);
569 	XDR_DESTROY(&xdrs);
570 	cr->cr_mrep = NULL;
571 
572 	mtx_lock(&cs->cs_lock);
573 
574 	if (ok) {
575 		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
576 			(reply_msg.acpted_rply.ar_stat == SUCCESS))
577 			cu->cu_error.re_status = RPC_SUCCESS;
578 		else
579 			_seterr_reply(&reply_msg, &(cu->cu_error));
580 
581 		if (cu->cu_error.re_status == RPC_SUCCESS) {
582 			if (! AUTH_VALIDATE(cl->cl_auth,
583 					    &reply_msg.acpted_rply.ar_verf)) {
584 				cu->cu_error.re_status = RPC_AUTHERROR;
585 				cu->cu_error.re_why = AUTH_INVALIDRESP;
586 			}
587 			if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
588 				xdrs.x_op = XDR_FREE;
589 				(void) xdr_opaque_auth(&xdrs,
590 					&(reply_msg.acpted_rply.ar_verf));
591 			}
592 		}		/* end successful completion */
593 		/*
594 		 * If unsuccesful AND error is an authentication error
595 		 * then refresh credentials and try again, else break
596 		 */
597 		else if (cu->cu_error.re_status == RPC_AUTHERROR)
598 			/* maybe our credentials need to be refreshed ... */
599 			if (nrefreshes > 0 &&
600 			    AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
601 				nrefreshes--;
602 				goto call_again;
603 			}
604 		/* end of unsuccessful completion */
605 	}	/* end of valid reply message */
606 	else {
607 		cu->cu_error.re_status = RPC_CANTDECODERES;
608 
609 	}
610 out:
611 	mtx_assert(&cs->cs_lock, MA_OWNED);
612 
613 	if (mreq)
614 		m_freem(mreq);
615 	if (cr->cr_mrep)
616 		m_freem(cr->cr_mrep);
617 
618 	cu->cu_threads--;
619 	if (cu->cu_closing)
620 		wakeup(cu);
621 
622 	mtx_unlock(&cs->cs_lock);
623 
624 	free(cr, M_RPC);
625 
626 	return (cu->cu_error.re_status);
627 }
628 
629 static void
630 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
631 {
632 	struct cu_data *cu = (struct cu_data *)cl->cl_private;
633 
634 	*errp = cu->cu_error;
635 }
636 
637 static bool_t
638 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
639 {
640 	XDR xdrs;
641 	bool_t dummy;
642 
643 	xdrs.x_op = XDR_FREE;
644 	dummy = (*xdr_res)(&xdrs, res_ptr);
645 
646 	return (dummy);
647 }
648 
649 /*ARGSUSED*/
650 static void
651 clnt_dg_abort(CLIENT *h)
652 {
653 }
654 
655 static bool_t
656 clnt_dg_control(CLIENT *cl, u_int request, void *info)
657 {
658 	struct cu_data *cu = (struct cu_data *)cl->cl_private;
659 	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
660 	struct sockaddr *addr;
661 
662 	mtx_lock(&cs->cs_lock);
663 
664 	switch (request) {
665 	case CLSET_FD_CLOSE:
666 		cu->cu_closeit = TRUE;
667 		mtx_unlock(&cs->cs_lock);
668 		return (TRUE);
669 	case CLSET_FD_NCLOSE:
670 		cu->cu_closeit = FALSE;
671 		mtx_unlock(&cs->cs_lock);
672 		return (TRUE);
673 	}
674 
675 	/* for other requests which use info */
676 	if (info == NULL) {
677 		mtx_unlock(&cs->cs_lock);
678 		return (FALSE);
679 	}
680 	switch (request) {
681 	case CLSET_TIMEOUT:
682 		if (time_not_ok((struct timeval *)info)) {
683 			mtx_unlock(&cs->cs_lock);
684 			return (FALSE);
685 		}
686 		cu->cu_total = *(struct timeval *)info;
687 		break;
688 	case CLGET_TIMEOUT:
689 		*(struct timeval *)info = cu->cu_total;
690 		break;
691 	case CLSET_RETRY_TIMEOUT:
692 		if (time_not_ok((struct timeval *)info)) {
693 			mtx_unlock(&cs->cs_lock);
694 			return (FALSE);
695 		}
696 		cu->cu_wait = *(struct timeval *)info;
697 		break;
698 	case CLGET_RETRY_TIMEOUT:
699 		*(struct timeval *)info = cu->cu_wait;
700 		break;
701 	case CLGET_SVC_ADDR:
702 		/*
703 		 * Slightly different semantics to userland - we use
704 		 * sockaddr instead of netbuf.
705 		 */
706 		memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
707 		break;
708 	case CLSET_SVC_ADDR:		/* set to new address */
709 		addr = (struct sockaddr *)info;
710 		(void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
711 		break;
712 	case CLGET_XID:
713 		*(uint32_t *)info = cu->cu_xid;
714 		break;
715 
716 	case CLSET_XID:
717 		/* This will set the xid of the NEXT call */
718 		/* decrement by 1 as clnt_dg_call() increments once */
719 		cu->cu_xid = *(uint32_t *)info - 1;
720 		break;
721 
722 	case CLGET_VERS:
723 		/*
724 		 * This RELIES on the information that, in the call body,
725 		 * the version number field is the fifth field from the
726 		 * begining of the RPC header. MUST be changed if the
727 		 * call_struct is changed
728 		 */
729 		*(uint32_t *)info =
730 		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
731 		    4 * BYTES_PER_XDR_UNIT));
732 		break;
733 
734 	case CLSET_VERS:
735 		*(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
736 			= htonl(*(uint32_t *)info);
737 		break;
738 
739 	case CLGET_PROG:
740 		/*
741 		 * This RELIES on the information that, in the call body,
742 		 * the program number field is the fourth field from the
743 		 * begining of the RPC header. MUST be changed if the
744 		 * call_struct is changed
745 		 */
746 		*(uint32_t *)info =
747 		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
748 		    3 * BYTES_PER_XDR_UNIT));
749 		break;
750 
751 	case CLSET_PROG:
752 		*(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
753 			= htonl(*(uint32_t *)info);
754 		break;
755 	case CLSET_ASYNC:
756 		cu->cu_async = *(int *)info;
757 		break;
758 	case CLSET_CONNECT:
759 		cu->cu_connect = *(int *)info;
760 		break;
761 	case CLSET_WAITCHAN:
762 		cu->cu_waitchan = *(const char **)info;
763 		break;
764 	case CLGET_WAITCHAN:
765 		*(const char **) info = cu->cu_waitchan;
766 		break;
767 	case CLSET_INTERRUPTIBLE:
768 		if (*(int *) info)
769 			cu->cu_waitflag = PCATCH;
770 		else
771 			cu->cu_waitflag = 0;
772 		break;
773 	case CLGET_INTERRUPTIBLE:
774 		if (cu->cu_waitflag)
775 			*(int *) info = TRUE;
776 		else
777 			*(int *) info = FALSE;
778 		break;
779 	default:
780 		mtx_unlock(&cs->cs_lock);
781 		return (FALSE);
782 	}
783 	mtx_unlock(&cs->cs_lock);
784 	return (TRUE);
785 }
786 
787 static void
788 clnt_dg_destroy(CLIENT *cl)
789 {
790 	struct cu_data *cu = (struct cu_data *)cl->cl_private;
791 	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
792 	struct cu_request *cr;
793 	struct socket *so = NULL;
794 	bool_t lastsocketref;
795 
796 	mtx_lock(&cs->cs_lock);
797 
798 	/*
799 	 * Abort any pending requests and wait until everyone
800 	 * has finished with clnt_vc_call.
801 	 */
802 	cu->cu_closing = TRUE;
803 	TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
804 		if (cr->cr_client == cl) {
805 			cr->cr_xid = 0;
806 			cr->cr_error = ESHUTDOWN;
807 			wakeup(cr);
808 		}
809 	}
810 
811 	while (cu->cu_threads)
812 		msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
813 
814 	cs->cs_refs--;
815 	if (cs->cs_refs == 0) {
816 		mtx_destroy(&cs->cs_lock);
817 		SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
818 		cu->cu_socket->so_upcallarg = NULL;
819 		cu->cu_socket->so_upcall = NULL;
820 		cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
821 		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
822 		mem_free(cs, sizeof(*cs));
823 		lastsocketref = TRUE;
824 	} else {
825 		mtx_unlock(&cs->cs_lock);
826 		lastsocketref = FALSE;
827 	}
828 
829 	if (cu->cu_closeit && lastsocketref) {
830 		so = cu->cu_socket;
831 		cu->cu_socket = NULL;
832 	}
833 
834 	if (so)
835 		soclose(so);
836 
837 	if (cl->cl_netid && cl->cl_netid[0])
838 		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
839 	if (cl->cl_tp && cl->cl_tp[0])
840 		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
841 	mem_free(cu, sizeof (*cu));
842 	mem_free(cl, sizeof (CLIENT));
843 }
844 
/*
 * Make sure that the time is not garbage.  -1 value is allowed.
 *
 * Returns non-zero when *t must be rejected: tv_sec outside
 * [-1, 100000000] or tv_usec outside [-1, 999999].  A microsecond count
 * of 1000000 is a whole second and therefore not a valid tv_usec.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
		t->tv_usec < -1 || t->tv_usec >= 1000000);
}
854 
855 void
856 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
857 {
858 	struct cu_socket *cs = (struct cu_socket *) arg;
859 	struct uio uio;
860 	struct mbuf *m;
861 	struct mbuf *control;
862 	struct cu_request *cr;
863 	int error, rcvflag, foundreq;
864 	uint32_t xid;
865 
866 	uio.uio_resid = 1000000000;
867 	uio.uio_td = curthread;
868 	do {
869 		m = NULL;
870 		control = NULL;
871 		rcvflag = MSG_DONTWAIT;
872 		error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
873 		if (control)
874 			m_freem(control);
875 
876 		if (error == EWOULDBLOCK)
877 			break;
878 
879 		/*
880 		 * If there was an error, wake up all pending
881 		 * requests.
882 		 */
883 		if (error) {
884 			mtx_lock(&cs->cs_lock);
885 			TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
886 				cr->cr_xid = 0;
887 				cr->cr_error = error;
888 				wakeup(cr);
889 			}
890 			mtx_unlock(&cs->cs_lock);
891 			break;
892 		}
893 
894 		/*
895 		 * The XID is in the first uint32_t of the reply.
896 		 */
897 		m = m_pullup(m, sizeof(xid));
898 		if (!m)
899 			/*
900 			 * Should never happen.
901 			 */
902 			continue;
903 
904 		xid = ntohl(*mtod(m, uint32_t *));
905 
906 		/*
907 		 * Attempt to match this reply with a pending request.
908 		 */
909 		mtx_lock(&cs->cs_lock);
910 		foundreq = 0;
911 		TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
912 			if (cr->cr_xid == xid) {
913 				/*
914 				 * This one matches. We leave the
915 				 * reply mbuf in cr->cr_mrep. Set the
916 				 * XID to zero so that we will ignore
917 				 * any duplicated replies that arrive
918 				 * before clnt_dg_call removes it from
919 				 * the queue.
920 				 */
921 				cr->cr_xid = 0;
922 				cr->cr_mrep = m;
923 				cr->cr_error = 0;
924 				foundreq = 1;
925 				wakeup(cr);
926 				break;
927 			}
928 		}
929 		mtx_unlock(&cs->cs_lock);
930 
931 		/*
932 		 * If we didn't find the matching request, just drop
933 		 * it - its probably a repeated reply.
934 		 */
935 		if (!foundreq)
936 			m_freem(m);
937 	} while (m);
938 }
939 
940