1 /* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc_vc.c, Server side for Connection Oriented based RPC. 40 * 41 * Actually implements two flavors of transporter - 42 * a tcp rendezvouser (a listner and connection establisher) 43 * and a record/tcp stream. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/lock.h> 48 #include <sys/kernel.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/proc.h> 53 #include <sys/protosw.h> 54 #include <sys/queue.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/sx.h> 58 #include <sys/systm.h> 59 #include <sys/uio.h> 60 61 #include <net/vnet.h> 62 63 #include <netinet/tcp.h> 64 65 #include <rpc/rpc.h> 66 67 #include <rpc/krpc.h> 68 #include <rpc/rpc_com.h> 69 70 #include <security/mac/mac_framework.h> 71 72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *, 73 struct sockaddr **, struct mbuf **); 74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *); 75 static void svc_vc_rendezvous_destroy(SVCXPRT *); 76 static bool_t svc_vc_null(void); 77 static void svc_vc_destroy(SVCXPRT *); 78 static enum xprt_stat svc_vc_stat(SVCXPRT *); 79 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); 80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, 81 struct sockaddr **, struct mbuf **); 82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, 83 struct sockaddr *, struct mbuf *, uint32_t *seq); 84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); 85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, 86 void *in); 87 static void svc_vc_backchannel_destroy(SVCXPRT *); 88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); 89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, 90 struct sockaddr **, struct mbuf **); 91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, 92 struct sockaddr *, struct mbuf *, uint32_t *); 93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, 94 void *in); 95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, 96 struct sockaddr *raddr); 97 static int svc_vc_accept(struct socket *head, struct socket **sop); 98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag); 99 100 static struct xp_ops svc_vc_rendezvous_ops = { 101 .xp_recv = svc_vc_rendezvous_recv, 102 .xp_stat = svc_vc_rendezvous_stat, 103 .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, 104 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, 105 .xp_destroy = svc_vc_rendezvous_destroy, 106 .xp_control = svc_vc_rendezvous_control 107 }; 108 109 static struct xp_ops svc_vc_ops = { 110 .xp_recv = svc_vc_recv, 111 .xp_stat = svc_vc_stat, 112 .xp_ack = svc_vc_ack, 113 .xp_reply = svc_vc_reply, 114 .xp_destroy = svc_vc_destroy, 115 .xp_control = svc_vc_control 116 }; 117 118 static struct xp_ops svc_vc_backchannel_ops = { 119 .xp_recv = svc_vc_backchannel_recv, 120 .xp_stat = svc_vc_backchannel_stat, 121 .xp_reply = svc_vc_backchannel_reply, 122 .xp_destroy = svc_vc_backchannel_destroy, 123 .xp_control = svc_vc_backchannel_control 124 }; 125 126 /* 127 * Usage: 128 * xprt = svc_vc_create(sock, send_buf_size, recv_buf_size); 129 * 130 * Creates, registers, and returns a (rpc) tcp based transporter. 131 * Once *xprt is initialized, it is registered as a transporter 132 * see (svc.h, xprt_register). This routine returns 133 * a NULL if a problem occurred. 134 * 135 * The filedescriptor passed in is expected to refer to a bound, but 136 * not yet connected socket. 137 * 138 * Since streams do buffered io similar to stdio, the caller can specify 139 * how big the send and receive buffers are via the second and third parms; 140 * 0 => use the system default. 141 */ 142 SVCXPRT * 143 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 144 size_t recvsize) 145 { 146 SVCXPRT *xprt; 147 struct sockaddr* sa; 148 int error; 149 150 SOCK_LOCK(so); 151 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 152 SOCK_UNLOCK(so); 153 CURVNET_SET(so->so_vnet); 154 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 155 CURVNET_RESTORE(); 156 if (error) 157 return (NULL); 158 xprt = svc_vc_create_conn(pool, so, sa); 159 free(sa, M_SONAME); 160 return (xprt); 161 } 162 SOCK_UNLOCK(so); 163 164 xprt = svc_xprt_alloc(); 165 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 166 xprt->xp_pool = pool; 167 xprt->xp_socket = so; 168 xprt->xp_p1 = NULL; 169 xprt->xp_p2 = NULL; 170 xprt->xp_ops = &svc_vc_rendezvous_ops; 171 172 CURVNET_SET(so->so_vnet); 173 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 174 CURVNET_RESTORE(); 175 if (error) { 176 goto cleanup_svc_vc_create; 177 } 178 179 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 180 free(sa, M_SONAME); 181 182 xprt_register(xprt); 183 184 solisten(so, -1, curthread); 185 186 SOCKBUF_LOCK(&so->so_rcv); 187 xprt->xp_upcallset = 1; 188 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 189 SOCKBUF_UNLOCK(&so->so_rcv); 190 191 return (xprt); 192 193 cleanup_svc_vc_create: 194 sx_destroy(&xprt->xp_lock); 195 svc_xprt_free(xprt); 196 197 return (NULL); 198 } 199 200 /* 201 * Create a new transport for a socket optained via soaccept(). 202 */ 203 SVCXPRT * 204 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) 205 { 206 SVCXPRT *xprt; 207 struct cf_conn *cd; 208 struct sockaddr* sa = NULL; 209 struct sockopt opt; 210 int one = 1; 211 int error; 212 213 bzero(&opt, sizeof(struct sockopt)); 214 opt.sopt_dir = SOPT_SET; 215 opt.sopt_level = SOL_SOCKET; 216 opt.sopt_name = SO_KEEPALIVE; 217 opt.sopt_val = &one; 218 opt.sopt_valsize = sizeof(one); 219 error = sosetopt(so, &opt); 220 if (error) { 221 return (NULL); 222 } 223 224 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 225 bzero(&opt, sizeof(struct sockopt)); 226 opt.sopt_dir = SOPT_SET; 227 opt.sopt_level = IPPROTO_TCP; 228 opt.sopt_name = TCP_NODELAY; 229 opt.sopt_val = &one; 230 opt.sopt_valsize = sizeof(one); 231 error = sosetopt(so, &opt); 232 if (error) { 233 return (NULL); 234 } 235 } 236 237 cd = mem_alloc(sizeof(*cd)); 238 cd->strm_stat = XPRT_IDLE; 239 240 xprt = svc_xprt_alloc(); 241 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 242 xprt->xp_pool = pool; 243 xprt->xp_socket = so; 244 xprt->xp_p1 = cd; 245 xprt->xp_p2 = NULL; 246 xprt->xp_ops = &svc_vc_ops; 247 248 /* 249 * See http://www.connectathon.org/talks96/nfstcp.pdf - client 250 * has a 5 minute timer, server has a 6 minute timer. 251 */ 252 xprt->xp_idletimeout = 6 * 60; 253 254 memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len); 255 256 CURVNET_SET(so->so_vnet); 257 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 258 CURVNET_RESTORE(); 259 if (error) 260 goto cleanup_svc_vc_create; 261 262 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 263 free(sa, M_SONAME); 264 265 xprt_register(xprt); 266 267 SOCKBUF_LOCK(&so->so_rcv); 268 xprt->xp_upcallset = 1; 269 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 270 SOCKBUF_UNLOCK(&so->so_rcv); 271 272 /* 273 * Throw the transport into the active list in case it already 274 * has some data buffered. 275 */ 276 sx_xlock(&xprt->xp_lock); 277 xprt_active(xprt); 278 sx_xunlock(&xprt->xp_lock); 279 280 return (xprt); 281 cleanup_svc_vc_create: 282 sx_destroy(&xprt->xp_lock); 283 svc_xprt_free(xprt); 284 mem_free(cd, sizeof(*cd)); 285 286 return (NULL); 287 } 288 289 /* 290 * Create a new transport for a backchannel on a clnt_vc socket. 291 */ 292 SVCXPRT * 293 svc_vc_create_backchannel(SVCPOOL *pool) 294 { 295 SVCXPRT *xprt = NULL; 296 struct cf_conn *cd = NULL; 297 298 cd = mem_alloc(sizeof(*cd)); 299 cd->strm_stat = XPRT_IDLE; 300 301 xprt = svc_xprt_alloc(); 302 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 303 xprt->xp_pool = pool; 304 xprt->xp_socket = NULL; 305 xprt->xp_p1 = cd; 306 xprt->xp_p2 = NULL; 307 xprt->xp_ops = &svc_vc_backchannel_ops; 308 return (xprt); 309 } 310 311 /* 312 * This does all of the accept except the final call to soaccept. The 313 * caller will call soaccept after dropping its locks (soaccept may 314 * call malloc). 315 */ 316 int 317 svc_vc_accept(struct socket *head, struct socket **sop) 318 { 319 int error = 0; 320 struct socket *so; 321 322 if ((head->so_options & SO_ACCEPTCONN) == 0) { 323 error = EINVAL; 324 goto done; 325 } 326 #ifdef MAC 327 error = mac_socket_check_accept(curthread->td_ucred, head); 328 if (error != 0) 329 goto done; 330 #endif 331 ACCEPT_LOCK(); 332 if (TAILQ_EMPTY(&head->so_comp)) { 333 ACCEPT_UNLOCK(); 334 error = EWOULDBLOCK; 335 goto done; 336 } 337 so = TAILQ_FIRST(&head->so_comp); 338 KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP")); 339 KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP")); 340 341 /* 342 * Before changing the flags on the socket, we have to bump the 343 * reference count. Otherwise, if the protocol calls sofree(), 344 * the socket will be released due to a zero refcount. 345 * XXX might not need soref() since this is simpler than kern_accept. 346 */ 347 SOCK_LOCK(so); /* soref() and so_state update */ 348 soref(so); /* file descriptor reference */ 349 350 TAILQ_REMOVE(&head->so_comp, so, so_list); 351 head->so_qlen--; 352 so->so_state |= (head->so_state & SS_NBIO); 353 so->so_qstate &= ~SQ_COMP; 354 so->so_head = NULL; 355 356 SOCK_UNLOCK(so); 357 ACCEPT_UNLOCK(); 358 359 *sop = so; 360 361 /* connection has been removed from the listen queue */ 362 KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0); 363 done: 364 return (error); 365 } 366 367 /*ARGSUSED*/ 368 static bool_t 369 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, 370 struct sockaddr **addrp, struct mbuf **mp) 371 { 372 struct socket *so = NULL; 373 struct sockaddr *sa = NULL; 374 int error; 375 SVCXPRT *new_xprt; 376 377 /* 378 * The socket upcall calls xprt_active() which will eventually 379 * cause the server to call us here. We attempt to accept a 380 * connection from the socket and turn it into a new 381 * transport. If the accept fails, we have drained all pending 382 * connections so we call xprt_inactive(). 383 */ 384 sx_xlock(&xprt->xp_lock); 385 386 error = svc_vc_accept(xprt->xp_socket, &so); 387 388 if (error == EWOULDBLOCK) { 389 /* 390 * We must re-test for new connections after taking 391 * the lock to protect us in the case where a new 392 * connection arrives after our call to accept fails 393 * with EWOULDBLOCK. 394 */ 395 ACCEPT_LOCK(); 396 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp)) 397 xprt_inactive_self(xprt); 398 ACCEPT_UNLOCK(); 399 sx_xunlock(&xprt->xp_lock); 400 return (FALSE); 401 } 402 403 if (error) { 404 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 405 if (xprt->xp_upcallset) { 406 xprt->xp_upcallset = 0; 407 soupcall_clear(xprt->xp_socket, SO_RCV); 408 } 409 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 410 xprt_inactive_self(xprt); 411 sx_xunlock(&xprt->xp_lock); 412 return (FALSE); 413 } 414 415 sx_xunlock(&xprt->xp_lock); 416 417 sa = NULL; 418 error = soaccept(so, &sa); 419 420 if (error) { 421 /* 422 * XXX not sure if I need to call sofree or soclose here. 423 */ 424 if (sa) 425 free(sa, M_SONAME); 426 return (FALSE); 427 } 428 429 /* 430 * svc_vc_create_conn will call xprt_register - we don't need 431 * to do anything with the new connection except derefence it. 432 */ 433 new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa); 434 if (!new_xprt) { 435 soclose(so); 436 } else { 437 SVC_RELEASE(new_xprt); 438 } 439 440 free(sa, M_SONAME); 441 442 return (FALSE); /* there is never an rpc msg to be processed */ 443 } 444 445 /*ARGSUSED*/ 446 static enum xprt_stat 447 svc_vc_rendezvous_stat(SVCXPRT *xprt) 448 { 449 450 return (XPRT_IDLE); 451 } 452 453 static void 454 svc_vc_destroy_common(SVCXPRT *xprt) 455 { 456 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 457 if (xprt->xp_upcallset) { 458 xprt->xp_upcallset = 0; 459 soupcall_clear(xprt->xp_socket, SO_RCV); 460 } 461 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 462 463 if (xprt->xp_socket) 464 (void)soclose(xprt->xp_socket); 465 466 if (xprt->xp_netid) 467 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1); 468 svc_xprt_free(xprt); 469 } 470 471 static void 472 svc_vc_rendezvous_destroy(SVCXPRT *xprt) 473 { 474 475 svc_vc_destroy_common(xprt); 476 } 477 478 static void 479 svc_vc_destroy(SVCXPRT *xprt) 480 { 481 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 482 483 svc_vc_destroy_common(xprt); 484 485 if (cd->mreq) 486 m_freem(cd->mreq); 487 if (cd->mpending) 488 m_freem(cd->mpending); 489 mem_free(cd, sizeof(*cd)); 490 } 491 492 static void 493 svc_vc_backchannel_destroy(SVCXPRT *xprt) 494 { 495 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 496 struct mbuf *m, *m2; 497 498 svc_xprt_free(xprt); 499 m = cd->mreq; 500 while (m != NULL) { 501 m2 = m; 502 m = m->m_nextpkt; 503 m_freem(m2); 504 } 505 mem_free(cd, sizeof(*cd)); 506 } 507 508 /*ARGSUSED*/ 509 static bool_t 510 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in) 511 { 512 return (FALSE); 513 } 514 515 static bool_t 516 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in) 517 { 518 519 return (FALSE); 520 } 521 522 static bool_t 523 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in) 524 { 525 526 return (FALSE); 527 } 528 529 static enum xprt_stat 530 svc_vc_stat(SVCXPRT *xprt) 531 { 532 struct cf_conn *cd; 533 534 cd = (struct cf_conn *)(xprt->xp_p1); 535 536 if (cd->strm_stat == XPRT_DIED) 537 return (XPRT_DIED); 538 539 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) 540 return (XPRT_MOREREQS); 541 542 if (soreadable(xprt->xp_socket)) 543 return (XPRT_MOREREQS); 544 545 return (XPRT_IDLE); 546 } 547 548 static bool_t 549 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) 550 { 551 552 *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); 553 *ack -= sbused(&xprt->xp_socket->so_snd); 554 return (TRUE); 555 } 556 557 static enum xprt_stat 558 svc_vc_backchannel_stat(SVCXPRT *xprt) 559 { 560 struct cf_conn *cd; 561 562 cd = (struct cf_conn *)(xprt->xp_p1); 563 564 if (cd->mreq != NULL) 565 return (XPRT_MOREREQS); 566 567 return (XPRT_IDLE); 568 } 569 570 /* 571 * If we have an mbuf chain in cd->mpending, try to parse a record from it, 572 * leaving the result in cd->mreq. If we don't have a complete record, leave 573 * the partial result in cd->mreq and try to read more from the socket. 574 */ 575 static int 576 svc_vc_process_pending(SVCXPRT *xprt) 577 { 578 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 579 struct socket *so = xprt->xp_socket; 580 struct mbuf *m; 581 582 /* 583 * If cd->resid is non-zero, we have part of the 584 * record already, otherwise we are expecting a record 585 * marker. 586 */ 587 if (!cd->resid && cd->mpending) { 588 /* 589 * See if there is enough data buffered to 590 * make up a record marker. Make sure we can 591 * handle the case where the record marker is 592 * split across more than one mbuf. 593 */ 594 size_t n = 0; 595 uint32_t header; 596 597 m = cd->mpending; 598 while (n < sizeof(uint32_t) && m) { 599 n += m->m_len; 600 m = m->m_next; 601 } 602 if (n < sizeof(uint32_t)) { 603 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 604 return (FALSE); 605 } 606 m_copydata(cd->mpending, 0, sizeof(header), 607 (char *)&header); 608 header = ntohl(header); 609 cd->eor = (header & 0x80000000) != 0; 610 cd->resid = header & 0x7fffffff; 611 m_adj(cd->mpending, sizeof(uint32_t)); 612 } 613 614 /* 615 * Start pulling off mbufs from cd->mpending 616 * until we either have a complete record or 617 * we run out of data. We use m_split to pull 618 * data - it will pull as much as possible and 619 * split the last mbuf if necessary. 620 */ 621 while (cd->mpending && cd->resid) { 622 m = cd->mpending; 623 if (cd->mpending->m_next 624 || cd->mpending->m_len > cd->resid) 625 cd->mpending = m_split(cd->mpending, 626 cd->resid, M_WAITOK); 627 else 628 cd->mpending = NULL; 629 if (cd->mreq) 630 m_last(cd->mreq)->m_next = m; 631 else 632 cd->mreq = m; 633 while (m) { 634 cd->resid -= m->m_len; 635 m = m->m_next; 636 } 637 } 638 639 /* 640 * Block receive upcalls if we have more data pending, 641 * otherwise report our need. 642 */ 643 if (cd->mpending) 644 so->so_rcv.sb_lowat = INT_MAX; 645 else 646 so->so_rcv.sb_lowat = 647 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 648 return (TRUE); 649 } 650 651 static bool_t 652 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 653 struct sockaddr **addrp, struct mbuf **mp) 654 { 655 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 656 struct uio uio; 657 struct mbuf *m; 658 struct socket* so = xprt->xp_socket; 659 XDR xdrs; 660 int error, rcvflag; 661 uint32_t xid_plus_direction[2]; 662 663 /* 664 * Serialise access to the socket and our own record parsing 665 * state. 666 */ 667 sx_xlock(&xprt->xp_lock); 668 669 for (;;) { 670 /* If we have no request ready, check pending queue. */ 671 while (cd->mpending && 672 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 673 if (!svc_vc_process_pending(xprt)) 674 break; 675 } 676 677 /* Process and return complete request in cd->mreq. */ 678 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 679 680 /* 681 * Now, check for a backchannel reply. 682 * The XID is in the first uint32_t of the reply 683 * and the message direction is the second one. 684 */ 685 if ((cd->mreq->m_len >= sizeof(xid_plus_direction) || 686 m_length(cd->mreq, NULL) >= 687 sizeof(xid_plus_direction)) && 688 xprt->xp_p2 != NULL) { 689 m_copydata(cd->mreq, 0, 690 sizeof(xid_plus_direction), 691 (char *)xid_plus_direction); 692 xid_plus_direction[0] = 693 ntohl(xid_plus_direction[0]); 694 xid_plus_direction[1] = 695 ntohl(xid_plus_direction[1]); 696 /* Check message direction. */ 697 if (xid_plus_direction[1] == REPLY) { 698 clnt_bck_svccall(xprt->xp_p2, 699 cd->mreq, 700 xid_plus_direction[0]); 701 cd->mreq = NULL; 702 continue; 703 } 704 } 705 706 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 707 cd->mreq = NULL; 708 709 /* Check for next request in a pending queue. */ 710 svc_vc_process_pending(xprt); 711 if (cd->mreq == NULL || cd->resid != 0) { 712 SOCKBUF_LOCK(&so->so_rcv); 713 if (!soreadable(so)) 714 xprt_inactive_self(xprt); 715 SOCKBUF_UNLOCK(&so->so_rcv); 716 } 717 718 sx_xunlock(&xprt->xp_lock); 719 720 if (! xdr_callmsg(&xdrs, msg)) { 721 XDR_DESTROY(&xdrs); 722 return (FALSE); 723 } 724 725 *addrp = NULL; 726 *mp = xdrmbuf_getall(&xdrs); 727 XDR_DESTROY(&xdrs); 728 729 return (TRUE); 730 } 731 732 /* 733 * The socket upcall calls xprt_active() which will eventually 734 * cause the server to call us here. We attempt to 735 * read as much as possible from the socket and put 736 * the result in cd->mpending. If the read fails, 737 * we have drained both cd->mpending and the socket so 738 * we can call xprt_inactive(). 739 */ 740 uio.uio_resid = 1000000000; 741 uio.uio_td = curthread; 742 m = NULL; 743 rcvflag = MSG_DONTWAIT; 744 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 745 746 if (error == EWOULDBLOCK) { 747 /* 748 * We must re-test for readability after 749 * taking the lock to protect us in the case 750 * where a new packet arrives on the socket 751 * after our call to soreceive fails with 752 * EWOULDBLOCK. 753 */ 754 SOCKBUF_LOCK(&so->so_rcv); 755 if (!soreadable(so)) 756 xprt_inactive_self(xprt); 757 SOCKBUF_UNLOCK(&so->so_rcv); 758 sx_xunlock(&xprt->xp_lock); 759 return (FALSE); 760 } 761 762 if (error) { 763 SOCKBUF_LOCK(&so->so_rcv); 764 if (xprt->xp_upcallset) { 765 xprt->xp_upcallset = 0; 766 soupcall_clear(so, SO_RCV); 767 } 768 SOCKBUF_UNLOCK(&so->so_rcv); 769 xprt_inactive_self(xprt); 770 cd->strm_stat = XPRT_DIED; 771 sx_xunlock(&xprt->xp_lock); 772 return (FALSE); 773 } 774 775 if (!m) { 776 /* 777 * EOF - the other end has closed the socket. 778 */ 779 xprt_inactive_self(xprt); 780 cd->strm_stat = XPRT_DIED; 781 sx_xunlock(&xprt->xp_lock); 782 return (FALSE); 783 } 784 785 if (cd->mpending) 786 m_last(cd->mpending)->m_next = m; 787 else 788 cd->mpending = m; 789 } 790 } 791 792 static bool_t 793 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 794 struct sockaddr **addrp, struct mbuf **mp) 795 { 796 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 797 struct ct_data *ct; 798 struct mbuf *m; 799 XDR xdrs; 800 801 sx_xlock(&xprt->xp_lock); 802 ct = (struct ct_data *)xprt->xp_p2; 803 if (ct == NULL) { 804 sx_xunlock(&xprt->xp_lock); 805 return (FALSE); 806 } 807 mtx_lock(&ct->ct_lock); 808 m = cd->mreq; 809 if (m == NULL) { 810 xprt_inactive_self(xprt); 811 mtx_unlock(&ct->ct_lock); 812 sx_xunlock(&xprt->xp_lock); 813 return (FALSE); 814 } 815 cd->mreq = m->m_nextpkt; 816 mtx_unlock(&ct->ct_lock); 817 sx_xunlock(&xprt->xp_lock); 818 819 xdrmbuf_create(&xdrs, m, XDR_DECODE); 820 if (! xdr_callmsg(&xdrs, msg)) { 821 XDR_DESTROY(&xdrs); 822 return (FALSE); 823 } 824 *addrp = NULL; 825 *mp = xdrmbuf_getall(&xdrs); 826 XDR_DESTROY(&xdrs); 827 return (TRUE); 828 } 829 830 static bool_t 831 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 832 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 833 { 834 XDR xdrs; 835 struct mbuf *mrep; 836 bool_t stat = TRUE; 837 int error, len; 838 839 /* 840 * Leave space for record mark. 841 */ 842 mrep = m_gethdr(M_WAITOK, MT_DATA); 843 mrep->m_data += sizeof(uint32_t); 844 845 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 846 847 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 848 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 849 if (!xdr_replymsg(&xdrs, msg)) 850 stat = FALSE; 851 else 852 xdrmbuf_append(&xdrs, m); 853 } else { 854 stat = xdr_replymsg(&xdrs, msg); 855 } 856 857 if (stat) { 858 m_fixhdr(mrep); 859 860 /* 861 * Prepend a record marker containing the reply length. 862 */ 863 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 864 len = mrep->m_pkthdr.len; 865 *mtod(mrep, uint32_t *) = 866 htonl(0x80000000 | (len - sizeof(uint32_t))); 867 atomic_add_32(&xprt->xp_snd_cnt, len); 868 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 869 0, curthread); 870 if (!error) { 871 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 872 if (seq) 873 *seq = xprt->xp_snd_cnt; 874 stat = TRUE; 875 } else 876 atomic_subtract_32(&xprt->xp_snd_cnt, len); 877 } else { 878 m_freem(mrep); 879 } 880 881 XDR_DESTROY(&xdrs); 882 883 return (stat); 884 } 885 886 static bool_t 887 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 888 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 889 { 890 struct ct_data *ct; 891 XDR xdrs; 892 struct mbuf *mrep; 893 bool_t stat = TRUE; 894 int error; 895 896 /* 897 * Leave space for record mark. 898 */ 899 mrep = m_gethdr(M_WAITOK, MT_DATA); 900 mrep->m_data += sizeof(uint32_t); 901 902 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 903 904 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 905 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 906 if (!xdr_replymsg(&xdrs, msg)) 907 stat = FALSE; 908 else 909 xdrmbuf_append(&xdrs, m); 910 } else { 911 stat = xdr_replymsg(&xdrs, msg); 912 } 913 914 if (stat) { 915 m_fixhdr(mrep); 916 917 /* 918 * Prepend a record marker containing the reply length. 919 */ 920 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 921 *mtod(mrep, uint32_t *) = 922 htonl(0x80000000 | (mrep->m_pkthdr.len 923 - sizeof(uint32_t))); 924 sx_xlock(&xprt->xp_lock); 925 ct = (struct ct_data *)xprt->xp_p2; 926 if (ct != NULL) 927 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 928 0, curthread); 929 else 930 error = EPIPE; 931 sx_xunlock(&xprt->xp_lock); 932 if (!error) { 933 stat = TRUE; 934 } 935 } else { 936 m_freem(mrep); 937 } 938 939 XDR_DESTROY(&xdrs); 940 941 return (stat); 942 } 943 944 static bool_t 945 svc_vc_null() 946 { 947 948 return (FALSE); 949 } 950 951 static int 952 svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 953 { 954 SVCXPRT *xprt = (SVCXPRT *) arg; 955 956 if (soreadable(xprt->xp_socket)) 957 xprt_active(xprt); 958 return (SU_OK); 959 } 960 961 #if 0 962 /* 963 * Get the effective UID of the sending process. Used by rpcbind, keyserv 964 * and rpc.yppasswdd on AF_LOCAL. 965 */ 966 int 967 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) { 968 int sock, ret; 969 gid_t egid; 970 uid_t euid; 971 struct sockaddr *sa; 972 973 sock = transp->xp_fd; 974 sa = (struct sockaddr *)transp->xp_rtaddr; 975 if (sa->sa_family == AF_LOCAL) { 976 ret = getpeereid(sock, &euid, &egid); 977 if (ret == 0) 978 *uid = euid; 979 return (ret); 980 } else 981 return (-1); 982 } 983 #endif 984