1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 /* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35 #if defined(LIBC_SCCS) && !defined(lint) 36 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38 #endif 39 #include <sys/cdefs.h> 40 __FBSDID("$FreeBSD$"); 41 42 /* 43 * Implements a connectionless client side RPC. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/systm.h> 48 #include <sys/kernel.h> 49 #include <sys/lock.h> 50 #include <sys/malloc.h> 51 #include <sys/mbuf.h> 52 #include <sys/mutex.h> 53 #include <sys/pcpu.h> 54 #include <sys/proc.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/time.h> 58 #include <sys/uio.h> 59 60 #include <rpc/rpc.h> 61 #include <rpc/rpc_com.h> 62 63 64 #ifdef _FREEFALL_CONFIG 65 /* 66 * Disable RPC exponential back-off for FreeBSD.org systems. 67 */ 68 #define RPC_MAX_BACKOFF 1 /* second */ 69 #else 70 #define RPC_MAX_BACKOFF 30 /* seconds */ 71 #endif 72 73 static bool_t time_not_ok(struct timeval *); 74 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 75 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 76 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 77 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 78 static void clnt_dg_abort(CLIENT *); 79 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 80 static void clnt_dg_close(CLIENT *); 81 static void clnt_dg_destroy(CLIENT *); 82 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 83 84 static struct clnt_ops clnt_dg_ops = { 85 .cl_call = clnt_dg_call, 86 .cl_abort = clnt_dg_abort, 87 .cl_geterr = clnt_dg_geterr, 88 .cl_freeres = clnt_dg_freeres, 89 .cl_close = clnt_dg_close, 90 .cl_destroy = clnt_dg_destroy, 91 .cl_control = clnt_dg_control 92 }; 93 94 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 95 96 /* 97 * A pending RPC request which awaits a reply. Requests which have 98 * received their reply will have cr_xid set to zero and cr_mrep to 99 * the mbuf chain of the reply. 100 */ 101 struct cu_request { 102 TAILQ_ENTRY(cu_request) cr_link; 103 CLIENT *cr_client; /* owner */ 104 uint32_t cr_xid; /* XID of request */ 105 struct mbuf *cr_mrep; /* reply received by upcall */ 106 int cr_error; /* any error from upcall */ 107 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 108 }; 109 110 TAILQ_HEAD(cu_request_list, cu_request); 111 112 #define MCALL_MSG_SIZE 24 113 114 /* 115 * This structure is pointed to by the socket buffer's sb_upcallarg 116 * member. It is separate from the client private data to facilitate 117 * multiple clients sharing the same socket. The cs_lock mutex is used 118 * to protect all fields of this structure, the socket's receive 119 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 120 * structures is installed on the socket. 121 */ 122 struct cu_socket { 123 struct mtx cs_lock; 124 int cs_refs; /* Count of clients */ 125 struct cu_request_list cs_pending; /* Requests awaiting replies */ 126 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 127 }; 128 129 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 130 131 /* 132 * Private data kept per client handle 133 */ 134 struct cu_data { 135 int cu_threads; /* # threads in clnt_vc_call */ 136 bool_t cu_closing; /* TRUE if we are closing */ 137 bool_t cu_closed; /* TRUE if we are closed */ 138 struct socket *cu_socket; /* connection socket */ 139 bool_t cu_closeit; /* opened by library */ 140 struct sockaddr_storage cu_raddr; /* remote address */ 141 int cu_rlen; 142 struct timeval cu_wait; /* retransmit interval */ 143 struct timeval cu_total; /* total time for the call */ 144 struct rpc_err cu_error; 145 uint32_t cu_xid; 146 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 147 size_t cu_mcalllen; 148 size_t cu_sendsz; /* send size */ 149 size_t cu_recvsz; /* recv size */ 150 int cu_async; 151 int cu_connect; /* Use connect(). */ 152 int cu_connected; /* Have done connect(). */ 153 const char *cu_waitchan; 154 int cu_waitflag; 155 int cu_cwnd; /* congestion window */ 156 int cu_sent; /* number of in-flight RPCs */ 157 bool_t cu_cwnd_wait; 158 }; 159 160 #define CWNDSCALE 256 161 #define MAXCWND (32 * CWNDSCALE) 162 163 /* 164 * Connection less client creation returns with client handle parameters. 165 * Default options are set, which the user can change using clnt_control(). 166 * fd should be open and bound. 167 * NB: The rpch->cl_auth is initialized to null authentication. 168 * Caller may wish to set this something more useful. 169 * 170 * sendsz and recvsz are the maximum allowable packet sizes that can be 171 * sent and received. Normally they are the same, but they can be 172 * changed to improve the program efficiency and buffer allocation. 173 * If they are 0, use the transport default. 174 * 175 * If svcaddr is NULL, returns NULL. 176 */ 177 CLIENT * 178 clnt_dg_create( 179 struct socket *so, 180 struct sockaddr *svcaddr, /* servers address */ 181 rpcprog_t program, /* program number */ 182 rpcvers_t version, /* version number */ 183 size_t sendsz, /* buffer recv size */ 184 size_t recvsz) /* buffer send size */ 185 { 186 CLIENT *cl = NULL; /* client handle */ 187 struct cu_data *cu = NULL; /* private data */ 188 struct cu_socket *cs = NULL; 189 struct sockbuf *sb; 190 struct timeval now; 191 struct rpc_msg call_msg; 192 struct __rpc_sockinfo si; 193 XDR xdrs; 194 195 if (svcaddr == NULL) { 196 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 197 return (NULL); 198 } 199 200 if (!__rpc_socket2sockinfo(so, &si)) { 201 rpc_createerr.cf_stat = RPC_TLIERROR; 202 rpc_createerr.cf_error.re_errno = 0; 203 return (NULL); 204 } 205 206 /* 207 * Find the receive and the send size 208 */ 209 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 210 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 211 if ((sendsz == 0) || (recvsz == 0)) { 212 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 213 rpc_createerr.cf_error.re_errno = 0; 214 return (NULL); 215 } 216 217 cl = mem_alloc(sizeof (CLIENT)); 218 219 /* 220 * Should be multiple of 4 for XDR. 221 */ 222 sendsz = ((sendsz + 3) / 4) * 4; 223 recvsz = ((recvsz + 3) / 4) * 4; 224 cu = mem_alloc(sizeof (*cu)); 225 cu->cu_threads = 0; 226 cu->cu_closing = FALSE; 227 cu->cu_closed = FALSE; 228 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 229 cu->cu_rlen = svcaddr->sa_len; 230 /* Other values can also be set through clnt_control() */ 231 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 232 cu->cu_wait.tv_usec = 0; 233 cu->cu_total.tv_sec = -1; 234 cu->cu_total.tv_usec = -1; 235 cu->cu_sendsz = sendsz; 236 cu->cu_recvsz = recvsz; 237 cu->cu_async = FALSE; 238 cu->cu_connect = FALSE; 239 cu->cu_connected = FALSE; 240 cu->cu_waitchan = "rpcrecv"; 241 cu->cu_waitflag = 0; 242 cu->cu_cwnd = MAXCWND / 2; 243 cu->cu_sent = 0; 244 cu->cu_cwnd_wait = FALSE; 245 (void) getmicrotime(&now); 246 cu->cu_xid = __RPC_GETXID(&now); 247 call_msg.rm_xid = cu->cu_xid; 248 call_msg.rm_call.cb_prog = program; 249 call_msg.rm_call.cb_vers = version; 250 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 251 if (! xdr_callhdr(&xdrs, &call_msg)) { 252 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 253 rpc_createerr.cf_error.re_errno = 0; 254 goto err2; 255 } 256 cu->cu_mcalllen = XDR_GETPOS(&xdrs);; 257 258 /* 259 * By default, closeit is always FALSE. It is users responsibility 260 * to do a close on it, else the user may use clnt_control 261 * to let clnt_destroy do it for him/her. 262 */ 263 cu->cu_closeit = FALSE; 264 cu->cu_socket = so; 265 soreserve(so, 256*1024, 256*1024); 266 267 sb = &so->so_rcv; 268 SOCKBUF_LOCK(&so->so_rcv); 269 recheck_socket: 270 if (sb->sb_upcall) { 271 if (sb->sb_upcall != clnt_dg_soupcall) { 272 SOCKBUF_UNLOCK(&so->so_rcv); 273 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 274 goto err2; 275 } 276 cs = (struct cu_socket *) sb->sb_upcallarg; 277 mtx_lock(&cs->cs_lock); 278 cs->cs_refs++; 279 mtx_unlock(&cs->cs_lock); 280 } else { 281 /* 282 * We are the first on this socket - allocate the 283 * structure and install it in the socket. 284 */ 285 SOCKBUF_UNLOCK(&so->so_rcv); 286 cs = mem_alloc(sizeof(*cs)); 287 SOCKBUF_LOCK(&so->so_rcv); 288 if (sb->sb_upcall) { 289 /* 290 * We have lost a race with some other client. 291 */ 292 mem_free(cs, sizeof(*cs)); 293 goto recheck_socket; 294 } 295 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 296 cs->cs_refs = 1; 297 cs->cs_upcallrefs = 0; 298 TAILQ_INIT(&cs->cs_pending); 299 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 300 } 301 SOCKBUF_UNLOCK(&so->so_rcv); 302 303 cl->cl_refs = 1; 304 cl->cl_ops = &clnt_dg_ops; 305 cl->cl_private = (caddr_t)(void *)cu; 306 cl->cl_auth = authnone_create(); 307 cl->cl_tp = NULL; 308 cl->cl_netid = NULL; 309 return (cl); 310 err2: 311 if (cl) { 312 mem_free(cl, sizeof (CLIENT)); 313 if (cu) 314 mem_free(cu, sizeof (*cu)); 315 } 316 return (NULL); 317 } 318 319 static enum clnt_stat 320 clnt_dg_call( 321 CLIENT *cl, /* client handle */ 322 struct rpc_callextra *ext, /* call metadata */ 323 rpcproc_t proc, /* procedure number */ 324 struct mbuf *args, /* pointer to args */ 325 struct mbuf **resultsp, /* pointer to results */ 326 struct timeval utimeout) /* seconds to wait before giving up */ 327 { 328 struct cu_data *cu = (struct cu_data *)cl->cl_private; 329 struct cu_socket *cs; 330 struct rpc_timers *rt; 331 AUTH *auth; 332 struct rpc_err *errp; 333 enum clnt_stat stat; 334 XDR xdrs; 335 struct rpc_msg reply_msg; 336 bool_t ok; 337 int retrans; /* number of re-transmits so far */ 338 int nrefreshes = 2; /* number of times to refresh cred */ 339 struct timeval *tvp; 340 int timeout; 341 int retransmit_time; 342 int next_sendtime, starttime, rtt, time_waited, tv = 0; 343 struct sockaddr *sa; 344 socklen_t salen; 345 uint32_t xid = 0; 346 struct mbuf *mreq = NULL, *results; 347 struct cu_request *cr; 348 int error; 349 350 cs = cu->cu_socket->so_rcv.sb_upcallarg; 351 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 352 353 mtx_lock(&cs->cs_lock); 354 355 if (cu->cu_closing || cu->cu_closed) { 356 mtx_unlock(&cs->cs_lock); 357 free(cr, M_RPC); 358 return (RPC_CANTSEND); 359 } 360 cu->cu_threads++; 361 362 if (ext) { 363 auth = ext->rc_auth; 364 errp = &ext->rc_err; 365 } else { 366 auth = cl->cl_auth; 367 errp = &cu->cu_error; 368 } 369 370 cr->cr_client = cl; 371 cr->cr_mrep = NULL; 372 cr->cr_error = 0; 373 374 if (cu->cu_total.tv_usec == -1) { 375 tvp = &utimeout; /* use supplied timeout */ 376 } else { 377 tvp = &cu->cu_total; /* use default timeout */ 378 } 379 if (tvp->tv_sec || tvp->tv_usec) 380 timeout = tvtohz(tvp); 381 else 382 timeout = 0; 383 384 if (cu->cu_connect && !cu->cu_connected) { 385 mtx_unlock(&cs->cs_lock); 386 error = soconnect(cu->cu_socket, 387 (struct sockaddr *)&cu->cu_raddr, curthread); 388 mtx_lock(&cs->cs_lock); 389 if (error) { 390 errp->re_errno = error; 391 errp->re_status = stat = RPC_CANTSEND; 392 goto out; 393 } 394 cu->cu_connected = 1; 395 } 396 if (cu->cu_connected) { 397 sa = NULL; 398 salen = 0; 399 } else { 400 sa = (struct sockaddr *)&cu->cu_raddr; 401 salen = cu->cu_rlen; 402 } 403 time_waited = 0; 404 retrans = 0; 405 if (ext && ext->rc_timers) { 406 rt = ext->rc_timers; 407 if (!rt->rt_rtxcur) 408 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 409 retransmit_time = next_sendtime = rt->rt_rtxcur; 410 } else { 411 rt = NULL; 412 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 413 } 414 415 starttime = ticks; 416 417 call_again: 418 mtx_assert(&cs->cs_lock, MA_OWNED); 419 420 cu->cu_xid++; 421 xid = cu->cu_xid; 422 423 send_again: 424 mtx_unlock(&cs->cs_lock); 425 426 MGETHDR(mreq, M_WAIT, MT_DATA); 427 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 428 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 429 mreq->m_len = cu->cu_mcalllen; 430 431 /* 432 * The XID is the first thing in the request. 433 */ 434 *mtod(mreq, uint32_t *) = htonl(xid); 435 436 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 437 438 if (cu->cu_async == TRUE && args == NULL) 439 goto get_reply; 440 441 if ((! XDR_PUTINT32(&xdrs, &proc)) || 442 (! AUTH_MARSHALL(auth, xid, &xdrs, 443 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 444 errp->re_status = stat = RPC_CANTENCODEARGS; 445 mtx_lock(&cs->cs_lock); 446 goto out; 447 } 448 mreq->m_pkthdr.len = m_length(mreq, NULL); 449 450 cr->cr_xid = xid; 451 mtx_lock(&cs->cs_lock); 452 453 /* 454 * Try to get a place in the congestion window. 455 */ 456 while (cu->cu_sent >= cu->cu_cwnd) { 457 cu->cu_cwnd_wait = TRUE; 458 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 459 cu->cu_waitflag, "rpccwnd", 0); 460 if (error) { 461 errp->re_errno = error; 462 errp->re_status = stat = RPC_CANTSEND; 463 goto out; 464 } 465 } 466 cu->cu_sent += CWNDSCALE; 467 468 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 469 mtx_unlock(&cs->cs_lock); 470 471 /* 472 * sosend consumes mreq. 473 */ 474 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 475 mreq = NULL; 476 477 /* 478 * sub-optimal code appears here because we have 479 * some clock time to spare while the packets are in flight. 480 * (We assume that this is actually only executed once.) 481 */ 482 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 483 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 484 reply_msg.acpted_rply.ar_verf.oa_length = 0; 485 reply_msg.acpted_rply.ar_results.where = NULL; 486 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 487 488 mtx_lock(&cs->cs_lock); 489 if (error) { 490 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 491 errp->re_errno = error; 492 errp->re_status = stat = RPC_CANTSEND; 493 cu->cu_sent -= CWNDSCALE; 494 if (cu->cu_cwnd_wait) { 495 cu->cu_cwnd_wait = FALSE; 496 wakeup(&cu->cu_cwnd_wait); 497 } 498 goto out; 499 } 500 501 /* 502 * Check to see if we got an upcall while waiting for the 503 * lock. 504 */ 505 if (cr->cr_error) { 506 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 507 errp->re_errno = cr->cr_error; 508 errp->re_status = stat = RPC_CANTRECV; 509 cu->cu_sent -= CWNDSCALE; 510 if (cu->cu_cwnd_wait) { 511 cu->cu_cwnd_wait = FALSE; 512 wakeup(&cu->cu_cwnd_wait); 513 } 514 goto out; 515 } 516 if (cr->cr_mrep) { 517 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 518 cu->cu_sent -= CWNDSCALE; 519 if (cu->cu_cwnd_wait) { 520 cu->cu_cwnd_wait = FALSE; 521 wakeup(&cu->cu_cwnd_wait); 522 } 523 goto got_reply; 524 } 525 526 /* 527 * Hack to provide rpc-based message passing 528 */ 529 if (timeout == 0) { 530 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 531 errp->re_status = stat = RPC_TIMEDOUT; 532 cu->cu_sent -= CWNDSCALE; 533 if (cu->cu_cwnd_wait) { 534 cu->cu_cwnd_wait = FALSE; 535 wakeup(&cu->cu_cwnd_wait); 536 } 537 goto out; 538 } 539 540 get_reply: 541 for (;;) { 542 /* Decide how long to wait. */ 543 if (next_sendtime < timeout) 544 tv = next_sendtime; 545 else 546 tv = timeout; 547 tv -= time_waited; 548 549 if (tv > 0) { 550 if (cu->cu_closing || cu->cu_closed) { 551 error = 0; 552 cr->cr_error = ESHUTDOWN; 553 } else { 554 error = msleep(cr, &cs->cs_lock, 555 cu->cu_waitflag, cu->cu_waitchan, tv); 556 } 557 } else { 558 error = EWOULDBLOCK; 559 } 560 561 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 562 cu->cu_sent -= CWNDSCALE; 563 if (cu->cu_cwnd_wait) { 564 cu->cu_cwnd_wait = FALSE; 565 wakeup(&cu->cu_cwnd_wait); 566 } 567 568 if (!error) { 569 /* 570 * We were woken up by the upcall. If the 571 * upcall had a receive error, report that, 572 * otherwise we have a reply. 573 */ 574 if (cr->cr_error) { 575 errp->re_errno = cr->cr_error; 576 errp->re_status = stat = RPC_CANTRECV; 577 goto out; 578 } 579 580 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 581 + cu->cu_cwnd / 2) / cu->cu_cwnd; 582 if (cu->cu_cwnd > MAXCWND) 583 cu->cu_cwnd = MAXCWND; 584 585 if (rt) { 586 /* 587 * Add one to the time since a tick 588 * count of N means that the actual 589 * time taken was somewhere between N 590 * and N+1. 591 */ 592 rtt = ticks - starttime + 1; 593 594 /* 595 * Update our estimate of the round 596 * trip time using roughly the 597 * algorithm described in RFC 598 * 2988. Given an RTT sample R: 599 * 600 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 601 * SRTT = (1-alpha) * SRTT + alpha * R 602 * 603 * where alpha = 0.125 and beta = 0.25. 604 * 605 * The initial retransmit timeout is 606 * SRTT + 4*RTTVAR and doubles on each 607 * retransmision. 608 */ 609 if (rt->rt_srtt == 0) { 610 rt->rt_srtt = rtt; 611 rt->rt_deviate = rtt / 2; 612 } else { 613 int32_t error = rtt - rt->rt_srtt; 614 rt->rt_srtt += error / 8; 615 error = abs(error) - rt->rt_deviate; 616 rt->rt_deviate += error / 4; 617 } 618 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 619 } 620 621 break; 622 } 623 624 /* 625 * The sleep returned an error so our request is still 626 * on the list. If we got EWOULDBLOCK, we may want to 627 * re-send the request. 628 */ 629 if (error != EWOULDBLOCK) { 630 errp->re_errno = error; 631 if (error == EINTR) 632 errp->re_status = stat = RPC_INTR; 633 else 634 errp->re_status = stat = RPC_CANTRECV; 635 goto out; 636 } 637 638 time_waited = ticks - starttime; 639 640 /* Check for timeout. */ 641 if (time_waited > timeout) { 642 errp->re_errno = EWOULDBLOCK; 643 errp->re_status = stat = RPC_TIMEDOUT; 644 goto out; 645 } 646 647 /* Retransmit if necessary. */ 648 if (time_waited >= next_sendtime) { 649 cu->cu_cwnd /= 2; 650 if (cu->cu_cwnd < CWNDSCALE) 651 cu->cu_cwnd = CWNDSCALE; 652 if (ext && ext->rc_feedback) { 653 mtx_unlock(&cs->cs_lock); 654 if (retrans == 0) 655 ext->rc_feedback(FEEDBACK_REXMIT1, 656 proc, ext->rc_feedback_arg); 657 else 658 ext->rc_feedback(FEEDBACK_REXMIT2, 659 proc, ext->rc_feedback_arg); 660 mtx_lock(&cs->cs_lock); 661 } 662 if (cu->cu_closing || cu->cu_closed) { 663 errp->re_errno = ESHUTDOWN; 664 errp->re_status = stat = RPC_CANTRECV; 665 goto out; 666 } 667 retrans++; 668 /* update retransmit_time */ 669 if (retransmit_time < RPC_MAX_BACKOFF * hz) 670 retransmit_time = 2 * retransmit_time; 671 next_sendtime += retransmit_time; 672 goto send_again; 673 } 674 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 675 } 676 677 got_reply: 678 /* 679 * Now decode and validate the response. We need to drop the 680 * lock since xdr_replymsg may end up sleeping in malloc. 681 */ 682 mtx_unlock(&cs->cs_lock); 683 684 if (ext && ext->rc_feedback) 685 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 686 687 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 688 ok = xdr_replymsg(&xdrs, &reply_msg); 689 cr->cr_mrep = NULL; 690 691 if (ok) { 692 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 693 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 694 errp->re_status = stat = RPC_SUCCESS; 695 else 696 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 697 698 if (errp->re_status == RPC_SUCCESS) { 699 results = xdrmbuf_getall(&xdrs); 700 if (! AUTH_VALIDATE(auth, xid, 701 &reply_msg.acpted_rply.ar_verf, 702 &results)) { 703 errp->re_status = stat = RPC_AUTHERROR; 704 errp->re_why = AUTH_INVALIDRESP; 705 if (retrans && 706 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 707 /* 708 * If we retransmitted, its 709 * possible that we will 710 * receive a reply for one of 711 * the earlier transmissions 712 * (which will use an older 713 * RPCSEC_GSS sequence 714 * number). In this case, just 715 * go back and listen for a 716 * new reply. We could keep a 717 * record of all the seq 718 * numbers we have transmitted 719 * so far so that we could 720 * accept a reply for any of 721 * them here. 722 */ 723 XDR_DESTROY(&xdrs); 724 mtx_lock(&cs->cs_lock); 725 TAILQ_INSERT_TAIL(&cs->cs_pending, 726 cr, cr_link); 727 cr->cr_mrep = NULL; 728 goto get_reply; 729 } 730 } else { 731 *resultsp = results; 732 } 733 } /* end successful completion */ 734 /* 735 * If unsuccesful AND error is an authentication error 736 * then refresh credentials and try again, else break 737 */ 738 else if (stat == RPC_AUTHERROR) 739 /* maybe our credentials need to be refreshed ... */ 740 if (nrefreshes > 0 && 741 AUTH_REFRESH(auth, &reply_msg)) { 742 nrefreshes--; 743 XDR_DESTROY(&xdrs); 744 mtx_lock(&cs->cs_lock); 745 goto call_again; 746 } 747 /* end of unsuccessful completion */ 748 } /* end of valid reply message */ 749 else { 750 errp->re_status = stat = RPC_CANTDECODERES; 751 752 } 753 XDR_DESTROY(&xdrs); 754 mtx_lock(&cs->cs_lock); 755 out: 756 mtx_assert(&cs->cs_lock, MA_OWNED); 757 758 if (mreq) 759 m_freem(mreq); 760 if (cr->cr_mrep) 761 m_freem(cr->cr_mrep); 762 763 cu->cu_threads--; 764 if (cu->cu_closing) 765 wakeup(cu); 766 767 mtx_unlock(&cs->cs_lock); 768 769 if (auth && stat != RPC_SUCCESS) 770 AUTH_VALIDATE(auth, xid, NULL, NULL); 771 772 free(cr, M_RPC); 773 774 return (stat); 775 } 776 777 static void 778 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 779 { 780 struct cu_data *cu = (struct cu_data *)cl->cl_private; 781 782 *errp = cu->cu_error; 783 } 784 785 static bool_t 786 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 787 { 788 XDR xdrs; 789 bool_t dummy; 790 791 xdrs.x_op = XDR_FREE; 792 dummy = (*xdr_res)(&xdrs, res_ptr); 793 794 return (dummy); 795 } 796 797 /*ARGSUSED*/ 798 static void 799 clnt_dg_abort(CLIENT *h) 800 { 801 } 802 803 static bool_t 804 clnt_dg_control(CLIENT *cl, u_int request, void *info) 805 { 806 struct cu_data *cu = (struct cu_data *)cl->cl_private; 807 struct cu_socket *cs; 808 struct sockaddr *addr; 809 810 cs = cu->cu_socket->so_rcv.sb_upcallarg; 811 mtx_lock(&cs->cs_lock); 812 813 switch (request) { 814 case CLSET_FD_CLOSE: 815 cu->cu_closeit = TRUE; 816 mtx_unlock(&cs->cs_lock); 817 return (TRUE); 818 case CLSET_FD_NCLOSE: 819 cu->cu_closeit = FALSE; 820 mtx_unlock(&cs->cs_lock); 821 return (TRUE); 822 } 823 824 /* for other requests which use info */ 825 if (info == NULL) { 826 mtx_unlock(&cs->cs_lock); 827 return (FALSE); 828 } 829 switch (request) { 830 case CLSET_TIMEOUT: 831 if (time_not_ok((struct timeval *)info)) { 832 mtx_unlock(&cs->cs_lock); 833 return (FALSE); 834 } 835 cu->cu_total = *(struct timeval *)info; 836 break; 837 case CLGET_TIMEOUT: 838 *(struct timeval *)info = cu->cu_total; 839 break; 840 case CLSET_RETRY_TIMEOUT: 841 if (time_not_ok((struct timeval *)info)) { 842 mtx_unlock(&cs->cs_lock); 843 return (FALSE); 844 } 845 cu->cu_wait = *(struct timeval *)info; 846 break; 847 case CLGET_RETRY_TIMEOUT: 848 *(struct timeval *)info = cu->cu_wait; 849 break; 850 case CLGET_SVC_ADDR: 851 /* 852 * Slightly different semantics to userland - we use 853 * sockaddr instead of netbuf. 854 */ 855 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 856 break; 857 case CLSET_SVC_ADDR: /* set to new address */ 858 addr = (struct sockaddr *)info; 859 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 860 break; 861 case CLGET_XID: 862 *(uint32_t *)info = cu->cu_xid; 863 break; 864 865 case CLSET_XID: 866 /* This will set the xid of the NEXT call */ 867 /* decrement by 1 as clnt_dg_call() increments once */ 868 cu->cu_xid = *(uint32_t *)info - 1; 869 break; 870 871 case CLGET_VERS: 872 /* 873 * This RELIES on the information that, in the call body, 874 * the version number field is the fifth field from the 875 * begining of the RPC header. MUST be changed if the 876 * call_struct is changed 877 */ 878 *(uint32_t *)info = 879 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 880 4 * BYTES_PER_XDR_UNIT)); 881 break; 882 883 case CLSET_VERS: 884 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 885 = htonl(*(uint32_t *)info); 886 break; 887 888 case CLGET_PROG: 889 /* 890 * This RELIES on the information that, in the call body, 891 * the program number field is the fourth field from the 892 * begining of the RPC header. MUST be changed if the 893 * call_struct is changed 894 */ 895 *(uint32_t *)info = 896 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 897 3 * BYTES_PER_XDR_UNIT)); 898 break; 899 900 case CLSET_PROG: 901 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 902 = htonl(*(uint32_t *)info); 903 break; 904 case CLSET_ASYNC: 905 cu->cu_async = *(int *)info; 906 break; 907 case CLSET_CONNECT: 908 cu->cu_connect = *(int *)info; 909 break; 910 case CLSET_WAITCHAN: 911 cu->cu_waitchan = (const char *)info; 912 break; 913 case CLGET_WAITCHAN: 914 *(const char **) info = cu->cu_waitchan; 915 break; 916 case CLSET_INTERRUPTIBLE: 917 if (*(int *) info) 918 cu->cu_waitflag = PCATCH; 919 else 920 cu->cu_waitflag = 0; 921 break; 922 case CLGET_INTERRUPTIBLE: 923 if (cu->cu_waitflag) 924 *(int *) info = TRUE; 925 else 926 *(int *) info = FALSE; 927 break; 928 default: 929 mtx_unlock(&cs->cs_lock); 930 return (FALSE); 931 } 932 mtx_unlock(&cs->cs_lock); 933 return (TRUE); 934 } 935 936 static void 937 clnt_dg_close(CLIENT *cl) 938 { 939 struct cu_data *cu = (struct cu_data *)cl->cl_private; 940 struct cu_socket *cs; 941 struct cu_request *cr; 942 943 cs = cu->cu_socket->so_rcv.sb_upcallarg; 944 mtx_lock(&cs->cs_lock); 945 946 if (cu->cu_closed) { 947 mtx_unlock(&cs->cs_lock); 948 return; 949 } 950 951 if (cu->cu_closing) { 952 while (cu->cu_closing) 953 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 954 KASSERT(cu->cu_closed, ("client should be closed")); 955 mtx_unlock(&cs->cs_lock); 956 return; 957 } 958 959 /* 960 * Abort any pending requests and wait until everyone 961 * has finished with clnt_vc_call. 962 */ 963 cu->cu_closing = TRUE; 964 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 965 if (cr->cr_client == cl) { 966 cr->cr_xid = 0; 967 cr->cr_error = ESHUTDOWN; 968 wakeup(cr); 969 } 970 } 971 972 while (cu->cu_threads) 973 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 974 975 cu->cu_closing = FALSE; 976 cu->cu_closed = TRUE; 977 978 mtx_unlock(&cs->cs_lock); 979 wakeup(cu); 980 } 981 982 static void 983 clnt_dg_destroy(CLIENT *cl) 984 { 985 struct cu_data *cu = (struct cu_data *)cl->cl_private; 986 struct cu_socket *cs; 987 struct socket *so = NULL; 988 bool_t lastsocketref; 989 990 cs = cu->cu_socket->so_rcv.sb_upcallarg; 991 clnt_dg_close(cl); 992 993 mtx_lock(&cs->cs_lock); 994 995 cs->cs_refs--; 996 if (cs->cs_refs == 0) { 997 mtx_unlock(&cs->cs_lock); 998 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 999 soupcall_clear(cu->cu_socket, SO_RCV); 1000 clnt_dg_upcallsdone(cu->cu_socket, cs); 1001 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1002 mtx_destroy(&cs->cs_lock); 1003 mem_free(cs, sizeof(*cs)); 1004 lastsocketref = TRUE; 1005 } else { 1006 mtx_unlock(&cs->cs_lock); 1007 lastsocketref = FALSE; 1008 } 1009 1010 if (cu->cu_closeit && lastsocketref) { 1011 so = cu->cu_socket; 1012 cu->cu_socket = NULL; 1013 } 1014 1015 if (so) 1016 soclose(so); 1017 1018 if (cl->cl_netid && cl->cl_netid[0]) 1019 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1020 if (cl->cl_tp && cl->cl_tp[0]) 1021 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1022 mem_free(cu, sizeof (*cu)); 1023 mem_free(cl, sizeof (CLIENT)); 1024 } 1025 1026 /* 1027 * Make sure that the time is not garbage. -1 value is allowed. 1028 */ 1029 static bool_t 1030 time_not_ok(struct timeval *t) 1031 { 1032 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1033 t->tv_usec < -1 || t->tv_usec > 1000000); 1034 } 1035 1036 int 1037 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1038 { 1039 struct cu_socket *cs = (struct cu_socket *) arg; 1040 struct uio uio; 1041 struct mbuf *m; 1042 struct mbuf *control; 1043 struct cu_request *cr; 1044 int error, rcvflag, foundreq; 1045 uint32_t xid; 1046 1047 cs->cs_upcallrefs++; 1048 uio.uio_resid = 1000000000; 1049 uio.uio_td = curthread; 1050 do { 1051 SOCKBUF_UNLOCK(&so->so_rcv); 1052 m = NULL; 1053 control = NULL; 1054 rcvflag = MSG_DONTWAIT; 1055 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1056 if (control) 1057 m_freem(control); 1058 SOCKBUF_LOCK(&so->so_rcv); 1059 1060 if (error == EWOULDBLOCK) 1061 break; 1062 1063 /* 1064 * If there was an error, wake up all pending 1065 * requests. 1066 */ 1067 if (error) { 1068 mtx_lock(&cs->cs_lock); 1069 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1070 cr->cr_xid = 0; 1071 cr->cr_error = error; 1072 wakeup(cr); 1073 } 1074 mtx_unlock(&cs->cs_lock); 1075 break; 1076 } 1077 1078 /* 1079 * The XID is in the first uint32_t of the reply. 1080 */ 1081 if (m->m_len < sizeof(xid)) 1082 m = m_pullup(m, sizeof(xid)); 1083 if (!m) 1084 /* 1085 * Should never happen. 1086 */ 1087 continue; 1088 1089 xid = ntohl(*mtod(m, uint32_t *)); 1090 1091 /* 1092 * Attempt to match this reply with a pending request. 1093 */ 1094 mtx_lock(&cs->cs_lock); 1095 foundreq = 0; 1096 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1097 if (cr->cr_xid == xid) { 1098 /* 1099 * This one matches. We leave the 1100 * reply mbuf in cr->cr_mrep. Set the 1101 * XID to zero so that we will ignore 1102 * any duplicated replies that arrive 1103 * before clnt_dg_call removes it from 1104 * the queue. 1105 */ 1106 cr->cr_xid = 0; 1107 cr->cr_mrep = m; 1108 cr->cr_error = 0; 1109 foundreq = 1; 1110 wakeup(cr); 1111 break; 1112 } 1113 } 1114 mtx_unlock(&cs->cs_lock); 1115 1116 /* 1117 * If we didn't find the matching request, just drop 1118 * it - its probably a repeated reply. 1119 */ 1120 if (!foundreq) 1121 m_freem(m); 1122 } while (m); 1123 cs->cs_upcallrefs--; 1124 if (cs->cs_upcallrefs < 0) 1125 panic("rpcdg upcall refcnt"); 1126 if (cs->cs_upcallrefs == 0) 1127 wakeup(&cs->cs_upcallrefs); 1128 return (SU_OK); 1129 } 1130 1131 /* 1132 * Wait for all upcalls in progress to complete. 1133 */ 1134 static void 1135 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1136 { 1137 1138 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1139 1140 while (cs->cs_upcallrefs > 0) 1141 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1142 "rpcdgup", 0); 1143 } 1144