/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
 * Copyright (c) 2016 by Delphix. All rights reserved.
 */

/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * clnt_vc.c
 *
 * Implements a connectionful client side RPC.
 *
 * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  Batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 */

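/*
 * Illustrative sketch only (not part of the original source): a caller
 * batches by passing a NULL results xdr routine together with a zero
 * timeout, and later flushes the batch with an ordinary call that does
 * expect a reply.  LOGPROC, xdr_logrec and the variables are made-up
 * placeholders.
 *
 *	struct timeval zero = { 0, 0 };
 *
 *	(void) clnt_call(clnt, LOGPROC, xdr_logrec, (caddr_t)&rec,
 *	    (xdrproc_t)NULL, (caddr_t)NULL, zero);	-- batched, not sent
 *	(void) clnt_call(clnt, NULLPROC, xdr_void, NULL,
 *	    xdr_void, NULL, tv);			-- flushes the batch
 */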

#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <limits.h>

#define	MCALL_MSG_SIZE	24
#define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
#define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
#define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
#define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
#ifndef MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif

extern int __rpc_timeval_to_msec(struct timeval *);
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
    caddr_t);
extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
    rpcvers_t, uint_t, uint_t, const struct timeval *);

static struct clnt_ops	*clnt_vc_ops(void);
static int		read_vc(void *, caddr_t, int);
static int		write_vc(void *, caddr_t, int);
static int		t_rcvall(int, char *, int);
static bool_t		time_not_ok(struct timeval *);

struct ct_data;
static bool_t		set_up_connection(int, struct netbuf *,
    struct ct_data *, const struct timeval *);
static bool_t		set_io_mode(struct ct_data *, int);

/*
 * Lock table handle used by various MT sync. routines
 */
static mutex_t	vctbl_lock = DEFAULTMUTEX;
static void	*vctbl = NULL;

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";

/*
 * Private data structure
 */
struct ct_data {
	int		ct_fd;		/* connection's fd */
	bool_t		ct_closeit;	/* close it on destroy */
	int		ct_tsdu;	/* size of tsdu */
	int		ct_wait;	/* wait interval in milliseconds */
	bool_t		ct_waitset;	/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	char		ct_mcall[MCALL_MSG_SIZE];	/* marshalled callmsg */
	uint_t		ct_mpos;	/* pos after marshal */
	XDR		ct_xdrs;	/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t		ct_is_oneway;	/* True if the current call is oneway. */
	bool_t		ct_is_blocking;
	ushort_t	ct_io_mode;
	ushort_t	ct_blocking_mode;
	uint_t		ct_bufferSize;	/* Total size of the buffer. */
	uint_t		ct_bufferPendingSize;	/* Size of unsent data. */
	char		*ct_buffer;	/* Pointer to the buffer. */
	char		*ct_bufferWritePtr;	/* Ptr to the first free byte. */
	char		*ct_bufferReadPtr;	/* Ptr to the first byte of data. */
};

struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;

static bool_t exit_handler_set = FALSE;

static mutex_t nb_list_mutex = DEFAULTMUTEX;

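/*
 * nb_first is the list of handles currently in non-blocking mode (their
 * pending buffers are flushed from an atexit handler); nb_free is a free
 * list of nodes.  Both lists use the address of their own head pointer as a
 * sentinel, which is what the LIST_ISEMPTY()/LIST_CLR() macros below test.
 */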

/* Define some macros to manage the linked list. */
#define	LIST_ISEMPTY(l)		(l == (struct nb_reg_node *)&l)
#define	LIST_CLR(l)		(l = (struct nb_reg_node *)&l)
#define	LIST_ADD(l, node)	(node->next = l->next, l = node)
#define	LIST_EXTRACT(l, node)	(node = l, l = l->next)
#define	LIST_FOR_EACH(l, node) \
	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)


/* Default size of the IO buffer used in non blocking mode */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE	(16*1024)

static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);

static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);


/*
 * Change the mode of the underlying fd.
 */
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
	int flag;

	/*
	 * If the underlying fd is already in the required mode,
	 * avoid the syscall.
	 */
	if (ct->ct_is_blocking == blocking)
		return (TRUE);

	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_fcntl_getfl_str);
		return (FALSE);
	}

	flag = blocking ? flag & ~O_NONBLOCK : flag | O_NONBLOCK;
	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_nonblock_str);
		return (FALSE);
	}
	ct->ct_is_blocking = blocking;
	return (TRUE);
}

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control().
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes; 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication.  The caller may wish
 * to set it to something more useful.
 *
 * fd should be open and bound.
 */
CLIENT *
clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
    const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
{
	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
	    recvsz, NULL));
}

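/*
 * Illustrative sketch only (not part of the original source): the fd handed
 * to clnt_vc_create() is expected to be an already open and bound
 * connection-oriented TLI endpoint, e.g.
 *
 *	fd = t_open("/dev/tcp", O_RDWR, NULL);
 *	(void) t_bind(fd, NULL, NULL);
 *	cl = clnt_vc_create(fd, &server_addr, MYPROG, MYVERS, 0, 0);
 *
 * MYPROG, MYVERS and server_addr are made-up placeholders.
 */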

/*
 * This has the same definition as clnt_vc_create(), except it
 * takes an additional parameter - a pointer to a timeval structure.
 *
 * Not a public interface. This is for clnt_create_timed,
 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
 * value to control a tcp connection attempt.
 * (for bug 4049792: clnt_create_timed does not time out)
 *
 * If tp is NULL, use the default timeout to set up the connection.
 */
CLIENT *
_clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
    rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct;		/* private data */
	struct timeval now;
	struct rpc_msg call_msg;
	struct t_info tinfo;
	int flag;

	cl = malloc(sizeof (*cl));
	if ((ct = malloc(sizeof (*ct))) != NULL)
		ct->ct_addr.buf = NULL;

	if ((cl == NULL) || (ct == NULL)) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_vc_str, __no_mem_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		goto err;
	}

	/*
	 * The only use of vctbl_lock is for serializing the creation of
	 * vctbl. Once created the lock needs to be released so we don't
	 * hold it across the set_up_connection() call and end up with a
	 * bunch of threads stuck waiting for the mutex.
	 */
	sig_mutex_lock(&vctbl_lock);

	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		sig_mutex_unlock(&vctbl_lock);
		goto err;
	}

	sig_mutex_unlock(&vctbl_lock);

	ct->ct_io_mode = RPC_CL_BLOCKING;
	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;

	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
	ct->ct_bufferPendingSize = 0;
	ct->ct_bufferWritePtr = NULL;
	ct->ct_bufferReadPtr = NULL;

	/* Check the current state of the fd. */
	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
		    no_fcntl_getfl_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;

	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
		goto err;
	}

	/*
	 * Set up other members of private data struct
	 */
	ct->ct_fd = fd;
	/*
	 * The actual value will be set by clnt_call or clnt_control
	 */
	ct->ct_wait = 30000;
	ct->ct_waitset = FALSE;
	/*
	 * By default, closeit is always FALSE.  It is the user's
	 * responsibility to do a t_close() on the fd; alternatively, the
	 * user may use clnt_control() to let clnt_destroy() do it instead.
	 */
	ct->ct_closeit = FALSE;

	/*
	 * Initialize call message
	 */
	(void) gettimeofday(&now, (struct timezone *)0);
	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
	call_msg.rm_call.cb_prog = prog;
	call_msg.rm_call.cb_vers = vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
	XDR_DESTROY(&(ct->ct_xdrs));

	if (t_getinfo(fd, &tinfo) == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = t_errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	/*
	 * Find the receive and the send size
	 */
	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_tsdu = tinfo.tsdu;
	/*
	 * Create a client handle which uses xdrrec for serialization
	 * and authnone for authentication.
	 */
	ct->ct_xdrs.x_ops = NULL;
	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
	    read_vc, write_vc);
	if (ct->ct_xdrs.x_ops == NULL) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = ENOMEM;
		goto err;
	}
	cl->cl_ops = clnt_vc_ops();
	cl->cl_private = (caddr_t)ct;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);

err:
	if (ct) {
		free(ct->ct_addr.buf);
		free(ct);
	}
	free(cl);

	return (NULL);
}

#define	TCPOPT_BUFSIZE 128

/*
 * Set tcp connection timeout value.
 * Return 0 for success, -1 for failure.
 */
static int
_set_tcp_conntime(int fd, int optval)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip;
	char buf[TCPOPT_BUFSIZE];

	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_NEGOTIATE;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = optval;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}
	return (0);
}

/*
 * Get current tcp connection timeout value.
 * Return the timeout in milliseconds, or -1 for failure.
 */
static int
_get_tcp_conntime(int fd)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip, retval;
	char buf[TCPOPT_BUFSIZE];

	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_CURRENT;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = 0;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}

	ip = (int *)((char *)buf + sizeof (struct opthdr));
	retval = *ip;
	return (retval);
}

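/*
 * Establish (or simply record) the connection for a client handle.  If the
 * endpoint is T_IDLE, connect to svcaddr, retrying a few times and, when the
 * caller supplied a timeout, temporarily lowering the TCP connect-abort
 * threshold so the attempts do not outlive that timeout.  If the endpoint is
 * already connecting or connected, just record the remote address (when one
 * is given).  Returns TRUE on success; on failure returns FALSE with
 * rpc_createerr filled in.
 */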
static bool_t
set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
    const struct timeval *tp)
{
	int state;
	struct t_call sndcallstr, *rcvcall;
	int nconnect;
	bool_t connected, do_rcv_connect;
	int curr_time = -1;
	hrtime_t start;
	hrtime_t tout;	/* timeout in nanoseconds (from tp) */

	ct->ct_addr.len = 0;
	state = t_getstate(fd);
	if (state == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_errno = 0;
		rpc_createerr.cf_error.re_terrno = t_errno;
		return (FALSE);
	}

	switch (state) {
	case T_IDLE:
		if (svcaddr == NULL) {
			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
			return (FALSE);
		}
		/*
		 * Connect only if state is IDLE and svcaddr known
		 */
		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
		if (rcvcall == NULL) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			return (FALSE);
		}
		rcvcall->udata.maxlen = 0;
		sndcallstr.addr = *svcaddr;
		sndcallstr.opt.len = 0;
		sndcallstr.udata.len = 0;
		/*
		 * NULL could even have sufficed for rcvcall, because the
		 * address returned is the same in all cases except the
		 * gateway case, for which it is required.
		 */
		connected = FALSE;
		do_rcv_connect = FALSE;

		/*
		 * If there is a timeout value specified, we will try to
		 * reset the tcp connection timeout.  If the transport does
		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
		 * for some other reason, the default timeout will be used.
		 */
		if (tp != NULL) {
			start = gethrtime();

			/*
			 * Calculate the timeout in nanoseconds
			 */
			tout = SECS_TO_NS(tp->tv_sec) +
			    USECS_TO_NS(tp->tv_usec);
			curr_time = _get_tcp_conntime(fd);
		}

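		/*
		 * Try the connection up to three times.  On each pass, if
		 * the caller supplied a timeout, clamp the TCP connect-abort
		 * threshold to however much of that timeout remains, so a
		 * single attempt cannot outlive the caller's deadline.
		 */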
		for (nconnect = 0; nconnect < 3; nconnect++) {
			if (tp != NULL) {
				/*
				 * Calculate the elapsed time
				 */
				hrtime_t elapsed = gethrtime() - start;
				if (elapsed >= tout)
					break;

				if (curr_time != -1) {
					int ms;

					/*
					 * TCP_CONN_ABORT_THRESHOLD takes int
					 * value in milliseconds.  Make sure we
					 * do not overflow.
					 */
					if (NSECS_TO_MS(tout - elapsed) >=
					    INT_MAX) {
						ms = INT_MAX;
					} else {
						ms = (int)
						    NSECS_TO_MS(tout - elapsed);
						if (MSECS_TO_NS(ms) !=
						    tout - elapsed)
							ms++;
					}

					(void) _set_tcp_conntime(fd, ms);
				}
			}

			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
				connected = TRUE;
				break;
			}
			if (t_errno == TLOOK) {
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd,
					    (struct t_discon *)NULL);
					break;
				default:
					break;
				}
			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
				break;
			}
			if ((state = t_getstate(fd)) == T_OUTCON) {
				do_rcv_connect = TRUE;
				break;
			}
			if (state != T_IDLE) {
				break;
			}
		}
		if (do_rcv_connect) {
			do {
				if (t_rcvconnect(fd, rcvcall) != -1) {
					connected = TRUE;
					break;
				}
			} while (t_errno == TSYSERR && errno == EINTR);
		}

		/*
		 * Set the connection timeout back to its old value.
		 */
		if (curr_time != -1) {
			(void) _set_tcp_conntime(fd, curr_time);
		}

		if (!connected) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			(void) t_free((char *)rcvcall, T_CALL);
			return (FALSE);
		}

		/* Free old area if allocated */
		if (ct->ct_addr.buf)
			free(ct->ct_addr.buf);
		ct->ct_addr = rcvcall->addr;	/* To get the new address */
		/* So that address buf does not get freed */
		rcvcall->addr.buf = NULL;
		(void) t_free((char *)rcvcall, T_CALL);
		break;
	case T_DATAXFER:
	case T_OUTCON:
		if (svcaddr == NULL) {
			/*
			 * svcaddr could also be NULL in cases where the
			 * client is already bound and connected.
			 */
			ct->ct_addr.len = 0;
		} else {
			ct->ct_addr.buf = malloc(svcaddr->len);
			if (ct->ct_addr.buf == NULL) {
				(void) syslog(LOG_ERR, clnt_vc_errstr,
				    clnt_vc_str, __no_mem_str);
				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
				rpc_createerr.cf_error.re_errno = errno;
				rpc_createerr.cf_error.re_terrno = 0;
				return (FALSE);
			}
			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
			    (size_t)svcaddr->len);
			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
		}
		break;
	default:
		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
		return (FALSE);
	}
	return (TRUE);
}

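/*
 * Ordinary two-way call: marshal the cached call header, the procedure
 * number, the credentials and the arguments (or RPCSEC_GSS-wrap them), ship
 * the record, then read replies until one matches our xid.  A NULL results
 * xdr routine together with a zero timeout means the call is batched: the
 * record is left in the send buffer and RPC_SUCCESS is returned without
 * waiting for a reply.
 */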
static enum clnt_stat
clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
    xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	uint32_t x_id;
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
	bool_t shipnow;
	int refreshes = 2;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = FALSE;
	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (RPC_FAILED);	/* XXX */
		}
	}

	if (!ct->ct_waitset) {
		/* If time is not within limits, we ignore it. */
		if (time_not_ok(&timeout) == FALSE)
			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
	} else {
		timeout.tv_sec = (ct->ct_wait / 1000);
		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
	}

	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}
	if (!xdrrec_endofrecord(xdrs, shipnow)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_CANTSEND);
	}
	if (!shipnow) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_SUCCESS);
	}
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	for (;;) {
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
		if (!xdrrec_skiprecord(xdrs)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		/* now decode and validate the response header */
		if (!xdr_replymsg(xdrs, &reply_msg)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				continue;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
		rpc_callerr.re_status = RPC_SUCCESS;
	else
		__seterr_reply(&reply_msg, &(rpc_callerr));

	if (rpc_callerr.re_status == RPC_SUCCESS) {
		if (!AUTH_VALIDATE(cl->cl_auth,
		    &reply_msg.acpted_rply.ar_verf)) {
			rpc_callerr.re_status = RPC_AUTHERROR;
			rpc_callerr.re_why = AUTH_INVALIDRESP;
		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
			if (!(*xdr_results)(xdrs, results_ptr)) {
				if (rpc_callerr.re_status == RPC_SUCCESS)
					rpc_callerr.re_status =
					    RPC_CANTDECODERES;
			}
		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
		    results_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTDECODERES;
		}
	} /* end successful completion */
	/*
	 * If unsuccessful AND the error is an authentication error,
	 * then refresh credentials and try again, else break
	 */
	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
			goto call_again;
		else
			/*
			 * We set rpc_callerr here because libnsl is not
			 * reentrant and the TSD may have been reinitialized.
			 * If it were not set here, success could be returned
			 * even though the refresh failed.
			 */
			rpc_callerr.re_status = RPC_AUTHERROR;
	} /* end of unsuccessful completion */
	/* free verifier ... */
	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
		xdrs->x_op = XDR_FREE;
		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}

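/*
 * One-way call: marshal and send the request much as clnt_vc_call() does,
 * but do not wait for (or expect) a reply.  The returned status only
 * reflects whether the request could be encoded and handed to the transport
 * (or, in non-blocking mode, stored for a later flush).
 */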
static enum clnt_stat
clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	uint32_t x_id;
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = TRUE;

	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}

	/*
	 * Do not need to check errors, as the following code does
	 * not depend on the successful completion of the call.
	 * An error, if any occurs, is reported through
	 * rpc_callerr.re_status.
	 */
	(void) xdrrec_endofrecord(xdrs, TRUE);

	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}

/* ARGSUSED */
static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	*errp = rpc_callerr;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	bool_t stat;

	(void) rpc_fd_lock(vctbl, ct->ct_fd);
	xdrs->x_op = XDR_FREE;
	stat = (*xdr_res)(xdrs, res_ptr);
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (stat);
}

static void
clnt_vc_abort(void)
{
}

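/*
 * Query or change per-handle settings: close-on-destroy behaviour, timeouts,
 * server address, xid, program and version numbers, and the non-blocking
 * I/O, flush-mode and buffer-size controls.  Requests that take no argument
 * are handled before info is checked against NULL.
 */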
static bool_t
clnt_vc_control(CLIENT *cl, int request, char *info)
{
	bool_t ret;
	struct ct_data *ct = (struct ct_data *)cl->cl_private;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLFLUSH:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			int res;
			res = do_flush(ct, (info == NULL ||
			    *(int *)info == RPC_CL_DEFAULT_FLUSH) ?
			    ct->ct_blocking_mode : *(int *)info);
			ret = (0 == res);
		} else {
			ret = FALSE;
		}
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
	}

	/* for other requests which use info */
	if (info == NULL) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
		ct->ct_waitset = TRUE;
		break;
	case CLGET_TIMEOUT:
		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
		((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
		break;
	case CLGET_SERVER_ADDR:	/* For compatibility only */
		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
		break;
	case CLGET_FD:
		*(int *)info = ct->ct_fd;
		break;
	case CLGET_SVC_ADDR:
		/* The caller should not free this memory area */
		*(struct netbuf *)info = ct->ct_addr;
		break;
	case CLSET_SVC_ADDR:	/* set to new address */
#ifdef undef
		/*
		 * XXX: once the t_snddis(), followed by t_connect() starts to
		 * work, this ifdef should be removed.  CLIENT handle reuse
		 * would then be possible for COTS as well.
		 */
		if (t_snddis(ct->ct_fd, NULL) == -1) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
		    ct, NULL);
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
#else
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
#endif
	case CLGET_XID:
		/*
		 * use the knowledge that xid is the
		 * first element in the call structure
		 * This will get the xid of the PREVIOUS call
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		*(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
		/* increment by 1 as clnt_vc_call() decrements once */
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_IO_MODE:
		if (!set_io_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLSET_FLUSH_MODE:
		/* Set a specific FLUSH_MODE */
		if (!set_flush_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLGET_FLUSH_MODE:
		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
		break;

	case CLGET_IO_MODE:
		*(rpciomode_t *)info = ct->ct_io_mode;
		break;

	case CLGET_CURRENT_REC_SIZE:
		/*
		 * Returns the current amount of memory allocated
		 * to pending requests
		 */
		*(int *)info = ct->ct_bufferPendingSize;
		break;

	case CLSET_CONNMAXREC_SIZE:
		/* Cannot resize the buffer if it is used. */
		if (ct->ct_bufferPendingSize != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		/*
		 * If the new size is equal to the current size,
		 * there is nothing to do.
		 */
		if (ct->ct_bufferSize == *(uint_t *)info)
			break;

		ct->ct_bufferSize = *(uint_t *)info;
		if (ct->ct_buffer) {
			free(ct->ct_buffer);
			ct->ct_buffer = NULL;
			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
		}
		break;

	case CLGET_CONNMAXREC_SIZE:
		/*
		 * Returns the size of buffer allocated
		 * to pending requests
		 */
		*(uint_t *)info = ct->ct_bufferSize;
		break;

	default:
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (TRUE);
}

static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	int ct_fd = ct->ct_fd;

	(void) rpc_fd_lock(vctbl, ct_fd);

	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
		(void) unregister_nb(ct);
	}

	if (ct->ct_closeit)
		(void) t_close(ct_fd);
	XDR_DESTROY(&(ct->ct_xdrs));
	if (ct->ct_addr.buf)
		free(ct->ct_addr.buf);
	free(ct);
	if (cl->cl_netid && cl->cl_netid[0])
		free(cl->cl_netid);
	if (cl->cl_tp && cl->cl_tp[0])
		free(cl->cl_tp);
	free(cl);
	rpc_fd_unlock(vctbl, ct_fd);
}

/*
 * Interface between xdr serializer and vc connection.
 * Behaves like the system calls, read & write, but keeps some error state
 * around for the rpc level.
 */
static int
read_vc(void *ct_tmp, caddr_t buf, int len)
{
	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
	struct pollfd *pfdp;
	int npfd;		/* total number of pfdp allocated */
	struct ct_data *ct = ct_tmp;
	struct timeval starttime;
	struct timeval curtime;
	int poll_time;
	int delta;

	if (len == 0)
		return (0);

	/*
	 * Allocate just one the first time.  thr_get_storage() may
	 * return a larger buffer, left over from the last time we were
	 * here, but that's OK.  realloc() will deal with it properly.
	 */
	npfd = 1;
	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
	if (pfdp == NULL) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_read_vc_str, __no_mem_str);
		rpc_callerr.re_status = RPC_SYSTEMERROR;
		rpc_callerr.re_errno = errno;
		rpc_callerr.re_terrno = 0;
		return (-1);
	}

	/*
	 * N.B.:  slot 0 in the pollfd array is reserved for the file
	 * descriptor we're really interested in (as opposed to the
	 * callback descriptors).
	 */
	pfdp[0].fd = ct->ct_fd;
	pfdp[0].events = MASKVAL;
	pfdp[0].revents = 0;
	poll_time = ct->ct_wait;
	if (gettimeofday(&starttime, NULL) == -1) {
		syslog(LOG_ERR, "Unable to get time of day: %m");
		return (-1);
	}

	for (;;) {
		extern void (*_svc_getreqset_proc)();
		extern pollfd_t *svc_pollfd;
		extern int svc_max_pollfd;
		int fds;

		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */

		if (_svc_getreqset_proc) {
			sig_rw_rdlock(&svc_fd_lock);

			/* reallocate pfdp to svc_max_pollfd + 1 */
			if (npfd != (svc_max_pollfd + 1)) {
				struct pollfd *tmp_pfdp = realloc(pfdp,
				    sizeof (struct pollfd) *
				    (svc_max_pollfd + 1));
				if (tmp_pfdp == NULL) {
					sig_rw_unlock(&svc_fd_lock);
					(void) syslog(LOG_ERR, clnt_vc_errstr,
					    clnt_read_vc_str, __no_mem_str);
					rpc_callerr.re_status = RPC_SYSTEMERROR;
					rpc_callerr.re_errno = errno;
					rpc_callerr.re_terrno = 0;
					return (-1);
				}

				pfdp = tmp_pfdp;
				npfd = svc_max_pollfd + 1;
				(void) pthread_setspecific(pfdp_key, pfdp);
			}
			if (npfd > 1)
				(void) memcpy(&pfdp[1], svc_pollfd,
				    sizeof (struct pollfd) * (npfd - 1));

			sig_rw_unlock(&svc_fd_lock);
		} else {
			npfd = 1;	/* don't forget about pfdp[0] */
		}

		switch (fds = poll(pfdp, npfd, poll_time)) {
		case 0:
			rpc_callerr.re_status = RPC_TIMEDOUT;
			return (-1);

		case -1:
			if (errno != EINTR)
				continue;
			else {
				/*
				 * interrupted by another signal,
				 * update time_waited
				 */

				if (gettimeofday(&curtime, NULL) == -1) {
					syslog(LOG_ERR,
					    "Unable to get time of day: %m");
					errno = 0;
					continue;
				}
				delta = (curtime.tv_sec -
				    starttime.tv_sec) * 1000 +
				    (curtime.tv_usec -
				    starttime.tv_usec) / 1000;
				poll_time -= delta;
				if (poll_time < 0) {
					rpc_callerr.re_status = RPC_TIMEDOUT;
					errno = 0;
					return (-1);
				} else {
					errno = 0;	/* reset it */
					continue;
				}
			}
		}

		if (pfdp[0].revents == 0) {
			/* must be for server side of the house */
			(*_svc_getreqset_proc)(&pfdp[1], fds);
			continue;	/* do poll again */
		}

		if (pfdp[0].revents & POLLNVAL) {
			rpc_callerr.re_status = RPC_CANTRECV;
			/*
			 * Note: we're faking errno here because we
			 * previously would have expected select() to
			 * return -1 with errno EBADF.  Poll(BA_OS)
			 * returns 0 and sets the POLLNVAL revents flag
			 * instead.
			 */
			rpc_callerr.re_errno = errno = EBADF;
			return (-1);
		}

		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
			rpc_callerr.re_status = RPC_CANTRECV;
			rpc_callerr.re_errno = errno = EPIPE;
			return (-1);
		}
		break;
	}

	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
	case 0:
		/* premature eof */
		rpc_callerr.re_errno = ENOLINK;
		rpc_callerr.re_terrno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		len = -1;	/* it's really an error */
		break;

	case -1:
		rpc_callerr.re_terrno = t_errno;
		rpc_callerr.re_errno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		break;
	}
	return (len);
}

static int
write_vc(void *ct_tmp, caddr_t buf, int len)
{
	int i, cnt;
	struct ct_data *ct = ct_tmp;
	int flag;
	int maxsz;

	maxsz = ct->ct_tsdu;

	/* Handle the non-blocking mode */
	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		/*
		 * Test a special case here.  If the length of the current
		 * write is greater than the transport data unit, and the
		 * mode is non blocking, we return RPC_CANTSEND.
		 * XXX this is not very clean.
		 */
		if (maxsz > 0 && len > maxsz) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}

		len = nb_send(ct, buf, (unsigned)len);
		if (len == -1) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		} else if (len == -2) {
			rpc_callerr.re_terrno = 0;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSTORE;
		}
		return (len);
	}

	if ((maxsz == 0) || (maxsz == -1)) {
		/*
		 * t_snd() may return -1 for an error on the connection (the
		 * connection needs to be repaired/closed), or -2 for a
		 * flow-control handling error (no operation to do, just
		 * wait and call T_Flush()).
		 */
		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		}
		return (len);
	}

	/*
	 * This is for those transports which have a maximum size for data.
	 */
	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
		flag = cnt > maxsz ? T_MORE : 0;
		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
		    flag)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}
	}
	return (len);
}

/*
 * Receive the required bytes of data, even if it is fragmented.
 */
static int
t_rcvall(int fd, char *buf, int len)
{
	int moreflag;
	int final = 0;
	int res;

	do {
		moreflag = 0;
		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
		if (res == -1) {
			if (t_errno == TLOOK)
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, NULL);
					(void) t_snddis(fd, NULL);
					return (-1);
				case T_ORDREL:
					/* Received orderly release indication */
					(void) t_rcvrel(fd);
					/* Send orderly release indicator */
					(void) t_sndrel(fd);
					return (-1);
				default:
					return (-1);
				}
		} else if (res == 0) {
			return (0);
		}
		final += res;
		buf += res;
		len -= res;
	} while ((len > 0) && (moreflag & T_MORE));
	return (final);
}

static struct clnt_ops *
clnt_vc_ops(void)
{
	static struct clnt_ops ops;
	extern mutex_t ops_lock;

	/* VARIABLES PROTECTED BY ops_lock: ops */

	sig_mutex_lock(&ops_lock);
	if (ops.cl_call == NULL) {
		ops.cl_call = clnt_vc_call;
		ops.cl_send = clnt_vc_send;
		ops.cl_abort = clnt_vc_abort;
		ops.cl_geterr = clnt_vc_geterr;
		ops.cl_freeres = clnt_vc_freeres;
		ops.cl_destroy = clnt_vc_destroy;
		ops.cl_control = clnt_vc_control;
	}
	sig_mutex_unlock(&ops_lock);
	return (&ops);
}

/*
 * Make sure that the time is not garbage.  A -1 value is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
	    t->tv_usec <= -1 || t->tv_usec > 1000000);
}


/* Compute the # of bytes that remain until the end of the buffer */
#define	REMAIN_BYTES(p)	(ct->ct_bufferSize - (ct->ct_##p - ct->ct_buffer))

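/*
 * The non-blocking code keeps unsent (but fully formed) records in a single
 * circular buffer of ct_bufferSize bytes: addInBuffer() appends data,
 * wrapping at the end of the buffer; consumeFromBuffer() advances the read
 * pointer past data that has been sent; iovFromBuffer() builds one or two
 * iovecs covering whatever is still pending.
 */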
static int
addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
{
	if (NULL == ct->ct_buffer) {
		/* Buffer not allocated yet. */
		char *buffer;

		buffer = malloc(ct->ct_bufferSize);
		if (NULL == buffer) {
			errno = ENOMEM;
			return (-1);
		}
		(void) memcpy(buffer, dataToAdd, nBytes);

		ct->ct_buffer = buffer;
		ct->ct_bufferReadPtr = buffer;
		ct->ct_bufferWritePtr = buffer + nBytes;
		ct->ct_bufferPendingSize = nBytes;
	} else {
		/*
		 * For an already allocated buffer, two mem copies
		 * might be needed, depending on the current
		 * writing position.
		 */

		/* Compute the length of the first copy. */
		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));

		ct->ct_bufferPendingSize += nBytes;

		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
		ct->ct_bufferWritePtr += len;
		nBytes -= len;
		if (0 == nBytes) {
			/* Only one copy was needed. */

			/*
			 * If the write pointer is at the end of the buffer,
			 * wrap it now.
			 */
			if (ct->ct_bufferWritePtr ==
			    (ct->ct_buffer + ct->ct_bufferSize)) {
				ct->ct_bufferWritePtr = ct->ct_buffer;
			}
		} else {
			/* A second copy is needed. */
			dataToAdd += len;

			/*
			 * Copy the remaining data to the beginning of the
			 * buffer
			 */
			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
		}
	}
	return (0);
}

static void
consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
{
	ct->ct_bufferPendingSize -= nBytes;
	if (ct->ct_bufferPendingSize == 0) {
		/*
		 * If the buffer contains no data, we set the two pointers at
		 * the beginning of the buffer (to minimize buffer wraps).
		 */
		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
	} else {
		ct->ct_bufferReadPtr += nBytes;
		if (ct->ct_bufferReadPtr >
		    ct->ct_buffer + ct->ct_bufferSize) {
			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
		}
	}
}

static int
iovFromBuffer(struct ct_data *ct, struct iovec *iov)
{
	int l;

	if (ct->ct_bufferPendingSize == 0)
		return (0);

	l = REMAIN_BYTES(bufferReadPtr);
	if (l < ct->ct_bufferPendingSize) {
		/* Buffer in two fragments. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = l;

		iov[1].iov_base = ct->ct_buffer;
		iov[1].iov_len = ct->ct_bufferPendingSize - l;
		return (2);
	} else {
		/* Buffer in one fragment. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = ct->ct_bufferPendingSize;
		return (1);
	}
}

static bool_t
set_flush_mode(struct ct_data *ct, int mode)
{
	switch (mode) {
	case RPC_CL_BLOCKING_FLUSH:
		/* flush the buffer completely (possibly blocking) */
	case RPC_CL_BESTEFFORT_FLUSH:
		/* flush as much as possible without blocking */
	case RPC_CL_DEFAULT_FLUSH:
		/* flush according to the currently defined policy */
		ct->ct_blocking_mode = mode;
		return (TRUE);
	default:
		return (FALSE);
	}
}

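/*
 * Switch a handle between blocking and non-blocking I/O.  Leaving
 * non-blocking mode flushes and frees any pending buffer and removes the
 * handle from the atexit flush list; entering non-blocking mode registers
 * the handle on that list.
 */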
static bool_t
set_io_mode(struct ct_data *ct, int ioMode)
{
	switch (ioMode) {
	case RPC_CL_BLOCKING:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			if (NULL != ct->ct_buffer) {
				/*
				 * If a buffer was allocated for this
				 * connection, flush it now, and free it.
				 */
				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
				free(ct->ct_buffer);
				ct->ct_buffer = NULL;
			}
			(void) unregister_nb(ct);
			ct->ct_io_mode = ioMode;
		}
		break;
	case RPC_CL_NONBLOCKING:
		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
			if (-1 == register_nb(ct)) {
				return (FALSE);
			}
			ct->ct_io_mode = ioMode;
		}
		break;
	default:
		return (FALSE);
	}
	return (TRUE);
}

static int
do_flush(struct ct_data *ct, uint_t flush_mode)
{
	int result;
	if (ct->ct_bufferPendingSize == 0) {
		return (0);
	}

	switch (flush_mode) {
	case RPC_CL_BLOCKING_FLUSH:
		if (!set_blocking_connection(ct, TRUE)) {
			return (-1);
		}
		while (ct->ct_bufferPendingSize > 0) {
			if (REMAIN_BYTES(bufferReadPtr) <
			    ct->ct_bufferPendingSize) {
				struct iovec iov[2];
				(void) iovFromBuffer(ct, iov);
				result = writev(ct->ct_fd, iov, 2);
			} else {
				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
				    ct->ct_bufferPendingSize, 0);
			}
			if (result < 0) {
				return (-1);
			}
			consumeFromBuffer(ct, result);
		}

		break;

	case RPC_CL_BESTEFFORT_FLUSH:
		(void) set_blocking_connection(ct, FALSE);
		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
			struct iovec iov[2];
			(void) iovFromBuffer(ct, iov);
			result = writev(ct->ct_fd, iov, 2);
		} else {
			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
			    ct->ct_bufferPendingSize, 0);
		}
		if (result < 0) {
			if (errno != EWOULDBLOCK) {
				perror("flush");
				return (-1);
			}
			return (0);
		}
		if (result > 0)
			consumeFromBuffer(ct, result);
		break;
	}
	return (0);
}

/*
 * Non blocking send.
 */

/*
 * Test if this is the last fragment.  See the comment in front of xdr_rec.c
 * for details.
 */
#define	LAST_FRAG(x)	((ntohl(*(uint32_t *)x) & (1U << 31)) == (1U << 31))

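/*
 * nb_send() only accepts complete records (LAST_FRAG set).  It returns
 * nBytes when the record was sent or stored for a later flush, -1 on a hard
 * error, and -2 when the record cannot even be buffered; write_vc() maps
 * the -2 case to RPC_CANTSTORE.
 */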
static int
nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
{
	int result;

	if (!LAST_FRAG(buff)) {
		return (-1);
	}

	/*
	 * Check to see if the current message can be stored fully in the
	 * buffer.  We have to check this now because it may be impossible
	 * to send any data, so the message must be stored in the buffer.
	 */
	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
		/* Try to flush (to free some space). */
		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);

		/* Can we store the message now ? */
		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
			return (-2);
	}

	(void) set_blocking_connection(ct, FALSE);

	/*
	 * If there is no data pending, we can simply try
	 * to send our data.
	 */
	if (ct->ct_bufferPendingSize == 0) {
		result = t_snd(ct->ct_fd, buff, nBytes, 0);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				result = 0;
			} else {
				perror("send");
				return (-1);
			}
		}
		/*
		 * If we have not sent all of the data, we must store the
		 * remainder in the buffer.
		 */
		if (result != nBytes) {
			if (addInBuffer(ct, (char *)buff + result,
			    nBytes - result) == -1) {
				return (-1);
			}
		}
	} else {
		/*
		 * Some data is already pending in the buffer.  We try to
		 * send both the buffered data and the current message in
		 * one shot.
		 */
		struct iovec iov[3];
		int i = iovFromBuffer(ct, &iov[0]);

		iov[i].iov_base = buff;
		iov[i].iov_len = nBytes;

		result = writev(ct->ct_fd, iov, i + 1);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				/* No bytes sent */
				result = 0;
			} else {
				return (-1);
			}
		}

		/*
		 * Add the bytes from the message
		 * that we have not sent.
		 */
		if (result <= ct->ct_bufferPendingSize) {
			/* No bytes from the message sent */
			consumeFromBuffer(ct, result);
			if (addInBuffer(ct, buff, nBytes) == -1) {
				return (-1);
			}
		} else {
			/*
			 * Some bytes of the message were sent.  Compute how
			 * much of the message went out.
			 */
			int len = result - ct->ct_bufferPendingSize;

			/* So, empty the buffer. */
			ct->ct_bufferReadPtr = ct->ct_buffer;
			ct->ct_bufferWritePtr = ct->ct_buffer;
			ct->ct_bufferPendingSize = 0;

			/* And add the remaining part of the message. */
			if (len != nBytes) {
				if (addInBuffer(ct, (char *)buff + len,
				    nBytes - len) == -1) {
					return (-1);
				}
			}
		}
	}
	return (nBytes);
}

static void
flush_registered_clients(void)
{
	struct nb_reg_node *node;

	if (LIST_ISEMPTY(nb_first)) {
		return;
	}

	LIST_FOR_EACH(nb_first, node) {
		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
	}
}

static int
allocate_chunk(void)
{
#define	CHUNK_SIZE 16
	struct nb_reg_node *chk =
	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
	struct nb_reg_node *n;
	int i;

	if (NULL == chk) {
		return (-1);
	}

	n = chk;
	for (i = 0; i < CHUNK_SIZE - 1; ++i) {
		n[i].next = &(n[i + 1]);
	}
	n[CHUNK_SIZE - 1].next = (struct nb_reg_node *)&nb_free;
	nb_free = chk;
	return (0);
}

static int
register_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);

	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
		(void) mutex_unlock(&nb_list_mutex);
		errno = ENOMEM;
		return (-1);
	}

	if (!exit_handler_set) {
		(void) atexit(flush_registered_clients);
		exit_handler_set = TRUE;
	}
	/* Get the first free node */
	LIST_EXTRACT(nb_free, node);

	node->ct = ct;

	LIST_ADD(nb_first, node);
	(void) mutex_unlock(&nb_list_mutex);

	return (0);
}

static int
unregister_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);
	assert(!LIST_ISEMPTY(nb_first));

	node = nb_first;
	LIST_FOR_EACH(nb_first, node) {
		if (node->next->ct == ct) {
			/* Get the node to unregister. */
			struct nb_reg_node *n = node->next;
			node->next = n->next;

			n->ct = NULL;
			LIST_ADD(nb_free, n);
			break;
		}
	}
	(void) mutex_unlock(&nb_list_mutex);
	return (0);
}