xref: /titanic_50/usr/src/lib/libnsl/rpc/clnt_vc.c (revision 8eea8e29cc4374d1ee24c25a07f45af132db3499)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
28 /*
29  * Portions of this source code were derived from Berkeley
30  * 4.3 BSD under license from the Regents of the University of
31  * California.
32  */
33 
34 #pragma ident	"%Z%%M%	%I%	%E% SMI"
35 
/*
 * clnt_vc.c
 *
 * Implements a connectionful client side RPC.
 *
 * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer. The rpc call
 * returns immediately to the client even though the call was not
 * necessarily sent. The batching occurs if the results' xdr routine is
 * NULL (0) AND the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message. Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 */
52 
53 
54 #include "mt.h"
55 #include "rpc_mt.h"
56 #include <assert.h>
57 #include <rpc/rpc.h>
58 #include <rpc/trace.h>
59 #include <errno.h>
60 #include <sys/byteorder.h>
61 #include <sys/mkdev.h>
62 #include <sys/poll.h>
63 #include <syslog.h>
64 #include <stdlib.h>
65 #include <unistd.h>
66 #include <netinet/tcp.h>
67 
/* Size in bytes of ct_mcall, the buffer holding the pre-marshalled header. */
#define	MCALL_MSG_SIZE 24
/* Multiplier converting seconds to milliseconds. */
#define	SECS_TO_MS 1000
/*
 * Deliberately left unparenthesized: "usec * USECS_TO_MS" expands to
 * "usec * 1/1000", which parses as (usec * 1) / 1000.  Wrapping the
 * expansion in parentheses would turn it into the integer constant 0.
 */
#define	USECS_TO_MS 1/1000
#ifndef MIN
/* Smaller of a and b; either argument may be evaluated twice. */
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif
74 
/* Helpers defined outside this file. */
extern int __rpc_timeval_to_msec();
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth();
extern bool_t __rpc_gss_wrap();
extern bool_t __rpc_gss_unwrap();

/* Creation routine that also accepts a connection timeout. */
CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
		rpcvers_t, uint_t, uint_t, const struct timeval *);

/* Forward declarations of the file-local routines. */
static struct clnt_ops	*clnt_vc_ops();
#ifdef __STDC__
/* read_vc/write_vc are the I/O callbacks handed to xdrrec_create(). */
static int		read_vc(void *, caddr_t, int);
static int		write_vc(void *, caddr_t, int);
#else
static int		read_vc();
static int		write_vc();
#endif
static int		t_rcvall();
static bool_t		time_not_ok();
static bool_t		set_up_connection();

/* Incomplete type: lets set_io_mode() be declared before the definition. */
struct ct_data;
static bool_t		set_io_mode(struct ct_data *ct, int ioMode);
98 
/*
 * Lock table handle used by various MT sync. routines.
 * vctbl is created on first use by rpc_fd_init() in
 * _clnt_vc_create_timed(), with vctbl_lock held; it is then passed to
 * rpc_fd_lock()/rpc_fd_unlock() together with the connection's fd.
 */
static mutex_t	vctbl_lock = DEFAULTMUTEX;
static void	*vctbl = NULL;

/* Format string and message fragments used with syslog(). */
static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";
111 
/*
 * Private data structure: per-handle state, stored in the CLIENT's
 * cl_private field.
 */
struct ct_data {
	int		ct_fd;		/* connection's fd */
	bool_t		ct_closeit;	/* close it on destroy */
	int		ct_tsdu;	/* size of tsdu, from t_getinfo() */
	int		ct_wait;	/* wait interval in milliseconds */
	bool_t		ct_waitset;	/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	/* marshalled callmsg; its first 32-bit word is the xid */
	char		ct_mcall[MCALL_MSG_SIZE];
	uint_t		ct_mpos;	/* pos after marshal */
	XDR		ct_xdrs;	/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t		ct_is_oneway; /* True if the current call is oneway. */
	/* Cached fd mode: TRUE while O_NONBLOCK is clear on ct_fd. */
	bool_t		ct_is_blocking;
	/* RPC_CL_BLOCKING (set at creation) or RPC_CL_NONBLOCKING. */
	ushort_t	ct_io_mode;
	/* Flush mode used by CLFLUSH when the caller gives none. */
	ushort_t	ct_blocking_mode;
	uint_t		ct_bufferSize; /* Total size of the buffer. */
	uint_t		ct_bufferPendingSize; /* Size of unsent data. */
	/* NULL until the buffer is allocated on demand. */
	char 		*ct_buffer; /* Pointer to the buffer. */
	char 		*ct_bufferWritePtr; /* Ptr to the first free byte. */
	char 		*ct_bufferReadPtr; /* Ptr to the first byte of data. */
};
138 
/*
 * Node of a singly linked list of ct_data pointers.  'next' must stay the
 * first member: the list heads below are initialized to their own address
 * cast to a node pointer, so the head variable itself serves as the
 * end-of-list sentinel.
 */
struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

/* Both lists start out empty, i.e. pointing at their own sentinel. */
static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free  = (struct nb_reg_node *)&nb_free;

static bool_t exit_handler_set = FALSE;

static mutex_t nb_list_mutex = DEFAULTMUTEX;
150 
151 
/*
 * Macros managing the singly linked nb_reg_node lists.
 *
 * 'l' must be the list head variable itself (an lvalue), because an empty
 * list is represented by the head pointing at its own address, which also
 * terminates every chain.  'node' is a struct nb_reg_node pointer.
 *
 *	LIST_ISEMPTY	true when the list holds no node
 *	LIST_CLR	reset the list to empty
 *	LIST_ADD	push 'node' in front of the current head
 *	LIST_EXTRACT	pop the head into 'node' (list must not be empty)
 *	LIST_FOR_EACH	iterate 'node' over every element
 *
 * LIST_ADD links the new node to the current head 'l'.  Linking it to
 * 'l->next' is only correct for an empty list and silently drops the
 * previous head otherwise.
 */
#define	LIST_ISEMPTY(l) ((l) == (struct nb_reg_node *)&(l))
#define	LIST_CLR(l) ((l) = (struct nb_reg_node *)&(l))
#define	LIST_ADD(l, node) ((node)->next = (l), (l) = (node))
#define	LIST_EXTRACT(l, node) ((node) = (l), (l) = (l)->next)
#define	LIST_FOR_EACH(l, node) \
	for ((node) = (l); (node) != (struct nb_reg_node *)&(l); \
	    (node) = (node)->next)
159 
160 
/*
 * Default size of the IO buffer used in non blocking mode; assigned to
 * ct_bufferSize when a handle is created.
 */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)

/* Forward declarations of the file-local non-blocking I/O helpers. */
static int nb_send(struct ct_data *ct, void *buff,
    unsigned  int  nbytes);

/* Callers in this file treat a return value of 0 as success. */
static int do_flush(struct ct_data *ct, uint_t flush_mode);

static bool_t set_flush_mode(struct ct_data *ct,
    int P_mode);

static bool_t set_blocking_connection(struct ct_data *ct,
    bool_t blocking);

