xref: /linux/net/sunrpc/xprtsock.c (revision 27258e448eb301cf89e351df87aa8cb916653bf2)
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize the TCP code.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20 
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/xprtsock.h>
36 #include <linux/file.h>
37 #ifdef CONFIG_NFS_V4_1
38 #include <linux/sunrpc/bc_xprt.h>
39 #endif
40 
41 #include <net/sock.h>
42 #include <net/checksum.h>
43 #include <net/udp.h>
44 #include <net/tcp.h>
45 
46 /*
47  * xprtsock tunables
48  */
49 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
50 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
51 
52 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
53 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
54 
55 #define XS_TCP_LINGER_TO	(15U * HZ)
56 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
57 
58 /*
59  * We can register our own files under /proc/sys/sunrpc by
60  * calling register_sysctl_table() again.  The files in that
61  * directory become the union of all files registered there.
62  *
63  * We simply need to make sure that we don't collide with
64  * someone else's file names!
65  */
66 
67 #ifdef RPC_DEBUG
68 
69 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
70 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
71 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
72 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
73 
74 static struct ctl_table_header *sunrpc_table_header;
75 
76 /*
77  * FIXME: changing the UDP slot table size should also resize the UDP
78  *        socket buffers for existing UDP transports
79  */
80 static ctl_table xs_tunables_table[] = {
81 	{
82 		.ctl_name	= CTL_SLOTTABLE_UDP,
83 		.procname	= "udp_slot_table_entries",
84 		.data		= &xprt_udp_slot_table_entries,
85 		.maxlen		= sizeof(unsigned int),
86 		.mode		= 0644,
87 		.proc_handler	= &proc_dointvec_minmax,
88 		.strategy	= &sysctl_intvec,
89 		.extra1		= &min_slot_table_size,
90 		.extra2		= &max_slot_table_size
91 	},
92 	{
93 		.ctl_name	= CTL_SLOTTABLE_TCP,
94 		.procname	= "tcp_slot_table_entries",
95 		.data		= &xprt_tcp_slot_table_entries,
96 		.maxlen		= sizeof(unsigned int),
97 		.mode		= 0644,
98 		.proc_handler	= &proc_dointvec_minmax,
99 		.strategy	= &sysctl_intvec,
100 		.extra1		= &min_slot_table_size,
101 		.extra2		= &max_slot_table_size
102 	},
103 	{
104 		.ctl_name	= CTL_MIN_RESVPORT,
105 		.procname	= "min_resvport",
106 		.data		= &xprt_min_resvport,
107 		.maxlen		= sizeof(unsigned int),
108 		.mode		= 0644,
109 		.proc_handler	= &proc_dointvec_minmax,
110 		.strategy	= &sysctl_intvec,
111 		.extra1		= &xprt_min_resvport_limit,
112 		.extra2		= &xprt_max_resvport_limit
113 	},
114 	{
115 		.ctl_name	= CTL_MAX_RESVPORT,
116 		.procname	= "max_resvport",
117 		.data		= &xprt_max_resvport,
118 		.maxlen		= sizeof(unsigned int),
119 		.mode		= 0644,
120 		.proc_handler	= &proc_dointvec_minmax,
121 		.strategy	= &sysctl_intvec,
122 		.extra1		= &xprt_min_resvport_limit,
123 		.extra2		= &xprt_max_resvport_limit
124 	},
125 	{
126 		.procname	= "tcp_fin_timeout",
127 		.data		= &xs_tcp_fin_timeout,
128 		.maxlen		= sizeof(xs_tcp_fin_timeout),
129 		.mode		= 0644,
130 		.proc_handler	= &proc_dointvec_jiffies,
131 		.strategy	= sysctl_jiffies
132 	},
133 	{
134 		.ctl_name = 0,
135 	},
136 };
137 
138 static ctl_table sunrpc_table[] = {
139 	{
140 		.ctl_name	= CTL_SUNRPC,
141 		.procname	= "sunrpc",
142 		.mode		= 0555,
143 		.child		= xs_tunables_table
144 	},
145 	{
146 		.ctl_name = 0,
147 	},
148 };
149 
150 #endif
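
/*
 * (Editor's note -- illustrative sketch, not part of the original file.)
 * The table above is assumed to be registered once at module load and
 * torn down at unload, roughly as follows; example_init/example_exit
 * are hypothetical names:
 *
 *	static int __init example_init(void)
 *	{
 *		if (!sunrpc_table_header)
 *			sunrpc_table_header = register_sysctl_table(sunrpc_table);
 *		return 0;
 *	}
 *
 *	static void __exit example_exit(void)
 *	{
 *		if (sunrpc_table_header) {
 *			unregister_sysctl_table(sunrpc_table_header);
 *			sunrpc_table_header = NULL;
 *		}
 *	}
 *
 * register_sysctl_table() makes the entries appear as
 * /proc/sys/sunrpc/udp_slot_table_entries, tcp_slot_table_entries,
 * min_resvport, max_resvport and tcp_fin_timeout.
 */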
151 
152 /*
153  * Timeout for an RPC UDP socket connect.  UDP socket connects are
154  * synchronous, but we set a timeout anyway in case of resource
155  * exhaustion on the local host.
156  */
157 #define XS_UDP_CONN_TO		(5U * HZ)
158 
159 /*
160  * Wait duration for an RPC TCP connection to be established.  Solaris
161  * NFS over TCP uses 60 seconds, for example, which is in line with how
162  * long a server takes to reboot.
163  */
164 #define XS_TCP_CONN_TO		(60U * HZ)
165 
166 /*
167  * Wait duration for a reply from the RPC portmapper.
168  */
169 #define XS_BIND_TO		(60U * HZ)
170 
171 /*
172  * Delay if a UDP socket connect error occurs.  This is most likely some
173  * kind of resource problem on the local host.
174  */
175 #define XS_UDP_REEST_TO		(2U * HZ)
176 
177 /*
178  * The reestablish timeout allows clients to delay for a bit before attempting
179  * to reconnect to a server that just dropped our connection.
180  *
181  * We implement an exponential backoff when trying to reestablish a TCP
182  * transport connection with the server.  Some servers like to drop a TCP
183  * connection when they are overworked, so we start with a short timeout and
184  * increase over time if the server is down or not responding.
185  */
186 #define XS_TCP_INIT_REEST_TO	(3U * HZ)
187 #define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
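
/*
 * (Editor's note -- illustrative sketch, not part of the original file.)
 * The connect path is assumed to implement the backoff by doubling
 * reestablish_timeout and clamping it to the maximum:
 *
 *	xprt->reestablish_timeout <<= 1;
 *	if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
 *		xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 *
 * so the retry delays grow 3s, 6s, 12s, ... up to the 5 minute cap.
 */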
188 
189 /*
190  * TCP idle timeout; client drops the transport socket if it is idle
191  * for this long.  Note that we also time out UDP sockets to prevent
192  * holding port numbers when there is no RPC traffic.
193  */
194 #define XS_IDLE_DISC_TO		(5U * 60 * HZ)
195 
196 #ifdef RPC_DEBUG
197 # undef  RPC_DEBUG_DATA
198 # define RPCDBG_FACILITY	RPCDBG_TRANS
199 #endif
200 
201 #ifdef RPC_DEBUG_DATA
202 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
203 {
204 	u8 *buf = (u8 *) packet;
205 	int j;
206 
207 	dprintk("RPC:       %s\n", msg);
208 	for (j = 0; j < count && j < 128; j += 4) {
209 		if (!(j & 31)) {
210 			if (j)
211 				dprintk("\n");
212 			dprintk("0x%04x ", j);
213 		}
214 		dprintk("%02x%02x%02x%02x ",
215 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
216 	}
217 	dprintk("\n");
218 }
219 #else
220 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
221 {
222 	/* NOP */
223 }
224 #endif
225 
226 struct sock_xprt {
227 	struct rpc_xprt		xprt;
228 
229 	/*
230 	 * Network layer
231 	 */
232 	struct socket *		sock;
233 	struct sock *		inet;
234 
235 	/*
236 	 * State of TCP reply receive
237 	 */
238 	__be32			tcp_fraghdr,
239 				tcp_xid;
240 
241 	u32			tcp_offset,
242 				tcp_reclen;
243 
244 	unsigned long		tcp_copied,
245 				tcp_flags;
246 
247 	/*
248 	 * Connection of transports
249 	 */
250 	struct delayed_work	connect_worker;
251 	struct sockaddr_storage	srcaddr;
252 	unsigned short		srcport;
253 
254 	/*
255 	 * UDP socket buffer size parameters
256 	 */
257 	size_t			rcvsize,
258 				sndsize;
259 
260 	/*
261 	 * Saved socket callback addresses
262 	 */
263 	void			(*old_data_ready)(struct sock *, int);
264 	void			(*old_state_change)(struct sock *);
265 	void			(*old_write_space)(struct sock *);
266 	void			(*old_error_report)(struct sock *);
267 };
268 
269 /*
270  * TCP receive state flags
271  */
272 #define TCP_RCV_LAST_FRAG	(1UL << 0)
273 #define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
274 #define TCP_RCV_COPY_XID	(1UL << 2)
275 #define TCP_RCV_COPY_DATA	(1UL << 3)
276 #define TCP_RCV_READ_CALLDIR	(1UL << 4)
277 #define TCP_RCV_COPY_CALLDIR	(1UL << 5)
278 
279 /*
280  * TCP RPC flags
281  */
282 #define TCP_RPC_REPLY		(1UL << 6)
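
/*
 * (Editor's note) Summary of how the flags above drive the receive
 * state machine in xs_tcp_data_recv() below.  Each TCP record is
 * consumed in this order:
 *
 *	TCP_RCV_COPY_FRAGHDR	read the 4-byte record marker
 *	TCP_RCV_COPY_XID	read the 4-byte XID
 *	TCP_RCV_READ_CALLDIR/
 *	TCP_RCV_COPY_CALLDIR	read, then save, the call/reply direction
 *	TCP_RCV_COPY_DATA	copy the payload into the matching request
 *
 * Anything left of the current fragment is then discarded, and
 * TCP_RCV_LAST_FRAG marks the final fragment of a record.
 */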
283 
284 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
285 {
286 	return (struct sockaddr *) &xprt->addr;
287 }
288 
289 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
290 {
291 	return (struct sockaddr_in *) &xprt->addr;
292 }
293 
294 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
295 {
296 	return (struct sockaddr_in6 *) &xprt->addr;
297 }
298 
299 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
300 {
301 	struct sockaddr *sap = xs_addr(xprt);
302 	struct sockaddr_in6 *sin6;
303 	struct sockaddr_in *sin;
304 	char buf[128];
305 
306 	(void)rpc_ntop(sap, buf, sizeof(buf));
307 	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
308 
309 	switch (sap->sa_family) {
310 	case AF_INET:
311 		sin = xs_addr_in(xprt);
312 		(void)snprintf(buf, sizeof(buf), "%02x%02x%02x%02x",
313 					NIPQUAD(sin->sin_addr.s_addr));
314 		break;
315 	case AF_INET6:
316 		sin6 = xs_addr_in6(xprt);
317 		(void)snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
318 		break;
319 	default:
320 		BUG();
321 	}
322 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
323 }
324 
325 static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
326 {
327 	struct sockaddr *sap = xs_addr(xprt);
328 	char buf[128];
329 
330 	(void)snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
331 	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
332 
333 	(void)snprintf(buf, sizeof(buf), "%04hx", rpc_get_port(sap));
334 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
335 }
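
/*
 * (Editor's note) Worked example: for NFS port 2049 the strings above
 * become RPC_DISPLAY_PORT = "2049" and, zero-padded by the format
 * string, RPC_DISPLAY_HEX_PORT = "0801" (2049 == 0x0801).
 */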
336 
337 static void xs_format_peer_addresses(struct rpc_xprt *xprt,
338 				     const char *protocol,
339 				     const char *netid)
340 {
341 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
342 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
343 	xs_format_common_peer_addresses(xprt);
344 	xs_format_common_peer_ports(xprt);
345 }
346 
347 static void xs_update_peer_port(struct rpc_xprt *xprt)
348 {
349 	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
350 	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
351 
352 	xs_format_common_peer_ports(xprt);
353 }
354 
355 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
356 {
357 	unsigned int i;
358 
359 	for (i = 0; i < RPC_DISPLAY_MAX; i++)
360 		switch (i) {
361 		case RPC_DISPLAY_PROTO:
362 		case RPC_DISPLAY_NETID:
363 			continue;
364 		default:
365 			kfree(xprt->address_strings[i]);
366 		}
367 }
368 
369 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
370 
371 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
372 {
373 	struct msghdr msg = {
374 		.msg_name	= addr,
375 		.msg_namelen	= addrlen,
376 		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
377 	};
378 	struct kvec iov = {
379 		.iov_base	= vec->iov_base + base,
380 		.iov_len	= vec->iov_len - base,
381 	};
382 
383 	if (iov.iov_len != 0)
384 		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
385 	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
386 }
387 
388 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
389 {
390 	struct page **ppage;
391 	unsigned int remainder;
392 	int err, sent = 0;
393 
394 	remainder = xdr->page_len - base;
395 	base += xdr->page_base;
396 	ppage = xdr->pages + (base >> PAGE_SHIFT);
397 	base &= ~PAGE_MASK;
398 	for(;;) {
399 		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
400 		int flags = XS_SENDMSG_FLAGS;
401 
402 		remainder -= len;
403 		if (remainder != 0 || more)
404 			flags |= MSG_MORE;
405 		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
406 		if (remainder == 0 || err != len)
407 			break;
408 		sent += err;
409 		ppage++;
410 		base = 0;
411 	}
412 	if (sent == 0)
413 		return err;
414 	if (err > 0)
415 		sent += err;
416 	return sent;
417 }
418 
419 /**
420  * xs_sendpages - write pages directly to a socket
421  * @sock: socket to send on
422  * @addr: UDP only -- address of destination
423  * @addrlen: UDP only -- length of destination address
424  * @xdr: buffer containing this request
425  * @base: starting position in the buffer
426  *
427  */
428 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
429 {
430 	unsigned int remainder = xdr->len - base;
431 	int err, sent = 0;
432 
433 	if (unlikely(!sock))
434 		return -ENOTSOCK;
435 
436 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
437 	if (base != 0) {
438 		addr = NULL;
439 		addrlen = 0;
440 	}
441 
442 	if (base < xdr->head[0].iov_len || addr != NULL) {
443 		unsigned int len = xdr->head[0].iov_len - base;
444 		remainder -= len;
445 		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
446 		if (remainder == 0 || err != len)
447 			goto out;
448 		sent += err;
449 		base = 0;
450 	} else
451 		base -= xdr->head[0].iov_len;
452 
453 	if (base < xdr->page_len) {
454 		unsigned int len = xdr->page_len - base;
455 		remainder -= len;
456 		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
457 		if (remainder == 0 || err != len)
458 			goto out;
459 		sent += err;
460 		base = 0;
461 	} else
462 		base -= xdr->page_len;
463 
464 	if (base >= xdr->tail[0].iov_len)
465 		return sent;
466 	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
467 out:
468 	if (sent == 0)
469 		return err;
470 	if (err > 0)
471 		sent += err;
472 	return sent;
473 }
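
/*
 * (Editor's note) Return semantics: if any bytes were written,
 * xs_sendpages() returns the (possibly partial) count and swallows a
 * trailing error, which the caller rediscovers on the next call;
 * otherwise it returns the negative errno from the socket layer.  A
 * caller sending a 1000-byte xdr_buf might, for example, see 400 now
 * and -EAGAIN on the retry, and must resume with base = 400 -- which
 * is what rq_bytes_sent tracks in the TCP send path below.
 */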
474 
475 static void xs_nospace_callback(struct rpc_task *task)
476 {
477 	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
478 
479 	transport->inet->sk_write_pending--;
480 	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
481 }
482 
483 /**
484  * xs_nospace - place task on wait queue if transmit was incomplete
485  * @task: task to put to sleep
486  *
487  */
488 static int xs_nospace(struct rpc_task *task)
489 {
490 	struct rpc_rqst *req = task->tk_rqstp;
491 	struct rpc_xprt *xprt = req->rq_xprt;
492 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
493 	int ret = 0;
494 
495 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
496 			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
497 			req->rq_slen);
498 
499 	/* Protect against races with write_space */
500 	spin_lock_bh(&xprt->transport_lock);
501 
502 	/* Don't race with disconnect */
503 	if (xprt_connected(xprt)) {
504 		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
505 			ret = -EAGAIN;
506 			/*
507 			 * Notify TCP that we're limited by the application
508 			 * window size
509 			 */
510 			set_bit(SOCK_NOSPACE, &transport->sock->flags);
511 			transport->inet->sk_write_pending++;
512 			/* ...and wait for more buffer space */
513 			xprt_wait_for_buffer_space(task, xs_nospace_callback);
514 		}
515 	} else {
516 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
517 		ret = -ENOTCONN;
518 	}
519 
520 	spin_unlock_bh(&xprt->transport_lock);
521 	return ret;
522 }
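
/*
 * (Editor's note) A task parked here is woken again via the socket's
 * sk_write_space callback (xs_udp_write_space()/xs_tcp_write_space()
 * below, both of which call xs_write_space()) once the socket layer
 * frees enough output buffer space; the pending send is then retried.
 */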
523 
524 /**
525  * xs_udp_send_request - write an RPC request to a UDP socket
526  * @task: address of RPC task that manages the state of an RPC request
527  *
528  * Return values:
529  *        0:	The request has been sent
530  *   EAGAIN:	The socket was blocked, please call again later to
531  *		complete the request
532  * ENOTCONN:	Caller needs to invoke connect logic then call again
533  *    other:	Some other error occurred, the request was not sent
534  */
535 static int xs_udp_send_request(struct rpc_task *task)
536 {
537 	struct rpc_rqst *req = task->tk_rqstp;
538 	struct rpc_xprt *xprt = req->rq_xprt;
539 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
540 	struct xdr_buf *xdr = &req->rq_snd_buf;
541 	int status;
542 
543 	xs_pktdump("packet data:",
544 				req->rq_svec->iov_base,
545 				req->rq_svec->iov_len);
546 
547 	if (!xprt_bound(xprt))
548 		return -ENOTCONN;
549 	status = xs_sendpages(transport->sock,
550 			      xs_addr(xprt),
551 			      xprt->addrlen, xdr,
552 			      req->rq_bytes_sent);
553 
554 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
555 			xdr->len - req->rq_bytes_sent, status);
556 
557 	if (status >= 0) {
558 		task->tk_bytes_sent += status;
559 		if (status >= req->rq_slen)
560 			return 0;
561 		/* Still some bytes left; set up for a retry later. */
562 		status = -EAGAIN;
563 	}
564 	if (!transport->sock)
565 		goto out;
566 
567 	switch (status) {
568 	case -ENOTSOCK:
569 		status = -ENOTCONN;
570 		/* Should we call xs_close() here? */
571 		break;
572 	case -EAGAIN:
573 		status = xs_nospace(task);
574 		break;
575 	default:
576 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
577 			-status);
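		/* (editor's note) deliberate fall through: unrecognized
		 * errors share the cleanup handling below */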
578 	case -ENETUNREACH:
579 	case -EPIPE:
580 	case -ECONNREFUSED:
581 		/* When the server has died, an ICMP port unreachable message
582 		 * prompts ECONNREFUSED. */
583 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
584 	}
585 out:
586 	return status;
587 }
588 
589 /**
590  * xs_tcp_shutdown - gracefully shut down a TCP socket
591  * @xprt: transport
592  *
593  * Initiates a graceful shutdown of the TCP socket by calling the
594  * equivalent of shutdown(SHUT_WR);
595  */
596 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
597 {
598 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
599 	struct socket *sock = transport->sock;
600 
601 	if (sock != NULL)
602 		kernel_sock_shutdown(sock, SHUT_WR);
603 }
604 
605 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
606 {
607 	u32 reclen = buf->len - sizeof(rpc_fraghdr);
608 	rpc_fraghdr *base = buf->head[0].iov_base;
609 	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
610 }
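
/*
 * (Editor's note) Worked example: for a buffer with buf->len == 132,
 * reclen is 128 (0x80), so the marker is htonl(0x80000000 | 0x80) and
 * the bytes 0x80 0x00 0x00 0x80 lead the record on the wire: the top
 * bit flags the last fragment, the low 31 bits carry the length.
 */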
611 
612 /**
613  * xs_tcp_send_request - write an RPC request to a TCP socket
614  * @task: address of RPC task that manages the state of an RPC request
615  *
616  * Return values:
617  *        0:	The request has been sent
618  *   EAGAIN:	The socket was blocked, please call again later to
619  *		complete the request
620  * ENOTCONN:	Caller needs to invoke connect logic then call again
621  *    other:	Some other error occurred, the request was not sent
622  *
623  * XXX: In the case of soft timeouts, should we eventually give up
624  *	if sendmsg is not able to make progress?
625  */
626 static int xs_tcp_send_request(struct rpc_task *task)
627 {
628 	struct rpc_rqst *req = task->tk_rqstp;
629 	struct rpc_xprt *xprt = req->rq_xprt;
630 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
631 	struct xdr_buf *xdr = &req->rq_snd_buf;
632 	int status;
633 
634 	xs_encode_tcp_record_marker(&req->rq_snd_buf);
635 
636 	xs_pktdump("packet data:",
637 				req->rq_svec->iov_base,
638 				req->rq_svec->iov_len);
639 
640 	/* Continue transmitting the packet/record. We must be careful
641 	 * to cope with writespace callbacks arriving _after_ we have
642 	 * called sendmsg(). */
643 	while (1) {
644 		status = xs_sendpages(transport->sock,
645 					NULL, 0, xdr, req->rq_bytes_sent);
646 
647 		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
648 				xdr->len - req->rq_bytes_sent, status);
649 
650 		if (unlikely(status < 0))
651 			break;
652 
653 		/* If we've sent the entire packet, immediately
654 		 * reset the count of bytes sent. */
655 		req->rq_bytes_sent += status;
656 		task->tk_bytes_sent += status;
657 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
658 			req->rq_bytes_sent = 0;
659 			return 0;
660 		}
661 
662 		if (status != 0)
663 			continue;
664 		status = -EAGAIN;
665 		break;
666 	}
667 	if (!transport->sock)
668 		goto out;
669 
670 	switch (status) {
671 	case -ENOTSOCK:
672 		status = -ENOTCONN;
673 		/* Should we call xs_close() here? */
674 		break;
675 	case -EAGAIN:
676 		status = xs_nospace(task);
677 		break;
678 	default:
679 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
680 			-status);
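		/* (editor's note) deliberate fall through to the error
		 * cases below */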
681 	case -ECONNRESET:
682 	case -EPIPE:
683 		xs_tcp_shutdown(xprt);
684 	case -ECONNREFUSED:
685 	case -ENOTCONN:
686 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
687 	}
688 out:
689 	return status;
690 }
691 
692 /**
693  * xs_tcp_release_xprt - clean up after a tcp transmission
694  * @xprt: transport
695  * @task: rpc task
696  *
697  * This cleans up if an error causes us to abort the transmission of a request.
698  * In this case, the socket may need to be reset in order to avoid confusing
699  * the server.
700  */
701 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
702 {
703 	struct rpc_rqst *req;
704 
705 	if (task != xprt->snd_task)
706 		return;
707 	if (task == NULL)
708 		goto out_release;
709 	req = task->tk_rqstp;
710 	if (req->rq_bytes_sent == 0)
711 		goto out_release;
712 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
713 		goto out_release;
714 	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
715 out_release:
716 	xprt_release_xprt(xprt, task);
717 }
718 
719 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
720 {
721 	transport->old_data_ready = sk->sk_data_ready;
722 	transport->old_state_change = sk->sk_state_change;
723 	transport->old_write_space = sk->sk_write_space;
724 	transport->old_error_report = sk->sk_error_report;
725 }
726 
727 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
728 {
729 	sk->sk_data_ready = transport->old_data_ready;
730 	sk->sk_state_change = transport->old_state_change;
731 	sk->sk_write_space = transport->old_write_space;
732 	sk->sk_error_report = transport->old_error_report;
733 }
734 
735 static void xs_reset_transport(struct sock_xprt *transport)
736 {
737 	struct socket *sock = transport->sock;
738 	struct sock *sk = transport->inet;
739 
740 	if (sk == NULL)
741 		return;
742 
743 	write_lock_bh(&sk->sk_callback_lock);
744 	transport->inet = NULL;
745 	transport->sock = NULL;
746 
747 	sk->sk_user_data = NULL;
748 
749 	xs_restore_old_callbacks(transport, sk);
750 	write_unlock_bh(&sk->sk_callback_lock);
751 
752 	sk->sk_no_check = 0;
753 
754 	sock_release(sock);
755 }
756 
757 /**
758  * xs_close - close a socket
759  * @xprt: transport
760  *
761  * This is used when all requests are complete; i.e., no DRC state that we
762  * want to preserve remains on the server.
763  *
764  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
765  * xs_reset_transport() zeroing the socket from underneath a writer.
766  */
767 static void xs_close(struct rpc_xprt *xprt)
768 {
769 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
770 
771 	dprintk("RPC:       xs_close xprt %p\n", xprt);
772 
773 	xs_reset_transport(transport);
774 
775 	smp_mb__before_clear_bit();
776 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
777 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
778 	clear_bit(XPRT_CLOSING, &xprt->state);
779 	smp_mb__after_clear_bit();
780 	xprt_disconnect_done(xprt);
781 }
782 
783 static void xs_tcp_close(struct rpc_xprt *xprt)
784 {
785 	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
786 		xs_close(xprt);
787 	else
788 		xs_tcp_shutdown(xprt);
789 }
790 
791 /**
792  * xs_destroy - prepare to shut down a transport
793  * @xprt: doomed transport
794  *
795  */
796 static void xs_destroy(struct rpc_xprt *xprt)
797 {
798 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
799 
800 	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
801 
802 	cancel_rearming_delayed_work(&transport->connect_worker);
803 
804 	xs_close(xprt);
805 	xs_free_peer_addresses(xprt);
806 	kfree(xprt->slot);
807 	kfree(xprt);
808 	module_put(THIS_MODULE);
809 }
810 
811 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
812 {
813 	return (struct rpc_xprt *) sk->sk_user_data;
814 }
815 
816 /**
817  * xs_udp_data_ready - "data ready" callback for UDP sockets
818  * @sk: socket with data to read
819  * @len: how much data to read
820  *
821  */
822 static void xs_udp_data_ready(struct sock *sk, int len)
823 {
824 	struct rpc_task *task;
825 	struct rpc_xprt *xprt;
826 	struct rpc_rqst *rovr;
827 	struct sk_buff *skb;
828 	int err, repsize, copied;
829 	u32 _xid;
830 	__be32 *xp;
831 
832 	read_lock(&sk->sk_callback_lock);
833 	dprintk("RPC:       xs_udp_data_ready...\n");
834 	if (!(xprt = xprt_from_sock(sk)))
835 		goto out;
836 
837 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
838 		goto out;
839 
840 	if (xprt->shutdown)
841 		goto dropit;
842 
843 	repsize = skb->len - sizeof(struct udphdr);
844 	if (repsize < 4) {
845 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
846 		goto dropit;
847 	}
848 
849 	/* Copy the XID from the skb... */
850 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
851 				sizeof(_xid), &_xid);
852 	if (xp == NULL)
853 		goto dropit;
854 
855 	/* Look up and lock the request corresponding to the given XID */
856 	spin_lock(&xprt->transport_lock);
857 	rovr = xprt_lookup_rqst(xprt, *xp);
858 	if (!rovr)
859 		goto out_unlock;
860 	task = rovr->rq_task;
861 
862 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
863 		copied = repsize;
864 
865 	/* Suck it into the iovec, verify checksum if not done by hw. */
866 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
867 		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
868 		goto out_unlock;
869 	}
870 
871 	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
872 
873 	/* Something worked... */
874 	dst_confirm(skb_dst(skb));
875 
876 	xprt_adjust_cwnd(task, copied);
877 	xprt_update_rtt(task);
878 	xprt_complete_rqst(task, copied);
879 
880  out_unlock:
881 	spin_unlock(&xprt->transport_lock);
882  dropit:
883 	skb_free_datagram(sk, skb);
884  out:
885 	read_unlock(&sk->sk_callback_lock);
886 }
887 
888 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
889 {
890 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
891 	size_t len, used;
892 	char *p;
893 
894 	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
895 	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
896 	used = xdr_skb_read_bits(desc, p, len);
897 	transport->tcp_offset += used;
898 	if (used != len)
899 		return;
900 
901 	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
902 	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
903 		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
904 	else
905 		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
906 	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
907 
908 	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
909 	transport->tcp_offset = 0;
910 
911 	/* Sanity check of the record length */
912 	if (unlikely(transport->tcp_reclen < 8)) {
913 		dprintk("RPC:       invalid TCP record fragment length\n");
914 		xprt_force_disconnect(xprt);
915 		return;
916 	}
917 	dprintk("RPC:       reading TCP record fragment of length %d\n",
918 			transport->tcp_reclen);
919 }
920 
921 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
922 {
923 	if (transport->tcp_offset == transport->tcp_reclen) {
924 		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
925 		transport->tcp_offset = 0;
926 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
927 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
928 			transport->tcp_flags |= TCP_RCV_COPY_XID;
929 			transport->tcp_copied = 0;
930 		}
931 	}
932 }
933 
934 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
935 {
936 	size_t len, used;
937 	char *p;
938 
939 	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
940 	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
941 	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
942 	used = xdr_skb_read_bits(desc, p, len);
943 	transport->tcp_offset += used;
944 	if (used != len)
945 		return;
946 	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
947 	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
948 	transport->tcp_copied = 4;
949 	dprintk("RPC:       reading %s XID %08x\n",
950 			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
951 							      : "request with",
952 			ntohl(transport->tcp_xid));
953 	xs_tcp_check_fraghdr(transport);
954 }
955 
956 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
957 				       struct xdr_skb_reader *desc)
958 {
959 	size_t len, used;
960 	u32 offset;
961 	__be32	calldir;
962 
963 	/*
964 	 * We want transport->tcp_offset to be 8 at the end of this routine
965 	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
966 	 * When this function is called for the first time,
967 	 * transport->tcp_offset is 4 (after having already read the xid).
968 	 */
969 	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
970 	len = sizeof(calldir) - offset;
971 	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
972 	used = xdr_skb_read_bits(desc, &calldir, len);
973 	transport->tcp_offset += used;
974 	if (used != len)
975 		return;
976 	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
977 	transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
978 	transport->tcp_flags |= TCP_RCV_COPY_DATA;
979 	/*
980 	 * We don't yet have the XDR buffer, so we will write the calldir
981 	 * out after we get the buffer from the 'struct rpc_rqst'
982 	 */
983 	if (ntohl(calldir) == RPC_REPLY)
984 		transport->tcp_flags |= TCP_RPC_REPLY;
985 	else
986 		transport->tcp_flags &= ~TCP_RPC_REPLY;
987 	dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
988 			(transport->tcp_flags & TCP_RPC_REPLY) ?
989 				"reply for" : "request with", calldir);
990 	xs_tcp_check_fraghdr(transport);
991 }
992 
993 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
994 				     struct xdr_skb_reader *desc,
995 				     struct rpc_rqst *req)
996 {
997 	struct sock_xprt *transport =
998 				container_of(xprt, struct sock_xprt, xprt);
999 	struct xdr_buf *rcvbuf;
1000 	size_t len;
1001 	ssize_t r;
1002 
1003 	rcvbuf = &req->rq_private_buf;
1004 
1005 	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1006 		/*
1007 		 * Save the RPC direction in the XDR buffer
1008 		 */
1009 		__be32	calldir = transport->tcp_flags & TCP_RPC_REPLY ?
1010 					htonl(RPC_REPLY) : 0;
1011 
1012 		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1013 			&calldir, sizeof(calldir));
1014 		transport->tcp_copied += sizeof(calldir);
1015 		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1016 	}
1017 
1018 	len = desc->count;
1019 	if (len > transport->tcp_reclen - transport->tcp_offset) {
1020 		struct xdr_skb_reader my_desc;
1021 
1022 		len = transport->tcp_reclen - transport->tcp_offset;
1023 		memcpy(&my_desc, desc, sizeof(my_desc));
1024 		my_desc.count = len;
1025 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1026 					  &my_desc, xdr_skb_read_bits);
1027 		desc->count -= r;
1028 		desc->offset += r;
1029 	} else
1030 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1031 					  desc, xdr_skb_read_bits);
1032 
1033 	if (r > 0) {
1034 		transport->tcp_copied += r;
1035 		transport->tcp_offset += r;
1036 	}
1037 	if (r != len) {
1038 		/* Error when copying to the receive buffer,
1039 		 * usually because we weren't able to allocate
1040 		 * additional buffer pages. All we can do now
1041 		 * is turn off TCP_RCV_COPY_DATA, so the request
1042 		 * will not receive any additional updates,
1043 		 * and time out.
1044 		 * Any remaining data from this record will
1045 		 * be discarded.
1046 		 */
1047 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1048 		dprintk("RPC:       XID %08x truncated request\n",
1049 				ntohl(transport->tcp_xid));
1050 		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1051 				"tcp_offset = %u, tcp_reclen = %u\n",
1052 				xprt, transport->tcp_copied,
1053 				transport->tcp_offset, transport->tcp_reclen);
1054 		return;
1055 	}
1056 
1057 	dprintk("RPC:       XID %08x read %Zd bytes\n",
1058 			ntohl(transport->tcp_xid), r);
1059 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1060 			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
1061 			transport->tcp_offset, transport->tcp_reclen);
1062 
1063 	if (transport->tcp_copied == req->rq_private_buf.buflen)
1064 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1065 	else if (transport->tcp_offset == transport->tcp_reclen) {
1066 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1067 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1068 	}
1069 
1070 	return;
1071 }
1072 
1073 /*
1074  * Finds the request corresponding to the RPC xid and invokes the common
1075  * tcp read code to read the data.
1076  */
1077 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1078 				    struct xdr_skb_reader *desc)
1079 {
1080 	struct sock_xprt *transport =
1081 				container_of(xprt, struct sock_xprt, xprt);
1082 	struct rpc_rqst *req;
1083 
1084 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1085 
1086 	/* Find and lock the request corresponding to this xid */
1087 	spin_lock(&xprt->transport_lock);
1088 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1089 	if (!req) {
1090 		dprintk("RPC:       XID %08x request not found!\n",
1091 				ntohl(transport->tcp_xid));
1092 		spin_unlock(&xprt->transport_lock);
1093 		return -1;
1094 	}
1095 
1096 	xs_tcp_read_common(xprt, desc, req);
1097 
1098 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1099 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1100 
1101 	spin_unlock(&xprt->transport_lock);
1102 	return 0;
1103 }
1104 
1105 #if defined(CONFIG_NFS_V4_1)
1106 /*
1107  * Obtains an rpc_rqst previously allocated and invokes the common
1108  * tcp read code to read the data.  The result is placed in the callback
1109  * queue.
1110  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1111  * connection and return -1.
1112  */
1113 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1114 				       struct xdr_skb_reader *desc)
1115 {
1116 	struct sock_xprt *transport =
1117 				container_of(xprt, struct sock_xprt, xprt);
1118 	struct rpc_rqst *req;
1119 
1120 	req = xprt_alloc_bc_request(xprt);
1121 	if (req == NULL) {
1122 		printk(KERN_WARNING "Callback slot table overflowed\n");
1123 		xprt_force_disconnect(xprt);
1124 		return -1;
1125 	}
1126 
1127 	req->rq_xid = transport->tcp_xid;
1128 	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1129 	xs_tcp_read_common(xprt, desc, req);
1130 
1131 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1132 		struct svc_serv *bc_serv = xprt->bc_serv;
1133 
1134 		/*
1135 		 * Add callback request to callback list.  The callback
1136 		 * service sleeps on the sv_cb_waitq waiting for new
1137 		 * requests.  Wake it up after enqueuing the
1138 		 * request.
1139 		 */
1140 		dprintk("RPC:       add callback request to list\n");
1141 		spin_lock(&bc_serv->sv_cb_lock);
1142 		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1143 		spin_unlock(&bc_serv->sv_cb_lock);
1144 		wake_up(&bc_serv->sv_cb_waitq);
1145 	}
1146 
1147 	req->rq_private_buf.len = transport->tcp_copied;
1148 
1149 	return 0;
1150 }
1151 
1152 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1153 					struct xdr_skb_reader *desc)
1154 {
1155 	struct sock_xprt *transport =
1156 				container_of(xprt, struct sock_xprt, xprt);
1157 
1158 	return (transport->tcp_flags & TCP_RPC_REPLY) ?
1159 		xs_tcp_read_reply(xprt, desc) :
1160 		xs_tcp_read_callback(xprt, desc);
1161 }
1162 #else
1163 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1164 					struct xdr_skb_reader *desc)
1165 {
1166 	return xs_tcp_read_reply(xprt, desc);
1167 }
1168 #endif /* CONFIG_NFS_V4_1 */
1169 
1170 /*
1171  * Read data off the transport.  This can be either an RPC_CALL or an
1172  * RPC_REPLY.  Relay the processing to helper functions.
1173  */
1174 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1175 				    struct xdr_skb_reader *desc)
1176 {
1177 	struct sock_xprt *transport =
1178 				container_of(xprt, struct sock_xprt, xprt);
1179 
1180 	if (_xs_tcp_read_data(xprt, desc) == 0)
1181 		xs_tcp_check_fraghdr(transport);
1182 	else {
1183 		/*
1184 		 * The transport_lock protects the request handling.
1185 		 * There's no need to hold it to update the tcp_flags.
1186 		 */
1187 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1188 	}
1189 }
1190 
1191 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1192 {
1193 	size_t len;
1194 
1195 	len = transport->tcp_reclen - transport->tcp_offset;
1196 	if (len > desc->count)
1197 		len = desc->count;
1198 	desc->count -= len;
1199 	desc->offset += len;
1200 	transport->tcp_offset += len;
1201 	dprintk("RPC:       discarded %Zu bytes\n", len);
1202 	xs_tcp_check_fraghdr(transport);
1203 }
1204 
1205 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1206 {
1207 	struct rpc_xprt *xprt = rd_desc->arg.data;
1208 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1209 	struct xdr_skb_reader desc = {
1210 		.skb	= skb,
1211 		.offset	= offset,
1212 		.count	= len,
1213 	};
1214 
1215 	dprintk("RPC:       xs_tcp_data_recv started\n");
1216 	do {
1217 		/* Read in a new fragment marker if necessary */
1218 		/* Can we ever really expect to get completely empty fragments? */
1219 		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1220 			xs_tcp_read_fraghdr(xprt, &desc);
1221 			continue;
1222 		}
1223 		/* Read in the xid if necessary */
1224 		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1225 			xs_tcp_read_xid(transport, &desc);
1226 			continue;
1227 		}
1228 		/* Read in the call/reply flag */
1229 		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1230 			xs_tcp_read_calldir(transport, &desc);
1231 			continue;
1232 		}
1233 		/* Read in the request data */
1234 		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1235 			xs_tcp_read_data(xprt, &desc);
1236 			continue;
1237 		}
1238 		/* Skip over any trailing bytes on short reads */
1239 		xs_tcp_read_discard(transport, &desc);
1240 	} while (desc.count);
1241 	dprintk("RPC:       xs_tcp_data_recv done\n");
1242 	return len - desc.count;
1243 }
1244 
1245 /**
1246  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1247  * @sk: socket with data to read
1248  * @bytes: how much data to read
1249  *
1250  */
1251 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1252 {
1253 	struct rpc_xprt *xprt;
1254 	read_descriptor_t rd_desc;
1255 	int read;
1256 
1257 	dprintk("RPC:       xs_tcp_data_ready...\n");
1258 
1259 	read_lock(&sk->sk_callback_lock);
1260 	if (!(xprt = xprt_from_sock(sk)))
1261 		goto out;
1262 	if (xprt->shutdown)
1263 		goto out;
1264 
1265 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1266 	rd_desc.arg.data = xprt;
1267 	do {
1268 		rd_desc.count = 65536;
1269 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1270 	} while (read > 0);
1271 out:
1272 	read_unlock(&sk->sk_callback_lock);
1273 }
1274 
1275 /*
1276  * Do the equivalent of linger/linger2 handling for dealing with
1277  * broken servers that don't close the socket in a timely
1278  * fashion.
1279  */
1280 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1281 		unsigned long timeout)
1282 {
1283 	struct sock_xprt *transport;
1284 
1285 	if (xprt_test_and_set_connecting(xprt))
1286 		return;
1287 	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1288 	transport = container_of(xprt, struct sock_xprt, xprt);
1289 	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1290 			   timeout);
1291 }
1292 
1293 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1294 {
1295 	struct sock_xprt *transport;
1296 
1297 	transport = container_of(xprt, struct sock_xprt, xprt);
1298 
1299 	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1300 	    !cancel_delayed_work(&transport->connect_worker))
1301 		return;
1302 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1303 	xprt_clear_connecting(xprt);
1304 }
1305 
1306 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1307 {
1308 	smp_mb__before_clear_bit();
1309 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1310 	clear_bit(XPRT_CLOSING, &xprt->state);
1311 	smp_mb__after_clear_bit();
1312 	/* Mark transport as closed and wake up all pending tasks */
1313 	xprt_disconnect_done(xprt);
1314 }
1315 
1316 /**
1317  * xs_tcp_state_change - callback to handle TCP socket state changes
1318  * @sk: socket whose state has changed
1319  *
1320  */
1321 static void xs_tcp_state_change(struct sock *sk)
1322 {
1323 	struct rpc_xprt *xprt;
1324 
1325 	read_lock(&sk->sk_callback_lock);
1326 	if (!(xprt = xprt_from_sock(sk)))
1327 		goto out;
1328 	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1329 	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1330 			sk->sk_state, xprt_connected(xprt),
1331 			sock_flag(sk, SOCK_DEAD),
1332 			sock_flag(sk, SOCK_ZAPPED));
1333 
1334 	switch (sk->sk_state) {
1335 	case TCP_ESTABLISHED:
1336 		spin_lock_bh(&xprt->transport_lock);
1337 		if (!xprt_test_and_set_connected(xprt)) {
1338 			struct sock_xprt *transport = container_of(xprt,
1339 					struct sock_xprt, xprt);
1340 
1341 			/* Reset TCP record info */
1342 			transport->tcp_offset = 0;
1343 			transport->tcp_reclen = 0;
1344 			transport->tcp_copied = 0;
1345 			transport->tcp_flags =
1346 				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1347 
1348 			xprt_wake_pending_tasks(xprt, -EAGAIN);
1349 		}
1350 		spin_unlock_bh(&xprt->transport_lock);
1351 		break;
1352 	case TCP_FIN_WAIT1:
1353 		/* The client initiated a shutdown of the socket */
1354 		xprt->connect_cookie++;
1355 		xprt->reestablish_timeout = 0;
1356 		set_bit(XPRT_CLOSING, &xprt->state);
1357 		smp_mb__before_clear_bit();
1358 		clear_bit(XPRT_CONNECTED, &xprt->state);
1359 		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1360 		smp_mb__after_clear_bit();
1361 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1362 		break;
1363 	case TCP_CLOSE_WAIT:
1364 		/* The server initiated a shutdown of the socket */
1365 		xprt_force_disconnect(xprt);
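		/* (editor's note) deliberate fall through */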
1366 	case TCP_SYN_SENT:
1367 		xprt->connect_cookie++;
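		/* (editor's note) deliberate fall through */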
1368 	case TCP_CLOSING:
1369 		/*
1370 		 * If the server closed down the connection, make sure that
1371 		 * we back off before reconnecting
1372 		 */
1373 		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1374 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1375 		break;
1376 	case TCP_LAST_ACK:
1377 		set_bit(XPRT_CLOSING, &xprt->state);
1378 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1379 		smp_mb__before_clear_bit();
1380 		clear_bit(XPRT_CONNECTED, &xprt->state);
1381 		smp_mb__after_clear_bit();
1382 		break;
1383 	case TCP_CLOSE:
1384 		xs_tcp_cancel_linger_timeout(xprt);
1385 		xs_sock_mark_closed(xprt);
1386 	}
1387  out:
1388 	read_unlock(&sk->sk_callback_lock);
1389 }
1390 
1391 /**
1392  * xs_error_report - callback mainly for catching socket errors
1393  * @sk: socket
1394  */
1395 static void xs_error_report(struct sock *sk)
1396 {
1397 	struct rpc_xprt *xprt;
1398 
1399 	read_lock(&sk->sk_callback_lock);
1400 	if (!(xprt = xprt_from_sock(sk)))
1401 		goto out;
1402 	dprintk("RPC:       %s client %p...\n"
1403 			"RPC:       error %d\n",
1404 			__func__, xprt, sk->sk_err);
1405 	xprt_wake_pending_tasks(xprt, -EAGAIN);
1406 out:
1407 	read_unlock(&sk->sk_callback_lock);
1408 }
1409 
1410 static void xs_write_space(struct sock *sk)
1411 {
1412 	struct socket *sock;
1413 	struct rpc_xprt *xprt;
1414 
1415 	if (unlikely(!(sock = sk->sk_socket)))
1416 		return;
1417 	clear_bit(SOCK_NOSPACE, &sock->flags);
1418 
1419 	if (unlikely(!(xprt = xprt_from_sock(sk))))
1420 		return;
1421 	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1422 		return;
1423 
1424 	xprt_write_space(xprt);
1425 }
1426 
1427 /**
1428  * xs_udp_write_space - callback invoked when socket buffer space
1429  *                             becomes available
1430  * @sk: socket whose state has changed
1431  *
1432  * Called when more output buffer space is available for this socket.
1433  * We try not to wake our writers until they can make "significant"
1434  * progress; otherwise we'll waste resources thrashing kernel_sendmsg
1435  * with a bunch of small requests.
1436  */
1437 static void xs_udp_write_space(struct sock *sk)
1438 {
1439 	read_lock(&sk->sk_callback_lock);
1440 
1441 	/* from net/core/sock.c:sock_def_write_space */
1442 	if (sock_writeable(sk))
1443 		xs_write_space(sk);
1444 
1445 	read_unlock(&sk->sk_callback_lock);
1446 }
1447 
1448 /**
1449  * xs_tcp_write_space - callback invoked when socket buffer space
1450  *                             becomes available
1451  * @sk: socket whose state has changed
1452  *
1453  * Called when more output buffer space is available for this socket.
1454  * We try not to wake our writers until they can make "significant"
1455  * progress; otherwise we'll waste resources thrashing kernel_sendmsg
1456  * with a bunch of small requests.
1457  */
1458 static void xs_tcp_write_space(struct sock *sk)
1459 {
1460 	read_lock(&sk->sk_callback_lock);
1461 
1462 	/* from net/core/stream.c:sk_stream_write_space */
1463 	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1464 		xs_write_space(sk);
1465 
1466 	read_unlock(&sk->sk_callback_lock);
1467 }
1468 
1469 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1470 {
1471 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1472 	struct sock *sk = transport->inet;
1473 
1474 	if (transport->rcvsize) {
1475 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1476 		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1477 	}
1478 	if (transport->sndsize) {
1479 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1480 		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1481 		sk->sk_write_space(sk);
1482 	}
1483 }
1484 
1485 /**
1486  * xs_udp_set_buffer_size - set send and receive limits
1487  * @xprt: generic transport
1488  * @sndsize: requested size of send buffer, in bytes
1489  * @rcvsize: requested size of receive buffer, in bytes
1490  *
1491  * Set socket send and receive buffer size limits.
1492  */
1493 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1494 {
1495 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1496 
1497 	transport->sndsize = 0;
1498 	if (sndsize)
1499 		transport->sndsize = sndsize + 1024;
1500 	transport->rcvsize = 0;
1501 	if (rcvsize)
1502 		transport->rcvsize = rcvsize + 1024;
1503 
1504 	xs_udp_do_set_buffer_size(xprt);
1505 }
1506 
1507 /**
1508  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1509  * @task: task that timed out
1510  *
1511  * Adjust the congestion window after a retransmit timeout has occurred.
1512  */
1513 static void xs_udp_timer(struct rpc_task *task)
1514 {
1515 	xprt_adjust_cwnd(task, -ETIMEDOUT);
1516 }
1517 
1518 static unsigned short xs_get_random_port(void)
1519 {
1520 	unsigned short range = xprt_max_resvport - xprt_min_resvport + 1;
1521 	unsigned short rand = (unsigned short) net_random() % range;
1522 	return rand + xprt_min_resvport;
1523 }
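
/*
 * (Editor's note) With the default limits (xprt_min_resvport = 665,
 * xprt_max_resvport = 1023) range is 359 and the result falls in
 * [665, 1023].  The "+ 1" above also keeps the modulus nonzero if
 * the two sysctls are set to the same value.
 */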
1524 
1525 /**
1526  * xs_set_port - reset the port number in the remote endpoint address
1527  * @xprt: generic transport
1528  * @port: new port number
1529  *
1530  */
1531 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1532 {
1533 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1534 
1535 	rpc_set_port(xs_addr(xprt), port);
1536 	xs_update_peer_port(xprt);
1537 }
1538 
1539 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1540 {
1541 	unsigned short port = transport->srcport;
1542 
1543 	if (port == 0 && transport->xprt.resvport)
1544 		port = xs_get_random_port();
1545 	return port;
1546 }
1547 
1548 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1549 {
1550 	if (transport->srcport != 0)
1551 		transport->srcport = 0;
1552 	if (!transport->xprt.resvport)
1553 		return 0;
1554 	if (port <= xprt_min_resvport || port > xprt_max_resvport)
1555 		return xprt_max_resvport;
1556 	return --port;
1557 }
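
/*
 * (Editor's note) Worked example: after a bind failure on port 800,
 * xs_next_srcport() returns 799; when the descending search passes
 * xprt_min_resvport it wraps back to xprt_max_resvport.  The bind
 * loops below use "port > last" to detect a wrap and give up after
 * the second one (nloop == 2).
 */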
1558 
1559 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1560 {
1561 	struct sockaddr_in myaddr = {
1562 		.sin_family = AF_INET,
1563 	};
1564 	struct sockaddr_in *sa;
1565 	int err, nloop = 0;
1566 	unsigned short port = xs_get_srcport(transport, sock);
1567 	unsigned short last;
1568 
1569 	sa = (struct sockaddr_in *)&transport->srcaddr;
1570 	myaddr.sin_addr = sa->sin_addr;
1571 	do {
1572 		myaddr.sin_port = htons(port);
1573 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1574 						sizeof(myaddr));
1575 		if (port == 0)
1576 			break;
1577 		if (err == 0) {
1578 			transport->srcport = port;
1579 			break;
1580 		}
1581 		last = port;
1582 		port = xs_next_srcport(transport, sock, port);
1583 		if (port > last)
1584 			nloop++;
1585 	} while (err == -EADDRINUSE && nloop != 2);
1586 	dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1587 			__func__, &myaddr.sin_addr,
1588 			port, err ? "failed" : "ok", err);
1589 	return err;
1590 }
1591 
1592 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1593 {
1594 	struct sockaddr_in6 myaddr = {
1595 		.sin6_family = AF_INET6,
1596 	};
1597 	struct sockaddr_in6 *sa;
1598 	int err, nloop = 0;
1599 	unsigned short port = xs_get_srcport(transport, sock);
1600 	unsigned short last;
1601 
1602 	sa = (struct sockaddr_in6 *)&transport->srcaddr;
1603 	myaddr.sin6_addr = sa->sin6_addr;
1604 	do {
1605 		myaddr.sin6_port = htons(port);
1606 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1607 						sizeof(myaddr));
1608 		if (port == 0)
1609 			break;
1610 		if (err == 0) {
1611 			transport->srcport = port;
1612 			break;
1613 		}
1614 		last = port;
1615 		port = xs_next_srcport(transport, sock, port);
1616 		if (port > last)
1617 			nloop++;
1618 	} while (err == -EADDRINUSE && nloop != 2);
1619 	dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1620 		&myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1621 	return err;
1622 }
1623 
1624 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1625 static struct lock_class_key xs_key[2];
1626 static struct lock_class_key xs_slock_key[2];
1627 
1628 static inline void xs_reclassify_socket4(struct socket *sock)
1629 {
1630 	struct sock *sk = sock->sk;
1631 
1632 	BUG_ON(sock_owned_by_user(sk));
1633 	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
1634 		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
1635 }
1636 
1637 static inline void xs_reclassify_socket6(struct socket *sock)
1638 {
1639 	struct sock *sk = sock->sk;
1640 
1641 	BUG_ON(sock_owned_by_user(sk));
1642 	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
1643 		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
1644 }
1645 #else
1646 static inline void xs_reclassify_socket4(struct socket *sock)
1647 {
1648 }
1649 
1650 static inline void xs_reclassify_socket6(struct socket *sock)
1651 {
1652 }
1653 #endif
1654 
1655 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1656 {
1657 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1658 
1659 	if (!transport->inet) {
1660 		struct sock *sk = sock->sk;
1661 
1662 		write_lock_bh(&sk->sk_callback_lock);
1663 
1664 		xs_save_old_callbacks(transport, sk);
1665 
1666 		sk->sk_user_data = xprt;
1667 		sk->sk_data_ready = xs_udp_data_ready;
1668 		sk->sk_write_space = xs_udp_write_space;
1669 		sk->sk_error_report = xs_error_report;
1670 		sk->sk_no_check = UDP_CSUM_NORCV;
1671 		sk->sk_allocation = GFP_ATOMIC;
1672 
1673 		xprt_set_connected(xprt);
1674 
1675 		/* Reset to new socket */
1676 		transport->sock = sock;
1677 		transport->inet = sk;
1678 
1679 		write_unlock_bh(&sk->sk_callback_lock);
1680 	}
1681 	xs_udp_do_set_buffer_size(xprt);
1682 }
1683 
1684 /**
1685  * xs_udp_connect_worker4 - set up a UDP socket
1686  * @work: RPC transport to connect
1687  *
688  * Invoked from the rpciod work queue.
1689  */
1690 static void xs_udp_connect_worker4(struct work_struct *work)
1691 {
1692 	struct sock_xprt *transport =
1693 		container_of(work, struct sock_xprt, connect_worker.work);
1694 	struct rpc_xprt *xprt = &transport->xprt;
1695 	struct socket *sock = transport->sock;
1696 	int err, status = -EIO;
1697 
1698 	if (xprt->shutdown)
1699 		goto out;
1700 
1701 	/* Start by resetting any existing state */
1702 	xs_reset_transport(transport);
1703 
1704 	err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1705 	if (err < 0) {
1706 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1707 		goto out;
1708 	}
1709 	xs_reclassify_socket4(sock);
1710 
1711 	if (xs_bind4(transport, sock)) {
1712 		sock_release(sock);
1713 		goto out;
1714 	}
1715 
1716 	dprintk("RPC:       worker connecting xprt %p via %s to "
1717 				"%s (port %s)\n", xprt,
1718 			xprt->address_strings[RPC_DISPLAY_PROTO],
1719 			xprt->address_strings[RPC_DISPLAY_ADDR],
1720 			xprt->address_strings[RPC_DISPLAY_PORT]);
1721 
1722 	xs_udp_finish_connecting(xprt, sock);
1723 	status = 0;
1724 out:
1725 	xprt_clear_connecting(xprt);
1726 	xprt_wake_pending_tasks(xprt, status);
1727 }
1728 
1729 /**
1730  * xs_udp_connect_worker6 - set up a UDP socket
1731  * @work: RPC transport to connect
1732  *
733  * Invoked from the rpciod work queue.
1734  */
1735 static void xs_udp_connect_worker6(struct work_struct *work)
1736 {
1737 	struct sock_xprt *transport =
1738 		container_of(work, struct sock_xprt, connect_worker.work);
1739 	struct rpc_xprt *xprt = &transport->xprt;
1740 	struct socket *sock = transport->sock;
1741 	int err, status = -EIO;
1742 
1743 	if (xprt->shutdown)
1744 		goto out;
1745 
1746 	/* Start by resetting any existing state */
1747 	xs_reset_transport(transport);
1748 
1749 	err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1750 	if (err < 0) {
1751 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1752 		goto out;
1753 	}
1754 	xs_reclassify_socket6(sock);
1755 
1756 	if (xs_bind6(transport, sock) < 0) {
1757 		sock_release(sock);
1758 		goto out;
1759 	}
1760 
1761 	dprintk("RPC:       worker connecting xprt %p via %s to "
1762 				"%s (port %s)\n", xprt,
1763 			xprt->address_strings[RPC_DISPLAY_PROTO],
1764 			xprt->address_strings[RPC_DISPLAY_ADDR],
1765 			xprt->address_strings[RPC_DISPLAY_PORT]);
1766 
1767 	xs_udp_finish_connecting(xprt, sock);
1768 	status = 0;
1769 out:
1770 	xprt_clear_connecting(xprt);
1771 	xprt_wake_pending_tasks(xprt, status);
1772 }
1773 
1774 /*
1775  * We need to preserve the port number so the reply cache on the server can
1776  * find our cached RPC replies when we get around to reconnecting.
1777  */
1778 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1779 {
1780 	int result;
1781 	struct sockaddr any;
1782 
1783 	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1784 
1785 	/*
1786 	 * Disconnect the transport socket by doing a connect operation
1787 	 * with AF_UNSPEC.  This should return immediately...
1788 	 */
1789 	memset(&any, 0, sizeof(any));
1790 	any.sa_family = AF_UNSPEC;
1791 	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1792 	if (!result)
1793 		xs_sock_mark_closed(xprt);
1794 	else
1795 		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1796 				result);
1797 }
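
/*
 * (Editor's note) This is the kernel-side equivalent of the classic
 * userspace idiom for dissolving a socket association:
 *
 *	struct sockaddr any = { .sa_family = AF_UNSPEC };
 *	connect(fd, &any, sizeof(any));
 *
 * which drops the connection state while keeping the local port
 * bound, so the server's duplicate reply cache can still match us.
 */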
1798 
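/*
 * (Editor's note) xs_tcp_reuse_connection() aborts a lingering TCP
 * connection (via the AF_UNSPEC trick above) without releasing the
 * socket, so the reconnect reuses the same source port.  Sockets that
 * are already fully closed, established or mid-handshake are left
 * untouched.
 */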
1799 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1800 {
1801 	unsigned int state = transport->inet->sk_state;
1802 
1803 	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1804 		return;
1805 	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1806 		return;
1807 	xs_abort_connection(xprt, transport);
1808 }
1809 
1810 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1811 {
1812 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1813 
1814 	if (!transport->inet) {
1815 		struct sock *sk = sock->sk;
1816 
1817 		write_lock_bh(&sk->sk_callback_lock);
1818 
1819 		xs_save_old_callbacks(transport, sk);
1820 
1821 		sk->sk_user_data = xprt;
1822 		sk->sk_data_ready = xs_tcp_data_ready;
1823 		sk->sk_state_change = xs_tcp_state_change;
1824 		sk->sk_write_space = xs_tcp_write_space;
1825 		sk->sk_error_report = xs_error_report;
1826 		sk->sk_allocation = GFP_ATOMIC;
1827 
1828 		/* socket options */
1829 		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1830 		sock_reset_flag(sk, SOCK_LINGER);
1831 		tcp_sk(sk)->linger2 = 0;
1832 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1833 
1834 		xprt_clear_connected(xprt);
1835 
1836 		/* Reset to new socket */
1837 		transport->sock = sock;
1838 		transport->inet = sk;
1839 
1840 		write_unlock_bh(&sk->sk_callback_lock);
1841 	}
1842 
1843 	if (!xprt_bound(xprt))
1844 		return -ENOTCONN;
1845 
1846 	/* Tell the socket layer to start connecting... */
1847 	xprt->stat.connect_count++;
1848 	xprt->stat.connect_start = jiffies;
1849 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1850 }
1851 
1852 /**
1853  * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
1854  * @xprt: RPC transport to connect
1855  * @transport: socket transport to connect
1856  * @create_sock: function to create a socket of the correct type
1857  *
1858  * Invoked by a work queue tasklet.
1859  */
1860 static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
1861 		struct sock_xprt *transport,
1862 		struct socket *(*create_sock)(struct rpc_xprt *,
1863 			struct sock_xprt *))
1864 {
1865 	struct socket *sock = transport->sock;
1866 	int status = -EIO;
1867 
1868 	if (xprt->shutdown)
1869 		goto out;
1870 
1871 	if (!sock) {
1872 		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1873 		sock = create_sock(xprt, transport);
1874 		if (IS_ERR(sock)) {
1875 			status = PTR_ERR(sock);
1876 			goto out;
1877 		}
1878 	} else {
1879 		int abort_and_exit;
1880 
1881 		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
1882 				&xprt->state);
1883 		/* "close" the socket, preserving the local port */
1884 		xs_tcp_reuse_connection(xprt, transport);
1885 
1886 		if (abort_and_exit)
1887 			goto out_eagain;
1888 	}
1889 
1890 	dprintk("RPC:       worker connecting xprt %p via %s to "
1891 				"%s (port %s)\n", xprt,
1892 			xprt->address_strings[RPC_DISPLAY_PROTO],
1893 			xprt->address_strings[RPC_DISPLAY_ADDR],
1894 			xprt->address_strings[RPC_DISPLAY_PORT]);
1895 
1896 	status = xs_tcp_finish_connecting(xprt, sock);
1897 	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
1898 			xprt, -status, xprt_connected(xprt),
1899 			sock->sk->sk_state);
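	/*
	 * kernel_connect() was called with O_NONBLOCK, so -EINPROGRESS
	 * is the common case here; the handshake completes later and is
	 * reported through the sk_state_change callback installed by
	 * xs_tcp_finish_connecting().
	 */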
1900 	switch (status) {
1901 	default:
1902 		printk(KERN_WARNING "%s: connect returned unhandled error %d\n",
1903 			__func__, status);
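		/* fall through */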
1904 	case -EADDRNOTAVAIL:
1905 		/* We're probably in TIME_WAIT. Get rid of existing socket,
1906 		 * and retry
1907 		 */
1908 		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
1909 		xprt_force_disconnect(xprt);
1910 		break;
1911 	case -ECONNREFUSED:
1912 	case -ECONNRESET:
1913 	case -ENETUNREACH:
1914 		/* retry with existing socket, after a delay */
1915 	case 0:
1916 	case -EINPROGRESS:
1917 	case -EALREADY:
1918 		xprt_clear_connecting(xprt);
1919 		return;
1920 	}
1921 out_eagain:
1922 	status = -EAGAIN;
1923 out:
1924 	xprt_clear_connecting(xprt);
1925 	xprt_wake_pending_tasks(xprt, status);
1926 }
1927 
1928 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1929 		struct sock_xprt *transport)
1930 {
1931 	struct socket *sock;
1932 	int err;
1933 
1934 	/* start from scratch */
1935 	err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1936 	if (err < 0) {
1937 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
1938 				-err);
1939 		goto out_err;
1940 	}
1941 	xs_reclassify_socket4(sock);
1942 
1943 	if (xs_bind4(transport, sock) < 0) {
1944 		sock_release(sock);
1945 		goto out_err;
1946 	}
1947 	return sock;
1948 out_err:
1949 	return ERR_PTR(-EIO);
1950 }
1951 
1952 /**
1953  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1954  * @work: RPC transport to connect
1955  *
1956  * Invoked by a work queue tasklet.
1957  */
1958 static void xs_tcp_connect_worker4(struct work_struct *work)
1959 {
1960 	struct sock_xprt *transport =
1961 		container_of(work, struct sock_xprt, connect_worker.work);
1962 	struct rpc_xprt *xprt = &transport->xprt;
1963 
1964 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1965 }
1966 
1967 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1968 		struct sock_xprt *transport)
1969 {
1970 	struct socket *sock;
1971 	int err;
1972 
1973 	/* start from scratch */
1974 	err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1975 	if (err < 0) {
1976 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
1977 				-err);
1978 		goto out_err;
1979 	}
1980 	xs_reclassify_socket6(sock);
1981 
1982 	if (xs_bind6(transport, sock) < 0) {
1983 		sock_release(sock);
1984 		goto out_err;
1985 	}
1986 	return sock;
1987 out_err:
1988 	return ERR_PTR(-EIO);
1989 }
1990 
1991 /**
1992  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1993  * @work: RPC transport to connect
1994  *
1995  * Invoked by a work queue tasklet.
1996  */
1997 static void xs_tcp_connect_worker6(struct work_struct *work)
1998 {
1999 	struct sock_xprt *transport =
2000 		container_of(work, struct sock_xprt, connect_worker.work);
2001 	struct rpc_xprt *xprt = &transport->xprt;
2002 
2003 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
2004 }
2005 
2006 /**
2007  * xs_connect - connect a socket to a remote endpoint
2008  * @task: address of RPC task that manages state of connect request
2009  *
2010  * TCP: If the remote end dropped the connection, delay reconnecting.
2011  *
2012  * UDP socket connects are synchronous, but we use a work queue anyway
2013  * to guarantee that even unprivileged user processes can set up a
2014  * socket on a privileged port.
2015  *
2016  * If a UDP socket connect fails, the delay behavior here prevents
2017  * retry floods (hard mounts).
2018  */
2019 static void xs_connect(struct rpc_task *task)
2020 {
2021 	struct rpc_xprt *xprt = task->tk_xprt;
2022 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2023 
2024 	if (xprt_test_and_set_connecting(xprt))
2025 		return;
2026 
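	/*
	 * If a socket already exists, delay the reconnect and back off
	 * exponentially: reestablish_timeout doubles on each attempt
	 * until it reaches XS_TCP_MAX_REEST_TO, so the delays grow
	 * geometrically (for example 3s, 6s, 12s, ... with the defaults
	 * defined earlier in this file) up to the cap.
	 */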
2027 	if (transport->sock != NULL) {
2028 		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2029 				"seconds\n",
2030 				xprt, xprt->reestablish_timeout / HZ);
2031 		queue_delayed_work(rpciod_workqueue,
2032 				   &transport->connect_worker,
2033 				   xprt->reestablish_timeout);
2034 		xprt->reestablish_timeout <<= 1;
2035 		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2036 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2037 	} else {
2038 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2039 		queue_delayed_work(rpciod_workqueue,
2040 				   &transport->connect_worker, 0);
2041 	}
2042 }
2043 
2044 static void xs_tcp_connect(struct rpc_task *task)
2045 {
2046 	struct rpc_xprt *xprt = task->tk_xprt;
2047 
2048 	/* Exit if we need to wait for socket shutdown to complete */
2049 	if (test_bit(XPRT_CLOSING, &xprt->state))
2050 		return;
2051 	xs_connect(task);
2052 }
2053 
2054 /**
2055  * xs_udp_print_stats - display UDP socket-specific stats
2056  * @xprt: rpc_xprt struct containing statistics
2057  * @seq: output file
2058  *
2059  */
2060 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2061 {
2062 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2063 
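	/* Fields, in order: source port, bind count, sends, recvs,
	 * bad XIDs, request utilization, backlog utilization.
	 */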
2064 	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2065 			transport->srcport,
2066 			xprt->stat.bind_count,
2067 			xprt->stat.sends,
2068 			xprt->stat.recvs,
2069 			xprt->stat.bad_xids,
2070 			xprt->stat.req_u,
2071 			xprt->stat.bklog_u);
2072 }
2073 
2074 /**
2075  * xs_tcp_print_stats - display TCP socket-specific stats
2076  * @xprt: rpc_xprt struct containing statistics
2077  * @seq: output file
2078  *
2079  */
2080 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2081 {
2082 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2083 	long idle_time = 0;
2084 
2085 	if (xprt_connected(xprt))
2086 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2087 
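	/* Fields, in order: source port, bind count, connect count,
	 * connect time, idle time, sends, recvs, bad XIDs,
	 * request utilization, backlog utilization.
	 */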
2088 	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2089 			transport->srcport,
2090 			xprt->stat.bind_count,
2091 			xprt->stat.connect_count,
2092 			xprt->stat.connect_time,
2093 			idle_time,
2094 			xprt->stat.sends,
2095 			xprt->stat.recvs,
2096 			xprt->stat.bad_xids,
2097 			xprt->stat.req_u,
2098 			xprt->stat.bklog_u);
2099 }
2100 
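/*
 * Method tables through which the generic RPC client drives the UDP
 * and TCP socket transports.
 */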
2101 static struct rpc_xprt_ops xs_udp_ops = {
2102 	.set_buffer_size	= xs_udp_set_buffer_size,
2103 	.reserve_xprt		= xprt_reserve_xprt_cong,
2104 	.release_xprt		= xprt_release_xprt_cong,
2105 	.rpcbind		= rpcb_getport_async,
2106 	.set_port		= xs_set_port,
2107 	.connect		= xs_connect,
2108 	.buf_alloc		= rpc_malloc,
2109 	.buf_free		= rpc_free,
2110 	.send_request		= xs_udp_send_request,
2111 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
2112 	.timer			= xs_udp_timer,
2113 	.release_request	= xprt_release_rqst_cong,
2114 	.close			= xs_close,
2115 	.destroy		= xs_destroy,
2116 	.print_stats		= xs_udp_print_stats,
2117 };
2118 
2119 static struct rpc_xprt_ops xs_tcp_ops = {
2120 	.reserve_xprt		= xprt_reserve_xprt,
2121 	.release_xprt		= xs_tcp_release_xprt,
2122 	.rpcbind		= rpcb_getport_async,
2123 	.set_port		= xs_set_port,
2124 	.connect		= xs_tcp_connect,
2125 	.buf_alloc		= rpc_malloc,
2126 	.buf_free		= rpc_free,
2127 	.send_request		= xs_tcp_send_request,
2128 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2129 #if defined(CONFIG_NFS_V4_1)
2130 	.release_request	= bc_release_request,
2131 #endif /* CONFIG_NFS_V4_1 */
2132 	.close			= xs_tcp_close,
2133 	.destroy		= xs_destroy,
2134 	.print_stats		= xs_tcp_print_stats,
2135 };
2136 
2137 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2138 				      unsigned int slot_table_size)
2139 {
2140 	struct rpc_xprt *xprt;
2141 	struct sock_xprt *new;
2142 
2143 	if (args->addrlen > sizeof(xprt->addr)) {
2144 		dprintk("RPC:       xs_setup_xprt: address too large\n");
2145 		return ERR_PTR(-EBADF);
2146 	}
2147 
2148 	new = kzalloc(sizeof(*new), GFP_KERNEL);
2149 	if (new == NULL) {
2150 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2151 				"rpc_xprt\n");
2152 		return ERR_PTR(-ENOMEM);
2153 	}
2154 	xprt = &new->xprt;
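	/*
	 * The generic rpc_xprt is the first member of sock_xprt, which
	 * is why kfree(xprt) below frees the whole structure; transport
	 * methods recover the containing sock_xprt with container_of().
	 */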
2155 
2156 	xprt->max_reqs = slot_table_size;
2157 	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2158 	if (xprt->slot == NULL) {
2159 		kfree(xprt);
2160 		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2161 				"table\n");
2162 		return ERR_PTR(-ENOMEM);
2163 	}
2164 
2165 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2166 	xprt->addrlen = args->addrlen;
2167 	if (args->srcaddr)
2168 		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2169 
2170 	return xprt;
2171 }
2172 
2173 static const struct rpc_timeout xs_udp_default_timeout = {
2174 	.to_initval = 5 * HZ,
2175 	.to_maxval = 30 * HZ,
2176 	.to_increment = 5 * HZ,
2177 	.to_retries = 5,
2178 };
2179 
2180 /**
2181  * xs_setup_udp - Set up transport to use a UDP socket
2182  * @args: rpc transport creation arguments
2183  *
2184  */
2185 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2186 {
2187 	struct sockaddr *addr = args->dstaddr;
2188 	struct rpc_xprt *xprt;
2189 	struct sock_xprt *transport;
2190 
2191 	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2192 	if (IS_ERR(xprt))
2193 		return xprt;
2194 	transport = container_of(xprt, struct sock_xprt, xprt);
2195 
2196 	xprt->prot = IPPROTO_UDP;
2197 	xprt->tsh_size = 0;
2198 	/* XXX: header size can vary due to auth type, IPv6, etc. */
2199 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
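	/* That is, the largest possible UDP datagram (64KB) less a
	 * generous allowance of 8 * MAX_HEADER for lower-layer headers.
	 */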
2200 
2201 	xprt->bind_timeout = XS_BIND_TO;
2202 	xprt->connect_timeout = XS_UDP_CONN_TO;
2203 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
2204 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2205 
2206 	xprt->ops = &xs_udp_ops;
2207 
2208 	xprt->timeout = &xs_udp_default_timeout;
2209 
2210 	switch (addr->sa_family) {
2211 	case AF_INET:
2212 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2213 			xprt_set_bound(xprt);
2214 
2215 		INIT_DELAYED_WORK(&transport->connect_worker,
2216 					xs_udp_connect_worker4);
2217 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2218 		break;
2219 	case AF_INET6:
2220 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2221 			xprt_set_bound(xprt);
2222 
2223 		INIT_DELAYED_WORK(&transport->connect_worker,
2224 					xs_udp_connect_worker6);
2225 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2226 		break;
2227 	default:
2228 		kfree(xprt->slot);
		kfree(xprt);
2229 		return ERR_PTR(-EAFNOSUPPORT);
2230 	}
2231 
2232 	if (xprt_bound(xprt))
2233 		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2234 				xprt->address_strings[RPC_DISPLAY_ADDR],
2235 				xprt->address_strings[RPC_DISPLAY_PORT],
2236 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2237 	else
2238 		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2239 				xprt->address_strings[RPC_DISPLAY_ADDR],
2240 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2241 
2242 	if (try_module_get(THIS_MODULE))
2243 		return xprt;
2244 
2245 	kfree(xprt->slot);
2246 	kfree(xprt);
2247 	return ERR_PTR(-EINVAL);
2248 }
2249 
2250 static const struct rpc_timeout xs_tcp_default_timeout = {
2251 	.to_initval = 60 * HZ,
2252 	.to_maxval = 60 * HZ,
2253 	.to_retries = 2,
2254 };
2255 
2256 /**
2257  * xs_setup_tcp - Set up transport to use a TCP socket
2258  * @args: rpc transport creation arguments
2259  *
2260  */
2261 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2262 {
2263 	struct sockaddr *addr = args->dstaddr;
2264 	struct rpc_xprt *xprt;
2265 	struct sock_xprt *transport;
2266 
2267 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2268 	if (IS_ERR(xprt))
2269 		return xprt;
2270 	transport = container_of(xprt, struct sock_xprt, xprt);
2271 
2272 	xprt->prot = IPPROTO_TCP;
2273 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2274 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2275 
2276 	xprt->bind_timeout = XS_BIND_TO;
2277 	xprt->connect_timeout = XS_TCP_CONN_TO;
2278 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2279 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2280 
2281 	xprt->ops = &xs_tcp_ops;
2282 	xprt->timeout = &xs_tcp_default_timeout;
2283 
2284 	switch (addr->sa_family) {
2285 	case AF_INET:
2286 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2287 			xprt_set_bound(xprt);
2288 
2289 		INIT_DELAYED_WORK(&transport->connect_worker,
2290 					xs_tcp_connect_worker4);
2291 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2292 		break;
2293 	case AF_INET6:
2294 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2295 			xprt_set_bound(xprt);
2296 
2297 		INIT_DELAYED_WORK(&transport->connect_worker,
2298 					xs_tcp_connect_worker6);
2299 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2300 		break;
2301 	default:
2302 		kfree(xprt->slot);
		kfree(xprt);
2303 		return ERR_PTR(-EAFNOSUPPORT);
2304 	}
2305 
2306 	if (xprt_bound(xprt))
2307 		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2308 				xprt->address_strings[RPC_DISPLAY_ADDR],
2309 				xprt->address_strings[RPC_DISPLAY_PORT],
2310 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2311 	else
2312 		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2313 				xprt->address_strings[RPC_DISPLAY_ADDR],
2314 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2315 
2317 	if (try_module_get(THIS_MODULE))
2318 		return xprt;
2319 
2320 	kfree(xprt->slot);
2321 	kfree(xprt);
2322 	return ERR_PTR(-EINVAL);
2323 }
2324 
2325 static struct xprt_class	xs_udp_transport = {
2326 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
2327 	.name		= "udp",
2328 	.owner		= THIS_MODULE,
2329 	.ident		= IPPROTO_UDP,
2330 	.setup		= xs_setup_udp,
2331 };
2332 
2333 static struct xprt_class	xs_tcp_transport = {
2334 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
2335 	.name		= "tcp",
2336 	.owner		= THIS_MODULE,
2337 	.ident		= IPPROTO_TCP,
2338 	.setup		= xs_setup_tcp,
2339 };
2340 
2341 /**
2342  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2343  *
2344  */
2345 int init_socket_xprt(void)
2346 {
2347 #ifdef RPC_DEBUG
2348 	if (!sunrpc_table_header)
2349 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
2350 #endif
2351 
2352 	xprt_register_transport(&xs_udp_transport);
2353 	xprt_register_transport(&xs_tcp_transport);
2354 
2355 	return 0;
2356 }
2357 
2358 /**
2359  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2360  *
2361  */
2362 void cleanup_socket_xprt(void)
2363 {
2364 #ifdef RPC_DEBUG
2365 	if (sunrpc_table_header) {
2366 		unregister_sysctl_table(sunrpc_table_header);
2367 		sunrpc_table_header = NULL;
2368 	}
2369 #endif
2370 
2371 	xprt_unregister_transport(&xs_udp_transport);
2372 	xprt_unregister_transport(&xs_tcp_transport);
2373 }
2374 
2375 static int param_set_uint_minmax(const char *val, struct kernel_param *kp,
2376 		unsigned int min, unsigned int max)
2377 {
2378 	unsigned long num;
2379 	int ret;
2380 
2381 	if (!val)
2382 		return -EINVAL;
2383 	ret = strict_strtoul(val, 0, &num);
2384 	if (ret == -EINVAL || num < min || num > max)
2385 		return -EINVAL;
2386 	*((unsigned int *)kp->arg) = num;
2387 	return 0;
2388 }
2389 
2390 static int param_set_portnr(const char *val, struct kernel_param *kp)
2391 {
2392 	return param_set_uint_minmax(val, kp,
2393 			RPC_MIN_RESVPORT,
2394 			RPC_MAX_RESVPORT);
2395 }
2396 
2397 static int param_get_portnr(char *buffer, struct kernel_param *kp)
2398 {
2399 	return param_get_uint(buffer, kp);
2400 }
2401 #define param_check_portnr(name, p) \
2402 	__param_check(name, p, unsigned int);
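/*
 * The param_check_* macro supplies compile-time type checking when
 * module_param_named() is used with the custom "portnr" type above.
 */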
2403 
2404 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
2405 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2406 
2407 static int param_set_slot_table_size(const char *val, struct kernel_param *kp)
2408 {
2409 	return param_set_uint_minmax(val, kp,
2410 			RPC_MIN_SLOT_TABLE,
2411 			RPC_MAX_SLOT_TABLE);
2412 }
2413 
2414 static int param_get_slot_table_size(char *buffer, struct kernel_param *kp)
2415 {
2416 	return param_get_uint(buffer, kp);
2417 }
2418 #define param_check_slot_table_size(name, p) \
2419 	__param_check(name, p, unsigned int);
2420 
2421 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
2422 		   slot_table_size, 0644);
2423 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
2424 		   slot_table_size, 0644);
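
/*
 * Usage example (illustrative): these parameters may be given at
 * module load time, e.g.
 *
 *	modprobe sunrpc udp_slot_table_entries=32
 *
 * and, since they are registered with mode 0644, may also be read or
 * written at runtime via /sys/module/sunrpc/parameters/, subject to
 * the min/max validation in param_set_uint_minmax() above.
 */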
2425 
2426