1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 /* 33 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
/*
 * Implements a connectionless client side RPC.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>


#ifdef _FREEFALL_CONFIG
/*
 * Disable RPC exponential back-off for FreeBSD.org systems.
 */
#define RPC_MAX_BACKOFF 1 /* second */
#else
#define RPC_MAX_BACKOFF 30 /* seconds */
#endif

static bool_t time_not_ok(struct timeval *);
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);

/* Method table installed in every datagram client handle. */
static const struct clnt_ops clnt_dg_ops = {
	.cl_call = clnt_dg_call,
	.cl_abort = clnt_dg_abort,
	.cl_geterr = clnt_dg_geterr,
	.cl_freeres = clnt_dg_freeres,
	.cl_close = clnt_dg_close,
	.cl_destroy = clnt_dg_destroy,
	.cl_control = clnt_dg_control
};

/*
 * Next transaction ID, shared by all datagram clients in the system.
 * Seeded lazily from the clock in clnt_dg_create() (the 0 value means
 * "not yet initialised") and advanced with atomic_fetchadd_32 for each
 * call; also readable/settable via CLGET_XID/CLSET_XID.
 */
static volatile uint32_t rpc_xid = 0;

/*
 * A pending RPC request which awaits a reply. Requests which have
 * received their reply will have cr_xid set to zero and cr_mrep to
 * the mbuf chain of the reply.
 */
struct cu_request {
	TAILQ_ENTRY(cu_request) cr_link;
	CLIENT *cr_client;	/* owner */
	uint32_t cr_xid;	/* XID of request */
	struct mbuf *cr_mrep;	/* reply received by upcall */
	int cr_error;		/* any error from upcall */
	char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};

TAILQ_HEAD(cu_request_list, cu_request);

/* Size of the marshalled call header kept in cu_mcallc. */
#define MCALL_MSG_SIZE 24

/*
 * This structure is pointed to by the socket buffer's sb_upcallarg
 * member. It is separate from the client private data to facilitate
 * multiple clients sharing the same socket. The cs_lock mutex is used
 * to protect all fields of this structure, the socket's receive
 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
 * structures is installed on the socket.
 */
struct cu_socket {
	struct mtx cs_lock;
	int cs_refs;			/* Count of clients */
	struct cu_request_list cs_pending; /* Requests awaiting replies */
	int cs_upcallrefs;		/* Refcnt of upcalls in prog.*/
};

static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);

/*
 * Private data kept per client handle
 */
struct cu_data {
	int cu_threads;			/* # threads in clnt_vc_call */
	bool_t cu_closing;		/* TRUE if we are closing */
	bool_t cu_closed;		/* TRUE if we are closed */
	struct socket *cu_socket;	/* connection socket */
	bool_t cu_closeit;		/* opened by library */
	struct sockaddr_storage cu_raddr; /* remote address */
	int cu_rlen;
	struct timeval cu_wait;		/* retransmit interval */
	struct timeval cu_total;	/* total time for the call */
	struct rpc_err cu_error;
	uint32_t cu_xid;
	char cu_mcallc[MCALL_MSG_SIZE];	/* marshalled callmsg */
	size_t cu_mcalllen;
	size_t cu_sendsz;		/* send size */
	size_t cu_recvsz;		/* recv size */
	int cu_async;
	int cu_connect;			/* Use connect(). */
	int cu_connected;		/* Have done connect(). */
	const char *cu_waitchan;	/* msleep() wait channel name */
	int cu_waitflag;		/* msleep() flags (PCATCH or 0) */
	int cu_cwnd;			/* congestion window */
	int cu_sent;			/* number of in-flight RPCs */
	bool_t cu_cwnd_wait;		/* TRUE if a sender sleeps on cwnd */
};

/*
 * Congestion window accounting is fixed-point: each in-flight request
 * contributes CWNDSCALE to cu_sent, and cu_cwnd is measured in the
 * same units (at most MAXCWND, i.e. 32 outstanding requests).
 */
