1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 /* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35 #if defined(LIBC_SCCS) && !defined(lint) 36 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38 #endif 39 #include <sys/cdefs.h> 40 __FBSDID("$FreeBSD$"); 41 42 /* 43 * Implements a connectionless client side RPC. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/systm.h> 48 #include <sys/kernel.h> 49 #include <sys/lock.h> 50 #include <sys/malloc.h> 51 #include <sys/mbuf.h> 52 #include <sys/mutex.h> 53 #include <sys/pcpu.h> 54 #include <sys/proc.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/time.h> 58 #include <sys/uio.h> 59 60 #include <rpc/rpc.h> 61 #include <rpc/rpc_com.h> 62 63 64 #ifdef _FREEFALL_CONFIG 65 /* 66 * Disable RPC exponential back-off for FreeBSD.org systems. 67 */ 68 #define RPC_MAX_BACKOFF 1 /* second */ 69 #else 70 #define RPC_MAX_BACKOFF 30 /* seconds */ 71 #endif 72 73 static bool_t time_not_ok(struct timeval *); 74 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 75 rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); 76 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 77 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 78 static void clnt_dg_abort(CLIENT *); 79 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 80 static void clnt_dg_destroy(CLIENT *); 81 static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 82 83 static struct clnt_ops clnt_dg_ops = { 84 .cl_call = clnt_dg_call, 85 .cl_abort = clnt_dg_abort, 86 .cl_geterr = clnt_dg_geterr, 87 .cl_freeres = clnt_dg_freeres, 88 .cl_destroy = clnt_dg_destroy, 89 .cl_control = clnt_dg_control 90 }; 91 92 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 93 94 /* 95 * A pending RPC request which awaits a reply. Requests which have 96 * received their reply will have cr_xid set to zero and cr_mrep to 97 * the mbuf chain of the reply. 98 */ 99 struct cu_request { 100 TAILQ_ENTRY(cu_request) cr_link; 101 CLIENT *cr_client; /* owner */ 102 uint32_t cr_xid; /* XID of request */ 103 struct mbuf *cr_mrep; /* reply received by upcall */ 104 int cr_error; /* any error from upcall */ 105 }; 106 107 TAILQ_HEAD(cu_request_list, cu_request); 108 109 #define MCALL_MSG_SIZE 24 110 111 /* 112 * This structure is pointed to by the socket's so_upcallarg 113 * member. It is separate from the client private data to facilitate 114 * multiple clients sharing the same socket. The cs_lock mutex is used 115 * to protect all fields of this structure, the socket's receive 116 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 117 * structures is installed on the socket. 118 */ 119 struct cu_socket { 120 struct mtx cs_lock; 121 int cs_refs; /* Count of clients */ 122 struct cu_request_list cs_pending; /* Requests awaiting replies */ 123 124 }; 125 126 /* 127 * Private data kept per client handle 128 */ 129 struct cu_data { 130 int cu_threads; /* # threads in clnt_vc_call */ 131 bool_t cu_closing; /* TRUE if we are destroying */ 132 struct socket *cu_socket; /* connection socket */ 133 bool_t cu_closeit; /* opened by library */ 134 struct sockaddr_storage cu_raddr; /* remote address */ 135 int cu_rlen; 136 struct timeval cu_wait; /* retransmit interval */ 137 struct timeval cu_total; /* total time for the call */ 138 struct rpc_err cu_error; 139 uint32_t cu_xid; 140 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 141 size_t cu_mcalllen; 142 size_t cu_sendsz; /* send size */ 143 size_t cu_recvsz; /* recv size */ 144 int cu_async; 145 int cu_connect; /* Use connect(). */ 146 int cu_connected; /* Have done connect(). */ 147 const char *cu_waitchan; 148 int cu_waitflag; 149 }; 150 151 /* 152 * Connection less client creation returns with client handle parameters. 153 * Default options are set, which the user can change using clnt_control(). 154 * fd should be open and bound. 155 * NB: The rpch->cl_auth is initialized to null authentication. 156 * Caller may wish to set this something more useful. 157 * 158 * sendsz and recvsz are the maximum allowable packet sizes that can be 159 * sent and received. Normally they are the same, but they can be 160 * changed to improve the program efficiency and buffer allocation. 161 * If they are 0, use the transport default. 162 * 163 * If svcaddr is NULL, returns NULL. 164 */ 165 CLIENT * 166 clnt_dg_create( 167 struct socket *so, 168 struct sockaddr *svcaddr, /* servers address */ 169 rpcprog_t program, /* program number */ 170 rpcvers_t version, /* version number */ 171 size_t sendsz, /* buffer recv size */ 172 size_t recvsz) /* buffer send size */ 173 { 174 CLIENT *cl = NULL; /* client handle */ 175 struct cu_data *cu = NULL; /* private data */ 176 struct cu_socket *cs = NULL; 177 struct timeval now; 178 struct rpc_msg call_msg; 179 struct __rpc_sockinfo si; 180 XDR xdrs; 181 182 if (svcaddr == NULL) { 183 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 184 return (NULL); 185 } 186 187 if (!__rpc_socket2sockinfo(so, &si)) { 188 rpc_createerr.cf_stat = RPC_TLIERROR; 189 rpc_createerr.cf_error.re_errno = 0; 190 return (NULL); 191 } 192 193 /* 194 * Find the receive and the send size 195 */ 196 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 197 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 198 if ((sendsz == 0) || (recvsz == 0)) { 199 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 200 rpc_createerr.cf_error.re_errno = 0; 201 return (NULL); 202 } 203 204 cl = mem_alloc(sizeof (CLIENT)); 205 206 /* 207 * Should be multiple of 4 for XDR. 208 */ 209 sendsz = ((sendsz + 3) / 4) * 4; 210 recvsz = ((recvsz + 3) / 4) * 4; 211 cu = mem_alloc(sizeof (*cu)); 212 cu->cu_threads = 0; 213 cu->cu_closing = FALSE; 214 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 215 cu->cu_rlen = svcaddr->sa_len; 216 /* Other values can also be set through clnt_control() */ 217 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 218 cu->cu_wait.tv_usec = 0; 219 cu->cu_total.tv_sec = -1; 220 cu->cu_total.tv_usec = -1; 221 cu->cu_sendsz = sendsz; 222 cu->cu_recvsz = recvsz; 223 cu->cu_async = FALSE; 224 cu->cu_connect = FALSE; 225 cu->cu_connected = FALSE; 226 cu->cu_waitchan = "rpcrecv"; 227 cu->cu_waitflag = 0; 228 (void) getmicrotime(&now); 229 cu->cu_xid = __RPC_GETXID(&now); 230 call_msg.rm_xid = cu->cu_xid; 231 call_msg.rm_call.cb_prog = program; 232 call_msg.rm_call.cb_vers = version; 233 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 234 if (! xdr_callhdr(&xdrs, &call_msg)) { 235 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 236 rpc_createerr.cf_error.re_errno = 0; 237 goto err2; 238 } 239 cu->cu_mcalllen = XDR_GETPOS(&xdrs);; 240 241 /* 242 * By default, closeit is always FALSE. It is users responsibility 243 * to do a close on it, else the user may use clnt_control 244 * to let clnt_destroy do it for him/her. 245 */ 246 cu->cu_closeit = FALSE; 247 cu->cu_socket = so; 248 soreserve(so, 256*1024, 256*1024); 249 250 SOCKBUF_LOCK(&so->so_rcv); 251 recheck_socket: 252 if (so->so_upcall) { 253 if (so->so_upcall != clnt_dg_soupcall) { 254 SOCKBUF_UNLOCK(&so->so_rcv); 255 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 256 goto err2; 257 } 258 cs = (struct cu_socket *) so->so_upcallarg; 259 mtx_lock(&cs->cs_lock); 260 cs->cs_refs++; 261 mtx_unlock(&cs->cs_lock); 262 } else { 263 /* 264 * We are the first on this socket - allocate the 265 * structure and install it in the socket. 266 */ 267 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 268 cs = mem_alloc(sizeof(*cs)); 269 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 270 if (so->so_upcall) { 271 /* 272 * We have lost a race with some other client. 273 */ 274 mem_free(cs, sizeof(*cs)); 275 goto recheck_socket; 276 } 277 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 278 cs->cs_refs = 1; 279 TAILQ_INIT(&cs->cs_pending); 280 so->so_upcallarg = cs; 281 so->so_upcall = clnt_dg_soupcall; 282 so->so_rcv.sb_flags |= SB_UPCALL; 283 } 284 SOCKBUF_UNLOCK(&so->so_rcv); 285 286 cl->cl_refs = 1; 287 cl->cl_ops = &clnt_dg_ops; 288 cl->cl_private = (caddr_t)(void *)cu; 289 cl->cl_auth = authnone_create(); 290 cl->cl_tp = NULL; 291 cl->cl_netid = NULL; 292 return (cl); 293 err2: 294 if (cl) { 295 mem_free(cl, sizeof (CLIENT)); 296 if (cu) 297 mem_free(cu, sizeof (*cu)); 298 } 299 return (NULL); 300 } 301 302 static enum clnt_stat 303 clnt_dg_call( 304 CLIENT *cl, /* client handle */ 305 struct rpc_callextra *ext, /* call metadata */ 306 rpcproc_t proc, /* procedure number */ 307 xdrproc_t xargs, /* xdr routine for args */ 308 void *argsp, /* pointer to args */ 309 xdrproc_t xresults, /* xdr routine for results */ 310 void *resultsp, /* pointer to results */ 311 struct timeval utimeout) /* seconds to wait before giving up */ 312 { 313 struct cu_data *cu = (struct cu_data *)cl->cl_private; 314 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; 315 AUTH *auth; 316 XDR xdrs; 317 struct rpc_msg reply_msg; 318 bool_t ok; 319 int retrans; /* number of re-transmits so far */ 320 int nrefreshes = 2; /* number of times to refresh cred */ 321 struct timeval *tvp; 322 int timeout; 323 int retransmit_time; 324 int next_sendtime, starttime, time_waited, tv; 325 struct sockaddr *sa; 326 socklen_t salen; 327 uint32_t xid; 328 struct mbuf *mreq = NULL; 329 struct cu_request *cr; 330 int error; 331 332 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 333 334 mtx_lock(&cs->cs_lock); 335 336 if (cu->cu_closing) { 337 mtx_unlock(&cs->cs_lock); 338 free(cr, M_RPC); 339 return (RPC_CANTSEND); 340 } 341 cu->cu_threads++; 342 343 if (ext) 344 auth = ext->rc_auth; 345 else 346 auth = cl->cl_auth; 347 348 cr->cr_client = cl; 349 cr->cr_mrep = NULL; 350 cr->cr_error = 0; 351 352 if (cu->cu_total.tv_usec == -1) { 353 tvp = &utimeout; /* use supplied timeout */ 354 } else { 355 tvp = &cu->cu_total; /* use default timeout */ 356 } 357 if (tvp->tv_sec || tvp->tv_usec) 358 timeout = tvtohz(tvp); 359 else 360 timeout = 0; 361 362 if (cu->cu_connect && !cu->cu_connected) { 363 mtx_unlock(&cs->cs_lock); 364 error = soconnect(cu->cu_socket, 365 (struct sockaddr *)&cu->cu_raddr, curthread); 366 mtx_lock(&cs->cs_lock); 367 if (error) { 368 cu->cu_error.re_errno = error; 369 cu->cu_error.re_status = RPC_CANTSEND; 370 goto out; 371 } 372 cu->cu_connected = 1; 373 } 374 if (cu->cu_connected) { 375 sa = NULL; 376 salen = 0; 377 } else { 378 sa = (struct sockaddr *)&cu->cu_raddr; 379 salen = cu->cu_rlen; 380 } 381 time_waited = 0; 382 retrans = 0; 383 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 384 385 starttime = ticks; 386 387 call_again: 388 mtx_assert(&cs->cs_lock, MA_OWNED); 389 390 cu->cu_xid++; 391 xid = cu->cu_xid; 392 393 send_again: 394 mtx_unlock(&cs->cs_lock); 395 396 MGETHDR(mreq, M_WAIT, MT_DATA); 397 MCLGET(mreq, M_WAIT); 398 mreq->m_len = 0; 399 m_append(mreq, cu->cu_mcalllen, cu->cu_mcallc); 400 401 /* 402 * The XID is the first thing in the request. 403 */ 404 *mtod(mreq, uint32_t *) = htonl(xid); 405 406 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 407 408 if (cu->cu_async == TRUE && xargs == NULL) 409 goto get_reply; 410 411 if ((! XDR_PUTINT32(&xdrs, &proc)) || 412 (! AUTH_MARSHALL(auth, &xdrs)) || 413 (! (*xargs)(&xdrs, argsp))) { 414 cu->cu_error.re_status = RPC_CANTENCODEARGS; 415 mtx_lock(&cs->cs_lock); 416 goto out; 417 } 418 m_fixhdr(mreq); 419 420 cr->cr_xid = xid; 421 mtx_lock(&cs->cs_lock); 422 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 423 mtx_unlock(&cs->cs_lock); 424 425 /* 426 * sosend consumes mreq. 427 */ 428 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 429 mreq = NULL; 430 431 /* 432 * sub-optimal code appears here because we have 433 * some clock time to spare while the packets are in flight. 434 * (We assume that this is actually only executed once.) 435 */ 436 reply_msg.acpted_rply.ar_verf = _null_auth; 437 reply_msg.acpted_rply.ar_results.where = resultsp; 438 reply_msg.acpted_rply.ar_results.proc = xresults; 439 440 mtx_lock(&cs->cs_lock); 441 if (error) { 442 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 443 cu->cu_error.re_errno = error; 444 cu->cu_error.re_status = RPC_CANTSEND; 445 goto out; 446 } 447 448 /* 449 * Check to see if we got an upcall while waiting for the 450 * lock. 451 */ 452 if (cr->cr_error) { 453 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 454 cu->cu_error.re_errno = cr->cr_error; 455 cu->cu_error.re_status = RPC_CANTRECV; 456 goto out; 457 } 458 if (cr->cr_mrep) { 459 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 460 goto got_reply; 461 } 462 463 /* 464 * Hack to provide rpc-based message passing 465 */ 466 if (timeout == 0) { 467 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 468 cu->cu_error.re_status = RPC_TIMEDOUT; 469 goto out; 470 } 471 472 get_reply: 473 for (;;) { 474 /* Decide how long to wait. */ 475 if (next_sendtime < timeout) 476 tv = next_sendtime; 477 else 478 tv = timeout; 479 tv -= time_waited; 480 481 if (tv > 0) { 482 if (cu->cu_closing) 483 error = 0; 484 else 485 error = msleep(cr, &cs->cs_lock, 486 cu->cu_waitflag, cu->cu_waitchan, tv); 487 } else { 488 error = EWOULDBLOCK; 489 } 490 491 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 492 493 if (!error) { 494 /* 495 * We were woken up by the upcall. If the 496 * upcall had a receive error, report that, 497 * otherwise we have a reply. 498 */ 499 if (cr->cr_error) { 500 cu->cu_error.re_errno = cr->cr_error; 501 cu->cu_error.re_status = RPC_CANTRECV; 502 goto out; 503 } 504 break; 505 } 506 507 /* 508 * The sleep returned an error so our request is still 509 * on the list. If we got EWOULDBLOCK, we may want to 510 * re-send the request. 511 */ 512 if (error != EWOULDBLOCK) { 513 cu->cu_error.re_errno = error; 514 if (error == EINTR) 515 cu->cu_error.re_status = RPC_INTR; 516 else 517 cu->cu_error.re_status = RPC_CANTRECV; 518 goto out; 519 } 520 521 time_waited = ticks - starttime; 522 523 /* Check for timeout. */ 524 if (time_waited > timeout) { 525 cu->cu_error.re_errno = EWOULDBLOCK; 526 cu->cu_error.re_status = RPC_TIMEDOUT; 527 goto out; 528 } 529 530 /* Retransmit if necessary. */ 531 if (time_waited >= next_sendtime) { 532 if (ext && ext->rc_feedback) { 533 mtx_unlock(&cs->cs_lock); 534 if (retrans == 0) 535 ext->rc_feedback(FEEDBACK_REXMIT1, 536 proc, ext->rc_feedback_arg); 537 else 538 ext->rc_feedback(FEEDBACK_REXMIT2, 539 proc, ext->rc_feedback_arg); 540 mtx_lock(&cs->cs_lock); 541 } 542 if (cu->cu_closing) { 543 cu->cu_error.re_errno = ESHUTDOWN; 544 cu->cu_error.re_status = RPC_CANTRECV; 545 goto out; 546 } 547 retrans++; 548 /* update retransmit_time */ 549 if (retransmit_time < RPC_MAX_BACKOFF * hz) 550 retransmit_time = 2 * retransmit_time; 551 next_sendtime += retransmit_time; 552 goto send_again; 553 } 554 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 555 } 556 557 got_reply: 558 /* 559 * Now decode and validate the response. We need to drop the 560 * lock since xdr_replymsg may end up sleeping in malloc. 561 */ 562 mtx_unlock(&cs->cs_lock); 563 564 if (ext && ext->rc_feedback) 565 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 566 567 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 568 ok = xdr_replymsg(&xdrs, &reply_msg); 569 XDR_DESTROY(&xdrs); 570 cr->cr_mrep = NULL; 571 572 mtx_lock(&cs->cs_lock); 573 574 if (ok) { 575 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 576 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 577 cu->cu_error.re_status = RPC_SUCCESS; 578 else 579 _seterr_reply(&reply_msg, &(cu->cu_error)); 580 581 if (cu->cu_error.re_status == RPC_SUCCESS) { 582 if (! AUTH_VALIDATE(cl->cl_auth, 583 &reply_msg.acpted_rply.ar_verf)) { 584 cu->cu_error.re_status = RPC_AUTHERROR; 585 cu->cu_error.re_why = AUTH_INVALIDRESP; 586 } 587 if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) { 588 xdrs.x_op = XDR_FREE; 589 (void) xdr_opaque_auth(&xdrs, 590 &(reply_msg.acpted_rply.ar_verf)); 591 } 592 } /* end successful completion */ 593 /* 594 * If unsuccesful AND error is an authentication error 595 * then refresh credentials and try again, else break 596 */ 597 else if (cu->cu_error.re_status == RPC_AUTHERROR) 598 /* maybe our credentials need to be refreshed ... */ 599 if (nrefreshes > 0 && 600 AUTH_REFRESH(cl->cl_auth, &reply_msg)) { 601 nrefreshes--; 602 goto call_again; 603 } 604 /* end of unsuccessful completion */ 605 } /* end of valid reply message */ 606 else { 607 cu->cu_error.re_status = RPC_CANTDECODERES; 608 609 } 610 out: 611 mtx_assert(&cs->cs_lock, MA_OWNED); 612 613 if (mreq) 614 m_freem(mreq); 615 if (cr->cr_mrep) 616 m_freem(cr->cr_mrep); 617 618 cu->cu_threads--; 619 if (cu->cu_closing) 620 wakeup(cu); 621 622 mtx_unlock(&cs->cs_lock); 623 624 free(cr, M_RPC); 625 626 return (cu->cu_error.re_status); 627 } 628 629 static void 630 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 631 { 632 struct cu_data *cu = (struct cu_data *)cl->cl_private; 633 634 *errp = cu->cu_error; 635 } 636 637 static bool_t 638 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 639 { 640 XDR xdrs; 641 bool_t dummy; 642 643 xdrs.x_op = XDR_FREE; 644 dummy = (*xdr_res)(&xdrs, res_ptr); 645 646 return (dummy); 647 } 648 649 /*ARGSUSED*/ 650 static void 651 clnt_dg_abort(CLIENT *h) 652 { 653 } 654 655 static bool_t 656 clnt_dg_control(CLIENT *cl, u_int request, void *info) 657 { 658 struct cu_data *cu = (struct cu_data *)cl->cl_private; 659 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; 660 struct sockaddr *addr; 661 662 mtx_lock(&cs->cs_lock); 663 664 switch (request) { 665 case CLSET_FD_CLOSE: 666 cu->cu_closeit = TRUE; 667 mtx_unlock(&cs->cs_lock); 668 return (TRUE); 669 case CLSET_FD_NCLOSE: 670 cu->cu_closeit = FALSE; 671 mtx_unlock(&cs->cs_lock); 672 return (TRUE); 673 } 674 675 /* for other requests which use info */ 676 if (info == NULL) { 677 mtx_unlock(&cs->cs_lock); 678 return (FALSE); 679 } 680 switch (request) { 681 case CLSET_TIMEOUT: 682 if (time_not_ok((struct timeval *)info)) { 683 mtx_unlock(&cs->cs_lock); 684 return (FALSE); 685 } 686 cu->cu_total = *(struct timeval *)info; 687 break; 688 case CLGET_TIMEOUT: 689 *(struct timeval *)info = cu->cu_total; 690 break; 691 case CLSET_RETRY_TIMEOUT: 692 if (time_not_ok((struct timeval *)info)) { 693 mtx_unlock(&cs->cs_lock); 694 return (FALSE); 695 } 696 cu->cu_wait = *(struct timeval *)info; 697 break; 698 case CLGET_RETRY_TIMEOUT: 699 *(struct timeval *)info = cu->cu_wait; 700 break; 701 case CLGET_SVC_ADDR: 702 /* 703 * Slightly different semantics to userland - we use 704 * sockaddr instead of netbuf. 705 */ 706 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 707 break; 708 case CLSET_SVC_ADDR: /* set to new address */ 709 addr = (struct sockaddr *)info; 710 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 711 break; 712 case CLGET_XID: 713 *(uint32_t *)info = cu->cu_xid; 714 break; 715 716 case CLSET_XID: 717 /* This will set the xid of the NEXT call */ 718 /* decrement by 1 as clnt_dg_call() increments once */ 719 cu->cu_xid = *(uint32_t *)info - 1; 720 break; 721 722 case CLGET_VERS: 723 /* 724 * This RELIES on the information that, in the call body, 725 * the version number field is the fifth field from the 726 * begining of the RPC header. MUST be changed if the 727 * call_struct is changed 728 */ 729 *(uint32_t *)info = 730 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 731 4 * BYTES_PER_XDR_UNIT)); 732 break; 733 734 case CLSET_VERS: 735 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 736 = htonl(*(uint32_t *)info); 737 break; 738 739 case CLGET_PROG: 740 /* 741 * This RELIES on the information that, in the call body, 742 * the program number field is the fourth field from the 743 * begining of the RPC header. MUST be changed if the 744 * call_struct is changed 745 */ 746 *(uint32_t *)info = 747 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 748 3 * BYTES_PER_XDR_UNIT)); 749 break; 750 751 case CLSET_PROG: 752 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 753 = htonl(*(uint32_t *)info); 754 break; 755 case CLSET_ASYNC: 756 cu->cu_async = *(int *)info; 757 break; 758 case CLSET_CONNECT: 759 cu->cu_connect = *(int *)info; 760 break; 761 case CLSET_WAITCHAN: 762 cu->cu_waitchan = *(const char **)info; 763 break; 764 case CLGET_WAITCHAN: 765 *(const char **) info = cu->cu_waitchan; 766 break; 767 case CLSET_INTERRUPTIBLE: 768 if (*(int *) info) 769 cu->cu_waitflag = PCATCH; 770 else 771 cu->cu_waitflag = 0; 772 break; 773 case CLGET_INTERRUPTIBLE: 774 if (cu->cu_waitflag) 775 *(int *) info = TRUE; 776 else 777 *(int *) info = FALSE; 778 break; 779 default: 780 mtx_unlock(&cs->cs_lock); 781 return (FALSE); 782 } 783 mtx_unlock(&cs->cs_lock); 784 return (TRUE); 785 } 786 787 static void 788 clnt_dg_destroy(CLIENT *cl) 789 { 790 struct cu_data *cu = (struct cu_data *)cl->cl_private; 791 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; 792 struct cu_request *cr; 793 struct socket *so = NULL; 794 bool_t lastsocketref; 795 796 mtx_lock(&cs->cs_lock); 797 798 /* 799 * Abort any pending requests and wait until everyone 800 * has finished with clnt_vc_call. 801 */ 802 cu->cu_closing = TRUE; 803 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 804 if (cr->cr_client == cl) { 805 cr->cr_xid = 0; 806 cr->cr_error = ESHUTDOWN; 807 wakeup(cr); 808 } 809 } 810 811 while (cu->cu_threads) 812 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 813 814 cs->cs_refs--; 815 if (cs->cs_refs == 0) { 816 mtx_destroy(&cs->cs_lock); 817 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 818 cu->cu_socket->so_upcallarg = NULL; 819 cu->cu_socket->so_upcall = NULL; 820 cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; 821 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 822 mem_free(cs, sizeof(*cs)); 823 lastsocketref = TRUE; 824 } else { 825 mtx_unlock(&cs->cs_lock); 826 lastsocketref = FALSE; 827 } 828 829 if (cu->cu_closeit && lastsocketref) { 830 so = cu->cu_socket; 831 cu->cu_socket = NULL; 832 } 833 834 if (so) 835 soclose(so); 836 837 if (cl->cl_netid && cl->cl_netid[0]) 838 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 839 if (cl->cl_tp && cl->cl_tp[0]) 840 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 841 mem_free(cu, sizeof (*cu)); 842 mem_free(cl, sizeof (CLIENT)); 843 } 844 845 /* 846 * Make sure that the time is not garbage. -1 value is allowed. 847 */ 848 static bool_t 849 time_not_ok(struct timeval *t) 850 { 851 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 852 t->tv_usec < -1 || t->tv_usec > 1000000); 853 } 854 855 void 856 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 857 { 858 struct cu_socket *cs = (struct cu_socket *) arg; 859 struct uio uio; 860 struct mbuf *m; 861 struct mbuf *control; 862 struct cu_request *cr; 863 int error, rcvflag, foundreq; 864 uint32_t xid; 865 866 uio.uio_resid = 1000000000; 867 uio.uio_td = curthread; 868 do { 869 m = NULL; 870 control = NULL; 871 rcvflag = MSG_DONTWAIT; 872 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 873 if (control) 874 m_freem(control); 875 876 if (error == EWOULDBLOCK) 877 break; 878 879 /* 880 * If there was an error, wake up all pending 881 * requests. 882 */ 883 if (error) { 884 mtx_lock(&cs->cs_lock); 885 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 886 cr->cr_xid = 0; 887 cr->cr_error = error; 888 wakeup(cr); 889 } 890 mtx_unlock(&cs->cs_lock); 891 break; 892 } 893 894 /* 895 * The XID is in the first uint32_t of the reply. 896 */ 897 m = m_pullup(m, sizeof(xid)); 898 if (!m) 899 /* 900 * Should never happen. 901 */ 902 continue; 903 904 xid = ntohl(*mtod(m, uint32_t *)); 905 906 /* 907 * Attempt to match this reply with a pending request. 908 */ 909 mtx_lock(&cs->cs_lock); 910 foundreq = 0; 911 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 912 if (cr->cr_xid == xid) { 913 /* 914 * This one matches. We leave the 915 * reply mbuf in cr->cr_mrep. Set the 916 * XID to zero so that we will ignore 917 * any duplicated replies that arrive 918 * before clnt_dg_call removes it from 919 * the queue. 920 */ 921 cr->cr_xid = 0; 922 cr->cr_mrep = m; 923 cr->cr_error = 0; 924 foundreq = 1; 925 wakeup(cr); 926 break; 927 } 928 } 929 mtx_unlock(&cs->cs_lock); 930 931 /* 932 * If we didn't find the matching request, just drop 933 * it - its probably a repeated reply. 934 */ 935 if (!foundreq) 936 m_freem(m); 937 } while (m); 938 } 939 940