xref: /titanic_51/usr/src/lib/libnsl/rpc/clnt_vc.c (revision 7e89328164e4b89906924cf4e0387ea13a77631b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
24  */
25 
26 /*
27  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
32 /* All Rights Reserved */
33 /*
34  * Portions of this source code were derived from Berkeley
35  * 4.3 BSD under license from the Regents of the University of
36  * California.
37  */
38 
39 /*
40  * clnt_vc.c
41  *
42  * Implements a connectionful client side RPC.
43  *
44  * Connectionful RPC supports 'batched calls'.
45  * A sequence of calls may be batched-up in a send buffer. The rpc call
46  * return immediately to the client even though the call was not necessarily
47  * sent. The batching occurs if the results' xdr routine is NULL (0) AND
48  * the rpc timeout value is zero (see clnt.h, rpc).
49  *
50  * Clients should NOT casually batch calls that in fact return results; that
51  * is the server side should be aware that a call is batched and not produce
52  * any return message. Batched calls that produce many result messages can
53  * deadlock (netlock) the client and the server....
54  */
55 
56 
57 #include "mt.h"
58 #include "rpc_mt.h"
59 #include <assert.h>
60 #include <rpc/rpc.h>
61 #include <errno.h>
62 #include <sys/byteorder.h>
63 #include <sys/mkdev.h>
64 #include <sys/poll.h>
65 #include <syslog.h>
66 #include <stdlib.h>
67 #include <unistd.h>
68 #include <netinet/tcp.h>
69 #include <limits.h>
70 
71 #define	MCALL_MSG_SIZE 24
72 #define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
73 #define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
74 #define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
75 #define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
76 #ifndef MIN
77 #define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
78 #endif
79 
80 extern int __rpc_timeval_to_msec(struct timeval *);
81 extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
82 extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
83 extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
84 								caddr_t);
85 extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
86 extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
87 		rpcvers_t, uint_t, uint_t, const struct timeval *);
88 
89 static struct clnt_ops	*clnt_vc_ops(void);
90 static int		read_vc(void *, caddr_t, int);
91 static int		write_vc(void *, caddr_t, int);
92 static int		t_rcvall(int, char *, int);
93 static bool_t		time_not_ok(struct timeval *);
94 
95 struct ct_data;
96 static bool_t		set_up_connection(int, struct netbuf *,
97 				struct ct_data *, const struct timeval *);
98 static bool_t		set_io_mode(struct ct_data *, int);
99 
100 /*
101  * Lock table handle used by various MT sync. routines
102  */
103 static mutex_t	vctbl_lock = DEFAULTMUTEX;
104 static void	*vctbl = NULL;
105 
106 static const char clnt_vc_errstr[] = "%s : %s";
107 static const char clnt_vc_str[] = "clnt_vc_create";
108 static const char clnt_read_vc_str[] = "read_vc";
109 static const char __no_mem_str[] = "out of memory";
110 static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
111 static const char no_nonblock_str[] = "could not set transport blocking mode";
112 
113 /*
114  * Private data structure
115  */
116 struct ct_data {
117 	int		ct_fd;		/* connection's fd */
118 	bool_t		ct_closeit;	/* close it on destroy */
119 	int		ct_tsdu;	/* size of tsdu */
120 	int		ct_wait;	/* wait interval in milliseconds */
121 	bool_t		ct_waitset;	/* wait set by clnt_control? */
122 	struct netbuf	ct_addr;	/* remote addr */
123 	struct rpc_err	ct_error;
124 	char		ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
125 	uint_t		ct_mpos;	/* pos after marshal */
126 	XDR		ct_xdrs;	/* XDR stream */
127 
128 	/* NON STANDARD INFO - 00-08-31 */
129 	bool_t		ct_is_oneway; /* True if the current call is oneway. */
130 	bool_t		ct_is_blocking;
131 	ushort_t	ct_io_mode;
132 	ushort_t	ct_blocking_mode;
133 	uint_t		ct_bufferSize; /* Total size of the buffer. */
134 	uint_t		ct_bufferPendingSize; /* Size of unsent data. */
135 	char 		*ct_buffer; /* Pointer to the buffer. */
136 	char 		*ct_bufferWritePtr; /* Ptr to the first free byte. */
137 	char 		*ct_bufferReadPtr; /* Ptr to the first byte of data. */
138 };
139 
140 struct nb_reg_node {
141 	struct nb_reg_node *next;
142 	struct ct_data *ct;
143 };
144 
145 static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
146 static struct nb_reg_node *nb_free  = (struct nb_reg_node *)&nb_free;
147 
148 static bool_t exit_handler_set = FALSE;
149 
150 static mutex_t nb_list_mutex = DEFAULTMUTEX;
151 
152 
153 /* Define some macros to manage the linked list. */
154 #define	LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l)
155 #define	LIST_CLR(l) (l = (struct nb_reg_node *)&l)
156 #define	LIST_ADD(l, node) (node->next = l->next, l = node)
157 #define	LIST_EXTRACT(l, node) (node = l, l = l->next)
158 #define	LIST_FOR_EACH(l, node) \
159 	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
160 
161 
162 /* Default size of the IO buffer used in non blocking mode */
163 #define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
164 
165 static int nb_send(struct ct_data *, void *, unsigned int);
166 static int do_flush(struct ct_data *, uint_t);
167 static bool_t set_flush_mode(struct ct_data *, int);
168 static bool_t set_blocking_connection(struct ct_data *, bool_t);
169 
170 static int register_nb(struct ct_data *);
171 static int unregister_nb(struct ct_data *);
172 
173 
174 /*
175  * Change the mode of the underlying fd.
176  */
177 static bool_t
178 set_blocking_connection(struct ct_data *ct, bool_t blocking)
179 {
180 	int flag;
181 
182 	/*
183 	 * If the underlying fd is already in the required mode,
184 	 * avoid the syscall.
185 	 */
186 	if (ct->ct_is_blocking == blocking)
187 		return (TRUE);
188 
189 	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
190 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
191 		    no_fcntl_getfl_str);
192 		return (FALSE);
193 	}
194 
195 	flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
196 	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
197 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
198 		    no_nonblock_str);
199 		return (FALSE);
200 	}
201 	ct->ct_is_blocking = blocking;
202 	return (TRUE);
203 }
204 
205 /*
206  * Create a client handle for a connection.
207  * Default options are set, which the user can change using clnt_control()'s.
208  * The rpc/vc package does buffering similar to stdio, so the client
209  * must pick send and receive buffer sizes, 0 => use the default.
210  * NB: fd is copied into a private area.
211  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
212  * set this something more useful.
213  *
214  * fd should be open and bound.
215  */
216 CLIENT *
217 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
218 	const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
219 {
220 	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
221 	    recvsz, NULL));
222 }
223 
224 /*
225  * This has the same definition as clnt_vc_create(), except it
226  * takes an additional parameter - a pointer to a timeval structure.
227  *
228  * Not a public interface. This is for clnt_create_timed,
229  * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
230  * value to control a tcp connection attempt.
231  * (for bug 4049792: clnt_create_timed does not time out)
232  *
233  * If tp is NULL, use default timeout to set up the connection.
234  */
235 CLIENT *
236 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
237 	rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
238 {
239 	CLIENT *cl;			/* client handle */
240 	struct ct_data *ct;		/* private data */
241 	struct timeval now;
242 	struct rpc_msg call_msg;
243 	struct t_info tinfo;
244 	int flag;
245 
246 	cl = malloc(sizeof (*cl));
247 	if ((ct = malloc(sizeof (*ct))) != NULL)
248 		ct->ct_addr.buf = NULL;
249 
250 	if ((cl == NULL) || (ct == NULL)) {
251 		(void) syslog(LOG_ERR, clnt_vc_errstr,
252 		    clnt_vc_str, __no_mem_str);
253 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
254 		rpc_createerr.cf_error.re_errno = errno;
255 		rpc_createerr.cf_error.re_terrno = 0;
256 		goto err;
257 	}
258 
259 	/*
260 	 * The only use of vctbl_lock is for serializing the creation of
261 	 * vctbl. Once created the lock needs to be released so we don't
262 	 * hold it across the set_up_connection() call and end up with a
263 	 * bunch of threads stuck waiting for the mutex.
264 	 */
265 	sig_mutex_lock(&vctbl_lock);
266 
267 	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
268 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
269 		rpc_createerr.cf_error.re_errno = errno;
270 		rpc_createerr.cf_error.re_terrno = 0;
271 		sig_mutex_unlock(&vctbl_lock);
272 		goto err;
273 	}
274 
275 	sig_mutex_unlock(&vctbl_lock);
276 
277 	ct->ct_io_mode = RPC_CL_BLOCKING;
278 	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
279 
280 	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
281 	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
282 	ct->ct_bufferPendingSize = 0;
283 	ct->ct_bufferWritePtr = NULL;
284 	ct->ct_bufferReadPtr = NULL;
285 
286 	/* Check the current state of the fd. */
287 	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
288 		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
289 		    no_fcntl_getfl_str);
290 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
291 		rpc_createerr.cf_error.re_terrno = errno;
292 		rpc_createerr.cf_error.re_errno = 0;
293 		goto err;
294 	}
295 	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
296 
297 	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
298 		goto err;
299 	}
300 
301 	/*
302 	 * Set up other members of private data struct
303 	 */
304 	ct->ct_fd = fd;
305 	/*
306 	 * The actual value will be set by clnt_call or clnt_control
307 	 */
308 	ct->ct_wait = 30000;
309 	ct->ct_waitset = FALSE;
310 	/*
311 	 * By default, closeit is always FALSE. It is users responsibility
312 	 * to do a t_close on it, else the user may use clnt_control
313 	 * to let clnt_destroy do it for him/her.
314 	 */
315 	ct->ct_closeit = FALSE;
316 
317 	/*
318 	 * Initialize call message
319 	 */
320 	(void) gettimeofday(&now, (struct timezone *)0);
321 	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
322 	call_msg.rm_call.cb_prog = prog;
323 	call_msg.rm_call.cb_vers = vers;
324 
325 	/*
326 	 * pre-serialize the static part of the call msg and stash it away
327 	 */
328 	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
329 	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
330 		goto err;
331 	}
332 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
333 	XDR_DESTROY(&(ct->ct_xdrs));
334 
335 	if (t_getinfo(fd, &tinfo) == -1) {
336 		rpc_createerr.cf_stat = RPC_TLIERROR;
337 		rpc_createerr.cf_error.re_terrno = t_errno;
338 		rpc_createerr.cf_error.re_errno = 0;
339 		goto err;
340 	}
341 	/*
342 	 * Find the receive and the send size
343 	 */
344 	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
345 	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
346 	if ((sendsz == 0) || (recvsz == 0)) {
347 		rpc_createerr.cf_stat = RPC_TLIERROR;
348 		rpc_createerr.cf_error.re_terrno = 0;
349 		rpc_createerr.cf_error.re_errno = 0;
350 		goto err;
351 	}
352 	ct->ct_tsdu = tinfo.tsdu;
353 	/*
354 	 * Create a client handle which uses xdrrec for serialization
355 	 * and authnone for authentication.
356 	 */
357 	ct->ct_xdrs.x_ops = NULL;
358 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
359 	    read_vc, write_vc);
360 	if (ct->ct_xdrs.x_ops == NULL) {
361 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
362 		rpc_createerr.cf_error.re_terrno = 0;
363 		rpc_createerr.cf_error.re_errno = ENOMEM;
364 		goto err;
365 	}
366 	cl->cl_ops = clnt_vc_ops();
367 	cl->cl_private = (caddr_t)ct;
368 	cl->cl_auth = authnone_create();
369 	cl->cl_tp = NULL;
370 	cl->cl_netid = NULL;
371 	return (cl);
372 
373 err:
374 	if (ct) {
375 		free(ct->ct_addr.buf);
376 		free(ct);
377 	}
378 	free(cl);
379 
380 	return (NULL);
381 }
382 
383 #define	TCPOPT_BUFSIZE 128
384 
385 /*
386  * Set tcp connection timeout value.
387  * Retun 0 for success, -1 for failure.
388  */
389 static int
390 _set_tcp_conntime(int fd, int optval)
391 {
392 	struct t_optmgmt req, res;
393 	struct opthdr *opt;
394 	int *ip;
395 	char buf[TCPOPT_BUFSIZE];
396 
397 	/* LINTED pointer cast */
398 	opt = (struct opthdr *)buf;
399 	opt->level =  IPPROTO_TCP;
400 	opt->name = TCP_CONN_ABORT_THRESHOLD;
401 	opt->len = sizeof (int);
402 
403 	req.flags = T_NEGOTIATE;
404 	req.opt.len = sizeof (struct opthdr) + opt->len;
405 	req.opt.buf = (char *)opt;
406 	/* LINTED pointer cast */
407 	ip = (int *)((char *)buf + sizeof (struct opthdr));
408 	*ip = optval;
409 
410 	res.flags = 0;
411 	res.opt.buf = (char *)buf;
412 	res.opt.maxlen = sizeof (buf);
413 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
414 		return (-1);
415 	}
416 	return (0);
417 }
418 
419 /*
420  * Get current tcp connection timeout value.
421  * Retun the timeout in milliseconds, or -1 for failure.
422  */
423 static int
424 _get_tcp_conntime(int fd)
425 {
426 	struct t_optmgmt req, res;
427 	struct opthdr *opt;
428 	int *ip, retval;
429 	char buf[TCPOPT_BUFSIZE];
430 
431 	/* LINTED pointer cast */
432 	opt = (struct opthdr *)buf;
433 	opt->level =  IPPROTO_TCP;
434 	opt->name = TCP_CONN_ABORT_THRESHOLD;
435 	opt->len = sizeof (int);
436 
437 	req.flags = T_CURRENT;
438 	req.opt.len = sizeof (struct opthdr) + opt->len;
439 	req.opt.buf = (char *)opt;
440 	/* LINTED pointer cast */
441 	ip = (int *)((char *)buf + sizeof (struct opthdr));
442 	*ip = 0;
443 
444 	res.flags = 0;
445 	res.opt.buf = (char *)buf;
446 	res.opt.maxlen = sizeof (buf);
447 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
448 		return (-1);
449 	}
450 
451 	/* LINTED pointer cast */
452 	ip = (int *)((char *)buf + sizeof (struct opthdr));
453 	retval = *ip;
454 	return (retval);
455 }
456 
457 static bool_t
458 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
459     const struct timeval *tp)
460 {
461 	int state;
462 	struct t_call sndcallstr, *rcvcall;
463 	int nconnect;
464 	bool_t connected, do_rcv_connect;
465 	int curr_time = -1;
466 	hrtime_t start;
467 	hrtime_t tout;	/* timeout in nanoseconds (from tp) */
468 
469 	ct->ct_addr.len = 0;
470 	state = t_getstate(fd);
471 	if (state == -1) {
472 		rpc_createerr.cf_stat = RPC_TLIERROR;
473 		rpc_createerr.cf_error.re_errno = 0;
474 		rpc_createerr.cf_error.re_terrno = t_errno;
475 		return (FALSE);
476 	}
477 
478 	switch (state) {
479 	case T_IDLE:
480 		if (svcaddr == NULL) {
481 			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
482 			return (FALSE);
483 		}
484 		/*
485 		 * Connect only if state is IDLE and svcaddr known
486 		 */
487 /* LINTED pointer alignment */
488 		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
489 		if (rcvcall == NULL) {
490 			rpc_createerr.cf_stat = RPC_TLIERROR;
491 			rpc_createerr.cf_error.re_terrno = t_errno;
492 			rpc_createerr.cf_error.re_errno = errno;
493 			return (FALSE);
494 		}
495 		rcvcall->udata.maxlen = 0;
496 		sndcallstr.addr = *svcaddr;
497 		sndcallstr.opt.len = 0;
498 		sndcallstr.udata.len = 0;
499 		/*
500 		 * Even NULL could have sufficed for rcvcall, because
501 		 * the address returned is same for all cases except
502 		 * for the gateway case, and hence required.
503 		 */
504 		connected = FALSE;
505 		do_rcv_connect = FALSE;
506 
507 		/*
508 		 * If there is a timeout value specified, we will try to
509 		 * reset the tcp connection timeout. If the transport does
510 		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
511 		 * for other reason, default timeout will be used.
512 		 */
513 		if (tp != NULL) {
514 			start = gethrtime();
515 
516 			/*
517 			 * Calculate the timeout in nanoseconds
518 			 */
519 			tout = SECS_TO_NS(tp->tv_sec) +
520 			    USECS_TO_NS(tp->tv_usec);
521 			curr_time = _get_tcp_conntime(fd);
522 		}
523 
524 		for (nconnect = 0; nconnect < 3; nconnect++) {
525 			if (tp != NULL) {
526 				/*
527 				 * Calculate the elapsed time
528 				 */
529 				hrtime_t elapsed = gethrtime() - start;
530 				if (elapsed >= tout)
531 					break;
532 
533 				if (curr_time != -1) {
534 					int ms;
535 
536 					/*
537 					 * TCP_CONN_ABORT_THRESHOLD takes int
538 					 * value in milliseconds.  Make sure we
539 					 * do not overflow.
540 					 */
541 					if (NSECS_TO_MS(tout - elapsed) >=
542 					    INT_MAX) {
543 						ms = INT_MAX;
544 					} else {
545 						ms = (int)
546 						    NSECS_TO_MS(tout - elapsed);
547 						if (MSECS_TO_NS(ms) !=
548 						    tout - elapsed)
549 							ms++;
550 					}
551 
552 					(void) _set_tcp_conntime(fd, ms);
553 				}
554 			}
555 
556 			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
557 				connected = TRUE;
558 				break;
559 			}
560 			if (t_errno == TLOOK) {
561 				switch (t_look(fd)) {
562 				case T_DISCONNECT:
563 					(void) t_rcvdis(fd, (struct
564 					    t_discon *) NULL);
565 					break;
566 				default:
567 					break;
568 				}
569 			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
570 				break;
571 			}
572 			if ((state = t_getstate(fd)) == T_OUTCON) {
573 				do_rcv_connect = TRUE;
574 				break;
575 			}
576 			if (state != T_IDLE) {
577 				break;
578 			}
579 		}
580 		if (do_rcv_connect) {
581 			do {
582 				if (t_rcvconnect(fd, rcvcall) != -1) {
583 					connected = TRUE;
584 					break;
585 				}
586 			} while (t_errno == TSYSERR && errno == EINTR);
587 		}
588 
589 		/*
590 		 * Set the connection timeout back to its old value.
591 		 */
592 		if (curr_time != -1) {
593 			(void) _set_tcp_conntime(fd, curr_time);
594 		}
595 
596 		if (!connected) {
597 			rpc_createerr.cf_stat = RPC_TLIERROR;
598 			rpc_createerr.cf_error.re_terrno = t_errno;
599 			rpc_createerr.cf_error.re_errno = errno;
600 			(void) t_free((char *)rcvcall, T_CALL);
601 			return (FALSE);
602 		}
603 
604 		/* Free old area if allocated */
605 		if (ct->ct_addr.buf)
606 			free(ct->ct_addr.buf);
607 		ct->ct_addr = rcvcall->addr;	/* To get the new address */
608 		/* So that address buf does not get freed */
609 		rcvcall->addr.buf = NULL;
610 		(void) t_free((char *)rcvcall, T_CALL);
611 		break;
612 	case T_DATAXFER:
613 	case T_OUTCON:
614 		if (svcaddr == NULL) {
615 			/*
616 			 * svcaddr could also be NULL in cases where the
617 			 * client is already bound and connected.
618 			 */
619 			ct->ct_addr.len = 0;
620 		} else {
621 			ct->ct_addr.buf = malloc(svcaddr->len);
622 			if (ct->ct_addr.buf == NULL) {
623 				(void) syslog(LOG_ERR, clnt_vc_errstr,
624 				    clnt_vc_str, __no_mem_str);
625 				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
626 				rpc_createerr.cf_error.re_errno = errno;
627 				rpc_createerr.cf_error.re_terrno = 0;
628 				return (FALSE);
629 			}
630 			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
631 			    (size_t)svcaddr->len);
632 			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
633 		}
634 		break;
635 	default:
636 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
637 		return (FALSE);
638 	}
639 	return (TRUE);
640 }
641 
642 static enum clnt_stat
643 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
644 	xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
645 {
646 /* LINTED pointer alignment */
647 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
648 	XDR *xdrs = &(ct->ct_xdrs);
649 	struct rpc_msg reply_msg;
650 	uint32_t x_id;
651 /* LINTED pointer alignment */
652 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
653 	bool_t shipnow;
654 	int refreshes = 2;
655 
656 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
657 		rpc_callerr.re_status = RPC_FAILED;
658 		rpc_callerr.re_errno = errno;
659 		rpc_fd_unlock(vctbl, ct->ct_fd);
660 		return (RPC_FAILED);
661 	}
662 
663 	ct->ct_is_oneway = FALSE;
664 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
665 		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
666 			rpc_fd_unlock(vctbl, ct->ct_fd);
667 			return (RPC_FAILED);  /* XXX */
668 		}
669 	}
670 
671 	if (!ct->ct_waitset) {
672 		/* If time is not within limits, we ignore it. */
673 		if (time_not_ok(&timeout) == FALSE)
674 			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
675 	} else {
676 		timeout.tv_sec = (ct->ct_wait / 1000);
677 		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
678 	}
679 
680 	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
681 	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
682 call_again:
683 	xdrs->x_op = XDR_ENCODE;
684 	rpc_callerr.re_status = RPC_SUCCESS;
685 	/*
686 	 * Due to little endian byte order, it is necessary to convert to host
687 	 * format before decrementing xid.
688 	 */
689 	x_id = ntohl(*msg_x_id) - 1;
690 	*msg_x_id = htonl(x_id);
691 
692 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
693 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
694 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
695 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
696 		    (!xdr_args(xdrs, args_ptr))) {
697 			if (rpc_callerr.re_status == RPC_SUCCESS)
698 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
699 			(void) xdrrec_endofrecord(xdrs, TRUE);
700 			rpc_fd_unlock(vctbl, ct->ct_fd);
701 			return (rpc_callerr.re_status);
702 		}
703 	} else {
704 /* LINTED pointer alignment */
705 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
706 		IXDR_PUT_U_INT32(u, proc);
707 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
708 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
709 			if (rpc_callerr.re_status == RPC_SUCCESS)
710 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
711 			(void) xdrrec_endofrecord(xdrs, TRUE);
712 			rpc_fd_unlock(vctbl, ct->ct_fd);
713 			return (rpc_callerr.re_status);
714 		}
715 	}
716 	if (!xdrrec_endofrecord(xdrs, shipnow)) {
717 		rpc_fd_unlock(vctbl, ct->ct_fd);
718 		return (rpc_callerr.re_status = RPC_CANTSEND);
719 	}
720 	if (!shipnow) {
721 		rpc_fd_unlock(vctbl, ct->ct_fd);
722 		return (RPC_SUCCESS);
723 	}
724 	/*
725 	 * Hack to provide rpc-based message passing
726 	 */
727 	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
728 		rpc_fd_unlock(vctbl, ct->ct_fd);
729 		return (rpc_callerr.re_status = RPC_TIMEDOUT);
730 	}
731 
732 
733 	/*
734 	 * Keep receiving until we get a valid transaction id
735 	 */
736 	xdrs->x_op = XDR_DECODE;
737 	for (;;) {
738 		reply_msg.acpted_rply.ar_verf = _null_auth;
739 		reply_msg.acpted_rply.ar_results.where = NULL;
740 		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
741 		if (!xdrrec_skiprecord(xdrs)) {
742 			rpc_fd_unlock(vctbl, ct->ct_fd);
743 			return (rpc_callerr.re_status);
744 		}
745 		/* now decode and validate the response header */
746 		if (!xdr_replymsg(xdrs, &reply_msg)) {
747 			if (rpc_callerr.re_status == RPC_SUCCESS)
748 				continue;
749 			rpc_fd_unlock(vctbl, ct->ct_fd);
750 			return (rpc_callerr.re_status);
751 		}
752 		if (reply_msg.rm_xid == x_id)
753 			break;
754 	}
755 
756 	/*
757 	 * process header
758 	 */
759 	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
760 	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
761 		rpc_callerr.re_status = RPC_SUCCESS;
762 	else
763 		__seterr_reply(&reply_msg, &(rpc_callerr));
764 
765 	if (rpc_callerr.re_status == RPC_SUCCESS) {
766 		if (!AUTH_VALIDATE(cl->cl_auth,
767 		    &reply_msg.acpted_rply.ar_verf)) {
768 			rpc_callerr.re_status = RPC_AUTHERROR;
769 			rpc_callerr.re_why = AUTH_INVALIDRESP;
770 		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
771 			if (!(*xdr_results)(xdrs, results_ptr)) {
772 				if (rpc_callerr.re_status == RPC_SUCCESS)
773 					rpc_callerr.re_status =
774 					    RPC_CANTDECODERES;
775 			}
776 		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
777 		    results_ptr)) {
778 			if (rpc_callerr.re_status == RPC_SUCCESS)
779 				rpc_callerr.re_status = RPC_CANTDECODERES;
780 		}
781 	}	/* end successful completion */
782 	/*
783 	 * If unsuccesful AND error is an authentication error
784 	 * then refresh credentials and try again, else break
785 	 */
786 	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
787 		/* maybe our credentials need to be refreshed ... */
788 		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
789 			goto call_again;
790 		else
791 			/*
792 			 * We are setting rpc_callerr here given that libnsl
793 			 * is not reentrant thereby reinitializing the TSD.
794 			 * If not set here then success could be returned even
795 			 * though refresh failed.
796 			 */
797 			rpc_callerr.re_status = RPC_AUTHERROR;
798 	} /* end of unsuccessful completion */
799 	/* free verifier ... */
800 	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
801 	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
802 		xdrs->x_op = XDR_FREE;
803 		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
804 	}
805 	rpc_fd_unlock(vctbl, ct->ct_fd);
806 	return (rpc_callerr.re_status);
807 }
808 
809 static enum clnt_stat
810 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
811 {
812 /* LINTED pointer alignment */
813 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
814 	XDR *xdrs = &(ct->ct_xdrs);
815 	uint32_t x_id;
816 /* LINTED pointer alignment */
817 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
818 
819 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
820 		rpc_callerr.re_status = RPC_FAILED;
821 		rpc_callerr.re_errno = errno;
822 		rpc_fd_unlock(vctbl, ct->ct_fd);
823 		return (RPC_FAILED);
824 	}
825 
826 	ct->ct_is_oneway = TRUE;
827 
828 	xdrs->x_op = XDR_ENCODE;
829 	rpc_callerr.re_status = RPC_SUCCESS;
830 	/*
831 	 * Due to little endian byte order, it is necessary to convert to host
832 	 * format before decrementing xid.
833 	 */
834 	x_id = ntohl(*msg_x_id) - 1;
835 	*msg_x_id = htonl(x_id);
836 
837 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
838 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
839 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
840 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
841 		    (!xdr_args(xdrs, args_ptr))) {
842 			if (rpc_callerr.re_status == RPC_SUCCESS)
843 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
844 			(void) xdrrec_endofrecord(xdrs, TRUE);
845 			rpc_fd_unlock(vctbl, ct->ct_fd);
846 			return (rpc_callerr.re_status);
847 		}
848 	} else {
849 /* LINTED pointer alignment */
850 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
851 		IXDR_PUT_U_INT32(u, proc);
852 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
853 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
854 			if (rpc_callerr.re_status == RPC_SUCCESS)
855 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
856 			(void) xdrrec_endofrecord(xdrs, TRUE);
857 			rpc_fd_unlock(vctbl, ct->ct_fd);
858 			return (rpc_callerr.re_status);
859 		}
860 	}
861 
862 	/*
863 	 * Do not need to check errors, as the following code does
864 	 * not depend on the successful completion of the call.
865 	 * An error, if any occurs, is reported through
866 	 * rpc_callerr.re_status.
867 	 */
868 	(void) xdrrec_endofrecord(xdrs, TRUE);
869 
870 	rpc_fd_unlock(vctbl, ct->ct_fd);
871 	return (rpc_callerr.re_status);
872 }
873 
874 /* ARGSUSED */
875 static void
876 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
877 {
878 	*errp = rpc_callerr;
879 }
880 
881 static bool_t
882 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
883 {
884 /* LINTED pointer alignment */
885 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
886 	XDR *xdrs = &(ct->ct_xdrs);
887 	bool_t stat;
888 
889 	(void) rpc_fd_lock(vctbl, ct->ct_fd);
890 	xdrs->x_op = XDR_FREE;
891 	stat = (*xdr_res)(xdrs, res_ptr);
892 	rpc_fd_unlock(vctbl, ct->ct_fd);
893 	return (stat);
894 }
895 
896 static void
897 clnt_vc_abort(void)
898 {
899 }
900 
901 /*ARGSUSED*/
902 static bool_t
903 clnt_vc_control(CLIENT *cl, int request, char *info)
904 {
905 	bool_t ret;
906 /* LINTED pointer alignment */
907 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
908 
909 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
910 		rpc_fd_unlock(vctbl, ct->ct_fd);
911 		return (FALSE);
912 	}
913 
914 	switch (request) {
915 	case CLSET_FD_CLOSE:
916 		ct->ct_closeit = TRUE;
917 		rpc_fd_unlock(vctbl, ct->ct_fd);
918 		return (TRUE);
919 	case CLSET_FD_NCLOSE:
920 		ct->ct_closeit = FALSE;
921 		rpc_fd_unlock(vctbl, ct->ct_fd);
922 		return (TRUE);
923 	case CLFLUSH:
924 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
925 			int res;
926 			res = do_flush(ct, (info == NULL ||
927 			    /* LINTED pointer cast */
928 			    *(int *)info == RPC_CL_DEFAULT_FLUSH)?
929 			    /* LINTED pointer cast */
930 			    ct->ct_blocking_mode: *(int *)info);
931 			ret = (0 == res);
932 		} else {
933 			ret = FALSE;
934 		}
935 		rpc_fd_unlock(vctbl, ct->ct_fd);
936 		return (ret);
937 	}
938 
939 	/* for other requests which use info */
940 	if (info == NULL) {
941 		rpc_fd_unlock(vctbl, ct->ct_fd);
942 		return (FALSE);
943 	}
944 	switch (request) {
945 	case CLSET_TIMEOUT:
946 /* LINTED pointer alignment */
947 		if (time_not_ok((struct timeval *)info)) {
948 			rpc_fd_unlock(vctbl, ct->ct_fd);
949 			return (FALSE);
950 		}
951 /* LINTED pointer alignment */
952 		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
953 		ct->ct_waitset = TRUE;
954 		break;
955 	case CLGET_TIMEOUT:
956 /* LINTED pointer alignment */
957 		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
958 /* LINTED pointer alignment */
959 		((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
960 		break;
961 	case CLGET_SERVER_ADDR:	/* For compatibility only */
962 		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
963 		break;
964 	case CLGET_FD:
965 /* LINTED pointer alignment */
966 		*(int *)info = ct->ct_fd;
967 		break;
968 	case CLGET_SVC_ADDR:
969 		/* The caller should not free this memory area */
970 /* LINTED pointer alignment */
971 		*(struct netbuf *)info = ct->ct_addr;
972 		break;
973 	case CLSET_SVC_ADDR:		/* set to new address */
974 #ifdef undef
975 		/*
976 		 * XXX: once the t_snddis(), followed by t_connect() starts to
977 		 * work, this ifdef should be removed.  CLIENT handle reuse
978 		 * would then be possible for COTS as well.
979 		 */
980 		if (t_snddis(ct->ct_fd, NULL) == -1) {
981 			rpc_createerr.cf_stat = RPC_TLIERROR;
982 			rpc_createerr.cf_error.re_terrno = t_errno;
983 			rpc_createerr.cf_error.re_errno = errno;
984 			rpc_fd_unlock(vctbl, ct->ct_fd);
985 			return (FALSE);
986 		}
987 		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
988 		    ct, NULL);
989 		rpc_fd_unlock(vctbl, ct->ct_fd);
990 		return (ret);
991 #else
992 		rpc_fd_unlock(vctbl, ct->ct_fd);
993 		return (FALSE);
994 #endif
995 	case CLGET_XID:
996 		/*
997 		 * use the knowledge that xid is the
998 		 * first element in the call structure
999 		 * This will get the xid of the PREVIOUS call
1000 		 */
1001 /* LINTED pointer alignment */
1002 		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
1003 		break;
1004 	case CLSET_XID:
1005 		/* This will set the xid of the NEXT call */
1006 /* LINTED pointer alignment */
1007 		*(uint32_t *)ct->ct_mcall =  htonl(*(uint32_t *)info + 1);
1008 		/* increment by 1 as clnt_vc_call() decrements once */
1009 		break;
1010 	case CLGET_VERS:
1011 		/*
1012 		 * This RELIES on the information that, in the call body,
1013 		 * the version number field is the fifth field from the
1014 		 * begining of the RPC header. MUST be changed if the
1015 		 * call_struct is changed
1016 		 */
1017 /* LINTED pointer alignment */
1018 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1019 		    4 * BYTES_PER_XDR_UNIT));
1020 		break;
1021 
1022 	case CLSET_VERS:
1023 /* LINTED pointer alignment */
1024 		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
1025 /* LINTED pointer alignment */
1026 		    htonl(*(uint32_t *)info);
1027 		break;
1028 
1029 	case CLGET_PROG:
1030 		/*
1031 		 * This RELIES on the information that, in the call body,
1032 		 * the program number field is the fourth field from the
1033 		 * begining of the RPC header. MUST be changed if the
1034 		 * call_struct is changed
1035 		 */
1036 /* LINTED pointer alignment */
1037 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1038 		    3 * BYTES_PER_XDR_UNIT));
1039 		break;
1040 
1041 	case CLSET_PROG:
1042 /* LINTED pointer alignment */
1043 		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1044 /* LINTED pointer alignment */
1045 		    htonl(*(uint32_t *)info);
1046 		break;
1047 
1048 	case CLSET_IO_MODE:
1049 		/* LINTED pointer cast */
1050 		if (!set_io_mode(ct, *(int *)info)) {
1051 			rpc_fd_unlock(vctbl, ct->ct_fd);
1052 			return (FALSE);
1053 		}
1054 		break;
1055 	case CLSET_FLUSH_MODE:
1056 		/* Set a specific FLUSH_MODE */
1057 		/* LINTED pointer cast */
1058 		if (!set_flush_mode(ct, *(int *)info)) {
1059 			rpc_fd_unlock(vctbl, ct->ct_fd);
1060 			return (FALSE);
1061 		}
1062 		break;
1063 	case CLGET_FLUSH_MODE:
1064 		/* LINTED pointer cast */
1065 		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
1066 		break;
1067 
1068 	case CLGET_IO_MODE:
1069 		/* LINTED pointer cast */
1070 		*(rpciomode_t *)info = ct->ct_io_mode;
1071 		break;
1072 
1073 	case CLGET_CURRENT_REC_SIZE:
1074 		/*
1075 		 * Returns the current amount of memory allocated
1076 		 * to pending requests
1077 		 */
1078 		/* LINTED pointer cast */
1079 		*(int *)info = ct->ct_bufferPendingSize;
1080 		break;
1081 
1082 	case CLSET_CONNMAXREC_SIZE:
1083 		/* Cannot resize the buffer if it is used. */
1084 		if (ct->ct_bufferPendingSize != 0) {
1085 			rpc_fd_unlock(vctbl, ct->ct_fd);
1086 			return (FALSE);
1087 		}
1088 		/*
1089 		 * If the new size is equal to the current size,
1090 		 * there is nothing to do.
1091 		 */
1092 		/* LINTED pointer cast */
1093 		if (ct->ct_bufferSize == *(uint_t *)info)
1094 			break;
1095 
1096 		/* LINTED pointer cast */
1097 		ct->ct_bufferSize = *(uint_t *)info;
1098 		if (ct->ct_buffer) {
1099 			free(ct->ct_buffer);
1100 			ct->ct_buffer = NULL;
1101 			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1102 		}
1103 		break;
1104 
1105 	case CLGET_CONNMAXREC_SIZE:
1106 		/*
1107 		 * Returns the size of buffer allocated
1108 		 * to pending requests
1109 		 */
1110 		/* LINTED pointer cast */
1111 		*(uint_t *)info = ct->ct_bufferSize;
1112 		break;
1113 
1114 	default:
1115 		rpc_fd_unlock(vctbl, ct->ct_fd);
1116 		return (FALSE);
1117 	}
1118 	rpc_fd_unlock(vctbl, ct->ct_fd);
1119 	return (TRUE);
1120 }
1121 
1122 static void
1123 clnt_vc_destroy(CLIENT *cl)
1124 {
1125 /* LINTED pointer alignment */
1126 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
1127 	int ct_fd = ct->ct_fd;
1128 
1129 	(void) rpc_fd_lock(vctbl, ct_fd);
1130 
1131 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1132 		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1133 		(void) unregister_nb(ct);
1134 	}
1135 
1136 	if (ct->ct_closeit)
1137 		(void) t_close(ct_fd);
1138 	XDR_DESTROY(&(ct->ct_xdrs));
1139 	if (ct->ct_addr.buf)
1140 		free(ct->ct_addr.buf);
1141 	free(ct);
1142 	if (cl->cl_netid && cl->cl_netid[0])
1143 		free(cl->cl_netid);
1144 	if (cl->cl_tp && cl->cl_tp[0])
1145 		free(cl->cl_tp);
1146 	free(cl);
1147 	rpc_fd_unlock(vctbl, ct_fd);
1148 }
1149 
1150 /*
1151  * Interface between xdr serializer and vc connection.
1152  * Behaves like the system calls, read & write, but keeps some error state
1153  * around for the rpc level.
1154  */
1155 static int
1156 read_vc(void *ct_tmp, caddr_t buf, int len)
1157 {
1158 	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
1159 	struct pollfd *pfdp;
1160 	int npfd;		/* total number of pfdp allocated */
1161 	struct ct_data *ct = ct_tmp;
1162 	struct timeval starttime;
1163 	struct timeval curtime;
1164 	int poll_time;
1165 	int delta;
1166 
1167 	if (len == 0)
1168 		return (0);
1169 
1170 	/*
1171 	 * Allocate just one the first time.  thr_get_storage() may
1172 	 * return a larger buffer, left over from the last time we were
1173 	 * here, but that's OK.  realloc() will deal with it properly.
1174 	 */
1175 	npfd = 1;
1176 	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1177 	if (pfdp == NULL) {
1178 		(void) syslog(LOG_ERR, clnt_vc_errstr,
1179 		    clnt_read_vc_str, __no_mem_str);
1180 		rpc_callerr.re_status = RPC_SYSTEMERROR;
1181 		rpc_callerr.re_errno = errno;
1182 		rpc_callerr.re_terrno = 0;
1183 		return (-1);
1184 	}
1185 
1186 	/*
1187 	 *	N.B.:  slot 0 in the pollfd array is reserved for the file
1188 	 *	descriptor we're really interested in (as opposed to the
1189 	 *	callback descriptors).
1190 	 */
1191 	pfdp[0].fd = ct->ct_fd;
1192 	pfdp[0].events = MASKVAL;
1193 	pfdp[0].revents = 0;
1194 	poll_time = ct->ct_wait;
1195 	if (gettimeofday(&starttime, NULL) == -1) {
1196 		syslog(LOG_ERR, "Unable to get time of day: %m");
1197 		return (-1);
1198 	}
1199 
1200 	for (;;) {
1201 		extern void (*_svc_getreqset_proc)();
1202 		extern pollfd_t *svc_pollfd;
1203 		extern int svc_max_pollfd;
1204 		int fds;
1205 
1206 		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1207 
1208 		if (_svc_getreqset_proc) {
1209 			sig_rw_rdlock(&svc_fd_lock);
1210 
1211 			/* reallocate pfdp to svc_max_pollfd +1 */
1212 			if (npfd != (svc_max_pollfd + 1)) {
1213 				struct pollfd *tmp_pfdp = realloc(pfdp,
1214 				    sizeof (struct pollfd) *
1215 				    (svc_max_pollfd + 1));
1216 				if (tmp_pfdp == NULL) {
1217 					sig_rw_unlock(&svc_fd_lock);
1218 					(void) syslog(LOG_ERR, clnt_vc_errstr,
1219 					    clnt_read_vc_str, __no_mem_str);
1220 					rpc_callerr.re_status = RPC_SYSTEMERROR;
1221 					rpc_callerr.re_errno = errno;
1222 					rpc_callerr.re_terrno = 0;
1223 					return (-1);
1224 				}
1225 
1226 				pfdp = tmp_pfdp;
1227 				npfd = svc_max_pollfd + 1;
1228 				(void) pthread_setspecific(pfdp_key, pfdp);
1229 			}
1230 			if (npfd > 1)
1231 				(void) memcpy(&pfdp[1], svc_pollfd,
1232 				    sizeof (struct pollfd) * (npfd - 1));
1233 
1234 			sig_rw_unlock(&svc_fd_lock);
1235 		} else {
1236 			npfd = 1;	/* don't forget about pfdp[0] */
1237 		}
1238 
1239 		switch (fds = poll(pfdp, npfd, poll_time)) {
1240 		case 0:
1241 			rpc_callerr.re_status = RPC_TIMEDOUT;
1242 			return (-1);
1243 
1244 		case -1:
1245 			if (errno != EINTR)
1246 				continue;
1247 			else {
1248 				/*
1249 				 * interrupted by another signal,
1250 				 * update time_waited
1251 				 */
1252 
1253 				if (gettimeofday(&curtime, NULL) == -1) {
1254 					syslog(LOG_ERR,
1255 					    "Unable to get time of day:  %m");
1256 					errno = 0;
1257 					continue;
1258 				};
1259 				delta = (curtime.tv_sec -
1260 				    starttime.tv_sec) * 1000 +
1261 				    (curtime.tv_usec -
1262 				    starttime.tv_usec) / 1000;
1263 				poll_time -= delta;
1264 				if (poll_time < 0) {
1265 					rpc_callerr.re_status = RPC_TIMEDOUT;
1266 					errno = 0;
1267 					return (-1);
1268 				} else {
1269 					errno = 0; /* reset it */
1270 					continue;
1271 				}
1272 			}
1273 		}
1274 
1275 		if (pfdp[0].revents == 0) {
1276 			/* must be for server side of the house */
1277 			(*_svc_getreqset_proc)(&pfdp[1], fds);
1278 			continue;	/* do poll again */
1279 		}
1280 
1281 		if (pfdp[0].revents & POLLNVAL) {
1282 			rpc_callerr.re_status = RPC_CANTRECV;
1283 			/*
1284 			 *	Note:  we're faking errno here because we
1285 			 *	previously would have expected select() to
1286 			 *	return -1 with errno EBADF.  Poll(BA_OS)
1287 			 *	returns 0 and sets the POLLNVAL revents flag
1288 			 *	instead.
1289 			 */
1290 			rpc_callerr.re_errno = errno = EBADF;
1291 			return (-1);
1292 		}
1293 
1294 		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1295 			rpc_callerr.re_status = RPC_CANTRECV;
1296 			rpc_callerr.re_errno = errno = EPIPE;
1297 			return (-1);
1298 		}
1299 		break;
1300 	}
1301 
1302 	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1303 	case 0:
1304 		/* premature eof */
1305 		rpc_callerr.re_errno = ENOLINK;
1306 		rpc_callerr.re_terrno = 0;
1307 		rpc_callerr.re_status = RPC_CANTRECV;
1308 		len = -1;	/* it's really an error */
1309 		break;
1310 
1311 	case -1:
1312 		rpc_callerr.re_terrno = t_errno;
1313 		rpc_callerr.re_errno = 0;
1314 		rpc_callerr.re_status = RPC_CANTRECV;
1315 		break;
1316 	}
1317 	return (len);
1318 }
1319 
1320 static int
1321 write_vc(void *ct_tmp, caddr_t buf, int len)
1322 {
1323 	int i, cnt;
1324 	struct ct_data *ct = ct_tmp;
1325 	int flag;
1326 	int maxsz;
1327 
1328 	maxsz = ct->ct_tsdu;
1329 
1330 	/* Handle the non-blocking mode */
1331 	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1332 		/*
1333 		 * Test a special case here. If the length of the current
1334 		 * write is greater than the transport data unit, and the
1335 		 * mode is non blocking, we return RPC_CANTSEND.
1336 		 * XXX  this is not very clean.
1337 		 */
1338 		if (maxsz > 0 && len > maxsz) {
1339 			rpc_callerr.re_terrno = errno;
1340 			rpc_callerr.re_errno = 0;
1341 			rpc_callerr.re_status = RPC_CANTSEND;
1342 			return (-1);
1343 		}
1344 
1345 		len = nb_send(ct, buf, (unsigned)len);
1346 		if (len == -1) {
1347 			rpc_callerr.re_terrno = errno;
1348 			rpc_callerr.re_errno = 0;
1349 			rpc_callerr.re_status = RPC_CANTSEND;
1350 		} else if (len == -2) {
1351 			rpc_callerr.re_terrno = 0;
1352 			rpc_callerr.re_errno = 0;
1353 			rpc_callerr.re_status = RPC_CANTSTORE;
1354 		}
1355 		return (len);
1356 	}
1357 
1358 	if ((maxsz == 0) || (maxsz == -1)) {
1359 		/*
1360 		 * T_snd may return -1 for error on connection (connection
1361 		 * needs to be repaired/closed, and -2 for flow-control
1362 		 * handling error (no operation to do, just wait and call
1363 		 * T_Flush()).
1364 		 */
1365 		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1366 			rpc_callerr.re_terrno = t_errno;
1367 			rpc_callerr.re_errno = 0;
1368 			rpc_callerr.re_status = RPC_CANTSEND;
1369 		}
1370 		return (len);
1371 	}
1372 
1373 	/*
1374 	 * This for those transports which have a max size for data.
1375 	 */
1376 	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1377 		flag = cnt > maxsz ? T_MORE : 0;
1378 		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1379 		    flag)) == -1) {
1380 			rpc_callerr.re_terrno = t_errno;
1381 			rpc_callerr.re_errno = 0;
1382 			rpc_callerr.re_status = RPC_CANTSEND;
1383 			return (-1);
1384 		}
1385 	}
1386 	return (len);
1387 }
1388 
1389 /*
1390  * Receive the required bytes of data, even if it is fragmented.
1391  */
1392 static int
1393 t_rcvall(int fd, char *buf, int len)
1394 {
1395 	int moreflag;
1396 	int final = 0;
1397 	int res;
1398 
1399 	do {
1400 		moreflag = 0;
1401 		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1402 		if (res == -1) {
1403 			if (t_errno == TLOOK)
1404 				switch (t_look(fd)) {
1405 				case T_DISCONNECT:
1406 					(void) t_rcvdis(fd, NULL);
1407 					(void) t_snddis(fd, NULL);
1408 					return (-1);
1409 				case T_ORDREL:
1410 				/* Received orderly release indication */
1411 					(void) t_rcvrel(fd);
1412 				/* Send orderly release indicator */
1413 					(void) t_sndrel(fd);
1414 					return (-1);
1415 				default:
1416 					return (-1);
1417 				}
1418 		} else if (res == 0) {
1419 			return (0);
1420 		}
1421 		final += res;
1422 		buf += res;
1423 		len -= res;
1424 	} while ((len > 0) && (moreflag & T_MORE));
1425 	return (final);
1426 }
1427 
1428 static struct clnt_ops *
1429 clnt_vc_ops(void)
1430 {
1431 	static struct clnt_ops ops;
1432 	extern mutex_t	ops_lock;
1433 
1434 	/* VARIABLES PROTECTED BY ops_lock: ops */
1435 
1436 	sig_mutex_lock(&ops_lock);
1437 	if (ops.cl_call == NULL) {
1438 		ops.cl_call = clnt_vc_call;
1439 		ops.cl_send = clnt_vc_send;
1440 		ops.cl_abort = clnt_vc_abort;
1441 		ops.cl_geterr = clnt_vc_geterr;
1442 		ops.cl_freeres = clnt_vc_freeres;
1443 		ops.cl_destroy = clnt_vc_destroy;
1444 		ops.cl_control = clnt_vc_control;
1445 	}
1446 	sig_mutex_unlock(&ops_lock);
1447 	return (&ops);
1448 }
1449 
1450 /*
1451  * Make sure that the time is not garbage.   -1 value is disallowed.
1452  * Note this is different from time_not_ok in clnt_dg.c
1453  */
1454 static bool_t
1455 time_not_ok(struct timeval *t)
1456 {
1457 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1458 	    t->tv_usec <= -1 || t->tv_usec > 1000000);
1459 }
1460 
1461 
1462 /* Compute the # of bytes that remains until the end of the buffer */
1463 #define	REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
1464 
1465 static int
1466 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1467 {
1468 	if (NULL == ct->ct_buffer) {
1469 		/* Buffer not allocated yet. */
1470 		char *buffer;
1471 
1472 		buffer = malloc(ct->ct_bufferSize);
1473 		if (NULL == buffer) {
1474 			errno = ENOMEM;
1475 			return (-1);
1476 		}
1477 		(void) memcpy(buffer, dataToAdd, nBytes);
1478 
1479 		ct->ct_buffer = buffer;
1480 		ct->ct_bufferReadPtr = buffer;
1481 		ct->ct_bufferWritePtr = buffer + nBytes;
1482 		ct->ct_bufferPendingSize = nBytes;
1483 	} else {
1484 		/*
1485 		 * For an already allocated buffer, two mem copies
1486 		 * might be needed, depending on the current
1487 		 * writing position.
1488 		 */
1489 
1490 		/* Compute the length of the first copy. */
1491 		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1492 
1493 		ct->ct_bufferPendingSize += nBytes;
1494 
1495 		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1496 		ct->ct_bufferWritePtr += len;
1497 		nBytes -= len;
1498 		if (0 == nBytes) {
1499 			/* One memcopy needed. */
1500 
1501 			/*
1502 			 * If the write pointer is at the end of the buffer,
1503 			 * wrap it now.
1504 			 */
1505 			if (ct->ct_bufferWritePtr ==
1506 			    (ct->ct_buffer + ct->ct_bufferSize)) {
1507 				ct->ct_bufferWritePtr = ct->ct_buffer;
1508 			}
1509 		} else {
1510 			/* Two memcopy needed. */
1511 			dataToAdd += len;
1512 
1513 			/*
1514 			 * Copy the remaining data to the beginning of the
1515 			 * buffer
1516 			 */
1517 			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1518 			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1519 		}
1520 	}
1521 	return (0);
1522 }
1523 
1524 static void
1525 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1526 {
1527 	ct->ct_bufferPendingSize -= nBytes;
1528 	if (ct->ct_bufferPendingSize == 0) {
1529 		/*
1530 		 * If the buffer contains no data, we set the two pointers at
1531 		 * the beginning of the buffer (to miminize buffer wraps).
1532 		 */
1533 		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1534 	} else {
1535 		ct->ct_bufferReadPtr += nBytes;
1536 		if (ct->ct_bufferReadPtr >
1537 		    ct->ct_buffer + ct->ct_bufferSize) {
1538 			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1539 		}
1540 	}
1541 }
1542 
1543 static int
1544 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1545 {
1546 	int l;
1547 
1548 	if (ct->ct_bufferPendingSize == 0)
1549 		return (0);
1550 
1551 	l = REMAIN_BYTES(bufferReadPtr);
1552 	if (l < ct->ct_bufferPendingSize) {
1553 		/* Buffer in two fragments. */
1554 		iov[0].iov_base = ct->ct_bufferReadPtr;
1555 		iov[0].iov_len  = l;
1556 
1557 		iov[1].iov_base = ct->ct_buffer;
1558 		iov[1].iov_len  = ct->ct_bufferPendingSize - l;
1559 		return (2);
1560 	} else {
1561 		/* Buffer in one fragment. */
1562 		iov[0].iov_base = ct->ct_bufferReadPtr;
1563 		iov[0].iov_len  = ct->ct_bufferPendingSize;
1564 		return (1);
1565 	}
1566 }
1567 
1568 static bool_t
1569 set_flush_mode(struct ct_data *ct, int mode)
1570 {
1571 	switch (mode) {
1572 	case RPC_CL_BLOCKING_FLUSH:
1573 		/* flush as most as possible without blocking */
1574 	case RPC_CL_BESTEFFORT_FLUSH:
1575 		/* flush the buffer completely (possibly blocking) */
1576 	case RPC_CL_DEFAULT_FLUSH:
1577 		/* flush according to the currently defined policy */
1578 		ct->ct_blocking_mode = mode;
1579 		return (TRUE);
1580 	default:
1581 		return (FALSE);
1582 	}
1583 }
1584 
1585 static bool_t
1586 set_io_mode(struct ct_data *ct, int ioMode)
1587 {
1588 	switch (ioMode) {
1589 	case RPC_CL_BLOCKING:
1590 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1591 			if (NULL != ct->ct_buffer) {
1592 				/*
1593 				 * If a buffer was allocated for this
1594 				 * connection, flush it now, and free it.
1595 				 */
1596 				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1597 				free(ct->ct_buffer);
1598 				ct->ct_buffer = NULL;
1599 			}
1600 			(void) unregister_nb(ct);
1601 			ct->ct_io_mode = ioMode;
1602 		}
1603 		break;
1604 	case RPC_CL_NONBLOCKING:
1605 		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1606 			if (-1 == register_nb(ct)) {
1607 				return (FALSE);
1608 			}
1609 			ct->ct_io_mode = ioMode;
1610 		}
1611 		break;
1612 	default:
1613 		return (FALSE);
1614 	}
1615 	return (TRUE);
1616 }
1617 
1618 static int
1619 do_flush(struct ct_data *ct, uint_t flush_mode)
1620 {
1621 	int result;
1622 	if (ct->ct_bufferPendingSize == 0) {
1623 		return (0);
1624 	}
1625 
1626 	switch (flush_mode) {
1627 	case RPC_CL_BLOCKING_FLUSH:
1628 		if (!set_blocking_connection(ct, TRUE)) {
1629 			return (-1);
1630 		}
1631 		while (ct->ct_bufferPendingSize > 0) {
1632 			if (REMAIN_BYTES(bufferReadPtr) <
1633 			    ct->ct_bufferPendingSize) {
1634 				struct iovec iov[2];
1635 				(void) iovFromBuffer(ct, iov);
1636 				result = writev(ct->ct_fd, iov, 2);
1637 			} else {
1638 				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1639 				    ct->ct_bufferPendingSize, 0);
1640 			}
1641 			if (result < 0) {
1642 				return (-1);
1643 			}
1644 			consumeFromBuffer(ct, result);
1645 		}
1646 
1647 		break;
1648 
1649 	case RPC_CL_BESTEFFORT_FLUSH:
1650 		(void) set_blocking_connection(ct, FALSE);
1651 		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1652 			struct iovec iov[2];
1653 			(void) iovFromBuffer(ct, iov);
1654 			result = writev(ct->ct_fd, iov, 2);
1655 		} else {
1656 			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1657 			    ct->ct_bufferPendingSize, 0);
1658 		}
1659 		if (result < 0) {
1660 			if (errno != EWOULDBLOCK) {
1661 				perror("flush");
1662 				return (-1);
1663 			}
1664 			return (0);
1665 		}
1666 		if (result > 0)
1667 			consumeFromBuffer(ct, result);
1668 		break;
1669 	}
1670 	return (0);
1671 }
1672 
1673 /*
1674  * Non blocking send.
1675  */
1676 
1677 static int
1678 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1679 {
1680 	int result;
1681 
1682 	if (!(ntohl(*(uint32_t *)buff) & 2^31)) {
1683 		return (-1);
1684 	}
1685 
1686 	/*
1687 	 * Check to see if the current message can be stored fully in the
1688 	 * buffer. We have to check this now because it may be impossible
1689 	 * to send any data, so the message must be stored in the buffer.
1690 	 */
1691 	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1692 		/* Try to flush  (to free some space). */
1693 		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1694 
1695 		/* Can we store the message now ? */
1696 		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1697 			return (-2);
1698 	}
1699 
1700 	(void) set_blocking_connection(ct, FALSE);
1701 
1702 	/*
1703 	 * If there is no data pending, we can simply try
1704 	 * to send our data.
1705 	 */
1706 	if (ct->ct_bufferPendingSize == 0) {
1707 		result = t_snd(ct->ct_fd, buff, nBytes, 0);
1708 		if (result == -1) {
1709 			if (errno == EWOULDBLOCK) {
1710 				result = 0;
1711 			} else {
1712 				perror("send");
1713 				return (-1);
1714 			}
1715 		}
1716 		/*
1717 		 * If we have not sent all data, we must store them
1718 		 * in the buffer.
1719 		 */
1720 		if (result != nBytes) {
1721 			if (addInBuffer(ct, (char *)buff + result,
1722 			    nBytes - result) == -1) {
1723 				return (-1);
1724 			}
1725 		}
1726 	} else {
1727 		/*
1728 		 * Some data pending in the buffer.  We try to send
1729 		 * both buffer data and current message in one shot.
1730 		 */
1731 		struct iovec iov[3];
1732 		int i = iovFromBuffer(ct, &iov[0]);
1733 
1734 		iov[i].iov_base = buff;
1735 		iov[i].iov_len  = nBytes;
1736 
1737 		result = writev(ct->ct_fd, iov, i+1);
1738 		if (result == -1) {
1739 			if (errno == EWOULDBLOCK) {
1740 				/* No bytes sent */
1741 				result = 0;
1742 			} else {
1743 				return (-1);
1744 			}
1745 		}
1746 
1747 		/*
1748 		 * Add the bytes from the message
1749 		 * that we have not sent.
1750 		 */
1751 		if (result <= ct->ct_bufferPendingSize) {
1752 			/* No bytes from the message sent */
1753 			consumeFromBuffer(ct, result);
1754 			if (addInBuffer(ct, buff, nBytes) == -1) {
1755 				return (-1);
1756 			}
1757 		} else {
1758 			/*
1759 			 * Some bytes of the message are sent.
1760 			 * Compute the length of the message that has
1761 			 * been sent.
1762 			 */
1763 			int len = result - ct->ct_bufferPendingSize;
1764 
1765 			/* So, empty the buffer. */
1766 			ct->ct_bufferReadPtr = ct->ct_buffer;
1767 			ct->ct_bufferWritePtr = ct->ct_buffer;
1768 			ct->ct_bufferPendingSize = 0;
1769 
1770 			/* And add the remaining part of the message. */
1771 			if (len != nBytes) {
1772 				if (addInBuffer(ct, (char *)buff + len,
1773 				    nBytes-len) == -1) {
1774 					return (-1);
1775 				}
1776 			}
1777 		}
1778 	}
1779 	return (nBytes);
1780 }
1781 
1782 static void
1783 flush_registered_clients(void)
1784 {
1785 	struct nb_reg_node *node;
1786 
1787 	if (LIST_ISEMPTY(nb_first)) {
1788 		return;
1789 	}
1790 
1791 	LIST_FOR_EACH(nb_first, node) {
1792 		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1793 	}
1794 }
1795 
1796 static int
1797 allocate_chunk(void)
1798 {
1799 #define	CHUNK_SIZE 16
1800 	struct nb_reg_node *chk =
1801 	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1802 	struct nb_reg_node *n;
1803 	int i;
1804 
1805 	if (NULL == chk) {
1806 		return (-1);
1807 	}
1808 
1809 	n = chk;
1810 	for (i = 0; i < CHUNK_SIZE-1; ++i) {
1811 		n[i].next = &(n[i+1]);
1812 	}
1813 	n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1814 	nb_free = chk;
1815 	return (0);
1816 }
1817 
1818 static int
1819 register_nb(struct ct_data *ct)
1820 {
1821 	struct nb_reg_node *node;
1822 
1823 	(void) mutex_lock(&nb_list_mutex);
1824 
1825 	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1826 		(void) mutex_unlock(&nb_list_mutex);
1827 		errno = ENOMEM;
1828 		return (-1);
1829 	}
1830 
1831 	if (!exit_handler_set) {
1832 		(void) atexit(flush_registered_clients);
1833 		exit_handler_set = TRUE;
1834 	}
1835 	/* Get the first free node */
1836 	LIST_EXTRACT(nb_free, node);
1837 
1838 	node->ct = ct;
1839 
1840 	LIST_ADD(nb_first, node);
1841 	(void) mutex_unlock(&nb_list_mutex);
1842 
1843 	return (0);
1844 }
1845 
1846 static int
1847 unregister_nb(struct ct_data *ct)
1848 {
1849 	struct nb_reg_node *node;
1850 
1851 	(void) mutex_lock(&nb_list_mutex);
1852 	assert(!LIST_ISEMPTY(nb_first));
1853 
1854 	node = nb_first;
1855 	LIST_FOR_EACH(nb_first, node) {
1856 		if (node->next->ct == ct) {
1857 			/* Get the node to unregister. */
1858 			struct nb_reg_node *n = node->next;
1859 			node->next = n->next;
1860 
1861 			n->ct = NULL;
1862 			LIST_ADD(nb_free, n);
1863 			break;
1864 		}
1865 	}
1866 	(void) mutex_unlock(&nb_list_mutex);
1867 	return (0);
1868 }
1869