1 /* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #if defined(LIBC_SCCS) && !defined(lint) 34 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro"; 35 static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 36 #endif 37 #include <sys/cdefs.h> 38 __FBSDID("$FreeBSD$"); 39 40 /* 41 * svc_vc.c, Server side for Connection Oriented based RPC. 42 * 43 * Actually implements two flavors of transporter - 44 * a tcp rendezvouser (a listner and connection establisher) 45 * and a record/tcp stream. 46 */ 47 48 #include <sys/param.h> 49 #include <sys/limits.h> 50 #include <sys/lock.h> 51 #include <sys/kernel.h> 52 #include <sys/malloc.h> 53 #include <sys/mbuf.h> 54 #include <sys/mutex.h> 55 #include <sys/proc.h> 56 #include <sys/protosw.h> 57 #include <sys/queue.h> 58 #include <sys/socket.h> 59 #include <sys/socketvar.h> 60 #include <sys/sx.h> 61 #include <sys/systm.h> 62 #include <sys/uio.h> 63 64 #include <net/vnet.h> 65 66 #include <netinet/tcp.h> 67 68 #include <rpc/rpc.h> 69 70 #include <rpc/krpc.h> 71 #include <rpc/rpc_com.h> 72 73 #include <security/mac/mac_framework.h> 74 75 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *, 76 struct sockaddr **, struct mbuf **); 77 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *); 78 static void svc_vc_rendezvous_destroy(SVCXPRT *); 79 static bool_t svc_vc_null(void); 80 static void svc_vc_destroy(SVCXPRT *); 81 static enum xprt_stat svc_vc_stat(SVCXPRT *); 82 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); 83 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, 84 struct sockaddr **, struct mbuf **); 85 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, 86 struct sockaddr *, struct mbuf *, uint32_t *seq); 87 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); 88 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, 89 void *in); 90 static void svc_vc_backchannel_destroy(SVCXPRT *); 91 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); 92 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, 93 struct sockaddr **, struct mbuf **); 94 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, 95 struct sockaddr *, struct mbuf *, uint32_t *); 96 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, 97 void *in); 98 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, 99 struct sockaddr *raddr); 100 static int svc_vc_accept(struct socket *head, struct socket **sop); 101 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag); 102 static int svc_vc_rendezvous_soupcall(struct socket *, void *, int); 103 104 static struct xp_ops svc_vc_rendezvous_ops = { 105 .xp_recv = svc_vc_rendezvous_recv, 106 .xp_stat = svc_vc_rendezvous_stat, 107 .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, 108 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, 109 .xp_destroy = svc_vc_rendezvous_destroy, 110 .xp_control = svc_vc_rendezvous_control 111 }; 112 113 static struct xp_ops svc_vc_ops = { 114 .xp_recv = svc_vc_recv, 115 .xp_stat = svc_vc_stat, 116 .xp_ack = svc_vc_ack, 117 .xp_reply = svc_vc_reply, 118 .xp_destroy = svc_vc_destroy, 119 .xp_control = svc_vc_control 120 }; 121 122 static struct xp_ops svc_vc_backchannel_ops = { 123 .xp_recv = svc_vc_backchannel_recv, 124 .xp_stat = svc_vc_backchannel_stat, 125 .xp_reply = svc_vc_backchannel_reply, 126 .xp_destroy = svc_vc_backchannel_destroy, 127 .xp_control = svc_vc_backchannel_control 128 }; 129 130 /* 131 * Usage: 132 * xprt = svc_vc_create(sock, send_buf_size, recv_buf_size); 133 * 134 * Creates, registers, and returns a (rpc) tcp based transporter. 135 * Once *xprt is initialized, it is registered as a transporter 136 * see (svc.h, xprt_register). This routine returns 137 * a NULL if a problem occurred. 138 * 139 * The filedescriptor passed in is expected to refer to a bound, but 140 * not yet connected socket. 141 * 142 * Since streams do buffered io similar to stdio, the caller can specify 143 * how big the send and receive buffers are via the second and third parms; 144 * 0 => use the system default. 145 */ 146 SVCXPRT * 147 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 148 size_t recvsize) 149 { 150 SVCXPRT *xprt; 151 struct sockaddr* sa; 152 int error; 153 154 SOCK_LOCK(so); 155 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 156 SOCK_UNLOCK(so); 157 CURVNET_SET(so->so_vnet); 158 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 159 CURVNET_RESTORE(); 160 if (error) 161 return (NULL); 162 xprt = svc_vc_create_conn(pool, so, sa); 163 free(sa, M_SONAME); 164 return (xprt); 165 } 166 SOCK_UNLOCK(so); 167 168 xprt = svc_xprt_alloc(); 169 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 170 xprt->xp_pool = pool; 171 xprt->xp_socket = so; 172 xprt->xp_p1 = NULL; 173 xprt->xp_p2 = NULL; 174 xprt->xp_ops = &svc_vc_rendezvous_ops; 175 176 CURVNET_SET(so->so_vnet); 177 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 178 CURVNET_RESTORE(); 179 if (error) { 180 goto cleanup_svc_vc_create; 181 } 182 183 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 184 free(sa, M_SONAME); 185 186 xprt_register(xprt); 187 188 solisten(so, -1, curthread); 189 190 SOLISTEN_LOCK(so); 191 xprt->xp_upcallset = 1; 192 solisten_upcall_set(so, svc_vc_rendezvous_soupcall, xprt); 193 SOLISTEN_UNLOCK(so); 194 195 return (xprt); 196 197 cleanup_svc_vc_create: 198 sx_destroy(&xprt->xp_lock); 199 svc_xprt_free(xprt); 200 201 return (NULL); 202 } 203 204 /* 205 * Create a new transport for a socket optained via soaccept(). 206 */ 207 SVCXPRT * 208 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) 209 { 210 SVCXPRT *xprt; 211 struct cf_conn *cd; 212 struct sockaddr* sa = NULL; 213 struct sockopt opt; 214 int one = 1; 215 int error; 216 217 bzero(&opt, sizeof(struct sockopt)); 218 opt.sopt_dir = SOPT_SET; 219 opt.sopt_level = SOL_SOCKET; 220 opt.sopt_name = SO_KEEPALIVE; 221 opt.sopt_val = &one; 222 opt.sopt_valsize = sizeof(one); 223 error = sosetopt(so, &opt); 224 if (error) { 225 return (NULL); 226 } 227 228 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 229 bzero(&opt, sizeof(struct sockopt)); 230 opt.sopt_dir = SOPT_SET; 231 opt.sopt_level = IPPROTO_TCP; 232 opt.sopt_name = TCP_NODELAY; 233 opt.sopt_val = &one; 234 opt.sopt_valsize = sizeof(one); 235 error = sosetopt(so, &opt); 236 if (error) { 237 return (NULL); 238 } 239 } 240 241 cd = mem_alloc(sizeof(*cd)); 242 cd->strm_stat = XPRT_IDLE; 243 244 xprt = svc_xprt_alloc(); 245 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 246 xprt->xp_pool = pool; 247 xprt->xp_socket = so; 248 xprt->xp_p1 = cd; 249 xprt->xp_p2 = NULL; 250 xprt->xp_ops = &svc_vc_ops; 251 252 /* 253 * See http://www.connectathon.org/talks96/nfstcp.pdf - client 254 * has a 5 minute timer, server has a 6 minute timer. 255 */ 256 xprt->xp_idletimeout = 6 * 60; 257 258 memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len); 259 260 CURVNET_SET(so->so_vnet); 261 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 262 CURVNET_RESTORE(); 263 if (error) 264 goto cleanup_svc_vc_create; 265 266 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 267 free(sa, M_SONAME); 268 269 xprt_register(xprt); 270 271 SOCKBUF_LOCK(&so->so_rcv); 272 xprt->xp_upcallset = 1; 273 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 274 SOCKBUF_UNLOCK(&so->so_rcv); 275 276 /* 277 * Throw the transport into the active list in case it already 278 * has some data buffered. 279 */ 280 sx_xlock(&xprt->xp_lock); 281 xprt_active(xprt); 282 sx_xunlock(&xprt->xp_lock); 283 284 return (xprt); 285 cleanup_svc_vc_create: 286 sx_destroy(&xprt->xp_lock); 287 svc_xprt_free(xprt); 288 mem_free(cd, sizeof(*cd)); 289 290 return (NULL); 291 } 292 293 /* 294 * Create a new transport for a backchannel on a clnt_vc socket. 295 */ 296 SVCXPRT * 297 svc_vc_create_backchannel(SVCPOOL *pool) 298 { 299 SVCXPRT *xprt = NULL; 300 struct cf_conn *cd = NULL; 301 302 cd = mem_alloc(sizeof(*cd)); 303 cd->strm_stat = XPRT_IDLE; 304 305 xprt = svc_xprt_alloc(); 306 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 307 xprt->xp_pool = pool; 308 xprt->xp_socket = NULL; 309 xprt->xp_p1 = cd; 310 xprt->xp_p2 = NULL; 311 xprt->xp_ops = &svc_vc_backchannel_ops; 312 return (xprt); 313 } 314 315 /* 316 * This does all of the accept except the final call to soaccept. The 317 * caller will call soaccept after dropping its locks (soaccept may 318 * call malloc). 319 */ 320 int 321 svc_vc_accept(struct socket *head, struct socket **sop) 322 { 323 struct socket *so; 324 int error = 0; 325 short nbio; 326 327 /* XXXGL: shouldn't that be an assertion? */ 328 if ((head->so_options & SO_ACCEPTCONN) == 0) { 329 error = EINVAL; 330 goto done; 331 } 332 #ifdef MAC 333 error = mac_socket_check_accept(curthread->td_ucred, head); 334 if (error != 0) 335 goto done; 336 #endif 337 /* 338 * XXXGL: we want non-blocking semantics. The socket could be a 339 * socket created by kernel as well as socket shared with userland, 340 * so we can't be sure about presense of SS_NBIO. We also shall not 341 * toggle it on the socket, since that may surprise userland. So we 342 * set SS_NBIO only temporarily. 343 */ 344 SOLISTEN_LOCK(head); 345 nbio = head->so_state & SS_NBIO; 346 head->so_state |= SS_NBIO; 347 error = solisten_dequeue(head, &so, 0); 348 head->so_state &= (nbio & ~SS_NBIO); 349 if (error) 350 goto done; 351 352 so->so_state |= nbio; 353 *sop = so; 354 355 /* connection has been removed from the listen queue */ 356 KNOTE_UNLOCKED(&head->so_rdsel.si_note, 0); 357 done: 358 return (error); 359 } 360 361 /*ARGSUSED*/ 362 static bool_t 363 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, 364 struct sockaddr **addrp, struct mbuf **mp) 365 { 366 struct socket *so = NULL; 367 struct sockaddr *sa = NULL; 368 int error; 369 SVCXPRT *new_xprt; 370 371 /* 372 * The socket upcall calls xprt_active() which will eventually 373 * cause the server to call us here. We attempt to accept a 374 * connection from the socket and turn it into a new 375 * transport. If the accept fails, we have drained all pending 376 * connections so we call xprt_inactive(). 377 */ 378 sx_xlock(&xprt->xp_lock); 379 380 error = svc_vc_accept(xprt->xp_socket, &so); 381 382 if (error == EWOULDBLOCK) { 383 /* 384 * We must re-test for new connections after taking 385 * the lock to protect us in the case where a new 386 * connection arrives after our call to accept fails 387 * with EWOULDBLOCK. 388 */ 389 SOLISTEN_LOCK(xprt->xp_socket); 390 if (TAILQ_EMPTY(&xprt->xp_socket->sol_comp)) 391 xprt_inactive_self(xprt); 392 SOLISTEN_UNLOCK(xprt->xp_socket); 393 sx_xunlock(&xprt->xp_lock); 394 return (FALSE); 395 } 396 397 if (error) { 398 SOLISTEN_LOCK(xprt->xp_socket); 399 if (xprt->xp_upcallset) { 400 xprt->xp_upcallset = 0; 401 soupcall_clear(xprt->xp_socket, SO_RCV); 402 } 403 SOLISTEN_UNLOCK(xprt->xp_socket); 404 xprt_inactive_self(xprt); 405 sx_xunlock(&xprt->xp_lock); 406 return (FALSE); 407 } 408 409 sx_xunlock(&xprt->xp_lock); 410 411 sa = NULL; 412 error = soaccept(so, &sa); 413 414 if (error) { 415 /* 416 * XXX not sure if I need to call sofree or soclose here. 417 */ 418 if (sa) 419 free(sa, M_SONAME); 420 return (FALSE); 421 } 422 423 /* 424 * svc_vc_create_conn will call xprt_register - we don't need 425 * to do anything with the new connection except derefence it. 426 */ 427 new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa); 428 if (!new_xprt) { 429 soclose(so); 430 } else { 431 SVC_RELEASE(new_xprt); 432 } 433 434 free(sa, M_SONAME); 435 436 return (FALSE); /* there is never an rpc msg to be processed */ 437 } 438 439 /*ARGSUSED*/ 440 static enum xprt_stat 441 svc_vc_rendezvous_stat(SVCXPRT *xprt) 442 { 443 444 return (XPRT_IDLE); 445 } 446 447 static void 448 svc_vc_destroy_common(SVCXPRT *xprt) 449 { 450 451 if (xprt->xp_socket) 452 (void)soclose(xprt->xp_socket); 453 454 if (xprt->xp_netid) 455 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1); 456 svc_xprt_free(xprt); 457 } 458 459 static void 460 svc_vc_rendezvous_destroy(SVCXPRT *xprt) 461 { 462 463 SOLISTEN_LOCK(xprt->xp_socket); 464 if (xprt->xp_upcallset) { 465 xprt->xp_upcallset = 0; 466 solisten_upcall_set(xprt->xp_socket, NULL, NULL); 467 } 468 SOLISTEN_UNLOCK(xprt->xp_socket); 469 470 svc_vc_destroy_common(xprt); 471 } 472 473 static void 474 svc_vc_destroy(SVCXPRT *xprt) 475 { 476 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 477 478 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 479 if (xprt->xp_upcallset) { 480 xprt->xp_upcallset = 0; 481 soupcall_clear(xprt->xp_socket, SO_RCV); 482 } 483 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 484 485 svc_vc_destroy_common(xprt); 486 487 if (cd->mreq) 488 m_freem(cd->mreq); 489 if (cd->mpending) 490 m_freem(cd->mpending); 491 mem_free(cd, sizeof(*cd)); 492 } 493 494 static void 495 svc_vc_backchannel_destroy(SVCXPRT *xprt) 496 { 497 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 498 struct mbuf *m, *m2; 499 500 svc_xprt_free(xprt); 501 m = cd->mreq; 502 while (m != NULL) { 503 m2 = m; 504 m = m->m_nextpkt; 505 m_freem(m2); 506 } 507 mem_free(cd, sizeof(*cd)); 508 } 509 510 /*ARGSUSED*/ 511 static bool_t 512 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in) 513 { 514 return (FALSE); 515 } 516 517 static bool_t 518 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in) 519 { 520 521 return (FALSE); 522 } 523 524 static bool_t 525 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in) 526 { 527 528 return (FALSE); 529 } 530 531 static enum xprt_stat 532 svc_vc_stat(SVCXPRT *xprt) 533 { 534 struct cf_conn *cd; 535 536 cd = (struct cf_conn *)(xprt->xp_p1); 537 538 if (cd->strm_stat == XPRT_DIED) 539 return (XPRT_DIED); 540 541 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) 542 return (XPRT_MOREREQS); 543 544 if (soreadable(xprt->xp_socket)) 545 return (XPRT_MOREREQS); 546 547 return (XPRT_IDLE); 548 } 549 550 static bool_t 551 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) 552 { 553 554 *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); 555 *ack -= sbused(&xprt->xp_socket->so_snd); 556 return (TRUE); 557 } 558 559 static enum xprt_stat 560 svc_vc_backchannel_stat(SVCXPRT *xprt) 561 { 562 struct cf_conn *cd; 563 564 cd = (struct cf_conn *)(xprt->xp_p1); 565 566 if (cd->mreq != NULL) 567 return (XPRT_MOREREQS); 568 569 return (XPRT_IDLE); 570 } 571 572 /* 573 * If we have an mbuf chain in cd->mpending, try to parse a record from it, 574 * leaving the result in cd->mreq. If we don't have a complete record, leave 575 * the partial result in cd->mreq and try to read more from the socket. 576 */ 577 static int 578 svc_vc_process_pending(SVCXPRT *xprt) 579 { 580 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 581 struct socket *so = xprt->xp_socket; 582 struct mbuf *m; 583 584 /* 585 * If cd->resid is non-zero, we have part of the 586 * record already, otherwise we are expecting a record 587 * marker. 588 */ 589 if (!cd->resid && cd->mpending) { 590 /* 591 * See if there is enough data buffered to 592 * make up a record marker. Make sure we can 593 * handle the case where the record marker is 594 * split across more than one mbuf. 595 */ 596 size_t n = 0; 597 uint32_t header; 598 599 m = cd->mpending; 600 while (n < sizeof(uint32_t) && m) { 601 n += m->m_len; 602 m = m->m_next; 603 } 604 if (n < sizeof(uint32_t)) { 605 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 606 return (FALSE); 607 } 608 m_copydata(cd->mpending, 0, sizeof(header), 609 (char *)&header); 610 header = ntohl(header); 611 cd->eor = (header & 0x80000000) != 0; 612 cd->resid = header & 0x7fffffff; 613 m_adj(cd->mpending, sizeof(uint32_t)); 614 } 615 616 /* 617 * Start pulling off mbufs from cd->mpending 618 * until we either have a complete record or 619 * we run out of data. We use m_split to pull 620 * data - it will pull as much as possible and 621 * split the last mbuf if necessary. 622 */ 623 while (cd->mpending && cd->resid) { 624 m = cd->mpending; 625 if (cd->mpending->m_next 626 || cd->mpending->m_len > cd->resid) 627 cd->mpending = m_split(cd->mpending, 628 cd->resid, M_WAITOK); 629 else 630 cd->mpending = NULL; 631 if (cd->mreq) 632 m_last(cd->mreq)->m_next = m; 633 else 634 cd->mreq = m; 635 while (m) { 636 cd->resid -= m->m_len; 637 m = m->m_next; 638 } 639 } 640 641 /* 642 * Block receive upcalls if we have more data pending, 643 * otherwise report our need. 644 */ 645 if (cd->mpending) 646 so->so_rcv.sb_lowat = INT_MAX; 647 else 648 so->so_rcv.sb_lowat = 649 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 650 return (TRUE); 651 } 652 653 static bool_t 654 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 655 struct sockaddr **addrp, struct mbuf **mp) 656 { 657 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 658 struct uio uio; 659 struct mbuf *m; 660 struct socket* so = xprt->xp_socket; 661 XDR xdrs; 662 int error, rcvflag; 663 uint32_t xid_plus_direction[2]; 664 665 /* 666 * Serialise access to the socket and our own record parsing 667 * state. 668 */ 669 sx_xlock(&xprt->xp_lock); 670 671 for (;;) { 672 /* If we have no request ready, check pending queue. */ 673 while (cd->mpending && 674 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 675 if (!svc_vc_process_pending(xprt)) 676 break; 677 } 678 679 /* Process and return complete request in cd->mreq. */ 680 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 681 682 /* 683 * Now, check for a backchannel reply. 684 * The XID is in the first uint32_t of the reply 685 * and the message direction is the second one. 686 */ 687 if ((cd->mreq->m_len >= sizeof(xid_plus_direction) || 688 m_length(cd->mreq, NULL) >= 689 sizeof(xid_plus_direction)) && 690 xprt->xp_p2 != NULL) { 691 m_copydata(cd->mreq, 0, 692 sizeof(xid_plus_direction), 693 (char *)xid_plus_direction); 694 xid_plus_direction[0] = 695 ntohl(xid_plus_direction[0]); 696 xid_plus_direction[1] = 697 ntohl(xid_plus_direction[1]); 698 /* Check message direction. */ 699 if (xid_plus_direction[1] == REPLY) { 700 clnt_bck_svccall(xprt->xp_p2, 701 cd->mreq, 702 xid_plus_direction[0]); 703 cd->mreq = NULL; 704 continue; 705 } 706 } 707 708 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 709 cd->mreq = NULL; 710 711 /* Check for next request in a pending queue. */ 712 svc_vc_process_pending(xprt); 713 if (cd->mreq == NULL || cd->resid != 0) { 714 SOCKBUF_LOCK(&so->so_rcv); 715 if (!soreadable(so)) 716 xprt_inactive_self(xprt); 717 SOCKBUF_UNLOCK(&so->so_rcv); 718 } 719 720 sx_xunlock(&xprt->xp_lock); 721 722 if (! xdr_callmsg(&xdrs, msg)) { 723 XDR_DESTROY(&xdrs); 724 return (FALSE); 725 } 726 727 *addrp = NULL; 728 *mp = xdrmbuf_getall(&xdrs); 729 XDR_DESTROY(&xdrs); 730 731 return (TRUE); 732 } 733 734 /* 735 * The socket upcall calls xprt_active() which will eventually 736 * cause the server to call us here. We attempt to 737 * read as much as possible from the socket and put 738 * the result in cd->mpending. If the read fails, 739 * we have drained both cd->mpending and the socket so 740 * we can call xprt_inactive(). 741 */ 742 uio.uio_resid = 1000000000; 743 uio.uio_td = curthread; 744 m = NULL; 745 rcvflag = MSG_DONTWAIT; 746 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 747 748 if (error == EWOULDBLOCK) { 749 /* 750 * We must re-test for readability after 751 * taking the lock to protect us in the case 752 * where a new packet arrives on the socket 753 * after our call to soreceive fails with 754 * EWOULDBLOCK. 755 */ 756 SOCKBUF_LOCK(&so->so_rcv); 757 if (!soreadable(so)) 758 xprt_inactive_self(xprt); 759 SOCKBUF_UNLOCK(&so->so_rcv); 760 sx_xunlock(&xprt->xp_lock); 761 return (FALSE); 762 } 763 764 if (error) { 765 SOCKBUF_LOCK(&so->so_rcv); 766 if (xprt->xp_upcallset) { 767 xprt->xp_upcallset = 0; 768 soupcall_clear(so, SO_RCV); 769 } 770 SOCKBUF_UNLOCK(&so->so_rcv); 771 xprt_inactive_self(xprt); 772 cd->strm_stat = XPRT_DIED; 773 sx_xunlock(&xprt->xp_lock); 774 return (FALSE); 775 } 776 777 if (!m) { 778 /* 779 * EOF - the other end has closed the socket. 780 */ 781 xprt_inactive_self(xprt); 782 cd->strm_stat = XPRT_DIED; 783 sx_xunlock(&xprt->xp_lock); 784 return (FALSE); 785 } 786 787 if (cd->mpending) 788 m_last(cd->mpending)->m_next = m; 789 else 790 cd->mpending = m; 791 } 792 } 793 794 static bool_t 795 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 796 struct sockaddr **addrp, struct mbuf **mp) 797 { 798 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 799 struct ct_data *ct; 800 struct mbuf *m; 801 XDR xdrs; 802 803 sx_xlock(&xprt->xp_lock); 804 ct = (struct ct_data *)xprt->xp_p2; 805 if (ct == NULL) { 806 sx_xunlock(&xprt->xp_lock); 807 return (FALSE); 808 } 809 mtx_lock(&ct->ct_lock); 810 m = cd->mreq; 811 if (m == NULL) { 812 xprt_inactive_self(xprt); 813 mtx_unlock(&ct->ct_lock); 814 sx_xunlock(&xprt->xp_lock); 815 return (FALSE); 816 } 817 cd->mreq = m->m_nextpkt; 818 mtx_unlock(&ct->ct_lock); 819 sx_xunlock(&xprt->xp_lock); 820 821 xdrmbuf_create(&xdrs, m, XDR_DECODE); 822 if (! xdr_callmsg(&xdrs, msg)) { 823 XDR_DESTROY(&xdrs); 824 return (FALSE); 825 } 826 *addrp = NULL; 827 *mp = xdrmbuf_getall(&xdrs); 828 XDR_DESTROY(&xdrs); 829 return (TRUE); 830 } 831 832 static bool_t 833 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 834 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 835 { 836 XDR xdrs; 837 struct mbuf *mrep; 838 bool_t stat = TRUE; 839 int error, len; 840 841 /* 842 * Leave space for record mark. 843 */ 844 mrep = m_gethdr(M_WAITOK, MT_DATA); 845 mrep->m_data += sizeof(uint32_t); 846 847 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 848 849 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 850 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 851 if (!xdr_replymsg(&xdrs, msg)) 852 stat = FALSE; 853 else 854 xdrmbuf_append(&xdrs, m); 855 } else { 856 stat = xdr_replymsg(&xdrs, msg); 857 } 858 859 if (stat) { 860 m_fixhdr(mrep); 861 862 /* 863 * Prepend a record marker containing the reply length. 864 */ 865 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 866 len = mrep->m_pkthdr.len; 867 *mtod(mrep, uint32_t *) = 868 htonl(0x80000000 | (len - sizeof(uint32_t))); 869 atomic_add_32(&xprt->xp_snd_cnt, len); 870 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 871 0, curthread); 872 if (!error) { 873 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 874 if (seq) 875 *seq = xprt->xp_snd_cnt; 876 stat = TRUE; 877 } else 878 atomic_subtract_32(&xprt->xp_snd_cnt, len); 879 } else { 880 m_freem(mrep); 881 } 882 883 XDR_DESTROY(&xdrs); 884 885 return (stat); 886 } 887 888 static bool_t 889 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 890 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 891 { 892 struct ct_data *ct; 893 XDR xdrs; 894 struct mbuf *mrep; 895 bool_t stat = TRUE; 896 int error; 897 898 /* 899 * Leave space for record mark. 900 */ 901 mrep = m_gethdr(M_WAITOK, MT_DATA); 902 mrep->m_data += sizeof(uint32_t); 903 904 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 905 906 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 907 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 908 if (!xdr_replymsg(&xdrs, msg)) 909 stat = FALSE; 910 else 911 xdrmbuf_append(&xdrs, m); 912 } else { 913 stat = xdr_replymsg(&xdrs, msg); 914 } 915 916 if (stat) { 917 m_fixhdr(mrep); 918 919 /* 920 * Prepend a record marker containing the reply length. 921 */ 922 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 923 *mtod(mrep, uint32_t *) = 924 htonl(0x80000000 | (mrep->m_pkthdr.len 925 - sizeof(uint32_t))); 926 sx_xlock(&xprt->xp_lock); 927 ct = (struct ct_data *)xprt->xp_p2; 928 if (ct != NULL) 929 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 930 0, curthread); 931 else 932 error = EPIPE; 933 sx_xunlock(&xprt->xp_lock); 934 if (!error) { 935 stat = TRUE; 936 } 937 } else { 938 m_freem(mrep); 939 } 940 941 XDR_DESTROY(&xdrs); 942 943 return (stat); 944 } 945 946 static bool_t 947 svc_vc_null() 948 { 949 950 return (FALSE); 951 } 952 953 static int 954 svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 955 { 956 SVCXPRT *xprt = (SVCXPRT *) arg; 957 958 if (soreadable(xprt->xp_socket)) 959 xprt_active(xprt); 960 return (SU_OK); 961 } 962 963 static int 964 svc_vc_rendezvous_soupcall(struct socket *head, void *arg, int waitflag) 965 { 966 SVCXPRT *xprt = (SVCXPRT *) arg; 967 968 if (!TAILQ_EMPTY(&head->sol_comp)) 969 xprt_active(xprt); 970 return (SU_OK); 971 } 972 973 #if 0 974 /* 975 * Get the effective UID of the sending process. Used by rpcbind, keyserv 976 * and rpc.yppasswdd on AF_LOCAL. 977 */ 978 int 979 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) { 980 int sock, ret; 981 gid_t egid; 982 uid_t euid; 983 struct sockaddr *sa; 984 985 sock = transp->xp_fd; 986 sa = (struct sockaddr *)transp->xp_rtaddr; 987 if (sa->sa_family == AF_LOCAL) { 988 ret = getpeereid(sock, &euid, &egid); 989 if (ret == 0) 990 *uid = euid; 991 return (ret); 992 } else 993 return (-1); 994 } 995 #endif 996