/*	$NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */
/*
 * Copyright (c) 1986-1991 by Sun Microsystems Inc.
 */

#if defined(LIBC_SCCS) && !defined(lint)
#ident	"@(#)clnt_dg.c	1.23	94/04/22 SMI"
static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * Implements a connectionless client side RPC.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>

#ifdef _FREEFALL_CONFIG
/*
 * Disable RPC exponential back-off for FreeBSD.org systems.
 */
#define	RPC_MAX_BACKOFF		1 /* second */
#else
#define	RPC_MAX_BACKOFF		30 /* seconds */
#endif

static bool_t time_not_ok(struct timeval *);
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);

static struct clnt_ops clnt_dg_ops = {
	.cl_call =	clnt_dg_call,
	.cl_abort =	clnt_dg_abort,
	.cl_geterr =	clnt_dg_geterr,
	.cl_freeres =	clnt_dg_freeres,
	.cl_close =	clnt_dg_close,
	.cl_destroy =	clnt_dg_destroy,
	.cl_control =	clnt_dg_control
};

static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";

/*
 * A pending RPC request which awaits a reply.  Requests which have
 * received their reply will have cr_xid set to zero and cr_mrep to
 * the mbuf chain of the reply.
 */
struct cu_request {
	TAILQ_ENTRY(cu_request) cr_link;
	CLIENT			*cr_client;	/* owner */
	uint32_t		cr_xid;		/* XID of request */
	struct mbuf		*cr_mrep;	/* reply received by upcall */
	int			cr_error;	/* any error from upcall */
	char			cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};

TAILQ_HEAD(cu_request_list, cu_request);

#define	MCALL_MSG_SIZE	24

/*
 * This structure is pointed to by the socket's so_upcallarg
 * member.  It is separate from the client private data to facilitate
 * multiple clients sharing the same socket.  The cs_lock mutex is
 * used to protect all fields of this structure; the socket's receive
 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
 * structures is installed on the socket.
 */
struct cu_socket {
	struct mtx		cs_lock;
	int			cs_refs;	/* Count of clients */
	struct cu_request_list	cs_pending;	/* Requests awaiting replies */
};

/*
 * Private data kept per client handle
 */
struct cu_data {
	int			cu_threads;	/* # threads in clnt_dg_call */
	bool_t			cu_closing;	/* TRUE if we are closing */
	bool_t			cu_closed;	/* TRUE if we are closed */
	struct socket		*cu_socket;	/* connection socket */
	bool_t			cu_closeit;	/* opened by library */
	struct sockaddr_storage	cu_raddr;	/* remote address */
	int			cu_rlen;
	struct timeval		cu_wait;	/* retransmit interval */
	struct timeval		cu_total;	/* total time for the call */
	struct rpc_err		cu_error;
	uint32_t		cu_xid;
	char			cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
	size_t			cu_mcalllen;
	size_t			cu_sendsz;	/* send size */
	size_t			cu_recvsz;	/* recv size */
	int			cu_async;
	int			cu_connect;	/* Use connect(). */
	int			cu_connected;	/* Have done connect(). */
	const char		*cu_waitchan;
	int			cu_waitflag;
	int			cu_cwnd;	/* congestion window */
	int			cu_sent;	/* number of in-flight RPCs */
	bool_t			cu_cwnd_wait;
};

#define	CWNDSCALE	256
#define	MAXCWND		(32 * CWNDSCALE)

/*
 * Connectionless client creation returns with client handle parameters.
 * Default options are set, which the user can change using clnt_control().
 * The socket should be open and bound.
 * NB: The rpch->cl_auth is initialized to null authentication.
 *     Caller may wish to set this to something more useful.
 *
 * sendsz and recvsz are the maximum allowable packet sizes that can be
 * sent and received.  Normally they are the same, but they can be
 * changed to improve the program efficiency and buffer allocation.
 * If they are 0, use the transport default.
 *
 * If svcaddr is NULL, returns NULL.
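 *
 * A minimal usage sketch (illustrative only: MYPROG, MYVERS, MYPROC and
 * MYPORT are placeholder constants, authunix_create() is just one possible
 * authentication flavor, and all error checking is omitted):
 *
 *	struct socket *so;
 *	struct sockaddr_in sin;
 *	struct timeval tv = { 25, 0 };		(total timeout for the call)
 *	struct timeval rtry = { 10, 0 };	(retransmit interval)
 *	struct mbuf *args, *rep;
 *	enum clnt_stat stat;
 *	CLIENT *cl;
 *
 *	socreate(AF_INET, &so, SOCK_DGRAM, IPPROTO_UDP,
 *	    curthread->td_ucred, curthread);
 *	bzero(&sin, sizeof(sin));
 *	sin.sin_len = sizeof(sin);
 *	sin.sin_family = AF_INET;
 *	sin.sin_port = htons(MYPORT);
 *	... fill in sin.sin_addr ...
 *	cl = clnt_dg_create(so, (struct sockaddr *)&sin,
 *	    MYPROG, MYVERS, 0, 0);		(0, 0: transport default sizes)
 *	cl->cl_auth = authunix_create(curthread->td_ucred);
 *	CLNT_CONTROL(cl, CLSET_RETRY_TIMEOUT, &rtry);
 *	... build the XDR-encoded arguments in the mbuf chain 'args' ...
 *	stat = CLNT_CALL_MBUF(cl, NULL, MYPROC, args, &rep, tv);
 *	AUTH_DESTROY(cl->cl_auth);
 *	CLNT_DESTROY(cl);
 *	soclose(so);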
173 */ 174 CLIENT * 175 clnt_dg_create( 176 struct socket *so, 177 struct sockaddr *svcaddr, /* servers address */ 178 rpcprog_t program, /* program number */ 179 rpcvers_t version, /* version number */ 180 size_t sendsz, /* buffer recv size */ 181 size_t recvsz) /* buffer send size */ 182 { 183 CLIENT *cl = NULL; /* client handle */ 184 struct cu_data *cu = NULL; /* private data */ 185 struct cu_socket *cs = NULL; 186 struct timeval now; 187 struct rpc_msg call_msg; 188 struct __rpc_sockinfo si; 189 XDR xdrs; 190 191 if (svcaddr == NULL) { 192 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 193 return (NULL); 194 } 195 196 if (!__rpc_socket2sockinfo(so, &si)) { 197 rpc_createerr.cf_stat = RPC_TLIERROR; 198 rpc_createerr.cf_error.re_errno = 0; 199 return (NULL); 200 } 201 202 /* 203 * Find the receive and the send size 204 */ 205 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 206 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 207 if ((sendsz == 0) || (recvsz == 0)) { 208 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 209 rpc_createerr.cf_error.re_errno = 0; 210 return (NULL); 211 } 212 213 cl = mem_alloc(sizeof (CLIENT)); 214 215 /* 216 * Should be multiple of 4 for XDR. 217 */ 218 sendsz = ((sendsz + 3) / 4) * 4; 219 recvsz = ((recvsz + 3) / 4) * 4; 220 cu = mem_alloc(sizeof (*cu)); 221 cu->cu_threads = 0; 222 cu->cu_closing = FALSE; 223 cu->cu_closed = FALSE; 224 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 225 cu->cu_rlen = svcaddr->sa_len; 226 /* Other values can also be set through clnt_control() */ 227 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 228 cu->cu_wait.tv_usec = 0; 229 cu->cu_total.tv_sec = -1; 230 cu->cu_total.tv_usec = -1; 231 cu->cu_sendsz = sendsz; 232 cu->cu_recvsz = recvsz; 233 cu->cu_async = FALSE; 234 cu->cu_connect = FALSE; 235 cu->cu_connected = FALSE; 236 cu->cu_waitchan = "rpcrecv"; 237 cu->cu_waitflag = 0; 238 cu->cu_cwnd = MAXCWND / 2; 239 cu->cu_sent = 0; 240 cu->cu_cwnd_wait = FALSE; 241 (void) getmicrotime(&now); 242 cu->cu_xid = __RPC_GETXID(&now); 243 call_msg.rm_xid = cu->cu_xid; 244 call_msg.rm_call.cb_prog = program; 245 call_msg.rm_call.cb_vers = version; 246 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 247 if (! xdr_callhdr(&xdrs, &call_msg)) { 248 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 249 rpc_createerr.cf_error.re_errno = 0; 250 goto err2; 251 } 252 cu->cu_mcalllen = XDR_GETPOS(&xdrs);; 253 254 /* 255 * By default, closeit is always FALSE. It is users responsibility 256 * to do a close on it, else the user may use clnt_control 257 * to let clnt_destroy do it for him/her. 258 */ 259 cu->cu_closeit = FALSE; 260 cu->cu_socket = so; 261 soreserve(so, 256*1024, 256*1024); 262 263 SOCKBUF_LOCK(&so->so_rcv); 264 recheck_socket: 265 if (so->so_upcall) { 266 if (so->so_upcall != clnt_dg_soupcall) { 267 SOCKBUF_UNLOCK(&so->so_rcv); 268 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 269 goto err2; 270 } 271 cs = (struct cu_socket *) so->so_upcallarg; 272 mtx_lock(&cs->cs_lock); 273 cs->cs_refs++; 274 mtx_unlock(&cs->cs_lock); 275 } else { 276 /* 277 * We are the first on this socket - allocate the 278 * structure and install it in the socket. 279 */ 280 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 281 cs = mem_alloc(sizeof(*cs)); 282 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 283 if (so->so_upcall) { 284 /* 285 * We have lost a race with some other client. 
286 */ 287 mem_free(cs, sizeof(*cs)); 288 goto recheck_socket; 289 } 290 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 291 cs->cs_refs = 1; 292 TAILQ_INIT(&cs->cs_pending); 293 so->so_upcallarg = cs; 294 so->so_upcall = clnt_dg_soupcall; 295 so->so_rcv.sb_flags |= SB_UPCALL; 296 } 297 SOCKBUF_UNLOCK(&so->so_rcv); 298 299 cl->cl_refs = 1; 300 cl->cl_ops = &clnt_dg_ops; 301 cl->cl_private = (caddr_t)(void *)cu; 302 cl->cl_auth = authnone_create(); 303 cl->cl_tp = NULL; 304 cl->cl_netid = NULL; 305 return (cl); 306 err2: 307 if (cl) { 308 mem_free(cl, sizeof (CLIENT)); 309 if (cu) 310 mem_free(cu, sizeof (*cu)); 311 } 312 return (NULL); 313 } 314 315 static enum clnt_stat 316 clnt_dg_call( 317 CLIENT *cl, /* client handle */ 318 struct rpc_callextra *ext, /* call metadata */ 319 rpcproc_t proc, /* procedure number */ 320 struct mbuf *args, /* pointer to args */ 321 struct mbuf **resultsp, /* pointer to results */ 322 struct timeval utimeout) /* seconds to wait before giving up */ 323 { 324 struct cu_data *cu = (struct cu_data *)cl->cl_private; 325 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; 326 struct rpc_timers *rt; 327 AUTH *auth; 328 struct rpc_err *errp; 329 enum clnt_stat stat; 330 XDR xdrs; 331 struct rpc_msg reply_msg; 332 bool_t ok; 333 int retrans; /* number of re-transmits so far */ 334 int nrefreshes = 2; /* number of times to refresh cred */ 335 struct timeval *tvp; 336 int timeout; 337 int retransmit_time; 338 int next_sendtime, starttime, rtt, time_waited, tv = 0; 339 struct sockaddr *sa; 340 socklen_t salen; 341 uint32_t xid = 0; 342 struct mbuf *mreq = NULL, *results; 343 struct cu_request *cr; 344 int error; 345 346 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 347 348 mtx_lock(&cs->cs_lock); 349 350 if (cu->cu_closing || cu->cu_closed) { 351 mtx_unlock(&cs->cs_lock); 352 free(cr, M_RPC); 353 return (RPC_CANTSEND); 354 } 355 cu->cu_threads++; 356 357 if (ext) { 358 auth = ext->rc_auth; 359 errp = &ext->rc_err; 360 } else { 361 auth = cl->cl_auth; 362 errp = &cu->cu_error; 363 } 364 365 cr->cr_client = cl; 366 cr->cr_mrep = NULL; 367 cr->cr_error = 0; 368 369 if (cu->cu_total.tv_usec == -1) { 370 tvp = &utimeout; /* use supplied timeout */ 371 } else { 372 tvp = &cu->cu_total; /* use default timeout */ 373 } 374 if (tvp->tv_sec || tvp->tv_usec) 375 timeout = tvtohz(tvp); 376 else 377 timeout = 0; 378 379 if (cu->cu_connect && !cu->cu_connected) { 380 mtx_unlock(&cs->cs_lock); 381 error = soconnect(cu->cu_socket, 382 (struct sockaddr *)&cu->cu_raddr, curthread); 383 mtx_lock(&cs->cs_lock); 384 if (error) { 385 errp->re_errno = error; 386 errp->re_status = stat = RPC_CANTSEND; 387 goto out; 388 } 389 cu->cu_connected = 1; 390 } 391 if (cu->cu_connected) { 392 sa = NULL; 393 salen = 0; 394 } else { 395 sa = (struct sockaddr *)&cu->cu_raddr; 396 salen = cu->cu_rlen; 397 } 398 time_waited = 0; 399 retrans = 0; 400 if (ext && ext->rc_timers) { 401 rt = ext->rc_timers; 402 if (!rt->rt_rtxcur) 403 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 404 retransmit_time = next_sendtime = rt->rt_rtxcur; 405 } else { 406 rt = NULL; 407 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 408 } 409 410 starttime = ticks; 411 412 call_again: 413 mtx_assert(&cs->cs_lock, MA_OWNED); 414 415 cu->cu_xid++; 416 xid = cu->cu_xid; 417 418 send_again: 419 mtx_unlock(&cs->cs_lock); 420 421 MGETHDR(mreq, M_WAIT, MT_DATA); 422 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 423 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 424 mreq->m_len = 
	mreq->m_len = cu->cu_mcalllen;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	if (cu->cu_async == TRUE && args == NULL)
		goto get_reply;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&cs->cs_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	cr->cr_xid = xid;
	mtx_lock(&cs->cs_lock);

	/*
	 * Try to get a place in the congestion window.
	 */
	while (cu->cu_sent >= cu->cu_cwnd) {
		cu->cu_cwnd_wait = TRUE;
		error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
		    cu->cu_waitflag, "rpccwnd", 0);
		if (error) {
			errp->re_errno = error;
			errp->re_status = stat = RPC_CANTSEND;
			goto out;
		}
	}
	cu->cu_sent += CWNDSCALE;

	TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	mtx_unlock(&cs->cs_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;

	/*
	 * sub-optimal code appears here because we have
	 * some clock time to spare while the packets are in flight.
	 * (We assume that this is actually only executed once.)
	 */
	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&cs->cs_lock);
	if (error) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout == 0) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}

get_reply:
	for (;;) {
		/* Decide how long to wait. */
		if (next_sendtime < timeout)
			tv = next_sendtime;
		else
			tv = timeout;
		tv -= time_waited;

		if (tv > 0) {
			if (cu->cu_closing || cu->cu_closed)
				error = 0;
			else
				error = msleep(cr, &cs->cs_lock,
				    cu->cu_waitflag, cu->cu_waitchan, tv);
		} else {
			error = EWOULDBLOCK;
		}

		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}

		if (!error) {
			/*
			 * We were woken up by the upcall.
			 * If the upcall had a receive error, report
			 * that, otherwise we have a reply.
			 */
			if (cr->cr_error) {
				errp->re_errno = cr->cr_error;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}

			/*
			 * Additive increase of the congestion window.
			 * cu_cwnd is kept in units of CWNDSCALE, so
			 * this grows the window by roughly one
			 * request slot per window's worth of
			 * successful replies; the "+ cu_cwnd / 2"
			 * term just rounds the integer division.
			 */
			cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
			    + cu->cu_cwnd / 2) / cu->cu_cwnd;
			if (cu->cu_cwnd > MAXCWND)
				cu->cu_cwnd = MAXCWND;

			if (rt) {
				/*
				 * Add one to the time since a tick
				 * count of N means that the actual
				 * time taken was somewhere between N
				 * and N+1.
				 */
				rtt = ticks - starttime + 1;

				/*
				 * Update our estimate of the round
				 * trip time using roughly the
				 * algorithm described in RFC
				 * 2988. Given an RTT sample R:
				 *
				 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
				 * SRTT = (1-alpha) * SRTT + alpha * R
				 *
				 * where alpha = 0.125 and beta = 0.25.
				 * The integer divisions by 8 and 4
				 * below implement alpha and beta
				 * respectively.
				 *
				 * The initial retransmit timeout is
				 * SRTT + 4*RTTVAR and doubles on each
				 * retransmission.
				 */
				if (rt->rt_srtt == 0) {
					rt->rt_srtt = rtt;
					rt->rt_deviate = rtt / 2;
				} else {
					int32_t error = rtt - rt->rt_srtt;
					rt->rt_srtt += error / 8;
					error = abs(error) - rt->rt_deviate;
					rt->rt_deviate += error / 4;
				}
				rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
			}

			break;
		}

		/*
		 * The sleep returned an error so our request is still
		 * on the list.  If we got EWOULDBLOCK, we may want to
		 * re-send the request.
		 */
		if (error != EWOULDBLOCK) {
			errp->re_errno = error;
			if (error == EINTR)
				errp->re_status = stat = RPC_INTR;
			else
				errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}

		time_waited = ticks - starttime;

		/* Check for timeout. */
		if (time_waited > timeout) {
			errp->re_errno = EWOULDBLOCK;
			errp->re_status = stat = RPC_TIMEDOUT;
			goto out;
		}

		/* Retransmit if necessary. */
		if (time_waited >= next_sendtime) {
			cu->cu_cwnd /= 2;
			if (cu->cu_cwnd < CWNDSCALE)
				cu->cu_cwnd = CWNDSCALE;
			if (ext && ext->rc_feedback) {
				mtx_unlock(&cs->cs_lock);
				if (retrans == 0)
					ext->rc_feedback(FEEDBACK_REXMIT1,
					    proc, ext->rc_feedback_arg);
				else
					ext->rc_feedback(FEEDBACK_REXMIT2,
					    proc, ext->rc_feedback_arg);
				mtx_lock(&cs->cs_lock);
			}
			if (cu->cu_closing || cu->cu_closed) {
				errp->re_errno = ESHUTDOWN;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}
			retrans++;
			/* update retransmit_time */
			if (retransmit_time < RPC_MAX_BACKOFF * hz)
				retransmit_time = 2 * retransmit_time;
			next_sendtime += retransmit_time;
			goto send_again;
		}
		TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	}

got_reply:
	/*
	 * Now decode and validate the response.  We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
	 */
	mtx_unlock(&cs->cs_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			stat = _seterr_reply(&reply_msg, &(cu->cu_error));

		if (errp->re_status == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (! AUTH_VALIDATE(auth, xid,
				&reply_msg.acpted_rply.ar_verf,
				&results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
				if (retrans &&
				    auth->ah_cred.oa_flavor == RPCSEC_GSS) {
					/*
					 * If we retransmitted, it's
					 * possible that we will
					 * receive a reply for one of
					 * the earlier transmissions
					 * (which will use an older
					 * RPCSEC_GSS sequence
					 * number).  In this case, just
					 * go back and listen for a
					 * new reply.  We could keep a
					 * record of all the seq
					 * numbers we have transmitted
					 * so far so that we could
					 * accept a reply for any of
					 * them here.
					 */
					XDR_DESTROY(&xdrs);
					mtx_lock(&cs->cs_lock);
					TAILQ_INSERT_TAIL(&cs->cs_pending,
					    cr, cr_link);
					cr->cr_mrep = NULL;
					goto get_reply;
				}
			} else {
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND error is an authentication error
		 * then refresh credentials and try again, else break
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&cs->cs_lock);
				goto call_again;
			}
		/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;
	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&cs->cs_lock);
out:
	mtx_assert(&cs->cs_lock, MA_OWNED);

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	cu->cu_threads--;
	if (cu->cu_closing)
		wakeup(cu);

	mtx_unlock(&cs->cs_lock);

	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

static void
clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;

	*errp = cu->cu_error;
}

static bool_t
clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_dg_abort(CLIENT *h)
{
}

static bool_t
clnt_dg_control(CLIENT *cl, u_int request, void *info)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
	struct sockaddr *addr;

	mtx_lock(&cs->cs_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		cu->cu_closeit = TRUE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		cu->cu_closeit = FALSE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_total = *(struct timeval *)info;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)info = cu->cu_total;
		break;
	case CLSET_RETRY_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_wait = *(struct timeval *)info;
		break;
	case CLGET_RETRY_TIMEOUT:
		*(struct timeval *)info = cu->cu_wait;
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
		 */
		memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		addr = (struct sockaddr *)info;
		(void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
		break;
	case CLGET_XID:
		*(uint32_t *)info = cu->cu_xid;
		break;

	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_dg_call() increments once */
		cu->cu_xid = *(uint32_t *)info - 1;
		break;

	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;
	case CLSET_ASYNC:
		cu->cu_async = *(int *)info;
		break;
	case CLSET_CONNECT:
		cu->cu_connect = *(int *)info;
		break;
	case CLSET_WAITCHAN:
		cu->cu_waitchan = (const char *)info;
		break;
	case CLGET_WAITCHAN:
		*(const char **) info = cu->cu_waitchan;
		break;
	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			cu->cu_waitflag = PCATCH;
		else
			cu->cu_waitflag = 0;
		break;
	case CLGET_INTERRUPTIBLE:
		if (cu->cu_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;
	default:
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	mtx_unlock(&cs->cs_lock);
	return (TRUE);
}

static void
clnt_dg_close(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
	struct cu_request *cr;

	mtx_lock(&cs->cs_lock);

	if (cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		return;
	}

	if (cu->cu_closing) {
		while (cu->cu_closing)
			msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
		KASSERT(cu->cu_closed, ("client should be closed"));
		mtx_unlock(&cs->cs_lock);
		return;
	}

	/*
	 * Abort any pending requests and wait until everyone
	 * has finished with clnt_dg_call.
	 */
	cu->cu_closing = TRUE;
	TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
		if (cr->cr_client == cl) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}
	}

	while (cu->cu_threads)
		msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);

	cu->cu_closing = FALSE;
	cu->cu_closed = TRUE;

	mtx_unlock(&cs->cs_lock);
	wakeup(cu);
}

static void
clnt_dg_destroy(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
	struct socket *so = NULL;
	bool_t lastsocketref;

	clnt_dg_close(cl);

	mtx_lock(&cs->cs_lock);

	cs->cs_refs--;
	if (cs->cs_refs == 0) {
		mtx_destroy(&cs->cs_lock);
		SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
		cu->cu_socket->so_upcallarg = NULL;
		cu->cu_socket->so_upcall = NULL;
		cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		mem_free(cs, sizeof(*cs));
		lastsocketref = TRUE;
	} else {
		mtx_unlock(&cs->cs_lock);
		lastsocketref = FALSE;
	}

	if (cu->cu_closeit && lastsocketref) {
		so = cu->cu_socket;
		cu->cu_socket = NULL;
	}

	if (so)
		soclose(so);

	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cu, sizeof (*cu));
	mem_free(cl, sizeof (CLIENT));
}

/*
 * Make sure that the time is not garbage.  -1 value is allowed.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
		t->tv_usec < -1 || t->tv_usec > 1000000);
}

void
clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct cu_socket *cs = (struct cu_socket *) arg;
	struct uio uio;
	struct mbuf *m;
	struct mbuf *control;
	struct cu_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid;

	uio.uio_resid = 1000000000;
	uio.uio_td = curthread;
	do {
		m = NULL;
		control = NULL;
		rcvflag = MSG_DONTWAIT;
		error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
		if (control)
			m_freem(control);

		if (error == EWOULDBLOCK)
			break;

		/*
		 * If there was an error, wake up all pending
		 * requests.
		 */
		if (error) {
			mtx_lock(&cs->cs_lock);
			TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
				cr->cr_xid = 0;
				cr->cr_error = error;
				wakeup(cr);
			}
			mtx_unlock(&cs->cs_lock);
			break;
		}

		/*
		 * The XID is in the first uint32_t of the reply.
		 */
		if (m->m_len < sizeof(xid))
			m = m_pullup(m, sizeof(xid));
		if (!m)
			/*
			 * Should never happen.
			 */
			continue;

		xid = ntohl(*mtod(m, uint32_t *));

		/*
		 * Attempt to match this reply with a pending request.
		 */
		mtx_lock(&cs->cs_lock);
		foundreq = 0;
		TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
			if (cr->cr_xid == xid) {
				/*
				 * This one matches.  We leave the
				 * reply mbuf in cr->cr_mrep.  Set the
				 * XID to zero so that we will ignore
				 * any duplicated replies that arrive
				 * before clnt_dg_call removes it from
				 * the queue.
				 */
				cr->cr_xid = 0;
				cr->cr_mrep = m;
				cr->cr_error = 0;
				foundreq = 1;
				wakeup(cr);
				break;
			}
		}
		mtx_unlock(&cs->cs_lock);

		/*
		 * If we didn't find the matching request, just drop
		 * it - it's probably a repeated reply.
		 */
		if (!foundreq)
			m_freem(m);
	} while (m);
}