1 /* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc_vc.c, Server side for Connection Oriented based RPC.
 *
 * Actually implements two flavors of transporter -
 * a tcp rendezvouser (a listener and connection establisher)
 * and a record/tcp stream.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/systm.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>

#include <rpc/krpc.h>
#include <rpc/rpc_com.h>

#include <security/mac/mac_framework.h>

/*
 * Forward declarations.  Every routine below is file-local; the
 * definitions of svc_vc_create_conn() and svc_vc_accept() omit the
 * "static" keyword but inherit internal linkage from these prototypes.
 */
static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
    struct sockaddr *, struct mbuf *, uint32_t *seq);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
    void *in);
static void svc_vc_backchannel_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
    struct sockaddr *, struct mbuf *, uint32_t *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
    void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
    struct sockaddr *raddr);
static int svc_vc_accept(struct socket *head, struct socket **sop);
static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
static int svc_vc_rendezvous_soupcall(struct socket *, void *, int);

/*
 * Operations for the rendezvous (listening) transport.  It never sends
 * replies, so xp_reply is the svc_vc_null() stub cast to the reply
 * signature; no xp_ack is provided.
 * NOTE(review): svc_vc_null takes no arguments, so invoking it through
 * this cast pointer relies on the calling convention tolerating the
 * mismatch -- confirm nothing ever calls xp_reply on this transport.
 */