#define CWNDSCALE 256
#define MAXCWND (32 * CWNDSCALE)

/*
 * Connection less client creation returns with client handle parameters.
 * Default options are set, which the user can change using clnt_control().
 * fd should be open and bound.
 * NB: The rpch->cl_auth is initialized to null authentication.
 * Caller may wish to set this something more useful.
 *
 * sendsz and recvsz are the maximum allowable packet sizes that can be
 * sent and received. Normally they are the same, but they can be
 * changed to improve the program efficiency and buffer allocation.
 * If they are 0, use the transport default.
 *
 * If svcaddr is NULL, returns NULL.
 */
CLIENT *
clnt_dg_create(
	struct socket *so,
	struct sockaddr *svcaddr,	/* servers address */
	rpcprog_t program,		/* program number */
	rpcvers_t version,		/* version number */
	size_t sendsz,			/* buffer recv size */
	size_t recvsz)			/* buffer send size */
{
	CLIENT *cl = NULL;		/* client handle */
	struct cu_data *cu = NULL;	/* private data */
	struct cu_socket *cs = NULL;
	struct sockbuf *sb;
	struct timeval now;
	struct rpc_msg call_msg;
	struct __rpc_sockinfo si;
	XDR xdrs;
	int error;
	uint32_t newxid;

	if (svcaddr == NULL) {
		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
		return (NULL);
	}

	if (!__rpc_socket2sockinfo(so, &si)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_errno = 0;
		return (NULL);
	}

	/*
	 * Find the receive and the send size
	 */
	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
		rpc_createerr.cf_error.re_errno = 0;
		return (NULL);
	}

	cl = mem_alloc(sizeof (CLIENT));

	/*
	 * Should be multiple of 4 for XDR.
	 * (rounddown(x + 3, 4) rounds x up to the next multiple of 4.)
	 */
	sendsz = rounddown(sendsz + 3, 4);
	recvsz = rounddown(recvsz + 3, 4);
	cu = mem_alloc(sizeof (*cu));
	cu->cu_threads = 0;
	cu->cu_closing = FALSE;
	cu->cu_closed = FALSE;
	(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
	cu->cu_rlen = svcaddr->sa_len;
	/* Other values can also be set through clnt_control() */
	cu->cu_wait.tv_sec = 3;	/* heuristically chosen */
	cu->cu_wait.tv_usec = 0;
	cu->cu_total.tv_sec = -1;
	cu->cu_total.tv_usec = -1;
	cu->cu_sendsz = sendsz;
	cu->cu_recvsz = recvsz;
	cu->cu_async = FALSE;
	cu->cu_connect = FALSE;
	cu->cu_connected = FALSE;
	cu->cu_waitchan = "rpcrecv";
	cu->cu_waitflag = 0;
	cu->cu_cwnd = MAXCWND / 2;
	cu->cu_sent = 0;
	cu->cu_cwnd_wait = FALSE;
	(void) getmicrotime(&now);
	/* Clip at 28bits so that it will not wrap around. */
	newxid = __RPC_GETXID(&now) & 0xfffffff;
	/* Seed the global XID only if no one has done so yet. */
	atomic_cmpset_32(&rpc_xid, 0, newxid);
	call_msg.rm_xid = atomic_fetchadd_32(&rpc_xid, 1);
	call_msg.rm_call.cb_prog = program;
	call_msg.rm_call.cb_vers = version;
	xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
	if (! xdr_callhdr(&xdrs, &call_msg)) {
		rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */
		rpc_createerr.cf_error.re_errno = 0;
		goto err2;
	}
	cu->cu_mcalllen = XDR_GETPOS(&xdrs);

	/*
	 * By default, closeit is always FALSE. It is users responsibility
	 * to do a close on it, else the user may use clnt_control
	 * to let clnt_destroy do it for him/her.
	 */
	cu->cu_closeit = FALSE;
	cu->cu_socket = so;
	error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
	if (error != 0) {
		rpc_createerr.cf_stat = RPC_FAILED;
		rpc_createerr.cf_error.re_errno = error;
		goto err2;
	}

	sb = &so->so_rcv;
	SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
	if (sb->sb_upcall) {
		if (sb->sb_upcall != clnt_dg_soupcall) {
			SOCKBUF_UNLOCK(&so->so_rcv);
			printf("clnt_dg_create(): socket already has an incompatible upcall\n");
			/* NOTE(review): rpc_createerr is not set on this failure path. */
			goto err2;
		}
		/* Another datagram client already owns the upcall; share it. */
		cs = (struct cu_socket *) sb->sb_upcallarg;
		mtx_lock(&cs->cs_lock);
		cs->cs_refs++;
		mtx_unlock(&cs->cs_lock);
	} else {
		/*
		 * We are the first on this socket - allocate the
		 * structure and install it in the socket.
		 */
		SOCKBUF_UNLOCK(&so->so_rcv);
		cs = mem_alloc(sizeof(*cs));
		SOCKBUF_LOCK(&so->so_rcv);
		if (sb->sb_upcall) {
			/*
			 * We have lost a race with some other client.
			 */
			mem_free(cs, sizeof(*cs));
			goto recheck_socket;
		}
		mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
		cs->cs_refs = 1;
		cs->cs_upcallrefs = 0;
		TAILQ_INIT(&cs->cs_pending);
		soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
	}
	SOCKBUF_UNLOCK(&so->so_rcv);

	cl->cl_refs = 1;
	cl->cl_ops = &clnt_dg_ops;
	cl->cl_private = (caddr_t)(void *)cu;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);
