1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 /* 31 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 32 */ 33 34 #if defined(LIBC_SCCS) && !defined(lint) 35 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 36 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 37 #endif 38 #include <sys/cdefs.h> 39 __FBSDID("$FreeBSD$"); 40 41 /* 42 * Implements a connectionless client side RPC. 43 */ 44 45 #include <sys/param.h> 46 #include <sys/systm.h> 47 #include <sys/kernel.h> 48 #include <sys/lock.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/pcpu.h> 53 #include <sys/proc.h> 54 #include <sys/socket.h> 55 #include <sys/socketvar.h> 56 #include <sys/time.h> 57 #include <sys/uio.h> 58 59 #include <net/vnet.h> 60 61 #include <rpc/rpc.h> 62 #include <rpc/rpc_com.h> 63 64 65 #ifdef _FREEFALL_CONFIG 66 /* 67 * Disable RPC exponential back-off for FreeBSD.org systems. 68 */ 69 #define RPC_MAX_BACKOFF 1 /* second */ 70 #else 71 #define RPC_MAX_BACKOFF 30 /* seconds */ 72 #endif 73 74 static bool_t time_not_ok(struct timeval *); 75 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 76 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 77 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 78 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 79 static void clnt_dg_abort(CLIENT *); 80 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 81 static void clnt_dg_close(CLIENT *); 82 static void clnt_dg_destroy(CLIENT *); 83 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 84 85 static struct clnt_ops clnt_dg_ops = { 86 .cl_call = clnt_dg_call, 87 .cl_abort = clnt_dg_abort, 88 .cl_geterr = clnt_dg_geterr, 89 .cl_freeres = clnt_dg_freeres, 90 .cl_close = clnt_dg_close, 91 .cl_destroy = clnt_dg_destroy, 92 .cl_control = clnt_dg_control 93 }; 94 95 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 96 97 /* 98 * A pending RPC request which awaits a reply. Requests which have 99 * received their reply will have cr_xid set to zero and cr_mrep to 100 * the mbuf chain of the reply. 101 */ 102 struct cu_request { 103 TAILQ_ENTRY(cu_request) cr_link; 104 CLIENT *cr_client; /* owner */ 105 uint32_t cr_xid; /* XID of request */ 106 struct mbuf *cr_mrep; /* reply received by upcall */ 107 int cr_error; /* any error from upcall */ 108 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 109 }; 110 111 TAILQ_HEAD(cu_request_list, cu_request); 112 113 #define MCALL_MSG_SIZE 24 114 115 /* 116 * This structure is pointed to by the socket buffer's sb_upcallarg 117 * member. It is separate from the client private data to facilitate 118 * multiple clients sharing the same socket. The cs_lock mutex is used 119 * to protect all fields of this structure, the socket's receive 120 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 121 * structures is installed on the socket. 122 */ 123 struct cu_socket { 124 struct mtx cs_lock; 125 int cs_refs; /* Count of clients */ 126 struct cu_request_list cs_pending; /* Requests awaiting replies */ 127 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 128 }; 129 130 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 131 132 /* 133 * Private data kept per client handle 134 */ 135 struct cu_data { 136 int cu_threads; /* # threads in clnt_vc_call */ 137 bool_t cu_closing; /* TRUE if we are closing */ 138 bool_t cu_closed; /* TRUE if we are closed */ 139 struct socket *cu_socket; /* connection socket */ 140 bool_t cu_closeit; /* opened by library */ 141 struct sockaddr_storage cu_raddr; /* remote address */ 142 int cu_rlen; 143 struct timeval cu_wait; /* retransmit interval */ 144 struct timeval cu_total; /* total time for the call */ 145 struct rpc_err cu_error; 146 uint32_t cu_xid; 147 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 148 size_t cu_mcalllen; 149 size_t cu_sendsz; /* send size */ 150 size_t cu_recvsz; /* recv size */ 151 int cu_async; 152 int cu_connect; /* Use connect(). */ 153 int cu_connected; /* Have done connect(). */ 154 const char *cu_waitchan; 155 int cu_waitflag; 156 int cu_cwnd; /* congestion window */ 157 int cu_sent; /* number of in-flight RPCs */ 158 bool_t cu_cwnd_wait; 159 }; 160 161 #define CWNDSCALE 256 162 #define MAXCWND (32 * CWNDSCALE) 163 164 /* 165 * Connection less client creation returns with client handle parameters. 166 * Default options are set, which the user can change using clnt_control(). 167 * fd should be open and bound. 168 * NB: The rpch->cl_auth is initialized to null authentication. 169 * Caller may wish to set this something more useful. 170 * 171 * sendsz and recvsz are the maximum allowable packet sizes that can be 172 * sent and received. Normally they are the same, but they can be 173 * changed to improve the program efficiency and buffer allocation. 174 * If they are 0, use the transport default. 175 * 176 * If svcaddr is NULL, returns NULL. 177 */ 178 CLIENT * 179 clnt_dg_create( 180 struct socket *so, 181 struct sockaddr *svcaddr, /* servers address */ 182 rpcprog_t program, /* program number */ 183 rpcvers_t version, /* version number */ 184 size_t sendsz, /* buffer recv size */ 185 size_t recvsz) /* buffer send size */ 186 { 187 CLIENT *cl = NULL; /* client handle */ 188 struct cu_data *cu = NULL; /* private data */ 189 struct cu_socket *cs = NULL; 190 struct sockbuf *sb; 191 struct timeval now; 192 struct rpc_msg call_msg; 193 struct __rpc_sockinfo si; 194 XDR xdrs; 195 int error; 196 197 if (svcaddr == NULL) { 198 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 199 return (NULL); 200 } 201 202 if (!__rpc_socket2sockinfo(so, &si)) { 203 rpc_createerr.cf_stat = RPC_TLIERROR; 204 rpc_createerr.cf_error.re_errno = 0; 205 return (NULL); 206 } 207 208 /* 209 * Find the receive and the send size 210 */ 211 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 212 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 213 if ((sendsz == 0) || (recvsz == 0)) { 214 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 215 rpc_createerr.cf_error.re_errno = 0; 216 return (NULL); 217 } 218 219 cl = mem_alloc(sizeof (CLIENT)); 220 221 /* 222 * Should be multiple of 4 for XDR. 223 */ 224 sendsz = ((sendsz + 3) / 4) * 4; 225 recvsz = ((recvsz + 3) / 4) * 4; 226 cu = mem_alloc(sizeof (*cu)); 227 cu->cu_threads = 0; 228 cu->cu_closing = FALSE; 229 cu->cu_closed = FALSE; 230 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 231 cu->cu_rlen = svcaddr->sa_len; 232 /* Other values can also be set through clnt_control() */ 233 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 234 cu->cu_wait.tv_usec = 0; 235 cu->cu_total.tv_sec = -1; 236 cu->cu_total.tv_usec = -1; 237 cu->cu_sendsz = sendsz; 238 cu->cu_recvsz = recvsz; 239 cu->cu_async = FALSE; 240 cu->cu_connect = FALSE; 241 cu->cu_connected = FALSE; 242 cu->cu_waitchan = "rpcrecv"; 243 cu->cu_waitflag = 0; 244 cu->cu_cwnd = MAXCWND / 2; 245 cu->cu_sent = 0; 246 cu->cu_cwnd_wait = FALSE; 247 (void) getmicrotime(&now); 248 cu->cu_xid = __RPC_GETXID(&now); 249 call_msg.rm_xid = cu->cu_xid; 250 call_msg.rm_call.cb_prog = program; 251 call_msg.rm_call.cb_vers = version; 252 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 253 if (! xdr_callhdr(&xdrs, &call_msg)) { 254 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 255 rpc_createerr.cf_error.re_errno = 0; 256 goto err2; 257 } 258 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 259 260 /* 261 * By default, closeit is always FALSE. It is users responsibility 262 * to do a close on it, else the user may use clnt_control 263 * to let clnt_destroy do it for him/her. 264 */ 265 cu->cu_closeit = FALSE; 266 cu->cu_socket = so; 267 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 268 if (error != 0) { 269 rpc_createerr.cf_stat = RPC_FAILED; 270 rpc_createerr.cf_error.re_errno = error; 271 goto err2; 272 } 273 274 sb = &so->so_rcv; 275 SOCKBUF_LOCK(&so->so_rcv); 276 recheck_socket: 277 if (sb->sb_upcall) { 278 if (sb->sb_upcall != clnt_dg_soupcall) { 279 SOCKBUF_UNLOCK(&so->so_rcv); 280 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 281 goto err2; 282 } 283 cs = (struct cu_socket *) sb->sb_upcallarg; 284 mtx_lock(&cs->cs_lock); 285 cs->cs_refs++; 286 mtx_unlock(&cs->cs_lock); 287 } else { 288 /* 289 * We are the first on this socket - allocate the 290 * structure and install it in the socket. 291 */ 292 SOCKBUF_UNLOCK(&so->so_rcv); 293 cs = mem_alloc(sizeof(*cs)); 294 SOCKBUF_LOCK(&so->so_rcv); 295 if (sb->sb_upcall) { 296 /* 297 * We have lost a race with some other client. 298 */ 299 mem_free(cs, sizeof(*cs)); 300 goto recheck_socket; 301 } 302 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 303 cs->cs_refs = 1; 304 cs->cs_upcallrefs = 0; 305 TAILQ_INIT(&cs->cs_pending); 306 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 307 } 308 SOCKBUF_UNLOCK(&so->so_rcv); 309 310 cl->cl_refs = 1; 311 cl->cl_ops = &clnt_dg_ops; 312 cl->cl_private = (caddr_t)(void *)cu; 313 cl->cl_auth = authnone_create(); 314 cl->cl_tp = NULL; 315 cl->cl_netid = NULL; 316 return (cl); 317 err2: 318 if (cl) { 319 mem_free(cl, sizeof (CLIENT)); 320 if (cu) 321 mem_free(cu, sizeof (*cu)); 322 } 323 return (NULL); 324 } 325 326 static enum clnt_stat 327 clnt_dg_call( 328 CLIENT *cl, /* client handle */ 329 struct rpc_callextra *ext, /* call metadata */ 330 rpcproc_t proc, /* procedure number */ 331 struct mbuf *args, /* pointer to args */ 332 struct mbuf **resultsp, /* pointer to results */ 333 struct timeval utimeout) /* seconds to wait before giving up */ 334 { 335 struct cu_data *cu = (struct cu_data *)cl->cl_private; 336 struct cu_socket *cs; 337 struct rpc_timers *rt; 338 AUTH *auth; 339 struct rpc_err *errp; 340 enum clnt_stat stat; 341 XDR xdrs; 342 struct rpc_msg reply_msg; 343 bool_t ok; 344 int retrans; /* number of re-transmits so far */ 345 int nrefreshes = 2; /* number of times to refresh cred */ 346 struct timeval *tvp; 347 int timeout; 348 int retransmit_time; 349 int next_sendtime, starttime, rtt, time_waited, tv = 0; 350 struct sockaddr *sa; 351 socklen_t salen; 352 uint32_t xid = 0; 353 struct mbuf *mreq = NULL, *results; 354 struct cu_request *cr; 355 int error; 356 357 cs = cu->cu_socket->so_rcv.sb_upcallarg; 358 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 359 360 mtx_lock(&cs->cs_lock); 361 362 if (cu->cu_closing || cu->cu_closed) { 363 mtx_unlock(&cs->cs_lock); 364 free(cr, M_RPC); 365 return (RPC_CANTSEND); 366 } 367 cu->cu_threads++; 368 369 if (ext) { 370 auth = ext->rc_auth; 371 errp = &ext->rc_err; 372 } else { 373 auth = cl->cl_auth; 374 errp = &cu->cu_error; 375 } 376 377 cr->cr_client = cl; 378 cr->cr_mrep = NULL; 379 cr->cr_error = 0; 380 381 if (cu->cu_total.tv_usec == -1) { 382 tvp = &utimeout; /* use supplied timeout */ 383 } else { 384 tvp = &cu->cu_total; /* use default timeout */ 385 } 386 if (tvp->tv_sec || tvp->tv_usec) 387 timeout = tvtohz(tvp); 388 else 389 timeout = 0; 390 391 if (cu->cu_connect && !cu->cu_connected) { 392 mtx_unlock(&cs->cs_lock); 393 error = soconnect(cu->cu_socket, 394 (struct sockaddr *)&cu->cu_raddr, curthread); 395 mtx_lock(&cs->cs_lock); 396 if (error) { 397 errp->re_errno = error; 398 errp->re_status = stat = RPC_CANTSEND; 399 goto out; 400 } 401 cu->cu_connected = 1; 402 } 403 if (cu->cu_connected) { 404 sa = NULL; 405 salen = 0; 406 } else { 407 sa = (struct sockaddr *)&cu->cu_raddr; 408 salen = cu->cu_rlen; 409 } 410 time_waited = 0; 411 retrans = 0; 412 if (ext && ext->rc_timers) { 413 rt = ext->rc_timers; 414 if (!rt->rt_rtxcur) 415 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 416 retransmit_time = next_sendtime = rt->rt_rtxcur; 417 } else { 418 rt = NULL; 419 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 420 } 421 422 starttime = ticks; 423 424 call_again: 425 mtx_assert(&cs->cs_lock, MA_OWNED); 426 427 cu->cu_xid++; 428 xid = cu->cu_xid; 429 430 send_again: 431 mtx_unlock(&cs->cs_lock); 432 433 mreq = m_gethdr(M_WAITOK, MT_DATA); 434 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 435 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 436 mreq->m_len = cu->cu_mcalllen; 437 438 /* 439 * The XID is the first thing in the request. 440 */ 441 *mtod(mreq, uint32_t *) = htonl(xid); 442 443 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 444 445 if (cu->cu_async == TRUE && args == NULL) 446 goto get_reply; 447 448 if ((! XDR_PUTINT32(&xdrs, &proc)) || 449 (! AUTH_MARSHALL(auth, xid, &xdrs, 450 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 451 errp->re_status = stat = RPC_CANTENCODEARGS; 452 mtx_lock(&cs->cs_lock); 453 goto out; 454 } 455 mreq->m_pkthdr.len = m_length(mreq, NULL); 456 457 cr->cr_xid = xid; 458 mtx_lock(&cs->cs_lock); 459 460 /* 461 * Try to get a place in the congestion window. 462 */ 463 while (cu->cu_sent >= cu->cu_cwnd) { 464 cu->cu_cwnd_wait = TRUE; 465 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 466 cu->cu_waitflag, "rpccwnd", 0); 467 if (error) { 468 errp->re_errno = error; 469 if (error == EINTR || error == ERESTART) 470 errp->re_status = stat = RPC_INTR; 471 else 472 errp->re_status = stat = RPC_CANTSEND; 473 goto out; 474 } 475 } 476 cu->cu_sent += CWNDSCALE; 477 478 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 479 mtx_unlock(&cs->cs_lock); 480 481 /* 482 * sosend consumes mreq. 483 */ 484 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 485 mreq = NULL; 486 487 /* 488 * sub-optimal code appears here because we have 489 * some clock time to spare while the packets are in flight. 490 * (We assume that this is actually only executed once.) 491 */ 492 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 493 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 494 reply_msg.acpted_rply.ar_verf.oa_length = 0; 495 reply_msg.acpted_rply.ar_results.where = NULL; 496 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 497 498 mtx_lock(&cs->cs_lock); 499 if (error) { 500 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 501 errp->re_errno = error; 502 errp->re_status = stat = RPC_CANTSEND; 503 cu->cu_sent -= CWNDSCALE; 504 if (cu->cu_cwnd_wait) { 505 cu->cu_cwnd_wait = FALSE; 506 wakeup(&cu->cu_cwnd_wait); 507 } 508 goto out; 509 } 510 511 /* 512 * Check to see if we got an upcall while waiting for the 513 * lock. 514 */ 515 if (cr->cr_error) { 516 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 517 errp->re_errno = cr->cr_error; 518 errp->re_status = stat = RPC_CANTRECV; 519 cu->cu_sent -= CWNDSCALE; 520 if (cu->cu_cwnd_wait) { 521 cu->cu_cwnd_wait = FALSE; 522 wakeup(&cu->cu_cwnd_wait); 523 } 524 goto out; 525 } 526 if (cr->cr_mrep) { 527 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 528 cu->cu_sent -= CWNDSCALE; 529 if (cu->cu_cwnd_wait) { 530 cu->cu_cwnd_wait = FALSE; 531 wakeup(&cu->cu_cwnd_wait); 532 } 533 goto got_reply; 534 } 535 536 /* 537 * Hack to provide rpc-based message passing 538 */ 539 if (timeout == 0) { 540 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 541 errp->re_status = stat = RPC_TIMEDOUT; 542 cu->cu_sent -= CWNDSCALE; 543 if (cu->cu_cwnd_wait) { 544 cu->cu_cwnd_wait = FALSE; 545 wakeup(&cu->cu_cwnd_wait); 546 } 547 goto out; 548 } 549 550 get_reply: 551 for (;;) { 552 /* Decide how long to wait. */ 553 if (next_sendtime < timeout) 554 tv = next_sendtime; 555 else 556 tv = timeout; 557 tv -= time_waited; 558 559 if (tv > 0) { 560 if (cu->cu_closing || cu->cu_closed) { 561 error = 0; 562 cr->cr_error = ESHUTDOWN; 563 } else { 564 error = msleep(cr, &cs->cs_lock, 565 cu->cu_waitflag, cu->cu_waitchan, tv); 566 } 567 } else { 568 error = EWOULDBLOCK; 569 } 570 571 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 572 cu->cu_sent -= CWNDSCALE; 573 if (cu->cu_cwnd_wait) { 574 cu->cu_cwnd_wait = FALSE; 575 wakeup(&cu->cu_cwnd_wait); 576 } 577 578 if (!error) { 579 /* 580 * We were woken up by the upcall. If the 581 * upcall had a receive error, report that, 582 * otherwise we have a reply. 583 */ 584 if (cr->cr_error) { 585 errp->re_errno = cr->cr_error; 586 errp->re_status = stat = RPC_CANTRECV; 587 goto out; 588 } 589 590 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 591 + cu->cu_cwnd / 2) / cu->cu_cwnd; 592 if (cu->cu_cwnd > MAXCWND) 593 cu->cu_cwnd = MAXCWND; 594 595 if (rt) { 596 /* 597 * Add one to the time since a tick 598 * count of N means that the actual 599 * time taken was somewhere between N 600 * and N+1. 601 */ 602 rtt = ticks - starttime + 1; 603 604 /* 605 * Update our estimate of the round 606 * trip time using roughly the 607 * algorithm described in RFC 608 * 2988. Given an RTT sample R: 609 * 610 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 611 * SRTT = (1-alpha) * SRTT + alpha * R 612 * 613 * where alpha = 0.125 and beta = 0.25. 614 * 615 * The initial retransmit timeout is 616 * SRTT + 4*RTTVAR and doubles on each 617 * retransmision. 618 */ 619 if (rt->rt_srtt == 0) { 620 rt->rt_srtt = rtt; 621 rt->rt_deviate = rtt / 2; 622 } else { 623 int32_t error = rtt - rt->rt_srtt; 624 rt->rt_srtt += error / 8; 625 error = abs(error) - rt->rt_deviate; 626 rt->rt_deviate += error / 4; 627 } 628 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 629 } 630 631 break; 632 } 633 634 /* 635 * The sleep returned an error so our request is still 636 * on the list. If we got EWOULDBLOCK, we may want to 637 * re-send the request. 638 */ 639 if (error != EWOULDBLOCK) { 640 errp->re_errno = error; 641 if (error == EINTR || error == ERESTART) 642 errp->re_status = stat = RPC_INTR; 643 else 644 errp->re_status = stat = RPC_CANTRECV; 645 goto out; 646 } 647 648 time_waited = ticks - starttime; 649 650 /* Check for timeout. */ 651 if (time_waited > timeout) { 652 errp->re_errno = EWOULDBLOCK; 653 errp->re_status = stat = RPC_TIMEDOUT; 654 goto out; 655 } 656 657 /* Retransmit if necessary. */ 658 if (time_waited >= next_sendtime) { 659 cu->cu_cwnd /= 2; 660 if (cu->cu_cwnd < CWNDSCALE) 661 cu->cu_cwnd = CWNDSCALE; 662 if (ext && ext->rc_feedback) { 663 mtx_unlock(&cs->cs_lock); 664 if (retrans == 0) 665 ext->rc_feedback(FEEDBACK_REXMIT1, 666 proc, ext->rc_feedback_arg); 667 else 668 ext->rc_feedback(FEEDBACK_REXMIT2, 669 proc, ext->rc_feedback_arg); 670 mtx_lock(&cs->cs_lock); 671 } 672 if (cu->cu_closing || cu->cu_closed) { 673 errp->re_errno = ESHUTDOWN; 674 errp->re_status = stat = RPC_CANTRECV; 675 goto out; 676 } 677 retrans++; 678 /* update retransmit_time */ 679 if (retransmit_time < RPC_MAX_BACKOFF * hz) 680 retransmit_time = 2 * retransmit_time; 681 next_sendtime += retransmit_time; 682 goto send_again; 683 } 684 cu->cu_sent += CWNDSCALE; 685 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 686 } 687 688 got_reply: 689 /* 690 * Now decode and validate the response. We need to drop the 691 * lock since xdr_replymsg may end up sleeping in malloc. 692 */ 693 mtx_unlock(&cs->cs_lock); 694 695 if (ext && ext->rc_feedback) 696 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 697 698 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 699 ok = xdr_replymsg(&xdrs, &reply_msg); 700 cr->cr_mrep = NULL; 701 702 if (ok) { 703 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 704 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 705 errp->re_status = stat = RPC_SUCCESS; 706 else 707 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 708 709 if (errp->re_status == RPC_SUCCESS) { 710 results = xdrmbuf_getall(&xdrs); 711 if (! AUTH_VALIDATE(auth, xid, 712 &reply_msg.acpted_rply.ar_verf, 713 &results)) { 714 errp->re_status = stat = RPC_AUTHERROR; 715 errp->re_why = AUTH_INVALIDRESP; 716 if (retrans && 717 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 718 /* 719 * If we retransmitted, its 720 * possible that we will 721 * receive a reply for one of 722 * the earlier transmissions 723 * (which will use an older 724 * RPCSEC_GSS sequence 725 * number). In this case, just 726 * go back and listen for a 727 * new reply. We could keep a 728 * record of all the seq 729 * numbers we have transmitted 730 * so far so that we could 731 * accept a reply for any of 732 * them here. 733 */ 734 XDR_DESTROY(&xdrs); 735 mtx_lock(&cs->cs_lock); 736 cu->cu_sent += CWNDSCALE; 737 TAILQ_INSERT_TAIL(&cs->cs_pending, 738 cr, cr_link); 739 cr->cr_mrep = NULL; 740 goto get_reply; 741 } 742 } else { 743 *resultsp = results; 744 } 745 } /* end successful completion */ 746 /* 747 * If unsuccesful AND error is an authentication error 748 * then refresh credentials and try again, else break 749 */ 750 else if (stat == RPC_AUTHERROR) 751 /* maybe our credentials need to be refreshed ... */ 752 if (nrefreshes > 0 && 753 AUTH_REFRESH(auth, &reply_msg)) { 754 nrefreshes--; 755 XDR_DESTROY(&xdrs); 756 mtx_lock(&cs->cs_lock); 757 goto call_again; 758 } 759 /* end of unsuccessful completion */ 760 } /* end of valid reply message */ 761 else { 762 errp->re_status = stat = RPC_CANTDECODERES; 763 764 } 765 XDR_DESTROY(&xdrs); 766 mtx_lock(&cs->cs_lock); 767 out: 768 mtx_assert(&cs->cs_lock, MA_OWNED); 769 770 if (mreq) 771 m_freem(mreq); 772 if (cr->cr_mrep) 773 m_freem(cr->cr_mrep); 774 775 cu->cu_threads--; 776 if (cu->cu_closing) 777 wakeup(cu); 778 779 mtx_unlock(&cs->cs_lock); 780 781 if (auth && stat != RPC_SUCCESS) 782 AUTH_VALIDATE(auth, xid, NULL, NULL); 783 784 free(cr, M_RPC); 785 786 return (stat); 787 } 788 789 static void 790 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 791 { 792 struct cu_data *cu = (struct cu_data *)cl->cl_private; 793 794 *errp = cu->cu_error; 795 } 796 797 static bool_t 798 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 799 { 800 XDR xdrs; 801 bool_t dummy; 802 803 xdrs.x_op = XDR_FREE; 804 dummy = (*xdr_res)(&xdrs, res_ptr); 805 806 return (dummy); 807 } 808 809 /*ARGSUSED*/ 810 static void 811 clnt_dg_abort(CLIENT *h) 812 { 813 } 814 815 static bool_t 816 clnt_dg_control(CLIENT *cl, u_int request, void *info) 817 { 818 struct cu_data *cu = (struct cu_data *)cl->cl_private; 819 struct cu_socket *cs; 820 struct sockaddr *addr; 821 822 cs = cu->cu_socket->so_rcv.sb_upcallarg; 823 mtx_lock(&cs->cs_lock); 824 825 switch (request) { 826 case CLSET_FD_CLOSE: 827 cu->cu_closeit = TRUE; 828 mtx_unlock(&cs->cs_lock); 829 return (TRUE); 830 case CLSET_FD_NCLOSE: 831 cu->cu_closeit = FALSE; 832 mtx_unlock(&cs->cs_lock); 833 return (TRUE); 834 } 835 836 /* for other requests which use info */ 837 if (info == NULL) { 838 mtx_unlock(&cs->cs_lock); 839 return (FALSE); 840 } 841 switch (request) { 842 case CLSET_TIMEOUT: 843 if (time_not_ok((struct timeval *)info)) { 844 mtx_unlock(&cs->cs_lock); 845 return (FALSE); 846 } 847 cu->cu_total = *(struct timeval *)info; 848 break; 849 case CLGET_TIMEOUT: 850 *(struct timeval *)info = cu->cu_total; 851 break; 852 case CLSET_RETRY_TIMEOUT: 853 if (time_not_ok((struct timeval *)info)) { 854 mtx_unlock(&cs->cs_lock); 855 return (FALSE); 856 } 857 cu->cu_wait = *(struct timeval *)info; 858 break; 859 case CLGET_RETRY_TIMEOUT: 860 *(struct timeval *)info = cu->cu_wait; 861 break; 862 case CLGET_SVC_ADDR: 863 /* 864 * Slightly different semantics to userland - we use 865 * sockaddr instead of netbuf. 866 */ 867 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 868 break; 869 case CLSET_SVC_ADDR: /* set to new address */ 870 addr = (struct sockaddr *)info; 871 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 872 break; 873 case CLGET_XID: 874 *(uint32_t *)info = cu->cu_xid; 875 break; 876 877 case CLSET_XID: 878 /* This will set the xid of the NEXT call */ 879 /* decrement by 1 as clnt_dg_call() increments once */ 880 cu->cu_xid = *(uint32_t *)info - 1; 881 break; 882 883 case CLGET_VERS: 884 /* 885 * This RELIES on the information that, in the call body, 886 * the version number field is the fifth field from the 887 * begining of the RPC header. MUST be changed if the 888 * call_struct is changed 889 */ 890 *(uint32_t *)info = 891 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 892 4 * BYTES_PER_XDR_UNIT)); 893 break; 894 895 case CLSET_VERS: 896 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 897 = htonl(*(uint32_t *)info); 898 break; 899 900 case CLGET_PROG: 901 /* 902 * This RELIES on the information that, in the call body, 903 * the program number field is the fourth field from the 904 * begining of the RPC header. MUST be changed if the 905 * call_struct is changed 906 */ 907 *(uint32_t *)info = 908 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 909 3 * BYTES_PER_XDR_UNIT)); 910 break; 911 912 case CLSET_PROG: 913 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 914 = htonl(*(uint32_t *)info); 915 break; 916 case CLSET_ASYNC: 917 cu->cu_async = *(int *)info; 918 break; 919 case CLSET_CONNECT: 920 cu->cu_connect = *(int *)info; 921 break; 922 case CLSET_WAITCHAN: 923 cu->cu_waitchan = (const char *)info; 924 break; 925 case CLGET_WAITCHAN: 926 *(const char **) info = cu->cu_waitchan; 927 break; 928 case CLSET_INTERRUPTIBLE: 929 if (*(int *) info) 930 cu->cu_waitflag = PCATCH; 931 else 932 cu->cu_waitflag = 0; 933 break; 934 case CLGET_INTERRUPTIBLE: 935 if (cu->cu_waitflag) 936 *(int *) info = TRUE; 937 else 938 *(int *) info = FALSE; 939 break; 940 default: 941 mtx_unlock(&cs->cs_lock); 942 return (FALSE); 943 } 944 mtx_unlock(&cs->cs_lock); 945 return (TRUE); 946 } 947 948 static void 949 clnt_dg_close(CLIENT *cl) 950 { 951 struct cu_data *cu = (struct cu_data *)cl->cl_private; 952 struct cu_socket *cs; 953 struct cu_request *cr; 954 955 cs = cu->cu_socket->so_rcv.sb_upcallarg; 956 mtx_lock(&cs->cs_lock); 957 958 if (cu->cu_closed) { 959 mtx_unlock(&cs->cs_lock); 960 return; 961 } 962 963 if (cu->cu_closing) { 964 while (cu->cu_closing) 965 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 966 KASSERT(cu->cu_closed, ("client should be closed")); 967 mtx_unlock(&cs->cs_lock); 968 return; 969 } 970 971 /* 972 * Abort any pending requests and wait until everyone 973 * has finished with clnt_vc_call. 974 */ 975 cu->cu_closing = TRUE; 976 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 977 if (cr->cr_client == cl) { 978 cr->cr_xid = 0; 979 cr->cr_error = ESHUTDOWN; 980 wakeup(cr); 981 } 982 } 983 984 while (cu->cu_threads) 985 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 986 987 cu->cu_closing = FALSE; 988 cu->cu_closed = TRUE; 989 990 mtx_unlock(&cs->cs_lock); 991 wakeup(cu); 992 } 993 994 static void 995 clnt_dg_destroy(CLIENT *cl) 996 { 997 struct cu_data *cu = (struct cu_data *)cl->cl_private; 998 struct cu_socket *cs; 999 struct socket *so = NULL; 1000 bool_t lastsocketref; 1001 1002 cs = cu->cu_socket->so_rcv.sb_upcallarg; 1003 clnt_dg_close(cl); 1004 1005 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1006 mtx_lock(&cs->cs_lock); 1007 1008 cs->cs_refs--; 1009 if (cs->cs_refs == 0) { 1010 mtx_unlock(&cs->cs_lock); 1011 soupcall_clear(cu->cu_socket, SO_RCV); 1012 clnt_dg_upcallsdone(cu->cu_socket, cs); 1013 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1014 mtx_destroy(&cs->cs_lock); 1015 mem_free(cs, sizeof(*cs)); 1016 lastsocketref = TRUE; 1017 } else { 1018 mtx_unlock(&cs->cs_lock); 1019 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1020 lastsocketref = FALSE; 1021 } 1022 1023 if (cu->cu_closeit && lastsocketref) { 1024 so = cu->cu_socket; 1025 cu->cu_socket = NULL; 1026 } 1027 1028 if (so) 1029 soclose(so); 1030 1031 if (cl->cl_netid && cl->cl_netid[0]) 1032 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1033 if (cl->cl_tp && cl->cl_tp[0]) 1034 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1035 mem_free(cu, sizeof (*cu)); 1036 mem_free(cl, sizeof (CLIENT)); 1037 } 1038 1039 /* 1040 * Make sure that the time is not garbage. -1 value is allowed. 1041 */ 1042 static bool_t 1043 time_not_ok(struct timeval *t) 1044 { 1045 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1046 t->tv_usec < -1 || t->tv_usec > 1000000); 1047 } 1048 1049 int 1050 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1051 { 1052 struct cu_socket *cs = (struct cu_socket *) arg; 1053 struct uio uio; 1054 struct mbuf *m; 1055 struct mbuf *control; 1056 struct cu_request *cr; 1057 int error, rcvflag, foundreq; 1058 uint32_t xid; 1059 1060 cs->cs_upcallrefs++; 1061 uio.uio_resid = 1000000000; 1062 uio.uio_td = curthread; 1063 do { 1064 SOCKBUF_UNLOCK(&so->so_rcv); 1065 m = NULL; 1066 control = NULL; 1067 rcvflag = MSG_DONTWAIT; 1068 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1069 if (control) 1070 m_freem(control); 1071 SOCKBUF_LOCK(&so->so_rcv); 1072 1073 if (error == EWOULDBLOCK) 1074 break; 1075 1076 /* 1077 * If there was an error, wake up all pending 1078 * requests. 1079 */ 1080 if (error) { 1081 mtx_lock(&cs->cs_lock); 1082 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1083 cr->cr_xid = 0; 1084 cr->cr_error = error; 1085 wakeup(cr); 1086 } 1087 mtx_unlock(&cs->cs_lock); 1088 break; 1089 } 1090 1091 /* 1092 * The XID is in the first uint32_t of the reply. 1093 */ 1094 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1095 /* 1096 * Should never happen. 1097 */ 1098 m_freem(m); 1099 continue; 1100 } 1101 1102 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1103 xid = ntohl(xid); 1104 1105 /* 1106 * Attempt to match this reply with a pending request. 1107 */ 1108 mtx_lock(&cs->cs_lock); 1109 foundreq = 0; 1110 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1111 if (cr->cr_xid == xid) { 1112 /* 1113 * This one matches. We leave the 1114 * reply mbuf in cr->cr_mrep. Set the 1115 * XID to zero so that we will ignore 1116 * any duplicated replies that arrive 1117 * before clnt_dg_call removes it from 1118 * the queue. 1119 */ 1120 cr->cr_xid = 0; 1121 cr->cr_mrep = m; 1122 cr->cr_error = 0; 1123 foundreq = 1; 1124 wakeup(cr); 1125 break; 1126 } 1127 } 1128 mtx_unlock(&cs->cs_lock); 1129 1130 /* 1131 * If we didn't find the matching request, just drop 1132 * it - its probably a repeated reply. 1133 */ 1134 if (!foundreq) 1135 m_freem(m); 1136 } while (m); 1137 cs->cs_upcallrefs--; 1138 if (cs->cs_upcallrefs < 0) 1139 panic("rpcdg upcall refcnt"); 1140 if (cs->cs_upcallrefs == 0) 1141 wakeup(&cs->cs_upcallrefs); 1142 return (SU_OK); 1143 } 1144 1145 /* 1146 * Wait for all upcalls in progress to complete. 1147 */ 1148 static void 1149 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1150 { 1151 1152 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1153 1154 while (cs->cs_upcallrefs > 0) 1155 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1156 "rpcdgup", 0); 1157 } 1158