1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 /* 33 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 34 */ 35 36 #if defined(LIBC_SCCS) && !defined(lint) 37 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 38 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 39 #endif 40 #include <sys/cdefs.h> 41 __FBSDID("$FreeBSD$"); 42 43 /* 44 * Implements a connectionless client side RPC. 45 */ 46 47 #include <sys/param.h> 48 #include <sys/systm.h> 49 #include <sys/kernel.h> 50 #include <sys/lock.h> 51 #include <sys/malloc.h> 52 #include <sys/mbuf.h> 53 #include <sys/mutex.h> 54 #include <sys/pcpu.h> 55 #include <sys/proc.h> 56 #include <sys/socket.h> 57 #include <sys/socketvar.h> 58 #include <sys/time.h> 59 #include <sys/uio.h> 60 61 #include <net/vnet.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpc_com.h> 65 66 67 #ifdef _FREEFALL_CONFIG 68 /* 69 * Disable RPC exponential back-off for FreeBSD.org systems. 70 */ 71 #define RPC_MAX_BACKOFF 1 /* second */ 72 #else 73 #define RPC_MAX_BACKOFF 30 /* seconds */ 74 #endif 75 76 static bool_t time_not_ok(struct timeval *); 77 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 78 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 79 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 80 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 81 static void clnt_dg_abort(CLIENT *); 82 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 83 static void clnt_dg_close(CLIENT *); 84 static void clnt_dg_destroy(CLIENT *); 85 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 86 87 static struct clnt_ops clnt_dg_ops = { 88 .cl_call = clnt_dg_call, 89 .cl_abort = clnt_dg_abort, 90 .cl_geterr = clnt_dg_geterr, 91 .cl_freeres = clnt_dg_freeres, 92 .cl_close = clnt_dg_close, 93 .cl_destroy = clnt_dg_destroy, 94 .cl_control = clnt_dg_control 95 }; 96 97 /* 98 * A pending RPC request which awaits a reply. Requests which have 99 * received their reply will have cr_xid set to zero and cr_mrep to 100 * the mbuf chain of the reply. 101 */ 102 struct cu_request { 103 TAILQ_ENTRY(cu_request) cr_link; 104 CLIENT *cr_client; /* owner */ 105 uint32_t cr_xid; /* XID of request */ 106 struct mbuf *cr_mrep; /* reply received by upcall */ 107 int cr_error; /* any error from upcall */ 108 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 109 }; 110 111 TAILQ_HEAD(cu_request_list, cu_request); 112 113 #define MCALL_MSG_SIZE 24 114 115 /* 116 * This structure is pointed to by the socket buffer's sb_upcallarg 117 * member. It is separate from the client private data to facilitate 118 * multiple clients sharing the same socket. The cs_lock mutex is used 119 * to protect all fields of this structure, the socket's receive 120 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 121 * structures is installed on the socket. 122 */ 123 struct cu_socket { 124 struct mtx cs_lock; 125 int cs_refs; /* Count of clients */ 126 struct cu_request_list cs_pending; /* Requests awaiting replies */ 127 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 128 }; 129 130 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 131 132 /* 133 * Private data kept per client handle 134 */ 135 struct cu_data { 136 int cu_threads; /* # threads in clnt_vc_call */ 137 bool_t cu_closing; /* TRUE if we are closing */ 138 bool_t cu_closed; /* TRUE if we are closed */ 139 struct socket *cu_socket; /* connection socket */ 140 bool_t cu_closeit; /* opened by library */ 141 struct sockaddr_storage cu_raddr; /* remote address */ 142 int cu_rlen; 143 struct timeval cu_wait; /* retransmit interval */ 144 struct timeval cu_total; /* total time for the call */ 145 struct rpc_err cu_error; 146 uint32_t cu_xid; 147 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 148 size_t cu_mcalllen; 149 size_t cu_sendsz; /* send size */ 150 size_t cu_recvsz; /* recv size */ 151 int cu_async; 152 int cu_connect; /* Use connect(). */ 153 int cu_connected; /* Have done connect(). */ 154 const char *cu_waitchan; 155 int cu_waitflag; 156 int cu_cwnd; /* congestion window */ 157 int cu_sent; /* number of in-flight RPCs */ 158 bool_t cu_cwnd_wait; 159 }; 160 161 #define CWNDSCALE 256 162 #define MAXCWND (32 * CWNDSCALE) 163 164 /* 165 * Connection less client creation returns with client handle parameters. 166 * Default options are set, which the user can change using clnt_control(). 167 * fd should be open and bound. 168 * NB: The rpch->cl_auth is initialized to null authentication. 169 * Caller may wish to set this something more useful. 170 * 171 * sendsz and recvsz are the maximum allowable packet sizes that can be 172 * sent and received. Normally they are the same, but they can be 173 * changed to improve the program efficiency and buffer allocation. 174 * If they are 0, use the transport default. 175 * 176 * If svcaddr is NULL, returns NULL. 177 */ 178 CLIENT * 179 clnt_dg_create( 180 struct socket *so, 181 struct sockaddr *svcaddr, /* servers address */ 182 rpcprog_t program, /* program number */ 183 rpcvers_t version, /* version number */ 184 size_t sendsz, /* buffer recv size */ 185 size_t recvsz) /* buffer send size */ 186 { 187 CLIENT *cl = NULL; /* client handle */ 188 struct cu_data *cu = NULL; /* private data */ 189 struct cu_socket *cs = NULL; 190 struct sockbuf *sb; 191 struct timeval now; 192 struct rpc_msg call_msg; 193 struct __rpc_sockinfo si; 194 XDR xdrs; 195 int error; 196 197 if (svcaddr == NULL) { 198 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 199 return (NULL); 200 } 201 202 if (!__rpc_socket2sockinfo(so, &si)) { 203 rpc_createerr.cf_stat = RPC_TLIERROR; 204 rpc_createerr.cf_error.re_errno = 0; 205 return (NULL); 206 } 207 208 /* 209 * Find the receive and the send size 210 */ 211 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 212 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 213 if ((sendsz == 0) || (recvsz == 0)) { 214 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 215 rpc_createerr.cf_error.re_errno = 0; 216 return (NULL); 217 } 218 219 cl = mem_alloc(sizeof (CLIENT)); 220 221 /* 222 * Should be multiple of 4 for XDR. 223 */ 224 sendsz = rounddown(sendsz + 3, 4); 225 recvsz = rounddown(recvsz + 3, 4); 226 cu = mem_alloc(sizeof (*cu)); 227 cu->cu_threads = 0; 228 cu->cu_closing = FALSE; 229 cu->cu_closed = FALSE; 230 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 231 cu->cu_rlen = svcaddr->sa_len; 232 /* Other values can also be set through clnt_control() */ 233 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 234 cu->cu_wait.tv_usec = 0; 235 cu->cu_total.tv_sec = -1; 236 cu->cu_total.tv_usec = -1; 237 cu->cu_sendsz = sendsz; 238 cu->cu_recvsz = recvsz; 239 cu->cu_async = FALSE; 240 cu->cu_connect = FALSE; 241 cu->cu_connected = FALSE; 242 cu->cu_waitchan = "rpcrecv"; 243 cu->cu_waitflag = 0; 244 cu->cu_cwnd = MAXCWND / 2; 245 cu->cu_sent = 0; 246 cu->cu_cwnd_wait = FALSE; 247 (void) getmicrotime(&now); 248 cu->cu_xid = __RPC_GETXID(&now); 249 call_msg.rm_xid = cu->cu_xid; 250 call_msg.rm_call.cb_prog = program; 251 call_msg.rm_call.cb_vers = version; 252 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 253 if (! xdr_callhdr(&xdrs, &call_msg)) { 254 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 255 rpc_createerr.cf_error.re_errno = 0; 256 goto err2; 257 } 258 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 259 260 /* 261 * By default, closeit is always FALSE. It is users responsibility 262 * to do a close on it, else the user may use clnt_control 263 * to let clnt_destroy do it for him/her. 264 */ 265 cu->cu_closeit = FALSE; 266 cu->cu_socket = so; 267 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 268 if (error != 0) { 269 rpc_createerr.cf_stat = RPC_FAILED; 270 rpc_createerr.cf_error.re_errno = error; 271 goto err2; 272 } 273 274 sb = &so->so_rcv; 275 SOCKBUF_LOCK(&so->so_rcv); 276 recheck_socket: 277 if (sb->sb_upcall) { 278 if (sb->sb_upcall != clnt_dg_soupcall) { 279 SOCKBUF_UNLOCK(&so->so_rcv); 280 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 281 goto err2; 282 } 283 cs = (struct cu_socket *) sb->sb_upcallarg; 284 mtx_lock(&cs->cs_lock); 285 cs->cs_refs++; 286 mtx_unlock(&cs->cs_lock); 287 } else { 288 /* 289 * We are the first on this socket - allocate the 290 * structure and install it in the socket. 291 */ 292 SOCKBUF_UNLOCK(&so->so_rcv); 293 cs = mem_alloc(sizeof(*cs)); 294 SOCKBUF_LOCK(&so->so_rcv); 295 if (sb->sb_upcall) { 296 /* 297 * We have lost a race with some other client. 298 */ 299 mem_free(cs, sizeof(*cs)); 300 goto recheck_socket; 301 } 302 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 303 cs->cs_refs = 1; 304 cs->cs_upcallrefs = 0; 305 TAILQ_INIT(&cs->cs_pending); 306 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 307 } 308 SOCKBUF_UNLOCK(&so->so_rcv); 309 310 cl->cl_refs = 1; 311 cl->cl_ops = &clnt_dg_ops; 312 cl->cl_private = (caddr_t)(void *)cu; 313 cl->cl_auth = authnone_create(); 314 cl->cl_tp = NULL; 315 cl->cl_netid = NULL; 316 return (cl); 317 err2: 318 mem_free(cl, sizeof (CLIENT)); 319 mem_free(cu, sizeof (*cu)); 320 321 return (NULL); 322 } 323 324 static enum clnt_stat 325 clnt_dg_call( 326 CLIENT *cl, /* client handle */ 327 struct rpc_callextra *ext, /* call metadata */ 328 rpcproc_t proc, /* procedure number */ 329 struct mbuf *args, /* pointer to args */ 330 struct mbuf **resultsp, /* pointer to results */ 331 struct timeval utimeout) /* seconds to wait before giving up */ 332 { 333 struct cu_data *cu = (struct cu_data *)cl->cl_private; 334 struct cu_socket *cs; 335 struct rpc_timers *rt; 336 AUTH *auth; 337 struct rpc_err *errp; 338 enum clnt_stat stat; 339 XDR xdrs; 340 struct rpc_msg reply_msg; 341 bool_t ok; 342 int retrans; /* number of re-transmits so far */ 343 int nrefreshes = 2; /* number of times to refresh cred */ 344 struct timeval *tvp; 345 int timeout; 346 int retransmit_time; 347 int next_sendtime, starttime, rtt, time_waited, tv = 0; 348 struct sockaddr *sa; 349 uint32_t xid = 0; 350 struct mbuf *mreq = NULL, *results; 351 struct cu_request *cr; 352 int error; 353 354 cs = cu->cu_socket->so_rcv.sb_upcallarg; 355 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 356 357 mtx_lock(&cs->cs_lock); 358 359 if (cu->cu_closing || cu->cu_closed) { 360 mtx_unlock(&cs->cs_lock); 361 free(cr, M_RPC); 362 return (RPC_CANTSEND); 363 } 364 cu->cu_threads++; 365 366 if (ext) { 367 auth = ext->rc_auth; 368 errp = &ext->rc_err; 369 } else { 370 auth = cl->cl_auth; 371 errp = &cu->cu_error; 372 } 373 374 cr->cr_client = cl; 375 cr->cr_mrep = NULL; 376 cr->cr_error = 0; 377 378 if (cu->cu_total.tv_usec == -1) { 379 tvp = &utimeout; /* use supplied timeout */ 380 } else { 381 tvp = &cu->cu_total; /* use default timeout */ 382 } 383 if (tvp->tv_sec || tvp->tv_usec) 384 timeout = tvtohz(tvp); 385 else 386 timeout = 0; 387 388 if (cu->cu_connect && !cu->cu_connected) { 389 mtx_unlock(&cs->cs_lock); 390 error = soconnect(cu->cu_socket, 391 (struct sockaddr *)&cu->cu_raddr, curthread); 392 mtx_lock(&cs->cs_lock); 393 if (error) { 394 errp->re_errno = error; 395 errp->re_status = stat = RPC_CANTSEND; 396 goto out; 397 } 398 cu->cu_connected = 1; 399 } 400 if (cu->cu_connected) 401 sa = NULL; 402 else 403 sa = (struct sockaddr *)&cu->cu_raddr; 404 time_waited = 0; 405 retrans = 0; 406 if (ext && ext->rc_timers) { 407 rt = ext->rc_timers; 408 if (!rt->rt_rtxcur) 409 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 410 retransmit_time = next_sendtime = rt->rt_rtxcur; 411 } else { 412 rt = NULL; 413 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 414 } 415 416 starttime = ticks; 417 418 call_again: 419 mtx_assert(&cs->cs_lock, MA_OWNED); 420 421 cu->cu_xid++; 422 xid = cu->cu_xid; 423 424 send_again: 425 mtx_unlock(&cs->cs_lock); 426 427 mreq = m_gethdr(M_WAITOK, MT_DATA); 428 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 429 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 430 mreq->m_len = cu->cu_mcalllen; 431 432 /* 433 * The XID is the first thing in the request. 434 */ 435 *mtod(mreq, uint32_t *) = htonl(xid); 436 437 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 438 439 if (cu->cu_async == TRUE && args == NULL) 440 goto get_reply; 441 442 if ((! XDR_PUTINT32(&xdrs, &proc)) || 443 (! AUTH_MARSHALL(auth, xid, &xdrs, 444 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 445 errp->re_status = stat = RPC_CANTENCODEARGS; 446 mtx_lock(&cs->cs_lock); 447 goto out; 448 } 449 mreq->m_pkthdr.len = m_length(mreq, NULL); 450 451 cr->cr_xid = xid; 452 mtx_lock(&cs->cs_lock); 453 454 /* 455 * Try to get a place in the congestion window. 456 */ 457 while (cu->cu_sent >= cu->cu_cwnd) { 458 cu->cu_cwnd_wait = TRUE; 459 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 460 cu->cu_waitflag, "rpccwnd", 0); 461 if (error) { 462 errp->re_errno = error; 463 if (error == EINTR || error == ERESTART) 464 errp->re_status = stat = RPC_INTR; 465 else 466 errp->re_status = stat = RPC_CANTSEND; 467 goto out; 468 } 469 } 470 cu->cu_sent += CWNDSCALE; 471 472 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 473 mtx_unlock(&cs->cs_lock); 474 475 /* 476 * sosend consumes mreq. 477 */ 478 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 479 mreq = NULL; 480 481 /* 482 * sub-optimal code appears here because we have 483 * some clock time to spare while the packets are in flight. 484 * (We assume that this is actually only executed once.) 485 */ 486 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 487 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 488 reply_msg.acpted_rply.ar_verf.oa_length = 0; 489 reply_msg.acpted_rply.ar_results.where = NULL; 490 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 491 492 mtx_lock(&cs->cs_lock); 493 if (error) { 494 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 495 errp->re_errno = error; 496 errp->re_status = stat = RPC_CANTSEND; 497 cu->cu_sent -= CWNDSCALE; 498 if (cu->cu_cwnd_wait) { 499 cu->cu_cwnd_wait = FALSE; 500 wakeup(&cu->cu_cwnd_wait); 501 } 502 goto out; 503 } 504 505 /* 506 * Check to see if we got an upcall while waiting for the 507 * lock. 508 */ 509 if (cr->cr_error) { 510 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 511 errp->re_errno = cr->cr_error; 512 errp->re_status = stat = RPC_CANTRECV; 513 cu->cu_sent -= CWNDSCALE; 514 if (cu->cu_cwnd_wait) { 515 cu->cu_cwnd_wait = FALSE; 516 wakeup(&cu->cu_cwnd_wait); 517 } 518 goto out; 519 } 520 if (cr->cr_mrep) { 521 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 522 cu->cu_sent -= CWNDSCALE; 523 if (cu->cu_cwnd_wait) { 524 cu->cu_cwnd_wait = FALSE; 525 wakeup(&cu->cu_cwnd_wait); 526 } 527 goto got_reply; 528 } 529 530 /* 531 * Hack to provide rpc-based message passing 532 */ 533 if (timeout == 0) { 534 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 535 errp->re_status = stat = RPC_TIMEDOUT; 536 cu->cu_sent -= CWNDSCALE; 537 if (cu->cu_cwnd_wait) { 538 cu->cu_cwnd_wait = FALSE; 539 wakeup(&cu->cu_cwnd_wait); 540 } 541 goto out; 542 } 543 544 get_reply: 545 for (;;) { 546 /* Decide how long to wait. */ 547 if (next_sendtime < timeout) 548 tv = next_sendtime; 549 else 550 tv = timeout; 551 tv -= time_waited; 552 553 if (tv > 0) { 554 if (cu->cu_closing || cu->cu_closed) { 555 error = 0; 556 cr->cr_error = ESHUTDOWN; 557 } else { 558 error = msleep(cr, &cs->cs_lock, 559 cu->cu_waitflag, cu->cu_waitchan, tv); 560 } 561 } else { 562 error = EWOULDBLOCK; 563 } 564 565 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 566 cu->cu_sent -= CWNDSCALE; 567 if (cu->cu_cwnd_wait) { 568 cu->cu_cwnd_wait = FALSE; 569 wakeup(&cu->cu_cwnd_wait); 570 } 571 572 if (!error) { 573 /* 574 * We were woken up by the upcall. If the 575 * upcall had a receive error, report that, 576 * otherwise we have a reply. 577 */ 578 if (cr->cr_error) { 579 errp->re_errno = cr->cr_error; 580 errp->re_status = stat = RPC_CANTRECV; 581 goto out; 582 } 583 584 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 585 + cu->cu_cwnd / 2) / cu->cu_cwnd; 586 if (cu->cu_cwnd > MAXCWND) 587 cu->cu_cwnd = MAXCWND; 588 589 if (rt) { 590 /* 591 * Add one to the time since a tick 592 * count of N means that the actual 593 * time taken was somewhere between N 594 * and N+1. 595 */ 596 rtt = ticks - starttime + 1; 597 598 /* 599 * Update our estimate of the round 600 * trip time using roughly the 601 * algorithm described in RFC 602 * 2988. Given an RTT sample R: 603 * 604 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 605 * SRTT = (1-alpha) * SRTT + alpha * R 606 * 607 * where alpha = 0.125 and beta = 0.25. 608 * 609 * The initial retransmit timeout is 610 * SRTT + 4*RTTVAR and doubles on each 611 * retransmision. 612 */ 613 if (rt->rt_srtt == 0) { 614 rt->rt_srtt = rtt; 615 rt->rt_deviate = rtt / 2; 616 } else { 617 int32_t error = rtt - rt->rt_srtt; 618 rt->rt_srtt += error / 8; 619 error = abs(error) - rt->rt_deviate; 620 rt->rt_deviate += error / 4; 621 } 622 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 623 } 624 625 break; 626 } 627 628 /* 629 * The sleep returned an error so our request is still 630 * on the list. If we got EWOULDBLOCK, we may want to 631 * re-send the request. 632 */ 633 if (error != EWOULDBLOCK) { 634 errp->re_errno = error; 635 if (error == EINTR || error == ERESTART) 636 errp->re_status = stat = RPC_INTR; 637 else 638 errp->re_status = stat = RPC_CANTRECV; 639 goto out; 640 } 641 642 time_waited = ticks - starttime; 643 644 /* Check for timeout. */ 645 if (time_waited > timeout) { 646 errp->re_errno = EWOULDBLOCK; 647 errp->re_status = stat = RPC_TIMEDOUT; 648 goto out; 649 } 650 651 /* Retransmit if necessary. */ 652 if (time_waited >= next_sendtime) { 653 cu->cu_cwnd /= 2; 654 if (cu->cu_cwnd < CWNDSCALE) 655 cu->cu_cwnd = CWNDSCALE; 656 if (ext && ext->rc_feedback) { 657 mtx_unlock(&cs->cs_lock); 658 if (retrans == 0) 659 ext->rc_feedback(FEEDBACK_REXMIT1, 660 proc, ext->rc_feedback_arg); 661 else 662 ext->rc_feedback(FEEDBACK_REXMIT2, 663 proc, ext->rc_feedback_arg); 664 mtx_lock(&cs->cs_lock); 665 } 666 if (cu->cu_closing || cu->cu_closed) { 667 errp->re_errno = ESHUTDOWN; 668 errp->re_status = stat = RPC_CANTRECV; 669 goto out; 670 } 671 retrans++; 672 /* update retransmit_time */ 673 if (retransmit_time < RPC_MAX_BACKOFF * hz) 674 retransmit_time = 2 * retransmit_time; 675 next_sendtime += retransmit_time; 676 goto send_again; 677 } 678 cu->cu_sent += CWNDSCALE; 679 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 680 } 681 682 got_reply: 683 /* 684 * Now decode and validate the response. We need to drop the 685 * lock since xdr_replymsg may end up sleeping in malloc. 686 */ 687 mtx_unlock(&cs->cs_lock); 688 689 if (ext && ext->rc_feedback) 690 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 691 692 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 693 ok = xdr_replymsg(&xdrs, &reply_msg); 694 cr->cr_mrep = NULL; 695 696 if (ok) { 697 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 698 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 699 errp->re_status = stat = RPC_SUCCESS; 700 else 701 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 702 703 if (errp->re_status == RPC_SUCCESS) { 704 results = xdrmbuf_getall(&xdrs); 705 if (! AUTH_VALIDATE(auth, xid, 706 &reply_msg.acpted_rply.ar_verf, 707 &results)) { 708 errp->re_status = stat = RPC_AUTHERROR; 709 errp->re_why = AUTH_INVALIDRESP; 710 if (retrans && 711 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 712 /* 713 * If we retransmitted, its 714 * possible that we will 715 * receive a reply for one of 716 * the earlier transmissions 717 * (which will use an older 718 * RPCSEC_GSS sequence 719 * number). In this case, just 720 * go back and listen for a 721 * new reply. We could keep a 722 * record of all the seq 723 * numbers we have transmitted 724 * so far so that we could 725 * accept a reply for any of 726 * them here. 727 */ 728 XDR_DESTROY(&xdrs); 729 mtx_lock(&cs->cs_lock); 730 cu->cu_sent += CWNDSCALE; 731 TAILQ_INSERT_TAIL(&cs->cs_pending, 732 cr, cr_link); 733 cr->cr_mrep = NULL; 734 goto get_reply; 735 } 736 } else { 737 *resultsp = results; 738 } 739 } /* end successful completion */ 740 /* 741 * If unsuccessful AND error is an authentication error 742 * then refresh credentials and try again, else break 743 */ 744 else if (stat == RPC_AUTHERROR) 745 /* maybe our credentials need to be refreshed ... */ 746 if (nrefreshes > 0 && 747 AUTH_REFRESH(auth, &reply_msg)) { 748 nrefreshes--; 749 XDR_DESTROY(&xdrs); 750 mtx_lock(&cs->cs_lock); 751 goto call_again; 752 } 753 /* end of unsuccessful completion */ 754 } /* end of valid reply message */ 755 else { 756 errp->re_status = stat = RPC_CANTDECODERES; 757 758 } 759 XDR_DESTROY(&xdrs); 760 mtx_lock(&cs->cs_lock); 761 out: 762 mtx_assert(&cs->cs_lock, MA_OWNED); 763 764 if (mreq) 765 m_freem(mreq); 766 if (cr->cr_mrep) 767 m_freem(cr->cr_mrep); 768 769 cu->cu_threads--; 770 if (cu->cu_closing) 771 wakeup(cu); 772 773 mtx_unlock(&cs->cs_lock); 774 775 if (auth && stat != RPC_SUCCESS) 776 AUTH_VALIDATE(auth, xid, NULL, NULL); 777 778 free(cr, M_RPC); 779 780 return (stat); 781 } 782 783 static void 784 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 785 { 786 struct cu_data *cu = (struct cu_data *)cl->cl_private; 787 788 *errp = cu->cu_error; 789 } 790 791 static bool_t 792 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 793 { 794 XDR xdrs; 795 bool_t dummy; 796 797 xdrs.x_op = XDR_FREE; 798 dummy = (*xdr_res)(&xdrs, res_ptr); 799 800 return (dummy); 801 } 802 803 /*ARGSUSED*/ 804 static void 805 clnt_dg_abort(CLIENT *h) 806 { 807 } 808 809 static bool_t 810 clnt_dg_control(CLIENT *cl, u_int request, void *info) 811 { 812 struct cu_data *cu = (struct cu_data *)cl->cl_private; 813 struct cu_socket *cs; 814 struct sockaddr *addr; 815 816 cs = cu->cu_socket->so_rcv.sb_upcallarg; 817 mtx_lock(&cs->cs_lock); 818 819 switch (request) { 820 case CLSET_FD_CLOSE: 821 cu->cu_closeit = TRUE; 822 mtx_unlock(&cs->cs_lock); 823 return (TRUE); 824 case CLSET_FD_NCLOSE: 825 cu->cu_closeit = FALSE; 826 mtx_unlock(&cs->cs_lock); 827 return (TRUE); 828 } 829 830 /* for other requests which use info */ 831 if (info == NULL) { 832 mtx_unlock(&cs->cs_lock); 833 return (FALSE); 834 } 835 switch (request) { 836 case CLSET_TIMEOUT: 837 if (time_not_ok((struct timeval *)info)) { 838 mtx_unlock(&cs->cs_lock); 839 return (FALSE); 840 } 841 cu->cu_total = *(struct timeval *)info; 842 break; 843 case CLGET_TIMEOUT: 844 *(struct timeval *)info = cu->cu_total; 845 break; 846 case CLSET_RETRY_TIMEOUT: 847 if (time_not_ok((struct timeval *)info)) { 848 mtx_unlock(&cs->cs_lock); 849 return (FALSE); 850 } 851 cu->cu_wait = *(struct timeval *)info; 852 break; 853 case CLGET_RETRY_TIMEOUT: 854 *(struct timeval *)info = cu->cu_wait; 855 break; 856 case CLGET_SVC_ADDR: 857 /* 858 * Slightly different semantics to userland - we use 859 * sockaddr instead of netbuf. 860 */ 861 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 862 break; 863 case CLSET_SVC_ADDR: /* set to new address */ 864 addr = (struct sockaddr *)info; 865 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 866 break; 867 case CLGET_XID: 868 *(uint32_t *)info = cu->cu_xid; 869 break; 870 871 case CLSET_XID: 872 /* This will set the xid of the NEXT call */ 873 /* decrement by 1 as clnt_dg_call() increments once */ 874 cu->cu_xid = *(uint32_t *)info - 1; 875 break; 876 877 case CLGET_VERS: 878 /* 879 * This RELIES on the information that, in the call body, 880 * the version number field is the fifth field from the 881 * beginning of the RPC header. MUST be changed if the 882 * call_struct is changed 883 */ 884 *(uint32_t *)info = 885 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 886 4 * BYTES_PER_XDR_UNIT)); 887 break; 888 889 case CLSET_VERS: 890 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 891 = htonl(*(uint32_t *)info); 892 break; 893 894 case CLGET_PROG: 895 /* 896 * This RELIES on the information that, in the call body, 897 * the program number field is the fourth field from the 898 * beginning of the RPC header. MUST be changed if the 899 * call_struct is changed 900 */ 901 *(uint32_t *)info = 902 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 903 3 * BYTES_PER_XDR_UNIT)); 904 break; 905 906 case CLSET_PROG: 907 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 908 = htonl(*(uint32_t *)info); 909 break; 910 case CLSET_ASYNC: 911 cu->cu_async = *(int *)info; 912 break; 913 case CLSET_CONNECT: 914 cu->cu_connect = *(int *)info; 915 break; 916 case CLSET_WAITCHAN: 917 cu->cu_waitchan = (const char *)info; 918 break; 919 case CLGET_WAITCHAN: 920 *(const char **) info = cu->cu_waitchan; 921 break; 922 case CLSET_INTERRUPTIBLE: 923 if (*(int *) info) 924 cu->cu_waitflag = PCATCH; 925 else 926 cu->cu_waitflag = 0; 927 break; 928 case CLGET_INTERRUPTIBLE: 929 if (cu->cu_waitflag) 930 *(int *) info = TRUE; 931 else 932 *(int *) info = FALSE; 933 break; 934 default: 935 mtx_unlock(&cs->cs_lock); 936 return (FALSE); 937 } 938 mtx_unlock(&cs->cs_lock); 939 return (TRUE); 940 } 941 942 static void 943 clnt_dg_close(CLIENT *cl) 944 { 945 struct cu_data *cu = (struct cu_data *)cl->cl_private; 946 struct cu_socket *cs; 947 struct cu_request *cr; 948 949 cs = cu->cu_socket->so_rcv.sb_upcallarg; 950 mtx_lock(&cs->cs_lock); 951 952 if (cu->cu_closed) { 953 mtx_unlock(&cs->cs_lock); 954 return; 955 } 956 957 if (cu->cu_closing) { 958 while (cu->cu_closing) 959 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 960 KASSERT(cu->cu_closed, ("client should be closed")); 961 mtx_unlock(&cs->cs_lock); 962 return; 963 } 964 965 /* 966 * Abort any pending requests and wait until everyone 967 * has finished with clnt_vc_call. 968 */ 969 cu->cu_closing = TRUE; 970 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 971 if (cr->cr_client == cl) { 972 cr->cr_xid = 0; 973 cr->cr_error = ESHUTDOWN; 974 wakeup(cr); 975 } 976 } 977 978 while (cu->cu_threads) 979 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 980 981 cu->cu_closing = FALSE; 982 cu->cu_closed = TRUE; 983 984 mtx_unlock(&cs->cs_lock); 985 wakeup(cu); 986 } 987 988 static void 989 clnt_dg_destroy(CLIENT *cl) 990 { 991 struct cu_data *cu = (struct cu_data *)cl->cl_private; 992 struct cu_socket *cs; 993 struct socket *so = NULL; 994 bool_t lastsocketref; 995 996 cs = cu->cu_socket->so_rcv.sb_upcallarg; 997 clnt_dg_close(cl); 998 999 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1000 mtx_lock(&cs->cs_lock); 1001 1002 cs->cs_refs--; 1003 if (cs->cs_refs == 0) { 1004 mtx_unlock(&cs->cs_lock); 1005 soupcall_clear(cu->cu_socket, SO_RCV); 1006 clnt_dg_upcallsdone(cu->cu_socket, cs); 1007 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1008 mtx_destroy(&cs->cs_lock); 1009 mem_free(cs, sizeof(*cs)); 1010 lastsocketref = TRUE; 1011 } else { 1012 mtx_unlock(&cs->cs_lock); 1013 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1014 lastsocketref = FALSE; 1015 } 1016 1017 if (cu->cu_closeit && lastsocketref) { 1018 so = cu->cu_socket; 1019 cu->cu_socket = NULL; 1020 } 1021 1022 if (so) 1023 soclose(so); 1024 1025 if (cl->cl_netid && cl->cl_netid[0]) 1026 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1027 if (cl->cl_tp && cl->cl_tp[0]) 1028 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1029 mem_free(cu, sizeof (*cu)); 1030 mem_free(cl, sizeof (CLIENT)); 1031 } 1032 1033 /* 1034 * Make sure that the time is not garbage. -1 value is allowed. 1035 */ 1036 static bool_t 1037 time_not_ok(struct timeval *t) 1038 { 1039 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1040 t->tv_usec < -1 || t->tv_usec > 1000000); 1041 } 1042 1043 int 1044 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1045 { 1046 struct cu_socket *cs = (struct cu_socket *) arg; 1047 struct uio uio; 1048 struct mbuf *m; 1049 struct mbuf *control; 1050 struct cu_request *cr; 1051 int error, rcvflag, foundreq; 1052 uint32_t xid; 1053 1054 cs->cs_upcallrefs++; 1055 uio.uio_resid = 1000000000; 1056 uio.uio_td = curthread; 1057 do { 1058 SOCKBUF_UNLOCK(&so->so_rcv); 1059 m = NULL; 1060 control = NULL; 1061 rcvflag = MSG_DONTWAIT; 1062 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1063 if (control) 1064 m_freem(control); 1065 SOCKBUF_LOCK(&so->so_rcv); 1066 1067 if (error == EWOULDBLOCK) 1068 break; 1069 1070 /* 1071 * If there was an error, wake up all pending 1072 * requests. 1073 */ 1074 if (error) { 1075 mtx_lock(&cs->cs_lock); 1076 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1077 cr->cr_xid = 0; 1078 cr->cr_error = error; 1079 wakeup(cr); 1080 } 1081 mtx_unlock(&cs->cs_lock); 1082 break; 1083 } 1084 1085 /* 1086 * The XID is in the first uint32_t of the reply. 1087 */ 1088 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1089 /* 1090 * Should never happen. 1091 */ 1092 m_freem(m); 1093 continue; 1094 } 1095 1096 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1097 xid = ntohl(xid); 1098 1099 /* 1100 * Attempt to match this reply with a pending request. 1101 */ 1102 mtx_lock(&cs->cs_lock); 1103 foundreq = 0; 1104 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1105 if (cr->cr_xid == xid) { 1106 /* 1107 * This one matches. We leave the 1108 * reply mbuf in cr->cr_mrep. Set the 1109 * XID to zero so that we will ignore 1110 * any duplicated replies that arrive 1111 * before clnt_dg_call removes it from 1112 * the queue. 1113 */ 1114 cr->cr_xid = 0; 1115 cr->cr_mrep = m; 1116 cr->cr_error = 0; 1117 foundreq = 1; 1118 wakeup(cr); 1119 break; 1120 } 1121 } 1122 mtx_unlock(&cs->cs_lock); 1123 1124 /* 1125 * If we didn't find the matching request, just drop 1126 * it - its probably a repeated reply. 1127 */ 1128 if (!foundreq) 1129 m_freem(m); 1130 } while (m); 1131 cs->cs_upcallrefs--; 1132 if (cs->cs_upcallrefs < 0) 1133 panic("rpcdg upcall refcnt"); 1134 if (cs->cs_upcallrefs == 0) 1135 wakeup(&cs->cs_upcallrefs); 1136 return (SU_OK); 1137 } 1138 1139 /* 1140 * Wait for all upcalls in progress to complete. 1141 */ 1142 static void 1143 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1144 { 1145 1146 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1147 1148 while (cs->cs_upcallrefs > 0) 1149 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1150 "rpcdgup", 0); 1151 } 1152