err2:
	mem_free(cl, sizeof (CLIENT));
	mem_free(cu, sizeof (*cu));

	return (NULL);
}

/*
 * Perform one RPC over the datagram socket: marshal the call header and
 * arguments into an mbuf chain, send it (retransmitting with exponential
 * back-off, subject to the congestion window), then sleep until the
 * socket upcall delivers a matching reply or the timeout expires.
 * On RPC_SUCCESS the decoded results mbuf chain is returned via
 * *resultsp.  The error detail is stored in ext->rc_err when ext is
 * given, otherwise in the handle's cu_error.
 */
static enum clnt_stat
clnt_dg_call(
	CLIENT *cl,			/* client handle */
	struct rpc_callextra *ext,	/* call metadata */
	rpcproc_t proc,			/* procedure number */
	struct mbuf *args,		/* pointer to args */
	struct mbuf **resultsp,		/* pointer to results */
	struct timeval utimeout)	/* seconds to wait before giving up */
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct rpc_timers *rt;
	AUTH *auth;
	struct rpc_err *errp;
	enum clnt_stat stat;
	XDR xdrs;
	struct rpc_msg reply_msg;
	bool_t ok;
	int retrans;			/* number of re-transmits so far */
	int nrefreshes = 2;		/* number of times to refresh cred */
	struct timeval *tvp;
	int timeout;
	int retransmit_time;
	int next_sendtime, starttime, rtt, time_waited, tv = 0;
	struct sockaddr *sa;
	uint32_t xid = 0;
	struct mbuf *mreq = NULL, *results;
	struct cu_request *cr;
	int error;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);

	mtx_lock(&cs->cs_lock);

	if (cu->cu_closing || cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		free(cr, M_RPC);
		return (RPC_CANTSEND);
	}
	/* Counted so clnt_dg_close() can wait for us to leave. */
	cu->cu_threads++;

	if (ext) {
		auth = ext->rc_auth;
		errp = &ext->rc_err;
	} else {
		auth = cl->cl_auth;
		errp = &cu->cu_error;
	}

	cr->cr_client = cl;
	cr->cr_mrep = NULL;
	cr->cr_error = 0;

	if (cu->cu_total.tv_usec == -1) {
		tvp = &utimeout; /* use supplied timeout */
	} else {
		tvp = &cu->cu_total; /* use default timeout */
	}
	if (tvp->tv_sec || tvp->tv_usec)
		timeout = tvtohz(tvp);
	else
		timeout = 0;

	if (cu->cu_connect && !cu->cu_connected) {
		mtx_unlock(&cs->cs_lock);
		error = soconnect(cu->cu_socket,
		    (struct sockaddr *)&cu->cu_raddr, curthread);
		mtx_lock(&cs->cs_lock);
		if (error) {
			errp->re_errno = error;
			errp->re_status = stat = RPC_CANTSEND;
			goto out;
		}
		cu->cu_connected = 1;
	}
	/* Connected sockets must not pass an address to sosend(). */
	if (cu->cu_connected)
		sa = NULL;
	else
		sa = (struct sockaddr *)&cu->cu_raddr;
	time_waited = 0;
	retrans = 0;
	if (ext && ext->rc_timers) {
		rt = ext->rc_timers;
		if (!rt->rt_rtxcur)
			rt->rt_rtxcur = tvtohz(&cu->cu_wait);
		retransmit_time = next_sendtime = rt->rt_rtxcur;
	} else {
		rt = NULL;
		retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
	}

	starttime = ticks;

