/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
 * Copyright (c) 2016 by Delphix. All rights reserved.
 */

/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * clnt_vc.c
 *
 * Implements a connectionful client side RPC.
 *
 * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched-up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 */
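
/*
 * Illustrative sketch (not part of the original source): how a caller might
 * batch calls on a connection-oriented handle.  Passing a NULL results xdr
 * routine together with a zero timeout queues the call in the send buffer;
 * a later call with a real results routine and a non-zero timeout pushes
 * everything out and waits for its own reply.  LOGPROC, xdr_log_args, args1
 * and args2 are hypothetical program-specific names.
 *
 *	struct timeval zero = { 0, 0 };
 *	struct timeval wait = { 25, 0 };
 *	enum clnt_stat stat;
 *
 *	// Queue several calls; nothing forces them onto the wire yet.
 *	(void) clnt_call(clnt, LOGPROC, xdr_log_args, (caddr_t)&args1,
 *	    (xdrproc_t)NULL, (caddr_t)NULL, zero);
 *	(void) clnt_call(clnt, LOGPROC, xdr_log_args, (caddr_t)&args2,
 *	    (xdrproc_t)NULL, (caddr_t)NULL, zero);
 *
 *	// A normal call (non-NULL results xdr, non-zero timeout) flushes
 *	// the batch and waits for its own reply.
 *	stat = clnt_call(clnt, NULLPROC, xdr_void, NULL, xdr_void, NULL, wait);
 */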

#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <limits.h>

#define	MCALL_MSG_SIZE	24
#define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
#define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
#define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
#define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
#ifndef MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif

extern int __rpc_timeval_to_msec(struct timeval *);
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
    caddr_t);
extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
    rpcvers_t, uint_t, uint_t, const struct timeval *);

static struct clnt_ops *clnt_vc_ops(void);
static int read_vc(void *, caddr_t, int);
static int write_vc(void *, caddr_t, int);
static int t_rcvall(int, char *, int);
static bool_t time_not_ok(struct timeval *);

struct ct_data;
static bool_t set_up_connection(int, struct netbuf *,
    struct ct_data *, const struct timeval *);
static bool_t set_io_mode(struct ct_data *, int);

/*
 * Lock table handle used by various MT sync. routines
 */
static mutex_t vctbl_lock = DEFAULTMUTEX;
static void *vctbl = NULL;

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";

/*
 * Private data structure
 */
struct ct_data {
	int	ct_fd;			/* connection's fd */
	bool_t	ct_closeit;		/* close it on destroy */
	int	ct_tsdu;		/* size of tsdu */
	int	ct_wait;		/* wait interval in milliseconds */
	bool_t	ct_waitset;		/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	char	ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
	uint_t	ct_mpos;		/* pos after marshal */
	XDR	ct_xdrs;		/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t	ct_is_oneway;		/* True if the current call is oneway. */
	bool_t	ct_is_blocking;
	ushort_t ct_io_mode;
	ushort_t ct_blocking_mode;
	uint_t	ct_bufferSize;		/* Total size of the buffer. */
	uint_t	ct_bufferPendingSize;	/* Size of unsent data. */
	char	*ct_buffer;		/* Pointer to the buffer. */
	char	*ct_bufferWritePtr;	/* Ptr to the first free byte. */
	char	*ct_bufferReadPtr;	/* Ptr to the first byte of data. */
};

struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;

static bool_t exit_handler_set = FALSE;

static mutex_t nb_list_mutex = DEFAULTMUTEX;


/* Define some macros to manage the linked list. */
#define	LIST_ISEMPTY(l)		(l == (struct nb_reg_node *)&l)
#define	LIST_CLR(l)		(l = (struct nb_reg_node *)&l)
#define	LIST_ADD(l, node)	(node->next = l->next, l = node)
#define	LIST_EXTRACT(l, node)	(node = l, l = l->next)
#define	LIST_FOR_EACH(l, node) \
	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)


/* Default size of the IO buffer used in non blocking mode */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)

static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);

static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);


/*
 * Change the mode of the underlying fd.
 */
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
	int flag;

	/*
	 * If the underlying fd is already in the required mode,
	 * avoid the syscall.
	 */
	if (ct->ct_is_blocking == blocking)
		return (TRUE);

	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_fcntl_getfl_str);
		return (FALSE);
	}

	flag = blocking ? flag & ~O_NONBLOCK : flag | O_NONBLOCK;
	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_nonblock_str);
		return (FALSE);
	}
	ct->ct_is_blocking = blocking;
	return (TRUE);
}

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control()'s.
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes, 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication.  Callers may wish to
 * set this to something more useful.
 *
 * fd should be open and bound.
 */
CLIENT *
clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
    const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
{
	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
	    recvsz, NULL));
}
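
/*
 * Illustrative sketch (not part of the original source): creating a handle
 * over an already opened and bound TLI endpoint.  "svc_nb" is assumed to
 * hold the server's transport address, and PROGNUM/PROGVERS are hypothetical
 * program constants; buffer sizes of 0 pick the transport defaults.
 *
 *	int fd = t_open("/dev/tcp", O_RDWR, NULL);
 *	// ... t_bind(fd, NULL, NULL) and fill in svc_nb ...
 *	CLIENT *clnt = clnt_vc_create(fd, &svc_nb, PROGNUM, PROGVERS, 0, 0);
 *	if (clnt == NULL) {
 *		clnt_pcreateerror("clnt_vc_create");
 *		return;
 *	}
 *	// Ask clnt_destroy() to close the fd for us (default is to leave it).
 *	(void) clnt_control(clnt, CLSET_FD_CLOSE, NULL);
 */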

/*
 * This has the same definition as clnt_vc_create(), except it
 * takes an additional parameter - a pointer to a timeval structure.
 *
 * Not a public interface. This is for clnt_create_timed,
 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
 * value to control a tcp connection attempt.
 * (for bug 4049792: clnt_create_timed does not time out)
 *
 * If tp is NULL, use default timeout to set up the connection.
 */
CLIENT *
_clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
    rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct;		/* private data */
	struct timeval now;
	struct rpc_msg call_msg;
	struct t_info tinfo;
	int flag;

	cl = malloc(sizeof (*cl));
	if ((ct = malloc(sizeof (*ct))) != NULL)
		ct->ct_addr.buf = NULL;

	if ((cl == NULL) || (ct == NULL)) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_vc_str, __no_mem_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		goto err;
	}

	/*
	 * The only use of vctbl_lock is for serializing the creation of
	 * vctbl.  Once created the lock needs to be released so we don't
	 * hold it across the set_up_connection() call and end up with a
	 * bunch of threads stuck waiting for the mutex.
	 */
	sig_mutex_lock(&vctbl_lock);

	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		sig_mutex_unlock(&vctbl_lock);
		goto err;
	}

	sig_mutex_unlock(&vctbl_lock);

	ct->ct_io_mode = RPC_CL_BLOCKING;
	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;

	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
	ct->ct_bufferPendingSize = 0;
	ct->ct_bufferWritePtr = NULL;
	ct->ct_bufferReadPtr = NULL;

	/* Check the current state of the fd. */
	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
		    no_fcntl_getfl_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;

	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
		goto err;
	}

	/*
	 * Set up other members of private data struct
	 */
	ct->ct_fd = fd;
	/*
	 * The actual value will be set by clnt_call or clnt_control
	 */
	ct->ct_wait = 30000;
	ct->ct_waitset = FALSE;
	/*
	 * By default, closeit is always FALSE.  It is the user's
	 * responsibility to do a t_close on the fd, or the user may use
	 * clnt_control to let clnt_destroy do it for them.
	 */
	ct->ct_closeit = FALSE;

	/*
	 * Initialize call message
	 */
	(void) gettimeofday(&now, (struct timezone *)0);
	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
	call_msg.rm_call.cb_prog = prog;
	call_msg.rm_call.cb_vers = vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
	XDR_DESTROY(&(ct->ct_xdrs));

	if (t_getinfo(fd, &tinfo) == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = t_errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	/*
	 * Find the receive and the send size
	 */
	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_tsdu = tinfo.tsdu;
	/*
	 * Create a client handle which uses xdrrec for serialization
	 * and authnone for authentication.
	 */
	ct->ct_xdrs.x_ops = NULL;
	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
	    read_vc, write_vc);
	if (ct->ct_xdrs.x_ops == NULL) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = ENOMEM;
		goto err;
	}
	cl->cl_ops = clnt_vc_ops();
	cl->cl_private = (caddr_t)ct;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);

err:
	if (ct) {
		free(ct->ct_addr.buf);
		free(ct);
	}
	free(cl);

	return (NULL);
}

#define	TCPOPT_BUFSIZE 128

/*
 * Set tcp connection timeout value.
 * Return 0 for success, -1 for failure.
 */
static int
_set_tcp_conntime(int fd, int optval)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip;
	char buf[TCPOPT_BUFSIZE];

	/* LINTED pointer cast */
	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_NEGOTIATE;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	/* LINTED pointer cast */
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = optval;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}
	return (0);
}

/*
 * Get current tcp connection timeout value.
 * Return the timeout in milliseconds, or -1 for failure.
 */
static int
_get_tcp_conntime(int fd)
{
	struct t_optmgmt req, res;
	struct opthdr *opt;
	int *ip, retval;
	char buf[TCPOPT_BUFSIZE];

	/* LINTED pointer cast */
	opt = (struct opthdr *)buf;
	opt->level = IPPROTO_TCP;
	opt->name = TCP_CONN_ABORT_THRESHOLD;
	opt->len = sizeof (int);

	req.flags = T_CURRENT;
	req.opt.len = sizeof (struct opthdr) + opt->len;
	req.opt.buf = (char *)opt;
	/* LINTED pointer cast */
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	*ip = 0;

	res.flags = 0;
	res.opt.buf = (char *)buf;
	res.opt.maxlen = sizeof (buf);
	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
		return (-1);
	}

	/* LINTED pointer cast */
	ip = (int *)((char *)buf + sizeof (struct opthdr));
	retval = *ip;
	return (retval);
}

static bool_t
set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
    const struct timeval *tp)
{
	int state;
	struct t_call sndcallstr, *rcvcall;
	int nconnect;
	bool_t connected, do_rcv_connect;
	int curr_time = -1;
	hrtime_t start;
	hrtime_t tout;	/* timeout in nanoseconds (from tp) */

	ct->ct_addr.len = 0;
	state = t_getstate(fd);
	if (state == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_errno = 0;
		rpc_createerr.cf_error.re_terrno = t_errno;
		return (FALSE);
	}

	switch (state) {
	case T_IDLE:
		if (svcaddr == NULL) {
			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
			return (FALSE);
		}
		/*
		 * Connect only if state is IDLE and svcaddr known
		 */
		/* LINTED pointer alignment */
		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
		if (rcvcall == NULL) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			return (FALSE);
		}
		rcvcall->udata.maxlen = 0;
		sndcallstr.addr = *svcaddr;
		sndcallstr.opt.len = 0;
		sndcallstr.udata.len = 0;
		/*
		 * Even NULL could have sufficed for rcvcall, because
		 * the address returned is same for all cases except
		 * for the gateway case, and hence required.
		 */
		connected = FALSE;
		do_rcv_connect = FALSE;

		/*
		 * If there is a timeout value specified, we will try to
		 * reset the tcp connection timeout.  If the transport does
		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
		 * for any other reason, the default timeout will be used.
		 */
		if (tp != NULL) {
			start = gethrtime();

			/*
			 * Calculate the timeout in nanoseconds
			 */
			tout = SECS_TO_NS(tp->tv_sec) +
			    USECS_TO_NS(tp->tv_usec);
			curr_time = _get_tcp_conntime(fd);
		}

		for (nconnect = 0; nconnect < 3; nconnect++) {
			if (tp != NULL) {
				/*
				 * Calculate the elapsed time
				 */
				hrtime_t elapsed = gethrtime() - start;
				if (elapsed >= tout)
					break;

				if (curr_time != -1) {
					int ms;

					/*
					 * TCP_CONN_ABORT_THRESHOLD takes int
					 * value in milliseconds.  Make sure we
					 * do not overflow.
					 */
					if (NSECS_TO_MS(tout - elapsed) >=
					    INT_MAX) {
						ms = INT_MAX;
					} else {
						ms = (int)
						    NSECS_TO_MS(tout - elapsed);
						if (MSECS_TO_NS(ms) !=
						    tout - elapsed)
							ms++;
					}

					(void) _set_tcp_conntime(fd, ms);
				}
			}

			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
				connected = TRUE;
				break;
			}
			if (t_errno == TLOOK) {
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, (struct
					    t_discon *) NULL);
					break;
				default:
					break;
				}
			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
				break;
			}
			if ((state = t_getstate(fd)) == T_OUTCON) {
				do_rcv_connect = TRUE;
				break;
			}
			if (state != T_IDLE) {
				break;
			}
		}
		if (do_rcv_connect) {
			do {
				if (t_rcvconnect(fd, rcvcall) != -1) {
					connected = TRUE;
					break;
				}
			} while (t_errno == TSYSERR && errno == EINTR);
		}

		/*
		 * Set the connection timeout back to its old value.
		 */
		if (curr_time != -1) {
			(void) _set_tcp_conntime(fd, curr_time);
		}

		if (!connected) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			(void) t_free((char *)rcvcall, T_CALL);
			return (FALSE);
		}

		/* Free old area if allocated */
		if (ct->ct_addr.buf)
			free(ct->ct_addr.buf);
		ct->ct_addr = rcvcall->addr;	/* To get the new address */
		/* So that address buf does not get freed */
		rcvcall->addr.buf = NULL;
		(void) t_free((char *)rcvcall, T_CALL);
		break;
	case T_DATAXFER:
	case T_OUTCON:
		if (svcaddr == NULL) {
			/*
			 * svcaddr could also be NULL in cases where the
			 * client is already bound and connected.
			 */
			ct->ct_addr.len = 0;
		} else {
			ct->ct_addr.buf = malloc(svcaddr->len);
			if (ct->ct_addr.buf == NULL) {
				(void) syslog(LOG_ERR, clnt_vc_errstr,
				    clnt_vc_str, __no_mem_str);
				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
				rpc_createerr.cf_error.re_errno = errno;
				rpc_createerr.cf_error.re_terrno = 0;
				return (FALSE);
			}
			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
			    (size_t)svcaddr->len);
			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
		}
		break;
	default:
		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
		return (FALSE);
	}
	return (TRUE);
}

static enum clnt_stat
clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
    xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
{
	/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	uint32_t x_id;
	/* LINTED pointer alignment */
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
	bool_t shipnow;
	int refreshes = 2;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = FALSE;
	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (RPC_FAILED);	/* XXX */
		}
	}

	if (!ct->ct_waitset) {
		/* If time is not within limits, we ignore it. */
		if (time_not_ok(&timeout) == FALSE)
			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
	} else {
		timeout.tv_sec = (ct->ct_wait / 1000);
		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
	}

	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		/* LINTED pointer alignment */
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}
	if (!xdrrec_endofrecord(xdrs, shipnow)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_CANTSEND);
	}
	if (!shipnow) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_SUCCESS);
	}
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	for (;;) {
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
		if (!xdrrec_skiprecord(xdrs)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		/* now decode and validate the response header */
		if (!xdr_replymsg(xdrs, &reply_msg)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				continue;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
		rpc_callerr.re_status = RPC_SUCCESS;
	else
		__seterr_reply(&reply_msg, &(rpc_callerr));

	if (rpc_callerr.re_status == RPC_SUCCESS) {
		if (!AUTH_VALIDATE(cl->cl_auth,
		    &reply_msg.acpted_rply.ar_verf)) {
			rpc_callerr.re_status = RPC_AUTHERROR;
			rpc_callerr.re_why = AUTH_INVALIDRESP;
		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
			if (!(*xdr_results)(xdrs, results_ptr)) {
				if (rpc_callerr.re_status == RPC_SUCCESS)
					rpc_callerr.re_status =
					    RPC_CANTDECODERES;
			}
		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
		    results_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTDECODERES;
		}
	} /* end successful completion */
	/*
	 * If unsuccessful AND error is an authentication error
	 * then refresh credentials and try again, else break
	 */
	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
			goto call_again;
		else
			/*
			 * We are setting rpc_callerr here given that libnsl
			 * is not reentrant thereby reinitializing the TSD.
			 * If not set here then success could be returned even
			 * though refresh failed.
			 */
			rpc_callerr.re_status = RPC_AUTHERROR;
	} /* end of unsuccessful completion */
	/* free verifier ... */
	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
		xdrs->x_op = XDR_FREE;
		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}

static enum clnt_stat
clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
{
	/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	uint32_t x_id;
	/* LINTED pointer alignment */
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = TRUE;

	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		/* LINTED pointer alignment */
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}

	/*
	 * Do not need to check errors, as the following code does
	 * not depend on the successful completion of the call.
	 * An error, if any occurs, is reported through
	 * rpc_callerr.re_status.
	 */
	(void) xdrrec_endofrecord(xdrs, TRUE);

	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}
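
/*
 * Illustrative sketch (not part of the original source): issuing one-way
 * calls in non-blocking mode.  The handle is switched to RPC_CL_NONBLOCKING
 * with clnt_control(); clnt_send() then queues records that cannot be
 * written immediately into the pending buffer, and CLFLUSH pushes whatever
 * is still buffered.  ONEWAYPROC, xdr_event and ev are hypothetical names.
 *
 *	int mode = RPC_CL_NONBLOCKING;
 *	int flush = RPC_CL_BLOCKING_FLUSH;
 *
 *	if (!clnt_control(clnt, CLSET_IO_MODE, (char *)&mode))
 *		return;
 *	(void) clnt_send(clnt, ONEWAYPROC, xdr_event, (caddr_t)&ev);
 *	...
 *	// Before tearing down, force out anything still pending.
 *	(void) clnt_control(clnt, CLFLUSH, (char *)&flush);
 */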

/* ARGSUSED */
static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	*errp = rpc_callerr;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
{
	/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	bool_t stat;

	(void) rpc_fd_lock(vctbl, ct->ct_fd);
	xdrs->x_op = XDR_FREE;
	stat = (*xdr_res)(xdrs, res_ptr);
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (stat);
}

static void
clnt_vc_abort(void)
{
}

/*ARGSUSED*/
static bool_t
clnt_vc_control(CLIENT *cl, int request, char *info)
{
	bool_t ret;
	/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (TRUE);
	case CLFLUSH:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			int res;
			res = do_flush(ct, (info == NULL ||
			    /* LINTED pointer cast */
			    *(int *)info == RPC_CL_DEFAULT_FLUSH) ?
			    /* LINTED pointer cast */
			    ct->ct_blocking_mode : *(int *)info);
			ret = (0 == res);
		} else {
			ret = FALSE;
		}
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
	}

	/* for other requests which use info */
	if (info == NULL) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		/* LINTED pointer alignment */
		if (time_not_ok((struct timeval *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		/* LINTED pointer alignment */
		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
		ct->ct_waitset = TRUE;
		break;
	case CLGET_TIMEOUT:
		/* LINTED pointer alignment */
		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
		/* LINTED pointer alignment */
		((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
		break;
	case CLGET_SERVER_ADDR:	/* For compatibility only */
		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
		break;
	case CLGET_FD:
		/* LINTED pointer alignment */
		*(int *)info = ct->ct_fd;
		break;
	case CLGET_SVC_ADDR:
		/* The caller should not free this memory area */
		/* LINTED pointer alignment */
		*(struct netbuf *)info = ct->ct_addr;
		break;
	case CLSET_SVC_ADDR:	/* set to new address */
#ifdef undef
		/*
		 * XXX: once the t_snddis(), followed by t_connect() starts to
		 * work, this ifdef should be removed.  CLIENT handle reuse
		 * would then be possible for COTS as well.
		 */
		if (t_snddis(ct->ct_fd, NULL) == -1) {
			rpc_createerr.cf_stat = RPC_TLIERROR;
			rpc_createerr.cf_error.re_terrno = t_errno;
			rpc_createerr.cf_error.re_errno = errno;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
		    ct, NULL);
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (ret);
#else
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
#endif
	case CLGET_XID:
		/*
		 * use the knowledge that xid is the
		 * first element in the call structure
		 * This will get the xid of the PREVIOUS call
		 */
		/* LINTED pointer alignment */
		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* LINTED pointer alignment */
		*(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
		/* increment by 1 as clnt_vc_call() decrements once */
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		/* LINTED pointer alignment */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		/* LINTED pointer alignment */
		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
		    /* LINTED pointer alignment */
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		/* LINTED pointer alignment */
		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		/* LINTED pointer alignment */
		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
		    /* LINTED pointer alignment */
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_IO_MODE:
		/* LINTED pointer cast */
		if (!set_io_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLSET_FLUSH_MODE:
		/* Set a specific FLUSH_MODE */
		/* LINTED pointer cast */
		if (!set_flush_mode(ct, *(int *)info)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		break;
	case CLGET_FLUSH_MODE:
		/* LINTED pointer cast */
		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
		break;

	case CLGET_IO_MODE:
		/* LINTED pointer cast */
		*(rpciomode_t *)info = ct->ct_io_mode;
		break;

	case CLGET_CURRENT_REC_SIZE:
		/*
		 * Returns the current amount of memory allocated
		 * to pending requests
		 */
		/* LINTED pointer cast */
		*(int *)info = ct->ct_bufferPendingSize;
		break;

	case CLSET_CONNMAXREC_SIZE:
		/* Cannot resize the buffer if it is used. */
		if (ct->ct_bufferPendingSize != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (FALSE);
		}
		/*
		 * If the new size is equal to the current size,
		 * there is nothing to do.
		 */
		/* LINTED pointer cast */
		if (ct->ct_bufferSize == *(uint_t *)info)
			break;

		/* LINTED pointer cast */
		ct->ct_bufferSize = *(uint_t *)info;
		if (ct->ct_buffer) {
			free(ct->ct_buffer);
			ct->ct_buffer = NULL;
			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
		}
		break;

	case CLGET_CONNMAXREC_SIZE:
		/*
		 * Returns the size of buffer allocated
		 * to pending requests
		 */
		/* LINTED pointer cast */
		*(uint_t *)info = ct->ct_bufferSize;
		break;

	default:
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (FALSE);
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (TRUE);
}
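
/*
 * Illustrative note (not part of the original source): the CLGET/CLSET_XID,
 * _PROG and _VERS cases above peek into the header that xdr_callhdr()
 * pre-marshalled into ct_mcall.  Per the RPC call message layout it holds,
 * in units of BYTES_PER_XDR_UNIT:
 *
 *	offset 0:	xid
 *	offset 1:	message direction (CALL)
 *	offset 2:	RPC protocol version (2)
 *	offset 3:	program number	(CLGET/CLSET_PROG)
 *	offset 4:	program version	(CLGET/CLSET_VERS)
 *
 * The procedure number, credentials and verifier are appended per call by
 * clnt_vc_call()/clnt_vc_send(), so they are not part of ct_mcall.
 */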

static void
clnt_vc_destroy(CLIENT *cl)
{
	/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	int ct_fd = ct->ct_fd;

	(void) rpc_fd_lock(vctbl, ct_fd);

	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
		(void) unregister_nb(ct);
	}

	if (ct->ct_closeit)
		(void) t_close(ct_fd);
	XDR_DESTROY(&(ct->ct_xdrs));
	if (ct->ct_addr.buf)
		free(ct->ct_addr.buf);
	free(ct);
	if (cl->cl_netid && cl->cl_netid[0])
		free(cl->cl_netid);
	if (cl->cl_tp && cl->cl_tp[0])
		free(cl->cl_tp);
	free(cl);
	rpc_fd_unlock(vctbl, ct_fd);
}

/*
 * Interface between xdr serializer and vc connection.
 * Behaves like the system calls, read & write, but keeps some error state
 * around for the rpc level.
 */
static int
read_vc(void *ct_tmp, caddr_t buf, int len)
{
	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
	struct pollfd *pfdp;
	int npfd;		/* total number of pfdp allocated */
	struct ct_data *ct = ct_tmp;
	struct timeval starttime;
	struct timeval curtime;
	int poll_time;
	int delta;

	if (len == 0)
		return (0);

	/*
	 * Allocate just one the first time.  thr_get_storage() may
	 * return a larger buffer, left over from the last time we were
	 * here, but that's OK.  realloc() will deal with it properly.
	 */
	npfd = 1;
	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
	if (pfdp == NULL) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_read_vc_str, __no_mem_str);
		rpc_callerr.re_status = RPC_SYSTEMERROR;
		rpc_callerr.re_errno = errno;
		rpc_callerr.re_terrno = 0;
		return (-1);
	}

	/*
	 * N.B.:  slot 0 in the pollfd array is reserved for the file
	 * descriptor we're really interested in (as opposed to the
	 * callback descriptors).
	 */
	pfdp[0].fd = ct->ct_fd;
	pfdp[0].events = MASKVAL;
	pfdp[0].revents = 0;
	poll_time = ct->ct_wait;
	if (gettimeofday(&starttime, NULL) == -1) {
		syslog(LOG_ERR, "Unable to get time of day: %m");
		return (-1);
	}

	for (;;) {
		extern void (*_svc_getreqset_proc)();
		extern pollfd_t *svc_pollfd;
		extern int svc_max_pollfd;
		int fds;

		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */

		if (_svc_getreqset_proc) {
			sig_rw_rdlock(&svc_fd_lock);

			/* reallocate pfdp to svc_max_pollfd +1 */
			if (npfd != (svc_max_pollfd + 1)) {
				struct pollfd *tmp_pfdp = realloc(pfdp,
				    sizeof (struct pollfd) *
				    (svc_max_pollfd + 1));
				if (tmp_pfdp == NULL) {
					sig_rw_unlock(&svc_fd_lock);
					(void) syslog(LOG_ERR, clnt_vc_errstr,
					    clnt_read_vc_str, __no_mem_str);
					rpc_callerr.re_status = RPC_SYSTEMERROR;
					rpc_callerr.re_errno = errno;
					rpc_callerr.re_terrno = 0;
					return (-1);
				}

				pfdp = tmp_pfdp;
				npfd = svc_max_pollfd + 1;
				(void) pthread_setspecific(pfdp_key, pfdp);
			}
			if (npfd > 1)
				(void) memcpy(&pfdp[1], svc_pollfd,
				    sizeof (struct pollfd) * (npfd - 1));

			sig_rw_unlock(&svc_fd_lock);
		} else {
			npfd = 1;	/* don't forget about pfdp[0] */
		}

		switch (fds = poll(pfdp, npfd, poll_time)) {
		case 0:
			rpc_callerr.re_status = RPC_TIMEDOUT;
			return (-1);

		case -1:
			if (errno != EINTR)
				continue;
			else {
				/*
				 * interrupted by another signal,
				 * update time_waited
				 */

				if (gettimeofday(&curtime, NULL) == -1) {
					syslog(LOG_ERR,
					    "Unable to get time of day: %m");
					errno = 0;
					continue;
				};
				delta = (curtime.tv_sec -
				    starttime.tv_sec) * 1000 +
				    (curtime.tv_usec -
				    starttime.tv_usec) / 1000;
				poll_time -= delta;
				if (poll_time < 0) {
					rpc_callerr.re_status = RPC_TIMEDOUT;
					errno = 0;
					return (-1);
				} else {
					errno = 0; /* reset it */
					continue;
				}
			}
		}

		if (pfdp[0].revents == 0) {
			/* must be for server side of the house */
			(*_svc_getreqset_proc)(&pfdp[1], fds);
			continue;	/* do poll again */
		}

		if (pfdp[0].revents & POLLNVAL) {
			rpc_callerr.re_status = RPC_CANTRECV;
			/*
			 * Note: we're faking errno here because we
			 * previously would have expected select() to
			 * return -1 with errno EBADF.  Poll(BA_OS)
			 * returns 0 and sets the POLLNVAL revents flag
			 * instead.
			 */
			rpc_callerr.re_errno = errno = EBADF;
			return (-1);
		}

		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
			rpc_callerr.re_status = RPC_CANTRECV;
			rpc_callerr.re_errno = errno = EPIPE;
			return (-1);
		}
		break;
	}

	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
	case 0:
		/* premature eof */
		rpc_callerr.re_errno = ENOLINK;
		rpc_callerr.re_terrno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		len = -1;	/* it's really an error */
		break;

	case -1:
		rpc_callerr.re_terrno = t_errno;
		rpc_callerr.re_errno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		break;
	}
	return (len);
}

static int
write_vc(void *ct_tmp, caddr_t buf, int len)
{
	int i, cnt;
	struct ct_data *ct = ct_tmp;
	int flag;
	int maxsz;

	maxsz = ct->ct_tsdu;

	/* Handle the non-blocking mode */
	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		/*
		 * Test a special case here.  If the length of the current
		 * write is greater than the transport data unit, and the
		 * mode is non blocking, we return RPC_CANTSEND.
		 * XXX this is not very clean.
		 */
		if (maxsz > 0 && len > maxsz) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}

		len = nb_send(ct, buf, (unsigned)len);
		if (len == -1) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		} else if (len == -2) {
			rpc_callerr.re_terrno = 0;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSTORE;
		}
		return (len);
	}

	if ((maxsz == 0) || (maxsz == -1)) {
		/*
		 * T_snd may return -1 for error on connection (connection
		 * needs to be repaired/closed, and -2 for flow-control
		 * handling error (no operation to do, just wait and call
		 * T_Flush()).
		 */
		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		}
		return (len);
	}

	/*
	 * This is for those transports which have a max size for data.
	 */
	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
		flag = cnt > maxsz ? T_MORE : 0;
		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
		    flag)) == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}
	}
	return (len);
}

/*
 * Receive the required bytes of data, even if it is fragmented.
 */
static int
t_rcvall(int fd, char *buf, int len)
{
	int moreflag;
	int final = 0;
	int res;

	do {
		moreflag = 0;
		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
		if (res == -1) {
			if (t_errno == TLOOK)
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, NULL);
					(void) t_snddis(fd, NULL);
					return (-1);
				case T_ORDREL:
					/* Received orderly release indication */
					(void) t_rcvrel(fd);
					/* Send orderly release indicator */
					(void) t_sndrel(fd);
					return (-1);
				default:
					return (-1);
				}
		} else if (res == 0) {
			return (0);
		}
		final += res;
		buf += res;
		len -= res;
	} while ((len > 0) && (moreflag & T_MORE));
	return (final);
}

static struct clnt_ops *
clnt_vc_ops(void)
{
	static struct clnt_ops ops;
	extern mutex_t ops_lock;

	/* VARIABLES PROTECTED BY ops_lock: ops */

	sig_mutex_lock(&ops_lock);
	if (ops.cl_call == NULL) {
		ops.cl_call = clnt_vc_call;
		ops.cl_send = clnt_vc_send;
		ops.cl_abort = clnt_vc_abort;
		ops.cl_geterr = clnt_vc_geterr;
		ops.cl_freeres = clnt_vc_freeres;
		ops.cl_destroy = clnt_vc_destroy;
		ops.cl_control = clnt_vc_control;
	}
	sig_mutex_unlock(&ops_lock);
	return (&ops);
}

/*
 * Make sure that the time is not garbage.  -1 value is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
	    t->tv_usec <= -1 || t->tv_usec > 1000000);
}


/* Compute the # of bytes that remains until the end of the buffer */
#define	REMAIN_BYTES(p) (ct->ct_bufferSize - (ct->ct_##p - ct->ct_buffer))

static int
addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
{
	if (NULL == ct->ct_buffer) {
		/* Buffer not allocated yet. */
		char *buffer;

		buffer = malloc(ct->ct_bufferSize);
		if (NULL == buffer) {
			errno = ENOMEM;
			return (-1);
		}
		(void) memcpy(buffer, dataToAdd, nBytes);

		ct->ct_buffer = buffer;
		ct->ct_bufferReadPtr = buffer;
		ct->ct_bufferWritePtr = buffer + nBytes;
		ct->ct_bufferPendingSize = nBytes;
	} else {
		/*
		 * For an already allocated buffer, two mem copies
		 * might be needed, depending on the current
		 * writing position.
		 */

		/* Compute the length of the first copy. */
		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));

		ct->ct_bufferPendingSize += nBytes;

		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
		ct->ct_bufferWritePtr += len;
		nBytes -= len;
		if (0 == nBytes) {
			/* One memcopy needed. */

			/*
			 * If the write pointer is at the end of the buffer,
			 * wrap it now.
			 */
			if (ct->ct_bufferWritePtr ==
			    (ct->ct_buffer + ct->ct_bufferSize)) {
				ct->ct_bufferWritePtr = ct->ct_buffer;
			}
		} else {
			/* Two memcopies needed. */
			dataToAdd += len;

			/*
			 * Copy the remaining data to the beginning of the
			 * buffer
			 */
			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
		}
	}
	return (0);
}

static void
consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
{
	ct->ct_bufferPendingSize -= nBytes;
	if (ct->ct_bufferPendingSize == 0) {
		/*
		 * If the buffer contains no data, we set the two pointers at
		 * the beginning of the buffer (to minimize buffer wraps).
		 */
		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
	} else {
		ct->ct_bufferReadPtr += nBytes;
		if (ct->ct_bufferReadPtr >
		    ct->ct_buffer + ct->ct_bufferSize) {
			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
		}
	}
}

static int
iovFromBuffer(struct ct_data *ct, struct iovec *iov)
{
	int l;

	if (ct->ct_bufferPendingSize == 0)
		return (0);

	l = REMAIN_BYTES(bufferReadPtr);
	if (l < ct->ct_bufferPendingSize) {
		/* Buffer in two fragments. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = l;

		iov[1].iov_base = ct->ct_buffer;
		iov[1].iov_len = ct->ct_bufferPendingSize - l;
		return (2);
	} else {
		/* Buffer in one fragment. */
		iov[0].iov_base = ct->ct_bufferReadPtr;
		iov[0].iov_len = ct->ct_bufferPendingSize;
		return (1);
	}
}
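
/*
 * Illustrative note (not part of the original source): addInBuffer(),
 * consumeFromBuffer() and iovFromBuffer() treat ct_buffer as a ring.
 * For example, with the write pointer 4 bytes before the physical end of
 * the buffer and 6 bytes to add, REMAIN_BYTES(bufferWritePtr) is 4, so
 * 4 bytes land at the end and the remaining 2 wrap to the start; a later
 * flush then sees the pending data as two iovec fragments.
 */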

static bool_t
set_flush_mode(struct ct_data *ct, int mode)
{
	switch (mode) {
	case RPC_CL_BLOCKING_FLUSH:
		/* flush the buffer completely (possibly blocking) */
	case RPC_CL_BESTEFFORT_FLUSH:
		/* flush as much as possible without blocking */
	case RPC_CL_DEFAULT_FLUSH:
		/* flush according to the currently defined policy */
		ct->ct_blocking_mode = mode;
		return (TRUE);
	default:
		return (FALSE);
	}
}

static bool_t
set_io_mode(struct ct_data *ct, int ioMode)
{
	switch (ioMode) {
	case RPC_CL_BLOCKING:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			if (NULL != ct->ct_buffer) {
				/*
				 * If a buffer was allocated for this
				 * connection, flush it now, and free it.
				 */
				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
				free(ct->ct_buffer);
				ct->ct_buffer = NULL;
			}
			(void) unregister_nb(ct);
			ct->ct_io_mode = ioMode;
		}
		break;
	case RPC_CL_NONBLOCKING:
		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
			if (-1 == register_nb(ct)) {
				return (FALSE);
			}
			ct->ct_io_mode = ioMode;
		}
		break;
	default:
		return (FALSE);
	}
	return (TRUE);
}

static int
do_flush(struct ct_data *ct, uint_t flush_mode)
{
	int result;
	if (ct->ct_bufferPendingSize == 0) {
		return (0);
	}

	switch (flush_mode) {
	case RPC_CL_BLOCKING_FLUSH:
		if (!set_blocking_connection(ct, TRUE)) {
			return (-1);
		}
		while (ct->ct_bufferPendingSize > 0) {
			if (REMAIN_BYTES(bufferReadPtr) <
			    ct->ct_bufferPendingSize) {
				struct iovec iov[2];
				(void) iovFromBuffer(ct, iov);
				result = writev(ct->ct_fd, iov, 2);
			} else {
				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
				    ct->ct_bufferPendingSize, 0);
			}
			if (result < 0) {
				return (-1);
			}
			consumeFromBuffer(ct, result);
		}

		break;

	case RPC_CL_BESTEFFORT_FLUSH:
		(void) set_blocking_connection(ct, FALSE);
		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
			struct iovec iov[2];
			(void) iovFromBuffer(ct, iov);
			result = writev(ct->ct_fd, iov, 2);
		} else {
			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
			    ct->ct_bufferPendingSize, 0);
		}
		if (result < 0) {
			if (errno != EWOULDBLOCK) {
				perror("flush");
				return (-1);
			}
			return (0);
		}
		if (result > 0)
			consumeFromBuffer(ct, result);
		break;
	}
	return (0);
}

/*
 * Non blocking send.
 */

static int
nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
{
	int result;

	/*
	 * The first 4 bytes of the record are the xdrrec record-marking
	 * header; its most significant bit marks the last fragment.
	 * Refuse to send a record that does not carry that bit.
	 */
	/* LINTED pointer cast */
	if (!(ntohl(*(uint32_t *)buff) & 0x80000000U)) {
		return (-1);
	}

	/*
	 * Check to see if the current message can be stored fully in the
	 * buffer.  We have to check this now because it may be impossible
	 * to send any data, so the message must be stored in the buffer.
	 */
	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
		/* Try to flush (to free some space). */
		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);

		/* Can we store the message now ? */
		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
			return (-2);
	}

	(void) set_blocking_connection(ct, FALSE);

	/*
	 * If there is no data pending, we can simply try
	 * to send our data.
	 */
	if (ct->ct_bufferPendingSize == 0) {
		result = t_snd(ct->ct_fd, buff, nBytes, 0);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				result = 0;
			} else {
				perror("send");
				return (-1);
			}
		}
		/*
		 * If we have not sent all of the data, we must store
		 * the remainder in the buffer.
		 */
		if (result != nBytes) {
			if (addInBuffer(ct, (char *)buff + result,
			    nBytes - result) == -1) {
				return (-1);
			}
		}
	} else {
		/*
		 * Some data is pending in the buffer.  We try to send
		 * both the buffered data and the current message in one shot.
		 */
		struct iovec iov[3];
		int i = iovFromBuffer(ct, &iov[0]);

		iov[i].iov_base = buff;
		iov[i].iov_len = nBytes;

		result = writev(ct->ct_fd, iov, i + 1);
		if (result == -1) {
			if (errno == EWOULDBLOCK) {
				/* No bytes sent */
				result = 0;
			} else {
				return (-1);
			}
		}

		/*
		 * Add the bytes from the message
		 * that we have not sent.
		 */
		if (result <= ct->ct_bufferPendingSize) {
			/* No bytes from the message sent */
			consumeFromBuffer(ct, result);
			if (addInBuffer(ct, buff, nBytes) == -1) {
				return (-1);
			}
		} else {
			/*
			 * Some bytes of the message are sent.
			 * Compute the length of the message that has
			 * been sent.
			 */
			int len = result - ct->ct_bufferPendingSize;

			/* So, empty the buffer. */
			ct->ct_bufferReadPtr = ct->ct_buffer;
			ct->ct_bufferWritePtr = ct->ct_buffer;
			ct->ct_bufferPendingSize = 0;

			/* And add the remaining part of the message. */
			if (len != nBytes) {
				if (addInBuffer(ct, (char *)buff + len,
				    nBytes - len) == -1) {
					return (-1);
				}
			}
		}
	}
	return (nBytes);
}

static void
flush_registered_clients(void)
{
	struct nb_reg_node *node;

	if (LIST_ISEMPTY(nb_first)) {
		return;
	}

	LIST_FOR_EACH(nb_first, node) {
		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
	}
}

static int
allocate_chunk(void)
{
#define	CHUNK_SIZE 16
	struct nb_reg_node *chk =
	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
	struct nb_reg_node *n;
	int i;

	if (NULL == chk) {
		return (-1);
	}

	n = chk;
	for (i = 0; i < CHUNK_SIZE - 1; ++i) {
		n[i].next = &(n[i + 1]);
	}
	n[CHUNK_SIZE - 1].next = (struct nb_reg_node *)&nb_free;
	nb_free = chk;
	return (0);
}

static int
register_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);

	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
		(void) mutex_unlock(&nb_list_mutex);
		errno = ENOMEM;
		return (-1);
	}

	if (!exit_handler_set) {
		(void) atexit(flush_registered_clients);
		exit_handler_set = TRUE;
	}
	/* Get the first free node */
	LIST_EXTRACT(nb_free, node);

	node->ct = ct;

	LIST_ADD(nb_first, node);
	(void) mutex_unlock(&nb_list_mutex);

	return (0);
}

static int
unregister_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);
	assert(!LIST_ISEMPTY(nb_first));

	node = nb_first;
	LIST_FOR_EACH(nb_first, node) {
		if (node->next->ct == ct) {
			/* Get the node to unregister. */
			struct nb_reg_node *n = node->next;
			node->next = n->next;

			n->ct = NULL;
			LIST_ADD(nb_free, n);
			break;
		}
	}
	(void) mutex_unlock(&nb_list_mutex);
	return (0);
}