1 /* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc_vc.c, Server side for Connection Oriented based RPC. 40 * 41 * Actually implements two flavors of transporter - 42 * a tcp rendezvouser (a listner and connection establisher) 43 * and a record/tcp stream. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/lock.h> 48 #include <sys/kernel.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/proc.h> 53 #include <sys/protosw.h> 54 #include <sys/queue.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/sx.h> 58 #include <sys/systm.h> 59 #include <sys/uio.h> 60 61 #include <net/vnet.h> 62 63 #include <netinet/tcp.h> 64 65 #include <rpc/rpc.h> 66 67 #include <rpc/krpc.h> 68 #include <rpc/rpc_com.h> 69 70 #include <security/mac/mac_framework.h> 71 72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *, 73 struct sockaddr **, struct mbuf **); 74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *); 75 static void svc_vc_rendezvous_destroy(SVCXPRT *); 76 static bool_t svc_vc_null(void); 77 static void svc_vc_destroy(SVCXPRT *); 78 static enum xprt_stat svc_vc_stat(SVCXPRT *); 79 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); 80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, 81 struct sockaddr **, struct mbuf **); 82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, 83 struct sockaddr *, struct mbuf *, uint32_t *seq); 84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); 85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, 86 void *in); 87 static void svc_vc_backchannel_destroy(SVCXPRT *); 88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); 89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, 90 struct sockaddr **, struct mbuf **); 91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, 92 struct sockaddr *, struct mbuf *, uint32_t *); 93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, 94 void *in); 95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, 96 struct sockaddr *raddr); 97 static int svc_vc_accept(struct socket *head, struct socket **sop); 98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag); 99 static int svc_vc_rendezvous_soupcall(struct socket *, void *, int); 100 101 static struct xp_ops svc_vc_rendezvous_ops = { 102 .xp_recv = svc_vc_rendezvous_recv, 103 .xp_stat = svc_vc_rendezvous_stat, 104 .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, 105 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, 106 .xp_destroy = svc_vc_rendezvous_destroy, 107 .xp_control = svc_vc_rendezvous_control 108 }; 109 110 static struct xp_ops svc_vc_ops = { 111 .xp_recv = svc_vc_recv, 112 .xp_stat = svc_vc_stat, 113 .xp_ack = svc_vc_ack, 114 .xp_reply = svc_vc_reply, 115 .xp_destroy = svc_vc_destroy, 116 .xp_control = svc_vc_control 117 }; 118 119 static struct xp_ops svc_vc_backchannel_ops = { 120 .xp_recv = svc_vc_backchannel_recv, 121 .xp_stat = svc_vc_backchannel_stat, 122 .xp_reply = svc_vc_backchannel_reply, 123 .xp_destroy = svc_vc_backchannel_destroy, 124 .xp_control = svc_vc_backchannel_control 125 }; 126 127 /* 128 * Usage: 129 * xprt = svc_vc_create(sock, send_buf_size, recv_buf_size); 130 * 131 * Creates, registers, and returns a (rpc) tcp based transporter. 132 * Once *xprt is initialized, it is registered as a transporter 133 * see (svc.h, xprt_register). This routine returns 134 * a NULL if a problem occurred. 135 * 136 * The filedescriptor passed in is expected to refer to a bound, but 137 * not yet connected socket. 138 * 139 * Since streams do buffered io similar to stdio, the caller can specify 140 * how big the send and receive buffers are via the second and third parms; 141 * 0 => use the system default. 142 */ 143 SVCXPRT * 144 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 145 size_t recvsize) 146 { 147 SVCXPRT *xprt; 148 struct sockaddr* sa; 149 int error; 150 151 SOCK_LOCK(so); 152 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 153 SOCK_UNLOCK(so); 154 CURVNET_SET(so->so_vnet); 155 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 156 CURVNET_RESTORE(); 157 if (error) 158 return (NULL); 159 xprt = svc_vc_create_conn(pool, so, sa); 160 free(sa, M_SONAME); 161 return (xprt); 162 } 163 SOCK_UNLOCK(so); 164 165 xprt = svc_xprt_alloc(); 166 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 167 xprt->xp_pool = pool; 168 xprt->xp_socket = so; 169 xprt->xp_p1 = NULL; 170 xprt->xp_p2 = NULL; 171 xprt->xp_ops = &svc_vc_rendezvous_ops; 172 173 CURVNET_SET(so->so_vnet); 174 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 175 CURVNET_RESTORE(); 176 if (error) { 177 goto cleanup_svc_vc_create; 178 } 179 180 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 181 free(sa, M_SONAME); 182 183 xprt_register(xprt); 184 185 solisten(so, -1, curthread); 186 187 SOLISTEN_LOCK(so); 188 xprt->xp_upcallset = 1; 189 solisten_upcall_set(so, svc_vc_rendezvous_soupcall, xprt); 190 SOLISTEN_UNLOCK(so); 191 192 return (xprt); 193 194 cleanup_svc_vc_create: 195 sx_destroy(&xprt->xp_lock); 196 svc_xprt_free(xprt); 197 198 return (NULL); 199 } 200 201 /* 202 * Create a new transport for a socket optained via soaccept(). 203 */ 204 SVCXPRT * 205 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) 206 { 207 SVCXPRT *xprt; 208 struct cf_conn *cd; 209 struct sockaddr* sa = NULL; 210 struct sockopt opt; 211 int one = 1; 212 int error; 213 214 bzero(&opt, sizeof(struct sockopt)); 215 opt.sopt_dir = SOPT_SET; 216 opt.sopt_level = SOL_SOCKET; 217 opt.sopt_name = SO_KEEPALIVE; 218 opt.sopt_val = &one; 219 opt.sopt_valsize = sizeof(one); 220 error = sosetopt(so, &opt); 221 if (error) { 222 return (NULL); 223 } 224 225 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 226 bzero(&opt, sizeof(struct sockopt)); 227 opt.sopt_dir = SOPT_SET; 228 opt.sopt_level = IPPROTO_TCP; 229 opt.sopt_name = TCP_NODELAY; 230 opt.sopt_val = &one; 231 opt.sopt_valsize = sizeof(one); 232 error = sosetopt(so, &opt); 233 if (error) { 234 return (NULL); 235 } 236 } 237 238 cd = mem_alloc(sizeof(*cd)); 239 cd->strm_stat = XPRT_IDLE; 240 241 xprt = svc_xprt_alloc(); 242 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 243 xprt->xp_pool = pool; 244 xprt->xp_socket = so; 245 xprt->xp_p1 = cd; 246 xprt->xp_p2 = NULL; 247 xprt->xp_ops = &svc_vc_ops; 248 249 /* 250 * See http://www.connectathon.org/talks96/nfstcp.pdf - client 251 * has a 5 minute timer, server has a 6 minute timer. 252 */ 253 xprt->xp_idletimeout = 6 * 60; 254 255 memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len); 256 257 CURVNET_SET(so->so_vnet); 258 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 259 CURVNET_RESTORE(); 260 if (error) 261 goto cleanup_svc_vc_create; 262 263 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 264 free(sa, M_SONAME); 265 266 xprt_register(xprt); 267 268 SOCKBUF_LOCK(&so->so_rcv); 269 xprt->xp_upcallset = 1; 270 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 271 SOCKBUF_UNLOCK(&so->so_rcv); 272 273 /* 274 * Throw the transport into the active list in case it already 275 * has some data buffered. 276 */ 277 sx_xlock(&xprt->xp_lock); 278 xprt_active(xprt); 279 sx_xunlock(&xprt->xp_lock); 280 281 return (xprt); 282 cleanup_svc_vc_create: 283 sx_destroy(&xprt->xp_lock); 284 svc_xprt_free(xprt); 285 mem_free(cd, sizeof(*cd)); 286 287 return (NULL); 288 } 289 290 /* 291 * Create a new transport for a backchannel on a clnt_vc socket. 292 */ 293 SVCXPRT * 294 svc_vc_create_backchannel(SVCPOOL *pool) 295 { 296 SVCXPRT *xprt = NULL; 297 struct cf_conn *cd = NULL; 298 299 cd = mem_alloc(sizeof(*cd)); 300 cd->strm_stat = XPRT_IDLE; 301 302 xprt = svc_xprt_alloc(); 303 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 304 xprt->xp_pool = pool; 305 xprt->xp_socket = NULL; 306 xprt->xp_p1 = cd; 307 xprt->xp_p2 = NULL; 308 xprt->xp_ops = &svc_vc_backchannel_ops; 309 return (xprt); 310 } 311 312 /* 313 * This does all of the accept except the final call to soaccept. The 314 * caller will call soaccept after dropping its locks (soaccept may 315 * call malloc). 316 */ 317 int 318 svc_vc_accept(struct socket *head, struct socket **sop) 319 { 320 struct socket *so; 321 int error = 0; 322 short nbio; 323 324 /* XXXGL: shouldn't that be an assertion? */ 325 if ((head->so_options & SO_ACCEPTCONN) == 0) { 326 error = EINVAL; 327 goto done; 328 } 329 #ifdef MAC 330 error = mac_socket_check_accept(curthread->td_ucred, head); 331 if (error != 0) 332 goto done; 333 #endif 334 /* 335 * XXXGL: we want non-blocking semantics. The socket could be a 336 * socket created by kernel as well as socket shared with userland, 337 * so we can't be sure about presense of SS_NBIO. We also shall not 338 * toggle it on the socket, since that may surprise userland. So we 339 * set SS_NBIO only temporarily. 340 */ 341 SOLISTEN_LOCK(head); 342 nbio = head->so_state & SS_NBIO; 343 head->so_state |= SS_NBIO; 344 error = solisten_dequeue(head, &so, 0); 345 head->so_state &= (nbio & ~SS_NBIO); 346 if (error) 347 goto done; 348 349 so->so_state |= nbio; 350 *sop = so; 351 352 /* connection has been removed from the listen queue */ 353 KNOTE_UNLOCKED(&head->so_rdsel.si_note, 0); 354 done: 355 return (error); 356 } 357 358 /*ARGSUSED*/ 359 static bool_t 360 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, 361 struct sockaddr **addrp, struct mbuf **mp) 362 { 363 struct socket *so = NULL; 364 struct sockaddr *sa = NULL; 365 int error; 366 SVCXPRT *new_xprt; 367 368 /* 369 * The socket upcall calls xprt_active() which will eventually 370 * cause the server to call us here. We attempt to accept a 371 * connection from the socket and turn it into a new 372 * transport. If the accept fails, we have drained all pending 373 * connections so we call xprt_inactive(). 374 */ 375 sx_xlock(&xprt->xp_lock); 376 377 error = svc_vc_accept(xprt->xp_socket, &so); 378 379 if (error == EWOULDBLOCK) { 380 /* 381 * We must re-test for new connections after taking 382 * the lock to protect us in the case where a new 383 * connection arrives after our call to accept fails 384 * with EWOULDBLOCK. 385 */ 386 SOLISTEN_LOCK(xprt->xp_socket); 387 if (TAILQ_EMPTY(&xprt->xp_socket->sol_comp)) 388 xprt_inactive_self(xprt); 389 SOLISTEN_UNLOCK(xprt->xp_socket); 390 sx_xunlock(&xprt->xp_lock); 391 return (FALSE); 392 } 393 394 if (error) { 395 SOLISTEN_LOCK(xprt->xp_socket); 396 if (xprt->xp_upcallset) { 397 xprt->xp_upcallset = 0; 398 soupcall_clear(xprt->xp_socket, SO_RCV); 399 } 400 SOLISTEN_UNLOCK(xprt->xp_socket); 401 xprt_inactive_self(xprt); 402 sx_xunlock(&xprt->xp_lock); 403 return (FALSE); 404 } 405 406 sx_xunlock(&xprt->xp_lock); 407 408 sa = NULL; 409 error = soaccept(so, &sa); 410 411 if (error) { 412 /* 413 * XXX not sure if I need to call sofree or soclose here. 414 */ 415 if (sa) 416 free(sa, M_SONAME); 417 return (FALSE); 418 } 419 420 /* 421 * svc_vc_create_conn will call xprt_register - we don't need 422 * to do anything with the new connection except derefence it. 423 */ 424 new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa); 425 if (!new_xprt) { 426 soclose(so); 427 } else { 428 SVC_RELEASE(new_xprt); 429 } 430 431 free(sa, M_SONAME); 432 433 return (FALSE); /* there is never an rpc msg to be processed */ 434 } 435 436 /*ARGSUSED*/ 437 static enum xprt_stat 438 svc_vc_rendezvous_stat(SVCXPRT *xprt) 439 { 440 441 return (XPRT_IDLE); 442 } 443 444 static void 445 svc_vc_destroy_common(SVCXPRT *xprt) 446 { 447 448 if (xprt->xp_socket) 449 (void)soclose(xprt->xp_socket); 450 451 if (xprt->xp_netid) 452 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1); 453 svc_xprt_free(xprt); 454 } 455 456 static void 457 svc_vc_rendezvous_destroy(SVCXPRT *xprt) 458 { 459 460 SOLISTEN_LOCK(xprt->xp_socket); 461 if (xprt->xp_upcallset) { 462 xprt->xp_upcallset = 0; 463 solisten_upcall_set(xprt->xp_socket, NULL, NULL); 464 } 465 SOLISTEN_UNLOCK(xprt->xp_socket); 466 467 svc_vc_destroy_common(xprt); 468 } 469 470 static void 471 svc_vc_destroy(SVCXPRT *xprt) 472 { 473 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 474 475 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 476 if (xprt->xp_upcallset) { 477 xprt->xp_upcallset = 0; 478 soupcall_clear(xprt->xp_socket, SO_RCV); 479 } 480 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 481 482 svc_vc_destroy_common(xprt); 483 484 if (cd->mreq) 485 m_freem(cd->mreq); 486 if (cd->mpending) 487 m_freem(cd->mpending); 488 mem_free(cd, sizeof(*cd)); 489 } 490 491 static void 492 svc_vc_backchannel_destroy(SVCXPRT *xprt) 493 { 494 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 495 struct mbuf *m, *m2; 496 497 svc_xprt_free(xprt); 498 m = cd->mreq; 499 while (m != NULL) { 500 m2 = m; 501 m = m->m_nextpkt; 502 m_freem(m2); 503 } 504 mem_free(cd, sizeof(*cd)); 505 } 506 507 /*ARGSUSED*/ 508 static bool_t 509 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in) 510 { 511 return (FALSE); 512 } 513 514 static bool_t 515 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in) 516 { 517 518 return (FALSE); 519 } 520 521 static bool_t 522 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in) 523 { 524 525 return (FALSE); 526 } 527 528 static enum xprt_stat 529 svc_vc_stat(SVCXPRT *xprt) 530 { 531 struct cf_conn *cd; 532 533 cd = (struct cf_conn *)(xprt->xp_p1); 534 535 if (cd->strm_stat == XPRT_DIED) 536 return (XPRT_DIED); 537 538 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) 539 return (XPRT_MOREREQS); 540 541 if (soreadable(xprt->xp_socket)) 542 return (XPRT_MOREREQS); 543 544 return (XPRT_IDLE); 545 } 546 547 static bool_t 548 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) 549 { 550 551 *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); 552 *ack -= sbused(&xprt->xp_socket->so_snd); 553 return (TRUE); 554 } 555 556 static enum xprt_stat 557 svc_vc_backchannel_stat(SVCXPRT *xprt) 558 { 559 struct cf_conn *cd; 560 561 cd = (struct cf_conn *)(xprt->xp_p1); 562 563 if (cd->mreq != NULL) 564 return (XPRT_MOREREQS); 565 566 return (XPRT_IDLE); 567 } 568 569 /* 570 * If we have an mbuf chain in cd->mpending, try to parse a record from it, 571 * leaving the result in cd->mreq. If we don't have a complete record, leave 572 * the partial result in cd->mreq and try to read more from the socket. 573 */ 574 static int 575 svc_vc_process_pending(SVCXPRT *xprt) 576 { 577 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 578 struct socket *so = xprt->xp_socket; 579 struct mbuf *m; 580 581 /* 582 * If cd->resid is non-zero, we have part of the 583 * record already, otherwise we are expecting a record 584 * marker. 585 */ 586 if (!cd->resid && cd->mpending) { 587 /* 588 * See if there is enough data buffered to 589 * make up a record marker. Make sure we can 590 * handle the case where the record marker is 591 * split across more than one mbuf. 592 */ 593 size_t n = 0; 594 uint32_t header; 595 596 m = cd->mpending; 597 while (n < sizeof(uint32_t) && m) { 598 n += m->m_len; 599 m = m->m_next; 600 } 601 if (n < sizeof(uint32_t)) { 602 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 603 return (FALSE); 604 } 605 m_copydata(cd->mpending, 0, sizeof(header), 606 (char *)&header); 607 header = ntohl(header); 608 cd->eor = (header & 0x80000000) != 0; 609 cd->resid = header & 0x7fffffff; 610 m_adj(cd->mpending, sizeof(uint32_t)); 611 } 612 613 /* 614 * Start pulling off mbufs from cd->mpending 615 * until we either have a complete record or 616 * we run out of data. We use m_split to pull 617 * data - it will pull as much as possible and 618 * split the last mbuf if necessary. 619 */ 620 while (cd->mpending && cd->resid) { 621 m = cd->mpending; 622 if (cd->mpending->m_next 623 || cd->mpending->m_len > cd->resid) 624 cd->mpending = m_split(cd->mpending, 625 cd->resid, M_WAITOK); 626 else 627 cd->mpending = NULL; 628 if (cd->mreq) 629 m_last(cd->mreq)->m_next = m; 630 else 631 cd->mreq = m; 632 while (m) { 633 cd->resid -= m->m_len; 634 m = m->m_next; 635 } 636 } 637 638 /* 639 * Block receive upcalls if we have more data pending, 640 * otherwise report our need. 641 */ 642 if (cd->mpending) 643 so->so_rcv.sb_lowat = INT_MAX; 644 else 645 so->so_rcv.sb_lowat = 646 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 647 return (TRUE); 648 } 649 650 static bool_t 651 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 652 struct sockaddr **addrp, struct mbuf **mp) 653 { 654 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 655 struct uio uio; 656 struct mbuf *m; 657 struct socket* so = xprt->xp_socket; 658 XDR xdrs; 659 int error, rcvflag; 660 uint32_t xid_plus_direction[2]; 661 662 /* 663 * Serialise access to the socket and our own record parsing 664 * state. 665 */ 666 sx_xlock(&xprt->xp_lock); 667 668 for (;;) { 669 /* If we have no request ready, check pending queue. */ 670 while (cd->mpending && 671 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 672 if (!svc_vc_process_pending(xprt)) 673 break; 674 } 675 676 /* Process and return complete request in cd->mreq. */ 677 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 678 679 /* 680 * Now, check for a backchannel reply. 681 * The XID is in the first uint32_t of the reply 682 * and the message direction is the second one. 683 */ 684 if ((cd->mreq->m_len >= sizeof(xid_plus_direction) || 685 m_length(cd->mreq, NULL) >= 686 sizeof(xid_plus_direction)) && 687 xprt->xp_p2 != NULL) { 688 m_copydata(cd->mreq, 0, 689 sizeof(xid_plus_direction), 690 (char *)xid_plus_direction); 691 xid_plus_direction[0] = 692 ntohl(xid_plus_direction[0]); 693 xid_plus_direction[1] = 694 ntohl(xid_plus_direction[1]); 695 /* Check message direction. */ 696 if (xid_plus_direction[1] == REPLY) { 697 clnt_bck_svccall(xprt->xp_p2, 698 cd->mreq, 699 xid_plus_direction[0]); 700 cd->mreq = NULL; 701 continue; 702 } 703 } 704 705 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 706 cd->mreq = NULL; 707 708 /* Check for next request in a pending queue. */ 709 svc_vc_process_pending(xprt); 710 if (cd->mreq == NULL || cd->resid != 0) { 711 SOCKBUF_LOCK(&so->so_rcv); 712 if (!soreadable(so)) 713 xprt_inactive_self(xprt); 714 SOCKBUF_UNLOCK(&so->so_rcv); 715 } 716 717 sx_xunlock(&xprt->xp_lock); 718 719 if (! xdr_callmsg(&xdrs, msg)) { 720 XDR_DESTROY(&xdrs); 721 return (FALSE); 722 } 723 724 *addrp = NULL; 725 *mp = xdrmbuf_getall(&xdrs); 726 XDR_DESTROY(&xdrs); 727 728 return (TRUE); 729 } 730 731 /* 732 * The socket upcall calls xprt_active() which will eventually 733 * cause the server to call us here. We attempt to 734 * read as much as possible from the socket and put 735 * the result in cd->mpending. If the read fails, 736 * we have drained both cd->mpending and the socket so 737 * we can call xprt_inactive(). 738 */ 739 uio.uio_resid = 1000000000; 740 uio.uio_td = curthread; 741 m = NULL; 742 rcvflag = MSG_DONTWAIT; 743 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 744 745 if (error == EWOULDBLOCK) { 746 /* 747 * We must re-test for readability after 748 * taking the lock to protect us in the case 749 * where a new packet arrives on the socket 750 * after our call to soreceive fails with 751 * EWOULDBLOCK. 752 */ 753 SOCKBUF_LOCK(&so->so_rcv); 754 if (!soreadable(so)) 755 xprt_inactive_self(xprt); 756 SOCKBUF_UNLOCK(&so->so_rcv); 757 sx_xunlock(&xprt->xp_lock); 758 return (FALSE); 759 } 760 761 if (error) { 762 SOCKBUF_LOCK(&so->so_rcv); 763 if (xprt->xp_upcallset) { 764 xprt->xp_upcallset = 0; 765 soupcall_clear(so, SO_RCV); 766 } 767 SOCKBUF_UNLOCK(&so->so_rcv); 768 xprt_inactive_self(xprt); 769 cd->strm_stat = XPRT_DIED; 770 sx_xunlock(&xprt->xp_lock); 771 return (FALSE); 772 } 773 774 if (!m) { 775 /* 776 * EOF - the other end has closed the socket. 777 */ 778 xprt_inactive_self(xprt); 779 cd->strm_stat = XPRT_DIED; 780 sx_xunlock(&xprt->xp_lock); 781 return (FALSE); 782 } 783 784 if (cd->mpending) 785 m_last(cd->mpending)->m_next = m; 786 else 787 cd->mpending = m; 788 } 789 } 790 791 static bool_t 792 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 793 struct sockaddr **addrp, struct mbuf **mp) 794 { 795 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 796 struct ct_data *ct; 797 struct mbuf *m; 798 XDR xdrs; 799 800 sx_xlock(&xprt->xp_lock); 801 ct = (struct ct_data *)xprt->xp_p2; 802 if (ct == NULL) { 803 sx_xunlock(&xprt->xp_lock); 804 return (FALSE); 805 } 806 mtx_lock(&ct->ct_lock); 807 m = cd->mreq; 808 if (m == NULL) { 809 xprt_inactive_self(xprt); 810 mtx_unlock(&ct->ct_lock); 811 sx_xunlock(&xprt->xp_lock); 812 return (FALSE); 813 } 814 cd->mreq = m->m_nextpkt; 815 mtx_unlock(&ct->ct_lock); 816 sx_xunlock(&xprt->xp_lock); 817 818 xdrmbuf_create(&xdrs, m, XDR_DECODE); 819 if (! xdr_callmsg(&xdrs, msg)) { 820 XDR_DESTROY(&xdrs); 821 return (FALSE); 822 } 823 *addrp = NULL; 824 *mp = xdrmbuf_getall(&xdrs); 825 XDR_DESTROY(&xdrs); 826 return (TRUE); 827 } 828 829 static bool_t 830 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 831 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 832 { 833 XDR xdrs; 834 struct mbuf *mrep; 835 bool_t stat = TRUE; 836 int error, len; 837 838 /* 839 * Leave space for record mark. 840 */ 841 mrep = m_gethdr(M_WAITOK, MT_DATA); 842 mrep->m_data += sizeof(uint32_t); 843 844 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 845 846 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 847 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 848 if (!xdr_replymsg(&xdrs, msg)) 849 stat = FALSE; 850 else 851 xdrmbuf_append(&xdrs, m); 852 } else { 853 stat = xdr_replymsg(&xdrs, msg); 854 } 855 856 if (stat) { 857 m_fixhdr(mrep); 858 859 /* 860 * Prepend a record marker containing the reply length. 861 */ 862 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 863 len = mrep->m_pkthdr.len; 864 *mtod(mrep, uint32_t *) = 865 htonl(0x80000000 | (len - sizeof(uint32_t))); 866 atomic_add_32(&xprt->xp_snd_cnt, len); 867 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 868 0, curthread); 869 if (!error) { 870 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 871 if (seq) 872 *seq = xprt->xp_snd_cnt; 873 stat = TRUE; 874 } else 875 atomic_subtract_32(&xprt->xp_snd_cnt, len); 876 } else { 877 m_freem(mrep); 878 } 879 880 XDR_DESTROY(&xdrs); 881 882 return (stat); 883 } 884 885 static bool_t 886 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 887 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 888 { 889 struct ct_data *ct; 890 XDR xdrs; 891 struct mbuf *mrep; 892 bool_t stat = TRUE; 893 int error; 894 895 /* 896 * Leave space for record mark. 897 */ 898 mrep = m_gethdr(M_WAITOK, MT_DATA); 899 mrep->m_data += sizeof(uint32_t); 900 901 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 902 903 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 904 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 905 if (!xdr_replymsg(&xdrs, msg)) 906 stat = FALSE; 907 else 908 xdrmbuf_append(&xdrs, m); 909 } else { 910 stat = xdr_replymsg(&xdrs, msg); 911 } 912 913 if (stat) { 914 m_fixhdr(mrep); 915 916 /* 917 * Prepend a record marker containing the reply length. 918 */ 919 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 920 *mtod(mrep, uint32_t *) = 921 htonl(0x80000000 | (mrep->m_pkthdr.len 922 - sizeof(uint32_t))); 923 sx_xlock(&xprt->xp_lock); 924 ct = (struct ct_data *)xprt->xp_p2; 925 if (ct != NULL) 926 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 927 0, curthread); 928 else 929 error = EPIPE; 930 sx_xunlock(&xprt->xp_lock); 931 if (!error) { 932 stat = TRUE; 933 } 934 } else { 935 m_freem(mrep); 936 } 937 938 XDR_DESTROY(&xdrs); 939 940 return (stat); 941 } 942 943 static bool_t 944 svc_vc_null() 945 { 946 947 return (FALSE); 948 } 949 950 static int 951 svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 952 { 953 SVCXPRT *xprt = (SVCXPRT *) arg; 954 955 if (soreadable(xprt->xp_socket)) 956 xprt_active(xprt); 957 return (SU_OK); 958 } 959 960 static int 961 svc_vc_rendezvous_soupcall(struct socket *head, void *arg, int waitflag) 962 { 963 SVCXPRT *xprt = (SVCXPRT *) arg; 964 965 if (!TAILQ_EMPTY(&head->sol_comp)) 966 xprt_active(xprt); 967 return (SU_OK); 968 } 969 970 #if 0 971 /* 972 * Get the effective UID of the sending process. Used by rpcbind, keyserv 973 * and rpc.yppasswdd on AF_LOCAL. 974 */ 975 int 976 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) { 977 int sock, ret; 978 gid_t egid; 979 uid_t euid; 980 struct sockaddr *sa; 981 982 sock = transp->xp_fd; 983 sa = (struct sockaddr *)transp->xp_rtaddr; 984 if (sa->sa_family == AF_LOCAL) { 985 ret = getpeereid(sock, &euid, &egid); 986 if (ret == 0) 987 *uid = euid; 988 return (ret); 989 } else 990 return (-1); 991 } 992 #endif 993