call_again:
	mtx_assert(&cs->cs_lock, MA_OWNED);

	xid = atomic_fetchadd_32(&rpc_xid, 1);

send_again:
	mtx_unlock(&cs->cs_lock);

	mreq = m_gethdr(M_WAITOK, MT_DATA);
	KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
	bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
	mreq->m_len = cu->cu_mcalllen;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	if (cu->cu_async == TRUE && args == NULL)
		goto get_reply;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&cs->cs_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	cr->cr_xid = xid;
	mtx_lock(&cs->cs_lock);

	/*
	 * Try to get a place in the congestion window.
	 */
	while (cu->cu_sent >= cu->cu_cwnd) {
		cu->cu_cwnd_wait = TRUE;
		error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
		    cu->cu_waitflag, "rpccwnd", 0);
		if (error) {
			errp->re_errno = error;
			if (error == EINTR || error == ERESTART)
				errp->re_status = stat = RPC_INTR;
			else
				errp->re_status = stat = RPC_CANTSEND;
			goto out;
		}
	}
	cu->cu_sent += CWNDSCALE;

	TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	mtx_unlock(&cs->cs_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;

	/*
	 * sub-optimal code appears here because we have
	 * some clock time to spare while the packets are in flight.
	 * (We assume that this is actually only executed once.)
	 */
	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&cs->cs_lock);
	if (error) {
		/* Send failed: undo the pending entry and cwnd charge. */
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout == 0) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}

