1 /* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 /* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35 #if defined(LIBC_SCCS) && !defined(lint) 36 #ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38 #endif 39 #include <sys/cdefs.h> 40 __FBSDID("$FreeBSD$"); 41 42 /* 43 * Implements a connectionless client side RPC. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/systm.h> 48 #include <sys/kernel.h> 49 #include <sys/lock.h> 50 #include <sys/malloc.h> 51 #include <sys/mbuf.h> 52 #include <sys/mutex.h> 53 #include <sys/pcpu.h> 54 #include <sys/proc.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/time.h> 58 #include <sys/uio.h> 59 60 #include <net/vnet.h> 61 62 #include <rpc/rpc.h> 63 #include <rpc/rpc_com.h> 64 65 66 #ifdef _FREEFALL_CONFIG 67 /* 68 * Disable RPC exponential back-off for FreeBSD.org systems. 69 */ 70 #define RPC_MAX_BACKOFF 1 /* second */ 71 #else 72 #define RPC_MAX_BACKOFF 30 /* seconds */ 73 #endif 74 75 static bool_t time_not_ok(struct timeval *); 76 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 77 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 78 static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 79 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 80 static void clnt_dg_abort(CLIENT *); 81 static bool_t clnt_dg_control(CLIENT *, u_int, void *); 82 static void clnt_dg_close(CLIENT *); 83 static void clnt_dg_destroy(CLIENT *); 84 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 85 86 static struct clnt_ops clnt_dg_ops = { 87 .cl_call = clnt_dg_call, 88 .cl_abort = clnt_dg_abort, 89 .cl_geterr = clnt_dg_geterr, 90 .cl_freeres = clnt_dg_freeres, 91 .cl_close = clnt_dg_close, 92 .cl_destroy = clnt_dg_destroy, 93 .cl_control = clnt_dg_control 94 }; 95 96 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 97 98 /* 99 * A pending RPC request which awaits a reply. Requests which have 100 * received their reply will have cr_xid set to zero and cr_mrep to 101 * the mbuf chain of the reply. 102 */ 103 struct cu_request { 104 TAILQ_ENTRY(cu_request) cr_link; 105 CLIENT *cr_client; /* owner */ 106 uint32_t cr_xid; /* XID of request */ 107 struct mbuf *cr_mrep; /* reply received by upcall */ 108 int cr_error; /* any error from upcall */ 109 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 110 }; 111 112 TAILQ_HEAD(cu_request_list, cu_request); 113 114 #define MCALL_MSG_SIZE 24 115 116 /* 117 * This structure is pointed to by the socket buffer's sb_upcallarg 118 * member. It is separate from the client private data to facilitate 119 * multiple clients sharing the same socket. The cs_lock mutex is used 120 * to protect all fields of this structure, the socket's receive 121 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 122 * structures is installed on the socket. 123 */ 124 struct cu_socket { 125 struct mtx cs_lock; 126 int cs_refs; /* Count of clients */ 127 struct cu_request_list cs_pending; /* Requests awaiting replies */ 128 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 129 }; 130 131 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 132 133 /* 134 * Private data kept per client handle 135 */ 136 struct cu_data { 137 int cu_threads; /* # threads in clnt_vc_call */ 138 bool_t cu_closing; /* TRUE if we are closing */ 139 bool_t cu_closed; /* TRUE if we are closed */ 140 struct socket *cu_socket; /* connection socket */ 141 bool_t cu_closeit; /* opened by library */ 142 struct sockaddr_storage cu_raddr; /* remote address */ 143 int cu_rlen; 144 struct timeval cu_wait; /* retransmit interval */ 145 struct timeval cu_total; /* total time for the call */ 146 struct rpc_err cu_error; 147 uint32_t cu_xid; 148 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 149 size_t cu_mcalllen; 150 size_t cu_sendsz; /* send size */ 151 size_t cu_recvsz; /* recv size */ 152 int cu_async; 153 int cu_connect; /* Use connect(). */ 154 int cu_connected; /* Have done connect(). */ 155 const char *cu_waitchan; 156 int cu_waitflag; 157 int cu_cwnd; /* congestion window */ 158 int cu_sent; /* number of in-flight RPCs */ 159 bool_t cu_cwnd_wait; 160 }; 161 162 #define CWNDSCALE 256 163 #define MAXCWND (32 * CWNDSCALE) 164 165 /* 166 * Connection less client creation returns with client handle parameters. 167 * Default options are set, which the user can change using clnt_control(). 168 * fd should be open and bound. 169 * NB: The rpch->cl_auth is initialized to null authentication. 170 * Caller may wish to set this something more useful. 171 * 172 * sendsz and recvsz are the maximum allowable packet sizes that can be 173 * sent and received. Normally they are the same, but they can be 174 * changed to improve the program efficiency and buffer allocation. 175 * If they are 0, use the transport default. 176 * 177 * If svcaddr is NULL, returns NULL. 178 */ 179 CLIENT * 180 clnt_dg_create( 181 struct socket *so, 182 struct sockaddr *svcaddr, /* servers address */ 183 rpcprog_t program, /* program number */ 184 rpcvers_t version, /* version number */ 185 size_t sendsz, /* buffer recv size */ 186 size_t recvsz) /* buffer send size */ 187 { 188 CLIENT *cl = NULL; /* client handle */ 189 struct cu_data *cu = NULL; /* private data */ 190 struct cu_socket *cs = NULL; 191 struct sockbuf *sb; 192 struct timeval now; 193 struct rpc_msg call_msg; 194 struct __rpc_sockinfo si; 195 XDR xdrs; 196 197 if (svcaddr == NULL) { 198 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 199 return (NULL); 200 } 201 202 CURVNET_SET(so->so_vnet); 203 if (!__rpc_socket2sockinfo(so, &si)) { 204 rpc_createerr.cf_stat = RPC_TLIERROR; 205 rpc_createerr.cf_error.re_errno = 0; 206 CURVNET_RESTORE(); 207 return (NULL); 208 } 209 CURVNET_RESTORE(); 210 211 /* 212 * Find the receive and the send size 213 */ 214 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 215 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 216 if ((sendsz == 0) || (recvsz == 0)) { 217 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 218 rpc_createerr.cf_error.re_errno = 0; 219 return (NULL); 220 } 221 222 cl = mem_alloc(sizeof (CLIENT)); 223 224 /* 225 * Should be multiple of 4 for XDR. 226 */ 227 sendsz = ((sendsz + 3) / 4) * 4; 228 recvsz = ((recvsz + 3) / 4) * 4; 229 cu = mem_alloc(sizeof (*cu)); 230 cu->cu_threads = 0; 231 cu->cu_closing = FALSE; 232 cu->cu_closed = FALSE; 233 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 234 cu->cu_rlen = svcaddr->sa_len; 235 /* Other values can also be set through clnt_control() */ 236 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 237 cu->cu_wait.tv_usec = 0; 238 cu->cu_total.tv_sec = -1; 239 cu->cu_total.tv_usec = -1; 240 cu->cu_sendsz = sendsz; 241 cu->cu_recvsz = recvsz; 242 cu->cu_async = FALSE; 243 cu->cu_connect = FALSE; 244 cu->cu_connected = FALSE; 245 cu->cu_waitchan = "rpcrecv"; 246 cu->cu_waitflag = 0; 247 cu->cu_cwnd = MAXCWND / 2; 248 cu->cu_sent = 0; 249 cu->cu_cwnd_wait = FALSE; 250 (void) getmicrotime(&now); 251 cu->cu_xid = __RPC_GETXID(&now); 252 call_msg.rm_xid = cu->cu_xid; 253 call_msg.rm_call.cb_prog = program; 254 call_msg.rm_call.cb_vers = version; 255 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 256 if (! xdr_callhdr(&xdrs, &call_msg)) { 257 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 258 rpc_createerr.cf_error.re_errno = 0; 259 goto err2; 260 } 261 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 262 263 /* 264 * By default, closeit is always FALSE. It is users responsibility 265 * to do a close on it, else the user may use clnt_control 266 * to let clnt_destroy do it for him/her. 267 */ 268 cu->cu_closeit = FALSE; 269 cu->cu_socket = so; 270 soreserve(so, 256*1024, 256*1024); 271 272 sb = &so->so_rcv; 273 SOCKBUF_LOCK(&so->so_rcv); 274 recheck_socket: 275 if (sb->sb_upcall) { 276 if (sb->sb_upcall != clnt_dg_soupcall) { 277 SOCKBUF_UNLOCK(&so->so_rcv); 278 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 279 goto err2; 280 } 281 cs = (struct cu_socket *) sb->sb_upcallarg; 282 mtx_lock(&cs->cs_lock); 283 cs->cs_refs++; 284 mtx_unlock(&cs->cs_lock); 285 } else { 286 /* 287 * We are the first on this socket - allocate the 288 * structure and install it in the socket. 289 */ 290 SOCKBUF_UNLOCK(&so->so_rcv); 291 cs = mem_alloc(sizeof(*cs)); 292 SOCKBUF_LOCK(&so->so_rcv); 293 if (sb->sb_upcall) { 294 /* 295 * We have lost a race with some other client. 296 */ 297 mem_free(cs, sizeof(*cs)); 298 goto recheck_socket; 299 } 300 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 301 cs->cs_refs = 1; 302 cs->cs_upcallrefs = 0; 303 TAILQ_INIT(&cs->cs_pending); 304 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 305 } 306 SOCKBUF_UNLOCK(&so->so_rcv); 307 308 cl->cl_refs = 1; 309 cl->cl_ops = &clnt_dg_ops; 310 cl->cl_private = (caddr_t)(void *)cu; 311 cl->cl_auth = authnone_create(); 312 cl->cl_tp = NULL; 313 cl->cl_netid = NULL; 314 return (cl); 315 err2: 316 if (cl) { 317 mem_free(cl, sizeof (CLIENT)); 318 if (cu) 319 mem_free(cu, sizeof (*cu)); 320 } 321 return (NULL); 322 } 323 324 static enum clnt_stat 325 clnt_dg_call( 326 CLIENT *cl, /* client handle */ 327 struct rpc_callextra *ext, /* call metadata */ 328 rpcproc_t proc, /* procedure number */ 329 struct mbuf *args, /* pointer to args */ 330 struct mbuf **resultsp, /* pointer to results */ 331 struct timeval utimeout) /* seconds to wait before giving up */ 332 { 333 struct cu_data *cu = (struct cu_data *)cl->cl_private; 334 struct cu_socket *cs; 335 struct rpc_timers *rt; 336 AUTH *auth; 337 struct rpc_err *errp; 338 enum clnt_stat stat; 339 XDR xdrs; 340 struct rpc_msg reply_msg; 341 bool_t ok; 342 int retrans; /* number of re-transmits so far */ 343 int nrefreshes = 2; /* number of times to refresh cred */ 344 struct timeval *tvp; 345 int timeout; 346 int retransmit_time; 347 int next_sendtime, starttime, rtt, time_waited, tv = 0; 348 struct sockaddr *sa; 349 socklen_t salen; 350 uint32_t xid = 0; 351 struct mbuf *mreq = NULL, *results; 352 struct cu_request *cr; 353 int error; 354 355 cs = cu->cu_socket->so_rcv.sb_upcallarg; 356 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 357 358 mtx_lock(&cs->cs_lock); 359 360 if (cu->cu_closing || cu->cu_closed) { 361 mtx_unlock(&cs->cs_lock); 362 free(cr, M_RPC); 363 return (RPC_CANTSEND); 364 } 365 cu->cu_threads++; 366 367 if (ext) { 368 auth = ext->rc_auth; 369 errp = &ext->rc_err; 370 } else { 371 auth = cl->cl_auth; 372 errp = &cu->cu_error; 373 } 374 375 cr->cr_client = cl; 376 cr->cr_mrep = NULL; 377 cr->cr_error = 0; 378 379 if (cu->cu_total.tv_usec == -1) { 380 tvp = &utimeout; /* use supplied timeout */ 381 } else { 382 tvp = &cu->cu_total; /* use default timeout */ 383 } 384 if (tvp->tv_sec || tvp->tv_usec) 385 timeout = tvtohz(tvp); 386 else 387 timeout = 0; 388 389 if (cu->cu_connect && !cu->cu_connected) { 390 mtx_unlock(&cs->cs_lock); 391 error = soconnect(cu->cu_socket, 392 (struct sockaddr *)&cu->cu_raddr, curthread); 393 mtx_lock(&cs->cs_lock); 394 if (error) { 395 errp->re_errno = error; 396 errp->re_status = stat = RPC_CANTSEND; 397 goto out; 398 } 399 cu->cu_connected = 1; 400 } 401 if (cu->cu_connected) { 402 sa = NULL; 403 salen = 0; 404 } else { 405 sa = (struct sockaddr *)&cu->cu_raddr; 406 salen = cu->cu_rlen; 407 } 408 time_waited = 0; 409 retrans = 0; 410 if (ext && ext->rc_timers) { 411 rt = ext->rc_timers; 412 if (!rt->rt_rtxcur) 413 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 414 retransmit_time = next_sendtime = rt->rt_rtxcur; 415 } else { 416 rt = NULL; 417 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 418 } 419 420 starttime = ticks; 421 422 call_again: 423 mtx_assert(&cs->cs_lock, MA_OWNED); 424 425 cu->cu_xid++; 426 xid = cu->cu_xid; 427 428 send_again: 429 mtx_unlock(&cs->cs_lock); 430 431 MGETHDR(mreq, M_WAIT, MT_DATA); 432 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 433 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 434 mreq->m_len = cu->cu_mcalllen; 435 436 /* 437 * The XID is the first thing in the request. 438 */ 439 *mtod(mreq, uint32_t *) = htonl(xid); 440 441 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 442 443 if (cu->cu_async == TRUE && args == NULL) 444 goto get_reply; 445 446 if ((! XDR_PUTINT32(&xdrs, &proc)) || 447 (! AUTH_MARSHALL(auth, xid, &xdrs, 448 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 449 errp->re_status = stat = RPC_CANTENCODEARGS; 450 mtx_lock(&cs->cs_lock); 451 goto out; 452 } 453 mreq->m_pkthdr.len = m_length(mreq, NULL); 454 455 cr->cr_xid = xid; 456 mtx_lock(&cs->cs_lock); 457 458 /* 459 * Try to get a place in the congestion window. 460 */ 461 while (cu->cu_sent >= cu->cu_cwnd) { 462 cu->cu_cwnd_wait = TRUE; 463 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 464 cu->cu_waitflag, "rpccwnd", 0); 465 if (error) { 466 errp->re_errno = error; 467 errp->re_status = stat = RPC_CANTSEND; 468 goto out; 469 } 470 } 471 cu->cu_sent += CWNDSCALE; 472 473 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 474 mtx_unlock(&cs->cs_lock); 475 476 /* 477 * sosend consumes mreq. 478 */ 479 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 480 mreq = NULL; 481 482 /* 483 * sub-optimal code appears here because we have 484 * some clock time to spare while the packets are in flight. 485 * (We assume that this is actually only executed once.) 486 */ 487 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 488 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 489 reply_msg.acpted_rply.ar_verf.oa_length = 0; 490 reply_msg.acpted_rply.ar_results.where = NULL; 491 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 492 493 mtx_lock(&cs->cs_lock); 494 if (error) { 495 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 496 errp->re_errno = error; 497 errp->re_status = stat = RPC_CANTSEND; 498 cu->cu_sent -= CWNDSCALE; 499 if (cu->cu_cwnd_wait) { 500 cu->cu_cwnd_wait = FALSE; 501 wakeup(&cu->cu_cwnd_wait); 502 } 503 goto out; 504 } 505 506 /* 507 * Check to see if we got an upcall while waiting for the 508 * lock. 509 */ 510 if (cr->cr_error) { 511 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 512 errp->re_errno = cr->cr_error; 513 errp->re_status = stat = RPC_CANTRECV; 514 cu->cu_sent -= CWNDSCALE; 515 if (cu->cu_cwnd_wait) { 516 cu->cu_cwnd_wait = FALSE; 517 wakeup(&cu->cu_cwnd_wait); 518 } 519 goto out; 520 } 521 if (cr->cr_mrep) { 522 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 523 cu->cu_sent -= CWNDSCALE; 524 if (cu->cu_cwnd_wait) { 525 cu->cu_cwnd_wait = FALSE; 526 wakeup(&cu->cu_cwnd_wait); 527 } 528 goto got_reply; 529 } 530 531 /* 532 * Hack to provide rpc-based message passing 533 */ 534 if (timeout == 0) { 535 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 536 errp->re_status = stat = RPC_TIMEDOUT; 537 cu->cu_sent -= CWNDSCALE; 538 if (cu->cu_cwnd_wait) { 539 cu->cu_cwnd_wait = FALSE; 540 wakeup(&cu->cu_cwnd_wait); 541 } 542 goto out; 543 } 544 545 get_reply: 546 for (;;) { 547 /* Decide how long to wait. */ 548 if (next_sendtime < timeout) 549 tv = next_sendtime; 550 else 551 tv = timeout; 552 tv -= time_waited; 553 554 if (tv > 0) { 555 if (cu->cu_closing || cu->cu_closed) { 556 error = 0; 557 cr->cr_error = ESHUTDOWN; 558 } else { 559 error = msleep(cr, &cs->cs_lock, 560 cu->cu_waitflag, cu->cu_waitchan, tv); 561 } 562 } else { 563 error = EWOULDBLOCK; 564 } 565 566 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 567 cu->cu_sent -= CWNDSCALE; 568 if (cu->cu_cwnd_wait) { 569 cu->cu_cwnd_wait = FALSE; 570 wakeup(&cu->cu_cwnd_wait); 571 } 572 573 if (!error) { 574 /* 575 * We were woken up by the upcall. If the 576 * upcall had a receive error, report that, 577 * otherwise we have a reply. 578 */ 579 if (cr->cr_error) { 580 errp->re_errno = cr->cr_error; 581 errp->re_status = stat = RPC_CANTRECV; 582 goto out; 583 } 584 585 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 586 + cu->cu_cwnd / 2) / cu->cu_cwnd; 587 if (cu->cu_cwnd > MAXCWND) 588 cu->cu_cwnd = MAXCWND; 589 590 if (rt) { 591 /* 592 * Add one to the time since a tick 593 * count of N means that the actual 594 * time taken was somewhere between N 595 * and N+1. 596 */ 597 rtt = ticks - starttime + 1; 598 599 /* 600 * Update our estimate of the round 601 * trip time using roughly the 602 * algorithm described in RFC 603 * 2988. Given an RTT sample R: 604 * 605 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 606 * SRTT = (1-alpha) * SRTT + alpha * R 607 * 608 * where alpha = 0.125 and beta = 0.25. 609 * 610 * The initial retransmit timeout is 611 * SRTT + 4*RTTVAR and doubles on each 612 * retransmision. 613 */ 614 if (rt->rt_srtt == 0) { 615 rt->rt_srtt = rtt; 616 rt->rt_deviate = rtt / 2; 617 } else { 618 int32_t error = rtt - rt->rt_srtt; 619 rt->rt_srtt += error / 8; 620 error = abs(error) - rt->rt_deviate; 621 rt->rt_deviate += error / 4; 622 } 623 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 624 } 625 626 break; 627 } 628 629 /* 630 * The sleep returned an error so our request is still 631 * on the list. If we got EWOULDBLOCK, we may want to 632 * re-send the request. 633 */ 634 if (error != EWOULDBLOCK) { 635 errp->re_errno = error; 636 if (error == EINTR) 637 errp->re_status = stat = RPC_INTR; 638 else 639 errp->re_status = stat = RPC_CANTRECV; 640 goto out; 641 } 642 643 time_waited = ticks - starttime; 644 645 /* Check for timeout. */ 646 if (time_waited > timeout) { 647 errp->re_errno = EWOULDBLOCK; 648 errp->re_status = stat = RPC_TIMEDOUT; 649 goto out; 650 } 651 652 /* Retransmit if necessary. */ 653 if (time_waited >= next_sendtime) { 654 cu->cu_cwnd /= 2; 655 if (cu->cu_cwnd < CWNDSCALE) 656 cu->cu_cwnd = CWNDSCALE; 657 if (ext && ext->rc_feedback) { 658 mtx_unlock(&cs->cs_lock); 659 if (retrans == 0) 660 ext->rc_feedback(FEEDBACK_REXMIT1, 661 proc, ext->rc_feedback_arg); 662 else 663 ext->rc_feedback(FEEDBACK_REXMIT2, 664 proc, ext->rc_feedback_arg); 665 mtx_lock(&cs->cs_lock); 666 } 667 if (cu->cu_closing || cu->cu_closed) { 668 errp->re_errno = ESHUTDOWN; 669 errp->re_status = stat = RPC_CANTRECV; 670 goto out; 671 } 672 retrans++; 673 /* update retransmit_time */ 674 if (retransmit_time < RPC_MAX_BACKOFF * hz) 675 retransmit_time = 2 * retransmit_time; 676 next_sendtime += retransmit_time; 677 goto send_again; 678 } 679 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 680 } 681 682 got_reply: 683 /* 684 * Now decode and validate the response. We need to drop the 685 * lock since xdr_replymsg may end up sleeping in malloc. 686 */ 687 mtx_unlock(&cs->cs_lock); 688 689 if (ext && ext->rc_feedback) 690 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 691 692 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 693 ok = xdr_replymsg(&xdrs, &reply_msg); 694 cr->cr_mrep = NULL; 695 696 if (ok) { 697 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 698 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 699 errp->re_status = stat = RPC_SUCCESS; 700 else 701 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 702 703 if (errp->re_status == RPC_SUCCESS) { 704 results = xdrmbuf_getall(&xdrs); 705 if (! AUTH_VALIDATE(auth, xid, 706 &reply_msg.acpted_rply.ar_verf, 707 &results)) { 708 errp->re_status = stat = RPC_AUTHERROR; 709 errp->re_why = AUTH_INVALIDRESP; 710 if (retrans && 711 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 712 /* 713 * If we retransmitted, its 714 * possible that we will 715 * receive a reply for one of 716 * the earlier transmissions 717 * (which will use an older 718 * RPCSEC_GSS sequence 719 * number). In this case, just 720 * go back and listen for a 721 * new reply. We could keep a 722 * record of all the seq 723 * numbers we have transmitted 724 * so far so that we could 725 * accept a reply for any of 726 * them here. 727 */ 728 XDR_DESTROY(&xdrs); 729 mtx_lock(&cs->cs_lock); 730 TAILQ_INSERT_TAIL(&cs->cs_pending, 731 cr, cr_link); 732 cr->cr_mrep = NULL; 733 goto get_reply; 734 } 735 } else { 736 *resultsp = results; 737 } 738 } /* end successful completion */ 739 /* 740 * If unsuccesful AND error is an authentication error 741 * then refresh credentials and try again, else break 742 */ 743 else if (stat == RPC_AUTHERROR) 744 /* maybe our credentials need to be refreshed ... */ 745 if (nrefreshes > 0 && 746 AUTH_REFRESH(auth, &reply_msg)) { 747 nrefreshes--; 748 XDR_DESTROY(&xdrs); 749 mtx_lock(&cs->cs_lock); 750 goto call_again; 751 } 752 /* end of unsuccessful completion */ 753 } /* end of valid reply message */ 754 else { 755 errp->re_status = stat = RPC_CANTDECODERES; 756 757 } 758 XDR_DESTROY(&xdrs); 759 mtx_lock(&cs->cs_lock); 760 out: 761 mtx_assert(&cs->cs_lock, MA_OWNED); 762 763 if (mreq) 764 m_freem(mreq); 765 if (cr->cr_mrep) 766 m_freem(cr->cr_mrep); 767 768 cu->cu_threads--; 769 if (cu->cu_closing) 770 wakeup(cu); 771 772 mtx_unlock(&cs->cs_lock); 773 774 if (auth && stat != RPC_SUCCESS) 775 AUTH_VALIDATE(auth, xid, NULL, NULL); 776 777 free(cr, M_RPC); 778 779 return (stat); 780 } 781 782 static void 783 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 784 { 785 struct cu_data *cu = (struct cu_data *)cl->cl_private; 786 787 *errp = cu->cu_error; 788 } 789 790 static bool_t 791 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 792 { 793 XDR xdrs; 794 bool_t dummy; 795 796 xdrs.x_op = XDR_FREE; 797 dummy = (*xdr_res)(&xdrs, res_ptr); 798 799 return (dummy); 800 } 801 802 /*ARGSUSED*/ 803 static void 804 clnt_dg_abort(CLIENT *h) 805 { 806 } 807 808 static bool_t 809 clnt_dg_control(CLIENT *cl, u_int request, void *info) 810 { 811 struct cu_data *cu = (struct cu_data *)cl->cl_private; 812 struct cu_socket *cs; 813 struct sockaddr *addr; 814 815 cs = cu->cu_socket->so_rcv.sb_upcallarg; 816 mtx_lock(&cs->cs_lock); 817 818 switch (request) { 819 case CLSET_FD_CLOSE: 820 cu->cu_closeit = TRUE; 821 mtx_unlock(&cs->cs_lock); 822 return (TRUE); 823 case CLSET_FD_NCLOSE: 824 cu->cu_closeit = FALSE; 825 mtx_unlock(&cs->cs_lock); 826 return (TRUE); 827 } 828 829 /* for other requests which use info */ 830 if (info == NULL) { 831 mtx_unlock(&cs->cs_lock); 832 return (FALSE); 833 } 834 switch (request) { 835 case CLSET_TIMEOUT: 836 if (time_not_ok((struct timeval *)info)) { 837 mtx_unlock(&cs->cs_lock); 838 return (FALSE); 839 } 840 cu->cu_total = *(struct timeval *)info; 841 break; 842 case CLGET_TIMEOUT: 843 *(struct timeval *)info = cu->cu_total; 844 break; 845 case CLSET_RETRY_TIMEOUT: 846 if (time_not_ok((struct timeval *)info)) { 847 mtx_unlock(&cs->cs_lock); 848 return (FALSE); 849 } 850 cu->cu_wait = *(struct timeval *)info; 851 break; 852 case CLGET_RETRY_TIMEOUT: 853 *(struct timeval *)info = cu->cu_wait; 854 break; 855 case CLGET_SVC_ADDR: 856 /* 857 * Slightly different semantics to userland - we use 858 * sockaddr instead of netbuf. 859 */ 860 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 861 break; 862 case CLSET_SVC_ADDR: /* set to new address */ 863 addr = (struct sockaddr *)info; 864 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 865 break; 866 case CLGET_XID: 867 *(uint32_t *)info = cu->cu_xid; 868 break; 869 870 case CLSET_XID: 871 /* This will set the xid of the NEXT call */ 872 /* decrement by 1 as clnt_dg_call() increments once */ 873 cu->cu_xid = *(uint32_t *)info - 1; 874 break; 875 876 case CLGET_VERS: 877 /* 878 * This RELIES on the information that, in the call body, 879 * the version number field is the fifth field from the 880 * begining of the RPC header. MUST be changed if the 881 * call_struct is changed 882 */ 883 *(uint32_t *)info = 884 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 885 4 * BYTES_PER_XDR_UNIT)); 886 break; 887 888 case CLSET_VERS: 889 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 890 = htonl(*(uint32_t *)info); 891 break; 892 893 case CLGET_PROG: 894 /* 895 * This RELIES on the information that, in the call body, 896 * the program number field is the fourth field from the 897 * begining of the RPC header. MUST be changed if the 898 * call_struct is changed 899 */ 900 *(uint32_t *)info = 901 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 902 3 * BYTES_PER_XDR_UNIT)); 903 break; 904 905 case CLSET_PROG: 906 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 907 = htonl(*(uint32_t *)info); 908 break; 909 case CLSET_ASYNC: 910 cu->cu_async = *(int *)info; 911 break; 912 case CLSET_CONNECT: 913 cu->cu_connect = *(int *)info; 914 break; 915 case CLSET_WAITCHAN: 916 cu->cu_waitchan = (const char *)info; 917 break; 918 case CLGET_WAITCHAN: 919 *(const char **) info = cu->cu_waitchan; 920 break; 921 case CLSET_INTERRUPTIBLE: 922 if (*(int *) info) 923 cu->cu_waitflag = PCATCH; 924 else 925 cu->cu_waitflag = 0; 926 break; 927 case CLGET_INTERRUPTIBLE: 928 if (cu->cu_waitflag) 929 *(int *) info = TRUE; 930 else 931 *(int *) info = FALSE; 932 break; 933 default: 934 mtx_unlock(&cs->cs_lock); 935 return (FALSE); 936 } 937 mtx_unlock(&cs->cs_lock); 938 return (TRUE); 939 } 940 941 static void 942 clnt_dg_close(CLIENT *cl) 943 { 944 struct cu_data *cu = (struct cu_data *)cl->cl_private; 945 struct cu_socket *cs; 946 struct cu_request *cr; 947 948 cs = cu->cu_socket->so_rcv.sb_upcallarg; 949 mtx_lock(&cs->cs_lock); 950 951 if (cu->cu_closed) { 952 mtx_unlock(&cs->cs_lock); 953 return; 954 } 955 956 if (cu->cu_closing) { 957 while (cu->cu_closing) 958 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 959 KASSERT(cu->cu_closed, ("client should be closed")); 960 mtx_unlock(&cs->cs_lock); 961 return; 962 } 963 964 /* 965 * Abort any pending requests and wait until everyone 966 * has finished with clnt_vc_call. 967 */ 968 cu->cu_closing = TRUE; 969 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 970 if (cr->cr_client == cl) { 971 cr->cr_xid = 0; 972 cr->cr_error = ESHUTDOWN; 973 wakeup(cr); 974 } 975 } 976 977 while (cu->cu_threads) 978 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 979 980 cu->cu_closing = FALSE; 981 cu->cu_closed = TRUE; 982 983 mtx_unlock(&cs->cs_lock); 984 wakeup(cu); 985 } 986 987 static void 988 clnt_dg_destroy(CLIENT *cl) 989 { 990 struct cu_data *cu = (struct cu_data *)cl->cl_private; 991 struct cu_socket *cs; 992 struct socket *so = NULL; 993 bool_t lastsocketref; 994 995 cs = cu->cu_socket->so_rcv.sb_upcallarg; 996 clnt_dg_close(cl); 997 998 mtx_lock(&cs->cs_lock); 999 1000 cs->cs_refs--; 1001 if (cs->cs_refs == 0) { 1002 mtx_unlock(&cs->cs_lock); 1003 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1004 soupcall_clear(cu->cu_socket, SO_RCV); 1005 clnt_dg_upcallsdone(cu->cu_socket, cs); 1006 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1007 mtx_destroy(&cs->cs_lock); 1008 mem_free(cs, sizeof(*cs)); 1009 lastsocketref = TRUE; 1010 } else { 1011 mtx_unlock(&cs->cs_lock); 1012 lastsocketref = FALSE; 1013 } 1014 1015 if (cu->cu_closeit && lastsocketref) { 1016 so = cu->cu_socket; 1017 cu->cu_socket = NULL; 1018 } 1019 1020 if (so) 1021 soclose(so); 1022 1023 if (cl->cl_netid && cl->cl_netid[0]) 1024 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1025 if (cl->cl_tp && cl->cl_tp[0]) 1026 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1027 mem_free(cu, sizeof (*cu)); 1028 mem_free(cl, sizeof (CLIENT)); 1029 } 1030 1031 /* 1032 * Make sure that the time is not garbage. -1 value is allowed. 1033 */ 1034 static bool_t 1035 time_not_ok(struct timeval *t) 1036 { 1037 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1038 t->tv_usec < -1 || t->tv_usec > 1000000); 1039 } 1040 1041 int 1042 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1043 { 1044 struct cu_socket *cs = (struct cu_socket *) arg; 1045 struct uio uio; 1046 struct mbuf *m; 1047 struct mbuf *control; 1048 struct cu_request *cr; 1049 int error, rcvflag, foundreq; 1050 uint32_t xid; 1051 1052 cs->cs_upcallrefs++; 1053 uio.uio_resid = 1000000000; 1054 uio.uio_td = curthread; 1055 do { 1056 SOCKBUF_UNLOCK(&so->so_rcv); 1057 m = NULL; 1058 control = NULL; 1059 rcvflag = MSG_DONTWAIT; 1060 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1061 if (control) 1062 m_freem(control); 1063 SOCKBUF_LOCK(&so->so_rcv); 1064 1065 if (error == EWOULDBLOCK) 1066 break; 1067 1068 /* 1069 * If there was an error, wake up all pending 1070 * requests. 1071 */ 1072 if (error) { 1073 mtx_lock(&cs->cs_lock); 1074 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1075 cr->cr_xid = 0; 1076 cr->cr_error = error; 1077 wakeup(cr); 1078 } 1079 mtx_unlock(&cs->cs_lock); 1080 break; 1081 } 1082 1083 /* 1084 * The XID is in the first uint32_t of the reply. 1085 */ 1086 if (m->m_len < sizeof(xid)) 1087 m = m_pullup(m, sizeof(xid)); 1088 if (!m) 1089 /* 1090 * Should never happen. 1091 */ 1092 continue; 1093 1094 xid = ntohl(*mtod(m, uint32_t *)); 1095 1096 /* 1097 * Attempt to match this reply with a pending request. 1098 */ 1099 mtx_lock(&cs->cs_lock); 1100 foundreq = 0; 1101 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1102 if (cr->cr_xid == xid) { 1103 /* 1104 * This one matches. We leave the 1105 * reply mbuf in cr->cr_mrep. Set the 1106 * XID to zero so that we will ignore 1107 * any duplicated replies that arrive 1108 * before clnt_dg_call removes it from 1109 * the queue. 1110 */ 1111 cr->cr_xid = 0; 1112 cr->cr_mrep = m; 1113 cr->cr_error = 0; 1114 foundreq = 1; 1115 wakeup(cr); 1116 break; 1117 } 1118 } 1119 mtx_unlock(&cs->cs_lock); 1120 1121 /* 1122 * If we didn't find the matching request, just drop 1123 * it - its probably a repeated reply. 1124 */ 1125 if (!foundreq) 1126 m_freem(m); 1127 } while (m); 1128 cs->cs_upcallrefs--; 1129 if (cs->cs_upcallrefs < 0) 1130 panic("rpcdg upcall refcnt"); 1131 if (cs->cs_upcallrefs == 0) 1132 wakeup(&cs->cs_upcallrefs); 1133 return (SU_OK); 1134 } 1135 1136 /* 1137 * Wait for all upcalls in progress to complete. 1138 */ 1139 static void 1140 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1141 { 1142 1143 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1144 1145 while (cs->cs_upcallrefs > 0) 1146 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1147 "rpcdgup", 0); 1148 } 1149