1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2015 Nexenta Systems, Inc. All rights reserved. 24 */ 25 26 /* 27 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 28 * Use is subject to license terms. 29 */ 30 31 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */ 32 /* All Rights Reserved */ 33 /* 34 * Portions of this source code were derived from Berkeley 35 * 4.3 BSD under license from the Regents of the University of 36 * California. 37 */ 38 39 /* 40 * clnt_vc.c 41 * 42 * Implements a connectionful client side RPC. 43 * 44 * Connectionful RPC supports 'batched calls'. 45 * A sequence of calls may be batched-up in a send buffer. The rpc call 46 * return immediately to the client even though the call was not necessarily 47 * sent. The batching occurs if the results' xdr routine is NULL (0) AND 48 * the rpc timeout value is zero (see clnt.h, rpc). 49 * 50 * Clients should NOT casually batch calls that in fact return results; that 51 * is the server side should be aware that a call is batched and not produce 52 * any return message. Batched calls that produce many result messages can 53 * deadlock (netlock) the client and the server.... 54 */ 55 56 57 #include "mt.h" 58 #include "rpc_mt.h" 59 #include <assert.h> 60 #include <rpc/rpc.h> 61 #include <errno.h> 62 #include <sys/byteorder.h> 63 #include <sys/mkdev.h> 64 #include <sys/poll.h> 65 #include <syslog.h> 66 #include <stdlib.h> 67 #include <unistd.h> 68 #include <netinet/tcp.h> 69 #include <limits.h> 70 71 #define MCALL_MSG_SIZE 24 72 #define SECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000 * 1000) 73 #define MSECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000) 74 #define USECS_TO_NS(x) ((hrtime_t)(x) * 1000) 75 #define NSECS_TO_MS(x) ((x) / 1000 / 1000) 76 #ifndef MIN 77 #define MIN(a, b) (((a) < (b)) ? (a) : (b)) 78 #endif 79 80 extern int __rpc_timeval_to_msec(struct timeval *); 81 extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *); 82 extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *); 83 extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(), 84 caddr_t); 85 extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t); 86 extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t, 87 rpcvers_t, uint_t, uint_t, const struct timeval *); 88 89 static struct clnt_ops *clnt_vc_ops(void); 90 static int read_vc(void *, caddr_t, int); 91 static int write_vc(void *, caddr_t, int); 92 static int t_rcvall(int, char *, int); 93 static bool_t time_not_ok(struct timeval *); 94 95 struct ct_data; 96 static bool_t set_up_connection(int, struct netbuf *, 97 struct ct_data *, const struct timeval *); 98 static bool_t set_io_mode(struct ct_data *, int); 99 100 /* 101 * Lock table handle used by various MT sync. routines 102 */ 103 static mutex_t vctbl_lock = DEFAULTMUTEX; 104 static void *vctbl = NULL; 105 106 static const char clnt_vc_errstr[] = "%s : %s"; 107 static const char clnt_vc_str[] = "clnt_vc_create"; 108 static const char clnt_read_vc_str[] = "read_vc"; 109 static const char __no_mem_str[] = "out of memory"; 110 static const char no_fcntl_getfl_str[] = "could not get status flags and modes"; 111 static const char no_nonblock_str[] = "could not set transport blocking mode"; 112 113 /* 114 * Private data structure 115 */ 116 struct ct_data { 117 int ct_fd; /* connection's fd */ 118 bool_t ct_closeit; /* close it on destroy */ 119 int ct_tsdu; /* size of tsdu */ 120 int ct_wait; /* wait interval in milliseconds */ 121 bool_t ct_waitset; /* wait set by clnt_control? */ 122 struct netbuf ct_addr; /* remote addr */ 123 struct rpc_err ct_error; 124 char ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */ 125 uint_t ct_mpos; /* pos after marshal */ 126 XDR ct_xdrs; /* XDR stream */ 127 128 /* NON STANDARD INFO - 00-08-31 */ 129 bool_t ct_is_oneway; /* True if the current call is oneway. */ 130 bool_t ct_is_blocking; 131 ushort_t ct_io_mode; 132 ushort_t ct_blocking_mode; 133 uint_t ct_bufferSize; /* Total size of the buffer. */ 134 uint_t ct_bufferPendingSize; /* Size of unsent data. */ 135 char *ct_buffer; /* Pointer to the buffer. */ 136 char *ct_bufferWritePtr; /* Ptr to the first free byte. */ 137 char *ct_bufferReadPtr; /* Ptr to the first byte of data. */ 138 }; 139 140 struct nb_reg_node { 141 struct nb_reg_node *next; 142 struct ct_data *ct; 143 }; 144 145 static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first; 146 static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free; 147 148 static bool_t exit_handler_set = FALSE; 149 150 static mutex_t nb_list_mutex = DEFAULTMUTEX; 151 152 153 /* Define some macros to manage the linked list. */ 154 #define LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l) 155 #define LIST_CLR(l) (l = (struct nb_reg_node *)&l) 156 #define LIST_ADD(l, node) (node->next = l->next, l = node) 157 #define LIST_EXTRACT(l, node) (node = l, l = l->next) 158 #define LIST_FOR_EACH(l, node) \ 159 for (node = l; node != (struct nb_reg_node *)&l; node = node->next) 160 161 162 /* Default size of the IO buffer used in non blocking mode */ 163 #define DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024) 164 165 static int nb_send(struct ct_data *, void *, unsigned int); 166 static int do_flush(struct ct_data *, uint_t); 167 static bool_t set_flush_mode(struct ct_data *, int); 168 static bool_t set_blocking_connection(struct ct_data *, bool_t); 169 170 static int register_nb(struct ct_data *); 171 static int unregister_nb(struct ct_data *); 172 173 174 /* 175 * Change the mode of the underlying fd. 176 */ 177 static bool_t 178 set_blocking_connection(struct ct_data *ct, bool_t blocking) 179 { 180 int flag; 181 182 /* 183 * If the underlying fd is already in the required mode, 184 * avoid the syscall. 185 */ 186 if (ct->ct_is_blocking == blocking) 187 return (TRUE); 188 189 if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) { 190 (void) syslog(LOG_ERR, "set_blocking_connection : %s", 191 no_fcntl_getfl_str); 192 return (FALSE); 193 } 194 195 flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK; 196 if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) { 197 (void) syslog(LOG_ERR, "set_blocking_connection : %s", 198 no_nonblock_str); 199 return (FALSE); 200 } 201 ct->ct_is_blocking = blocking; 202 return (TRUE); 203 } 204 205 /* 206 * Create a client handle for a connection. 207 * Default options are set, which the user can change using clnt_control()'s. 208 * The rpc/vc package does buffering similar to stdio, so the client 209 * must pick send and receive buffer sizes, 0 => use the default. 210 * NB: fd is copied into a private area. 211 * NB: The rpch->cl_auth is set null authentication. Caller may wish to 212 * set this something more useful. 213 * 214 * fd should be open and bound. 215 */ 216 CLIENT * 217 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog, 218 const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz) 219 { 220 return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz, 221 recvsz, NULL)); 222 } 223 224 /* 225 * This has the same definition as clnt_vc_create(), except it 226 * takes an additional parameter - a pointer to a timeval structure. 227 * 228 * Not a public interface. This is for clnt_create_timed, 229 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout 230 * value to control a tcp connection attempt. 231 * (for bug 4049792: clnt_create_timed does not time out) 232 * 233 * If tp is NULL, use default timeout to set up the connection. 234 */ 235 CLIENT * 236 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog, 237 rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp) 238 { 239 CLIENT *cl; /* client handle */ 240 struct ct_data *ct; /* private data */ 241 struct timeval now; 242 struct rpc_msg call_msg; 243 struct t_info tinfo; 244 int flag; 245 246 cl = malloc(sizeof (*cl)); 247 if ((ct = malloc(sizeof (*ct))) != NULL) 248 ct->ct_addr.buf = NULL; 249 250 if ((cl == NULL) || (ct == NULL)) { 251 (void) syslog(LOG_ERR, clnt_vc_errstr, 252 clnt_vc_str, __no_mem_str); 253 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 254 rpc_createerr.cf_error.re_errno = errno; 255 rpc_createerr.cf_error.re_terrno = 0; 256 goto err; 257 } 258 259 /* 260 * The only use of vctbl_lock is for serializing the creation of 261 * vctbl. Once created the lock needs to be released so we don't 262 * hold it across the set_up_connection() call and end up with a 263 * bunch of threads stuck waiting for the mutex. 264 */ 265 sig_mutex_lock(&vctbl_lock); 266 267 if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) { 268 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 269 rpc_createerr.cf_error.re_errno = errno; 270 rpc_createerr.cf_error.re_terrno = 0; 271 sig_mutex_unlock(&vctbl_lock); 272 goto err; 273 } 274 275 sig_mutex_unlock(&vctbl_lock); 276 277 ct->ct_io_mode = RPC_CL_BLOCKING; 278 ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH; 279 280 ct->ct_buffer = NULL; /* We allocate the buffer when needed. */ 281 ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE; 282 ct->ct_bufferPendingSize = 0; 283 ct->ct_bufferWritePtr = NULL; 284 ct->ct_bufferReadPtr = NULL; 285 286 /* Check the current state of the fd. */ 287 if ((flag = fcntl(fd, F_GETFL, 0)) < 0) { 288 (void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s", 289 no_fcntl_getfl_str); 290 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 291 rpc_createerr.cf_error.re_terrno = errno; 292 rpc_createerr.cf_error.re_errno = 0; 293 goto err; 294 } 295 ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE; 296 297 if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) { 298 goto err; 299 } 300 301 /* 302 * Set up other members of private data struct 303 */ 304 ct->ct_fd = fd; 305 /* 306 * The actual value will be set by clnt_call or clnt_control 307 */ 308 ct->ct_wait = 30000; 309 ct->ct_waitset = FALSE; 310 /* 311 * By default, closeit is always FALSE. It is users responsibility 312 * to do a t_close on it, else the user may use clnt_control 313 * to let clnt_destroy do it for him/her. 314 */ 315 ct->ct_closeit = FALSE; 316 317 /* 318 * Initialize call message 319 */ 320 (void) gettimeofday(&now, (struct timezone *)0); 321 call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec; 322 call_msg.rm_call.cb_prog = prog; 323 call_msg.rm_call.cb_vers = vers; 324 325 /* 326 * pre-serialize the static part of the call msg and stash it away 327 */ 328 xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE); 329 if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) { 330 goto err; 331 } 332 ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs)); 333 XDR_DESTROY(&(ct->ct_xdrs)); 334 335 if (t_getinfo(fd, &tinfo) == -1) { 336 rpc_createerr.cf_stat = RPC_TLIERROR; 337 rpc_createerr.cf_error.re_terrno = t_errno; 338 rpc_createerr.cf_error.re_errno = 0; 339 goto err; 340 } 341 /* 342 * Find the receive and the send size 343 */ 344 sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu); 345 recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu); 346 if ((sendsz == 0) || (recvsz == 0)) { 347 rpc_createerr.cf_stat = RPC_TLIERROR; 348 rpc_createerr.cf_error.re_terrno = 0; 349 rpc_createerr.cf_error.re_errno = 0; 350 goto err; 351 } 352 ct->ct_tsdu = tinfo.tsdu; 353 /* 354 * Create a client handle which uses xdrrec for serialization 355 * and authnone for authentication. 356 */ 357 ct->ct_xdrs.x_ops = NULL; 358 xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct, 359 read_vc, write_vc); 360 if (ct->ct_xdrs.x_ops == NULL) { 361 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 362 rpc_createerr.cf_error.re_terrno = 0; 363 rpc_createerr.cf_error.re_errno = ENOMEM; 364 goto err; 365 } 366 cl->cl_ops = clnt_vc_ops(); 367 cl->cl_private = (caddr_t)ct; 368 cl->cl_auth = authnone_create(); 369 cl->cl_tp = NULL; 370 cl->cl_netid = NULL; 371 return (cl); 372 373 err: 374 if (ct) { 375 free(ct->ct_addr.buf); 376 free(ct); 377 } 378 free(cl); 379 380 return (NULL); 381 } 382 383 #define TCPOPT_BUFSIZE 128 384 385 /* 386 * Set tcp connection timeout value. 387 * Retun 0 for success, -1 for failure. 388 */ 389 static int 390 _set_tcp_conntime(int fd, int optval) 391 { 392 struct t_optmgmt req, res; 393 struct opthdr *opt; 394 int *ip; 395 char buf[TCPOPT_BUFSIZE]; 396 397 /* LINTED pointer cast */ 398 opt = (struct opthdr *)buf; 399 opt->level = IPPROTO_TCP; 400 opt->name = TCP_CONN_ABORT_THRESHOLD; 401 opt->len = sizeof (int); 402 403 req.flags = T_NEGOTIATE; 404 req.opt.len = sizeof (struct opthdr) + opt->len; 405 req.opt.buf = (char *)opt; 406 /* LINTED pointer cast */ 407 ip = (int *)((char *)buf + sizeof (struct opthdr)); 408 *ip = optval; 409 410 res.flags = 0; 411 res.opt.buf = (char *)buf; 412 res.opt.maxlen = sizeof (buf); 413 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) { 414 return (-1); 415 } 416 return (0); 417 } 418 419 /* 420 * Get current tcp connection timeout value. 421 * Retun the timeout in milliseconds, or -1 for failure. 422 */ 423 static int 424 _get_tcp_conntime(int fd) 425 { 426 struct t_optmgmt req, res; 427 struct opthdr *opt; 428 int *ip, retval; 429 char buf[TCPOPT_BUFSIZE]; 430 431 /* LINTED pointer cast */ 432 opt = (struct opthdr *)buf; 433 opt->level = IPPROTO_TCP; 434 opt->name = TCP_CONN_ABORT_THRESHOLD; 435 opt->len = sizeof (int); 436 437 req.flags = T_CURRENT; 438 req.opt.len = sizeof (struct opthdr) + opt->len; 439 req.opt.buf = (char *)opt; 440 /* LINTED pointer cast */ 441 ip = (int *)((char *)buf + sizeof (struct opthdr)); 442 *ip = 0; 443 444 res.flags = 0; 445 res.opt.buf = (char *)buf; 446 res.opt.maxlen = sizeof (buf); 447 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) { 448 return (-1); 449 } 450 451 /* LINTED pointer cast */ 452 ip = (int *)((char *)buf + sizeof (struct opthdr)); 453 retval = *ip; 454 return (retval); 455 } 456 457 static bool_t 458 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct, 459 const struct timeval *tp) 460 { 461 int state; 462 struct t_call sndcallstr, *rcvcall; 463 int nconnect; 464 bool_t connected, do_rcv_connect; 465 int curr_time = -1; 466 hrtime_t start; 467 hrtime_t tout; /* timeout in nanoseconds (from tp) */ 468 469 ct->ct_addr.len = 0; 470 state = t_getstate(fd); 471 if (state == -1) { 472 rpc_createerr.cf_stat = RPC_TLIERROR; 473 rpc_createerr.cf_error.re_errno = 0; 474 rpc_createerr.cf_error.re_terrno = t_errno; 475 return (FALSE); 476 } 477 478 switch (state) { 479 case T_IDLE: 480 if (svcaddr == NULL) { 481 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 482 return (FALSE); 483 } 484 /* 485 * Connect only if state is IDLE and svcaddr known 486 */ 487 /* LINTED pointer alignment */ 488 rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR); 489 if (rcvcall == NULL) { 490 rpc_createerr.cf_stat = RPC_TLIERROR; 491 rpc_createerr.cf_error.re_terrno = t_errno; 492 rpc_createerr.cf_error.re_errno = errno; 493 return (FALSE); 494 } 495 rcvcall->udata.maxlen = 0; 496 sndcallstr.addr = *svcaddr; 497 sndcallstr.opt.len = 0; 498 sndcallstr.udata.len = 0; 499 /* 500 * Even NULL could have sufficed for rcvcall, because 501 * the address returned is same for all cases except 502 * for the gateway case, and hence required. 503 */ 504 connected = FALSE; 505 do_rcv_connect = FALSE; 506 507 /* 508 * If there is a timeout value specified, we will try to 509 * reset the tcp connection timeout. If the transport does 510 * not support the TCP_CONN_ABORT_THRESHOLD option or fails 511 * for other reason, default timeout will be used. 512 */ 513 if (tp != NULL) { 514 start = gethrtime(); 515 516 /* 517 * Calculate the timeout in nanoseconds 518 */ 519 tout = SECS_TO_NS(tp->tv_sec) + 520 USECS_TO_NS(tp->tv_usec); 521 curr_time = _get_tcp_conntime(fd); 522 } 523 524 for (nconnect = 0; nconnect < 3; nconnect++) { 525 if (tp != NULL) { 526 /* 527 * Calculate the elapsed time 528 */ 529 hrtime_t elapsed = gethrtime() - start; 530 if (elapsed >= tout) 531 break; 532 533 if (curr_time != -1) { 534 int ms; 535 536 /* 537 * TCP_CONN_ABORT_THRESHOLD takes int 538 * value in milliseconds. Make sure we 539 * do not overflow. 540 */ 541 if (NSECS_TO_MS(tout - elapsed) >= 542 INT_MAX) { 543 ms = INT_MAX; 544 } else { 545 ms = (int) 546 NSECS_TO_MS(tout - elapsed); 547 if (MSECS_TO_NS(ms) != 548 tout - elapsed) 549 ms++; 550 } 551 552 (void) _set_tcp_conntime(fd, ms); 553 } 554 } 555 556 if (t_connect(fd, &sndcallstr, rcvcall) != -1) { 557 connected = TRUE; 558 break; 559 } 560 if (t_errno == TLOOK) { 561 switch (t_look(fd)) { 562 case T_DISCONNECT: 563 (void) t_rcvdis(fd, (struct 564 t_discon *) NULL); 565 break; 566 default: 567 break; 568 } 569 } else if (!(t_errno == TSYSERR && errno == EINTR)) { 570 break; 571 } 572 if ((state = t_getstate(fd)) == T_OUTCON) { 573 do_rcv_connect = TRUE; 574 break; 575 } 576 if (state != T_IDLE) { 577 break; 578 } 579 } 580 if (do_rcv_connect) { 581 do { 582 if (t_rcvconnect(fd, rcvcall) != -1) { 583 connected = TRUE; 584 break; 585 } 586 } while (t_errno == TSYSERR && errno == EINTR); 587 } 588 589 /* 590 * Set the connection timeout back to its old value. 591 */ 592 if (curr_time != -1) { 593 (void) _set_tcp_conntime(fd, curr_time); 594 } 595 596 if (!connected) { 597 rpc_createerr.cf_stat = RPC_TLIERROR; 598 rpc_createerr.cf_error.re_terrno = t_errno; 599 rpc_createerr.cf_error.re_errno = errno; 600 (void) t_free((char *)rcvcall, T_CALL); 601 return (FALSE); 602 } 603 604 /* Free old area if allocated */ 605 if (ct->ct_addr.buf) 606 free(ct->ct_addr.buf); 607 ct->ct_addr = rcvcall->addr; /* To get the new address */ 608 /* So that address buf does not get freed */ 609 rcvcall->addr.buf = NULL; 610 (void) t_free((char *)rcvcall, T_CALL); 611 break; 612 case T_DATAXFER: 613 case T_OUTCON: 614 if (svcaddr == NULL) { 615 /* 616 * svcaddr could also be NULL in cases where the 617 * client is already bound and connected. 618 */ 619 ct->ct_addr.len = 0; 620 } else { 621 ct->ct_addr.buf = malloc(svcaddr->len); 622 if (ct->ct_addr.buf == NULL) { 623 (void) syslog(LOG_ERR, clnt_vc_errstr, 624 clnt_vc_str, __no_mem_str); 625 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 626 rpc_createerr.cf_error.re_errno = errno; 627 rpc_createerr.cf_error.re_terrno = 0; 628 return (FALSE); 629 } 630 (void) memcpy(ct->ct_addr.buf, svcaddr->buf, 631 (size_t)svcaddr->len); 632 ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len; 633 } 634 break; 635 default: 636 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 637 return (FALSE); 638 } 639 return (TRUE); 640 } 641 642 static enum clnt_stat 643 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr, 644 xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout) 645 { 646 /* LINTED pointer alignment */ 647 struct ct_data *ct = (struct ct_data *)cl->cl_private; 648 XDR *xdrs = &(ct->ct_xdrs); 649 struct rpc_msg reply_msg; 650 uint32_t x_id; 651 /* LINTED pointer alignment */ 652 uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */ 653 bool_t shipnow; 654 int refreshes = 2; 655 656 if (rpc_fd_lock(vctbl, ct->ct_fd)) { 657 rpc_callerr.re_status = RPC_FAILED; 658 rpc_callerr.re_errno = errno; 659 rpc_fd_unlock(vctbl, ct->ct_fd); 660 return (RPC_FAILED); 661 } 662 663 ct->ct_is_oneway = FALSE; 664 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) { 665 if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) { 666 rpc_fd_unlock(vctbl, ct->ct_fd); 667 return (RPC_FAILED); /* XXX */ 668 } 669 } 670 671 if (!ct->ct_waitset) { 672 /* If time is not within limits, we ignore it. */ 673 if (time_not_ok(&timeout) == FALSE) 674 ct->ct_wait = __rpc_timeval_to_msec(&timeout); 675 } else { 676 timeout.tv_sec = (ct->ct_wait / 1000); 677 timeout.tv_usec = (ct->ct_wait % 1000) * 1000; 678 } 679 680 shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) && 681 (timeout.tv_usec == 0)) ? FALSE : TRUE; 682 call_again: 683 xdrs->x_op = XDR_ENCODE; 684 rpc_callerr.re_status = RPC_SUCCESS; 685 /* 686 * Due to little endian byte order, it is necessary to convert to host 687 * format before decrementing xid. 688 */ 689 x_id = ntohl(*msg_x_id) - 1; 690 *msg_x_id = htonl(x_id); 691 692 if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) { 693 if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) || 694 (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) || 695 (!AUTH_MARSHALL(cl->cl_auth, xdrs)) || 696 (!xdr_args(xdrs, args_ptr))) { 697 if (rpc_callerr.re_status == RPC_SUCCESS) 698 rpc_callerr.re_status = RPC_CANTENCODEARGS; 699 (void) xdrrec_endofrecord(xdrs, TRUE); 700 rpc_fd_unlock(vctbl, ct->ct_fd); 701 return (rpc_callerr.re_status); 702 } 703 } else { 704 /* LINTED pointer alignment */ 705 uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos]; 706 IXDR_PUT_U_INT32(u, proc); 707 if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall, 708 ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) { 709 if (rpc_callerr.re_status == RPC_SUCCESS) 710 rpc_callerr.re_status = RPC_CANTENCODEARGS; 711 (void) xdrrec_endofrecord(xdrs, TRUE); 712 rpc_fd_unlock(vctbl, ct->ct_fd); 713 return (rpc_callerr.re_status); 714 } 715 } 716 if (!xdrrec_endofrecord(xdrs, shipnow)) { 717 rpc_fd_unlock(vctbl, ct->ct_fd); 718 return (rpc_callerr.re_status = RPC_CANTSEND); 719 } 720 if (!shipnow) { 721 rpc_fd_unlock(vctbl, ct->ct_fd); 722 return (RPC_SUCCESS); 723 } 724 /* 725 * Hack to provide rpc-based message passing 726 */ 727 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { 728 rpc_fd_unlock(vctbl, ct->ct_fd); 729 return (rpc_callerr.re_status = RPC_TIMEDOUT); 730 } 731 732 733 /* 734 * Keep receiving until we get a valid transaction id 735 */ 736 xdrs->x_op = XDR_DECODE; 737 for (;;) { 738 reply_msg.acpted_rply.ar_verf = _null_auth; 739 reply_msg.acpted_rply.ar_results.where = NULL; 740 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 741 if (!xdrrec_skiprecord(xdrs)) { 742 rpc_fd_unlock(vctbl, ct->ct_fd); 743 return (rpc_callerr.re_status); 744 } 745 /* now decode and validate the response header */ 746 if (!xdr_replymsg(xdrs, &reply_msg)) { 747 if (rpc_callerr.re_status == RPC_SUCCESS) 748 continue; 749 rpc_fd_unlock(vctbl, ct->ct_fd); 750 return (rpc_callerr.re_status); 751 } 752 if (reply_msg.rm_xid == x_id) 753 break; 754 } 755 756 /* 757 * process header 758 */ 759 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 760 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 761 rpc_callerr.re_status = RPC_SUCCESS; 762 else 763 __seterr_reply(&reply_msg, &(rpc_callerr)); 764 765 if (rpc_callerr.re_status == RPC_SUCCESS) { 766 if (!AUTH_VALIDATE(cl->cl_auth, 767 &reply_msg.acpted_rply.ar_verf)) { 768 rpc_callerr.re_status = RPC_AUTHERROR; 769 rpc_callerr.re_why = AUTH_INVALIDRESP; 770 } else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) { 771 if (!(*xdr_results)(xdrs, results_ptr)) { 772 if (rpc_callerr.re_status == RPC_SUCCESS) 773 rpc_callerr.re_status = 774 RPC_CANTDECODERES; 775 } 776 } else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results, 777 results_ptr)) { 778 if (rpc_callerr.re_status == RPC_SUCCESS) 779 rpc_callerr.re_status = RPC_CANTDECODERES; 780 } 781 } /* end successful completion */ 782 /* 783 * If unsuccesful AND error is an authentication error 784 * then refresh credentials and try again, else break 785 */ 786 else if (rpc_callerr.re_status == RPC_AUTHERROR) { 787 /* maybe our credentials need to be refreshed ... */ 788 if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg)) 789 goto call_again; 790 else 791 /* 792 * We are setting rpc_callerr here given that libnsl 793 * is not reentrant thereby reinitializing the TSD. 794 * If not set here then success could be returned even 795 * though refresh failed. 796 */ 797 rpc_callerr.re_status = RPC_AUTHERROR; 798 } /* end of unsuccessful completion */ 799 /* free verifier ... */ 800 if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED && 801 reply_msg.acpted_rply.ar_verf.oa_base != NULL) { 802 xdrs->x_op = XDR_FREE; 803 (void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf)); 804 } 805 rpc_fd_unlock(vctbl, ct->ct_fd); 806 return (rpc_callerr.re_status); 807 } 808 809 static enum clnt_stat 810 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr) 811 { 812 /* LINTED pointer alignment */ 813 struct ct_data *ct = (struct ct_data *)cl->cl_private; 814 XDR *xdrs = &(ct->ct_xdrs); 815 uint32_t x_id; 816 /* LINTED pointer alignment */ 817 uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */ 818 819 if (rpc_fd_lock(vctbl, ct->ct_fd)) { 820 rpc_callerr.re_status = RPC_FAILED; 821 rpc_callerr.re_errno = errno; 822 rpc_fd_unlock(vctbl, ct->ct_fd); 823 return (RPC_FAILED); 824 } 825 826 ct->ct_is_oneway = TRUE; 827 828 xdrs->x_op = XDR_ENCODE; 829 rpc_callerr.re_status = RPC_SUCCESS; 830 /* 831 * Due to little endian byte order, it is necessary to convert to host 832 * format before decrementing xid. 833 */ 834 x_id = ntohl(*msg_x_id) - 1; 835 *msg_x_id = htonl(x_id); 836 837 if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) { 838 if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) || 839 (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) || 840 (!AUTH_MARSHALL(cl->cl_auth, xdrs)) || 841 (!xdr_args(xdrs, args_ptr))) { 842 if (rpc_callerr.re_status == RPC_SUCCESS) 843 rpc_callerr.re_status = RPC_CANTENCODEARGS; 844 (void) xdrrec_endofrecord(xdrs, TRUE); 845 rpc_fd_unlock(vctbl, ct->ct_fd); 846 return (rpc_callerr.re_status); 847 } 848 } else { 849 /* LINTED pointer alignment */ 850 uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos]; 851 IXDR_PUT_U_INT32(u, proc); 852 if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall, 853 ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) { 854 if (rpc_callerr.re_status == RPC_SUCCESS) 855 rpc_callerr.re_status = RPC_CANTENCODEARGS; 856 (void) xdrrec_endofrecord(xdrs, TRUE); 857 rpc_fd_unlock(vctbl, ct->ct_fd); 858 return (rpc_callerr.re_status); 859 } 860 } 861 862 /* 863 * Do not need to check errors, as the following code does 864 * not depend on the successful completion of the call. 865 * An error, if any occurs, is reported through 866 * rpc_callerr.re_status. 867 */ 868 (void) xdrrec_endofrecord(xdrs, TRUE); 869 870 rpc_fd_unlock(vctbl, ct->ct_fd); 871 return (rpc_callerr.re_status); 872 } 873 874 /* ARGSUSED */ 875 static void 876 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp) 877 { 878 *errp = rpc_callerr; 879 } 880 881 static bool_t 882 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr) 883 { 884 /* LINTED pointer alignment */ 885 struct ct_data *ct = (struct ct_data *)cl->cl_private; 886 XDR *xdrs = &(ct->ct_xdrs); 887 bool_t stat; 888 889 (void) rpc_fd_lock(vctbl, ct->ct_fd); 890 xdrs->x_op = XDR_FREE; 891 stat = (*xdr_res)(xdrs, res_ptr); 892 rpc_fd_unlock(vctbl, ct->ct_fd); 893 return (stat); 894 } 895 896 static void 897 clnt_vc_abort(void) 898 { 899 } 900 901 /*ARGSUSED*/ 902 static bool_t 903 clnt_vc_control(CLIENT *cl, int request, char *info) 904 { 905 bool_t ret; 906 /* LINTED pointer alignment */ 907 struct ct_data *ct = (struct ct_data *)cl->cl_private; 908 909 if (rpc_fd_lock(vctbl, ct->ct_fd)) { 910 rpc_fd_unlock(vctbl, ct->ct_fd); 911 return (FALSE); 912 } 913 914 switch (request) { 915 case CLSET_FD_CLOSE: 916 ct->ct_closeit = TRUE; 917 rpc_fd_unlock(vctbl, ct->ct_fd); 918 return (TRUE); 919 case CLSET_FD_NCLOSE: 920 ct->ct_closeit = FALSE; 921 rpc_fd_unlock(vctbl, ct->ct_fd); 922 return (TRUE); 923 case CLFLUSH: 924 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) { 925 int res; 926 res = do_flush(ct, (info == NULL || 927 /* LINTED pointer cast */ 928 *(int *)info == RPC_CL_DEFAULT_FLUSH)? 929 /* LINTED pointer cast */ 930 ct->ct_blocking_mode: *(int *)info); 931 ret = (0 == res); 932 } else { 933 ret = FALSE; 934 } 935 rpc_fd_unlock(vctbl, ct->ct_fd); 936 return (ret); 937 } 938 939 /* for other requests which use info */ 940 if (info == NULL) { 941 rpc_fd_unlock(vctbl, ct->ct_fd); 942 return (FALSE); 943 } 944 switch (request) { 945 case CLSET_TIMEOUT: 946 /* LINTED pointer alignment */ 947 if (time_not_ok((struct timeval *)info)) { 948 rpc_fd_unlock(vctbl, ct->ct_fd); 949 return (FALSE); 950 } 951 /* LINTED pointer alignment */ 952 ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info); 953 ct->ct_waitset = TRUE; 954 break; 955 case CLGET_TIMEOUT: 956 /* LINTED pointer alignment */ 957 ((struct timeval *)info)->tv_sec = ct->ct_wait / 1000; 958 /* LINTED pointer alignment */ 959 ((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000; 960 break; 961 case CLGET_SERVER_ADDR: /* For compatibility only */ 962 (void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len); 963 break; 964 case CLGET_FD: 965 /* LINTED pointer alignment */ 966 *(int *)info = ct->ct_fd; 967 break; 968 case CLGET_SVC_ADDR: 969 /* The caller should not free this memory area */ 970 /* LINTED pointer alignment */ 971 *(struct netbuf *)info = ct->ct_addr; 972 break; 973 case CLSET_SVC_ADDR: /* set to new address */ 974 #ifdef undef 975 /* 976 * XXX: once the t_snddis(), followed by t_connect() starts to 977 * work, this ifdef should be removed. CLIENT handle reuse 978 * would then be possible for COTS as well. 979 */ 980 if (t_snddis(ct->ct_fd, NULL) == -1) { 981 rpc_createerr.cf_stat = RPC_TLIERROR; 982 rpc_createerr.cf_error.re_terrno = t_errno; 983 rpc_createerr.cf_error.re_errno = errno; 984 rpc_fd_unlock(vctbl, ct->ct_fd); 985 return (FALSE); 986 } 987 ret = set_up_connection(ct->ct_fd, (struct netbuf *)info, 988 ct, NULL); 989 rpc_fd_unlock(vctbl, ct->ct_fd); 990 return (ret); 991 #else 992 rpc_fd_unlock(vctbl, ct->ct_fd); 993 return (FALSE); 994 #endif 995 case CLGET_XID: 996 /* 997 * use the knowledge that xid is the 998 * first element in the call structure 999 * This will get the xid of the PREVIOUS call 1000 */ 1001 /* LINTED pointer alignment */ 1002 *(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall); 1003 break; 1004 case CLSET_XID: 1005 /* This will set the xid of the NEXT call */ 1006 /* LINTED pointer alignment */ 1007 *(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1); 1008 /* increment by 1 as clnt_vc_call() decrements once */ 1009 break; 1010 case CLGET_VERS: 1011 /* 1012 * This RELIES on the information that, in the call body, 1013 * the version number field is the fifth field from the 1014 * begining of the RPC header. MUST be changed if the 1015 * call_struct is changed 1016 */ 1017 /* LINTED pointer alignment */ 1018 *(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall + 1019 4 * BYTES_PER_XDR_UNIT)); 1020 break; 1021 1022 case CLSET_VERS: 1023 /* LINTED pointer alignment */ 1024 *(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) = 1025 /* LINTED pointer alignment */ 1026 htonl(*(uint32_t *)info); 1027 break; 1028 1029 case CLGET_PROG: 1030 /* 1031 * This RELIES on the information that, in the call body, 1032 * the program number field is the fourth field from the 1033 * begining of the RPC header. MUST be changed if the 1034 * call_struct is changed 1035 */ 1036 /* LINTED pointer alignment */ 1037 *(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall + 1038 3 * BYTES_PER_XDR_UNIT)); 1039 break; 1040 1041 case CLSET_PROG: 1042 /* LINTED pointer alignment */ 1043 *(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) = 1044 /* LINTED pointer alignment */ 1045 htonl(*(uint32_t *)info); 1046 break; 1047 1048 case CLSET_IO_MODE: 1049 /* LINTED pointer cast */ 1050 if (!set_io_mode(ct, *(int *)info)) { 1051 rpc_fd_unlock(vctbl, ct->ct_fd); 1052 return (FALSE); 1053 } 1054 break; 1055 case CLSET_FLUSH_MODE: 1056 /* Set a specific FLUSH_MODE */ 1057 /* LINTED pointer cast */ 1058 if (!set_flush_mode(ct, *(int *)info)) { 1059 rpc_fd_unlock(vctbl, ct->ct_fd); 1060 return (FALSE); 1061 } 1062 break; 1063 case CLGET_FLUSH_MODE: 1064 /* LINTED pointer cast */ 1065 *(rpcflushmode_t *)info = ct->ct_blocking_mode; 1066 break; 1067 1068 case CLGET_IO_MODE: 1069 /* LINTED pointer cast */ 1070 *(rpciomode_t *)info = ct->ct_io_mode; 1071 break; 1072 1073 case CLGET_CURRENT_REC_SIZE: 1074 /* 1075 * Returns the current amount of memory allocated 1076 * to pending requests 1077 */ 1078 /* LINTED pointer cast */ 1079 *(int *)info = ct->ct_bufferPendingSize; 1080 break; 1081 1082 case CLSET_CONNMAXREC_SIZE: 1083 /* Cannot resize the buffer if it is used. */ 1084 if (ct->ct_bufferPendingSize != 0) { 1085 rpc_fd_unlock(vctbl, ct->ct_fd); 1086 return (FALSE); 1087 } 1088 /* 1089 * If the new size is equal to the current size, 1090 * there is nothing to do. 1091 */ 1092 /* LINTED pointer cast */ 1093 if (ct->ct_bufferSize == *(uint_t *)info) 1094 break; 1095 1096 /* LINTED pointer cast */ 1097 ct->ct_bufferSize = *(uint_t *)info; 1098 if (ct->ct_buffer) { 1099 free(ct->ct_buffer); 1100 ct->ct_buffer = NULL; 1101 ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL; 1102 } 1103 break; 1104 1105 case CLGET_CONNMAXREC_SIZE: 1106 /* 1107 * Returns the size of buffer allocated 1108 * to pending requests 1109 */ 1110 /* LINTED pointer cast */ 1111 *(uint_t *)info = ct->ct_bufferSize; 1112 break; 1113 1114 default: 1115 rpc_fd_unlock(vctbl, ct->ct_fd); 1116 return (FALSE); 1117 } 1118 rpc_fd_unlock(vctbl, ct->ct_fd); 1119 return (TRUE); 1120 } 1121 1122 static void 1123 clnt_vc_destroy(CLIENT *cl) 1124 { 1125 /* LINTED pointer alignment */ 1126 struct ct_data *ct = (struct ct_data *)cl->cl_private; 1127 int ct_fd = ct->ct_fd; 1128 1129 (void) rpc_fd_lock(vctbl, ct_fd); 1130 1131 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) { 1132 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH); 1133 (void) unregister_nb(ct); 1134 } 1135 1136 if (ct->ct_closeit) 1137 (void) t_close(ct_fd); 1138 XDR_DESTROY(&(ct->ct_xdrs)); 1139 if (ct->ct_addr.buf) 1140 free(ct->ct_addr.buf); 1141 free(ct); 1142 if (cl->cl_netid && cl->cl_netid[0]) 1143 free(cl->cl_netid); 1144 if (cl->cl_tp && cl->cl_tp[0]) 1145 free(cl->cl_tp); 1146 free(cl); 1147 rpc_fd_unlock(vctbl, ct_fd); 1148 } 1149 1150 /* 1151 * Interface between xdr serializer and vc connection. 1152 * Behaves like the system calls, read & write, but keeps some error state 1153 * around for the rpc level. 1154 */ 1155 static int 1156 read_vc(void *ct_tmp, caddr_t buf, int len) 1157 { 1158 static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP; 1159 struct pollfd *pfdp; 1160 int npfd; /* total number of pfdp allocated */ 1161 struct ct_data *ct = ct_tmp; 1162 struct timeval starttime; 1163 struct timeval curtime; 1164 int poll_time; 1165 int delta; 1166 1167 if (len == 0) 1168 return (0); 1169 1170 /* 1171 * Allocate just one the first time. thr_get_storage() may 1172 * return a larger buffer, left over from the last time we were 1173 * here, but that's OK. realloc() will deal with it properly. 1174 */ 1175 npfd = 1; 1176 pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free); 1177 if (pfdp == NULL) { 1178 (void) syslog(LOG_ERR, clnt_vc_errstr, 1179 clnt_read_vc_str, __no_mem_str); 1180 rpc_callerr.re_status = RPC_SYSTEMERROR; 1181 rpc_callerr.re_errno = errno; 1182 rpc_callerr.re_terrno = 0; 1183 return (-1); 1184 } 1185 1186 /* 1187 * N.B.: slot 0 in the pollfd array is reserved for the file 1188 * descriptor we're really interested in (as opposed to the 1189 * callback descriptors). 1190 */ 1191 pfdp[0].fd = ct->ct_fd; 1192 pfdp[0].events = MASKVAL; 1193 pfdp[0].revents = 0; 1194 poll_time = ct->ct_wait; 1195 if (gettimeofday(&starttime, NULL) == -1) { 1196 syslog(LOG_ERR, "Unable to get time of day: %m"); 1197 return (-1); 1198 } 1199 1200 for (;;) { 1201 extern void (*_svc_getreqset_proc)(); 1202 extern pollfd_t *svc_pollfd; 1203 extern int svc_max_pollfd; 1204 int fds; 1205 1206 /* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */ 1207 1208 if (_svc_getreqset_proc) { 1209 sig_rw_rdlock(&svc_fd_lock); 1210 1211 /* reallocate pfdp to svc_max_pollfd +1 */ 1212 if (npfd != (svc_max_pollfd + 1)) { 1213 struct pollfd *tmp_pfdp = realloc(pfdp, 1214 sizeof (struct pollfd) * 1215 (svc_max_pollfd + 1)); 1216 if (tmp_pfdp == NULL) { 1217 sig_rw_unlock(&svc_fd_lock); 1218 (void) syslog(LOG_ERR, clnt_vc_errstr, 1219 clnt_read_vc_str, __no_mem_str); 1220 rpc_callerr.re_status = RPC_SYSTEMERROR; 1221 rpc_callerr.re_errno = errno; 1222 rpc_callerr.re_terrno = 0; 1223 return (-1); 1224 } 1225 1226 pfdp = tmp_pfdp; 1227 npfd = svc_max_pollfd + 1; 1228 (void) pthread_setspecific(pfdp_key, pfdp); 1229 } 1230 if (npfd > 1) 1231 (void) memcpy(&pfdp[1], svc_pollfd, 1232 sizeof (struct pollfd) * (npfd - 1)); 1233 1234 sig_rw_unlock(&svc_fd_lock); 1235 } else { 1236 npfd = 1; /* don't forget about pfdp[0] */ 1237 } 1238 1239 switch (fds = poll(pfdp, npfd, poll_time)) { 1240 case 0: 1241 rpc_callerr.re_status = RPC_TIMEDOUT; 1242 return (-1); 1243 1244 case -1: 1245 if (errno != EINTR) 1246 continue; 1247 else { 1248 /* 1249 * interrupted by another signal, 1250 * update time_waited 1251 */ 1252 1253 if (gettimeofday(&curtime, NULL) == -1) { 1254 syslog(LOG_ERR, 1255 "Unable to get time of day: %m"); 1256 errno = 0; 1257 continue; 1258 }; 1259 delta = (curtime.tv_sec - 1260 starttime.tv_sec) * 1000 + 1261 (curtime.tv_usec - 1262 starttime.tv_usec) / 1000; 1263 poll_time -= delta; 1264 if (poll_time < 0) { 1265 rpc_callerr.re_status = RPC_TIMEDOUT; 1266 errno = 0; 1267 return (-1); 1268 } else { 1269 errno = 0; /* reset it */ 1270 continue; 1271 } 1272 } 1273 } 1274 1275 if (pfdp[0].revents == 0) { 1276 /* must be for server side of the house */ 1277 (*_svc_getreqset_proc)(&pfdp[1], fds); 1278 continue; /* do poll again */ 1279 } 1280 1281 if (pfdp[0].revents & POLLNVAL) { 1282 rpc_callerr.re_status = RPC_CANTRECV; 1283 /* 1284 * Note: we're faking errno here because we 1285 * previously would have expected select() to 1286 * return -1 with errno EBADF. Poll(BA_OS) 1287 * returns 0 and sets the POLLNVAL revents flag 1288 * instead. 1289 */ 1290 rpc_callerr.re_errno = errno = EBADF; 1291 return (-1); 1292 } 1293 1294 if (pfdp[0].revents & (POLLERR | POLLHUP)) { 1295 rpc_callerr.re_status = RPC_CANTRECV; 1296 rpc_callerr.re_errno = errno = EPIPE; 1297 return (-1); 1298 } 1299 break; 1300 } 1301 1302 switch (len = t_rcvall(ct->ct_fd, buf, len)) { 1303 case 0: 1304 /* premature eof */ 1305 rpc_callerr.re_errno = ENOLINK; 1306 rpc_callerr.re_terrno = 0; 1307 rpc_callerr.re_status = RPC_CANTRECV; 1308 len = -1; /* it's really an error */ 1309 break; 1310 1311 case -1: 1312 rpc_callerr.re_terrno = t_errno; 1313 rpc_callerr.re_errno = 0; 1314 rpc_callerr.re_status = RPC_CANTRECV; 1315 break; 1316 } 1317 return (len); 1318 } 1319 1320 static int 1321 write_vc(void *ct_tmp, caddr_t buf, int len) 1322 { 1323 int i, cnt; 1324 struct ct_data *ct = ct_tmp; 1325 int flag; 1326 int maxsz; 1327 1328 maxsz = ct->ct_tsdu; 1329 1330 /* Handle the non-blocking mode */ 1331 if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) { 1332 /* 1333 * Test a special case here. If the length of the current 1334 * write is greater than the transport data unit, and the 1335 * mode is non blocking, we return RPC_CANTSEND. 1336 * XXX this is not very clean. 1337 */ 1338 if (maxsz > 0 && len > maxsz) { 1339 rpc_callerr.re_terrno = errno; 1340 rpc_callerr.re_errno = 0; 1341 rpc_callerr.re_status = RPC_CANTSEND; 1342 return (-1); 1343 } 1344 1345 len = nb_send(ct, buf, (unsigned)len); 1346 if (len == -1) { 1347 rpc_callerr.re_terrno = errno; 1348 rpc_callerr.re_errno = 0; 1349 rpc_callerr.re_status = RPC_CANTSEND; 1350 } else if (len == -2) { 1351 rpc_callerr.re_terrno = 0; 1352 rpc_callerr.re_errno = 0; 1353 rpc_callerr.re_status = RPC_CANTSTORE; 1354 } 1355 return (len); 1356 } 1357 1358 if ((maxsz == 0) || (maxsz == -1)) { 1359 /* 1360 * T_snd may return -1 for error on connection (connection 1361 * needs to be repaired/closed, and -2 for flow-control 1362 * handling error (no operation to do, just wait and call 1363 * T_Flush()). 1364 */ 1365 if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) { 1366 rpc_callerr.re_terrno = t_errno; 1367 rpc_callerr.re_errno = 0; 1368 rpc_callerr.re_status = RPC_CANTSEND; 1369 } 1370 return (len); 1371 } 1372 1373 /* 1374 * This for those transports which have a max size for data. 1375 */ 1376 for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) { 1377 flag = cnt > maxsz ? T_MORE : 0; 1378 if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz), 1379 flag)) == -1) { 1380 rpc_callerr.re_terrno = t_errno; 1381 rpc_callerr.re_errno = 0; 1382 rpc_callerr.re_status = RPC_CANTSEND; 1383 return (-1); 1384 } 1385 } 1386 return (len); 1387 } 1388 1389 /* 1390 * Receive the required bytes of data, even if it is fragmented. 1391 */ 1392 static int 1393 t_rcvall(int fd, char *buf, int len) 1394 { 1395 int moreflag; 1396 int final = 0; 1397 int res; 1398 1399 do { 1400 moreflag = 0; 1401 res = t_rcv(fd, buf, (unsigned)len, &moreflag); 1402 if (res == -1) { 1403 if (t_errno == TLOOK) 1404 switch (t_look(fd)) { 1405 case T_DISCONNECT: 1406 (void) t_rcvdis(fd, NULL); 1407 (void) t_snddis(fd, NULL); 1408 return (-1); 1409 case T_ORDREL: 1410 /* Received orderly release indication */ 1411 (void) t_rcvrel(fd); 1412 /* Send orderly release indicator */ 1413 (void) t_sndrel(fd); 1414 return (-1); 1415 default: 1416 return (-1); 1417 } 1418 } else if (res == 0) { 1419 return (0); 1420 } 1421 final += res; 1422 buf += res; 1423 len -= res; 1424 } while ((len > 0) && (moreflag & T_MORE)); 1425 return (final); 1426 } 1427 1428 static struct clnt_ops * 1429 clnt_vc_ops(void) 1430 { 1431 static struct clnt_ops ops; 1432 extern mutex_t ops_lock; 1433 1434 /* VARIABLES PROTECTED BY ops_lock: ops */ 1435 1436 sig_mutex_lock(&ops_lock); 1437 if (ops.cl_call == NULL) { 1438 ops.cl_call = clnt_vc_call; 1439 ops.cl_send = clnt_vc_send; 1440 ops.cl_abort = clnt_vc_abort; 1441 ops.cl_geterr = clnt_vc_geterr; 1442 ops.cl_freeres = clnt_vc_freeres; 1443 ops.cl_destroy = clnt_vc_destroy; 1444 ops.cl_control = clnt_vc_control; 1445 } 1446 sig_mutex_unlock(&ops_lock); 1447 return (&ops); 1448 } 1449 1450 /* 1451 * Make sure that the time is not garbage. -1 value is disallowed. 1452 * Note this is different from time_not_ok in clnt_dg.c 1453 */ 1454 static bool_t 1455 time_not_ok(struct timeval *t) 1456 { 1457 return (t->tv_sec <= -1 || t->tv_sec > 100000000 || 1458 t->tv_usec <= -1 || t->tv_usec > 1000000); 1459 } 1460 1461 1462 /* Compute the # of bytes that remains until the end of the buffer */ 1463 #define REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer)) 1464 1465 static int 1466 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes) 1467 { 1468 if (NULL == ct->ct_buffer) { 1469 /* Buffer not allocated yet. */ 1470 char *buffer; 1471 1472 buffer = malloc(ct->ct_bufferSize); 1473 if (NULL == buffer) { 1474 errno = ENOMEM; 1475 return (-1); 1476 } 1477 (void) memcpy(buffer, dataToAdd, nBytes); 1478 1479 ct->ct_buffer = buffer; 1480 ct->ct_bufferReadPtr = buffer; 1481 ct->ct_bufferWritePtr = buffer + nBytes; 1482 ct->ct_bufferPendingSize = nBytes; 1483 } else { 1484 /* 1485 * For an already allocated buffer, two mem copies 1486 * might be needed, depending on the current 1487 * writing position. 1488 */ 1489 1490 /* Compute the length of the first copy. */ 1491 int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr)); 1492 1493 ct->ct_bufferPendingSize += nBytes; 1494 1495 (void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len); 1496 ct->ct_bufferWritePtr += len; 1497 nBytes -= len; 1498 if (0 == nBytes) { 1499 /* One memcopy needed. */ 1500 1501 /* 1502 * If the write pointer is at the end of the buffer, 1503 * wrap it now. 1504 */ 1505 if (ct->ct_bufferWritePtr == 1506 (ct->ct_buffer + ct->ct_bufferSize)) { 1507 ct->ct_bufferWritePtr = ct->ct_buffer; 1508 } 1509 } else { 1510 /* Two memcopy needed. */ 1511 dataToAdd += len; 1512 1513 /* 1514 * Copy the remaining data to the beginning of the 1515 * buffer 1516 */ 1517 (void) memcpy(ct->ct_buffer, dataToAdd, nBytes); 1518 ct->ct_bufferWritePtr = ct->ct_buffer + nBytes; 1519 } 1520 } 1521 return (0); 1522 } 1523 1524 static void 1525 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes) 1526 { 1527 ct->ct_bufferPendingSize -= nBytes; 1528 if (ct->ct_bufferPendingSize == 0) { 1529 /* 1530 * If the buffer contains no data, we set the two pointers at 1531 * the beginning of the buffer (to miminize buffer wraps). 1532 */ 1533 ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer; 1534 } else { 1535 ct->ct_bufferReadPtr += nBytes; 1536 if (ct->ct_bufferReadPtr > 1537 ct->ct_buffer + ct->ct_bufferSize) { 1538 ct->ct_bufferReadPtr -= ct->ct_bufferSize; 1539 } 1540 } 1541 } 1542 1543 static int 1544 iovFromBuffer(struct ct_data *ct, struct iovec *iov) 1545 { 1546 int l; 1547 1548 if (ct->ct_bufferPendingSize == 0) 1549 return (0); 1550 1551 l = REMAIN_BYTES(bufferReadPtr); 1552 if (l < ct->ct_bufferPendingSize) { 1553 /* Buffer in two fragments. */ 1554 iov[0].iov_base = ct->ct_bufferReadPtr; 1555 iov[0].iov_len = l; 1556 1557 iov[1].iov_base = ct->ct_buffer; 1558 iov[1].iov_len = ct->ct_bufferPendingSize - l; 1559 return (2); 1560 } else { 1561 /* Buffer in one fragment. */ 1562 iov[0].iov_base = ct->ct_bufferReadPtr; 1563 iov[0].iov_len = ct->ct_bufferPendingSize; 1564 return (1); 1565 } 1566 } 1567 1568 static bool_t 1569 set_flush_mode(struct ct_data *ct, int mode) 1570 { 1571 switch (mode) { 1572 case RPC_CL_BLOCKING_FLUSH: 1573 /* flush as most as possible without blocking */ 1574 case RPC_CL_BESTEFFORT_FLUSH: 1575 /* flush the buffer completely (possibly blocking) */ 1576 case RPC_CL_DEFAULT_FLUSH: 1577 /* flush according to the currently defined policy */ 1578 ct->ct_blocking_mode = mode; 1579 return (TRUE); 1580 default: 1581 return (FALSE); 1582 } 1583 } 1584 1585 static bool_t 1586 set_io_mode(struct ct_data *ct, int ioMode) 1587 { 1588 switch (ioMode) { 1589 case RPC_CL_BLOCKING: 1590 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) { 1591 if (NULL != ct->ct_buffer) { 1592 /* 1593 * If a buffer was allocated for this 1594 * connection, flush it now, and free it. 1595 */ 1596 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH); 1597 free(ct->ct_buffer); 1598 ct->ct_buffer = NULL; 1599 } 1600 (void) unregister_nb(ct); 1601 ct->ct_io_mode = ioMode; 1602 } 1603 break; 1604 case RPC_CL_NONBLOCKING: 1605 if (ct->ct_io_mode == RPC_CL_BLOCKING) { 1606 if (-1 == register_nb(ct)) { 1607 return (FALSE); 1608 } 1609 ct->ct_io_mode = ioMode; 1610 } 1611 break; 1612 default: 1613 return (FALSE); 1614 } 1615 return (TRUE); 1616 } 1617 1618 static int 1619 do_flush(struct ct_data *ct, uint_t flush_mode) 1620 { 1621 int result; 1622 if (ct->ct_bufferPendingSize == 0) { 1623 return (0); 1624 } 1625 1626 switch (flush_mode) { 1627 case RPC_CL_BLOCKING_FLUSH: 1628 if (!set_blocking_connection(ct, TRUE)) { 1629 return (-1); 1630 } 1631 while (ct->ct_bufferPendingSize > 0) { 1632 if (REMAIN_BYTES(bufferReadPtr) < 1633 ct->ct_bufferPendingSize) { 1634 struct iovec iov[2]; 1635 (void) iovFromBuffer(ct, iov); 1636 result = writev(ct->ct_fd, iov, 2); 1637 } else { 1638 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr, 1639 ct->ct_bufferPendingSize, 0); 1640 } 1641 if (result < 0) { 1642 return (-1); 1643 } 1644 consumeFromBuffer(ct, result); 1645 } 1646 1647 break; 1648 1649 case RPC_CL_BESTEFFORT_FLUSH: 1650 (void) set_blocking_connection(ct, FALSE); 1651 if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) { 1652 struct iovec iov[2]; 1653 (void) iovFromBuffer(ct, iov); 1654 result = writev(ct->ct_fd, iov, 2); 1655 } else { 1656 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr, 1657 ct->ct_bufferPendingSize, 0); 1658 } 1659 if (result < 0) { 1660 if (errno != EWOULDBLOCK) { 1661 perror("flush"); 1662 return (-1); 1663 } 1664 return (0); 1665 } 1666 if (result > 0) 1667 consumeFromBuffer(ct, result); 1668 break; 1669 } 1670 return (0); 1671 } 1672 1673 /* 1674 * Non blocking send. 1675 */ 1676 1677 static int 1678 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes) 1679 { 1680 int result; 1681 1682 if (!(ntohl(*(uint32_t *)buff) & 2^31)) { 1683 return (-1); 1684 } 1685 1686 /* 1687 * Check to see if the current message can be stored fully in the 1688 * buffer. We have to check this now because it may be impossible 1689 * to send any data, so the message must be stored in the buffer. 1690 */ 1691 if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) { 1692 /* Try to flush (to free some space). */ 1693 (void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH); 1694 1695 /* Can we store the message now ? */ 1696 if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) 1697 return (-2); 1698 } 1699 1700 (void) set_blocking_connection(ct, FALSE); 1701 1702 /* 1703 * If there is no data pending, we can simply try 1704 * to send our data. 1705 */ 1706 if (ct->ct_bufferPendingSize == 0) { 1707 result = t_snd(ct->ct_fd, buff, nBytes, 0); 1708 if (result == -1) { 1709 if (errno == EWOULDBLOCK) { 1710 result = 0; 1711 } else { 1712 perror("send"); 1713 return (-1); 1714 } 1715 } 1716 /* 1717 * If we have not sent all data, we must store them 1718 * in the buffer. 1719 */ 1720 if (result != nBytes) { 1721 if (addInBuffer(ct, (char *)buff + result, 1722 nBytes - result) == -1) { 1723 return (-1); 1724 } 1725 } 1726 } else { 1727 /* 1728 * Some data pending in the buffer. We try to send 1729 * both buffer data and current message in one shot. 1730 */ 1731 struct iovec iov[3]; 1732 int i = iovFromBuffer(ct, &iov[0]); 1733 1734 iov[i].iov_base = buff; 1735 iov[i].iov_len = nBytes; 1736 1737 result = writev(ct->ct_fd, iov, i+1); 1738 if (result == -1) { 1739 if (errno == EWOULDBLOCK) { 1740 /* No bytes sent */ 1741 result = 0; 1742 } else { 1743 return (-1); 1744 } 1745 } 1746 1747 /* 1748 * Add the bytes from the message 1749 * that we have not sent. 1750 */ 1751 if (result <= ct->ct_bufferPendingSize) { 1752 /* No bytes from the message sent */ 1753 consumeFromBuffer(ct, result); 1754 if (addInBuffer(ct, buff, nBytes) == -1) { 1755 return (-1); 1756 } 1757 } else { 1758 /* 1759 * Some bytes of the message are sent. 1760 * Compute the length of the message that has 1761 * been sent. 1762 */ 1763 int len = result - ct->ct_bufferPendingSize; 1764 1765 /* So, empty the buffer. */ 1766 ct->ct_bufferReadPtr = ct->ct_buffer; 1767 ct->ct_bufferWritePtr = ct->ct_buffer; 1768 ct->ct_bufferPendingSize = 0; 1769 1770 /* And add the remaining part of the message. */ 1771 if (len != nBytes) { 1772 if (addInBuffer(ct, (char *)buff + len, 1773 nBytes-len) == -1) { 1774 return (-1); 1775 } 1776 } 1777 } 1778 } 1779 return (nBytes); 1780 } 1781 1782 static void 1783 flush_registered_clients(void) 1784 { 1785 struct nb_reg_node *node; 1786 1787 if (LIST_ISEMPTY(nb_first)) { 1788 return; 1789 } 1790 1791 LIST_FOR_EACH(nb_first, node) { 1792 (void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH); 1793 } 1794 } 1795 1796 static int 1797 allocate_chunk(void) 1798 { 1799 #define CHUNK_SIZE 16 1800 struct nb_reg_node *chk = 1801 malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE); 1802 struct nb_reg_node *n; 1803 int i; 1804 1805 if (NULL == chk) { 1806 return (-1); 1807 } 1808 1809 n = chk; 1810 for (i = 0; i < CHUNK_SIZE-1; ++i) { 1811 n[i].next = &(n[i+1]); 1812 } 1813 n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free; 1814 nb_free = chk; 1815 return (0); 1816 } 1817 1818 static int 1819 register_nb(struct ct_data *ct) 1820 { 1821 struct nb_reg_node *node; 1822 1823 (void) mutex_lock(&nb_list_mutex); 1824 1825 if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) { 1826 (void) mutex_unlock(&nb_list_mutex); 1827 errno = ENOMEM; 1828 return (-1); 1829 } 1830 1831 if (!exit_handler_set) { 1832 (void) atexit(flush_registered_clients); 1833 exit_handler_set = TRUE; 1834 } 1835 /* Get the first free node */ 1836 LIST_EXTRACT(nb_free, node); 1837 1838 node->ct = ct; 1839 1840 LIST_ADD(nb_first, node); 1841 (void) mutex_unlock(&nb_list_mutex); 1842 1843 return (0); 1844 } 1845 1846 static int 1847 unregister_nb(struct ct_data *ct) 1848 { 1849 struct nb_reg_node *node; 1850 1851 (void) mutex_lock(&nb_list_mutex); 1852 assert(!LIST_ISEMPTY(nb_first)); 1853 1854 node = nb_first; 1855 LIST_FOR_EACH(nb_first, node) { 1856 if (node->next->ct == ct) { 1857 /* Get the node to unregister. */ 1858 struct nb_reg_node *n = node->next; 1859 node->next = n->next; 1860 1861 n->ct = NULL; 1862 LIST_ADD(nb_free, n); 1863 break; 1864 } 1865 } 1866 (void) mutex_unlock(&nb_list_mutex); 1867 return (0); 1868 } 1869