1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 /* 33 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 34 */ 35 36 /* 37 * Implements a connectionless client side RPC. 38 */ 39 40 #include <sys/param.h> 41 #include <sys/systm.h> 42 #include <sys/kernel.h> 43 #include <sys/lock.h> 44 #include <sys/malloc.h> 45 #include <sys/mbuf.h> 46 #include <sys/mutex.h> 47 #include <sys/pcpu.h> 48 #include <sys/proc.h> 49 #include <sys/socket.h> 50 #include <sys/socketvar.h> 51 #include <sys/time.h> 52 #include <sys/uio.h> 53 54 #include <net/vnet.h> 55 56 #include <rpc/rpc.h> 57 #include <rpc/rpc_com.h> 58 59 60 #ifdef _FREEFALL_CONFIG 61 /* 62 * Disable RPC exponential back-off for FreeBSD.org systems. 63 */ 64 #define RPC_MAX_BACKOFF 1 /* second */ 65 #else 66 #define RPC_MAX_BACKOFF 30 /* seconds */ 67 #endif 68 69 static bool_t time_not_ok(struct timeval *); 70 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 71 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 72 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 73 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 74 static void clnt_dg_abort(CLIENT *); 75 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 76 static void clnt_dg_close(CLIENT *); 77 static void clnt_dg_destroy(CLIENT *); 78 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 79 80 static const struct clnt_ops clnt_dg_ops = { 81 .cl_call = clnt_dg_call, 82 .cl_abort = clnt_dg_abort, 83 .cl_geterr = clnt_dg_geterr, 84 .cl_freeres = clnt_dg_freeres, 85 .cl_close = clnt_dg_close, 86 .cl_destroy = clnt_dg_destroy, 87 .cl_control = clnt_dg_control 88 }; 89 90 static volatile uint32_t rpc_xid = 0; 91 92 /* 93 * A pending RPC request which awaits a reply. Requests which have 94 * received their reply will have cr_xid set to zero and cr_mrep to 95 * the mbuf chain of the reply. 96 */ 97 struct cu_request { 98 TAILQ_ENTRY(cu_request) cr_link; 99 CLIENT *cr_client; /* owner */ 100 uint32_t cr_xid; /* XID of request */ 101 struct mbuf *cr_mrep; /* reply received by upcall */ 102 int cr_error; /* any error from upcall */ 103 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 104 }; 105 106 TAILQ_HEAD(cu_request_list, cu_request); 107 108 #define MCALL_MSG_SIZE 24 109 110 /* 111 * This structure is pointed to by the socket buffer's sb_upcallarg 112 * member. It is separate from the client private data to facilitate 113 * multiple clients sharing the same socket. The cs_lock mutex is used 114 * to protect all fields of this structure, the socket's receive 115 * buffer lock is used to ensure that exactly one of these 116 * structures is installed on the socket. 117 */ 118 struct cu_socket { 119 struct mtx cs_lock; 120 int cs_refs; /* Count of clients */ 121 struct cu_request_list cs_pending; /* Requests awaiting replies */ 122 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 123 }; 124 125 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 126 127 /* 128 * Private data kept per client handle 129 */ 130 struct cu_data { 131 int cu_threads; /* # threads in clnt_vc_call */ 132 bool_t cu_closing; /* TRUE if we are closing */ 133 bool_t cu_closed; /* TRUE if we are closed */ 134 struct socket *cu_socket; /* connection socket */ 135 bool_t cu_closeit; /* opened by library */ 136 struct sockaddr_storage cu_raddr; /* remote address */ 137 int cu_rlen; 138 struct timeval cu_wait; /* retransmit interval */ 139 struct timeval cu_total; /* total time for the call */ 140 struct rpc_err cu_error; 141 uint32_t cu_xid; 142 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 143 size_t cu_mcalllen; 144 size_t cu_sendsz; /* send size */ 145 size_t cu_recvsz; /* recv size */ 146 int cu_async; 147 int cu_connect; /* Use connect(). */ 148 int cu_connected; /* Have done connect(). */ 149 const char *cu_waitchan; 150 int cu_waitflag; 151 int cu_cwnd; /* congestion window */ 152 int cu_sent; /* number of in-flight RPCs */ 153 bool_t cu_cwnd_wait; 154 }; 155 156 #define CWNDSCALE 256 157 #define MAXCWND (32 * CWNDSCALE) 158 159 /* 160 * Connection less client creation returns with client handle parameters. 161 * Default options are set, which the user can change using clnt_control(). 162 * fd should be open and bound. 163 * NB: The rpch->cl_auth is initialized to null authentication. 164 * Caller may wish to set this something more useful. 165 * 166 * sendsz and recvsz are the maximum allowable packet sizes that can be 167 * sent and received. Normally they are the same, but they can be 168 * changed to improve the program efficiency and buffer allocation. 169 * If they are 0, use the transport default. 170 * 171 * If svcaddr is NULL, returns NULL. 172 */ 173 CLIENT * 174 clnt_dg_create( 175 struct socket *so, 176 struct sockaddr *svcaddr, /* servers address */ 177 rpcprog_t program, /* program number */ 178 rpcvers_t version, /* version number */ 179 size_t sendsz, /* buffer recv size */ 180 size_t recvsz) /* buffer send size */ 181 { 182 CLIENT *cl = NULL; /* client handle */ 183 struct cu_data *cu = NULL; /* private data */ 184 struct cu_socket *cs = NULL; 185 struct sockbuf *sb; 186 struct timeval now; 187 struct rpc_msg call_msg; 188 struct __rpc_sockinfo si; 189 XDR xdrs; 190 int error; 191 uint32_t newxid; 192 193 if (svcaddr == NULL) { 194 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 195 return (NULL); 196 } 197 198 if (!__rpc_socket2sockinfo(so, &si)) { 199 rpc_createerr.cf_stat = RPC_TLIERROR; 200 rpc_createerr.cf_error.re_errno = 0; 201 return (NULL); 202 } 203 204 /* 205 * Find the receive and the send size 206 */ 207 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 208 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 209 if ((sendsz == 0) || (recvsz == 0)) { 210 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 211 rpc_createerr.cf_error.re_errno = 0; 212 return (NULL); 213 } 214 215 cl = mem_alloc(sizeof (CLIENT)); 216 217 /* 218 * Should be multiple of 4 for XDR. 219 */ 220 sendsz = rounddown(sendsz + 3, 4); 221 recvsz = rounddown(recvsz + 3, 4); 222 cu = mem_alloc(sizeof (*cu)); 223 cu->cu_threads = 0; 224 cu->cu_closing = FALSE; 225 cu->cu_closed = FALSE; 226 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 227 cu->cu_rlen = svcaddr->sa_len; 228 /* Other values can also be set through clnt_control() */ 229 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 230 cu->cu_wait.tv_usec = 0; 231 cu->cu_total.tv_sec = -1; 232 cu->cu_total.tv_usec = -1; 233 cu->cu_sendsz = sendsz; 234 cu->cu_recvsz = recvsz; 235 cu->cu_async = FALSE; 236 cu->cu_connect = FALSE; 237 cu->cu_connected = FALSE; 238 cu->cu_waitchan = "rpcrecv"; 239 cu->cu_waitflag = 0; 240 cu->cu_cwnd = MAXCWND / 2; 241 cu->cu_sent = 0; 242 cu->cu_cwnd_wait = FALSE; 243 (void) getmicrotime(&now); 244 /* Clip at 28bits so that it will not wrap around. */ 245 newxid = __RPC_GETXID(&now) & 0xfffffff; 246 atomic_cmpset_32(&rpc_xid, 0, newxid); 247 call_msg.rm_xid = atomic_fetchadd_32(&rpc_xid, 1); 248 call_msg.rm_call.cb_prog = program; 249 call_msg.rm_call.cb_vers = version; 250 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 251 if (! xdr_callhdr(&xdrs, &call_msg)) { 252 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 253 rpc_createerr.cf_error.re_errno = 0; 254 goto err2; 255 } 256 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 257 258 /* 259 * By default, closeit is always FALSE. It is users responsibility 260 * to do a close on it, else the user may use clnt_control 261 * to let clnt_destroy do it for him/her. 262 */ 263 cu->cu_closeit = FALSE; 264 cu->cu_socket = so; 265 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 266 if (error != 0) { 267 rpc_createerr.cf_stat = RPC_FAILED; 268 rpc_createerr.cf_error.re_errno = error; 269 goto err2; 270 } 271 272 sb = &so->so_rcv; 273 SOCK_RECVBUF_LOCK(so); 274 recheck_socket: 275 if (sb->sb_upcall) { 276 if (sb->sb_upcall != clnt_dg_soupcall) { 277 SOCK_RECVBUF_UNLOCK(so); 278 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 279 goto err2; 280 } 281 cs = (struct cu_socket *) sb->sb_upcallarg; 282 mtx_lock(&cs->cs_lock); 283 cs->cs_refs++; 284 mtx_unlock(&cs->cs_lock); 285 } else { 286 /* 287 * We are the first on this socket - allocate the 288 * structure and install it in the socket. 289 */ 290 SOCK_RECVBUF_UNLOCK(so); 291 cs = mem_alloc(sizeof(*cs)); 292 SOCK_RECVBUF_LOCK(so); 293 if (sb->sb_upcall) { 294 /* 295 * We have lost a race with some other client. 296 */ 297 mem_free(cs, sizeof(*cs)); 298 goto recheck_socket; 299 } 300 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 301 cs->cs_refs = 1; 302 cs->cs_upcallrefs = 0; 303 TAILQ_INIT(&cs->cs_pending); 304 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 305 } 306 SOCK_RECVBUF_UNLOCK(so); 307 308 cl->cl_refs = 1; 309 cl->cl_ops = &clnt_dg_ops; 310 cl->cl_private = (caddr_t)(void *)cu; 311 cl->cl_auth = authnone_create(); 312 cl->cl_tp = NULL; 313 cl->cl_netid = NULL; 314 return (cl); 315 err2: 316 mem_free(cl, sizeof (CLIENT)); 317 mem_free(cu, sizeof (*cu)); 318 319 return (NULL); 320 } 321 322 static enum clnt_stat 323 clnt_dg_call( 324 CLIENT *cl, /* client handle */ 325 struct rpc_callextra *ext, /* call metadata */ 326 rpcproc_t proc, /* procedure number */ 327 struct mbuf *args, /* pointer to args */ 328 struct mbuf **resultsp, /* pointer to results */ 329 struct timeval utimeout) /* seconds to wait before giving up */ 330 { 331 struct cu_data *cu = (struct cu_data *)cl->cl_private; 332 struct cu_socket *cs; 333 struct rpc_timers *rt; 334 AUTH *auth; 335 struct rpc_err *errp; 336 enum clnt_stat stat; 337 XDR xdrs; 338 struct rpc_msg reply_msg; 339 bool_t ok; 340 int retrans; /* number of re-transmits so far */ 341 int nrefreshes = 2; /* number of times to refresh cred */ 342 struct timeval *tvp; 343 int timeout; 344 int retransmit_time; 345 int next_sendtime, starttime, rtt, time_waited, tv = 0; 346 struct sockaddr *sa; 347 uint32_t xid = 0; 348 struct mbuf *mreq = NULL, *results; 349 struct cu_request *cr; 350 int error; 351 352 cs = cu->cu_socket->so_rcv.sb_upcallarg; 353 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 354 355 mtx_lock(&cs->cs_lock); 356 357 if (cu->cu_closing || cu->cu_closed) { 358 mtx_unlock(&cs->cs_lock); 359 free(cr, M_RPC); 360 return (RPC_CANTSEND); 361 } 362 cu->cu_threads++; 363 364 if (ext) { 365 auth = ext->rc_auth; 366 errp = &ext->rc_err; 367 } else { 368 auth = cl->cl_auth; 369 errp = &cu->cu_error; 370 } 371 372 cr->cr_client = cl; 373 cr->cr_mrep = NULL; 374 cr->cr_error = 0; 375 376 if (cu->cu_total.tv_usec == -1) { 377 tvp = &utimeout; /* use supplied timeout */ 378 } else { 379 tvp = &cu->cu_total; /* use default timeout */ 380 } 381 if (tvp->tv_sec || tvp->tv_usec) 382 timeout = tvtohz(tvp); 383 else 384 timeout = 0; 385 386 if (cu->cu_connect && !cu->cu_connected) { 387 mtx_unlock(&cs->cs_lock); 388 error = soconnect(cu->cu_socket, 389 (struct sockaddr *)&cu->cu_raddr, curthread); 390 mtx_lock(&cs->cs_lock); 391 if (error) { 392 errp->re_errno = error; 393 errp->re_status = stat = RPC_CANTSEND; 394 goto out; 395 } 396 cu->cu_connected = 1; 397 } 398 if (cu->cu_connected) 399 sa = NULL; 400 else 401 sa = (struct sockaddr *)&cu->cu_raddr; 402 time_waited = 0; 403 retrans = 0; 404 if (ext && ext->rc_timers) { 405 rt = ext->rc_timers; 406 if (!rt->rt_rtxcur) 407 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 408 retransmit_time = next_sendtime = rt->rt_rtxcur; 409 } else { 410 rt = NULL; 411 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 412 } 413 414 starttime = ticks; 415 416 call_again: 417 mtx_assert(&cs->cs_lock, MA_OWNED); 418 419 xid = atomic_fetchadd_32(&rpc_xid, 1); 420 421 send_again: 422 mtx_unlock(&cs->cs_lock); 423 424 mreq = m_gethdr(M_WAITOK, MT_DATA); 425 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 426 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 427 mreq->m_len = cu->cu_mcalllen; 428 429 /* 430 * The XID is the first thing in the request. 431 */ 432 *mtod(mreq, uint32_t *) = htonl(xid); 433 434 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 435 436 if (cu->cu_async == TRUE && args == NULL) 437 goto get_reply; 438 439 if ((! XDR_PUTINT32(&xdrs, &proc)) || 440 (! AUTH_MARSHALL(auth, xid, &xdrs, 441 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 442 errp->re_status = stat = RPC_CANTENCODEARGS; 443 mtx_lock(&cs->cs_lock); 444 goto out; 445 } 446 mreq->m_pkthdr.len = m_length(mreq, NULL); 447 448 cr->cr_xid = xid; 449 mtx_lock(&cs->cs_lock); 450 451 /* 452 * Try to get a place in the congestion window. 453 */ 454 while (cu->cu_sent >= cu->cu_cwnd) { 455 cu->cu_cwnd_wait = TRUE; 456 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 457 cu->cu_waitflag, "rpccwnd", 0); 458 if (error) { 459 errp->re_errno = error; 460 if (error == EINTR || error == ERESTART) 461 errp->re_status = stat = RPC_INTR; 462 else 463 errp->re_status = stat = RPC_CANTSEND; 464 goto out; 465 } 466 } 467 cu->cu_sent += CWNDSCALE; 468 469 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 470 mtx_unlock(&cs->cs_lock); 471 472 /* 473 * sosend consumes mreq. 474 */ 475 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 476 mreq = NULL; 477 478 /* 479 * sub-optimal code appears here because we have 480 * some clock time to spare while the packets are in flight. 481 * (We assume that this is actually only executed once.) 482 */ 483 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 484 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 485 reply_msg.acpted_rply.ar_verf.oa_length = 0; 486 reply_msg.acpted_rply.ar_results.where = NULL; 487 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 488 489 mtx_lock(&cs->cs_lock); 490 if (error) { 491 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 492 errp->re_errno = error; 493 errp->re_status = stat = RPC_CANTSEND; 494 cu->cu_sent -= CWNDSCALE; 495 if (cu->cu_cwnd_wait) { 496 cu->cu_cwnd_wait = FALSE; 497 wakeup(&cu->cu_cwnd_wait); 498 } 499 goto out; 500 } 501 502 /* 503 * Check to see if we got an upcall while waiting for the 504 * lock. 505 */ 506 if (cr->cr_error) { 507 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 508 errp->re_errno = cr->cr_error; 509 errp->re_status = stat = RPC_CANTRECV; 510 cu->cu_sent -= CWNDSCALE; 511 if (cu->cu_cwnd_wait) { 512 cu->cu_cwnd_wait = FALSE; 513 wakeup(&cu->cu_cwnd_wait); 514 } 515 goto out; 516 } 517 if (cr->cr_mrep) { 518 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 519 cu->cu_sent -= CWNDSCALE; 520 if (cu->cu_cwnd_wait) { 521 cu->cu_cwnd_wait = FALSE; 522 wakeup(&cu->cu_cwnd_wait); 523 } 524 goto got_reply; 525 } 526 527 /* 528 * Hack to provide rpc-based message passing 529 */ 530 if (timeout == 0) { 531 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 532 errp->re_status = stat = RPC_TIMEDOUT; 533 cu->cu_sent -= CWNDSCALE; 534 if (cu->cu_cwnd_wait) { 535 cu->cu_cwnd_wait = FALSE; 536 wakeup(&cu->cu_cwnd_wait); 537 } 538 goto out; 539 } 540 541 get_reply: 542 for (;;) { 543 /* Decide how long to wait. */ 544 if (next_sendtime < timeout) 545 tv = next_sendtime; 546 else 547 tv = timeout; 548 tv -= time_waited; 549 550 if (tv > 0) { 551 if (cu->cu_closing || cu->cu_closed) { 552 error = 0; 553 cr->cr_error = ESHUTDOWN; 554 } else { 555 error = msleep(cr, &cs->cs_lock, 556 cu->cu_waitflag, cu->cu_waitchan, tv); 557 } 558 } else { 559 error = EWOULDBLOCK; 560 } 561 562 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 563 cu->cu_sent -= CWNDSCALE; 564 if (cu->cu_cwnd_wait) { 565 cu->cu_cwnd_wait = FALSE; 566 wakeup(&cu->cu_cwnd_wait); 567 } 568 569 if (!error) { 570 /* 571 * We were woken up by the upcall. If the 572 * upcall had a receive error, report that, 573 * otherwise we have a reply. 574 */ 575 if (cr->cr_error) { 576 errp->re_errno = cr->cr_error; 577 errp->re_status = stat = RPC_CANTRECV; 578 goto out; 579 } 580 581 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 582 + cu->cu_cwnd / 2) / cu->cu_cwnd; 583 if (cu->cu_cwnd > MAXCWND) 584 cu->cu_cwnd = MAXCWND; 585 586 if (rt) { 587 /* 588 * Add one to the time since a tick 589 * count of N means that the actual 590 * time taken was somewhere between N 591 * and N+1. 592 */ 593 rtt = ticks - starttime + 1; 594 595 /* 596 * Update our estimate of the round 597 * trip time using roughly the 598 * algorithm described in RFC 599 * 2988. Given an RTT sample R: 600 * 601 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 602 * SRTT = (1-alpha) * SRTT + alpha * R 603 * 604 * where alpha = 0.125 and beta = 0.25. 605 * 606 * The initial retransmit timeout is 607 * SRTT + 4*RTTVAR and doubles on each 608 * retransmision. 609 */ 610 if (rt->rt_srtt == 0) { 611 rt->rt_srtt = rtt; 612 rt->rt_deviate = rtt / 2; 613 } else { 614 int32_t error = rtt - rt->rt_srtt; 615 rt->rt_srtt += error / 8; 616 error = abs(error) - rt->rt_deviate; 617 rt->rt_deviate += error / 4; 618 } 619 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 620 } 621 622 break; 623 } 624 625 /* 626 * The sleep returned an error so our request is still 627 * on the list. If we got EWOULDBLOCK, we may want to 628 * re-send the request. 629 */ 630 if (error != EWOULDBLOCK) { 631 errp->re_errno = error; 632 if (error == EINTR || error == ERESTART) 633 errp->re_status = stat = RPC_INTR; 634 else 635 errp->re_status = stat = RPC_CANTRECV; 636 goto out; 637 } 638 639 time_waited = ticks - starttime; 640 641 /* Check for timeout. */ 642 if (time_waited > timeout) { 643 errp->re_errno = EWOULDBLOCK; 644 errp->re_status = stat = RPC_TIMEDOUT; 645 goto out; 646 } 647 648 /* Retransmit if necessary. */ 649 if (time_waited >= next_sendtime) { 650 cu->cu_cwnd /= 2; 651 if (cu->cu_cwnd < CWNDSCALE) 652 cu->cu_cwnd = CWNDSCALE; 653 if (ext && ext->rc_feedback) { 654 mtx_unlock(&cs->cs_lock); 655 if (retrans == 0) 656 ext->rc_feedback(FEEDBACK_REXMIT1, 657 proc, ext->rc_feedback_arg); 658 else 659 ext->rc_feedback(FEEDBACK_REXMIT2, 660 proc, ext->rc_feedback_arg); 661 mtx_lock(&cs->cs_lock); 662 } 663 if (cu->cu_closing || cu->cu_closed) { 664 errp->re_errno = ESHUTDOWN; 665 errp->re_status = stat = RPC_CANTRECV; 666 goto out; 667 } 668 retrans++; 669 /* update retransmit_time */ 670 if (retransmit_time < RPC_MAX_BACKOFF * hz) 671 retransmit_time = 2 * retransmit_time; 672 next_sendtime += retransmit_time; 673 goto send_again; 674 } 675 cu->cu_sent += CWNDSCALE; 676 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 677 } 678 679 got_reply: 680 /* 681 * Now decode and validate the response. We need to drop the 682 * lock since xdr_replymsg may end up sleeping in malloc. 683 */ 684 mtx_unlock(&cs->cs_lock); 685 686 if (ext && ext->rc_feedback) 687 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 688 689 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 690 ok = xdr_replymsg(&xdrs, &reply_msg); 691 cr->cr_mrep = NULL; 692 693 if (ok) { 694 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 695 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 696 errp->re_status = stat = RPC_SUCCESS; 697 else 698 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 699 700 if (errp->re_status == RPC_SUCCESS) { 701 results = xdrmbuf_getall(&xdrs); 702 if (! AUTH_VALIDATE(auth, xid, 703 &reply_msg.acpted_rply.ar_verf, 704 &results)) { 705 errp->re_status = stat = RPC_AUTHERROR; 706 errp->re_why = AUTH_INVALIDRESP; 707 if (retrans && 708 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 709 /* 710 * If we retransmitted, its 711 * possible that we will 712 * receive a reply for one of 713 * the earlier transmissions 714 * (which will use an older 715 * RPCSEC_GSS sequence 716 * number). In this case, just 717 * go back and listen for a 718 * new reply. We could keep a 719 * record of all the seq 720 * numbers we have transmitted 721 * so far so that we could 722 * accept a reply for any of 723 * them here. 724 */ 725 XDR_DESTROY(&xdrs); 726 mtx_lock(&cs->cs_lock); 727 cu->cu_sent += CWNDSCALE; 728 TAILQ_INSERT_TAIL(&cs->cs_pending, 729 cr, cr_link); 730 cr->cr_mrep = NULL; 731 goto get_reply; 732 } 733 } else { 734 *resultsp = results; 735 } 736 } /* end successful completion */ 737 /* 738 * If unsuccessful AND error is an authentication error 739 * then refresh credentials and try again, else break 740 */ 741 else if (stat == RPC_AUTHERROR) 742 /* maybe our credentials need to be refreshed ... */ 743 if (nrefreshes > 0 && 744 AUTH_REFRESH(auth, &reply_msg)) { 745 nrefreshes--; 746 XDR_DESTROY(&xdrs); 747 mtx_lock(&cs->cs_lock); 748 goto call_again; 749 } 750 /* end of unsuccessful completion */ 751 } /* end of valid reply message */ 752 else { 753 errp->re_status = stat = RPC_CANTDECODERES; 754 755 } 756 XDR_DESTROY(&xdrs); 757 mtx_lock(&cs->cs_lock); 758 out: 759 mtx_assert(&cs->cs_lock, MA_OWNED); 760 761 if (mreq) 762 m_freem(mreq); 763 if (cr->cr_mrep) 764 m_freem(cr->cr_mrep); 765 766 cu->cu_threads--; 767 if (cu->cu_closing) 768 wakeup(cu); 769 770 mtx_unlock(&cs->cs_lock); 771 772 if (auth && stat != RPC_SUCCESS) 773 AUTH_VALIDATE(auth, xid, NULL, NULL); 774 775 free(cr, M_RPC); 776 777 return (stat); 778 } 779 780 static void 781 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 782 { 783 struct cu_data *cu = (struct cu_data *)cl->cl_private; 784 785 *errp = cu->cu_error; 786 } 787 788 static bool_t 789 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 790 { 791 XDR xdrs; 792 bool_t dummy; 793 794 xdrs.x_op = XDR_FREE; 795 dummy = (*xdr_res)(&xdrs, res_ptr); 796 797 return (dummy); 798 } 799 800 /*ARGSUSED*/ 801 static void 802 clnt_dg_abort(CLIENT *h) 803 { 804 } 805 806 static bool_t 807 clnt_dg_control(CLIENT *cl, u_int request, void *info) 808 { 809 struct cu_data *cu = (struct cu_data *)cl->cl_private; 810 struct cu_socket *cs; 811 struct sockaddr *addr; 812 813 cs = cu->cu_socket->so_rcv.sb_upcallarg; 814 mtx_lock(&cs->cs_lock); 815 816 switch (request) { 817 case CLSET_FD_CLOSE: 818 cu->cu_closeit = TRUE; 819 mtx_unlock(&cs->cs_lock); 820 return (TRUE); 821 case CLSET_FD_NCLOSE: 822 cu->cu_closeit = FALSE; 823 mtx_unlock(&cs->cs_lock); 824 return (TRUE); 825 } 826 827 /* for other requests which use info */ 828 if (info == NULL) { 829 mtx_unlock(&cs->cs_lock); 830 return (FALSE); 831 } 832 switch (request) { 833 case CLSET_TIMEOUT: 834 if (time_not_ok((struct timeval *)info)) { 835 mtx_unlock(&cs->cs_lock); 836 return (FALSE); 837 } 838 cu->cu_total = *(struct timeval *)info; 839 break; 840 case CLGET_TIMEOUT: 841 *(struct timeval *)info = cu->cu_total; 842 break; 843 case CLSET_RETRY_TIMEOUT: 844 if (time_not_ok((struct timeval *)info)) { 845 mtx_unlock(&cs->cs_lock); 846 return (FALSE); 847 } 848 cu->cu_wait = *(struct timeval *)info; 849 break; 850 case CLGET_RETRY_TIMEOUT: 851 *(struct timeval *)info = cu->cu_wait; 852 break; 853 case CLGET_SVC_ADDR: 854 /* 855 * Slightly different semantics to userland - we use 856 * sockaddr instead of netbuf. 857 */ 858 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 859 break; 860 case CLSET_SVC_ADDR: /* set to new address */ 861 addr = (struct sockaddr *)info; 862 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 863 break; 864 case CLGET_XID: 865 *(uint32_t *)info = atomic_load_32(&rpc_xid); 866 break; 867 868 case CLSET_XID: 869 /* This will set the xid of the NEXT call */ 870 /* decrement by 1 as clnt_dg_call() increments once */ 871 atomic_store_32(&rpc_xid, *(uint32_t *)info - 1); 872 break; 873 874 case CLGET_VERS: 875 /* 876 * This RELIES on the information that, in the call body, 877 * the version number field is the fifth field from the 878 * beginning of the RPC header. MUST be changed if the 879 * call_struct is changed 880 */ 881 *(uint32_t *)info = 882 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 883 4 * BYTES_PER_XDR_UNIT)); 884 break; 885 886 case CLSET_VERS: 887 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 888 = htonl(*(uint32_t *)info); 889 break; 890 891 case CLGET_PROG: 892 /* 893 * This RELIES on the information that, in the call body, 894 * the program number field is the fourth field from the 895 * beginning of the RPC header. MUST be changed if the 896 * call_struct is changed 897 */ 898 *(uint32_t *)info = 899 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 900 3 * BYTES_PER_XDR_UNIT)); 901 break; 902 903 case CLSET_PROG: 904 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 905 = htonl(*(uint32_t *)info); 906 break; 907 case CLSET_ASYNC: 908 cu->cu_async = *(int *)info; 909 break; 910 case CLSET_CONNECT: 911 cu->cu_connect = *(int *)info; 912 break; 913 case CLSET_WAITCHAN: 914 cu->cu_waitchan = (const char *)info; 915 break; 916 case CLGET_WAITCHAN: 917 *(const char **) info = cu->cu_waitchan; 918 break; 919 case CLSET_INTERRUPTIBLE: 920 if (*(int *) info) 921 cu->cu_waitflag = PCATCH; 922 else 923 cu->cu_waitflag = 0; 924 break; 925 case CLGET_INTERRUPTIBLE: 926 if (cu->cu_waitflag) 927 *(int *) info = TRUE; 928 else 929 *(int *) info = FALSE; 930 break; 931 default: 932 mtx_unlock(&cs->cs_lock); 933 return (FALSE); 934 } 935 mtx_unlock(&cs->cs_lock); 936 return (TRUE); 937 } 938 939 static void 940 clnt_dg_close(CLIENT *cl) 941 { 942 struct cu_data *cu = (struct cu_data *)cl->cl_private; 943 struct cu_socket *cs; 944 struct cu_request *cr; 945 946 cs = cu->cu_socket->so_rcv.sb_upcallarg; 947 mtx_lock(&cs->cs_lock); 948 949 if (cu->cu_closed) { 950 mtx_unlock(&cs->cs_lock); 951 return; 952 } 953 954 if (cu->cu_closing) { 955 while (cu->cu_closing) 956 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 957 KASSERT(cu->cu_closed, ("client should be closed")); 958 mtx_unlock(&cs->cs_lock); 959 return; 960 } 961 962 /* 963 * Abort any pending requests and wait until everyone 964 * has finished with clnt_vc_call. 965 */ 966 cu->cu_closing = TRUE; 967 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 968 if (cr->cr_client == cl) { 969 cr->cr_xid = 0; 970 cr->cr_error = ESHUTDOWN; 971 wakeup(cr); 972 } 973 } 974 975 while (cu->cu_threads) 976 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 977 978 cu->cu_closing = FALSE; 979 cu->cu_closed = TRUE; 980 981 mtx_unlock(&cs->cs_lock); 982 wakeup(cu); 983 } 984 985 static void 986 clnt_dg_destroy(CLIENT *cl) 987 { 988 struct cu_data *cu = (struct cu_data *)cl->cl_private; 989 struct cu_socket *cs; 990 struct socket *so = NULL; 991 bool_t lastsocketref; 992 993 cs = cu->cu_socket->so_rcv.sb_upcallarg; 994 clnt_dg_close(cl); 995 996 SOCK_RECVBUF_LOCK(cu->cu_socket); 997 mtx_lock(&cs->cs_lock); 998 999 cs->cs_refs--; 1000 if (cs->cs_refs == 0) { 1001 mtx_unlock(&cs->cs_lock); 1002 soupcall_clear(cu->cu_socket, SO_RCV); 1003 clnt_dg_upcallsdone(cu->cu_socket, cs); 1004 SOCK_RECVBUF_UNLOCK(cu->cu_socket); 1005 mtx_destroy(&cs->cs_lock); 1006 mem_free(cs, sizeof(*cs)); 1007 lastsocketref = TRUE; 1008 } else { 1009 mtx_unlock(&cs->cs_lock); 1010 SOCK_RECVBUF_UNLOCK(cu->cu_socket); 1011 lastsocketref = FALSE; 1012 } 1013 1014 if (cu->cu_closeit && lastsocketref) { 1015 so = cu->cu_socket; 1016 cu->cu_socket = NULL; 1017 } 1018 1019 if (so) 1020 soclose(so); 1021 1022 if (cl->cl_netid && cl->cl_netid[0]) 1023 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1024 if (cl->cl_tp && cl->cl_tp[0]) 1025 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1026 mem_free(cu, sizeof (*cu)); 1027 mem_free(cl, sizeof (CLIENT)); 1028 } 1029 1030 /* 1031 * Make sure that the time is not garbage. -1 value is allowed. 1032 */ 1033 static bool_t 1034 time_not_ok(struct timeval *t) 1035 { 1036 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1037 t->tv_usec < -1 || t->tv_usec > 1000000); 1038 } 1039 1040 int 1041 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1042 { 1043 struct cu_socket *cs = (struct cu_socket *) arg; 1044 struct uio uio; 1045 struct mbuf *m; 1046 struct mbuf *control; 1047 struct cu_request *cr; 1048 int error, rcvflag, foundreq; 1049 uint32_t xid; 1050 1051 cs->cs_upcallrefs++; 1052 uio.uio_resid = 1000000000; 1053 uio.uio_td = curthread; 1054 do { 1055 SOCK_RECVBUF_UNLOCK(so); 1056 m = NULL; 1057 control = NULL; 1058 rcvflag = MSG_DONTWAIT; 1059 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1060 if (control) 1061 m_freem(control); 1062 SOCK_RECVBUF_LOCK(so); 1063 1064 if (error == EWOULDBLOCK) 1065 break; 1066 1067 /* 1068 * If there was an error, wake up all pending 1069 * requests. 1070 */ 1071 if (error) { 1072 mtx_lock(&cs->cs_lock); 1073 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1074 cr->cr_xid = 0; 1075 cr->cr_error = error; 1076 wakeup(cr); 1077 } 1078 mtx_unlock(&cs->cs_lock); 1079 break; 1080 } 1081 1082 /* 1083 * The XID is in the first uint32_t of the reply. 1084 */ 1085 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1086 /* 1087 * Should never happen. 1088 */ 1089 m_freem(m); 1090 continue; 1091 } 1092 1093 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1094 xid = ntohl(xid); 1095 1096 /* 1097 * Attempt to match this reply with a pending request. 1098 */ 1099 mtx_lock(&cs->cs_lock); 1100 foundreq = 0; 1101 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1102 if (cr->cr_xid == xid) { 1103 /* 1104 * This one matches. We leave the 1105 * reply mbuf in cr->cr_mrep. Set the 1106 * XID to zero so that we will ignore 1107 * any duplicated replies that arrive 1108 * before clnt_dg_call removes it from 1109 * the queue. 1110 */ 1111 cr->cr_xid = 0; 1112 cr->cr_mrep = m; 1113 cr->cr_error = 0; 1114 foundreq = 1; 1115 wakeup(cr); 1116 break; 1117 } 1118 } 1119 mtx_unlock(&cs->cs_lock); 1120 1121 /* 1122 * If we didn't find the matching request, just drop 1123 * it - its probably a repeated reply. 1124 */ 1125 if (!foundreq) 1126 m_freem(m); 1127 } while (m); 1128 cs->cs_upcallrefs--; 1129 if (cs->cs_upcallrefs < 0) 1130 panic("rpcdg upcall refcnt"); 1131 if (cs->cs_upcallrefs == 0) 1132 wakeup(&cs->cs_upcallrefs); 1133 return (SU_OK); 1134 } 1135 1136 /* 1137 * Wait for all upcalls in progress to complete. 1138 */ 1139 static void 1140 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1141 { 1142 1143 SOCK_RECVBUF_LOCK_ASSERT(so); 1144 1145 while (cs->cs_upcallrefs > 0) 1146 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1147 "rpcdgup", 0); 1148 } 1149