static struct xp_ops svc_vc_rendezvous_ops = {
	.xp_recv =	svc_vc_rendezvous_recv,
	.xp_stat =	svc_vc_rendezvous_stat,
	.xp_reply =	(bool_t (*)(SVCXPRT *, struct rpc_msg *,
		struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
	.xp_destroy =	svc_vc_rendezvous_destroy,
	.xp_control =	svc_vc_rendezvous_control
};

/*
 * Operations for an established connection transport (the only table
 * in this file that supplies xp_ack).
 */
static struct xp_ops svc_vc_ops = {
	.xp_recv =	svc_vc_recv,
	.xp_stat =	svc_vc_stat,
	.xp_ack =	svc_vc_ack,
	.xp_reply =	svc_vc_reply,
	.xp_destroy =	svc_vc_destroy,
	.xp_control =	svc_vc_control
};

/*
 * Operations for a backchannel transport, which has no socket of its
 * own (see svc_vc_create_backchannel()).
 */
static struct xp_ops svc_vc_backchannel_ops = {
	.xp_recv =	svc_vc_backchannel_recv,
	.xp_stat =	svc_vc_backchannel_stat,
	.xp_reply =	svc_vc_backchannel_reply,
	.xp_destroy =	svc_vc_backchannel_destroy,
	.xp_control =	svc_vc_backchannel_control
};

/*
 * Usage:
 *	xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
 *
 * Creates, registers, and returns a (rpc) tcp based transporter.
 * Once *xprt is initialized, it is registered as a transporter
 * see (svc.h, xprt_register).  This routine returns
 * a NULL if a problem occurred.
 *
 * The filedescriptor passed in is expected to refer to a bound, but
 * not yet connected socket.
140 * 141 * Since streams do buffered io similar to stdio, the caller can specify 142 * how big the send and receive buffers are via the second and third parms; 143 * 0 => use the system default. 144 */ 145 SVCXPRT * 146 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 147 size_t recvsize) 148 { 149 SVCXPRT *xprt; 150 struct sockaddr* sa; 151 int error; 152 153 SOCK_LOCK(so); 154 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 155 SOCK_UNLOCK(so); 156 CURVNET_SET(so->so_vnet); 157 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 158 CURVNET_RESTORE(); 159 if (error) 160 return (NULL); 161 xprt = svc_vc_create_conn(pool, so, sa); 162 free(sa, M_SONAME); 163 return (xprt); 164 } 165 SOCK_UNLOCK(so); 166 167 xprt = svc_xprt_alloc(); 168 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 169 xprt->xp_pool = pool; 170 xprt->xp_socket = so; 171 xprt->xp_p1 = NULL; 172 xprt->xp_p2 = NULL; 173 xprt->xp_ops = &svc_vc_rendezvous_ops; 174 175 CURVNET_SET(so->so_vnet); 176 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 177 CURVNET_RESTORE(); 178 if (error) { 179 goto cleanup_svc_vc_create; 180 } 181 182 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 183 free(sa, M_SONAME); 184 185 xprt_register(xprt); 186 187 solisten(so, -1, curthread); 188 189 SOLISTEN_LOCK(so); 190 xprt->xp_upcallset = 1; 191 solisten_upcall_set(so, svc_vc_rendezvous_soupcall, xprt); 192 SOLISTEN_UNLOCK(so); 193 194 return (xprt); 195 196 cleanup_svc_vc_create: 197 sx_destroy(&xprt->xp_lock); 198 svc_xprt_free(xprt); 199 200 return (NULL); 201 } 202 203 /* 204 * Create a new transport for a socket optained via soaccept(). 
205 */ 206 SVCXPRT * 207 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) 208 { 209 SVCXPRT *xprt; 210 struct cf_conn *cd; 211 struct sockaddr* sa = NULL; 212 struct sockopt opt; 213 int one = 1; 214 int error; 215 216 bzero(&opt, sizeof(struct sockopt)); 217 opt.sopt_dir = SOPT_SET; 218 opt.sopt_level = SOL_SOCKET; 219 opt.sopt_name = SO_KEEPALIVE; 220 opt.sopt_val = &one; 221 opt.sopt_valsize = sizeof(one); 222 error = sosetopt(so, &opt); 223 if (error) { 224 return (NULL); 225 } 226 227 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 228 bzero(&opt, sizeof(struct sockopt)); 229 opt.sopt_dir = SOPT_SET; 230 opt.sopt_level = IPPROTO_TCP; 231 opt.sopt_name = TCP_NODELAY; 232 opt.sopt_val = &one; 233 opt.sopt_valsize = sizeof(one); 234 error = sosetopt(so, &opt); 235 if (error) { 236 return (NULL); 237 } 238 } 239 240 cd = mem_alloc(sizeof(*cd)); 241 cd->strm_stat = XPRT_IDLE; 242 243 xprt = svc_xprt_alloc(); 244 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 245 xprt->xp_pool = pool; 246 xprt->xp_socket = so; 247 xprt->xp_p1 = cd; 248 xprt->xp_p2 = NULL; 249 xprt->xp_ops = &svc_vc_ops; 250 251 /* 252 * See http://www.connectathon.org/talks96/nfstcp.pdf - client 253 * has a 5 minute timer, server has a 6 minute timer. 254 */ 255 xprt->xp_idletimeout = 6 * 60; 256 257 memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len); 258 259 CURVNET_SET(so->so_vnet); 260 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 261 CURVNET_RESTORE(); 262 if (error) 263 goto cleanup_svc_vc_create; 264 265 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 266 free(sa, M_SONAME); 267 268 xprt_register(xprt); 269 270 SOCKBUF_LOCK(&so->so_rcv); 271 xprt->xp_upcallset = 1; 272 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 273 SOCKBUF_UNLOCK(&so->so_rcv); 274 275 /* 276 * Throw the transport into the active list in case it already 277 * has some data buffered. 
278 */ 279 sx_xlock(&xprt->xp_lock); 280 xprt_active(xprt); 281 sx_xunlock(&xprt->xp_lock); 282 283 return (xprt); 284 cleanup_svc_vc_create: 285 sx_destroy(&xprt->xp_lock); 286 svc_xprt_free(xprt); 287 mem_free(cd, sizeof(*cd)); 288 289 return (NULL); 290 } 291 292 /* 293 * Create a new transport for a backchannel on a clnt_vc socket. 294 */ 295 SVCXPRT * 296 svc_vc_create_backchannel(SVCPOOL *pool) 297 { 298 SVCXPRT *xprt = NULL; 299 struct cf_conn *cd = NULL; 300 301 cd = mem_alloc(sizeof(*cd)); 302 cd->strm_stat = XPRT_IDLE; 303 304 xprt = svc_xprt_alloc(); 305 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 306 xprt->xp_pool = pool; 307 xprt->xp_socket = NULL; 308 xprt->xp_p1 = cd; 309 xprt->xp_p2 = NULL; 310 xprt->xp_ops = &svc_vc_backchannel_ops; 311 return (xprt); 312 } 313 314 /* 315 * This does all of the accept except the final call to soaccept. The 316 * caller will call soaccept after dropping its locks (soaccept may 317 * call malloc). 318 */ 319 int 320 svc_vc_accept(struct socket *head, struct socket **sop) 321 { 322 struct socket *so; 323 int error = 0; 324 short nbio; 325 326 /* XXXGL: shouldn't that be an assertion? */ 327 if ((head->so_options & SO_ACCEPTCONN) == 0) { 328 error = EINVAL; 329 goto done; 330 } 331 #ifdef MAC 332 error = mac_socket_check_accept(curthread->td_ucred, head); 333 if (error != 0) 334 goto done; 335 #endif 336 /* 337 * XXXGL: we want non-blocking semantics. The socket could be a 338 * socket created by kernel as well as socket shared with userland, 339 * so we can't be sure about presense of SS_NBIO. We also shall not 340 * toggle it on the socket, since that may surprise userland. So we 341 * set SS_NBIO only temporarily. 
342 */ 343 SOLISTEN_LOCK(head); 344 nbio = head->so_state & SS_NBIO; 345 head->so_state |= SS_NBIO; 346 error = solisten_dequeue(head, &so, 0); 347 head->so_state &= (nbio & ~SS_NBIO); 348 if (error) 349 goto done; 350 351 so->so_state |= nbio; 352 *sop = so; 353 354 /* connection has been removed from the listen queue */ 355 KNOTE_UNLOCKED(&head->so_rdsel.si_note, 0); 356 done: 357 return (error); 358 } 359 360 /*ARGSUSED*/ 361 static bool_t 362 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, 363 struct sockaddr **addrp, struct mbuf **mp) 364 { 365 struct socket *so = NULL; 366 struct sockaddr *sa = NULL; 367 int error; 368 SVCXPRT *new_xprt; 369 370 /* 371 * The socket upcall calls xprt_active() which will eventually 372 * cause the server to call us here. We attempt to accept a 373 * connection from the socket and turn it into a new 374 * transport. If the accept fails, we have drained all pending 375 * connections so we call xprt_inactive(). 376 */ 377 sx_xlock(&xprt->xp_lock); 378 379 error = svc_vc_accept(xprt->xp_socket, &so); 380 381 if (error == EWOULDBLOCK) { 382 /* 383 * We must re-test for new connections after taking 384 * the lock to protect us in the case where a new 385 * connection arrives after our call to accept fails 386 * with EWOULDBLOCK. 387 */ 388 SOLISTEN_LOCK(xprt->xp_socket); 389 if (TAILQ_EMPTY(&xprt->xp_socket->sol_comp)) 390 xprt_inactive_self(xprt); 391 SOLISTEN_UNLOCK(xprt->xp_socket); 392 sx_xunlock(&xprt->xp_lock); 393 return (FALSE); 394 } 395 396 if (error) { 397 SOLISTEN_LOCK(xprt->xp_socket); 398 if (xprt->xp_upcallset) { 399 xprt->xp_upcallset = 0; 400 soupcall_clear(xprt->xp_socket, SO_RCV); 401 } 402 SOLISTEN_UNLOCK(xprt->xp_socket); 403 xprt_inactive_self(xprt); 404 sx_xunlock(&xprt->xp_lock); 405 return (FALSE); 406 } 407 408 sx_xunlock(&xprt->xp_lock); 409 410 sa = NULL; 411 error = soaccept(so, &sa); 412 413 if (error) { 414 /* 415 * XXX not sure if I need to call sofree or soclose here. 
416 */ 417 if (sa) 418 free(sa, M_SONAME); 419 return (FALSE); 420 } 421 422 /* 423 * svc_vc_create_conn will call xprt_register - we don't need 424 * to do anything with the new connection except derefence it. 425 */ 426 new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa); 427 if (!new_xprt) { 428 soclose(so); 429 } else { 430 SVC_RELEASE(new_xprt); 431 } 432 433 free(sa, M_SONAME); 434 435 return (FALSE); /* there is never an rpc msg to be processed */ 436 } 437 438 /*ARGSUSED*/ 439 static enum xprt_stat 440 svc_vc_rendezvous_stat(SVCXPRT *xprt) 441 { 442 443 return (XPRT_IDLE); 444 } 445 446 static void 447 svc_vc_destroy_common(SVCXPRT *xprt) 448 { 449 450 if (xprt->xp_socket) 451 (void)soclose(xprt->xp_socket); 452 453 if (xprt->xp_netid) 454 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1); 455 svc_xprt_free(xprt); 456 } 457 458 static void 459 svc_vc_rendezvous_destroy(SVCXPRT *xprt) 460 { 461 462 SOLISTEN_LOCK(xprt->xp_socket); 463 if (xprt->xp_upcallset) { 464 xprt->xp_upcallset = 0; 465 solisten_upcall_set(xprt->xp_socket, NULL, NULL); 466 } 467 SOLISTEN_UNLOCK(xprt->xp_socket); 468 469 svc_vc_destroy_common(xprt); 470 } 471 472 static void 473 svc_vc_destroy(SVCXPRT *xprt) 474 { 475 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 476 477 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); 478 if (xprt->xp_upcallset) { 479 xprt->xp_upcallset = 0; 480 soupcall_clear(xprt->xp_socket, SO_RCV); 481 } 482 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); 483 484 svc_vc_destroy_common(xprt); 485 486 if (cd->mreq) 487 m_freem(cd->mreq); 488 if (cd->mpending) 489 m_freem(cd->mpending); 490 mem_free(cd, sizeof(*cd)); 491 } 492 493 static void 494 svc_vc_backchannel_destroy(SVCXPRT *xprt) 495 { 496 struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1; 497 struct mbuf *m, *m2; 498 499 svc_xprt_free(xprt); 500 m = cd->mreq; 501 while (m != NULL) { 502 m2 = m; 503 m = m->m_nextpkt; 504 m_freem(m2); 505 } 506 mem_free(cd, sizeof(*cd)); 507 } 508 509 /*ARGSUSED*/ 510 static 
bool_t 511 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in) 512 { 513 return (FALSE); 514 } 515 516 static bool_t 517 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in) 518 { 519 520 return (FALSE); 521 } 522 523 static bool_t 524 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in) 525 { 526 527 return (FALSE); 528 } 529 530 static enum xprt_stat 531 svc_vc_stat(SVCXPRT *xprt) 532 { 533 struct cf_conn *cd; 534 535 cd = (struct cf_conn *)(xprt->xp_p1); 536 537 if (cd->strm_stat == XPRT_DIED) 538 return (XPRT_DIED); 539 540 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) 541 return (XPRT_MOREREQS); 542 543 if (soreadable(xprt->xp_socket)) 544 return (XPRT_MOREREQS); 545 546 return (XPRT_IDLE); 547 } 548 549 static bool_t 550 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) 551 { 552 553 *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); 554 *ack -= sbused(&xprt->xp_socket->so_snd); 555 return (TRUE); 556 } 557 558 static enum xprt_stat 559 svc_vc_backchannel_stat(SVCXPRT *xprt) 560 { 561 struct cf_conn *cd; 562 563 cd = (struct cf_conn *)(xprt->xp_p1); 564 565 if (cd->mreq != NULL) 566 return (XPRT_MOREREQS); 567 568 return (XPRT_IDLE); 569 } 570 571 /* 572 * If we have an mbuf chain in cd->mpending, try to parse a record from it, 573 * leaving the result in cd->mreq. If we don't have a complete record, leave 574 * the partial result in cd->mreq and try to read more from the socket. 575 */ 576 static int 577 svc_vc_process_pending(SVCXPRT *xprt) 578 { 579 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 580 struct socket *so = xprt->xp_socket; 581 struct mbuf *m; 582 583 /* 584 * If cd->resid is non-zero, we have part of the 585 * record already, otherwise we are expecting a record 586 * marker. 587 */ 588 if (!cd->resid && cd->mpending) { 589 /* 590 * See if there is enough data buffered to 591 * make up a record marker. Make sure we can 592 * handle the case where the record marker is 593 * split across more than one mbuf. 
594 */ 595 size_t n = 0; 596 uint32_t header; 597 598 m = cd->mpending; 599 while (n < sizeof(uint32_t) && m) { 600 n += m->m_len; 601 m = m->m_next; 602 } 603 if (n < sizeof(uint32_t)) { 604 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 605 return (FALSE); 606 } 607 m_copydata(cd->mpending, 0, sizeof(header), 608 (char *)&header); 609 header = ntohl(header); 610 cd->eor = (header & 0x80000000) != 0; 611 cd->resid = header & 0x7fffffff; 612 m_adj(cd->mpending, sizeof(uint32_t)); 613 } 614 615 /* 616 * Start pulling off mbufs from cd->mpending 617 * until we either have a complete record or 618 * we run out of data. We use m_split to pull 619 * data - it will pull as much as possible and 620 * split the last mbuf if necessary. 621 */ 622 while (cd->mpending && cd->resid) { 623 m = cd->mpending; 624 if (cd->mpending->m_next 625 || cd->mpending->m_len > cd->resid) 626 cd->mpending = m_split(cd->mpending, 627 cd->resid, M_WAITOK); 628 else 629 cd->mpending = NULL; 630 if (cd->mreq) 631 m_last(cd->mreq)->m_next = m; 632 else 633 cd->mreq = m; 634 while (m) { 635 cd->resid -= m->m_len; 636 m = m->m_next; 637 } 638 } 639 640 /* 641 * Block receive upcalls if we have more data pending, 642 * otherwise report our need. 643 */ 644 if (cd->mpending) 645 so->so_rcv.sb_lowat = INT_MAX; 646 else 647 so->so_rcv.sb_lowat = 648 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 649 return (TRUE); 650 } 651 652 static bool_t 653 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 654 struct sockaddr **addrp, struct mbuf **mp) 655 { 656 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 657 struct uio uio; 658 struct mbuf *m; 659 struct socket* so = xprt->xp_socket; 660 XDR xdrs; 661 int error, rcvflag; 662 uint32_t xid_plus_direction[2]; 663 664 /* 665 * Serialise access to the socket and our own record parsing 666 * state. 667 */ 668 sx_xlock(&xprt->xp_lock); 669 670 for (;;) { 671 /* If we have no request ready, check pending queue. 
*/ 672 while (cd->mpending && 673 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 674 if (!svc_vc_process_pending(xprt)) 675 break; 676 } 677 678 /* Process and return complete request in cd->mreq. */ 679 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 680 681 /* 682 * Now, check for a backchannel reply. 683 * The XID is in the first uint32_t of the reply 684 * and the message direction is the second one. 685 */ 686 if ((cd->mreq->m_len >= sizeof(xid_plus_direction) || 687 m_length(cd->mreq, NULL) >= 688 sizeof(xid_plus_direction)) && 689 xprt->xp_p2 != NULL) { 690 m_copydata(cd->mreq, 0, 691 sizeof(xid_plus_direction), 692 (char *)xid_plus_direction); 693 xid_plus_direction[0] = 694 ntohl(xid_plus_direction[0]); 695 xid_plus_direction[1] = 696 ntohl(xid_plus_direction[1]); 697 /* Check message direction. */ 698 if (xid_plus_direction[1] == REPLY) { 699 clnt_bck_svccall(xprt->xp_p2, 700 cd->mreq, 701 xid_plus_direction[0]); 702 cd->mreq = NULL; 703 continue; 704 } 705 } 706 707 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 708 cd->mreq = NULL; 709 710 /* Check for next request in a pending queue. */ 711 svc_vc_process_pending(xprt); 712 if (cd->mreq == NULL || cd->resid != 0) { 713 SOCKBUF_LOCK(&so->so_rcv); 714 if (!soreadable(so)) 715 xprt_inactive_self(xprt); 716 SOCKBUF_UNLOCK(&so->so_rcv); 717 } 718 719 sx_xunlock(&xprt->xp_lock); 720 721 if (! xdr_callmsg(&xdrs, msg)) { 722 XDR_DESTROY(&xdrs); 723 return (FALSE); 724 } 725 726 *addrp = NULL; 727 *mp = xdrmbuf_getall(&xdrs); 728 XDR_DESTROY(&xdrs); 729 730 return (TRUE); 731 } 732 733 /* 734 * The socket upcall calls xprt_active() which will eventually 735 * cause the server to call us here. We attempt to 736 * read as much as possible from the socket and put 737 * the result in cd->mpending. If the read fails, 738 * we have drained both cd->mpending and the socket so 739 * we can call xprt_inactive(). 
740 */ 741 uio.uio_resid = 1000000000; 742 uio.uio_td = curthread; 743 m = NULL; 744 rcvflag = MSG_DONTWAIT; 745 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 746 747 if (error == EWOULDBLOCK) { 748 /* 749 * We must re-test for readability after 750 * taking the lock to protect us in the case 751 * where a new packet arrives on the socket 752 * after our call to soreceive fails with 753 * EWOULDBLOCK. 754 */ 755 SOCKBUF_LOCK(&so->so_rcv); 756 if (!soreadable(so)) 757 xprt_inactive_self(xprt); 758 SOCKBUF_UNLOCK(&so->so_rcv); 759 sx_xunlock(&xprt->xp_lock); 760 return (FALSE); 761 } 762 763 if (error) { 764 SOCKBUF_LOCK(&so->so_rcv); 765 if (xprt->xp_upcallset) { 766 xprt->xp_upcallset = 0; 767 soupcall_clear(so, SO_RCV); 768 } 769 SOCKBUF_UNLOCK(&so->so_rcv); 770 xprt_inactive_self(xprt); 771 cd->strm_stat = XPRT_DIED; 772 sx_xunlock(&xprt->xp_lock); 773 return (FALSE); 774 } 775 776 if (!m) { 777 /* 778 * EOF - the other end has closed the socket. 779 */ 780 xprt_inactive_self(xprt); 781 cd->strm_stat = XPRT_DIED; 782 sx_xunlock(&xprt->xp_lock); 783 return (FALSE); 784 } 785 786 if (cd->mpending) 787 m_last(cd->mpending)->m_next = m; 788 else 789 cd->mpending = m; 790 } 791 } 792 793 static bool_t 794 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 795 struct sockaddr **addrp, struct mbuf **mp) 796 { 797 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 798 struct ct_data *ct; 799 struct mbuf *m; 800 XDR xdrs; 801 802 sx_xlock(&xprt->xp_lock); 803 ct = (struct ct_data *)xprt->xp_p2; 804 if (ct == NULL) { 805 sx_xunlock(&xprt->xp_lock); 806 return (FALSE); 807 } 808 mtx_lock(&ct->ct_lock); 809 m = cd->mreq; 810 if (m == NULL) { 811 xprt_inactive_self(xprt); 812 mtx_unlock(&ct->ct_lock); 813 sx_xunlock(&xprt->xp_lock); 814 return (FALSE); 815 } 816 cd->mreq = m->m_nextpkt; 817 mtx_unlock(&ct->ct_lock); 818 sx_xunlock(&xprt->xp_lock); 819 820 xdrmbuf_create(&xdrs, m, XDR_DECODE); 821 if (! 
xdr_callmsg(&xdrs, msg)) { 822 XDR_DESTROY(&xdrs); 823 return (FALSE); 824 } 825 *addrp = NULL; 826 *mp = xdrmbuf_getall(&xdrs); 827 XDR_DESTROY(&xdrs); 828 return (TRUE); 829 } 830 831 static bool_t 832 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 833 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 834 { 835 XDR xdrs; 836 struct mbuf *mrep; 837 bool_t stat = TRUE; 838 int error, len; 839 840 /* 841 * Leave space for record mark. 842 */ 843 mrep = m_gethdr(M_WAITOK, MT_DATA); 844 mrep->m_data += sizeof(uint32_t); 845 846 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 847 848 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 849 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 850 if (!xdr_replymsg(&xdrs, msg)) 851 stat = FALSE; 852 else 853 xdrmbuf_append(&xdrs, m); 854 } else { 855 stat = xdr_replymsg(&xdrs, msg); 856 } 857 858 if (stat) { 859 m_fixhdr(mrep); 860 861 /* 862 * Prepend a record marker containing the reply length. 863 */ 864 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 865 len = mrep->m_pkthdr.len; 866 *mtod(mrep, uint32_t *) = 867 htonl(0x80000000 | (len - sizeof(uint32_t))); 868 atomic_add_32(&xprt->xp_snd_cnt, len); 869 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 870 0, curthread); 871 if (!error) { 872 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 873 if (seq) 874 *seq = xprt->xp_snd_cnt; 875 stat = TRUE; 876 } else 877 atomic_subtract_32(&xprt->xp_snd_cnt, len); 878 } else { 879 m_freem(mrep); 880 } 881 882 XDR_DESTROY(&xdrs); 883 884 return (stat); 885 } 886 887 static bool_t 888 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 889 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 890 { 891 struct ct_data *ct; 892 XDR xdrs; 893 struct mbuf *mrep; 894 bool_t stat = TRUE; 895 int error; 896 897 /* 898 * Leave space for record mark. 
899 */ 900 mrep = m_gethdr(M_WAITOK, MT_DATA); 901 mrep->m_data += sizeof(uint32_t); 902 903 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 904 905 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 906 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 907 if (!xdr_replymsg(&xdrs, msg)) 908 stat = FALSE; 909 else 910 xdrmbuf_append(&xdrs, m); 911 } else { 912 stat = xdr_replymsg(&xdrs, msg); 913 } 914 915 if (stat) { 916 m_fixhdr(mrep); 917 918 /* 919 * Prepend a record marker containing the reply length. 920 */ 921 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 922 *mtod(mrep, uint32_t *) = 923 htonl(0x80000000 | (mrep->m_pkthdr.len 924 - sizeof(uint32_t))); 925 sx_xlock(&xprt->xp_lock); 926 ct = (struct ct_data *)xprt->xp_p2; 927 if (ct != NULL) 928 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 929 0, curthread); 930 else 931 error = EPIPE; 932 sx_xunlock(&xprt->xp_lock); 933 if (!error) { 934 stat = TRUE; 935 } 936 } else { 937 m_freem(mrep); 938 } 939 940 XDR_DESTROY(&xdrs); 941 942 return (stat); 943 } 944 945 static bool_t 946 svc_vc_null() 947 { 948 949 return (FALSE); 950 } 951 952 static int 953 svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 954 { 955 SVCXPRT *xprt = (SVCXPRT *) arg; 956 957 if (soreadable(xprt->xp_socket)) 958 xprt_active(xprt); 959 return (SU_OK); 960 } 961 962 static int 963 svc_vc_rendezvous_soupcall(struct socket *head, void *arg, int waitflag) 964 { 965 SVCXPRT *xprt = (SVCXPRT *) arg; 966 967 if (!TAILQ_EMPTY(&head->sol_comp)) 968 xprt_active(xprt); 969 return (SU_OK); 970 } 971 972 #if 0 973 /* 974 * Get the effective UID of the sending process. Used by rpcbind, keyserv 975 * and rpc.yppasswdd on AF_LOCAL. 
 *
 * Stores the peer's effective UID in *uid and returns 0 when the
 * transport's remote address is AF_LOCAL and getpeereid() succeeds;
 * returns getpeereid()'s result on failure and -1 for any other
 * address family.
 *
 * This routine is compiled out by the enclosing "#if 0".
 */
int
__rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
	int sock, ret;
	gid_t egid;
	uid_t euid;
	struct sockaddr *sa;

	sock = transp->xp_fd;
	sa = (struct sockaddr *)transp->xp_rtaddr;
	if (sa->sa_family == AF_LOCAL) {
		ret = getpeereid(sock, &euid, &egid);
		if (ret == 0)
			*uid = euid;
		return (ret);
	} else
		return (-1);
}
#endif