1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 35 #endif 36 #include <sys/cdefs.h> 37 __FBSDID("$FreeBSD$"); 38 39 /* 40 * clnt_tcp.c, Implements a TCP/IP based, client side RPC. 41 * 42 * Copyright (C) 1984, Sun Microsystems, Inc. 43 * 44 * TCP based RPC supports 'batched calls'. 45 * A sequence of calls may be batched-up in a send buffer. The rpc call 46 * return immediately to the client even though the call was not necessarily 47 * sent. The batching occurs if the results' xdr routine is NULL (0) AND 48 * the rpc timeout value is zero (see clnt.h, rpc). 49 * 50 * Clients should NOT casually batch calls that in fact return results; that is, 51 * the server side should be aware that a call is batched and not produce any 52 * return message. Batched calls that produce many result messages can 53 * deadlock (netlock) the client and the server.... 54 * 55 * Now go hang yourself. 56 */ 57 58 #include <sys/param.h> 59 #include <sys/systm.h> 60 #include <sys/kernel.h> 61 #include <sys/lock.h> 62 #include <sys/malloc.h> 63 #include <sys/mbuf.h> 64 #include <sys/mutex.h> 65 #include <sys/pcpu.h> 66 #include <sys/proc.h> 67 #include <sys/protosw.h> 68 #include <sys/socket.h> 69 #include <sys/socketvar.h> 70 #include <sys/sx.h> 71 #include <sys/syslog.h> 72 #include <sys/time.h> 73 #include <sys/uio.h> 74 75 #include <net/vnet.h> 76 77 #include <netinet/tcp.h> 78 79 #include <rpc/rpc.h> 80 #include <rpc/rpc_com.h> 81 #include <rpc/krpc.h> 82 83 struct cmessage { 84 struct cmsghdr cmsg; 85 struct cmsgcred cmcred; 86 }; 87 88 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *, 89 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 90 static void clnt_vc_geterr(CLIENT *, struct rpc_err *); 91 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *); 92 static void clnt_vc_abort(CLIENT *); 93 static bool_t clnt_vc_control(CLIENT *, u_int, void *); 94 static void clnt_vc_close(CLIENT *); 95 static void clnt_vc_destroy(CLIENT *); 96 static bool_t time_not_ok(struct timeval *); 97 static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag); 98 99 static struct clnt_ops clnt_vc_ops = { 100 .cl_call = clnt_vc_call, 101 .cl_abort = clnt_vc_abort, 102 .cl_geterr = clnt_vc_geterr, 103 .cl_freeres = clnt_vc_freeres, 104 .cl_close = clnt_vc_close, 105 .cl_destroy = clnt_vc_destroy, 106 .cl_control = clnt_vc_control 107 }; 108 109 static void clnt_vc_upcallsdone(struct ct_data *); 110 111 static int fake_wchan; 112 113 /* 114 * Create a client handle for a connection. 115 * Default options are set, which the user can change using clnt_control()'s. 116 * The rpc/vc package does buffering similar to stdio, so the client 117 * must pick send and receive buffer sizes, 0 => use the default. 118 * NB: fd is copied into a private area. 119 * NB: The rpch->cl_auth is set null authentication. Caller may wish to 120 * set this something more useful. 121 * 122 * fd should be an open socket 123 */ 124 CLIENT * 125 clnt_vc_create( 126 struct socket *so, /* open file descriptor */ 127 struct sockaddr *raddr, /* servers address */ 128 const rpcprog_t prog, /* program number */ 129 const rpcvers_t vers, /* version number */ 130 size_t sendsz, /* buffer recv size */ 131 size_t recvsz, /* buffer send size */ 132 int intrflag) /* interruptible */ 133 { 134 CLIENT *cl; /* client handle */ 135 struct ct_data *ct = NULL; /* client handle */ 136 struct timeval now; 137 struct rpc_msg call_msg; 138 static uint32_t disrupt; 139 struct __rpc_sockinfo si; 140 XDR xdrs; 141 int error, interrupted, one = 1, sleep_flag; 142 struct sockopt sopt; 143 144 if (disrupt == 0) 145 disrupt = (uint32_t)(long)raddr; 146 147 cl = (CLIENT *)mem_alloc(sizeof (*cl)); 148 ct = (struct ct_data *)mem_alloc(sizeof (*ct)); 149 150 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF); 151 ct->ct_threads = 0; 152 ct->ct_closing = FALSE; 153 ct->ct_closed = FALSE; 154 ct->ct_upcallrefs = 0; 155 156 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) { 157 error = soconnect(so, raddr, curthread); 158 SOCK_LOCK(so); 159 interrupted = 0; 160 sleep_flag = PSOCK; 161 if (intrflag != 0) 162 sleep_flag |= PCATCH; 163 while ((so->so_state & SS_ISCONNECTING) 164 && so->so_error == 0) { 165 error = msleep(&so->so_timeo, SOCK_MTX(so), 166 sleep_flag, "connec", 0); 167 if (error) { 168 if (error == EINTR || error == ERESTART) 169 interrupted = 1; 170 break; 171 } 172 } 173 if (error == 0) { 174 error = so->so_error; 175 so->so_error = 0; 176 } 177 SOCK_UNLOCK(so); 178 if (error) { 179 if (!interrupted) 180 so->so_state &= ~SS_ISCONNECTING; 181 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 182 rpc_createerr.cf_error.re_errno = error; 183 goto err; 184 } 185 } 186 187 if (!__rpc_socket2sockinfo(so, &si)) { 188 goto err; 189 } 190 191 if (so->so_proto->pr_flags & PR_CONNREQUIRED) { 192 bzero(&sopt, sizeof(sopt)); 193 sopt.sopt_dir = SOPT_SET; 194 sopt.sopt_level = SOL_SOCKET; 195 sopt.sopt_name = SO_KEEPALIVE; 196 sopt.sopt_val = &one; 197 sopt.sopt_valsize = sizeof(one); 198 sosetopt(so, &sopt); 199 } 200 201 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 202 bzero(&sopt, sizeof(sopt)); 203 sopt.sopt_dir = SOPT_SET; 204 sopt.sopt_level = IPPROTO_TCP; 205 sopt.sopt_name = TCP_NODELAY; 206 sopt.sopt_val = &one; 207 sopt.sopt_valsize = sizeof(one); 208 sosetopt(so, &sopt); 209 } 210 211 ct->ct_closeit = FALSE; 212 213 /* 214 * Set up private data struct 215 */ 216 ct->ct_socket = so; 217 ct->ct_wait.tv_sec = -1; 218 ct->ct_wait.tv_usec = -1; 219 memcpy(&ct->ct_addr, raddr, raddr->sa_len); 220 221 /* 222 * Initialize call message 223 */ 224 getmicrotime(&now); 225 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now); 226 call_msg.rm_xid = ct->ct_xid; 227 call_msg.rm_direction = CALL; 228 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION; 229 call_msg.rm_call.cb_prog = (uint32_t)prog; 230 call_msg.rm_call.cb_vers = (uint32_t)vers; 231 232 /* 233 * pre-serialize the static part of the call msg and stash it away 234 */ 235 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE, 236 XDR_ENCODE); 237 if (! xdr_callhdr(&xdrs, &call_msg)) { 238 if (ct->ct_closeit) { 239 soclose(ct->ct_socket); 240 } 241 goto err; 242 } 243 ct->ct_mpos = XDR_GETPOS(&xdrs); 244 XDR_DESTROY(&xdrs); 245 ct->ct_waitchan = "rpcrecv"; 246 ct->ct_waitflag = 0; 247 248 /* 249 * Create a client handle which uses xdrrec for serialization 250 * and authnone for authentication. 251 */ 252 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 253 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 254 error = soreserve(ct->ct_socket, sendsz, recvsz); 255 if (error != 0) { 256 if (ct->ct_closeit) { 257 soclose(ct->ct_socket); 258 } 259 goto err; 260 } 261 cl->cl_refs = 1; 262 cl->cl_ops = &clnt_vc_ops; 263 cl->cl_private = ct; 264 cl->cl_auth = authnone_create(); 265 266 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 267 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct); 268 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 269 270 ct->ct_record = NULL; 271 ct->ct_record_resid = 0; 272 TAILQ_INIT(&ct->ct_pending); 273 return (cl); 274 275 err: 276 mtx_destroy(&ct->ct_lock); 277 mem_free(ct, sizeof (struct ct_data)); 278 mem_free(cl, sizeof (CLIENT)); 279 280 return ((CLIENT *)NULL); 281 } 282 283 static enum clnt_stat 284 clnt_vc_call( 285 CLIENT *cl, /* client handle */ 286 struct rpc_callextra *ext, /* call metadata */ 287 rpcproc_t proc, /* procedure number */ 288 struct mbuf *args, /* pointer to args */ 289 struct mbuf **resultsp, /* pointer to results */ 290 struct timeval utimeout) 291 { 292 struct ct_data *ct = (struct ct_data *) cl->cl_private; 293 AUTH *auth; 294 struct rpc_err *errp; 295 enum clnt_stat stat; 296 XDR xdrs; 297 struct rpc_msg reply_msg; 298 bool_t ok; 299 int nrefreshes = 2; /* number of times to refresh cred */ 300 struct timeval timeout; 301 uint32_t xid; 302 struct mbuf *mreq = NULL, *results; 303 struct ct_request *cr; 304 int error, trycnt; 305 306 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK); 307 308 mtx_lock(&ct->ct_lock); 309 310 if (ct->ct_closing || ct->ct_closed) { 311 mtx_unlock(&ct->ct_lock); 312 free(cr, M_RPC); 313 return (RPC_CANTSEND); 314 } 315 ct->ct_threads++; 316 317 if (ext) { 318 auth = ext->rc_auth; 319 errp = &ext->rc_err; 320 } else { 321 auth = cl->cl_auth; 322 errp = &ct->ct_error; 323 } 324 325 cr->cr_mrep = NULL; 326 cr->cr_error = 0; 327 328 if (ct->ct_wait.tv_usec == -1) { 329 timeout = utimeout; /* use supplied timeout */ 330 } else { 331 timeout = ct->ct_wait; /* use default timeout */ 332 } 333 334 /* 335 * After 15sec of looping, allow it to return RPC_CANTSEND, which will 336 * cause the clnt_reconnect layer to create a new TCP connection. 337 */ 338 trycnt = 15 * hz; 339 call_again: 340 mtx_assert(&ct->ct_lock, MA_OWNED); 341 if (ct->ct_closing || ct->ct_closed) { 342 ct->ct_threads--; 343 wakeup(ct); 344 mtx_unlock(&ct->ct_lock); 345 free(cr, M_RPC); 346 return (RPC_CANTSEND); 347 } 348 349 ct->ct_xid++; 350 xid = ct->ct_xid; 351 352 mtx_unlock(&ct->ct_lock); 353 354 /* 355 * Leave space to pre-pend the record mark. 356 */ 357 mreq = m_gethdr(M_WAITOK, MT_DATA); 358 mreq->m_data += sizeof(uint32_t); 359 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN, 360 ("RPC header too big")); 361 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos); 362 mreq->m_len = ct->ct_mpos; 363 364 /* 365 * The XID is the first thing in the request. 366 */ 367 *mtod(mreq, uint32_t *) = htonl(xid); 368 369 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 370 371 errp->re_status = stat = RPC_SUCCESS; 372 373 if ((! XDR_PUTINT32(&xdrs, &proc)) || 374 (! AUTH_MARSHALL(auth, xid, &xdrs, 375 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 376 errp->re_status = stat = RPC_CANTENCODEARGS; 377 mtx_lock(&ct->ct_lock); 378 goto out; 379 } 380 mreq->m_pkthdr.len = m_length(mreq, NULL); 381 382 /* 383 * Prepend a record marker containing the packet length. 384 */ 385 M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK); 386 *mtod(mreq, uint32_t *) = 387 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t))); 388 389 cr->cr_xid = xid; 390 mtx_lock(&ct->ct_lock); 391 /* 392 * Check to see if the other end has already started to close down 393 * the connection. The upcall will have set ct_error.re_status 394 * to RPC_CANTRECV if this is the case. 395 * If the other end starts to close down the connection after this 396 * point, it will be detected later when cr_error is checked, 397 * since the request is in the ct_pending queue. 398 */ 399 if (ct->ct_error.re_status == RPC_CANTRECV) { 400 if (errp != &ct->ct_error) { 401 errp->re_errno = ct->ct_error.re_errno; 402 errp->re_status = RPC_CANTRECV; 403 } 404 stat = RPC_CANTRECV; 405 goto out; 406 } 407 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link); 408 mtx_unlock(&ct->ct_lock); 409 410 /* 411 * sosend consumes mreq. 412 */ 413 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread); 414 mreq = NULL; 415 if (error == EMSGSIZE || (error == ERESTART && 416 (ct->ct_waitflag & PCATCH) == 0 && trycnt-- > 0)) { 417 SOCKBUF_LOCK(&ct->ct_socket->so_snd); 418 sbwait(&ct->ct_socket->so_snd); 419 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd); 420 AUTH_VALIDATE(auth, xid, NULL, NULL); 421 mtx_lock(&ct->ct_lock); 422 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 423 /* Sleep for 1 clock tick before trying the sosend() again. */ 424 msleep(&fake_wchan, &ct->ct_lock, 0, "rpclpsnd", 1); 425 goto call_again; 426 } 427 428 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 429 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 430 reply_msg.acpted_rply.ar_verf.oa_length = 0; 431 reply_msg.acpted_rply.ar_results.where = NULL; 432 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 433 434 mtx_lock(&ct->ct_lock); 435 if (error) { 436 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 437 errp->re_errno = error; 438 errp->re_status = stat = RPC_CANTSEND; 439 goto out; 440 } 441 442 /* 443 * Check to see if we got an upcall while waiting for the 444 * lock. In both these cases, the request has been removed 445 * from ct->ct_pending. 446 */ 447 if (cr->cr_error) { 448 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 449 errp->re_errno = cr->cr_error; 450 errp->re_status = stat = RPC_CANTRECV; 451 goto out; 452 } 453 if (cr->cr_mrep) { 454 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 455 goto got_reply; 456 } 457 458 /* 459 * Hack to provide rpc-based message passing 460 */ 461 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { 462 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 463 errp->re_status = stat = RPC_TIMEDOUT; 464 goto out; 465 } 466 467 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan, 468 tvtohz(&timeout)); 469 470 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 471 472 if (error) { 473 /* 474 * The sleep returned an error so our request is still 475 * on the list. Turn the error code into an 476 * appropriate client status. 477 */ 478 errp->re_errno = error; 479 switch (error) { 480 case EINTR: 481 stat = RPC_INTR; 482 break; 483 case EWOULDBLOCK: 484 stat = RPC_TIMEDOUT; 485 break; 486 default: 487 stat = RPC_CANTRECV; 488 } 489 errp->re_status = stat; 490 goto out; 491 } else { 492 /* 493 * We were woken up by the upcall. If the 494 * upcall had a receive error, report that, 495 * otherwise we have a reply. 496 */ 497 if (cr->cr_error) { 498 errp->re_errno = cr->cr_error; 499 errp->re_status = stat = RPC_CANTRECV; 500 goto out; 501 } 502 } 503 504 got_reply: 505 /* 506 * Now decode and validate the response. We need to drop the 507 * lock since xdr_replymsg may end up sleeping in malloc. 508 */ 509 mtx_unlock(&ct->ct_lock); 510 511 if (ext && ext->rc_feedback) 512 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 513 514 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 515 ok = xdr_replymsg(&xdrs, &reply_msg); 516 cr->cr_mrep = NULL; 517 518 if (ok) { 519 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 520 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 521 errp->re_status = stat = RPC_SUCCESS; 522 else 523 stat = _seterr_reply(&reply_msg, errp); 524 525 if (stat == RPC_SUCCESS) { 526 results = xdrmbuf_getall(&xdrs); 527 if (!AUTH_VALIDATE(auth, xid, 528 &reply_msg.acpted_rply.ar_verf, 529 &results)) { 530 errp->re_status = stat = RPC_AUTHERROR; 531 errp->re_why = AUTH_INVALIDRESP; 532 } else { 533 KASSERT(results, 534 ("auth validated but no result")); 535 *resultsp = results; 536 } 537 } /* end successful completion */ 538 /* 539 * If unsuccessful AND error is an authentication error 540 * then refresh credentials and try again, else break 541 */ 542 else if (stat == RPC_AUTHERROR) 543 /* maybe our credentials need to be refreshed ... */ 544 if (nrefreshes > 0 && 545 AUTH_REFRESH(auth, &reply_msg)) { 546 nrefreshes--; 547 XDR_DESTROY(&xdrs); 548 mtx_lock(&ct->ct_lock); 549 goto call_again; 550 } 551 /* end of unsuccessful completion */ 552 } /* end of valid reply message */ 553 else { 554 errp->re_status = stat = RPC_CANTDECODERES; 555 } 556 XDR_DESTROY(&xdrs); 557 mtx_lock(&ct->ct_lock); 558 out: 559 mtx_assert(&ct->ct_lock, MA_OWNED); 560 561 KASSERT(stat != RPC_SUCCESS || *resultsp, 562 ("RPC_SUCCESS without reply")); 563 564 if (mreq) 565 m_freem(mreq); 566 if (cr->cr_mrep) 567 m_freem(cr->cr_mrep); 568 569 ct->ct_threads--; 570 if (ct->ct_closing) 571 wakeup(ct); 572 573 mtx_unlock(&ct->ct_lock); 574 575 if (auth && stat != RPC_SUCCESS) 576 AUTH_VALIDATE(auth, xid, NULL, NULL); 577 578 free(cr, M_RPC); 579 580 return (stat); 581 } 582 583 static void 584 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp) 585 { 586 struct ct_data *ct = (struct ct_data *) cl->cl_private; 587 588 *errp = ct->ct_error; 589 } 590 591 static bool_t 592 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 593 { 594 XDR xdrs; 595 bool_t dummy; 596 597 xdrs.x_op = XDR_FREE; 598 dummy = (*xdr_res)(&xdrs, res_ptr); 599 600 return (dummy); 601 } 602 603 /*ARGSUSED*/ 604 static void 605 clnt_vc_abort(CLIENT *cl) 606 { 607 } 608 609 static bool_t 610 clnt_vc_control(CLIENT *cl, u_int request, void *info) 611 { 612 struct ct_data *ct = (struct ct_data *)cl->cl_private; 613 void *infop = info; 614 SVCXPRT *xprt; 615 616 mtx_lock(&ct->ct_lock); 617 618 switch (request) { 619 case CLSET_FD_CLOSE: 620 ct->ct_closeit = TRUE; 621 mtx_unlock(&ct->ct_lock); 622 return (TRUE); 623 case CLSET_FD_NCLOSE: 624 ct->ct_closeit = FALSE; 625 mtx_unlock(&ct->ct_lock); 626 return (TRUE); 627 default: 628 break; 629 } 630 631 /* for other requests which use info */ 632 if (info == NULL) { 633 mtx_unlock(&ct->ct_lock); 634 return (FALSE); 635 } 636 switch (request) { 637 case CLSET_TIMEOUT: 638 if (time_not_ok((struct timeval *)info)) { 639 mtx_unlock(&ct->ct_lock); 640 return (FALSE); 641 } 642 ct->ct_wait = *(struct timeval *)infop; 643 break; 644 case CLGET_TIMEOUT: 645 *(struct timeval *)infop = ct->ct_wait; 646 break; 647 case CLGET_SERVER_ADDR: 648 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len); 649 break; 650 case CLGET_SVC_ADDR: 651 /* 652 * Slightly different semantics to userland - we use 653 * sockaddr instead of netbuf. 654 */ 655 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len); 656 break; 657 case CLSET_SVC_ADDR: /* set to new address */ 658 mtx_unlock(&ct->ct_lock); 659 return (FALSE); 660 case CLGET_XID: 661 *(uint32_t *)info = ct->ct_xid; 662 break; 663 case CLSET_XID: 664 /* This will set the xid of the NEXT call */ 665 /* decrement by 1 as clnt_vc_call() increments once */ 666 ct->ct_xid = *(uint32_t *)info - 1; 667 break; 668 case CLGET_VERS: 669 /* 670 * This RELIES on the information that, in the call body, 671 * the version number field is the fifth field from the 672 * beginning of the RPC header. MUST be changed if the 673 * call_struct is changed 674 */ 675 *(uint32_t *)info = 676 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 677 4 * BYTES_PER_XDR_UNIT)); 678 break; 679 680 case CLSET_VERS: 681 *(uint32_t *)(void *)(ct->ct_mcallc + 682 4 * BYTES_PER_XDR_UNIT) = 683 htonl(*(uint32_t *)info); 684 break; 685 686 case CLGET_PROG: 687 /* 688 * This RELIES on the information that, in the call body, 689 * the program number field is the fourth field from the 690 * beginning of the RPC header. MUST be changed if the 691 * call_struct is changed 692 */ 693 *(uint32_t *)info = 694 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 695 3 * BYTES_PER_XDR_UNIT)); 696 break; 697 698 case CLSET_PROG: 699 *(uint32_t *)(void *)(ct->ct_mcallc + 700 3 * BYTES_PER_XDR_UNIT) = 701 htonl(*(uint32_t *)info); 702 break; 703 704 case CLSET_WAITCHAN: 705 ct->ct_waitchan = (const char *)info; 706 break; 707 708 case CLGET_WAITCHAN: 709 *(const char **) info = ct->ct_waitchan; 710 break; 711 712 case CLSET_INTERRUPTIBLE: 713 if (*(int *) info) 714 ct->ct_waitflag = PCATCH; 715 else 716 ct->ct_waitflag = 0; 717 break; 718 719 case CLGET_INTERRUPTIBLE: 720 if (ct->ct_waitflag) 721 *(int *) info = TRUE; 722 else 723 *(int *) info = FALSE; 724 break; 725 726 case CLSET_BACKCHANNEL: 727 xprt = (SVCXPRT *)info; 728 if (ct->ct_backchannelxprt == NULL) { 729 xprt->xp_p2 = ct; 730 ct->ct_backchannelxprt = xprt; 731 } 732 break; 733 734 default: 735 mtx_unlock(&ct->ct_lock); 736 return (FALSE); 737 } 738 739 mtx_unlock(&ct->ct_lock); 740 return (TRUE); 741 } 742 743 static void 744 clnt_vc_close(CLIENT *cl) 745 { 746 struct ct_data *ct = (struct ct_data *) cl->cl_private; 747 struct ct_request *cr; 748 749 mtx_lock(&ct->ct_lock); 750 751 if (ct->ct_closed) { 752 mtx_unlock(&ct->ct_lock); 753 return; 754 } 755 756 if (ct->ct_closing) { 757 while (ct->ct_closing) 758 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 759 KASSERT(ct->ct_closed, ("client should be closed")); 760 mtx_unlock(&ct->ct_lock); 761 return; 762 } 763 764 if (ct->ct_socket) { 765 ct->ct_closing = TRUE; 766 mtx_unlock(&ct->ct_lock); 767 768 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 769 soupcall_clear(ct->ct_socket, SO_RCV); 770 clnt_vc_upcallsdone(ct); 771 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 772 773 /* 774 * Abort any pending requests and wait until everyone 775 * has finished with clnt_vc_call. 776 */ 777 mtx_lock(&ct->ct_lock); 778 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 779 cr->cr_xid = 0; 780 cr->cr_error = ESHUTDOWN; 781 wakeup(cr); 782 } 783 784 while (ct->ct_threads) 785 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 786 } 787 788 ct->ct_closing = FALSE; 789 ct->ct_closed = TRUE; 790 mtx_unlock(&ct->ct_lock); 791 wakeup(ct); 792 } 793 794 static void 795 clnt_vc_destroy(CLIENT *cl) 796 { 797 struct ct_data *ct = (struct ct_data *) cl->cl_private; 798 struct socket *so = NULL; 799 SVCXPRT *xprt; 800 801 clnt_vc_close(cl); 802 803 mtx_lock(&ct->ct_lock); 804 xprt = ct->ct_backchannelxprt; 805 ct->ct_backchannelxprt = NULL; 806 if (xprt != NULL) { 807 mtx_unlock(&ct->ct_lock); /* To avoid a LOR. */ 808 sx_xlock(&xprt->xp_lock); 809 mtx_lock(&ct->ct_lock); 810 xprt->xp_p2 = NULL; 811 sx_xunlock(&xprt->xp_lock); 812 } 813 814 if (ct->ct_socket) { 815 if (ct->ct_closeit) { 816 so = ct->ct_socket; 817 } 818 } 819 820 mtx_unlock(&ct->ct_lock); 821 822 mtx_destroy(&ct->ct_lock); 823 if (so) { 824 soshutdown(so, SHUT_WR); 825 soclose(so); 826 } 827 mem_free(ct, sizeof(struct ct_data)); 828 if (cl->cl_netid && cl->cl_netid[0]) 829 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 830 if (cl->cl_tp && cl->cl_tp[0]) 831 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 832 mem_free(cl, sizeof(CLIENT)); 833 } 834 835 /* 836 * Make sure that the time is not garbage. -1 value is disallowed. 837 * Note this is different from time_not_ok in clnt_dg.c 838 */ 839 static bool_t 840 time_not_ok(struct timeval *t) 841 { 842 return (t->tv_sec <= -1 || t->tv_sec > 100000000 || 843 t->tv_usec <= -1 || t->tv_usec > 1000000); 844 } 845 846 int 847 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) 848 { 849 struct ct_data *ct = (struct ct_data *) arg; 850 struct uio uio; 851 struct mbuf *m, *m2; 852 struct ct_request *cr; 853 int error, rcvflag, foundreq; 854 uint32_t xid_plus_direction[2], header; 855 bool_t do_read; 856 SVCXPRT *xprt; 857 struct cf_conn *cd; 858 859 CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t)); 860 ct->ct_upcallrefs++; 861 uio.uio_td = curthread; 862 do { 863 /* 864 * If ct_record_resid is zero, we are waiting for a 865 * record mark. 866 */ 867 if (ct->ct_record_resid == 0) { 868 869 /* 870 * Make sure there is either a whole record 871 * mark in the buffer or there is some other 872 * error condition 873 */ 874 do_read = FALSE; 875 if (sbavail(&so->so_rcv) >= sizeof(uint32_t) 876 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 877 || so->so_error) 878 do_read = TRUE; 879 880 if (!do_read) 881 break; 882 883 SOCKBUF_UNLOCK(&so->so_rcv); 884 uio.uio_resid = sizeof(uint32_t); 885 m = NULL; 886 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 887 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 888 SOCKBUF_LOCK(&so->so_rcv); 889 890 if (error == EWOULDBLOCK) 891 break; 892 893 /* 894 * If there was an error, wake up all pending 895 * requests. 896 */ 897 if (error || uio.uio_resid > 0) { 898 wakeup_all: 899 mtx_lock(&ct->ct_lock); 900 if (!error) { 901 /* 902 * We must have got EOF trying 903 * to read from the stream. 904 */ 905 error = ECONNRESET; 906 } 907 ct->ct_error.re_status = RPC_CANTRECV; 908 ct->ct_error.re_errno = error; 909 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 910 cr->cr_error = error; 911 wakeup(cr); 912 } 913 mtx_unlock(&ct->ct_lock); 914 break; 915 } 916 m_copydata(m, 0, sizeof(uint32_t), (char *)&header); 917 header = ntohl(header); 918 ct->ct_record = NULL; 919 ct->ct_record_resid = header & 0x7fffffff; 920 ct->ct_record_eor = ((header & 0x80000000) != 0); 921 m_freem(m); 922 } else { 923 /* 924 * Wait until the socket has the whole record 925 * buffered. 926 */ 927 do_read = FALSE; 928 if (sbavail(&so->so_rcv) >= ct->ct_record_resid 929 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 930 || so->so_error) 931 do_read = TRUE; 932 933 if (!do_read) 934 break; 935 936 /* 937 * We have the record mark. Read as much as 938 * the socket has buffered up to the end of 939 * this record. 940 */ 941 SOCKBUF_UNLOCK(&so->so_rcv); 942 uio.uio_resid = ct->ct_record_resid; 943 m = NULL; 944 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 945 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 946 SOCKBUF_LOCK(&so->so_rcv); 947 948 if (error == EWOULDBLOCK) 949 break; 950 951 if (error || uio.uio_resid == ct->ct_record_resid) 952 goto wakeup_all; 953 954 /* 955 * If we have part of the record already, 956 * chain this bit onto the end. 957 */ 958 if (ct->ct_record) 959 m_last(ct->ct_record)->m_next = m; 960 else 961 ct->ct_record = m; 962 963 ct->ct_record_resid = uio.uio_resid; 964 965 /* 966 * If we have the entire record, see if we can 967 * match it to a request. 968 */ 969 if (ct->ct_record_resid == 0 970 && ct->ct_record_eor) { 971 /* 972 * The XID is in the first uint32_t of 973 * the reply and the message direction 974 * is the second one. 975 */ 976 if (ct->ct_record->m_len < 977 sizeof(xid_plus_direction) && 978 m_length(ct->ct_record, NULL) < 979 sizeof(xid_plus_direction)) { 980 m_freem(ct->ct_record); 981 break; 982 } 983 m_copydata(ct->ct_record, 0, 984 sizeof(xid_plus_direction), 985 (char *)xid_plus_direction); 986 xid_plus_direction[0] = 987 ntohl(xid_plus_direction[0]); 988 xid_plus_direction[1] = 989 ntohl(xid_plus_direction[1]); 990 /* Check message direction. */ 991 if (xid_plus_direction[1] == CALL) { 992 /* This is a backchannel request. */ 993 mtx_lock(&ct->ct_lock); 994 xprt = ct->ct_backchannelxprt; 995 if (xprt == NULL) { 996 mtx_unlock(&ct->ct_lock); 997 /* Just throw it away. */ 998 m_freem(ct->ct_record); 999 ct->ct_record = NULL; 1000 } else { 1001 cd = (struct cf_conn *) 1002 xprt->xp_p1; 1003 m2 = cd->mreq; 1004 /* 1005 * The requests are chained 1006 * in the m_nextpkt list. 1007 */ 1008 while (m2 != NULL && 1009 m2->m_nextpkt != NULL) 1010 /* Find end of list. */ 1011 m2 = m2->m_nextpkt; 1012 if (m2 != NULL) 1013 m2->m_nextpkt = 1014 ct->ct_record; 1015 else 1016 cd->mreq = 1017 ct->ct_record; 1018 ct->ct_record->m_nextpkt = 1019 NULL; 1020 ct->ct_record = NULL; 1021 xprt_active(xprt); 1022 mtx_unlock(&ct->ct_lock); 1023 } 1024 } else { 1025 mtx_lock(&ct->ct_lock); 1026 foundreq = 0; 1027 TAILQ_FOREACH(cr, &ct->ct_pending, 1028 cr_link) { 1029 if (cr->cr_xid == 1030 xid_plus_direction[0]) { 1031 /* 1032 * This one 1033 * matches. We leave 1034 * the reply mbuf in 1035 * cr->cr_mrep. Set 1036 * the XID to zero so 1037 * that we will ignore 1038 * any duplicated 1039 * replies. 1040 */ 1041 cr->cr_xid = 0; 1042 cr->cr_mrep = 1043 ct->ct_record; 1044 cr->cr_error = 0; 1045 foundreq = 1; 1046 wakeup(cr); 1047 break; 1048 } 1049 } 1050 mtx_unlock(&ct->ct_lock); 1051 1052 if (!foundreq) 1053 m_freem(ct->ct_record); 1054 ct->ct_record = NULL; 1055 } 1056 } 1057 } 1058 } while (m); 1059 ct->ct_upcallrefs--; 1060 if (ct->ct_upcallrefs < 0) 1061 panic("rpcvc upcall refcnt"); 1062 if (ct->ct_upcallrefs == 0) 1063 wakeup(&ct->ct_upcallrefs); 1064 return (SU_OK); 1065 } 1066 1067 /* 1068 * Wait for all upcalls in progress to complete. 1069 */ 1070 static void 1071 clnt_vc_upcallsdone(struct ct_data *ct) 1072 { 1073 1074 SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv); 1075 1076 while (ct->ct_upcallrefs > 0) 1077 (void) msleep(&ct->ct_upcallrefs, 1078 SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0); 1079 } 1080