/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2016 by Delphix. All rights reserved.
 */

/*
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * clnt_vc.c
 *
 * Implements a connectionful client side RPC.
 *
 * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer. The rpc call
 * returns immediately to the client even though the call has not necessarily
 * been sent. The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message. Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 */
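
/*
 * A minimal batching sketch (illustrative only; PROC_LOG, PROC_SYNC,
 * xdr_msg and msg are hypothetical). A batched call passes a NULL
 * results routine and a zero timeout; a final ordinary call with a
 * nonzero timeout ships the whole batch and resynchronizes:
 *
 *	struct timeval zero = { 0, 0 };
 *	struct timeval wait = { 25, 0 };
 *	(void) clnt_call(clnt, PROC_LOG, xdr_msg, (caddr_t)&msg,
 *	    (xdrproc_t)NULL, NULL, zero);	(queued, maybe not sent)
 *	(void) clnt_call(clnt, PROC_SYNC, xdr_void, NULL,
 *	    xdr_void, NULL, wait);		(flushes the pipeline)
 */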

#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <limits.h>

#define	MCALL_MSG_SIZE	24
#define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
#define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
#define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
#define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
#ifndef MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif

extern int __rpc_timeval_to_msec(struct timeval *);
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
    caddr_t);
extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
    rpcvers_t, uint_t, uint_t, const struct timeval *);

static struct clnt_ops	*clnt_vc_ops(void);
static int		read_vc(void *, caddr_t, int);
static int		write_vc(void *, caddr_t, int);
static int		t_rcvall(int, char *, int);
static bool_t		time_not_ok(struct timeval *);

struct ct_data;
static bool_t		set_up_connection(int, struct netbuf *,
    struct ct_data *, const struct timeval *);
static bool_t		set_io_mode(struct ct_data *, int);

/*
 * Lock table handle used by various MT sync. routines
 */
static mutex_t	vctbl_lock = DEFAULTMUTEX;
static void	*vctbl = NULL;

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";

/*
 * Private data structure
 */
struct ct_data {
	int		ct_fd;		/* connection's fd */
	bool_t		ct_closeit;	/* close it on destroy */
	int		ct_tsdu;	/* size of tsdu */
	int		ct_wait;	/* wait interval in milliseconds */
	bool_t		ct_waitset;	/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	char		ct_mcall[MCALL_MSG_SIZE];	/* marshalled callmsg */
	uint_t		ct_mpos;	/* pos after marshal */
	XDR		ct_xdrs;	/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t		ct_is_oneway;	/* True if the current call is oneway. */
	bool_t		ct_is_blocking;
	ushort_t	ct_io_mode;
	ushort_t	ct_blocking_mode;
	uint_t		ct_bufferSize;	/* Total size of the buffer. */
	uint_t		ct_bufferPendingSize;	/* Size of unsent data. */
	char		*ct_buffer;	/* Pointer to the buffer. */
	char		*ct_bufferWritePtr;	/* Ptr to the first free byte. */
	char		*ct_bufferReadPtr;	/* Ptr to the first byte of data. */
};

struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;

static bool_t exit_handler_set = FALSE;

static mutex_t nb_list_mutex = DEFAULTMUTEX;


/* Define some macros to manage the linked list. */
#define	LIST_ISEMPTY(l)		(l == (struct nb_reg_node *)&l)
#define	LIST_CLR(l)		(l = (struct nb_reg_node *)&l)
#define	LIST_ADD(l, node)	(node->next = l->next, l = node)
#define	LIST_EXTRACT(l, node)	(node = l, l = l->next)
#define	LIST_FOR_EACH(l, node)	\
	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
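
/*
 * Note (explanatory): the two list heads above are self-referential
 * sentinels; an empty list is one whose head points at its own address.
 * That makes LIST_ISEMPTY(nb_first) true initially and lets
 * LIST_FOR_EACH stop once the walk returns to the head. The macros take
 * the head by name because they rely on &l denoting the head itself.
 */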


/* Default size of the IO buffer used in non blocking mode */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE	(16*1024)

static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);

static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);


/*
 * Change the mode of the underlying fd.
 */
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
	int flag;

	/*
	 * If the underlying fd is already in the required mode,
	 * avoid the syscall.
	 */
	if (ct->ct_is_blocking == blocking)
		return (TRUE);

	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_fcntl_getfl_str);
		return (FALSE);
	}

	flag = blocking ? flag & ~O_NONBLOCK : flag | O_NONBLOCK;
	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_nonblock_str);
		return (FALSE);
	}
	ct->ct_is_blocking = blocking;
	return (TRUE);
}

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control().
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes, 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication. The caller may wish
 * to set it to something more useful.
 *
 * fd should be open and bound.
 */
CLIENT *
clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
    const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
{
	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
	    recvsz, NULL));
}
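
/*
 * Usage sketch (illustrative; PROGNUM/VERSNUM and the way fd was obtained
 * are hypothetical). fd must be a TLI endpoint that is already connected,
 * or still T_IDLE with svcaddr supplied so that set_up_connection() can
 * connect it:
 *
 *	CLIENT *clnt = clnt_vc_create(fd, &srvaddr, PROGNUM, VERSNUM, 0, 0);
 *	if (clnt == NULL)
 *		clnt_pcreateerror("clnt_vc_create");
 */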

/*
 * This has the same definition as clnt_vc_create(), except it
 * takes an additional parameter - a pointer to a timeval structure.
 *
 * Not a public interface. This is for clnt_create_timed,
 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
 * value to control a tcp connection attempt.
 * (for bug 4049792: clnt_create_timed does not time out)
 *
 * If tp is NULL, use default timeout to set up the connection.
 */
CLIENT *
_clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
    rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct;		/* private data */
	struct timeval now;
	struct rpc_msg call_msg;
	struct t_info tinfo;
	int flag;

	cl = malloc(sizeof (*cl));
	if ((ct = malloc(sizeof (*ct))) != NULL)
		ct->ct_addr.buf = NULL;

	if ((cl == NULL) || (ct == NULL)) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_vc_str, __no_mem_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		goto err;
	}

	/*
	 * The only use of vctbl_lock is for serializing the creation of
	 * vctbl. Once created the lock needs to be released so we don't
	 * hold it across the set_up_connection() call and end up with a
	 * bunch of threads stuck waiting for the mutex.
	 */
	sig_mutex_lock(&vctbl_lock);

	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		sig_mutex_unlock(&vctbl_lock);
		goto err;
	}

	sig_mutex_unlock(&vctbl_lock);

	ct->ct_io_mode = RPC_CL_BLOCKING;
	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;

	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
	ct->ct_bufferPendingSize = 0;
	ct->ct_bufferWritePtr = NULL;
	ct->ct_bufferReadPtr = NULL;

	/* Check the current state of the fd. */
	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
		    no_fcntl_getfl_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;

	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
		goto err;
	}

	/*
	 * Set up other members of private data struct
	 */
	ct->ct_fd = fd;
	/*
	 * The actual value will be set by clnt_call or clnt_control
	 */
	ct->ct_wait = 30000;
	ct->ct_waitset = FALSE;
	/*
	 * By default, closeit is always FALSE. It is the user's
	 * responsibility to do a t_close on the fd, or the user may use
	 * clnt_control to let clnt_destroy do it for them.
	 */
	ct->ct_closeit = FALSE;

	/*
	 * Initialize call message
	 */
	(void) gettimeofday(&now, (struct timezone *)0);
	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
	call_msg.rm_call.cb_prog = prog;
	call_msg.rm_call.cb_vers = vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
	XDR_DESTROY(&(ct->ct_xdrs));

	if (t_getinfo(fd, &tinfo) == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = t_errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	/*
	 * Find the receive and the send size
	 */
	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_tsdu = tinfo.tsdu;
	/*
	 * Create a client handle which uses xdrrec for serialization
	 * and authnone for authentication.
	 */
	ct->ct_xdrs.x_ops = NULL;
	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
	    read_vc, write_vc);
	if (ct->ct_xdrs.x_ops == NULL) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = ENOMEM;
		goto err;
	}
	cl->cl_ops = clnt_vc_ops();
	cl->cl_private = (caddr_t)ct;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);

err:
	if (ct) {
		free(ct->ct_addr.buf);
		free(ct);
	}
	free(cl);

	return (NULL);
}

#define	TCPOPT_BUFSIZE	128

/*
 * Set tcp connection timeout value.
 * Return 0 for success, -1 for failure.
 */
static int
_set_tcp_conntime(int fd, int optval)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip;
	char buf[TCPOPT_BUFSIZE];

	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_NEGOTIATE;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = optval;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}
	return (0);
}

/*
 * Get current tcp connection timeout value.
 * Return the timeout in milliseconds, or -1 for failure.
 */
static int
_get_tcp_conntime(int fd)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip, retval;
	char buf[TCPOPT_BUFSIZE];

	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_CURRENT;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = 0;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}

	ip = (int *)((char *)buf + sizeof (struct opthdr));
	retval = *ip;
	return (retval);
}
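
/*
 * Explanatory note (not in the original source): set_up_connection() below
 * uses the two helpers above in a save/override/restore pattern, roughly:
 *
 *	int saved = _get_tcp_conntime(fd);
 *	if (saved != -1)
 *		(void) _set_tcp_conntime(fd, remaining_ms);
 *	... t_connect() retry loop ...
 *	if (saved != -1)
 *		(void) _set_tcp_conntime(fd, saved);
 *
 * so that t_connect() aborts within the caller's remaining timeout and the
 * endpoint's abort threshold is restored afterwards.
 */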

static bool_t
set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
    const struct timeval *tp)
{
	int state;
	struct t_call sndcallstr, *rcvcall;
	int nconnect;
	bool_t connected, do_rcv_connect;
	int curr_time = -1;
	hrtime_t start;
	hrtime_t tout;	/* timeout in nanoseconds (from tp) */

	ct->ct_addr.len = 0;
	state = t_getstate(fd);
	if (state == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_errno = 0;
		rpc_createerr.cf_error.re_terrno = t_errno;
		return (FALSE);
	}

	switch (state) {
	case T_IDLE:
		if (svcaddr == NULL) {
			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
			return (FALSE);
		}
		/*
		 * Connect only if state is IDLE and svcaddr known
		 */
		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
		if (rcvcall == NULL) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			return (FALSE);
		}
		rcvcall->udata.maxlen = 0;
		sndcallstr.addr = *svcaddr;
		sndcallstr.opt.len = 0;
		sndcallstr.udata.len = 0;
		/*
		 * NULL could have sufficed for rcvcall, because the address
		 * returned is the same in all cases except the gateway case;
		 * it is for that case that rcvcall is required.
		 */
		connected = FALSE;
		do_rcv_connect = FALSE;

		/*
		 * If there is a timeout value specified, we will try to
		 * reset the tcp connection timeout. If the transport does
		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
		 * for any other reason, the default timeout will be used.
		 */
		if (tp != NULL) {
			start = gethrtime();

			/*
			 * Calculate the timeout in nanoseconds
			 */
			tout = SECS_TO_NS(tp->tv_sec) +
			    USECS_TO_NS(tp->tv_usec);
			curr_time = _get_tcp_conntime(fd);
		}

		for (nconnect = 0; nconnect < 3; nconnect++) {
			if (tp != NULL) {
				/*
				 * Calculate the elapsed time
				 */
				hrtime_t elapsed = gethrtime() - start;
				if (elapsed >= tout)
					break;

				if (curr_time != -1) {
					int ms;

					/*
					 * TCP_CONN_ABORT_THRESHOLD takes int
					 * value in milliseconds. Make sure we
					 * do not overflow.
					 */
					if (NSECS_TO_MS(tout - elapsed) >=
					    INT_MAX) {
						ms = INT_MAX;
					} else {
						ms = (int)
						    NSECS_TO_MS(tout - elapsed);
						if (MSECS_TO_NS(ms) !=
						    tout - elapsed)
							ms++;
					}

					(void) _set_tcp_conntime(fd, ms);
				}
			}

			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
				connected = TRUE;
				break;
			}
			if (t_errno == TLOOK) {
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, (struct
					    t_discon *) NULL);
					break;
				default:
					break;
				}
			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
				break;
			}
			if ((state = t_getstate(fd)) == T_OUTCON) {
				do_rcv_connect = TRUE;
				break;
			}
			if (state != T_IDLE) {
				break;
			}
		}
		if (do_rcv_connect) {
			do {
				if (t_rcvconnect(fd, rcvcall) != -1) {
					connected = TRUE;
					break;
				}
			} while (t_errno == TSYSERR && errno == EINTR);
		}

		/*
		 * Set the connection timeout back to its old value.
		 */
		if (curr_time != -1) {
			(void) _set_tcp_conntime(fd, curr_time);
		}

		if (!connected) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			(void) t_free((char *)rcvcall, T_CALL);
			return (FALSE);
		}

		/* Free old area if allocated */
		if (ct->ct_addr.buf)
			free(ct->ct_addr.buf);
		ct->ct_addr = rcvcall->addr;	/* To get the new address */
		/* So that the address buf does not get freed */
		rcvcall->addr.buf = NULL;
		(void) t_free((char *)rcvcall, T_CALL);
		break;
	case T_DATAXFER:
	case T_OUTCON:
		if (svcaddr == NULL) {
			/*
			 * svcaddr could also be NULL in cases where the
			 * client is already bound and connected.
			 */
			ct->ct_addr.len = 0;
		} else {
			ct->ct_addr.buf = malloc(svcaddr->len);
			if (ct->ct_addr.buf == NULL) {
				(void) syslog(LOG_ERR, clnt_vc_errstr,
				    clnt_vc_str, __no_mem_str);
				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
				rpc_createerr.cf_error.re_errno = errno;
				rpc_createerr.cf_error.re_terrno = 0;
				return (FALSE);
			}
			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
			    (size_t)svcaddr->len);
			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
		}
		break;
	default:
		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
		return (FALSE);
	}
	return (TRUE);
}

static enum clnt_stat
clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
    xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	uint32_t x_id;
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
	bool_t shipnow;
	int refreshes = 2;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = FALSE;
	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (RPC_FAILED);	/* XXX */
		}
	}

	if (!ct->ct_waitset) {
		/* Use the timeout if it is within limits; otherwise ignore it */
		if (time_not_ok(&timeout) == FALSE)
			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
	} else {
		timeout.tv_sec = (ct->ct_wait / 1000);
		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
	}

	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * The xid is stored in network byte order; convert it to host
	 * order before decrementing.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}
	if (!xdrrec_endofrecord(xdrs, shipnow)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_CANTSEND);
	}
	if (!shipnow) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_SUCCESS);
	}
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	for (;;) {
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
		if (!xdrrec_skiprecord(xdrs)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		/* now decode and validate the response header */
		if (!xdr_replymsg(xdrs, &reply_msg)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				continue;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
		rpc_callerr.re_status = RPC_SUCCESS;
	else
		__seterr_reply(&reply_msg, &(rpc_callerr));

	if (rpc_callerr.re_status == RPC_SUCCESS) {
		if (!AUTH_VALIDATE(cl->cl_auth,
		    &reply_msg.acpted_rply.ar_verf)) {
			rpc_callerr.re_status = RPC_AUTHERROR;
			rpc_callerr.re_why = AUTH_INVALIDRESP;
		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
			if (!(*xdr_results)(xdrs, results_ptr)) {
				if (rpc_callerr.re_status == RPC_SUCCESS)
					rpc_callerr.re_status =
					    RPC_CANTDECODERES;
			}
		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
		    results_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTDECODERES;
		}
	}	/* end successful completion */
	/*
	 * If unsuccessful AND the error is an authentication error,
	 * then refresh credentials and try again, else break
	 */
	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
			goto call_again;
		else
			/*
			 * We are setting rpc_callerr here given that libnsl
			 * is not reentrant thereby reinitializing the TSD.
			 * If not set here then success could be returned even
			 * though refresh failed.
			 */
			rpc_callerr.re_status = RPC_AUTHERROR;
	} /* end of unsuccessful completion */
	/* free verifier ... */
	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
		xdrs->x_op = XDR_FREE;
		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}

static enum clnt_stat
clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	uint32_t x_id;
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = TRUE;

	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * The xid is stored in network byte order; convert it to host
	 * order before decrementing.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}

	/*
	 * Do not need to check errors, as the following code does
	 * not depend on the successful completion of the call.
	 * An error, if any occurs, is reported through
	 * rpc_callerr.re_status.
	 */
	(void) xdrrec_endofrecord(xdrs, TRUE);

	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}

/* ARGSUSED */
static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	*errp = rpc_callerr;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	bool_t stat;

	(void) rpc_fd_lock(vctbl, ct->ct_fd);
	xdrs->x_op = XDR_FREE;
	stat = (*xdr_res)(xdrs, res_ptr);
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (stat);
}

static void
clnt_vc_abort(void)
{
}

static bool_t
clnt_vc_control(CLIENT *cl, int request, char *info)
{
	bool_t ret;
	struct ct_data *ct = (struct ct_data *)cl->cl_private;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLFLUSH:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			int res;
			res = do_flush(ct, (info == NULL ||
			    *(int *)info == RPC_CL_DEFAULT_FLUSH) ?
			    ct->ct_blocking_mode : *(int *)info);
			ret = (0 == res);
		} else {
			ret = FALSE;
		}
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
	}

	/* for other requests which use info */
	if (info == NULL) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
		ct->ct_waitset = TRUE;
		break;
	case CLGET_TIMEOUT:
		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
		((struct timeval *)info)->tv_usec =
		    (ct->ct_wait % 1000) * 1000;
		break;
	case CLGET_SERVER_ADDR:	/* For compatibility only */
		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
		break;
	case CLGET_FD:
		*(int *)info = ct->ct_fd;
		break;
	case CLGET_SVC_ADDR:
		/* The caller should not free this memory area */
		*(struct netbuf *)info = ct->ct_addr;
		break;
	case CLSET_SVC_ADDR:	/* set to new address */
#ifdef undef
		/*
		 * XXX: once the t_snddis(), followed by t_connect() starts to
		 * work, this ifdef should be removed. CLIENT handle reuse
		 * would then be possible for COTS as well.
		 */
		if (t_snddis(ct->ct_fd, NULL) == -1) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
		    ct, NULL);
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
#else
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
#endif
	case CLGET_XID:
		/*
		 * This uses the knowledge that the xid is the first
		 * element in the call structure.
		 * It will get the xid of the PREVIOUS call
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		*(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
		/* increment by 1 as clnt_vc_call() decrements once */
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_IO_MODE:
		if (!set_io_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLSET_FLUSH_MODE:
		/* Set a specific FLUSH_MODE */
		if (!set_flush_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLGET_FLUSH_MODE:
		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
		break;

	case CLGET_IO_MODE:
		*(rpciomode_t *)info = ct->ct_io_mode;
		break;

	case CLGET_CURRENT_REC_SIZE:
		/*
		 * Returns the current amount of memory allocated
		 * to pending requests
		 */
		*(int *)info = ct->ct_bufferPendingSize;
		break;

	case CLSET_CONNMAXREC_SIZE:
		/* Cannot resize the buffer if it is in use. */
		if (ct->ct_bufferPendingSize != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		/*
		 * If the new size is equal to the current size,
		 * there is nothing to do.
		 */
		if (ct->ct_bufferSize == *(uint_t *)info)
			break;

		ct->ct_bufferSize = *(uint_t *)info;
		if (ct->ct_buffer) {
			free(ct->ct_buffer);
			ct->ct_buffer = NULL;
			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
		}
		break;

	case CLGET_CONNMAXREC_SIZE:
		/*
		 * Returns the size of the buffer allocated
		 * to pending requests
		 */
		*(uint_t *)info = ct->ct_bufferSize;
		break;

	default:
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (TRUE);
}
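
/*
 * Usage sketch for the control requests above (illustrative; clnt is
 * assumed to be a handle from clnt_vc_create()):
 *
 *	struct timeval tv = { 25, 0 };
 *	(void) clnt_control(clnt, CLSET_TIMEOUT, (char *)&tv);
 *
 *	uint32_t xid;
 *	(void) clnt_control(clnt, CLGET_XID, (char *)&xid);
 *
 * Once CLSET_TIMEOUT has been issued, ct_waitset is TRUE and clnt_call()
 * ignores the timeout passed to it; CLGET_XID reports the xid of the
 * PREVIOUS call.
 */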

static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	int ct_fd = ct->ct_fd;

	(void) rpc_fd_lock(vctbl, ct_fd);

	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
		(void) unregister_nb(ct);
	}

	if (ct->ct_closeit)
		(void) t_close(ct_fd);
	XDR_DESTROY(&(ct->ct_xdrs));
	if (ct->ct_addr.buf)
		free(ct->ct_addr.buf);
	free(ct);
	if (cl->cl_netid && cl->cl_netid[0])
		free(cl->cl_netid);
	if (cl->cl_tp && cl->cl_tp[0])
		free(cl->cl_tp);
	free(cl);
	rpc_fd_unlock(vctbl, ct_fd);
}

/*
 * Interface between xdr serializer and vc connection.
 * Behaves like the system calls, read & write, but keeps some error state
 * around for the rpc level.
 */
static int
read_vc(void *ct_tmp, caddr_t buf, int len)
{
	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
	struct pollfd *pfdp;
	int npfd;		/* total number of pfdp allocated */
	struct ct_data *ct = ct_tmp;
	struct timeval starttime;
	struct timeval curtime;
	int poll_time;
	int delta;

	if (len == 0)
		return (0);

	/*
	 * Allocate just one the first time. thr_get_storage() may
	 * return a larger buffer, left over from the last time we were
	 * here, but that's OK. realloc() will deal with it properly.
	 */
	npfd = 1;
	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
	if (pfdp == NULL) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_read_vc_str, __no_mem_str);
		rpc_callerr.re_status = RPC_SYSTEMERROR;
		rpc_callerr.re_errno = errno;
		rpc_callerr.re_terrno = 0;
		return (-1);
	}

	/*
	 * N.B.: slot 0 in the pollfd array is reserved for the file
	 * descriptor we're really interested in (as opposed to the
	 * callback descriptors).
	 */
	pfdp[0].fd = ct->ct_fd;
	pfdp[0].events = MASKVAL;
	pfdp[0].revents = 0;
	poll_time = ct->ct_wait;
	if (gettimeofday(&starttime, NULL) == -1) {
		syslog(LOG_ERR, "Unable to get time of day: %m");
		return (-1);
	}

	for (;;) {
		extern void (*_svc_getreqset_proc)();
		extern pollfd_t *svc_pollfd;
		extern int svc_max_pollfd;
		int fds;

		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */

		if (_svc_getreqset_proc) {
			sig_rw_rdlock(&svc_fd_lock);

			/* reallocate pfdp to svc_max_pollfd + 1 */
			if (npfd != (svc_max_pollfd + 1)) {
				struct pollfd *tmp_pfdp = realloc(pfdp,
				    sizeof (struct pollfd) *
				    (svc_max_pollfd + 1));
				if (tmp_pfdp == NULL) {
					sig_rw_unlock(&svc_fd_lock);
					(void) syslog(LOG_ERR, clnt_vc_errstr,
					    clnt_read_vc_str, __no_mem_str);
					rpc_callerr.re_status = RPC_SYSTEMERROR;
					rpc_callerr.re_errno = errno;
					rpc_callerr.re_terrno = 0;
					return (-1);
				}

				pfdp = tmp_pfdp;
				npfd = svc_max_pollfd + 1;
				(void) pthread_setspecific(pfdp_key, pfdp);
			}
			if (npfd > 1)
				(void) memcpy(&pfdp[1], svc_pollfd,
				    sizeof (struct pollfd) * (npfd - 1));

			sig_rw_unlock(&svc_fd_lock);
		} else {
			npfd = 1;	/* don't forget about pfdp[0] */
		}

		switch (fds = poll(pfdp, npfd, poll_time)) {
		case 0:
			rpc_callerr.re_status = RPC_TIMEDOUT;
			return (-1);

		case -1:
			if (errno != EINTR)
				continue;
			else {
				/*
				 * interrupted by another signal,
				 * update time_waited
				 */

				if (gettimeofday(&curtime, NULL) == -1) {
					syslog(LOG_ERR,
					    "Unable to get time of day: %m");
					errno = 0;
					continue;
				}
				delta = (curtime.tv_sec -
				    starttime.tv_sec) * 1000 +
				    (curtime.tv_usec -
				    starttime.tv_usec) / 1000;
				poll_time -= delta;
				if (poll_time < 0) {
					rpc_callerr.re_status = RPC_TIMEDOUT;
					errno = 0;
					return (-1);
				} else {
					errno = 0;	/* reset it */
					continue;
				}
			}
		}

		if (pfdp[0].revents == 0) {
			/* must be for server side of the house */
			(*_svc_getreqset_proc)(&pfdp[1], fds);
			continue;	/* do poll again */
		}

		if (pfdp[0].revents & POLLNVAL) {
			rpc_callerr.re_status = RPC_CANTRECV;
			/*
			 * Note: we're faking errno here because we
			 * previously would have expected select() to
			 * return -1 with errno EBADF. Poll(BA_OS)
			 * returns 0 and sets the POLLNVAL revents flag
			 * instead.
			 */
			rpc_callerr.re_errno = errno = EBADF;
			return (-1);
		}

		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
			rpc_callerr.re_status = RPC_CANTRECV;
			rpc_callerr.re_errno = errno = EPIPE;
			return (-1);
		}
		break;
	}

	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
	case 0:
		/* premature eof */
		rpc_callerr.re_errno = ENOLINK;
		rpc_callerr.re_terrno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		len = -1;	/* it's really an error */
		break;

	case -1:
		rpc_callerr.re_terrno = t_errno;
		rpc_callerr.re_errno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		break;
	}
	return (len);
}

static int
write_vc(void *ct_tmp, caddr_t buf, int len)
{
	int i, cnt;
	struct ct_data *ct = ct_tmp;
	int flag;
	int maxsz;

	maxsz = ct->ct_tsdu;

	/* Handle the non-blocking mode */
	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		/*
		 * Test a special case here. If the length of the current
		 * write is greater than the transport data unit, and the
		 * mode is non blocking, we return RPC_CANTSEND.
		 * XXX this is not very clean.
		 */
		if (maxsz > 0 && len > maxsz) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}

		len = nb_send(ct, buf, (unsigned)len);
		if (len == -1) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		} else if (len == -2) {
			rpc_callerr.re_terrno = 0;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSTORE;
		}
		return (len);
	}

	if ((maxsz == 0) || (maxsz == -1)) {
		/*
		 * t_snd may return -1 for an error on the connection (the
		 * connection needs to be repaired/closed), and -2 for a
		 * flow-control handling error (nothing to do but wait and
		 * try again).
		 */
		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		}
		return (len);
	}

	/*
	 * This is for those transports which have a max size for data.
	 */
	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
		flag = cnt > maxsz ? T_MORE : 0;
		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
		    flag)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}
	}
	return (len);
}

/*
 * Receive the required bytes of data, even if it is fragmented.
 */
static int
t_rcvall(int fd, char *buf, int len)
{
	int moreflag;
	int final = 0;
	int res;

	do {
		moreflag = 0;
		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
		if (res == -1) {
			if (t_errno == TLOOK)
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, NULL);
					(void) t_snddis(fd, NULL);
					return (-1);
				case T_ORDREL:
					/* Received orderly release indication */
					(void) t_rcvrel(fd);
					/* Send orderly release indicator */
					(void) t_sndrel(fd);
					return (-1);
				default:
					return (-1);
				}
		} else if (res == 0) {
			return (0);
		}
		final += res;
		buf += res;
		len -= res;
	} while ((len > 0) && (moreflag & T_MORE));
	return (final);
}

static struct clnt_ops *
clnt_vc_ops(void)
{
	static struct clnt_ops ops;
	extern mutex_t ops_lock;

	/* VARIABLES PROTECTED BY ops_lock: ops */

	sig_mutex_lock(&ops_lock);
	if (ops.cl_call == NULL) {
		ops.cl_call = clnt_vc_call;
		ops.cl_send = clnt_vc_send;
		ops.cl_abort = clnt_vc_abort;
		ops.cl_geterr = clnt_vc_geterr;
		ops.cl_freeres = clnt_vc_freeres;
		ops.cl_destroy = clnt_vc_destroy;
		ops.cl_control = clnt_vc_control;
	}
	sig_mutex_unlock(&ops_lock);
	return (&ops);
}

/*
 * Make sure that the time is not garbage. A value of -1 is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
	    t->tv_usec <= -1 || t->tv_usec > 1000000);
}


/* Compute the number of bytes that remain until the end of the buffer */
#define	REMAIN_BYTES(p)	(ct->ct_bufferSize - (ct->ct_##p - ct->ct_buffer))
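
/*
 * Worked example (explanatory note): with the default 16 KB buffer
 * (ct_bufferSize == 16384) and ct_bufferWritePtr 16000 bytes past
 * ct_buffer, REMAIN_BYTES(bufferWritePtr) is 16384 - 16000 == 384, i.e.
 * only 384 bytes can be copied before the write position must wrap to
 * the start of this circular buffer.
 */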

static int
addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
{
	if (NULL == ct->ct_buffer) {
		/* Buffer not allocated yet. */
		char *buffer;

		buffer = malloc(ct->ct_bufferSize);
		if (NULL == buffer) {
			errno = ENOMEM;
			return (-1);
		}
		(void) memcpy(buffer, dataToAdd, nBytes);

		ct->ct_buffer = buffer;
		ct->ct_bufferReadPtr = buffer;
		ct->ct_bufferWritePtr = buffer + nBytes;
		ct->ct_bufferPendingSize = nBytes;
	} else {
		/*
		 * For an already allocated buffer, two mem copies
		 * might be needed, depending on the current
		 * writing position.
		 */

		/* Compute the length of the first copy. */
		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));

		ct->ct_bufferPendingSize += nBytes;

		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
		ct->ct_bufferWritePtr += len;
		nBytes -= len;
		if (0 == nBytes) {
			/* One memcpy was enough. */

			/*
			 * If the write pointer is at the end of the buffer,
			 * wrap it now.
			 */
			if (ct->ct_bufferWritePtr ==
			    (ct->ct_buffer + ct->ct_bufferSize)) {
				ct->ct_bufferWritePtr = ct->ct_buffer;
			}
		} else {
			/* A second memcpy is needed. */
			dataToAdd += len;

			/*
			 * Copy the remaining data to the beginning of the
			 * buffer
			 */
			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
		}
	}
	return (0);
}

static void
consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
{
	ct->ct_bufferPendingSize -= nBytes;
	if (ct->ct_bufferPendingSize == 0) {
		/*
		 * If the buffer contains no data, we set the two pointers at
		 * the beginning of the buffer (to minimize buffer wraps).
		 */
		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
	} else {
		ct->ct_bufferReadPtr += nBytes;
		if (ct->ct_bufferReadPtr >
		    ct->ct_buffer + ct->ct_bufferSize) {
			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
		}
	}
}

static int
iovFromBuffer(struct ct_data *ct, struct iovec *iov)
{
	int l;

	if (ct->ct_bufferPendingSize == 0)
		return (0);

	l = REMAIN_BYTES(bufferReadPtr);
	if (l < ct->ct_bufferPendingSize) {
		/* Buffer in two fragments. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = l;

		iov[1].iov_base = ct->ct_buffer;
		iov[1].iov_len = ct->ct_bufferPendingSize - l;
		return (2);
	} else {
		/* Buffer in one fragment. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = ct->ct_bufferPendingSize;
		return (1);
	}
}
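
/*
 * Wrapped-case example (explanatory note): with a 16384-byte buffer,
 * 1000 pending bytes and ct_bufferReadPtr 16000 bytes into the buffer,
 * the pending data spans the wrap point. iovFromBuffer() then fills
 * iov[0] with the 384 bytes up to the end of the buffer and iov[1] with
 * the 616 bytes at its start, and returns 2, so a single
 * writev(ct->ct_fd, iov, 2) sends both pieces in order.
 */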

static bool_t
set_flush_mode(struct ct_data *ct, int mode)
{
	switch (mode) {
	case RPC_CL_BLOCKING_FLUSH:
		/* flush the buffer completely (possibly blocking) */
	case RPC_CL_BESTEFFORT_FLUSH:
		/* flush as much as possible without blocking */
	case RPC_CL_DEFAULT_FLUSH:
		/* flush according to the currently defined policy */
		ct->ct_blocking_mode = mode;
		return (TRUE);
	default:
		return (FALSE);
	}
}

static bool_t
set_io_mode(struct ct_data *ct, int ioMode)
{
	switch (ioMode) {
	case RPC_CL_BLOCKING:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			if (NULL != ct->ct_buffer) {
				/*
				 * If a buffer was allocated for this
				 * connection, flush it now, and free it.
				 */
				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
				free(ct->ct_buffer);
				ct->ct_buffer = NULL;
			}
			(void) unregister_nb(ct);
			ct->ct_io_mode = ioMode;
		}
		break;
	case RPC_CL_NONBLOCKING:
		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
			if (-1 == register_nb(ct)) {
				return (FALSE);
			}
			ct->ct_io_mode = ioMode;
		}
		break;
	default:
		return (FALSE);
	}
	return (TRUE);
}

static int
do_flush(struct ct_data *ct, uint_t flush_mode)
{
	int result;
	if (ct->ct_bufferPendingSize == 0) {
		return (0);
	}

	switch (flush_mode) {
	case RPC_CL_BLOCKING_FLUSH:
		if (!set_blocking_connection(ct, TRUE)) {
			return (-1);
		}
		while (ct->ct_bufferPendingSize > 0) {
			if (REMAIN_BYTES(bufferReadPtr) <
			    ct->ct_bufferPendingSize) {
				struct iovec iov[2];
				(void) iovFromBuffer(ct, iov);
				result = writev(ct->ct_fd, iov, 2);
			} else {
				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
				    ct->ct_bufferPendingSize, 0);
			}
			if (result < 0) {
				return (-1);
			}
			consumeFromBuffer(ct, result);
		}

		break;

	case RPC_CL_BESTEFFORT_FLUSH:
		(void) set_blocking_connection(ct, FALSE);
		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
			struct iovec iov[2];
			(void) iovFromBuffer(ct, iov);
			result = writev(ct->ct_fd, iov, 2);
		} else {
			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
			    ct->ct_bufferPendingSize, 0);
		}
		if (result < 0) {
			if (errno != EWOULDBLOCK) {
				perror("flush");
				return (-1);
			}
			return (0);
		}
		if (result > 0)
			consumeFromBuffer(ct, result);
		break;
	}
	return (0);
}

/*
 * Non blocking send.
 */

/*
 * Test if this is the last fragment. See the comment in front of xdr_rec.c
 * for details.
 */
#define	LAST_FRAG(x)	((ntohl(*(uint32_t *)x) & (1U << 31)) == (1U << 31))

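/*
 * Record-mark example (explanatory note): on a stream transport each RPC
 * record fragment is preceded by a 4-byte mark whose high bit flags the
 * last fragment (see RFC 5531). A mark of 0x80000014 means "last
 * fragment, 20 bytes", so LAST_FRAG() is true for it; 0x00000014 would
 * announce an intermediate fragment, which nb_send() rejects.
 *
 * nb_send() returns the number of bytes accepted (sent or buffered),
 * -1 on error, or -2 when the message cannot even be stored; write_vc()
 * maps -1 to RPC_CANTSEND and -2 to RPC_CANTSTORE.
 */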
static int
nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
{
	int result;

	if (!LAST_FRAG(buff)) {
		return (-1);
	}

	/*
	 * Check to see if the current message can be stored fully in the
	 * buffer. We have to check this now because it may be impossible
	 * to send any data, so the message must be stored in the buffer.
	 */
	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
		/* Try to flush (to free some space). */
		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);

		/* Can we store the message now? */
		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
			return (-2);
	}

	(void) set_blocking_connection(ct, FALSE);

	/*
	 * If there is no data pending, we can simply try
	 * to send our data.
	 */
	if (ct->ct_bufferPendingSize == 0) {
		result = t_snd(ct->ct_fd, buff, nBytes, 0);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				result = 0;
			} else {
				perror("send");
				return (-1);
			}
		}
		/*
		 * If we have not sent all the data, we must store it
		 * in the buffer.
		 */
		if (result != nBytes) {
			if (addInBuffer(ct, (char *)buff + result,
			    nBytes - result) == -1) {
				return (-1);
			}
		}
	} else {
		/*
		 * Some data is pending in the buffer. We try to send
		 * both the buffered data and the current message in one shot.
		 */
		struct iovec iov[3];
		int i = iovFromBuffer(ct, &iov[0]);

		iov[i].iov_base = buff;
		iov[i].iov_len = nBytes;

		result = writev(ct->ct_fd, iov, i + 1);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				/* No bytes sent */
				result = 0;
			} else {
				return (-1);
			}
		}

		/*
		 * Add the bytes from the message
		 * that we have not sent.
		 */
		if (result <= ct->ct_bufferPendingSize) {
			/* No bytes from the message were sent */
			consumeFromBuffer(ct, result);
			if (addInBuffer(ct, buff, nBytes) == -1) {
				return (-1);
			}
		} else {
			/*
			 * Some bytes of the message were sent.
			 * Compute the length of the part of the message
			 * that has been sent.
			 */
			int len = result - ct->ct_bufferPendingSize;

			/* So, empty the buffer. */
			ct->ct_bufferReadPtr = ct->ct_buffer;
			ct->ct_bufferWritePtr = ct->ct_buffer;
			ct->ct_bufferPendingSize = 0;

			/* And add the remaining part of the message. */
			if (len != nBytes) {
				if (addInBuffer(ct, (char *)buff + len,
				    nBytes - len) == -1) {
					return (-1);
				}
			}
		}
	}
	return (nBytes);
}

static void
flush_registered_clients(void)
{
	struct nb_reg_node *node;

	if (LIST_ISEMPTY(nb_first)) {
		return;
	}

	LIST_FOR_EACH(nb_first, node) {
		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
	}
}

static int
allocate_chunk(void)
{
#define	CHUNK_SIZE 16
	struct nb_reg_node *chk =
	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
	struct nb_reg_node *n;
	int i;

	if (NULL == chk) {
		return (-1);
	}

	n = chk;
	for (i = 0; i < CHUNK_SIZE - 1; ++i) {
		n[i].next = &(n[i + 1]);
	}
	n[CHUNK_SIZE - 1].next = (struct nb_reg_node *)&nb_free;
	nb_free = chk;
	return (0);
}

static int
register_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);

	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
		(void) mutex_unlock(&nb_list_mutex);
		errno = ENOMEM;
		return (-1);
	}

	if (!exit_handler_set) {
		(void) atexit(flush_registered_clients);
		exit_handler_set = TRUE;
	}
	/* Get the first free node */
	LIST_EXTRACT(nb_free, node);

	node->ct = ct;

	LIST_ADD(nb_first, node);
	(void) mutex_unlock(&nb_list_mutex);

	return (0);
}

static int
unregister_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);
	assert(!LIST_ISEMPTY(nb_first));

	node = nb_first;
	LIST_FOR_EACH(nb_first, node) {
		if (node->next->ct == ct) {
			/* Get the node to unregister. */
			struct nb_reg_node *n = node->next;
			node->next = n->next;

			n->ct = NULL;
			LIST_ADD(nb_free, n);
			break;
		}
	}
	(void) mutex_unlock(&nb_list_mutex);
	return (0);
}