1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 /* 31 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 32 */ 33 34 #if defined(LIBC_SCCS) && !defined(lint) 35 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 36 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 37 #endif 38 #include <sys/cdefs.h> 39 __FBSDID("$FreeBSD$"); 40 41 /* 42 * Implements a connectionless client side RPC. 43 */ 44 45 #include <sys/param.h> 46 #include <sys/systm.h> 47 #include <sys/kernel.h> 48 #include <sys/lock.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/pcpu.h> 53 #include <sys/proc.h> 54 #include <sys/socket.h> 55 #include <sys/socketvar.h> 56 #include <sys/time.h> 57 #include <sys/uio.h> 58 59 #include <net/vnet.h> 60 61 #include <rpc/rpc.h> 62 #include <rpc/rpc_com.h> 63 64 65 #ifdef _FREEFALL_CONFIG 66 /* 67 * Disable RPC exponential back-off for FreeBSD.org systems. 68 */ 69 #define RPC_MAX_BACKOFF 1 /* second */ 70 #else 71 #define RPC_MAX_BACKOFF 30 /* seconds */ 72 #endif 73 74 static bool_t time_not_ok(struct timeval *); 75 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 76 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 77 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 78 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 79 static void clnt_dg_abort(CLIENT *); 80 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 81 static void clnt_dg_close(CLIENT *); 82 static void clnt_dg_destroy(CLIENT *); 83 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 84 85 static struct clnt_ops clnt_dg_ops = { 86 .cl_call = clnt_dg_call, 87 .cl_abort = clnt_dg_abort, 88 .cl_geterr = clnt_dg_geterr, 89 .cl_freeres = clnt_dg_freeres, 90 .cl_close = clnt_dg_close, 91 .cl_destroy = clnt_dg_destroy, 92 .cl_control = clnt_dg_control 93 }; 94 95 /* 96 * A pending RPC request which awaits a reply. Requests which have 97 * received their reply will have cr_xid set to zero and cr_mrep to 98 * the mbuf chain of the reply. 99 */ 100 struct cu_request { 101 TAILQ_ENTRY(cu_request) cr_link; 102 CLIENT *cr_client; /* owner */ 103 uint32_t cr_xid; /* XID of request */ 104 struct mbuf *cr_mrep; /* reply received by upcall */ 105 int cr_error; /* any error from upcall */ 106 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 107 }; 108 109 TAILQ_HEAD(cu_request_list, cu_request); 110 111 #define MCALL_MSG_SIZE 24 112 113 /* 114 * This structure is pointed to by the socket buffer's sb_upcallarg 115 * member. It is separate from the client private data to facilitate 116 * multiple clients sharing the same socket. The cs_lock mutex is used 117 * to protect all fields of this structure, the socket's receive 118 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 119 * structures is installed on the socket. 120 */ 121 struct cu_socket { 122 struct mtx cs_lock; 123 int cs_refs; /* Count of clients */ 124 struct cu_request_list cs_pending; /* Requests awaiting replies */ 125 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 126 }; 127 128 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 129 130 /* 131 * Private data kept per client handle 132 */ 133 struct cu_data { 134 int cu_threads; /* # threads in clnt_vc_call */ 135 bool_t cu_closing; /* TRUE if we are closing */ 136 bool_t cu_closed; /* TRUE if we are closed */ 137 struct socket *cu_socket; /* connection socket */ 138 bool_t cu_closeit; /* opened by library */ 139 struct sockaddr_storage cu_raddr; /* remote address */ 140 int cu_rlen; 141 struct timeval cu_wait; /* retransmit interval */ 142 struct timeval cu_total; /* total time for the call */ 143 struct rpc_err cu_error; 144 uint32_t cu_xid; 145 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 146 size_t cu_mcalllen; 147 size_t cu_sendsz; /* send size */ 148 size_t cu_recvsz; /* recv size */ 149 int cu_async; 150 int cu_connect; /* Use connect(). */ 151 int cu_connected; /* Have done connect(). */ 152 const char *cu_waitchan; 153 int cu_waitflag; 154 int cu_cwnd; /* congestion window */ 155 int cu_sent; /* number of in-flight RPCs */ 156 bool_t cu_cwnd_wait; 157 }; 158 159 #define CWNDSCALE 256 160 #define MAXCWND (32 * CWNDSCALE) 161 162 /* 163 * Connection less client creation returns with client handle parameters. 164 * Default options are set, which the user can change using clnt_control(). 165 * fd should be open and bound. 166 * NB: The rpch->cl_auth is initialized to null authentication. 167 * Caller may wish to set this something more useful. 168 * 169 * sendsz and recvsz are the maximum allowable packet sizes that can be 170 * sent and received. Normally they are the same, but they can be 171 * changed to improve the program efficiency and buffer allocation. 172 * If they are 0, use the transport default. 173 * 174 * If svcaddr is NULL, returns NULL. 175 */ 176 CLIENT * 177 clnt_dg_create( 178 struct socket *so, 179 struct sockaddr *svcaddr, /* servers address */ 180 rpcprog_t program, /* program number */ 181 rpcvers_t version, /* version number */ 182 size_t sendsz, /* buffer recv size */ 183 size_t recvsz) /* buffer send size */ 184 { 185 CLIENT *cl = NULL; /* client handle */ 186 struct cu_data *cu = NULL; /* private data */ 187 struct cu_socket *cs = NULL; 188 struct sockbuf *sb; 189 struct timeval now; 190 struct rpc_msg call_msg; 191 struct __rpc_sockinfo si; 192 XDR xdrs; 193 int error; 194 195 if (svcaddr == NULL) { 196 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 197 return (NULL); 198 } 199 200 if (!__rpc_socket2sockinfo(so, &si)) { 201 rpc_createerr.cf_stat = RPC_TLIERROR; 202 rpc_createerr.cf_error.re_errno = 0; 203 return (NULL); 204 } 205 206 /* 207 * Find the receive and the send size 208 */ 209 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 210 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 211 if ((sendsz == 0) || (recvsz == 0)) { 212 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 213 rpc_createerr.cf_error.re_errno = 0; 214 return (NULL); 215 } 216 217 cl = mem_alloc(sizeof (CLIENT)); 218 219 /* 220 * Should be multiple of 4 for XDR. 221 */ 222 sendsz = rounddown(sendsz + 3, 4); 223 recvsz = rounddown(recvsz + 3, 4); 224 cu = mem_alloc(sizeof (*cu)); 225 cu->cu_threads = 0; 226 cu->cu_closing = FALSE; 227 cu->cu_closed = FALSE; 228 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 229 cu->cu_rlen = svcaddr->sa_len; 230 /* Other values can also be set through clnt_control() */ 231 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 232 cu->cu_wait.tv_usec = 0; 233 cu->cu_total.tv_sec = -1; 234 cu->cu_total.tv_usec = -1; 235 cu->cu_sendsz = sendsz; 236 cu->cu_recvsz = recvsz; 237 cu->cu_async = FALSE; 238 cu->cu_connect = FALSE; 239 cu->cu_connected = FALSE; 240 cu->cu_waitchan = "rpcrecv"; 241 cu->cu_waitflag = 0; 242 cu->cu_cwnd = MAXCWND / 2; 243 cu->cu_sent = 0; 244 cu->cu_cwnd_wait = FALSE; 245 (void) getmicrotime(&now); 246 cu->cu_xid = __RPC_GETXID(&now); 247 call_msg.rm_xid = cu->cu_xid; 248 call_msg.rm_call.cb_prog = program; 249 call_msg.rm_call.cb_vers = version; 250 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 251 if (! xdr_callhdr(&xdrs, &call_msg)) { 252 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 253 rpc_createerr.cf_error.re_errno = 0; 254 goto err2; 255 } 256 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 257 258 /* 259 * By default, closeit is always FALSE. It is users responsibility 260 * to do a close on it, else the user may use clnt_control 261 * to let clnt_destroy do it for him/her. 262 */ 263 cu->cu_closeit = FALSE; 264 cu->cu_socket = so; 265 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 266 if (error != 0) { 267 rpc_createerr.cf_stat = RPC_FAILED; 268 rpc_createerr.cf_error.re_errno = error; 269 goto err2; 270 } 271 272 sb = &so->so_rcv; 273 SOCKBUF_LOCK(&so->so_rcv); 274 recheck_socket: 275 if (sb->sb_upcall) { 276 if (sb->sb_upcall != clnt_dg_soupcall) { 277 SOCKBUF_UNLOCK(&so->so_rcv); 278 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 279 goto err2; 280 } 281 cs = (struct cu_socket *) sb->sb_upcallarg; 282 mtx_lock(&cs->cs_lock); 283 cs->cs_refs++; 284 mtx_unlock(&cs->cs_lock); 285 } else { 286 /* 287 * We are the first on this socket - allocate the 288 * structure and install it in the socket. 289 */ 290 SOCKBUF_UNLOCK(&so->so_rcv); 291 cs = mem_alloc(sizeof(*cs)); 292 SOCKBUF_LOCK(&so->so_rcv); 293 if (sb->sb_upcall) { 294 /* 295 * We have lost a race with some other client. 296 */ 297 mem_free(cs, sizeof(*cs)); 298 goto recheck_socket; 299 } 300 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 301 cs->cs_refs = 1; 302 cs->cs_upcallrefs = 0; 303 TAILQ_INIT(&cs->cs_pending); 304 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 305 } 306 SOCKBUF_UNLOCK(&so->so_rcv); 307 308 cl->cl_refs = 1; 309 cl->cl_ops = &clnt_dg_ops; 310 cl->cl_private = (caddr_t)(void *)cu; 311 cl->cl_auth = authnone_create(); 312 cl->cl_tp = NULL; 313 cl->cl_netid = NULL; 314 return (cl); 315 err2: 316 if (cl) { 317 mem_free(cl, sizeof (CLIENT)); 318 if (cu) 319 mem_free(cu, sizeof (*cu)); 320 } 321 return (NULL); 322 } 323 324 static enum clnt_stat 325 clnt_dg_call( 326 CLIENT *cl, /* client handle */ 327 struct rpc_callextra *ext, /* call metadata */ 328 rpcproc_t proc, /* procedure number */ 329 struct mbuf *args, /* pointer to args */ 330 struct mbuf **resultsp, /* pointer to results */ 331 struct timeval utimeout) /* seconds to wait before giving up */ 332 { 333 struct cu_data *cu = (struct cu_data *)cl->cl_private; 334 struct cu_socket *cs; 335 struct rpc_timers *rt; 336 AUTH *auth; 337 struct rpc_err *errp; 338 enum clnt_stat stat; 339 XDR xdrs; 340 struct rpc_msg reply_msg; 341 bool_t ok; 342 int retrans; /* number of re-transmits so far */ 343 int nrefreshes = 2; /* number of times to refresh cred */ 344 struct timeval *tvp; 345 int timeout; 346 int retransmit_time; 347 int next_sendtime, starttime, rtt, time_waited, tv = 0; 348 struct sockaddr *sa; 349 socklen_t salen; 350 uint32_t xid = 0; 351 struct mbuf *mreq = NULL, *results; 352 struct cu_request *cr; 353 int error; 354 355 cs = cu->cu_socket->so_rcv.sb_upcallarg; 356 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 357 358 mtx_lock(&cs->cs_lock); 359 360 if (cu->cu_closing || cu->cu_closed) { 361 mtx_unlock(&cs->cs_lock); 362 free(cr, M_RPC); 363 return (RPC_CANTSEND); 364 } 365 cu->cu_threads++; 366 367 if (ext) { 368 auth = ext->rc_auth; 369 errp = &ext->rc_err; 370 } else { 371 auth = cl->cl_auth; 372 errp = &cu->cu_error; 373 } 374 375 cr->cr_client = cl; 376 cr->cr_mrep = NULL; 377 cr->cr_error = 0; 378 379 if (cu->cu_total.tv_usec == -1) { 380 tvp = &utimeout; /* use supplied timeout */ 381 } else { 382 tvp = &cu->cu_total; /* use default timeout */ 383 } 384 if (tvp->tv_sec || tvp->tv_usec) 385 timeout = tvtohz(tvp); 386 else 387 timeout = 0; 388 389 if (cu->cu_connect && !cu->cu_connected) { 390 mtx_unlock(&cs->cs_lock); 391 error = soconnect(cu->cu_socket, 392 (struct sockaddr *)&cu->cu_raddr, curthread); 393 mtx_lock(&cs->cs_lock); 394 if (error) { 395 errp->re_errno = error; 396 errp->re_status = stat = RPC_CANTSEND; 397 goto out; 398 } 399 cu->cu_connected = 1; 400 } 401 if (cu->cu_connected) { 402 sa = NULL; 403 salen = 0; 404 } else { 405 sa = (struct sockaddr *)&cu->cu_raddr; 406 salen = cu->cu_rlen; 407 } 408 time_waited = 0; 409 retrans = 0; 410 if (ext && ext->rc_timers) { 411 rt = ext->rc_timers; 412 if (!rt->rt_rtxcur) 413 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 414 retransmit_time = next_sendtime = rt->rt_rtxcur; 415 } else { 416 rt = NULL; 417 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 418 } 419 420 starttime = ticks; 421 422 call_again: 423 mtx_assert(&cs->cs_lock, MA_OWNED); 424 425 cu->cu_xid++; 426 xid = cu->cu_xid; 427 428 send_again: 429 mtx_unlock(&cs->cs_lock); 430 431 mreq = m_gethdr(M_WAITOK, MT_DATA); 432 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 433 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 434 mreq->m_len = cu->cu_mcalllen; 435 436 /* 437 * The XID is the first thing in the request. 438 */ 439 *mtod(mreq, uint32_t *) = htonl(xid); 440 441 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 442 443 if (cu->cu_async == TRUE && args == NULL) 444 goto get_reply; 445 446 if ((! XDR_PUTINT32(&xdrs, &proc)) || 447 (! AUTH_MARSHALL(auth, xid, &xdrs, 448 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 449 errp->re_status = stat = RPC_CANTENCODEARGS; 450 mtx_lock(&cs->cs_lock); 451 goto out; 452 } 453 mreq->m_pkthdr.len = m_length(mreq, NULL); 454 455 cr->cr_xid = xid; 456 mtx_lock(&cs->cs_lock); 457 458 /* 459 * Try to get a place in the congestion window. 460 */ 461 while (cu->cu_sent >= cu->cu_cwnd) { 462 cu->cu_cwnd_wait = TRUE; 463 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 464 cu->cu_waitflag, "rpccwnd", 0); 465 if (error) { 466 errp->re_errno = error; 467 if (error == EINTR || error == ERESTART) 468 errp->re_status = stat = RPC_INTR; 469 else 470 errp->re_status = stat = RPC_CANTSEND; 471 goto out; 472 } 473 } 474 cu->cu_sent += CWNDSCALE; 475 476 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 477 mtx_unlock(&cs->cs_lock); 478 479 /* 480 * sosend consumes mreq. 481 */ 482 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 483 mreq = NULL; 484 485 /* 486 * sub-optimal code appears here because we have 487 * some clock time to spare while the packets are in flight. 488 * (We assume that this is actually only executed once.) 489 */ 490 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 491 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 492 reply_msg.acpted_rply.ar_verf.oa_length = 0; 493 reply_msg.acpted_rply.ar_results.where = NULL; 494 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 495 496 mtx_lock(&cs->cs_lock); 497 if (error) { 498 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 499 errp->re_errno = error; 500 errp->re_status = stat = RPC_CANTSEND; 501 cu->cu_sent -= CWNDSCALE; 502 if (cu->cu_cwnd_wait) { 503 cu->cu_cwnd_wait = FALSE; 504 wakeup(&cu->cu_cwnd_wait); 505 } 506 goto out; 507 } 508 509 /* 510 * Check to see if we got an upcall while waiting for the 511 * lock. 512 */ 513 if (cr->cr_error) { 514 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 515 errp->re_errno = cr->cr_error; 516 errp->re_status = stat = RPC_CANTRECV; 517 cu->cu_sent -= CWNDSCALE; 518 if (cu->cu_cwnd_wait) { 519 cu->cu_cwnd_wait = FALSE; 520 wakeup(&cu->cu_cwnd_wait); 521 } 522 goto out; 523 } 524 if (cr->cr_mrep) { 525 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 526 cu->cu_sent -= CWNDSCALE; 527 if (cu->cu_cwnd_wait) { 528 cu->cu_cwnd_wait = FALSE; 529 wakeup(&cu->cu_cwnd_wait); 530 } 531 goto got_reply; 532 } 533 534 /* 535 * Hack to provide rpc-based message passing 536 */ 537 if (timeout == 0) { 538 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 539 errp->re_status = stat = RPC_TIMEDOUT; 540 cu->cu_sent -= CWNDSCALE; 541 if (cu->cu_cwnd_wait) { 542 cu->cu_cwnd_wait = FALSE; 543 wakeup(&cu->cu_cwnd_wait); 544 } 545 goto out; 546 } 547 548 get_reply: 549 for (;;) { 550 /* Decide how long to wait. */ 551 if (next_sendtime < timeout) 552 tv = next_sendtime; 553 else 554 tv = timeout; 555 tv -= time_waited; 556 557 if (tv > 0) { 558 if (cu->cu_closing || cu->cu_closed) { 559 error = 0; 560 cr->cr_error = ESHUTDOWN; 561 } else { 562 error = msleep(cr, &cs->cs_lock, 563 cu->cu_waitflag, cu->cu_waitchan, tv); 564 } 565 } else { 566 error = EWOULDBLOCK; 567 } 568 569 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 570 cu->cu_sent -= CWNDSCALE; 571 if (cu->cu_cwnd_wait) { 572 cu->cu_cwnd_wait = FALSE; 573 wakeup(&cu->cu_cwnd_wait); 574 } 575 576 if (!error) { 577 /* 578 * We were woken up by the upcall. If the 579 * upcall had a receive error, report that, 580 * otherwise we have a reply. 581 */ 582 if (cr->cr_error) { 583 errp->re_errno = cr->cr_error; 584 errp->re_status = stat = RPC_CANTRECV; 585 goto out; 586 } 587 588 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 589 + cu->cu_cwnd / 2) / cu->cu_cwnd; 590 if (cu->cu_cwnd > MAXCWND) 591 cu->cu_cwnd = MAXCWND; 592 593 if (rt) { 594 /* 595 * Add one to the time since a tick 596 * count of N means that the actual 597 * time taken was somewhere between N 598 * and N+1. 599 */ 600 rtt = ticks - starttime + 1; 601 602 /* 603 * Update our estimate of the round 604 * trip time using roughly the 605 * algorithm described in RFC 606 * 2988. Given an RTT sample R: 607 * 608 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 609 * SRTT = (1-alpha) * SRTT + alpha * R 610 * 611 * where alpha = 0.125 and beta = 0.25. 612 * 613 * The initial retransmit timeout is 614 * SRTT + 4*RTTVAR and doubles on each 615 * retransmision. 616 */ 617 if (rt->rt_srtt == 0) { 618 rt->rt_srtt = rtt; 619 rt->rt_deviate = rtt / 2; 620 } else { 621 int32_t error = rtt - rt->rt_srtt; 622 rt->rt_srtt += error / 8; 623 error = abs(error) - rt->rt_deviate; 624 rt->rt_deviate += error / 4; 625 } 626 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 627 } 628 629 break; 630 } 631 632 /* 633 * The sleep returned an error so our request is still 634 * on the list. If we got EWOULDBLOCK, we may want to 635 * re-send the request. 636 */ 637 if (error != EWOULDBLOCK) { 638 errp->re_errno = error; 639 if (error == EINTR || error == ERESTART) 640 errp->re_status = stat = RPC_INTR; 641 else 642 errp->re_status = stat = RPC_CANTRECV; 643 goto out; 644 } 645 646 time_waited = ticks - starttime; 647 648 /* Check for timeout. */ 649 if (time_waited > timeout) { 650 errp->re_errno = EWOULDBLOCK; 651 errp->re_status = stat = RPC_TIMEDOUT; 652 goto out; 653 } 654 655 /* Retransmit if necessary. */ 656 if (time_waited >= next_sendtime) { 657 cu->cu_cwnd /= 2; 658 if (cu->cu_cwnd < CWNDSCALE) 659 cu->cu_cwnd = CWNDSCALE; 660 if (ext && ext->rc_feedback) { 661 mtx_unlock(&cs->cs_lock); 662 if (retrans == 0) 663 ext->rc_feedback(FEEDBACK_REXMIT1, 664 proc, ext->rc_feedback_arg); 665 else 666 ext->rc_feedback(FEEDBACK_REXMIT2, 667 proc, ext->rc_feedback_arg); 668 mtx_lock(&cs->cs_lock); 669 } 670 if (cu->cu_closing || cu->cu_closed) { 671 errp->re_errno = ESHUTDOWN; 672 errp->re_status = stat = RPC_CANTRECV; 673 goto out; 674 } 675 retrans++; 676 /* update retransmit_time */ 677 if (retransmit_time < RPC_MAX_BACKOFF * hz) 678 retransmit_time = 2 * retransmit_time; 679 next_sendtime += retransmit_time; 680 goto send_again; 681 } 682 cu->cu_sent += CWNDSCALE; 683 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 684 } 685 686 got_reply: 687 /* 688 * Now decode and validate the response. We need to drop the 689 * lock since xdr_replymsg may end up sleeping in malloc. 690 */ 691 mtx_unlock(&cs->cs_lock); 692 693 if (ext && ext->rc_feedback) 694 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 695 696 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 697 ok = xdr_replymsg(&xdrs, &reply_msg); 698 cr->cr_mrep = NULL; 699 700 if (ok) { 701 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 702 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 703 errp->re_status = stat = RPC_SUCCESS; 704 else 705 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 706 707 if (errp->re_status == RPC_SUCCESS) { 708 results = xdrmbuf_getall(&xdrs); 709 if (! AUTH_VALIDATE(auth, xid, 710 &reply_msg.acpted_rply.ar_verf, 711 &results)) { 712 errp->re_status = stat = RPC_AUTHERROR; 713 errp->re_why = AUTH_INVALIDRESP; 714 if (retrans && 715 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 716 /* 717 * If we retransmitted, its 718 * possible that we will 719 * receive a reply for one of 720 * the earlier transmissions 721 * (which will use an older 722 * RPCSEC_GSS sequence 723 * number). In this case, just 724 * go back and listen for a 725 * new reply. We could keep a 726 * record of all the seq 727 * numbers we have transmitted 728 * so far so that we could 729 * accept a reply for any of 730 * them here. 731 */ 732 XDR_DESTROY(&xdrs); 733 mtx_lock(&cs->cs_lock); 734 cu->cu_sent += CWNDSCALE; 735 TAILQ_INSERT_TAIL(&cs->cs_pending, 736 cr, cr_link); 737 cr->cr_mrep = NULL; 738 goto get_reply; 739 } 740 } else { 741 *resultsp = results; 742 } 743 } /* end successful completion */ 744 /* 745 * If unsuccesful AND error is an authentication error 746 * then refresh credentials and try again, else break 747 */ 748 else if (stat == RPC_AUTHERROR) 749 /* maybe our credentials need to be refreshed ... */ 750 if (nrefreshes > 0 && 751 AUTH_REFRESH(auth, &reply_msg)) { 752 nrefreshes--; 753 XDR_DESTROY(&xdrs); 754 mtx_lock(&cs->cs_lock); 755 goto call_again; 756 } 757 /* end of unsuccessful completion */ 758 } /* end of valid reply message */ 759 else { 760 errp->re_status = stat = RPC_CANTDECODERES; 761 762 } 763 XDR_DESTROY(&xdrs); 764 mtx_lock(&cs->cs_lock); 765 out: 766 mtx_assert(&cs->cs_lock, MA_OWNED); 767 768 if (mreq) 769 m_freem(mreq); 770 if (cr->cr_mrep) 771 m_freem(cr->cr_mrep); 772 773 cu->cu_threads--; 774 if (cu->cu_closing) 775 wakeup(cu); 776 777 mtx_unlock(&cs->cs_lock); 778 779 if (auth && stat != RPC_SUCCESS) 780 AUTH_VALIDATE(auth, xid, NULL, NULL); 781 782 free(cr, M_RPC); 783 784 return (stat); 785 } 786 787 static void 788 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 789 { 790 struct cu_data *cu = (struct cu_data *)cl->cl_private; 791 792 *errp = cu->cu_error; 793 } 794 795 static bool_t 796 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 797 { 798 XDR xdrs; 799 bool_t dummy; 800 801 xdrs.x_op = XDR_FREE; 802 dummy = (*xdr_res)(&xdrs, res_ptr); 803 804 return (dummy); 805 } 806 807 /*ARGSUSED*/ 808 static void 809 clnt_dg_abort(CLIENT *h) 810 { 811 } 812 813 static bool_t 814 clnt_dg_control(CLIENT *cl, u_int request, void *info) 815 { 816 struct cu_data *cu = (struct cu_data *)cl->cl_private; 817 struct cu_socket *cs; 818 struct sockaddr *addr; 819 820 cs = cu->cu_socket->so_rcv.sb_upcallarg; 821 mtx_lock(&cs->cs_lock); 822 823 switch (request) { 824 case CLSET_FD_CLOSE: 825 cu->cu_closeit = TRUE; 826 mtx_unlock(&cs->cs_lock); 827 return (TRUE); 828 case CLSET_FD_NCLOSE: 829 cu->cu_closeit = FALSE; 830 mtx_unlock(&cs->cs_lock); 831 return (TRUE); 832 } 833 834 /* for other requests which use info */ 835 if (info == NULL) { 836 mtx_unlock(&cs->cs_lock); 837 return (FALSE); 838 } 839 switch (request) { 840 case CLSET_TIMEOUT: 841 if (time_not_ok((struct timeval *)info)) { 842 mtx_unlock(&cs->cs_lock); 843 return (FALSE); 844 } 845 cu->cu_total = *(struct timeval *)info; 846 break; 847 case CLGET_TIMEOUT: 848 *(struct timeval *)info = cu->cu_total; 849 break; 850 case CLSET_RETRY_TIMEOUT: 851 if (time_not_ok((struct timeval *)info)) { 852 mtx_unlock(&cs->cs_lock); 853 return (FALSE); 854 } 855 cu->cu_wait = *(struct timeval *)info; 856 break; 857 case CLGET_RETRY_TIMEOUT: 858 *(struct timeval *)info = cu->cu_wait; 859 break; 860 case CLGET_SVC_ADDR: 861 /* 862 * Slightly different semantics to userland - we use 863 * sockaddr instead of netbuf. 864 */ 865 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 866 break; 867 case CLSET_SVC_ADDR: /* set to new address */ 868 addr = (struct sockaddr *)info; 869 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 870 break; 871 case CLGET_XID: 872 *(uint32_t *)info = cu->cu_xid; 873 break; 874 875 case CLSET_XID: 876 /* This will set the xid of the NEXT call */ 877 /* decrement by 1 as clnt_dg_call() increments once */ 878 cu->cu_xid = *(uint32_t *)info - 1; 879 break; 880 881 case CLGET_VERS: 882 /* 883 * This RELIES on the information that, in the call body, 884 * the version number field is the fifth field from the 885 * begining of the RPC header. MUST be changed if the 886 * call_struct is changed 887 */ 888 *(uint32_t *)info = 889 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 890 4 * BYTES_PER_XDR_UNIT)); 891 break; 892 893 case CLSET_VERS: 894 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 895 = htonl(*(uint32_t *)info); 896 break; 897 898 case CLGET_PROG: 899 /* 900 * This RELIES on the information that, in the call body, 901 * the program number field is the fourth field from the 902 * begining of the RPC header. MUST be changed if the 903 * call_struct is changed 904 */ 905 *(uint32_t *)info = 906 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 907 3 * BYTES_PER_XDR_UNIT)); 908 break; 909 910 case CLSET_PROG: 911 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 912 = htonl(*(uint32_t *)info); 913 break; 914 case CLSET_ASYNC: 915 cu->cu_async = *(int *)info; 916 break; 917 case CLSET_CONNECT: 918 cu->cu_connect = *(int *)info; 919 break; 920 case CLSET_WAITCHAN: 921 cu->cu_waitchan = (const char *)info; 922 break; 923 case CLGET_WAITCHAN: 924 *(const char **) info = cu->cu_waitchan; 925 break; 926 case CLSET_INTERRUPTIBLE: 927 if (*(int *) info) 928 cu->cu_waitflag = PCATCH; 929 else 930 cu->cu_waitflag = 0; 931 break; 932 case CLGET_INTERRUPTIBLE: 933 if (cu->cu_waitflag) 934 *(int *) info = TRUE; 935 else 936 *(int *) info = FALSE; 937 break; 938 default: 939 mtx_unlock(&cs->cs_lock); 940 return (FALSE); 941 } 942 mtx_unlock(&cs->cs_lock); 943 return (TRUE); 944 } 945 946 static void 947 clnt_dg_close(CLIENT *cl) 948 { 949 struct cu_data *cu = (struct cu_data *)cl->cl_private; 950 struct cu_socket *cs; 951 struct cu_request *cr; 952 953 cs = cu->cu_socket->so_rcv.sb_upcallarg; 954 mtx_lock(&cs->cs_lock); 955 956 if (cu->cu_closed) { 957 mtx_unlock(&cs->cs_lock); 958 return; 959 } 960 961 if (cu->cu_closing) { 962 while (cu->cu_closing) 963 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 964 KASSERT(cu->cu_closed, ("client should be closed")); 965 mtx_unlock(&cs->cs_lock); 966 return; 967 } 968 969 /* 970 * Abort any pending requests and wait until everyone 971 * has finished with clnt_vc_call. 972 */ 973 cu->cu_closing = TRUE; 974 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 975 if (cr->cr_client == cl) { 976 cr->cr_xid = 0; 977 cr->cr_error = ESHUTDOWN; 978 wakeup(cr); 979 } 980 } 981 982 while (cu->cu_threads) 983 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 984 985 cu->cu_closing = FALSE; 986 cu->cu_closed = TRUE; 987 988 mtx_unlock(&cs->cs_lock); 989 wakeup(cu); 990 } 991 992 static void 993 clnt_dg_destroy(CLIENT *cl) 994 { 995 struct cu_data *cu = (struct cu_data *)cl->cl_private; 996 struct cu_socket *cs; 997 struct socket *so = NULL; 998 bool_t lastsocketref; 999 1000 cs = cu->cu_socket->so_rcv.sb_upcallarg; 1001 clnt_dg_close(cl); 1002 1003 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1004 mtx_lock(&cs->cs_lock); 1005 1006 cs->cs_refs--; 1007 if (cs->cs_refs == 0) { 1008 mtx_unlock(&cs->cs_lock); 1009 soupcall_clear(cu->cu_socket, SO_RCV); 1010 clnt_dg_upcallsdone(cu->cu_socket, cs); 1011 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1012 mtx_destroy(&cs->cs_lock); 1013 mem_free(cs, sizeof(*cs)); 1014 lastsocketref = TRUE; 1015 } else { 1016 mtx_unlock(&cs->cs_lock); 1017 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1018 lastsocketref = FALSE; 1019 } 1020 1021 if (cu->cu_closeit && lastsocketref) { 1022 so = cu->cu_socket; 1023 cu->cu_socket = NULL; 1024 } 1025 1026 if (so) 1027 soclose(so); 1028 1029 if (cl->cl_netid && cl->cl_netid[0]) 1030 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1031 if (cl->cl_tp && cl->cl_tp[0]) 1032 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1033 mem_free(cu, sizeof (*cu)); 1034 mem_free(cl, sizeof (CLIENT)); 1035 } 1036 1037 /* 1038 * Make sure that the time is not garbage. -1 value is allowed. 1039 */ 1040 static bool_t 1041 time_not_ok(struct timeval *t) 1042 { 1043 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1044 t->tv_usec < -1 || t->tv_usec > 1000000); 1045 } 1046 1047 int 1048 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1049 { 1050 struct cu_socket *cs = (struct cu_socket *) arg; 1051 struct uio uio; 1052 struct mbuf *m; 1053 struct mbuf *control; 1054 struct cu_request *cr; 1055 int error, rcvflag, foundreq; 1056 uint32_t xid; 1057 1058 cs->cs_upcallrefs++; 1059 uio.uio_resid = 1000000000; 1060 uio.uio_td = curthread; 1061 do { 1062 SOCKBUF_UNLOCK(&so->so_rcv); 1063 m = NULL; 1064 control = NULL; 1065 rcvflag = MSG_DONTWAIT; 1066 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1067 if (control) 1068 m_freem(control); 1069 SOCKBUF_LOCK(&so->so_rcv); 1070 1071 if (error == EWOULDBLOCK) 1072 break; 1073 1074 /* 1075 * If there was an error, wake up all pending 1076 * requests. 1077 */ 1078 if (error) { 1079 mtx_lock(&cs->cs_lock); 1080 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1081 cr->cr_xid = 0; 1082 cr->cr_error = error; 1083 wakeup(cr); 1084 } 1085 mtx_unlock(&cs->cs_lock); 1086 break; 1087 } 1088 1089 /* 1090 * The XID is in the first uint32_t of the reply. 1091 */ 1092 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1093 /* 1094 * Should never happen. 1095 */ 1096 m_freem(m); 1097 continue; 1098 } 1099 1100 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1101 xid = ntohl(xid); 1102 1103 /* 1104 * Attempt to match this reply with a pending request. 1105 */ 1106 mtx_lock(&cs->cs_lock); 1107 foundreq = 0; 1108 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1109 if (cr->cr_xid == xid) { 1110 /* 1111 * This one matches. We leave the 1112 * reply mbuf in cr->cr_mrep. Set the 1113 * XID to zero so that we will ignore 1114 * any duplicated replies that arrive 1115 * before clnt_dg_call removes it from 1116 * the queue. 1117 */ 1118 cr->cr_xid = 0; 1119 cr->cr_mrep = m; 1120 cr->cr_error = 0; 1121 foundreq = 1; 1122 wakeup(cr); 1123 break; 1124 } 1125 } 1126 mtx_unlock(&cs->cs_lock); 1127 1128 /* 1129 * If we didn't find the matching request, just drop 1130 * it - its probably a repeated reply. 1131 */ 1132 if (!foundreq) 1133 m_freem(m); 1134 } while (m); 1135 cs->cs_upcallrefs--; 1136 if (cs->cs_upcallrefs < 0) 1137 panic("rpcdg upcall refcnt"); 1138 if (cs->cs_upcallrefs == 0) 1139 wakeup(&cs->cs_upcallrefs); 1140 return (SU_OK); 1141 } 1142 1143 /* 1144 * Wait for all upcalls in progress to complete. 1145 */ 1146 static void 1147 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1148 { 1149 1150 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1151 1152 while (cs->cs_upcallrefs > 0) 1153 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1154 "rpcdgup", 0); 1155 } 1156