/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 *
 * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched-up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 *
 * Now go hang yourself.
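 *
 * On the wire each message is preceded by a 4-byte RPC record mark: the
 * top bit flags the final fragment of a record and the low 31 bits give
 * the fragment length (see the 0x80000000 handling below).
 *
 * A batched call through this transport amounts to passing a zero timeout
 * and ignoring the reply.  A minimal sketch using the cl_call method
 * defined below (variable names are illustrative, and no CLSET_TIMEOUT
 * default is assumed):
 *
 *	struct timeval zero = { 0, 0 };
 *	struct mbuf *rep = NULL;
 *
 *	(void) cl->cl_ops->cl_call(cl, NULL, proc, args, &rep, zero);
 *
 * With a zero timeout, clnt_vc_call() queues and sends the request but
 * returns RPC_TIMEDOUT immediately instead of sleeping for the reply.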
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
#include <rpc/krpc.h>

struct cmessage {
	struct cmsghdr cmsg;
	struct cmsgcred cmcred;
};

static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
static bool_t clnt_vc_control(CLIENT *, u_int, void *);
static void clnt_vc_close(CLIENT *);
static void clnt_vc_destroy(CLIENT *);
static bool_t time_not_ok(struct timeval *);
static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);

static struct clnt_ops clnt_vc_ops = {
	.cl_call =	clnt_vc_call,
	.cl_abort =	clnt_vc_abort,
	.cl_geterr =	clnt_vc_geterr,
	.cl_freeres =	clnt_vc_freeres,
	.cl_close =	clnt_vc_close,
	.cl_destroy =	clnt_vc_destroy,
	.cl_control =	clnt_vc_control
};

static void clnt_vc_upcallsdone(struct ct_data *);

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control()'s.
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes, 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication.  Caller may wish to
 * set this to something more useful.
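 * NB: Replies are collected by a socket receive upcall (clnt_vc_soupcall),
 * so the socket stays registered with the handle until clnt_vc_close();
 * it is only closed on destroy if CLSET_FD_CLOSE has been requested.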
 *
 * fd should be an open socket
 */
CLIENT *
clnt_vc_create(
	struct socket *so,		/* open socket */
	struct sockaddr *raddr,		/* server's address */
	const rpcprog_t prog,		/* program number */
	const rpcvers_t vers,		/* version number */
	size_t sendsz,			/* buffer send size */
	size_t recvsz,			/* buffer recv size */
	int intrflag)			/* interruptible */
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct = NULL;	/* client handle */
	struct timeval now;
	struct rpc_msg call_msg;
	static uint32_t disrupt;
	struct __rpc_sockinfo si;
	XDR xdrs;
	int error, interrupted, one = 1, sleep_flag;
	struct sockopt sopt;

	if (disrupt == 0)
		disrupt = (uint32_t)(long)raddr;

	cl = (CLIENT *)mem_alloc(sizeof (*cl));
	ct = (struct ct_data *)mem_alloc(sizeof (*ct));

	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
	ct->ct_threads = 0;
	ct->ct_closing = FALSE;
	ct->ct_closed = FALSE;
	ct->ct_upcallrefs = 0;

	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
		error = soconnect(so, raddr, curthread);
		SOCK_LOCK(so);
		interrupted = 0;
		sleep_flag = PSOCK;
		if (intrflag != 0)
			sleep_flag |= PCATCH;
		while ((so->so_state & SS_ISCONNECTING)
		    && so->so_error == 0) {
			error = msleep(&so->so_timeo, SOCK_MTX(so),
			    sleep_flag, "connec", 0);
			if (error) {
				if (error == EINTR || error == ERESTART)
					interrupted = 1;
				break;
			}
		}
		if (error == 0) {
			error = so->so_error;
			so->so_error = 0;
		}
		SOCK_UNLOCK(so);
		if (error) {
			if (!interrupted)
				so->so_state &= ~SS_ISCONNECTING;
			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
			rpc_createerr.cf_error.re_errno = error;
			goto err;
		}
	}

	if (!__rpc_socket2sockinfo(so, &si)) {
		goto err;
	}

	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = SOL_SOCKET;
		sopt.sopt_name = SO_KEEPALIVE;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = IPPROTO_TCP;
		sopt.sopt_name = TCP_NODELAY;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	ct->ct_closeit = FALSE;

	/*
	 * Set up private data struct
	 */
	ct->ct_socket = so;
	ct->ct_wait.tv_sec = -1;
	ct->ct_wait.tv_usec = -1;
	memcpy(&ct->ct_addr, raddr, raddr->sa_len);

	/*
	 * Initialize call message
	 */
	getmicrotime(&now);
	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
	call_msg.rm_xid = ct->ct_xid;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = (uint32_t)prog;
	call_msg.rm_call.cb_vers = (uint32_t)vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
	    XDR_ENCODE);
	if (! xdr_callhdr(&xdrs, &call_msg)) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&xdrs);
	XDR_DESTROY(&xdrs);
	ct->ct_waitchan = "rpcrecv";
	ct->ct_waitflag = 0;

	/*
	 * Create a client handle which uses xdrmbuf for serialization
	 * and authnone for authentication.
	 */
	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
	error = soreserve(ct->ct_socket, sendsz, recvsz);
	if (error != 0) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	cl->cl_refs = 1;
	cl->cl_ops = &clnt_vc_ops;
	cl->cl_private = ct;
	cl->cl_auth = authnone_create();

	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

	ct->ct_record = NULL;
	ct->ct_record_resid = 0;
	TAILQ_INIT(&ct->ct_pending);
	return (cl);

err:
	if (cl) {
		if (ct) {
			mtx_destroy(&ct->ct_lock);
			mem_free(ct, sizeof (struct ct_data));
		}
		if (cl)
			mem_free(cl, sizeof (CLIENT));
	}
	return ((CLIENT *)NULL);
}

static enum clnt_stat
clnt_vc_call(
	CLIENT *cl,			/* client handle */
	struct rpc_callextra *ext,	/* call metadata */
	rpcproc_t proc,			/* procedure number */
	struct mbuf *args,		/* pointer to args */
	struct mbuf **resultsp,		/* pointer to results */
	struct timeval utimeout)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	AUTH *auth;
	struct rpc_err *errp;
	enum clnt_stat stat;
	XDR xdrs;
	struct rpc_msg reply_msg;
	bool_t ok;
	int nrefreshes = 2;		/* number of times to refresh cred */
	struct timeval timeout;
	uint32_t xid;
	struct mbuf *mreq = NULL, *results;
	struct ct_request *cr;
	int error;

	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closing || ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		free(cr, M_RPC);
		return (RPC_CANTSEND);
	}
	ct->ct_threads++;

	if (ext) {
		auth = ext->rc_auth;
		errp = &ext->rc_err;
	} else {
		auth = cl->cl_auth;
		errp = &ct->ct_error;
	}

	cr->cr_mrep = NULL;
	cr->cr_error = 0;

	if (ct->ct_wait.tv_usec == -1) {
		timeout = utimeout;	/* use supplied timeout */
	} else {
		timeout = ct->ct_wait;	/* use default timeout */
	}

call_again:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	ct->ct_xid++;
	xid = ct->ct_xid;

	mtx_unlock(&ct->ct_lock);

	/*
	 * Leave space to pre-pend the record mark.
	 */
	mreq = m_gethdr(M_WAITOK, MT_DATA);
	mreq->m_data += sizeof(uint32_t);
	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
	    ("RPC header too big"));
	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
	mreq->m_len = ct->ct_mpos;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	errp->re_status = stat = RPC_SUCCESS;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&ct->ct_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	/*
	 * Prepend a record marker containing the packet length.
	 */
	M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK);
	*mtod(mreq, uint32_t *) =
	    htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));

	cr->cr_xid = xid;
	mtx_lock(&ct->ct_lock);
	/*
	 * Check to see if the other end has already started to close down
	 * the connection.
	 * The upcall will have set ct_error.re_status to RPC_CANTRECV if
	 * this is the case.
	 * If the other end starts to close down the connection after this
	 * point, it will be detected later when cr_error is checked,
	 * since the request is in the ct_pending queue.
	 */
	if (ct->ct_error.re_status == RPC_CANTRECV) {
		if (errp != &ct->ct_error) {
			errp->re_errno = ct->ct_error.re_errno;
			errp->re_status = RPC_CANTRECV;
		}
		stat = RPC_CANTRECV;
		goto out;
	}
	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
	mtx_unlock(&ct->ct_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;
	if (error == EMSGSIZE) {
		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
		sbwait(&ct->ct_socket->so_snd);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
		AUTH_VALIDATE(auth, xid, NULL, NULL);
		mtx_lock(&ct->ct_lock);
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto call_again;
	}

	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&ct->ct_lock);
	if (error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock.  In either case, the request still needs to be removed
	 * from ct->ct_pending here.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		goto out;
	}

	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
	    tvtohz(&timeout));

	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);

	if (error) {
		/*
		 * The sleep returned an error so our request is still
		 * on the list.  Turn the error code into an
		 * appropriate client status.
		 */
		errp->re_errno = error;
		switch (error) {
		case EINTR:
			stat = RPC_INTR;
			break;
		case EWOULDBLOCK:
			stat = RPC_TIMEDOUT;
			break;
		default:
			stat = RPC_CANTRECV;
		}
		errp->re_status = stat;
		goto out;
	} else {
		/*
		 * We were woken up by the upcall.  If the
		 * upcall had a receive error, report that,
		 * otherwise we have a reply.
		 */
		if (cr->cr_error) {
			errp->re_errno = cr->cr_error;
			errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}
	}

got_reply:
	/*
	 * Now decode and validate the response.  We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
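	 * The upcall attached the reply mbuf chain to cr->cr_mrep while we
	 * slept; the xdrmbuf stream created below takes ownership of it.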
	 */
	mtx_unlock(&ct->ct_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			stat = _seterr_reply(&reply_msg, errp);

		if (stat == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (!AUTH_VALIDATE(auth, xid,
				&reply_msg.acpted_rply.ar_verf,
				&results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
			} else {
				KASSERT(results,
				    ("auth validated but no result"));
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND error is an authentication error
		 * then refresh credentials and try again, else break
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&ct->ct_lock);
				goto call_again;
			}
			/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;
	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&ct->ct_lock);
out:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	KASSERT(stat != RPC_SUCCESS || *resultsp,
	    ("RPC_SUCCESS without reply"));

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	ct->ct_threads--;
	if (ct->ct_closing)
		wakeup(ct);

	mtx_unlock(&ct->ct_lock);

	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;

	*errp = ct->ct_error;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_vc_abort(CLIENT *cl)
{
}

static bool_t
clnt_vc_control(CLIENT *cl, u_int request, void *info)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	void *infop = info;
	SVCXPRT *xprt;

	mtx_lock(&ct->ct_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	default:
		break;
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&ct->ct_lock);
			return (FALSE);
		}
		ct->ct_wait = *(struct timeval *)infop;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)infop = ct->ct_wait;
		break;
	case CLGET_SERVER_ADDR:
		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
		 */
		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	case CLGET_XID:
		*(uint32_t *)info = ct->ct_xid;
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_vc_call() increments once */
		ct->ct_xid = *(uint32_t *)info - 1;
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_WAITCHAN:
		ct->ct_waitchan = (const char *)info;
		break;

	case CLGET_WAITCHAN:
		*(const char **) info = ct->ct_waitchan;
		break;

	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			ct->ct_waitflag = PCATCH;
		else
			ct->ct_waitflag = 0;
		break;

	case CLGET_INTERRUPTIBLE:
		if (ct->ct_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;

	case CLSET_BACKCHANNEL:
		xprt = (SVCXPRT *)info;
		if (ct->ct_backchannelxprt == NULL) {
			xprt->xp_p2 = ct;
			ct->ct_backchannelxprt = xprt;
		}
		break;

	default:
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}

	mtx_unlock(&ct->ct_lock);
	return (TRUE);
}

static void
clnt_vc_close(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct ct_request *cr;

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_closing) {
		while (ct->ct_closing)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
		KASSERT(ct->ct_closed, ("client should be closed"));
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_socket) {
		ct->ct_closing = TRUE;
		mtx_unlock(&ct->ct_lock);

		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
		soupcall_clear(ct->ct_socket, SO_RCV);
		clnt_vc_upcallsdone(ct);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

		/*
		 * Abort any pending requests and wait until everyone
		 * has finished with clnt_vc_call.
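		 * Each pending request is failed with ESHUTDOWN below; the
		 * last thread to leave clnt_vc_call() drops ct_threads to
		 * zero and wakes us.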
		 */
		mtx_lock(&ct->ct_lock);
		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}

		while (ct->ct_threads)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
	}

	ct->ct_closing = FALSE;
	ct->ct_closed = TRUE;
	mtx_unlock(&ct->ct_lock);
	wakeup(ct);
}

static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct socket *so = NULL;
	SVCXPRT *xprt;

	clnt_vc_close(cl);

	mtx_lock(&ct->ct_lock);
	xprt = ct->ct_backchannelxprt;
	ct->ct_backchannelxprt = NULL;
	if (xprt != NULL) {
		mtx_unlock(&ct->ct_lock);	/* To avoid a LOR. */
		sx_xlock(&xprt->xp_lock);
		mtx_lock(&ct->ct_lock);
		xprt->xp_p2 = NULL;
		xprt_unregister(xprt);
	}

	if (ct->ct_socket) {
		if (ct->ct_closeit) {
			so = ct->ct_socket;
		}
	}

	mtx_unlock(&ct->ct_lock);
	if (xprt != NULL) {
		sx_xunlock(&xprt->xp_lock);
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&ct->ct_lock);
	if (so) {
		soshutdown(so, SHUT_WR);
		soclose(so);
	}
	mem_free(ct, sizeof(struct ct_data));
	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cl, sizeof(CLIENT));
}

/*
 * Make sure that the time is not garbage.  -1 value is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
	    t->tv_usec <= -1 || t->tv_usec > 1000000);
}

int
clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct ct_data *ct = (struct ct_data *) arg;
	struct uio uio;
	struct mbuf *m, *m2;
	struct ct_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid_plus_direction[2], header;
	bool_t do_read;
	SVCXPRT *xprt;
	struct cf_conn *cd;

	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
	ct->ct_upcallrefs++;
	uio.uio_td = curthread;
	do {
		/*
		 * If ct_record_resid is zero, we are waiting for a
		 * record mark.
		 */
		if (ct->ct_record_resid == 0) {

			/*
			 * Make sure there is either a whole record
			 * mark in the buffer or there is some other
			 * error condition
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = sizeof(uint32_t);
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			/*
			 * If there was an error, wake up all pending
			 * requests.
			 */
			if (error || uio.uio_resid > 0) {
			wakeup_all:
				mtx_lock(&ct->ct_lock);
				if (!error) {
					/*
					 * We must have got EOF trying
					 * to read from the stream.
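					 * Report it as ECONNRESET so
					 * the waiters below see
					 * RPC_CANTRECV.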
					 */
					error = ECONNRESET;
				}
				ct->ct_error.re_status = RPC_CANTRECV;
				ct->ct_error.re_errno = error;
				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
					cr->cr_error = error;
					wakeup(cr);
				}
				mtx_unlock(&ct->ct_lock);
				break;
			}
			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
			header = ntohl(header);
			ct->ct_record = NULL;
			ct->ct_record_resid = header & 0x7fffffff;
			ct->ct_record_eor = ((header & 0x80000000) != 0);
			m_freem(m);
		} else {
			/*
			 * Wait until the socket has the whole record
			 * buffered.
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= ct->ct_record_resid
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			/*
			 * We have the record mark.  Read as much as
			 * the socket has buffered up to the end of
			 * this record.
			 */
			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = ct->ct_record_resid;
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			if (error || uio.uio_resid == ct->ct_record_resid)
				goto wakeup_all;

			/*
			 * If we have part of the record already,
			 * chain this bit onto the end.
			 */
			if (ct->ct_record)
				m_last(ct->ct_record)->m_next = m;
			else
				ct->ct_record = m;

			ct->ct_record_resid = uio.uio_resid;

			/*
			 * If we have the entire record, see if we can
			 * match it to a request.
			 */
			if (ct->ct_record_resid == 0
			    && ct->ct_record_eor) {
				/*
				 * The XID is in the first uint32_t of
				 * the reply and the message direction
				 * is the second one.
				 */
				if (ct->ct_record->m_len <
				    sizeof(xid_plus_direction) &&
				    m_length(ct->ct_record, NULL) <
				    sizeof(xid_plus_direction)) {
					m_freem(ct->ct_record);
					break;
				}
				m_copydata(ct->ct_record, 0,
				    sizeof(xid_plus_direction),
				    (char *)xid_plus_direction);
				xid_plus_direction[0] =
				    ntohl(xid_plus_direction[0]);
				xid_plus_direction[1] =
				    ntohl(xid_plus_direction[1]);
				/* Check message direction. */
				if (xid_plus_direction[1] == CALL) {
					/* This is a backchannel request. */
					mtx_lock(&ct->ct_lock);
					xprt = ct->ct_backchannelxprt;
					if (xprt == NULL) {
						mtx_unlock(&ct->ct_lock);
						/* Just throw it away. */
						m_freem(ct->ct_record);
						ct->ct_record = NULL;
					} else {
						cd = (struct cf_conn *)
						    xprt->xp_p1;
						m2 = cd->mreq;
						/*
						 * The requests are chained
						 * in the m_nextpkt list.
						 */
						while (m2 != NULL &&
						    m2->m_nextpkt != NULL)
							/* Find end of list. */
							m2 = m2->m_nextpkt;
						if (m2 != NULL)
							m2->m_nextpkt =
							    ct->ct_record;
						else
							cd->mreq =
							    ct->ct_record;
						ct->ct_record->m_nextpkt =
						    NULL;
						ct->ct_record = NULL;
						xprt_active(xprt);
						mtx_unlock(&ct->ct_lock);
					}
				} else {
					mtx_lock(&ct->ct_lock);
					foundreq = 0;
					TAILQ_FOREACH(cr, &ct->ct_pending,
					    cr_link) {
						if (cr->cr_xid ==
						    xid_plus_direction[0]) {
							/*
							 * This one
							 * matches.  We leave
							 * the reply mbuf in
							 * cr->cr_mrep.  Set
							 * the XID to zero so
							 * that we will ignore
							 * any duplicated
							 * replies.
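							 * The wakeup() below
							 * unblocks the thread
							 * sleeping on cr in
							 * clnt_vc_call().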
							 */
							cr->cr_xid = 0;
							cr->cr_mrep =
							    ct->ct_record;
							cr->cr_error = 0;
							foundreq = 1;
							wakeup(cr);
							break;
						}
					}
					mtx_unlock(&ct->ct_lock);

					if (!foundreq)
						m_freem(ct->ct_record);
					ct->ct_record = NULL;
				}
			}
		}
	} while (m);
	ct->ct_upcallrefs--;
	if (ct->ct_upcallrefs < 0)
		panic("rpcvc upcall refcnt");
	if (ct->ct_upcallrefs == 0)
		wakeup(&ct->ct_upcallrefs);
	return (SU_OK);
}

/*
 * Wait for all upcalls in progress to complete.
 */
static void
clnt_vc_upcallsdone(struct ct_data *ct)
{

	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);

	while (ct->ct_upcallrefs > 0)
		(void) msleep(&ct->ct_upcallrefs,
		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
}