get_reply:
	for (;;) {
		/* Decide how long to wait. */
		if (next_sendtime < timeout)
			tv = next_sendtime;
		else
			tv = timeout;
		tv -= time_waited;

		if (tv > 0) {
			if (cu->cu_closing || cu->cu_closed) {
				error = 0;
				cr->cr_error = ESHUTDOWN;
			} else {
				error = msleep(cr, &cs->cs_lock,
				    cu->cu_waitflag, cu->cu_waitchan, tv);
			}
		} else {
			error = EWOULDBLOCK;
		}

		/*
		 * Whatever happened, we are no longer awaiting this
		 * transmission: release the cwnd slot (re-acquired
		 * below if we loop or retransmit).
		 */
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}

		if (!error) {
			/*
			 * We were woken up by the upcall. If the
			 * upcall had a receive error, report that,
			 * otherwise we have a reply.
			 */
			if (cr->cr_error) {
				errp->re_errno = cr->cr_error;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}

			/* Successful reply: grow cwnd (AIMD increase). */
			cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
			    + cu->cu_cwnd / 2) / cu->cu_cwnd;
			if (cu->cu_cwnd > MAXCWND)
				cu->cu_cwnd = MAXCWND;

			if (rt) {
				/*
				 * Add one to the time since a tick
				 * count of N means that the actual
				 * time taken was somewhere between N
				 * and N+1.
				 */
				rtt = ticks - starttime + 1;

				/*
				 * Update our estimate of the round
				 * trip time using roughly the
				 * algorithm described in RFC
				 * 2988. Given an RTT sample R:
				 *
				 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
				 * SRTT = (1-alpha) * SRTT + alpha * R
				 *
				 * where alpha = 0.125 and beta = 0.25.
				 *
				 * The initial retransmit timeout is
				 * SRTT + 4*RTTVAR and doubles on each
				 * retransmision.
				 */
				if (rt->rt_srtt == 0) {
					rt->rt_srtt = rtt;
					rt->rt_deviate = rtt / 2;
				} else {
					/* NB: shadows the outer 'error'. */
					int32_t error = rtt - rt->rt_srtt;
					rt->rt_srtt += error / 8;
					error = abs(error) - rt->rt_deviate;
					rt->rt_deviate += error / 4;
				}
				rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
			}

			break;
		}

		/*
		 * The sleep returned an error so our request is still
		 * on the list. If we got EWOULDBLOCK, we may want to
		 * re-send the request.
		 */
		if (error != EWOULDBLOCK) {
			errp->re_errno = error;
			if (error == EINTR || error == ERESTART)
				errp->re_status = stat = RPC_INTR;
			else
				errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}

		time_waited = ticks - starttime;

		/* Check for timeout. */
		if (time_waited > timeout) {
			errp->re_errno = EWOULDBLOCK;
			errp->re_status = stat = RPC_TIMEDOUT;
			goto out;
		}

		/* Retransmit if necessary. */
		if (time_waited >= next_sendtime) {
			/* Multiplicative decrease of cwnd on loss. */
			cu->cu_cwnd /= 2;
			if (cu->cu_cwnd < CWNDSCALE)
				cu->cu_cwnd = CWNDSCALE;
			if (ext && ext->rc_feedback) {
				mtx_unlock(&cs->cs_lock);
				if (retrans == 0)
					ext->rc_feedback(FEEDBACK_REXMIT1,
					    proc, ext->rc_feedback_arg);
				else
					ext->rc_feedback(FEEDBACK_REXMIT2,
					    proc, ext->rc_feedback_arg);
				mtx_lock(&cs->cs_lock);
			}
			if (cu->cu_closing || cu->cu_closed) {
				errp->re_errno = ESHUTDOWN;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}
			retrans++;
			/* update retransmit_time */
			if (retransmit_time < RPC_MAX_BACKOFF * hz)
				retransmit_time = 2 * retransmit_time;
			next_sendtime += retransmit_time;
			goto send_again;
		}
		/* Spurious wakeup: re-arm the pending entry and sleep again. */
		cu->cu_sent += CWNDSCALE;
		TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	}

got_reply:
	/*
	 * Now decode and validate the response. We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
	 */
	mtx_unlock(&cs->cs_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			/*
			 * NOTE(review): the decoded error always lands in
			 * cu->cu_error here even when ext supplied its own
			 * errp (ext->rc_err) — confirm this asymmetry is
			 * intentional.
			 */
			stat = _seterr_reply(&reply_msg, &(cu->cu_error));

		if (errp->re_status == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (! AUTH_VALIDATE(auth, xid,
				&reply_msg.acpted_rply.ar_verf,
				&results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
				if (retrans &&
				    auth->ah_cred.oa_flavor == RPCSEC_GSS) {
					/*
					 * If we retransmitted, its
					 * possible that we will
					 * receive a reply for one of
					 * the earlier transmissions
					 * (which will use an older
					 * RPCSEC_GSS sequence
					 * number). In this case, just
					 * go back and listen for a
					 * new reply. We could keep a
					 * record of all the seq
					 * numbers we have transmitted
					 * so far so that we could
					 * accept a reply for any of
					 * them here.
					 */
					XDR_DESTROY(&xdrs);
					mtx_lock(&cs->cs_lock);
					cu->cu_sent += CWNDSCALE;
					TAILQ_INSERT_TAIL(&cs->cs_pending,
					    cr, cr_link);
					cr->cr_mrep = NULL;
					goto get_reply;
				}
			} else {
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND error is an authentication error
		 * then refresh credentials and try again, else break
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&cs->cs_lock);
				goto call_again;
			}
		/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;

	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&cs->cs_lock);
out:
	mtx_assert(&cs->cs_lock, MA_OWNED);

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	cu->cu_threads--;
	if (cu->cu_closing)
		wakeup(cu);

	mtx_unlock(&cs->cs_lock);

	/* Let the auth layer discard state kept for a failed XID. */
	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

/* Copy the most recent per-handle error detail out to the caller. */
static void
clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;

	*errp = cu->cu_error;
}

