/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 *
 * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched-up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 *
 * Now go hang yourself.
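 *
 * In this kernel implementation a zero timeout passed to clnt_vc_call()
 * gives the batching behaviour described above: the request is queued on
 * the socket and the call returns RPC_TIMEDOUT without waiting for a reply.
 * Records on the stream are delimited with RPC record marking: each record
 * is preceded by a 32-bit word whose high bit flags the last fragment and
 * whose low 31 bits carry the fragment length, e.g. a 100-byte final
 * record is preceded by htonl(0x80000000 | 100).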
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
#include <rpc/krpc.h>

struct cmessage {
	struct cmsghdr cmsg;
	struct cmsgcred cmcred;
};

static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
static bool_t clnt_vc_control(CLIENT *, u_int, void *);
static void clnt_vc_close(CLIENT *);
static void clnt_vc_destroy(CLIENT *);
static bool_t time_not_ok(struct timeval *);
static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);

static struct clnt_ops clnt_vc_ops = {
	.cl_call =	clnt_vc_call,
	.cl_abort =	clnt_vc_abort,
	.cl_geterr =	clnt_vc_geterr,
	.cl_freeres =	clnt_vc_freeres,
	.cl_close =	clnt_vc_close,
	.cl_destroy =	clnt_vc_destroy,
	.cl_control =	clnt_vc_control
};

static void clnt_vc_upcallsdone(struct ct_data *);

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control().
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes, 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication.  Callers may wish
 * to set it to something more useful.
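 * NB: ct_wait is initialised to -1, which means that until CLSET_TIMEOUT
 * is used, the timeout supplied to each individual clnt_vc_call() is the
 * one that is honoured.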
 *
 * fd should be an open socket
 */
CLIENT *
clnt_vc_create(
	struct socket *so,		/* open socket */
	struct sockaddr *raddr,		/* server's address */
	const rpcprog_t prog,		/* program number */
	const rpcvers_t vers,		/* version number */
	size_t sendsz,			/* buffer send size */
	size_t recvsz,			/* buffer recv size */
	int intrflag)			/* interruptible */
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct = NULL;	/* client handle */
	struct timeval now;
	struct rpc_msg call_msg;
	static uint32_t disrupt;
	struct __rpc_sockinfo si;
	XDR xdrs;
	int error, interrupted, one = 1, sleep_flag;
	struct sockopt sopt;

	if (disrupt == 0)
		disrupt = (uint32_t)(long)raddr;

	cl = (CLIENT *)mem_alloc(sizeof (*cl));
	ct = (struct ct_data *)mem_alloc(sizeof (*ct));

	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
	ct->ct_threads = 0;
	ct->ct_closing = FALSE;
	ct->ct_closed = FALSE;
	ct->ct_upcallrefs = 0;

	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
		error = soconnect(so, raddr, curthread);
		SOCK_LOCK(so);
		interrupted = 0;
		sleep_flag = PSOCK;
		if (intrflag != 0)
			sleep_flag |= (PCATCH | PBDRY);
		while ((so->so_state & SS_ISCONNECTING)
		    && so->so_error == 0) {
			error = msleep(&so->so_timeo, SOCK_MTX(so),
			    sleep_flag, "connec", 0);
			if (error) {
				if (error == EINTR || error == ERESTART)
					interrupted = 1;
				break;
			}
		}
		if (error == 0) {
			error = so->so_error;
			so->so_error = 0;
		}
		SOCK_UNLOCK(so);
		if (error) {
			if (!interrupted)
				so->so_state &= ~SS_ISCONNECTING;
			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
			rpc_createerr.cf_error.re_errno = error;
			goto err;
		}
	}

	if (!__rpc_socket2sockinfo(so, &si)) {
		goto err;
	}

	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = SOL_SOCKET;
		sopt.sopt_name = SO_KEEPALIVE;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = IPPROTO_TCP;
		sopt.sopt_name = TCP_NODELAY;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	ct->ct_closeit = FALSE;

	/*
	 * Set up private data struct
	 */
	ct->ct_socket = so;
	ct->ct_wait.tv_sec = -1;
	ct->ct_wait.tv_usec = -1;
	memcpy(&ct->ct_addr, raddr, raddr->sa_len);

	/*
	 * Initialize call message
	 */
	getmicrotime(&now);
	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
	call_msg.rm_xid = ct->ct_xid;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = (uint32_t)prog;
	call_msg.rm_call.cb_vers = (uint32_t)vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
	    XDR_ENCODE);
	if (! xdr_callhdr(&xdrs, &call_msg)) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&xdrs);
	XDR_DESTROY(&xdrs);
	ct->ct_waitchan = "rpcrecv";
	ct->ct_waitflag = 0;

	/*
	 * Create a client handle which uses xdrmbuf for serialization
	 * and authnone for authentication.
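	 * The requested send and receive buffer sizes are rounded up to
	 * the transport defaults by __rpc_get_t_size() and reserved on
	 * the socket with soreserve() before the handle is returned.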
	 */
	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
	error = soreserve(ct->ct_socket, sendsz, recvsz);
	if (error != 0) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	cl->cl_refs = 1;
	cl->cl_ops = &clnt_vc_ops;
	cl->cl_private = ct;
	cl->cl_auth = authnone_create();

	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

	ct->ct_record = NULL;
	ct->ct_record_resid = 0;
	TAILQ_INIT(&ct->ct_pending);
	return (cl);

err:
	if (cl) {
		if (ct) {
			mtx_destroy(&ct->ct_lock);
			mem_free(ct, sizeof (struct ct_data));
		}
		if (cl)
			mem_free(cl, sizeof (CLIENT));
	}
	return ((CLIENT *)NULL);
}

static enum clnt_stat
clnt_vc_call(
	CLIENT *cl,			/* client handle */
	struct rpc_callextra *ext,	/* call metadata */
	rpcproc_t proc,			/* procedure number */
	struct mbuf *args,		/* pointer to args */
	struct mbuf **resultsp,		/* pointer to results */
	struct timeval utimeout)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	AUTH *auth;
	struct rpc_err *errp;
	enum clnt_stat stat;
	XDR xdrs;
	struct rpc_msg reply_msg;
	bool_t ok;
	int nrefreshes = 2;		/* number of times to refresh cred */
	struct timeval timeout;
	uint32_t xid;
	struct mbuf *mreq = NULL, *results;
	struct ct_request *cr;
	int error;

	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closing || ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		free(cr, M_RPC);
		return (RPC_CANTSEND);
	}
	ct->ct_threads++;

	if (ext) {
		auth = ext->rc_auth;
		errp = &ext->rc_err;
	} else {
		auth = cl->cl_auth;
		errp = &ct->ct_error;
	}

	cr->cr_mrep = NULL;
	cr->cr_error = 0;

	if (ct->ct_wait.tv_usec == -1) {
		timeout = utimeout;	/* use supplied timeout */
	} else {
		timeout = ct->ct_wait;	/* use default timeout */
	}

call_again:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	ct->ct_xid++;
	xid = ct->ct_xid;

	mtx_unlock(&ct->ct_lock);

	/*
	 * Leave space to pre-pend the record mark.
	 */
	MGETHDR(mreq, M_WAITOK, MT_DATA);
	mreq->m_data += sizeof(uint32_t);
	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
	    ("RPC header too big"));
	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
	mreq->m_len = ct->ct_mpos;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	errp->re_status = stat = RPC_SUCCESS;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&ct->ct_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	/*
	 * Prepend a record marker containing the packet length.
	 */
	M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK);
	*mtod(mreq, uint32_t *) =
	    htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));

	cr->cr_xid = xid;
	mtx_lock(&ct->ct_lock);
	/*
	 * Check to see if the other end has already started to close down
	 * the connection.
	 * The upcall will have set ct_error.re_status
	 * to RPC_CANTRECV if this is the case.
	 * If the other end starts to close down the connection after this
	 * point, it will be detected later when cr_error is checked,
	 * since the request is in the ct_pending queue.
	 */
	if (ct->ct_error.re_status == RPC_CANTRECV) {
		if (errp != &ct->ct_error) {
			errp->re_errno = ct->ct_error.re_errno;
			errp->re_status = RPC_CANTRECV;
		}
		stat = RPC_CANTRECV;
		goto out;
	}
	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
	mtx_unlock(&ct->ct_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;
	if (error == EMSGSIZE) {
		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
		sbwait(&ct->ct_socket->so_snd);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
		AUTH_VALIDATE(auth, xid, NULL, NULL);
		mtx_lock(&ct->ct_lock);
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto call_again;
	}

	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&ct->ct_lock);
	if (error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock.  In either case (the upcall reported an error or the
	 * reply has already arrived), the request is taken off
	 * ct_pending here.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		goto out;
	}

	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
	    tvtohz(&timeout));

	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);

	if (error) {
		/*
		 * The sleep returned an error, i.e. no matching reply
		 * arrived before the timeout or the sleep was
		 * interrupted.  Turn the error code into an
		 * appropriate client status.
		 */
		errp->re_errno = error;
		switch (error) {
		case EINTR:
		case ERESTART:
			stat = RPC_INTR;
			break;
		case EWOULDBLOCK:
			stat = RPC_TIMEDOUT;
			break;
		default:
			stat = RPC_CANTRECV;
		}
		errp->re_status = stat;
		goto out;
	} else {
		/*
		 * We were woken up by the upcall.  If the
		 * upcall had a receive error, report that,
		 * otherwise we have a reply.
		 */
		if (cr->cr_error) {
			errp->re_errno = cr->cr_error;
			errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}
	}

got_reply:
	/*
	 * Now decode and validate the response.  We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
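	 * The lock is reacquired after decoding (or on the credential
	 * refresh retry path) before falling through to "out:", which
	 * asserts that ct_lock is held.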
	 */
	mtx_unlock(&ct->ct_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			stat = _seterr_reply(&reply_msg, errp);

		if (stat == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (!AUTH_VALIDATE(auth, xid,
			    &reply_msg.acpted_rply.ar_verf,
			    &results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
			} else {
				KASSERT(results,
				    ("auth validated but no result"));
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND error is an authentication error
		 * then refresh credentials and try again, else break
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&ct->ct_lock);
				goto call_again;
			}
			/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;
	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&ct->ct_lock);
out:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	KASSERT(stat != RPC_SUCCESS || *resultsp,
	    ("RPC_SUCCESS without reply"));

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	ct->ct_threads--;
	if (ct->ct_closing)
		wakeup(ct);

	mtx_unlock(&ct->ct_lock);

	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;

	*errp = ct->ct_error;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_vc_abort(CLIENT *cl)
{
}

static bool_t
clnt_vc_control(CLIENT *cl, u_int request, void *info)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	void *infop = info;
	SVCXPRT *xprt;

	mtx_lock(&ct->ct_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	default:
		break;
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&ct->ct_lock);
			return (FALSE);
		}
		ct->ct_wait = *(struct timeval *)infop;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)infop = ct->ct_wait;
		break;
	case CLGET_SERVER_ADDR:
		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
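		 * Both CLGET_SERVER_ADDR and CLGET_SVC_ADDR therefore copy
		 * out the raw sockaddr that clnt_vc_create() saved in
		 * ct_addr.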
		 */
		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	case CLGET_XID:
		*(uint32_t *)info = ct->ct_xid;
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_vc_call() increments once */
		ct->ct_xid = *(uint32_t *)info - 1;
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_WAITCHAN:
		ct->ct_waitchan = (const char *)info;
		break;

	case CLGET_WAITCHAN:
		*(const char **) info = ct->ct_waitchan;
		break;

	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			ct->ct_waitflag = PCATCH | PBDRY;
		else
			ct->ct_waitflag = 0;
		break;

	case CLGET_INTERRUPTIBLE:
		if (ct->ct_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;

	case CLSET_BACKCHANNEL:
		xprt = (SVCXPRT *)info;
		if (ct->ct_backchannelxprt == NULL) {
			xprt->xp_p2 = ct;
			ct->ct_backchannelxprt = xprt;
		}
		break;

	default:
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}

	mtx_unlock(&ct->ct_lock);
	return (TRUE);
}

static void
clnt_vc_close(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct ct_request *cr;

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_closing) {
		while (ct->ct_closing)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
		KASSERT(ct->ct_closed, ("client should be closed"));
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_socket) {
		ct->ct_closing = TRUE;
		mtx_unlock(&ct->ct_lock);

		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
		soupcall_clear(ct->ct_socket, SO_RCV);
		clnt_vc_upcallsdone(ct);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

		/*
		 * Abort any pending requests and wait until everyone
		 * has finished with clnt_vc_call.
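		 * Each pending request is failed with ESHUTDOWN, which
		 * clnt_vc_call() reports to its caller as RPC_CANTRECV.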
		 */
		mtx_lock(&ct->ct_lock);
		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}

		while (ct->ct_threads)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
	}

	ct->ct_closing = FALSE;
	ct->ct_closed = TRUE;
	mtx_unlock(&ct->ct_lock);
	wakeup(ct);
}

static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct socket *so = NULL;
	SVCXPRT *xprt;

	clnt_vc_close(cl);

	mtx_lock(&ct->ct_lock);
	xprt = ct->ct_backchannelxprt;
	ct->ct_backchannelxprt = NULL;
	if (xprt != NULL) {
		mtx_unlock(&ct->ct_lock);	/* To avoid a LOR. */
		sx_xlock(&xprt->xp_lock);
		mtx_lock(&ct->ct_lock);
		xprt->xp_p2 = NULL;
		xprt_unregister(xprt);
	}

	if (ct->ct_socket) {
		if (ct->ct_closeit) {
			so = ct->ct_socket;
		}
	}

	mtx_unlock(&ct->ct_lock);
	if (xprt != NULL) {
		sx_xunlock(&xprt->xp_lock);
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&ct->ct_lock);
	if (so) {
		soshutdown(so, SHUT_WR);
		soclose(so);
	}
	mem_free(ct, sizeof(struct ct_data));
	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cl, sizeof(CLIENT));
}

/*
 * Make sure that the time is not garbage.  -1 value is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
	    t->tv_usec <= -1 || t->tv_usec > 1000000);
}

int
clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct ct_data *ct = (struct ct_data *) arg;
	struct uio uio;
	struct mbuf *m, *m2;
	struct ct_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid_plus_direction[2], header;
	bool_t do_read;
	SVCXPRT *xprt;
	struct cf_conn *cd;

	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
	ct->ct_upcallrefs++;
	uio.uio_td = curthread;
	do {
		/*
		 * If ct_record_resid is zero, we are waiting for a
		 * record mark.
		 */
		if (ct->ct_record_resid == 0) {

			/*
			 * Make sure there is either a whole record
			 * mark in the buffer or there is some other
			 * error condition
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = sizeof(uint32_t);
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			/*
			 * If there was an error, wake up all pending
			 * requests.
			 */
			if (error || uio.uio_resid > 0) {
		wakeup_all:
				mtx_lock(&ct->ct_lock);
				if (!error) {
					/*
					 * We must have got EOF trying
					 * to read from the stream.
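					 * Report it as ECONNRESET so
					 * that waiting callers get a
					 * meaningful errno.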
					 */
					error = ECONNRESET;
				}
				ct->ct_error.re_status = RPC_CANTRECV;
				ct->ct_error.re_errno = error;
				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
					cr->cr_error = error;
					wakeup(cr);
				}
				mtx_unlock(&ct->ct_lock);
				break;
			}
			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
			header = ntohl(header);
			ct->ct_record = NULL;
			ct->ct_record_resid = header & 0x7fffffff;
			ct->ct_record_eor = ((header & 0x80000000) != 0);
			m_freem(m);
		} else {
			/*
			 * Wait until the socket has the whole record
			 * buffered.
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= ct->ct_record_resid
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			/*
			 * We have the record mark.  Read as much as
			 * the socket has buffered up to the end of
			 * this record.
			 */
			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = ct->ct_record_resid;
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			if (error || uio.uio_resid == ct->ct_record_resid)
				goto wakeup_all;

			/*
			 * If we have part of the record already,
			 * chain this bit onto the end.
			 */
			if (ct->ct_record)
				m_last(ct->ct_record)->m_next = m;
			else
				ct->ct_record = m;

			ct->ct_record_resid = uio.uio_resid;

			/*
			 * If we have the entire record, see if we can
			 * match it to a request.
			 */
			if (ct->ct_record_resid == 0
			    && ct->ct_record_eor) {
				/*
				 * The XID is in the first uint32_t of
				 * the reply and the message direction
				 * is the second one.
				 */
				if (ct->ct_record->m_len <
				    sizeof(xid_plus_direction) &&
				    m_length(ct->ct_record, NULL) <
				    sizeof(xid_plus_direction)) {
					m_freem(ct->ct_record);
					break;
				}
				m_copydata(ct->ct_record, 0,
				    sizeof(xid_plus_direction),
				    (char *)xid_plus_direction);
				xid_plus_direction[0] =
				    ntohl(xid_plus_direction[0]);
				xid_plus_direction[1] =
				    ntohl(xid_plus_direction[1]);
				/* Check message direction. */
				if (xid_plus_direction[1] == CALL) {
					/* This is a backchannel request. */
					mtx_lock(&ct->ct_lock);
					xprt = ct->ct_backchannelxprt;
					if (xprt == NULL) {
						mtx_unlock(&ct->ct_lock);
						/* Just throw it away. */
						m_freem(ct->ct_record);
						ct->ct_record = NULL;
					} else {
						cd = (struct cf_conn *)
						    xprt->xp_p1;
						m2 = cd->mreq;
						/*
						 * The requests are chained
						 * in the m_nextpkt list.
						 */
						while (m2 != NULL &&
						    m2->m_nextpkt != NULL)
							/* Find end of list. */
							m2 = m2->m_nextpkt;
						if (m2 != NULL)
							m2->m_nextpkt =
							    ct->ct_record;
						else
							cd->mreq =
							    ct->ct_record;
						ct->ct_record->m_nextpkt =
						    NULL;
						ct->ct_record = NULL;
						xprt_active(xprt);
						mtx_unlock(&ct->ct_lock);
					}
				} else {
					mtx_lock(&ct->ct_lock);
					foundreq = 0;
					TAILQ_FOREACH(cr, &ct->ct_pending,
					    cr_link) {
						if (cr->cr_xid ==
						    xid_plus_direction[0]) {
							/*
							 * This one
							 * matches.  We leave
							 * the reply mbuf in
							 * cr->cr_mrep.  Set
							 * the XID to zero so
							 * that we will ignore
							 * any duplicated
							 * replies.
							 */
							cr->cr_xid = 0;
							cr->cr_mrep =
							    ct->ct_record;
							cr->cr_error = 0;
							foundreq = 1;
							wakeup(cr);
							break;
						}
					}
					mtx_unlock(&ct->ct_lock);

					if (!foundreq)
						m_freem(ct->ct_record);
					ct->ct_record = NULL;
				}
			}
		}
	} while (m);
	ct->ct_upcallrefs--;
	if (ct->ct_upcallrefs < 0)
		panic("rpcvc upcall refcnt");
	if (ct->ct_upcallrefs == 0)
		wakeup(&ct->ct_upcallrefs);
	return (SU_OK);
}

/*
 * Wait for all upcalls in progress to complete.
 */
static void
clnt_vc_upcallsdone(struct ct_data *ct)
{

	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);

	while (ct->ct_upcallrefs > 0)
		(void) msleep(&ct->ct_upcallrefs,
		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
}