// SPDX-License-Identifier: GPL-2.0
/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/un.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_SUNRPC_BACKCHANNEL
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>
#include <net/tls_prot.h>
#include <net/handshake.h>

#include <linux/bvec.h>
#include <linux/highmem.h>
#include <linux/uio.h>
#include <linux/sched/mm.h>

#include <trace/events/sock.h>
#include <trace/events/sunrpc.h>

#include "socklib.h"
#include "sunrpc.h"

static void xs_close(struct rpc_xprt *xprt);
static void xs_reset_srcport(struct sock_xprt *transport);
static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
		struct socket *sock);

/*
 * xprtsock tunables
 */
static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

static struct xprt_class xs_local_transport;
static struct xprt_class xs_udp_transport;
static struct xprt_class xs_tcp_transport;
static struct xprt_class xs_tcp_tls_transport;
static struct xprt_class xs_bc_tcp_transport;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static struct ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
};
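
/*
 * Example (a sketch, assuming CONFIG_SYSCTL and that the table above is
 * registered under /proc/sys/sunrpc):
 *
 *	echo 128 > /proc/sys/sunrpc/tcp_slot_table_entries
 *
 * The new value applies to transports created after the write.
 */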

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

/*
 * TLS handshake timeout.
 */
#define XS_TLS_HANDSHAKE_TO	(10U * HZ)

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
	return (struct sockaddr_un *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		if (sun->sun_path[0]) {
			strscpy(buf, sun->sun_path, sizeof(buf));
		} else {
			buf[0] = '@';
			strscpy(buf+1, sun->sun_path+1, sizeof(buf)-1);
		}
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}

	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}
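
/**
 * xs_free_peer_addresses - release the formatted address strings
 * @xprt: transport whose display strings are being freed
 *
 * RPC_DISPLAY_PROTO and RPC_DISPLAY_NETID point at constant strings
 * supplied to xs_format_peer_addresses(), so only the kstrdup()'d
 * entries are freed here.
 */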
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

static size_t
xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
{
	size_t i, n;

	if (!want || !(buf->flags & XDRBUF_SPARSE_PAGES))
		return want;
	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < n; i++) {
		if (buf->pages[i])
			continue;
		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
		if (!buf->pages[i]) {
			i *= PAGE_SIZE;
			return i > buf->page_base ? i - buf->page_base : 0;
		}
	}
	return want;
}

static int
xs_sock_process_cmsg(struct socket *sock, struct msghdr *msg,
		     unsigned int *msg_flags, struct cmsghdr *cmsg, int ret)
{
	u8 content_type = tls_get_record_type(sock->sk, cmsg);
	u8 level, description;

	switch (content_type) {
	case 0:
		break;
	case TLS_RECORD_TYPE_DATA:
		/* TLS sets EOR at the end of each application data
		 * record, even though there might be more frames
		 * waiting to be decrypted.
		 */
		*msg_flags &= ~MSG_EOR;
		break;
	case TLS_RECORD_TYPE_ALERT:
		tls_alert_recv(sock->sk, msg, &level, &description);
		ret = (level == TLS_ALERT_LEVEL_FATAL) ?
			-EACCES : -EAGAIN;
		break;
	default:
		/* discard this record type */
		ret = -EAGAIN;
	}
	return ret;
}

static int
xs_sock_recv_cmsg(struct socket *sock, unsigned int *msg_flags, int flags)
{
	union {
		struct cmsghdr	cmsg;
		u8		buf[CMSG_SPACE(sizeof(u8))];
	} u;
	u8 alert[2];
	struct kvec alert_kvec = {
		.iov_base = alert,
		.iov_len = sizeof(alert),
	};
	struct msghdr msg = {
		.msg_flags = *msg_flags,
		.msg_control = &u,
		.msg_controllen = sizeof(u),
	};
	int ret;

	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &alert_kvec, 1,
		      alert_kvec.iov_len);
	ret = sock_recvmsg(sock, &msg, flags);
	if (ret > 0 &&
	    tls_get_record_type(sock->sk, &u.cmsg) == TLS_RECORD_TYPE_ALERT) {
		iov_iter_revert(&msg.msg_iter, ret);
		ret = xs_sock_process_cmsg(sock, &msg, msg_flags, &u.cmsg,
					   -EAGAIN);
	}
	return ret;
}

static ssize_t
xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
{
	ssize_t ret;
	if (seek != 0)
		iov_iter_advance(&msg->msg_iter, seek);
	ret = sock_recvmsg(sock, msg, flags);
	/* Handle TLS inband control message lazily */
	if (msg->msg_flags & MSG_CTRUNC) {
		msg->msg_flags &= ~(MSG_CTRUNC | MSG_EOR);
		if (ret == 0 || ret == -EIO)
			ret = xs_sock_recv_cmsg(sock, &msg->msg_flags, flags);
	}
	return ret > 0 ? ret + seek : ret;
}
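
/*
 * Thin wrappers that point msg->msg_iter at a kvec, a bio_vec array,
 * or a discard iterator before handing off to xs_sock_recvmsg().
 */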
static ssize_t
xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
		struct kvec *kvec, size_t count, size_t seek)
{
	iov_iter_kvec(&msg->msg_iter, ITER_DEST, kvec, 1, count);
	return xs_sock_recvmsg(sock, msg, flags, seek);
}

static ssize_t
xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
		struct bio_vec *bvec, unsigned long nr, size_t count,
		size_t seek)
{
	iov_iter_bvec(&msg->msg_iter, ITER_DEST, bvec, nr, count);
	return xs_sock_recvmsg(sock, msg, flags, seek);
}

static ssize_t
xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
		size_t count)
{
	iov_iter_discard(&msg->msg_iter, ITER_DEST, count);
	return xs_sock_recvmsg(sock, msg, flags, 0);
}

#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
static void
xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
{
	struct bvec_iter bi = {
		.bi_size = count,
	};
	struct bio_vec bv;

	bvec_iter_advance(bvec, &bi, seek & PAGE_MASK);
	for_each_bvec(bv, bvec, bi, bi)
		flush_dcache_page(bv.bv_page);
}
#else
static inline void
xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
{
}
#endif

static ssize_t
xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
{
	size_t want, seek_init = seek, offset = 0;
	ssize_t ret;

	want = min_t(size_t, count, buf->head[0].iov_len);
	if (seek < want) {
		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
		if (ret <= 0)
			goto sock_err;
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		if (ret != want)
			goto out;
		seek = 0;
	} else {
		seek -= want;
		offset += want;
	}

	want = xs_alloc_sparse_pages(
		buf, min_t(size_t, count - offset, buf->page_len),
		GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
	if (seek < want) {
		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
				xdr_buf_pagecount(buf),
				want + buf->page_base,
				seek + buf->page_base);
		if (ret <= 0)
			goto sock_err;
		xs_flush_bvec(buf->bvec, ret, seek + buf->page_base);
		ret -= buf->page_base;
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		if (ret != want)
			goto out;
		seek = 0;
	} else {
		seek -= want;
		offset += want;
	}

	want = min_t(size_t, count - offset, buf->tail[0].iov_len);
	if (seek < want) {
		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
		if (ret <= 0)
			goto sock_err;
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		if (ret != want)
			goto out;
	} else if (offset < seek_init)
		offset = seek_init;
	ret = -EMSGSIZE;
out:
	*read = offset - seek_init;
	return ret;
sock_err:
	offset += seek;
	goto out;
}

static void
xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
{
	if (!transport->recv.copied) {
		if (buf->head[0].iov_len >= transport->recv.offset)
			memcpy(buf->head[0].iov_base,
					&transport->recv.xid,
					transport->recv.offset);
		transport->recv.copied = transport->recv.offset;
	}
}
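
/*
 * recv.fraghdr is kept in network byte order, so the on-the-wire
 * last-fragment bit (RPC_LAST_STREAM_FRAGMENT) is tested against the
 * byte-swapped constant rather than the host-order value.
 */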
static bool
xs_read_stream_request_done(struct sock_xprt *transport)
{
	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
}

static void
xs_read_stream_check_eor(struct sock_xprt *transport,
		struct msghdr *msg)
{
	if (xs_read_stream_request_done(transport))
		msg->msg_flags |= MSG_EOR;
}

static ssize_t
xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
		int flags, struct rpc_rqst *req)
{
	struct xdr_buf *buf = &req->rq_private_buf;
	size_t want, read;
	ssize_t ret;

	xs_read_header(transport, buf);

	want = transport->recv.len - transport->recv.offset;
	if (want != 0) {
		ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
				transport->recv.copied + want,
				transport->recv.copied,
				&read);
		transport->recv.offset += read;
		transport->recv.copied += read;
	}

	if (transport->recv.offset == transport->recv.len)
		xs_read_stream_check_eor(transport, msg);

	if (want == 0)
		return 0;

	switch (ret) {
	default:
		break;
	case -EFAULT:
	case -EMSGSIZE:
		msg->msg_flags |= MSG_TRUNC;
		return read;
	case 0:
		return -ESHUTDOWN;
	}
	return ret < 0 ? ret : read;
}

static size_t
xs_read_stream_headersize(bool isfrag)
{
	if (isfrag)
		return sizeof(__be32);
	return 3 * sizeof(__be32);
}

static ssize_t
xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
		int flags, size_t want, size_t seek)
{
	struct kvec kvec = {
		.iov_base = &transport->recv.fraghdr,
		.iov_len = want,
	};
	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
}
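
/*
 * Backchannel: on NFSv4.1+ sessions the server sends callback CALLs
 * down the client's existing stream; these are matched to preallocated
 * backchannel slots instead of the regular request queue.
 */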
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static ssize_t
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
	struct rpc_xprt *xprt = &transport->xprt;
	struct rpc_rqst *req;
	ssize_t ret;

	/* Is this transport associated with the backchannel? */
	if (!xprt->bc_serv)
		return -ESHUTDOWN;

	/* Look up and lock the request corresponding to the given XID */
	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
	if (!req) {
		printk(KERN_WARNING "Callback slot table overflowed\n");
		return -ESHUTDOWN;
	}
	if (transport->recv.copied && !req->rq_private_buf.len)
		return -ESHUTDOWN;

	ret = xs_read_stream_request(transport, msg, flags, req);
	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
		xprt_complete_bc_request(req, transport->recv.copied);
	else
		req->rq_private_buf.len = transport->recv.copied;

	return ret;
}
#else /* CONFIG_SUNRPC_BACKCHANNEL */
static ssize_t
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
	return -ESHUTDOWN;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

static ssize_t
xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
	struct rpc_xprt *xprt = &transport->xprt;
	struct rpc_rqst *req;
	ssize_t ret = 0;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, transport->recv.xid);
	if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
		msg->msg_flags |= MSG_TRUNC;
		goto out;
	}
	xprt_pin_rqst(req);
	spin_unlock(&xprt->queue_lock);

	ret = xs_read_stream_request(transport, msg, flags, req);

	spin_lock(&xprt->queue_lock);
	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
		xprt_complete_rqst(req->rq_task, transport->recv.copied);
	else
		req->rq_private_buf.len = transport->recv.copied;
	xprt_unpin_rqst(req);
out:
	spin_unlock(&xprt->queue_lock);
	return ret;
}

static ssize_t
xs_read_stream(struct sock_xprt *transport, int flags)
{
	struct msghdr msg = { 0 };
	size_t want, read = 0;
	ssize_t ret = 0;

	if (transport->recv.len == 0) {
		want = xs_read_stream_headersize(transport->recv.copied != 0);
		ret = xs_read_stream_header(transport, &msg, flags, want,
				transport->recv.offset);
		if (ret <= 0)
			goto out_err;
		transport->recv.offset = ret;
		if (transport->recv.offset != want)
			return transport->recv.offset;
		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
			RPC_FRAGMENT_SIZE_MASK;
		transport->recv.offset -= sizeof(transport->recv.fraghdr);
		read = ret;
	}

	switch (be32_to_cpu(transport->recv.calldir)) {
	default:
		msg.msg_flags |= MSG_TRUNC;
		break;
	case RPC_CALL:
		ret = xs_read_stream_call(transport, &msg, flags);
		break;
	case RPC_REPLY:
		ret = xs_read_stream_reply(transport, &msg, flags);
	}
	if (msg.msg_flags & MSG_TRUNC) {
		transport->recv.calldir = cpu_to_be32(-1);
		transport->recv.copied = -1;
	}
	if (ret < 0)
		goto out_err;
	read += ret;
	if (transport->recv.offset < transport->recv.len) {
		if (!(msg.msg_flags & MSG_TRUNC))
			return read;
		msg.msg_flags = 0;
		ret = xs_read_discard(transport->sock, &msg, flags,
				transport->recv.len - transport->recv.offset);
		if (ret <= 0)
			goto out_err;
		transport->recv.offset += ret;
		read += ret;
		if (transport->recv.offset != transport->recv.len)
			return read;
	}
	if (xs_read_stream_request_done(transport)) {
		trace_xs_stream_read_request(transport);
		transport->recv.copied = 0;
	}
	transport->recv.offset = 0;
	transport->recv.len = 0;
	return read;
out_err:
	return ret != 0 ? ret : -ESHUTDOWN;
}
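
/*
 * Readiness probes: calling ->poll() with a NULL wait table queries the
 * current socket state without registering a waiter.
 */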
static __poll_t xs_poll_socket(struct sock_xprt *transport)
{
	return transport->sock->ops->poll(transport->file, transport->sock,
			NULL);
}

static bool xs_poll_socket_readable(struct sock_xprt *transport)
{
	__poll_t events = xs_poll_socket(transport);

	return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
}

static void xs_poll_check_readable(struct sock_xprt *transport)
{
	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
	if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state))
		return;
	if (!xs_poll_socket_readable(transport))
		return;
	if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
		queue_work(xprtiod_workqueue, &transport->recv_worker);
}

static void xs_stream_data_receive(struct sock_xprt *transport)
{
	size_t read = 0;
	ssize_t ret = 0;

	mutex_lock(&transport->recv_mutex);
	if (transport->sock == NULL)
		goto out;
	for (;;) {
		ret = xs_read_stream(transport, MSG_DONTWAIT);
		if (ret < 0)
			break;
		read += ret;
		cond_resched();
	}
	if (ret == -ESHUTDOWN)
		kernel_sock_shutdown(transport->sock, SHUT_RDWR);
	else if (ret == -EACCES)
		xprt_wake_pending_tasks(&transport->xprt, -EACCES);
	else
		xs_poll_check_readable(transport);
out:
	mutex_unlock(&transport->recv_mutex);
	trace_xs_stream_read_data(&transport->xprt, ret, read);
}

static void xs_stream_data_receive_workfn(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, recv_worker);
	unsigned int pflags = memalloc_nofs_save();

	xs_stream_data_receive(transport);
	memalloc_nofs_restore(pflags);
}

static void
xs_stream_reset_connect(struct sock_xprt *transport)
{
	transport->recv.offset = 0;
	transport->recv.len = 0;
	transport->recv.copied = 0;
	transport->xmit.offset = 0;
}

static void
xs_stream_start_connect(struct sock_xprt *transport)
{
	transport->xprt.stat.connect_count++;
	transport->xprt.stat.connect_start = jiffies;
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

/**
 * xs_nospace - handle an incomplete transmission
 * @req: pointer to RPC request
 * @transport: pointer to struct sock_xprt
 *
 */
static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
{
	struct rpc_xprt *xprt = &transport->xprt;
	struct sock *sk = transport->inet;
	int ret = -EAGAIN;

	trace_rpc_socket_nospace(req, transport);

	/* Protect against races with write_space */
	spin_lock(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		/* wait for more buffer space */
		set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		xprt_wait_for_buffer_space(xprt);
	} else
		ret = -ENOTCONN;

	spin_unlock(&xprt->transport_lock);
	return ret;
}

static int xs_sock_nospace(struct rpc_rqst *req)
{
	struct sock_xprt *transport =
		container_of(req->rq_xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
	int ret = -EAGAIN;

	lock_sock(sk);
	if (!sock_writeable(sk))
		ret = xs_nospace(req, transport);
	release_sock(sk);
	return ret;
}
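
/*
 * Stream flavour of the above: sk_stream_memory_free(), unlike
 * sock_writeable(), also accounts for data already queued for
 * transmission.
 */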
static int xs_stream_nospace(struct rpc_rqst *req, bool vm_wait)
{
	struct sock_xprt *transport =
		container_of(req->rq_xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
	int ret = -EAGAIN;

	if (vm_wait)
		return -ENOBUFS;
	lock_sock(sk);
	if (!sk_stream_memory_free(sk))
		ret = xs_nospace(req, transport);
	release_sock(sk);
	return ret;
}

static int xs_stream_prepare_request(struct rpc_rqst *req, struct xdr_buf *buf)
{
	return xdr_alloc_bvec(buf, rpc_task_gfp_mask());
}

static void xs_stream_abort_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
		container_of(xprt, struct sock_xprt, xprt);

	if (transport->xmit.offset != 0 &&
	    !test_bit(XPRT_CLOSE_WAIT, &xprt->state))
		xprt_force_disconnect(xprt);
}

/*
 * Determine if the previous message in the stream was aborted before it
 * could complete transmission.
 */
static bool
xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
{
	return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
}

/*
 * Return the stream record marker field for a record of length < 2^31-1
 */
static rpc_fraghdr
xs_stream_record_marker(struct xdr_buf *xdr)
{
	if (!xdr->len)
		return 0;
	return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
}
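
/*
 * Worked example: a 100-byte request yields the 4-byte marker
 * 0x80000064 (RPC_LAST_STREAM_FRAGMENT | length), so the stream send
 * paths below transmit msglen = rq_slen + sizeof(rm) = 104 bytes.
 */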
/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @req: pointer to RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	rpc_fraghdr rm = xs_stream_record_marker(xdr);
	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	bool vm_wait;
	unsigned int sent;
	int status;

	/* Close the stream if the previous transmission was incomplete */
	if (xs_send_request_was_aborted(transport, req)) {
		xprt_force_disconnect(xprt);
		return -ENOTCONN;
	}

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;

	req->rq_xtime = ktime_get();
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
				   transport->xmit.offset, rm, &sent);
	dprintk("RPC:       %s(%u) = %d\n",
		__func__, xdr->len - transport->xmit.offset, status);

	if (likely(sent > 0) || status == 0) {
		transport->xmit.offset += sent;
		req->rq_bytes_sent = transport->xmit.offset;
		if (likely(req->rq_bytes_sent >= msglen)) {
			req->rq_xmit_bytes_sent += transport->xmit.offset;
			transport->xmit.offset = 0;
			return 0;
		}
		status = -EAGAIN;
		vm_wait = false;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_stream_nospace(req, vm_wait);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		fallthrough;
	case -EPIPE:
		xprt_force_disconnect(xprt);
		status = -ENOTCONN;
	}

	return status;
}
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @req: pointer to RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct msghdr msg = {
		.msg_name	= xs_addr(xprt),
		.msg_namelen	= xprt->addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	unsigned int sent;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, req))
		return -EBADSLT;

	status = xdr_alloc_bvec(xdr, rpc_task_gfp_mask());
	if (status < 0)
		return status;
	req->rq_xtime = ktime_get();
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len, status);

	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
	if (status == -EPERM)
		goto process_status;

	if (status == -EAGAIN && sock_writeable(transport->inet))
		status = -ENOBUFS;

	if (sent > 0 || status == 0) {
		req->rq_xmit_bytes_sent += sent;
		if (sent >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

process_status:
	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_sock_nospace(req);
		break;
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
	case -ECONNREFUSED:
	case -EPERM:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @req: pointer to RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	rpc_fraghdr rm = xs_stream_record_marker(xdr);
	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	bool vm_wait;
	unsigned int sent;
	int status;

	/* Close the stream if the previous transmission was incomplete */
	if (xs_send_request_was_aborted(transport, req)) {
		if (transport->sock != NULL)
			kernel_sock_shutdown(transport->sock, SHUT_RDWR);
		return -ENOTCONN;
	}
	if (!transport->inet)
		return -ENOTCONN;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
		xs_tcp_set_socket_timeouts(xprt, transport->sock);

	xs_set_srcport(transport, transport->sock);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	req->rq_xtime = ktime_get();
	tcp_sock_set_cork(transport->inet, true);

	vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;

	do {
		status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
					   transport->xmit.offset, rm, &sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - transport->xmit.offset, status);

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		transport->xmit.offset += sent;
		req->rq_bytes_sent = transport->xmit.offset;
		if (likely(req->rq_bytes_sent >= msglen)) {
			req->rq_xmit_bytes_sent += transport->xmit.offset;
			transport->xmit.offset = 0;
			if (atomic_long_read(&xprt->xmit_queuelen) == 1)
				tcp_sock_set_cork(transport->inet, false);
			return 0;
		}

		WARN_ON_ONCE(sent == 0 && status == 0);

		if (sent > 0)
			vm_wait = false;

	} while (status == 0);

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_stream_nospace(req, vm_wait);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EADDRINUSE:
	case -ENOBUFS:
	case -EPIPE:
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}
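
/*
 * While the transport owns a socket it overrides sk_data_ready and
 * friends; the original callbacks are saved here and restored when the
 * socket is torn down.
 */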
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->xprt_err = 0;
	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
	clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
	clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
	clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
	clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
	clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
}

static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
{
	set_bit(nr, &transport->sock_state);
	queue_work(xprtiod_workqueue, &transport->error_worker);
}

static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
	xprt->connect_cookie++;
	smp_mb__before_atomic();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	xs_sock_reset_state_flags(xprt);
	smp_mb__after_atomic();
}

/**
 * xs_error_report - callback to handle TCP socket state errors
 * @sk: socket
 *
 * Note: we don't call sock_error() since there may be an rpc_task
 * using the socket, and so we don't want to clear sk->sk_err.
 */
static void xs_error_report(struct sock *sk)
{
	struct sock_xprt *transport;
	struct rpc_xprt *xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;

	transport = container_of(xprt, struct sock_xprt, xprt);
	transport->xprt_err = -sk->sk_err;
	if (transport->xprt_err == 0)
		return;
	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
		xprt, -transport->xprt_err);
	trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);

	/* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */
	smp_mb__before_atomic();
	xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
}
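
/**
 * xs_reset_transport - shut down and release the transport socket
 * @transport: socket transport to reset
 *
 * Must run from a workqueue context so that the final __fput_sync()
 * below is safe; see the comment in the body.
 */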
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;
	struct rpc_xprt *xprt = &transport->xprt;
	struct file *filp = transport->file;

	if (sk == NULL)
		return;
	/*
	 * Make sure we're calling this in a context from which it is safe
	 * to call __fput_sync(). In practice that means rpciod and the
	 * system workqueue.
	 */
	if (!(current->flags & PF_WQ_WORKER)) {
		WARN_ON_ONCE(1);
		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
		return;
	}

	if (atomic_read(&transport->xprt.swapper))
		sk_clear_memalloc(sk);

	tls_handshake_cancel(sk);

	kernel_sock_shutdown(sock, SHUT_RDWR);

	mutex_lock(&transport->recv_mutex);
	lock_sock(sk);
	transport->inet = NULL;
	transport->sock = NULL;
	transport->file = NULL;

	sk->sk_user_data = NULL;
	sk->sk_sndtimeo = 0;

	xs_restore_old_callbacks(transport, sk);
	xprt_clear_connected(xprt);
	xs_sock_reset_connection_flags(xprt);
	/* Reset stream record info */
	xs_stream_reset_connect(transport);
	release_sock(sk);
	mutex_unlock(&transport->recv_mutex);

	trace_rpc_socket_close(xprt, sock);
	__fput_sync(filp);

	xprt_disconnect_done(xprt);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; i.e., no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	if (transport->sock)
		tls_handshake_close(transport->sock);
	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;
}

static void xs_inject_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC:       injecting transport disconnect on xprt=%p\n",
		xprt);
	xprt_disconnect_done(xprt);
}

static void xs_xprt_free(struct rpc_xprt *xprt)
{
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt,
			struct sock_xprt, xprt);
	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_delayed_work_sync(&transport->connect_worker);
	xs_close(xprt);
	cancel_work_sync(&transport->recv_worker);
	cancel_work_sync(&transport->error_worker);
	xs_xprt_free(xprt);
	module_put(THIS_MODULE);
}
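
/*
 * UDP receive path: the recv_worker drains datagrams off the socket
 * and matches each reply to its request by XID.
 */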
/**
 * xs_udp_data_read_skb - receive callback for UDP sockets
 * @xprt: transport
 * @sk: socket
 * @skb: skbuff
 *
 */
static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
		struct sock *sk,
		struct sk_buff *skb)
{
	struct rpc_task *task;
	struct rpc_rqst *rovr;
	int repsize, copied;
	u32 _xid;
	__be32 *xp;

	repsize = skb->len;
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		return;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid);
	if (xp == NULL)
		return;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->queue_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	xprt_pin_rqst(rovr);
	xprt_update_rtt(rovr->rq_task);
	spin_unlock(&xprt->queue_lock);
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		spin_lock(&xprt->queue_lock);
		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
		goto out_unpin;
	}

	spin_lock(&xprt->transport_lock);
	xprt_adjust_cwnd(xprt, task, copied);
	spin_unlock(&xprt->transport_lock);
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(task, copied);
	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
out_unpin:
	xprt_unpin_rqst(rovr);
out_unlock:
	spin_unlock(&xprt->queue_lock);
}

static void xs_udp_data_receive(struct sock_xprt *transport)
{
	struct sk_buff *skb;
	struct sock *sk;
	int err;

	mutex_lock(&transport->recv_mutex);
	sk = transport->inet;
	if (sk == NULL)
		goto out;
	for (;;) {
		skb = skb_recv_udp(sk, MSG_DONTWAIT, &err);
		if (skb == NULL)
			break;
		xs_udp_data_read_skb(&transport->xprt, sk, skb);
		consume_skb(skb);
		cond_resched();
	}
	xs_poll_check_readable(transport);
out:
	mutex_unlock(&transport->recv_mutex);
}

static void xs_udp_data_receive_workfn(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, recv_worker);
	unsigned int pflags = memalloc_nofs_save();

	xs_udp_data_receive(transport);
	memalloc_nofs_restore(pflags);
}

/**
 * xs_data_ready - "data ready" callback for sockets
 * @sk: socket with data to read
 *
 */
static void xs_data_ready(struct sock *sk)
{
	struct rpc_xprt *xprt;

	trace_sk_data_ready(sk);

	xprt = xprt_from_sock(sk);
	if (xprt != NULL) {
		struct sock_xprt *transport = container_of(xprt,
				struct sock_xprt, xprt);

		trace_xs_data_ready(xprt);

		transport->old_data_ready(sk);

		if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state))
			return;

		/* Any data means we had a useful conversation, so
		 * then we don't need to delay the next reconnect
		 */
		if (xprt->reestablish_timeout)
			xprt->reestablish_timeout = 0;
		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
			queue_work(xprtiod_workqueue, &transport->recv_worker);
	}
}

/*
 * Helper function to force a TCP close if the server is sending
 * junk and/or it has put us in CLOSE_WAIT
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	xprt_force_disconnect(xprt);
}

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
{
	return PAGE_SIZE;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
/**
 * xs_local_state_change - callback to handle AF_LOCAL socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_local_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	transport = container_of(xprt, struct sock_xprt, xprt);
	if (sk->sk_shutdown & SHUTDOWN_MASK) {
		clear_bit(XPRT_CONNECTED, &xprt->state);
		/* Trigger the socket release */
		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
	}
}

/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED),
			sk->sk_shutdown);

	transport = container_of(xprt, struct sock_xprt, xprt);
	trace_rpc_socket_state_change(xprt, sk->sk_socket);
	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		if (!xprt_test_and_set_connected(xprt)) {
			xprt->connect_cookie++;
			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
			xprt_clear_connecting(xprt);

			xprt->stat.connect_count++;
			xprt->stat.connect_time += (long)jiffies -
						   xprt->stat.connect_start;
			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
		}
		break;
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
		xprt->connect_cookie++;
		xprt->reestablish_timeout = 0;
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_atomic();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_atomic();
		break;
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		xprt->connect_cookie++;
		clear_bit(XPRT_CONNECTED, &xprt->state);
		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
		fallthrough;
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		break;
	case TCP_LAST_ACK:
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_atomic();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_atomic();
		break;
	case TCP_CLOSE:
		if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
					&transport->sock_state)) {
			xs_reset_srcport(transport);
			xprt_clear_connecting(xprt);
		}
		clear_bit(XPRT_CLOSING, &xprt->state);
		/* Trigger the socket release */
		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
	}
}
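
/*
 * Common write-space handler: wake the transport only if it had
 * previously run out of buffer space (XPRT_SOCK_NOSPACE).
 */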
static void xs_write_space(struct sock *sk)
{
	struct sock_xprt *transport;
	struct rpc_xprt *xprt;

	if (!sk->sk_socket)
		return;
	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

	if (unlikely(!(xprt = xprt_from_sock(sk))))
		return;
	transport = container_of(xprt, struct sock_xprt, xprt);
	if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state))
		return;
	xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
	sk->sk_write_pending--;
}

/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk))
		xs_write_space(sk);
}

/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_is_writeable(sk))
		xs_write_space(sk);
}

static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
	if (sndsize)
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
	if (rcvsize)
		transport->rcvsize = rcvsize + 1024;

	xs_udp_do_set_buffer_size(xprt);
}
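
/*
 * Worked example: xs_udp_set_buffer_size(xprt, 4096, 0) with
 * xprt->max_reqs == 16 sets sk_sndbuf to (4096 + 1024) * 16 * 2 =
 * 163840 bytes and leaves the receive buffer untouched.
 */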
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @xprt: controlling transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock(&xprt->transport_lock);
	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
	spin_unlock(&xprt->transport_lock);
}

static int xs_get_random_port(void)
{
	unsigned short min = xprt_min_resvport, max = xprt_max_resvport;
	unsigned short range;
	unsigned short rand;

	if (max < min)
		return -EADDRINUSE;
	range = max - min + 1;
	rand = get_random_u32_below(range);
	return rand + min;
}

static unsigned short xs_sock_getport(struct socket *sock)
{
	struct sockaddr_storage buf;
	unsigned short port = 0;

	if (kernel_getsockname(sock, (struct sockaddr *)&buf) < 0)
		goto out;
	switch (buf.ss_family) {
	case AF_INET6:
		port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port);
		break;
	case AF_INET:
		port = ntohs(((struct sockaddr_in *)&buf)->sin_port);
	}
out:
	return port;
}

/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}

static void xs_reset_srcport(struct sock_xprt *transport)
{
	transport->srcport = 0;
}

static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
{
	if (transport->srcport == 0 && transport->xprt.reuseport)
		transport->srcport = xs_sock_getport(sock);
}

static int xs_get_srcport(struct sock_xprt *transport)
{
	int port = transport->srcport;

	if (port == 0 && transport->xprt.resvport)
		port = xs_get_random_port();
	return port;
}

static unsigned short xs_sock_srcport(struct rpc_xprt *xprt)
{
	struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
	unsigned short ret = 0;

	mutex_lock(&sock->recv_mutex);
	if (sock->sock)
		ret = xs_sock_getport(sock->sock);
	mutex_unlock(&sock->recv_mutex);
	return ret;
}

static int xs_sock_srcaddr(struct rpc_xprt *xprt, char *buf, size_t buflen)
{
	struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
	union {
		struct sockaddr sa;
		struct sockaddr_storage st;
	} saddr;
	int ret = -ENOTCONN;

	mutex_lock(&sock->recv_mutex);
	if (sock->sock) {
		ret = kernel_getsockname(sock->sock, &saddr.sa);
		if (ret >= 0)
			ret = snprintf(buf, buflen, "%pISc", &saddr.sa);
	}
	mutex_unlock(&sock->recv_mutex);
	return ret;
}

static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
{
	if (transport->srcport != 0)
		transport->srcport = 0;
	if (!transport->xprt.resvport)
		return 0;
	if (port <= xprt_min_resvport || port > xprt_max_resvport)
		return xprt_max_resvport;
	return --port;
}
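
/*
 * Example: with the default reserved-port range [665, 1023], a bind
 * failure at port 665 makes xs_next_srcport() wrap the search back to
 * 1023; xs_bind() below gives up after the second such wrap.
 */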
static int xs_bind(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_storage myaddr;
	int err, nloop = 0;
	int port = xs_get_srcport(transport);
	unsigned short last;

	/*
	 * If we are asking for any ephemeral port (i.e. port == 0 &&
	 * transport->xprt.resvport == 0), don't bind.  Let the local
	 * port selection happen implicitly when the socket is used
	 * (for example at connect time).
	 *
	 * This ensures that we can continue to establish TCP
	 * connections even when all local ephemeral ports are already
	 * a part of some TCP connection.  This makes no difference
	 * for UDP sockets, but also doesn't harm them.
	 *
	 * If we're asking for any reserved port (i.e. port == 0 &&
	 * transport->xprt.resvport == 1) xs_get_srcport above will
	 * ensure that port is non-zero and we will bind as needed.
	 */
	if (port <= 0)
		return port;

	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
	do {
		rpc_set_port((struct sockaddr *)&myaddr, port);
		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
				transport->xprt.addrlen);
		if (err == 0) {
			if (transport->xprt.reuseport)
				transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, port);
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);

	if (myaddr.ss_family == AF_INET)
		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in *)&myaddr)->sin_addr,
				port, err ? "failed" : "ok", err);
	else
		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
				port, err ? "failed" : "ok", err);
	return err;
}

/*
 * We don't support autobind on AF_LOCAL sockets
 */
static void xs_local_rpcbind(struct rpc_task *task)
{
	xprt_set_bound(task->tk_xprt);
}

static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}

#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key xs_key[3];
static struct lock_class_key xs_slock_key[3];

static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[0], "sk_lock-AF_LOCAL-RPC", &xs_key[0]);
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET-RPC", &xs_key[1]);
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[2], "sk_lock-AF_INET6-RPC", &xs_key[2]);
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
		return;

	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif

static void xs_dummy_setup_socket(struct work_struct *work)
{
}
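
/*
 * Create, reclassify, and bind a kernel socket for the transport, then
 * wrap it in a struct file so that teardown can use __fput_sync().
 */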
dprintk("RPC: can't create %d transport socket (%d).\n", 1950 protocol, -err); 1951 goto out; 1952 } 1953 xs_reclassify_socket(family, sock); 1954 1955 if (reuseport) 1956 sock_set_reuseport(sock->sk); 1957 1958 err = xs_bind(transport, sock); 1959 if (err) { 1960 sock_release(sock); 1961 goto out; 1962 } 1963 1964 if (protocol == IPPROTO_TCP) 1965 sk_net_refcnt_upgrade(sock->sk); 1966 1967 filp = sock_alloc_file(sock, O_NONBLOCK, NULL); 1968 if (IS_ERR(filp)) 1969 return ERR_CAST(filp); 1970 transport->file = filp; 1971 1972 return sock; 1973 out: 1974 return ERR_PTR(err); 1975 } 1976 1977 static int xs_local_finish_connecting(struct rpc_xprt *xprt, 1978 struct socket *sock) 1979 { 1980 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1981 xprt); 1982 1983 if (!transport->inet) { 1984 struct sock *sk = sock->sk; 1985 1986 lock_sock(sk); 1987 1988 xs_save_old_callbacks(transport, sk); 1989 1990 sk->sk_user_data = xprt; 1991 sk->sk_data_ready = xs_data_ready; 1992 sk->sk_write_space = xs_udp_write_space; 1993 sk->sk_state_change = xs_local_state_change; 1994 sk->sk_error_report = xs_error_report; 1995 sk->sk_use_task_frag = false; 1996 1997 xprt_clear_connected(xprt); 1998 1999 /* Reset to new socket */ 2000 transport->sock = sock; 2001 transport->inet = sk; 2002 2003 release_sock(sk); 2004 } 2005 2006 xs_stream_start_connect(transport); 2007 2008 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); 2009 } 2010 2011 /** 2012 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint 2013 * @transport: socket transport to connect 2014 */ 2015 static int xs_local_setup_socket(struct sock_xprt *transport) 2016 { 2017 struct rpc_xprt *xprt = &transport->xprt; 2018 struct file *filp; 2019 struct socket *sock; 2020 int status; 2021 2022 status = __sock_create(xprt->xprt_net, AF_LOCAL, 2023 SOCK_STREAM, 0, &sock, 1); 2024 if (status < 0) { 2025 dprintk("RPC: can't create AF_LOCAL " 2026 "transport socket (%d).\n", -status); 2027 goto out; 2028 } 2029 xs_reclassify_socket(AF_LOCAL, sock); 2030 2031 filp = sock_alloc_file(sock, O_NONBLOCK, NULL); 2032 if (IS_ERR(filp)) { 2033 status = PTR_ERR(filp); 2034 goto out; 2035 } 2036 transport->file = filp; 2037 2038 dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", 2039 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2040 2041 status = xs_local_finish_connecting(xprt, sock); 2042 trace_rpc_socket_connect(xprt, sock, status); 2043 switch (status) { 2044 case 0: 2045 dprintk("RPC: xprt %p connected to %s\n", 2046 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2047 xprt->stat.connect_count++; 2048 xprt->stat.connect_time += (long)jiffies - 2049 xprt->stat.connect_start; 2050 xprt_set_connected(xprt); 2051 break; 2052 case -ENOBUFS: 2053 break; 2054 case -ENOENT: 2055 dprintk("RPC: xprt %p: socket %s does not exist\n", 2056 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2057 break; 2058 case -ECONNREFUSED: 2059 dprintk("RPC: xprt %p: connection refused for %s\n", 2060 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 2061 break; 2062 default: 2063 printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n", 2064 __func__, -status, 2065 xprt->address_strings[RPC_DISPLAY_ADDR]); 2066 } 2067 2068 out: 2069 xprt_clear_connecting(xprt); 2070 xprt_wake_pending_tasks(xprt, status); 2071 return status; 2072 } 2073 2074 static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) 2075 { 2076 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2077 int ret; 2078 2079 if 
(transport->file) 2080 goto force_disconnect; 2081 2082 if (RPC_IS_ASYNC(task)) { 2083 /* 2084 * We want the AF_LOCAL connect to be resolved in the 2085 * filesystem namespace of the process making the rpc 2086 * call. Thus we connect synchronously. 2087 * 2088 * If we want to support asynchronous AF_LOCAL calls, 2089 * we'll need to figure out how to pass a namespace to 2090 * connect. 2091 */ 2092 rpc_task_set_rpc_status(task, -ENOTCONN); 2093 goto out_wake; 2094 } 2095 ret = xs_local_setup_socket(transport); 2096 if (ret && !RPC_IS_SOFTCONN(task)) 2097 msleep_interruptible(15000); 2098 return; 2099 force_disconnect: 2100 xprt_force_disconnect(xprt); 2101 out_wake: 2102 xprt_clear_connecting(xprt); 2103 xprt_wake_pending_tasks(xprt, -ENOTCONN); 2104 } 2105 2106 #if IS_ENABLED(CONFIG_SUNRPC_SWAP) 2107 /* 2108 * Note that this should be called with XPRT_LOCKED held, or recv_mutex 2109 * held, or when we otherwise know that we have exclusive access to the 2110 * socket, to guard against races with xs_reset_transport. 2111 */ 2112 static void xs_set_memalloc(struct rpc_xprt *xprt) 2113 { 2114 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 2115 xprt); 2116 2117 /* 2118 * If there's no sock, then we have nothing to set. The 2119 * reconnecting process will get it for us. 2120 */ 2121 if (!transport->inet) 2122 return; 2123 if (atomic_read(&xprt->swapper)) 2124 sk_set_memalloc(transport->inet); 2125 } 2126 2127 /** 2128 * xs_enable_swap - Tag this transport as being used for swap. 2129 * @xprt: transport to tag 2130 * 2131 * Take a reference to this transport on behalf of the rpc_clnt, and 2132 * optionally mark it for swapping if it wasn't already. 2133 */ 2134 static int 2135 xs_enable_swap(struct rpc_xprt *xprt) 2136 { 2137 struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); 2138 2139 mutex_lock(&xs->recv_mutex); 2140 if (atomic_inc_return(&xprt->swapper) == 1 && 2141 xs->inet) 2142 sk_set_memalloc(xs->inet); 2143 mutex_unlock(&xs->recv_mutex); 2144 return 0; 2145 } 2146 2147 /** 2148 * xs_disable_swap - Untag this transport as being used for swap. 2149 * @xprt: transport to untag 2150 * 2151 * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the 2152 * swapper refcount goes to 0, untag the socket as a memalloc socket.
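 *
 * Holding recv_mutex here keeps xs->inet stable while the memalloc
 * flag is cleared, guarding against a concurrent xs_reset_transport().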
2153 */ 2154 static void 2155 xs_disable_swap(struct rpc_xprt *xprt) 2156 { 2157 struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); 2158 2159 mutex_lock(&xs->recv_mutex); 2160 if (atomic_dec_and_test(&xprt->swapper) && 2161 xs->inet) 2162 sk_clear_memalloc(xs->inet); 2163 mutex_unlock(&xs->recv_mutex); 2164 } 2165 #else 2166 static void xs_set_memalloc(struct rpc_xprt *xprt) 2167 { 2168 } 2169 2170 static int 2171 xs_enable_swap(struct rpc_xprt *xprt) 2172 { 2173 return -EINVAL; 2174 } 2175 2176 static void 2177 xs_disable_swap(struct rpc_xprt *xprt) 2178 { 2179 } 2180 #endif 2181 2182 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 2183 { 2184 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2185 2186 if (!transport->inet) { 2187 struct sock *sk = sock->sk; 2188 2189 lock_sock(sk); 2190 2191 xs_save_old_callbacks(transport, sk); 2192 2193 sk->sk_user_data = xprt; 2194 sk->sk_data_ready = xs_data_ready; 2195 sk->sk_write_space = xs_udp_write_space; 2196 sk->sk_use_task_frag = false; 2197 2198 xprt_set_connected(xprt); 2199 2200 /* Reset to new socket */ 2201 transport->sock = sock; 2202 transport->inet = sk; 2203 2204 xs_set_memalloc(xprt); 2205 2206 release_sock(sk); 2207 } 2208 xs_udp_do_set_buffer_size(xprt); 2209 2210 xprt->stat.connect_start = jiffies; 2211 } 2212 2213 static void xs_udp_setup_socket(struct work_struct *work) 2214 { 2215 struct sock_xprt *transport = 2216 container_of(work, struct sock_xprt, connect_worker.work); 2217 struct rpc_xprt *xprt = &transport->xprt; 2218 struct socket *sock; 2219 int status = -EIO; 2220 unsigned int pflags = current->flags; 2221 2222 if (atomic_read(&xprt->swapper)) 2223 current->flags |= PF_MEMALLOC; 2224 sock = xs_create_sock(xprt, transport, 2225 xs_addr(xprt)->sa_family, SOCK_DGRAM, 2226 IPPROTO_UDP, false); 2227 if (IS_ERR(sock)) 2228 goto out; 2229 2230 dprintk("RPC: worker connecting xprt %p via %s to " 2231 "%s (port %s)\n", xprt, 2232 xprt->address_strings[RPC_DISPLAY_PROTO], 2233 xprt->address_strings[RPC_DISPLAY_ADDR], 2234 xprt->address_strings[RPC_DISPLAY_PORT]); 2235 2236 xs_udp_finish_connecting(xprt, sock); 2237 trace_rpc_socket_connect(xprt, sock, 0); 2238 status = 0; 2239 out: 2240 xprt_clear_connecting(xprt); 2241 xprt_unlock_connect(xprt, transport); 2242 xprt_wake_pending_tasks(xprt, status); 2243 current_restore_flags(pflags, PF_MEMALLOC); 2244 } 2245 2246 /** 2247 * xs_tcp_shutdown - gracefully shut down a TCP socket 2248 * @xprt: transport 2249 * 2250 * Initiates a graceful shutdown of the TCP socket by calling the 2251 * equivalent of shutdown(SHUT_RDWR); 2252 */ 2253 static void xs_tcp_shutdown(struct rpc_xprt *xprt) 2254 { 2255 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2256 struct socket *sock = transport->sock; 2257 int skst = transport->inet ? 
transport->inet->sk_state : TCP_CLOSE; 2258 2259 if (sock == NULL) 2260 return; 2261 if (!xprt->reuseport) { 2262 xs_close(xprt); 2263 return; 2264 } 2265 switch (skst) { 2266 case TCP_FIN_WAIT1: 2267 case TCP_FIN_WAIT2: 2268 case TCP_LAST_ACK: 2269 break; 2270 case TCP_ESTABLISHED: 2271 case TCP_CLOSE_WAIT: 2272 kernel_sock_shutdown(sock, SHUT_RDWR); 2273 trace_rpc_socket_shutdown(xprt, sock); 2274 break; 2275 default: 2276 xs_reset_transport(transport); 2277 } 2278 } 2279 2280 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, 2281 struct socket *sock) 2282 { 2283 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2284 struct net *net = sock_net(sock->sk); 2285 unsigned long connect_timeout; 2286 unsigned long syn_retries; 2287 unsigned int keepidle; 2288 unsigned int keepcnt; 2289 unsigned int timeo; 2290 unsigned long t; 2291 2292 spin_lock(&xprt->transport_lock); 2293 keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); 2294 keepcnt = xprt->timeout->to_retries + 1; 2295 timeo = jiffies_to_msecs(xprt->timeout->to_initval) * 2296 (xprt->timeout->to_retries + 1); 2297 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); 2298 spin_unlock(&xprt->transport_lock); 2299 2300 /* TCP Keepalive options */ 2301 sock_set_keepalive(sock->sk); 2302 tcp_sock_set_keepidle(sock->sk, keepidle); 2303 tcp_sock_set_keepintvl(sock->sk, keepidle); 2304 tcp_sock_set_keepcnt(sock->sk, keepcnt); 2305 2306 /* TCP user timeout (see RFC5482) */ 2307 tcp_sock_set_user_timeout(sock->sk, timeo); 2308 2309 /* Connect timeout */ 2310 connect_timeout = max_t(unsigned long, 2311 DIV_ROUND_UP(xprt->connect_timeout, HZ), 1); 2312 syn_retries = max_t(unsigned long, 2313 READ_ONCE(net->ipv4.sysctl_tcp_syn_retries), 1); 2314 for (t = 0; t <= syn_retries && (1UL << t) < connect_timeout; t++) 2315 ; 2316 if (t <= syn_retries) 2317 tcp_sock_set_syncnt(sock->sk, t - 1); 2318 } 2319 2320 static void xs_tcp_do_set_connect_timeout(struct rpc_xprt *xprt, 2321 unsigned long connect_timeout) 2322 { 2323 struct sock_xprt *transport = 2324 container_of(xprt, struct sock_xprt, xprt); 2325 struct rpc_timeout to; 2326 unsigned long initval; 2327 2328 memcpy(&to, xprt->timeout, sizeof(to)); 2329 /* Arbitrary lower limit */ 2330 initval = max_t(unsigned long, connect_timeout, XS_TCP_INIT_REEST_TO); 2331 to.to_initval = initval; 2332 to.to_maxval = initval; 2333 to.to_retries = 0; 2334 memcpy(&transport->tcp_timeout, &to, sizeof(transport->tcp_timeout)); 2335 xprt->timeout = &transport->tcp_timeout; 2336 xprt->connect_timeout = connect_timeout; 2337 } 2338 2339 static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, 2340 unsigned long connect_timeout, 2341 unsigned long reconnect_timeout) 2342 { 2343 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2344 2345 spin_lock(&xprt->transport_lock); 2346 if (reconnect_timeout < xprt->max_reconnect_timeout) 2347 xprt->max_reconnect_timeout = reconnect_timeout; 2348 if (connect_timeout < xprt->connect_timeout) 2349 xs_tcp_do_set_connect_timeout(xprt, connect_timeout); 2350 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); 2351 spin_unlock(&xprt->transport_lock); 2352 } 2353 2354 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 2355 { 2356 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2357 2358 if (!transport->inet) { 2359 struct sock *sk = sock->sk; 2360 2361 /* Avoid temporary addresses; they are bad for long-lived 2362 * connections such as NFS mounts.
2363 * RFC4941, section 3.6 suggests that: 2364 * Individual applications, which have specific 2365 * knowledge about the normal duration of connections, 2366 * MAY override this as appropriate. 2367 */ 2368 if (xs_addr(xprt)->sa_family == PF_INET6) { 2369 ip6_sock_set_addr_preferences(sk, 2370 IPV6_PREFER_SRC_PUBLIC); 2371 } 2372 2373 xs_tcp_set_socket_timeouts(xprt, sock); 2374 tcp_sock_set_nodelay(sk); 2375 2376 lock_sock(sk); 2377 2378 xs_save_old_callbacks(transport, sk); 2379 2380 sk->sk_user_data = xprt; 2381 sk->sk_data_ready = xs_data_ready; 2382 sk->sk_state_change = xs_tcp_state_change; 2383 sk->sk_write_space = xs_tcp_write_space; 2384 sk->sk_error_report = xs_error_report; 2385 sk->sk_use_task_frag = false; 2386 2387 /* socket options */ 2388 sock_reset_flag(sk, SOCK_LINGER); 2389 2390 xprt_clear_connected(xprt); 2391 2392 /* Reset to new socket */ 2393 transport->sock = sock; 2394 transport->inet = sk; 2395 2396 release_sock(sk); 2397 } 2398 2399 if (!xprt_bound(xprt)) 2400 return -ENOTCONN; 2401 2402 xs_set_memalloc(xprt); 2403 2404 xs_stream_start_connect(transport); 2405 2406 /* Tell the socket layer to start connecting... */ 2407 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); 2408 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); 2409 } 2410 2411 /** 2412 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint 2413 * @work: queued work item 2414 * 2415 * Invoked by a work queue tasklet. 2416 */ 2417 static void xs_tcp_setup_socket(struct work_struct *work) 2418 { 2419 struct sock_xprt *transport = 2420 container_of(work, struct sock_xprt, connect_worker.work); 2421 struct socket *sock = transport->sock; 2422 struct rpc_xprt *xprt = &transport->xprt; 2423 int status; 2424 unsigned int pflags = current->flags; 2425 2426 if (atomic_read(&xprt->swapper)) 2427 current->flags |= PF_MEMALLOC; 2428 2429 if (xprt_connected(xprt)) 2430 goto out; 2431 if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT, 2432 &transport->sock_state) || 2433 !sock) { 2434 xs_reset_transport(transport); 2435 sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family, 2436 SOCK_STREAM, IPPROTO_TCP, true); 2437 if (IS_ERR(sock)) { 2438 xprt_wake_pending_tasks(xprt, PTR_ERR(sock)); 2439 goto out; 2440 } 2441 } 2442 2443 dprintk("RPC: worker connecting xprt %p via %s to " 2444 "%s (port %s)\n", xprt, 2445 xprt->address_strings[RPC_DISPLAY_PROTO], 2446 xprt->address_strings[RPC_DISPLAY_ADDR], 2447 xprt->address_strings[RPC_DISPLAY_PORT]); 2448 2449 status = xs_tcp_finish_connecting(xprt, sock); 2450 trace_rpc_socket_connect(xprt, sock, status); 2451 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 2452 xprt, -status, xprt_connected(xprt), 2453 sock->sk->sk_state); 2454 switch (status) { 2455 case 0: 2456 case -EINPROGRESS: 2457 /* SYN_SENT! */ 2458 set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state); 2459 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 2460 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2461 fallthrough; 2462 case -EALREADY: 2463 goto out_unlock; 2464 case -EADDRNOTAVAIL: 2465 /* Source port number is unavailable. Try a new one! */ 2466 transport->srcport = 0; 2467 status = -EAGAIN; 2468 break; 2469 case -EPERM: 2470 /* Happens, for instance, if a BPF program is preventing 2471 * the connect. Remap the error so upper layers can better 2472 * deal with it. 
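 * Without this remap, -EPERM would fall through to the default
 * arm of the switch below and be mapped to -EAGAIN.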
*/ 2474 status = -ECONNREFUSED; 2475 fallthrough; 2476 case -EINVAL: 2477 /* Happens, for instance, if the user specified a link 2478 * local IPv6 address without a scope-id. 2479 */ 2480 case -ECONNREFUSED: 2481 case -ECONNRESET: 2482 case -ENETDOWN: 2483 case -ENETUNREACH: 2484 case -EHOSTUNREACH: 2485 case -EADDRINUSE: 2486 case -ENOBUFS: 2487 case -ENOTCONN: 2488 break; 2489 default: 2490 printk("%s: connect returned unhandled error %d\n", 2491 __func__, status); 2492 status = -EAGAIN; 2493 } 2494 2495 /* xs_tcp_force_close() wakes tasks with a fixed error code. 2496 * We need to wake them first to ensure the correct error code. 2497 */ 2498 xprt_wake_pending_tasks(xprt, status); 2499 xs_tcp_force_close(xprt); 2500 out: 2501 xprt_clear_connecting(xprt); 2502 out_unlock: 2503 xprt_unlock_connect(xprt, transport); 2504 current_restore_flags(pflags, PF_MEMALLOC); 2505 } 2506 2507 /* 2508 * Transfer the connected socket to @upper_transport, then mark that 2509 * xprt as CONNECTED. 2510 */ 2511 static int xs_tcp_tls_finish_connecting(struct rpc_xprt *lower_xprt, 2512 struct sock_xprt *upper_transport) 2513 { 2514 struct sock_xprt *lower_transport = 2515 container_of(lower_xprt, struct sock_xprt, xprt); 2516 struct rpc_xprt *upper_xprt = &upper_transport->xprt; 2517 2518 if (!upper_transport->inet) { 2519 struct socket *sock = lower_transport->sock; 2520 struct sock *sk = sock->sk; 2521 2522 /* Avoid temporary addresses; they are bad for long-lived 2523 * connections such as NFS mounts. 2524 * RFC4941, section 3.6 suggests that: 2525 * Individual applications, which have specific 2526 * knowledge about the normal duration of connections, 2527 * MAY override this as appropriate. 2528 */ 2529 if (xs_addr(upper_xprt)->sa_family == PF_INET6) 2530 ip6_sock_set_addr_preferences(sk, IPV6_PREFER_SRC_PUBLIC); 2531 2532 xs_tcp_set_socket_timeouts(upper_xprt, sock); 2533 tcp_sock_set_nodelay(sk); 2534 2535 lock_sock(sk); 2536 2537 /* @sk is already connected, so it now has the RPC callbacks. 2538 * Reach into @lower_transport to save the original ones.
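 * Otherwise the originals would be lost when @lower_transport is
 * reset a few lines below.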
2539 */ 2540 upper_transport->old_data_ready = lower_transport->old_data_ready; 2541 upper_transport->old_state_change = lower_transport->old_state_change; 2542 upper_transport->old_write_space = lower_transport->old_write_space; 2543 upper_transport->old_error_report = lower_transport->old_error_report; 2544 sk->sk_user_data = upper_xprt; 2545 2546 /* socket options */ 2547 sock_reset_flag(sk, SOCK_LINGER); 2548 2549 xprt_clear_connected(upper_xprt); 2550 2551 upper_transport->sock = sock; 2552 upper_transport->inet = sk; 2553 upper_transport->file = lower_transport->file; 2554 2555 release_sock(sk); 2556 2557 /* Reset lower_transport before shutting down its clnt */ 2558 mutex_lock(&lower_transport->recv_mutex); 2559 lower_transport->inet = NULL; 2560 lower_transport->sock = NULL; 2561 lower_transport->file = NULL; 2562 2563 xprt_clear_connected(lower_xprt); 2564 xs_sock_reset_connection_flags(lower_xprt); 2565 xs_stream_reset_connect(lower_transport); 2566 mutex_unlock(&lower_transport->recv_mutex); 2567 } 2568 2569 if (!xprt_bound(upper_xprt)) 2570 return -ENOTCONN; 2571 2572 xs_set_memalloc(upper_xprt); 2573 2574 if (!xprt_test_and_set_connected(upper_xprt)) { 2575 upper_xprt->connect_cookie++; 2576 clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); 2577 xprt_clear_connecting(upper_xprt); 2578 2579 upper_xprt->stat.connect_count++; 2580 upper_xprt->stat.connect_time += (long)jiffies - 2581 upper_xprt->stat.connect_start; 2582 xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); 2583 } 2584 return 0; 2585 } 2586 2587 /** 2588 * xs_tls_handshake_done - TLS handshake completion handler 2589 * @data: address of xprt to wake 2590 * @status: status of handshake 2591 * @peerid: serial number of key containing the remote's identity 2592 * 2593 */ 2594 static void xs_tls_handshake_done(void *data, int status, key_serial_t peerid) 2595 { 2596 struct rpc_xprt *lower_xprt = data; 2597 struct sock_xprt *lower_transport = 2598 container_of(lower_xprt, struct sock_xprt, xprt); 2599 2600 switch (status) { 2601 case 0: 2602 case -EACCES: 2603 case -ETIMEDOUT: 2604 lower_transport->xprt_err = status; 2605 break; 2606 default: 2607 lower_transport->xprt_err = -EACCES; 2608 } 2609 complete(&lower_transport->handshake_done); 2610 xprt_put(lower_xprt); 2611 } 2612 2613 static int xs_tls_handshake_sync(struct rpc_xprt *lower_xprt, struct xprtsec_parms *xprtsec) 2614 { 2615 struct sock_xprt *lower_transport = 2616 container_of(lower_xprt, struct sock_xprt, xprt); 2617 struct tls_handshake_args args = { 2618 .ta_sock = lower_transport->sock, 2619 .ta_done = xs_tls_handshake_done, 2620 .ta_data = xprt_get(lower_xprt), 2621 .ta_peername = lower_xprt->servername, 2622 }; 2623 struct sock *sk = lower_transport->inet; 2624 int rc; 2625 2626 init_completion(&lower_transport->handshake_done); 2627 set_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); 2628 lower_transport->xprt_err = -ETIMEDOUT; 2629 switch (xprtsec->policy) { 2630 case RPC_XPRTSEC_TLS_ANON: 2631 rc = tls_client_hello_anon(&args, GFP_KERNEL); 2632 if (rc) 2633 goto out_put_xprt; 2634 break; 2635 case RPC_XPRTSEC_TLS_X509: 2636 args.ta_my_cert = xprtsec->cert_serial; 2637 args.ta_my_privkey = xprtsec->privkey_serial; 2638 rc = tls_client_hello_x509(&args, GFP_KERNEL); 2639 if (rc) 2640 goto out_put_xprt; 2641 break; 2642 default: 2643 rc = -EACCES; 2644 goto out_put_xprt; 2645 } 2646 2647 rc = wait_for_completion_interruptible_timeout(&lower_transport->handshake_done, 2648 XS_TLS_HANDSHAKE_TO); 2649 if (rc <= 0) { 2650 
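		/* A return of 0 means the handshake timed out; a negative
		 * value means we were interrupted. Either way, cancel the
		 * outstanding handshake request.
		 */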
tls_handshake_cancel(sk); 2651 if (rc == 0) 2652 rc = -ETIMEDOUT; 2653 goto out_put_xprt; 2654 } 2655 2656 rc = lower_transport->xprt_err; 2657 2658 out: 2659 xs_stream_reset_connect(lower_transport); 2660 clear_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); 2661 return rc; 2662 2663 out_put_xprt: 2664 xprt_put(lower_xprt); 2665 goto out; 2666 } 2667 2668 /** 2669 * xs_tcp_tls_setup_socket - establish a TLS session on a TCP socket 2670 * @work: queued work item 2671 * 2672 * Invoked by a work queue tasklet. 2673 * 2674 * For RPC-with-TLS, there is a two-stage connection process. 2675 * 2676 * The "upper-layer xprt" is visible to the RPC consumer. Once it has 2677 * been marked connected, the consumer knows that a TCP connection and 2678 * a TLS session have been established. 2679 * 2680 * A "lower-layer xprt", created in this function, handles the mechanics 2681 * of connecting the TCP socket, performing the RPC_AUTH_TLS probe, and 2682 * then driving the TLS handshake. Once all that is complete, the upper 2683 * layer xprt is marked connected. 2684 */ 2685 static void xs_tcp_tls_setup_socket(struct work_struct *work) 2686 { 2687 struct sock_xprt *upper_transport = 2688 container_of(work, struct sock_xprt, connect_worker.work); 2689 struct rpc_clnt *upper_clnt = upper_transport->clnt; 2690 struct rpc_xprt *upper_xprt = &upper_transport->xprt; 2691 struct rpc_create_args args = { 2692 .net = upper_xprt->xprt_net, 2693 .protocol = upper_xprt->prot, 2694 .address = (struct sockaddr *)&upper_xprt->addr, 2695 .addrsize = upper_xprt->addrlen, 2696 .timeout = upper_clnt->cl_timeout, 2697 .servername = upper_xprt->servername, 2698 .program = upper_clnt->cl_program, 2699 .prognumber = upper_clnt->cl_prog, 2700 .version = upper_clnt->cl_vers, 2701 .authflavor = RPC_AUTH_TLS, 2702 .cred = upper_clnt->cl_cred, 2703 .xprtsec = { 2704 .policy = RPC_XPRTSEC_NONE, 2705 }, 2706 .stats = upper_clnt->cl_stats, 2707 }; 2708 unsigned int pflags = current->flags; 2709 struct rpc_clnt *lower_clnt; 2710 struct rpc_xprt *lower_xprt; 2711 int status; 2712 2713 if (atomic_read(&upper_xprt->swapper)) 2714 current->flags |= PF_MEMALLOC; 2715 2716 xs_stream_start_connect(upper_transport); 2717 2718 /* This implicitly sends an RPC_AUTH_TLS probe */ 2719 lower_clnt = rpc_create(&args); 2720 if (IS_ERR(lower_clnt)) { 2721 trace_rpc_tls_unavailable(upper_clnt, upper_xprt); 2722 clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); 2723 xprt_clear_connecting(upper_xprt); 2724 xprt_wake_pending_tasks(upper_xprt, PTR_ERR(lower_clnt)); 2725 xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); 2726 goto out_unlock; 2727 } 2728 2729 /* RPC_AUTH_TLS probe was successful. Try a TLS handshake on 2730 * the lower xprt. 2731 */ 2732 rcu_read_lock(); 2733 lower_xprt = rcu_dereference(lower_clnt->cl_xprt); 2734 rcu_read_unlock(); 2735 2736 if (wait_on_bit_lock(&lower_xprt->state, XPRT_LOCKED, TASK_KILLABLE)) 2737 goto out_unlock; 2738 2739 status = xs_tls_handshake_sync(lower_xprt, &upper_xprt->xprtsec); 2740 if (status) { 2741 trace_rpc_tls_not_started(upper_clnt, upper_xprt); 2742 goto out_close; 2743 } 2744 2745 status = xs_tcp_tls_finish_connecting(lower_xprt, upper_transport); 2746 if (status) 2747 goto out_close; 2748 xprt_release_write(lower_xprt, NULL); 2749 trace_rpc_socket_connect(upper_xprt, upper_transport->sock, 0); 2750 rpc_shutdown_client(lower_clnt); 2751 2752 /* Check for ingress data that arrived before the socket's 2753 * ->data_ready callback was set up. 
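 * (The handshake was driven on the lower transport with
 * XPRT_SOCK_IGNORE_RECV set, so the socket may already hold data.)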
2754 */ 2755 xs_poll_check_readable(upper_transport); 2756 2757 out_unlock: 2758 current_restore_flags(pflags, PF_MEMALLOC); 2759 upper_transport->clnt = NULL; 2760 xprt_unlock_connect(upper_xprt, upper_transport); 2761 return; 2762 2763 out_close: 2764 xprt_release_write(lower_xprt, NULL); 2765 rpc_shutdown_client(lower_clnt); 2766 2767 /* xprt_force_disconnect() wakes tasks with a fixed tk_status code. 2768 * Wake them first here to ensure they get our tk_status code. 2769 */ 2770 xprt_wake_pending_tasks(upper_xprt, status); 2771 xs_tcp_force_close(upper_xprt); 2772 xprt_clear_connecting(upper_xprt); 2773 goto out_unlock; 2774 } 2775 2776 /** 2777 * xs_connect - connect a socket to a remote endpoint 2778 * @xprt: pointer to transport structure 2779 * @task: address of RPC task that manages state of connect request 2780 * 2781 * TCP: If the remote end dropped the connection, delay reconnecting. 2782 * 2783 * UDP socket connects are synchronous, but we use a work queue anyway 2784 * to guarantee that even unprivileged user processes can set up a 2785 * socket on a privileged port. 2786 * 2787 * If a UDP socket connect fails, the delay behavior here prevents 2788 * retry floods (hard mounts). 2789 */ 2790 static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) 2791 { 2792 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2793 unsigned long delay = 0; 2794 2795 WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); 2796 2797 if (transport->sock != NULL) { 2798 dprintk("RPC: xs_connect delayed xprt %p for %lu " 2799 "seconds\n", xprt, xprt->reestablish_timeout / HZ); 2800 2801 delay = xprt_reconnect_delay(xprt); 2802 xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); 2803 2804 } else 2805 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 2806 2807 transport->clnt = task->tk_client; 2808 queue_delayed_work(xprtiod_workqueue, 2809 &transport->connect_worker, 2810 delay); 2811 } 2812 2813 static void xs_wake_disconnect(struct sock_xprt *transport) 2814 { 2815 if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) 2816 xs_tcp_force_close(&transport->xprt); 2817 } 2818 2819 static void xs_wake_write(struct sock_xprt *transport) 2820 { 2821 if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) 2822 xprt_write_space(&transport->xprt); 2823 } 2824 2825 static void xs_wake_error(struct sock_xprt *transport) 2826 { 2827 int sockerr; 2828 2829 if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) 2830 return; 2831 sockerr = xchg(&transport->xprt_err, 0); 2832 if (sockerr < 0) { 2833 xprt_wake_pending_tasks(&transport->xprt, sockerr); 2834 xs_tcp_force_close(&transport->xprt); 2835 } 2836 } 2837 2838 static void xs_wake_pending(struct sock_xprt *transport) 2839 { 2840 if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) 2841 xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); 2842 } 2843 2844 static void xs_error_handle(struct work_struct *work) 2845 { 2846 struct sock_xprt *transport = container_of(work, 2847 struct sock_xprt, error_worker); 2848 2849 xs_wake_disconnect(transport); 2850 xs_wake_write(transport); 2851 xs_wake_error(transport); 2852 xs_wake_pending(transport); 2853 } 2854 2855 /** 2856 * xs_local_print_stats - display AF_LOCAL socket-specific stats 2857 * @xprt: rpc_xprt struct containing statistics 2858 * @seq: output file 2859 * 2860 */ 2861 static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2862 { 2863 long idle_time = 0; 2864 2865 if 
(xprt_connected(xprt)) 2866 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2867 2868 seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu " 2869 "%llu %llu %lu %llu %llu\n", 2870 xprt->stat.bind_count, 2871 xprt->stat.connect_count, 2872 xprt->stat.connect_time / HZ, 2873 idle_time, 2874 xprt->stat.sends, 2875 xprt->stat.recvs, 2876 xprt->stat.bad_xids, 2877 xprt->stat.req_u, 2878 xprt->stat.bklog_u, 2879 xprt->stat.max_slots, 2880 xprt->stat.sending_u, 2881 xprt->stat.pending_u); 2882 } 2883 2884 /** 2885 * xs_udp_print_stats - display UDP socket-specific stats 2886 * @xprt: rpc_xprt struct containing statistics 2887 * @seq: output file 2888 * 2889 */ 2890 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2891 { 2892 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2893 2894 seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu " 2895 "%lu %llu %llu\n", 2896 transport->srcport, 2897 xprt->stat.bind_count, 2898 xprt->stat.sends, 2899 xprt->stat.recvs, 2900 xprt->stat.bad_xids, 2901 xprt->stat.req_u, 2902 xprt->stat.bklog_u, 2903 xprt->stat.max_slots, 2904 xprt->stat.sending_u, 2905 xprt->stat.pending_u); 2906 } 2907 2908 /** 2909 * xs_tcp_print_stats - display TCP socket-specific stats 2910 * @xprt: rpc_xprt struct containing statistics 2911 * @seq: output file 2912 * 2913 */ 2914 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2915 { 2916 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2917 long idle_time = 0; 2918 2919 if (xprt_connected(xprt)) 2920 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2921 2922 seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu " 2923 "%llu %llu %lu %llu %llu\n", 2924 transport->srcport, 2925 xprt->stat.bind_count, 2926 xprt->stat.connect_count, 2927 xprt->stat.connect_time / HZ, 2928 idle_time, 2929 xprt->stat.sends, 2930 xprt->stat.recvs, 2931 xprt->stat.bad_xids, 2932 xprt->stat.req_u, 2933 xprt->stat.bklog_u, 2934 xprt->stat.max_slots, 2935 xprt->stat.sending_u, 2936 xprt->stat.pending_u); 2937 } 2938 2939 /* 2940 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason 2941 * we allocate pages instead of doing a kmalloc like rpc_malloc is that we want 2942 * to use the server-side send routines.
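 * Everything must fit in one page: bc_malloc() below refuses any
 * request that does not leave room for the struct rpc_buffer header.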
2943 */ 2944 static int bc_malloc(struct rpc_task *task) 2945 { 2946 struct rpc_rqst *rqst = task->tk_rqstp; 2947 size_t size = rqst->rq_callsize; 2948 struct page *page; 2949 struct rpc_buffer *buf; 2950 2951 if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) { 2952 WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n", 2953 size); 2954 return -EINVAL; 2955 } 2956 2957 page = alloc_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN); 2958 if (!page) 2959 return -ENOMEM; 2960 2961 buf = page_address(page); 2962 buf->len = PAGE_SIZE; 2963 2964 rqst->rq_buffer = buf->data; 2965 rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize; 2966 return 0; 2967 } 2968 2969 /* 2970 * Free the space allocated in the bc_malloc routine 2971 */ 2972 static void bc_free(struct rpc_task *task) 2973 { 2974 void *buffer = task->tk_rqstp->rq_buffer; 2975 struct rpc_buffer *buf; 2976 2977 buf = container_of(buffer, struct rpc_buffer, data); 2978 free_page((unsigned long)buf); 2979 } 2980 2981 static int bc_sendto(struct rpc_rqst *req) 2982 { 2983 struct xdr_buf *xdr = &req->rq_snd_buf; 2984 struct sock_xprt *transport = 2985 container_of(req->rq_xprt, struct sock_xprt, xprt); 2986 struct msghdr msg = { 2987 .msg_flags = 0, 2988 }; 2989 rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | 2990 (u32)xdr->len); 2991 unsigned int sent = 0; 2992 int err; 2993 2994 req->rq_xtime = ktime_get(); 2995 err = xdr_alloc_bvec(xdr, rpc_task_gfp_mask()); 2996 if (err < 0) 2997 return err; 2998 err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent); 2999 xdr_free_bvec(xdr); 3000 if (err < 0 || sent != (xdr->len + sizeof(marker))) 3001 return -EAGAIN; 3002 return sent; 3003 } 3004 3005 /** 3006 * bc_send_request - Send a backchannel Call on a TCP socket 3007 * @req: rpc_rqst containing Call message to be sent 3008 * 3009 * xpt_mutex ensures @req's whole message is written to the socket 3010 * without interruption.
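 * Note that bc_sendto() reports a short transmission as -EAGAIN; the
 * fragment marker and message must go out in full.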
3011 * 3012 * Return values: 3013 * %0 if the message was sent successfully 3014 * %ENOTCONN if the message was not sent 3015 */ 3016 static int bc_send_request(struct rpc_rqst *req) 3017 { 3018 struct svc_xprt *xprt; 3019 int len; 3020 3021 /* 3022 * Get the server socket associated with this callback xprt 3023 */ 3024 xprt = req->rq_xprt->bc_xprt; 3025 3026 /* 3027 * Grab the mutex to serialize data as the connection is shared 3028 * with the fore channel 3029 */ 3030 mutex_lock(&xprt->xpt_mutex); 3031 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 3032 len = -ENOTCONN; 3033 else 3034 len = bc_sendto(req); 3035 mutex_unlock(&xprt->xpt_mutex); 3036 3037 if (len > 0) 3038 len = 0; 3039 3040 return len; 3041 } 3042 3043 static void bc_close(struct rpc_xprt *xprt) 3044 { 3045 xprt_disconnect_done(xprt); 3046 } 3047 3048 static void bc_destroy(struct rpc_xprt *xprt) 3049 { 3050 dprintk("RPC: bc_destroy xprt %p\n", xprt); 3051 3052 xs_xprt_free(xprt); 3053 module_put(THIS_MODULE); 3054 } 3055 3056 static const struct rpc_xprt_ops xs_local_ops = { 3057 .reserve_xprt = xprt_reserve_xprt, 3058 .release_xprt = xprt_release_xprt, 3059 .alloc_slot = xprt_alloc_slot, 3060 .free_slot = xprt_free_slot, 3061 .rpcbind = xs_local_rpcbind, 3062 .set_port = xs_local_set_port, 3063 .connect = xs_local_connect, 3064 .buf_alloc = rpc_malloc, 3065 .buf_free = rpc_free, 3066 .prepare_request = xs_stream_prepare_request, 3067 .send_request = xs_local_send_request, 3068 .abort_send_request = xs_stream_abort_send_request, 3069 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3070 .close = xs_close, 3071 .destroy = xs_destroy, 3072 .print_stats = xs_local_print_stats, 3073 .enable_swap = xs_enable_swap, 3074 .disable_swap = xs_disable_swap, 3075 }; 3076 3077 static const struct rpc_xprt_ops xs_udp_ops = { 3078 .set_buffer_size = xs_udp_set_buffer_size, 3079 .reserve_xprt = xprt_reserve_xprt_cong, 3080 .release_xprt = xprt_release_xprt_cong, 3081 .alloc_slot = xprt_alloc_slot, 3082 .free_slot = xprt_free_slot, 3083 .rpcbind = rpcb_getport_async, 3084 .set_port = xs_set_port, 3085 .connect = xs_connect, 3086 .get_srcaddr = xs_sock_srcaddr, 3087 .get_srcport = xs_sock_srcport, 3088 .buf_alloc = rpc_malloc, 3089 .buf_free = rpc_free, 3090 .send_request = xs_udp_send_request, 3091 .wait_for_reply_request = xprt_wait_for_reply_request_rtt, 3092 .timer = xs_udp_timer, 3093 .release_request = xprt_release_rqst_cong, 3094 .close = xs_close, 3095 .destroy = xs_destroy, 3096 .print_stats = xs_udp_print_stats, 3097 .enable_swap = xs_enable_swap, 3098 .disable_swap = xs_disable_swap, 3099 .inject_disconnect = xs_inject_disconnect, 3100 }; 3101 3102 static const struct rpc_xprt_ops xs_tcp_ops = { 3103 .reserve_xprt = xprt_reserve_xprt, 3104 .release_xprt = xprt_release_xprt, 3105 .alloc_slot = xprt_alloc_slot, 3106 .free_slot = xprt_free_slot, 3107 .rpcbind = rpcb_getport_async, 3108 .set_port = xs_set_port, 3109 .connect = xs_connect, 3110 .get_srcaddr = xs_sock_srcaddr, 3111 .get_srcport = xs_sock_srcport, 3112 .buf_alloc = rpc_malloc, 3113 .buf_free = rpc_free, 3114 .prepare_request = xs_stream_prepare_request, 3115 .send_request = xs_tcp_send_request, 3116 .abort_send_request = xs_stream_abort_send_request, 3117 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3118 .close = xs_tcp_shutdown, 3119 .destroy = xs_destroy, 3120 .set_connect_timeout = xs_tcp_set_connect_timeout, 3121 .print_stats = xs_tcp_print_stats, 3122 .enable_swap = xs_enable_swap, 3123 .disable_swap = xs_disable_swap, 3124 .inject_disconnect = 
xs_inject_disconnect, 3125 #ifdef CONFIG_SUNRPC_BACKCHANNEL 3126 .bc_setup = xprt_setup_bc, 3127 .bc_maxpayload = xs_tcp_bc_maxpayload, 3128 .bc_num_slots = xprt_bc_max_slots, 3129 .bc_free_rqst = xprt_free_bc_rqst, 3130 .bc_destroy = xprt_destroy_bc, 3131 #endif 3132 }; 3133 3134 /* 3135 * The rpc_xprt_ops for the server backchannel 3136 */ 3137 3138 static const struct rpc_xprt_ops bc_tcp_ops = { 3139 .reserve_xprt = xprt_reserve_xprt, 3140 .release_xprt = xprt_release_xprt, 3141 .alloc_slot = xprt_alloc_slot, 3142 .free_slot = xprt_free_slot, 3143 .buf_alloc = bc_malloc, 3144 .buf_free = bc_free, 3145 .send_request = bc_send_request, 3146 .wait_for_reply_request = xprt_wait_for_reply_request_def, 3147 .close = bc_close, 3148 .destroy = bc_destroy, 3149 .print_stats = xs_tcp_print_stats, 3150 .enable_swap = xs_enable_swap, 3151 .disable_swap = xs_disable_swap, 3152 .inject_disconnect = xs_inject_disconnect, 3153 }; 3154 3155 static int xs_init_anyaddr(const int family, struct sockaddr *sap) 3156 { 3157 static const struct sockaddr_in sin = { 3158 .sin_family = AF_INET, 3159 .sin_addr.s_addr = htonl(INADDR_ANY), 3160 }; 3161 static const struct sockaddr_in6 sin6 = { 3162 .sin6_family = AF_INET6, 3163 .sin6_addr = IN6ADDR_ANY_INIT, 3164 }; 3165 3166 switch (family) { 3167 case AF_LOCAL: 3168 break; 3169 case AF_INET: 3170 memcpy(sap, &sin, sizeof(sin)); 3171 break; 3172 case AF_INET6: 3173 memcpy(sap, &sin6, sizeof(sin6)); 3174 break; 3175 default: 3176 dprintk("RPC: %s: Bad address family\n", __func__); 3177 return -EAFNOSUPPORT; 3178 } 3179 return 0; 3180 } 3181 3182 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, 3183 unsigned int slot_table_size, 3184 unsigned int max_slot_table_size) 3185 { 3186 struct rpc_xprt *xprt; 3187 struct sock_xprt *new; 3188 3189 if (args->addrlen > sizeof(xprt->addr)) { 3190 dprintk("RPC: xs_setup_xprt: address too large\n"); 3191 return ERR_PTR(-EBADF); 3192 } 3193 3194 xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size, 3195 max_slot_table_size); 3196 if (xprt == NULL) { 3197 dprintk("RPC: xs_setup_xprt: couldn't allocate " 3198 "rpc_xprt\n"); 3199 return ERR_PTR(-ENOMEM); 3200 } 3201 3202 new = container_of(xprt, struct sock_xprt, xprt); 3203 mutex_init(&new->recv_mutex); 3204 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 3205 xprt->addrlen = args->addrlen; 3206 if (args->srcaddr) 3207 memcpy(&new->srcaddr, args->srcaddr, args->addrlen); 3208 else { 3209 int err; 3210 err = xs_init_anyaddr(args->dstaddr->sa_family, 3211 (struct sockaddr *)&new->srcaddr); 3212 if (err != 0) { 3213 xprt_free(xprt); 3214 return ERR_PTR(err); 3215 } 3216 } 3217 3218 return xprt; 3219 } 3220 3221 static const struct rpc_timeout xs_local_default_timeout = { 3222 .to_initval = 10 * HZ, 3223 .to_maxval = 10 * HZ, 3224 .to_retries = 2, 3225 }; 3226 3227 /** 3228 * xs_setup_local - Set up transport to use an AF_LOCAL socket 3229 * @args: rpc transport creation arguments 3230 * 3231 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP 3232 */ 3233 static struct rpc_xprt *xs_setup_local(struct xprt_create *args) 3234 { 3235 struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr; 3236 struct sock_xprt *transport; 3237 struct rpc_xprt *xprt; 3238 struct rpc_xprt *ret; 3239 3240 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3241 xprt_max_tcp_slot_table_entries); 3242 if (IS_ERR(xprt)) 3243 return xprt; 3244 transport = container_of(xprt, struct sock_xprt, xprt); 3245 3246 xprt->prot = 0; 3247 xprt->xprt_class = &xs_local_transport; 3248 
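	/* Stream transport: the same RPC fragment-size cap as TCP applies */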
xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3249 3250 xprt->bind_timeout = XS_BIND_TO; 3251 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3252 xprt->idle_timeout = XS_IDLE_DISC_TO; 3253 3254 xprt->ops = &xs_local_ops; 3255 xprt->timeout = &xs_local_default_timeout; 3256 3257 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3258 INIT_WORK(&transport->error_worker, xs_error_handle); 3259 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); 3260 3261 switch (sun->sun_family) { 3262 case AF_LOCAL: 3263 if (sun->sun_path[0] != '/' && sun->sun_path[0] != '\0') { 3264 dprintk("RPC: bad AF_LOCAL address: %s\n", 3265 sun->sun_path); 3266 ret = ERR_PTR(-EINVAL); 3267 goto out_err; 3268 } 3269 xprt_set_bound(xprt); 3270 xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); 3271 break; 3272 default: 3273 ret = ERR_PTR(-EAFNOSUPPORT); 3274 goto out_err; 3275 } 3276 3277 dprintk("RPC: set up xprt to %s via AF_LOCAL\n", 3278 xprt->address_strings[RPC_DISPLAY_ADDR]); 3279 3280 if (try_module_get(THIS_MODULE)) 3281 return xprt; 3282 ret = ERR_PTR(-EINVAL); 3283 out_err: 3284 xs_xprt_free(xprt); 3285 return ret; 3286 } 3287 3288 static const struct rpc_timeout xs_udp_default_timeout = { 3289 .to_initval = 5 * HZ, 3290 .to_maxval = 30 * HZ, 3291 .to_increment = 5 * HZ, 3292 .to_retries = 5, 3293 }; 3294 3295 /** 3296 * xs_setup_udp - Set up transport to use a UDP socket 3297 * @args: rpc transport creation arguments 3298 * 3299 */ 3300 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) 3301 { 3302 struct sockaddr *addr = args->dstaddr; 3303 struct rpc_xprt *xprt; 3304 struct sock_xprt *transport; 3305 struct rpc_xprt *ret; 3306 3307 xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries, 3308 xprt_udp_slot_table_entries); 3309 if (IS_ERR(xprt)) 3310 return xprt; 3311 transport = container_of(xprt, struct sock_xprt, xprt); 3312 3313 xprt->prot = IPPROTO_UDP; 3314 xprt->xprt_class = &xs_udp_transport; 3315 /* XXX: header size can vary due to auth type, IPv6, etc. 
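 * The expression below conservatively reserves (MAX_HEADER << 3)
 * bytes of the 64KB UDP datagram limit for that header overhead.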
*/ 3316 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 3317 3318 xprt->bind_timeout = XS_BIND_TO; 3319 xprt->reestablish_timeout = XS_UDP_REEST_TO; 3320 xprt->idle_timeout = XS_IDLE_DISC_TO; 3321 3322 xprt->ops = &xs_udp_ops; 3323 3324 xprt->timeout = &xs_udp_default_timeout; 3325 3326 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); 3327 INIT_WORK(&transport->error_worker, xs_error_handle); 3328 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); 3329 3330 switch (addr->sa_family) { 3331 case AF_INET: 3332 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3333 xprt_set_bound(xprt); 3334 3335 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 3336 break; 3337 case AF_INET6: 3338 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3339 xprt_set_bound(xprt); 3340 3341 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 3342 break; 3343 default: 3344 ret = ERR_PTR(-EAFNOSUPPORT); 3345 goto out_err; 3346 } 3347 3348 if (xprt_bound(xprt)) 3349 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3350 xprt->address_strings[RPC_DISPLAY_ADDR], 3351 xprt->address_strings[RPC_DISPLAY_PORT], 3352 xprt->address_strings[RPC_DISPLAY_PROTO]); 3353 else 3354 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3355 xprt->address_strings[RPC_DISPLAY_ADDR], 3356 xprt->address_strings[RPC_DISPLAY_PROTO]); 3357 3358 if (try_module_get(THIS_MODULE)) 3359 return xprt; 3360 ret = ERR_PTR(-EINVAL); 3361 out_err: 3362 xs_xprt_free(xprt); 3363 return ret; 3364 } 3365 3366 static const struct rpc_timeout xs_tcp_default_timeout = { 3367 .to_initval = 60 * HZ, 3368 .to_maxval = 60 * HZ, 3369 .to_retries = 2, 3370 }; 3371 3372 /** 3373 * xs_setup_tcp - Set up transport to use a TCP socket 3374 * @args: rpc transport creation arguments 3375 * 3376 */ 3377 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) 3378 { 3379 struct sockaddr *addr = args->dstaddr; 3380 struct rpc_xprt *xprt; 3381 struct sock_xprt *transport; 3382 struct rpc_xprt *ret; 3383 unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; 3384 3385 if (args->flags & XPRT_CREATE_INFINITE_SLOTS) 3386 max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; 3387 3388 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3389 max_slot_table_size); 3390 if (IS_ERR(xprt)) 3391 return xprt; 3392 transport = container_of(xprt, struct sock_xprt, xprt); 3393 3394 xprt->prot = IPPROTO_TCP; 3395 xprt->xprt_class = &xs_tcp_transport; 3396 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3397 3398 xprt->bind_timeout = XS_BIND_TO; 3399 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3400 xprt->idle_timeout = XS_IDLE_DISC_TO; 3401 3402 xprt->ops = &xs_tcp_ops; 3403 xprt->timeout = &xs_tcp_default_timeout; 3404 3405 xprt->max_reconnect_timeout = xprt->timeout->to_maxval; 3406 if (args->reconnect_timeout) 3407 xprt->max_reconnect_timeout = args->reconnect_timeout; 3408 3409 xprt->connect_timeout = xprt->timeout->to_initval * 3410 (xprt->timeout->to_retries + 1); 3411 if (args->connect_timeout) 3412 xs_tcp_do_set_connect_timeout(xprt, args->connect_timeout); 3413 3414 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3415 INIT_WORK(&transport->error_worker, xs_error_handle); 3416 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); 3417 3418 switch (addr->sa_family) { 3419 case AF_INET: 3420 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3421 xprt_set_bound(xprt); 3422 3423 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 3424 break; 3425 case 
AF_INET6: 3426 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3427 xprt_set_bound(xprt); 3428 3429 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 3430 break; 3431 default: 3432 ret = ERR_PTR(-EAFNOSUPPORT); 3433 goto out_err; 3434 } 3435 3436 if (xprt_bound(xprt)) 3437 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3438 xprt->address_strings[RPC_DISPLAY_ADDR], 3439 xprt->address_strings[RPC_DISPLAY_PORT], 3440 xprt->address_strings[RPC_DISPLAY_PROTO]); 3441 else 3442 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3443 xprt->address_strings[RPC_DISPLAY_ADDR], 3444 xprt->address_strings[RPC_DISPLAY_PROTO]); 3445 3446 if (try_module_get(THIS_MODULE)) 3447 return xprt; 3448 ret = ERR_PTR(-EINVAL); 3449 out_err: 3450 xs_xprt_free(xprt); 3451 return ret; 3452 } 3453 3454 /** 3455 * xs_setup_tcp_tls - Set up transport to use a TCP socket with TLS 3456 * @args: rpc transport creation arguments 3457 * 3458 */ 3459 static struct rpc_xprt *xs_setup_tcp_tls(struct xprt_create *args) 3460 { 3461 struct sockaddr *addr = args->dstaddr; 3462 struct rpc_xprt *xprt; 3463 struct sock_xprt *transport; 3464 struct rpc_xprt *ret; 3465 unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; 3466 3467 if (args->flags & XPRT_CREATE_INFINITE_SLOTS) 3468 max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; 3469 3470 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3471 max_slot_table_size); 3472 if (IS_ERR(xprt)) 3473 return xprt; 3474 transport = container_of(xprt, struct sock_xprt, xprt); 3475 3476 xprt->prot = IPPROTO_TCP; 3477 xprt->xprt_class = &xs_tcp_transport; 3478 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3479 3480 xprt->bind_timeout = XS_BIND_TO; 3481 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 3482 xprt->idle_timeout = XS_IDLE_DISC_TO; 3483 3484 xprt->ops = &xs_tcp_ops; 3485 xprt->timeout = &xs_tcp_default_timeout; 3486 3487 xprt->max_reconnect_timeout = xprt->timeout->to_maxval; 3488 xprt->connect_timeout = xprt->timeout->to_initval * 3489 (xprt->timeout->to_retries + 1); 3490 3491 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); 3492 INIT_WORK(&transport->error_worker, xs_error_handle); 3493 3494 switch (args->xprtsec.policy) { 3495 case RPC_XPRTSEC_TLS_ANON: 3496 case RPC_XPRTSEC_TLS_X509: 3497 xprt->xprtsec = args->xprtsec; 3498 INIT_DELAYED_WORK(&transport->connect_worker, 3499 xs_tcp_tls_setup_socket); 3500 break; 3501 default: 3502 ret = ERR_PTR(-EACCES); 3503 goto out_err; 3504 } 3505 3506 switch (addr->sa_family) { 3507 case AF_INET: 3508 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 3509 xprt_set_bound(xprt); 3510 3511 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 3512 break; 3513 case AF_INET6: 3514 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 3515 xprt_set_bound(xprt); 3516 3517 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 3518 break; 3519 default: 3520 ret = ERR_PTR(-EAFNOSUPPORT); 3521 goto out_err; 3522 } 3523 3524 if (xprt_bound(xprt)) 3525 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3526 xprt->address_strings[RPC_DISPLAY_ADDR], 3527 xprt->address_strings[RPC_DISPLAY_PORT], 3528 xprt->address_strings[RPC_DISPLAY_PROTO]); 3529 else 3530 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 3531 xprt->address_strings[RPC_DISPLAY_ADDR], 3532 xprt->address_strings[RPC_DISPLAY_PROTO]); 3533 3534 if (try_module_get(THIS_MODULE)) 3535 return xprt; 3536 ret = ERR_PTR(-EINVAL); 3537 out_err: 3538 xs_xprt_free(xprt); 3539 return ret; 3540 } 3541 3542 /** 3543 *
xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket 3544 * @args: rpc transport creation arguments 3545 * 3546 */ 3547 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) 3548 { 3549 struct sockaddr *addr = args->dstaddr; 3550 struct rpc_xprt *xprt; 3551 struct sock_xprt *transport; 3552 struct svc_sock *bc_sock; 3553 struct rpc_xprt *ret; 3554 3555 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 3556 xprt_tcp_slot_table_entries); 3557 if (IS_ERR(xprt)) 3558 return xprt; 3559 transport = container_of(xprt, struct sock_xprt, xprt); 3560 3561 xprt->prot = IPPROTO_TCP; 3562 xprt->xprt_class = &xs_bc_tcp_transport; 3563 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 3564 xprt->timeout = &xs_tcp_default_timeout; 3565 3566 /* backchannel */ 3567 xprt_set_bound(xprt); 3568 xprt->bind_timeout = 0; 3569 xprt->reestablish_timeout = 0; 3570 xprt->idle_timeout = 0; 3571 3572 xprt->ops = &bc_tcp_ops; 3573 3574 switch (addr->sa_family) { 3575 case AF_INET: 3576 xs_format_peer_addresses(xprt, "tcp", 3577 RPCBIND_NETID_TCP); 3578 break; 3579 case AF_INET6: 3580 xs_format_peer_addresses(xprt, "tcp", 3581 RPCBIND_NETID_TCP6); 3582 break; 3583 default: 3584 ret = ERR_PTR(-EAFNOSUPPORT); 3585 goto out_err; 3586 } 3587 3588 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 3589 xprt->address_strings[RPC_DISPLAY_ADDR], 3590 xprt->address_strings[RPC_DISPLAY_PORT], 3591 xprt->address_strings[RPC_DISPLAY_PROTO]); 3592 3593 /* 3594 * Once we've associated a backchannel xprt with a connection, 3595 * we want to keep it around as long as the connection lasts, 3596 * in case we need to start using it for a backchannel again; 3597 * this reference won't be dropped until bc_xprt is destroyed. 3598 */ 3599 xprt_get(xprt); 3600 args->bc_xprt->xpt_bc_xprt = xprt; 3601 xprt->bc_xprt = args->bc_xprt; 3602 bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt); 3603 transport->sock = bc_sock->sk_sock; 3604 transport->inet = bc_sock->sk_sk; 3605 3606 /* 3607 * Since we don't want connections for the backchannel, we set 3608 * the xprt status to connected 3609 */ 3610 xprt_set_connected(xprt); 3611 3612 if (try_module_get(THIS_MODULE)) 3613 return xprt; 3614 3615 args->bc_xprt->xpt_bc_xprt = NULL; 3616 args->bc_xprt->xpt_bc_xps = NULL; 3617 xprt_put(xprt); 3618 ret = ERR_PTR(-EINVAL); 3619 out_err: 3620 xs_xprt_free(xprt); 3621 return ret; 3622 } 3623 3624 static struct xprt_class xs_local_transport = { 3625 .list = LIST_HEAD_INIT(xs_local_transport.list), 3626 .name = "named UNIX socket", 3627 .owner = THIS_MODULE, 3628 .ident = XPRT_TRANSPORT_LOCAL, 3629 .setup = xs_setup_local, 3630 .netid = { "" }, 3631 }; 3632 3633 static struct xprt_class xs_udp_transport = { 3634 .list = LIST_HEAD_INIT(xs_udp_transport.list), 3635 .name = "udp", 3636 .owner = THIS_MODULE, 3637 .ident = XPRT_TRANSPORT_UDP, 3638 .setup = xs_setup_udp, 3639 .netid = { "udp", "udp6", "" }, 3640 }; 3641 3642 static struct xprt_class xs_tcp_transport = { 3643 .list = LIST_HEAD_INIT(xs_tcp_transport.list), 3644 .name = "tcp", 3645 .owner = THIS_MODULE, 3646 .ident = XPRT_TRANSPORT_TCP, 3647 .setup = xs_setup_tcp, 3648 .netid = { "tcp", "tcp6", "" }, 3649 }; 3650 3651 static struct xprt_class xs_tcp_tls_transport = { 3652 .list = LIST_HEAD_INIT(xs_tcp_tls_transport.list), 3653 .name = "tcp-with-tls", 3654 .owner = THIS_MODULE, 3655 .ident = XPRT_TRANSPORT_TCP_TLS, 3656 .setup = xs_setup_tcp_tls, 3657 .netid = { "tcp", "tcp6", "" }, 3658 }; 3659 3660 static struct xprt_class xs_bc_tcp_transport = { 3661 .list = 
LIST_HEAD_INIT(xs_bc_tcp_transport.list), 3662 .name = "tcp NFSv4.1 backchannel", 3663 .owner = THIS_MODULE, 3664 .ident = XPRT_TRANSPORT_BC_TCP, 3665 .setup = xs_setup_bc_tcp, 3666 .netid = { "" }, 3667 }; 3668 3669 /** 3670 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client 3671 * 3672 */ 3673 int init_socket_xprt(void) 3674 { 3675 if (!sunrpc_table_header) 3676 sunrpc_table_header = register_sysctl("sunrpc", xs_tunables_table); 3677 3678 xprt_register_transport(&xs_local_transport); 3679 xprt_register_transport(&xs_udp_transport); 3680 xprt_register_transport(&xs_tcp_transport); 3681 xprt_register_transport(&xs_tcp_tls_transport); 3682 xprt_register_transport(&xs_bc_tcp_transport); 3683 3684 return 0; 3685 } 3686 3687 /** 3688 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister 3689 * 3690 */ 3691 void cleanup_socket_xprt(void) 3692 { 3693 if (sunrpc_table_header) { 3694 unregister_sysctl_table(sunrpc_table_header); 3695 sunrpc_table_header = NULL; 3696 } 3697 3698 xprt_unregister_transport(&xs_local_transport); 3699 xprt_unregister_transport(&xs_udp_transport); 3700 xprt_unregister_transport(&xs_tcp_transport); 3701 xprt_unregister_transport(&xs_tcp_tls_transport); 3702 xprt_unregister_transport(&xs_bc_tcp_transport); 3703 } 3704 3705 static int param_set_portnr(const char *val, const struct kernel_param *kp) 3706 { 3707 return param_set_uint_minmax(val, kp, 3708 RPC_MIN_RESVPORT, 3709 RPC_MAX_RESVPORT); 3710 } 3711 3712 static const struct kernel_param_ops param_ops_portnr = { 3713 .set = param_set_portnr, 3714 .get = param_get_uint, 3715 }; 3716 3717 #define param_check_portnr(name, p) \ 3718 __param_check(name, p, unsigned int); 3719 3720 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644); 3721 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644); 3722 3723 static int param_set_slot_table_size(const char *val, 3724 const struct kernel_param *kp) 3725 { 3726 return param_set_uint_minmax(val, kp, 3727 RPC_MIN_SLOT_TABLE, 3728 RPC_MAX_SLOT_TABLE); 3729 } 3730 3731 static const struct kernel_param_ops param_ops_slot_table_size = { 3732 .set = param_set_slot_table_size, 3733 .get = param_get_uint, 3734 }; 3735 3736 #define param_check_slot_table_size(name, p) \ 3737 __param_check(name, p, unsigned int); 3738 3739 static int param_set_max_slot_table_size(const char *val, 3740 const struct kernel_param *kp) 3741 { 3742 return param_set_uint_minmax(val, kp, 3743 RPC_MIN_SLOT_TABLE, 3744 RPC_MAX_SLOT_TABLE_LIMIT); 3745 } 3746 3747 static const struct kernel_param_ops param_ops_max_slot_table_size = { 3748 .set = param_set_max_slot_table_size, 3749 .get = param_get_uint, 3750 }; 3751 3752 #define param_check_max_slot_table_size(name, p) \ 3753 __param_check(name, p, unsigned int); 3754 3755 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries, 3756 slot_table_size, 0644); 3757 module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries, 3758 max_slot_table_size, 0644); 3759 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries, 3760 slot_table_size, 0644); 3761
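/*
 * The module parameters above are range-checked by
 * param_set_uint_minmax(), so out-of-range values are rejected with
 * -EINVAL rather than clamped. They can also be set at load time, for
 * example with a modprobe.d entry such as (assuming, as is typical,
 * that this file is built into the sunrpc module):
 *
 *	options sunrpc min_resvport=665 max_resvport=1023
 */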