1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 /* 31 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 32 */ 33 34 #if defined(LIBC_SCCS) && !defined(lint) 35 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 36 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 37 #endif 38 #include <sys/cdefs.h> 39 __FBSDID("$FreeBSD$"); 40 41 /* 42 * Implements a connectionless client side RPC. 43 */ 44 45 #include <sys/param.h> 46 #include <sys/systm.h> 47 #include <sys/kernel.h> 48 #include <sys/lock.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/pcpu.h> 53 #include <sys/proc.h> 54 #include <sys/socket.h> 55 #include <sys/socketvar.h> 56 #include <sys/time.h> 57 #include <sys/uio.h> 58 59 #include <net/vnet.h> 60 61 #include <rpc/rpc.h> 62 #include <rpc/rpc_com.h> 63 64 65 #ifdef _FREEFALL_CONFIG 66 /* 67 * Disable RPC exponential back-off for FreeBSD.org systems. 68 */ 69 #define RPC_MAX_BACKOFF 1 /* second */ 70 #else 71 #define RPC_MAX_BACKOFF 30 /* seconds */ 72 #endif 73 74 static bool_t time_not_ok(struct timeval *); 75 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 76 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 77 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 78 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 79 static void clnt_dg_abort(CLIENT *); 80 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 81 static void clnt_dg_close(CLIENT *); 82 static void clnt_dg_destroy(CLIENT *); 83 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 84 85 static struct clnt_ops clnt_dg_ops = { 86 .cl_call = clnt_dg_call, 87 .cl_abort = clnt_dg_abort, 88 .cl_geterr = clnt_dg_geterr, 89 .cl_freeres = clnt_dg_freeres, 90 .cl_close = clnt_dg_close, 91 .cl_destroy = clnt_dg_destroy, 92 .cl_control = clnt_dg_control 93 }; 94 95 /* 96 * A pending RPC request which awaits a reply. Requests which have 97 * received their reply will have cr_xid set to zero and cr_mrep to 98 * the mbuf chain of the reply. 99 */ 100 struct cu_request { 101 TAILQ_ENTRY(cu_request) cr_link; 102 CLIENT *cr_client; /* owner */ 103 uint32_t cr_xid; /* XID of request */ 104 struct mbuf *cr_mrep; /* reply received by upcall */ 105 int cr_error; /* any error from upcall */ 106 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 107 }; 108 109 TAILQ_HEAD(cu_request_list, cu_request); 110 111 #define MCALL_MSG_SIZE 24 112 113 /* 114 * This structure is pointed to by the socket buffer's sb_upcallarg 115 * member. It is separate from the client private data to facilitate 116 * multiple clients sharing the same socket. The cs_lock mutex is used 117 * to protect all fields of this structure, the socket's receive 118 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 119 * structures is installed on the socket. 120 */ 121 struct cu_socket { 122 struct mtx cs_lock; 123 int cs_refs; /* Count of clients */ 124 struct cu_request_list cs_pending; /* Requests awaiting replies */ 125 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 126 }; 127 128 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 129 130 /* 131 * Private data kept per client handle 132 */ 133 struct cu_data { 134 int cu_threads; /* # threads in clnt_vc_call */ 135 bool_t cu_closing; /* TRUE if we are closing */ 136 bool_t cu_closed; /* TRUE if we are closed */ 137 struct socket *cu_socket; /* connection socket */ 138 bool_t cu_closeit; /* opened by library */ 139 struct sockaddr_storage cu_raddr; /* remote address */ 140 int cu_rlen; 141 struct timeval cu_wait; /* retransmit interval */ 142 struct timeval cu_total; /* total time for the call */ 143 struct rpc_err cu_error; 144 uint32_t cu_xid; 145 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 146 size_t cu_mcalllen; 147 size_t cu_sendsz; /* send size */ 148 size_t cu_recvsz; /* recv size */ 149 int cu_async; 150 int cu_connect; /* Use connect(). */ 151 int cu_connected; /* Have done connect(). */ 152 const char *cu_waitchan; 153 int cu_waitflag; 154 int cu_cwnd; /* congestion window */ 155 int cu_sent; /* number of in-flight RPCs */ 156 bool_t cu_cwnd_wait; 157 }; 158 159 #define CWNDSCALE 256 160 #define MAXCWND (32 * CWNDSCALE) 161 162 /* 163 * Connection less client creation returns with client handle parameters. 164 * Default options are set, which the user can change using clnt_control(). 165 * fd should be open and bound. 166 * NB: The rpch->cl_auth is initialized to null authentication. 167 * Caller may wish to set this something more useful. 168 * 169 * sendsz and recvsz are the maximum allowable packet sizes that can be 170 * sent and received. Normally they are the same, but they can be 171 * changed to improve the program efficiency and buffer allocation. 172 * If they are 0, use the transport default. 173 * 174 * If svcaddr is NULL, returns NULL. 175 */ 176 CLIENT * 177 clnt_dg_create( 178 struct socket *so, 179 struct sockaddr *svcaddr, /* servers address */ 180 rpcprog_t program, /* program number */ 181 rpcvers_t version, /* version number */ 182 size_t sendsz, /* buffer recv size */ 183 size_t recvsz) /* buffer send size */ 184 { 185 CLIENT *cl = NULL; /* client handle */ 186 struct cu_data *cu = NULL; /* private data */ 187 struct cu_socket *cs = NULL; 188 struct sockbuf *sb; 189 struct timeval now; 190 struct rpc_msg call_msg; 191 struct __rpc_sockinfo si; 192 XDR xdrs; 193 int error; 194 195 if (svcaddr == NULL) { 196 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 197 return (NULL); 198 } 199 200 if (!__rpc_socket2sockinfo(so, &si)) { 201 rpc_createerr.cf_stat = RPC_TLIERROR; 202 rpc_createerr.cf_error.re_errno = 0; 203 return (NULL); 204 } 205 206 /* 207 * Find the receive and the send size 208 */ 209 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 210 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 211 if ((sendsz == 0) || (recvsz == 0)) { 212 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 213 rpc_createerr.cf_error.re_errno = 0; 214 return (NULL); 215 } 216 217 cl = mem_alloc(sizeof (CLIENT)); 218 219 /* 220 * Should be multiple of 4 for XDR. 221 */ 222 sendsz = rounddown(sendsz + 3, 4); 223 recvsz = rounddown(recvsz + 3, 4); 224 cu = mem_alloc(sizeof (*cu)); 225 cu->cu_threads = 0; 226 cu->cu_closing = FALSE; 227 cu->cu_closed = FALSE; 228 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 229 cu->cu_rlen = svcaddr->sa_len; 230 /* Other values can also be set through clnt_control() */ 231 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 232 cu->cu_wait.tv_usec = 0; 233 cu->cu_total.tv_sec = -1; 234 cu->cu_total.tv_usec = -1; 235 cu->cu_sendsz = sendsz; 236 cu->cu_recvsz = recvsz; 237 cu->cu_async = FALSE; 238 cu->cu_connect = FALSE; 239 cu->cu_connected = FALSE; 240 cu->cu_waitchan = "rpcrecv"; 241 cu->cu_waitflag = 0; 242 cu->cu_cwnd = MAXCWND / 2; 243 cu->cu_sent = 0; 244 cu->cu_cwnd_wait = FALSE; 245 (void) getmicrotime(&now); 246 cu->cu_xid = __RPC_GETXID(&now); 247 call_msg.rm_xid = cu->cu_xid; 248 call_msg.rm_call.cb_prog = program; 249 call_msg.rm_call.cb_vers = version; 250 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 251 if (! xdr_callhdr(&xdrs, &call_msg)) { 252 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 253 rpc_createerr.cf_error.re_errno = 0; 254 goto err2; 255 } 256 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 257 258 /* 259 * By default, closeit is always FALSE. It is users responsibility 260 * to do a close on it, else the user may use clnt_control 261 * to let clnt_destroy do it for him/her. 262 */ 263 cu->cu_closeit = FALSE; 264 cu->cu_socket = so; 265 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 266 if (error != 0) { 267 rpc_createerr.cf_stat = RPC_FAILED; 268 rpc_createerr.cf_error.re_errno = error; 269 goto err2; 270 } 271 272 sb = &so->so_rcv; 273 SOCKBUF_LOCK(&so->so_rcv); 274 recheck_socket: 275 if (sb->sb_upcall) { 276 if (sb->sb_upcall != clnt_dg_soupcall) { 277 SOCKBUF_UNLOCK(&so->so_rcv); 278 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 279 goto err2; 280 } 281 cs = (struct cu_socket *) sb->sb_upcallarg; 282 mtx_lock(&cs->cs_lock); 283 cs->cs_refs++; 284 mtx_unlock(&cs->cs_lock); 285 } else { 286 /* 287 * We are the first on this socket - allocate the 288 * structure and install it in the socket. 289 */ 290 SOCKBUF_UNLOCK(&so->so_rcv); 291 cs = mem_alloc(sizeof(*cs)); 292 SOCKBUF_LOCK(&so->so_rcv); 293 if (sb->sb_upcall) { 294 /* 295 * We have lost a race with some other client. 296 */ 297 mem_free(cs, sizeof(*cs)); 298 goto recheck_socket; 299 } 300 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 301 cs->cs_refs = 1; 302 cs->cs_upcallrefs = 0; 303 TAILQ_INIT(&cs->cs_pending); 304 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 305 } 306 SOCKBUF_UNLOCK(&so->so_rcv); 307 308 cl->cl_refs = 1; 309 cl->cl_ops = &clnt_dg_ops; 310 cl->cl_private = (caddr_t)(void *)cu; 311 cl->cl_auth = authnone_create(); 312 cl->cl_tp = NULL; 313 cl->cl_netid = NULL; 314 return (cl); 315 err2: 316 mem_free(cl, sizeof (CLIENT)); 317 mem_free(cu, sizeof (*cu)); 318 319 return (NULL); 320 } 321 322 static enum clnt_stat 323 clnt_dg_call( 324 CLIENT *cl, /* client handle */ 325 struct rpc_callextra *ext, /* call metadata */ 326 rpcproc_t proc, /* procedure number */ 327 struct mbuf *args, /* pointer to args */ 328 struct mbuf **resultsp, /* pointer to results */ 329 struct timeval utimeout) /* seconds to wait before giving up */ 330 { 331 struct cu_data *cu = (struct cu_data *)cl->cl_private; 332 struct cu_socket *cs; 333 struct rpc_timers *rt; 334 AUTH *auth; 335 struct rpc_err *errp; 336 enum clnt_stat stat; 337 XDR xdrs; 338 struct rpc_msg reply_msg; 339 bool_t ok; 340 int retrans; /* number of re-transmits so far */ 341 int nrefreshes = 2; /* number of times to refresh cred */ 342 struct timeval *tvp; 343 int timeout; 344 int retransmit_time; 345 int next_sendtime, starttime, rtt, time_waited, tv = 0; 346 struct sockaddr *sa; 347 socklen_t salen; 348 uint32_t xid = 0; 349 struct mbuf *mreq = NULL, *results; 350 struct cu_request *cr; 351 int error; 352 353 cs = cu->cu_socket->so_rcv.sb_upcallarg; 354 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 355 356 mtx_lock(&cs->cs_lock); 357 358 if (cu->cu_closing || cu->cu_closed) { 359 mtx_unlock(&cs->cs_lock); 360 free(cr, M_RPC); 361 return (RPC_CANTSEND); 362 } 363 cu->cu_threads++; 364 365 if (ext) { 366 auth = ext->rc_auth; 367 errp = &ext->rc_err; 368 } else { 369 auth = cl->cl_auth; 370 errp = &cu->cu_error; 371 } 372 373 cr->cr_client = cl; 374 cr->cr_mrep = NULL; 375 cr->cr_error = 0; 376 377 if (cu->cu_total.tv_usec == -1) { 378 tvp = &utimeout; /* use supplied timeout */ 379 } else { 380 tvp = &cu->cu_total; /* use default timeout */ 381 } 382 if (tvp->tv_sec || tvp->tv_usec) 383 timeout = tvtohz(tvp); 384 else 385 timeout = 0; 386 387 if (cu->cu_connect && !cu->cu_connected) { 388 mtx_unlock(&cs->cs_lock); 389 error = soconnect(cu->cu_socket, 390 (struct sockaddr *)&cu->cu_raddr, curthread); 391 mtx_lock(&cs->cs_lock); 392 if (error) { 393 errp->re_errno = error; 394 errp->re_status = stat = RPC_CANTSEND; 395 goto out; 396 } 397 cu->cu_connected = 1; 398 } 399 if (cu->cu_connected) { 400 sa = NULL; 401 salen = 0; 402 } else { 403 sa = (struct sockaddr *)&cu->cu_raddr; 404 salen = cu->cu_rlen; 405 } 406 time_waited = 0; 407 retrans = 0; 408 if (ext && ext->rc_timers) { 409 rt = ext->rc_timers; 410 if (!rt->rt_rtxcur) 411 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 412 retransmit_time = next_sendtime = rt->rt_rtxcur; 413 } else { 414 rt = NULL; 415 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 416 } 417 418 starttime = ticks; 419 420 call_again: 421 mtx_assert(&cs->cs_lock, MA_OWNED); 422 423 cu->cu_xid++; 424 xid = cu->cu_xid; 425 426 send_again: 427 mtx_unlock(&cs->cs_lock); 428 429 mreq = m_gethdr(M_WAITOK, MT_DATA); 430 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 431 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 432 mreq->m_len = cu->cu_mcalllen; 433 434 /* 435 * The XID is the first thing in the request. 436 */ 437 *mtod(mreq, uint32_t *) = htonl(xid); 438 439 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 440 441 if (cu->cu_async == TRUE && args == NULL) 442 goto get_reply; 443 444 if ((! XDR_PUTINT32(&xdrs, &proc)) || 445 (! AUTH_MARSHALL(auth, xid, &xdrs, 446 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 447 errp->re_status = stat = RPC_CANTENCODEARGS; 448 mtx_lock(&cs->cs_lock); 449 goto out; 450 } 451 mreq->m_pkthdr.len = m_length(mreq, NULL); 452 453 cr->cr_xid = xid; 454 mtx_lock(&cs->cs_lock); 455 456 /* 457 * Try to get a place in the congestion window. 458 */ 459 while (cu->cu_sent >= cu->cu_cwnd) { 460 cu->cu_cwnd_wait = TRUE; 461 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 462 cu->cu_waitflag, "rpccwnd", 0); 463 if (error) { 464 errp->re_errno = error; 465 if (error == EINTR || error == ERESTART) 466 errp->re_status = stat = RPC_INTR; 467 else 468 errp->re_status = stat = RPC_CANTSEND; 469 goto out; 470 } 471 } 472 cu->cu_sent += CWNDSCALE; 473 474 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 475 mtx_unlock(&cs->cs_lock); 476 477 /* 478 * sosend consumes mreq. 479 */ 480 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 481 mreq = NULL; 482 483 /* 484 * sub-optimal code appears here because we have 485 * some clock time to spare while the packets are in flight. 486 * (We assume that this is actually only executed once.) 487 */ 488 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 489 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 490 reply_msg.acpted_rply.ar_verf.oa_length = 0; 491 reply_msg.acpted_rply.ar_results.where = NULL; 492 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 493 494 mtx_lock(&cs->cs_lock); 495 if (error) { 496 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 497 errp->re_errno = error; 498 errp->re_status = stat = RPC_CANTSEND; 499 cu->cu_sent -= CWNDSCALE; 500 if (cu->cu_cwnd_wait) { 501 cu->cu_cwnd_wait = FALSE; 502 wakeup(&cu->cu_cwnd_wait); 503 } 504 goto out; 505 } 506 507 /* 508 * Check to see if we got an upcall while waiting for the 509 * lock. 510 */ 511 if (cr->cr_error) { 512 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 513 errp->re_errno = cr->cr_error; 514 errp->re_status = stat = RPC_CANTRECV; 515 cu->cu_sent -= CWNDSCALE; 516 if (cu->cu_cwnd_wait) { 517 cu->cu_cwnd_wait = FALSE; 518 wakeup(&cu->cu_cwnd_wait); 519 } 520 goto out; 521 } 522 if (cr->cr_mrep) { 523 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 524 cu->cu_sent -= CWNDSCALE; 525 if (cu->cu_cwnd_wait) { 526 cu->cu_cwnd_wait = FALSE; 527 wakeup(&cu->cu_cwnd_wait); 528 } 529 goto got_reply; 530 } 531 532 /* 533 * Hack to provide rpc-based message passing 534 */ 535 if (timeout == 0) { 536 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 537 errp->re_status = stat = RPC_TIMEDOUT; 538 cu->cu_sent -= CWNDSCALE; 539 if (cu->cu_cwnd_wait) { 540 cu->cu_cwnd_wait = FALSE; 541 wakeup(&cu->cu_cwnd_wait); 542 } 543 goto out; 544 } 545 546 get_reply: 547 for (;;) { 548 /* Decide how long to wait. */ 549 if (next_sendtime < timeout) 550 tv = next_sendtime; 551 else 552 tv = timeout; 553 tv -= time_waited; 554 555 if (tv > 0) { 556 if (cu->cu_closing || cu->cu_closed) { 557 error = 0; 558 cr->cr_error = ESHUTDOWN; 559 } else { 560 error = msleep(cr, &cs->cs_lock, 561 cu->cu_waitflag, cu->cu_waitchan, tv); 562 } 563 } else { 564 error = EWOULDBLOCK; 565 } 566 567 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 568 cu->cu_sent -= CWNDSCALE; 569 if (cu->cu_cwnd_wait) { 570 cu->cu_cwnd_wait = FALSE; 571 wakeup(&cu->cu_cwnd_wait); 572 } 573 574 if (!error) { 575 /* 576 * We were woken up by the upcall. If the 577 * upcall had a receive error, report that, 578 * otherwise we have a reply. 579 */ 580 if (cr->cr_error) { 581 errp->re_errno = cr->cr_error; 582 errp->re_status = stat = RPC_CANTRECV; 583 goto out; 584 } 585 586 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 587 + cu->cu_cwnd / 2) / cu->cu_cwnd; 588 if (cu->cu_cwnd > MAXCWND) 589 cu->cu_cwnd = MAXCWND; 590 591 if (rt) { 592 /* 593 * Add one to the time since a tick 594 * count of N means that the actual 595 * time taken was somewhere between N 596 * and N+1. 597 */ 598 rtt = ticks - starttime + 1; 599 600 /* 601 * Update our estimate of the round 602 * trip time using roughly the 603 * algorithm described in RFC 604 * 2988. Given an RTT sample R: 605 * 606 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 607 * SRTT = (1-alpha) * SRTT + alpha * R 608 * 609 * where alpha = 0.125 and beta = 0.25. 610 * 611 * The initial retransmit timeout is 612 * SRTT + 4*RTTVAR and doubles on each 613 * retransmision. 614 */ 615 if (rt->rt_srtt == 0) { 616 rt->rt_srtt = rtt; 617 rt->rt_deviate = rtt / 2; 618 } else { 619 int32_t error = rtt - rt->rt_srtt; 620 rt->rt_srtt += error / 8; 621 error = abs(error) - rt->rt_deviate; 622 rt->rt_deviate += error / 4; 623 } 624 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 625 } 626 627 break; 628 } 629 630 /* 631 * The sleep returned an error so our request is still 632 * on the list. If we got EWOULDBLOCK, we may want to 633 * re-send the request. 634 */ 635 if (error != EWOULDBLOCK) { 636 errp->re_errno = error; 637 if (error == EINTR || error == ERESTART) 638 errp->re_status = stat = RPC_INTR; 639 else 640 errp->re_status = stat = RPC_CANTRECV; 641 goto out; 642 } 643 644 time_waited = ticks - starttime; 645 646 /* Check for timeout. */ 647 if (time_waited > timeout) { 648 errp->re_errno = EWOULDBLOCK; 649 errp->re_status = stat = RPC_TIMEDOUT; 650 goto out; 651 } 652 653 /* Retransmit if necessary. */ 654 if (time_waited >= next_sendtime) { 655 cu->cu_cwnd /= 2; 656 if (cu->cu_cwnd < CWNDSCALE) 657 cu->cu_cwnd = CWNDSCALE; 658 if (ext && ext->rc_feedback) { 659 mtx_unlock(&cs->cs_lock); 660 if (retrans == 0) 661 ext->rc_feedback(FEEDBACK_REXMIT1, 662 proc, ext->rc_feedback_arg); 663 else 664 ext->rc_feedback(FEEDBACK_REXMIT2, 665 proc, ext->rc_feedback_arg); 666 mtx_lock(&cs->cs_lock); 667 } 668 if (cu->cu_closing || cu->cu_closed) { 669 errp->re_errno = ESHUTDOWN; 670 errp->re_status = stat = RPC_CANTRECV; 671 goto out; 672 } 673 retrans++; 674 /* update retransmit_time */ 675 if (retransmit_time < RPC_MAX_BACKOFF * hz) 676 retransmit_time = 2 * retransmit_time; 677 next_sendtime += retransmit_time; 678 goto send_again; 679 } 680 cu->cu_sent += CWNDSCALE; 681 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 682 } 683 684 got_reply: 685 /* 686 * Now decode and validate the response. We need to drop the 687 * lock since xdr_replymsg may end up sleeping in malloc. 688 */ 689 mtx_unlock(&cs->cs_lock); 690 691 if (ext && ext->rc_feedback) 692 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 693 694 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 695 ok = xdr_replymsg(&xdrs, &reply_msg); 696 cr->cr_mrep = NULL; 697 698 if (ok) { 699 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 700 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 701 errp->re_status = stat = RPC_SUCCESS; 702 else 703 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 704 705 if (errp->re_status == RPC_SUCCESS) { 706 results = xdrmbuf_getall(&xdrs); 707 if (! AUTH_VALIDATE(auth, xid, 708 &reply_msg.acpted_rply.ar_verf, 709 &results)) { 710 errp->re_status = stat = RPC_AUTHERROR; 711 errp->re_why = AUTH_INVALIDRESP; 712 if (retrans && 713 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 714 /* 715 * If we retransmitted, its 716 * possible that we will 717 * receive a reply for one of 718 * the earlier transmissions 719 * (which will use an older 720 * RPCSEC_GSS sequence 721 * number). In this case, just 722 * go back and listen for a 723 * new reply. We could keep a 724 * record of all the seq 725 * numbers we have transmitted 726 * so far so that we could 727 * accept a reply for any of 728 * them here. 729 */ 730 XDR_DESTROY(&xdrs); 731 mtx_lock(&cs->cs_lock); 732 cu->cu_sent += CWNDSCALE; 733 TAILQ_INSERT_TAIL(&cs->cs_pending, 734 cr, cr_link); 735 cr->cr_mrep = NULL; 736 goto get_reply; 737 } 738 } else { 739 *resultsp = results; 740 } 741 } /* end successful completion */ 742 /* 743 * If unsuccessful AND error is an authentication error 744 * then refresh credentials and try again, else break 745 */ 746 else if (stat == RPC_AUTHERROR) 747 /* maybe our credentials need to be refreshed ... */ 748 if (nrefreshes > 0 && 749 AUTH_REFRESH(auth, &reply_msg)) { 750 nrefreshes--; 751 XDR_DESTROY(&xdrs); 752 mtx_lock(&cs->cs_lock); 753 goto call_again; 754 } 755 /* end of unsuccessful completion */ 756 } /* end of valid reply message */ 757 else { 758 errp->re_status = stat = RPC_CANTDECODERES; 759 760 } 761 XDR_DESTROY(&xdrs); 762 mtx_lock(&cs->cs_lock); 763 out: 764 mtx_assert(&cs->cs_lock, MA_OWNED); 765 766 if (mreq) 767 m_freem(mreq); 768 if (cr->cr_mrep) 769 m_freem(cr->cr_mrep); 770 771 cu->cu_threads--; 772 if (cu->cu_closing) 773 wakeup(cu); 774 775 mtx_unlock(&cs->cs_lock); 776 777 if (auth && stat != RPC_SUCCESS) 778 AUTH_VALIDATE(auth, xid, NULL, NULL); 779 780 free(cr, M_RPC); 781 782 return (stat); 783 } 784 785 static void 786 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 787 { 788 struct cu_data *cu = (struct cu_data *)cl->cl_private; 789 790 *errp = cu->cu_error; 791 } 792 793 static bool_t 794 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 795 { 796 XDR xdrs; 797 bool_t dummy; 798 799 xdrs.x_op = XDR_FREE; 800 dummy = (*xdr_res)(&xdrs, res_ptr); 801 802 return (dummy); 803 } 804 805 /*ARGSUSED*/ 806 static void 807 clnt_dg_abort(CLIENT *h) 808 { 809 } 810 811 static bool_t 812 clnt_dg_control(CLIENT *cl, u_int request, void *info) 813 { 814 struct cu_data *cu = (struct cu_data *)cl->cl_private; 815 struct cu_socket *cs; 816 struct sockaddr *addr; 817 818 cs = cu->cu_socket->so_rcv.sb_upcallarg; 819 mtx_lock(&cs->cs_lock); 820 821 switch (request) { 822 case CLSET_FD_CLOSE: 823 cu->cu_closeit = TRUE; 824 mtx_unlock(&cs->cs_lock); 825 return (TRUE); 826 case CLSET_FD_NCLOSE: 827 cu->cu_closeit = FALSE; 828 mtx_unlock(&cs->cs_lock); 829 return (TRUE); 830 } 831 832 /* for other requests which use info */ 833 if (info == NULL) { 834 mtx_unlock(&cs->cs_lock); 835 return (FALSE); 836 } 837 switch (request) { 838 case CLSET_TIMEOUT: 839 if (time_not_ok((struct timeval *)info)) { 840 mtx_unlock(&cs->cs_lock); 841 return (FALSE); 842 } 843 cu->cu_total = *(struct timeval *)info; 844 break; 845 case CLGET_TIMEOUT: 846 *(struct timeval *)info = cu->cu_total; 847 break; 848 case CLSET_RETRY_TIMEOUT: 849 if (time_not_ok((struct timeval *)info)) { 850 mtx_unlock(&cs->cs_lock); 851 return (FALSE); 852 } 853 cu->cu_wait = *(struct timeval *)info; 854 break; 855 case CLGET_RETRY_TIMEOUT: 856 *(struct timeval *)info = cu->cu_wait; 857 break; 858 case CLGET_SVC_ADDR: 859 /* 860 * Slightly different semantics to userland - we use 861 * sockaddr instead of netbuf. 862 */ 863 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 864 break; 865 case CLSET_SVC_ADDR: /* set to new address */ 866 addr = (struct sockaddr *)info; 867 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 868 break; 869 case CLGET_XID: 870 *(uint32_t *)info = cu->cu_xid; 871 break; 872 873 case CLSET_XID: 874 /* This will set the xid of the NEXT call */ 875 /* decrement by 1 as clnt_dg_call() increments once */ 876 cu->cu_xid = *(uint32_t *)info - 1; 877 break; 878 879 case CLGET_VERS: 880 /* 881 * This RELIES on the information that, in the call body, 882 * the version number field is the fifth field from the 883 * beginning of the RPC header. MUST be changed if the 884 * call_struct is changed 885 */ 886 *(uint32_t *)info = 887 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 888 4 * BYTES_PER_XDR_UNIT)); 889 break; 890 891 case CLSET_VERS: 892 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 893 = htonl(*(uint32_t *)info); 894 break; 895 896 case CLGET_PROG: 897 /* 898 * This RELIES on the information that, in the call body, 899 * the program number field is the fourth field from the 900 * beginning of the RPC header. MUST be changed if the 901 * call_struct is changed 902 */ 903 *(uint32_t *)info = 904 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 905 3 * BYTES_PER_XDR_UNIT)); 906 break; 907 908 case CLSET_PROG: 909 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 910 = htonl(*(uint32_t *)info); 911 break; 912 case CLSET_ASYNC: 913 cu->cu_async = *(int *)info; 914 break; 915 case CLSET_CONNECT: 916 cu->cu_connect = *(int *)info; 917 break; 918 case CLSET_WAITCHAN: 919 cu->cu_waitchan = (const char *)info; 920 break; 921 case CLGET_WAITCHAN: 922 *(const char **) info = cu->cu_waitchan; 923 break; 924 case CLSET_INTERRUPTIBLE: 925 if (*(int *) info) 926 cu->cu_waitflag = PCATCH; 927 else 928 cu->cu_waitflag = 0; 929 break; 930 case CLGET_INTERRUPTIBLE: 931 if (cu->cu_waitflag) 932 *(int *) info = TRUE; 933 else 934 *(int *) info = FALSE; 935 break; 936 default: 937 mtx_unlock(&cs->cs_lock); 938 return (FALSE); 939 } 940 mtx_unlock(&cs->cs_lock); 941 return (TRUE); 942 } 943 944 static void 945 clnt_dg_close(CLIENT *cl) 946 { 947 struct cu_data *cu = (struct cu_data *)cl->cl_private; 948 struct cu_socket *cs; 949 struct cu_request *cr; 950 951 cs = cu->cu_socket->so_rcv.sb_upcallarg; 952 mtx_lock(&cs->cs_lock); 953 954 if (cu->cu_closed) { 955 mtx_unlock(&cs->cs_lock); 956 return; 957 } 958 959 if (cu->cu_closing) { 960 while (cu->cu_closing) 961 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 962 KASSERT(cu->cu_closed, ("client should be closed")); 963 mtx_unlock(&cs->cs_lock); 964 return; 965 } 966 967 /* 968 * Abort any pending requests and wait until everyone 969 * has finished with clnt_vc_call. 970 */ 971 cu->cu_closing = TRUE; 972 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 973 if (cr->cr_client == cl) { 974 cr->cr_xid = 0; 975 cr->cr_error = ESHUTDOWN; 976 wakeup(cr); 977 } 978 } 979 980 while (cu->cu_threads) 981 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 982 983 cu->cu_closing = FALSE; 984 cu->cu_closed = TRUE; 985 986 mtx_unlock(&cs->cs_lock); 987 wakeup(cu); 988 } 989 990 static void 991 clnt_dg_destroy(CLIENT *cl) 992 { 993 struct cu_data *cu = (struct cu_data *)cl->cl_private; 994 struct cu_socket *cs; 995 struct socket *so = NULL; 996 bool_t lastsocketref; 997 998 cs = cu->cu_socket->so_rcv.sb_upcallarg; 999 clnt_dg_close(cl); 1000 1001 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1002 mtx_lock(&cs->cs_lock); 1003 1004 cs->cs_refs--; 1005 if (cs->cs_refs == 0) { 1006 mtx_unlock(&cs->cs_lock); 1007 soupcall_clear(cu->cu_socket, SO_RCV); 1008 clnt_dg_upcallsdone(cu->cu_socket, cs); 1009 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1010 mtx_destroy(&cs->cs_lock); 1011 mem_free(cs, sizeof(*cs)); 1012 lastsocketref = TRUE; 1013 } else { 1014 mtx_unlock(&cs->cs_lock); 1015 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1016 lastsocketref = FALSE; 1017 } 1018 1019 if (cu->cu_closeit && lastsocketref) { 1020 so = cu->cu_socket; 1021 cu->cu_socket = NULL; 1022 } 1023 1024 if (so) 1025 soclose(so); 1026 1027 if (cl->cl_netid && cl->cl_netid[0]) 1028 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1029 if (cl->cl_tp && cl->cl_tp[0]) 1030 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1031 mem_free(cu, sizeof (*cu)); 1032 mem_free(cl, sizeof (CLIENT)); 1033 } 1034 1035 /* 1036 * Make sure that the time is not garbage. -1 value is allowed. 1037 */ 1038 static bool_t 1039 time_not_ok(struct timeval *t) 1040 { 1041 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1042 t->tv_usec < -1 || t->tv_usec > 1000000); 1043 } 1044 1045 int 1046 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1047 { 1048 struct cu_socket *cs = (struct cu_socket *) arg; 1049 struct uio uio; 1050 struct mbuf *m; 1051 struct mbuf *control; 1052 struct cu_request *cr; 1053 int error, rcvflag, foundreq; 1054 uint32_t xid; 1055 1056 cs->cs_upcallrefs++; 1057 uio.uio_resid = 1000000000; 1058 uio.uio_td = curthread; 1059 do { 1060 SOCKBUF_UNLOCK(&so->so_rcv); 1061 m = NULL; 1062 control = NULL; 1063 rcvflag = MSG_DONTWAIT; 1064 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1065 if (control) 1066 m_freem(control); 1067 SOCKBUF_LOCK(&so->so_rcv); 1068 1069 if (error == EWOULDBLOCK) 1070 break; 1071 1072 /* 1073 * If there was an error, wake up all pending 1074 * requests. 1075 */ 1076 if (error) { 1077 mtx_lock(&cs->cs_lock); 1078 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1079 cr->cr_xid = 0; 1080 cr->cr_error = error; 1081 wakeup(cr); 1082 } 1083 mtx_unlock(&cs->cs_lock); 1084 break; 1085 } 1086 1087 /* 1088 * The XID is in the first uint32_t of the reply. 1089 */ 1090 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1091 /* 1092 * Should never happen. 1093 */ 1094 m_freem(m); 1095 continue; 1096 } 1097 1098 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1099 xid = ntohl(xid); 1100 1101 /* 1102 * Attempt to match this reply with a pending request. 1103 */ 1104 mtx_lock(&cs->cs_lock); 1105 foundreq = 0; 1106 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1107 if (cr->cr_xid == xid) { 1108 /* 1109 * This one matches. We leave the 1110 * reply mbuf in cr->cr_mrep. Set the 1111 * XID to zero so that we will ignore 1112 * any duplicated replies that arrive 1113 * before clnt_dg_call removes it from 1114 * the queue. 1115 */ 1116 cr->cr_xid = 0; 1117 cr->cr_mrep = m; 1118 cr->cr_error = 0; 1119 foundreq = 1; 1120 wakeup(cr); 1121 break; 1122 } 1123 } 1124 mtx_unlock(&cs->cs_lock); 1125 1126 /* 1127 * If we didn't find the matching request, just drop 1128 * it - its probably a repeated reply. 1129 */ 1130 if (!foundreq) 1131 m_freem(m); 1132 } while (m); 1133 cs->cs_upcallrefs--; 1134 if (cs->cs_upcallrefs < 0) 1135 panic("rpcdg upcall refcnt"); 1136 if (cs->cs_upcallrefs == 0) 1137 wakeup(&cs->cs_upcallrefs); 1138 return (SU_OK); 1139 } 1140 1141 /* 1142 * Wait for all upcalls in progress to complete. 1143 */ 1144 static void 1145 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1146 { 1147 1148 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1149 1150 while (cs->cs_upcallrefs > 0) 1151 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1152 "rpcdgup", 0); 1153 } 1154