/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_NFS_V4_1
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * xprtsock tunables
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */
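/*
 * Illustrative sketch (assumes the usual sysctl API of this era, not a
 * code path shown in this excerpt): module init code typically registers
 * the tables defined below with
 *
 *	sunrpc_table_header = register_sysctl_table(sunrpc_table);
 *
 * and unregisters them at unload with
 *
 *	unregister_sysctl_table(sunrpc_table_header);
 */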
#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_jiffies,
		.strategy	= sysctl_jiffies
	},
	{
		.ctl_name = 0,
	},
};

static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif

/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
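/*
 * Illustrative example: with the values below and the doubling done in
 * xs_connect(), successive reconnect delays run 3s, 6s, 12s, 24s, ...
 * and are capped at 5 minutes.
 */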
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	srcaddr;
	unsigned short		srcport;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
	void			(*old_error_report)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags
 */
#define TCP_RPC_REPLY		(1UL << 6)

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	char buf[128];

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	switch (sap->sa_family) {
	case AF_INET:
		sin = xs_addr_in(xprt);
		(void)snprintf(buf, sizeof(buf), "%02x%02x%02x%02x",
			       NIPQUAD(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		sin6 = xs_addr_in6(xprt);
		(void)snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}
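/*
 * Record the peer's port number in both decimal ("%u") and hex ("%4hx")
 * form; these display strings are used for debugging output, not on
 * the wire.
 */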
static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	(void)snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	(void)snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = 0;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
		task->tk_pid, req->rq_slen - req->rq_bytes_sent,
		req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			ret = -EAGAIN;
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
		   req->rq_svec->iov_base,
		   req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC: xs_udp_send_request(%u) = %d\n",
		xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}
	if (!transport->sock)
		goto out;

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC: sendmsg returned unrecognized error %d\n",
			-status);
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
out:
	return status;
}

/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_WR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;

	if (sock != NULL)
		kernel_sock_shutdown(sock, SHUT_WR);
}

static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}
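/*
 * Worked example: a single-fragment record carrying a 100-byte RPC
 * message gets the marker htonl(0x80000064) -- the top bit is
 * RPC_LAST_STREAM_FRAGMENT, the low 31 bits are the length (0x64).
 */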
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
		   req->rq_svec->iov_base,
		   req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
				      NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}
	if (!transport->sock)
		goto out;

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC: sendmsg returned unrecognized error %d\n",
			-status);
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
out:
	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; i.e., no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);

	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}

static void xs_tcp_close(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
		xs_close(xprt);
	else
		xs_tcp_shutdown(xprt);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_destroy xprt %p\n", xprt);

	cancel_rearming_delayed_work(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	kfree(xprt->slot);
	kfree(xprt);
	module_put(THIS_MODULE);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC: xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	/* Something worked... */
	dst_confirm(skb_dst(skb));

	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);

out_unlock:
	spin_unlock(&xprt->transport_lock);
dropit:
	skb_free_datagram(sk, skb);
out:
	read_unlock(&sk->sk_callback_lock);
}
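/*
 * Read in the 4-byte RPC record marker (fragment header).  The marker
 * itself may arrive split across TCP segments, so tcp_offset tracks
 * how much of it has been copied so far.
 */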
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC: invalid TCP record fragment length\n");
		xprt_force_disconnect(xprt);
		return;
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
		transport->tcp_reclen);
}

static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	transport->tcp_copied = 4;
	dprintk("RPC: reading %s XID %08x\n",
		(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							: "request with",
		ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}
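/*
 * Layout of the start of each record body as consumed by the helpers
 * above and below (the 4-byte record marker is not counted here):
 *
 *	bytes 0-3: XID
 *	bytes 4-7: call/reply direction word
 *
 * so tcp_offset is 4 when xs_tcp_read_calldir() first runs, and 8 once
 * the direction word has been read.
 */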
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	__be32 calldir;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(calldir) - offset;
	dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len);
	used = xdr_skb_read_bits(desc, &calldir, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
	transport->tcp_flags |= TCP_RCV_COPY_DATA;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	if (ntohl(calldir) == RPC_REPLY)
		transport->tcp_flags |= TCP_RPC_REPLY;
	else
		transport->tcp_flags &= ~TCP_RPC_REPLY;
	dprintk("RPC: reading %s CALL/REPLY flag %08x\n",
		(transport->tcp_flags & TCP_RPC_REPLY) ?
			"reply for" : "request with", calldir);
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				      struct xdr_skb_reader *desc,
				      struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	rcvbuf = &req->rq_private_buf;

	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
		/*
		 * Save the RPC direction in the XDR buffer
		 */
		__be32 calldir = transport->tcp_flags & TCP_RPC_REPLY ?
					htonl(RPC_REPLY) : 0;

		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
			&calldir, sizeof(calldir));
		transport->tcp_copied += sizeof(calldir);
		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
	}

	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					      &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					      desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC: XID %08x truncated request\n",
			ntohl(transport->tcp_xid));
		dprintk("RPC: xprt = %p, tcp_copied = %lu, "
			"tcp_offset = %u, tcp_reclen = %u\n",
			xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);
		return;
	}

	dprintk("RPC: XID %08x read %Zd bytes\n",
		ntohl(transport->tcp_xid), r);
	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
		"tcp_reclen = %u\n", xprt, transport->tcp_copied,
		transport->tcp_offset, transport->tcp_reclen);

	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}

	return;
}
/*
 * Finds the request corresponding to the RPC xid and invokes the common
 * tcp read code to read the data.
 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
				    struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		dprintk("RPC: XID %08x request not found!\n",
			ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return -1;
	}

	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);

	spin_unlock(&xprt->transport_lock);
	return 0;
}

#if defined(CONFIG_NFS_V4_1)
/*
 * Obtains an rpc_rqst previously allocated and invokes the common
 * tcp read code to read the data.  The result is placed in the callback
 * queue.
 * If we're unable to obtain the rpc_rqst we schedule the closing of the
 * connection and return -1.
 */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
				       struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	req = xprt_alloc_bc_request(xprt);
	if (req == NULL) {
		printk(KERN_WARNING "Callback slot table overflowed\n");
		xprt_force_disconnect(xprt);
		return -1;
	}

	req->rq_xid = transport->tcp_xid;
	dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
		struct svc_serv *bc_serv = xprt->bc_serv;

		/*
		 * Add callback request to callback list.  The callback
		 * service sleeps on the sv_cb_waitq waiting for new
		 * requests.  Wake it up after enqueuing the request.
		 */
		dprintk("RPC: add callback request to list\n");
		spin_lock(&bc_serv->sv_cb_lock);
		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
		spin_unlock(&bc_serv->sv_cb_lock);
		wake_up(&bc_serv->sv_cb_waitq);
	}

	req->rq_private_buf.len = transport->tcp_copied;

	return 0;
}
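/*
 * Dispatch on the direction word: an RPC_REPLY is matched to a waiting
 * request slot, while an incoming call is routed to the backchannel
 * service.
 */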
1139 */ 1140 dprintk("RPC: add callback request to list\n"); 1141 spin_lock(&bc_serv->sv_cb_lock); 1142 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 1143 spin_unlock(&bc_serv->sv_cb_lock); 1144 wake_up(&bc_serv->sv_cb_waitq); 1145 } 1146 1147 req->rq_private_buf.len = transport->tcp_copied; 1148 1149 return 0; 1150 } 1151 1152 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1153 struct xdr_skb_reader *desc) 1154 { 1155 struct sock_xprt *transport = 1156 container_of(xprt, struct sock_xprt, xprt); 1157 1158 return (transport->tcp_flags & TCP_RPC_REPLY) ? 1159 xs_tcp_read_reply(xprt, desc) : 1160 xs_tcp_read_callback(xprt, desc); 1161 } 1162 #else 1163 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1164 struct xdr_skb_reader *desc) 1165 { 1166 return xs_tcp_read_reply(xprt, desc); 1167 } 1168 #endif /* CONFIG_NFS_V4_1 */ 1169 1170 /* 1171 * Read data off the transport. This can be either an RPC_CALL or an 1172 * RPC_REPLY. Relay the processing to helper functions. 1173 */ 1174 static void xs_tcp_read_data(struct rpc_xprt *xprt, 1175 struct xdr_skb_reader *desc) 1176 { 1177 struct sock_xprt *transport = 1178 container_of(xprt, struct sock_xprt, xprt); 1179 1180 if (_xs_tcp_read_data(xprt, desc) == 0) 1181 xs_tcp_check_fraghdr(transport); 1182 else { 1183 /* 1184 * The transport_lock protects the request handling. 1185 * There's no need to hold it to update the tcp_flags. 1186 */ 1187 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1188 } 1189 } 1190 1191 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1192 { 1193 size_t len; 1194 1195 len = transport->tcp_reclen - transport->tcp_offset; 1196 if (len > desc->count) 1197 len = desc->count; 1198 desc->count -= len; 1199 desc->offset += len; 1200 transport->tcp_offset += len; 1201 dprintk("RPC: discarded %Zu bytes\n", len); 1202 xs_tcp_check_fraghdr(transport); 1203 } 1204 1205 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) 1206 { 1207 struct rpc_xprt *xprt = rd_desc->arg.data; 1208 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1209 struct xdr_skb_reader desc = { 1210 .skb = skb, 1211 .offset = offset, 1212 .count = len, 1213 }; 1214 1215 dprintk("RPC: xs_tcp_data_recv started\n"); 1216 do { 1217 /* Read in a new fragment marker if necessary */ 1218 /* Can we ever really expect to get completely empty fragments? 
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC: xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the call/reply flag */
		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
			xs_tcp_read_calldir(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_data(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC: xs_tcp_data_recv done\n");
	return len - desc.count;
}

/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;
	int read;

	dprintk("RPC: xs_tcp_data_ready...\n");

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	do {
		rd_desc.count = 65536;
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
	} while (read > 0);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Do the equivalent of linger/linger2 handling for dealing with
 * broken servers that don't close the socket in a timely
 * fashion
 */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
					   unsigned long timeout)
{
	struct sock_xprt *transport;

	if (xprt_test_and_set_connecting(xprt))
		return;
	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	transport = container_of(xprt, struct sock_xprt, xprt);
	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
			   timeout);
}

static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport;

	transport = container_of(xprt, struct sock_xprt, xprt);

	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
	    !cancel_delayed_work(&transport->connect_worker))
		return;
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	xprt_clear_connecting(xprt);
}

static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}

/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
		sk->sk_state, xprt_connected(xprt),
		sock_flag(sk, SOCK_DEAD),
		sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->transport_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt_wake_pending_tasks(xprt, -EAGAIN);
		}
		spin_unlock_bh(&xprt->transport_lock);
		break;
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
		xprt->connect_cookie++;
		xprt->reestablish_timeout = 0;
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_clear_bit();
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		break;
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		xprt_force_disconnect(xprt);
	case TCP_SYN_SENT:
		xprt->connect_cookie++;
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		break;
	case TCP_LAST_ACK:
		set_bit(XPRT_CLOSING, &xprt->state);
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_clear_bit();
		break;
	case TCP_CLOSE:
		xs_tcp_cancel_linger_timeout(xprt);
		xs_sock_mark_closed(xprt);
	}
out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_error_report - callback mainly for catching socket errors
 * @sk: socket
 */
static void xs_error_report(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: %s client %p...\n"
		"RPC: error %d\n",
		__func__, xprt, sk->sk_err);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	read_unlock(&sk->sk_callback_lock);
}

static void xs_write_space(struct sock *sk)
{
	struct socket *sock;
	struct rpc_xprt *xprt;

	if (unlikely(!(sock = sk->sk_socket)))
		return;
	clear_bit(SOCK_NOSPACE, &sock->flags);

	if (unlikely(!(xprt = xprt_from_sock(sk))))
		return;
	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
		return;

	xprt_write_space(xprt);
}

/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk))
		xs_write_space(sk);

	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
		xs_write_space(sk);

	read_unlock(&sk->sk_callback_lock);
}

static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
	if (sndsize)
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
	if (rcvsize)
		transport->rcvsize = rcvsize + 1024;

	xs_udp_do_set_buffer_size(xprt);
}

/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}
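/*
 * Note on the modulo arithmetic below: the random draw lands in
 * [xprt_min_resvport, xprt_max_resvport), so xprt_max_resvport itself
 * is only reachable via xs_next_srcport() while probing for a free port.
 */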
static unsigned short xs_get_random_port(void)
{
	unsigned short range = xprt_max_resvport - xprt_min_resvport;
	unsigned short rand = (unsigned short) net_random() % range;
	return rand + xprt_min_resvport;
}

/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC: setting port for xprt %p to %u\n", xprt, port);

	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}

static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
{
	unsigned short port = transport->srcport;

	if (port == 0 && transport->xprt.resvport)
		port = xs_get_random_port();
	return port;
}

static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
{
	if (transport->srcport != 0)
		transport->srcport = 0;
	if (!transport->xprt.resvport)
		return 0;
	if (port <= xprt_min_resvport || port > xprt_max_resvport)
		return xprt_max_resvport;
	return --port;
}

static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
	struct sockaddr_in *sa;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport, sock);
	unsigned short last;

	sa = (struct sockaddr_in *)&transport->srcaddr;
	myaddr.sin_addr = sa->sin_addr;
	do {
		myaddr.sin_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
				  sizeof(myaddr));
		if (port == 0)
			break;
		if (err == 0) {
			transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, sock, port);
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);
	dprintk("RPC: %s %pI4:%u: %s (%d)\n",
		__func__, &myaddr.sin_addr,
		port, err ? "failed" : "ok", err);
	return err;
}

static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_in6 myaddr = {
		.sin6_family = AF_INET6,
	};
	struct sockaddr_in6 *sa;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport, sock);
	unsigned short last;

	sa = (struct sockaddr_in6 *)&transport->srcaddr;
	myaddr.sin6_addr = sa->sin6_addr;
	do {
		myaddr.sin6_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
				  sizeof(myaddr));
		if (port == 0)
			break;
		if (err == 0) {
			transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, sock, port);
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);
	dprintk("RPC: xs_bind6 %pI6:%u: %s (%d)\n",
		&myaddr.sin6_addr, port,
"failed" : "ok", err); 1621 return err; 1622 } 1623 1624 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1625 static struct lock_class_key xs_key[2]; 1626 static struct lock_class_key xs_slock_key[2]; 1627 1628 static inline void xs_reclassify_socket4(struct socket *sock) 1629 { 1630 struct sock *sk = sock->sk; 1631 1632 BUG_ON(sock_owned_by_user(sk)); 1633 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", 1634 &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]); 1635 } 1636 1637 static inline void xs_reclassify_socket6(struct socket *sock) 1638 { 1639 struct sock *sk = sock->sk; 1640 1641 BUG_ON(sock_owned_by_user(sk)); 1642 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", 1643 &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]); 1644 } 1645 #else 1646 static inline void xs_reclassify_socket4(struct socket *sock) 1647 { 1648 } 1649 1650 static inline void xs_reclassify_socket6(struct socket *sock) 1651 { 1652 } 1653 #endif 1654 1655 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 1656 { 1657 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1658 1659 if (!transport->inet) { 1660 struct sock *sk = sock->sk; 1661 1662 write_lock_bh(&sk->sk_callback_lock); 1663 1664 xs_save_old_callbacks(transport, sk); 1665 1666 sk->sk_user_data = xprt; 1667 sk->sk_data_ready = xs_udp_data_ready; 1668 sk->sk_write_space = xs_udp_write_space; 1669 sk->sk_error_report = xs_error_report; 1670 sk->sk_no_check = UDP_CSUM_NORCV; 1671 sk->sk_allocation = GFP_ATOMIC; 1672 1673 xprt_set_connected(xprt); 1674 1675 /* Reset to new socket */ 1676 transport->sock = sock; 1677 transport->inet = sk; 1678 1679 write_unlock_bh(&sk->sk_callback_lock); 1680 } 1681 xs_udp_do_set_buffer_size(xprt); 1682 } 1683 1684 /** 1685 * xs_udp_connect_worker4 - set up a UDP socket 1686 * @work: RPC transport to connect 1687 * 1688 * Invoked by a work queue tasklet. 1689 */ 1690 static void xs_udp_connect_worker4(struct work_struct *work) 1691 { 1692 struct sock_xprt *transport = 1693 container_of(work, struct sock_xprt, connect_worker.work); 1694 struct rpc_xprt *xprt = &transport->xprt; 1695 struct socket *sock = transport->sock; 1696 int err, status = -EIO; 1697 1698 if (xprt->shutdown) 1699 goto out; 1700 1701 /* Start by resetting any existing state */ 1702 xs_reset_transport(transport); 1703 1704 err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock); 1705 if (err < 0) { 1706 dprintk("RPC: can't create UDP transport socket (%d).\n", -err); 1707 goto out; 1708 } 1709 xs_reclassify_socket4(sock); 1710 1711 if (xs_bind4(transport, sock)) { 1712 sock_release(sock); 1713 goto out; 1714 } 1715 1716 dprintk("RPC: worker connecting xprt %p via %s to " 1717 "%s (port %s)\n", xprt, 1718 xprt->address_strings[RPC_DISPLAY_PROTO], 1719 xprt->address_strings[RPC_DISPLAY_ADDR], 1720 xprt->address_strings[RPC_DISPLAY_PORT]); 1721 1722 xs_udp_finish_connecting(xprt, sock); 1723 status = 0; 1724 out: 1725 xprt_clear_connecting(xprt); 1726 xprt_wake_pending_tasks(xprt, status); 1727 } 1728 1729 /** 1730 * xs_udp_connect_worker6 - set up a UDP socket 1731 * @work: RPC transport to connect 1732 * 1733 * Invoked by a work queue tasklet. 
static void xs_udp_connect_worker6(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Start by resetting any existing state */
	xs_reset_transport(transport);

	err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
	if (err < 0) {
		dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
		goto out;
	}
	xs_reclassify_socket6(sock);

	if (xs_bind6(transport, sock) < 0) {
		sock_release(sock);
		goto out;
	}

	dprintk("RPC: worker connecting xprt %p via %s to "
		"%s (port %s)\n", xprt,
		xprt->address_strings[RPC_DISPLAY_PROTO],
		xprt->address_strings[RPC_DISPLAY_ADDR],
		xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_udp_finish_connecting(xprt, sock);
	status = 0;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}

/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
{
	int result;
	struct sockaddr any;

	dprintk("RPC: disconnecting xprt %p to reuse port\n", xprt);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (!result)
		xs_sock_mark_closed(xprt);
	else
		dprintk("RPC: AF_UNSPEC connect return code %d\n",
			result);
}

static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
{
	unsigned int state = transport->inet->sk_state;

	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
		return;
	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
		return;
	xs_abort_connection(xprt, transport);
}

static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		sk->sk_error_report = xs_error_report;
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
}
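/*
 * Note on the nonblocking connect above: kernel_connect() with
 * O_NONBLOCK normally returns -EINPROGRESS, and completion (or failure)
 * is reported asynchronously through the sk_state_change callback,
 * which xs_tcp_finish_connecting() pointed at xs_tcp_state_change().
 */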
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
				struct sock_xprt *transport,
				struct socket *(*create_sock)(struct rpc_xprt *,
							      struct sock_xprt *))
{
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = create_sock(xprt, transport);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
						    &xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC: worker connecting xprt %p via %s to "
		"%s (port %s)\n", xprt,
		xprt->address_strings[RPC_DISPLAY_PROTO],
		xprt->address_strings[RPC_DISPLAY_ADDR],
		xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC: %p connect status %d connected %d sock state %d\n",
		xprt, -status, xprt_connected(xprt),
		sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
		       __func__, status);
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		xprt_clear_connecting(xprt);
		return;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}

static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
					  struct sock_xprt *transport)
{
	struct socket *sock;
	int err;

	/* start from scratch */
	err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		dprintk("RPC: can't create TCP transport socket (%d).\n",
			-err);
		goto out_err;
	}
	xs_reclassify_socket4(sock);

	if (xs_bind4(transport, sock) < 0) {
		sock_release(sock);
		goto out_err;
	}
	return sock;
out_err:
	return ERR_PTR(-EIO);
}

/**
 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
/**
 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue.
 */
static void xs_tcp_connect_worker4(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;

	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
}

static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
		struct sock_xprt *transport)
{
	struct socket *sock;
	int err;

	/* start from scratch */
	err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		dprintk("RPC: can't create TCP transport socket (%d).\n",
				-err);
		goto out_err;
	}
	xs_reclassify_socket6(sock);

	if (xs_bind6(transport, sock) < 0) {
		sock_release(sock);
		goto out_err;
	}
	return sock;
out_err:
	return ERR_PTR(-EIO);
}

/**
 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue.
 */
static void xs_tcp_connect_worker6(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;

	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
}
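
/*
 * How these workers are wired up, in sketch form (using the delayed-work
 * machinery from <linux/workqueue.h>; see xs_setup_tcp() below for the
 * real call sites): the transport setup routine binds the
 * address-family-specific worker to the transport once,
 *
 *	INIT_DELAYED_WORK(&transport->connect_worker,
 *			xs_tcp_connect_worker4);
 *
 * and xs_connect() later schedules it, immediately or after a backoff
 * delay measured in jiffies:
 *
 *	queue_delayed_work(rpciod_workqueue,
 *			&transport->connect_worker, delay);
 *
 * The worker then recovers its enclosing transport with container_of(),
 * as shown above.
 */
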
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
 */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (xprt_test_and_set_connecting(xprt))
		return;

	if (transport->sock != NULL) {
		dprintk("RPC: xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		queue_delayed_work(rpciod_workqueue,
				&transport->connect_worker,
				xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
		queue_delayed_work(rpciod_workqueue,
				&transport->connect_worker, 0);
	}
}

static void xs_tcp_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	/* Exit if we need to wait for socket shutdown to complete */
	if (test_bit(XPRT_CLOSING, &xprt->state))
		return;
	xs_connect(task);
}

/**
 * xs_udp_print_stats - display UDP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
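
/*
 * Worked example of the backoff in xs_connect() above (assuming the
 * usual values of XS_TCP_INIT_REEST_TO = 3 seconds and
 * XS_TCP_MAX_REEST_TO = 5 minutes): reestablish_timeout doubles on
 * every delayed reconnect, so successive attempts are scheduled roughly
 * at
 *
 *	3s, 6s, 12s, 24s, 48s, 96s, 192s, 300s, 300s, ...
 *
 * i.e. the delay grows as 3 * 2^n seconds until it is clamped at the
 * five-minute cap.  The state-change handlers wind the timeout back
 * down once a connection is reestablished, so a stable server sees only
 * the short initial delays.
 */
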
static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_tcp_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
#if defined(CONFIG_NFS_V4_1)
	.release_request	= bc_release_request,
#endif /* CONFIG_NFS_V4_1 */
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};

static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
		unsigned int slot_table_size)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *new;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: xs_setup_xprt: address too large\n");
		return ERR_PTR(-EBADF);
	}

	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
		dprintk("RPC: xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
		return ERR_PTR(-ENOMEM);
	}
	xprt = &new->xprt;

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
		dprintk("RPC: xs_setup_xprt: couldn't allocate slot "
				"table\n");
		return ERR_PTR(-ENOMEM);
	}

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	if (args->srcaddr)
		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);

	return xprt;
}

static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};
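
/*
 * Reading xs_udp_default_timeout above as a retransmit schedule (a
 * rough sketch; the RTT estimator installed via
 * xprt_set_retrans_timeout_rtt can shorten these in practice): the
 * first major timeout fires after to_initval = 5 seconds, each
 * subsequent one grows by to_increment = 5 seconds up to the
 * to_maxval = 30 second ceiling, and the major timeout is reported
 * after to_retries = 5 retransmissions:
 *
 *	5s, 10s, 15s, 20s, 25s, then report the timeout
 */
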
/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @args: rpc transport creation arguments
 *
 */
static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
{
	struct sockaddr *addr = args->dstaddr;
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = IPPROTO_UDP;
	xprt->tsh_size = 0;
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_udp_ops;

	xprt->timeout = &xs_udp_default_timeout;

	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_connect_worker4);
		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_connect_worker6);
		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
		break;
	default:
		kfree(xprt);
		return ERR_PTR(-EAFNOSUPPORT);
	}

	if (xprt_bound(xprt))
		dprintk("RPC: set up xprt to %s (port %s) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PORT],
				xprt->address_strings[RPC_DISPLAY_PROTO]);
	else
		dprintk("RPC: set up xprt to %s (autobind) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PROTO]);

	if (try_module_get(THIS_MODULE))
		return xprt;

	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(-EINVAL);
}

static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};
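
/*
 * The max_payload computation in xs_setup_udp() above, worked through
 * (MAX_HEADER is configuration dependent, so the 128 bytes below are
 * only an assumed figure): a UDP datagram can carry at most
 * 2^16 = 65536 bytes, and (MAX_HEADER << 3) reserves eight times
 * MAX_HEADER of that as generous headroom for the variable-size
 * headers.  For example:
 *
 *	(1U << 16) - (128 << 3) = 65536 - 1024 = 64512 bytes
 *
 * This is a deliberately conservative upper bound, as the XXX note
 * says: the real header size varies with the auth flavor and the
 * address family.
 */
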
/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @args: rpc transport creation arguments
 *
 */
static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
{
	struct sockaddr *addr = args->dstaddr;
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = IPPROTO_TCP;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_tcp_ops;
	xprt->timeout = &xs_tcp_default_timeout;

	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_tcp_connect_worker4);
		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_tcp_connect_worker6);
		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
		break;
	default:
		kfree(xprt);
		return ERR_PTR(-EAFNOSUPPORT);
	}

	if (xprt_bound(xprt))
		dprintk("RPC: set up xprt to %s (port %s) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PORT],
				xprt->address_strings[RPC_DISPLAY_PROTO]);
	else
		dprintk("RPC: set up xprt to %s (autobind) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PROTO]);

	if (try_module_get(THIS_MODULE))
		return xprt;

	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(-EINVAL);
}

static struct xprt_class xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
	.ident		= IPPROTO_UDP,
	.setup		= xs_setup_udp,
};

static struct xprt_class xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
	.ident		= IPPROTO_TCP,
	.setup		= xs_setup_tcp,
};

/**
 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	xprt_register_transport(&xs_udp_transport);
	xprt_register_transport(&xs_tcp_transport);

	return 0;
}

/**
 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xs_udp_transport);
	xprt_unregister_transport(&xs_tcp_transport);
}

static int param_set_uint_minmax(const char *val, struct kernel_param *kp,
		unsigned int min, unsigned int max)
{
	unsigned long num;
	int ret;

	if (!val)
		return -EINVAL;
	ret = strict_strtoul(val, 0, &num);
	if (ret == -EINVAL || num < min || num > max)
		return -EINVAL;
	*((unsigned int *)kp->arg) = num;
	return 0;
}

static int param_set_portnr(const char *val, struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_RESVPORT,
			RPC_MAX_RESVPORT);
}

static int param_get_portnr(char *buffer, struct kernel_param *kp)
{
	return param_get_uint(buffer, kp);
}
#define param_check_portnr(name, p) \
	__param_check(name, p, unsigned int);

module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);

static int param_set_slot_table_size(const char *val, struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE);
}

static int param_get_slot_table_size(char *buffer, struct kernel_param *kp)
{
	return param_get_uint(buffer, kp);
}
#define param_check_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);

module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
		slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
		slot_table_size, 0644);
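
/*
 * Example of exercising the tunables defined above (a sketch; the
 * /proc/sys/sunrpc files exist only on RPC_DEBUG builds, via the table
 * registered in init_socket_xprt(), and the module parameters surface
 * under the sunrpc module when it is built modular):
 *
 *	# adjust at runtime via the sysctl files
 *	echo 32  > /proc/sys/sunrpc/tcp_slot_table_entries
 *	echo 800 > /proc/sys/sunrpc/min_resvport
 *
 *	# or set the equivalent module parameters at load time
 *	modprobe sunrpc tcp_slot_table_entries=32 min_resvport=800
 *
 * Values outside [RPC_MIN_SLOT_TABLE, RPC_MAX_SLOT_TABLE] or
 * [RPC_MIN_RESVPORT, RPC_MAX_RESVPORT] are rejected with -EINVAL by
 * param_set_uint_minmax().
 */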