1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 35 #endif 36 #include <sys/cdefs.h> 37 __FBSDID("$FreeBSD$"); 38 39 /* 40 * clnt_tcp.c, Implements a TCP/IP based, client side RPC. 41 * 42 * Copyright (C) 1984, Sun Microsystems, Inc. 43 * 44 * TCP based RPC supports 'batched calls'. 45 * A sequence of calls may be batched-up in a send buffer. The rpc call 46 * return immediately to the client even though the call was not necessarily 47 * sent. The batching occurs if the results' xdr routine is NULL (0) AND 48 * the rpc timeout value is zero (see clnt.h, rpc). 49 * 50 * Clients should NOT casually batch calls that in fact return results; that is, 51 * the server side should be aware that a call is batched and not produce any 52 * return message. Batched calls that produce many result messages can 53 * deadlock (netlock) the client and the server.... 54 * 55 * Now go hang yourself. 56 */ 57 58 #include <sys/param.h> 59 #include <sys/systm.h> 60 #include <sys/lock.h> 61 #include <sys/malloc.h> 62 #include <sys/mbuf.h> 63 #include <sys/mutex.h> 64 #include <sys/pcpu.h> 65 #include <sys/proc.h> 66 #include <sys/protosw.h> 67 #include <sys/socket.h> 68 #include <sys/socketvar.h> 69 #include <sys/sx.h> 70 #include <sys/syslog.h> 71 #include <sys/time.h> 72 #include <sys/uio.h> 73 74 #include <net/vnet.h> 75 76 #include <netinet/tcp.h> 77 78 #include <rpc/rpc.h> 79 #include <rpc/rpc_com.h> 80 #include <rpc/krpc.h> 81 82 struct cmessage { 83 struct cmsghdr cmsg; 84 struct cmsgcred cmcred; 85 }; 86 87 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *, 88 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 89 static void clnt_vc_geterr(CLIENT *, struct rpc_err *); 90 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *); 91 static void clnt_vc_abort(CLIENT *); 92 static bool_t clnt_vc_control(CLIENT *, u_int, void *); 93 static void clnt_vc_close(CLIENT *); 94 static void clnt_vc_destroy(CLIENT *); 95 static bool_t time_not_ok(struct timeval *); 96 static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag); 97 98 static struct clnt_ops clnt_vc_ops = { 99 .cl_call = clnt_vc_call, 100 .cl_abort = clnt_vc_abort, 101 .cl_geterr = clnt_vc_geterr, 102 .cl_freeres = clnt_vc_freeres, 103 .cl_close = clnt_vc_close, 104 .cl_destroy = clnt_vc_destroy, 105 .cl_control = clnt_vc_control 106 }; 107 108 static void clnt_vc_upcallsdone(struct ct_data *); 109 110 static const char clnt_vc_errstr[] = "%s : %s"; 111 static const char clnt_vc_str[] = "clnt_vc_create"; 112 static const char clnt_read_vc_str[] = "read_vc"; 113 static const char __no_mem_str[] = "out of memory"; 114 115 /* 116 * Create a client handle for a connection. 117 * Default options are set, which the user can change using clnt_control()'s. 118 * The rpc/vc package does buffering similar to stdio, so the client 119 * must pick send and receive buffer sizes, 0 => use the default. 120 * NB: fd is copied into a private area. 121 * NB: The rpch->cl_auth is set null authentication. Caller may wish to 122 * set this something more useful. 123 * 124 * fd should be an open socket 125 */ 126 CLIENT * 127 clnt_vc_create( 128 struct socket *so, /* open file descriptor */ 129 struct sockaddr *raddr, /* servers address */ 130 const rpcprog_t prog, /* program number */ 131 const rpcvers_t vers, /* version number */ 132 size_t sendsz, /* buffer recv size */ 133 size_t recvsz, /* buffer send size */ 134 int intrflag) /* interruptible */ 135 { 136 CLIENT *cl; /* client handle */ 137 struct ct_data *ct = NULL; /* client handle */ 138 struct timeval now; 139 struct rpc_msg call_msg; 140 static uint32_t disrupt; 141 struct __rpc_sockinfo si; 142 XDR xdrs; 143 int error, interrupted, one = 1, sleep_flag; 144 struct sockopt sopt; 145 146 if (disrupt == 0) 147 disrupt = (uint32_t)(long)raddr; 148 149 cl = (CLIENT *)mem_alloc(sizeof (*cl)); 150 ct = (struct ct_data *)mem_alloc(sizeof (*ct)); 151 152 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF); 153 ct->ct_threads = 0; 154 ct->ct_closing = FALSE; 155 ct->ct_closed = FALSE; 156 ct->ct_upcallrefs = 0; 157 158 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) { 159 error = soconnect(so, raddr, curthread); 160 SOCK_LOCK(so); 161 interrupted = 0; 162 sleep_flag = PSOCK; 163 if (intrflag != 0) 164 sleep_flag |= PCATCH; 165 while ((so->so_state & SS_ISCONNECTING) 166 && so->so_error == 0) { 167 error = msleep(&so->so_timeo, SOCK_MTX(so), 168 sleep_flag, "connec", 0); 169 if (error) { 170 if (error == EINTR || error == ERESTART) 171 interrupted = 1; 172 break; 173 } 174 } 175 if (error == 0) { 176 error = so->so_error; 177 so->so_error = 0; 178 } 179 SOCK_UNLOCK(so); 180 if (error) { 181 if (!interrupted) 182 so->so_state &= ~SS_ISCONNECTING; 183 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 184 rpc_createerr.cf_error.re_errno = error; 185 goto err; 186 } 187 } 188 189 if (!__rpc_socket2sockinfo(so, &si)) { 190 goto err; 191 } 192 193 if (so->so_proto->pr_flags & PR_CONNREQUIRED) { 194 bzero(&sopt, sizeof(sopt)); 195 sopt.sopt_dir = SOPT_SET; 196 sopt.sopt_level = SOL_SOCKET; 197 sopt.sopt_name = SO_KEEPALIVE; 198 sopt.sopt_val = &one; 199 sopt.sopt_valsize = sizeof(one); 200 sosetopt(so, &sopt); 201 } 202 203 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 204 bzero(&sopt, sizeof(sopt)); 205 sopt.sopt_dir = SOPT_SET; 206 sopt.sopt_level = IPPROTO_TCP; 207 sopt.sopt_name = TCP_NODELAY; 208 sopt.sopt_val = &one; 209 sopt.sopt_valsize = sizeof(one); 210 sosetopt(so, &sopt); 211 } 212 213 ct->ct_closeit = FALSE; 214 215 /* 216 * Set up private data struct 217 */ 218 ct->ct_socket = so; 219 ct->ct_wait.tv_sec = -1; 220 ct->ct_wait.tv_usec = -1; 221 memcpy(&ct->ct_addr, raddr, raddr->sa_len); 222 223 /* 224 * Initialize call message 225 */ 226 getmicrotime(&now); 227 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now); 228 call_msg.rm_xid = ct->ct_xid; 229 call_msg.rm_direction = CALL; 230 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION; 231 call_msg.rm_call.cb_prog = (uint32_t)prog; 232 call_msg.rm_call.cb_vers = (uint32_t)vers; 233 234 /* 235 * pre-serialize the static part of the call msg and stash it away 236 */ 237 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE, 238 XDR_ENCODE); 239 if (! xdr_callhdr(&xdrs, &call_msg)) { 240 if (ct->ct_closeit) { 241 soclose(ct->ct_socket); 242 } 243 goto err; 244 } 245 ct->ct_mpos = XDR_GETPOS(&xdrs); 246 XDR_DESTROY(&xdrs); 247 ct->ct_waitchan = "rpcrecv"; 248 ct->ct_waitflag = 0; 249 250 /* 251 * Create a client handle which uses xdrrec for serialization 252 * and authnone for authentication. 253 */ 254 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 255 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 256 error = soreserve(ct->ct_socket, sendsz, recvsz); 257 if (error != 0) { 258 if (ct->ct_closeit) { 259 soclose(ct->ct_socket); 260 } 261 goto err; 262 } 263 cl->cl_refs = 1; 264 cl->cl_ops = &clnt_vc_ops; 265 cl->cl_private = ct; 266 cl->cl_auth = authnone_create(); 267 268 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 269 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct); 270 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 271 272 ct->ct_record = NULL; 273 ct->ct_record_resid = 0; 274 TAILQ_INIT(&ct->ct_pending); 275 return (cl); 276 277 err: 278 if (cl) { 279 if (ct) { 280 mtx_destroy(&ct->ct_lock); 281 mem_free(ct, sizeof (struct ct_data)); 282 } 283 if (cl) 284 mem_free(cl, sizeof (CLIENT)); 285 } 286 return ((CLIENT *)NULL); 287 } 288 289 static enum clnt_stat 290 clnt_vc_call( 291 CLIENT *cl, /* client handle */ 292 struct rpc_callextra *ext, /* call metadata */ 293 rpcproc_t proc, /* procedure number */ 294 struct mbuf *args, /* pointer to args */ 295 struct mbuf **resultsp, /* pointer to results */ 296 struct timeval utimeout) 297 { 298 struct ct_data *ct = (struct ct_data *) cl->cl_private; 299 AUTH *auth; 300 struct rpc_err *errp; 301 enum clnt_stat stat; 302 XDR xdrs; 303 struct rpc_msg reply_msg; 304 bool_t ok; 305 int nrefreshes = 2; /* number of times to refresh cred */ 306 struct timeval timeout; 307 uint32_t xid; 308 struct mbuf *mreq = NULL, *results; 309 struct ct_request *cr; 310 int error; 311 312 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK); 313 314 mtx_lock(&ct->ct_lock); 315 316 if (ct->ct_closing || ct->ct_closed) { 317 mtx_unlock(&ct->ct_lock); 318 free(cr, M_RPC); 319 return (RPC_CANTSEND); 320 } 321 ct->ct_threads++; 322 323 if (ext) { 324 auth = ext->rc_auth; 325 errp = &ext->rc_err; 326 } else { 327 auth = cl->cl_auth; 328 errp = &ct->ct_error; 329 } 330 331 cr->cr_mrep = NULL; 332 cr->cr_error = 0; 333 334 if (ct->ct_wait.tv_usec == -1) { 335 timeout = utimeout; /* use supplied timeout */ 336 } else { 337 timeout = ct->ct_wait; /* use default timeout */ 338 } 339 340 call_again: 341 mtx_assert(&ct->ct_lock, MA_OWNED); 342 343 ct->ct_xid++; 344 xid = ct->ct_xid; 345 346 mtx_unlock(&ct->ct_lock); 347 348 /* 349 * Leave space to pre-pend the record mark. 350 */ 351 mreq = m_gethdr(M_WAITOK, MT_DATA); 352 mreq->m_data += sizeof(uint32_t); 353 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN, 354 ("RPC header too big")); 355 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos); 356 mreq->m_len = ct->ct_mpos; 357 358 /* 359 * The XID is the first thing in the request. 360 */ 361 *mtod(mreq, uint32_t *) = htonl(xid); 362 363 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 364 365 errp->re_status = stat = RPC_SUCCESS; 366 367 if ((! XDR_PUTINT32(&xdrs, &proc)) || 368 (! AUTH_MARSHALL(auth, xid, &xdrs, 369 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 370 errp->re_status = stat = RPC_CANTENCODEARGS; 371 mtx_lock(&ct->ct_lock); 372 goto out; 373 } 374 mreq->m_pkthdr.len = m_length(mreq, NULL); 375 376 /* 377 * Prepend a record marker containing the packet length. 378 */ 379 M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK); 380 *mtod(mreq, uint32_t *) = 381 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t))); 382 383 cr->cr_xid = xid; 384 mtx_lock(&ct->ct_lock); 385 /* 386 * Check to see if the other end has already started to close down 387 * the connection. The upcall will have set ct_error.re_status 388 * to RPC_CANTRECV if this is the case. 389 * If the other end starts to close down the connection after this 390 * point, it will be detected later when cr_error is checked, 391 * since the request is in the ct_pending queue. 392 */ 393 if (ct->ct_error.re_status == RPC_CANTRECV) { 394 if (errp != &ct->ct_error) { 395 errp->re_errno = ct->ct_error.re_errno; 396 errp->re_status = RPC_CANTRECV; 397 } 398 stat = RPC_CANTRECV; 399 goto out; 400 } 401 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link); 402 mtx_unlock(&ct->ct_lock); 403 404 /* 405 * sosend consumes mreq. 406 */ 407 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread); 408 mreq = NULL; 409 if (error == EMSGSIZE) { 410 SOCKBUF_LOCK(&ct->ct_socket->so_snd); 411 sbwait(&ct->ct_socket->so_snd); 412 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd); 413 AUTH_VALIDATE(auth, xid, NULL, NULL); 414 mtx_lock(&ct->ct_lock); 415 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 416 goto call_again; 417 } 418 419 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 420 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 421 reply_msg.acpted_rply.ar_verf.oa_length = 0; 422 reply_msg.acpted_rply.ar_results.where = NULL; 423 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 424 425 mtx_lock(&ct->ct_lock); 426 if (error) { 427 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 428 errp->re_errno = error; 429 errp->re_status = stat = RPC_CANTSEND; 430 goto out; 431 } 432 433 /* 434 * Check to see if we got an upcall while waiting for the 435 * lock. In both these cases, the request has been removed 436 * from ct->ct_pending. 437 */ 438 if (cr->cr_error) { 439 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 440 errp->re_errno = cr->cr_error; 441 errp->re_status = stat = RPC_CANTRECV; 442 goto out; 443 } 444 if (cr->cr_mrep) { 445 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 446 goto got_reply; 447 } 448 449 /* 450 * Hack to provide rpc-based message passing 451 */ 452 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { 453 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 454 errp->re_status = stat = RPC_TIMEDOUT; 455 goto out; 456 } 457 458 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan, 459 tvtohz(&timeout)); 460 461 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 462 463 if (error) { 464 /* 465 * The sleep returned an error so our request is still 466 * on the list. Turn the error code into an 467 * appropriate client status. 468 */ 469 errp->re_errno = error; 470 switch (error) { 471 case EINTR: 472 stat = RPC_INTR; 473 break; 474 case EWOULDBLOCK: 475 stat = RPC_TIMEDOUT; 476 break; 477 default: 478 stat = RPC_CANTRECV; 479 } 480 errp->re_status = stat; 481 goto out; 482 } else { 483 /* 484 * We were woken up by the upcall. If the 485 * upcall had a receive error, report that, 486 * otherwise we have a reply. 487 */ 488 if (cr->cr_error) { 489 errp->re_errno = cr->cr_error; 490 errp->re_status = stat = RPC_CANTRECV; 491 goto out; 492 } 493 } 494 495 got_reply: 496 /* 497 * Now decode and validate the response. We need to drop the 498 * lock since xdr_replymsg may end up sleeping in malloc. 499 */ 500 mtx_unlock(&ct->ct_lock); 501 502 if (ext && ext->rc_feedback) 503 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 504 505 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 506 ok = xdr_replymsg(&xdrs, &reply_msg); 507 cr->cr_mrep = NULL; 508 509 if (ok) { 510 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 511 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 512 errp->re_status = stat = RPC_SUCCESS; 513 else 514 stat = _seterr_reply(&reply_msg, errp); 515 516 if (stat == RPC_SUCCESS) { 517 results = xdrmbuf_getall(&xdrs); 518 if (!AUTH_VALIDATE(auth, xid, 519 &reply_msg.acpted_rply.ar_verf, 520 &results)) { 521 errp->re_status = stat = RPC_AUTHERROR; 522 errp->re_why = AUTH_INVALIDRESP; 523 } else { 524 KASSERT(results, 525 ("auth validated but no result")); 526 *resultsp = results; 527 } 528 } /* end successful completion */ 529 /* 530 * If unsuccesful AND error is an authentication error 531 * then refresh credentials and try again, else break 532 */ 533 else if (stat == RPC_AUTHERROR) 534 /* maybe our credentials need to be refreshed ... */ 535 if (nrefreshes > 0 && 536 AUTH_REFRESH(auth, &reply_msg)) { 537 nrefreshes--; 538 XDR_DESTROY(&xdrs); 539 mtx_lock(&ct->ct_lock); 540 goto call_again; 541 } 542 /* end of unsuccessful completion */ 543 } /* end of valid reply message */ 544 else { 545 errp->re_status = stat = RPC_CANTDECODERES; 546 } 547 XDR_DESTROY(&xdrs); 548 mtx_lock(&ct->ct_lock); 549 out: 550 mtx_assert(&ct->ct_lock, MA_OWNED); 551 552 KASSERT(stat != RPC_SUCCESS || *resultsp, 553 ("RPC_SUCCESS without reply")); 554 555 if (mreq) 556 m_freem(mreq); 557 if (cr->cr_mrep) 558 m_freem(cr->cr_mrep); 559 560 ct->ct_threads--; 561 if (ct->ct_closing) 562 wakeup(ct); 563 564 mtx_unlock(&ct->ct_lock); 565 566 if (auth && stat != RPC_SUCCESS) 567 AUTH_VALIDATE(auth, xid, NULL, NULL); 568 569 free(cr, M_RPC); 570 571 return (stat); 572 } 573 574 static void 575 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp) 576 { 577 struct ct_data *ct = (struct ct_data *) cl->cl_private; 578 579 *errp = ct->ct_error; 580 } 581 582 static bool_t 583 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 584 { 585 XDR xdrs; 586 bool_t dummy; 587 588 xdrs.x_op = XDR_FREE; 589 dummy = (*xdr_res)(&xdrs, res_ptr); 590 591 return (dummy); 592 } 593 594 /*ARGSUSED*/ 595 static void 596 clnt_vc_abort(CLIENT *cl) 597 { 598 } 599 600 static bool_t 601 clnt_vc_control(CLIENT *cl, u_int request, void *info) 602 { 603 struct ct_data *ct = (struct ct_data *)cl->cl_private; 604 void *infop = info; 605 SVCXPRT *xprt; 606 607 mtx_lock(&ct->ct_lock); 608 609 switch (request) { 610 case CLSET_FD_CLOSE: 611 ct->ct_closeit = TRUE; 612 mtx_unlock(&ct->ct_lock); 613 return (TRUE); 614 case CLSET_FD_NCLOSE: 615 ct->ct_closeit = FALSE; 616 mtx_unlock(&ct->ct_lock); 617 return (TRUE); 618 default: 619 break; 620 } 621 622 /* for other requests which use info */ 623 if (info == NULL) { 624 mtx_unlock(&ct->ct_lock); 625 return (FALSE); 626 } 627 switch (request) { 628 case CLSET_TIMEOUT: 629 if (time_not_ok((struct timeval *)info)) { 630 mtx_unlock(&ct->ct_lock); 631 return (FALSE); 632 } 633 ct->ct_wait = *(struct timeval *)infop; 634 break; 635 case CLGET_TIMEOUT: 636 *(struct timeval *)infop = ct->ct_wait; 637 break; 638 case CLGET_SERVER_ADDR: 639 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len); 640 break; 641 case CLGET_SVC_ADDR: 642 /* 643 * Slightly different semantics to userland - we use 644 * sockaddr instead of netbuf. 645 */ 646 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len); 647 break; 648 case CLSET_SVC_ADDR: /* set to new address */ 649 mtx_unlock(&ct->ct_lock); 650 return (FALSE); 651 case CLGET_XID: 652 *(uint32_t *)info = ct->ct_xid; 653 break; 654 case CLSET_XID: 655 /* This will set the xid of the NEXT call */ 656 /* decrement by 1 as clnt_vc_call() increments once */ 657 ct->ct_xid = *(uint32_t *)info - 1; 658 break; 659 case CLGET_VERS: 660 /* 661 * This RELIES on the information that, in the call body, 662 * the version number field is the fifth field from the 663 * begining of the RPC header. MUST be changed if the 664 * call_struct is changed 665 */ 666 *(uint32_t *)info = 667 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 668 4 * BYTES_PER_XDR_UNIT)); 669 break; 670 671 case CLSET_VERS: 672 *(uint32_t *)(void *)(ct->ct_mcallc + 673 4 * BYTES_PER_XDR_UNIT) = 674 htonl(*(uint32_t *)info); 675 break; 676 677 case CLGET_PROG: 678 /* 679 * This RELIES on the information that, in the call body, 680 * the program number field is the fourth field from the 681 * begining of the RPC header. MUST be changed if the 682 * call_struct is changed 683 */ 684 *(uint32_t *)info = 685 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 686 3 * BYTES_PER_XDR_UNIT)); 687 break; 688 689 case CLSET_PROG: 690 *(uint32_t *)(void *)(ct->ct_mcallc + 691 3 * BYTES_PER_XDR_UNIT) = 692 htonl(*(uint32_t *)info); 693 break; 694 695 case CLSET_WAITCHAN: 696 ct->ct_waitchan = (const char *)info; 697 break; 698 699 case CLGET_WAITCHAN: 700 *(const char **) info = ct->ct_waitchan; 701 break; 702 703 case CLSET_INTERRUPTIBLE: 704 if (*(int *) info) 705 ct->ct_waitflag = PCATCH; 706 else 707 ct->ct_waitflag = 0; 708 break; 709 710 case CLGET_INTERRUPTIBLE: 711 if (ct->ct_waitflag) 712 *(int *) info = TRUE; 713 else 714 *(int *) info = FALSE; 715 break; 716 717 case CLSET_BACKCHANNEL: 718 xprt = (SVCXPRT *)info; 719 if (ct->ct_backchannelxprt == NULL) { 720 xprt->xp_p2 = ct; 721 ct->ct_backchannelxprt = xprt; 722 } 723 break; 724 725 default: 726 mtx_unlock(&ct->ct_lock); 727 return (FALSE); 728 } 729 730 mtx_unlock(&ct->ct_lock); 731 return (TRUE); 732 } 733 734 static void 735 clnt_vc_close(CLIENT *cl) 736 { 737 struct ct_data *ct = (struct ct_data *) cl->cl_private; 738 struct ct_request *cr; 739 740 mtx_lock(&ct->ct_lock); 741 742 if (ct->ct_closed) { 743 mtx_unlock(&ct->ct_lock); 744 return; 745 } 746 747 if (ct->ct_closing) { 748 while (ct->ct_closing) 749 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 750 KASSERT(ct->ct_closed, ("client should be closed")); 751 mtx_unlock(&ct->ct_lock); 752 return; 753 } 754 755 if (ct->ct_socket) { 756 ct->ct_closing = TRUE; 757 mtx_unlock(&ct->ct_lock); 758 759 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 760 soupcall_clear(ct->ct_socket, SO_RCV); 761 clnt_vc_upcallsdone(ct); 762 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 763 764 /* 765 * Abort any pending requests and wait until everyone 766 * has finished with clnt_vc_call. 767 */ 768 mtx_lock(&ct->ct_lock); 769 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 770 cr->cr_xid = 0; 771 cr->cr_error = ESHUTDOWN; 772 wakeup(cr); 773 } 774 775 while (ct->ct_threads) 776 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 777 } 778 779 ct->ct_closing = FALSE; 780 ct->ct_closed = TRUE; 781 mtx_unlock(&ct->ct_lock); 782 wakeup(ct); 783 } 784 785 static void 786 clnt_vc_destroy(CLIENT *cl) 787 { 788 struct ct_data *ct = (struct ct_data *) cl->cl_private; 789 struct socket *so = NULL; 790 SVCXPRT *xprt; 791 792 clnt_vc_close(cl); 793 794 mtx_lock(&ct->ct_lock); 795 xprt = ct->ct_backchannelxprt; 796 ct->ct_backchannelxprt = NULL; 797 if (xprt != NULL) { 798 mtx_unlock(&ct->ct_lock); /* To avoid a LOR. */ 799 sx_xlock(&xprt->xp_lock); 800 mtx_lock(&ct->ct_lock); 801 xprt->xp_p2 = NULL; 802 xprt_unregister(xprt); 803 } 804 805 if (ct->ct_socket) { 806 if (ct->ct_closeit) { 807 so = ct->ct_socket; 808 } 809 } 810 811 mtx_unlock(&ct->ct_lock); 812 if (xprt != NULL) { 813 sx_xunlock(&xprt->xp_lock); 814 SVC_RELEASE(xprt); 815 } 816 817 mtx_destroy(&ct->ct_lock); 818 if (so) { 819 soshutdown(so, SHUT_WR); 820 soclose(so); 821 } 822 mem_free(ct, sizeof(struct ct_data)); 823 if (cl->cl_netid && cl->cl_netid[0]) 824 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 825 if (cl->cl_tp && cl->cl_tp[0]) 826 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 827 mem_free(cl, sizeof(CLIENT)); 828 } 829 830 /* 831 * Make sure that the time is not garbage. -1 value is disallowed. 832 * Note this is different from time_not_ok in clnt_dg.c 833 */ 834 static bool_t 835 time_not_ok(struct timeval *t) 836 { 837 return (t->tv_sec <= -1 || t->tv_sec > 100000000 || 838 t->tv_usec <= -1 || t->tv_usec > 1000000); 839 } 840 841 int 842 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) 843 { 844 struct ct_data *ct = (struct ct_data *) arg; 845 struct uio uio; 846 struct mbuf *m, *m2; 847 struct ct_request *cr; 848 int error, rcvflag, foundreq; 849 uint32_t xid_plus_direction[2], header; 850 bool_t do_read; 851 SVCXPRT *xprt; 852 struct cf_conn *cd; 853 854 CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t)); 855 ct->ct_upcallrefs++; 856 uio.uio_td = curthread; 857 do { 858 /* 859 * If ct_record_resid is zero, we are waiting for a 860 * record mark. 861 */ 862 if (ct->ct_record_resid == 0) { 863 864 /* 865 * Make sure there is either a whole record 866 * mark in the buffer or there is some other 867 * error condition 868 */ 869 do_read = FALSE; 870 if (so->so_rcv.sb_cc >= sizeof(uint32_t) 871 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 872 || so->so_error) 873 do_read = TRUE; 874 875 if (!do_read) 876 break; 877 878 SOCKBUF_UNLOCK(&so->so_rcv); 879 uio.uio_resid = sizeof(uint32_t); 880 m = NULL; 881 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 882 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 883 SOCKBUF_LOCK(&so->so_rcv); 884 885 if (error == EWOULDBLOCK) 886 break; 887 888 /* 889 * If there was an error, wake up all pending 890 * requests. 891 */ 892 if (error || uio.uio_resid > 0) { 893 wakeup_all: 894 mtx_lock(&ct->ct_lock); 895 if (!error) { 896 /* 897 * We must have got EOF trying 898 * to read from the stream. 899 */ 900 error = ECONNRESET; 901 } 902 ct->ct_error.re_status = RPC_CANTRECV; 903 ct->ct_error.re_errno = error; 904 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 905 cr->cr_error = error; 906 wakeup(cr); 907 } 908 mtx_unlock(&ct->ct_lock); 909 break; 910 } 911 m_copydata(m, 0, sizeof(uint32_t), (char *)&header); 912 header = ntohl(header); 913 ct->ct_record = NULL; 914 ct->ct_record_resid = header & 0x7fffffff; 915 ct->ct_record_eor = ((header & 0x80000000) != 0); 916 m_freem(m); 917 } else { 918 /* 919 * Wait until the socket has the whole record 920 * buffered. 921 */ 922 do_read = FALSE; 923 if (so->so_rcv.sb_cc >= ct->ct_record_resid 924 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 925 || so->so_error) 926 do_read = TRUE; 927 928 if (!do_read) 929 break; 930 931 /* 932 * We have the record mark. Read as much as 933 * the socket has buffered up to the end of 934 * this record. 935 */ 936 SOCKBUF_UNLOCK(&so->so_rcv); 937 uio.uio_resid = ct->ct_record_resid; 938 m = NULL; 939 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 940 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 941 SOCKBUF_LOCK(&so->so_rcv); 942 943 if (error == EWOULDBLOCK) 944 break; 945 946 if (error || uio.uio_resid == ct->ct_record_resid) 947 goto wakeup_all; 948 949 /* 950 * If we have part of the record already, 951 * chain this bit onto the end. 952 */ 953 if (ct->ct_record) 954 m_last(ct->ct_record)->m_next = m; 955 else 956 ct->ct_record = m; 957 958 ct->ct_record_resid = uio.uio_resid; 959 960 /* 961 * If we have the entire record, see if we can 962 * match it to a request. 963 */ 964 if (ct->ct_record_resid == 0 965 && ct->ct_record_eor) { 966 /* 967 * The XID is in the first uint32_t of 968 * the reply and the message direction 969 * is the second one. 970 */ 971 if (ct->ct_record->m_len < 972 sizeof(xid_plus_direction) && 973 m_length(ct->ct_record, NULL) < 974 sizeof(xid_plus_direction)) { 975 m_freem(ct->ct_record); 976 break; 977 } 978 m_copydata(ct->ct_record, 0, 979 sizeof(xid_plus_direction), 980 (char *)xid_plus_direction); 981 xid_plus_direction[0] = 982 ntohl(xid_plus_direction[0]); 983 xid_plus_direction[1] = 984 ntohl(xid_plus_direction[1]); 985 /* Check message direction. */ 986 if (xid_plus_direction[1] == CALL) { 987 /* This is a backchannel request. */ 988 mtx_lock(&ct->ct_lock); 989 xprt = ct->ct_backchannelxprt; 990 if (xprt == NULL) { 991 mtx_unlock(&ct->ct_lock); 992 /* Just throw it away. */ 993 m_freem(ct->ct_record); 994 ct->ct_record = NULL; 995 } else { 996 cd = (struct cf_conn *) 997 xprt->xp_p1; 998 m2 = cd->mreq; 999 /* 1000 * The requests are chained 1001 * in the m_nextpkt list. 1002 */ 1003 while (m2 != NULL && 1004 m2->m_nextpkt != NULL) 1005 /* Find end of list. */ 1006 m2 = m2->m_nextpkt; 1007 if (m2 != NULL) 1008 m2->m_nextpkt = 1009 ct->ct_record; 1010 else 1011 cd->mreq = 1012 ct->ct_record; 1013 ct->ct_record->m_nextpkt = 1014 NULL; 1015 ct->ct_record = NULL; 1016 xprt_active(xprt); 1017 mtx_unlock(&ct->ct_lock); 1018 } 1019 } else { 1020 mtx_lock(&ct->ct_lock); 1021 foundreq = 0; 1022 TAILQ_FOREACH(cr, &ct->ct_pending, 1023 cr_link) { 1024 if (cr->cr_xid == 1025 xid_plus_direction[0]) { 1026 /* 1027 * This one 1028 * matches. We leave 1029 * the reply mbuf in 1030 * cr->cr_mrep. Set 1031 * the XID to zero so 1032 * that we will ignore 1033 * any duplicated 1034 * replies. 1035 */ 1036 cr->cr_xid = 0; 1037 cr->cr_mrep = 1038 ct->ct_record; 1039 cr->cr_error = 0; 1040 foundreq = 1; 1041 wakeup(cr); 1042 break; 1043 } 1044 } 1045 mtx_unlock(&ct->ct_lock); 1046 1047 if (!foundreq) 1048 m_freem(ct->ct_record); 1049 ct->ct_record = NULL; 1050 } 1051 } 1052 } 1053 } while (m); 1054 ct->ct_upcallrefs--; 1055 if (ct->ct_upcallrefs < 0) 1056 panic("rpcvc upcall refcnt"); 1057 if (ct->ct_upcallrefs == 0) 1058 wakeup(&ct->ct_upcallrefs); 1059 return (SU_OK); 1060 } 1061 1062 /* 1063 * Wait for all upcalls in progress to complete. 1064 */ 1065 static void 1066 clnt_vc_upcallsdone(struct ct_data *ct) 1067 { 1068 1069 SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv); 1070 1071 while (ct->ct_upcallrefs > 0) 1072 (void) msleep(&ct->ct_upcallrefs, 1073 SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0); 1074 } 1075