static int register_nb(struct ct_data *ct);
static int unregister_nb(struct ct_data *ct);
177 
178 
179 /*
180  * Change the mode of the underlying fd.
181  */
182 static bool_t
183 set_blocking_connection(struct ct_data *ct, bool_t blocking)
184 {
185 	int flag;
186 
187 	/*
188 	 * If the underlying fd is already in the required mode,
189 	 * avoid the syscall.
190 	 */
191 	if (ct->ct_is_blocking == blocking)
192 		return (TRUE);
193 
194 	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
195 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
196 		    no_fcntl_getfl_str);
197 		return (FALSE);
198 	}
199 
200 	flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
201 	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
202 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
203 		    no_nonblock_str);
204 		return (FALSE);
205 	}
206 	ct->ct_is_blocking = blocking;
207 	return (TRUE);
208 }
209 
210 /*
211  * Create a client handle for a connection.
212  * Default options are set, which the user can change using clnt_control()'s.
213  * The rpc/vc package does buffering similar to stdio, so the client
214  * must pick send and receive buffer sizes, 0 => use the default.
215  * NB: fd is copied into a private area.
216  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
217  * set this something more useful.
218  *
219  * fd should be open and bound.
220  */
221 CLIENT *
222 clnt_vc_create(fd, svcaddr, prog, vers, sendsz, recvsz)
223 	int fd;				/* open file descriptor */
224 	struct netbuf *svcaddr;		/* servers address */
225 	rpcprog_t prog;			/* program number */
226 	rpcvers_t vers;			/* version number */
227 	uint_t sendsz;			/* buffer recv size */
228 	uint_t recvsz;			/* buffer send size */
229 {
230 	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
231 			recvsz, NULL));
232 }
233 
234 /*
235  * This has the same definition as clnt_vc_create(), except it
236  * takes an additional parameter - a pointer to a timeval structure.
237  *
238  * Not a public interface. This is for clnt_create_timed,
239  * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
240  * value to control a tcp connection attempt.
241  * (for bug 4049792: clnt_create_timed does not time out)
242  *
243  * If tp is NULL, use default timeout to set up the connection.
244  */
245 CLIENT *
246 _clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz, recvsz, tp)
247 	int fd;				/* open file descriptor */
248 	struct netbuf *svcaddr;		/* servers address */
249 	rpcprog_t prog;			/* program number */
250 	rpcvers_t vers;			/* version number */
251 	uint_t sendsz;			/* buffer recv size */
252 	uint_t recvsz;			/* buffer send size */
253 	const struct timeval *tp;	/* connection timeout */
254 {
255 	CLIENT *cl;			/* client handle */
256 	struct ct_data *ct;		/* private data */
257 	struct timeval now;
258 	struct rpc_msg call_msg;
259 	struct t_info tinfo;
260 	int flag;
261 
262 	trace5(TR_clnt_vc_create, 0, prog, vers, sendsz, recvsz);
263 
264 	cl = (CLIENT *)mem_alloc(sizeof (*cl));
265 	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
266 	if ((cl == (CLIENT *)NULL) || (ct == (struct ct_data *)NULL)) {
267 		(void) syslog(LOG_ERR, clnt_vc_errstr,
268 				clnt_vc_str, __no_mem_str);
269 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
270 		rpc_createerr.cf_error.re_errno = errno;
271 		rpc_createerr.cf_error.re_terrno = 0;
272 		goto err;
273 	}
274 	ct->ct_addr.buf = NULL;
275 
276 	sig_mutex_lock(&vctbl_lock);
277 
278 	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
279 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
280 		rpc_createerr.cf_error.re_errno = errno;
281 		rpc_createerr.cf_error.re_terrno = 0;
282 		sig_mutex_unlock(&vctbl_lock);
283 		goto err;
284 	}
285 
286 	ct->ct_io_mode = RPC_CL_BLOCKING;
287 	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
288 
289 	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
290 	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
291 	ct->ct_bufferPendingSize = 0;
292 	ct->ct_bufferWritePtr = NULL;
293 	ct->ct_bufferReadPtr = NULL;
294 
295 	/* Check the current state of the fd. */
296 	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
297 		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
298 		    no_fcntl_getfl_str);
299 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
300 		rpc_createerr.cf_error.re_terrno = errno;
301 		rpc_createerr.cf_error.re_errno = 0;
302 		sig_mutex_unlock(&vctbl_lock);
303 		goto err;
304 	}
305 	ct->ct_is_blocking = flag&O_NONBLOCK? FALSE:TRUE;
306 
307 	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
308 		sig_mutex_unlock(&vctbl_lock);
309 		goto err;
310 	}
311 	sig_mutex_unlock(&vctbl_lock);
312 
313 	/*
314 	 * Set up other members of private data struct
315 	 */
316 	ct->ct_fd = fd;
317 	/*
318 	 * The actual value will be set by clnt_call or clnt_control
319 	 */
320 	ct->ct_wait = 30000;
321 	ct->ct_waitset = FALSE;
322 	/*
323 	 * By default, closeit is always FALSE. It is users responsibility
324 	 * to do a t_close on it, else the user may use clnt_control
325 	 * to let clnt_destroy do it for him/her.
326 	 */
327 	ct->ct_closeit = FALSE;
328 
329 	/*
330 	 * Initialize call message
331 	 */
332 	(void) gettimeofday(&now, (struct timezone *)0);
333 	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
334 	call_msg.rm_call.cb_prog = prog;
335 	call_msg.rm_call.cb_vers = vers;
336 
337 	/*
338 	 * pre-serialize the static part of the call msg and stash it away
339 	 */
340 	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
341 	if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
342 		goto err;
343 	}
344 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
345 	XDR_DESTROY(&(ct->ct_xdrs));
346 
347 	if (t_getinfo(fd, &tinfo) == -1) {
348 		rpc_createerr.cf_stat = RPC_TLIERROR;
349 		rpc_createerr.cf_error.re_terrno = t_errno;
350 		rpc_createerr.cf_error.re_errno = 0;
351 		goto err;
352 	}
353 	/*
354 	 * Find the receive and the send size
355 	 */
356 	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
357 	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
358 	if ((sendsz == 0) || (recvsz == 0)) {
359 		rpc_createerr.cf_stat = RPC_TLIERROR;
360 		rpc_createerr.cf_error.re_terrno = 0;
361 		rpc_createerr.cf_error.re_errno = 0;
362 		goto err;
363 	}
364 	ct->ct_tsdu = tinfo.tsdu;
365 	/*
366 	 * Create a client handle which uses xdrrec for serialization
367 	 * and authnone for authentication.
368 	 */
369 	ct->ct_xdrs.x_ops = NULL;
370 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
371 			read_vc, write_vc);
372 	if (ct->ct_xdrs.x_ops == NULL) {
373 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
374 		rpc_createerr.cf_error.re_terrno = 0;
375 		rpc_createerr.cf_error.re_errno = ENOMEM;
376 		goto err;
377 	}
378 	cl->cl_ops = clnt_vc_ops();
379 	cl->cl_private = (caddr_t)ct;
380 	cl->cl_auth = authnone_create();
381 	cl->cl_tp = (char *)NULL;
382 	cl->cl_netid = (char *)NULL;
383 	trace3(TR_clnt_vc_create, 1, prog, vers);
384 	return (cl);
385 
386 err:
387 	if (cl) {
388 		if (ct) {
389 			if (ct->ct_addr.len)
390 				mem_free(ct->ct_addr.buf, ct->ct_addr.len);
391 			mem_free((caddr_t)ct, sizeof (struct ct_data));
392 		}
393 		mem_free((caddr_t)cl, sizeof (CLIENT));
394 	}
395 	trace3(TR_clnt_vc_create, 1, prog, vers);
396 	return ((CLIENT *)NULL);
397 }
398 
399 #define	TCPOPT_BUFSIZE 128
400 
401 /*
402  * Set tcp connection timeout value.
403  * Retun 0 for success, -1 for failure.
404  */
405 static int
406 _set_tcp_conntime(int fd, int optval)
407 {
408 	struct t_optmgmt req, res;
409 	struct opthdr *opt;
410 	int *ip;
411 	char buf[TCPOPT_BUFSIZE];
412 
413 	opt = (struct opthdr *)buf;
414 	opt->level =  IPPROTO_TCP;
415 	opt->name = TCP_CONN_ABORT_THRESHOLD;
416 	opt->len = sizeof (int);
417 
418 	req.flags = T_NEGOTIATE;
419 	req.opt.len = sizeof (struct opthdr) + opt->len;
420 	req.opt.buf = (char *)opt;
421 	ip = (int *)((char *)buf + sizeof (struct opthdr));
422 	*ip = optval;
423 
424 	res.flags = 0;
425 	res.opt.buf = (char *)buf;
426 	res.opt.maxlen = sizeof (buf);
427 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
428 		return (-1);
429 	}
430 	return (0);
431 }
432 
433 /*
434  * Get current tcp connection timeout value.
435  * Retun 0 for success, -1 for failure.
436  */
437 static int
438 _get_tcp_conntime(int fd)
439 {
440 	struct t_optmgmt req, res;
441 	struct opthdr *opt;
442 	int *ip, retval;
443 	char buf[TCPOPT_BUFSIZE];
444 
445 	opt = (struct opthdr *)buf;
446 	opt->level =  IPPROTO_TCP;
447 	opt->name = TCP_CONN_ABORT_THRESHOLD;
448 	opt->len = sizeof (int);
449 
450 	req.flags = T_CURRENT;
451 	req.opt.len = sizeof (struct opthdr) + opt->len;
452 	req.opt.buf = (char *)opt;
453 	ip = (int *)((char *)buf + sizeof (struct opthdr));
454 	*ip = 0;
455 
456 	res.flags = 0;
457 	res.opt.buf = (char *)buf;
458 	res.opt.maxlen = sizeof (buf);
459 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
460 		return (-1);
461 	}
462 
463 	ip = (int *)((char *)buf + sizeof (struct opthdr));
464 	retval = *ip;
465 	return (retval);
466 }
467 
468 static bool_t
469 set_up_connection(fd, svcaddr, ct, tp)
470 	int fd;
471 	struct netbuf *svcaddr;		/* servers address */
472 	struct ct_data *ct;
473 	struct timeval *tp;
474 {
475 	int state;
476 	struct t_call sndcallstr, *rcvcall;
477 	int nconnect;
478 	bool_t connected, do_rcv_connect;
479 	int curr_time = 0;
480 
481 	ct->ct_addr.len = 0;
482 	state = t_getstate(fd);
483 	if (state == -1) {
484 		rpc_createerr.cf_stat = RPC_TLIERROR;
485 		rpc_createerr.cf_error.re_errno = 0;
486 		rpc_createerr.cf_error.re_terrno = t_errno;
487 		return (FALSE);
488 	}
489 
490 #ifdef DEBUG
491 	fprintf(stderr, "set_up_connection: state = %d\n", state);
492 #endif
493 	switch (state) {
494 	case T_IDLE:
495 		if (svcaddr == (struct netbuf *)NULL) {
496 			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
497 			return (FALSE);
498 		}
499 		/*
500 		 * Connect only if state is IDLE and svcaddr known
501 		 */
502 /* LINTED pointer alignment */
503 		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
504 		if (rcvcall == NULL) {
505 			rpc_createerr.cf_stat = RPC_TLIERROR;
506 			rpc_createerr.cf_error.re_terrno = t_errno;
507 			rpc_createerr.cf_error.re_errno = errno;
508 			return (FALSE);
509 		}
510 		rcvcall->udata.maxlen = 0;
511 		sndcallstr.addr = *svcaddr;
512 		sndcallstr.opt.len = 0;
513 		sndcallstr.udata.len = 0;
514 		/*
515 		 * Even NULL could have sufficed for rcvcall, because
516 		 * the address returned is same for all cases except
517 		 * for the gateway case, and hence required.
518 		 */
519 		connected = FALSE;
520 		do_rcv_connect = FALSE;
521 
522 		/*
523 		 * If there is a timeout value specified, we will try to
524 		 * reset the tcp connection timeout. If the transport does
525 		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
526 		 * for other reason, default timeout will be used.
527 		 */
528 		if (tp != NULL) {
529 		    int ms;
530 
531 		    /* TCP_CONN_ABORT_THRESHOLD takes int value in millisecs */
532 		    ms = tp->tv_sec * SECS_TO_MS + tp->tv_usec * USECS_TO_MS;
533 		    if (((curr_time = _get_tcp_conntime(fd)) != -1) &&
534 			(_set_tcp_conntime(fd, ms) == 0)) {
535 #ifdef DEBUG
536 			fprintf(stderr, "set_up_connection: set tcp ");
537 			fprintf(stderr, "connection timeout to %d ms\n", ms);
538 #endif
539 		    }
540 		}
541 
542 		for (nconnect = 0; nconnect < 3; nconnect++) {
543 			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
544 				connected = TRUE;
545 				break;
546 			}
547 			if (!(t_errno == TSYSERR && errno == EINTR)) {
548 				break;
549 			}
550 			if ((state = t_getstate(fd)) == T_OUTCON) {
551 				do_rcv_connect = TRUE;
552 				break;
553 			}
554 			if (state != T_IDLE) {
555 				break;
556 			}
557 		}
558 		if (do_rcv_connect) {
559 			do {
560 				if (t_rcvconnect(fd, rcvcall) != -1) {
561 					connected = TRUE;
562 					break;
563 				}
564 			} while (t_errno == TSYSERR && errno == EINTR);
565 		}
566 
567 		/*
568 		 * Set the connection timeout back to its old value.
569 		 */
570 		if (curr_time) {
571 			_set_tcp_conntime(fd, curr_time);
572 		}
573 
574 		if (!connected) {
575 			rpc_createerr.cf_stat = RPC_TLIERROR;
576 			rpc_createerr.cf_error.re_terrno = t_errno;
577 			rpc_createerr.cf_error.re_errno = errno;
578 			(void) t_free((char *)rcvcall, T_CALL);
579 #ifdef DEBUG
580 			fprintf(stderr, "clnt_vc: t_connect error %d\n",
581 				rpc_createerr.cf_error.re_terrno);
582 #endif
583 			return (FALSE);
584 		}
585 
586 		/* Free old area if allocated */
587 		if (ct->ct_addr.buf)
588 			free(ct->ct_addr.buf);
589 		ct->ct_addr = rcvcall->addr;	/* To get the new address */
590 		/* So that address buf does not get freed */
591 		rcvcall->addr.buf = NULL;
592 		(void) t_free((char *)rcvcall, T_CALL);
593 		break;
594 	case T_DATAXFER:
595 	case T_OUTCON:
596 		if (svcaddr == (struct netbuf *)NULL) {
597 			/*
598 			 * svcaddr could also be NULL in cases where the
599 			 * client is already bound and connected.
600 			 */
601 			ct->ct_addr.len = 0;
602 		} else {
603 			ct->ct_addr.buf = malloc(svcaddr->len);
604 			if (ct->ct_addr.buf == (char *)NULL) {
605 				(void) syslog(LOG_ERR, clnt_vc_errstr,
606 					clnt_vc_str, __no_mem_str);
607 				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
608 				rpc_createerr.cf_error.re_errno = errno;
609 				rpc_createerr.cf_error.re_terrno = 0;
610 				return (FALSE);
611 			}
612 			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
613 					(int)svcaddr->len);
614 			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
615 		}
616 		break;
617 	default:
618 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
619 		return (FALSE);
620 	}
621 	return (TRUE);
622 }
623 
/*
 * Issue one remote procedure call over the connection and, unless the call
 * is batched, wait for and decode the matching reply.
 *
 *	cl		client handle
 *	proc		remote procedure number
 *	xdr_args	encoder for the arguments at args_ptr
 *	xdr_results	decoder for the results into results_ptr
 *	timeout		per-call timeout; overridden by ct_wait when a
 *			timeout was set through clnt_control()
 *
 * The call is batched (encoded but not necessarily sent, and no reply is
 * awaited) when xdr_results is NULL and the effective timeout is zero.
 *
 * Returns the resulting status, which in most paths is also left in
 * rpc_callerr.re_status.  The per-fd lock is held for the whole call.
 */
