1 /* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc_vc.c, Server side for Connection Oriented based RPC. 40 * 41 * Actually implements two flavors of transporter - 42 * a tcp rendezvouser (a listner and connection establisher) 43 * and a record/tcp stream. 44 */ 45 46 #include <sys/param.h> 47 #include <sys/lock.h> 48 #include <sys/kernel.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/proc.h> 53 #include <sys/protosw.h> 54 #include <sys/queue.h> 55 #include <sys/socket.h> 56 #include <sys/socketvar.h> 57 #include <sys/sx.h> 58 #include <sys/systm.h> 59 #include <sys/uio.h> 60 61 #include <net/vnet.h> 62 63 #include <netinet/tcp.h> 64 65 #include <rpc/rpc.h> 66 67 #include <rpc/krpc.h> 68 #include <rpc/rpc_com.h> 69 70 #include <security/mac/mac_framework.h> 71 72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *, 73 struct sockaddr **, struct mbuf **); 74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *); 75 static void svc_vc_rendezvous_destroy(SVCXPRT *); 76 static bool_t svc_vc_null(void); 77 static void svc_vc_destroy(SVCXPRT *); 78 static enum xprt_stat svc_vc_stat(SVCXPRT *); 79 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); 80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, 81 struct sockaddr **, struct mbuf **); 82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, 83 struct sockaddr *, struct mbuf *, uint32_t *seq); 84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); 85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, 86 void *in); 87 static void svc_vc_backchannel_destroy(SVCXPRT *); 88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); 89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, 90 struct sockaddr **, struct mbuf **); 91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, 92 struct sockaddr *, struct mbuf *, uint32_t *); 93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, 94 void *in); 95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, 96 struct sockaddr *raddr); 97 static int svc_vc_accept(struct socket *head, struct socket **sop); 98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag); 99 100 static struct xp_ops svc_vc_rendezvous_ops = { 101 .xp_recv = svc_vc_rendezvous_recv, 102 .xp_stat = svc_vc_rendezvous_stat, 103 .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, 104 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, 105 .xp_destroy = svc_vc_rendezvous_destroy, 106 .xp_control = svc_vc_rendezvous_control 107 }; 108 109 static struct xp_ops svc_vc_ops = { 110 .xp_recv = svc_vc_recv, 111 .xp_stat = svc_vc_stat, 112 .xp_ack = svc_vc_ack, 113 .xp_reply = svc_vc_reply, 114 .xp_destroy = svc_vc_destroy, 115 .xp_control = svc_vc_control 116 }; 117 118 static struct xp_ops svc_vc_backchannel_ops = { 119 .xp_recv = svc_vc_backchannel_recv, 120 .xp_stat = svc_vc_backchannel_stat, 121 .xp_reply = svc_vc_backchannel_reply, 122 .xp_destroy = svc_vc_backchannel_destroy, 123 .xp_control = svc_vc_backchannel_control 124 }; 125 126 /* 127 * Usage: 128 * xprt = svc_vc_create(sock, send_buf_size, recv_buf_size); 129 * 130 * Creates, registers, and returns a (rpc) tcp based transporter. 131 * Once *xprt is initialized, it is registered as a transporter 132 * see (svc.h, xprt_register). This routine returns 133 * a NULL if a problem occurred. 134 * 135 * The filedescriptor passed in is expected to refer to a bound, but 136 * not yet connected socket. 137 * 138 * Since streams do buffered io similar to stdio, the caller can specify 139 * how big the send and receive buffers are via the second and third parms; 140 * 0 => use the system default. 141 */ 142 SVCXPRT * 143 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 144 size_t recvsize) 145 { 146 SVCXPRT *xprt = NULL; 147 struct sockaddr* sa; 148 int error; 149 150 SOCK_LOCK(so); 151 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 152 SOCK_UNLOCK(so); 153 CURVNET_SET(so->so_vnet); 154 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 155 CURVNET_RESTORE(); 156 if (error) 157 return (NULL); 158 xprt = svc_vc_create_conn(pool, so, sa); 159 free(sa, M_SONAME); 160 return (xprt); 161 } 162 SOCK_UNLOCK(so); 163 164 xprt = svc_xprt_alloc(); 165 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 166 xprt->xp_pool = pool; 167 xprt->xp_socket = so; 168 xprt->xp_p1 = NULL; 169 xprt->xp_p2 = NULL; 170 xprt->xp_ops = &svc_vc_rendezvous_ops; 171 172 CURVNET_SET(so->so_vnet); 173 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 174 CURVNET_RESTORE(); 175 if (error) { 176 goto cleanup_svc_vc_create; 177 } 178 179 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 180 free(sa, M_SONAME); 181 182 xprt_register(xprt); 183 184 solisten(so, -1, curthread); 185 186 SOCKBUF_LOCK(&so->so_rcv); 187 xprt->xp_upcallset = 1; 188 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 189 SOCKBUF_UNLOCK(&so->so_rcv); 190 191 return (xprt); 192 cleanup_svc_vc_create: 193 if (xprt) { 194 sx_destroy(&xprt->xp_lock); 195 svc_xprt_free(xprt); 196 } 197 return (NULL); 198 } 199 200 /* 201 * Create a new transport for a socket optained via soaccept(). 202 */ 203 SVCXPRT * 204 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) 205 { 206 SVCXPRT *xprt = NULL; 207 struct cf_conn *cd = NULL; 208 struct sockaddr* sa = NULL; 209 struct sockopt opt; 210 int one = 1; 211 int error; 212 213 bzero(&opt, sizeof(struct sockopt)); 214 opt.sopt_dir = SOPT_SET; 215 opt.sopt_level = SOL_SOCKET; 216 opt.sopt_name = SO_KEEPALIVE; 217 opt.sopt_val = &one; 218 opt.sopt_valsize = sizeof(one); 219 error = sosetopt(so, &opt); 220 if (error) { 221 return (NULL); 222 } 223 224 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 225 bzero(&opt, sizeof(struct sockopt)); 226 opt.sopt_dir = SOPT_SET; 227 opt.sopt_level = IPPROTO_TCP; 228 opt.sopt_name = TCP_NODELAY; 229 opt.sopt_val = &one; 230 opt.sopt_valsize = sizeof(one); 231 error = sosetopt(so, &opt); 232 if (error) { 233 return (NULL); 234 } 235 } 236 237 cd = mem_alloc(sizeof(*cd)); 238 cd->strm_stat = XPRT_IDLE; 239 240 xprt = svc_xprt_alloc(); 241 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 242 xprt->xp_pool = pool; 243 xprt->xp_socket = so; 244 xprt->xp_p1 = cd; 245 xprt->xp_p2 = NULL; 246 xprt->xp_ops = &svc_vc_ops; 247 248 /* 249 * See http://www.connectathon.org/talks96/nfstcp.pdf - client 250 * has a 5 minute timer, server has a 6 minute timer. 251 */ 252 xprt->xp_idletimeout = 6 * 60; 253 254 memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len); 255 256 CURVNET_SET(so->so_vnet); 257 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 258 CURVNET_RESTORE(); 259 if (error) 260 goto cleanup_svc_vc_create; 261 262 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 263 free(sa, M_SONAME); 264 265 xprt_register(xprt); 266 267 SOCKBUF_LOCK(&so->so_rcv); 268 xprt->xp_upcallset = 1; 269 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 270 SOCKBUF_UNLOCK(&so->so_rcv); 271 272 /* 273 * Throw the transport into the active list in case it already 274 * has some data buffered. 275 */ 276 sx_xlock(&xprt->xp_lock); 277 xprt_active(xprt); 278 sx_xunlock(&xprt->xp_lock); 279 280 return (xprt); 281 cleanup_svc_vc_create: 282 if (xprt) { 283 sx_destroy(&xprt->xp_lock); 284 svc_xprt_free(xprt); 285 } 286 if (cd) 287 mem_free(cd, sizeof(*cd)); 288 return (NULL); 289 } 290 291 /* 292 * Create a new transport for a backchannel on a clnt_vc socket. 293 */ 294 SVCXPRT * 295 svc_vc_create_backchannel(SVCPOOL *pool) 296 { 297 SVCXPRT *xprt = NULL; 298 struct cf_conn *cd = NULL; 299 300 cd = mem_alloc(sizeof(*cd)); 301 cd->strm_stat = XPRT_IDLE; 302 303 xprt = svc_xprt_alloc(); 304 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 305 xprt->xp_pool = pool; 306 xprt->xp_socket = NULL; 307 xprt->xp_p1 = cd; 308 xprt->xp_p2 = NULL; 309 xprt->xp_ops = &svc_vc_backchannel_ops; 310 return (xprt); 311 } 312 313 /* 314 * This does all of the accept except the final call to soaccept. The 315 * caller will call soaccept after dropping its locks (soaccept may 316 * call malloc). 317 */ 318 int 319 svc_vc_accept(struct socket *head, struct socket **sop) 320 { 321 int error = 0; 322 struct socket *so; 323 324 if ((head->so_options & SO_ACCEPTCONN) == 0) { 325 error = EINVAL; 326 goto done; 327 } 328 #ifdef MAC 329 error = mac_socket_check_accept(curthread->td_ucred, head); 330 if (error != 0) 331 goto done; 332 #endif 333 ACCEPT_LOCK(); 334 if (TAILQ_EMPTY(&head->so_comp)) { 335 ACCEPT_UNLOCK(); 336 error = EWOULDBLOCK; 337 goto done; 338 } 339 so = TAILQ_FIRST(&head->so_comp); 340 KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP")); 341 KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP")); 342 343 /* 344 * Before changing the flags on the socket, we have to bump the 345 * reference count. Otherwise, if the protocol calls sofree(), 346 * the socket will be released due to a zero refcount. 347 * XXX might not need soref() since this is simpler than kern_accept. 348 */ 349 SOCK_LOCK(so); /* soref() and so_state update */ 350 soref(so); /* file descriptor reference */ 351 352 TAILQ_REMOVE(&head->so_comp, so, so_list); 353 head->so_qlen--; 354 so->so_state |= (head->so_state & SS_NBIO); 355 so->so_qstate &= ~SQ_COMP; 356 so->so_head = NULL; 357 358 SOCK_UNLOCK(so); 359 ACCEPT_UNLOCK(); 360 361 *sop = so; 362 363 /* connection has been removed from the listen queue */ 364 KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0); 365 done: 366 return (error); 367 } 368 369 /*ARGSUSED*/ 370 static bool_t 371 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, 372 struct sockaddr **addrp, struct mbuf **mp) 373 { 374 struct socket *so = NULL; 375 struct sockaddr *sa = NULL; 376 int error; 377 SVCXPRT *new_xprt; 378 379 /* 380 * The socket upcall calls xprt_active() which will eventually 381 * cause the server to call us here. We attempt to accept a 382 * connection from the socket and turn it into a new 383 * transport. If the accept fails, we have drained all pending 384 * connections so we call xprt_inactive(). 385 */ 386 sx_xlock(&xprt->xp_lock); 387 388 error = svc_vc_accept(xprt->xp_socket, &so); 389 390 if (error == EWOULDBLOCK) { 391 /* 392 * We must re-test for new connections after taking 393 * the lock to protect us in the case where a new 394 * connection arrives after our call to accept fails 395 * with EWOULDBLOCK. 396 */ 397 ACCEPT_LOCK(); 398 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp)) 399 xprt_inactive_self(xprt); 400 ACCEPT_UNLOCK(); 401 sx_xunlock(&xprt->xp_lock); 402 return (FALSE); 403 } 404 405 if (error) { 406 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 407 if (xprt->xp_upcallset) { 408 xprt->xp_upcallset = 0; 409 soupcall_clear(xprt->xp_socket, SO_RCV); 410 } 411 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 412 xprt_inactive_self(xprt); 413 sx_xunlock(&xprt->xp_lock); 414 return (FALSE); 415 } 416 417 sx_xunlock(&xprt->xp_lock); 418 419 sa = NULL; 420 error = soaccept(so, &sa); 421 422 if (error) { 423 /* 424 * XXX not sure if I need to call sofree or soclose here. 425 */ 426 if (sa) 427 free(sa, M_SONAME); 428 return (FALSE); 429 } 430 431 /* 432 * svc_vc_create_conn will call xprt_register - we don't need 433 * to do anything with the new connection except derefence it. 434 */ 435 new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa); 436 if (!new_xprt) { 437 soclose(so); 438 } else { 439 SVC_RELEASE(new_xprt); 440 } 441 442 free(sa, M_SONAME); 443 444 return (FALSE); /* there is never an rpc msg to be processed */ 445 } 446 447 /*ARGSUSED*/ 448 static enum xprt_stat 449 svc_vc_rendezvous_stat(SVCXPRT *xprt) 450 { 451 452 return (XPRT_IDLE); 453 } 454 455 static void 456 svc_vc_destroy_common(SVCXPRT *xprt) 457 { 458 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 459 if (xprt->xp_upcallset) { 460 xprt->xp_upcallset = 0; 461 soupcall_clear(xprt->xp_socket, SO_RCV); 462 } 463 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 464 465 if (xprt->xp_socket) 466 (void)soclose(xprt->xp_socket); 467 468 if (xprt->xp_netid) 469 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1); 470 svc_xprt_free(xprt); 471 } 472 473 static void 474 svc_vc_rendezvous_destroy(SVCXPRT *xprt) 475 { 476 477 svc_vc_destroy_common(xprt); 478 } 479 480 static void 481 svc_vc_destroy(SVCXPRT *xprt) 482 { 483 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 484 485 svc_vc_destroy_common(xprt); 486 487 if (cd->mreq) 488 m_freem(cd->mreq); 489 if (cd->mpending) 490 m_freem(cd->mpending); 491 mem_free(cd, sizeof(*cd)); 492 } 493 494 static void 495 svc_vc_backchannel_destroy(SVCXPRT *xprt) 496 { 497 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 498 struct mbuf *m, *m2; 499 500 svc_xprt_free(xprt); 501 m = cd->mreq; 502 while (m != NULL) { 503 m2 = m; 504 m = m->m_nextpkt; 505 m_freem(m2); 506 } 507 mem_free(cd, sizeof(*cd)); 508 } 509 510 /*ARGSUSED*/ 511 static bool_t 512 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in) 513 { 514 return (FALSE); 515 } 516 517 static bool_t 518 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in) 519 { 520 521 return (FALSE); 522 } 523 524 static bool_t 525 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in) 526 { 527 528 return (FALSE); 529 } 530 531 static enum xprt_stat 532 svc_vc_stat(SVCXPRT *xprt) 533 { 534 struct cf_conn *cd; 535 536 cd = (struct cf_conn *)(xprt->xp_p1); 537 538 if (cd->strm_stat == XPRT_DIED) 539 return (XPRT_DIED); 540 541 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) 542 return (XPRT_MOREREQS); 543 544 if (soreadable(xprt->xp_socket)) 545 return (XPRT_MOREREQS); 546 547 return (XPRT_IDLE); 548 } 549 550 static bool_t 551 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) 552 { 553 554 *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); 555 *ack -= sbused(&xprt->xp_socket->so_snd); 556 return (TRUE); 557 } 558 559 static enum xprt_stat 560 svc_vc_backchannel_stat(SVCXPRT *xprt) 561 { 562 struct cf_conn *cd; 563 564 cd = (struct cf_conn *)(xprt->xp_p1); 565 566 if (cd->mreq != NULL) 567 return (XPRT_MOREREQS); 568 569 return (XPRT_IDLE); 570 } 571 572 /* 573 * If we have an mbuf chain in cd->mpending, try to parse a record from it, 574 * leaving the result in cd->mreq. If we don't have a complete record, leave 575 * the partial result in cd->mreq and try to read more from the socket. 576 */ 577 static int 578 svc_vc_process_pending(SVCXPRT *xprt) 579 { 580 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 581 struct socket *so = xprt->xp_socket; 582 struct mbuf *m; 583 584 /* 585 * If cd->resid is non-zero, we have part of the 586 * record already, otherwise we are expecting a record 587 * marker. 588 */ 589 if (!cd->resid && cd->mpending) { 590 /* 591 * See if there is enough data buffered to 592 * make up a record marker. Make sure we can 593 * handle the case where the record marker is 594 * split across more than one mbuf. 595 */ 596 size_t n = 0; 597 uint32_t header; 598 599 m = cd->mpending; 600 while (n < sizeof(uint32_t) && m) { 601 n += m->m_len; 602 m = m->m_next; 603 } 604 if (n < sizeof(uint32_t)) { 605 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 606 return (FALSE); 607 } 608 m_copydata(cd->mpending, 0, sizeof(header), 609 (char *)&header); 610 header = ntohl(header); 611 cd->eor = (header & 0x80000000) != 0; 612 cd->resid = header & 0x7fffffff; 613 m_adj(cd->mpending, sizeof(uint32_t)); 614 } 615 616 /* 617 * Start pulling off mbufs from cd->mpending 618 * until we either have a complete record or 619 * we run out of data. We use m_split to pull 620 * data - it will pull as much as possible and 621 * split the last mbuf if necessary. 622 */ 623 while (cd->mpending && cd->resid) { 624 m = cd->mpending; 625 if (cd->mpending->m_next 626 || cd->mpending->m_len > cd->resid) 627 cd->mpending = m_split(cd->mpending, 628 cd->resid, M_WAITOK); 629 else 630 cd->mpending = NULL; 631 if (cd->mreq) 632 m_last(cd->mreq)->m_next = m; 633 else 634 cd->mreq = m; 635 while (m) { 636 cd->resid -= m->m_len; 637 m = m->m_next; 638 } 639 } 640 641 /* 642 * Block receive upcalls if we have more data pending, 643 * otherwise report our need. 644 */ 645 if (cd->mpending) 646 so->so_rcv.sb_lowat = INT_MAX; 647 else 648 so->so_rcv.sb_lowat = 649 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 650 return (TRUE); 651 } 652 653 static bool_t 654 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 655 struct sockaddr **addrp, struct mbuf **mp) 656 { 657 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 658 struct uio uio; 659 struct mbuf *m; 660 struct socket* so = xprt->xp_socket; 661 XDR xdrs; 662 int error, rcvflag; 663 uint32_t xid_plus_direction[2]; 664 665 /* 666 * Serialise access to the socket and our own record parsing 667 * state. 668 */ 669 sx_xlock(&xprt->xp_lock); 670 671 for (;;) { 672 /* If we have no request ready, check pending queue. */ 673 while (cd->mpending && 674 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 675 if (!svc_vc_process_pending(xprt)) 676 break; 677 } 678 679 /* Process and return complete request in cd->mreq. */ 680 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 681 682 /* 683 * Now, check for a backchannel reply. 684 * The XID is in the first uint32_t of the reply 685 * and the message direction is the second one. 686 */ 687 if ((cd->mreq->m_len >= sizeof(xid_plus_direction) || 688 m_length(cd->mreq, NULL) >= 689 sizeof(xid_plus_direction)) && 690 xprt->xp_p2 != NULL) { 691 m_copydata(cd->mreq, 0, 692 sizeof(xid_plus_direction), 693 (char *)xid_plus_direction); 694 xid_plus_direction[0] = 695 ntohl(xid_plus_direction[0]); 696 xid_plus_direction[1] = 697 ntohl(xid_plus_direction[1]); 698 /* Check message direction. */ 699 if (xid_plus_direction[1] == REPLY) { 700 clnt_bck_svccall(xprt->xp_p2, 701 cd->mreq, 702 xid_plus_direction[0]); 703 cd->mreq = NULL; 704 continue; 705 } 706 } 707 708 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 709 cd->mreq = NULL; 710 711 /* Check for next request in a pending queue. */ 712 svc_vc_process_pending(xprt); 713 if (cd->mreq == NULL || cd->resid != 0) { 714 SOCKBUF_LOCK(&so->so_rcv); 715 if (!soreadable(so)) 716 xprt_inactive_self(xprt); 717 SOCKBUF_UNLOCK(&so->so_rcv); 718 } 719 720 sx_xunlock(&xprt->xp_lock); 721 722 if (! xdr_callmsg(&xdrs, msg)) { 723 XDR_DESTROY(&xdrs); 724 return (FALSE); 725 } 726 727 *addrp = NULL; 728 *mp = xdrmbuf_getall(&xdrs); 729 XDR_DESTROY(&xdrs); 730 731 return (TRUE); 732 } 733 734 /* 735 * The socket upcall calls xprt_active() which will eventually 736 * cause the server to call us here. We attempt to 737 * read as much as possible from the socket and put 738 * the result in cd->mpending. If the read fails, 739 * we have drained both cd->mpending and the socket so 740 * we can call xprt_inactive(). 741 */ 742 uio.uio_resid = 1000000000; 743 uio.uio_td = curthread; 744 m = NULL; 745 rcvflag = MSG_DONTWAIT; 746 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 747 748 if (error == EWOULDBLOCK) { 749 /* 750 * We must re-test for readability after 751 * taking the lock to protect us in the case 752 * where a new packet arrives on the socket 753 * after our call to soreceive fails with 754 * EWOULDBLOCK. 755 */ 756 SOCKBUF_LOCK(&so->so_rcv); 757 if (!soreadable(so)) 758 xprt_inactive_self(xprt); 759 SOCKBUF_UNLOCK(&so->so_rcv); 760 sx_xunlock(&xprt->xp_lock); 761 return (FALSE); 762 } 763 764 if (error) { 765 SOCKBUF_LOCK(&so->so_rcv); 766 if (xprt->xp_upcallset) { 767 xprt->xp_upcallset = 0; 768 soupcall_clear(so, SO_RCV); 769 } 770 SOCKBUF_UNLOCK(&so->so_rcv); 771 xprt_inactive_self(xprt); 772 cd->strm_stat = XPRT_DIED; 773 sx_xunlock(&xprt->xp_lock); 774 return (FALSE); 775 } 776 777 if (!m) { 778 /* 779 * EOF - the other end has closed the socket. 780 */ 781 xprt_inactive_self(xprt); 782 cd->strm_stat = XPRT_DIED; 783 sx_xunlock(&xprt->xp_lock); 784 return (FALSE); 785 } 786 787 if (cd->mpending) 788 m_last(cd->mpending)->m_next = m; 789 else 790 cd->mpending = m; 791 } 792 } 793 794 static bool_t 795 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 796 struct sockaddr **addrp, struct mbuf **mp) 797 { 798 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 799 struct ct_data *ct; 800 struct mbuf *m; 801 XDR xdrs; 802 803 sx_xlock(&xprt->xp_lock); 804 ct = (struct ct_data *)xprt->xp_p2; 805 if (ct == NULL) { 806 sx_xunlock(&xprt->xp_lock); 807 return (FALSE); 808 } 809 mtx_lock(&ct->ct_lock); 810 m = cd->mreq; 811 if (m == NULL) { 812 xprt_inactive_self(xprt); 813 mtx_unlock(&ct->ct_lock); 814 sx_xunlock(&xprt->xp_lock); 815 return (FALSE); 816 } 817 cd->mreq = m->m_nextpkt; 818 mtx_unlock(&ct->ct_lock); 819 sx_xunlock(&xprt->xp_lock); 820 821 xdrmbuf_create(&xdrs, m, XDR_DECODE); 822 if (! xdr_callmsg(&xdrs, msg)) { 823 XDR_DESTROY(&xdrs); 824 return (FALSE); 825 } 826 *addrp = NULL; 827 *mp = xdrmbuf_getall(&xdrs); 828 XDR_DESTROY(&xdrs); 829 return (TRUE); 830 } 831 832 static bool_t 833 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 834 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 835 { 836 XDR xdrs; 837 struct mbuf *mrep; 838 bool_t stat = TRUE; 839 int error, len; 840 841 /* 842 * Leave space for record mark. 843 */ 844 mrep = m_gethdr(M_WAITOK, MT_DATA); 845 mrep->m_data += sizeof(uint32_t); 846 847 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 848 849 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 850 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 851 if (!xdr_replymsg(&xdrs, msg)) 852 stat = FALSE; 853 else 854 xdrmbuf_append(&xdrs, m); 855 } else { 856 stat = xdr_replymsg(&xdrs, msg); 857 } 858 859 if (stat) { 860 m_fixhdr(mrep); 861 862 /* 863 * Prepend a record marker containing the reply length. 864 */ 865 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 866 len = mrep->m_pkthdr.len; 867 *mtod(mrep, uint32_t *) = 868 htonl(0x80000000 | (len - sizeof(uint32_t))); 869 atomic_add_32(&xprt->xp_snd_cnt, len); 870 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 871 0, curthread); 872 if (!error) { 873 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 874 if (seq) 875 *seq = xprt->xp_snd_cnt; 876 stat = TRUE; 877 } else 878 atomic_subtract_32(&xprt->xp_snd_cnt, len); 879 } else { 880 m_freem(mrep); 881 } 882 883 XDR_DESTROY(&xdrs); 884 885 return (stat); 886 } 887 888 static bool_t 889 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 890 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 891 { 892 struct ct_data *ct; 893 XDR xdrs; 894 struct mbuf *mrep; 895 bool_t stat = TRUE; 896 int error; 897 898 /* 899 * Leave space for record mark. 900 */ 901 mrep = m_gethdr(M_WAITOK, MT_DATA); 902 mrep->m_data += sizeof(uint32_t); 903 904 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 905 906 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 907 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 908 if (!xdr_replymsg(&xdrs, msg)) 909 stat = FALSE; 910 else 911 xdrmbuf_append(&xdrs, m); 912 } else { 913 stat = xdr_replymsg(&xdrs, msg); 914 } 915 916 if (stat) { 917 m_fixhdr(mrep); 918 919 /* 920 * Prepend a record marker containing the reply length. 921 */ 922 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 923 *mtod(mrep, uint32_t *) = 924 htonl(0x80000000 | (mrep->m_pkthdr.len 925 - sizeof(uint32_t))); 926 sx_xlock(&xprt->xp_lock); 927 ct = (struct ct_data *)xprt->xp_p2; 928 if (ct != NULL) 929 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 930 0, curthread); 931 else 932 error = EPIPE; 933 sx_xunlock(&xprt->xp_lock); 934 if (!error) { 935 stat = TRUE; 936 } 937 } else { 938 m_freem(mrep); 939 } 940 941 XDR_DESTROY(&xdrs); 942 943 return (stat); 944 } 945 946 static bool_t 947 svc_vc_null() 948 { 949 950 return (FALSE); 951 } 952 953 static int 954 svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 955 { 956 SVCXPRT *xprt = (SVCXPRT *) arg; 957 958 if (soreadable(xprt->xp_socket)) 959 xprt_active(xprt); 960 return (SU_OK); 961 } 962 963 #if 0 964 /* 965 * Get the effective UID of the sending process. Used by rpcbind, keyserv 966 * and rpc.yppasswdd on AF_LOCAL. 967 */ 968 int 969 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) { 970 int sock, ret; 971 gid_t egid; 972 uid_t euid; 973 struct sockaddr *sa; 974 975 sock = transp->xp_fd; 976 sa = (struct sockaddr *)transp->xp_rtaddr; 977 if (sa->sa_family == AF_LOCAL) { 978 ret = getpeereid(sock, &euid, &egid); 979 if (ret == 0) 980 *uid = euid; 981 return (ret); 982 } else 983 return (-1); 984 } 985 #endif 986