1 /* 2 * linux/net/sunrpc/xprtsock.c 3 * 4 * Client-side transport implementation for sockets. 5 * 6 * TCP callback races fixes (C) 1998 Red Hat 7 * TCP send fixes (C) 1998 Red Hat 8 * TCP NFS related read + write fixes 9 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> 10 * 11 * Rewrite of larges part of the code in order to stabilize TCP stuff. 12 * Fix behaviour when socket buffer is full. 13 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 14 * 15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> 16 * 17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005. 18 * <gilles.quillard@bull.net> 19 */ 20 21 #include <linux/types.h> 22 #include <linux/slab.h> 23 #include <linux/module.h> 24 #include <linux/capability.h> 25 #include <linux/pagemap.h> 26 #include <linux/errno.h> 27 #include <linux/socket.h> 28 #include <linux/in.h> 29 #include <linux/net.h> 30 #include <linux/mm.h> 31 #include <linux/udp.h> 32 #include <linux/tcp.h> 33 #include <linux/sunrpc/clnt.h> 34 #include <linux/sunrpc/sched.h> 35 #include <linux/sunrpc/xprtsock.h> 36 #include <linux/file.h> 37 #ifdef CONFIG_NFS_V4_1 38 #include <linux/sunrpc/bc_xprt.h> 39 #endif 40 41 #include <net/sock.h> 42 #include <net/checksum.h> 43 #include <net/udp.h> 44 #include <net/tcp.h> 45 46 /* 47 * xprtsock tunables 48 */ 49 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 50 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; 51 52 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; 53 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; 54 55 #define XS_TCP_LINGER_TO (15U * HZ) 56 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; 57 58 /* 59 * We can register our own files under /proc/sys/sunrpc by 60 * calling register_sysctl_table() again. The files in that 61 * directory become the union of all files registered there. 62 * 63 * We simply need to make sure that we don't collide with 64 * someone else's file names! 65 */ 66 67 #ifdef RPC_DEBUG 68 69 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; 70 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; 71 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT; 72 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT; 73 74 static struct ctl_table_header *sunrpc_table_header; 75 76 /* 77 * FIXME: changing the UDP slot table size should also resize the UDP 78 * socket buffers for existing UDP transports 79 */ 80 static ctl_table xs_tunables_table[] = { 81 { 82 .ctl_name = CTL_SLOTTABLE_UDP, 83 .procname = "udp_slot_table_entries", 84 .data = &xprt_udp_slot_table_entries, 85 .maxlen = sizeof(unsigned int), 86 .mode = 0644, 87 .proc_handler = &proc_dointvec_minmax, 88 .strategy = &sysctl_intvec, 89 .extra1 = &min_slot_table_size, 90 .extra2 = &max_slot_table_size 91 }, 92 { 93 .ctl_name = CTL_SLOTTABLE_TCP, 94 .procname = "tcp_slot_table_entries", 95 .data = &xprt_tcp_slot_table_entries, 96 .maxlen = sizeof(unsigned int), 97 .mode = 0644, 98 .proc_handler = &proc_dointvec_minmax, 99 .strategy = &sysctl_intvec, 100 .extra1 = &min_slot_table_size, 101 .extra2 = &max_slot_table_size 102 }, 103 { 104 .ctl_name = CTL_MIN_RESVPORT, 105 .procname = "min_resvport", 106 .data = &xprt_min_resvport, 107 .maxlen = sizeof(unsigned int), 108 .mode = 0644, 109 .proc_handler = &proc_dointvec_minmax, 110 .strategy = &sysctl_intvec, 111 .extra1 = &xprt_min_resvport_limit, 112 .extra2 = &xprt_max_resvport_limit 113 }, 114 { 115 .ctl_name = CTL_MAX_RESVPORT, 116 .procname = "max_resvport", 117 .data = &xprt_max_resvport, 118 .maxlen = sizeof(unsigned int), 119 .mode = 0644, 120 .proc_handler = &proc_dointvec_minmax, 121 .strategy = &sysctl_intvec, 122 .extra1 = &xprt_min_resvport_limit, 123 .extra2 = &xprt_max_resvport_limit 124 }, 125 { 126 .procname = "tcp_fin_timeout", 127 .data = &xs_tcp_fin_timeout, 128 .maxlen = sizeof(xs_tcp_fin_timeout), 129 .mode = 0644, 130 .proc_handler = &proc_dointvec_jiffies, 131 .strategy = sysctl_jiffies 132 }, 133 { 134 .ctl_name = 0, 135 }, 136 }; 137 138 static ctl_table sunrpc_table[] = { 139 { 140 .ctl_name = CTL_SUNRPC, 141 .procname = "sunrpc", 142 .mode = 0555, 143 .child = xs_tunables_table 144 }, 145 { 146 .ctl_name = 0, 147 }, 148 }; 149 150 #endif 151 152 /* 153 * Time out for an RPC UDP socket connect. UDP socket connects are 154 * synchronous, but we set a timeout anyway in case of resource 155 * exhaustion on the local host. 156 */ 157 #define XS_UDP_CONN_TO (5U * HZ) 158 159 /* 160 * Wait duration for an RPC TCP connection to be established. Solaris 161 * NFS over TCP uses 60 seconds, for example, which is in line with how 162 * long a server takes to reboot. 163 */ 164 #define XS_TCP_CONN_TO (60U * HZ) 165 166 /* 167 * Wait duration for a reply from the RPC portmapper. 168 */ 169 #define XS_BIND_TO (60U * HZ) 170 171 /* 172 * Delay if a UDP socket connect error occurs. This is most likely some 173 * kind of resource problem on the local host. 174 */ 175 #define XS_UDP_REEST_TO (2U * HZ) 176 177 /* 178 * The reestablish timeout allows clients to delay for a bit before attempting 179 * to reconnect to a server that just dropped our connection. 180 * 181 * We implement an exponential backoff when trying to reestablish a TCP 182 * transport connection with the server. Some servers like to drop a TCP 183 * connection when they are overworked, so we start with a short timeout and 184 * increase over time if the server is down or not responding. 185 */ 186 #define XS_TCP_INIT_REEST_TO (3U * HZ) 187 #define XS_TCP_MAX_REEST_TO (5U * 60 * HZ) 188 189 /* 190 * TCP idle timeout; client drops the transport socket if it is idle 191 * for this long. Note that we also timeout UDP sockets to prevent 192 * holding port numbers when there is no RPC traffic. 193 */ 194 #define XS_IDLE_DISC_TO (5U * 60 * HZ) 195 196 #ifdef RPC_DEBUG 197 # undef RPC_DEBUG_DATA 198 # define RPCDBG_FACILITY RPCDBG_TRANS 199 #endif 200 201 #ifdef RPC_DEBUG_DATA 202 static void xs_pktdump(char *msg, u32 *packet, unsigned int count) 203 { 204 u8 *buf = (u8 *) packet; 205 int j; 206 207 dprintk("RPC: %s\n", msg); 208 for (j = 0; j < count && j < 128; j += 4) { 209 if (!(j & 31)) { 210 if (j) 211 dprintk("\n"); 212 dprintk("0x%04x ", j); 213 } 214 dprintk("%02x%02x%02x%02x ", 215 buf[j], buf[j+1], buf[j+2], buf[j+3]); 216 } 217 dprintk("\n"); 218 } 219 #else 220 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count) 221 { 222 /* NOP */ 223 } 224 #endif 225 226 struct sock_xprt { 227 struct rpc_xprt xprt; 228 229 /* 230 * Network layer 231 */ 232 struct socket * sock; 233 struct sock * inet; 234 235 /* 236 * State of TCP reply receive 237 */ 238 __be32 tcp_fraghdr, 239 tcp_xid; 240 241 u32 tcp_offset, 242 tcp_reclen; 243 244 unsigned long tcp_copied, 245 tcp_flags; 246 247 /* 248 * Connection of transports 249 */ 250 struct delayed_work connect_worker; 251 struct sockaddr_storage addr; 252 unsigned short port; 253 254 /* 255 * UDP socket buffer size parameters 256 */ 257 size_t rcvsize, 258 sndsize; 259 260 /* 261 * Saved socket callback addresses 262 */ 263 void (*old_data_ready)(struct sock *, int); 264 void (*old_state_change)(struct sock *); 265 void (*old_write_space)(struct sock *); 266 void (*old_error_report)(struct sock *); 267 }; 268 269 /* 270 * TCP receive state flags 271 */ 272 #define TCP_RCV_LAST_FRAG (1UL << 0) 273 #define TCP_RCV_COPY_FRAGHDR (1UL << 1) 274 #define TCP_RCV_COPY_XID (1UL << 2) 275 #define TCP_RCV_COPY_DATA (1UL << 3) 276 #define TCP_RCV_READ_CALLDIR (1UL << 4) 277 #define TCP_RCV_COPY_CALLDIR (1UL << 5) 278 279 /* 280 * TCP RPC flags 281 */ 282 #define TCP_RPC_REPLY (1UL << 6) 283 284 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) 285 { 286 return (struct sockaddr *) &xprt->addr; 287 } 288 289 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt) 290 { 291 return (struct sockaddr_in *) &xprt->addr; 292 } 293 294 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt) 295 { 296 return (struct sockaddr_in6 *) &xprt->addr; 297 } 298 299 static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt, 300 const char *protocol, 301 const char *netid) 302 { 303 struct sockaddr_in *addr = xs_addr_in(xprt); 304 char *buf; 305 306 buf = kzalloc(20, GFP_KERNEL); 307 if (buf) { 308 snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr); 309 } 310 xprt->address_strings[RPC_DISPLAY_ADDR] = buf; 311 312 buf = kzalloc(8, GFP_KERNEL); 313 if (buf) { 314 snprintf(buf, 8, "%u", 315 ntohs(addr->sin_port)); 316 } 317 xprt->address_strings[RPC_DISPLAY_PORT] = buf; 318 319 xprt->address_strings[RPC_DISPLAY_PROTO] = protocol; 320 321 buf = kzalloc(48, GFP_KERNEL); 322 if (buf) { 323 snprintf(buf, 48, "addr=%pI4 port=%u proto=%s", 324 &addr->sin_addr.s_addr, 325 ntohs(addr->sin_port), 326 protocol); 327 } 328 xprt->address_strings[RPC_DISPLAY_ALL] = buf; 329 330 buf = kzalloc(10, GFP_KERNEL); 331 if (buf) { 332 snprintf(buf, 10, "%02x%02x%02x%02x", 333 NIPQUAD(addr->sin_addr.s_addr)); 334 } 335 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf; 336 337 buf = kzalloc(8, GFP_KERNEL); 338 if (buf) { 339 snprintf(buf, 8, "%4hx", 340 ntohs(addr->sin_port)); 341 } 342 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf; 343 344 buf = kzalloc(30, GFP_KERNEL); 345 if (buf) { 346 snprintf(buf, 30, "%pI4.%u.%u", 347 &addr->sin_addr.s_addr, 348 ntohs(addr->sin_port) >> 8, 349 ntohs(addr->sin_port) & 0xff); 350 } 351 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf; 352 353 xprt->address_strings[RPC_DISPLAY_NETID] = netid; 354 } 355 356 static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt, 357 const char *protocol, 358 const char *netid) 359 { 360 struct sockaddr_in6 *addr = xs_addr_in6(xprt); 361 char *buf; 362 363 buf = kzalloc(40, GFP_KERNEL); 364 if (buf) { 365 snprintf(buf, 40, "%pI6",&addr->sin6_addr); 366 } 367 xprt->address_strings[RPC_DISPLAY_ADDR] = buf; 368 369 buf = kzalloc(8, GFP_KERNEL); 370 if (buf) { 371 snprintf(buf, 8, "%u", 372 ntohs(addr->sin6_port)); 373 } 374 xprt->address_strings[RPC_DISPLAY_PORT] = buf; 375 376 xprt->address_strings[RPC_DISPLAY_PROTO] = protocol; 377 378 buf = kzalloc(64, GFP_KERNEL); 379 if (buf) { 380 snprintf(buf, 64, "addr=%pI6 port=%u proto=%s", 381 &addr->sin6_addr, 382 ntohs(addr->sin6_port), 383 protocol); 384 } 385 xprt->address_strings[RPC_DISPLAY_ALL] = buf; 386 387 buf = kzalloc(36, GFP_KERNEL); 388 if (buf) 389 snprintf(buf, 36, "%pi6", &addr->sin6_addr); 390 391 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf; 392 393 buf = kzalloc(8, GFP_KERNEL); 394 if (buf) { 395 snprintf(buf, 8, "%4hx", 396 ntohs(addr->sin6_port)); 397 } 398 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf; 399 400 buf = kzalloc(50, GFP_KERNEL); 401 if (buf) { 402 snprintf(buf, 50, "%pI6.%u.%u", 403 &addr->sin6_addr, 404 ntohs(addr->sin6_port) >> 8, 405 ntohs(addr->sin6_port) & 0xff); 406 } 407 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf; 408 409 xprt->address_strings[RPC_DISPLAY_NETID] = netid; 410 } 411 412 static void xs_free_peer_addresses(struct rpc_xprt *xprt) 413 { 414 unsigned int i; 415 416 for (i = 0; i < RPC_DISPLAY_MAX; i++) 417 switch (i) { 418 case RPC_DISPLAY_PROTO: 419 case RPC_DISPLAY_NETID: 420 continue; 421 default: 422 kfree(xprt->address_strings[i]); 423 } 424 } 425 426 #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 427 428 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) 429 { 430 struct msghdr msg = { 431 .msg_name = addr, 432 .msg_namelen = addrlen, 433 .msg_flags = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0), 434 }; 435 struct kvec iov = { 436 .iov_base = vec->iov_base + base, 437 .iov_len = vec->iov_len - base, 438 }; 439 440 if (iov.iov_len != 0) 441 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); 442 return kernel_sendmsg(sock, &msg, NULL, 0, 0); 443 } 444 445 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more) 446 { 447 struct page **ppage; 448 unsigned int remainder; 449 int err, sent = 0; 450 451 remainder = xdr->page_len - base; 452 base += xdr->page_base; 453 ppage = xdr->pages + (base >> PAGE_SHIFT); 454 base &= ~PAGE_MASK; 455 for(;;) { 456 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder); 457 int flags = XS_SENDMSG_FLAGS; 458 459 remainder -= len; 460 if (remainder != 0 || more) 461 flags |= MSG_MORE; 462 err = sock->ops->sendpage(sock, *ppage, base, len, flags); 463 if (remainder == 0 || err != len) 464 break; 465 sent += err; 466 ppage++; 467 base = 0; 468 } 469 if (sent == 0) 470 return err; 471 if (err > 0) 472 sent += err; 473 return sent; 474 } 475 476 /** 477 * xs_sendpages - write pages directly to a socket 478 * @sock: socket to send on 479 * @addr: UDP only -- address of destination 480 * @addrlen: UDP only -- length of destination address 481 * @xdr: buffer containing this request 482 * @base: starting position in the buffer 483 * 484 */ 485 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 486 { 487 unsigned int remainder = xdr->len - base; 488 int err, sent = 0; 489 490 if (unlikely(!sock)) 491 return -ENOTSOCK; 492 493 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 494 if (base != 0) { 495 addr = NULL; 496 addrlen = 0; 497 } 498 499 if (base < xdr->head[0].iov_len || addr != NULL) { 500 unsigned int len = xdr->head[0].iov_len - base; 501 remainder -= len; 502 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0); 503 if (remainder == 0 || err != len) 504 goto out; 505 sent += err; 506 base = 0; 507 } else 508 base -= xdr->head[0].iov_len; 509 510 if (base < xdr->page_len) { 511 unsigned int len = xdr->page_len - base; 512 remainder -= len; 513 err = xs_send_pagedata(sock, xdr, base, remainder != 0); 514 if (remainder == 0 || err != len) 515 goto out; 516 sent += err; 517 base = 0; 518 } else 519 base -= xdr->page_len; 520 521 if (base >= xdr->tail[0].iov_len) 522 return sent; 523 err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); 524 out: 525 if (sent == 0) 526 return err; 527 if (err > 0) 528 sent += err; 529 return sent; 530 } 531 532 static void xs_nospace_callback(struct rpc_task *task) 533 { 534 struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); 535 536 transport->inet->sk_write_pending--; 537 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 538 } 539 540 /** 541 * xs_nospace - place task on wait queue if transmit was incomplete 542 * @task: task to put to sleep 543 * 544 */ 545 static int xs_nospace(struct rpc_task *task) 546 { 547 struct rpc_rqst *req = task->tk_rqstp; 548 struct rpc_xprt *xprt = req->rq_xprt; 549 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 550 int ret = 0; 551 552 dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", 553 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 554 req->rq_slen); 555 556 /* Protect against races with write_space */ 557 spin_lock_bh(&xprt->transport_lock); 558 559 /* Don't race with disconnect */ 560 if (xprt_connected(xprt)) { 561 if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) { 562 ret = -EAGAIN; 563 /* 564 * Notify TCP that we're limited by the application 565 * window size 566 */ 567 set_bit(SOCK_NOSPACE, &transport->sock->flags); 568 transport->inet->sk_write_pending++; 569 /* ...and wait for more buffer space */ 570 xprt_wait_for_buffer_space(task, xs_nospace_callback); 571 } 572 } else { 573 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 574 ret = -ENOTCONN; 575 } 576 577 spin_unlock_bh(&xprt->transport_lock); 578 return ret; 579 } 580 581 /** 582 * xs_udp_send_request - write an RPC request to a UDP socket 583 * @task: address of RPC task that manages the state of an RPC request 584 * 585 * Return values: 586 * 0: The request has been sent 587 * EAGAIN: The socket was blocked, please call again later to 588 * complete the request 589 * ENOTCONN: Caller needs to invoke connect logic then call again 590 * other: Some other error occured, the request was not sent 591 */ 592 static int xs_udp_send_request(struct rpc_task *task) 593 { 594 struct rpc_rqst *req = task->tk_rqstp; 595 struct rpc_xprt *xprt = req->rq_xprt; 596 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 597 struct xdr_buf *xdr = &req->rq_snd_buf; 598 int status; 599 600 xs_pktdump("packet data:", 601 req->rq_svec->iov_base, 602 req->rq_svec->iov_len); 603 604 if (!xprt_bound(xprt)) 605 return -ENOTCONN; 606 status = xs_sendpages(transport->sock, 607 xs_addr(xprt), 608 xprt->addrlen, xdr, 609 req->rq_bytes_sent); 610 611 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 612 xdr->len - req->rq_bytes_sent, status); 613 614 if (status >= 0) { 615 task->tk_bytes_sent += status; 616 if (status >= req->rq_slen) 617 return 0; 618 /* Still some bytes left; set up for a retry later. */ 619 status = -EAGAIN; 620 } 621 if (!transport->sock) 622 goto out; 623 624 switch (status) { 625 case -ENOTSOCK: 626 status = -ENOTCONN; 627 /* Should we call xs_close() here? */ 628 break; 629 case -EAGAIN: 630 status = xs_nospace(task); 631 break; 632 default: 633 dprintk("RPC: sendmsg returned unrecognized error %d\n", 634 -status); 635 case -ENETUNREACH: 636 case -EPIPE: 637 case -ECONNREFUSED: 638 /* When the server has died, an ICMP port unreachable message 639 * prompts ECONNREFUSED. */ 640 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 641 } 642 out: 643 return status; 644 } 645 646 /** 647 * xs_tcp_shutdown - gracefully shut down a TCP socket 648 * @xprt: transport 649 * 650 * Initiates a graceful shutdown of the TCP socket by calling the 651 * equivalent of shutdown(SHUT_WR); 652 */ 653 static void xs_tcp_shutdown(struct rpc_xprt *xprt) 654 { 655 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 656 struct socket *sock = transport->sock; 657 658 if (sock != NULL) 659 kernel_sock_shutdown(sock, SHUT_WR); 660 } 661 662 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf) 663 { 664 u32 reclen = buf->len - sizeof(rpc_fraghdr); 665 rpc_fraghdr *base = buf->head[0].iov_base; 666 *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen); 667 } 668 669 /** 670 * xs_tcp_send_request - write an RPC request to a TCP socket 671 * @task: address of RPC task that manages the state of an RPC request 672 * 673 * Return values: 674 * 0: The request has been sent 675 * EAGAIN: The socket was blocked, please call again later to 676 * complete the request 677 * ENOTCONN: Caller needs to invoke connect logic then call again 678 * other: Some other error occured, the request was not sent 679 * 680 * XXX: In the case of soft timeouts, should we eventually give up 681 * if sendmsg is not able to make progress? 682 */ 683 static int xs_tcp_send_request(struct rpc_task *task) 684 { 685 struct rpc_rqst *req = task->tk_rqstp; 686 struct rpc_xprt *xprt = req->rq_xprt; 687 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 688 struct xdr_buf *xdr = &req->rq_snd_buf; 689 int status; 690 691 xs_encode_tcp_record_marker(&req->rq_snd_buf); 692 693 xs_pktdump("packet data:", 694 req->rq_svec->iov_base, 695 req->rq_svec->iov_len); 696 697 /* Continue transmitting the packet/record. We must be careful 698 * to cope with writespace callbacks arriving _after_ we have 699 * called sendmsg(). */ 700 while (1) { 701 status = xs_sendpages(transport->sock, 702 NULL, 0, xdr, req->rq_bytes_sent); 703 704 dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 705 xdr->len - req->rq_bytes_sent, status); 706 707 if (unlikely(status < 0)) 708 break; 709 710 /* If we've sent the entire packet, immediately 711 * reset the count of bytes sent. */ 712 req->rq_bytes_sent += status; 713 task->tk_bytes_sent += status; 714 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 715 req->rq_bytes_sent = 0; 716 return 0; 717 } 718 719 if (status != 0) 720 continue; 721 status = -EAGAIN; 722 break; 723 } 724 if (!transport->sock) 725 goto out; 726 727 switch (status) { 728 case -ENOTSOCK: 729 status = -ENOTCONN; 730 /* Should we call xs_close() here? */ 731 break; 732 case -EAGAIN: 733 status = xs_nospace(task); 734 break; 735 default: 736 dprintk("RPC: sendmsg returned unrecognized error %d\n", 737 -status); 738 case -ECONNRESET: 739 case -EPIPE: 740 xs_tcp_shutdown(xprt); 741 case -ECONNREFUSED: 742 case -ENOTCONN: 743 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 744 } 745 out: 746 return status; 747 } 748 749 /** 750 * xs_tcp_release_xprt - clean up after a tcp transmission 751 * @xprt: transport 752 * @task: rpc task 753 * 754 * This cleans up if an error causes us to abort the transmission of a request. 755 * In this case, the socket may need to be reset in order to avoid confusing 756 * the server. 757 */ 758 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 759 { 760 struct rpc_rqst *req; 761 762 if (task != xprt->snd_task) 763 return; 764 if (task == NULL) 765 goto out_release; 766 req = task->tk_rqstp; 767 if (req->rq_bytes_sent == 0) 768 goto out_release; 769 if (req->rq_bytes_sent == req->rq_snd_buf.len) 770 goto out_release; 771 set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state); 772 out_release: 773 xprt_release_xprt(xprt, task); 774 } 775 776 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) 777 { 778 transport->old_data_ready = sk->sk_data_ready; 779 transport->old_state_change = sk->sk_state_change; 780 transport->old_write_space = sk->sk_write_space; 781 transport->old_error_report = sk->sk_error_report; 782 } 783 784 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) 785 { 786 sk->sk_data_ready = transport->old_data_ready; 787 sk->sk_state_change = transport->old_state_change; 788 sk->sk_write_space = transport->old_write_space; 789 sk->sk_error_report = transport->old_error_report; 790 } 791 792 static void xs_reset_transport(struct sock_xprt *transport) 793 { 794 struct socket *sock = transport->sock; 795 struct sock *sk = transport->inet; 796 797 if (sk == NULL) 798 return; 799 800 write_lock_bh(&sk->sk_callback_lock); 801 transport->inet = NULL; 802 transport->sock = NULL; 803 804 sk->sk_user_data = NULL; 805 806 xs_restore_old_callbacks(transport, sk); 807 write_unlock_bh(&sk->sk_callback_lock); 808 809 sk->sk_no_check = 0; 810 811 sock_release(sock); 812 } 813 814 /** 815 * xs_close - close a socket 816 * @xprt: transport 817 * 818 * This is used when all requests are complete; ie, no DRC state remains 819 * on the server we want to save. 820 * 821 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with 822 * xs_reset_transport() zeroing the socket from underneath a writer. 823 */ 824 static void xs_close(struct rpc_xprt *xprt) 825 { 826 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 827 828 dprintk("RPC: xs_close xprt %p\n", xprt); 829 830 xs_reset_transport(transport); 831 832 smp_mb__before_clear_bit(); 833 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 834 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 835 clear_bit(XPRT_CLOSING, &xprt->state); 836 smp_mb__after_clear_bit(); 837 xprt_disconnect_done(xprt); 838 } 839 840 static void xs_tcp_close(struct rpc_xprt *xprt) 841 { 842 if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) 843 xs_close(xprt); 844 else 845 xs_tcp_shutdown(xprt); 846 } 847 848 /** 849 * xs_destroy - prepare to shutdown a transport 850 * @xprt: doomed transport 851 * 852 */ 853 static void xs_destroy(struct rpc_xprt *xprt) 854 { 855 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 856 857 dprintk("RPC: xs_destroy xprt %p\n", xprt); 858 859 cancel_rearming_delayed_work(&transport->connect_worker); 860 861 xs_close(xprt); 862 xs_free_peer_addresses(xprt); 863 kfree(xprt->slot); 864 kfree(xprt); 865 module_put(THIS_MODULE); 866 } 867 868 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) 869 { 870 return (struct rpc_xprt *) sk->sk_user_data; 871 } 872 873 /** 874 * xs_udp_data_ready - "data ready" callback for UDP sockets 875 * @sk: socket with data to read 876 * @len: how much data to read 877 * 878 */ 879 static void xs_udp_data_ready(struct sock *sk, int len) 880 { 881 struct rpc_task *task; 882 struct rpc_xprt *xprt; 883 struct rpc_rqst *rovr; 884 struct sk_buff *skb; 885 int err, repsize, copied; 886 u32 _xid; 887 __be32 *xp; 888 889 read_lock(&sk->sk_callback_lock); 890 dprintk("RPC: xs_udp_data_ready...\n"); 891 if (!(xprt = xprt_from_sock(sk))) 892 goto out; 893 894 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 895 goto out; 896 897 if (xprt->shutdown) 898 goto dropit; 899 900 repsize = skb->len - sizeof(struct udphdr); 901 if (repsize < 4) { 902 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 903 goto dropit; 904 } 905 906 /* Copy the XID from the skb... */ 907 xp = skb_header_pointer(skb, sizeof(struct udphdr), 908 sizeof(_xid), &_xid); 909 if (xp == NULL) 910 goto dropit; 911 912 /* Look up and lock the request corresponding to the given XID */ 913 spin_lock(&xprt->transport_lock); 914 rovr = xprt_lookup_rqst(xprt, *xp); 915 if (!rovr) 916 goto out_unlock; 917 task = rovr->rq_task; 918 919 if ((copied = rovr->rq_private_buf.buflen) > repsize) 920 copied = repsize; 921 922 /* Suck it into the iovec, verify checksum if not done by hw. */ 923 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { 924 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS); 925 goto out_unlock; 926 } 927 928 UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS); 929 930 /* Something worked... */ 931 dst_confirm(skb_dst(skb)); 932 933 xprt_adjust_cwnd(task, copied); 934 xprt_update_rtt(task); 935 xprt_complete_rqst(task, copied); 936 937 out_unlock: 938 spin_unlock(&xprt->transport_lock); 939 dropit: 940 skb_free_datagram(sk, skb); 941 out: 942 read_unlock(&sk->sk_callback_lock); 943 } 944 945 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) 946 { 947 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 948 size_t len, used; 949 char *p; 950 951 p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset; 952 len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset; 953 used = xdr_skb_read_bits(desc, p, len); 954 transport->tcp_offset += used; 955 if (used != len) 956 return; 957 958 transport->tcp_reclen = ntohl(transport->tcp_fraghdr); 959 if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) 960 transport->tcp_flags |= TCP_RCV_LAST_FRAG; 961 else 962 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG; 963 transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; 964 965 transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR; 966 transport->tcp_offset = 0; 967 968 /* Sanity check of the record length */ 969 if (unlikely(transport->tcp_reclen < 8)) { 970 dprintk("RPC: invalid TCP record fragment length\n"); 971 xprt_force_disconnect(xprt); 972 return; 973 } 974 dprintk("RPC: reading TCP record fragment of length %d\n", 975 transport->tcp_reclen); 976 } 977 978 static void xs_tcp_check_fraghdr(struct sock_xprt *transport) 979 { 980 if (transport->tcp_offset == transport->tcp_reclen) { 981 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR; 982 transport->tcp_offset = 0; 983 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) { 984 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 985 transport->tcp_flags |= TCP_RCV_COPY_XID; 986 transport->tcp_copied = 0; 987 } 988 } 989 } 990 991 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc) 992 { 993 size_t len, used; 994 char *p; 995 996 len = sizeof(transport->tcp_xid) - transport->tcp_offset; 997 dprintk("RPC: reading XID (%Zu bytes)\n", len); 998 p = ((char *) &transport->tcp_xid) + transport->tcp_offset; 999 used = xdr_skb_read_bits(desc, p, len); 1000 transport->tcp_offset += used; 1001 if (used != len) 1002 return; 1003 transport->tcp_flags &= ~TCP_RCV_COPY_XID; 1004 transport->tcp_flags |= TCP_RCV_READ_CALLDIR; 1005 transport->tcp_copied = 4; 1006 dprintk("RPC: reading %s XID %08x\n", 1007 (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" 1008 : "request with", 1009 ntohl(transport->tcp_xid)); 1010 xs_tcp_check_fraghdr(transport); 1011 } 1012 1013 static inline void xs_tcp_read_calldir(struct sock_xprt *transport, 1014 struct xdr_skb_reader *desc) 1015 { 1016 size_t len, used; 1017 u32 offset; 1018 __be32 calldir; 1019 1020 /* 1021 * We want transport->tcp_offset to be 8 at the end of this routine 1022 * (4 bytes for the xid and 4 bytes for the call/reply flag). 1023 * When this function is called for the first time, 1024 * transport->tcp_offset is 4 (after having already read the xid). 1025 */ 1026 offset = transport->tcp_offset - sizeof(transport->tcp_xid); 1027 len = sizeof(calldir) - offset; 1028 dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len); 1029 used = xdr_skb_read_bits(desc, &calldir, len); 1030 transport->tcp_offset += used; 1031 if (used != len) 1032 return; 1033 transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; 1034 transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; 1035 transport->tcp_flags |= TCP_RCV_COPY_DATA; 1036 /* 1037 * We don't yet have the XDR buffer, so we will write the calldir 1038 * out after we get the buffer from the 'struct rpc_rqst' 1039 */ 1040 if (ntohl(calldir) == RPC_REPLY) 1041 transport->tcp_flags |= TCP_RPC_REPLY; 1042 else 1043 transport->tcp_flags &= ~TCP_RPC_REPLY; 1044 dprintk("RPC: reading %s CALL/REPLY flag %08x\n", 1045 (transport->tcp_flags & TCP_RPC_REPLY) ? 1046 "reply for" : "request with", calldir); 1047 xs_tcp_check_fraghdr(transport); 1048 } 1049 1050 static inline void xs_tcp_read_common(struct rpc_xprt *xprt, 1051 struct xdr_skb_reader *desc, 1052 struct rpc_rqst *req) 1053 { 1054 struct sock_xprt *transport = 1055 container_of(xprt, struct sock_xprt, xprt); 1056 struct xdr_buf *rcvbuf; 1057 size_t len; 1058 ssize_t r; 1059 1060 rcvbuf = &req->rq_private_buf; 1061 1062 if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { 1063 /* 1064 * Save the RPC direction in the XDR buffer 1065 */ 1066 __be32 calldir = transport->tcp_flags & TCP_RPC_REPLY ? 1067 htonl(RPC_REPLY) : 0; 1068 1069 memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, 1070 &calldir, sizeof(calldir)); 1071 transport->tcp_copied += sizeof(calldir); 1072 transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; 1073 } 1074 1075 len = desc->count; 1076 if (len > transport->tcp_reclen - transport->tcp_offset) { 1077 struct xdr_skb_reader my_desc; 1078 1079 len = transport->tcp_reclen - transport->tcp_offset; 1080 memcpy(&my_desc, desc, sizeof(my_desc)); 1081 my_desc.count = len; 1082 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, 1083 &my_desc, xdr_skb_read_bits); 1084 desc->count -= r; 1085 desc->offset += r; 1086 } else 1087 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, 1088 desc, xdr_skb_read_bits); 1089 1090 if (r > 0) { 1091 transport->tcp_copied += r; 1092 transport->tcp_offset += r; 1093 } 1094 if (r != len) { 1095 /* Error when copying to the receive buffer, 1096 * usually because we weren't able to allocate 1097 * additional buffer pages. All we can do now 1098 * is turn off TCP_RCV_COPY_DATA, so the request 1099 * will not receive any additional updates, 1100 * and time out. 1101 * Any remaining data from this record will 1102 * be discarded. 1103 */ 1104 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1105 dprintk("RPC: XID %08x truncated request\n", 1106 ntohl(transport->tcp_xid)); 1107 dprintk("RPC: xprt = %p, tcp_copied = %lu, " 1108 "tcp_offset = %u, tcp_reclen = %u\n", 1109 xprt, transport->tcp_copied, 1110 transport->tcp_offset, transport->tcp_reclen); 1111 return; 1112 } 1113 1114 dprintk("RPC: XID %08x read %Zd bytes\n", 1115 ntohl(transport->tcp_xid), r); 1116 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " 1117 "tcp_reclen = %u\n", xprt, transport->tcp_copied, 1118 transport->tcp_offset, transport->tcp_reclen); 1119 1120 if (transport->tcp_copied == req->rq_private_buf.buflen) 1121 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1122 else if (transport->tcp_offset == transport->tcp_reclen) { 1123 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) 1124 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1125 } 1126 1127 return; 1128 } 1129 1130 /* 1131 * Finds the request corresponding to the RPC xid and invokes the common 1132 * tcp read code to read the data. 1133 */ 1134 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, 1135 struct xdr_skb_reader *desc) 1136 { 1137 struct sock_xprt *transport = 1138 container_of(xprt, struct sock_xprt, xprt); 1139 struct rpc_rqst *req; 1140 1141 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1142 1143 /* Find and lock the request corresponding to this xid */ 1144 spin_lock(&xprt->transport_lock); 1145 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1146 if (!req) { 1147 dprintk("RPC: XID %08x request not found!\n", 1148 ntohl(transport->tcp_xid)); 1149 spin_unlock(&xprt->transport_lock); 1150 return -1; 1151 } 1152 1153 xs_tcp_read_common(xprt, desc, req); 1154 1155 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1156 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1157 1158 spin_unlock(&xprt->transport_lock); 1159 return 0; 1160 } 1161 1162 #if defined(CONFIG_NFS_V4_1) 1163 /* 1164 * Obtains an rpc_rqst previously allocated and invokes the common 1165 * tcp read code to read the data. The result is placed in the callback 1166 * queue. 1167 * If we're unable to obtain the rpc_rqst we schedule the closing of the 1168 * connection and return -1. 1169 */ 1170 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, 1171 struct xdr_skb_reader *desc) 1172 { 1173 struct sock_xprt *transport = 1174 container_of(xprt, struct sock_xprt, xprt); 1175 struct rpc_rqst *req; 1176 1177 req = xprt_alloc_bc_request(xprt); 1178 if (req == NULL) { 1179 printk(KERN_WARNING "Callback slot table overflowed\n"); 1180 xprt_force_disconnect(xprt); 1181 return -1; 1182 } 1183 1184 req->rq_xid = transport->tcp_xid; 1185 dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); 1186 xs_tcp_read_common(xprt, desc, req); 1187 1188 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { 1189 struct svc_serv *bc_serv = xprt->bc_serv; 1190 1191 /* 1192 * Add callback request to callback list. The callback 1193 * service sleeps on the sv_cb_waitq waiting for new 1194 * requests. Wake it up after adding enqueing the 1195 * request. 1196 */ 1197 dprintk("RPC: add callback request to list\n"); 1198 spin_lock(&bc_serv->sv_cb_lock); 1199 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 1200 spin_unlock(&bc_serv->sv_cb_lock); 1201 wake_up(&bc_serv->sv_cb_waitq); 1202 } 1203 1204 req->rq_private_buf.len = transport->tcp_copied; 1205 1206 return 0; 1207 } 1208 1209 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1210 struct xdr_skb_reader *desc) 1211 { 1212 struct sock_xprt *transport = 1213 container_of(xprt, struct sock_xprt, xprt); 1214 1215 return (transport->tcp_flags & TCP_RPC_REPLY) ? 1216 xs_tcp_read_reply(xprt, desc) : 1217 xs_tcp_read_callback(xprt, desc); 1218 } 1219 #else 1220 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1221 struct xdr_skb_reader *desc) 1222 { 1223 return xs_tcp_read_reply(xprt, desc); 1224 } 1225 #endif /* CONFIG_NFS_V4_1 */ 1226 1227 /* 1228 * Read data off the transport. This can be either an RPC_CALL or an 1229 * RPC_REPLY. Relay the processing to helper functions. 1230 */ 1231 static void xs_tcp_read_data(struct rpc_xprt *xprt, 1232 struct xdr_skb_reader *desc) 1233 { 1234 struct sock_xprt *transport = 1235 container_of(xprt, struct sock_xprt, xprt); 1236 1237 if (_xs_tcp_read_data(xprt, desc) == 0) 1238 xs_tcp_check_fraghdr(transport); 1239 else { 1240 /* 1241 * The transport_lock protects the request handling. 1242 * There's no need to hold it to update the tcp_flags. 1243 */ 1244 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1245 } 1246 } 1247 1248 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1249 { 1250 size_t len; 1251 1252 len = transport->tcp_reclen - transport->tcp_offset; 1253 if (len > desc->count) 1254 len = desc->count; 1255 desc->count -= len; 1256 desc->offset += len; 1257 transport->tcp_offset += len; 1258 dprintk("RPC: discarded %Zu bytes\n", len); 1259 xs_tcp_check_fraghdr(transport); 1260 } 1261 1262 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) 1263 { 1264 struct rpc_xprt *xprt = rd_desc->arg.data; 1265 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1266 struct xdr_skb_reader desc = { 1267 .skb = skb, 1268 .offset = offset, 1269 .count = len, 1270 }; 1271 1272 dprintk("RPC: xs_tcp_data_recv started\n"); 1273 do { 1274 /* Read in a new fragment marker if necessary */ 1275 /* Can we ever really expect to get completely empty fragments? */ 1276 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { 1277 xs_tcp_read_fraghdr(xprt, &desc); 1278 continue; 1279 } 1280 /* Read in the xid if necessary */ 1281 if (transport->tcp_flags & TCP_RCV_COPY_XID) { 1282 xs_tcp_read_xid(transport, &desc); 1283 continue; 1284 } 1285 /* Read in the call/reply flag */ 1286 if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { 1287 xs_tcp_read_calldir(transport, &desc); 1288 continue; 1289 } 1290 /* Read in the request data */ 1291 if (transport->tcp_flags & TCP_RCV_COPY_DATA) { 1292 xs_tcp_read_data(xprt, &desc); 1293 continue; 1294 } 1295 /* Skip over any trailing bytes on short reads */ 1296 xs_tcp_read_discard(transport, &desc); 1297 } while (desc.count); 1298 dprintk("RPC: xs_tcp_data_recv done\n"); 1299 return len - desc.count; 1300 } 1301 1302 /** 1303 * xs_tcp_data_ready - "data ready" callback for TCP sockets 1304 * @sk: socket with data to read 1305 * @bytes: how much data to read 1306 * 1307 */ 1308 static void xs_tcp_data_ready(struct sock *sk, int bytes) 1309 { 1310 struct rpc_xprt *xprt; 1311 read_descriptor_t rd_desc; 1312 int read; 1313 1314 dprintk("RPC: xs_tcp_data_ready...\n"); 1315 1316 read_lock(&sk->sk_callback_lock); 1317 if (!(xprt = xprt_from_sock(sk))) 1318 goto out; 1319 if (xprt->shutdown) 1320 goto out; 1321 1322 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ 1323 rd_desc.arg.data = xprt; 1324 do { 1325 rd_desc.count = 65536; 1326 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); 1327 } while (read > 0); 1328 out: 1329 read_unlock(&sk->sk_callback_lock); 1330 } 1331 1332 /* 1333 * Do the equivalent of linger/linger2 handling for dealing with 1334 * broken servers that don't close the socket in a timely 1335 * fashion 1336 */ 1337 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt, 1338 unsigned long timeout) 1339 { 1340 struct sock_xprt *transport; 1341 1342 if (xprt_test_and_set_connecting(xprt)) 1343 return; 1344 set_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1345 transport = container_of(xprt, struct sock_xprt, xprt); 1346 queue_delayed_work(rpciod_workqueue, &transport->connect_worker, 1347 timeout); 1348 } 1349 1350 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) 1351 { 1352 struct sock_xprt *transport; 1353 1354 transport = container_of(xprt, struct sock_xprt, xprt); 1355 1356 if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) || 1357 !cancel_delayed_work(&transport->connect_worker)) 1358 return; 1359 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1360 xprt_clear_connecting(xprt); 1361 } 1362 1363 static void xs_sock_mark_closed(struct rpc_xprt *xprt) 1364 { 1365 smp_mb__before_clear_bit(); 1366 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1367 clear_bit(XPRT_CLOSING, &xprt->state); 1368 smp_mb__after_clear_bit(); 1369 /* Mark transport as closed and wake up all pending tasks */ 1370 xprt_disconnect_done(xprt); 1371 } 1372 1373 /** 1374 * xs_tcp_state_change - callback to handle TCP socket state changes 1375 * @sk: socket whose state has changed 1376 * 1377 */ 1378 static void xs_tcp_state_change(struct sock *sk) 1379 { 1380 struct rpc_xprt *xprt; 1381 1382 read_lock(&sk->sk_callback_lock); 1383 if (!(xprt = xprt_from_sock(sk))) 1384 goto out; 1385 dprintk("RPC: xs_tcp_state_change client %p...\n", xprt); 1386 dprintk("RPC: state %x conn %d dead %d zapped %d\n", 1387 sk->sk_state, xprt_connected(xprt), 1388 sock_flag(sk, SOCK_DEAD), 1389 sock_flag(sk, SOCK_ZAPPED)); 1390 1391 switch (sk->sk_state) { 1392 case TCP_ESTABLISHED: 1393 spin_lock_bh(&xprt->transport_lock); 1394 if (!xprt_test_and_set_connected(xprt)) { 1395 struct sock_xprt *transport = container_of(xprt, 1396 struct sock_xprt, xprt); 1397 1398 /* Reset TCP record info */ 1399 transport->tcp_offset = 0; 1400 transport->tcp_reclen = 0; 1401 transport->tcp_copied = 0; 1402 transport->tcp_flags = 1403 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; 1404 1405 xprt_wake_pending_tasks(xprt, -EAGAIN); 1406 } 1407 spin_unlock_bh(&xprt->transport_lock); 1408 break; 1409 case TCP_FIN_WAIT1: 1410 /* The client initiated a shutdown of the socket */ 1411 xprt->connect_cookie++; 1412 xprt->reestablish_timeout = 0; 1413 set_bit(XPRT_CLOSING, &xprt->state); 1414 smp_mb__before_clear_bit(); 1415 clear_bit(XPRT_CONNECTED, &xprt->state); 1416 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1417 smp_mb__after_clear_bit(); 1418 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1419 break; 1420 case TCP_CLOSE_WAIT: 1421 /* The server initiated a shutdown of the socket */ 1422 xprt_force_disconnect(xprt); 1423 case TCP_SYN_SENT: 1424 xprt->connect_cookie++; 1425 case TCP_CLOSING: 1426 /* 1427 * If the server closed down the connection, make sure that 1428 * we back off before reconnecting 1429 */ 1430 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 1431 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1432 break; 1433 case TCP_LAST_ACK: 1434 set_bit(XPRT_CLOSING, &xprt->state); 1435 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1436 smp_mb__before_clear_bit(); 1437 clear_bit(XPRT_CONNECTED, &xprt->state); 1438 smp_mb__after_clear_bit(); 1439 break; 1440 case TCP_CLOSE: 1441 xs_tcp_cancel_linger_timeout(xprt); 1442 xs_sock_mark_closed(xprt); 1443 } 1444 out: 1445 read_unlock(&sk->sk_callback_lock); 1446 } 1447 1448 /** 1449 * xs_error_report - callback mainly for catching socket errors 1450 * @sk: socket 1451 */ 1452 static void xs_error_report(struct sock *sk) 1453 { 1454 struct rpc_xprt *xprt; 1455 1456 read_lock(&sk->sk_callback_lock); 1457 if (!(xprt = xprt_from_sock(sk))) 1458 goto out; 1459 dprintk("RPC: %s client %p...\n" 1460 "RPC: error %d\n", 1461 __func__, xprt, sk->sk_err); 1462 xprt_wake_pending_tasks(xprt, -EAGAIN); 1463 out: 1464 read_unlock(&sk->sk_callback_lock); 1465 } 1466 1467 static void xs_write_space(struct sock *sk) 1468 { 1469 struct socket *sock; 1470 struct rpc_xprt *xprt; 1471 1472 if (unlikely(!(sock = sk->sk_socket))) 1473 return; 1474 clear_bit(SOCK_NOSPACE, &sock->flags); 1475 1476 if (unlikely(!(xprt = xprt_from_sock(sk)))) 1477 return; 1478 if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) 1479 return; 1480 1481 xprt_write_space(xprt); 1482 } 1483 1484 /** 1485 * xs_udp_write_space - callback invoked when socket buffer space 1486 * becomes available 1487 * @sk: socket whose state has changed 1488 * 1489 * Called when more output buffer space is available for this socket. 1490 * We try not to wake our writers until they can make "significant" 1491 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1492 * with a bunch of small requests. 1493 */ 1494 static void xs_udp_write_space(struct sock *sk) 1495 { 1496 read_lock(&sk->sk_callback_lock); 1497 1498 /* from net/core/sock.c:sock_def_write_space */ 1499 if (sock_writeable(sk)) 1500 xs_write_space(sk); 1501 1502 read_unlock(&sk->sk_callback_lock); 1503 } 1504 1505 /** 1506 * xs_tcp_write_space - callback invoked when socket buffer space 1507 * becomes available 1508 * @sk: socket whose state has changed 1509 * 1510 * Called when more output buffer space is available for this socket. 1511 * We try not to wake our writers until they can make "significant" 1512 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1513 * with a bunch of small requests. 1514 */ 1515 static void xs_tcp_write_space(struct sock *sk) 1516 { 1517 read_lock(&sk->sk_callback_lock); 1518 1519 /* from net/core/stream.c:sk_stream_write_space */ 1520 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) 1521 xs_write_space(sk); 1522 1523 read_unlock(&sk->sk_callback_lock); 1524 } 1525 1526 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt) 1527 { 1528 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1529 struct sock *sk = transport->inet; 1530 1531 if (transport->rcvsize) { 1532 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1533 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2; 1534 } 1535 if (transport->sndsize) { 1536 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1537 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2; 1538 sk->sk_write_space(sk); 1539 } 1540 } 1541 1542 /** 1543 * xs_udp_set_buffer_size - set send and receive limits 1544 * @xprt: generic transport 1545 * @sndsize: requested size of send buffer, in bytes 1546 * @rcvsize: requested size of receive buffer, in bytes 1547 * 1548 * Set socket send and receive buffer size limits. 1549 */ 1550 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize) 1551 { 1552 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1553 1554 transport->sndsize = 0; 1555 if (sndsize) 1556 transport->sndsize = sndsize + 1024; 1557 transport->rcvsize = 0; 1558 if (rcvsize) 1559 transport->rcvsize = rcvsize + 1024; 1560 1561 xs_udp_do_set_buffer_size(xprt); 1562 } 1563 1564 /** 1565 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport 1566 * @task: task that timed out 1567 * 1568 * Adjust the congestion window after a retransmit timeout has occurred. 1569 */ 1570 static void xs_udp_timer(struct rpc_task *task) 1571 { 1572 xprt_adjust_cwnd(task, -ETIMEDOUT); 1573 } 1574 1575 static unsigned short xs_get_random_port(void) 1576 { 1577 unsigned short range = xprt_max_resvport - xprt_min_resvport; 1578 unsigned short rand = (unsigned short) net_random() % range; 1579 return rand + xprt_min_resvport; 1580 } 1581 1582 /** 1583 * xs_set_port - reset the port number in the remote endpoint address 1584 * @xprt: generic transport 1585 * @port: new port number 1586 * 1587 */ 1588 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) 1589 { 1590 struct sockaddr *addr = xs_addr(xprt); 1591 1592 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port); 1593 1594 switch (addr->sa_family) { 1595 case AF_INET: 1596 ((struct sockaddr_in *)addr)->sin_port = htons(port); 1597 break; 1598 case AF_INET6: 1599 ((struct sockaddr_in6 *)addr)->sin6_port = htons(port); 1600 break; 1601 default: 1602 BUG(); 1603 } 1604 } 1605 1606 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock) 1607 { 1608 unsigned short port = transport->port; 1609 1610 if (port == 0 && transport->xprt.resvport) 1611 port = xs_get_random_port(); 1612 return port; 1613 } 1614 1615 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port) 1616 { 1617 if (transport->port != 0) 1618 transport->port = 0; 1619 if (!transport->xprt.resvport) 1620 return 0; 1621 if (port <= xprt_min_resvport || port > xprt_max_resvport) 1622 return xprt_max_resvport; 1623 return --port; 1624 } 1625 1626 static int xs_bind4(struct sock_xprt *transport, struct socket *sock) 1627 { 1628 struct sockaddr_in myaddr = { 1629 .sin_family = AF_INET, 1630 }; 1631 struct sockaddr_in *sa; 1632 int err, nloop = 0; 1633 unsigned short port = xs_get_srcport(transport, sock); 1634 unsigned short last; 1635 1636 sa = (struct sockaddr_in *)&transport->addr; 1637 myaddr.sin_addr = sa->sin_addr; 1638 do { 1639 myaddr.sin_port = htons(port); 1640 err = kernel_bind(sock, (struct sockaddr *) &myaddr, 1641 sizeof(myaddr)); 1642 if (port == 0) 1643 break; 1644 if (err == 0) { 1645 transport->port = port; 1646 break; 1647 } 1648 last = port; 1649 port = xs_next_srcport(transport, sock, port); 1650 if (port > last) 1651 nloop++; 1652 } while (err == -EADDRINUSE && nloop != 2); 1653 dprintk("RPC: %s %pI4:%u: %s (%d)\n", 1654 __func__, &myaddr.sin_addr, 1655 port, err ? "failed" : "ok", err); 1656 return err; 1657 } 1658 1659 static int xs_bind6(struct sock_xprt *transport, struct socket *sock) 1660 { 1661 struct sockaddr_in6 myaddr = { 1662 .sin6_family = AF_INET6, 1663 }; 1664 struct sockaddr_in6 *sa; 1665 int err, nloop = 0; 1666 unsigned short port = xs_get_srcport(transport, sock); 1667 unsigned short last; 1668 1669 sa = (struct sockaddr_in6 *)&transport->addr; 1670 myaddr.sin6_addr = sa->sin6_addr; 1671 do { 1672 myaddr.sin6_port = htons(port); 1673 err = kernel_bind(sock, (struct sockaddr *) &myaddr, 1674 sizeof(myaddr)); 1675 if (port == 0) 1676 break; 1677 if (err == 0) { 1678 transport->port = port; 1679 break; 1680 } 1681 last = port; 1682 port = xs_next_srcport(transport, sock, port); 1683 if (port > last) 1684 nloop++; 1685 } while (err == -EADDRINUSE && nloop != 2); 1686 dprintk("RPC: xs_bind6 %pI6:%u: %s (%d)\n", 1687 &myaddr.sin6_addr, port, err ? "failed" : "ok", err); 1688 return err; 1689 } 1690 1691 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1692 static struct lock_class_key xs_key[2]; 1693 static struct lock_class_key xs_slock_key[2]; 1694 1695 static inline void xs_reclassify_socket4(struct socket *sock) 1696 { 1697 struct sock *sk = sock->sk; 1698 1699 BUG_ON(sock_owned_by_user(sk)); 1700 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", 1701 &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]); 1702 } 1703 1704 static inline void xs_reclassify_socket6(struct socket *sock) 1705 { 1706 struct sock *sk = sock->sk; 1707 1708 BUG_ON(sock_owned_by_user(sk)); 1709 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", 1710 &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]); 1711 } 1712 #else 1713 static inline void xs_reclassify_socket4(struct socket *sock) 1714 { 1715 } 1716 1717 static inline void xs_reclassify_socket6(struct socket *sock) 1718 { 1719 } 1720 #endif 1721 1722 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 1723 { 1724 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1725 1726 if (!transport->inet) { 1727 struct sock *sk = sock->sk; 1728 1729 write_lock_bh(&sk->sk_callback_lock); 1730 1731 xs_save_old_callbacks(transport, sk); 1732 1733 sk->sk_user_data = xprt; 1734 sk->sk_data_ready = xs_udp_data_ready; 1735 sk->sk_write_space = xs_udp_write_space; 1736 sk->sk_error_report = xs_error_report; 1737 sk->sk_no_check = UDP_CSUM_NORCV; 1738 sk->sk_allocation = GFP_ATOMIC; 1739 1740 xprt_set_connected(xprt); 1741 1742 /* Reset to new socket */ 1743 transport->sock = sock; 1744 transport->inet = sk; 1745 1746 write_unlock_bh(&sk->sk_callback_lock); 1747 } 1748 xs_udp_do_set_buffer_size(xprt); 1749 } 1750 1751 /** 1752 * xs_udp_connect_worker4 - set up a UDP socket 1753 * @work: RPC transport to connect 1754 * 1755 * Invoked by a work queue tasklet. 1756 */ 1757 static void xs_udp_connect_worker4(struct work_struct *work) 1758 { 1759 struct sock_xprt *transport = 1760 container_of(work, struct sock_xprt, connect_worker.work); 1761 struct rpc_xprt *xprt = &transport->xprt; 1762 struct socket *sock = transport->sock; 1763 int err, status = -EIO; 1764 1765 if (xprt->shutdown) 1766 goto out; 1767 1768 /* Start by resetting any existing state */ 1769 xs_reset_transport(transport); 1770 1771 err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock); 1772 if (err < 0) { 1773 dprintk("RPC: can't create UDP transport socket (%d).\n", -err); 1774 goto out; 1775 } 1776 xs_reclassify_socket4(sock); 1777 1778 if (xs_bind4(transport, sock)) { 1779 sock_release(sock); 1780 goto out; 1781 } 1782 1783 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1784 xprt, xprt->address_strings[RPC_DISPLAY_ALL]); 1785 1786 xs_udp_finish_connecting(xprt, sock); 1787 status = 0; 1788 out: 1789 xprt_clear_connecting(xprt); 1790 xprt_wake_pending_tasks(xprt, status); 1791 } 1792 1793 /** 1794 * xs_udp_connect_worker6 - set up a UDP socket 1795 * @work: RPC transport to connect 1796 * 1797 * Invoked by a work queue tasklet. 1798 */ 1799 static void xs_udp_connect_worker6(struct work_struct *work) 1800 { 1801 struct sock_xprt *transport = 1802 container_of(work, struct sock_xprt, connect_worker.work); 1803 struct rpc_xprt *xprt = &transport->xprt; 1804 struct socket *sock = transport->sock; 1805 int err, status = -EIO; 1806 1807 if (xprt->shutdown) 1808 goto out; 1809 1810 /* Start by resetting any existing state */ 1811 xs_reset_transport(transport); 1812 1813 err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock); 1814 if (err < 0) { 1815 dprintk("RPC: can't create UDP transport socket (%d).\n", -err); 1816 goto out; 1817 } 1818 xs_reclassify_socket6(sock); 1819 1820 if (xs_bind6(transport, sock) < 0) { 1821 sock_release(sock); 1822 goto out; 1823 } 1824 1825 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1826 xprt, xprt->address_strings[RPC_DISPLAY_ALL]); 1827 1828 xs_udp_finish_connecting(xprt, sock); 1829 status = 0; 1830 out: 1831 xprt_clear_connecting(xprt); 1832 xprt_wake_pending_tasks(xprt, status); 1833 } 1834 1835 /* 1836 * We need to preserve the port number so the reply cache on the server can 1837 * find our cached RPC replies when we get around to reconnecting. 1838 */ 1839 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport) 1840 { 1841 int result; 1842 struct sockaddr any; 1843 1844 dprintk("RPC: disconnecting xprt %p to reuse port\n", xprt); 1845 1846 /* 1847 * Disconnect the transport socket by doing a connect operation 1848 * with AF_UNSPEC. This should return immediately... 1849 */ 1850 memset(&any, 0, sizeof(any)); 1851 any.sa_family = AF_UNSPEC; 1852 result = kernel_connect(transport->sock, &any, sizeof(any), 0); 1853 if (!result) 1854 xs_sock_mark_closed(xprt); 1855 else 1856 dprintk("RPC: AF_UNSPEC connect return code %d\n", 1857 result); 1858 } 1859 1860 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport) 1861 { 1862 unsigned int state = transport->inet->sk_state; 1863 1864 if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) 1865 return; 1866 if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) 1867 return; 1868 xs_abort_connection(xprt, transport); 1869 } 1870 1871 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 1872 { 1873 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1874 1875 if (!transport->inet) { 1876 struct sock *sk = sock->sk; 1877 1878 write_lock_bh(&sk->sk_callback_lock); 1879 1880 xs_save_old_callbacks(transport, sk); 1881 1882 sk->sk_user_data = xprt; 1883 sk->sk_data_ready = xs_tcp_data_ready; 1884 sk->sk_state_change = xs_tcp_state_change; 1885 sk->sk_write_space = xs_tcp_write_space; 1886 sk->sk_error_report = xs_error_report; 1887 sk->sk_allocation = GFP_ATOMIC; 1888 1889 /* socket options */ 1890 sk->sk_userlocks |= SOCK_BINDPORT_LOCK; 1891 sock_reset_flag(sk, SOCK_LINGER); 1892 tcp_sk(sk)->linger2 = 0; 1893 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; 1894 1895 xprt_clear_connected(xprt); 1896 1897 /* Reset to new socket */ 1898 transport->sock = sock; 1899 transport->inet = sk; 1900 1901 write_unlock_bh(&sk->sk_callback_lock); 1902 } 1903 1904 if (!xprt_bound(xprt)) 1905 return -ENOTCONN; 1906 1907 /* Tell the socket layer to start connecting... */ 1908 xprt->stat.connect_count++; 1909 xprt->stat.connect_start = jiffies; 1910 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); 1911 } 1912 1913 /** 1914 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint 1915 * @xprt: RPC transport to connect 1916 * @transport: socket transport to connect 1917 * @create_sock: function to create a socket of the correct type 1918 * 1919 * Invoked by a work queue tasklet. 1920 */ 1921 static void xs_tcp_setup_socket(struct rpc_xprt *xprt, 1922 struct sock_xprt *transport, 1923 struct socket *(*create_sock)(struct rpc_xprt *, 1924 struct sock_xprt *)) 1925 { 1926 struct socket *sock = transport->sock; 1927 int status = -EIO; 1928 1929 if (xprt->shutdown) 1930 goto out; 1931 1932 if (!sock) { 1933 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1934 sock = create_sock(xprt, transport); 1935 if (IS_ERR(sock)) { 1936 status = PTR_ERR(sock); 1937 goto out; 1938 } 1939 } else { 1940 int abort_and_exit; 1941 1942 abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT, 1943 &xprt->state); 1944 /* "close" the socket, preserving the local port */ 1945 xs_tcp_reuse_connection(xprt, transport); 1946 1947 if (abort_and_exit) 1948 goto out_eagain; 1949 } 1950 1951 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1952 xprt, xprt->address_strings[RPC_DISPLAY_ALL]); 1953 1954 status = xs_tcp_finish_connecting(xprt, sock); 1955 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 1956 xprt, -status, xprt_connected(xprt), 1957 sock->sk->sk_state); 1958 switch (status) { 1959 default: 1960 printk("%s: connect returned unhandled error %d\n", 1961 __func__, status); 1962 case -EADDRNOTAVAIL: 1963 /* We're probably in TIME_WAIT. Get rid of existing socket, 1964 * and retry 1965 */ 1966 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 1967 xprt_force_disconnect(xprt); 1968 break; 1969 case -ECONNREFUSED: 1970 case -ECONNRESET: 1971 case -ENETUNREACH: 1972 /* retry with existing socket, after a delay */ 1973 case 0: 1974 case -EINPROGRESS: 1975 case -EALREADY: 1976 xprt_clear_connecting(xprt); 1977 return; 1978 } 1979 out_eagain: 1980 status = -EAGAIN; 1981 out: 1982 xprt_clear_connecting(xprt); 1983 xprt_wake_pending_tasks(xprt, status); 1984 } 1985 1986 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt, 1987 struct sock_xprt *transport) 1988 { 1989 struct socket *sock; 1990 int err; 1991 1992 /* start from scratch */ 1993 err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); 1994 if (err < 0) { 1995 dprintk("RPC: can't create TCP transport socket (%d).\n", 1996 -err); 1997 goto out_err; 1998 } 1999 xs_reclassify_socket4(sock); 2000 2001 if (xs_bind4(transport, sock) < 0) { 2002 sock_release(sock); 2003 goto out_err; 2004 } 2005 return sock; 2006 out_err: 2007 return ERR_PTR(-EIO); 2008 } 2009 2010 /** 2011 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint 2012 * @work: RPC transport to connect 2013 * 2014 * Invoked by a work queue tasklet. 2015 */ 2016 static void xs_tcp_connect_worker4(struct work_struct *work) 2017 { 2018 struct sock_xprt *transport = 2019 container_of(work, struct sock_xprt, connect_worker.work); 2020 struct rpc_xprt *xprt = &transport->xprt; 2021 2022 xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4); 2023 } 2024 2025 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt, 2026 struct sock_xprt *transport) 2027 { 2028 struct socket *sock; 2029 int err; 2030 2031 /* start from scratch */ 2032 err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock); 2033 if (err < 0) { 2034 dprintk("RPC: can't create TCP transport socket (%d).\n", 2035 -err); 2036 goto out_err; 2037 } 2038 xs_reclassify_socket6(sock); 2039 2040 if (xs_bind6(transport, sock) < 0) { 2041 sock_release(sock); 2042 goto out_err; 2043 } 2044 return sock; 2045 out_err: 2046 return ERR_PTR(-EIO); 2047 } 2048 2049 /** 2050 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint 2051 * @work: RPC transport to connect 2052 * 2053 * Invoked by a work queue tasklet. 2054 */ 2055 static void xs_tcp_connect_worker6(struct work_struct *work) 2056 { 2057 struct sock_xprt *transport = 2058 container_of(work, struct sock_xprt, connect_worker.work); 2059 struct rpc_xprt *xprt = &transport->xprt; 2060 2061 xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6); 2062 } 2063 2064 /** 2065 * xs_connect - connect a socket to a remote endpoint 2066 * @task: address of RPC task that manages state of connect request 2067 * 2068 * TCP: If the remote end dropped the connection, delay reconnecting. 2069 * 2070 * UDP socket connects are synchronous, but we use a work queue anyway 2071 * to guarantee that even unprivileged user processes can set up a 2072 * socket on a privileged port. 2073 * 2074 * If a UDP socket connect fails, the delay behavior here prevents 2075 * retry floods (hard mounts). 2076 */ 2077 static void xs_connect(struct rpc_task *task) 2078 { 2079 struct rpc_xprt *xprt = task->tk_xprt; 2080 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2081 2082 if (xprt_test_and_set_connecting(xprt)) 2083 return; 2084 2085 if (transport->sock != NULL) { 2086 dprintk("RPC: xs_connect delayed xprt %p for %lu " 2087 "seconds\n", 2088 xprt, xprt->reestablish_timeout / HZ); 2089 queue_delayed_work(rpciod_workqueue, 2090 &transport->connect_worker, 2091 xprt->reestablish_timeout); 2092 xprt->reestablish_timeout <<= 1; 2093 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO) 2094 xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; 2095 } else { 2096 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 2097 queue_delayed_work(rpciod_workqueue, 2098 &transport->connect_worker, 0); 2099 } 2100 } 2101 2102 static void xs_tcp_connect(struct rpc_task *task) 2103 { 2104 struct rpc_xprt *xprt = task->tk_xprt; 2105 2106 /* Exit if we need to wait for socket shutdown to complete */ 2107 if (test_bit(XPRT_CLOSING, &xprt->state)) 2108 return; 2109 xs_connect(task); 2110 } 2111 2112 /** 2113 * xs_udp_print_stats - display UDP socket-specifc stats 2114 * @xprt: rpc_xprt struct containing statistics 2115 * @seq: output file 2116 * 2117 */ 2118 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2119 { 2120 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2121 2122 seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n", 2123 transport->port, 2124 xprt->stat.bind_count, 2125 xprt->stat.sends, 2126 xprt->stat.recvs, 2127 xprt->stat.bad_xids, 2128 xprt->stat.req_u, 2129 xprt->stat.bklog_u); 2130 } 2131 2132 /** 2133 * xs_tcp_print_stats - display TCP socket-specifc stats 2134 * @xprt: rpc_xprt struct containing statistics 2135 * @seq: output file 2136 * 2137 */ 2138 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2139 { 2140 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2141 long idle_time = 0; 2142 2143 if (xprt_connected(xprt)) 2144 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2145 2146 seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n", 2147 transport->port, 2148 xprt->stat.bind_count, 2149 xprt->stat.connect_count, 2150 xprt->stat.connect_time, 2151 idle_time, 2152 xprt->stat.sends, 2153 xprt->stat.recvs, 2154 xprt->stat.bad_xids, 2155 xprt->stat.req_u, 2156 xprt->stat.bklog_u); 2157 } 2158 2159 static struct rpc_xprt_ops xs_udp_ops = { 2160 .set_buffer_size = xs_udp_set_buffer_size, 2161 .reserve_xprt = xprt_reserve_xprt_cong, 2162 .release_xprt = xprt_release_xprt_cong, 2163 .rpcbind = rpcb_getport_async, 2164 .set_port = xs_set_port, 2165 .connect = xs_connect, 2166 .buf_alloc = rpc_malloc, 2167 .buf_free = rpc_free, 2168 .send_request = xs_udp_send_request, 2169 .set_retrans_timeout = xprt_set_retrans_timeout_rtt, 2170 .timer = xs_udp_timer, 2171 .release_request = xprt_release_rqst_cong, 2172 .close = xs_close, 2173 .destroy = xs_destroy, 2174 .print_stats = xs_udp_print_stats, 2175 }; 2176 2177 static struct rpc_xprt_ops xs_tcp_ops = { 2178 .reserve_xprt = xprt_reserve_xprt, 2179 .release_xprt = xs_tcp_release_xprt, 2180 .rpcbind = rpcb_getport_async, 2181 .set_port = xs_set_port, 2182 .connect = xs_tcp_connect, 2183 .buf_alloc = rpc_malloc, 2184 .buf_free = rpc_free, 2185 .send_request = xs_tcp_send_request, 2186 .set_retrans_timeout = xprt_set_retrans_timeout_def, 2187 #if defined(CONFIG_NFS_V4_1) 2188 .release_request = bc_release_request, 2189 #endif /* CONFIG_NFS_V4_1 */ 2190 .close = xs_tcp_close, 2191 .destroy = xs_destroy, 2192 .print_stats = xs_tcp_print_stats, 2193 }; 2194 2195 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, 2196 unsigned int slot_table_size) 2197 { 2198 struct rpc_xprt *xprt; 2199 struct sock_xprt *new; 2200 2201 if (args->addrlen > sizeof(xprt->addr)) { 2202 dprintk("RPC: xs_setup_xprt: address too large\n"); 2203 return ERR_PTR(-EBADF); 2204 } 2205 2206 new = kzalloc(sizeof(*new), GFP_KERNEL); 2207 if (new == NULL) { 2208 dprintk("RPC: xs_setup_xprt: couldn't allocate " 2209 "rpc_xprt\n"); 2210 return ERR_PTR(-ENOMEM); 2211 } 2212 xprt = &new->xprt; 2213 2214 xprt->max_reqs = slot_table_size; 2215 xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL); 2216 if (xprt->slot == NULL) { 2217 kfree(xprt); 2218 dprintk("RPC: xs_setup_xprt: couldn't allocate slot " 2219 "table\n"); 2220 return ERR_PTR(-ENOMEM); 2221 } 2222 2223 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 2224 xprt->addrlen = args->addrlen; 2225 if (args->srcaddr) 2226 memcpy(&new->addr, args->srcaddr, args->addrlen); 2227 2228 return xprt; 2229 } 2230 2231 static const struct rpc_timeout xs_udp_default_timeout = { 2232 .to_initval = 5 * HZ, 2233 .to_maxval = 30 * HZ, 2234 .to_increment = 5 * HZ, 2235 .to_retries = 5, 2236 }; 2237 2238 /** 2239 * xs_setup_udp - Set up transport to use a UDP socket 2240 * @args: rpc transport creation arguments 2241 * 2242 */ 2243 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) 2244 { 2245 struct sockaddr *addr = args->dstaddr; 2246 struct rpc_xprt *xprt; 2247 struct sock_xprt *transport; 2248 2249 xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries); 2250 if (IS_ERR(xprt)) 2251 return xprt; 2252 transport = container_of(xprt, struct sock_xprt, xprt); 2253 2254 xprt->prot = IPPROTO_UDP; 2255 xprt->tsh_size = 0; 2256 /* XXX: header size can vary due to auth type, IPv6, etc. */ 2257 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 2258 2259 xprt->bind_timeout = XS_BIND_TO; 2260 xprt->connect_timeout = XS_UDP_CONN_TO; 2261 xprt->reestablish_timeout = XS_UDP_REEST_TO; 2262 xprt->idle_timeout = XS_IDLE_DISC_TO; 2263 2264 xprt->ops = &xs_udp_ops; 2265 2266 xprt->timeout = &xs_udp_default_timeout; 2267 2268 switch (addr->sa_family) { 2269 case AF_INET: 2270 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2271 xprt_set_bound(xprt); 2272 2273 INIT_DELAYED_WORK(&transport->connect_worker, 2274 xs_udp_connect_worker4); 2275 xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 2276 break; 2277 case AF_INET6: 2278 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2279 xprt_set_bound(xprt); 2280 2281 INIT_DELAYED_WORK(&transport->connect_worker, 2282 xs_udp_connect_worker6); 2283 xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 2284 break; 2285 default: 2286 kfree(xprt); 2287 return ERR_PTR(-EAFNOSUPPORT); 2288 } 2289 2290 dprintk("RPC: set up transport to address %s\n", 2291 xprt->address_strings[RPC_DISPLAY_ALL]); 2292 2293 if (try_module_get(THIS_MODULE)) 2294 return xprt; 2295 2296 kfree(xprt->slot); 2297 kfree(xprt); 2298 return ERR_PTR(-EINVAL); 2299 } 2300 2301 static const struct rpc_timeout xs_tcp_default_timeout = { 2302 .to_initval = 60 * HZ, 2303 .to_maxval = 60 * HZ, 2304 .to_retries = 2, 2305 }; 2306 2307 /** 2308 * xs_setup_tcp - Set up transport to use a TCP socket 2309 * @args: rpc transport creation arguments 2310 * 2311 */ 2312 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) 2313 { 2314 struct sockaddr *addr = args->dstaddr; 2315 struct rpc_xprt *xprt; 2316 struct sock_xprt *transport; 2317 2318 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); 2319 if (IS_ERR(xprt)) 2320 return xprt; 2321 transport = container_of(xprt, struct sock_xprt, xprt); 2322 2323 xprt->prot = IPPROTO_TCP; 2324 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 2325 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 2326 2327 xprt->bind_timeout = XS_BIND_TO; 2328 xprt->connect_timeout = XS_TCP_CONN_TO; 2329 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2330 xprt->idle_timeout = XS_IDLE_DISC_TO; 2331 2332 xprt->ops = &xs_tcp_ops; 2333 xprt->timeout = &xs_tcp_default_timeout; 2334 2335 switch (addr->sa_family) { 2336 case AF_INET: 2337 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2338 xprt_set_bound(xprt); 2339 2340 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4); 2341 xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 2342 break; 2343 case AF_INET6: 2344 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2345 xprt_set_bound(xprt); 2346 2347 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6); 2348 xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 2349 break; 2350 default: 2351 kfree(xprt); 2352 return ERR_PTR(-EAFNOSUPPORT); 2353 } 2354 2355 dprintk("RPC: set up transport to address %s\n", 2356 xprt->address_strings[RPC_DISPLAY_ALL]); 2357 2358 if (try_module_get(THIS_MODULE)) 2359 return xprt; 2360 2361 kfree(xprt->slot); 2362 kfree(xprt); 2363 return ERR_PTR(-EINVAL); 2364 } 2365 2366 static struct xprt_class xs_udp_transport = { 2367 .list = LIST_HEAD_INIT(xs_udp_transport.list), 2368 .name = "udp", 2369 .owner = THIS_MODULE, 2370 .ident = IPPROTO_UDP, 2371 .setup = xs_setup_udp, 2372 }; 2373 2374 static struct xprt_class xs_tcp_transport = { 2375 .list = LIST_HEAD_INIT(xs_tcp_transport.list), 2376 .name = "tcp", 2377 .owner = THIS_MODULE, 2378 .ident = IPPROTO_TCP, 2379 .setup = xs_setup_tcp, 2380 }; 2381 2382 /** 2383 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client 2384 * 2385 */ 2386 int init_socket_xprt(void) 2387 { 2388 #ifdef RPC_DEBUG 2389 if (!sunrpc_table_header) 2390 sunrpc_table_header = register_sysctl_table(sunrpc_table); 2391 #endif 2392 2393 xprt_register_transport(&xs_udp_transport); 2394 xprt_register_transport(&xs_tcp_transport); 2395 2396 return 0; 2397 } 2398 2399 /** 2400 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister 2401 * 2402 */ 2403 void cleanup_socket_xprt(void) 2404 { 2405 #ifdef RPC_DEBUG 2406 if (sunrpc_table_header) { 2407 unregister_sysctl_table(sunrpc_table_header); 2408 sunrpc_table_header = NULL; 2409 } 2410 #endif 2411 2412 xprt_unregister_transport(&xs_udp_transport); 2413 xprt_unregister_transport(&xs_tcp_transport); 2414 } 2415