static enum clnt_stat
clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
	CLIENT *cl;
	rpcproc_t proc;
	xdrproc_t xdr_args;
	caddr_t args_ptr;
	xdrproc_t xdr_results;
	caddr_t results_ptr;
	struct timeval timeout;
{
/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	uint32_t x_id;
	/* The xid is the first word of the pre-marshalled call header. */
/* LINTED pointer alignment */
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
	bool_t shipnow;
	/* Number of credential refreshes attempted on an auth error. */
	int refreshes = 2;

	trace3(TR_clnt_vc_call, 0, cl, proc);

	/* A non-zero return from rpc_fd_lock() is treated as failure. */
	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = FALSE;
	/*
	 * In non-blocking mode, flush with RPC_CL_BLOCKING_FLUSH before
	 * starting a two-way call.
	 */
	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (RPC_FAILED);  /* XXX */
		}
	}

	if (!ct->ct_waitset) {
		/* If time is not within limits, we ignore it. */
		if (time_not_ok(&timeout) == FALSE)
			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
	} else {
		/* A timeout set through clnt_control() takes precedence. */
		timeout.tv_sec = (ct->ct_wait / 1000);
		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
	}

	/* Batch (do not ship now) only with no results and zero timeout. */
	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		/* header, procedure number, credentials, then arguments */
		if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (! xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			trace3(TR_clnt_vc_call, 1, cl, proc);
			return (rpc_callerr.re_status);
		}
	} else {
		/*
		 * RPCSEC_GSS: append the procedure number to the stored
		 * header and pass header plus arguments to __rpc_gss_wrap().
		 */
/* LINTED pointer alignment */
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			trace3(TR_clnt_vc_call, 1, cl, proc);
			return (rpc_callerr.re_status);
		}
	}
	if (! xdrrec_endofrecord(xdrs, shipnow)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		trace3(TR_clnt_vc_call, 1, cl, proc);
		return (rpc_callerr.re_status = RPC_CANTSEND);
	}
	/* Batched call: nothing to wait for. */
	if (! shipnow) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		trace3(TR_clnt_vc_call, 1, cl, proc);
		return (RPC_SUCCESS);
	}
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		trace3(TR_clnt_vc_call, 1, cl, proc);
		return (rpc_callerr.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	/*CONSTANTCONDITION*/
	while (TRUE) {
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
		if (! xdrrec_skiprecord(xdrs)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			trace3(TR_clnt_vc_call, 1, cl, proc);
			return (rpc_callerr.re_status);
		}
		/* now decode and validate the response header */
		if (! xdr_replymsg(xdrs, &reply_msg)) {
			/* No error was recorded: try the next record. */
			if (rpc_callerr.re_status == RPC_SUCCESS)
				continue;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			trace3(TR_clnt_vc_call, 1, cl, proc);
			return (rpc_callerr.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
		rpc_callerr.re_status = RPC_SUCCESS;
	else
		__seterr_reply(&reply_msg, &(rpc_callerr));

	if (rpc_callerr.re_status == RPC_SUCCESS) {
		if (! AUTH_VALIDATE(cl->cl_auth,
				&reply_msg.acpted_rply.ar_verf)) {
			rpc_callerr.re_status = RPC_AUTHERROR;
			rpc_callerr.re_why = AUTH_INVALIDRESP;
		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
			if (!(*xdr_results)(xdrs, results_ptr)) {
				if (rpc_callerr.re_status == RPC_SUCCESS)
				    rpc_callerr.re_status = RPC_CANTDECODERES;
			}
		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
							results_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTDECODERES;
		}
	}	/* end successful completion */
	/*
	 * If unsuccesful AND error is an authentication error
	 * then refresh credentials and try again, else break
	 */
	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
			goto call_again;
		else
			/*
			 * We are setting rpc_callerr here given that libnsl
			 * is not reentrant thereby reinitializing the TSD.
			 * If not set here then success could be returned even
			 * though refresh failed.
			 */
			rpc_callerr.re_status = RPC_AUTHERROR;
	} /* end of unsuccessful completion */
	/* free verifier ... */
	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
			reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
		xdrs->x_op = XDR_FREE;
		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	trace3(TR_clnt_vc_call, 1, cl, proc);
	return (rpc_callerr.re_status);
}
806 
807 static enum clnt_stat
808 clnt_vc_send(cl, proc, xdr_args, args_ptr)
809 	CLIENT *cl;
810 	rpcproc_t proc;
811 	xdrproc_t xdr_args;
812 	caddr_t args_ptr;
813 {
814 /* LINTED pointer alignment */
815 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
816 	XDR *xdrs = &(ct->ct_xdrs);
817 	uint32_t x_id;
818 /* LINTED pointer alignment */
819 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
820 
821 	trace3(TR_clnt_vc_send, 0, cl, proc);
822 
823 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
824 		rpc_callerr.re_status = RPC_FAILED;
825 		rpc_callerr.re_errno = errno;
826 		rpc_fd_unlock(vctbl, ct->ct_fd);
827 		return (RPC_FAILED);
828 	}
829 
830 	ct->ct_is_oneway = TRUE;
831 
832 	xdrs->x_op = XDR_ENCODE;
833 	rpc_callerr.re_status = RPC_SUCCESS;
834 	/*
835 	 * Due to little endian byte order, it is necessary to convert to host
836 	 * format before decrementing xid.
837 	 */
838 	x_id = ntohl(*msg_x_id) - 1;
839 	*msg_x_id = htonl(x_id);
840 
841 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
842 		if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
843 		    (! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
844 		    (! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
845 		    (! xdr_args(xdrs, args_ptr))) {
846 			if (rpc_callerr.re_status == RPC_SUCCESS)
847 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
848 			(void) xdrrec_endofrecord(xdrs, TRUE);
849 			rpc_fd_unlock(vctbl, ct->ct_fd);
850 			trace3(TR_clnt_vc_send, 1, cl, proc);
851 			return (rpc_callerr.re_status);
852 		}
853 	} else {
854 /* LINTED pointer alignment */
855 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
856 		IXDR_PUT_U_INT32(u, proc);
857 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
858 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
859 			if (rpc_callerr.re_status == RPC_SUCCESS)
860 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
861 			(void) xdrrec_endofrecord(xdrs, TRUE);
862 			rpc_fd_unlock(vctbl, ct->ct_fd);
863 			trace3(TR_clnt_vc_send, 1, cl, proc);
864 			return (rpc_callerr.re_status);
865 		}
866 	}
867 
868 	/*
869 	 * Do not need to check errors, as the following code does
870 	 * not depend on the successful completion of the call.
871 	 * An error, if any occurs, is reported through
872 	 * rpc_callerr.re_status.
873 	 */
874 	xdrrec_endofrecord(xdrs, TRUE);
875 
876 	rpc_fd_unlock(vctbl, ct->ct_fd);
877 	trace3(TR_clnt_vc_call, 1, cl, proc);
878 	return (rpc_callerr.re_status);
879 }
880 
881 static void
882 clnt_vc_geterr(cl, errp)
883 	CLIENT *cl;
884 	struct rpc_err *errp;
885 {
886 	trace2(TR_clnt_vc_geterr, 0, cl);
887 	*errp = rpc_callerr;
888 	trace2(TR_clnt_vc_geterr, 1, cl);
889 }
890 
891 static bool_t
892 clnt_vc_freeres(cl, xdr_res, res_ptr)
893 	CLIENT *cl;
894 	xdrproc_t xdr_res;
895 	caddr_t res_ptr;
896 {
897 /* LINTED pointer alignment */
898 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
899 	XDR *xdrs = &(ct->ct_xdrs);
900 	bool_t dummy;
901 
902 	trace2(TR_clnt_vc_freeres, 0, cl);
903 	rpc_fd_lock(vctbl, ct->ct_fd);
904 	xdrs->x_op = XDR_FREE;
905 	dummy = (*xdr_res)(xdrs, res_ptr);
906 	rpc_fd_unlock(vctbl, ct->ct_fd);
907 	trace2(TR_clnt_vc_freeres, 1, cl);
908 	return (dummy);
909 }
910 
/*
 * Abort operation for this transport: apart from emitting the entry and
 * exit trace points it does nothing.
 */
