/*	$NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */
/*
 * Copyright (c) 1986-1991 by Sun Microsystems Inc.
 */

#if defined(LIBC_SCCS) && !defined(lint)
#ident	"@(#)clnt_dg.c	1.23	94/04/22 SMI"
static char sccsid[] = "@(#)clnt_dg.c	1.19	89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * Implements a connectionless client side RPC.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>


#ifdef _FREEFALL_CONFIG
/*
 * Disable RPC exponential back-off for FreeBSD.org systems.
 */
#define	RPC_MAX_BACKOFF		1 /* second */
#else
#define	RPC_MAX_BACKOFF		30 /* seconds */
#endif

static bool_t time_not_ok(struct timeval *);
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);

static struct clnt_ops clnt_dg_ops = {
	.cl_call =	clnt_dg_call,
	.cl_abort =	clnt_dg_abort,
	.cl_geterr =	clnt_dg_geterr,
	.cl_freeres =	clnt_dg_freeres,
	.cl_close =	clnt_dg_close,
	.cl_destroy =	clnt_dg_destroy,
	.cl_control =	clnt_dg_control
};

static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";

/*
 * A pending RPC request which awaits a reply.  Requests which have
 * received their reply will have cr_xid set to zero and cr_mrep to
 * the mbuf chain of the reply.
 */
struct cu_request {
	TAILQ_ENTRY(cu_request) cr_link;
	CLIENT			*cr_client;	/* owner */
	uint32_t		cr_xid;		/* XID of request */
	struct mbuf		*cr_mrep;	/* reply received by upcall */
	int			cr_error;	/* any error from upcall */
	char			cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};

TAILQ_HEAD(cu_request_list, cu_request);

#define	MCALL_MSG_SIZE 24

/*
 * This structure is pointed to by the socket buffer's sb_upcallarg
 * member.  It is separate from the client private data to facilitate
 * multiple clients sharing the same socket.  The cs_lock mutex is used
 * to protect all fields of this structure; the socket's receive buffer
 * lock (SOCKBUF_LOCK) is used to ensure that exactly one of these
 * structures is installed on the socket.
 */
struct cu_socket {
	struct mtx		cs_lock;
	int			cs_refs;	/* Count of clients */
	struct cu_request_list	cs_pending;	/* Requests awaiting replies */
	int			cs_upcallrefs;	/* Refcnt of upcalls in prog. */
};

static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);

/*
 * Private data kept per client handle
 */
struct cu_data {
	int			cu_threads;	/* # threads in clnt_dg_call */
	bool_t			cu_closing;	/* TRUE if we are closing */
	bool_t			cu_closed;	/* TRUE if we are closed */
	struct socket		*cu_socket;	/* connection socket */
	bool_t			cu_closeit;	/* opened by library */
	struct sockaddr_storage	cu_raddr;	/* remote address */
	int			cu_rlen;
	struct timeval		cu_wait;	/* retransmit interval */
	struct timeval		cu_total;	/* total time for the call */
	struct rpc_err		cu_error;
	uint32_t		cu_xid;
	char			cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
	size_t			cu_mcalllen;
	size_t			cu_sendsz;	/* send size */
	size_t			cu_recvsz;	/* recv size */
	int			cu_async;
	int			cu_connect;	/* Use connect(). */
	int			cu_connected;	/* Have done connect(). */
	const char		*cu_waitchan;
	int			cu_waitflag;
	int			cu_cwnd;	/* congestion window */
	int			cu_sent;	/* number of in-flight RPCs */
	bool_t			cu_cwnd_wait;
};

#define	CWNDSCALE	256
#define	MAXCWND		(32 * CWNDSCALE)
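
/*
 * The congestion window is kept in fixed point: CWNDSCALE represents a
 * window of one outstanding RPC, so MAXCWND allows at most 32 in-flight
 * requests per handle.  For example, with cu_cwnd at its initial value
 * of MAXCWND / 2, up to 16 requests may be pending before further
 * callers sleep in "rpccwnd" waiting for the window to open.
 */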

/*
 * Connectionless client creation returns with client handle parameters.
 * Default options are set, which the user can change using clnt_control().
 * fd should be open and bound.
 * NB: The rpch->cl_auth is initialized to null authentication.
 *	Caller may wish to set this to something more useful.
 *
 * sendsz and recvsz are the maximum allowable packet sizes that can be
 * sent and received.  Normally they are the same, but they can be
 * changed to improve the program efficiency and buffer allocation.
 * If they are 0, use the transport default.
 *
 * If svcaddr is NULL, returns NULL.
 */
CLIENT *
clnt_dg_create(
	struct socket *so,
	struct sockaddr *svcaddr,	/* servers address */
	rpcprog_t program,		/* program number */
	rpcvers_t version,		/* version number */
	size_t sendsz,			/* buffer send size */
	size_t recvsz)			/* buffer recv size */
{
	CLIENT *cl = NULL;		/* client handle */
	struct cu_data *cu = NULL;	/* private data */
	struct cu_socket *cs = NULL;
	struct sockbuf *sb;
	struct timeval now;
	struct rpc_msg call_msg;
	struct __rpc_sockinfo si;
	XDR xdrs;

	if (svcaddr == NULL) {
		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
		return (NULL);
	}

	if (!__rpc_socket2sockinfo(so, &si)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_errno = 0;
		return (NULL);
	}

	/*
	 * Find the receive and the send size
	 */
	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
		rpc_createerr.cf_error.re_errno = 0;
		return (NULL);
	}

	cl = mem_alloc(sizeof (CLIENT));

	/*
	 * Should be multiple of 4 for XDR.
	 */
	sendsz = ((sendsz + 3) / 4) * 4;
	recvsz = ((recvsz + 3) / 4) * 4;
	cu = mem_alloc(sizeof (*cu));
	cu->cu_threads = 0;
	cu->cu_closing = FALSE;
	cu->cu_closed = FALSE;
	(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
	cu->cu_rlen = svcaddr->sa_len;
	/* Other values can also be set through clnt_control() */
	cu->cu_wait.tv_sec = 3;	/* heuristically chosen */
	cu->cu_wait.tv_usec = 0;
	cu->cu_total.tv_sec = -1;
	cu->cu_total.tv_usec = -1;
	cu->cu_sendsz = sendsz;
	cu->cu_recvsz = recvsz;
	cu->cu_async = FALSE;
	cu->cu_connect = FALSE;
	cu->cu_connected = FALSE;
	cu->cu_waitchan = "rpcrecv";
	cu->cu_waitflag = 0;
	cu->cu_cwnd = MAXCWND / 2;
	cu->cu_sent = 0;
	cu->cu_cwnd_wait = FALSE;
	(void) getmicrotime(&now);
	cu->cu_xid = __RPC_GETXID(&now);
	call_msg.rm_xid = cu->cu_xid;
	call_msg.rm_call.cb_prog = program;
	call_msg.rm_call.cb_vers = version;
	xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
	if (! xdr_callhdr(&xdrs, &call_msg)) {
		rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
		rpc_createerr.cf_error.re_errno = 0;
		goto err2;
	}
	cu->cu_mcalllen = XDR_GETPOS(&xdrs);
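
	/*
	 * The prebuilt call header in cu_mcallc holds, as consecutive
	 * 32-bit XDR units: the XID, the message direction (CALL), the
	 * RPC version (2), and the program and version numbers.  For
	 * example, the program number lives at 3 * BYTES_PER_XDR_UNIT
	 * and the version number at 4 * BYTES_PER_XDR_UNIT, which is
	 * what the CLGET/CLSET_PROG and CLGET/CLSET_VERS cases in
	 * clnt_dg_control() below rely on.
	 */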

	/*
	 * By default, closeit is always FALSE.  It is the user's
	 * responsibility to do a close on the socket, else the user may
	 * use clnt_control to let clnt_destroy do it for him/her.
	 */
	cu->cu_closeit = FALSE;
	cu->cu_socket = so;
	soreserve(so, 256*1024, 256*1024);

	sb = &so->so_rcv;
	SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
	if (sb->sb_upcall) {
		if (sb->sb_upcall != clnt_dg_soupcall) {
			SOCKBUF_UNLOCK(&so->so_rcv);
			printf("clnt_dg_create(): socket already has an incompatible upcall\n");
			goto err2;
		}
		cs = (struct cu_socket *) sb->sb_upcallarg;
		mtx_lock(&cs->cs_lock);
		cs->cs_refs++;
		mtx_unlock(&cs->cs_lock);
	} else {
		/*
		 * We are the first on this socket - allocate the
		 * structure and install it in the socket.
		 */
		SOCKBUF_UNLOCK(&so->so_rcv);
		cs = mem_alloc(sizeof(*cs));
		SOCKBUF_LOCK(&so->so_rcv);
		if (sb->sb_upcall) {
			/*
			 * We have lost a race with some other client.
			 */
			mem_free(cs, sizeof(*cs));
			goto recheck_socket;
		}
		mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
		cs->cs_refs = 1;
		cs->cs_upcallrefs = 0;
		TAILQ_INIT(&cs->cs_pending);
		soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
	}
	SOCKBUF_UNLOCK(&so->so_rcv);

	cl->cl_refs = 1;
	cl->cl_ops = &clnt_dg_ops;
	cl->cl_private = (caddr_t)(void *)cu;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);
err2:
	if (cl) {
		mem_free(cl, sizeof (CLIENT));
		if (cu)
			mem_free(cu, sizeof (*cu));
	}
	return (NULL);
}

static enum clnt_stat
clnt_dg_call(
	CLIENT		*cl,		/* client handle */
	struct rpc_callextra *ext,	/* call metadata */
	rpcproc_t	proc,		/* procedure number */
	struct mbuf	*args,		/* pointer to args */
	struct mbuf	**resultsp,	/* pointer to results */
	struct timeval	utimeout)	/* seconds to wait before giving up */
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct rpc_timers *rt;
	AUTH *auth;
	struct rpc_err *errp;
	enum clnt_stat stat;
	XDR xdrs;
	struct rpc_msg reply_msg;
	bool_t ok;
	int retrans;			/* number of re-transmits so far */
	int nrefreshes = 2;		/* number of times to refresh cred */
	struct timeval *tvp;
	int timeout;
	int retransmit_time;
	int next_sendtime, starttime, rtt, time_waited, tv = 0;
	struct sockaddr *sa;
	socklen_t salen;
	uint32_t xid = 0;
	struct mbuf *mreq = NULL, *results;
	struct cu_request *cr;
	int error;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);

	mtx_lock(&cs->cs_lock);

	if (cu->cu_closing || cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		free(cr, M_RPC);
		return (RPC_CANTSEND);
	}
	cu->cu_threads++;

	if (ext) {
		auth = ext->rc_auth;
		errp = &ext->rc_err;
	} else {
		auth = cl->cl_auth;
		errp = &cu->cu_error;
	}

	cr->cr_client = cl;
	cr->cr_mrep = NULL;
	cr->cr_error = 0;

	if (cu->cu_total.tv_usec == -1) {
		tvp = &utimeout; /* use supplied timeout */
	} else {
		tvp = &cu->cu_total; /* use default timeout */
	}
	if (tvp->tv_sec || tvp->tv_usec)
		timeout = tvtohz(tvp);
	else
		timeout = 0;

	if (cu->cu_connect && !cu->cu_connected) {
		mtx_unlock(&cs->cs_lock);
		error = soconnect(cu->cu_socket,
		    (struct sockaddr *)&cu->cu_raddr, curthread);
		mtx_lock(&cs->cs_lock);
		if (error) {
			errp->re_errno = error;
			errp->re_status = stat = RPC_CANTSEND;
			goto out;
		}
		cu->cu_connected = 1;
	}
	if (cu->cu_connected) {
		sa = NULL;
		salen = 0;
	} else {
		sa = (struct sockaddr *)&cu->cu_raddr;
		salen = cu->cu_rlen;
	}
	time_waited = 0;
	retrans = 0;
	if (ext && ext->rc_timers) {
		rt = ext->rc_timers;
		if (!rt->rt_rtxcur)
			rt->rt_rtxcur = tvtohz(&cu->cu_wait);
		retransmit_time = next_sendtime = rt->rt_rtxcur;
	} else {
		rt = NULL;
		retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
	}

	starttime = ticks;
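
	/*
	 * Two retry entry points are used below: call_again allocates a
	 * fresh XID (e.g. after refreshing credentials following an
	 * authentication error), while send_again retransmits with the
	 * same XID so that a late reply to an earlier transmission can
	 * still be matched.
	 */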
call_again:
	mtx_assert(&cs->cs_lock, MA_OWNED);

	cu->cu_xid++;
	xid = cu->cu_xid;

send_again:
	mtx_unlock(&cs->cs_lock);

	MGETHDR(mreq, M_WAIT, MT_DATA);
	KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
	bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
	mreq->m_len = cu->cu_mcalllen;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	if (cu->cu_async == TRUE && args == NULL)
		goto get_reply;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&cs->cs_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	cr->cr_xid = xid;
	mtx_lock(&cs->cs_lock);

	/*
	 * Try to get a place in the congestion window.
	 */
	while (cu->cu_sent >= cu->cu_cwnd) {
		cu->cu_cwnd_wait = TRUE;
		error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
		    cu->cu_waitflag, "rpccwnd", 0);
		if (error) {
			errp->re_errno = error;
			errp->re_status = stat = RPC_CANTSEND;
			goto out;
		}
	}
	cu->cu_sent += CWNDSCALE;

	TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	mtx_unlock(&cs->cs_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;

	/*
	 * sub-optimal code appears here because we have
	 * some clock time to spare while the packets are in flight.
	 * (We assume that this is actually only executed once.)
	 */
	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&cs->cs_lock);
	if (error) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout == 0) {
		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}
		goto out;
	}
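
	/*
	 * Wait for a reply.  Each pass through the loop sleeps until
	 * either the next retransmit time or the overall timeout,
	 * whichever comes first.  A wakeup from the upcall delivers the
	 * reply (or a receive error); EWOULDBLOCK means it is time to
	 * either retransmit or give up.
	 */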
get_reply:
	for (;;) {
		/* Decide how long to wait. */
		if (next_sendtime < timeout)
			tv = next_sendtime;
		else
			tv = timeout;
		tv -= time_waited;

		if (tv > 0) {
			if (cu->cu_closing || cu->cu_closed)
				error = 0;
			else
				error = msleep(cr, &cs->cs_lock,
				    cu->cu_waitflag, cu->cu_waitchan, tv);
		} else {
			error = EWOULDBLOCK;
		}

		TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
		cu->cu_sent -= CWNDSCALE;
		if (cu->cu_cwnd_wait) {
			cu->cu_cwnd_wait = FALSE;
			wakeup(&cu->cu_cwnd_wait);
		}

		if (!error) {
			/*
			 * We were woken up by the upcall.  If the
			 * upcall had a receive error, report that,
			 * otherwise we have a reply.
			 */
			if (cr->cr_error) {
				errp->re_errno = cr->cr_error;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}

			cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
			    + cu->cu_cwnd / 2) / cu->cu_cwnd;
			if (cu->cu_cwnd > MAXCWND)
				cu->cu_cwnd = MAXCWND;

			if (rt) {
				/*
				 * Add one to the time since a tick
				 * count of N means that the actual
				 * time taken was somewhere between N
				 * and N+1.
				 */
				rtt = ticks - starttime + 1;

				/*
				 * Update our estimate of the round
				 * trip time using roughly the
				 * algorithm described in RFC
				 * 2988.  Given an RTT sample R:
				 *
				 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
				 * SRTT = (1-alpha) * SRTT + alpha * R
				 *
				 * where alpha = 0.125 and beta = 0.25.
				 *
				 * The initial retransmit timeout is
				 * SRTT + 4*RTTVAR and doubles on each
				 * retransmission.
				 */
				if (rt->rt_srtt == 0) {
					rt->rt_srtt = rtt;
					rt->rt_deviate = rtt / 2;
				} else {
					int32_t error = rtt - rt->rt_srtt;
					rt->rt_srtt += error / 8;
					error = abs(error) - rt->rt_deviate;
					rt->rt_deviate += error / 4;
				}
				rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
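
				/*
				 * For example, a first sample of
				 * R = 8 ticks gives SRTT = 8,
				 * RTTVAR = 4 and rt_rtxcur =
				 * 8 + 4*4 = 24 ticks; later samples
				 * are folded in with the integer
				 * equivalents of alpha = 1/8 and
				 * beta = 1/4.
				 */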
			}

			break;
		}

		/*
		 * The sleep returned an error so our request is still
		 * on the list.  If we got EWOULDBLOCK, we may want to
		 * re-send the request.
		 */
		if (error != EWOULDBLOCK) {
			errp->re_errno = error;
			if (error == EINTR)
				errp->re_status = stat = RPC_INTR;
			else
				errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}

		time_waited = ticks - starttime;

		/* Check for timeout. */
		if (time_waited > timeout) {
			errp->re_errno = EWOULDBLOCK;
			errp->re_status = stat = RPC_TIMEDOUT;
			goto out;
		}

		/* Retransmit if necessary. */
		if (time_waited >= next_sendtime) {
			cu->cu_cwnd /= 2;
			if (cu->cu_cwnd < CWNDSCALE)
				cu->cu_cwnd = CWNDSCALE;
			if (ext && ext->rc_feedback) {
				mtx_unlock(&cs->cs_lock);
				if (retrans == 0)
					ext->rc_feedback(FEEDBACK_REXMIT1,
					    proc, ext->rc_feedback_arg);
				else
					ext->rc_feedback(FEEDBACK_REXMIT2,
					    proc, ext->rc_feedback_arg);
				mtx_lock(&cs->cs_lock);
			}
			if (cu->cu_closing || cu->cu_closed) {
				errp->re_errno = ESHUTDOWN;
				errp->re_status = stat = RPC_CANTRECV;
				goto out;
			}
			retrans++;
			/* update retransmit_time */
			if (retransmit_time < RPC_MAX_BACKOFF * hz)
				retransmit_time = 2 * retransmit_time;
			next_sendtime += retransmit_time;
			goto send_again;
		}
		TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
	}

got_reply:
	/*
	 * Now decode and validate the response.  We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
	 */
	mtx_unlock(&cs->cs_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			stat = _seterr_reply(&reply_msg, errp);

		if (errp->re_status == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (! AUTH_VALIDATE(auth, xid,
				&reply_msg.acpted_rply.ar_verf,
				&results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
				if (retrans &&
				    auth->ah_cred.oa_flavor == RPCSEC_GSS) {
					/*
					 * If we retransmitted, it's
					 * possible that we will
					 * receive a reply for one of
					 * the earlier transmissions
					 * (which will use an older
					 * RPCSEC_GSS sequence
					 * number).  In this case, just
					 * go back and listen for a
					 * new reply.  We could keep a
					 * record of all the seq
					 * numbers we have transmitted
					 * so far so that we could
					 * accept a reply for any of
					 * them here.
					 */
					XDR_DESTROY(&xdrs);
					mtx_lock(&cs->cs_lock);
					TAILQ_INSERT_TAIL(&cs->cs_pending,
					    cr, cr_link);
					cr->cr_mrep = NULL;
					goto get_reply;
				}
			} else {
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND error is an authentication error
		 * then refresh credentials and try again, else break
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&cs->cs_lock);
				goto call_again;
			}
			/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;
	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&cs->cs_lock);
out:
	mtx_assert(&cs->cs_lock, MA_OWNED);

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	cu->cu_threads--;
	if (cu->cu_closing)
		wakeup(cu);

	mtx_unlock(&cs->cs_lock);

	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

static void
clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;

	*errp = cu->cu_error;
}

static bool_t
clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_dg_abort(CLIENT *h)
{
}
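
/*
 * Handle control requests for the handle.  For example, a caller
 * holding a handle cl might shorten the retransmit interval and make
 * waits interruptible with something like:
 *
 *	struct timeval tv = { 1, 0 };
 *	int one = 1;
 *
 *	CLNT_CONTROL(cl, CLSET_RETRY_TIMEOUT, &tv);
 *	CLNT_CONTROL(cl, CLSET_INTERRUPTIBLE, &one);
 */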
static bool_t
clnt_dg_control(CLIENT *cl, u_int request, void *info)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct sockaddr *addr;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	mtx_lock(&cs->cs_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		cu->cu_closeit = TRUE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		cu->cu_closeit = FALSE;
		mtx_unlock(&cs->cs_lock);
		return (TRUE);
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_total = *(struct timeval *)info;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)info = cu->cu_total;
		break;
	case CLSET_RETRY_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&cs->cs_lock);
			return (FALSE);
		}
		cu->cu_wait = *(struct timeval *)info;
		break;
	case CLGET_RETRY_TIMEOUT:
		*(struct timeval *)info = cu->cu_wait;
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
		 */
		memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		addr = (struct sockaddr *)info;
		(void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
		break;
	case CLGET_XID:
		*(uint32_t *)info = cu->cu_xid;
		break;

	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_dg_call() increments once */
		cu->cu_xid = *(uint32_t *)info - 1;
		break;

	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
			= htonl(*(uint32_t *)info);
		break;
	case CLSET_ASYNC:
		cu->cu_async = *(int *)info;
		break;
	case CLSET_CONNECT:
		cu->cu_connect = *(int *)info;
		break;
	case CLSET_WAITCHAN:
		cu->cu_waitchan = (const char *)info;
		break;
	case CLGET_WAITCHAN:
		*(const char **) info = cu->cu_waitchan;
		break;
	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			cu->cu_waitflag = PCATCH;
		else
			cu->cu_waitflag = 0;
		break;
	case CLGET_INTERRUPTIBLE:
		if (cu->cu_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;
	default:
		mtx_unlock(&cs->cs_lock);
		return (FALSE);
	}
	mtx_unlock(&cs->cs_lock);
	return (TRUE);
}
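
/*
 * Mark the handle closed: abort this client's pending requests, wait
 * for any threads still inside clnt_dg_call to drain, and wake up any
 * concurrent closers once cu_closed is set.
 */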
static void
clnt_dg_close(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct cu_request *cr;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	mtx_lock(&cs->cs_lock);

	if (cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		return;
	}

	if (cu->cu_closing) {
		while (cu->cu_closing)
			msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
		KASSERT(cu->cu_closed, ("client should be closed"));
		mtx_unlock(&cs->cs_lock);
		return;
	}

	/*
	 * Abort any pending requests and wait until everyone
	 * has finished with clnt_dg_call.
	 */
	cu->cu_closing = TRUE;
	TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
		if (cr->cr_client == cl) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}
	}

	while (cu->cu_threads)
		msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);

	cu->cu_closing = FALSE;
	cu->cu_closed = TRUE;

	mtx_unlock(&cs->cs_lock);
	wakeup(cu);
}

static void
clnt_dg_destroy(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct socket *so = NULL;
	bool_t lastsocketref;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	clnt_dg_close(cl);

	mtx_lock(&cs->cs_lock);

	cs->cs_refs--;
	if (cs->cs_refs == 0) {
		mtx_unlock(&cs->cs_lock);
		SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
		soupcall_clear(cu->cu_socket, SO_RCV);
		clnt_dg_upcallsdone(cu->cu_socket, cs);
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		mtx_destroy(&cs->cs_lock);
		mem_free(cs, sizeof(*cs));
		lastsocketref = TRUE;
	} else {
		mtx_unlock(&cs->cs_lock);
		lastsocketref = FALSE;
	}

	if (cu->cu_closeit && lastsocketref) {
		so = cu->cu_socket;
		cu->cu_socket = NULL;
	}

	if (so)
		soclose(so);

	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cu, sizeof (*cu));
	mem_free(cl, sizeof (CLIENT));
}

/*
 * Make sure that the time is not garbage.  -1 value is allowed.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
		t->tv_usec < -1 || t->tv_usec > 1000000);
}
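
/*
 * Socket receive upcall, called from the socket layer when a datagram
 * arrives.  Drain the socket without blocking and match each reply's
 * XID against the pending request list, handing the reply mbuf to the
 * waiting caller.
 */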
int
clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct cu_socket *cs = (struct cu_socket *) arg;
	struct uio uio;
	struct mbuf *m;
	struct mbuf *control;
	struct cu_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid;

	cs->cs_upcallrefs++;
	uio.uio_resid = 1000000000;
	uio.uio_td = curthread;
	do {
		SOCKBUF_UNLOCK(&so->so_rcv);
		m = NULL;
		control = NULL;
		rcvflag = MSG_DONTWAIT;
		error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
		if (control)
			m_freem(control);
		SOCKBUF_LOCK(&so->so_rcv);

		if (error == EWOULDBLOCK)
			break;

		/*
		 * If there was an error, wake up all pending
		 * requests.
		 */
		if (error) {
			mtx_lock(&cs->cs_lock);
			TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
				cr->cr_xid = 0;
				cr->cr_error = error;
				wakeup(cr);
			}
			mtx_unlock(&cs->cs_lock);
			break;
		}

		/*
		 * The XID is in the first uint32_t of the reply.
		 */
		if (m->m_len < sizeof(xid))
			m = m_pullup(m, sizeof(xid));
		if (!m)
			/*
			 * Should never happen.
			 */
			continue;

		xid = ntohl(*mtod(m, uint32_t *));

		/*
		 * Attempt to match this reply with a pending request.
		 */
		mtx_lock(&cs->cs_lock);
		foundreq = 0;
		TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
			if (cr->cr_xid == xid) {
				/*
				 * This one matches.  We leave the
				 * reply mbuf in cr->cr_mrep.  Set the
				 * XID to zero so that we will ignore
				 * any duplicated replies that arrive
				 * before clnt_dg_call removes it from
				 * the queue.
				 */
				cr->cr_xid = 0;
				cr->cr_mrep = m;
				cr->cr_error = 0;
				foundreq = 1;
				wakeup(cr);
				break;
			}
		}
		mtx_unlock(&cs->cs_lock);

		/*
		 * If we didn't find the matching request, just drop
		 * it - it's probably a repeated reply.
		 */
		if (!foundreq)
			m_freem(m);
	} while (m);
	cs->cs_upcallrefs--;
	if (cs->cs_upcallrefs < 0)
		panic("rpcdg upcall refcnt");
	if (cs->cs_upcallrefs == 0)
		wakeup(&cs->cs_upcallrefs);
	return (SU_OK);
}

/*
 * Wait for all upcalls in progress to complete.
 */
static void
clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
{

	SOCKBUF_LOCK_ASSERT(&so->so_rcv);

	while (cs->cs_upcallrefs > 0)
		(void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
		    "rpcdgup", 0);
}