1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 /* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35 #if defined(LIBC_SCCS) && !defined(lint) 36 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38 #endif 39 #include <sys/cdefs.h> 40 __FBSDID("$FreeBSD$"); 41 42 /* 43 * Implements a connectionless client side RPC. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/systm.h> 48 #include <sys/kernel.h> 49 #include <sys/lock.h> 50 #include <sys/malloc.h> 51 #include <sys/mbuf.h> 52 #include <sys/mutex.h> 53 #include <sys/pcpu.h> 54 #include <sys/proc.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/time.h> 58 #include <sys/uio.h> 59 60 #include <net/vnet.h> 61 62 #include <rpc/rpc.h> 63 #include <rpc/rpc_com.h> 64 65 66 #ifdef _FREEFALL_CONFIG 67 /* 68 * Disable RPC exponential back-off for FreeBSD.org systems. 69 */ 70 #define RPC_MAX_BACKOFF 1 /* second */ 71 #else 72 #define RPC_MAX_BACKOFF 30 /* seconds */ 73 #endif 74 75 static bool_t time_not_ok(struct timeval *); 76 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 77 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 78 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 79 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 80 static void clnt_dg_abort(CLIENT *); 81 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 82 static void clnt_dg_close(CLIENT *); 83 static void clnt_dg_destroy(CLIENT *); 84 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 85 86 static struct clnt_ops clnt_dg_ops = { 87 .cl_call = clnt_dg_call, 88 .cl_abort = clnt_dg_abort, 89 .cl_geterr = clnt_dg_geterr, 90 .cl_freeres = clnt_dg_freeres, 91 .cl_close = clnt_dg_close, 92 .cl_destroy = clnt_dg_destroy, 93 .cl_control = clnt_dg_control 94 }; 95 96 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 97 98 /* 99 * A pending RPC request which awaits a reply. Requests which have 100 * received their reply will have cr_xid set to zero and cr_mrep to 101 * the mbuf chain of the reply. 102 */ 103 struct cu_request { 104 TAILQ_ENTRY(cu_request) cr_link; 105 CLIENT *cr_client; /* owner */ 106 uint32_t cr_xid; /* XID of request */ 107 struct mbuf *cr_mrep; /* reply received by upcall */ 108 int cr_error; /* any error from upcall */ 109 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 110 }; 111 112 TAILQ_HEAD(cu_request_list, cu_request); 113 114 #define MCALL_MSG_SIZE 24 115 116 /* 117 * This structure is pointed to by the socket buffer's sb_upcallarg 118 * member. It is separate from the client private data to facilitate 119 * multiple clients sharing the same socket. The cs_lock mutex is used 120 * to protect all fields of this structure, the socket's receive 121 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 122 * structures is installed on the socket. 123 */ 124 struct cu_socket { 125 struct mtx cs_lock; 126 int cs_refs; /* Count of clients */ 127 struct cu_request_list cs_pending; /* Requests awaiting replies */ 128 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 129 }; 130 131 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 132 133 /* 134 * Private data kept per client handle 135 */ 136 struct cu_data { 137 int cu_threads; /* # threads in clnt_vc_call */ 138 bool_t cu_closing; /* TRUE if we are closing */ 139 bool_t cu_closed; /* TRUE if we are closed */ 140 struct socket *cu_socket; /* connection socket */ 141 bool_t cu_closeit; /* opened by library */ 142 struct sockaddr_storage cu_raddr; /* remote address */ 143 int cu_rlen; 144 struct timeval cu_wait; /* retransmit interval */ 145 struct timeval cu_total; /* total time for the call */ 146 struct rpc_err cu_error; 147 uint32_t cu_xid; 148 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 149 size_t cu_mcalllen; 150 size_t cu_sendsz; /* send size */ 151 size_t cu_recvsz; /* recv size */ 152 int cu_async; 153 int cu_connect; /* Use connect(). */ 154 int cu_connected; /* Have done connect(). */ 155 const char *cu_waitchan; 156 int cu_waitflag; 157 int cu_cwnd; /* congestion window */ 158 int cu_sent; /* number of in-flight RPCs */ 159 bool_t cu_cwnd_wait; 160 }; 161 162 #define CWNDSCALE 256 163 #define MAXCWND (32 * CWNDSCALE) 164 165 /* 166 * Connection less client creation returns with client handle parameters. 167 * Default options are set, which the user can change using clnt_control(). 168 * fd should be open and bound. 169 * NB: The rpch->cl_auth is initialized to null authentication. 170 * Caller may wish to set this something more useful. 171 * 172 * sendsz and recvsz are the maximum allowable packet sizes that can be 173 * sent and received. Normally they are the same, but they can be 174 * changed to improve the program efficiency and buffer allocation. 175 * If they are 0, use the transport default. 176 * 177 * If svcaddr is NULL, returns NULL. 178 */ 179 CLIENT * 180 clnt_dg_create( 181 struct socket *so, 182 struct sockaddr *svcaddr, /* servers address */ 183 rpcprog_t program, /* program number */ 184 rpcvers_t version, /* version number */ 185 size_t sendsz, /* buffer recv size */ 186 size_t recvsz) /* buffer send size */ 187 { 188 CLIENT *cl = NULL; /* client handle */ 189 struct cu_data *cu = NULL; /* private data */ 190 struct cu_socket *cs = NULL; 191 struct sockbuf *sb; 192 struct timeval now; 193 struct rpc_msg call_msg; 194 struct __rpc_sockinfo si; 195 XDR xdrs; 196 int error; 197 198 if (svcaddr == NULL) { 199 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 200 return (NULL); 201 } 202 203 if (!__rpc_socket2sockinfo(so, &si)) { 204 rpc_createerr.cf_stat = RPC_TLIERROR; 205 rpc_createerr.cf_error.re_errno = 0; 206 return (NULL); 207 } 208 209 /* 210 * Find the receive and the send size 211 */ 212 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 213 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 214 if ((sendsz == 0) || (recvsz == 0)) { 215 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 216 rpc_createerr.cf_error.re_errno = 0; 217 return (NULL); 218 } 219 220 cl = mem_alloc(sizeof (CLIENT)); 221 222 /* 223 * Should be multiple of 4 for XDR. 224 */ 225 sendsz = ((sendsz + 3) / 4) * 4; 226 recvsz = ((recvsz + 3) / 4) * 4; 227 cu = mem_alloc(sizeof (*cu)); 228 cu->cu_threads = 0; 229 cu->cu_closing = FALSE; 230 cu->cu_closed = FALSE; 231 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 232 cu->cu_rlen = svcaddr->sa_len; 233 /* Other values can also be set through clnt_control() */ 234 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 235 cu->cu_wait.tv_usec = 0; 236 cu->cu_total.tv_sec = -1; 237 cu->cu_total.tv_usec = -1; 238 cu->cu_sendsz = sendsz; 239 cu->cu_recvsz = recvsz; 240 cu->cu_async = FALSE; 241 cu->cu_connect = FALSE; 242 cu->cu_connected = FALSE; 243 cu->cu_waitchan = "rpcrecv"; 244 cu->cu_waitflag = 0; 245 cu->cu_cwnd = MAXCWND / 2; 246 cu->cu_sent = 0; 247 cu->cu_cwnd_wait = FALSE; 248 (void) getmicrotime(&now); 249 cu->cu_xid = __RPC_GETXID(&now); 250 call_msg.rm_xid = cu->cu_xid; 251 call_msg.rm_call.cb_prog = program; 252 call_msg.rm_call.cb_vers = version; 253 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 254 if (! xdr_callhdr(&xdrs, &call_msg)) { 255 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 256 rpc_createerr.cf_error.re_errno = 0; 257 goto err2; 258 } 259 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 260 261 /* 262 * By default, closeit is always FALSE. It is users responsibility 263 * to do a close on it, else the user may use clnt_control 264 * to let clnt_destroy do it for him/her. 265 */ 266 cu->cu_closeit = FALSE; 267 cu->cu_socket = so; 268 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 269 if (error != 0) { 270 rpc_createerr.cf_stat = RPC_FAILED; 271 rpc_createerr.cf_error.re_errno = error; 272 goto err2; 273 } 274 275 sb = &so->so_rcv; 276 SOCKBUF_LOCK(&so->so_rcv); 277 recheck_socket: 278 if (sb->sb_upcall) { 279 if (sb->sb_upcall != clnt_dg_soupcall) { 280 SOCKBUF_UNLOCK(&so->so_rcv); 281 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 282 goto err2; 283 } 284 cs = (struct cu_socket *) sb->sb_upcallarg; 285 mtx_lock(&cs->cs_lock); 286 cs->cs_refs++; 287 mtx_unlock(&cs->cs_lock); 288 } else { 289 /* 290 * We are the first on this socket - allocate the 291 * structure and install it in the socket. 292 */ 293 SOCKBUF_UNLOCK(&so->so_rcv); 294 cs = mem_alloc(sizeof(*cs)); 295 SOCKBUF_LOCK(&so->so_rcv); 296 if (sb->sb_upcall) { 297 /* 298 * We have lost a race with some other client. 299 */ 300 mem_free(cs, sizeof(*cs)); 301 goto recheck_socket; 302 } 303 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 304 cs->cs_refs = 1; 305 cs->cs_upcallrefs = 0; 306 TAILQ_INIT(&cs->cs_pending); 307 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 308 } 309 SOCKBUF_UNLOCK(&so->so_rcv); 310 311 cl->cl_refs = 1; 312 cl->cl_ops = &clnt_dg_ops; 313 cl->cl_private = (caddr_t)(void *)cu; 314 cl->cl_auth = authnone_create(); 315 cl->cl_tp = NULL; 316 cl->cl_netid = NULL; 317 return (cl); 318 err2: 319 if (cl) { 320 mem_free(cl, sizeof (CLIENT)); 321 if (cu) 322 mem_free(cu, sizeof (*cu)); 323 } 324 return (NULL); 325 } 326 327 static enum clnt_stat 328 clnt_dg_call( 329 CLIENT *cl, /* client handle */ 330 struct rpc_callextra *ext, /* call metadata */ 331 rpcproc_t proc, /* procedure number */ 332 struct mbuf *args, /* pointer to args */ 333 struct mbuf **resultsp, /* pointer to results */ 334 struct timeval utimeout) /* seconds to wait before giving up */ 335 { 336 struct cu_data *cu = (struct cu_data *)cl->cl_private; 337 struct cu_socket *cs; 338 struct rpc_timers *rt; 339 AUTH *auth; 340 struct rpc_err *errp; 341 enum clnt_stat stat; 342 XDR xdrs; 343 struct rpc_msg reply_msg; 344 bool_t ok; 345 int retrans; /* number of re-transmits so far */ 346 int nrefreshes = 2; /* number of times to refresh cred */ 347 struct timeval *tvp; 348 int timeout; 349 int retransmit_time; 350 int next_sendtime, starttime, rtt, time_waited, tv = 0; 351 struct sockaddr *sa; 352 socklen_t salen; 353 uint32_t xid = 0; 354 struct mbuf *mreq = NULL, *results; 355 struct cu_request *cr; 356 int error; 357 358 cs = cu->cu_socket->so_rcv.sb_upcallarg; 359 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 360 361 mtx_lock(&cs->cs_lock); 362 363 if (cu->cu_closing || cu->cu_closed) { 364 mtx_unlock(&cs->cs_lock); 365 free(cr, M_RPC); 366 return (RPC_CANTSEND); 367 } 368 cu->cu_threads++; 369 370 if (ext) { 371 auth = ext->rc_auth; 372 errp = &ext->rc_err; 373 } else { 374 auth = cl->cl_auth; 375 errp = &cu->cu_error; 376 } 377 378 cr->cr_client = cl; 379 cr->cr_mrep = NULL; 380 cr->cr_error = 0; 381 382 if (cu->cu_total.tv_usec == -1) { 383 tvp = &utimeout; /* use supplied timeout */ 384 } else { 385 tvp = &cu->cu_total; /* use default timeout */ 386 } 387 if (tvp->tv_sec || tvp->tv_usec) 388 timeout = tvtohz(tvp); 389 else 390 timeout = 0; 391 392 if (cu->cu_connect && !cu->cu_connected) { 393 mtx_unlock(&cs->cs_lock); 394 error = soconnect(cu->cu_socket, 395 (struct sockaddr *)&cu->cu_raddr, curthread); 396 mtx_lock(&cs->cs_lock); 397 if (error) { 398 errp->re_errno = error; 399 errp->re_status = stat = RPC_CANTSEND; 400 goto out; 401 } 402 cu->cu_connected = 1; 403 } 404 if (cu->cu_connected) { 405 sa = NULL; 406 salen = 0; 407 } else { 408 sa = (struct sockaddr *)&cu->cu_raddr; 409 salen = cu->cu_rlen; 410 } 411 time_waited = 0; 412 retrans = 0; 413 if (ext && ext->rc_timers) { 414 rt = ext->rc_timers; 415 if (!rt->rt_rtxcur) 416 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 417 retransmit_time = next_sendtime = rt->rt_rtxcur; 418 } else { 419 rt = NULL; 420 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 421 } 422 423 starttime = ticks; 424 425 call_again: 426 mtx_assert(&cs->cs_lock, MA_OWNED); 427 428 cu->cu_xid++; 429 xid = cu->cu_xid; 430 431 send_again: 432 mtx_unlock(&cs->cs_lock); 433 434 MGETHDR(mreq, M_WAIT, MT_DATA); 435 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 436 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 437 mreq->m_len = cu->cu_mcalllen; 438 439 /* 440 * The XID is the first thing in the request. 441 */ 442 *mtod(mreq, uint32_t *) = htonl(xid); 443 444 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 445 446 if (cu->cu_async == TRUE && args == NULL) 447 goto get_reply; 448 449 if ((! XDR_PUTINT32(&xdrs, &proc)) || 450 (! AUTH_MARSHALL(auth, xid, &xdrs, 451 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 452 errp->re_status = stat = RPC_CANTENCODEARGS; 453 mtx_lock(&cs->cs_lock); 454 goto out; 455 } 456 mreq->m_pkthdr.len = m_length(mreq, NULL); 457 458 cr->cr_xid = xid; 459 mtx_lock(&cs->cs_lock); 460 461 /* 462 * Try to get a place in the congestion window. 463 */ 464 while (cu->cu_sent >= cu->cu_cwnd) { 465 cu->cu_cwnd_wait = TRUE; 466 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 467 cu->cu_waitflag, "rpccwnd", 0); 468 if (error) { 469 errp->re_errno = error; 470 errp->re_status = stat = RPC_CANTSEND; 471 goto out; 472 } 473 } 474 cu->cu_sent += CWNDSCALE; 475 476 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 477 mtx_unlock(&cs->cs_lock); 478 479 /* 480 * sosend consumes mreq. 481 */ 482 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 483 mreq = NULL; 484 485 /* 486 * sub-optimal code appears here because we have 487 * some clock time to spare while the packets are in flight. 488 * (We assume that this is actually only executed once.) 489 */ 490 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 491 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 492 reply_msg.acpted_rply.ar_verf.oa_length = 0; 493 reply_msg.acpted_rply.ar_results.where = NULL; 494 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 495 496 mtx_lock(&cs->cs_lock); 497 if (error) { 498 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 499 errp->re_errno = error; 500 errp->re_status = stat = RPC_CANTSEND; 501 cu->cu_sent -= CWNDSCALE; 502 if (cu->cu_cwnd_wait) { 503 cu->cu_cwnd_wait = FALSE; 504 wakeup(&cu->cu_cwnd_wait); 505 } 506 goto out; 507 } 508 509 /* 510 * Check to see if we got an upcall while waiting for the 511 * lock. 512 */ 513 if (cr->cr_error) { 514 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 515 errp->re_errno = cr->cr_error; 516 errp->re_status = stat = RPC_CANTRECV; 517 cu->cu_sent -= CWNDSCALE; 518 if (cu->cu_cwnd_wait) { 519 cu->cu_cwnd_wait = FALSE; 520 wakeup(&cu->cu_cwnd_wait); 521 } 522 goto out; 523 } 524 if (cr->cr_mrep) { 525 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 526 cu->cu_sent -= CWNDSCALE; 527 if (cu->cu_cwnd_wait) { 528 cu->cu_cwnd_wait = FALSE; 529 wakeup(&cu->cu_cwnd_wait); 530 } 531 goto got_reply; 532 } 533 534 /* 535 * Hack to provide rpc-based message passing 536 */ 537 if (timeout == 0) { 538 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 539 errp->re_status = stat = RPC_TIMEDOUT; 540 cu->cu_sent -= CWNDSCALE; 541 if (cu->cu_cwnd_wait) { 542 cu->cu_cwnd_wait = FALSE; 543 wakeup(&cu->cu_cwnd_wait); 544 } 545 goto out; 546 } 547 548 get_reply: 549 for (;;) { 550 /* Decide how long to wait. */ 551 if (next_sendtime < timeout) 552 tv = next_sendtime; 553 else 554 tv = timeout; 555 tv -= time_waited; 556 557 if (tv > 0) { 558 if (cu->cu_closing || cu->cu_closed) { 559 error = 0; 560 cr->cr_error = ESHUTDOWN; 561 } else { 562 error = msleep(cr, &cs->cs_lock, 563 cu->cu_waitflag, cu->cu_waitchan, tv); 564 } 565 } else { 566 error = EWOULDBLOCK; 567 } 568 569 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 570 cu->cu_sent -= CWNDSCALE; 571 if (cu->cu_cwnd_wait) { 572 cu->cu_cwnd_wait = FALSE; 573 wakeup(&cu->cu_cwnd_wait); 574 } 575 576 if (!error) { 577 /* 578 * We were woken up by the upcall. If the 579 * upcall had a receive error, report that, 580 * otherwise we have a reply. 581 */ 582 if (cr->cr_error) { 583 errp->re_errno = cr->cr_error; 584 errp->re_status = stat = RPC_CANTRECV; 585 goto out; 586 } 587 588 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 589 + cu->cu_cwnd / 2) / cu->cu_cwnd; 590 if (cu->cu_cwnd > MAXCWND) 591 cu->cu_cwnd = MAXCWND; 592 593 if (rt) { 594 /* 595 * Add one to the time since a tick 596 * count of N means that the actual 597 * time taken was somewhere between N 598 * and N+1. 599 */ 600 rtt = ticks - starttime + 1; 601 602 /* 603 * Update our estimate of the round 604 * trip time using roughly the 605 * algorithm described in RFC 606 * 2988. Given an RTT sample R: 607 * 608 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 609 * SRTT = (1-alpha) * SRTT + alpha * R 610 * 611 * where alpha = 0.125 and beta = 0.25. 612 * 613 * The initial retransmit timeout is 614 * SRTT + 4*RTTVAR and doubles on each 615 * retransmision. 616 */ 617 if (rt->rt_srtt == 0) { 618 rt->rt_srtt = rtt; 619 rt->rt_deviate = rtt / 2; 620 } else { 621 int32_t error = rtt - rt->rt_srtt; 622 rt->rt_srtt += error / 8; 623 error = abs(error) - rt->rt_deviate; 624 rt->rt_deviate += error / 4; 625 } 626 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 627 } 628 629 break; 630 } 631 632 /* 633 * The sleep returned an error so our request is still 634 * on the list. If we got EWOULDBLOCK, we may want to 635 * re-send the request. 636 */ 637 if (error != EWOULDBLOCK) { 638 errp->re_errno = error; 639 if (error == EINTR) 640 errp->re_status = stat = RPC_INTR; 641 else 642 errp->re_status = stat = RPC_CANTRECV; 643 goto out; 644 } 645 646 time_waited = ticks - starttime; 647 648 /* Check for timeout. */ 649 if (time_waited > timeout) { 650 errp->re_errno = EWOULDBLOCK; 651 errp->re_status = stat = RPC_TIMEDOUT; 652 goto out; 653 } 654 655 /* Retransmit if necessary. */ 656 if (time_waited >= next_sendtime) { 657 cu->cu_cwnd /= 2; 658 if (cu->cu_cwnd < CWNDSCALE) 659 cu->cu_cwnd = CWNDSCALE; 660 if (ext && ext->rc_feedback) { 661 mtx_unlock(&cs->cs_lock); 662 if (retrans == 0) 663 ext->rc_feedback(FEEDBACK_REXMIT1, 664 proc, ext->rc_feedback_arg); 665 else 666 ext->rc_feedback(FEEDBACK_REXMIT2, 667 proc, ext->rc_feedback_arg); 668 mtx_lock(&cs->cs_lock); 669 } 670 if (cu->cu_closing || cu->cu_closed) { 671 errp->re_errno = ESHUTDOWN; 672 errp->re_status = stat = RPC_CANTRECV; 673 goto out; 674 } 675 retrans++; 676 /* update retransmit_time */ 677 if (retransmit_time < RPC_MAX_BACKOFF * hz) 678 retransmit_time = 2 * retransmit_time; 679 next_sendtime += retransmit_time; 680 goto send_again; 681 } 682 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 683 } 684 685 got_reply: 686 /* 687 * Now decode and validate the response. We need to drop the 688 * lock since xdr_replymsg may end up sleeping in malloc. 689 */ 690 mtx_unlock(&cs->cs_lock); 691 692 if (ext && ext->rc_feedback) 693 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 694 695 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 696 ok = xdr_replymsg(&xdrs, &reply_msg); 697 cr->cr_mrep = NULL; 698 699 if (ok) { 700 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 701 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 702 errp->re_status = stat = RPC_SUCCESS; 703 else 704 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 705 706 if (errp->re_status == RPC_SUCCESS) { 707 results = xdrmbuf_getall(&xdrs); 708 if (! AUTH_VALIDATE(auth, xid, 709 &reply_msg.acpted_rply.ar_verf, 710 &results)) { 711 errp->re_status = stat = RPC_AUTHERROR; 712 errp->re_why = AUTH_INVALIDRESP; 713 if (retrans && 714 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 715 /* 716 * If we retransmitted, its 717 * possible that we will 718 * receive a reply for one of 719 * the earlier transmissions 720 * (which will use an older 721 * RPCSEC_GSS sequence 722 * number). In this case, just 723 * go back and listen for a 724 * new reply. We could keep a 725 * record of all the seq 726 * numbers we have transmitted 727 * so far so that we could 728 * accept a reply for any of 729 * them here. 730 */ 731 XDR_DESTROY(&xdrs); 732 mtx_lock(&cs->cs_lock); 733 TAILQ_INSERT_TAIL(&cs->cs_pending, 734 cr, cr_link); 735 cr->cr_mrep = NULL; 736 goto get_reply; 737 } 738 } else { 739 *resultsp = results; 740 } 741 } /* end successful completion */ 742 /* 743 * If unsuccesful AND error is an authentication error 744 * then refresh credentials and try again, else break 745 */ 746 else if (stat == RPC_AUTHERROR) 747 /* maybe our credentials need to be refreshed ... */ 748 if (nrefreshes > 0 && 749 AUTH_REFRESH(auth, &reply_msg)) { 750 nrefreshes--; 751 XDR_DESTROY(&xdrs); 752 mtx_lock(&cs->cs_lock); 753 goto call_again; 754 } 755 /* end of unsuccessful completion */ 756 } /* end of valid reply message */ 757 else { 758 errp->re_status = stat = RPC_CANTDECODERES; 759 760 } 761 XDR_DESTROY(&xdrs); 762 mtx_lock(&cs->cs_lock); 763 out: 764 mtx_assert(&cs->cs_lock, MA_OWNED); 765 766 if (mreq) 767 m_freem(mreq); 768 if (cr->cr_mrep) 769 m_freem(cr->cr_mrep); 770 771 cu->cu_threads--; 772 if (cu->cu_closing) 773 wakeup(cu); 774 775 mtx_unlock(&cs->cs_lock); 776 777 if (auth && stat != RPC_SUCCESS) 778 AUTH_VALIDATE(auth, xid, NULL, NULL); 779 780 free(cr, M_RPC); 781 782 return (stat); 783 } 784 785 static void 786 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 787 { 788 struct cu_data *cu = (struct cu_data *)cl->cl_private; 789 790 *errp = cu->cu_error; 791 } 792 793 static bool_t 794 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 795 { 796 XDR xdrs; 797 bool_t dummy; 798 799 xdrs.x_op = XDR_FREE; 800 dummy = (*xdr_res)(&xdrs, res_ptr); 801 802 return (dummy); 803 } 804 805 /*ARGSUSED*/ 806 static void 807 clnt_dg_abort(CLIENT *h) 808 { 809 } 810 811 static bool_t 812 clnt_dg_control(CLIENT *cl, u_int request, void *info) 813 { 814 struct cu_data *cu = (struct cu_data *)cl->cl_private; 815 struct cu_socket *cs; 816 struct sockaddr *addr; 817 818 cs = cu->cu_socket->so_rcv.sb_upcallarg; 819 mtx_lock(&cs->cs_lock); 820 821 switch (request) { 822 case CLSET_FD_CLOSE: 823 cu->cu_closeit = TRUE; 824 mtx_unlock(&cs->cs_lock); 825 return (TRUE); 826 case CLSET_FD_NCLOSE: 827 cu->cu_closeit = FALSE; 828 mtx_unlock(&cs->cs_lock); 829 return (TRUE); 830 } 831 832 /* for other requests which use info */ 833 if (info == NULL) { 834 mtx_unlock(&cs->cs_lock); 835 return (FALSE); 836 } 837 switch (request) { 838 case CLSET_TIMEOUT: 839 if (time_not_ok((struct timeval *)info)) { 840 mtx_unlock(&cs->cs_lock); 841 return (FALSE); 842 } 843 cu->cu_total = *(struct timeval *)info; 844 break; 845 case CLGET_TIMEOUT: 846 *(struct timeval *)info = cu->cu_total; 847 break; 848 case CLSET_RETRY_TIMEOUT: 849 if (time_not_ok((struct timeval *)info)) { 850 mtx_unlock(&cs->cs_lock); 851 return (FALSE); 852 } 853 cu->cu_wait = *(struct timeval *)info; 854 break; 855 case CLGET_RETRY_TIMEOUT: 856 *(struct timeval *)info = cu->cu_wait; 857 break; 858 case CLGET_SVC_ADDR: 859 /* 860 * Slightly different semantics to userland - we use 861 * sockaddr instead of netbuf. 862 */ 863 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 864 break; 865 case CLSET_SVC_ADDR: /* set to new address */ 866 addr = (struct sockaddr *)info; 867 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 868 break; 869 case CLGET_XID: 870 *(uint32_t *)info = cu->cu_xid; 871 break; 872 873 case CLSET_XID: 874 /* This will set the xid of the NEXT call */ 875 /* decrement by 1 as clnt_dg_call() increments once */ 876 cu->cu_xid = *(uint32_t *)info - 1; 877 break; 878 879 case CLGET_VERS: 880 /* 881 * This RELIES on the information that, in the call body, 882 * the version number field is the fifth field from the 883 * begining of the RPC header. MUST be changed if the 884 * call_struct is changed 885 */ 886 *(uint32_t *)info = 887 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 888 4 * BYTES_PER_XDR_UNIT)); 889 break; 890 891 case CLSET_VERS: 892 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 893 = htonl(*(uint32_t *)info); 894 break; 895 896 case CLGET_PROG: 897 /* 898 * This RELIES on the information that, in the call body, 899 * the program number field is the fourth field from the 900 * begining of the RPC header. MUST be changed if the 901 * call_struct is changed 902 */ 903 *(uint32_t *)info = 904 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 905 3 * BYTES_PER_XDR_UNIT)); 906 break; 907 908 case CLSET_PROG: 909 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 910 = htonl(*(uint32_t *)info); 911 break; 912 case CLSET_ASYNC: 913 cu->cu_async = *(int *)info; 914 break; 915 case CLSET_CONNECT: 916 cu->cu_connect = *(int *)info; 917 break; 918 case CLSET_WAITCHAN: 919 cu->cu_waitchan = (const char *)info; 920 break; 921 case CLGET_WAITCHAN: 922 *(const char **) info = cu->cu_waitchan; 923 break; 924 case CLSET_INTERRUPTIBLE: 925 if (*(int *) info) 926 cu->cu_waitflag = PCATCH; 927 else 928 cu->cu_waitflag = 0; 929 break; 930 case CLGET_INTERRUPTIBLE: 931 if (cu->cu_waitflag) 932 *(int *) info = TRUE; 933 else 934 *(int *) info = FALSE; 935 break; 936 default: 937 mtx_unlock(&cs->cs_lock); 938 return (FALSE); 939 } 940 mtx_unlock(&cs->cs_lock); 941 return (TRUE); 942 } 943 944 static void 945 clnt_dg_close(CLIENT *cl) 946 { 947 struct cu_data *cu = (struct cu_data *)cl->cl_private; 948 struct cu_socket *cs; 949 struct cu_request *cr; 950 951 cs = cu->cu_socket->so_rcv.sb_upcallarg; 952 mtx_lock(&cs->cs_lock); 953 954 if (cu->cu_closed) { 955 mtx_unlock(&cs->cs_lock); 956 return; 957 } 958 959 if (cu->cu_closing) { 960 while (cu->cu_closing) 961 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 962 KASSERT(cu->cu_closed, ("client should be closed")); 963 mtx_unlock(&cs->cs_lock); 964 return; 965 } 966 967 /* 968 * Abort any pending requests and wait until everyone 969 * has finished with clnt_vc_call. 970 */ 971 cu->cu_closing = TRUE; 972 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 973 if (cr->cr_client == cl) { 974 cr->cr_xid = 0; 975 cr->cr_error = ESHUTDOWN; 976 wakeup(cr); 977 } 978 } 979 980 while (cu->cu_threads) 981 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 982 983 cu->cu_closing = FALSE; 984 cu->cu_closed = TRUE; 985 986 mtx_unlock(&cs->cs_lock); 987 wakeup(cu); 988 } 989 990 static void 991 clnt_dg_destroy(CLIENT *cl) 992 { 993 struct cu_data *cu = (struct cu_data *)cl->cl_private; 994 struct cu_socket *cs; 995 struct socket *so = NULL; 996 bool_t lastsocketref; 997 998 cs = cu->cu_socket->so_rcv.sb_upcallarg; 999 clnt_dg_close(cl); 1000 1001 mtx_lock(&cs->cs_lock); 1002 1003 cs->cs_refs--; 1004 if (cs->cs_refs == 0) { 1005 mtx_unlock(&cs->cs_lock); 1006 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1007 soupcall_clear(cu->cu_socket, SO_RCV); 1008 clnt_dg_upcallsdone(cu->cu_socket, cs); 1009 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1010 mtx_destroy(&cs->cs_lock); 1011 mem_free(cs, sizeof(*cs)); 1012 lastsocketref = TRUE; 1013 } else { 1014 mtx_unlock(&cs->cs_lock); 1015 lastsocketref = FALSE; 1016 } 1017 1018 if (cu->cu_closeit && lastsocketref) { 1019 so = cu->cu_socket; 1020 cu->cu_socket = NULL; 1021 } 1022 1023 if (so) 1024 soclose(so); 1025 1026 if (cl->cl_netid && cl->cl_netid[0]) 1027 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1028 if (cl->cl_tp && cl->cl_tp[0]) 1029 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1030 mem_free(cu, sizeof (*cu)); 1031 mem_free(cl, sizeof (CLIENT)); 1032 } 1033 1034 /* 1035 * Make sure that the time is not garbage. -1 value is allowed. 1036 */ 1037 static bool_t 1038 time_not_ok(struct timeval *t) 1039 { 1040 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1041 t->tv_usec < -1 || t->tv_usec > 1000000); 1042 } 1043 1044 int 1045 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1046 { 1047 struct cu_socket *cs = (struct cu_socket *) arg; 1048 struct uio uio; 1049 struct mbuf *m; 1050 struct mbuf *control; 1051 struct cu_request *cr; 1052 int error, rcvflag, foundreq; 1053 uint32_t xid; 1054 1055 cs->cs_upcallrefs++; 1056 uio.uio_resid = 1000000000; 1057 uio.uio_td = curthread; 1058 do { 1059 SOCKBUF_UNLOCK(&so->so_rcv); 1060 m = NULL; 1061 control = NULL; 1062 rcvflag = MSG_DONTWAIT; 1063 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1064 if (control) 1065 m_freem(control); 1066 SOCKBUF_LOCK(&so->so_rcv); 1067 1068 if (error == EWOULDBLOCK) 1069 break; 1070 1071 /* 1072 * If there was an error, wake up all pending 1073 * requests. 1074 */ 1075 if (error) { 1076 mtx_lock(&cs->cs_lock); 1077 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1078 cr->cr_xid = 0; 1079 cr->cr_error = error; 1080 wakeup(cr); 1081 } 1082 mtx_unlock(&cs->cs_lock); 1083 break; 1084 } 1085 1086 /* 1087 * The XID is in the first uint32_t of the reply. 1088 */ 1089 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1090 /* 1091 * Should never happen. 1092 */ 1093 m_freem(m); 1094 continue; 1095 } 1096 1097 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1098 xid = ntohl(xid); 1099 1100 /* 1101 * Attempt to match this reply with a pending request. 1102 */ 1103 mtx_lock(&cs->cs_lock); 1104 foundreq = 0; 1105 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1106 if (cr->cr_xid == xid) { 1107 /* 1108 * This one matches. We leave the 1109 * reply mbuf in cr->cr_mrep. Set the 1110 * XID to zero so that we will ignore 1111 * any duplicated replies that arrive 1112 * before clnt_dg_call removes it from 1113 * the queue. 1114 */ 1115 cr->cr_xid = 0; 1116 cr->cr_mrep = m; 1117 cr->cr_error = 0; 1118 foundreq = 1; 1119 wakeup(cr); 1120 break; 1121 } 1122 } 1123 mtx_unlock(&cs->cs_lock); 1124 1125 /* 1126 * If we didn't find the matching request, just drop 1127 * it - its probably a repeated reply. 1128 */ 1129 if (!foundreq) 1130 m_freem(m); 1131 } while (m); 1132 cs->cs_upcallrefs--; 1133 if (cs->cs_upcallrefs < 0) 1134 panic("rpcdg upcall refcnt"); 1135 if (cs->cs_upcallrefs == 0) 1136 wakeup(&cs->cs_upcallrefs); 1137 return (SU_OK); 1138 } 1139 1140 /* 1141 * Wait for all upcalls in progress to complete. 1142 */ 1143 static void 1144 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1145 { 1146 1147 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1148 1149 while (cs->cs_upcallrefs > 0) 1150 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1151 "rpcdgup", 0); 1152 } 1153