static void
clnt_vc_abort(void)
{
	trace1(TR_clnt_vc_abort, 0);
	trace1(TR_clnt_vc_abort, 1);
}
917 
918 /*ARGSUSED*/
919 static bool_t
920 clnt_vc_control(cl, request, info)
921 	CLIENT *cl;
922 	int request;
923 	char *info;
924 {
925 	bool_t ret;
926 /* LINTED pointer alignment */
927 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
928 
929 	trace3(TR_clnt_vc_control, 0, cl, request);
930 
931 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
932 		rpc_fd_unlock(vctbl, ct->ct_fd);
933 		return (RPC_FAILED);
934 	}
935 
936 	switch (request) {
937 	case CLSET_FD_CLOSE:
938 		ct->ct_closeit = TRUE;
939 		rpc_fd_unlock(vctbl, ct->ct_fd);
940 		trace3(TR_clnt_vc_control, 1, cl, request);
941 		return (TRUE);
942 	case CLSET_FD_NCLOSE:
943 		ct->ct_closeit = FALSE;
944 		rpc_fd_unlock(vctbl, ct->ct_fd);
945 		trace3(TR_clnt_vc_control, 1, cl, request);
946 		return (TRUE);
947 	case CLFLUSH:
948 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
949 			int res;
950 			res = do_flush(ct, (info == NULL ||
951 			    *(int *)info == RPC_CL_DEFAULT_FLUSH)?
952 			    ct->ct_blocking_mode: *(int *)info);
953 			ret = (0 == res);
954 		}
955 		rpc_fd_unlock(vctbl, ct->ct_fd);
956 		return (ret);
957 	}
958 
959 	/* for other requests which use info */
960 	if (info == NULL) {
961 		rpc_fd_unlock(vctbl, ct->ct_fd);
962 		trace3(TR_clnt_vc_control, 1, cl, request);
963 		return (FALSE);
964 	}
965 	switch (request) {
966 	case CLSET_TIMEOUT:
967 /* LINTED pointer alignment */
968 		if (time_not_ok((struct timeval *)info)) {
969 			rpc_fd_unlock(vctbl, ct->ct_fd);
970 			trace3(TR_clnt_vc_control, 1, cl, request);
971 			return (FALSE);
972 		}
973 /* LINTED pointer alignment */
974 		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
975 		ct->ct_waitset = TRUE;
976 		break;
977 	case CLGET_TIMEOUT:
978 /* LINTED pointer alignment */
979 		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
980 /* LINTED pointer alignment */
981 		((struct timeval *)info)->tv_usec =
982 			(ct->ct_wait % 1000) * 1000;
983 		break;
984 	case CLGET_SERVER_ADDR:	/* For compatibility only */
985 		(void) memcpy(info, ct->ct_addr.buf, (int)ct->ct_addr.len);
986 		break;
987 	case CLGET_FD:
988 /* LINTED pointer alignment */
989 		*(int *)info = ct->ct_fd;
990 		break;
991 	case CLGET_SVC_ADDR:
992 		/* The caller should not free this memory area */
993 /* LINTED pointer alignment */
994 		*(struct netbuf *)info = ct->ct_addr;
995 		break;
996 	case CLSET_SVC_ADDR:		/* set to new address */
997 #ifdef undef
998 		/*
999 		 * XXX: once the t_snddis(), followed by t_connect() starts to
1000 		 * work, this ifdef should be removed.  CLIENT handle reuse
1001 		 * would then be possible for COTS as well.
1002 		 */
1003 		if (t_snddis(ct->ct_fd, NULL) == -1) {
1004 			rpc_createerr.cf_stat = RPC_TLIERROR;
1005 			rpc_createerr.cf_error.re_terrno = t_errno;
1006 			rpc_createerr.cf_error.re_errno = errno;
1007 			rpc_fd_unlock(vctbl, ct->ct_fd);
1008 			trace3(TR_clnt_vc_control, 1, cl, request);
1009 			return (FALSE);
1010 		}
1011 		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
1012 			ct, NULL));
1013 		rpc_fd_unlock(vctbl, ct->ct_fd);
1014 		trace3(TR_clnt_vc_control, 1, cl, request);
1015 		return (ret);
1016 #else
1017 		rpc_fd_unlock(vctbl, ct->ct_fd);
1018 		trace3(TR_clnt_vc_control, 1, cl, request);
1019 		return (FALSE);
1020 #endif
1021 	case CLGET_XID:
1022 		/*
1023 		 * use the knowledge that xid is the
1024 		 * first element in the call structure
1025 		 * This will get the xid of the PREVIOUS call
1026 		 */
1027 /* LINTED pointer alignment */
1028 		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
1029 		break;
1030 	case CLSET_XID:
1031 		/* This will set the xid of the NEXT call */
1032 /* LINTED pointer alignment */
1033 		*(uint32_t *)ct->ct_mcall =  htonl(*(uint32_t *)info + 1);
1034 		/* increment by 1 as clnt_vc_call() decrements once */
1035 		break;
1036 	case CLGET_VERS:
1037 		/*
1038 		 * This RELIES on the information that, in the call body,
1039 		 * the version number field is the fifth field from the
1040 		 * begining of the RPC header. MUST be changed if the
1041 		 * call_struct is changed
1042 		 */
1043 /* LINTED pointer alignment */
1044 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1045 						4 * BYTES_PER_XDR_UNIT));
1046 		break;
1047 
1048 	case CLSET_VERS:
1049 /* LINTED pointer alignment */
1050 		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
1051 /* LINTED pointer alignment */
1052 			htonl(*(uint32_t *)info);
1053 		break;
1054 
1055 	case CLGET_PROG:
1056 		/*
1057 		 * This RELIES on the information that, in the call body,
1058 		 * the program number field is the fourth field from the
1059 		 * begining of the RPC header. MUST be changed if the
1060 		 * call_struct is changed
1061 		 */
1062 /* LINTED pointer alignment */
1063 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1064 						3 * BYTES_PER_XDR_UNIT));
1065 		break;
1066 
1067 	case CLSET_PROG:
1068 /* LINTED pointer alignment */
1069 		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1070 /* LINTED pointer alignment */
1071 			htonl(*(uint32_t *)info);
1072 		break;
1073 
1074 	case CLSET_IO_MODE:
1075 		if (!set_io_mode(ct, *(int *)info)) {
1076 		    rpc_fd_unlock(vctbl, ct->ct_fd);
1077 		    return (FALSE);
1078 		}
1079 		break;
1080 	case CLSET_FLUSH_MODE:
1081 		/* Set a specific FLUSH_MODE */
1082 		if (!set_flush_mode(ct, *(int *)info)) {
1083 		    rpc_fd_unlock(vctbl, ct->ct_fd);
1084 		    return (FALSE);
1085 		}
1086 		break;
1087 	case CLGET_FLUSH_MODE:
1088 		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
1089 		break;
1090 
1091 	case CLGET_IO_MODE:
1092 		*(rpciomode_t *)info = ct->ct_io_mode;
1093 		break;
1094 
1095 	case CLGET_CURRENT_REC_SIZE:
1096 		/*
1097 		 * Returns the current amount of memory allocated
1098 		 * to pending requests
1099 		 */
1100 		*(int *)info = ct->ct_bufferPendingSize;
1101 		break;
1102 
1103 	case CLSET_CONNMAXREC_SIZE:
1104 		/* Cannot resize the buffer if it is used. */
1105 		if (ct->ct_bufferPendingSize != 0) {
1106 			rpc_fd_unlock(vctbl, ct->ct_fd);
1107 			return (FALSE);
1108 		}
1109 		/*
1110 		 * If the new size is equal to the current size,
1111 		 * there is nothing to do.
1112 		 */
1113 		if (ct->ct_bufferSize == *(uint_t *)info)
1114 			break;
1115 
1116 		ct->ct_bufferSize = *(uint_t *)info;
1117 		if (ct->ct_buffer) {
1118 			free(ct->ct_buffer);
1119 			ct->ct_buffer = NULL;
1120 			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1121 		}
1122 		break;
1123 
1124 	case CLGET_CONNMAXREC_SIZE:
1125 		/*
1126 		 * Returns the size of buffer allocated
1127 		 * to pending requests
1128 		 */
1129 		*(uint_t *)info = ct->ct_bufferSize;
1130 		break;
1131 
1132 	default:
1133 		rpc_fd_unlock(vctbl, ct->ct_fd);
1134 		trace3(TR_clnt_vc_control, 1, cl, request);
1135 		return (FALSE);
1136 	}
1137 	rpc_fd_unlock(vctbl, ct->ct_fd);
1138 	trace3(TR_clnt_vc_control, 1, cl, request);
1139 	return (TRUE);
1140 }
1141 
1142 static void
1143 clnt_vc_destroy(cl)
1144 	CLIENT *cl;
1145 {
1146 /* LINTED pointer alignment */
1147 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
1148 	int ct_fd = ct->ct_fd;
1149 
1150 	trace2(TR_clnt_vc_destroy, 0, cl);
1151 	rpc_fd_lock(vctbl, ct_fd);
1152 
1153 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1154 	    do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1155 	    unregister_nb(ct);
1156 	}
1157 
1158 	if (ct->ct_closeit)
1159 		(void) t_close(ct_fd);
1160 	XDR_DESTROY(&(ct->ct_xdrs));
1161 	if (ct->ct_addr.buf)
1162 		(void) free(ct->ct_addr.buf);
1163 	mem_free((caddr_t)ct, sizeof (struct ct_data));
1164 	if (cl->cl_netid && cl->cl_netid[0])
1165 		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1166 	if (cl->cl_tp && cl->cl_tp[0])
1167 		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1168 	mem_free((caddr_t)cl, sizeof (CLIENT));
1169 	rpc_fd_unlock(vctbl, ct_fd);
1170 	trace2(TR_clnt_vc_destroy, 1, cl);
1171 }
1172 
1173 /*
1174  * Interface between xdr serializer and vc connection.
1175  * Behaves like the system calls, read & write, but keeps some error state
1176  * around for the rpc level.
1177  */
1178 static int
1179 read_vc(void *ct_tmp, caddr_t buf, int len)
1180 {
1181 	static pthread_key_t pfdp_key;
1182 	struct pollfd *pfdp;
1183 	int npfd;		/* total number of pfdp allocated */
1184 	struct ct_data *ct = ct_tmp;
1185 	struct timeval starttime;
1186 	struct timeval curtime;
1187 	struct timeval time_waited;
1188 	struct timeval timeout;
1189 	int poll_time;
1190 	int delta;
1191 
1192 	trace2(TR_read_vc, 0, len);
1193 
1194 	if (len == 0) {
1195 		trace2(TR_read_vc, 1, len);
1196 		return (0);
1197 	}
1198 
1199 	/*
1200 	 * Allocate just one the first time.  thr_get_storage() may
1201 	 * return a larger buffer, left over from the last time we were
1202 	 * here, but that's OK.  realloc() will deal with it properly.
1203 	 */
1204 	npfd = 1;
1205 	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1206 	if (pfdp == NULL) {
1207 		(void) syslog(LOG_ERR, clnt_vc_errstr,
1208 			clnt_read_vc_str, __no_mem_str);
1209 		rpc_callerr.re_status = RPC_SYSTEMERROR;
1210 		rpc_callerr.re_errno = errno;
1211 		rpc_callerr.re_terrno = 0;
1212 		trace2(TR_read_vc, 1, len);
1213 		return (-1);
1214 	}
1215 
1216 	/*
1217 	 *	N.B.:  slot 0 in the pollfd array is reserved for the file
1218 	 *	descriptor we're really interested in (as opposed to the
1219 	 *	callback descriptors).
1220 	 */
1221 	pfdp[0].fd = ct->ct_fd;
1222 	pfdp[0].events = MASKVAL;
1223 	pfdp[0].revents = 0;
1224 	poll_time = ct->ct_wait;
1225 	if (gettimeofday(&starttime, (struct timezone *)NULL) == -1) {
1226 		syslog(LOG_ERR, "Unable to get time of day: %m");
1227 		return (-1);
1228 	}
1229 
1230 	for (;;) {
1231 		extern void (*_svc_getreqset_proc)();
1232 		extern pollfd_t *svc_pollfd;
1233 		extern int svc_max_pollfd;
1234 		int fds;
1235 
1236 		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1237 
1238 		if (_svc_getreqset_proc) {
1239 			sig_rw_rdlock(&svc_fd_lock);
1240 
1241 			/* reallocate pfdp to svc_max_pollfd +1 */
1242 			if (npfd != (svc_max_pollfd + 1)) {
1243 				struct pollfd *tmp_pfdp = realloc(pfdp,
1244 						sizeof (struct pollfd) *
1245 						(svc_max_pollfd + 1));
1246 				if (tmp_pfdp == NULL) {
1247 					sig_rw_unlock(&svc_fd_lock);
1248 					(void) syslog(LOG_ERR, clnt_vc_errstr,
1249 						clnt_read_vc_str, __no_mem_str);
1250 					rpc_callerr.re_status = RPC_SYSTEMERROR;
1251 					rpc_callerr.re_errno = errno;
1252 					rpc_callerr.re_terrno = 0;
1253 					trace2(TR_read_vc, 1, len);
1254 					return (-1);
1255 				}
1256 
1257 				pfdp = tmp_pfdp;
1258 				npfd = svc_max_pollfd + 1;
1259 				pthread_setspecific(pfdp_key, pfdp);
1260 			}
1261 			if (npfd > 1)
1262 				(void) memcpy(&pfdp[1], svc_pollfd,
1263 				    sizeof (struct pollfd) * (npfd - 1));
1264 
1265 			sig_rw_unlock(&svc_fd_lock);
1266 		} else {
1267 			npfd = 1;	/* don't forget about pfdp[0] */
1268 		}
1269 
1270 		switch (fds = poll(pfdp, npfd, poll_time)) {
1271 		case 0:
1272 			rpc_callerr.re_status = RPC_TIMEDOUT;
1273 			trace2(TR_read_vc, 1, len);
1274 			return (-1);
1275 
1276 		case -1:
1277 			if (errno != EINTR)
1278 				continue;
1279 			else {
1280 				/*
1281 				 * interrupted by another signal,
1282 				 * update time_waited
1283 				 */
1284 
1285 				if (gettimeofday(&curtime,
1286 				(struct timezone *)NULL) == -1) {
1287 					syslog(LOG_ERR,
1288 					    "Unable to get time of day:  %m");
1289 					errno = 0;
1290 					continue;
1291 				};
1292 				delta = (curtime.tv_sec -
1293 						starttime.tv_sec) * 1000 +
1294 					(curtime.tv_usec -
1295 						starttime.tv_usec) / 1000;
1296 				poll_time -= delta;
1297 				if (poll_time < 0) {
1298 					rpc_callerr.re_status =
1299 						RPC_TIMEDOUT;
1300 					errno = 0;
1301 					trace2(TR_read_vc, 1, len);
1302 					return (-1);
1303 				} else {
1304 					errno = 0; /* reset it */
1305 					continue;
1306 				}
1307 			}
1308 		}
1309 
1310 		if (pfdp[0].revents == 0) {
1311 			/* must be for server side of the house */
1312 			(*_svc_getreqset_proc)(&pfdp[1], fds);
1313 			continue;	/* do poll again */
1314 		}
1315 
1316 		if (pfdp[0].revents & POLLNVAL) {
1317 			rpc_callerr.re_status = RPC_CANTRECV;
1318 			/*
1319 			 *	Note:  we're faking errno here because we
1320 			 *	previously would have expected select() to
1321 			 *	return -1 with errno EBADF.  Poll(BA_OS)
1322 			 *	returns 0 and sets the POLLNVAL revents flag
1323 			 *	instead.
1324 			 */
1325 			rpc_callerr.re_errno = errno = EBADF;
1326 			trace2(TR_read_vc, 1, len);
1327 			return (-1);
1328 		}
1329 
1330 		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1331 			rpc_callerr.re_status = RPC_CANTRECV;
1332 			rpc_callerr.re_errno = errno = EPIPE;
1333 			trace2(TR_read_vc, 1, len);
1334 			return (-1);
1335 		}
1336 		break;
1337 	}
1338 
1339 	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1340 	case 0:
1341 		/* premature eof */
1342 		rpc_callerr.re_errno = ENOLINK;
1343 		rpc_callerr.re_terrno = 0;
1344 		rpc_callerr.re_status = RPC_CANTRECV;
1345 		len = -1;	/* it's really an error */
1346 		break;
1347 
1348 	case -1:
1349 		rpc_callerr.re_terrno = t_errno;
1350 		rpc_callerr.re_errno = 0;
1351 		rpc_callerr.re_status = RPC_CANTRECV;
1352 		break;
1353 	}
1354 	trace2(TR_read_vc, 1, len);
1355 	return (len);
1356 }
1357 
1358 static int
1359 write_vc(ct_tmp, buf, len)
1360 	void *ct_tmp;
1361 	caddr_t buf;
1362 	int len;
1363 {
1364 	int i, cnt;
1365 	struct ct_data *ct = ct_tmp;
1366 	int flag;
1367 	int maxsz;
1368 
1369 	trace2(TR_write_vc, 0, len);
1370 
1371 	maxsz = ct->ct_tsdu;
1372 
1373 	/* Handle the non-blocking mode */
1374 	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1375 		/*
1376 		 * Test a special case here. If the length of the current
1377 		 * write is greater than the transport data unit, and the
1378 		 * mode is non blocking, we return RPC_CANTSEND.
1379 		 * XXX  this is not very clean.
1380 		 */
1381 		if (maxsz > 0 && len > maxsz) {
1382 			rpc_callerr.re_terrno = errno;
1383 			rpc_callerr.re_errno = 0;
1384 			rpc_callerr.re_status = RPC_CANTSEND;
1385 			return (-1);
1386 		}
1387 
1388 		len = nb_send(ct, buf, (unsigned)len);
1389 		if (len == -1) {
1390 			rpc_callerr.re_terrno = errno;
1391 			rpc_callerr.re_errno = 0;
1392 			rpc_callerr.re_status = RPC_CANTSEND;
1393 		} else if (len == -2) {
1394 			rpc_callerr.re_terrno = 0;
1395 			rpc_callerr.re_errno = 0;
1396 			rpc_callerr.re_status = RPC_CANTSTORE;
1397 		}
1398 		trace2(TR_write_vc, 1, len);
1399 		return (len);
1400 	}
1401 
1402 	if ((maxsz == 0) || (maxsz == -1)) {
1403 		/*
1404 		 * T_snd may return -1 for error on connection (connection
1405 		 * needs to be repaired/closed, and -2 for flow-control
1406 		 * handling error (no operation to do, just wait and call
1407 		 * T_Flush()).
1408 		 */
1409 		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1410 			rpc_callerr.re_terrno = t_errno;
1411 			rpc_callerr.re_errno = 0;
1412 			rpc_callerr.re_status = RPC_CANTSEND;
1413 		}
1414 		trace2(TR_write_vc, 1, len);
1415 		return (len);
1416 	}
1417 
1418 	/*
1419 	 * This for those transports which have a max size for data.
1420 	 */
1421 	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1422 		flag = cnt > maxsz ? T_MORE : 0;
1423 		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1424 				flag)) == -1) {
1425 			rpc_callerr.re_terrno = t_errno;
1426 			rpc_callerr.re_errno = 0;
1427 			rpc_callerr.re_status = RPC_CANTSEND;
1428 			trace2(TR_write_vc, 1, len);
1429 			return (-1);
1430 		}
1431 	}
1432 	trace2(TR_write_vc, 1, len);
1433 	return (len);
1434 }
1435 
1436 /*
1437  * Receive the required bytes of data, even if it is fragmented.
1438  */
1439 static int
1440 t_rcvall(fd, buf, len)
1441 	int fd;
1442 	char *buf;
1443 	int len;
1444 {
1445 	int moreflag;
1446 	int final = 0;
1447 	int res;
1448 
1449 	trace3(TR_t_rcvall, 0, fd, len);
1450 	do {
1451 		moreflag = 0;
1452 		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1453 		if (res == -1) {
1454 			if (t_errno == TLOOK)
1455 				switch (t_look(fd)) {
1456 				case T_DISCONNECT:
1457 					t_rcvdis(fd, NULL);
1458 					t_snddis(fd, NULL);
1459 					trace3(TR_t_rcvall, 1, fd, len);
1460 					return (-1);
1461 				case T_ORDREL:
1462 				/* Received orderly release indication */
1463 					t_rcvrel(fd);
1464 				/* Send orderly release indicator */
1465 					(void) t_sndrel(fd);
1466 					trace3(TR_t_rcvall, 1, fd, len);
1467 					return (-1);
1468 				default:
1469 					trace3(TR_t_rcvall, 1, fd, len);
1470 					return (-1);
1471 				}
1472 		} else if (res == 0) {
1473 			trace3(TR_t_rcvall, 1, fd, len);
1474 			return (0);
1475 		}
1476 		final += res;
1477 		buf += res;
1478 		len -= res;
1479 	} while ((len > 0) && (moreflag & T_MORE));
1480 	trace3(TR_t_rcvall, 1, fd, len);
1481 	return (final);
1482 }
1483 
1484 static struct clnt_ops *
1485 clnt_vc_ops(void)
1486 {
1487 	static struct clnt_ops ops;
1488 	extern mutex_t	ops_lock;
1489 
1490 	/* VARIABLES PROTECTED BY ops_lock: ops */
1491 
1492 	trace1(TR_clnt_vc_ops, 0);
1493 	sig_mutex_lock(&ops_lock);
1494 	if (ops.cl_call == NULL) {
1495 		ops.cl_call = clnt_vc_call;
1496 		ops.cl_send = clnt_vc_send;
1497 		ops.cl_abort = clnt_vc_abort;
1498 		ops.cl_geterr = clnt_vc_geterr;
1499 		ops.cl_freeres = clnt_vc_freeres;
1500 		ops.cl_destroy = clnt_vc_destroy;
1501 		ops.cl_control = clnt_vc_control;
1502 	}
1503 	sig_mutex_unlock(&ops_lock);
1504 	trace1(TR_clnt_vc_ops, 1);
1505 	return (&ops);
1506 }
1507 
1508 /*
1509  * Make sure that the time is not garbage.   -1 value is disallowed.
1510  * Note this is different from time_not_ok in clnt_dg.c
1511  */
1512 static bool_t
1513 time_not_ok(t)
1514 	struct timeval *t;
1515 {
1516 	trace1(TR_time_not_ok, 0);
1517 	trace1(TR_time_not_ok, 1);
1518 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1519 		t->tv_usec <= -1 || t->tv_usec > 1000000);
1520 }
1521 
1522 
1523 /* Compute the # of bytes that remains until the end of the buffer */
1524 #define	REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
1525 
1526 static int
1527 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1528 {
1529 	if (NULL == ct->ct_buffer) {
1530 		/* Buffer not allocated yet. */
1531 		char *buffer;
1532 
1533 		buffer = (char *)malloc(ct->ct_bufferSize);
1534 		if (NULL == buffer) {
1535 			errno = ENOMEM;
1536 			return (-1);
1537 		}
1538 		memcpy(buffer, dataToAdd, nBytes);
1539 
1540 		ct->ct_buffer = buffer;
1541 		ct->ct_bufferReadPtr = buffer;
1542 		ct->ct_bufferWritePtr = buffer + nBytes;
1543 		ct->ct_bufferPendingSize = nBytes;
1544 	} else {
1545 		/*
1546 		 * For an already allocated buffer, two mem copies
1547 		 * might be needed, depending on the current
1548 		 * writing position.
1549 		 */
1550 
1551 		/* Compute the length of the first copy. */
1552 		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1553 
1554 		ct->ct_bufferPendingSize += nBytes;
1555 
1556 		memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1557 		ct->ct_bufferWritePtr += len;
1558 		nBytes -= len;
1559 		if (0 == nBytes) {
1560 			/* One memcopy needed. */
1561 
1562 			/*
1563 			 * If the write pointer is at the end of the buffer,
1564 			 * wrap it now.
1565 			 */
1566 			if (ct->ct_bufferWritePtr ==
1567 			    (ct->ct_buffer + ct->ct_bufferSize)) {
1568 				ct->ct_bufferWritePtr = ct->ct_buffer;
1569 			}
1570 		} else {
1571 			/* Two memcopy needed. */
1572 			dataToAdd += len;
1573 
1574 			/*
1575 			 * Copy the remaining data to the beginning of the
1576 			 * buffer
1577 			 */
1578 			memcpy(ct->ct_buffer, dataToAdd, nBytes);
1579 			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1580 		}
1581 	}
1582 	return (0);
1583 }
1584 
1585 static void
1586 getFromBuffer(struct ct_data *ct, char **data, unsigned int *nBytes)
1587 {
1588 	int len = MIN(ct->ct_bufferPendingSize, REMAIN_BYTES(bufferReadPtr));
1589 	*data = ct->ct_bufferReadPtr;
1590 	*nBytes = len;
1591 }
1592 
1593 static void
1594 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1595 {
1596 	ct->ct_bufferPendingSize -= nBytes;
1597 	if (ct->ct_bufferPendingSize == 0) {
1598 		/*
1599 		 * If the buffer contains no data, we set the two pointers at
1600 		 * the beginning of the buffer (to miminize buffer wraps).
1601 		 */
1602 		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1603 	} else {
1604 		ct->ct_bufferReadPtr += nBytes;
1605 		if (ct->ct_bufferReadPtr >
1606 		    ct->ct_buffer + ct->ct_bufferSize) {
1607 			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1608 		}
1609 	}
1610 }
1611 
1612 static int
1613 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1614 {
1615 	int l;
1616 
1617 	if (ct->ct_bufferPendingSize == 0)
1618 		return (0);
1619 
1620 	l = REMAIN_BYTES(bufferReadPtr);
1621 	if (l < ct->ct_bufferPendingSize) {
1622 		/* Buffer in two fragments. */
1623 		iov[0].iov_base = ct->ct_bufferReadPtr;
1624 		iov[0].iov_len  = l;
1625 
1626 		iov[1].iov_base = ct->ct_buffer;
1627 		iov[1].iov_len  = ct->ct_bufferPendingSize - l;
1628 		return (2);
1629 	} else {
1630 		/* Buffer in one fragment. */
1631 		iov[0].iov_base = ct->ct_bufferReadPtr;
1632 		iov[0].iov_len  = ct->ct_bufferPendingSize;
1633 		return (1);
1634 	}
1635 }
1636 
1637 static bool_t
1638 set_flush_mode(struct ct_data *ct, int mode)
1639 {
1640 	switch (mode) {
1641 	case RPC_CL_BLOCKING_FLUSH:
1642 		/* flush as most as possible without blocking */
1643 	case RPC_CL_BESTEFFORT_FLUSH:
1644 		/* flush the buffer completely (possibly blocking) */
1645 	case RPC_CL_DEFAULT_FLUSH:
1646 		/* flush according to the currently defined policy */
1647 		ct->ct_blocking_mode = mode;
1648 		return (TRUE);
1649 	default:
1650 		return (FALSE);
1651 	}
1652 }
1653 
1654 static bool_t
1655 set_io_mode(struct ct_data *ct, int ioMode)
1656 {
1657 	switch (ioMode) {
1658 	case RPC_CL_BLOCKING:
1659 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1660 			if (NULL != ct->ct_buffer) {
1661 				/*
1662 				 * If a buffer was allocated for this
1663 				 * connection, flush it now, and free it.
1664 				 */
1665 				do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1666 				free(ct->ct_buffer);
1667 				ct->ct_buffer = NULL;
1668 			}
1669 			unregister_nb(ct);
1670 			ct->ct_io_mode = ioMode;
1671 		}
1672 		break;
1673 	case RPC_CL_NONBLOCKING:
1674 		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1675 			if (-1 == register_nb(ct)) {
1676 				return (FALSE);
1677 			}
1678 			ct->ct_io_mode = ioMode;
1679 		}
1680 		break;
1681 	default:
1682 		return (FALSE);
1683 	}
1684 	return (TRUE);
1685 }
1686 
1687 static int
1688 do_flush(struct ct_data *ct, uint_t flush_mode)
1689 {
1690 	int result;
1691 	if (ct->ct_bufferPendingSize == 0) {
1692 		return (0);
1693 	}
1694 
1695 	switch (flush_mode) {
1696 	case RPC_CL_BLOCKING_FLUSH:
1697 		if (!set_blocking_connection(ct, TRUE)) {
1698 			return (-1);
1699 		}
1700 		while (ct->ct_bufferPendingSize > 0) {
1701 			if (REMAIN_BYTES(bufferReadPtr) <
1702 			    ct->ct_bufferPendingSize) {
1703 				struct iovec iov[2];
1704 				iovFromBuffer(ct, iov);
1705 				result = writev(ct->ct_fd, iov, 2);
1706 			} else {
1707 				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1708 				    ct->ct_bufferPendingSize, 0);
1709 			}
1710 			if (result < 0) {
1711 				return (-1);
1712 			}
1713 			consumeFromBuffer(ct, result);
1714 		}
1715 
1716 		break;
1717 
1718 	case RPC_CL_BESTEFFORT_FLUSH:
1719 		set_blocking_connection(ct, FALSE);
1720 		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1721 			struct iovec iov[2];
1722 			iovFromBuffer(ct, iov);
1723 			result = writev(ct->ct_fd, iov, 2);
1724 		} else {
1725 			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1726 			    ct->ct_bufferPendingSize, 0);
1727 		}
1728 		if (result < 0) {
1729 			if (errno != EWOULDBLOCK) {
1730 				perror("flush");
1731 				return (-1);
1732 			}
1733 			return (0);
1734 		}
1735 		if (result > 0)
1736 			consumeFromBuffer(ct, result);
1737 		break;
1738 	}
1739 	return (0);
1740 }
1741 
1742 /*
1743  * Non blocking send.
1744  */
1745 
1746 static int
1747 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1748 {
1749 	int result;
1750 
1751 	if (!(ntohl(*(uint32_t *)buff) & 2^31)) {
1752 		return (-1);
1753 	}
1754 
1755 	/*
1756 	 * Check to see if the current message can be stored fully in the
1757 	 * buffer. We have to check this now because it may be impossible
1758 	 * to send any data, so the message must be stored in the buffer.
1759 	 */
1760 	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1761 		/* Try to flush  (to free some space). */
1762 		do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1763 
1764 		/* Can we store the message now ? */
1765 		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1766 			return (-2);
1767 	}
1768 
1769 	set_blocking_connection(ct, FALSE);
1770 
1771 	/*
1772 	 * If there is no data pending, we can simply try
1773 	 * to send our data.
1774 	 */
1775 	if (ct->ct_bufferPendingSize == 0) {
1776 		result = t_snd(ct->ct_fd, buff, nBytes, 0);
1777 		if (result == -1) {
1778 			if (errno == EWOULDBLOCK) {
1779 				result = 0;
1780 			} else {
1781 				perror("send");
1782 				return (-1);
1783 			}
1784 		}
1785 		/*
1786 		 * If we have not sent all data, we must store them
1787 		 * in the buffer.
1788 		 */
1789 		if (result != nBytes) {
1790 			if (addInBuffer(ct, (char *)buff + result,
1791 			    nBytes - result) == -1) {
1792 				return (-1);
1793 			}
1794 		}
1795 	} else {
1796 		/*
1797 		 * Some data pending in the buffer.  We try to send
1798 		 * both buffer data and current message in one shot.
1799 		 */
1800 		struct iovec iov[3];
1801 		int i = iovFromBuffer(ct, &iov[0]);
1802 
1803 		iov[i].iov_base = buff;
1804 		iov[i].iov_len  = nBytes;
1805 
1806 		result = writev(ct->ct_fd, iov, i+1);
1807 		if (result == -1) {
1808 			if (errno == EWOULDBLOCK) {
1809 				/* No bytes sent */
1810 				result = 0;
1811 			} else {
1812 				return (-1);
1813 			}
1814 		}
1815 
1816 		/*
1817 		 * Add the bytes from the message
1818 		 * that we have not sent.
1819 		 */
1820 		if (result <= ct->ct_bufferPendingSize) {
1821 			/* No bytes from the message sent */
1822 			consumeFromBuffer(ct, result);
1823 			if (addInBuffer(ct, buff, nBytes) == -1) {
1824 				return (-1);
1825 			}
1826 		} else {
1827 			/*
1828 			 * Some bytes of the message are sent.
1829 			 * Compute the length of the message that has
1830 			 * been sent.
1831 			 */
1832 			int len = result - ct->ct_bufferPendingSize;
1833 
1834 			/* So, empty the buffer. */
1835 			ct->ct_bufferReadPtr = ct->ct_buffer;
1836 			ct->ct_bufferWritePtr = ct->ct_buffer;
1837 			ct->ct_bufferPendingSize = 0;
1838 
1839 			/* And add the remaining part of the message. */
1840 			if (len != nBytes) {
1841 				if (addInBuffer(ct, (char *)buff + len,
1842 					nBytes-len) == -1) {
1843 					return (-1);
1844 				}
1845 			}
1846 		}
1847 	}
1848 	return (nBytes);
1849 }
1850 
1851 static void
1852 flush_registered_clients()
1853 {
1854 	struct nb_reg_node *node;
1855 
1856 	if (LIST_ISEMPTY(nb_first)) {
1857 		return;
1858 	}
1859 
1860 	LIST_FOR_EACH(nb_first, node) {
1861 		do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1862 	}
1863 }
1864 
1865 static int
1866 allocate_chunk()
1867 {
1868 #define	CHUNK_SIZE 16
1869 	struct nb_reg_node *chk = (struct nb_reg_node *)
1870 	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1871 	struct nb_reg_node *n;
1872 	int i;
1873 
1874 	if (NULL == chk) {
1875 		return (-1);
1876 	}
1877 
1878 	n = chk;
1879 	for (i = 0; i < CHUNK_SIZE-1; ++i) {
1880 		n[i].next = &(n[i+1]);
1881 	}
1882 	n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1883 	nb_free = chk;
1884 	return (0);
1885 }
1886 
1887 static int
1888 register_nb(struct ct_data *ct)
1889 {
1890 	struct nb_reg_node *node;
1891 
1892 	mutex_lock(&nb_list_mutex);
1893 
1894 	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1895 		mutex_unlock(&nb_list_mutex);
1896 		errno = ENOMEM;
1897 		return (-1);
1898 	}
1899 
1900 	if (!exit_handler_set) {
1901 		atexit(flush_registered_clients);
1902 		exit_handler_set = TRUE;
1903 	}
1904 	/* Get the first free node */
1905 	LIST_EXTRACT(nb_free, node);
1906 
1907 	node->ct = ct;
1908 
1909 	LIST_ADD(nb_first, node);
1910 	mutex_unlock(&nb_list_mutex);
1911 
1912 	return (0);
1913 }
1914 
1915 static int
1916 unregister_nb(struct ct_data *ct)
1917 {
1918 	struct nb_reg_node *node;
1919 
1920 	mutex_lock(&nb_list_mutex);
1921 	assert(! LIST_ISEMPTY(nb_first));
1922 
1923 	node = nb_first;
1924 	LIST_FOR_EACH(nb_first, node) {
1925 		if (node->next->ct == ct) {
1926 			/* Get the node to unregister. */
1927 			struct nb_reg_node *n = node->next;
1928 			node->next = n->next;
1929 
1930 			n->ct = NULL;
1931 			LIST_ADD(nb_free, n);
1932 			break;
1933 		}
1934 	}
1935 	mutex_unlock(&nb_list_mutex);
1936 	return (0);
1937 }
1938