1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 /* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35 #if defined(LIBC_SCCS) && !defined(lint) 36 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38 #endif 39 #include <sys/cdefs.h> 40 __FBSDID("$FreeBSD$"); 41 42 /* 43 * Implements a connectionless client side RPC. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/systm.h> 48 #include <sys/kernel.h> 49 #include <sys/lock.h> 50 #include <sys/malloc.h> 51 #include <sys/mbuf.h> 52 #include <sys/mutex.h> 53 #include <sys/pcpu.h> 54 #include <sys/proc.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/time.h> 58 #include <sys/uio.h> 59 60 #include <rpc/rpc.h> 61 #include <rpc/rpc_com.h> 62 63 64 #ifdef _FREEFALL_CONFIG 65 /* 66 * Disable RPC exponential back-off for FreeBSD.org systems. 67 */ 68 #define RPC_MAX_BACKOFF 1 /* second */ 69 #else 70 #define RPC_MAX_BACKOFF 30 /* seconds */ 71 #endif 72 73 static bool_t time_not_ok(struct timeval *); 74 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 75 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 76 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 77 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 78 static void clnt_dg_abort(CLIENT *); 79 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 80 static void clnt_dg_close(CLIENT *); 81 static void clnt_dg_destroy(CLIENT *); 82 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 83 84 static struct clnt_ops clnt_dg_ops = { 85 .cl_call = clnt_dg_call, 86 .cl_abort = clnt_dg_abort, 87 .cl_geterr = clnt_dg_geterr, 88 .cl_freeres = clnt_dg_freeres, 89 .cl_close = clnt_dg_close, 90 .cl_destroy = clnt_dg_destroy, 91 .cl_control = clnt_dg_control 92 }; 93 94 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 95 96 /* 97 * A pending RPC request which awaits a reply. Requests which have 98 * received their reply will have cr_xid set to zero and cr_mrep to 99 * the mbuf chain of the reply. 100 */ 101 struct cu_request { 102 TAILQ_ENTRY(cu_request) cr_link; 103 CLIENT *cr_client; /* owner */ 104 uint32_t cr_xid; /* XID of request */ 105 struct mbuf *cr_mrep; /* reply received by upcall */ 106 int cr_error; /* any error from upcall */ 107 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 108 }; 109 110 TAILQ_HEAD(cu_request_list, cu_request); 111 112 #define MCALL_MSG_SIZE 24 113 114 /* 115 * This structure is pointed to by the socket buffer's sb_upcallarg 116 * member. It is separate from the client private data to facilitate 117 * multiple clients sharing the same socket. The cs_lock mutex is used 118 * to protect all fields of this structure, the socket's receive 119 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 120 * structures is installed on the socket. 121 */ 122 struct cu_socket { 123 struct mtx cs_lock; 124 int cs_refs; /* Count of clients */ 125 struct cu_request_list cs_pending; /* Requests awaiting replies */ 126 }; 127 128 /* 129 * Private data kept per client handle 130 */ 131 struct cu_data { 132 int cu_threads; /* # threads in clnt_vc_call */ 133 bool_t cu_closing; /* TRUE if we are closing */ 134 bool_t cu_closed; /* TRUE if we are closed */ 135 struct socket *cu_socket; /* connection socket */ 136 bool_t cu_closeit; /* opened by library */ 137 struct sockaddr_storage cu_raddr; /* remote address */ 138 int cu_rlen; 139 struct timeval cu_wait; /* retransmit interval */ 140 struct timeval cu_total; /* total time for the call */ 141 struct rpc_err cu_error; 142 uint32_t cu_xid; 143 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 144 size_t cu_mcalllen; 145 size_t cu_sendsz; /* send size */ 146 size_t cu_recvsz; /* recv size */ 147 int cu_async; 148 int cu_connect; /* Use connect(). */ 149 int cu_connected; /* Have done connect(). */ 150 const char *cu_waitchan; 151 int cu_waitflag; 152 int cu_cwnd; /* congestion window */ 153 int cu_sent; /* number of in-flight RPCs */ 154 bool_t cu_cwnd_wait; 155 }; 156 157 #define CWNDSCALE 256 158 #define MAXCWND (32 * CWNDSCALE) 159 160 /* 161 * Connection less client creation returns with client handle parameters. 162 * Default options are set, which the user can change using clnt_control(). 163 * fd should be open and bound. 164 * NB: The rpch->cl_auth is initialized to null authentication. 165 * Caller may wish to set this something more useful. 166 * 167 * sendsz and recvsz are the maximum allowable packet sizes that can be 168 * sent and received. Normally they are the same, but they can be 169 * changed to improve the program efficiency and buffer allocation. 170 * If they are 0, use the transport default. 171 * 172 * If svcaddr is NULL, returns NULL. 173 */ 174 CLIENT * 175 clnt_dg_create( 176 struct socket *so, 177 struct sockaddr *svcaddr, /* servers address */ 178 rpcprog_t program, /* program number */ 179 rpcvers_t version, /* version number */ 180 size_t sendsz, /* buffer recv size */ 181 size_t recvsz) /* buffer send size */ 182 { 183 CLIENT *cl = NULL; /* client handle */ 184 struct cu_data *cu = NULL; /* private data */ 185 struct cu_socket *cs = NULL; 186 struct sockbuf *sb; 187 struct timeval now; 188 struct rpc_msg call_msg; 189 struct __rpc_sockinfo si; 190 XDR xdrs; 191 192 if (svcaddr == NULL) { 193 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 194 return (NULL); 195 } 196 197 if (!__rpc_socket2sockinfo(so, &si)) { 198 rpc_createerr.cf_stat = RPC_TLIERROR; 199 rpc_createerr.cf_error.re_errno = 0; 200 return (NULL); 201 } 202 203 /* 204 * Find the receive and the send size 205 */ 206 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 207 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 208 if ((sendsz == 0) || (recvsz == 0)) { 209 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 210 rpc_createerr.cf_error.re_errno = 0; 211 return (NULL); 212 } 213 214 cl = mem_alloc(sizeof (CLIENT)); 215 216 /* 217 * Should be multiple of 4 for XDR. 218 */ 219 sendsz = ((sendsz + 3) / 4) * 4; 220 recvsz = ((recvsz + 3) / 4) * 4; 221 cu = mem_alloc(sizeof (*cu)); 222 cu->cu_threads = 0; 223 cu->cu_closing = FALSE; 224 cu->cu_closed = FALSE; 225 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 226 cu->cu_rlen = svcaddr->sa_len; 227 /* Other values can also be set through clnt_control() */ 228 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 229 cu->cu_wait.tv_usec = 0; 230 cu->cu_total.tv_sec = -1; 231 cu->cu_total.tv_usec = -1; 232 cu->cu_sendsz = sendsz; 233 cu->cu_recvsz = recvsz; 234 cu->cu_async = FALSE; 235 cu->cu_connect = FALSE; 236 cu->cu_connected = FALSE; 237 cu->cu_waitchan = "rpcrecv"; 238 cu->cu_waitflag = 0; 239 cu->cu_cwnd = MAXCWND / 2; 240 cu->cu_sent = 0; 241 cu->cu_cwnd_wait = FALSE; 242 (void) getmicrotime(&now); 243 cu->cu_xid = __RPC_GETXID(&now); 244 call_msg.rm_xid = cu->cu_xid; 245 call_msg.rm_call.cb_prog = program; 246 call_msg.rm_call.cb_vers = version; 247 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 248 if (! xdr_callhdr(&xdrs, &call_msg)) { 249 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 250 rpc_createerr.cf_error.re_errno = 0; 251 goto err2; 252 } 253 cu->cu_mcalllen = XDR_GETPOS(&xdrs);; 254 255 /* 256 * By default, closeit is always FALSE. It is users responsibility 257 * to do a close on it, else the user may use clnt_control 258 * to let clnt_destroy do it for him/her. 259 */ 260 cu->cu_closeit = FALSE; 261 cu->cu_socket = so; 262 soreserve(so, 256*1024, 256*1024); 263 264 sb = &so->so_rcv; 265 SOCKBUF_LOCK(&so->so_rcv); 266 recheck_socket: 267 if (sb->sb_upcall) { 268 if (sb->sb_upcall != clnt_dg_soupcall) { 269 SOCKBUF_UNLOCK(&so->so_rcv); 270 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 271 goto err2; 272 } 273 cs = (struct cu_socket *) sb->sb_upcallarg; 274 mtx_lock(&cs->cs_lock); 275 cs->cs_refs++; 276 mtx_unlock(&cs->cs_lock); 277 } else { 278 /* 279 * We are the first on this socket - allocate the 280 * structure and install it in the socket. 281 */ 282 SOCKBUF_UNLOCK(&so->so_rcv); 283 cs = mem_alloc(sizeof(*cs)); 284 SOCKBUF_LOCK(&so->so_rcv); 285 if (sb->sb_upcall) { 286 /* 287 * We have lost a race with some other client. 288 */ 289 mem_free(cs, sizeof(*cs)); 290 goto recheck_socket; 291 } 292 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 293 cs->cs_refs = 1; 294 TAILQ_INIT(&cs->cs_pending); 295 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 296 } 297 SOCKBUF_UNLOCK(&so->so_rcv); 298 299 cl->cl_refs = 1; 300 cl->cl_ops = &clnt_dg_ops; 301 cl->cl_private = (caddr_t)(void *)cu; 302 cl->cl_auth = authnone_create(); 303 cl->cl_tp = NULL; 304 cl->cl_netid = NULL; 305 return (cl); 306 err2: 307 if (cl) { 308 mem_free(cl, sizeof (CLIENT)); 309 if (cu) 310 mem_free(cu, sizeof (*cu)); 311 } 312 return (NULL); 313 } 314 315 static enum clnt_stat 316 clnt_dg_call( 317 CLIENT *cl, /* client handle */ 318 struct rpc_callextra *ext, /* call metadata */ 319 rpcproc_t proc, /* procedure number */ 320 struct mbuf *args, /* pointer to args */ 321 struct mbuf **resultsp, /* pointer to results */ 322 struct timeval utimeout) /* seconds to wait before giving up */ 323 { 324 struct cu_data *cu = (struct cu_data *)cl->cl_private; 325 struct cu_socket *cs; 326 struct rpc_timers *rt; 327 AUTH *auth; 328 struct rpc_err *errp; 329 enum clnt_stat stat; 330 XDR xdrs; 331 struct rpc_msg reply_msg; 332 bool_t ok; 333 int retrans; /* number of re-transmits so far */ 334 int nrefreshes = 2; /* number of times to refresh cred */ 335 struct timeval *tvp; 336 int timeout; 337 int retransmit_time; 338 int next_sendtime, starttime, rtt, time_waited, tv = 0; 339 struct sockaddr *sa; 340 socklen_t salen; 341 uint32_t xid = 0; 342 struct mbuf *mreq = NULL, *results; 343 struct cu_request *cr; 344 int error; 345 346 cs = cu->cu_socket->so_rcv.sb_upcallarg; 347 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 348 349 mtx_lock(&cs->cs_lock); 350 351 if (cu->cu_closing || cu->cu_closed) { 352 mtx_unlock(&cs->cs_lock); 353 free(cr, M_RPC); 354 return (RPC_CANTSEND); 355 } 356 cu->cu_threads++; 357 358 if (ext) { 359 auth = ext->rc_auth; 360 errp = &ext->rc_err; 361 } else { 362 auth = cl->cl_auth; 363 errp = &cu->cu_error; 364 } 365 366 cr->cr_client = cl; 367 cr->cr_mrep = NULL; 368 cr->cr_error = 0; 369 370 if (cu->cu_total.tv_usec == -1) { 371 tvp = &utimeout; /* use supplied timeout */ 372 } else { 373 tvp = &cu->cu_total; /* use default timeout */ 374 } 375 if (tvp->tv_sec || tvp->tv_usec) 376 timeout = tvtohz(tvp); 377 else 378 timeout = 0; 379 380 if (cu->cu_connect && !cu->cu_connected) { 381 mtx_unlock(&cs->cs_lock); 382 error = soconnect(cu->cu_socket, 383 (struct sockaddr *)&cu->cu_raddr, curthread); 384 mtx_lock(&cs->cs_lock); 385 if (error) { 386 errp->re_errno = error; 387 errp->re_status = stat = RPC_CANTSEND; 388 goto out; 389 } 390 cu->cu_connected = 1; 391 } 392 if (cu->cu_connected) { 393 sa = NULL; 394 salen = 0; 395 } else { 396 sa = (struct sockaddr *)&cu->cu_raddr; 397 salen = cu->cu_rlen; 398 } 399 time_waited = 0; 400 retrans = 0; 401 if (ext && ext->rc_timers) { 402 rt = ext->rc_timers; 403 if (!rt->rt_rtxcur) 404 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 405 retransmit_time = next_sendtime = rt->rt_rtxcur; 406 } else { 407 rt = NULL; 408 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 409 } 410 411 starttime = ticks; 412 413 call_again: 414 mtx_assert(&cs->cs_lock, MA_OWNED); 415 416 cu->cu_xid++; 417 xid = cu->cu_xid; 418 419 send_again: 420 mtx_unlock(&cs->cs_lock); 421 422 MGETHDR(mreq, M_WAIT, MT_DATA); 423 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 424 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 425 mreq->m_len = cu->cu_mcalllen; 426 427 /* 428 * The XID is the first thing in the request. 429 */ 430 *mtod(mreq, uint32_t *) = htonl(xid); 431 432 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 433 434 if (cu->cu_async == TRUE && args == NULL) 435 goto get_reply; 436 437 if ((! XDR_PUTINT32(&xdrs, &proc)) || 438 (! AUTH_MARSHALL(auth, xid, &xdrs, 439 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 440 errp->re_status = stat = RPC_CANTENCODEARGS; 441 mtx_lock(&cs->cs_lock); 442 goto out; 443 } 444 mreq->m_pkthdr.len = m_length(mreq, NULL); 445 446 cr->cr_xid = xid; 447 mtx_lock(&cs->cs_lock); 448 449 /* 450 * Try to get a place in the congestion window. 451 */ 452 while (cu->cu_sent >= cu->cu_cwnd) { 453 cu->cu_cwnd_wait = TRUE; 454 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 455 cu->cu_waitflag, "rpccwnd", 0); 456 if (error) { 457 errp->re_errno = error; 458 errp->re_status = stat = RPC_CANTSEND; 459 goto out; 460 } 461 } 462 cu->cu_sent += CWNDSCALE; 463 464 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 465 mtx_unlock(&cs->cs_lock); 466 467 /* 468 * sosend consumes mreq. 469 */ 470 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 471 mreq = NULL; 472 473 /* 474 * sub-optimal code appears here because we have 475 * some clock time to spare while the packets are in flight. 476 * (We assume that this is actually only executed once.) 477 */ 478 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 479 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 480 reply_msg.acpted_rply.ar_verf.oa_length = 0; 481 reply_msg.acpted_rply.ar_results.where = NULL; 482 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 483 484 mtx_lock(&cs->cs_lock); 485 if (error) { 486 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 487 errp->re_errno = error; 488 errp->re_status = stat = RPC_CANTSEND; 489 cu->cu_sent -= CWNDSCALE; 490 if (cu->cu_cwnd_wait) { 491 cu->cu_cwnd_wait = FALSE; 492 wakeup(&cu->cu_cwnd_wait); 493 } 494 goto out; 495 } 496 497 /* 498 * Check to see if we got an upcall while waiting for the 499 * lock. 500 */ 501 if (cr->cr_error) { 502 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 503 errp->re_errno = cr->cr_error; 504 errp->re_status = stat = RPC_CANTRECV; 505 cu->cu_sent -= CWNDSCALE; 506 if (cu->cu_cwnd_wait) { 507 cu->cu_cwnd_wait = FALSE; 508 wakeup(&cu->cu_cwnd_wait); 509 } 510 goto out; 511 } 512 if (cr->cr_mrep) { 513 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 514 cu->cu_sent -= CWNDSCALE; 515 if (cu->cu_cwnd_wait) { 516 cu->cu_cwnd_wait = FALSE; 517 wakeup(&cu->cu_cwnd_wait); 518 } 519 goto got_reply; 520 } 521 522 /* 523 * Hack to provide rpc-based message passing 524 */ 525 if (timeout == 0) { 526 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 527 errp->re_status = stat = RPC_TIMEDOUT; 528 cu->cu_sent -= CWNDSCALE; 529 if (cu->cu_cwnd_wait) { 530 cu->cu_cwnd_wait = FALSE; 531 wakeup(&cu->cu_cwnd_wait); 532 } 533 goto out; 534 } 535 536 get_reply: 537 for (;;) { 538 /* Decide how long to wait. */ 539 if (next_sendtime < timeout) 540 tv = next_sendtime; 541 else 542 tv = timeout; 543 tv -= time_waited; 544 545 if (tv > 0) { 546 if (cu->cu_closing || cu->cu_closed) 547 error = 0; 548 else 549 error = msleep(cr, &cs->cs_lock, 550 cu->cu_waitflag, cu->cu_waitchan, tv); 551 } else { 552 error = EWOULDBLOCK; 553 } 554 555 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 556 cu->cu_sent -= CWNDSCALE; 557 if (cu->cu_cwnd_wait) { 558 cu->cu_cwnd_wait = FALSE; 559 wakeup(&cu->cu_cwnd_wait); 560 } 561 562 if (!error) { 563 /* 564 * We were woken up by the upcall. If the 565 * upcall had a receive error, report that, 566 * otherwise we have a reply. 567 */ 568 if (cr->cr_error) { 569 errp->re_errno = cr->cr_error; 570 errp->re_status = stat = RPC_CANTRECV; 571 goto out; 572 } 573 574 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 575 + cu->cu_cwnd / 2) / cu->cu_cwnd; 576 if (cu->cu_cwnd > MAXCWND) 577 cu->cu_cwnd = MAXCWND; 578 579 if (rt) { 580 /* 581 * Add one to the time since a tick 582 * count of N means that the actual 583 * time taken was somewhere between N 584 * and N+1. 585 */ 586 rtt = ticks - starttime + 1; 587 588 /* 589 * Update our estimate of the round 590 * trip time using roughly the 591 * algorithm described in RFC 592 * 2988. Given an RTT sample R: 593 * 594 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 595 * SRTT = (1-alpha) * SRTT + alpha * R 596 * 597 * where alpha = 0.125 and beta = 0.25. 598 * 599 * The initial retransmit timeout is 600 * SRTT + 4*RTTVAR and doubles on each 601 * retransmision. 602 */ 603 if (rt->rt_srtt == 0) { 604 rt->rt_srtt = rtt; 605 rt->rt_deviate = rtt / 2; 606 } else { 607 int32_t error = rtt - rt->rt_srtt; 608 rt->rt_srtt += error / 8; 609 error = abs(error) - rt->rt_deviate; 610 rt->rt_deviate += error / 4; 611 } 612 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 613 } 614 615 break; 616 } 617 618 /* 619 * The sleep returned an error so our request is still 620 * on the list. If we got EWOULDBLOCK, we may want to 621 * re-send the request. 622 */ 623 if (error != EWOULDBLOCK) { 624 errp->re_errno = error; 625 if (error == EINTR) 626 errp->re_status = stat = RPC_INTR; 627 else 628 errp->re_status = stat = RPC_CANTRECV; 629 goto out; 630 } 631 632 time_waited = ticks - starttime; 633 634 /* Check for timeout. */ 635 if (time_waited > timeout) { 636 errp->re_errno = EWOULDBLOCK; 637 errp->re_status = stat = RPC_TIMEDOUT; 638 goto out; 639 } 640 641 /* Retransmit if necessary. */ 642 if (time_waited >= next_sendtime) { 643 cu->cu_cwnd /= 2; 644 if (cu->cu_cwnd < CWNDSCALE) 645 cu->cu_cwnd = CWNDSCALE; 646 if (ext && ext->rc_feedback) { 647 mtx_unlock(&cs->cs_lock); 648 if (retrans == 0) 649 ext->rc_feedback(FEEDBACK_REXMIT1, 650 proc, ext->rc_feedback_arg); 651 else 652 ext->rc_feedback(FEEDBACK_REXMIT2, 653 proc, ext->rc_feedback_arg); 654 mtx_lock(&cs->cs_lock); 655 } 656 if (cu->cu_closing || cu->cu_closed) { 657 errp->re_errno = ESHUTDOWN; 658 errp->re_status = stat = RPC_CANTRECV; 659 goto out; 660 } 661 retrans++; 662 /* update retransmit_time */ 663 if (retransmit_time < RPC_MAX_BACKOFF * hz) 664 retransmit_time = 2 * retransmit_time; 665 next_sendtime += retransmit_time; 666 goto send_again; 667 } 668 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 669 } 670 671 got_reply: 672 /* 673 * Now decode and validate the response. We need to drop the 674 * lock since xdr_replymsg may end up sleeping in malloc. 675 */ 676 mtx_unlock(&cs->cs_lock); 677 678 if (ext && ext->rc_feedback) 679 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 680 681 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 682 ok = xdr_replymsg(&xdrs, &reply_msg); 683 cr->cr_mrep = NULL; 684 685 if (ok) { 686 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 687 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 688 errp->re_status = stat = RPC_SUCCESS; 689 else 690 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 691 692 if (errp->re_status == RPC_SUCCESS) { 693 results = xdrmbuf_getall(&xdrs); 694 if (! AUTH_VALIDATE(auth, xid, 695 &reply_msg.acpted_rply.ar_verf, 696 &results)) { 697 errp->re_status = stat = RPC_AUTHERROR; 698 errp->re_why = AUTH_INVALIDRESP; 699 if (retrans && 700 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 701 /* 702 * If we retransmitted, its 703 * possible that we will 704 * receive a reply for one of 705 * the earlier transmissions 706 * (which will use an older 707 * RPCSEC_GSS sequence 708 * number). In this case, just 709 * go back and listen for a 710 * new reply. We could keep a 711 * record of all the seq 712 * numbers we have transmitted 713 * so far so that we could 714 * accept a reply for any of 715 * them here. 716 */ 717 XDR_DESTROY(&xdrs); 718 mtx_lock(&cs->cs_lock); 719 TAILQ_INSERT_TAIL(&cs->cs_pending, 720 cr, cr_link); 721 cr->cr_mrep = NULL; 722 goto get_reply; 723 } 724 } else { 725 *resultsp = results; 726 } 727 } /* end successful completion */ 728 /* 729 * If unsuccesful AND error is an authentication error 730 * then refresh credentials and try again, else break 731 */ 732 else if (stat == RPC_AUTHERROR) 733 /* maybe our credentials need to be refreshed ... */ 734 if (nrefreshes > 0 && 735 AUTH_REFRESH(auth, &reply_msg)) { 736 nrefreshes--; 737 XDR_DESTROY(&xdrs); 738 mtx_lock(&cs->cs_lock); 739 goto call_again; 740 } 741 /* end of unsuccessful completion */ 742 } /* end of valid reply message */ 743 else { 744 errp->re_status = stat = RPC_CANTDECODERES; 745 746 } 747 XDR_DESTROY(&xdrs); 748 mtx_lock(&cs->cs_lock); 749 out: 750 mtx_assert(&cs->cs_lock, MA_OWNED); 751 752 if (mreq) 753 m_freem(mreq); 754 if (cr->cr_mrep) 755 m_freem(cr->cr_mrep); 756 757 cu->cu_threads--; 758 if (cu->cu_closing) 759 wakeup(cu); 760 761 mtx_unlock(&cs->cs_lock); 762 763 if (auth && stat != RPC_SUCCESS) 764 AUTH_VALIDATE(auth, xid, NULL, NULL); 765 766 free(cr, M_RPC); 767 768 return (stat); 769 } 770 771 static void 772 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 773 { 774 struct cu_data *cu = (struct cu_data *)cl->cl_private; 775 776 *errp = cu->cu_error; 777 } 778 779 static bool_t 780 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 781 { 782 XDR xdrs; 783 bool_t dummy; 784 785 xdrs.x_op = XDR_FREE; 786 dummy = (*xdr_res)(&xdrs, res_ptr); 787 788 return (dummy); 789 } 790 791 /*ARGSUSED*/ 792 static void 793 clnt_dg_abort(CLIENT *h) 794 { 795 } 796 797 static bool_t 798 clnt_dg_control(CLIENT *cl, u_int request, void *info) 799 { 800 struct cu_data *cu = (struct cu_data *)cl->cl_private; 801 struct cu_socket *cs; 802 struct sockaddr *addr; 803 804 cs = cu->cu_socket->so_rcv.sb_upcallarg; 805 mtx_lock(&cs->cs_lock); 806 807 switch (request) { 808 case CLSET_FD_CLOSE: 809 cu->cu_closeit = TRUE; 810 mtx_unlock(&cs->cs_lock); 811 return (TRUE); 812 case CLSET_FD_NCLOSE: 813 cu->cu_closeit = FALSE; 814 mtx_unlock(&cs->cs_lock); 815 return (TRUE); 816 } 817 818 /* for other requests which use info */ 819 if (info == NULL) { 820 mtx_unlock(&cs->cs_lock); 821 return (FALSE); 822 } 823 switch (request) { 824 case CLSET_TIMEOUT: 825 if (time_not_ok((struct timeval *)info)) { 826 mtx_unlock(&cs->cs_lock); 827 return (FALSE); 828 } 829 cu->cu_total = *(struct timeval *)info; 830 break; 831 case CLGET_TIMEOUT: 832 *(struct timeval *)info = cu->cu_total; 833 break; 834 case CLSET_RETRY_TIMEOUT: 835 if (time_not_ok((struct timeval *)info)) { 836 mtx_unlock(&cs->cs_lock); 837 return (FALSE); 838 } 839 cu->cu_wait = *(struct timeval *)info; 840 break; 841 case CLGET_RETRY_TIMEOUT: 842 *(struct timeval *)info = cu->cu_wait; 843 break; 844 case CLGET_SVC_ADDR: 845 /* 846 * Slightly different semantics to userland - we use 847 * sockaddr instead of netbuf. 848 */ 849 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 850 break; 851 case CLSET_SVC_ADDR: /* set to new address */ 852 addr = (struct sockaddr *)info; 853 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 854 break; 855 case CLGET_XID: 856 *(uint32_t *)info = cu->cu_xid; 857 break; 858 859 case CLSET_XID: 860 /* This will set the xid of the NEXT call */ 861 /* decrement by 1 as clnt_dg_call() increments once */ 862 cu->cu_xid = *(uint32_t *)info - 1; 863 break; 864 865 case CLGET_VERS: 866 /* 867 * This RELIES on the information that, in the call body, 868 * the version number field is the fifth field from the 869 * begining of the RPC header. MUST be changed if the 870 * call_struct is changed 871 */ 872 *(uint32_t *)info = 873 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 874 4 * BYTES_PER_XDR_UNIT)); 875 break; 876 877 case CLSET_VERS: 878 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 879 = htonl(*(uint32_t *)info); 880 break; 881 882 case CLGET_PROG: 883 /* 884 * This RELIES on the information that, in the call body, 885 * the program number field is the fourth field from the 886 * begining of the RPC header. MUST be changed if the 887 * call_struct is changed 888 */ 889 *(uint32_t *)info = 890 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 891 3 * BYTES_PER_XDR_UNIT)); 892 break; 893 894 case CLSET_PROG: 895 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 896 = htonl(*(uint32_t *)info); 897 break; 898 case CLSET_ASYNC: 899 cu->cu_async = *(int *)info; 900 break; 901 case CLSET_CONNECT: 902 cu->cu_connect = *(int *)info; 903 break; 904 case CLSET_WAITCHAN: 905 cu->cu_waitchan = (const char *)info; 906 break; 907 case CLGET_WAITCHAN: 908 *(const char **) info = cu->cu_waitchan; 909 break; 910 case CLSET_INTERRUPTIBLE: 911 if (*(int *) info) 912 cu->cu_waitflag = PCATCH; 913 else 914 cu->cu_waitflag = 0; 915 break; 916 case CLGET_INTERRUPTIBLE: 917 if (cu->cu_waitflag) 918 *(int *) info = TRUE; 919 else 920 *(int *) info = FALSE; 921 break; 922 default: 923 mtx_unlock(&cs->cs_lock); 924 return (FALSE); 925 } 926 mtx_unlock(&cs->cs_lock); 927 return (TRUE); 928 } 929 930 static void 931 clnt_dg_close(CLIENT *cl) 932 { 933 struct cu_data *cu = (struct cu_data *)cl->cl_private; 934 struct cu_socket *cs; 935 struct cu_request *cr; 936 937 cs = cu->cu_socket->so_rcv.sb_upcallarg; 938 mtx_lock(&cs->cs_lock); 939 940 if (cu->cu_closed) { 941 mtx_unlock(&cs->cs_lock); 942 return; 943 } 944 945 if (cu->cu_closing) { 946 while (cu->cu_closing) 947 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 948 KASSERT(cu->cu_closed, ("client should be closed")); 949 mtx_unlock(&cs->cs_lock); 950 return; 951 } 952 953 /* 954 * Abort any pending requests and wait until everyone 955 * has finished with clnt_vc_call. 956 */ 957 cu->cu_closing = TRUE; 958 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 959 if (cr->cr_client == cl) { 960 cr->cr_xid = 0; 961 cr->cr_error = ESHUTDOWN; 962 wakeup(cr); 963 } 964 } 965 966 while (cu->cu_threads) 967 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 968 969 cu->cu_closing = FALSE; 970 cu->cu_closed = TRUE; 971 972 mtx_unlock(&cs->cs_lock); 973 wakeup(cu); 974 } 975 976 static void 977 clnt_dg_destroy(CLIENT *cl) 978 { 979 struct cu_data *cu = (struct cu_data *)cl->cl_private; 980 struct cu_socket *cs; 981 struct socket *so = NULL; 982 bool_t lastsocketref; 983 984 cs = cu->cu_socket->so_rcv.sb_upcallarg; 985 clnt_dg_close(cl); 986 987 mtx_lock(&cs->cs_lock); 988 989 cs->cs_refs--; 990 if (cs->cs_refs == 0) { 991 mtx_destroy(&cs->cs_lock); 992 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 993 soupcall_clear(cu->cu_socket, SO_RCV); 994 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 995 mem_free(cs, sizeof(*cs)); 996 lastsocketref = TRUE; 997 } else { 998 mtx_unlock(&cs->cs_lock); 999 lastsocketref = FALSE; 1000 } 1001 1002 if (cu->cu_closeit && lastsocketref) { 1003 so = cu->cu_socket; 1004 cu->cu_socket = NULL; 1005 } 1006 1007 if (so) 1008 soclose(so); 1009 1010 if (cl->cl_netid && cl->cl_netid[0]) 1011 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1012 if (cl->cl_tp && cl->cl_tp[0]) 1013 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1014 mem_free(cu, sizeof (*cu)); 1015 mem_free(cl, sizeof (CLIENT)); 1016 } 1017 1018 /* 1019 * Make sure that the time is not garbage. -1 value is allowed. 1020 */ 1021 static bool_t 1022 time_not_ok(struct timeval *t) 1023 { 1024 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1025 t->tv_usec < -1 || t->tv_usec > 1000000); 1026 } 1027 1028 int 1029 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1030 { 1031 struct cu_socket *cs = (struct cu_socket *) arg; 1032 struct uio uio; 1033 struct mbuf *m; 1034 struct mbuf *control; 1035 struct cu_request *cr; 1036 int error, rcvflag, foundreq; 1037 uint32_t xid; 1038 1039 uio.uio_resid = 1000000000; 1040 uio.uio_td = curthread; 1041 do { 1042 SOCKBUF_UNLOCK(&so->so_rcv); 1043 m = NULL; 1044 control = NULL; 1045 rcvflag = MSG_DONTWAIT; 1046 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1047 if (control) 1048 m_freem(control); 1049 SOCKBUF_LOCK(&so->so_rcv); 1050 1051 if (error == EWOULDBLOCK) 1052 break; 1053 1054 /* 1055 * If there was an error, wake up all pending 1056 * requests. 1057 */ 1058 if (error) { 1059 mtx_lock(&cs->cs_lock); 1060 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1061 cr->cr_xid = 0; 1062 cr->cr_error = error; 1063 wakeup(cr); 1064 } 1065 mtx_unlock(&cs->cs_lock); 1066 break; 1067 } 1068 1069 /* 1070 * The XID is in the first uint32_t of the reply. 1071 */ 1072 if (m->m_len < sizeof(xid)) 1073 m = m_pullup(m, sizeof(xid)); 1074 if (!m) 1075 /* 1076 * Should never happen. 1077 */ 1078 continue; 1079 1080 xid = ntohl(*mtod(m, uint32_t *)); 1081 1082 /* 1083 * Attempt to match this reply with a pending request. 1084 */ 1085 mtx_lock(&cs->cs_lock); 1086 foundreq = 0; 1087 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1088 if (cr->cr_xid == xid) { 1089 /* 1090 * This one matches. We leave the 1091 * reply mbuf in cr->cr_mrep. Set the 1092 * XID to zero so that we will ignore 1093 * any duplicated replies that arrive 1094 * before clnt_dg_call removes it from 1095 * the queue. 1096 */ 1097 cr->cr_xid = 0; 1098 cr->cr_mrep = m; 1099 cr->cr_error = 0; 1100 foundreq = 1; 1101 wakeup(cr); 1102 break; 1103 } 1104 } 1105 mtx_unlock(&cs->cs_lock); 1106 1107 /* 1108 * If we didn't find the matching request, just drop 1109 * it - its probably a repeated reply. 1110 */ 1111 if (!foundreq) 1112 m_freem(m); 1113 } while (m); 1114 return (SU_OK); 1115 } 1116 1117