/* Free results previously decoded for this client via xdr_res. */
static bool_t
clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_dg_abort(CLIENT *h)
{
}

/*
 * Get/set client options (timeouts, addresses, XID, program/version,
 * wait behaviour).  Returns TRUE on success, FALSE for an unknown
 * request, a missing info pointer, or an invalid timeval.
 */
static bool_t
clnt_dg_control(CLIENT *cl, u_int request, void *info)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct sockaddr *addr;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	mtx_lock(&cs->cs_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		cu->cu_closeit = TRUE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		cu->cu_closeit = FALSE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_total = *(struct timeval *)info;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)info = cu->cu_total;
		break;
	case CLSET_RETRY_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_wait = *(struct timeval *)info;
		break;
	case CLGET_RETRY_TIMEOUT:
		*(struct timeval *)info = cu->cu_wait;
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
		 */
		memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		addr = (struct sockaddr *)info;
		(void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
		break;
	case CLGET_XID:
		*(uint32_t *)info = atomic_load_32(&rpc_xid);
		break;

	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_dg_call() increments once */
		atomic_store_32(&rpc_xid, *(uint32_t *)info - 1);
		break;

	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;
	case CLSET_ASYNC:
		cu->cu_async = *(int *)info;
		break;
	case CLSET_CONNECT:
		cu->cu_connect = *(int *)info;
		break;
	case CLSET_WAITCHAN:
		cu->cu_waitchan = (const char *)info;
		break;
	case CLGET_WAITCHAN:
		*(const char **) info = cu->cu_waitchan;
		break;
	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			cu->cu_waitflag = PCATCH;
		else
			cu->cu_waitflag = 0;
		break;
	case CLGET_INTERRUPTIBLE:
		if (cu->cu_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;
	default:
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	mtx_unlock(&cs->cs_lock);
	return (TRUE);
}

/*
 * Close the client handle: abort this client's pending requests and
 * wait until all threads have left clnt_dg_call().  Safe to call
 * concurrently; later callers wait for the first to finish.
 */
static void
clnt_dg_close(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct cu_request *cr;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	mtx_lock(&cs->cs_lock);

	if (cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		return;
	}

	if (cu->cu_closing) {
		/* Someone else is closing; wait for them to finish. */
		while (cu->cu_closing)
			msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
		KASSERT(cu->cu_closed, ("client should be closed"));
		mtx_unlock(&cs->cs_lock);
		return;
	}

	/*
	 * Abort any pending requests and wait until everyone
	 * has finished with clnt_vc_call.
	 */
	cu->cu_closing = TRUE;
	TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
		if (cr->cr_client == cl) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}
	}

	while (cu->cu_threads)
		msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);

	cu->cu_closing = FALSE;
	cu->cu_closed = TRUE;

	mtx_unlock(&cs->cs_lock);
	wakeup(cu);
}

/*
 * Destroy the handle: close it, drop our reference on the shared
 * cu_socket (tearing down the upcall if we were the last user),
 * optionally close the socket, and free all memory.
 */
