1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 /* 33 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 34 */ 35 36 #if defined(LIBC_SCCS) && !defined(lint) 37 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 38 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 39 #endif 40 #include <sys/cdefs.h> 41 __FBSDID("$FreeBSD$"); 42 43 /* 44 * Implements a connectionless client side RPC. 45 */ 46 47 #include <sys/param.h> 48 #include <sys/systm.h> 49 #include <sys/kernel.h> 50 #include <sys/lock.h> 51 #include <sys/malloc.h> 52 #include <sys/mbuf.h> 53 #include <sys/mutex.h> 54 #include <sys/pcpu.h> 55 #include <sys/proc.h> 56 #include <sys/socket.h> 57 #include <sys/socketvar.h> 58 #include <sys/time.h> 59 #include <sys/uio.h> 60 61 #include <net/vnet.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpc_com.h> 65 66 67 #ifdef _FREEFALL_CONFIG 68 /* 69 * Disable RPC exponential back-off for FreeBSD.org systems. 70 */ 71 #define RPC_MAX_BACKOFF 1 /* second */ 72 #else 73 #define RPC_MAX_BACKOFF 30 /* seconds */ 74 #endif 75 76 static bool_t time_not_ok(struct timeval *); 77 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 78 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 79 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 80 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 81 static void clnt_dg_abort(CLIENT *); 82 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 83 static void clnt_dg_close(CLIENT *); 84 static void clnt_dg_destroy(CLIENT *); 85 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 86 87 static struct clnt_ops clnt_dg_ops = { 88 .cl_call = clnt_dg_call, 89 .cl_abort = clnt_dg_abort, 90 .cl_geterr = clnt_dg_geterr, 91 .cl_freeres = clnt_dg_freeres, 92 .cl_close = clnt_dg_close, 93 .cl_destroy = clnt_dg_destroy, 94 .cl_control = clnt_dg_control 95 }; 96 97 static volatile uint32_t rpc_xid = 0; 98 99 /* 100 * A pending RPC request which awaits a reply. Requests which have 101 * received their reply will have cr_xid set to zero and cr_mrep to 102 * the mbuf chain of the reply. 103 */ 104 struct cu_request { 105 TAILQ_ENTRY(cu_request) cr_link; 106 CLIENT *cr_client; /* owner */ 107 uint32_t cr_xid; /* XID of request */ 108 struct mbuf *cr_mrep; /* reply received by upcall */ 109 int cr_error; /* any error from upcall */ 110 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 111 }; 112 113 TAILQ_HEAD(cu_request_list, cu_request); 114 115 #define MCALL_MSG_SIZE 24 116 117 /* 118 * This structure is pointed to by the socket buffer's sb_upcallarg 119 * member. It is separate from the client private data to facilitate 120 * multiple clients sharing the same socket. The cs_lock mutex is used 121 * to protect all fields of this structure, the socket's receive 122 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 123 * structures is installed on the socket. 124 */ 125 struct cu_socket { 126 struct mtx cs_lock; 127 int cs_refs; /* Count of clients */ 128 struct cu_request_list cs_pending; /* Requests awaiting replies */ 129 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 130 }; 131 132 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 133 134 /* 135 * Private data kept per client handle 136 */ 137 struct cu_data { 138 int cu_threads; /* # threads in clnt_vc_call */ 139 bool_t cu_closing; /* TRUE if we are closing */ 140 bool_t cu_closed; /* TRUE if we are closed */ 141 struct socket *cu_socket; /* connection socket */ 142 bool_t cu_closeit; /* opened by library */ 143 struct sockaddr_storage cu_raddr; /* remote address */ 144 int cu_rlen; 145 struct timeval cu_wait; /* retransmit interval */ 146 struct timeval cu_total; /* total time for the call */ 147 struct rpc_err cu_error; 148 uint32_t cu_xid; 149 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 150 size_t cu_mcalllen; 151 size_t cu_sendsz; /* send size */ 152 size_t cu_recvsz; /* recv size */ 153 int cu_async; 154 int cu_connect; /* Use connect(). */ 155 int cu_connected; /* Have done connect(). */ 156 const char *cu_waitchan; 157 int cu_waitflag; 158 int cu_cwnd; /* congestion window */ 159 int cu_sent; /* number of in-flight RPCs */ 160 bool_t cu_cwnd_wait; 161 }; 162 163 #define CWNDSCALE 256 164 #define MAXCWND (32 * CWNDSCALE) 165 166 /* 167 * Connection less client creation returns with client handle parameters. 168 * Default options are set, which the user can change using clnt_control(). 169 * fd should be open and bound. 170 * NB: The rpch->cl_auth is initialized to null authentication. 171 * Caller may wish to set this something more useful. 172 * 173 * sendsz and recvsz are the maximum allowable packet sizes that can be 174 * sent and received. Normally they are the same, but they can be 175 * changed to improve the program efficiency and buffer allocation. 176 * If they are 0, use the transport default. 177 * 178 * If svcaddr is NULL, returns NULL. 179 */ 180 CLIENT * 181 clnt_dg_create( 182 struct socket *so, 183 struct sockaddr *svcaddr, /* servers address */ 184 rpcprog_t program, /* program number */ 185 rpcvers_t version, /* version number */ 186 size_t sendsz, /* buffer recv size */ 187 size_t recvsz) /* buffer send size */ 188 { 189 CLIENT *cl = NULL; /* client handle */ 190 struct cu_data *cu = NULL; /* private data */ 191 struct cu_socket *cs = NULL; 192 struct sockbuf *sb; 193 struct timeval now; 194 struct rpc_msg call_msg; 195 struct __rpc_sockinfo si; 196 XDR xdrs; 197 int error; 198 uint32_t newxid; 199 200 if (svcaddr == NULL) { 201 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 202 return (NULL); 203 } 204 205 if (!__rpc_socket2sockinfo(so, &si)) { 206 rpc_createerr.cf_stat = RPC_TLIERROR; 207 rpc_createerr.cf_error.re_errno = 0; 208 return (NULL); 209 } 210 211 /* 212 * Find the receive and the send size 213 */ 214 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 215 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 216 if ((sendsz == 0) || (recvsz == 0)) { 217 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 218 rpc_createerr.cf_error.re_errno = 0; 219 return (NULL); 220 } 221 222 cl = mem_alloc(sizeof (CLIENT)); 223 224 /* 225 * Should be multiple of 4 for XDR. 226 */ 227 sendsz = rounddown(sendsz + 3, 4); 228 recvsz = rounddown(recvsz + 3, 4); 229 cu = mem_alloc(sizeof (*cu)); 230 cu->cu_threads = 0; 231 cu->cu_closing = FALSE; 232 cu->cu_closed = FALSE; 233 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 234 cu->cu_rlen = svcaddr->sa_len; 235 /* Other values can also be set through clnt_control() */ 236 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 237 cu->cu_wait.tv_usec = 0; 238 cu->cu_total.tv_sec = -1; 239 cu->cu_total.tv_usec = -1; 240 cu->cu_sendsz = sendsz; 241 cu->cu_recvsz = recvsz; 242 cu->cu_async = FALSE; 243 cu->cu_connect = FALSE; 244 cu->cu_connected = FALSE; 245 cu->cu_waitchan = "rpcrecv"; 246 cu->cu_waitflag = 0; 247 cu->cu_cwnd = MAXCWND / 2; 248 cu->cu_sent = 0; 249 cu->cu_cwnd_wait = FALSE; 250 (void) getmicrotime(&now); 251 /* Clip at 28bits so that it will not wrap around. */ 252 newxid = __RPC_GETXID(&now) & 0xfffffff; 253 atomic_cmpset_32(&rpc_xid, 0, newxid); 254 call_msg.rm_xid = atomic_fetchadd_32(&rpc_xid, 1); 255 call_msg.rm_call.cb_prog = program; 256 call_msg.rm_call.cb_vers = version; 257 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 258 if (! xdr_callhdr(&xdrs, &call_msg)) { 259 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 260 rpc_createerr.cf_error.re_errno = 0; 261 goto err2; 262 } 263 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 264 265 /* 266 * By default, closeit is always FALSE. It is users responsibility 267 * to do a close on it, else the user may use clnt_control 268 * to let clnt_destroy do it for him/her. 269 */ 270 cu->cu_closeit = FALSE; 271 cu->cu_socket = so; 272 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 273 if (error != 0) { 274 rpc_createerr.cf_stat = RPC_FAILED; 275 rpc_createerr.cf_error.re_errno = error; 276 goto err2; 277 } 278 279 sb = &so->so_rcv; 280 SOCKBUF_LOCK(&so->so_rcv); 281 recheck_socket: 282 if (sb->sb_upcall) { 283 if (sb->sb_upcall != clnt_dg_soupcall) { 284 SOCKBUF_UNLOCK(&so->so_rcv); 285 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 286 goto err2; 287 } 288 cs = (struct cu_socket *) sb->sb_upcallarg; 289 mtx_lock(&cs->cs_lock); 290 cs->cs_refs++; 291 mtx_unlock(&cs->cs_lock); 292 } else { 293 /* 294 * We are the first on this socket - allocate the 295 * structure and install it in the socket. 296 */ 297 SOCKBUF_UNLOCK(&so->so_rcv); 298 cs = mem_alloc(sizeof(*cs)); 299 SOCKBUF_LOCK(&so->so_rcv); 300 if (sb->sb_upcall) { 301 /* 302 * We have lost a race with some other client. 303 */ 304 mem_free(cs, sizeof(*cs)); 305 goto recheck_socket; 306 } 307 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 308 cs->cs_refs = 1; 309 cs->cs_upcallrefs = 0; 310 TAILQ_INIT(&cs->cs_pending); 311 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 312 } 313 SOCKBUF_UNLOCK(&so->so_rcv); 314 315 cl->cl_refs = 1; 316 cl->cl_ops = &clnt_dg_ops; 317 cl->cl_private = (caddr_t)(void *)cu; 318 cl->cl_auth = authnone_create(); 319 cl->cl_tp = NULL; 320 cl->cl_netid = NULL; 321 return (cl); 322 err2: 323 mem_free(cl, sizeof (CLIENT)); 324 mem_free(cu, sizeof (*cu)); 325 326 return (NULL); 327 } 328 329 static enum clnt_stat 330 clnt_dg_call( 331 CLIENT *cl, /* client handle */ 332 struct rpc_callextra *ext, /* call metadata */ 333 rpcproc_t proc, /* procedure number */ 334 struct mbuf *args, /* pointer to args */ 335 struct mbuf **resultsp, /* pointer to results */ 336 struct timeval utimeout) /* seconds to wait before giving up */ 337 { 338 struct cu_data *cu = (struct cu_data *)cl->cl_private; 339 struct cu_socket *cs; 340 struct rpc_timers *rt; 341 AUTH *auth; 342 struct rpc_err *errp; 343 enum clnt_stat stat; 344 XDR xdrs; 345 struct rpc_msg reply_msg; 346 bool_t ok; 347 int retrans; /* number of re-transmits so far */ 348 int nrefreshes = 2; /* number of times to refresh cred */ 349 struct timeval *tvp; 350 int timeout; 351 int retransmit_time; 352 int next_sendtime, starttime, rtt, time_waited, tv = 0; 353 struct sockaddr *sa; 354 uint32_t xid = 0; 355 struct mbuf *mreq = NULL, *results; 356 struct cu_request *cr; 357 int error; 358 359 cs = cu->cu_socket->so_rcv.sb_upcallarg; 360 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 361 362 mtx_lock(&cs->cs_lock); 363 364 if (cu->cu_closing || cu->cu_closed) { 365 mtx_unlock(&cs->cs_lock); 366 free(cr, M_RPC); 367 return (RPC_CANTSEND); 368 } 369 cu->cu_threads++; 370 371 if (ext) { 372 auth = ext->rc_auth; 373 errp = &ext->rc_err; 374 } else { 375 auth = cl->cl_auth; 376 errp = &cu->cu_error; 377 } 378 379 cr->cr_client = cl; 380 cr->cr_mrep = NULL; 381 cr->cr_error = 0; 382 383 if (cu->cu_total.tv_usec == -1) { 384 tvp = &utimeout; /* use supplied timeout */ 385 } else { 386 tvp = &cu->cu_total; /* use default timeout */ 387 } 388 if (tvp->tv_sec || tvp->tv_usec) 389 timeout = tvtohz(tvp); 390 else 391 timeout = 0; 392 393 if (cu->cu_connect && !cu->cu_connected) { 394 mtx_unlock(&cs->cs_lock); 395 error = soconnect(cu->cu_socket, 396 (struct sockaddr *)&cu->cu_raddr, curthread); 397 mtx_lock(&cs->cs_lock); 398 if (error) { 399 errp->re_errno = error; 400 errp->re_status = stat = RPC_CANTSEND; 401 goto out; 402 } 403 cu->cu_connected = 1; 404 } 405 if (cu->cu_connected) 406 sa = NULL; 407 else 408 sa = (struct sockaddr *)&cu->cu_raddr; 409 time_waited = 0; 410 retrans = 0; 411 if (ext && ext->rc_timers) { 412 rt = ext->rc_timers; 413 if (!rt->rt_rtxcur) 414 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 415 retransmit_time = next_sendtime = rt->rt_rtxcur; 416 } else { 417 rt = NULL; 418 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 419 } 420 421 starttime = ticks; 422 423 call_again: 424 mtx_assert(&cs->cs_lock, MA_OWNED); 425 426 xid = atomic_fetchadd_32(&rpc_xid, 1); 427 428 send_again: 429 mtx_unlock(&cs->cs_lock); 430 431 mreq = m_gethdr(M_WAITOK, MT_DATA); 432 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 433 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 434 mreq->m_len = cu->cu_mcalllen; 435 436 /* 437 * The XID is the first thing in the request. 438 */ 439 *mtod(mreq, uint32_t *) = htonl(xid); 440 441 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 442 443 if (cu->cu_async == TRUE && args == NULL) 444 goto get_reply; 445 446 if ((! XDR_PUTINT32(&xdrs, &proc)) || 447 (! AUTH_MARSHALL(auth, xid, &xdrs, 448 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 449 errp->re_status = stat = RPC_CANTENCODEARGS; 450 mtx_lock(&cs->cs_lock); 451 goto out; 452 } 453 mreq->m_pkthdr.len = m_length(mreq, NULL); 454 455 cr->cr_xid = xid; 456 mtx_lock(&cs->cs_lock); 457 458 /* 459 * Try to get a place in the congestion window. 460 */ 461 while (cu->cu_sent >= cu->cu_cwnd) { 462 cu->cu_cwnd_wait = TRUE; 463 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 464 cu->cu_waitflag, "rpccwnd", 0); 465 if (error) { 466 errp->re_errno = error; 467 if (error == EINTR || error == ERESTART) 468 errp->re_status = stat = RPC_INTR; 469 else 470 errp->re_status = stat = RPC_CANTSEND; 471 goto out; 472 } 473 } 474 cu->cu_sent += CWNDSCALE; 475 476 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 477 mtx_unlock(&cs->cs_lock); 478 479 /* 480 * sosend consumes mreq. 481 */ 482 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 483 mreq = NULL; 484 485 /* 486 * sub-optimal code appears here because we have 487 * some clock time to spare while the packets are in flight. 488 * (We assume that this is actually only executed once.) 489 */ 490 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 491 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 492 reply_msg.acpted_rply.ar_verf.oa_length = 0; 493 reply_msg.acpted_rply.ar_results.where = NULL; 494 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 495 496 mtx_lock(&cs->cs_lock); 497 if (error) { 498 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 499 errp->re_errno = error; 500 errp->re_status = stat = RPC_CANTSEND; 501 cu->cu_sent -= CWNDSCALE; 502 if (cu->cu_cwnd_wait) { 503 cu->cu_cwnd_wait = FALSE; 504 wakeup(&cu->cu_cwnd_wait); 505 } 506 goto out; 507 } 508 509 /* 510 * Check to see if we got an upcall while waiting for the 511 * lock. 512 */ 513 if (cr->cr_error) { 514 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 515 errp->re_errno = cr->cr_error; 516 errp->re_status = stat = RPC_CANTRECV; 517 cu->cu_sent -= CWNDSCALE; 518 if (cu->cu_cwnd_wait) { 519 cu->cu_cwnd_wait = FALSE; 520 wakeup(&cu->cu_cwnd_wait); 521 } 522 goto out; 523 } 524 if (cr->cr_mrep) { 525 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 526 cu->cu_sent -= CWNDSCALE; 527 if (cu->cu_cwnd_wait) { 528 cu->cu_cwnd_wait = FALSE; 529 wakeup(&cu->cu_cwnd_wait); 530 } 531 goto got_reply; 532 } 533 534 /* 535 * Hack to provide rpc-based message passing 536 */ 537 if (timeout == 0) { 538 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 539 errp->re_status = stat = RPC_TIMEDOUT; 540 cu->cu_sent -= CWNDSCALE; 541 if (cu->cu_cwnd_wait) { 542 cu->cu_cwnd_wait = FALSE; 543 wakeup(&cu->cu_cwnd_wait); 544 } 545 goto out; 546 } 547 548 get_reply: 549 for (;;) { 550 /* Decide how long to wait. */ 551 if (next_sendtime < timeout) 552 tv = next_sendtime; 553 else 554 tv = timeout; 555 tv -= time_waited; 556 557 if (tv > 0) { 558 if (cu->cu_closing || cu->cu_closed) { 559 error = 0; 560 cr->cr_error = ESHUTDOWN; 561 } else { 562 error = msleep(cr, &cs->cs_lock, 563 cu->cu_waitflag, cu->cu_waitchan, tv); 564 } 565 } else { 566 error = EWOULDBLOCK; 567 } 568 569 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 570 cu->cu_sent -= CWNDSCALE; 571 if (cu->cu_cwnd_wait) { 572 cu->cu_cwnd_wait = FALSE; 573 wakeup(&cu->cu_cwnd_wait); 574 } 575 576 if (!error) { 577 /* 578 * We were woken up by the upcall. If the 579 * upcall had a receive error, report that, 580 * otherwise we have a reply. 581 */ 582 if (cr->cr_error) { 583 errp->re_errno = cr->cr_error; 584 errp->re_status = stat = RPC_CANTRECV; 585 goto out; 586 } 587 588 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 589 + cu->cu_cwnd / 2) / cu->cu_cwnd; 590 if (cu->cu_cwnd > MAXCWND) 591 cu->cu_cwnd = MAXCWND; 592 593 if (rt) { 594 /* 595 * Add one to the time since a tick 596 * count of N means that the actual 597 * time taken was somewhere between N 598 * and N+1. 599 */ 600 rtt = ticks - starttime + 1; 601 602 /* 603 * Update our estimate of the round 604 * trip time using roughly the 605 * algorithm described in RFC 606 * 2988. Given an RTT sample R: 607 * 608 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 609 * SRTT = (1-alpha) * SRTT + alpha * R 610 * 611 * where alpha = 0.125 and beta = 0.25. 612 * 613 * The initial retransmit timeout is 614 * SRTT + 4*RTTVAR and doubles on each 615 * retransmision. 616 */ 617 if (rt->rt_srtt == 0) { 618 rt->rt_srtt = rtt; 619 rt->rt_deviate = rtt / 2; 620 } else { 621 int32_t error = rtt - rt->rt_srtt; 622 rt->rt_srtt += error / 8; 623 error = abs(error) - rt->rt_deviate; 624 rt->rt_deviate += error / 4; 625 } 626 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 627 } 628 629 break; 630 } 631 632 /* 633 * The sleep returned an error so our request is still 634 * on the list. If we got EWOULDBLOCK, we may want to 635 * re-send the request. 636 */ 637 if (error != EWOULDBLOCK) { 638 errp->re_errno = error; 639 if (error == EINTR || error == ERESTART) 640 errp->re_status = stat = RPC_INTR; 641 else 642 errp->re_status = stat = RPC_CANTRECV; 643 goto out; 644 } 645 646 time_waited = ticks - starttime; 647 648 /* Check for timeout. */ 649 if (time_waited > timeout) { 650 errp->re_errno = EWOULDBLOCK; 651 errp->re_status = stat = RPC_TIMEDOUT; 652 goto out; 653 } 654 655 /* Retransmit if necessary. */ 656 if (time_waited >= next_sendtime) { 657 cu->cu_cwnd /= 2; 658 if (cu->cu_cwnd < CWNDSCALE) 659 cu->cu_cwnd = CWNDSCALE; 660 if (ext && ext->rc_feedback) { 661 mtx_unlock(&cs->cs_lock); 662 if (retrans == 0) 663 ext->rc_feedback(FEEDBACK_REXMIT1, 664 proc, ext->rc_feedback_arg); 665 else 666 ext->rc_feedback(FEEDBACK_REXMIT2, 667 proc, ext->rc_feedback_arg); 668 mtx_lock(&cs->cs_lock); 669 } 670 if (cu->cu_closing || cu->cu_closed) { 671 errp->re_errno = ESHUTDOWN; 672 errp->re_status = stat = RPC_CANTRECV; 673 goto out; 674 } 675 retrans++; 676 /* update retransmit_time */ 677 if (retransmit_time < RPC_MAX_BACKOFF * hz) 678 retransmit_time = 2 * retransmit_time; 679 next_sendtime += retransmit_time; 680 goto send_again; 681 } 682 cu->cu_sent += CWNDSCALE; 683 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 684 } 685 686 got_reply: 687 /* 688 * Now decode and validate the response. We need to drop the 689 * lock since xdr_replymsg may end up sleeping in malloc. 690 */ 691 mtx_unlock(&cs->cs_lock); 692 693 if (ext && ext->rc_feedback) 694 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 695 696 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 697 ok = xdr_replymsg(&xdrs, &reply_msg); 698 cr->cr_mrep = NULL; 699 700 if (ok) { 701 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 702 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 703 errp->re_status = stat = RPC_SUCCESS; 704 else 705 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 706 707 if (errp->re_status == RPC_SUCCESS) { 708 results = xdrmbuf_getall(&xdrs); 709 if (! AUTH_VALIDATE(auth, xid, 710 &reply_msg.acpted_rply.ar_verf, 711 &results)) { 712 errp->re_status = stat = RPC_AUTHERROR; 713 errp->re_why = AUTH_INVALIDRESP; 714 if (retrans && 715 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 716 /* 717 * If we retransmitted, its 718 * possible that we will 719 * receive a reply for one of 720 * the earlier transmissions 721 * (which will use an older 722 * RPCSEC_GSS sequence 723 * number). In this case, just 724 * go back and listen for a 725 * new reply. We could keep a 726 * record of all the seq 727 * numbers we have transmitted 728 * so far so that we could 729 * accept a reply for any of 730 * them here. 731 */ 732 XDR_DESTROY(&xdrs); 733 mtx_lock(&cs->cs_lock); 734 cu->cu_sent += CWNDSCALE; 735 TAILQ_INSERT_TAIL(&cs->cs_pending, 736 cr, cr_link); 737 cr->cr_mrep = NULL; 738 goto get_reply; 739 } 740 } else { 741 *resultsp = results; 742 } 743 } /* end successful completion */ 744 /* 745 * If unsuccessful AND error is an authentication error 746 * then refresh credentials and try again, else break 747 */ 748 else if (stat == RPC_AUTHERROR) 749 /* maybe our credentials need to be refreshed ... */ 750 if (nrefreshes > 0 && 751 AUTH_REFRESH(auth, &reply_msg)) { 752 nrefreshes--; 753 XDR_DESTROY(&xdrs); 754 mtx_lock(&cs->cs_lock); 755 goto call_again; 756 } 757 /* end of unsuccessful completion */ 758 } /* end of valid reply message */ 759 else { 760 errp->re_status = stat = RPC_CANTDECODERES; 761 762 } 763 XDR_DESTROY(&xdrs); 764 mtx_lock(&cs->cs_lock); 765 out: 766 mtx_assert(&cs->cs_lock, MA_OWNED); 767 768 if (mreq) 769 m_freem(mreq); 770 if (cr->cr_mrep) 771 m_freem(cr->cr_mrep); 772 773 cu->cu_threads--; 774 if (cu->cu_closing) 775 wakeup(cu); 776 777 mtx_unlock(&cs->cs_lock); 778 779 if (auth && stat != RPC_SUCCESS) 780 AUTH_VALIDATE(auth, xid, NULL, NULL); 781 782 free(cr, M_RPC); 783 784 return (stat); 785 } 786 787 static void 788 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 789 { 790 struct cu_data *cu = (struct cu_data *)cl->cl_private; 791 792 *errp = cu->cu_error; 793 } 794 795 static bool_t 796 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 797 { 798 XDR xdrs; 799 bool_t dummy; 800 801 xdrs.x_op = XDR_FREE; 802 dummy = (*xdr_res)(&xdrs, res_ptr); 803 804 return (dummy); 805 } 806 807 /*ARGSUSED*/ 808 static void 809 clnt_dg_abort(CLIENT *h) 810 { 811 } 812 813 static bool_t 814 clnt_dg_control(CLIENT *cl, u_int request, void *info) 815 { 816 struct cu_data *cu = (struct cu_data *)cl->cl_private; 817 struct cu_socket *cs; 818 struct sockaddr *addr; 819 820 cs = cu->cu_socket->so_rcv.sb_upcallarg; 821 mtx_lock(&cs->cs_lock); 822 823 switch (request) { 824 case CLSET_FD_CLOSE: 825 cu->cu_closeit = TRUE; 826 mtx_unlock(&cs->cs_lock); 827 return (TRUE); 828 case CLSET_FD_NCLOSE: 829 cu->cu_closeit = FALSE; 830 mtx_unlock(&cs->cs_lock); 831 return (TRUE); 832 } 833 834 /* for other requests which use info */ 835 if (info == NULL) { 836 mtx_unlock(&cs->cs_lock); 837 return (FALSE); 838 } 839 switch (request) { 840 case CLSET_TIMEOUT: 841 if (time_not_ok((struct timeval *)info)) { 842 mtx_unlock(&cs->cs_lock); 843 return (FALSE); 844 } 845 cu->cu_total = *(struct timeval *)info; 846 break; 847 case CLGET_TIMEOUT: 848 *(struct timeval *)info = cu->cu_total; 849 break; 850 case CLSET_RETRY_TIMEOUT: 851 if (time_not_ok((struct timeval *)info)) { 852 mtx_unlock(&cs->cs_lock); 853 return (FALSE); 854 } 855 cu->cu_wait = *(struct timeval *)info; 856 break; 857 case CLGET_RETRY_TIMEOUT: 858 *(struct timeval *)info = cu->cu_wait; 859 break; 860 case CLGET_SVC_ADDR: 861 /* 862 * Slightly different semantics to userland - we use 863 * sockaddr instead of netbuf. 864 */ 865 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 866 break; 867 case CLSET_SVC_ADDR: /* set to new address */ 868 addr = (struct sockaddr *)info; 869 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 870 break; 871 case CLGET_XID: 872 *(uint32_t *)info = atomic_load_32(&rpc_xid); 873 break; 874 875 case CLSET_XID: 876 /* This will set the xid of the NEXT call */ 877 /* decrement by 1 as clnt_dg_call() increments once */ 878 atomic_store_32(&rpc_xid, *(uint32_t *)info - 1); 879 break; 880 881 case CLGET_VERS: 882 /* 883 * This RELIES on the information that, in the call body, 884 * the version number field is the fifth field from the 885 * beginning of the RPC header. MUST be changed if the 886 * call_struct is changed 887 */ 888 *(uint32_t *)info = 889 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 890 4 * BYTES_PER_XDR_UNIT)); 891 break; 892 893 case CLSET_VERS: 894 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 895 = htonl(*(uint32_t *)info); 896 break; 897 898 case CLGET_PROG: 899 /* 900 * This RELIES on the information that, in the call body, 901 * the program number field is the fourth field from the 902 * beginning of the RPC header. MUST be changed if the 903 * call_struct is changed 904 */ 905 *(uint32_t *)info = 906 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 907 3 * BYTES_PER_XDR_UNIT)); 908 break; 909 910 case CLSET_PROG: 911 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 912 = htonl(*(uint32_t *)info); 913 break; 914 case CLSET_ASYNC: 915 cu->cu_async = *(int *)info; 916 break; 917 case CLSET_CONNECT: 918 cu->cu_connect = *(int *)info; 919 break; 920 case CLSET_WAITCHAN: 921 cu->cu_waitchan = (const char *)info; 922 break; 923 case CLGET_WAITCHAN: 924 *(const char **) info = cu->cu_waitchan; 925 break; 926 case CLSET_INTERRUPTIBLE: 927 if (*(int *) info) 928 cu->cu_waitflag = PCATCH; 929 else 930 cu->cu_waitflag = 0; 931 break; 932 case CLGET_INTERRUPTIBLE: 933 if (cu->cu_waitflag) 934 *(int *) info = TRUE; 935 else 936 *(int *) info = FALSE; 937 break; 938 default: 939 mtx_unlock(&cs->cs_lock); 940 return (FALSE); 941 } 942 mtx_unlock(&cs->cs_lock); 943 return (TRUE); 944 } 945 946 static void 947 clnt_dg_close(CLIENT *cl) 948 { 949 struct cu_data *cu = (struct cu_data *)cl->cl_private; 950 struct cu_socket *cs; 951 struct cu_request *cr; 952 953 cs = cu->cu_socket->so_rcv.sb_upcallarg; 954 mtx_lock(&cs->cs_lock); 955 956 if (cu->cu_closed) { 957 mtx_unlock(&cs->cs_lock); 958 return; 959 } 960 961 if (cu->cu_closing) { 962 while (cu->cu_closing) 963 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 964 KASSERT(cu->cu_closed, ("client should be closed")); 965 mtx_unlock(&cs->cs_lock); 966 return; 967 } 968 969 /* 970 * Abort any pending requests and wait until everyone 971 * has finished with clnt_vc_call. 972 */ 973 cu->cu_closing = TRUE; 974 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 975 if (cr->cr_client == cl) { 976 cr->cr_xid = 0; 977 cr->cr_error = ESHUTDOWN; 978 wakeup(cr); 979 } 980 } 981 982 while (cu->cu_threads) 983 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 984 985 cu->cu_closing = FALSE; 986 cu->cu_closed = TRUE; 987 988 mtx_unlock(&cs->cs_lock); 989 wakeup(cu); 990 } 991 992 static void 993 clnt_dg_destroy(CLIENT *cl) 994 { 995 struct cu_data *cu = (struct cu_data *)cl->cl_private; 996 struct cu_socket *cs; 997 struct socket *so = NULL; 998 bool_t lastsocketref; 999 1000 cs = cu->cu_socket->so_rcv.sb_upcallarg; 1001 clnt_dg_close(cl); 1002 1003 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1004 mtx_lock(&cs->cs_lock); 1005 1006 cs->cs_refs--; 1007 if (cs->cs_refs == 0) { 1008 mtx_unlock(&cs->cs_lock); 1009 soupcall_clear(cu->cu_socket, SO_RCV); 1010 clnt_dg_upcallsdone(cu->cu_socket, cs); 1011 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1012 mtx_destroy(&cs->cs_lock); 1013 mem_free(cs, sizeof(*cs)); 1014 lastsocketref = TRUE; 1015 } else { 1016 mtx_unlock(&cs->cs_lock); 1017 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1018 lastsocketref = FALSE; 1019 } 1020 1021 if (cu->cu_closeit && lastsocketref) { 1022 so = cu->cu_socket; 1023 cu->cu_socket = NULL; 1024 } 1025 1026 if (so) 1027 soclose(so); 1028 1029 if (cl->cl_netid && cl->cl_netid[0]) 1030 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1031 if (cl->cl_tp && cl->cl_tp[0]) 1032 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1033 mem_free(cu, sizeof (*cu)); 1034 mem_free(cl, sizeof (CLIENT)); 1035 } 1036 1037 /* 1038 * Make sure that the time is not garbage. -1 value is allowed. 1039 */ 1040 static bool_t 1041 time_not_ok(struct timeval *t) 1042 { 1043 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1044 t->tv_usec < -1 || t->tv_usec > 1000000); 1045 } 1046 1047 int 1048 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1049 { 1050 struct cu_socket *cs = (struct cu_socket *) arg; 1051 struct uio uio; 1052 struct mbuf *m; 1053 struct mbuf *control; 1054 struct cu_request *cr; 1055 int error, rcvflag, foundreq; 1056 uint32_t xid; 1057 1058 cs->cs_upcallrefs++; 1059 uio.uio_resid = 1000000000; 1060 uio.uio_td = curthread; 1061 do { 1062 SOCKBUF_UNLOCK(&so->so_rcv); 1063 m = NULL; 1064 control = NULL; 1065 rcvflag = MSG_DONTWAIT; 1066 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1067 if (control) 1068 m_freem(control); 1069 SOCKBUF_LOCK(&so->so_rcv); 1070 1071 if (error == EWOULDBLOCK) 1072 break; 1073 1074 /* 1075 * If there was an error, wake up all pending 1076 * requests. 1077 */ 1078 if (error) { 1079 mtx_lock(&cs->cs_lock); 1080 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1081 cr->cr_xid = 0; 1082 cr->cr_error = error; 1083 wakeup(cr); 1084 } 1085 mtx_unlock(&cs->cs_lock); 1086 break; 1087 } 1088 1089 /* 1090 * The XID is in the first uint32_t of the reply. 1091 */ 1092 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1093 /* 1094 * Should never happen. 1095 */ 1096 m_freem(m); 1097 continue; 1098 } 1099 1100 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1101 xid = ntohl(xid); 1102 1103 /* 1104 * Attempt to match this reply with a pending request. 1105 */ 1106 mtx_lock(&cs->cs_lock); 1107 foundreq = 0; 1108 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1109 if (cr->cr_xid == xid) { 1110 /* 1111 * This one matches. We leave the 1112 * reply mbuf in cr->cr_mrep. Set the 1113 * XID to zero so that we will ignore 1114 * any duplicated replies that arrive 1115 * before clnt_dg_call removes it from 1116 * the queue. 1117 */ 1118 cr->cr_xid = 0; 1119 cr->cr_mrep = m; 1120 cr->cr_error = 0; 1121 foundreq = 1; 1122 wakeup(cr); 1123 break; 1124 } 1125 } 1126 mtx_unlock(&cs->cs_lock); 1127 1128 /* 1129 * If we didn't find the matching request, just drop 1130 * it - its probably a repeated reply. 1131 */ 1132 if (!foundreq) 1133 m_freem(m); 1134 } while (m); 1135 cs->cs_upcallrefs--; 1136 if (cs->cs_upcallrefs < 0) 1137 panic("rpcdg upcall refcnt"); 1138 if (cs->cs_upcallrefs == 0) 1139 wakeup(&cs->cs_upcallrefs); 1140 return (SU_OK); 1141 } 1142 1143 /* 1144 * Wait for all upcalls in progress to complete. 1145 */ 1146 static void 1147 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1148 { 1149 1150 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1151 1152 while (cs->cs_upcallrefs > 0) 1153 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1154 "rpcdgup", 0); 1155 } 1156