xref: /illumos-gate/usr/src/lib/libnsl/rpc/clnt_vc.c (revision 3bc69b1d22f55fce8569ab14be68200e8b5185ef)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
24  * Copyright (c) 2016 by Delphix. All rights reserved.
25  */
26 
27 /*
28  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
29  * Use is subject to license terms.
30  */
31 
32 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
33 /* All Rights Reserved */
34 /*
35  * Portions of this source code were derived from Berkeley
36  * 4.3 BSD under license from the Regents of the University of
37  * California.
38  */
39 
40 /*
41  * clnt_vc.c
42  *
43  * Implements a connectionful client side RPC.
44  *
45  * Connectionful RPC supports 'batched calls'.
46  * A sequence of calls may be batched-up in a send buffer. The rpc call
47  * return immediately to the client even though the call was not necessarily
48  * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49  * the rpc timeout value is zero (see clnt.h, rpc).
50  *
51  * Clients should NOT casually batch calls that in fact return results; that
52  * is the server side should be aware that a call is batched and not produce
53  * any return message. Batched calls that produce many result messages can
54  * deadlock (netlock) the client and the server....
55  */
56 
57 
58 #include "mt.h"
59 #include "rpc_mt.h"
60 #include <assert.h>
61 #include <rpc/rpc.h>
62 #include <errno.h>
63 #include <sys/byteorder.h>
64 #include <sys/mkdev.h>
65 #include <sys/poll.h>
66 #include <syslog.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <netinet/tcp.h>
70 #include <limits.h>
71 
72 #define	MCALL_MSG_SIZE 24
73 #define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
74 #define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
75 #define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
76 #define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
77 #ifndef MIN
78 #define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
79 #endif
80 
81 extern int __rpc_timeval_to_msec(struct timeval *);
82 extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
83 extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
84 extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
85 								caddr_t);
86 extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
87 extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
88 		rpcvers_t, uint_t, uint_t, const struct timeval *);
89 
90 static struct clnt_ops	*clnt_vc_ops(void);
91 static int		read_vc(void *, caddr_t, int);
92 static int		write_vc(void *, caddr_t, int);
93 static int		t_rcvall(int, char *, int);
94 static bool_t		time_not_ok(struct timeval *);
95 
96 struct ct_data;
97 static bool_t		set_up_connection(int, struct netbuf *,
98 				struct ct_data *, const struct timeval *);
99 static bool_t		set_io_mode(struct ct_data *, int);
100 
101 /*
102  * Lock table handle used by various MT sync. routines
103  */
104 static mutex_t	vctbl_lock = DEFAULTMUTEX;
105 static void	*vctbl = NULL;
106 
107 static const char clnt_vc_errstr[] = "%s : %s";
108 static const char clnt_vc_str[] = "clnt_vc_create";
109 static const char clnt_read_vc_str[] = "read_vc";
110 static const char __no_mem_str[] = "out of memory";
111 static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
112 static const char no_nonblock_str[] = "could not set transport blocking mode";
113 
114 /*
115  * Private data structure
116  */
117 struct ct_data {
118 	int		ct_fd;		/* connection's fd */
119 	bool_t		ct_closeit;	/* close it on destroy */
120 	int		ct_tsdu;	/* size of tsdu */
121 	int		ct_wait;	/* wait interval in milliseconds */
122 	bool_t		ct_waitset;	/* wait set by clnt_control? */
123 	struct netbuf	ct_addr;	/* remote addr */
124 	struct rpc_err	ct_error;
125 	char		ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
126 	uint_t		ct_mpos;	/* pos after marshal */
127 	XDR		ct_xdrs;	/* XDR stream */
128 
129 	/* NON STANDARD INFO - 00-08-31 */
130 	bool_t		ct_is_oneway; /* True if the current call is oneway. */
131 	bool_t		ct_is_blocking;
132 	ushort_t	ct_io_mode;
133 	ushort_t	ct_blocking_mode;
134 	uint_t		ct_bufferSize; /* Total size of the buffer. */
135 	uint_t		ct_bufferPendingSize; /* Size of unsent data. */
136 	char		*ct_buffer; /* Pointer to the buffer. */
137 	char		*ct_bufferWritePtr; /* Ptr to the first free byte. */
138 	char		*ct_bufferReadPtr; /* Ptr to the first byte of data. */
139 };
140 
141 struct nb_reg_node {
142 	struct nb_reg_node *next;
143 	struct ct_data *ct;
144 };
145 
146 static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
147 static struct nb_reg_node *nb_free  = (struct nb_reg_node *)&nb_free;
148 
149 static bool_t exit_handler_set = FALSE;
150 
151 static mutex_t nb_list_mutex = DEFAULTMUTEX;
152 
153 
154 /* Define some macros to manage the linked list. */
155 #define	LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l)
156 #define	LIST_CLR(l) (l = (struct nb_reg_node *)&l)
157 #define	LIST_ADD(l, node) (node->next = l->next, l = node)
158 #define	LIST_EXTRACT(l, node) (node = l, l = l->next)
159 #define	LIST_FOR_EACH(l, node) \
160 	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
161 
162 
163 /* Default size of the IO buffer used in non blocking mode */
164 #define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
165 
166 static int nb_send(struct ct_data *, void *, unsigned int);
167 static int do_flush(struct ct_data *, uint_t);
168 static bool_t set_flush_mode(struct ct_data *, int);
169 static bool_t set_blocking_connection(struct ct_data *, bool_t);
170 
171 static int register_nb(struct ct_data *);
172 static int unregister_nb(struct ct_data *);
173 
174 
175 /*
176  * Change the mode of the underlying fd.
177  */
178 static bool_t
179 set_blocking_connection(struct ct_data *ct, bool_t blocking)
180 {
181 	int flag;
182 
183 	/*
184 	 * If the underlying fd is already in the required mode,
185 	 * avoid the syscall.
186 	 */
187 	if (ct->ct_is_blocking == blocking)
188 		return (TRUE);
189 
190 	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
191 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
192 		    no_fcntl_getfl_str);
193 		return (FALSE);
194 	}
195 
196 	flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
197 	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
198 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
199 		    no_nonblock_str);
200 		return (FALSE);
201 	}
202 	ct->ct_is_blocking = blocking;
203 	return (TRUE);
204 }
205 
206 /*
207  * Create a client handle for a connection.
208  * Default options are set, which the user can change using clnt_control()'s.
209  * The rpc/vc package does buffering similar to stdio, so the client
210  * must pick send and receive buffer sizes, 0 => use the default.
211  * NB: fd is copied into a private area.
212  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
213  * set this something more useful.
214  *
215  * fd should be open and bound.
216  */
217 CLIENT *
218 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
219     const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
220 {
221 	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
222 	    recvsz, NULL));
223 }
224 
225 /*
226  * This has the same definition as clnt_vc_create(), except it
227  * takes an additional parameter - a pointer to a timeval structure.
228  *
229  * Not a public interface. This is for clnt_create_timed,
230  * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
231  * value to control a tcp connection attempt.
232  * (for bug 4049792: clnt_create_timed does not time out)
233  *
234  * If tp is NULL, use default timeout to set up the connection.
235  */
236 CLIENT *
237 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
238     rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
239 {
240 	CLIENT *cl;			/* client handle */
241 	struct ct_data *ct;		/* private data */
242 	struct timeval now;
243 	struct rpc_msg call_msg;
244 	struct t_info tinfo;
245 	int flag;
246 
247 	cl = malloc(sizeof (*cl));
248 	if ((ct = malloc(sizeof (*ct))) != NULL)
249 		ct->ct_addr.buf = NULL;
250 
251 	if ((cl == NULL) || (ct == NULL)) {
252 		(void) syslog(LOG_ERR, clnt_vc_errstr,
253 		    clnt_vc_str, __no_mem_str);
254 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
255 		rpc_createerr.cf_error.re_errno = errno;
256 		rpc_createerr.cf_error.re_terrno = 0;
257 		goto err;
258 	}
259 
260 	/*
261 	 * The only use of vctbl_lock is for serializing the creation of
262 	 * vctbl. Once created the lock needs to be released so we don't
263 	 * hold it across the set_up_connection() call and end up with a
264 	 * bunch of threads stuck waiting for the mutex.
265 	 */
266 	sig_mutex_lock(&vctbl_lock);
267 
268 	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
269 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
270 		rpc_createerr.cf_error.re_errno = errno;
271 		rpc_createerr.cf_error.re_terrno = 0;
272 		sig_mutex_unlock(&vctbl_lock);
273 		goto err;
274 	}
275 
276 	sig_mutex_unlock(&vctbl_lock);
277 
278 	ct->ct_io_mode = RPC_CL_BLOCKING;
279 	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
280 
281 	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
282 	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
283 	ct->ct_bufferPendingSize = 0;
284 	ct->ct_bufferWritePtr = NULL;
285 	ct->ct_bufferReadPtr = NULL;
286 
287 	/* Check the current state of the fd. */
288 	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
289 		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
290 		    no_fcntl_getfl_str);
291 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
292 		rpc_createerr.cf_error.re_terrno = errno;
293 		rpc_createerr.cf_error.re_errno = 0;
294 		goto err;
295 	}
296 	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
297 
298 	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
299 		goto err;
300 	}
301 
302 	/*
303 	 * Set up other members of private data struct
304 	 */
305 	ct->ct_fd = fd;
306 	/*
307 	 * The actual value will be set by clnt_call or clnt_control
308 	 */
309 	ct->ct_wait = 30000;
310 	ct->ct_waitset = FALSE;
311 	/*
312 	 * By default, closeit is always FALSE. It is users responsibility
313 	 * to do a t_close on it, else the user may use clnt_control
314 	 * to let clnt_destroy do it for them.
315 	 */
316 	ct->ct_closeit = FALSE;
317 
318 	/*
319 	 * Initialize call message
320 	 */
321 	(void) gettimeofday(&now, (struct timezone *)0);
322 	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
323 	call_msg.rm_call.cb_prog = prog;
324 	call_msg.rm_call.cb_vers = vers;
325 
326 	/*
327 	 * pre-serialize the static part of the call msg and stash it away
328 	 */
329 	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
330 	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
331 		goto err;
332 	}
333 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
334 	XDR_DESTROY(&(ct->ct_xdrs));
335 
336 	if (t_getinfo(fd, &tinfo) == -1) {
337 		rpc_createerr.cf_stat = RPC_TLIERROR;
338 		rpc_createerr.cf_error.re_terrno = t_errno;
339 		rpc_createerr.cf_error.re_errno = 0;
340 		goto err;
341 	}
342 	/*
343 	 * Find the receive and the send size
344 	 */
345 	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
346 	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
347 	if ((sendsz == 0) || (recvsz == 0)) {
348 		rpc_createerr.cf_stat = RPC_TLIERROR;
349 		rpc_createerr.cf_error.re_terrno = 0;
350 		rpc_createerr.cf_error.re_errno = 0;
351 		goto err;
352 	}
353 	ct->ct_tsdu = tinfo.tsdu;
354 	/*
355 	 * Create a client handle which uses xdrrec for serialization
356 	 * and authnone for authentication.
357 	 */
358 	ct->ct_xdrs.x_ops = NULL;
359 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
360 	    read_vc, write_vc);
361 	if (ct->ct_xdrs.x_ops == NULL) {
362 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
363 		rpc_createerr.cf_error.re_terrno = 0;
364 		rpc_createerr.cf_error.re_errno = ENOMEM;
365 		goto err;
366 	}
367 	cl->cl_ops = clnt_vc_ops();
368 	cl->cl_private = (caddr_t)ct;
369 	cl->cl_auth = authnone_create();
370 	cl->cl_tp = NULL;
371 	cl->cl_netid = NULL;
372 	return (cl);
373 
374 err:
375 	if (ct) {
376 		free(ct->ct_addr.buf);
377 		free(ct);
378 	}
379 	free(cl);
380 
381 	return (NULL);
382 }
383 
384 #define	TCPOPT_BUFSIZE 128
385 
386 /*
387  * Set tcp connection timeout value.
388  * Retun 0 for success, -1 for failure.
389  */
390 static int
391 _set_tcp_conntime(int fd, int optval)
392 {
393 	struct t_optmgmt req, res;
394 	struct opthdr *opt;
395 	int *ip;
396 	char buf[TCPOPT_BUFSIZE];
397 
398 	opt = (struct opthdr *)buf;
399 	opt->level =  IPPROTO_TCP;
400 	opt->name = TCP_CONN_ABORT_THRESHOLD;
401 	opt->len = sizeof (int);
402 
403 	req.flags = T_NEGOTIATE;
404 	req.opt.len = sizeof (struct opthdr) + opt->len;
405 	req.opt.buf = (char *)opt;
406 	ip = (int *)((char *)buf + sizeof (struct opthdr));
407 	*ip = optval;
408 
409 	res.flags = 0;
410 	res.opt.buf = (char *)buf;
411 	res.opt.maxlen = sizeof (buf);
412 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
413 		return (-1);
414 	}
415 	return (0);
416 }
417 
418 /*
419  * Get current tcp connection timeout value.
420  * Retun the timeout in milliseconds, or -1 for failure.
421  */
422 static int
423 _get_tcp_conntime(int fd)
424 {
425 	struct t_optmgmt req, res;
426 	struct opthdr *opt;
427 	int *ip, retval;
428 	char buf[TCPOPT_BUFSIZE];
429 
430 	opt = (struct opthdr *)buf;
431 	opt->level =  IPPROTO_TCP;
432 	opt->name = TCP_CONN_ABORT_THRESHOLD;
433 	opt->len = sizeof (int);
434 
435 	req.flags = T_CURRENT;
436 	req.opt.len = sizeof (struct opthdr) + opt->len;
437 	req.opt.buf = (char *)opt;
438 	ip = (int *)((char *)buf + sizeof (struct opthdr));
439 	*ip = 0;
440 
441 	res.flags = 0;
442 	res.opt.buf = (char *)buf;
443 	res.opt.maxlen = sizeof (buf);
444 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
445 		return (-1);
446 	}
447 
448 	ip = (int *)((char *)buf + sizeof (struct opthdr));
449 	retval = *ip;
450 	return (retval);
451 }
452 
453 static bool_t
454 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
455     const struct timeval *tp)
456 {
457 	int state;
458 	struct t_call sndcallstr, *rcvcall;
459 	int nconnect;
460 	bool_t connected, do_rcv_connect;
461 	int curr_time = -1;
462 	hrtime_t start;
463 	hrtime_t tout;	/* timeout in nanoseconds (from tp) */
464 
465 	ct->ct_addr.len = 0;
466 	state = t_getstate(fd);
467 	if (state == -1) {
468 		rpc_createerr.cf_stat = RPC_TLIERROR;
469 		rpc_createerr.cf_error.re_errno = 0;
470 		rpc_createerr.cf_error.re_terrno = t_errno;
471 		return (FALSE);
472 	}
473 
474 	switch (state) {
475 	case T_IDLE:
476 		if (svcaddr == NULL) {
477 			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
478 			return (FALSE);
479 		}
480 		/*
481 		 * Connect only if state is IDLE and svcaddr known
482 		 */
483 		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
484 		if (rcvcall == NULL) {
485 			rpc_createerr.cf_stat = RPC_TLIERROR;
486 			rpc_createerr.cf_error.re_terrno = t_errno;
487 			rpc_createerr.cf_error.re_errno = errno;
488 			return (FALSE);
489 		}
490 		rcvcall->udata.maxlen = 0;
491 		sndcallstr.addr = *svcaddr;
492 		sndcallstr.opt.len = 0;
493 		sndcallstr.udata.len = 0;
494 		/*
495 		 * Even NULL could have sufficed for rcvcall, because
496 		 * the address returned is same for all cases except
497 		 * for the gateway case, and hence required.
498 		 */
499 		connected = FALSE;
500 		do_rcv_connect = FALSE;
501 
502 		/*
503 		 * If there is a timeout value specified, we will try to
504 		 * reset the tcp connection timeout. If the transport does
505 		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
506 		 * for other reason, default timeout will be used.
507 		 */
508 		if (tp != NULL) {
509 			start = gethrtime();
510 
511 			/*
512 			 * Calculate the timeout in nanoseconds
513 			 */
514 			tout = SECS_TO_NS(tp->tv_sec) +
515 			    USECS_TO_NS(tp->tv_usec);
516 			curr_time = _get_tcp_conntime(fd);
517 		}
518 
519 		for (nconnect = 0; nconnect < 3; nconnect++) {
520 			if (tp != NULL) {
521 				/*
522 				 * Calculate the elapsed time
523 				 */
524 				hrtime_t elapsed = gethrtime() - start;
525 				if (elapsed >= tout)
526 					break;
527 
528 				if (curr_time != -1) {
529 					int ms;
530 
531 					/*
532 					 * TCP_CONN_ABORT_THRESHOLD takes int
533 					 * value in milliseconds.  Make sure we
534 					 * do not overflow.
535 					 */
536 					if (NSECS_TO_MS(tout - elapsed) >=
537 					    INT_MAX) {
538 						ms = INT_MAX;
539 					} else {
540 						ms = (int)
541 						    NSECS_TO_MS(tout - elapsed);
542 						if (MSECS_TO_NS(ms) !=
543 						    tout - elapsed)
544 							ms++;
545 					}
546 
547 					(void) _set_tcp_conntime(fd, ms);
548 				}
549 			}
550 
551 			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
552 				connected = TRUE;
553 				break;
554 			}
555 			if (t_errno == TLOOK) {
556 				switch (t_look(fd)) {
557 				case T_DISCONNECT:
558 					(void) t_rcvdis(fd, (struct
559 					    t_discon *) NULL);
560 					break;
561 				default:
562 					break;
563 				}
564 			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
565 				break;
566 			}
567 			if ((state = t_getstate(fd)) == T_OUTCON) {
568 				do_rcv_connect = TRUE;
569 				break;
570 			}
571 			if (state != T_IDLE) {
572 				break;
573 			}
574 		}
575 		if (do_rcv_connect) {
576 			do {
577 				if (t_rcvconnect(fd, rcvcall) != -1) {
578 					connected = TRUE;
579 					break;
580 				}
581 			} while (t_errno == TSYSERR && errno == EINTR);
582 		}
583 
584 		/*
585 		 * Set the connection timeout back to its old value.
586 		 */
587 		if (curr_time != -1) {
588 			(void) _set_tcp_conntime(fd, curr_time);
589 		}
590 
591 		if (!connected) {
592 			rpc_createerr.cf_stat = RPC_TLIERROR;
593 			rpc_createerr.cf_error.re_terrno = t_errno;
594 			rpc_createerr.cf_error.re_errno = errno;
595 			(void) t_free((char *)rcvcall, T_CALL);
596 			return (FALSE);
597 		}
598 
599 		/* Free old area if allocated */
600 		if (ct->ct_addr.buf)
601 			free(ct->ct_addr.buf);
602 		ct->ct_addr = rcvcall->addr;	/* To get the new address */
603 		/* So that address buf does not get freed */
604 		rcvcall->addr.buf = NULL;
605 		(void) t_free((char *)rcvcall, T_CALL);
606 		break;
607 	case T_DATAXFER:
608 	case T_OUTCON:
609 		if (svcaddr == NULL) {
610 			/*
611 			 * svcaddr could also be NULL in cases where the
612 			 * client is already bound and connected.
613 			 */
614 			ct->ct_addr.len = 0;
615 		} else {
616 			ct->ct_addr.buf = malloc(svcaddr->len);
617 			if (ct->ct_addr.buf == NULL) {
618 				(void) syslog(LOG_ERR, clnt_vc_errstr,
619 				    clnt_vc_str, __no_mem_str);
620 				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
621 				rpc_createerr.cf_error.re_errno = errno;
622 				rpc_createerr.cf_error.re_terrno = 0;
623 				return (FALSE);
624 			}
625 			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
626 			    (size_t)svcaddr->len);
627 			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
628 		}
629 		break;
630 	default:
631 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
632 		return (FALSE);
633 	}
634 	return (TRUE);
635 }
636 
637 static enum clnt_stat
638 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
639     xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
640 {
641 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
642 	XDR *xdrs = &(ct->ct_xdrs);
643 	struct rpc_msg reply_msg;
644 	uint32_t x_id;
645 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
646 	bool_t shipnow;
647 	int refreshes = 2;
648 
649 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
650 		rpc_callerr.re_status = RPC_FAILED;
651 		rpc_callerr.re_errno = errno;
652 		rpc_fd_unlock(vctbl, ct->ct_fd);
653 		return (RPC_FAILED);
654 	}
655 
656 	ct->ct_is_oneway = FALSE;
657 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
658 		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
659 			rpc_fd_unlock(vctbl, ct->ct_fd);
660 			return (RPC_FAILED);  /* XXX */
661 		}
662 	}
663 
664 	if (!ct->ct_waitset) {
665 		/* If time is not within limits, we ignore it. */
666 		if (time_not_ok(&timeout) == FALSE)
667 			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
668 	} else {
669 		timeout.tv_sec = (ct->ct_wait / 1000);
670 		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
671 	}
672 
673 	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
674 	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
675 call_again:
676 	xdrs->x_op = XDR_ENCODE;
677 	rpc_callerr.re_status = RPC_SUCCESS;
678 	/*
679 	 * Due to little endian byte order, it is necessary to convert to host
680 	 * format before decrementing xid.
681 	 */
682 	x_id = ntohl(*msg_x_id) - 1;
683 	*msg_x_id = htonl(x_id);
684 
685 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
686 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
687 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
688 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
689 		    (!xdr_args(xdrs, args_ptr))) {
690 			if (rpc_callerr.re_status == RPC_SUCCESS)
691 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
692 			(void) xdrrec_endofrecord(xdrs, TRUE);
693 			rpc_fd_unlock(vctbl, ct->ct_fd);
694 			return (rpc_callerr.re_status);
695 		}
696 	} else {
697 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
698 		IXDR_PUT_U_INT32(u, proc);
699 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
700 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
701 			if (rpc_callerr.re_status == RPC_SUCCESS)
702 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
703 			(void) xdrrec_endofrecord(xdrs, TRUE);
704 			rpc_fd_unlock(vctbl, ct->ct_fd);
705 			return (rpc_callerr.re_status);
706 		}
707 	}
708 	if (!xdrrec_endofrecord(xdrs, shipnow)) {
709 		rpc_fd_unlock(vctbl, ct->ct_fd);
710 		return (rpc_callerr.re_status = RPC_CANTSEND);
711 	}
712 	if (!shipnow) {
713 		rpc_fd_unlock(vctbl, ct->ct_fd);
714 		return (RPC_SUCCESS);
715 	}
716 	/*
717 	 * Hack to provide rpc-based message passing
718 	 */
719 	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
720 		rpc_fd_unlock(vctbl, ct->ct_fd);
721 		return (rpc_callerr.re_status = RPC_TIMEDOUT);
722 	}
723 
724 
725 	/*
726 	 * Keep receiving until we get a valid transaction id
727 	 */
728 	xdrs->x_op = XDR_DECODE;
729 	for (;;) {
730 		reply_msg.acpted_rply.ar_verf = _null_auth;
731 		reply_msg.acpted_rply.ar_results.where = NULL;
732 		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
733 		if (!xdrrec_skiprecord(xdrs)) {
734 			rpc_fd_unlock(vctbl, ct->ct_fd);
735 			return (rpc_callerr.re_status);
736 		}
737 		/* now decode and validate the response header */
738 		if (!xdr_replymsg(xdrs, &reply_msg)) {
739 			if (rpc_callerr.re_status == RPC_SUCCESS)
740 				continue;
741 			rpc_fd_unlock(vctbl, ct->ct_fd);
742 			return (rpc_callerr.re_status);
743 		}
744 		if (reply_msg.rm_xid == x_id)
745 			break;
746 	}
747 
748 	/*
749 	 * process header
750 	 */
751 	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
752 	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
753 		rpc_callerr.re_status = RPC_SUCCESS;
754 	else
755 		__seterr_reply(&reply_msg, &(rpc_callerr));
756 
757 	if (rpc_callerr.re_status == RPC_SUCCESS) {
758 		if (!AUTH_VALIDATE(cl->cl_auth,
759 		    &reply_msg.acpted_rply.ar_verf)) {
760 			rpc_callerr.re_status = RPC_AUTHERROR;
761 			rpc_callerr.re_why = AUTH_INVALIDRESP;
762 		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
763 			if (!(*xdr_results)(xdrs, results_ptr)) {
764 				if (rpc_callerr.re_status == RPC_SUCCESS)
765 					rpc_callerr.re_status =
766 					    RPC_CANTDECODERES;
767 			}
768 		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
769 		    results_ptr)) {
770 			if (rpc_callerr.re_status == RPC_SUCCESS)
771 				rpc_callerr.re_status = RPC_CANTDECODERES;
772 		}
773 	}	/* end successful completion */
774 	/*
775 	 * If unsuccesful AND error is an authentication error
776 	 * then refresh credentials and try again, else break
777 	 */
778 	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
779 		/* maybe our credentials need to be refreshed ... */
780 		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
781 			goto call_again;
782 		else
783 			/*
784 			 * We are setting rpc_callerr here given that libnsl
785 			 * is not reentrant thereby reinitializing the TSD.
786 			 * If not set here then success could be returned even
787 			 * though refresh failed.
788 			 */
789 			rpc_callerr.re_status = RPC_AUTHERROR;
790 	} /* end of unsuccessful completion */
791 	/* free verifier ... */
792 	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
793 	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
794 		xdrs->x_op = XDR_FREE;
795 		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
796 	}
797 	rpc_fd_unlock(vctbl, ct->ct_fd);
798 	return (rpc_callerr.re_status);
799 }
800 
801 static enum clnt_stat
802 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
803 {
804 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
805 	XDR *xdrs = &(ct->ct_xdrs);
806 	uint32_t x_id;
807 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
808 
809 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
810 		rpc_callerr.re_status = RPC_FAILED;
811 		rpc_callerr.re_errno = errno;
812 		rpc_fd_unlock(vctbl, ct->ct_fd);
813 		return (RPC_FAILED);
814 	}
815 
816 	ct->ct_is_oneway = TRUE;
817 
818 	xdrs->x_op = XDR_ENCODE;
819 	rpc_callerr.re_status = RPC_SUCCESS;
820 	/*
821 	 * Due to little endian byte order, it is necessary to convert to host
822 	 * format before decrementing xid.
823 	 */
824 	x_id = ntohl(*msg_x_id) - 1;
825 	*msg_x_id = htonl(x_id);
826 
827 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
828 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
829 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
830 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
831 		    (!xdr_args(xdrs, args_ptr))) {
832 			if (rpc_callerr.re_status == RPC_SUCCESS)
833 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
834 			(void) xdrrec_endofrecord(xdrs, TRUE);
835 			rpc_fd_unlock(vctbl, ct->ct_fd);
836 			return (rpc_callerr.re_status);
837 		}
838 	} else {
839 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
840 		IXDR_PUT_U_INT32(u, proc);
841 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
842 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
843 			if (rpc_callerr.re_status == RPC_SUCCESS)
844 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
845 			(void) xdrrec_endofrecord(xdrs, TRUE);
846 			rpc_fd_unlock(vctbl, ct->ct_fd);
847 			return (rpc_callerr.re_status);
848 		}
849 	}
850 
851 	/*
852 	 * Do not need to check errors, as the following code does
853 	 * not depend on the successful completion of the call.
854 	 * An error, if any occurs, is reported through
855 	 * rpc_callerr.re_status.
856 	 */
857 	(void) xdrrec_endofrecord(xdrs, TRUE);
858 
859 	rpc_fd_unlock(vctbl, ct->ct_fd);
860 	return (rpc_callerr.re_status);
861 }
862 
863 /* ARGSUSED */
864 static void
865 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
866 {
867 	*errp = rpc_callerr;
868 }
869 
870 static bool_t
871 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
872 {
873 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
874 	XDR *xdrs = &(ct->ct_xdrs);
875 	bool_t stat;
876 
877 	(void) rpc_fd_lock(vctbl, ct->ct_fd);
878 	xdrs->x_op = XDR_FREE;
879 	stat = (*xdr_res)(xdrs, res_ptr);
880 	rpc_fd_unlock(vctbl, ct->ct_fd);
881 	return (stat);
882 }
883 
884 static void
885 clnt_vc_abort(void)
886 {
887 }
888 
889 static bool_t
890 clnt_vc_control(CLIENT *cl, int request, char *info)
891 {
892 	bool_t ret;
893 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
894 
895 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
896 		rpc_fd_unlock(vctbl, ct->ct_fd);
897 		return (FALSE);
898 	}
899 
900 	switch (request) {
901 	case CLSET_FD_CLOSE:
902 		ct->ct_closeit = TRUE;
903 		rpc_fd_unlock(vctbl, ct->ct_fd);
904 		return (TRUE);
905 	case CLSET_FD_NCLOSE:
906 		ct->ct_closeit = FALSE;
907 		rpc_fd_unlock(vctbl, ct->ct_fd);
908 		return (TRUE);
909 	case CLFLUSH:
910 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
911 			int res;
912 			res = do_flush(ct, (info == NULL ||
913 			    *(int *)info == RPC_CL_DEFAULT_FLUSH)?
914 			    ct->ct_blocking_mode: *(int *)info);
915 			ret = (0 == res);
916 		} else {
917 			ret = FALSE;
918 		}
919 		rpc_fd_unlock(vctbl, ct->ct_fd);
920 		return (ret);
921 	}
922 
923 	/* for other requests which use info */
924 	if (info == NULL) {
925 		rpc_fd_unlock(vctbl, ct->ct_fd);
926 		return (FALSE);
927 	}
928 	switch (request) {
929 	case CLSET_TIMEOUT:
930 		if (time_not_ok((struct timeval *)info)) {
931 			rpc_fd_unlock(vctbl, ct->ct_fd);
932 			return (FALSE);
933 		}
934 		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
935 		ct->ct_waitset = TRUE;
936 		break;
937 	case CLGET_TIMEOUT:
938 		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
939 		((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
940 		break;
941 	case CLGET_SERVER_ADDR:	/* For compatibility only */
942 		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
943 		break;
944 	case CLGET_FD:
945 		*(int *)info = ct->ct_fd;
946 		break;
947 	case CLGET_SVC_ADDR:
948 		/* The caller should not free this memory area */
949 		*(struct netbuf *)info = ct->ct_addr;
950 		break;
951 	case CLSET_SVC_ADDR:		/* set to new address */
952 #ifdef undef
953 		/*
954 		 * XXX: once the t_snddis(), followed by t_connect() starts to
955 		 * work, this ifdef should be removed.  CLIENT handle reuse
956 		 * would then be possible for COTS as well.
957 		 */
958 		if (t_snddis(ct->ct_fd, NULL) == -1) {
959 			rpc_createerr.cf_stat = RPC_TLIERROR;
960 			rpc_createerr.cf_error.re_terrno = t_errno;
961 			rpc_createerr.cf_error.re_errno = errno;
962 			rpc_fd_unlock(vctbl, ct->ct_fd);
963 			return (FALSE);
964 		}
965 		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
966 		    ct, NULL);
967 		rpc_fd_unlock(vctbl, ct->ct_fd);
968 		return (ret);
969 #else
970 		rpc_fd_unlock(vctbl, ct->ct_fd);
971 		return (FALSE);
972 #endif
973 	case CLGET_XID:
974 		/*
975 		 * use the knowledge that xid is the
976 		 * first element in the call structure
977 		 * This will get the xid of the PREVIOUS call
978 		 */
979 		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
980 		break;
981 	case CLSET_XID:
982 		/* This will set the xid of the NEXT call */
983 		*(uint32_t *)ct->ct_mcall =  htonl(*(uint32_t *)info + 1);
984 		/* increment by 1 as clnt_vc_call() decrements once */
985 		break;
986 	case CLGET_VERS:
987 		/*
988 		 * This RELIES on the information that, in the call body,
989 		 * the version number field is the fifth field from the
990 		 * begining of the RPC header. MUST be changed if the
991 		 * call_struct is changed
992 		 */
993 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
994 		    4 * BYTES_PER_XDR_UNIT));
995 		break;
996 
997 	case CLSET_VERS:
998 		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
999 		    htonl(*(uint32_t *)info);
1000 		break;
1001 
1002 	case CLGET_PROG:
1003 		/*
1004 		 * This RELIES on the information that, in the call body,
1005 		 * the program number field is the fourth field from the
1006 		 * begining of the RPC header. MUST be changed if the
1007 		 * call_struct is changed
1008 		 */
1009 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1010 		    3 * BYTES_PER_XDR_UNIT));
1011 		break;
1012 
1013 	case CLSET_PROG:
1014 		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1015 		    htonl(*(uint32_t *)info);
1016 		break;
1017 
1018 	case CLSET_IO_MODE:
1019 		if (!set_io_mode(ct, *(int *)info)) {
1020 			rpc_fd_unlock(vctbl, ct->ct_fd);
1021 			return (FALSE);
1022 		}
1023 		break;
1024 	case CLSET_FLUSH_MODE:
1025 		/* Set a specific FLUSH_MODE */
1026 		if (!set_flush_mode(ct, *(int *)info)) {
1027 			rpc_fd_unlock(vctbl, ct->ct_fd);
1028 			return (FALSE);
1029 		}
1030 		break;
1031 	case CLGET_FLUSH_MODE:
1032 		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
1033 		break;
1034 
1035 	case CLGET_IO_MODE:
1036 		*(rpciomode_t *)info = ct->ct_io_mode;
1037 		break;
1038 
1039 	case CLGET_CURRENT_REC_SIZE:
1040 		/*
1041 		 * Returns the current amount of memory allocated
1042 		 * to pending requests
1043 		 */
1044 		*(int *)info = ct->ct_bufferPendingSize;
1045 		break;
1046 
1047 	case CLSET_CONNMAXREC_SIZE:
1048 		/* Cannot resize the buffer if it is used. */
1049 		if (ct->ct_bufferPendingSize != 0) {
1050 			rpc_fd_unlock(vctbl, ct->ct_fd);
1051 			return (FALSE);
1052 		}
1053 		/*
1054 		 * If the new size is equal to the current size,
1055 		 * there is nothing to do.
1056 		 */
1057 		if (ct->ct_bufferSize == *(uint_t *)info)
1058 			break;
1059 
1060 		ct->ct_bufferSize = *(uint_t *)info;
1061 		if (ct->ct_buffer) {
1062 			free(ct->ct_buffer);
1063 			ct->ct_buffer = NULL;
1064 			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1065 		}
1066 		break;
1067 
1068 	case CLGET_CONNMAXREC_SIZE:
1069 		/*
1070 		 * Returns the size of buffer allocated
1071 		 * to pending requests
1072 		 */
1073 		*(uint_t *)info = ct->ct_bufferSize;
1074 		break;
1075 
1076 	default:
1077 		rpc_fd_unlock(vctbl, ct->ct_fd);
1078 		return (FALSE);
1079 	}
1080 	rpc_fd_unlock(vctbl, ct->ct_fd);
1081 	return (TRUE);
1082 }
1083 
1084 static void
1085 clnt_vc_destroy(CLIENT *cl)
1086 {
1087 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
1088 	int ct_fd = ct->ct_fd;
1089 
1090 	(void) rpc_fd_lock(vctbl, ct_fd);
1091 
1092 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1093 		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1094 		(void) unregister_nb(ct);
1095 	}
1096 
1097 	if (ct->ct_closeit)
1098 		(void) t_close(ct_fd);
1099 	XDR_DESTROY(&(ct->ct_xdrs));
1100 	if (ct->ct_addr.buf)
1101 		free(ct->ct_addr.buf);
1102 	free(ct);
1103 	if (cl->cl_netid && cl->cl_netid[0])
1104 		free(cl->cl_netid);
1105 	if (cl->cl_tp && cl->cl_tp[0])
1106 		free(cl->cl_tp);
1107 	free(cl);
1108 	rpc_fd_unlock(vctbl, ct_fd);
1109 }
1110 
1111 /*
1112  * Interface between xdr serializer and vc connection.
1113  * Behaves like the system calls, read & write, but keeps some error state
1114  * around for the rpc level.
1115  */
1116 static int
1117 read_vc(void *ct_tmp, caddr_t buf, int len)
1118 {
1119 	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
1120 	struct pollfd *pfdp;
1121 	int npfd;		/* total number of pfdp allocated */
1122 	struct ct_data *ct = ct_tmp;
1123 	struct timeval starttime;
1124 	struct timeval curtime;
1125 	int poll_time;
1126 	int delta;
1127 
1128 	if (len == 0)
1129 		return (0);
1130 
1131 	/*
1132 	 * Allocate just one the first time.  thr_get_storage() may
1133 	 * return a larger buffer, left over from the last time we were
1134 	 * here, but that's OK.  realloc() will deal with it properly.
1135 	 */
1136 	npfd = 1;
1137 	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1138 	if (pfdp == NULL) {
1139 		(void) syslog(LOG_ERR, clnt_vc_errstr,
1140 		    clnt_read_vc_str, __no_mem_str);
1141 		rpc_callerr.re_status = RPC_SYSTEMERROR;
1142 		rpc_callerr.re_errno = errno;
1143 		rpc_callerr.re_terrno = 0;
1144 		return (-1);
1145 	}
1146 
1147 	/*
1148 	 *	N.B.:  slot 0 in the pollfd array is reserved for the file
1149 	 *	descriptor we're really interested in (as opposed to the
1150 	 *	callback descriptors).
1151 	 */
1152 	pfdp[0].fd = ct->ct_fd;
1153 	pfdp[0].events = MASKVAL;
1154 	pfdp[0].revents = 0;
1155 	poll_time = ct->ct_wait;
1156 	if (gettimeofday(&starttime, NULL) == -1) {
1157 		syslog(LOG_ERR, "Unable to get time of day: %m");
1158 		return (-1);
1159 	}
1160 
1161 	for (;;) {
1162 		extern void (*_svc_getreqset_proc)();
1163 		extern pollfd_t *svc_pollfd;
1164 		extern int svc_max_pollfd;
1165 		int fds;
1166 
1167 		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1168 
1169 		if (_svc_getreqset_proc) {
1170 			sig_rw_rdlock(&svc_fd_lock);
1171 
1172 			/* reallocate pfdp to svc_max_pollfd +1 */
1173 			if (npfd != (svc_max_pollfd + 1)) {
1174 				struct pollfd *tmp_pfdp = realloc(pfdp,
1175 				    sizeof (struct pollfd) *
1176 				    (svc_max_pollfd + 1));
1177 				if (tmp_pfdp == NULL) {
1178 					sig_rw_unlock(&svc_fd_lock);
1179 					(void) syslog(LOG_ERR, clnt_vc_errstr,
1180 					    clnt_read_vc_str, __no_mem_str);
1181 					rpc_callerr.re_status = RPC_SYSTEMERROR;
1182 					rpc_callerr.re_errno = errno;
1183 					rpc_callerr.re_terrno = 0;
1184 					return (-1);
1185 				}
1186 
1187 				pfdp = tmp_pfdp;
1188 				npfd = svc_max_pollfd + 1;
1189 				(void) pthread_setspecific(pfdp_key, pfdp);
1190 			}
1191 			if (npfd > 1)
1192 				(void) memcpy(&pfdp[1], svc_pollfd,
1193 				    sizeof (struct pollfd) * (npfd - 1));
1194 
1195 			sig_rw_unlock(&svc_fd_lock);
1196 		} else {
1197 			npfd = 1;	/* don't forget about pfdp[0] */
1198 		}
1199 
1200 		switch (fds = poll(pfdp, npfd, poll_time)) {
1201 		case 0:
1202 			rpc_callerr.re_status = RPC_TIMEDOUT;
1203 			return (-1);
1204 
1205 		case -1:
1206 			if (errno != EINTR)
1207 				continue;
1208 			else {
1209 				/*
1210 				 * interrupted by another signal,
1211 				 * update time_waited
1212 				 */
1213 
1214 				if (gettimeofday(&curtime, NULL) == -1) {
1215 					syslog(LOG_ERR,
1216 					    "Unable to get time of day:  %m");
1217 					errno = 0;
1218 					continue;
1219 				};
1220 				delta = (curtime.tv_sec -
1221 				    starttime.tv_sec) * 1000 +
1222 				    (curtime.tv_usec -
1223 				    starttime.tv_usec) / 1000;
1224 				poll_time -= delta;
1225 				if (poll_time < 0) {
1226 					rpc_callerr.re_status = RPC_TIMEDOUT;
1227 					errno = 0;
1228 					return (-1);
1229 				} else {
1230 					errno = 0; /* reset it */
1231 					continue;
1232 				}
1233 			}
1234 		}
1235 
1236 		if (pfdp[0].revents == 0) {
1237 			/* must be for server side of the house */
1238 			(*_svc_getreqset_proc)(&pfdp[1], fds);
1239 			continue;	/* do poll again */
1240 		}
1241 
1242 		if (pfdp[0].revents & POLLNVAL) {
1243 			rpc_callerr.re_status = RPC_CANTRECV;
1244 			/*
1245 			 *	Note:  we're faking errno here because we
1246 			 *	previously would have expected select() to
1247 			 *	return -1 with errno EBADF.  Poll(BA_OS)
1248 			 *	returns 0 and sets the POLLNVAL revents flag
1249 			 *	instead.
1250 			 */
1251 			rpc_callerr.re_errno = errno = EBADF;
1252 			return (-1);
1253 		}
1254 
1255 		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1256 			rpc_callerr.re_status = RPC_CANTRECV;
1257 			rpc_callerr.re_errno = errno = EPIPE;
1258 			return (-1);
1259 		}
1260 		break;
1261 	}
1262 
1263 	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1264 	case 0:
1265 		/* premature eof */
1266 		rpc_callerr.re_errno = ENOLINK;
1267 		rpc_callerr.re_terrno = 0;
1268 		rpc_callerr.re_status = RPC_CANTRECV;
1269 		len = -1;	/* it's really an error */
1270 		break;
1271 
1272 	case -1:
1273 		rpc_callerr.re_terrno = t_errno;
1274 		rpc_callerr.re_errno = 0;
1275 		rpc_callerr.re_status = RPC_CANTRECV;
1276 		break;
1277 	}
1278 	return (len);
1279 }
1280 
1281 static int
1282 write_vc(void *ct_tmp, caddr_t buf, int len)
1283 {
1284 	int i, cnt;
1285 	struct ct_data *ct = ct_tmp;
1286 	int flag;
1287 	int maxsz;
1288 
1289 	maxsz = ct->ct_tsdu;
1290 
1291 	/* Handle the non-blocking mode */
1292 	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1293 		/*
1294 		 * Test a special case here. If the length of the current
1295 		 * write is greater than the transport data unit, and the
1296 		 * mode is non blocking, we return RPC_CANTSEND.
1297 		 * XXX  this is not very clean.
1298 		 */
1299 		if (maxsz > 0 && len > maxsz) {
1300 			rpc_callerr.re_terrno = errno;
1301 			rpc_callerr.re_errno = 0;
1302 			rpc_callerr.re_status = RPC_CANTSEND;
1303 			return (-1);
1304 		}
1305 
1306 		len = nb_send(ct, buf, (unsigned)len);
1307 		if (len == -1) {
1308 			rpc_callerr.re_terrno = errno;
1309 			rpc_callerr.re_errno = 0;
1310 			rpc_callerr.re_status = RPC_CANTSEND;
1311 		} else if (len == -2) {
1312 			rpc_callerr.re_terrno = 0;
1313 			rpc_callerr.re_errno = 0;
1314 			rpc_callerr.re_status = RPC_CANTSTORE;
1315 		}
1316 		return (len);
1317 	}
1318 
1319 	if ((maxsz == 0) || (maxsz == -1)) {
1320 		/*
1321 		 * T_snd may return -1 for error on connection (connection
1322 		 * needs to be repaired/closed, and -2 for flow-control
1323 		 * handling error (no operation to do, just wait and call
1324 		 * T_Flush()).
1325 		 */
1326 		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1327 			rpc_callerr.re_terrno = t_errno;
1328 			rpc_callerr.re_errno = 0;
1329 			rpc_callerr.re_status = RPC_CANTSEND;
1330 		}
1331 		return (len);
1332 	}
1333 
1334 	/*
1335 	 * This for those transports which have a max size for data.
1336 	 */
1337 	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1338 		flag = cnt > maxsz ? T_MORE : 0;
1339 		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1340 		    flag)) == -1) {
1341 			rpc_callerr.re_terrno = t_errno;
1342 			rpc_callerr.re_errno = 0;
1343 			rpc_callerr.re_status = RPC_CANTSEND;
1344 			return (-1);
1345 		}
1346 	}
1347 	return (len);
1348 }
1349 
1350 /*
1351  * Receive the required bytes of data, even if it is fragmented.
1352  */
1353 static int
1354 t_rcvall(int fd, char *buf, int len)
1355 {
1356 	int moreflag;
1357 	int final = 0;
1358 	int res;
1359 
1360 	do {
1361 		moreflag = 0;
1362 		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1363 		if (res == -1) {
1364 			if (t_errno == TLOOK)
1365 				switch (t_look(fd)) {
1366 				case T_DISCONNECT:
1367 					(void) t_rcvdis(fd, NULL);
1368 					(void) t_snddis(fd, NULL);
1369 					return (-1);
1370 				case T_ORDREL:
1371 				/* Received orderly release indication */
1372 					(void) t_rcvrel(fd);
1373 				/* Send orderly release indicator */
1374 					(void) t_sndrel(fd);
1375 					return (-1);
1376 				default:
1377 					return (-1);
1378 				}
1379 		} else if (res == 0) {
1380 			return (0);
1381 		}
1382 		final += res;
1383 		buf += res;
1384 		len -= res;
1385 	} while ((len > 0) && (moreflag & T_MORE));
1386 	return (final);
1387 }
1388 
1389 static struct clnt_ops *
1390 clnt_vc_ops(void)
1391 {
1392 	static struct clnt_ops ops;
1393 	extern mutex_t	ops_lock;
1394 
1395 	/* VARIABLES PROTECTED BY ops_lock: ops */
1396 
1397 	sig_mutex_lock(&ops_lock);
1398 	if (ops.cl_call == NULL) {
1399 		ops.cl_call = clnt_vc_call;
1400 		ops.cl_send = clnt_vc_send;
1401 		ops.cl_abort = clnt_vc_abort;
1402 		ops.cl_geterr = clnt_vc_geterr;
1403 		ops.cl_freeres = clnt_vc_freeres;
1404 		ops.cl_destroy = clnt_vc_destroy;
1405 		ops.cl_control = clnt_vc_control;
1406 	}
1407 	sig_mutex_unlock(&ops_lock);
1408 	return (&ops);
1409 }
1410 
1411 /*
1412  * Make sure that the time is not garbage.   -1 value is disallowed.
1413  * Note this is different from time_not_ok in clnt_dg.c
1414  */
1415 static bool_t
1416 time_not_ok(struct timeval *t)
1417 {
1418 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1419 	    t->tv_usec <= -1 || t->tv_usec > 1000000);
1420 }
1421 
1422 
1423 /* Compute the # of bytes that remains until the end of the buffer */
1424 #define	REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
1425 
1426 static int
1427 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1428 {
1429 	if (NULL == ct->ct_buffer) {
1430 		/* Buffer not allocated yet. */
1431 		char *buffer;
1432 
1433 		buffer = malloc(ct->ct_bufferSize);
1434 		if (NULL == buffer) {
1435 			errno = ENOMEM;
1436 			return (-1);
1437 		}
1438 		(void) memcpy(buffer, dataToAdd, nBytes);
1439 
1440 		ct->ct_buffer = buffer;
1441 		ct->ct_bufferReadPtr = buffer;
1442 		ct->ct_bufferWritePtr = buffer + nBytes;
1443 		ct->ct_bufferPendingSize = nBytes;
1444 	} else {
1445 		/*
1446 		 * For an already allocated buffer, two mem copies
1447 		 * might be needed, depending on the current
1448 		 * writing position.
1449 		 */
1450 
1451 		/* Compute the length of the first copy. */
1452 		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1453 
1454 		ct->ct_bufferPendingSize += nBytes;
1455 
1456 		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1457 		ct->ct_bufferWritePtr += len;
1458 		nBytes -= len;
1459 		if (0 == nBytes) {
1460 			/* One memcopy needed. */
1461 
1462 			/*
1463 			 * If the write pointer is at the end of the buffer,
1464 			 * wrap it now.
1465 			 */
1466 			if (ct->ct_bufferWritePtr ==
1467 			    (ct->ct_buffer + ct->ct_bufferSize)) {
1468 				ct->ct_bufferWritePtr = ct->ct_buffer;
1469 			}
1470 		} else {
1471 			/* Two memcopy needed. */
1472 			dataToAdd += len;
1473 
1474 			/*
1475 			 * Copy the remaining data to the beginning of the
1476 			 * buffer
1477 			 */
1478 			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1479 			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1480 		}
1481 	}
1482 	return (0);
1483 }
1484 
1485 static void
1486 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1487 {
1488 	ct->ct_bufferPendingSize -= nBytes;
1489 	if (ct->ct_bufferPendingSize == 0) {
1490 		/*
1491 		 * If the buffer contains no data, we set the two pointers at
1492 		 * the beginning of the buffer (to miminize buffer wraps).
1493 		 */
1494 		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1495 	} else {
1496 		ct->ct_bufferReadPtr += nBytes;
1497 		if (ct->ct_bufferReadPtr >
1498 		    ct->ct_buffer + ct->ct_bufferSize) {
1499 			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1500 		}
1501 	}
1502 }
1503 
1504 static int
1505 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1506 {
1507 	int l;
1508 
1509 	if (ct->ct_bufferPendingSize == 0)
1510 		return (0);
1511 
1512 	l = REMAIN_BYTES(bufferReadPtr);
1513 	if (l < ct->ct_bufferPendingSize) {
1514 		/* Buffer in two fragments. */
1515 		iov[0].iov_base = ct->ct_bufferReadPtr;
1516 		iov[0].iov_len  = l;
1517 
1518 		iov[1].iov_base = ct->ct_buffer;
1519 		iov[1].iov_len  = ct->ct_bufferPendingSize - l;
1520 		return (2);
1521 	} else {
1522 		/* Buffer in one fragment. */
1523 		iov[0].iov_base = ct->ct_bufferReadPtr;
1524 		iov[0].iov_len  = ct->ct_bufferPendingSize;
1525 		return (1);
1526 	}
1527 }
1528 
1529 static bool_t
1530 set_flush_mode(struct ct_data *ct, int mode)
1531 {
1532 	switch (mode) {
1533 	case RPC_CL_BLOCKING_FLUSH:
1534 		/* flush as most as possible without blocking */
1535 	case RPC_CL_BESTEFFORT_FLUSH:
1536 		/* flush the buffer completely (possibly blocking) */
1537 	case RPC_CL_DEFAULT_FLUSH:
1538 		/* flush according to the currently defined policy */
1539 		ct->ct_blocking_mode = mode;
1540 		return (TRUE);
1541 	default:
1542 		return (FALSE);
1543 	}
1544 }
1545 
1546 static bool_t
1547 set_io_mode(struct ct_data *ct, int ioMode)
1548 {
1549 	switch (ioMode) {
1550 	case RPC_CL_BLOCKING:
1551 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1552 			if (NULL != ct->ct_buffer) {
1553 				/*
1554 				 * If a buffer was allocated for this
1555 				 * connection, flush it now, and free it.
1556 				 */
1557 				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1558 				free(ct->ct_buffer);
1559 				ct->ct_buffer = NULL;
1560 			}
1561 			(void) unregister_nb(ct);
1562 			ct->ct_io_mode = ioMode;
1563 		}
1564 		break;
1565 	case RPC_CL_NONBLOCKING:
1566 		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1567 			if (-1 == register_nb(ct)) {
1568 				return (FALSE);
1569 			}
1570 			ct->ct_io_mode = ioMode;
1571 		}
1572 		break;
1573 	default:
1574 		return (FALSE);
1575 	}
1576 	return (TRUE);
1577 }
1578 
1579 static int
1580 do_flush(struct ct_data *ct, uint_t flush_mode)
1581 {
1582 	int result;
1583 	if (ct->ct_bufferPendingSize == 0) {
1584 		return (0);
1585 	}
1586 
1587 	switch (flush_mode) {
1588 	case RPC_CL_BLOCKING_FLUSH:
1589 		if (!set_blocking_connection(ct, TRUE)) {
1590 			return (-1);
1591 		}
1592 		while (ct->ct_bufferPendingSize > 0) {
1593 			if (REMAIN_BYTES(bufferReadPtr) <
1594 			    ct->ct_bufferPendingSize) {
1595 				struct iovec iov[2];
1596 				(void) iovFromBuffer(ct, iov);
1597 				result = writev(ct->ct_fd, iov, 2);
1598 			} else {
1599 				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1600 				    ct->ct_bufferPendingSize, 0);
1601 			}
1602 			if (result < 0) {
1603 				return (-1);
1604 			}
1605 			consumeFromBuffer(ct, result);
1606 		}
1607 
1608 		break;
1609 
1610 	case RPC_CL_BESTEFFORT_FLUSH:
1611 		(void) set_blocking_connection(ct, FALSE);
1612 		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1613 			struct iovec iov[2];
1614 			(void) iovFromBuffer(ct, iov);
1615 			result = writev(ct->ct_fd, iov, 2);
1616 		} else {
1617 			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1618 			    ct->ct_bufferPendingSize, 0);
1619 		}
1620 		if (result < 0) {
1621 			if (errno != EWOULDBLOCK) {
1622 				perror("flush");
1623 				return (-1);
1624 			}
1625 			return (0);
1626 		}
1627 		if (result > 0)
1628 			consumeFromBuffer(ct, result);
1629 		break;
1630 	}
1631 	return (0);
1632 }
1633 
1634 /*
1635  * Non blocking send.
1636  */
1637 
1638 /*
1639  * Test if this is last fragment. See comment in front of xdr_rec.c
1640  * for details.
1641  */
1642 #define	LAST_FRAG(x)	((ntohl(*(uint32_t *)x) & (1U << 31)) == (1U << 31))
1643 
1644 static int
1645 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1646 {
1647 	int result;
1648 
1649 	if (!LAST_FRAG(buff)) {
1650 		return (-1);
1651 	}
1652 
1653 	/*
1654 	 * Check to see if the current message can be stored fully in the
1655 	 * buffer. We have to check this now because it may be impossible
1656 	 * to send any data, so the message must be stored in the buffer.
1657 	 */
1658 	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1659 		/* Try to flush  (to free some space). */
1660 		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1661 
1662 		/* Can we store the message now ? */
1663 		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1664 			return (-2);
1665 	}
1666 
1667 	(void) set_blocking_connection(ct, FALSE);
1668 
1669 	/*
1670 	 * If there is no data pending, we can simply try
1671 	 * to send our data.
1672 	 */
1673 	if (ct->ct_bufferPendingSize == 0) {
1674 		result = t_snd(ct->ct_fd, buff, nBytes, 0);
1675 		if (result == -1) {
1676 			if (errno == EWOULDBLOCK) {
1677 				result = 0;
1678 			} else {
1679 				perror("send");
1680 				return (-1);
1681 			}
1682 		}
1683 		/*
1684 		 * If we have not sent all data, we must store them
1685 		 * in the buffer.
1686 		 */
1687 		if (result != nBytes) {
1688 			if (addInBuffer(ct, (char *)buff + result,
1689 			    nBytes - result) == -1) {
1690 				return (-1);
1691 			}
1692 		}
1693 	} else {
1694 		/*
1695 		 * Some data pending in the buffer.  We try to send
1696 		 * both buffer data and current message in one shot.
1697 		 */
1698 		struct iovec iov[3];
1699 		int i = iovFromBuffer(ct, &iov[0]);
1700 
1701 		iov[i].iov_base = buff;
1702 		iov[i].iov_len  = nBytes;
1703 
1704 		result = writev(ct->ct_fd, iov, i+1);
1705 		if (result == -1) {
1706 			if (errno == EWOULDBLOCK) {
1707 				/* No bytes sent */
1708 				result = 0;
1709 			} else {
1710 				return (-1);
1711 			}
1712 		}
1713 
1714 		/*
1715 		 * Add the bytes from the message
1716 		 * that we have not sent.
1717 		 */
1718 		if (result <= ct->ct_bufferPendingSize) {
1719 			/* No bytes from the message sent */
1720 			consumeFromBuffer(ct, result);
1721 			if (addInBuffer(ct, buff, nBytes) == -1) {
1722 				return (-1);
1723 			}
1724 		} else {
1725 			/*
1726 			 * Some bytes of the message are sent.
1727 			 * Compute the length of the message that has
1728 			 * been sent.
1729 			 */
1730 			int len = result - ct->ct_bufferPendingSize;
1731 
1732 			/* So, empty the buffer. */
1733 			ct->ct_bufferReadPtr = ct->ct_buffer;
1734 			ct->ct_bufferWritePtr = ct->ct_buffer;
1735 			ct->ct_bufferPendingSize = 0;
1736 
1737 			/* And add the remaining part of the message. */
1738 			if (len != nBytes) {
1739 				if (addInBuffer(ct, (char *)buff + len,
1740 				    nBytes-len) == -1) {
1741 					return (-1);
1742 				}
1743 			}
1744 		}
1745 	}
1746 	return (nBytes);
1747 }
1748 
1749 static void
1750 flush_registered_clients(void)
1751 {
1752 	struct nb_reg_node *node;
1753 
1754 	if (LIST_ISEMPTY(nb_first)) {
1755 		return;
1756 	}
1757 
1758 	LIST_FOR_EACH(nb_first, node) {
1759 		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1760 	}
1761 }
1762 
1763 static int
1764 allocate_chunk(void)
1765 {
1766 #define	CHUNK_SIZE 16
1767 	struct nb_reg_node *chk =
1768 	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1769 	struct nb_reg_node *n;
1770 	int i;
1771 
1772 	if (NULL == chk) {
1773 		return (-1);
1774 	}
1775 
1776 	n = chk;
1777 	for (i = 0; i < CHUNK_SIZE-1; ++i) {
1778 		n[i].next = &(n[i+1]);
1779 	}
1780 	n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1781 	nb_free = chk;
1782 	return (0);
1783 }
1784 
1785 static int
1786 register_nb(struct ct_data *ct)
1787 {
1788 	struct nb_reg_node *node;
1789 
1790 	(void) mutex_lock(&nb_list_mutex);
1791 
1792 	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1793 		(void) mutex_unlock(&nb_list_mutex);
1794 		errno = ENOMEM;
1795 		return (-1);
1796 	}
1797 
1798 	if (!exit_handler_set) {
1799 		(void) atexit(flush_registered_clients);
1800 		exit_handler_set = TRUE;
1801 	}
1802 	/* Get the first free node */
1803 	LIST_EXTRACT(nb_free, node);
1804 
1805 	node->ct = ct;
1806 
1807 	LIST_ADD(nb_first, node);
1808 	(void) mutex_unlock(&nb_list_mutex);
1809 
1810 	return (0);
1811 }
1812 
1813 static int
1814 unregister_nb(struct ct_data *ct)
1815 {
1816 	struct nb_reg_node *node;
1817 
1818 	(void) mutex_lock(&nb_list_mutex);
1819 	assert(!LIST_ISEMPTY(nb_first));
1820 
1821 	node = nb_first;
1822 	LIST_FOR_EACH(nb_first, node) {
1823 		if (node->next->ct == ct) {
1824 			/* Get the node to unregister. */
1825 			struct nb_reg_node *n = node->next;
1826 			node->next = n->next;
1827 
1828 			n->ct = NULL;
1829 			LIST_ADD(nb_free, n);
1830 			break;
1831 		}
1832 	}
1833 	(void) mutex_unlock(&nb_list_mutex);
1834 	return (0);
1835 }
1836