1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * linux/net/sunrpc/xprtsock.c 4 * 5 * Client-side transport implementation for sockets. 6 * 7 * TCP callback races fixes (C) 1998 Red Hat 8 * TCP send fixes (C) 1998 Red Hat 9 * TCP NFS related read + write fixes 10 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> 11 * 12 * Rewrite of larges part of the code in order to stabilize TCP stuff. 13 * Fix behaviour when socket buffer is full. 14 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 15 * 16 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> 17 * 18 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005. 19 * <gilles.quillard@bull.net> 20 */ 21 22 #include <linux/types.h> 23 #include <linux/string.h> 24 #include <linux/slab.h> 25 #include <linux/module.h> 26 #include <linux/capability.h> 27 #include <linux/pagemap.h> 28 #include <linux/errno.h> 29 #include <linux/socket.h> 30 #include <linux/in.h> 31 #include <linux/net.h> 32 #include <linux/mm.h> 33 #include <linux/un.h> 34 #include <linux/udp.h> 35 #include <linux/tcp.h> 36 #include <linux/sunrpc/clnt.h> 37 #include <linux/sunrpc/addr.h> 38 #include <linux/sunrpc/sched.h> 39 #include <linux/sunrpc/svcsock.h> 40 #include <linux/sunrpc/xprtsock.h> 41 #include <linux/file.h> 42 #ifdef CONFIG_SUNRPC_BACKCHANNEL 43 #include <linux/sunrpc/bc_xprt.h> 44 #endif 45 46 #include <net/sock.h> 47 #include <net/checksum.h> 48 #include <net/udp.h> 49 #include <net/tcp.h> 50 #include <net/tls_prot.h> 51 #include <net/handshake.h> 52 53 #include <linux/bvec.h> 54 #include <linux/highmem.h> 55 #include <linux/uio.h> 56 #include <linux/sched/mm.h> 57 58 #include <trace/events/sock.h> 59 #include <trace/events/sunrpc.h> 60 61 #include "socklib.h" 62 #include "sunrpc.h" 63 64 static void xs_close(struct rpc_xprt *xprt); 65 static void xs_reset_srcport(struct sock_xprt *transport); 66 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock); 67 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, 68 struct socket *sock); 69 70 /* 71 * xprtsock tunables 72 */ 73 static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 74 static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE; 75 static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE; 76 77 static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; 78 static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; 79 80 #define XS_TCP_LINGER_TO (15U * HZ) 81 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; 82 83 /* 84 * We can register our own files under /proc/sys/sunrpc by 85 * calling register_sysctl() again. The files in that 86 * directory become the union of all files registered there. 87 * 88 * We simply need to make sure that we don't collide with 89 * someone else's file names! 90 */ 91 92 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; 93 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; 94 static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT; 95 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT; 96 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT; 97 98 static struct ctl_table_header *sunrpc_table_header; 99 100 static struct xprt_class xs_local_transport; 101 static struct xprt_class xs_udp_transport; 102 static struct xprt_class xs_tcp_transport; 103 static struct xprt_class xs_tcp_tls_transport; 104 static struct xprt_class xs_bc_tcp_transport; 105 106 /* 107 * FIXME: changing the UDP slot table size should also resize the UDP 108 * socket buffers for existing UDP transports 109 */ 110 static struct ctl_table xs_tunables_table[] = { 111 { 112 .procname = "udp_slot_table_entries", 113 .data = &xprt_udp_slot_table_entries, 114 .maxlen = sizeof(unsigned int), 115 .mode = 0644, 116 .proc_handler = proc_dointvec_minmax, 117 .extra1 = &min_slot_table_size, 118 .extra2 = &max_slot_table_size 119 }, 120 { 121 .procname = "tcp_slot_table_entries", 122 .data = &xprt_tcp_slot_table_entries, 123 .maxlen = sizeof(unsigned int), 124 .mode = 0644, 125 .proc_handler = proc_dointvec_minmax, 126 .extra1 = &min_slot_table_size, 127 .extra2 = &max_slot_table_size 128 }, 129 { 130 .procname = "tcp_max_slot_table_entries", 131 .data = &xprt_max_tcp_slot_table_entries, 132 .maxlen = sizeof(unsigned int), 133 .mode = 0644, 134 .proc_handler = proc_dointvec_minmax, 135 .extra1 = &min_slot_table_size, 136 .extra2 = &max_tcp_slot_table_limit 137 }, 138 { 139 .procname = "min_resvport", 140 .data = &xprt_min_resvport, 141 .maxlen = sizeof(unsigned int), 142 .mode = 0644, 143 .proc_handler = proc_dointvec_minmax, 144 .extra1 = &xprt_min_resvport_limit, 145 .extra2 = &xprt_max_resvport_limit 146 }, 147 { 148 .procname = "max_resvport", 149 .data = &xprt_max_resvport, 150 .maxlen = sizeof(unsigned int), 151 .mode = 0644, 152 .proc_handler = proc_dointvec_minmax, 153 .extra1 = &xprt_min_resvport_limit, 154 .extra2 = &xprt_max_resvport_limit 155 }, 156 { 157 .procname = "tcp_fin_timeout", 158 .data = &xs_tcp_fin_timeout, 159 .maxlen = sizeof(xs_tcp_fin_timeout), 160 .mode = 0644, 161 .proc_handler = proc_dointvec_jiffies, 162 }, 163 }; 164 165 /* 166 * Wait duration for a reply from the RPC portmapper. 167 */ 168 #define XS_BIND_TO (60U * HZ) 169 170 /* 171 * Delay if a UDP socket connect error occurs. This is most likely some 172 * kind of resource problem on the local host. 173 */ 174 #define XS_UDP_REEST_TO (2U * HZ) 175 176 /* 177 * The reestablish timeout allows clients to delay for a bit before attempting 178 * to reconnect to a server that just dropped our connection. 179 * 180 * We implement an exponential backoff when trying to reestablish a TCP 181 * transport connection with the server. Some servers like to drop a TCP 182 * connection when they are overworked, so we start with a short timeout and 183 * increase over time if the server is down or not responding. 184 */ 185 #define XS_TCP_INIT_REEST_TO (3U * HZ) 186 187 /* 188 * TCP idle timeout; client drops the transport socket if it is idle 189 * for this long. Note that we also timeout UDP sockets to prevent 190 * holding port numbers when there is no RPC traffic. 191 */ 192 #define XS_IDLE_DISC_TO (5U * 60 * HZ) 193 194 /* 195 * TLS handshake timeout. 196 */ 197 #define XS_TLS_HANDSHAKE_TO (10U * HZ) 198 199 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 200 # undef RPC_DEBUG_DATA 201 # define RPCDBG_FACILITY RPCDBG_TRANS 202 #endif 203 204 #ifdef RPC_DEBUG_DATA 205 static void xs_pktdump(char *msg, u32 *packet, unsigned int count) 206 { 207 u8 *buf = (u8 *) packet; 208 int j; 209 210 dprintk("RPC: %s\n", msg); 211 for (j = 0; j < count && j < 128; j += 4) { 212 if (!(j & 31)) { 213 if (j) 214 dprintk("\n"); 215 dprintk("0x%04x ", j); 216 } 217 dprintk("%02x%02x%02x%02x ", 218 buf[j], buf[j+1], buf[j+2], buf[j+3]); 219 } 220 dprintk("\n"); 221 } 222 #else 223 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count) 224 { 225 /* NOP */ 226 } 227 #endif 228 229 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) 230 { 231 return (struct rpc_xprt *) sk->sk_user_data; 232 } 233 234 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) 235 { 236 return (struct sockaddr *) &xprt->addr; 237 } 238 239 static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt) 240 { 241 return (struct sockaddr_un *) &xprt->addr; 242 } 243 244 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt) 245 { 246 return (struct sockaddr_in *) &xprt->addr; 247 } 248 249 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt) 250 { 251 return (struct sockaddr_in6 *) &xprt->addr; 252 } 253 254 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt) 255 { 256 struct sockaddr *sap = xs_addr(xprt); 257 struct sockaddr_in6 *sin6; 258 struct sockaddr_in *sin; 259 struct sockaddr_un *sun; 260 char buf[128]; 261 262 switch (sap->sa_family) { 263 case AF_LOCAL: 264 sun = xs_addr_un(xprt); 265 if (sun->sun_path[0]) { 266 strscpy(buf, sun->sun_path, sizeof(buf)); 267 } else { 268 buf[0] = '@'; 269 strscpy(buf+1, sun->sun_path+1, sizeof(buf)-1); 270 } 271 xprt->address_strings[RPC_DISPLAY_ADDR] = 272 kstrdup(buf, GFP_KERNEL); 273 break; 274 case AF_INET: 275 (void)rpc_ntop(sap, buf, sizeof(buf)); 276 xprt->address_strings[RPC_DISPLAY_ADDR] = 277 kstrdup(buf, GFP_KERNEL); 278 sin = xs_addr_in(xprt); 279 snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr)); 280 break; 281 case AF_INET6: 282 (void)rpc_ntop(sap, buf, sizeof(buf)); 283 xprt->address_strings[RPC_DISPLAY_ADDR] = 284 kstrdup(buf, GFP_KERNEL); 285 sin6 = xs_addr_in6(xprt); 286 snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr); 287 break; 288 default: 289 BUG(); 290 } 291 292 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); 293 } 294 295 static void xs_format_common_peer_ports(struct rpc_xprt *xprt) 296 { 297 struct sockaddr *sap = xs_addr(xprt); 298 char buf[128]; 299 300 snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap)); 301 xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL); 302 303 snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap)); 304 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); 305 } 306 307 static void xs_format_peer_addresses(struct rpc_xprt *xprt, 308 const char *protocol, 309 const char *netid) 310 { 311 xprt->address_strings[RPC_DISPLAY_PROTO] = protocol; 312 xprt->address_strings[RPC_DISPLAY_NETID] = netid; 313 xs_format_common_peer_addresses(xprt); 314 xs_format_common_peer_ports(xprt); 315 } 316 317 static void xs_update_peer_port(struct rpc_xprt *xprt) 318 { 319 kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]); 320 kfree(xprt->address_strings[RPC_DISPLAY_PORT]); 321 322 xs_format_common_peer_ports(xprt); 323 } 324 325 static void xs_free_peer_addresses(struct rpc_xprt *xprt) 326 { 327 unsigned int i; 328 329 for (i = 0; i < RPC_DISPLAY_MAX; i++) 330 switch (i) { 331 case RPC_DISPLAY_PROTO: 332 case RPC_DISPLAY_NETID: 333 continue; 334 default: 335 kfree(xprt->address_strings[i]); 336 } 337 } 338 339 static size_t 340 xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp) 341 { 342 size_t i,n; 343 344 if (!want || !(buf->flags & XDRBUF_SPARSE_PAGES)) 345 return want; 346 n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT; 347 for (i = 0; i < n; i++) { 348 if (buf->pages[i]) 349 continue; 350 buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp); 351 if (!buf->pages[i]) { 352 i *= PAGE_SIZE; 353 return i > buf->page_base ? i - buf->page_base : 0; 354 } 355 } 356 return want; 357 } 358 359 static int 360 xs_sock_process_cmsg(struct socket *sock, struct msghdr *msg, 361 unsigned int *msg_flags, struct cmsghdr *cmsg, int ret) 362 { 363 u8 content_type = tls_get_record_type(sock->sk, cmsg); 364 u8 level, description; 365 366 switch (content_type) { 367 case 0: 368 break; 369 case TLS_RECORD_TYPE_DATA: 370 /* TLS sets EOR at the end of each application data 371 * record, even though there might be more frames 372 * waiting to be decrypted. 373 */ 374 *msg_flags &= ~MSG_EOR; 375 break; 376 case TLS_RECORD_TYPE_ALERT: 377 tls_alert_recv(sock->sk, msg, &level, &description); 378 ret = (level == TLS_ALERT_LEVEL_FATAL) ? 379 -EACCES : -EAGAIN; 380 break; 381 default: 382 /* discard this record type */ 383 ret = -EAGAIN; 384 } 385 return ret; 386 } 387 388 static int 389 xs_sock_recv_cmsg(struct socket *sock, unsigned int *msg_flags, int flags) 390 { 391 union { 392 struct cmsghdr cmsg; 393 u8 buf[CMSG_SPACE(sizeof(u8))]; 394 } u; 395 u8 alert[2]; 396 struct kvec alert_kvec = { 397 .iov_base = alert, 398 .iov_len = sizeof(alert), 399 }; 400 struct msghdr msg = { 401 .msg_flags = *msg_flags, 402 .msg_control = &u, 403 .msg_controllen = sizeof(u), 404 }; 405 int ret; 406 407 iov_iter_kvec(&msg.msg_iter, ITER_DEST, &alert_kvec, 1, 408 alert_kvec.iov_len); 409 ret = sock_recvmsg(sock, &msg, flags); 410 if (ret > 0) { 411 if (tls_get_record_type(sock->sk, &u.cmsg) == TLS_RECORD_TYPE_ALERT) 412 iov_iter_revert(&msg.msg_iter, ret); 413 ret = xs_sock_process_cmsg(sock, &msg, msg_flags, &u.cmsg, 414 -EAGAIN); 415 } 416 return ret; 417 } 418 419 static ssize_t 420 xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek) 421 { 422 ssize_t ret; 423 if (seek != 0) 424 iov_iter_advance(&msg->msg_iter, seek); 425 ret = sock_recvmsg(sock, msg, flags); 426 /* Handle TLS inband control message lazily */ 427 if (msg->msg_flags & MSG_CTRUNC) { 428 msg->msg_flags &= ~(MSG_CTRUNC | MSG_EOR); 429 if (ret == 0 || ret == -EIO) 430 ret = xs_sock_recv_cmsg(sock, &msg->msg_flags, flags); 431 } 432 return ret > 0 ? ret + seek : ret; 433 } 434 435 static ssize_t 436 xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags, 437 struct kvec *kvec, size_t count, size_t seek) 438 { 439 iov_iter_kvec(&msg->msg_iter, ITER_DEST, kvec, 1, count); 440 return xs_sock_recvmsg(sock, msg, flags, seek); 441 } 442 443 static ssize_t 444 xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags, 445 struct bio_vec *bvec, unsigned long nr, size_t count, 446 size_t seek) 447 { 448 iov_iter_bvec(&msg->msg_iter, ITER_DEST, bvec, nr, count); 449 return xs_sock_recvmsg(sock, msg, flags, seek); 450 } 451 452 static ssize_t 453 xs_read_discard(struct socket *sock, struct msghdr *msg, int flags, 454 size_t count) 455 { 456 iov_iter_discard(&msg->msg_iter, ITER_DEST, count); 457 return xs_sock_recvmsg(sock, msg, flags, 0); 458 } 459 460 #if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE 461 static void 462 xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek) 463 { 464 struct bvec_iter bi = { 465 .bi_size = count, 466 }; 467 struct bio_vec bv; 468 469 bvec_iter_advance(bvec, &bi, seek & PAGE_MASK); 470 for_each_bvec(bv, bvec, bi, bi) 471 flush_dcache_page(bv.bv_page); 472 } 473 #else 474 static inline void 475 xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek) 476 { 477 } 478 #endif 479 480 static ssize_t 481 xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, 482 struct xdr_buf *buf, size_t count, size_t seek, size_t *read) 483 { 484 size_t want, seek_init = seek, offset = 0; 485 ssize_t ret; 486 487 want = min_t(size_t, count, buf->head[0].iov_len); 488 if (seek < want) { 489 ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek); 490 if (ret <= 0) 491 goto sock_err; 492 offset += ret; 493 if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) 494 goto out; 495 if (ret != want) 496 goto out; 497 seek = 0; 498 } else { 499 seek -= want; 500 offset += want; 501 } 502 503 want = xs_alloc_sparse_pages( 504 buf, min_t(size_t, count - offset, buf->page_len), 505 GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN); 506 if (seek < want) { 507 ret = xs_read_bvec(sock, msg, flags, buf->bvec, 508 xdr_buf_pagecount(buf), 509 want + buf->page_base, 510 seek + buf->page_base); 511 if (ret <= 0) 512 goto sock_err; 513 xs_flush_bvec(buf->bvec, ret, seek + buf->page_base); 514 ret -= buf->page_base; 515 offset += ret; 516 if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) 517 goto out; 518 if (ret != want) 519 goto out; 520 seek = 0; 521 } else { 522 seek -= want; 523 offset += want; 524 } 525 526 want = min_t(size_t, count - offset, buf->tail[0].iov_len); 527 if (seek < want) { 528 ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek); 529 if (ret <= 0) 530 goto sock_err; 531 offset += ret; 532 if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) 533 goto out; 534 if (ret != want) 535 goto out; 536 } else if (offset < seek_init) 537 offset = seek_init; 538 ret = -EMSGSIZE; 539 out: 540 *read = offset - seek_init; 541 return ret; 542 sock_err: 543 offset += seek; 544 goto out; 545 } 546 547 static void 548 xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf) 549 { 550 if (!transport->recv.copied) { 551 if (buf->head[0].iov_len >= transport->recv.offset) 552 memcpy(buf->head[0].iov_base, 553 &transport->recv.xid, 554 transport->recv.offset); 555 transport->recv.copied = transport->recv.offset; 556 } 557 } 558 559 static bool 560 xs_read_stream_request_done(struct sock_xprt *transport) 561 { 562 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); 563 } 564 565 static void 566 xs_read_stream_check_eor(struct sock_xprt *transport, 567 struct msghdr *msg) 568 { 569 if (xs_read_stream_request_done(transport)) 570 msg->msg_flags |= MSG_EOR; 571 } 572 573 static ssize_t 574 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, 575 int flags, struct rpc_rqst *req) 576 { 577 struct xdr_buf *buf = &req->rq_private_buf; 578 size_t want, read; 579 ssize_t ret; 580 581 xs_read_header(transport, buf); 582 583 want = transport->recv.len - transport->recv.offset; 584 if (want != 0) { 585 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, 586 transport->recv.copied + want, 587 transport->recv.copied, 588 &read); 589 transport->recv.offset += read; 590 transport->recv.copied += read; 591 } 592 593 if (transport->recv.offset == transport->recv.len) 594 xs_read_stream_check_eor(transport, msg); 595 596 if (want == 0) 597 return 0; 598 599 switch (ret) { 600 default: 601 break; 602 case -EFAULT: 603 case -EMSGSIZE: 604 msg->msg_flags |= MSG_TRUNC; 605 return read; 606 case 0: 607 return -ESHUTDOWN; 608 } 609 return ret < 0 ? ret : read; 610 } 611 612 static size_t 613 xs_read_stream_headersize(bool isfrag) 614 { 615 if (isfrag) 616 return sizeof(__be32); 617 return 3 * sizeof(__be32); 618 } 619 620 static ssize_t 621 xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg, 622 int flags, size_t want, size_t seek) 623 { 624 struct kvec kvec = { 625 .iov_base = &transport->recv.fraghdr, 626 .iov_len = want, 627 }; 628 return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek); 629 } 630 631 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 632 static ssize_t 633 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) 634 { 635 struct rpc_xprt *xprt = &transport->xprt; 636 struct rpc_rqst *req; 637 ssize_t ret; 638 639 /* Is this transport associated with the backchannel? */ 640 if (!xprt->bc_serv) 641 return -ESHUTDOWN; 642 643 /* Look up and lock the request corresponding to the given XID */ 644 req = xprt_lookup_bc_request(xprt, transport->recv.xid); 645 if (!req) { 646 printk(KERN_WARNING "Callback slot table overflowed\n"); 647 return -ESHUTDOWN; 648 } 649 if (transport->recv.copied && !req->rq_private_buf.len) 650 return -ESHUTDOWN; 651 652 ret = xs_read_stream_request(transport, msg, flags, req); 653 if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) 654 xprt_complete_bc_request(req, transport->recv.copied); 655 else 656 req->rq_private_buf.len = transport->recv.copied; 657 658 return ret; 659 } 660 #else /* CONFIG_SUNRPC_BACKCHANNEL */ 661 static ssize_t 662 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) 663 { 664 return -ESHUTDOWN; 665 } 666 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 667 668 static ssize_t 669 xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) 670 { 671 struct rpc_xprt *xprt = &transport->xprt; 672 struct rpc_rqst *req; 673 ssize_t ret = 0; 674 675 /* Look up and lock the request corresponding to the given XID */ 676 spin_lock(&xprt->queue_lock); 677 req = xprt_lookup_rqst(xprt, transport->recv.xid); 678 if (!req || (transport->recv.copied && !req->rq_private_buf.len)) { 679 msg->msg_flags |= MSG_TRUNC; 680 goto out; 681 } 682 xprt_pin_rqst(req); 683 spin_unlock(&xprt->queue_lock); 684 685 ret = xs_read_stream_request(transport, msg, flags, req); 686 687 spin_lock(&xprt->queue_lock); 688 if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) 689 xprt_complete_rqst(req->rq_task, transport->recv.copied); 690 else 691 req->rq_private_buf.len = transport->recv.copied; 692 xprt_unpin_rqst(req); 693 out: 694 spin_unlock(&xprt->queue_lock); 695 return ret; 696 } 697 698 static ssize_t 699 xs_read_stream(struct sock_xprt *transport, int flags) 700 { 701 struct msghdr msg = { 0 }; 702 size_t want, read = 0; 703 ssize_t ret = 0; 704 705 if (transport->recv.len == 0) { 706 want = xs_read_stream_headersize(transport->recv.copied != 0); 707 ret = xs_read_stream_header(transport, &msg, flags, want, 708 transport->recv.offset); 709 if (ret <= 0) 710 goto out_err; 711 transport->recv.offset = ret; 712 if (transport->recv.offset != want) 713 return transport->recv.offset; 714 transport->recv.len = be32_to_cpu(transport->recv.fraghdr) & 715 RPC_FRAGMENT_SIZE_MASK; 716 transport->recv.offset -= sizeof(transport->recv.fraghdr); 717 read = ret; 718 } 719 720 switch (be32_to_cpu(transport->recv.calldir)) { 721 default: 722 msg.msg_flags |= MSG_TRUNC; 723 break; 724 case RPC_CALL: 725 ret = xs_read_stream_call(transport, &msg, flags); 726 break; 727 case RPC_REPLY: 728 ret = xs_read_stream_reply(transport, &msg, flags); 729 } 730 if (msg.msg_flags & MSG_TRUNC) { 731 transport->recv.calldir = cpu_to_be32(-1); 732 transport->recv.copied = -1; 733 } 734 if (ret < 0) 735 goto out_err; 736 read += ret; 737 if (transport->recv.offset < transport->recv.len) { 738 if (!(msg.msg_flags & MSG_TRUNC)) 739 return read; 740 msg.msg_flags = 0; 741 ret = xs_read_discard(transport->sock, &msg, flags, 742 transport->recv.len - transport->recv.offset); 743 if (ret <= 0) 744 goto out_err; 745 transport->recv.offset += ret; 746 read += ret; 747 if (transport->recv.offset != transport->recv.len) 748 return read; 749 } 750 if (xs_read_stream_request_done(transport)) { 751 trace_xs_stream_read_request(transport); 752 transport->recv.copied = 0; 753 } 754 transport->recv.offset = 0; 755 transport->recv.len = 0; 756 return read; 757 out_err: 758 return ret != 0 ? ret : -ESHUTDOWN; 759 } 760 761 static __poll_t xs_poll_socket(struct sock_xprt *transport) 762 { 763 return transport->sock->ops->poll(transport->file, transport->sock, 764 NULL); 765 } 766 767 static bool xs_poll_socket_readable(struct sock_xprt *transport) 768 { 769 __poll_t events = xs_poll_socket(transport); 770 771 return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP); 772 } 773 774 static void xs_poll_check_readable(struct sock_xprt *transport) 775 { 776 777 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); 778 if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) 779 return; 780 if (!xs_poll_socket_readable(transport)) 781 return; 782 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) 783 queue_work(xprtiod_workqueue, &transport->recv_worker); 784 } 785 786 static void xs_stream_data_receive(struct sock_xprt *transport) 787 { 788 size_t read = 0; 789 ssize_t ret = 0; 790 791 mutex_lock(&transport->recv_mutex); 792 if (transport->sock == NULL) 793 goto out; 794 for (;;) { 795 ret = xs_read_stream(transport, MSG_DONTWAIT); 796 if (ret < 0) 797 break; 798 read += ret; 799 cond_resched(); 800 } 801 if (ret == -ESHUTDOWN) 802 kernel_sock_shutdown(transport->sock, SHUT_RDWR); 803 else if (ret == -EACCES) 804 xprt_wake_pending_tasks(&transport->xprt, -EACCES); 805 else 806 xs_poll_check_readable(transport); 807 out: 808 mutex_unlock(&transport->recv_mutex); 809 trace_xs_stream_read_data(&transport->xprt, ret, read); 810 } 811 812 static void xs_stream_data_receive_workfn(struct work_struct *work) 813 { 814 struct sock_xprt *transport = 815 container_of(work, struct sock_xprt, recv_worker); 816 unsigned int pflags = memalloc_nofs_save(); 817 818 xs_stream_data_receive(transport); 819 memalloc_nofs_restore(pflags); 820 } 821 822 static void 823 xs_stream_reset_connect(struct sock_xprt *transport) 824 { 825 transport->recv.offset = 0; 826 transport->recv.len = 0; 827 transport->recv.copied = 0; 828 transport->xmit.offset = 0; 829 } 830 831 static void 832 xs_stream_start_connect(struct sock_xprt *transport) 833 { 834 transport->xprt.stat.connect_count++; 835 transport->xprt.stat.connect_start = jiffies; 836 } 837 838 #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 839 840 /** 841 * xs_nospace - handle transmit was incomplete 842 * @req: pointer to RPC request 843 * @transport: pointer to struct sock_xprt 844 * 845 */ 846 static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport) 847 { 848 struct rpc_xprt *xprt = &transport->xprt; 849 struct sock *sk = transport->inet; 850 int ret = -EAGAIN; 851 852 trace_rpc_socket_nospace(req, transport); 853 854 /* Protect against races with write_space */ 855 spin_lock(&xprt->transport_lock); 856 857 /* Don't race with disconnect */ 858 if (xprt_connected(xprt)) { 859 /* wait for more buffer space */ 860 set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state); 861 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 862 sk->sk_write_pending++; 863 xprt_wait_for_buffer_space(xprt); 864 } else 865 ret = -ENOTCONN; 866 867 spin_unlock(&xprt->transport_lock); 868 return ret; 869 } 870 871 static int xs_sock_nospace(struct rpc_rqst *req) 872 { 873 struct sock_xprt *transport = 874 container_of(req->rq_xprt, struct sock_xprt, xprt); 875 struct sock *sk = transport->inet; 876 int ret = -EAGAIN; 877 878 lock_sock(sk); 879 if (!sock_writeable(sk)) 880 ret = xs_nospace(req, transport); 881 release_sock(sk); 882 return ret; 883 } 884 885 static int xs_stream_nospace(struct rpc_rqst *req, bool vm_wait) 886 { 887 struct sock_xprt *transport = 888 container_of(req->rq_xprt, struct sock_xprt, xprt); 889 struct sock *sk = transport->inet; 890 int ret = -EAGAIN; 891 892 if (vm_wait) 893 return -ENOBUFS; 894 lock_sock(sk); 895 if (!sk_stream_memory_free(sk)) 896 ret = xs_nospace(req, transport); 897 release_sock(sk); 898 return ret; 899 } 900 901 static int xs_stream_prepare_request(struct rpc_rqst *req, struct xdr_buf *buf) 902 { 903 return xdr_alloc_bvec(buf, rpc_task_gfp_mask()); 904 } 905 906 static void xs_stream_abort_send_request(struct rpc_rqst *req) 907 { 908 struct rpc_xprt *xprt = req->rq_xprt; 909 struct sock_xprt *transport = 910 container_of(xprt, struct sock_xprt, xprt); 911 912 if (transport->xmit.offset != 0 && 913 !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) 914 xprt_force_disconnect(xprt); 915 } 916 917 /* 918 * Determine if the previous message in the stream was aborted before it 919 * could complete transmission. 920 */ 921 static bool 922 xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) 923 { 924 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; 925 } 926 927 /* 928 * Return the stream record marker field for a record of length < 2^31-1 929 */ 930 static rpc_fraghdr 931 xs_stream_record_marker(struct xdr_buf *xdr) 932 { 933 if (!xdr->len) 934 return 0; 935 return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len); 936 } 937 938 /** 939 * xs_local_send_request - write an RPC request to an AF_LOCAL socket 940 * @req: pointer to RPC request 941 * 942 * Return values: 943 * 0: The request has been sent 944 * EAGAIN: The socket was blocked, please call again later to 945 * complete the request 946 * ENOTCONN: Caller needs to invoke connect logic then call again 947 * other: Some other error occurred, the request was not sent 948 */ 949 static int xs_local_send_request(struct rpc_rqst *req) 950 { 951 struct rpc_xprt *xprt = req->rq_xprt; 952 struct sock_xprt *transport = 953 container_of(xprt, struct sock_xprt, xprt); 954 struct xdr_buf *xdr = &req->rq_snd_buf; 955 rpc_fraghdr rm = xs_stream_record_marker(xdr); 956 unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; 957 struct msghdr msg = { 958 .msg_flags = XS_SENDMSG_FLAGS, 959 }; 960 bool vm_wait; 961 unsigned int sent; 962 int status; 963 964 /* Close the stream if the previous transmission was incomplete */ 965 if (xs_send_request_was_aborted(transport, req)) { 966 xprt_force_disconnect(xprt); 967 return -ENOTCONN; 968 } 969 970 xs_pktdump("packet data:", 971 req->rq_svec->iov_base, req->rq_svec->iov_len); 972 973 vm_wait = sk_stream_is_writeable(transport->inet) ? true : false; 974 975 req->rq_xtime = ktime_get(); 976 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 977 transport->xmit.offset, rm, &sent); 978 dprintk("RPC: %s(%u) = %d\n", 979 __func__, xdr->len - transport->xmit.offset, status); 980 981 if (likely(sent > 0) || status == 0) { 982 transport->xmit.offset += sent; 983 req->rq_bytes_sent = transport->xmit.offset; 984 if (likely(req->rq_bytes_sent >= msglen)) { 985 req->rq_xmit_bytes_sent += transport->xmit.offset; 986 transport->xmit.offset = 0; 987 return 0; 988 } 989 status = -EAGAIN; 990 vm_wait = false; 991 } 992 993 switch (status) { 994 case -EAGAIN: 995 status = xs_stream_nospace(req, vm_wait); 996 break; 997 default: 998 dprintk("RPC: sendmsg returned unrecognized error %d\n", 999 -status); 1000 fallthrough; 1001 case -EPIPE: 1002 xprt_force_disconnect(xprt); 1003 status = -ENOTCONN; 1004 } 1005 1006 return status; 1007 } 1008 1009 /** 1010 * xs_udp_send_request - write an RPC request to a UDP socket 1011 * @req: pointer to RPC request 1012 * 1013 * Return values: 1014 * 0: The request has been sent 1015 * EAGAIN: The socket was blocked, please call again later to 1016 * complete the request 1017 * ENOTCONN: Caller needs to invoke connect logic then call again 1018 * other: Some other error occurred, the request was not sent 1019 */ 1020 static int xs_udp_send_request(struct rpc_rqst *req) 1021 { 1022 struct rpc_xprt *xprt = req->rq_xprt; 1023 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1024 struct xdr_buf *xdr = &req->rq_snd_buf; 1025 struct msghdr msg = { 1026 .msg_name = xs_addr(xprt), 1027 .msg_namelen = xprt->addrlen, 1028 .msg_flags = XS_SENDMSG_FLAGS, 1029 }; 1030 unsigned int sent; 1031 int status; 1032 1033 xs_pktdump("packet data:", 1034 req->rq_svec->iov_base, 1035 req->rq_svec->iov_len); 1036 1037 if (!xprt_bound(xprt)) 1038 return -ENOTCONN; 1039 1040 if (!xprt_request_get_cong(xprt, req)) 1041 return -EBADSLT; 1042 1043 status = xdr_alloc_bvec(xdr, rpc_task_gfp_mask()); 1044 if (status < 0) 1045 return status; 1046 req->rq_xtime = ktime_get(); 1047 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent); 1048 1049 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 1050 xdr->len, status); 1051 1052 /* firewall is blocking us, don't return -EAGAIN or we end up looping */ 1053 if (status == -EPERM) 1054 goto process_status; 1055 1056 if (status == -EAGAIN && sock_writeable(transport->inet)) 1057 status = -ENOBUFS; 1058 1059 if (sent > 0 || status == 0) { 1060 req->rq_xmit_bytes_sent += sent; 1061 if (sent >= req->rq_slen) 1062 return 0; 1063 /* Still some bytes left; set up for a retry later. */ 1064 status = -EAGAIN; 1065 } 1066 1067 process_status: 1068 switch (status) { 1069 case -ENOTSOCK: 1070 status = -ENOTCONN; 1071 /* Should we call xs_close() here? */ 1072 break; 1073 case -EAGAIN: 1074 status = xs_sock_nospace(req); 1075 break; 1076 case -ENETUNREACH: 1077 case -ENOBUFS: 1078 case -EPIPE: 1079 case -ECONNREFUSED: 1080 case -EPERM: 1081 /* When the server has died, an ICMP port unreachable message 1082 * prompts ECONNREFUSED. */ 1083 break; 1084 default: 1085 dprintk("RPC: sendmsg returned unrecognized error %d\n", 1086 -status); 1087 } 1088 1089 return status; 1090 } 1091 1092 /** 1093 * xs_tcp_send_request - write an RPC request to a TCP socket 1094 * @req: pointer to RPC request 1095 * 1096 * Return values: 1097 * 0: The request has been sent 1098 * EAGAIN: The socket was blocked, please call again later to 1099 * complete the request 1100 * ENOTCONN: Caller needs to invoke connect logic then call again 1101 * other: Some other error occurred, the request was not sent 1102 * 1103 * XXX: In the case of soft timeouts, should we eventually give up 1104 * if sendmsg is not able to make progress? 1105 */ 1106 static int xs_tcp_send_request(struct rpc_rqst *req) 1107 { 1108 struct rpc_xprt *xprt = req->rq_xprt; 1109 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1110 struct xdr_buf *xdr = &req->rq_snd_buf; 1111 rpc_fraghdr rm = xs_stream_record_marker(xdr); 1112 unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; 1113 struct msghdr msg = { 1114 .msg_flags = XS_SENDMSG_FLAGS, 1115 }; 1116 bool vm_wait; 1117 unsigned int sent; 1118 int status; 1119 1120 /* Close the stream if the previous transmission was incomplete */ 1121 if (xs_send_request_was_aborted(transport, req)) { 1122 if (transport->sock != NULL) 1123 kernel_sock_shutdown(transport->sock, SHUT_RDWR); 1124 return -ENOTCONN; 1125 } 1126 if (!transport->inet) 1127 return -ENOTCONN; 1128 1129 xs_pktdump("packet data:", 1130 req->rq_svec->iov_base, 1131 req->rq_svec->iov_len); 1132 1133 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) 1134 xs_tcp_set_socket_timeouts(xprt, transport->sock); 1135 1136 xs_set_srcport(transport, transport->sock); 1137 1138 /* Continue transmitting the packet/record. We must be careful 1139 * to cope with writespace callbacks arriving _after_ we have 1140 * called sendmsg(). */ 1141 req->rq_xtime = ktime_get(); 1142 tcp_sock_set_cork(transport->inet, true); 1143 1144 vm_wait = sk_stream_is_writeable(transport->inet) ? true : false; 1145 1146 do { 1147 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 1148 transport->xmit.offset, rm, &sent); 1149 1150 dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 1151 xdr->len - transport->xmit.offset, status); 1152 1153 /* If we've sent the entire packet, immediately 1154 * reset the count of bytes sent. */ 1155 transport->xmit.offset += sent; 1156 req->rq_bytes_sent = transport->xmit.offset; 1157 if (likely(req->rq_bytes_sent >= msglen)) { 1158 req->rq_xmit_bytes_sent += transport->xmit.offset; 1159 transport->xmit.offset = 0; 1160 if (atomic_long_read(&xprt->xmit_queuelen) == 1) 1161 tcp_sock_set_cork(transport->inet, false); 1162 return 0; 1163 } 1164 1165 WARN_ON_ONCE(sent == 0 && status == 0); 1166 1167 if (sent > 0) 1168 vm_wait = false; 1169 1170 } while (status == 0); 1171 1172 switch (status) { 1173 case -ENOTSOCK: 1174 status = -ENOTCONN; 1175 /* Should we call xs_close() here? */ 1176 break; 1177 case -EAGAIN: 1178 status = xs_stream_nospace(req, vm_wait); 1179 break; 1180 case -ECONNRESET: 1181 case -ECONNREFUSED: 1182 case -ENOTCONN: 1183 case -EADDRINUSE: 1184 case -ENOBUFS: 1185 case -EPIPE: 1186 break; 1187 default: 1188 dprintk("RPC: sendmsg returned unrecognized error %d\n", 1189 -status); 1190 } 1191 1192 return status; 1193 } 1194 1195 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) 1196 { 1197 transport->old_data_ready = sk->sk_data_ready; 1198 transport->old_state_change = sk->sk_state_change; 1199 transport->old_write_space = sk->sk_write_space; 1200 transport->old_error_report = sk->sk_error_report; 1201 } 1202 1203 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) 1204 { 1205 sk->sk_data_ready = transport->old_data_ready; 1206 sk->sk_state_change = transport->old_state_change; 1207 sk->sk_write_space = transport->old_write_space; 1208 sk->sk_error_report = transport->old_error_report; 1209 } 1210 1211 static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) 1212 { 1213 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1214 1215 transport->xprt_err = 0; 1216 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); 1217 clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state); 1218 clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state); 1219 clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state); 1220 clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state); 1221 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); 1222 } 1223 1224 static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr) 1225 { 1226 set_bit(nr, &transport->sock_state); 1227 queue_work(xprtiod_workqueue, &transport->error_worker); 1228 } 1229 1230 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) 1231 { 1232 xprt->connect_cookie++; 1233 smp_mb__before_atomic(); 1234 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1235 clear_bit(XPRT_CLOSING, &xprt->state); 1236 xs_sock_reset_state_flags(xprt); 1237 smp_mb__after_atomic(); 1238 } 1239 1240 /** 1241 * xs_error_report - callback to handle TCP socket state errors 1242 * @sk: socket 1243 * 1244 * Note: we don't call sock_error() since there may be a rpc_task 1245 * using the socket, and so we don't want to clear sk->sk_err. 1246 */ 1247 static void xs_error_report(struct sock *sk) 1248 { 1249 struct sock_xprt *transport; 1250 struct rpc_xprt *xprt; 1251 1252 if (!(xprt = xprt_from_sock(sk))) 1253 return; 1254 1255 transport = container_of(xprt, struct sock_xprt, xprt); 1256 transport->xprt_err = -sk->sk_err; 1257 if (transport->xprt_err == 0) 1258 return; 1259 dprintk("RPC: xs_error_report client %p, error=%d...\n", 1260 xprt, -transport->xprt_err); 1261 trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err); 1262 1263 /* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */ 1264 smp_mb__before_atomic(); 1265 xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR); 1266 } 1267 1268 static void xs_reset_transport(struct sock_xprt *transport) 1269 { 1270 struct socket *sock = transport->sock; 1271 struct sock *sk = transport->inet; 1272 struct rpc_xprt *xprt = &transport->xprt; 1273 struct file *filp = transport->file; 1274 1275 if (sk == NULL) 1276 return; 1277 /* 1278 * Make sure we're calling this in a context from which it is safe 1279 * to call __fput_sync(). In practice that means rpciod and the 1280 * system workqueue. 1281 */ 1282 if (!(current->flags & PF_WQ_WORKER)) { 1283 WARN_ON_ONCE(1); 1284 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 1285 return; 1286 } 1287 1288 if (atomic_read(&transport->xprt.swapper)) 1289 sk_clear_memalloc(sk); 1290 1291 tls_handshake_cancel(sk); 1292 1293 kernel_sock_shutdown(sock, SHUT_RDWR); 1294 1295 mutex_lock(&transport->recv_mutex); 1296 lock_sock(sk); 1297 transport->inet = NULL; 1298 transport->sock = NULL; 1299 transport->file = NULL; 1300 1301 sk->sk_user_data = NULL; 1302 sk->sk_sndtimeo = 0; 1303 1304 xs_restore_old_callbacks(transport, sk); 1305 xprt_clear_connected(xprt); 1306 xs_sock_reset_connection_flags(xprt); 1307 /* Reset stream record info */ 1308 xs_stream_reset_connect(transport); 1309 release_sock(sk); 1310 mutex_unlock(&transport->recv_mutex); 1311 1312 trace_rpc_socket_close(xprt, sock); 1313 __fput_sync(filp); 1314 1315 xprt_disconnect_done(xprt); 1316 } 1317 1318 /** 1319 * xs_close - close a socket 1320 * @xprt: transport 1321 * 1322 * This is used when all requests are complete; ie, no DRC state remains 1323 * on the server we want to save. 1324 * 1325 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with 1326 * xs_reset_transport() zeroing the socket from underneath a writer. 1327 */ 1328 static void xs_close(struct rpc_xprt *xprt) 1329 { 1330 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1331 1332 dprintk("RPC: xs_close xprt %p\n", xprt); 1333 1334 if (transport->sock) 1335 tls_handshake_close(transport->sock); 1336 xs_reset_transport(transport); 1337 xprt->reestablish_timeout = 0; 1338 } 1339 1340 static void xs_inject_disconnect(struct rpc_xprt *xprt) 1341 { 1342 dprintk("RPC: injecting transport disconnect on xprt=%p\n", 1343 xprt); 1344 xprt_disconnect_done(xprt); 1345 } 1346 1347 static void xs_xprt_free(struct rpc_xprt *xprt) 1348 { 1349 xs_free_peer_addresses(xprt); 1350 xprt_free(xprt); 1351 } 1352 1353 /** 1354 * xs_destroy - prepare to shutdown a transport 1355 * @xprt: doomed transport 1356 * 1357 */ 1358 static void xs_destroy(struct rpc_xprt *xprt) 1359 { 1360 struct sock_xprt *transport = container_of(xprt, 1361 struct sock_xprt, xprt); 1362 dprintk("RPC: xs_destroy xprt %p\n", xprt); 1363 1364 cancel_delayed_work_sync(&transport->connect_worker); 1365 xs_close(xprt); 1366 cancel_work_sync(&transport->recv_worker); 1367 cancel_work_sync(&transport->error_worker); 1368 xs_xprt_free(xprt); 1369 module_put(THIS_MODULE); 1370 } 1371 1372 /** 1373 * xs_udp_data_read_skb - receive callback for UDP sockets 1374 * @xprt: transport 1375 * @sk: socket 1376 * @skb: skbuff 1377 * 1378 */ 1379 static void xs_udp_data_read_skb(struct rpc_xprt *xprt, 1380 struct sock *sk, 1381 struct sk_buff *skb) 1382 { 1383 struct rpc_task *task; 1384 struct rpc_rqst *rovr; 1385 int repsize, copied; 1386 u32 _xid; 1387 __be32 *xp; 1388 1389 repsize = skb->len; 1390 if (repsize < 4) { 1391 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 1392 return; 1393 } 1394 1395 /* Copy the XID from the skb... */ 1396 xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid); 1397 if (xp == NULL) 1398 return; 1399 1400 /* Look up and lock the request corresponding to the given XID */ 1401 spin_lock(&xprt->queue_lock); 1402 rovr = xprt_lookup_rqst(xprt, *xp); 1403 if (!rovr) 1404 goto out_unlock; 1405 xprt_pin_rqst(rovr); 1406 xprt_update_rtt(rovr->rq_task); 1407 spin_unlock(&xprt->queue_lock); 1408 task = rovr->rq_task; 1409 1410 if ((copied = rovr->rq_private_buf.buflen) > repsize) 1411 copied = repsize; 1412 1413 /* Suck it into the iovec, verify checksum if not done by hw. */ 1414 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { 1415 spin_lock(&xprt->queue_lock); 1416 __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); 1417 goto out_unpin; 1418 } 1419 1420 1421 spin_lock(&xprt->transport_lock); 1422 xprt_adjust_cwnd(xprt, task, copied); 1423 spin_unlock(&xprt->transport_lock); 1424 spin_lock(&xprt->queue_lock); 1425 xprt_complete_rqst(task, copied); 1426 __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); 1427 out_unpin: 1428 xprt_unpin_rqst(rovr); 1429 out_unlock: 1430 spin_unlock(&xprt->queue_lock); 1431 } 1432 1433 static void xs_udp_data_receive(struct sock_xprt *transport) 1434 { 1435 struct sk_buff *skb; 1436 struct sock *sk; 1437 int err; 1438 1439 mutex_lock(&transport->recv_mutex); 1440 sk = transport->inet; 1441 if (sk == NULL) 1442 goto out; 1443 for (;;) { 1444 skb = skb_recv_udp(sk, MSG_DONTWAIT, &err); 1445 if (skb == NULL) 1446 break; 1447 xs_udp_data_read_skb(&transport->xprt, sk, skb); 1448 consume_skb(skb); 1449 cond_resched(); 1450 } 1451 xs_poll_check_readable(transport); 1452 out: 1453 mutex_unlock(&transport->recv_mutex); 1454 } 1455 1456 static void xs_udp_data_receive_workfn(struct work_struct *work) 1457 { 1458 struct sock_xprt *transport = 1459 container_of(work, struct sock_xprt, recv_worker); 1460 unsigned int pflags = memalloc_nofs_save(); 1461 1462 xs_udp_data_receive(transport); 1463 memalloc_nofs_restore(pflags); 1464 } 1465 1466 /** 1467 * xs_data_ready - "data ready" callback for sockets 1468 * @sk: socket with data to read 1469 * 1470 */ 1471 static void xs_data_ready(struct sock *sk) 1472 { 1473 struct rpc_xprt *xprt; 1474 1475 trace_sk_data_ready(sk); 1476 1477 xprt = xprt_from_sock(sk); 1478 if (xprt != NULL) { 1479 struct sock_xprt *transport = container_of(xprt, 1480 struct sock_xprt, xprt); 1481 1482 trace_xs_data_ready(xprt); 1483 1484 transport->old_data_ready(sk); 1485 1486 if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) 1487 return; 1488 1489 /* Any data means we had a useful conversation, so 1490 * then we don't need to delay the next reconnect 1491 */ 1492 if (xprt->reestablish_timeout) 1493 xprt->reestablish_timeout = 0; 1494 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) 1495 queue_work(xprtiod_workqueue, &transport->recv_worker); 1496 } 1497 } 1498 1499 /* 1500 * Helper function to force a TCP close if the server is sending 1501 * junk and/or it has put us in CLOSE_WAIT 1502 */ 1503 static void xs_tcp_force_close(struct rpc_xprt *xprt) 1504 { 1505 xprt_force_disconnect(xprt); 1506 } 1507 1508 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1509 static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) 1510 { 1511 return PAGE_SIZE; 1512 } 1513 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1514 1515 /** 1516 * xs_local_state_change - callback to handle AF_LOCAL socket state changes 1517 * @sk: socket whose state has changed 1518 * 1519 */ 1520 static void xs_local_state_change(struct sock *sk) 1521 { 1522 struct rpc_xprt *xprt; 1523 struct sock_xprt *transport; 1524 1525 if (!(xprt = xprt_from_sock(sk))) 1526 return; 1527 transport = container_of(xprt, struct sock_xprt, xprt); 1528 if (sk->sk_shutdown & SHUTDOWN_MASK) { 1529 clear_bit(XPRT_CONNECTED, &xprt->state); 1530 /* Trigger the socket release */ 1531 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); 1532 } 1533 } 1534 1535 /** 1536 * xs_tcp_state_change - callback to handle TCP socket state changes 1537 * @sk: socket whose state has changed 1538 * 1539 */ 1540 static void xs_tcp_state_change(struct sock *sk) 1541 { 1542 struct rpc_xprt *xprt; 1543 struct sock_xprt *transport; 1544 1545 if (!(xprt = xprt_from_sock(sk))) 1546 return; 1547 dprintk("RPC: xs_tcp_state_change client %p...\n", xprt); 1548 dprintk("RPC: state %x conn %d dead %d zapped %d sk_shutdown %d\n", 1549 sk->sk_state, xprt_connected(xprt), 1550 sock_flag(sk, SOCK_DEAD), 1551 sock_flag(sk, SOCK_ZAPPED), 1552 sk->sk_shutdown); 1553 1554 transport = container_of(xprt, struct sock_xprt, xprt); 1555 trace_rpc_socket_state_change(xprt, sk->sk_socket); 1556 switch (sk->sk_state) { 1557 case TCP_ESTABLISHED: 1558 if (!xprt_test_and_set_connected(xprt)) { 1559 xprt->connect_cookie++; 1560 clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); 1561 xprt_clear_connecting(xprt); 1562 1563 xprt->stat.connect_count++; 1564 xprt->stat.connect_time += (long)jiffies - 1565 xprt->stat.connect_start; 1566 xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); 1567 } 1568 break; 1569 case TCP_FIN_WAIT1: 1570 /* The client initiated a shutdown of the socket */ 1571 xprt->connect_cookie++; 1572 xprt->reestablish_timeout = 0; 1573 set_bit(XPRT_CLOSING, &xprt->state); 1574 smp_mb__before_atomic(); 1575 clear_bit(XPRT_CONNECTED, &xprt->state); 1576 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1577 smp_mb__after_atomic(); 1578 break; 1579 case TCP_CLOSE_WAIT: 1580 /* The server initiated a shutdown of the socket */ 1581 xprt->connect_cookie++; 1582 clear_bit(XPRT_CONNECTED, &xprt->state); 1583 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); 1584 fallthrough; 1585 case TCP_CLOSING: 1586 /* 1587 * If the server closed down the connection, make sure that 1588 * we back off before reconnecting 1589 */ 1590 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 1591 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1592 break; 1593 case TCP_LAST_ACK: 1594 set_bit(XPRT_CLOSING, &xprt->state); 1595 smp_mb__before_atomic(); 1596 clear_bit(XPRT_CONNECTED, &xprt->state); 1597 smp_mb__after_atomic(); 1598 break; 1599 case TCP_CLOSE: 1600 if (test_and_clear_bit(XPRT_SOCK_CONNECTING, 1601 &transport->sock_state)) { 1602 xs_reset_srcport(transport); 1603 xprt_clear_connecting(xprt); 1604 } 1605 clear_bit(XPRT_CLOSING, &xprt->state); 1606 /* Trigger the socket release */ 1607 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); 1608 } 1609 } 1610 1611 static void xs_write_space(struct sock *sk) 1612 { 1613 struct sock_xprt *transport; 1614 struct rpc_xprt *xprt; 1615 1616 if (!sk->sk_socket) 1617 return; 1618 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 1619 1620 if (unlikely(!(xprt = xprt_from_sock(sk)))) 1621 return; 1622 transport = container_of(xprt, struct sock_xprt, xprt); 1623 if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state)) 1624 return; 1625 xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE); 1626 sk->sk_write_pending--; 1627 } 1628 1629 /** 1630 * xs_udp_write_space - callback invoked when socket buffer space 1631 * becomes available 1632 * @sk: socket whose state has changed 1633 * 1634 * Called when more output buffer space is available for this socket. 1635 * We try not to wake our writers until they can make "significant" 1636 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1637 * with a bunch of small requests. 1638 */ 1639 static void xs_udp_write_space(struct sock *sk) 1640 { 1641 /* from net/core/sock.c:sock_def_write_space */ 1642 if (sock_writeable(sk)) 1643 xs_write_space(sk); 1644 } 1645 1646 /** 1647 * xs_tcp_write_space - callback invoked when socket buffer space 1648 * becomes available 1649 * @sk: socket whose state has changed 1650 * 1651 * Called when more output buffer space is available for this socket. 1652 * We try not to wake our writers until they can make "significant" 1653 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1654 * with a bunch of small requests. 1655 */ 1656 static void xs_tcp_write_space(struct sock *sk) 1657 { 1658 /* from net/core/stream.c:sk_stream_write_space */ 1659 if (sk_stream_is_writeable(sk)) 1660 xs_write_space(sk); 1661 } 1662 1663 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt) 1664 { 1665 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1666 struct sock *sk = transport->inet; 1667 1668 if (transport->rcvsize) { 1669 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1670 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2; 1671 } 1672 if (transport->sndsize) { 1673 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1674 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2; 1675 sk->sk_write_space(sk); 1676 } 1677 } 1678 1679 /** 1680 * xs_udp_set_buffer_size - set send and receive limits 1681 * @xprt: generic transport 1682 * @sndsize: requested size of send buffer, in bytes 1683 * @rcvsize: requested size of receive buffer, in bytes 1684 * 1685 * Set socket send and receive buffer size limits. 1686 */ 1687 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize) 1688 { 1689 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1690 1691 transport->sndsize = 0; 1692 if (sndsize) 1693 transport->sndsize = sndsize + 1024; 1694 transport->rcvsize = 0; 1695 if (rcvsize) 1696 transport->rcvsize = rcvsize + 1024; 1697 1698 xs_udp_do_set_buffer_size(xprt); 1699 } 1700 1701 /** 1702 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport 1703 * @xprt: controlling transport 1704 * @task: task that timed out 1705 * 1706 * Adjust the congestion window after a retransmit timeout has occurred. 1707 */ 1708 static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) 1709 { 1710 spin_lock(&xprt->transport_lock); 1711 xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); 1712 spin_unlock(&xprt->transport_lock); 1713 } 1714 1715 static int xs_get_random_port(void) 1716 { 1717 unsigned short min = xprt_min_resvport, max = xprt_max_resvport; 1718 unsigned short range; 1719 unsigned short rand; 1720 1721 if (max < min) 1722 return -EADDRINUSE; 1723 range = max - min + 1; 1724 rand = get_random_u32_below(range); 1725 return rand + min; 1726 } 1727 1728 static unsigned short xs_sock_getport(struct socket *sock) 1729 { 1730 struct sockaddr_storage buf; 1731 unsigned short port = 0; 1732 1733 if (kernel_getsockname(sock, (struct sockaddr *)&buf) < 0) 1734 goto out; 1735 switch (buf.ss_family) { 1736 case AF_INET6: 1737 port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port); 1738 break; 1739 case AF_INET: 1740 port = ntohs(((struct sockaddr_in *)&buf)->sin_port); 1741 } 1742 out: 1743 return port; 1744 } 1745 1746 /** 1747 * xs_set_port - reset the port number in the remote endpoint address 1748 * @xprt: generic transport 1749 * @port: new port number 1750 * 1751 */ 1752 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) 1753 { 1754 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port); 1755 1756 rpc_set_port(xs_addr(xprt), port); 1757 xs_update_peer_port(xprt); 1758 } 1759 1760 static void xs_reset_srcport(struct sock_xprt *transport) 1761 { 1762 transport->srcport = 0; 1763 } 1764 1765 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) 1766 { 1767 if (transport->srcport == 0 && transport->xprt.reuseport) 1768 transport->srcport = xs_sock_getport(sock); 1769 } 1770 1771 static int xs_get_srcport(struct sock_xprt *transport) 1772 { 1773 int port = transport->srcport; 1774 1775 if (port == 0 && transport->xprt.resvport) 1776 port = xs_get_random_port(); 1777 return port; 1778 } 1779 1780 static unsigned short xs_sock_srcport(struct rpc_xprt *xprt) 1781 { 1782 struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt); 1783 unsigned short ret = 0; 1784 mutex_lock(&sock->recv_mutex); 1785 if (sock->sock) 1786 ret = xs_sock_getport(sock->sock); 1787 mutex_unlock(&sock->recv_mutex); 1788 return ret; 1789 } 1790 1791 static int xs_sock_srcaddr(struct rpc_xprt *xprt, char *buf, size_t buflen) 1792 { 1793 struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt); 1794 union { 1795 struct sockaddr sa; 1796 struct sockaddr_storage st; 1797 } saddr; 1798 int ret = -ENOTCONN; 1799 1800 mutex_lock(&sock->recv_mutex); 1801 if (sock->sock) { 1802 ret = kernel_getsockname(sock->sock, &saddr.sa); 1803 if (ret >= 0) 1804 ret = snprintf(buf, buflen, "%pISc", &saddr.sa); 1805 } 1806 mutex_unlock(&sock->recv_mutex); 1807 return ret; 1808 } 1809 1810 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port) 1811 { 1812 if (transport->srcport != 0) 1813 transport->srcport = 0; 1814 if (!transport->xprt.resvport) 1815 return 0; 1816 if (port <= xprt_min_resvport || port > xprt_max_resvport) 1817 return xprt_max_resvport; 1818 return --port; 1819 } 1820 static int xs_bind(struct sock_xprt *transport, struct socket *sock) 1821 { 1822 struct sockaddr_storage myaddr; 1823 int err, nloop = 0; 1824 int port = xs_get_srcport(transport); 1825 unsigned short last; 1826 1827 /* 1828 * If we are asking for any ephemeral port (i.e. port == 0 && 1829 * transport->xprt.resvport == 0), don't bind. Let the local 1830 * port selection happen implicitly when the socket is used 1831 * (for example at connect time). 1832 * 1833 * This ensures that we can continue to establish TCP 1834 * connections even when all local ephemeral ports are already 1835 * a part of some TCP connection. This makes no difference 1836 * for UDP sockets, but also doesn't harm them. 1837 * 1838 * If we're asking for any reserved port (i.e. port == 0 && 1839 * transport->xprt.resvport == 1) xs_get_srcport above will 1840 * ensure that port is non-zero and we will bind as needed. 1841 */ 1842 if (port <= 0) 1843 return port; 1844 1845 memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); 1846 do { 1847 rpc_set_port((struct sockaddr *)&myaddr, port); 1848 err = kernel_bind(sock, (struct sockaddr_unsized *)&myaddr, 1849 transport->xprt.addrlen); 1850 if (err == 0) { 1851 if (transport->xprt.reuseport) 1852 transport->srcport = port; 1853 break; 1854 } 1855 last = port; 1856 port = xs_next_srcport(transport, port); 1857 if (port > last) 1858 nloop++; 1859 } while (err == -EADDRINUSE && nloop != 2); 1860 1861 if (myaddr.ss_family == AF_INET) 1862 dprintk("RPC: %s %pI4:%u: %s (%d)\n", __func__, 1863 &((struct sockaddr_in *)&myaddr)->sin_addr, 1864 port, err ? "failed" : "ok", err); 1865 else 1866 dprintk("RPC: %s %pI6:%u: %s (%d)\n", __func__, 1867 &((struct sockaddr_in6 *)&myaddr)->sin6_addr, 1868 port, err ? "failed" : "ok", err); 1869 return err; 1870 } 1871 1872 /* 1873 * We don't support autobind on AF_LOCAL sockets 1874 */ 1875 static void xs_local_rpcbind(struct rpc_task *task) 1876 { 1877 xprt_set_bound(task->tk_xprt); 1878 } 1879 1880 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) 1881 { 1882 } 1883 1884 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1885 static struct lock_class_key xs_key[3]; 1886 static struct lock_class_key xs_slock_key[3]; 1887 1888 static inline void xs_reclassify_socketu(struct socket *sock) 1889 { 1890 struct sock *sk = sock->sk; 1891 1892 sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC", 1893 &xs_slock_key[0], "sk_lock-AF_LOCAL-RPC", &xs_key[0]); 1894 } 1895 1896 static inline void xs_reclassify_socket4(struct socket *sock) 1897 { 1898 struct sock *sk = sock->sk; 1899 1900 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", 1901 &xs_slock_key[1], "sk_lock-AF_INET-RPC", &xs_key[1]); 1902 } 1903 1904 static inline void xs_reclassify_socket6(struct socket *sock) 1905 { 1906 struct sock *sk = sock->sk; 1907 1908 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", 1909 &xs_slock_key[2], "sk_lock-AF_INET6-RPC", &xs_key[2]); 1910 } 1911 1912 static inline void xs_reclassify_socket(int family, struct socket *sock) 1913 { 1914 if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk))) 1915 return; 1916 1917 switch (family) { 1918 case AF_LOCAL: 1919 xs_reclassify_socketu(sock); 1920 break; 1921 case AF_INET: 1922 xs_reclassify_socket4(sock); 1923 break; 1924 case AF_INET6: 1925 xs_reclassify_socket6(sock); 1926 break; 1927 } 1928 } 1929 #else 1930 static inline void xs_reclassify_socket(int family, struct socket *sock) 1931 { 1932 } 1933 #endif 1934 1935 static void xs_dummy_setup_socket(struct work_struct *work) 1936 { 1937 } 1938 1939 static struct socket *xs_create_sock(struct rpc_xprt *xprt, 1940 struct sock_xprt *transport, int family, int type, 1941 int protocol, bool reuseport) 1942 { 1943 struct file *filp; 1944 struct socket *sock; 1945 int err; 1946 1947 err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1); 1948 if (err < 0) { 1949 dprintk("RPC: can't create %d transport socket (%d).\n", 1950 protocol, -err); 1951 goto out; 1952 } 1953 xs_reclassify_socket(family, sock); 1954 1955 if (reuseport) 1956 sock_set_reuseport(sock->sk); 1957 1958 err = xs_bind(transport, sock); 1959 if (err) { 1960 sock_release(sock); 1961 goto out; 1962 } 1963 1964 if (protocol == IPPROTO_TCP) 1965 sk_net_refcnt_upgrade(sock->sk); 1966 1967 filp = sock_alloc_file(sock, O_NONBLOCK, NULL); 1968 if (IS_ERR(filp)) 1969 return ERR_CAST(filp); 1970 transport->file = filp; 1971 1972 return sock; 1973 out: 1974 return ERR_PTR(err); 1975 } 1976 1977 static int xs_local_finish_connecting(struct rpc_xprt *xprt, 1978 struct socket *sock) 1979 { 1980 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1981 xprt); 1982 1983 if (!transport->inet) { 1984 struct sock *sk = sock->sk; 1985 1986 lock_sock(sk); 1987 1988 xs_save_old_callbacks(transport, sk); 1989 1990 sk->sk_user_data = xprt; 1991 sk->sk_data_ready = xs_data_ready; 1992 sk->sk_write_space = xs_udp_write_space; 1993 sk->sk_state_change = xs_local_state_change; 1994 sk->sk_error_report = xs_error_report; 1995 sk->sk_use_task_frag = false; 1996 1997 xprt_clear_connected(xprt); 1998 1999 /* Reset to new socket */ 2000 transport->sock = sock; 2001 transport->inet = sk; 2002 2003 release_sock(sk); 2004 } 2005 2006 xs_stream_start_connect(transport); 2007 2008 return kernel_connect(sock, (struct sockaddr_unsized *)xs_addr(xprt), xprt->addrlen, 0); 2009 } 2010 2011 /** 2012 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint 2013 * @transport: socket transport to connect 2014 */ 2015 static int xs_local_setup_socket(struct sock_xprt *transport) 2016 { 2017 struct rpc_xprt *xprt = &transport->xprt; 2018 struct file *filp; 2019 struct socket *sock; 2020 int status; 2021 2022 status = __sock_create(xprt->xprt_net, AF_LOCAL, 2023 SOCK_STREAM, 0, &sock, 1); 2024 if (status < 0) { 2025 dprintk("RPC: can't create AF_LOCAL " 2026 "transport socket (%d).\n", -status); 2027 goto out; 2028 } 2029 xs_reclassify_socket(AF_LOCAL, sock); 2030 2031 filp = sock_alloc_file(sock, O_NONBLOCK, NULL); 2032 if (IS_ERR(filp)) { 2033 status = PTR_ERR(filp); 2034 goto out; 2035 } 2036 transport->file = filp; 2037 2038 dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", 2039 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2040 2041 status = xs_local_finish_connecting(xprt, sock); 2042 trace_rpc_socket_connect(xprt, sock, status); 2043 switch (status) { 2044 case 0: 2045 dprintk("RPC: xprt %p connected to %s\n", 2046 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2047 xprt->stat.connect_count++; 2048 xprt->stat.connect_time += (long)jiffies - 2049 xprt->stat.connect_start; 2050 xprt_set_connected(xprt); 2051 break; 2052 case -ENOBUFS: 2053 break; 2054 case -ENOENT: 2055 dprintk("RPC: xprt %p: socket %s does not exist\n", 2056 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2057 break; 2058 case -ECONNREFUSED: 2059 dprintk("RPC: xprt %p: connection refused for %s\n", 2060 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2061 break; 2062 default: 2063 printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n", 2064 __func__, -status, 2065 xprt->address_strings[RPC_DISPLAY_ADDR]); 2066 } 2067 2068 out: 2069 xprt_clear_connecting(xprt); 2070 xprt_wake_pending_tasks(xprt, status); 2071 return status; 2072 } 2073 2074 static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) 2075 { 2076 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2077 int ret; 2078 2079 if (transport->file) 2080 goto force_disconnect; 2081 2082 if (RPC_IS_ASYNC(task)) { 2083 /* 2084 * We want the AF_LOCAL connect to be resolved in the 2085 * filesystem namespace of the process making the rpc 2086 * call. Thus we connect synchronously. 2087 * 2088 * If we want to support asynchronous AF_LOCAL calls, 2089 * we'll need to figure out how to pass a namespace to 2090 * connect. 2091 */ 2092 rpc_task_set_rpc_status(task, -ENOTCONN); 2093 goto out_wake; 2094 } 2095 ret = xs_local_setup_socket(transport); 2096 if (ret && !RPC_IS_SOFTCONN(task)) 2097 msleep_interruptible(15000); 2098 return; 2099 force_disconnect: 2100 xprt_force_disconnect(xprt); 2101 out_wake: 2102 xprt_clear_connecting(xprt); 2103 xprt_wake_pending_tasks(xprt, -ENOTCONN); 2104 } 2105 2106 #if IS_ENABLED(CONFIG_SUNRPC_SWAP) 2107 /* 2108 * Note that this should be called with XPRT_LOCKED held, or recv_mutex 2109 * held, or when we otherwise know that we have exclusive access to the 2110 * socket, to guard against races with xs_reset_transport. 2111 */ 2112 static void xs_set_memalloc(struct rpc_xprt *xprt) 2113 { 2114 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 2115 xprt); 2116 2117 /* 2118 * If there's no sock, then we have nothing to set. The 2119 * reconnecting process will get it for us. 2120 */ 2121 if (!transport->inet) 2122 return; 2123 if (atomic_read(&xprt->swapper)) 2124 sk_set_memalloc(transport->inet); 2125 } 2126 2127 /** 2128 * xs_enable_swap - Tag this transport as being used for swap. 2129 * @xprt: transport to tag 2130 * 2131 * Take a reference to this transport on behalf of the rpc_clnt, and 2132 * optionally mark it for swapping if it wasn't already. 2133 */ 2134 static int 2135 xs_enable_swap(struct rpc_xprt *xprt) 2136 { 2137 struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); 2138 2139 mutex_lock(&xs->recv_mutex); 2140 if (atomic_inc_return(&xprt->swapper) == 1 && 2141 xs->inet) 2142 sk_set_memalloc(xs->inet); 2143 mutex_unlock(&xs->recv_mutex); 2144 return 0; 2145 } 2146 2147 /** 2148 * xs_disable_swap - Untag this transport as being used for swap. 2149 * @xprt: transport to tag 2150 * 2151 * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the 2152 * swapper refcount goes to 0, untag the socket as a memalloc socket. 2153 */ 2154 static void 2155 xs_disable_swap(struct rpc_xprt *xprt) 2156 { 2157 struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); 2158 2159 mutex_lock(&xs->recv_mutex); 2160 if (atomic_dec_and_test(&xprt->swapper) && 2161 xs->inet) 2162 sk_clear_memalloc(xs->inet); 2163 mutex_unlock(&xs->recv_mutex); 2164 } 2165 #else 2166 static void xs_set_memalloc(struct rpc_xprt *xprt) 2167 { 2168 } 2169 2170 static int 2171 xs_enable_swap(struct rpc_xprt *xprt) 2172 { 2173 return -EINVAL; 2174 } 2175 2176 static void 2177 xs_disable_swap(struct rpc_xprt *xprt) 2178 { 2179 } 2180 #endif 2181 2182 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 2183 { 2184 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2185 2186 if (!transport->inet) { 2187 struct sock *sk = sock->sk; 2188 2189 lock_sock(sk); 2190 2191 xs_save_old_callbacks(transport, sk); 2192 2193 sk->sk_user_data = xprt; 2194 sk->sk_data_ready = xs_data_ready; 2195 sk->sk_write_space = xs_udp_write_space; 2196 sk->sk_use_task_frag = false; 2197 2198 xprt_set_connected(xprt); 2199 2200 /* Reset to new socket */ 2201 transport->sock = sock; 2202 transport->inet = sk; 2203 2204 xs_set_memalloc(xprt); 2205 2206 release_sock(sk); 2207 } 2208 xs_udp_do_set_buffer_size(xprt); 2209 2210 xprt->stat.connect_start = jiffies; 2211 } 2212 2213 static void xs_udp_setup_socket(struct work_struct *work) 2214 { 2215 struct sock_xprt *transport = 2216 container_of(work, struct sock_xprt, connect_worker.work); 2217 struct rpc_xprt *xprt = &transport->xprt; 2218 struct socket *sock; 2219 int status = -EIO; 2220 unsigned int pflags = current->flags; 2221 2222 if (atomic_read(&xprt->swapper)) 2223 current->flags |= PF_MEMALLOC; 2224 sock = xs_create_sock(xprt, transport, 2225 xs_addr(xprt)->sa_family, SOCK_DGRAM, 2226 IPPROTO_UDP, false); 2227 if (IS_ERR(sock)) 2228 goto out; 2229 2230 dprintk("RPC: worker connecting xprt %p via %s to " 2231 "%s (port %s)\n", xprt, 2232 xprt->address_strings[RPC_DISPLAY_PROTO], 2233 xprt->address_strings[RPC_DISPLAY_ADDR], 2234 xprt->address_strings[RPC_DISPLAY_PORT]); 2235 2236 xs_udp_finish_connecting(xprt, sock); 2237 trace_rpc_socket_connect(xprt, sock, 0); 2238 status = 0; 2239 out: 2240 xprt_clear_connecting(xprt); 2241 xprt_unlock_connect(xprt, transport); 2242 xprt_wake_pending_tasks(xprt, status); 2243 current_restore_flags(pflags, PF_MEMALLOC); 2244 } 2245 2246 /** 2247 * xs_tcp_shutdown - gracefully shut down a TCP socket 2248 * @xprt: transport 2249 * 2250 * Initiates a graceful shutdown of the TCP socket by calling the 2251 * equivalent of shutdown(SHUT_RDWR); 2252 */ 2253 static void xs_tcp_shutdown(struct rpc_xprt *xprt) 2254 { 2255 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2256 struct socket *sock = transport->sock; 2257 int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE; 2258 2259 if (sock == NULL) 2260 return; 2261 if (!xprt->reuseport) { 2262 xs_close(xprt); 2263 return; 2264 } 2265 switch (skst) { 2266 case TCP_FIN_WAIT1: 2267 case TCP_FIN_WAIT2: 2268 case TCP_LAST_ACK: 2269 break; 2270 case TCP_ESTABLISHED: 2271 case TCP_CLOSE_WAIT: 2272 kernel_sock_shutdown(sock, SHUT_RDWR); 2273 trace_rpc_socket_shutdown(xprt, sock); 2274 break; 2275 default: 2276 xs_reset_transport(transport); 2277 } 2278 } 2279 2280 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, 2281 struct socket *sock) 2282 { 2283 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2284 struct net *net = sock_net(sock->sk); 2285 unsigned long connect_timeout; 2286 unsigned long syn_retries; 2287 unsigned int keepidle; 2288 unsigned int keepcnt; 2289 unsigned int timeo; 2290 unsigned long t; 2291 2292 spin_lock(&xprt->transport_lock); 2293 keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); 2294 keepcnt = xprt->timeout->to_retries + 1; 2295 timeo = jiffies_to_msecs(xprt->timeout->to_initval) * 2296 (xprt->timeout->to_retries + 1); 2297 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); 2298 spin_unlock(&xprt->transport_lock); 2299 2300 /* TCP Keepalive options */ 2301 sock_set_keepalive(sock->sk); 2302 tcp_sock_set_keepidle(sock->sk, keepidle); 2303 tcp_sock_set_keepintvl(sock->sk, keepidle); 2304 tcp_sock_set_keepcnt(sock->sk, keepcnt); 2305 2306 /* TCP user timeout (see RFC5482) */ 2307 tcp_sock_set_user_timeout(sock->sk, timeo); 2308 2309 /* Connect timeout */ 2310 connect_timeout = max_t(unsigned long, 2311 DIV_ROUND_UP(xprt->connect_timeout, HZ), 1); 2312 syn_retries = max_t(unsigned long, 2313 READ_ONCE(net->ipv4.sysctl_tcp_syn_retries), 1); 2314 for (t = 0; t <= syn_retries && (1UL << t) < connect_timeout; t++) 2315 ; 2316 if (t <= syn_retries) 2317 tcp_sock_set_syncnt(sock->sk, t - 1); 2318 } 2319 2320 static void xs_tcp_do_set_connect_timeout(struct rpc_xprt *xprt, 2321 unsigned long connect_timeout) 2322 { 2323 struct sock_xprt *transport = 2324 container_of(xprt, struct sock_xprt, xprt); 2325 struct rpc_timeout to; 2326 unsigned long initval; 2327 2328 memcpy(&to, xprt->timeout, sizeof(to)); 2329 /* Arbitrary lower limit */ 2330 initval = max_t(unsigned long, connect_timeout, XS_TCP_INIT_REEST_TO); 2331 to.to_initval = initval; 2332 to.to_maxval = initval; 2333 to.to_retries = 0; 2334 memcpy(&transport->tcp_timeout, &to, sizeof(transport->tcp_timeout)); 2335 xprt->timeout = &transport->tcp_timeout; 2336 xprt->connect_timeout = connect_timeout; 2337 } 2338 2339 static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, 2340 unsigned long connect_timeout, 2341 unsigned long reconnect_timeout) 2342 { 2343 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2344 2345 spin_lock(&xprt->transport_lock); 2346 if (reconnect_timeout < xprt->max_reconnect_timeout) 2347 xprt->max_reconnect_timeout = reconnect_timeout; 2348 if (connect_timeout < xprt->connect_timeout) 2349 xs_tcp_do_set_connect_timeout(xprt, connect_timeout); 2350 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); 2351 spin_unlock(&xprt->transport_lock); 2352 } 2353 2354 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 2355 { 2356 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2357 2358 if (!transport->inet) { 2359 struct sock *sk = sock->sk; 2360 2361 /* Avoid temporary address, they are bad for long-lived 2362 * connections such as NFS mounts. 2363 * RFC4941, section 3.6 suggests that: 2364 * Individual applications, which have specific 2365 * knowledge about the normal duration of connections, 2366 * MAY override this as appropriate. 2367 */ 2368 if (xs_addr(xprt)->sa_family == PF_INET6) { 2369 ip6_sock_set_addr_preferences(sk, 2370 IPV6_PREFER_SRC_PUBLIC); 2371 } 2372 2373 xs_tcp_set_socket_timeouts(xprt, sock); 2374 tcp_sock_set_nodelay(sk); 2375 2376 lock_sock(sk); 2377 2378 xs_save_old_callbacks(transport, sk); 2379 2380 sk->sk_user_data = xprt; 2381 sk->sk_data_ready = xs_data_ready; 2382 sk->sk_state_change = xs_tcp_state_change; 2383 sk->sk_write_space = xs_tcp_write_space; 2384 sk->sk_error_report = xs_error_report; 2385 sk->sk_use_task_frag = false; 2386 2387 /* socket options */ 2388 sock_reset_flag(sk, SOCK_LINGER); 2389 2390 xprt_clear_connected(xprt); 2391 2392 /* Reset to new socket */ 2393 transport->sock = sock; 2394 transport->inet = sk; 2395 2396 release_sock(sk); 2397 } 2398 2399 if (!xprt_bound(xprt)) 2400 return -ENOTCONN; 2401 2402 xs_set_memalloc(xprt); 2403 2404 xs_stream_start_connect(transport); 2405 2406 /* Tell the socket layer to start connecting... */ 2407 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); 2408 return kernel_connect(sock, (struct sockaddr_unsized *)xs_addr(xprt), 2409 xprt->addrlen, O_NONBLOCK); 2410 } 2411 2412 /** 2413 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint 2414 * @work: queued work item 2415 * 2416 * Invoked by a work queue tasklet. 2417 */ 2418 static void xs_tcp_setup_socket(struct work_struct *work) 2419 { 2420 struct sock_xprt *transport = 2421 container_of(work, struct sock_xprt, connect_worker.work); 2422 struct socket *sock = transport->sock; 2423 struct rpc_xprt *xprt = &transport->xprt; 2424 int status; 2425 unsigned int pflags = current->flags; 2426 2427 if (atomic_read(&xprt->swapper)) 2428 current->flags |= PF_MEMALLOC; 2429 2430 if (xprt_connected(xprt)) 2431 goto out; 2432 if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT, 2433 &transport->sock_state) || 2434 !sock) { 2435 xs_reset_transport(transport); 2436 sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family, 2437 SOCK_STREAM, IPPROTO_TCP, true); 2438 if (IS_ERR(sock)) { 2439 xprt_wake_pending_tasks(xprt, PTR_ERR(sock)); 2440 goto out; 2441 } 2442 } 2443 2444 dprintk("RPC: worker connecting xprt %p via %s to " 2445 "%s (port %s)\n", xprt, 2446 xprt->address_strings[RPC_DISPLAY_PROTO], 2447 xprt->address_strings[RPC_DISPLAY_ADDR], 2448 xprt->address_strings[RPC_DISPLAY_PORT]); 2449 2450 status = xs_tcp_finish_connecting(xprt, sock); 2451 trace_rpc_socket_connect(xprt, sock, status); 2452 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 2453 xprt, -status, xprt_connected(xprt), 2454 sock->sk->sk_state); 2455 switch (status) { 2456 case 0: 2457 case -EINPROGRESS: 2458 /* SYN_SENT! */ 2459 set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state); 2460 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 2461 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2462 fallthrough; 2463 case -EALREADY: 2464 goto out_unlock; 2465 case -EADDRNOTAVAIL: 2466 /* Source port number is unavailable. Try a new one! */ 2467 transport->srcport = 0; 2468 status = -EAGAIN; 2469 break; 2470 case -EPERM: 2471 /* Happens, for instance, if a BPF program is preventing 2472 * the connect. Remap the error so upper layers can better 2473 * deal with it. 2474 */ 2475 status = -ECONNREFUSED; 2476 fallthrough; 2477 case -EINVAL: 2478 /* Happens, for instance, if the user specified a link 2479 * local IPv6 address without a scope-id. 2480 */ 2481 case -ECONNREFUSED: 2482 case -ECONNRESET: 2483 case -ENETDOWN: 2484 case -ENETUNREACH: 2485 case -EHOSTUNREACH: 2486 case -EADDRINUSE: 2487 case -ENOBUFS: 2488 case -ENOTCONN: 2489 break; 2490 default: 2491 printk("%s: connect returned unhandled error %d\n", 2492 __func__, status); 2493 status = -EAGAIN; 2494 } 2495 2496 /* xs_tcp_force_close() wakes tasks with a fixed error code. 2497 * We need to wake them first to ensure the correct error code. 2498 */ 2499 xprt_wake_pending_tasks(xprt, status); 2500 xs_tcp_force_close(xprt); 2501 out: 2502 xprt_clear_connecting(xprt); 2503 out_unlock: 2504 xprt_unlock_connect(xprt, transport); 2505 current_restore_flags(pflags, PF_MEMALLOC); 2506 } 2507 2508 /* 2509 * Transfer the connected socket to @upper_transport, then mark that 2510 * xprt CONNECTED. 2511 */ 2512 static int xs_tcp_tls_finish_connecting(struct rpc_xprt *lower_xprt, 2513 struct sock_xprt *upper_transport) 2514 { 2515 struct sock_xprt *lower_transport = 2516 container_of(lower_xprt, struct sock_xprt, xprt); 2517 struct rpc_xprt *upper_xprt = &upper_transport->xprt; 2518 2519 if (!upper_transport->inet) { 2520 struct socket *sock = lower_transport->sock; 2521 struct sock *sk = sock->sk; 2522 2523 /* Avoid temporary address, they are bad for long-lived 2524 * connections such as NFS mounts. 2525 * RFC4941, section 3.6 suggests that: 2526 * Individual applications, which have specific 2527 * knowledge about the normal duration of connections, 2528 * MAY override this as appropriate. 2529 */ 2530 if (xs_addr(upper_xprt)->sa_family == PF_INET6) 2531 ip6_sock_set_addr_preferences(sk, IPV6_PREFER_SRC_PUBLIC); 2532 2533 xs_tcp_set_socket_timeouts(upper_xprt, sock); 2534 tcp_sock_set_nodelay(sk); 2535 2536 lock_sock(sk); 2537 2538 /* @sk is already connected, so it now has the RPC callbacks. 2539 * Reach into @lower_transport to save the original ones. 2540 */ 2541 upper_transport->old_data_ready = lower_transport->old_data_ready; 2542 upper_transport->old_state_change = lower_transport->old_state_change; 2543 upper_transport->old_write_space = lower_transport->old_write_space; 2544 upper_transport->old_error_report = lower_transport->old_error_report; 2545 sk->sk_user_data = upper_xprt; 2546 2547 /* socket options */ 2548 sock_reset_flag(sk, SOCK_LINGER); 2549 2550 xprt_clear_connected(upper_xprt); 2551 2552 upper_transport->sock = sock; 2553 upper_transport->inet = sk; 2554 upper_transport->file = lower_transport->file; 2555 2556 release_sock(sk); 2557 2558 /* Reset lower_transport before shutting down its clnt */ 2559 mutex_lock(&lower_transport->recv_mutex); 2560 lower_transport->inet = NULL; 2561 lower_transport->sock = NULL; 2562 lower_transport->file = NULL; 2563 2564 xprt_clear_connected(lower_xprt); 2565 xs_sock_reset_connection_flags(lower_xprt); 2566 xs_stream_reset_connect(lower_transport); 2567 mutex_unlock(&lower_transport->recv_mutex); 2568 } 2569 2570 if (!xprt_bound(upper_xprt)) 2571 return -ENOTCONN; 2572 2573 xs_set_memalloc(upper_xprt); 2574 2575 if (!xprt_test_and_set_connected(upper_xprt)) { 2576 upper_xprt->connect_cookie++; 2577 clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); 2578 xprt_clear_connecting(upper_xprt); 2579 2580 upper_xprt->stat.connect_count++; 2581 upper_xprt->stat.connect_time += (long)jiffies - 2582 upper_xprt->stat.connect_start; 2583 xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); 2584 } 2585 return 0; 2586 } 2587 2588 /** 2589 * xs_tls_handshake_done - TLS handshake completion handler 2590 * @data: address of xprt to wake 2591 * @status: status of handshake 2592 * @peerid: serial number of key containing the remote's identity 2593 * 2594 */ 2595 static void xs_tls_handshake_done(void *data, int status, key_serial_t peerid) 2596 { 2597 struct rpc_xprt *lower_xprt = data; 2598 struct sock_xprt *lower_transport = 2599 container_of(lower_xprt, struct sock_xprt, xprt); 2600 2601 switch (status) { 2602 case 0: 2603 case -EACCES: 2604 case -ETIMEDOUT: 2605 lower_transport->xprt_err = status; 2606 break; 2607 default: 2608 lower_transport->xprt_err = -EACCES; 2609 } 2610 complete(&lower_transport->handshake_done); 2611 xprt_put(lower_xprt); 2612 } 2613 2614 static int xs_tls_handshake_sync(struct rpc_xprt *lower_xprt, struct xprtsec_parms *xprtsec) 2615 { 2616 struct sock_xprt *lower_transport = 2617 container_of(lower_xprt, struct sock_xprt, xprt); 2618 struct tls_handshake_args args = { 2619 .ta_sock = lower_transport->sock, 2620 .ta_done = xs_tls_handshake_done, 2621 .ta_data = xprt_get(lower_xprt), 2622 .ta_peername = lower_xprt->servername, 2623 }; 2624 struct sock *sk = lower_transport->inet; 2625 int rc; 2626 2627 init_completion(&lower_transport->handshake_done); 2628 set_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); 2629 lower_transport->xprt_err = -ETIMEDOUT; 2630 switch (xprtsec->policy) { 2631 case RPC_XPRTSEC_TLS_ANON: 2632 rc = tls_client_hello_anon(&args, GFP_KERNEL); 2633 if (rc) 2634 goto out_put_xprt; 2635 break; 2636 case RPC_XPRTSEC_TLS_X509: 2637 args.ta_my_cert = xprtsec->cert_serial; 2638 args.ta_my_privkey = xprtsec->privkey_serial; 2639 rc = tls_client_hello_x509(&args, GFP_KERNEL); 2640 if (rc) 2641 goto out_put_xprt; 2642 break; 2643 default: 2644 rc = -EACCES; 2645 goto out_put_xprt; 2646 } 2647 2648 rc = wait_for_completion_interruptible_timeout(&lower_transport->handshake_done, 2649 XS_TLS_HANDSHAKE_TO); 2650 if (rc <= 0) { 2651 tls_handshake_cancel(sk); 2652 if (rc == 0) 2653 rc = -ETIMEDOUT; 2654 goto out_put_xprt; 2655 } 2656 2657 rc = lower_transport->xprt_err; 2658 2659 out: 2660 xs_stream_reset_connect(lower_transport); 2661 clear_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); 2662 return rc; 2663 2664 out_put_xprt: 2665 xprt_put(lower_xprt); 2666 goto out; 2667 } 2668 2669 /** 2670 * xs_tcp_tls_setup_socket - establish a TLS session on a TCP socket 2671 * @work: queued work item 2672 * 2673 * Invoked by a work queue tasklet. 2674 * 2675 * For RPC-with-TLS, there is a two-stage connection process. 2676 * 2677 * The "upper-layer xprt" is visible to the RPC consumer. Once it has 2678 * been marked connected, the consumer knows that a TCP connection and 2679 * a TLS session have been established. 2680 * 2681 * A "lower-layer xprt", created in this function, handles the mechanics 2682 * of connecting the TCP socket, performing the RPC_AUTH_TLS probe, and 2683 * then driving the TLS handshake. Once all that is complete, the upper 2684 * layer xprt is marked connected. 2685 */ 2686 static void xs_tcp_tls_setup_socket(struct work_struct *work) 2687 { 2688 struct sock_xprt *upper_transport = 2689 container_of(work, struct sock_xprt, connect_worker.work); 2690 struct rpc_clnt *upper_clnt = upper_transport->clnt; 2691 struct rpc_xprt *upper_xprt = &upper_transport->xprt; 2692 struct rpc_create_args args = { 2693 .net = upper_xprt->xprt_net, 2694 .protocol = upper_xprt->prot, 2695 .address = (struct sockaddr *)&upper_xprt->addr, 2696 .addrsize = upper_xprt->addrlen, 2697 .timeout = upper_clnt->cl_timeout, 2698 .servername = upper_xprt->servername, 2699 .program = upper_clnt->cl_program, 2700 .prognumber = upper_clnt->cl_prog, 2701 .version = upper_clnt->cl_vers, 2702 .authflavor = RPC_AUTH_TLS, 2703 .cred = upper_clnt->cl_cred, 2704 .xprtsec = { 2705 .policy = RPC_XPRTSEC_NONE, 2706 }, 2707 .stats = upper_clnt->cl_stats, 2708 }; 2709 unsigned int pflags = current->flags; 2710 struct rpc_clnt *lower_clnt; 2711 struct rpc_xprt *lower_xprt; 2712 int status; 2713 2714 if (atomic_read(&upper_xprt->swapper)) 2715 current->flags |= PF_MEMALLOC; 2716 2717 xs_stream_start_connect(upper_transport); 2718 2719 /* This implicitly sends an RPC_AUTH_TLS probe */ 2720 lower_clnt = rpc_create(&args); 2721 if (IS_ERR(lower_clnt)) { 2722 trace_rpc_tls_unavailable(upper_clnt, upper_xprt); 2723 clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); 2724 xprt_clear_connecting(upper_xprt); 2725 xprt_wake_pending_tasks(upper_xprt, PTR_ERR(lower_clnt)); 2726 xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); 2727 goto out_unlock; 2728 } 2729 2730 /* RPC_AUTH_TLS probe was successful. Try a TLS handshake on 2731 * the lower xprt. 2732 */ 2733 rcu_read_lock(); 2734 lower_xprt = rcu_dereference(lower_clnt->cl_xprt); 2735 rcu_read_unlock(); 2736 2737 if (wait_on_bit_lock(&lower_xprt->state, XPRT_LOCKED, TASK_KILLABLE)) 2738 goto out_unlock; 2739 2740 status = xs_tls_handshake_sync(lower_xprt, &upper_xprt->xprtsec); 2741 if (status) { 2742 trace_rpc_tls_not_started(upper_clnt, upper_xprt); 2743 goto out_close; 2744 } 2745 2746 status = xs_tcp_tls_finish_connecting(lower_xprt, upper_transport); 2747 if (status) 2748 goto out_close; 2749 xprt_release_write(lower_xprt, NULL); 2750 trace_rpc_socket_connect(upper_xprt, upper_transport->sock, 0); 2751 rpc_shutdown_client(lower_clnt); 2752 2753 /* Check for ingress data that arrived before the socket's 2754 * ->data_ready callback was set up. 2755 */ 2756 xs_poll_check_readable(upper_transport); 2757 2758 out_unlock: 2759 current_restore_flags(pflags, PF_MEMALLOC); 2760 upper_transport->clnt = NULL; 2761 xprt_unlock_connect(upper_xprt, upper_transport); 2762 return; 2763 2764 out_close: 2765 xprt_release_write(lower_xprt, NULL); 2766 rpc_shutdown_client(lower_clnt); 2767 2768 /* xprt_force_disconnect() wakes tasks with a fixed tk_status code. 2769 * Wake them first here to ensure they get our tk_status code. 2770 */ 2771 xprt_wake_pending_tasks(upper_xprt, status); 2772 xs_tcp_force_close(upper_xprt); 2773 xprt_clear_connecting(upper_xprt); 2774 goto out_unlock; 2775 } 2776 2777 /** 2778 * xs_connect - connect a socket to a remote endpoint 2779 * @xprt: pointer to transport structure 2780 * @task: address of RPC task that manages state of connect request 2781 * 2782 * TCP: If the remote end dropped the connection, delay reconnecting. 2783 * 2784 * UDP socket connects are synchronous, but we use a work queue anyway 2785 * to guarantee that even unprivileged user processes can set up a 2786 * socket on a privileged port. 2787 * 2788 * If a UDP socket connect fails, the delay behavior here prevents 2789 * retry floods (hard mounts). 2790 */ 2791 static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) 2792 { 2793 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2794 unsigned long delay = 0; 2795 2796 WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); 2797 2798 if (transport->sock != NULL) { 2799 dprintk("RPC: xs_connect delayed xprt %p for %lu " 2800 "seconds\n", xprt, xprt->reestablish_timeout / HZ); 2801 2802 delay = xprt_reconnect_delay(xprt); 2803 xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); 2804 2805 } else 2806 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 2807 2808 transport->clnt = task->tk_client; 2809 queue_delayed_work(xprtiod_workqueue, 2810 &transport->connect_worker, 2811 delay); 2812 } 2813 2814 static void xs_wake_disconnect(struct sock_xprt *transport) 2815 { 2816 if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) 2817 xs_tcp_force_close(&transport->xprt); 2818 } 2819 2820 static void xs_wake_write(struct sock_xprt *transport) 2821 { 2822 if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) 2823 xprt_write_space(&transport->xprt); 2824 } 2825 2826 static void xs_wake_error(struct sock_xprt *transport) 2827 { 2828 int sockerr; 2829 2830 if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) 2831 return; 2832 sockerr = xchg(&transport->xprt_err, 0); 2833 if (sockerr < 0) { 2834 xprt_wake_pending_tasks(&transport->xprt, sockerr); 2835 xs_tcp_force_close(&transport->xprt); 2836 } 2837 } 2838 2839 static void xs_wake_pending(struct sock_xprt *transport) 2840 { 2841 if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) 2842 xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); 2843 } 2844 2845 static void xs_error_handle(struct work_struct *work) 2846 { 2847 struct sock_xprt *transport = container_of(work, 2848 struct sock_xprt, error_worker); 2849 2850 xs_wake_disconnect(transport); 2851 xs_wake_write(transport); 2852 xs_wake_error(transport); 2853 xs_wake_pending(transport); 2854 } 2855 2856 /** 2857 * xs_local_print_stats - display AF_LOCAL socket-specific stats 2858 * @xprt: rpc_xprt struct containing statistics 2859 * @seq: output file 2860 * 2861 */ 2862 static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2863 { 2864 long idle_time = 0; 2865 2866 if (xprt_connected(xprt)) 2867 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2868 2869 seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu " 2870 "%llu %llu %lu %llu %llu\n", 2871 xprt->stat.bind_count, 2872 xprt->stat.connect_count, 2873 xprt->stat.connect_time / HZ, 2874 idle_time, 2875 xprt->stat.sends, 2876 xprt->stat.recvs, 2877 xprt->stat.bad_xids, 2878 xprt->stat.req_u, 2879 xprt->stat.bklog_u, 2880 xprt->stat.max_slots, 2881 xprt->stat.sending_u, 2882 xprt->stat.pending_u); 2883 } 2884 2885 /** 2886 * xs_udp_print_stats - display UDP socket-specific stats 2887 * @xprt: rpc_xprt struct containing statistics 2888 * @seq: output file 2889 * 2890 */ 2891 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2892 { 2893 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2894 2895 seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu " 2896 "%lu %llu %llu\n", 2897 transport->srcport, 2898 xprt->stat.bind_count, 2899 xprt->stat.sends, 2900 xprt->stat.recvs, 2901 xprt->stat.bad_xids, 2902 xprt->stat.req_u, 2903 xprt->stat.bklog_u, 2904 xprt->stat.max_slots, 2905 xprt->stat.sending_u, 2906 xprt->stat.pending_u); 2907 } 2908 2909 /** 2910 * xs_tcp_print_stats - display TCP socket-specific stats 2911 * @xprt: rpc_xprt struct containing statistics 2912 * @seq: output file 2913 * 2914 */ 2915 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2916 { 2917 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2918 long idle_time = 0; 2919 2920 if (xprt_connected(xprt)) 2921 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2922 2923 seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu " 2924 "%llu %llu %lu %llu %llu\n", 2925 transport->srcport, 2926 xprt->stat.bind_count, 2927 xprt->stat.connect_count, 2928 xprt->stat.connect_time / HZ, 2929 idle_time, 2930 xprt->stat.sends, 2931 xprt->stat.recvs, 2932 xprt->stat.bad_xids, 2933 xprt->stat.req_u, 2934 xprt->stat.bklog_u, 2935 xprt->stat.max_slots, 2936 xprt->stat.sending_u, 2937 xprt->stat.pending_u); 2938 } 2939 2940 /* 2941 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason 2942 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want 2943 * to use the server side send routines. 2944 */ 2945 static int bc_malloc(struct rpc_task *task) 2946 { 2947 struct rpc_rqst *rqst = task->tk_rqstp; 2948 size_t size = rqst->rq_callsize; 2949 struct page *page; 2950 struct rpc_buffer *buf; 2951 2952 if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) { 2953 WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n", 2954 size); 2955 return -EINVAL; 2956 } 2957 2958 page = alloc_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN); 2959 if (!page) 2960 return -ENOMEM; 2961 2962 buf = page_address(page); 2963 buf->len = PAGE_SIZE; 2964 2965 rqst->rq_buffer = buf->data; 2966 rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize; 2967 return 0; 2968 } 2969 2970 /* 2971 * Free the space allocated in the bc_alloc routine 2972 */ 2973 static void bc_free(struct rpc_task *task) 2974 { 2975 void *buffer = task->tk_rqstp->rq_buffer; 2976 struct rpc_buffer *buf; 2977 2978 buf = container_of(buffer, struct rpc_buffer, data); 2979 free_page((unsigned long)buf); 2980 } 2981 2982 static int bc_sendto(struct rpc_rqst *req) 2983 { 2984 struct xdr_buf *xdr = &req->rq_snd_buf; 2985 struct sock_xprt *transport = 2986 container_of(req->rq_xprt, struct sock_xprt, xprt); 2987 struct msghdr msg = { 2988 .msg_flags = 0, 2989 }; 2990 rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | 2991 (u32)xdr->len); 2992 unsigned int sent = 0; 2993 int err; 2994 2995 req->rq_xtime = ktime_get(); 2996 err = xdr_alloc_bvec(xdr, rpc_task_gfp_mask()); 2997 if (err < 0) 2998 return err; 2999 err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent); 3000 xdr_free_bvec(xdr); 3001 if (err < 0 || sent != (xdr->len + sizeof(marker))) 3002 return -EAGAIN; 3003 return sent; 3004 } 3005 3006 /** 3007 * bc_send_request - Send a backchannel Call on a TCP socket 3008 * @req: rpc_rqst containing Call message to be sent 3009 * 3010 * xpt_mutex ensures @rqstp's whole message is written to the socket 3011 * without interruption. 3012 * 3013 * Return values: 3014 * %0 if the message was sent successfully 3015 * %ENOTCONN if the message was not sent 3016 */ 3017 static int bc_send_request(struct rpc_rqst *req) 3018 { 3019 struct svc_xprt *xprt; 3020 int len; 3021 3022 /* 3023 * Get the server socket associated with this callback xprt 3024 */ 3025 xprt = req->rq_xprt->bc_xprt; 3026 3027 /* 3028 * Grab the mutex to serialize data as the connection is shared 3029 * with the fore channel 3030 */ 3031 mutex_lock(&xprt->xpt_mutex); 3032 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 3033 len = -ENOTCONN; 3034 else 3035 len = bc_sendto(req); 3036 mutex_unlock(&xprt->xpt_mutex); 3037 3038 if (len > 0) 3039 len = 0; 3040 3041 return len; 3042 } 3043 3044 static void bc_close(struct rpc_xprt *xprt) 3045 { 3046 xprt_disconnect_done(xprt); 3047 } 3048 3049 static void bc_destroy(struct rpc_xprt *xprt) 3050 { 3051 dprintk("RPC: bc_destroy xprt %p\n", xprt); 3052 3053 xs_xprt_free(xprt); 3054 module_put(THIS_MODULE); 3055 } 3056 3057 static const struct rpc_xprt_ops xs_local_ops = { 3058 .reserve_xprt = xprt_reserve_xprt, 3059 .release_xprt = xprt_release_xprt, 3060 .alloc_slot = xprt_alloc_slot, 3061 .free_slot = xprt_free_slot, 3062 .rpcbind = xs_local_rpcbind, 3063 .set_port = xs_local_set_port, 3064 .connect = xs_local_connect, 3065 .buf_alloc = rpc_malloc, 3066 .buf_free = rpc_free, 3067 .prepare_request = xs_stream_prepare_request, 3068 .send_request = xs_local_send_request, 3069 .abort_send_request = xs_stream_abort_send_request, 3070 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3071 .close = xs_close, 3072 .destroy = xs_destroy, 3073 .print_stats = xs_local_print_stats, 3074 .enable_swap = xs_enable_swap, 3075 .disable_swap = xs_disable_swap, 3076 }; 3077 3078 static const struct rpc_xprt_ops xs_udp_ops = { 3079 .set_buffer_size = xs_udp_set_buffer_size, 3080 .reserve_xprt = xprt_reserve_xprt_cong, 3081 .release_xprt = xprt_release_xprt_cong, 3082 .alloc_slot = xprt_alloc_slot, 3083 .free_slot = xprt_free_slot, 3084 .rpcbind = rpcb_getport_async, 3085 .set_port = xs_set_port, 3086 .connect = xs_connect, 3087 .get_srcaddr = xs_sock_srcaddr, 3088 .get_srcport = xs_sock_srcport, 3089 .buf_alloc = rpc_malloc, 3090 .buf_free = rpc_free, 3091 .send_request = xs_udp_send_request, 3092 .wait_for_reply_request = xprt_wait_for_reply_request_rtt, 3093 .timer = xs_udp_timer, 3094 .release_request = xprt_release_rqst_cong, 3095 .close = xs_close, 3096 .destroy = xs_destroy, 3097 .print_stats = xs_udp_print_stats, 3098 .enable_swap = xs_enable_swap, 3099 .disable_swap = xs_disable_swap, 3100 .inject_disconnect = xs_inject_disconnect, 3101 }; 3102 3103 static const struct rpc_xprt_ops xs_tcp_ops = { 3104 .reserve_xprt = xprt_reserve_xprt, 3105 .release_xprt = xprt_release_xprt, 3106 .alloc_slot = xprt_alloc_slot, 3107 .free_slot = xprt_free_slot, 3108 .rpcbind = rpcb_getport_async, 3109 .set_port = xs_set_port, 3110 .connect = xs_connect, 3111 .get_srcaddr = xs_sock_srcaddr, 3112 .get_srcport = xs_sock_srcport, 3113 .buf_alloc = rpc_malloc, 3114 .buf_free = rpc_free, 3115 .prepare_request = xs_stream_prepare_request, 3116 .send_request = xs_tcp_send_request, 3117 .abort_send_request = xs_stream_abort_send_request, 3118 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3119 .close = xs_tcp_shutdown, 3120 .destroy = xs_destroy, 3121 .set_connect_timeout = xs_tcp_set_connect_timeout, 3122 .print_stats = xs_tcp_print_stats, 3123 .enable_swap = xs_enable_swap, 3124 .disable_swap = xs_disable_swap, 3125 .inject_disconnect = xs_inject_disconnect, 3126 #ifdef CONFIG_SUNRPC_BACKCHANNEL 3127 .bc_setup = xprt_setup_bc, 3128 .bc_maxpayload = xs_tcp_bc_maxpayload, 3129 .bc_num_slots = xprt_bc_max_slots, 3130 .bc_free_rqst = xprt_free_bc_rqst, 3131 .bc_destroy = xprt_destroy_bc, 3132 #endif 3133 }; 3134 3135 /* 3136 * The rpc_xprt_ops for the server backchannel 3137 */ 3138 3139 static const struct rpc_xprt_ops bc_tcp_ops = { 3140 .reserve_xprt = xprt_reserve_xprt, 3141 .release_xprt = xprt_release_xprt, 3142 .alloc_slot = xprt_alloc_slot, 3143 .free_slot = xprt_free_slot, 3144 .buf_alloc = bc_malloc, 3145 .buf_free = bc_free, 3146 .send_request = bc_send_request, 3147 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3148 .close = bc_close, 3149 .destroy = bc_destroy, 3150 .print_stats = xs_tcp_print_stats, 3151 .enable_swap = xs_enable_swap, 3152 .disable_swap = xs_disable_swap, 3153 .inject_disconnect = xs_inject_disconnect, 3154 }; 3155 3156 static int xs_init_anyaddr(const int family, struct sockaddr *sap) 3157 { 3158 static const struct sockaddr_in sin = { 3159 .sin_family = AF_INET, 3160 .sin_addr.s_addr = htonl(INADDR_ANY), 3161 }; 3162 static const struct sockaddr_in6 sin6 = { 3163 .sin6_family = AF_INET6, 3164 .sin6_addr = IN6ADDR_ANY_INIT, 3165 }; 3166 3167 switch (family) { 3168 case AF_LOCAL: 3169 break; 3170 case AF_INET: 3171 memcpy(sap, &sin, sizeof(sin)); 3172 break; 3173 case AF_INET6: 3174 memcpy(sap, &sin6, sizeof(sin6)); 3175 break; 3176 default: 3177 dprintk("RPC: %s: Bad address family\n", __func__); 3178 return -EAFNOSUPPORT; 3179 } 3180 return 0; 3181 } 3182 3183 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, 3184 unsigned int slot_table_size, 3185 unsigned int max_slot_table_size) 3186 { 3187 struct rpc_xprt *xprt; 3188 struct sock_xprt *new; 3189 3190 if (args->addrlen > sizeof(xprt->addr)) { 3191 dprintk("RPC: xs_setup_xprt: address too large\n"); 3192 return ERR_PTR(-EBADF); 3193 } 3194 3195 xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size, 3196 max_slot_table_size); 3197 if (xprt == NULL) { 3198 dprintk("RPC: xs_setup_xprt: couldn't allocate " 3199 "rpc_xprt\n"); 3200 return ERR_PTR(-ENOMEM); 3201 } 3202 3203 new = container_of(xprt, struct sock_xprt, xprt); 3204 mutex_init(&new->recv_mutex); 3205 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 3206 xprt->addrlen = args->addrlen; 3207 if (args->srcaddr) 3208 memcpy(&new->srcaddr, args->srcaddr, args->addrlen); 3209 else { 3210 int err; 3211 err = xs_init_anyaddr(args->dstaddr->sa_family, 3212 (struct sockaddr *)&new->srcaddr); 3213 if (err != 0) { 3214 xprt_free(xprt); 3215 return ERR_PTR(err); 3216 } 3217 } 3218 3219 return xprt; 3220 } 3221 3222 static const struct rpc_timeout xs_local_default_timeout = { 3223 .to_initval = 10 * HZ, 3224 .to_maxval = 10 * HZ, 3225 .to_retries = 2, 3226 }; 3227 3228 /** 3229 * xs_setup_local - Set up transport to use an AF_LOCAL socket 3230 * @args: rpc transport creation arguments 3231 * 3232 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP 3233 */ 3234 static struct rpc_xprt *xs_setup_local(struct xprt_create *args) 3235 { 3236 struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr; 3237 struct sock_xprt *transport; 3238 struct rpc_xprt *xprt; 3239 struct rpc_xprt *ret; 3240 3241 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3242 xprt_max_tcp_slot_table_entries); 3243 if (IS_ERR(xprt)) 3244 return xprt; 3245 transport = container_of(xprt, struct sock_xprt, xprt); 3246 3247 xprt->prot = 0; 3248 xprt->xprt_class = &xs_local_transport; 3249 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3250 3251 xprt->bind_timeout = XS_BIND_TO; 3252 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3253 xprt->idle_timeout = XS_IDLE_DISC_TO; 3254 3255 xprt->ops = &xs_local_ops; 3256 xprt->timeout = &xs_local_default_timeout; 3257 3258 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3259 INIT_WORK(&transport->error_worker, xs_error_handle); 3260 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); 3261 3262 switch (sun->sun_family) { 3263 case AF_LOCAL: 3264 if (sun->sun_path[0] != '/' && sun->sun_path[0] != '\0') { 3265 dprintk("RPC: bad AF_LOCAL address: %s\n", 3266 sun->sun_path); 3267 ret = ERR_PTR(-EINVAL); 3268 goto out_err; 3269 } 3270 xprt_set_bound(xprt); 3271 xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); 3272 break; 3273 default: 3274 ret = ERR_PTR(-EAFNOSUPPORT); 3275 goto out_err; 3276 } 3277 3278 dprintk("RPC: set up xprt to %s via AF_LOCAL\n", 3279 xprt->address_strings[RPC_DISPLAY_ADDR]); 3280 3281 if (try_module_get(THIS_MODULE)) 3282 return xprt; 3283 ret = ERR_PTR(-EINVAL); 3284 out_err: 3285 xs_xprt_free(xprt); 3286 return ret; 3287 } 3288 3289 static const struct rpc_timeout xs_udp_default_timeout = { 3290 .to_initval = 5 * HZ, 3291 .to_maxval = 30 * HZ, 3292 .to_increment = 5 * HZ, 3293 .to_retries = 5, 3294 }; 3295 3296 /** 3297 * xs_setup_udp - Set up transport to use a UDP socket 3298 * @args: rpc transport creation arguments 3299 * 3300 */ 3301 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) 3302 { 3303 struct sockaddr *addr = args->dstaddr; 3304 struct rpc_xprt *xprt; 3305 struct sock_xprt *transport; 3306 struct rpc_xprt *ret; 3307 3308 xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries, 3309 xprt_udp_slot_table_entries); 3310 if (IS_ERR(xprt)) 3311 return xprt; 3312 transport = container_of(xprt, struct sock_xprt, xprt); 3313 3314 xprt->prot = IPPROTO_UDP; 3315 xprt->xprt_class = &xs_udp_transport; 3316 /* XXX: header size can vary due to auth type, IPv6, etc. */ 3317 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 3318 3319 xprt->bind_timeout = XS_BIND_TO; 3320 xprt->reestablish_timeout = XS_UDP_REEST_TO; 3321 xprt->idle_timeout = XS_IDLE_DISC_TO; 3322 3323 xprt->ops = &xs_udp_ops; 3324 3325 xprt->timeout = &xs_udp_default_timeout; 3326 3327 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); 3328 INIT_WORK(&transport->error_worker, xs_error_handle); 3329 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); 3330 3331 switch (addr->sa_family) { 3332 case AF_INET: 3333 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3334 xprt_set_bound(xprt); 3335 3336 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 3337 break; 3338 case AF_INET6: 3339 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3340 xprt_set_bound(xprt); 3341 3342 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 3343 break; 3344 default: 3345 ret = ERR_PTR(-EAFNOSUPPORT); 3346 goto out_err; 3347 } 3348 3349 if (xprt_bound(xprt)) 3350 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3351 xprt->address_strings[RPC_DISPLAY_ADDR], 3352 xprt->address_strings[RPC_DISPLAY_PORT], 3353 xprt->address_strings[RPC_DISPLAY_PROTO]); 3354 else 3355 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3356 xprt->address_strings[RPC_DISPLAY_ADDR], 3357 xprt->address_strings[RPC_DISPLAY_PROTO]); 3358 3359 if (try_module_get(THIS_MODULE)) 3360 return xprt; 3361 ret = ERR_PTR(-EINVAL); 3362 out_err: 3363 xs_xprt_free(xprt); 3364 return ret; 3365 } 3366 3367 static const struct rpc_timeout xs_tcp_default_timeout = { 3368 .to_initval = 60 * HZ, 3369 .to_maxval = 60 * HZ, 3370 .to_retries = 2, 3371 }; 3372 3373 /** 3374 * xs_setup_tcp - Set up transport to use a TCP socket 3375 * @args: rpc transport creation arguments 3376 * 3377 */ 3378 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) 3379 { 3380 struct sockaddr *addr = args->dstaddr; 3381 struct rpc_xprt *xprt; 3382 struct sock_xprt *transport; 3383 struct rpc_xprt *ret; 3384 unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; 3385 3386 if (args->flags & XPRT_CREATE_INFINITE_SLOTS) 3387 max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; 3388 3389 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3390 max_slot_table_size); 3391 if (IS_ERR(xprt)) 3392 return xprt; 3393 transport = container_of(xprt, struct sock_xprt, xprt); 3394 3395 xprt->prot = IPPROTO_TCP; 3396 xprt->xprt_class = &xs_tcp_transport; 3397 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3398 3399 xprt->bind_timeout = XS_BIND_TO; 3400 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3401 xprt->idle_timeout = XS_IDLE_DISC_TO; 3402 3403 xprt->ops = &xs_tcp_ops; 3404 xprt->timeout = &xs_tcp_default_timeout; 3405 3406 xprt->max_reconnect_timeout = xprt->timeout->to_maxval; 3407 if (args->reconnect_timeout) 3408 xprt->max_reconnect_timeout = args->reconnect_timeout; 3409 3410 xprt->connect_timeout = xprt->timeout->to_initval * 3411 (xprt->timeout->to_retries + 1); 3412 if (args->connect_timeout) 3413 xs_tcp_do_set_connect_timeout(xprt, args->connect_timeout); 3414 3415 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3416 INIT_WORK(&transport->error_worker, xs_error_handle); 3417 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); 3418 3419 switch (addr->sa_family) { 3420 case AF_INET: 3421 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3422 xprt_set_bound(xprt); 3423 3424 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 3425 break; 3426 case AF_INET6: 3427 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3428 xprt_set_bound(xprt); 3429 3430 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 3431 break; 3432 default: 3433 ret = ERR_PTR(-EAFNOSUPPORT); 3434 goto out_err; 3435 } 3436 3437 if (xprt_bound(xprt)) 3438 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3439 xprt->address_strings[RPC_DISPLAY_ADDR], 3440 xprt->address_strings[RPC_DISPLAY_PORT], 3441 xprt->address_strings[RPC_DISPLAY_PROTO]); 3442 else 3443 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3444 xprt->address_strings[RPC_DISPLAY_ADDR], 3445 xprt->address_strings[RPC_DISPLAY_PROTO]); 3446 3447 if (try_module_get(THIS_MODULE)) 3448 return xprt; 3449 ret = ERR_PTR(-EINVAL); 3450 out_err: 3451 xs_xprt_free(xprt); 3452 return ret; 3453 } 3454 3455 /** 3456 * xs_setup_tcp_tls - Set up transport to use a TCP with TLS 3457 * @args: rpc transport creation arguments 3458 * 3459 */ 3460 static struct rpc_xprt *xs_setup_tcp_tls(struct xprt_create *args) 3461 { 3462 struct sockaddr *addr = args->dstaddr; 3463 struct rpc_xprt *xprt; 3464 struct sock_xprt *transport; 3465 struct rpc_xprt *ret; 3466 unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; 3467 3468 if (args->flags & XPRT_CREATE_INFINITE_SLOTS) 3469 max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; 3470 3471 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3472 max_slot_table_size); 3473 if (IS_ERR(xprt)) 3474 return xprt; 3475 transport = container_of(xprt, struct sock_xprt, xprt); 3476 3477 xprt->prot = IPPROTO_TCP; 3478 xprt->xprt_class = &xs_tcp_transport; 3479 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3480 3481 xprt->bind_timeout = XS_BIND_TO; 3482 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3483 xprt->idle_timeout = XS_IDLE_DISC_TO; 3484 3485 xprt->ops = &xs_tcp_ops; 3486 xprt->timeout = &xs_tcp_default_timeout; 3487 3488 xprt->max_reconnect_timeout = xprt->timeout->to_maxval; 3489 xprt->connect_timeout = xprt->timeout->to_initval * 3490 (xprt->timeout->to_retries + 1); 3491 3492 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3493 INIT_WORK(&transport->error_worker, xs_error_handle); 3494 3495 switch (args->xprtsec.policy) { 3496 case RPC_XPRTSEC_TLS_ANON: 3497 case RPC_XPRTSEC_TLS_X509: 3498 xprt->xprtsec = args->xprtsec; 3499 INIT_DELAYED_WORK(&transport->connect_worker, 3500 xs_tcp_tls_setup_socket); 3501 break; 3502 default: 3503 ret = ERR_PTR(-EACCES); 3504 goto out_err; 3505 } 3506 3507 switch (addr->sa_family) { 3508 case AF_INET: 3509 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3510 xprt_set_bound(xprt); 3511 3512 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 3513 break; 3514 case AF_INET6: 3515 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3516 xprt_set_bound(xprt); 3517 3518 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 3519 break; 3520 default: 3521 ret = ERR_PTR(-EAFNOSUPPORT); 3522 goto out_err; 3523 } 3524 3525 if (xprt_bound(xprt)) 3526 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3527 xprt->address_strings[RPC_DISPLAY_ADDR], 3528 xprt->address_strings[RPC_DISPLAY_PORT], 3529 xprt->address_strings[RPC_DISPLAY_PROTO]); 3530 else 3531 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3532 xprt->address_strings[RPC_DISPLAY_ADDR], 3533 xprt->address_strings[RPC_DISPLAY_PROTO]); 3534 3535 if (try_module_get(THIS_MODULE)) 3536 return xprt; 3537 ret = ERR_PTR(-EINVAL); 3538 out_err: 3539 xs_xprt_free(xprt); 3540 return ret; 3541 } 3542 3543 /** 3544 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket 3545 * @args: rpc transport creation arguments 3546 * 3547 */ 3548 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) 3549 { 3550 struct sockaddr *addr = args->dstaddr; 3551 struct rpc_xprt *xprt; 3552 struct sock_xprt *transport; 3553 struct svc_sock *bc_sock; 3554 struct rpc_xprt *ret; 3555 3556 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3557 xprt_tcp_slot_table_entries); 3558 if (IS_ERR(xprt)) 3559 return xprt; 3560 transport = container_of(xprt, struct sock_xprt, xprt); 3561 3562 xprt->prot = IPPROTO_TCP; 3563 xprt->xprt_class = &xs_bc_tcp_transport; 3564 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3565 xprt->timeout = &xs_tcp_default_timeout; 3566 3567 /* backchannel */ 3568 xprt_set_bound(xprt); 3569 xprt->bind_timeout = 0; 3570 xprt->reestablish_timeout = 0; 3571 xprt->idle_timeout = 0; 3572 3573 xprt->ops = &bc_tcp_ops; 3574 3575 switch (addr->sa_family) { 3576 case AF_INET: 3577 xs_format_peer_addresses(xprt, "tcp", 3578 RPCBIND_NETID_TCP); 3579 break; 3580 case AF_INET6: 3581 xs_format_peer_addresses(xprt, "tcp", 3582 RPCBIND_NETID_TCP6); 3583 break; 3584 default: 3585 ret = ERR_PTR(-EAFNOSUPPORT); 3586 goto out_err; 3587 } 3588 3589 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3590 xprt->address_strings[RPC_DISPLAY_ADDR], 3591 xprt->address_strings[RPC_DISPLAY_PORT], 3592 xprt->address_strings[RPC_DISPLAY_PROTO]); 3593 3594 /* 3595 * Once we've associated a backchannel xprt with a connection, 3596 * we want to keep it around as long as the connection lasts, 3597 * in case we need to start using it for a backchannel again; 3598 * this reference won't be dropped until bc_xprt is destroyed. 3599 */ 3600 xprt_get(xprt); 3601 args->bc_xprt->xpt_bc_xprt = xprt; 3602 xprt->bc_xprt = args->bc_xprt; 3603 bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt); 3604 transport->sock = bc_sock->sk_sock; 3605 transport->inet = bc_sock->sk_sk; 3606 3607 /* 3608 * Since we don't want connections for the backchannel, we set 3609 * the xprt status to connected 3610 */ 3611 xprt_set_connected(xprt); 3612 3613 if (try_module_get(THIS_MODULE)) 3614 return xprt; 3615 3616 args->bc_xprt->xpt_bc_xprt = NULL; 3617 args->bc_xprt->xpt_bc_xps = NULL; 3618 xprt_put(xprt); 3619 ret = ERR_PTR(-EINVAL); 3620 out_err: 3621 xs_xprt_free(xprt); 3622 return ret; 3623 } 3624 3625 static struct xprt_class xs_local_transport = { 3626 .list = LIST_HEAD_INIT(xs_local_transport.list), 3627 .name = "named UNIX socket", 3628 .owner = THIS_MODULE, 3629 .ident = XPRT_TRANSPORT_LOCAL, 3630 .setup = xs_setup_local, 3631 .netid = { "" }, 3632 }; 3633 3634 static struct xprt_class xs_udp_transport = { 3635 .list = LIST_HEAD_INIT(xs_udp_transport.list), 3636 .name = "udp", 3637 .owner = THIS_MODULE, 3638 .ident = XPRT_TRANSPORT_UDP, 3639 .setup = xs_setup_udp, 3640 .netid = { "udp", "udp6", "" }, 3641 }; 3642 3643 static struct xprt_class xs_tcp_transport = { 3644 .list = LIST_HEAD_INIT(xs_tcp_transport.list), 3645 .name = "tcp", 3646 .owner = THIS_MODULE, 3647 .ident = XPRT_TRANSPORT_TCP, 3648 .setup = xs_setup_tcp, 3649 .netid = { "tcp", "tcp6", "" }, 3650 }; 3651 3652 static struct xprt_class xs_tcp_tls_transport = { 3653 .list = LIST_HEAD_INIT(xs_tcp_tls_transport.list), 3654 .name = "tcp-with-tls", 3655 .owner = THIS_MODULE, 3656 .ident = XPRT_TRANSPORT_TCP_TLS, 3657 .setup = xs_setup_tcp_tls, 3658 .netid = { "tcp", "tcp6", "" }, 3659 }; 3660 3661 static struct xprt_class xs_bc_tcp_transport = { 3662 .list = LIST_HEAD_INIT(xs_bc_tcp_transport.list), 3663 .name = "tcp NFSv4.1 backchannel", 3664 .owner = THIS_MODULE, 3665 .ident = XPRT_TRANSPORT_BC_TCP, 3666 .setup = xs_setup_bc_tcp, 3667 .netid = { "" }, 3668 }; 3669 3670 /** 3671 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client 3672 * 3673 */ 3674 int init_socket_xprt(void) 3675 { 3676 if (!sunrpc_table_header) 3677 sunrpc_table_header = register_sysctl("sunrpc", xs_tunables_table); 3678 3679 xprt_register_transport(&xs_local_transport); 3680 xprt_register_transport(&xs_udp_transport); 3681 xprt_register_transport(&xs_tcp_transport); 3682 xprt_register_transport(&xs_tcp_tls_transport); 3683 xprt_register_transport(&xs_bc_tcp_transport); 3684 3685 return 0; 3686 } 3687 3688 /** 3689 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister 3690 * 3691 */ 3692 void cleanup_socket_xprt(void) 3693 { 3694 if (sunrpc_table_header) { 3695 unregister_sysctl_table(sunrpc_table_header); 3696 sunrpc_table_header = NULL; 3697 } 3698 3699 xprt_unregister_transport(&xs_local_transport); 3700 xprt_unregister_transport(&xs_udp_transport); 3701 xprt_unregister_transport(&xs_tcp_transport); 3702 xprt_unregister_transport(&xs_tcp_tls_transport); 3703 xprt_unregister_transport(&xs_bc_tcp_transport); 3704 } 3705 3706 static int param_set_portnr(const char *val, const struct kernel_param *kp) 3707 { 3708 return param_set_uint_minmax(val, kp, 3709 RPC_MIN_RESVPORT, 3710 RPC_MAX_RESVPORT); 3711 } 3712 3713 static const struct kernel_param_ops param_ops_portnr = { 3714 .set = param_set_portnr, 3715 .get = param_get_uint, 3716 }; 3717 3718 #define param_check_portnr(name, p) \ 3719 __param_check(name, p, unsigned int); 3720 3721 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644); 3722 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644); 3723 3724 static int param_set_slot_table_size(const char *val, 3725 const struct kernel_param *kp) 3726 { 3727 return param_set_uint_minmax(val, kp, 3728 RPC_MIN_SLOT_TABLE, 3729 RPC_MAX_SLOT_TABLE); 3730 } 3731 3732 static const struct kernel_param_ops param_ops_slot_table_size = { 3733 .set = param_set_slot_table_size, 3734 .get = param_get_uint, 3735 }; 3736 3737 #define param_check_slot_table_size(name, p) \ 3738 __param_check(name, p, unsigned int); 3739 3740 static int param_set_max_slot_table_size(const char *val, 3741 const struct kernel_param *kp) 3742 { 3743 return param_set_uint_minmax(val, kp, 3744 RPC_MIN_SLOT_TABLE, 3745 RPC_MAX_SLOT_TABLE_LIMIT); 3746 } 3747 3748 static const struct kernel_param_ops param_ops_max_slot_table_size = { 3749 .set = param_set_max_slot_table_size, 3750 .get = param_get_uint, 3751 }; 3752 3753 #define param_check_max_slot_table_size(name, p) \ 3754 __param_check(name, p, unsigned int); 3755 3756 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries, 3757 slot_table_size, 0644); 3758 module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries, 3759 max_slot_table_size, 0644); 3760 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries, 3761 slot_table_size, 0644); 3762