static void
clnt_dg_destroy(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct socket *so = NULL;
	bool_t lastsocketref;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	clnt_dg_close(cl);

	SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
	mtx_lock(&cs->cs_lock);

	cs->cs_refs--;
	if (cs->cs_refs == 0) {
		/* Last client: remove the upcall and wait for it to drain. */
		mtx_unlock(&cs->cs_lock);
		soupcall_clear(cu->cu_socket, SO_RCV);
		clnt_dg_upcallsdone(cu->cu_socket, cs);
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		mtx_destroy(&cs->cs_lock);
		mem_free(cs, sizeof(*cs));
		lastsocketref = TRUE;
	} else {
		mtx_unlock(&cs->cs_lock);
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		lastsocketref = FALSE;
	}

	if (cu->cu_closeit && lastsocketref) {
		so = cu->cu_socket;
		cu->cu_socket = NULL;
	}

	if (so)
		soclose(so);

	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cu, sizeof (*cu));
	mem_free(cl, sizeof (CLIENT));
}

/*
 * Make sure that the time is not garbage. -1 value is allowed.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
		t->tv_usec < -1 || t->tv_usec > 1000000);
}

/*
 * Socket receive upcall: drain all datagrams from the socket, match
 * each reply's XID against the pending-request list and hand the mbuf
 * chain to the sleeping caller.  Runs with the socket buffer locked;
 * drops that lock around soreceive() and takes cs_lock to touch the
 * pending list.
 */
int
clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct cu_socket *cs = (struct cu_socket *) arg;
	struct uio uio;
	struct mbuf *m;
	struct mbuf *control;
	struct cu_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid;

	cs->cs_upcallrefs++;
	uio.uio_resid = 1000000000;
	uio.uio_td = curthread;
	do {
		SOCKBUF_UNLOCK(&so->so_rcv);
		m = NULL;
		control = NULL;
		rcvflag = MSG_DONTWAIT;
		error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
		if (control)
			m_freem(control);
		SOCKBUF_LOCK(&so->so_rcv);

		if (error == EWOULDBLOCK)
			break;

		/*
		 * If there was an error, wake up all pending
		 * requests.
		 */
		if (error) {
			mtx_lock(&cs->cs_lock);
			TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
				cr->cr_xid = 0;
				cr->cr_error = error;
				wakeup(cr);
			}
			mtx_unlock(&cs->cs_lock);
			break;
		}

		/*
		 * The XID is in the first uint32_t of the reply.
		 */
		if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) {
			/*
			 * Should never happen.
			 */
			m_freem(m);
			continue;
		}

		m_copydata(m, 0, sizeof(xid), (char *)&xid);
		xid = ntohl(xid);

		/*
		 * Attempt to match this reply with a pending request.
		 */
		mtx_lock(&cs->cs_lock);
		foundreq = 0;
		TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
			if (cr->cr_xid == xid) {
				/*
				 * This one matches. We leave the
				 * reply mbuf in cr->cr_mrep. Set the
				 * XID to zero so that we will ignore
				 * any duplicated replies that arrive
				 * before clnt_dg_call removes it from
				 * the queue.
				 */
				cr->cr_xid = 0;
				cr->cr_mrep = m;
				cr->cr_error = 0;
				foundreq = 1;
				wakeup(cr);
				break;
			}
		}
		mtx_unlock(&cs->cs_lock);

		/*
		 * If we didn't find the matching request, just drop
		 * it - its probably a repeated reply.
		 */
		if (!foundreq)
			m_freem(m);
	} while (m);
	cs->cs_upcallrefs--;
	if (cs->cs_upcallrefs < 0)
		panic("rpcdg upcall refcnt");
	if (cs->cs_upcallrefs == 0)
		wakeup(&cs->cs_upcallrefs);
	return (SU_OK);
}

/*
 * Wait for all upcalls in progress to complete.
 */
static void
clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
{

	SOCKBUF_LOCK_ASSERT(&so->so_rcv);

	while (cs->cs_upcallrefs > 0)
		(void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
		    "rpcdgup", 0);
}