xref: /linux/net/sunrpc/xprtsock.c (revision 6000fc4d6f3e55ad52cce8d76317187fe01af2aa)
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20 
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/xprtsock.h>
36 #include <linux/file.h>
37 #ifdef CONFIG_NFS_V4_1
38 #include <linux/sunrpc/bc_xprt.h>
39 #endif
40 
41 #include <net/sock.h>
42 #include <net/checksum.h>
43 #include <net/udp.h>
44 #include <net/tcp.h>
45 
46 /*
47  * xprtsock tunables
48  */
/*
 * Slot table entry counts for UDP and TCP transports.  When RPC_DEBUG is
 * defined they are exported through the "udp_slot_table_entries" and
 * "tcp_slot_table_entries" sysctl entries further down.
 */
49 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
50 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
51 
/*
 * Lower and upper bound of the reserved port range.  When RPC_DEBUG is
 * defined they are exported as "min_resvport" and "max_resvport".
 */
52 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
53 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
54 
/* Default for xs_tcp_fin_timeout: 15 seconds expressed in jiffies. */
55 #define XS_TCP_LINGER_TO	(15U * HZ)
/*
 * Kept in jiffies; exported as "tcp_fin_timeout" (via proc_dointvec_jiffies)
 * when RPC_DEBUG is defined.
 */
56 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
57 
58 /*
59  * We can register our own files under /proc/sys/sunrpc by
60  * calling register_sysctl_table() again.  The files in that
61  * directory become the union of all files registered there.
62  *
63  * We simply need to make sure that we don't collide with
64  * someone else's file names!
65  */
66 
67 #ifdef RPC_DEBUG
68 
/*
 * Range limits handed to proc_dointvec_minmax through .extra1/.extra2 of
 * the table entries below.
 */
69 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
70 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
71 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
72 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
73 
/*
 * NOTE(review): not referenced in this part of the file; presumably holds
 * the handle obtained when sunrpc_table is registered -- confirm.
 */
74 static struct ctl_table_header *sunrpc_table_header;
75 
76 /*
77  * FIXME: changing the UDP slot table size should also resize the UDP
78  *        socket buffers for existing UDP transports
79  */
/* Entries that appear below the "sunrpc" directory (see sunrpc_table). */
80 static ctl_table xs_tunables_table[] = {
	/* xprt_udp_slot_table_entries, limited to [min,max]_slot_table_size */
81 	{
82 		.ctl_name	= CTL_SLOTTABLE_UDP,
83 		.procname	= "udp_slot_table_entries",
84 		.data		= &xprt_udp_slot_table_entries,
85 		.maxlen		= sizeof(unsigned int),
86 		.mode		= 0644,
87 		.proc_handler	= &proc_dointvec_minmax,
88 		.strategy	= &sysctl_intvec,
89 		.extra1		= &min_slot_table_size,
90 		.extra2		= &max_slot_table_size
91 	},
	/* xprt_tcp_slot_table_entries, limited to [min,max]_slot_table_size */
92 	{
93 		.ctl_name	= CTL_SLOTTABLE_TCP,
94 		.procname	= "tcp_slot_table_entries",
95 		.data		= &xprt_tcp_slot_table_entries,
96 		.maxlen		= sizeof(unsigned int),
97 		.mode		= 0644,
98 		.proc_handler	= &proc_dointvec_minmax,
99 		.strategy	= &sysctl_intvec,
100 		.extra1		= &min_slot_table_size,
101 		.extra2		= &max_slot_table_size
102 	},
	/* xprt_min_resvport, limited to xprt_{min,max}_resvport_limit */
103 	{
104 		.ctl_name	= CTL_MIN_RESVPORT,
105 		.procname	= "min_resvport",
106 		.data		= &xprt_min_resvport,
107 		.maxlen		= sizeof(unsigned int),
108 		.mode		= 0644,
109 		.proc_handler	= &proc_dointvec_minmax,
110 		.strategy	= &sysctl_intvec,
111 		.extra1		= &xprt_min_resvport_limit,
112 		.extra2		= &xprt_max_resvport_limit
113 	},
	/* xprt_max_resvport, limited to xprt_{min,max}_resvport_limit */
114 	{
115 		.ctl_name	= CTL_MAX_RESVPORT,
116 		.procname	= "max_resvport",
117 		.data		= &xprt_max_resvport,
118 		.maxlen		= sizeof(unsigned int),
119 		.mode		= 0644,
120 		.proc_handler	= &proc_dointvec_minmax,
121 		.strategy	= &sysctl_intvec,
122 		.extra1		= &xprt_min_resvport_limit,
123 		.extra2		= &xprt_max_resvport_limit
124 	},
	/*
	 * xs_tcp_fin_timeout: stored in jiffies, handled by the jiffies
	 * helpers.  This entry sets no .ctl_name and no range limits.
	 */
125 	{
126 		.procname	= "tcp_fin_timeout",
127 		.data		= &xs_tcp_fin_timeout,
128 		.maxlen		= sizeof(xs_tcp_fin_timeout),
129 		.mode		= 0644,
130 		.proc_handler	= &proc_dointvec_jiffies,
131 		.strategy	= sysctl_jiffies
132 	},
	/* all-zero entry ends the table */
133 	{
134 		.ctl_name = 0,
135 	},
136 };
137 
/* Parent directory entry "sunrpc" whose children are xs_tunables_table. */
138 static ctl_table sunrpc_table[] = {
139 	{
140 		.ctl_name	= CTL_SUNRPC,
141 		.procname	= "sunrpc",
142 		.mode		= 0555,
143 		.child		= xs_tunables_table
144 	},
	/* all-zero entry ends the table */
145 	{
146 		.ctl_name = 0,
147 	},
148 };
149 
150 #endif
151 
/*
 * All XS_*_TO values below are a number of seconds multiplied by HZ,
 * i.e. they are expressed in jiffies.
 */
152 /*
153  * Time out for an RPC UDP socket connect.  UDP socket connects are
154  * synchronous, but we set a timeout anyway in case of resource
155  * exhaustion on the local host.
156  */
157 #define XS_UDP_CONN_TO		(5U * HZ)
158 
159 /*
160  * Wait duration for an RPC TCP connection to be established.  Solaris
161  * NFS over TCP uses 60 seconds, for example, which is in line with how
162  * long a server takes to reboot.
163  */
164 #define XS_TCP_CONN_TO		(60U * HZ)
165 
166 /*
167  * Wait duration for a reply from the RPC portmapper.
168  */
169 #define XS_BIND_TO		(60U * HZ)
170 
171 /*
172  * Delay if a UDP socket connect error occurs.  This is most likely some
173  * kind of resource problem on the local host.
174  */
175 #define XS_UDP_REEST_TO		(2U * HZ)
176 
177 /*
178  * The reestablish timeout allows clients to delay for a bit before attempting
179  * to reconnect to a server that just dropped our connection.
180  *
181  * We implement an exponential backoff when trying to reestablish a TCP
182  * transport connection with the server.  Some servers like to drop a TCP
183  * connection when they are overworked, so we start with a short timeout and
184  * increase over time if the server is down or not responding.
185  */
/* Initial (3 seconds) and maximum (5 minutes) reestablish delay. */
186 #define XS_TCP_INIT_REEST_TO	(3U * HZ)
187 #define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
188 
189 /*
190  * TCP idle timeout; client drops the transport socket if it is idle
191  * for this long.  Note that we also timeout UDP sockets to prevent
192  * holding port numbers when there is no RPC traffic.
193  */
194 #define XS_IDLE_DISC_TO		(5U * 60 * HZ)
195 
196 #ifdef RPC_DEBUG
197 # undef  RPC_DEBUG_DATA
198 # define RPCDBG_FACILITY	RPCDBG_TRANS
199 #endif
200 
201 #ifdef RPC_DEBUG_DATA
202 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
203 {
204 	u8 *buf = (u8 *) packet;
205 	int j;
206 
207 	dprintk("RPC:       %s\n", msg);
208 	for (j = 0; j < count && j < 128; j += 4) {
209 		if (!(j & 31)) {
210 			if (j)
211 				dprintk("\n");
212 			dprintk("0x%04x ", j);
213 		}
214 		dprintk("%02x%02x%02x%02x ",
215 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
216 	}
217 	dprintk("\n");
218 }
219 #else
220 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
221 {
222 	/* NOP */
223 }
224 #endif
225 
226 struct sock_xprt {
227 	struct rpc_xprt		xprt;
228 
229 	/*
230 	 * Network layer
231 	 */
232 	struct socket *		sock;
233 	struct sock *		inet;
234 
235 	/*
236 	 * State of TCP reply receive
237 	 */
238 	__be32			tcp_fraghdr,
239 				tcp_xid;
240 
241 	u32			tcp_offset,
242 				tcp_reclen;
243 
244 	unsigned long		tcp_copied,
245 				tcp_flags;
246 
247 	/*
248 	 * Connection of transports
249 	 */
250 	struct delayed_work	connect_worker;
251 	struct sockaddr_storage	addr;
252 	unsigned short		port;
253 
254 	/*
255 	 * UDP socket buffer size parameters
256 	 */
257 	size_t			rcvsize,
258 				sndsize;
259 
260 	/*
261 	 * Saved socket callback addresses
262 	 */
263 	void			(*old_data_ready)(struct sock *, int);
264 	void			(*old_state_change)(struct sock *);
265 	void			(*old_write_space)(struct sock *);
266 	void			(*old_error_report)(struct sock *);
267 };
268 
269 /*
270  * TCP receive state flags
271  */
/* All of the bits below are kept in sock_xprt.tcp_flags. */
/* Set when the current fragment header carries RPC_LAST_STREAM_FRAGMENT. */
272 #define TCP_RCV_LAST_FRAG	(1UL << 0)
/* A fragment header is read next; cleared once all four bytes arrived. */
273 #define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
/* The XID is read next; cleared once it has been read completely. */
274 #define TCP_RCV_COPY_XID	(1UL << 2)
/* Record data is being copied; set after the call direction was read. */
275 #define TCP_RCV_COPY_DATA	(1UL << 3)
/* The call direction word is read next; set after the XID was read. */
276 #define TCP_RCV_READ_CALLDIR	(1UL << 4)
/* The call direction still has to be stored in the receive buffer. */
277 #define TCP_RCV_COPY_CALLDIR	(1UL << 5)
278 
279 /*
280  * TCP RPC flags
281  */
/* Set when the call direction word equals RPC_REPLY, cleared otherwise. */
282 #define TCP_RPC_REPLY		(1UL << 6)
283 
284 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
285 {
286 	return (struct sockaddr *) &xprt->addr;
287 }
288 
289 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
290 {
291 	return (struct sockaddr_in *) &xprt->addr;
292 }
293 
294 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
295 {
296 	return (struct sockaddr_in6 *) &xprt->addr;
297 }
298 
299 static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
300 					  const char *protocol,
301 					  const char *netid)
302 {
303 	struct sockaddr_in *addr = xs_addr_in(xprt);
304 	char *buf;
305 
306 	buf = kzalloc(20, GFP_KERNEL);
307 	if (buf) {
308 		snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
309 	}
310 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
311 
312 	buf = kzalloc(8, GFP_KERNEL);
313 	if (buf) {
314 		snprintf(buf, 8, "%u",
315 				ntohs(addr->sin_port));
316 	}
317 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
318 
319 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
320 
321 	buf = kzalloc(48, GFP_KERNEL);
322 	if (buf) {
323 		snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
324 			&addr->sin_addr.s_addr,
325 			ntohs(addr->sin_port),
326 			protocol);
327 	}
328 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
329 
330 	buf = kzalloc(10, GFP_KERNEL);
331 	if (buf) {
332 		snprintf(buf, 10, "%02x%02x%02x%02x",
333 				NIPQUAD(addr->sin_addr.s_addr));
334 	}
335 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
336 
337 	buf = kzalloc(8, GFP_KERNEL);
338 	if (buf) {
339 		snprintf(buf, 8, "%4hx",
340 				ntohs(addr->sin_port));
341 	}
342 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
343 
344 	buf = kzalloc(30, GFP_KERNEL);
345 	if (buf) {
346 		snprintf(buf, 30, "%pI4.%u.%u",
347 				&addr->sin_addr.s_addr,
348 				ntohs(addr->sin_port) >> 8,
349 				ntohs(addr->sin_port) & 0xff);
350 	}
351 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
352 
353 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
354 }
355 
356 static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
357 					  const char *protocol,
358 					  const char *netid)
359 {
360 	struct sockaddr_in6 *addr = xs_addr_in6(xprt);
361 	char *buf;
362 
363 	buf = kzalloc(40, GFP_KERNEL);
364 	if (buf) {
365 		snprintf(buf, 40, "%pI6",&addr->sin6_addr);
366 	}
367 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
368 
369 	buf = kzalloc(8, GFP_KERNEL);
370 	if (buf) {
371 		snprintf(buf, 8, "%u",
372 				ntohs(addr->sin6_port));
373 	}
374 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
375 
376 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
377 
378 	buf = kzalloc(64, GFP_KERNEL);
379 	if (buf) {
380 		snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
381 				&addr->sin6_addr,
382 				ntohs(addr->sin6_port),
383 				protocol);
384 	}
385 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
386 
387 	buf = kzalloc(36, GFP_KERNEL);
388 	if (buf)
389 		snprintf(buf, 36, "%pi6", &addr->sin6_addr);
390 
391 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
392 
393 	buf = kzalloc(8, GFP_KERNEL);
394 	if (buf) {
395 		snprintf(buf, 8, "%4hx",
396 				ntohs(addr->sin6_port));
397 	}
398 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
399 
400 	buf = kzalloc(50, GFP_KERNEL);
401 	if (buf) {
402 		snprintf(buf, 50, "%pI6.%u.%u",
403 			 &addr->sin6_addr,
404 			 ntohs(addr->sin6_port) >> 8,
405 			 ntohs(addr->sin6_port) & 0xff);
406 	}
407 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
408 
409 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
410 }
411 
412 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
413 {
414 	unsigned int i;
415 
416 	for (i = 0; i < RPC_DISPLAY_MAX; i++)
417 		switch (i) {
418 		case RPC_DISPLAY_PROTO:
419 		case RPC_DISPLAY_NETID:
420 			continue;
421 		default:
422 			kfree(xprt->address_strings[i]);
423 		}
424 }
425 
426 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
427 
428 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
429 {
430 	struct msghdr msg = {
431 		.msg_name	= addr,
432 		.msg_namelen	= addrlen,
433 		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
434 	};
435 	struct kvec iov = {
436 		.iov_base	= vec->iov_base + base,
437 		.iov_len	= vec->iov_len - base,
438 	};
439 
440 	if (iov.iov_len != 0)
441 		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
442 	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
443 }
444 
445 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
446 {
447 	struct page **ppage;
448 	unsigned int remainder;
449 	int err, sent = 0;
450 
451 	remainder = xdr->page_len - base;
452 	base += xdr->page_base;
453 	ppage = xdr->pages + (base >> PAGE_SHIFT);
454 	base &= ~PAGE_MASK;
455 	for(;;) {
456 		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
457 		int flags = XS_SENDMSG_FLAGS;
458 
459 		remainder -= len;
460 		if (remainder != 0 || more)
461 			flags |= MSG_MORE;
462 		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
463 		if (remainder == 0 || err != len)
464 			break;
465 		sent += err;
466 		ppage++;
467 		base = 0;
468 	}
469 	if (sent == 0)
470 		return err;
471 	if (err > 0)
472 		sent += err;
473 	return sent;
474 }
475 
476 /**
477  * xs_sendpages - write pages directly to a socket
478  * @sock: socket to send on
479  * @addr: UDP only -- address of destination
480  * @addrlen: UDP only -- length of destination address
481  * @xdr: buffer containing this request
482  * @base: starting position in the buffer
483  *
484  */
485 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
486 {
487 	unsigned int remainder = xdr->len - base;
488 	int err, sent = 0;
489 
490 	if (unlikely(!sock))
491 		return -ENOTSOCK;
492 
493 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
494 	if (base != 0) {
495 		addr = NULL;
496 		addrlen = 0;
497 	}
498 
499 	if (base < xdr->head[0].iov_len || addr != NULL) {
500 		unsigned int len = xdr->head[0].iov_len - base;
501 		remainder -= len;
502 		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
503 		if (remainder == 0 || err != len)
504 			goto out;
505 		sent += err;
506 		base = 0;
507 	} else
508 		base -= xdr->head[0].iov_len;
509 
510 	if (base < xdr->page_len) {
511 		unsigned int len = xdr->page_len - base;
512 		remainder -= len;
513 		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
514 		if (remainder == 0 || err != len)
515 			goto out;
516 		sent += err;
517 		base = 0;
518 	} else
519 		base -= xdr->page_len;
520 
521 	if (base >= xdr->tail[0].iov_len)
522 		return sent;
523 	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
524 out:
525 	if (sent == 0)
526 		return err;
527 	if (err > 0)
528 		sent += err;
529 	return sent;
530 }
531 
532 static void xs_nospace_callback(struct rpc_task *task)
533 {
534 	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
535 
536 	transport->inet->sk_write_pending--;
537 	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
538 }
539 
540 /**
541  * xs_nospace - place task on wait queue if transmit was incomplete
542  * @task: task to put to sleep
543  *
544  */
545 static int xs_nospace(struct rpc_task *task)
546 {
547 	struct rpc_rqst *req = task->tk_rqstp;
548 	struct rpc_xprt *xprt = req->rq_xprt;
549 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
550 	int ret = 0;
551 
552 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
553 			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
554 			req->rq_slen);
555 
556 	/* Protect against races with write_space */
557 	spin_lock_bh(&xprt->transport_lock);
558 
559 	/* Don't race with disconnect */
560 	if (xprt_connected(xprt)) {
561 		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
562 			ret = -EAGAIN;
563 			/*
564 			 * Notify TCP that we're limited by the application
565 			 * window size
566 			 */
567 			set_bit(SOCK_NOSPACE, &transport->sock->flags);
568 			transport->inet->sk_write_pending++;
569 			/* ...and wait for more buffer space */
570 			xprt_wait_for_buffer_space(task, xs_nospace_callback);
571 		}
572 	} else {
573 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
574 		ret = -ENOTCONN;
575 	}
576 
577 	spin_unlock_bh(&xprt->transport_lock);
578 	return ret;
579 }
580 
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	/* without a socket there is no flag word to touch below */
	if (!transport->sock)
		return status;

	if (status == -ENOTSOCK) {
		/* Should we call xs_close() here? */
		status = -ENOTCONN;
	} else if (status == -EAGAIN) {
		status = xs_nospace(task);
	} else {
		/*
		 * When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.  That error, ENETUNREACH and EPIPE
		 * are expected; anything else is logged first.
		 */
		if (status != -ENETUNREACH && status != -EPIPE &&
		    status != -ECONNREFUSED)
			dprintk("RPC:       sendmsg returned unrecognized error %d\n",
				-status);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
	return status;
}
645 
646 /**
647  * xs_tcp_shutdown - gracefully shut down a TCP socket
648  * @xprt: transport
649  *
650  * Initiates a graceful shutdown of the TCP socket by calling the
651  * equivalent of shutdown(SHUT_WR);
652  */
653 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
654 {
655 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
656 	struct socket *sock = transport->sock;
657 
658 	if (sock != NULL)
659 		kernel_sock_shutdown(sock, SHUT_WR);
660 }
661 
662 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
663 {
664 	u32 reclen = buf->len - sizeof(rpc_fraghdr);
665 	rpc_fraghdr *base = buf->head[0].iov_base;
666 	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
667 }
668 
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	for (;;) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* no progress at all: treat it as a blocked socket */
		if (status == 0) {
			status = -EAGAIN;
			break;
		}
	}

	/* without a socket there is nothing to shut down or flag */
	if (!transport->sock)
		return status;

	switch (status) {
	case -ENOTSOCK:
		/* Should we call xs_close() here? */
		status = -ENOTCONN;
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	default:
		/* unexpected errors are logged, then handled like a reset */
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xs_tcp_shutdown(xprt);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	}
	return status;
}
748 
749 /**
750  * xs_tcp_release_xprt - clean up after a tcp transmission
751  * @xprt: transport
752  * @task: rpc task
753  *
754  * This cleans up if an error causes us to abort the transmission of a request.
755  * In this case, the socket may need to be reset in order to avoid confusing
756  * the server.
757  */
758 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
759 {
760 	struct rpc_rqst *req;
761 
762 	if (task != xprt->snd_task)
763 		return;
764 	if (task == NULL)
765 		goto out_release;
766 	req = task->tk_rqstp;
767 	if (req->rq_bytes_sent == 0)
768 		goto out_release;
769 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
770 		goto out_release;
771 	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
772 out_release:
773 	xprt_release_xprt(xprt, task);
774 }
775 
776 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
777 {
778 	transport->old_data_ready = sk->sk_data_ready;
779 	transport->old_state_change = sk->sk_state_change;
780 	transport->old_write_space = sk->sk_write_space;
781 	transport->old_error_report = sk->sk_error_report;
782 }
783 
784 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
785 {
786 	sk->sk_data_ready = transport->old_data_ready;
787 	sk->sk_state_change = transport->old_state_change;
788 	sk->sk_write_space = transport->old_write_space;
789 	sk->sk_error_report = transport->old_error_report;
790 }
791 
792 static void xs_reset_transport(struct sock_xprt *transport)
793 {
794 	struct socket *sock = transport->sock;
795 	struct sock *sk = transport->inet;
796 
797 	if (sk == NULL)
798 		return;
799 
800 	write_lock_bh(&sk->sk_callback_lock);
801 	transport->inet = NULL;
802 	transport->sock = NULL;
803 
804 	sk->sk_user_data = NULL;
805 
806 	xs_restore_old_callbacks(transport, sk);
807 	write_unlock_bh(&sk->sk_callback_lock);
808 
809 	sk->sk_no_check = 0;
810 
811 	sock_release(sock);
812 }
813 
814 /**
815  * xs_close - close a socket
816  * @xprt: transport
817  *
818  * This is used when all requests are complete; ie, no DRC state remains
819  * on the server we want to save.
820  *
821  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
822  * xs_reset_transport() zeroing the socket from underneath a writer.
823  */
824 static void xs_close(struct rpc_xprt *xprt)
825 {
826 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
827 
828 	dprintk("RPC:       xs_close xprt %p\n", xprt);
829 
830 	xs_reset_transport(transport);
831 
832 	smp_mb__before_clear_bit();
833 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
834 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
835 	clear_bit(XPRT_CLOSING, &xprt->state);
836 	smp_mb__after_clear_bit();
837 	xprt_disconnect_done(xprt);
838 }
839 
840 static void xs_tcp_close(struct rpc_xprt *xprt)
841 {
842 	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
843 		xs_close(xprt);
844 	else
845 		xs_tcp_shutdown(xprt);
846 }
847 
848 /**
849  * xs_destroy - prepare to shutdown a transport
850  * @xprt: doomed transport
851  *
852  */
853 static void xs_destroy(struct rpc_xprt *xprt)
854 {
855 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
856 
857 	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
858 
859 	cancel_rearming_delayed_work(&transport->connect_worker);
860 
861 	xs_close(xprt);
862 	xs_free_peer_addresses(xprt);
863 	kfree(xprt->slot);
864 	kfree(xprt);
865 	module_put(THIS_MODULE);
866 }
867 
868 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
869 {
870 	return (struct rpc_xprt *) sk->sk_user_data;
871 }
872 
873 /**
874  * xs_udp_data_ready - "data ready" callback for UDP sockets
875  * @sk: socket with data to read
876  * @len: how much data to read
877  *
878  */
879 static void xs_udp_data_ready(struct sock *sk, int len)
880 {
881 	struct rpc_task *task;
882 	struct rpc_xprt *xprt;
883 	struct rpc_rqst *rovr;
884 	struct sk_buff *skb;
885 	int err, repsize, copied;
886 	u32 _xid;
887 	__be32 *xp;
888 
889 	read_lock(&sk->sk_callback_lock);
890 	dprintk("RPC:       xs_udp_data_ready...\n");
891 	if (!(xprt = xprt_from_sock(sk)))
892 		goto out;
893 
894 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
895 		goto out;
896 
897 	if (xprt->shutdown)
898 		goto dropit;
899 
900 	repsize = skb->len - sizeof(struct udphdr);
901 	if (repsize < 4) {
902 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
903 		goto dropit;
904 	}
905 
906 	/* Copy the XID from the skb... */
907 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
908 				sizeof(_xid), &_xid);
909 	if (xp == NULL)
910 		goto dropit;
911 
912 	/* Look up and lock the request corresponding to the given XID */
913 	spin_lock(&xprt->transport_lock);
914 	rovr = xprt_lookup_rqst(xprt, *xp);
915 	if (!rovr)
916 		goto out_unlock;
917 	task = rovr->rq_task;
918 
919 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
920 		copied = repsize;
921 
922 	/* Suck it into the iovec, verify checksum if not done by hw. */
923 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
924 		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
925 		goto out_unlock;
926 	}
927 
928 	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
929 
930 	/* Something worked... */
931 	dst_confirm(skb_dst(skb));
932 
933 	xprt_adjust_cwnd(task, copied);
934 	xprt_update_rtt(task);
935 	xprt_complete_rqst(task, copied);
936 
937  out_unlock:
938 	spin_unlock(&xprt->transport_lock);
939  dropit:
940 	skb_free_datagram(sk, skb);
941  out:
942 	read_unlock(&sk->sk_callback_lock);
943 }
944 
945 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
946 {
947 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
948 	size_t len, used;
949 	char *p;
950 
951 	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
952 	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
953 	used = xdr_skb_read_bits(desc, p, len);
954 	transport->tcp_offset += used;
955 	if (used != len)
956 		return;
957 
958 	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
959 	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
960 		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
961 	else
962 		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
963 	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
964 
965 	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
966 	transport->tcp_offset = 0;
967 
968 	/* Sanity check of the record length */
969 	if (unlikely(transport->tcp_reclen < 8)) {
970 		dprintk("RPC:       invalid TCP record fragment length\n");
971 		xprt_force_disconnect(xprt);
972 		return;
973 	}
974 	dprintk("RPC:       reading TCP record fragment of length %d\n",
975 			transport->tcp_reclen);
976 }
977 
978 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
979 {
980 	if (transport->tcp_offset == transport->tcp_reclen) {
981 		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
982 		transport->tcp_offset = 0;
983 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
984 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
985 			transport->tcp_flags |= TCP_RCV_COPY_XID;
986 			transport->tcp_copied = 0;
987 		}
988 	}
989 }
990 
991 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
992 {
993 	size_t len, used;
994 	char *p;
995 
996 	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
997 	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
998 	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
999 	used = xdr_skb_read_bits(desc, p, len);
1000 	transport->tcp_offset += used;
1001 	if (used != len)
1002 		return;
1003 	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1004 	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1005 	transport->tcp_copied = 4;
1006 	dprintk("RPC:       reading %s XID %08x\n",
1007 			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1008 							      : "request with",
1009 			ntohl(transport->tcp_xid));
1010 	xs_tcp_check_fraghdr(transport);
1011 }
1012 
1013 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1014 				       struct xdr_skb_reader *desc)
1015 {
1016 	size_t len, used;
1017 	u32 offset;
1018 	__be32	calldir;
1019 
1020 	/*
1021 	 * We want transport->tcp_offset to be 8 at the end of this routine
1022 	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
1023 	 * When this function is called for the first time,
1024 	 * transport->tcp_offset is 4 (after having already read the xid).
1025 	 */
1026 	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1027 	len = sizeof(calldir) - offset;
1028 	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1029 	used = xdr_skb_read_bits(desc, &calldir, len);
1030 	transport->tcp_offset += used;
1031 	if (used != len)
1032 		return;
1033 	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1034 	transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1035 	transport->tcp_flags |= TCP_RCV_COPY_DATA;
1036 	/*
1037 	 * We don't yet have the XDR buffer, so we will write the calldir
1038 	 * out after we get the buffer from the 'struct rpc_rqst'
1039 	 */
1040 	if (ntohl(calldir) == RPC_REPLY)
1041 		transport->tcp_flags |= TCP_RPC_REPLY;
1042 	else
1043 		transport->tcp_flags &= ~TCP_RPC_REPLY;
1044 	dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
1045 			(transport->tcp_flags & TCP_RPC_REPLY) ?
1046 				"reply for" : "request with", calldir);
1047 	xs_tcp_check_fraghdr(transport);
1048 }
1049 
1050 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1051 				     struct xdr_skb_reader *desc,
1052 				     struct rpc_rqst *req)
1053 {
1054 	struct sock_xprt *transport =
1055 				container_of(xprt, struct sock_xprt, xprt);
1056 	struct xdr_buf *rcvbuf;
1057 	size_t len;
1058 	ssize_t r;
1059 
1060 	rcvbuf = &req->rq_private_buf;
1061 
1062 	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1063 		/*
1064 		 * Save the RPC direction in the XDR buffer
1065 		 */
1066 		__be32	calldir = transport->tcp_flags & TCP_RPC_REPLY ?
1067 					htonl(RPC_REPLY) : 0;
1068 
1069 		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1070 			&calldir, sizeof(calldir));
1071 		transport->tcp_copied += sizeof(calldir);
1072 		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1073 	}
1074 
1075 	len = desc->count;
1076 	if (len > transport->tcp_reclen - transport->tcp_offset) {
1077 		struct xdr_skb_reader my_desc;
1078 
1079 		len = transport->tcp_reclen - transport->tcp_offset;
1080 		memcpy(&my_desc, desc, sizeof(my_desc));
1081 		my_desc.count = len;
1082 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1083 					  &my_desc, xdr_skb_read_bits);
1084 		desc->count -= r;
1085 		desc->offset += r;
1086 	} else
1087 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1088 					  desc, xdr_skb_read_bits);
1089 
1090 	if (r > 0) {
1091 		transport->tcp_copied += r;
1092 		transport->tcp_offset += r;
1093 	}
1094 	if (r != len) {
1095 		/* Error when copying to the receive buffer,
1096 		 * usually because we weren't able to allocate
1097 		 * additional buffer pages. All we can do now
1098 		 * is turn off TCP_RCV_COPY_DATA, so the request
1099 		 * will not receive any additional updates,
1100 		 * and time out.
1101 		 * Any remaining data from this record will
1102 		 * be discarded.
1103 		 */
1104 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1105 		dprintk("RPC:       XID %08x truncated request\n",
1106 				ntohl(transport->tcp_xid));
1107 		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1108 				"tcp_offset = %u, tcp_reclen = %u\n",
1109 				xprt, transport->tcp_copied,
1110 				transport->tcp_offset, transport->tcp_reclen);
1111 		return;
1112 	}
1113 
1114 	dprintk("RPC:       XID %08x read %Zd bytes\n",
1115 			ntohl(transport->tcp_xid), r);
1116 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1117 			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
1118 			transport->tcp_offset, transport->tcp_reclen);
1119 
1120 	if (transport->tcp_copied == req->rq_private_buf.buflen)
1121 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1122 	else if (transport->tcp_offset == transport->tcp_reclen) {
1123 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1124 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1125 	}
1126 
1127 	return;
1128 }
1129 
1130 /*
1131  * Finds the request corresponding to the RPC xid and invokes the common
1132  * tcp read code to read the data.
1133  */
1134 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1135 				    struct xdr_skb_reader *desc)
1136 {
1137 	struct sock_xprt *transport =
1138 				container_of(xprt, struct sock_xprt, xprt);
1139 	struct rpc_rqst *req;
1140 
1141 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1142 
1143 	/* Find and lock the request corresponding to this xid */
1144 	spin_lock(&xprt->transport_lock);
1145 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1146 	if (!req) {
1147 		dprintk("RPC:       XID %08x request not found!\n",
1148 				ntohl(transport->tcp_xid));
1149 		spin_unlock(&xprt->transport_lock);
1150 		return -1;
1151 	}
1152 
1153 	xs_tcp_read_common(xprt, desc, req);
1154 
1155 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1156 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1157 
1158 	spin_unlock(&xprt->transport_lock);
1159 	return 0;
1160 }
1161 
1162 #if defined(CONFIG_NFS_V4_1)
1163 /*
1164  * Obtains an rpc_rqst previously allocated and invokes the common
1165  * tcp read code to read the data.  The result is placed in the callback
1166  * queue.
1167  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1168  * connection and return -1.
1169  */
1170 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1171 				       struct xdr_skb_reader *desc)
1172 {
1173 	struct sock_xprt *transport =
1174 				container_of(xprt, struct sock_xprt, xprt);
1175 	struct rpc_rqst *req;
1176 
1177 	req = xprt_alloc_bc_request(xprt);
1178 	if (req == NULL) {
1179 		printk(KERN_WARNING "Callback slot table overflowed\n");
1180 		xprt_force_disconnect(xprt);
1181 		return -1;
1182 	}
1183 
1184 	req->rq_xid = transport->tcp_xid;
1185 	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1186 	xs_tcp_read_common(xprt, desc, req);
1187 
1188 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1189 		struct svc_serv *bc_serv = xprt->bc_serv;
1190 
1191 		/*
1192 		 * Add callback request to callback list.  The callback
1193 		 * service sleeps on the sv_cb_waitq waiting for new
1194 		 * requests.  Wake it up after adding enqueing the
1195 		 * request.
1196 		 */
1197 		dprintk("RPC:       add callback request to list\n");
1198 		spin_lock(&bc_serv->sv_cb_lock);
1199 		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1200 		spin_unlock(&bc_serv->sv_cb_lock);
1201 		wake_up(&bc_serv->sv_cb_waitq);
1202 	}
1203 
1204 	req->rq_private_buf.len = transport->tcp_copied;
1205 
1206 	return 0;
1207 }
1208 
1209 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1210 					struct xdr_skb_reader *desc)
1211 {
1212 	struct sock_xprt *transport =
1213 				container_of(xprt, struct sock_xprt, xprt);
1214 
1215 	return (transport->tcp_flags & TCP_RPC_REPLY) ?
1216 		xs_tcp_read_reply(xprt, desc) :
1217 		xs_tcp_read_callback(xprt, desc);
1218 }
1219 #else
/*
 * Without CONFIG_NFS_V4_1 no callback reader is built, so all incoming
 * data is handled as an RPC reply.
 */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	return xs_tcp_read_reply(xprt, desc);
}
1225 #endif /* CONFIG_NFS_V4_1 */
1226 
1227 /*
1228  * Read data off the transport.  This can be either an RPC_CALL or an
1229  * RPC_REPLY.  Relay the processing to helper functions.
1230  */
1231 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1232 				    struct xdr_skb_reader *desc)
1233 {
1234 	struct sock_xprt *transport =
1235 				container_of(xprt, struct sock_xprt, xprt);
1236 
1237 	if (_xs_tcp_read_data(xprt, desc) == 0)
1238 		xs_tcp_check_fraghdr(transport);
1239 	else {
1240 		/*
1241 		 * The transport_lock protects the request handling.
1242 		 * There's no need to hold it to update the tcp_flags.
1243 		 */
1244 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1245 	}
1246 }
1247 
1248 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1249 {
1250 	size_t len;
1251 
1252 	len = transport->tcp_reclen - transport->tcp_offset;
1253 	if (len > desc->count)
1254 		len = desc->count;
1255 	desc->count -= len;
1256 	desc->offset += len;
1257 	transport->tcp_offset += len;
1258 	dprintk("RPC:       discarded %Zu bytes\n", len);
1259 	xs_tcp_check_fraghdr(transport);
1260 }
1261 
1262 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1263 {
1264 	struct rpc_xprt *xprt = rd_desc->arg.data;
1265 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1266 	struct xdr_skb_reader desc = {
1267 		.skb	= skb,
1268 		.offset	= offset,
1269 		.count	= len,
1270 	};
1271 
1272 	dprintk("RPC:       xs_tcp_data_recv started\n");
1273 	do {
1274 		/* Read in a new fragment marker if necessary */
1275 		/* Can we ever really expect to get completely empty fragments? */
1276 		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1277 			xs_tcp_read_fraghdr(xprt, &desc);
1278 			continue;
1279 		}
1280 		/* Read in the xid if necessary */
1281 		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1282 			xs_tcp_read_xid(transport, &desc);
1283 			continue;
1284 		}
1285 		/* Read in the call/reply flag */
1286 		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1287 			xs_tcp_read_calldir(transport, &desc);
1288 			continue;
1289 		}
1290 		/* Read in the request data */
1291 		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1292 			xs_tcp_read_data(xprt, &desc);
1293 			continue;
1294 		}
1295 		/* Skip over any trailing bytes on short reads */
1296 		xs_tcp_read_discard(transport, &desc);
1297 	} while (desc.count);
1298 	dprintk("RPC:       xs_tcp_data_recv done\n");
1299 	return len - desc.count;
1300 }
1301 
1302 /**
1303  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1304  * @sk: socket with data to read
1305  * @bytes: how much data to read
1306  *
1307  */
1308 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1309 {
1310 	struct rpc_xprt *xprt;
1311 	read_descriptor_t rd_desc;
1312 	int read;
1313 
1314 	dprintk("RPC:       xs_tcp_data_ready...\n");
1315 
1316 	read_lock(&sk->sk_callback_lock);
1317 	if (!(xprt = xprt_from_sock(sk)))
1318 		goto out;
1319 	if (xprt->shutdown)
1320 		goto out;
1321 
1322 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1323 	rd_desc.arg.data = xprt;
1324 	do {
1325 		rd_desc.count = 65536;
1326 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1327 	} while (read > 0);
1328 out:
1329 	read_unlock(&sk->sk_callback_lock);
1330 }
1331 
1332 /*
1333  * Do the equivalent of linger/linger2 handling for dealing with
1334  * broken servers that don't close the socket in a timely
1335  * fashion
1336  */
1337 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1338 		unsigned long timeout)
1339 {
1340 	struct sock_xprt *transport;
1341 
1342 	if (xprt_test_and_set_connecting(xprt))
1343 		return;
1344 	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1345 	transport = container_of(xprt, struct sock_xprt, xprt);
1346 	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1347 			   timeout);
1348 }
1349 
1350 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1351 {
1352 	struct sock_xprt *transport;
1353 
1354 	transport = container_of(xprt, struct sock_xprt, xprt);
1355 
1356 	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1357 	    !cancel_delayed_work(&transport->connect_worker))
1358 		return;
1359 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1360 	xprt_clear_connecting(xprt);
1361 }
1362 
/*
 * Clear the XPRT_CLOSE_WAIT and XPRT_CLOSING state bits, bracketed by
 * memory barriers, and then report the disconnect as finished.
 */
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}
1372 
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 * Runs with sk->sk_callback_lock held for reading.  Does nothing when no
 * transport is attached to @sk.  Otherwise the transport state bits, the
 * connect cookie and the reconnect back-off are updated according to the
 * current value of sk->sk_state.
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->transport_lock);
		/* Only act on the first transition to "connected" */
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt_wake_pending_tasks(xprt, -EAGAIN);
		}
		spin_unlock_bh(&xprt->transport_lock);
		break;
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
		xprt->connect_cookie++;
		xprt->reestablish_timeout = 0;
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_clear_bit();
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		break;
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		xprt_force_disconnect(xprt);
		/* fall through */
	case TCP_SYN_SENT:
		xprt->connect_cookie++;
		/* fall through */
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		break;
	case TCP_LAST_ACK:
		/* Mark the transport as closing and arm the linger timeout */
		set_bit(XPRT_CLOSING, &xprt->state);
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_clear_bit();
		break;
	case TCP_CLOSE:
		/* Socket is fully closed: drop any linger timeout, wake waiters */
		xs_tcp_cancel_linger_timeout(xprt);
		xs_sock_mark_closed(xprt);
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}
1447 
1448 /**
1449  * xs_error_report - callback mainly for catching socket errors
1450  * @sk: socket
1451  */
1452 static void xs_error_report(struct sock *sk)
1453 {
1454 	struct rpc_xprt *xprt;
1455 
1456 	read_lock(&sk->sk_callback_lock);
1457 	if (!(xprt = xprt_from_sock(sk)))
1458 		goto out;
1459 	dprintk("RPC:       %s client %p...\n"
1460 			"RPC:       error %d\n",
1461 			__func__, xprt, sk->sk_err);
1462 	xprt_wake_pending_tasks(xprt, -EAGAIN);
1463 out:
1464 	read_unlock(&sk->sk_callback_lock);
1465 }
1466 
1467 static void xs_write_space(struct sock *sk)
1468 {
1469 	struct socket *sock;
1470 	struct rpc_xprt *xprt;
1471 
1472 	if (unlikely(!(sock = sk->sk_socket)))
1473 		return;
1474 	clear_bit(SOCK_NOSPACE, &sock->flags);
1475 
1476 	if (unlikely(!(xprt = xprt_from_sock(sk))))
1477 		return;
1478 	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1479 		return;
1480 
1481 	xprt_write_space(xprt);
1482 }
1483 
1484 /**
1485  * xs_udp_write_space - callback invoked when socket buffer space
1486  *                             becomes available
1487  * @sk: socket whose state has changed
1488  *
1489  * Called when more output buffer space is available for this socket.
1490  * We try not to wake our writers until they can make "significant"
1491  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1492  * with a bunch of small requests.
1493  */
1494 static void xs_udp_write_space(struct sock *sk)
1495 {
1496 	read_lock(&sk->sk_callback_lock);
1497 
1498 	/* from net/core/sock.c:sock_def_write_space */
1499 	if (sock_writeable(sk))
1500 		xs_write_space(sk);
1501 
1502 	read_unlock(&sk->sk_callback_lock);
1503 }
1504 
1505 /**
1506  * xs_tcp_write_space - callback invoked when socket buffer space
1507  *                             becomes available
1508  * @sk: socket whose state has changed
1509  *
1510  * Called when more output buffer space is available for this socket.
1511  * We try not to wake our writers until they can make "significant"
1512  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1513  * with a bunch of small requests.
1514  */
1515 static void xs_tcp_write_space(struct sock *sk)
1516 {
1517 	read_lock(&sk->sk_callback_lock);
1518 
1519 	/* from net/core/stream.c:sk_stream_write_space */
1520 	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1521 		xs_write_space(sk);
1522 
1523 	read_unlock(&sk->sk_callback_lock);
1524 }
1525 
/*
 * Apply the buffer sizes stored in the transport to its socket.
 *
 * A size of zero leaves the corresponding socket buffer untouched.
 * Otherwise the buffer is locked (SOCK_RCVBUF_LOCK / SOCK_SNDBUF_LOCK)
 * and set to size * max_reqs * 2.
 */
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		/* The send buffer limit changed: run the write-space callback */
		sk->sk_write_space(sk);
	}
}
1541 
1542 /**
1543  * xs_udp_set_buffer_size - set send and receive limits
1544  * @xprt: generic transport
1545  * @sndsize: requested size of send buffer, in bytes
1546  * @rcvsize: requested size of receive buffer, in bytes
1547  *
1548  * Set socket send and receive buffer size limits.
1549  */
1550 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1551 {
1552 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1553 
1554 	transport->sndsize = 0;
1555 	if (sndsize)
1556 		transport->sndsize = sndsize + 1024;
1557 	transport->rcvsize = 0;
1558 	if (rcvsize)
1559 		transport->rcvsize = rcvsize + 1024;
1560 
1561 	xs_udp_do_set_buffer_size(xprt);
1562 }
1563 
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 * This simply forwards @task to xprt_adjust_cwnd() with -ETIMEDOUT.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}
1574 
1575 static unsigned short xs_get_random_port(void)
1576 {
1577 	unsigned short range = xprt_max_resvport - xprt_min_resvport;
1578 	unsigned short rand = (unsigned short) net_random() % range;
1579 	return rand + xprt_min_resvport;
1580 }
1581 
1582 /**
1583  * xs_set_port - reset the port number in the remote endpoint address
1584  * @xprt: generic transport
1585  * @port: new port number
1586  *
1587  */
1588 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1589 {
1590 	struct sockaddr *addr = xs_addr(xprt);
1591 
1592 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1593 
1594 	switch (addr->sa_family) {
1595 	case AF_INET:
1596 		((struct sockaddr_in *)addr)->sin_port = htons(port);
1597 		break;
1598 	case AF_INET6:
1599 		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1600 		break;
1601 	default:
1602 		BUG();
1603 	}
1604 }
1605 
1606 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1607 {
1608 	unsigned short port = transport->port;
1609 
1610 	if (port == 0 && transport->xprt.resvport)
1611 		port = xs_get_random_port();
1612 	return port;
1613 }
1614 
1615 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1616 {
1617 	if (transport->port != 0)
1618 		transport->port = 0;
1619 	if (!transport->xprt.resvport)
1620 		return 0;
1621 	if (port <= xprt_min_resvport || port > xprt_max_resvport)
1622 		return xprt_max_resvport;
1623 	return --port;
1624 }
1625 
1626 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1627 {
1628 	struct sockaddr_in myaddr = {
1629 		.sin_family = AF_INET,
1630 	};
1631 	struct sockaddr_in *sa;
1632 	int err, nloop = 0;
1633 	unsigned short port = xs_get_srcport(transport, sock);
1634 	unsigned short last;
1635 
1636 	sa = (struct sockaddr_in *)&transport->addr;
1637 	myaddr.sin_addr = sa->sin_addr;
1638 	do {
1639 		myaddr.sin_port = htons(port);
1640 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1641 						sizeof(myaddr));
1642 		if (port == 0)
1643 			break;
1644 		if (err == 0) {
1645 			transport->port = port;
1646 			break;
1647 		}
1648 		last = port;
1649 		port = xs_next_srcport(transport, sock, port);
1650 		if (port > last)
1651 			nloop++;
1652 	} while (err == -EADDRINUSE && nloop != 2);
1653 	dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1654 			__func__, &myaddr.sin_addr,
1655 			port, err ? "failed" : "ok", err);
1656 	return err;
1657 }
1658 
1659 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1660 {
1661 	struct sockaddr_in6 myaddr = {
1662 		.sin6_family = AF_INET6,
1663 	};
1664 	struct sockaddr_in6 *sa;
1665 	int err, nloop = 0;
1666 	unsigned short port = xs_get_srcport(transport, sock);
1667 	unsigned short last;
1668 
1669 	sa = (struct sockaddr_in6 *)&transport->addr;
1670 	myaddr.sin6_addr = sa->sin6_addr;
1671 	do {
1672 		myaddr.sin6_port = htons(port);
1673 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1674 						sizeof(myaddr));
1675 		if (port == 0)
1676 			break;
1677 		if (err == 0) {
1678 			transport->port = port;
1679 			break;
1680 		}
1681 		last = port;
1682 		port = xs_next_srcport(transport, sock, port);
1683 		if (port > last)
1684 			nloop++;
1685 	} while (err == -EADDRINUSE && nloop != 2);
1686 	dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1687 		&myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1688 	return err;
1689 }
1690 
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock class keys used for RPC sockets: index 0 is used for AF_INET,
 * index 1 for AF_INET6.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/*
 * Re-initialise the locks of an AF_INET socket with the RPC-specific
 * class keys and names.  The socket must not be owned by a user at
 * this point.
 */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/*
 * Re-initialise the locks of an AF_INET6 socket with the RPC-specific
 * class keys and names.  The socket must not be owned by a user at
 * this point.
 */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC the reclassify helpers are empty stubs */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1721 
1722 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1723 {
1724 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1725 
1726 	if (!transport->inet) {
1727 		struct sock *sk = sock->sk;
1728 
1729 		write_lock_bh(&sk->sk_callback_lock);
1730 
1731 		xs_save_old_callbacks(transport, sk);
1732 
1733 		sk->sk_user_data = xprt;
1734 		sk->sk_data_ready = xs_udp_data_ready;
1735 		sk->sk_write_space = xs_udp_write_space;
1736 		sk->sk_error_report = xs_error_report;
1737 		sk->sk_no_check = UDP_CSUM_NORCV;
1738 		sk->sk_allocation = GFP_ATOMIC;
1739 
1740 		xprt_set_connected(xprt);
1741 
1742 		/* Reset to new socket */
1743 		transport->sock = sock;
1744 		transport->inet = sk;
1745 
1746 		write_unlock_bh(&sk->sk_callback_lock);
1747 	}
1748 	xs_udp_do_set_buffer_size(xprt);
1749 }
1750 
1751 /**
1752  * xs_udp_connect_worker4 - set up a UDP socket
1753  * @work: RPC transport to connect
1754  *
1755  * Invoked by a work queue tasklet.
1756  */
1757 static void xs_udp_connect_worker4(struct work_struct *work)
1758 {
1759 	struct sock_xprt *transport =
1760 		container_of(work, struct sock_xprt, connect_worker.work);
1761 	struct rpc_xprt *xprt = &transport->xprt;
1762 	struct socket *sock = transport->sock;
1763 	int err, status = -EIO;
1764 
1765 	if (xprt->shutdown)
1766 		goto out;
1767 
1768 	/* Start by resetting any existing state */
1769 	xs_reset_transport(transport);
1770 
1771 	err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1772 	if (err < 0) {
1773 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1774 		goto out;
1775 	}
1776 	xs_reclassify_socket4(sock);
1777 
1778 	if (xs_bind4(transport, sock)) {
1779 		sock_release(sock);
1780 		goto out;
1781 	}
1782 
1783 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1784 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1785 
1786 	xs_udp_finish_connecting(xprt, sock);
1787 	status = 0;
1788 out:
1789 	xprt_clear_connecting(xprt);
1790 	xprt_wake_pending_tasks(xprt, status);
1791 }
1792 
1793 /**
1794  * xs_udp_connect_worker6 - set up a UDP socket
1795  * @work: RPC transport to connect
1796  *
1797  * Invoked by a work queue tasklet.
1798  */
1799 static void xs_udp_connect_worker6(struct work_struct *work)
1800 {
1801 	struct sock_xprt *transport =
1802 		container_of(work, struct sock_xprt, connect_worker.work);
1803 	struct rpc_xprt *xprt = &transport->xprt;
1804 	struct socket *sock = transport->sock;
1805 	int err, status = -EIO;
1806 
1807 	if (xprt->shutdown)
1808 		goto out;
1809 
1810 	/* Start by resetting any existing state */
1811 	xs_reset_transport(transport);
1812 
1813 	err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1814 	if (err < 0) {
1815 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1816 		goto out;
1817 	}
1818 	xs_reclassify_socket6(sock);
1819 
1820 	if (xs_bind6(transport, sock) < 0) {
1821 		sock_release(sock);
1822 		goto out;
1823 	}
1824 
1825 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1826 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1827 
1828 	xs_udp_finish_connecting(xprt, sock);
1829 	status = 0;
1830 out:
1831 	xprt_clear_connecting(xprt);
1832 	xprt_wake_pending_tasks(xprt, status);
1833 }
1834 
1835 /*
1836  * We need to preserve the port number so the reply cache on the server can
1837  * find our cached RPC replies when we get around to reconnecting.
1838  */
1839 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1840 {
1841 	int result;
1842 	struct sockaddr any;
1843 
1844 	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1845 
1846 	/*
1847 	 * Disconnect the transport socket by doing a connect operation
1848 	 * with AF_UNSPEC.  This should return immediately...
1849 	 */
1850 	memset(&any, 0, sizeof(any));
1851 	any.sa_family = AF_UNSPEC;
1852 	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1853 	if (!result)
1854 		xs_sock_mark_closed(xprt);
1855 	else
1856 		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1857 				result);
1858 }
1859 
1860 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1861 {
1862 	unsigned int state = transport->inet->sk_state;
1863 
1864 	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1865 		return;
1866 	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1867 		return;
1868 	xs_abort_connection(xprt, transport);
1869 }
1870 
1871 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1872 {
1873 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1874 
1875 	if (!transport->inet) {
1876 		struct sock *sk = sock->sk;
1877 
1878 		write_lock_bh(&sk->sk_callback_lock);
1879 
1880 		xs_save_old_callbacks(transport, sk);
1881 
1882 		sk->sk_user_data = xprt;
1883 		sk->sk_data_ready = xs_tcp_data_ready;
1884 		sk->sk_state_change = xs_tcp_state_change;
1885 		sk->sk_write_space = xs_tcp_write_space;
1886 		sk->sk_error_report = xs_error_report;
1887 		sk->sk_allocation = GFP_ATOMIC;
1888 
1889 		/* socket options */
1890 		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1891 		sock_reset_flag(sk, SOCK_LINGER);
1892 		tcp_sk(sk)->linger2 = 0;
1893 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1894 
1895 		xprt_clear_connected(xprt);
1896 
1897 		/* Reset to new socket */
1898 		transport->sock = sock;
1899 		transport->inet = sk;
1900 
1901 		write_unlock_bh(&sk->sk_callback_lock);
1902 	}
1903 
1904 	if (!xprt_bound(xprt))
1905 		return -ENOTCONN;
1906 
1907 	/* Tell the socket layer to start connecting... */
1908 	xprt->stat.connect_count++;
1909 	xprt->stat.connect_start = jiffies;
1910 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1911 }
1912 
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Invoked by a work queue tasklet.
 *
 * If the transport has no socket yet, one is obtained from @create_sock.
 * Otherwise the existing socket is prepared for reuse, and if an abort
 * was requested (XPRT_CONNECTION_ABORT) the function stops there and
 * wakes pending tasks with -EAGAIN.
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
		struct sock_xprt *transport,
		struct socket *(*create_sock)(struct rpc_xprt *,
			struct sock_xprt *))
{
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = create_sock(xprt, transport);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		/* These cases return without waking the pending tasks */
		xprt_clear_connecting(xprt);
		return;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
1985 
1986 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1987 		struct sock_xprt *transport)
1988 {
1989 	struct socket *sock;
1990 	int err;
1991 
1992 	/* start from scratch */
1993 	err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1994 	if (err < 0) {
1995 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
1996 				-err);
1997 		goto out_err;
1998 	}
1999 	xs_reclassify_socket4(sock);
2000 
2001 	if (xs_bind4(transport, sock) < 0) {
2002 		sock_release(sock);
2003 		goto out_err;
2004 	}
2005 	return sock;
2006 out_err:
2007 	return ERR_PTR(-EIO);
2008 }
2009 
2010 /**
2011  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
2012  * @work: RPC transport to connect
2013  *
2014  * Invoked by a work queue tasklet.
2015  */
2016 static void xs_tcp_connect_worker4(struct work_struct *work)
2017 {
2018 	struct sock_xprt *transport =
2019 		container_of(work, struct sock_xprt, connect_worker.work);
2020 	struct rpc_xprt *xprt = &transport->xprt;
2021 
2022 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
2023 }
2024 
2025 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
2026 		struct sock_xprt *transport)
2027 {
2028 	struct socket *sock;
2029 	int err;
2030 
2031 	/* start from scratch */
2032 	err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
2033 	if (err < 0) {
2034 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
2035 				-err);
2036 		goto out_err;
2037 	}
2038 	xs_reclassify_socket6(sock);
2039 
2040 	if (xs_bind6(transport, sock) < 0) {
2041 		sock_release(sock);
2042 		goto out_err;
2043 	}
2044 	return sock;
2045 out_err:
2046 	return ERR_PTR(-EIO);
2047 }
2048 
2049 /**
2050  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
2051  * @work: RPC transport to connect
2052  *
2053  * Invoked by a work queue tasklet.
2054  */
2055 static void xs_tcp_connect_worker6(struct work_struct *work)
2056 {
2057 	struct sock_xprt *transport =
2058 		container_of(work, struct sock_xprt, connect_worker.work);
2059 	struct rpc_xprt *xprt = &transport->xprt;
2060 
2061 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
2062 }
2063 
2064 /**
2065  * xs_connect - connect a socket to a remote endpoint
2066  * @task: address of RPC task that manages state of connect request
2067  *
2068  * TCP: If the remote end dropped the connection, delay reconnecting.
2069  *
2070  * UDP socket connects are synchronous, but we use a work queue anyway
2071  * to guarantee that even unprivileged user processes can set up a
2072  * socket on a privileged port.
2073  *
2074  * If a UDP socket connect fails, the delay behavior here prevents
2075  * retry floods (hard mounts).
2076  */
2077 static void xs_connect(struct rpc_task *task)
2078 {
2079 	struct rpc_xprt *xprt = task->tk_xprt;
2080 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2081 
2082 	if (xprt_test_and_set_connecting(xprt))
2083 		return;
2084 
2085 	if (transport->sock != NULL) {
2086 		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2087 				"seconds\n",
2088 				xprt, xprt->reestablish_timeout / HZ);
2089 		queue_delayed_work(rpciod_workqueue,
2090 				   &transport->connect_worker,
2091 				   xprt->reestablish_timeout);
2092 		xprt->reestablish_timeout <<= 1;
2093 		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2094 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2095 	} else {
2096 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2097 		queue_delayed_work(rpciod_workqueue,
2098 				   &transport->connect_worker, 0);
2099 	}
2100 }
2101 
2102 static void xs_tcp_connect(struct rpc_task *task)
2103 {
2104 	struct rpc_xprt *xprt = task->tk_xprt;
2105 
2106 	/* Exit if we need to wait for socket shutdown to complete */
2107 	if (test_bit(XPRT_CLOSING, &xprt->state))
2108 		return;
2109 	xs_connect(task);
2110 }
2111 
2112 /**
2113  * xs_udp_print_stats - display UDP socket-specifc stats
2114  * @xprt: rpc_xprt struct containing statistics
2115  * @seq: output file
2116  *
2117  */
2118 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2119 {
2120 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2121 
2122 	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2123 			transport->port,
2124 			xprt->stat.bind_count,
2125 			xprt->stat.sends,
2126 			xprt->stat.recvs,
2127 			xprt->stat.bad_xids,
2128 			xprt->stat.req_u,
2129 			xprt->stat.bklog_u);
2130 }
2131 
2132 /**
2133  * xs_tcp_print_stats - display TCP socket-specifc stats
2134  * @xprt: rpc_xprt struct containing statistics
2135  * @seq: output file
2136  *
2137  */
2138 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2139 {
2140 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2141 	long idle_time = 0;
2142 
2143 	if (xprt_connected(xprt))
2144 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2145 
2146 	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2147 			transport->port,
2148 			xprt->stat.bind_count,
2149 			xprt->stat.connect_count,
2150 			xprt->stat.connect_time,
2151 			idle_time,
2152 			xprt->stat.sends,
2153 			xprt->stat.recvs,
2154 			xprt->stat.bad_xids,
2155 			xprt->stat.req_u,
2156 			xprt->stat.bklog_u);
2157 }
2158 
2159 static struct rpc_xprt_ops xs_udp_ops = {
2160 	.set_buffer_size	= xs_udp_set_buffer_size,
2161 	.reserve_xprt		= xprt_reserve_xprt_cong,
2162 	.release_xprt		= xprt_release_xprt_cong,
2163 	.rpcbind		= rpcb_getport_async,
2164 	.set_port		= xs_set_port,
2165 	.connect		= xs_connect,
2166 	.buf_alloc		= rpc_malloc,
2167 	.buf_free		= rpc_free,
2168 	.send_request		= xs_udp_send_request,
2169 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
2170 	.timer			= xs_udp_timer,
2171 	.release_request	= xprt_release_rqst_cong,
2172 	.close			= xs_close,
2173 	.destroy		= xs_destroy,
2174 	.print_stats		= xs_udp_print_stats,
2175 };
2176 
2177 static struct rpc_xprt_ops xs_tcp_ops = {
2178 	.reserve_xprt		= xprt_reserve_xprt,
2179 	.release_xprt		= xs_tcp_release_xprt,
2180 	.rpcbind		= rpcb_getport_async,
2181 	.set_port		= xs_set_port,
2182 	.connect		= xs_tcp_connect,
2183 	.buf_alloc		= rpc_malloc,
2184 	.buf_free		= rpc_free,
2185 	.send_request		= xs_tcp_send_request,
2186 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2187 #if defined(CONFIG_NFS_V4_1)
2188 	.release_request	= bc_release_request,
2189 #endif /* CONFIG_NFS_V4_1 */
2190 	.close			= xs_tcp_close,
2191 	.destroy		= xs_destroy,
2192 	.print_stats		= xs_tcp_print_stats,
2193 };
2194 
2195 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2196 				      unsigned int slot_table_size)
2197 {
2198 	struct rpc_xprt *xprt;
2199 	struct sock_xprt *new;
2200 
2201 	if (args->addrlen > sizeof(xprt->addr)) {
2202 		dprintk("RPC:       xs_setup_xprt: address too large\n");
2203 		return ERR_PTR(-EBADF);
2204 	}
2205 
2206 	new = kzalloc(sizeof(*new), GFP_KERNEL);
2207 	if (new == NULL) {
2208 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2209 				"rpc_xprt\n");
2210 		return ERR_PTR(-ENOMEM);
2211 	}
2212 	xprt = &new->xprt;
2213 
2214 	xprt->max_reqs = slot_table_size;
2215 	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2216 	if (xprt->slot == NULL) {
2217 		kfree(xprt);
2218 		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2219 				"table\n");
2220 		return ERR_PTR(-ENOMEM);
2221 	}
2222 
2223 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2224 	xprt->addrlen = args->addrlen;
2225 	if (args->srcaddr)
2226 		memcpy(&new->addr, args->srcaddr, args->addrlen);
2227 
2228 	return xprt;
2229 }
2230 
2231 static const struct rpc_timeout xs_udp_default_timeout = {
2232 	.to_initval = 5 * HZ,
2233 	.to_maxval = 30 * HZ,
2234 	.to_increment = 5 * HZ,
2235 	.to_retries = 5,
2236 };
2237 
2238 /**
2239  * xs_setup_udp - Set up transport to use a UDP socket
2240  * @args: rpc transport creation arguments
2241  *
2242  */
2243 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2244 {
2245 	struct sockaddr *addr = args->dstaddr;
2246 	struct rpc_xprt *xprt;
2247 	struct sock_xprt *transport;
2248 
2249 	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2250 	if (IS_ERR(xprt))
2251 		return xprt;
2252 	transport = container_of(xprt, struct sock_xprt, xprt);
2253 
2254 	xprt->prot = IPPROTO_UDP;
2255 	xprt->tsh_size = 0;
2256 	/* XXX: header size can vary due to auth type, IPv6, etc. */
2257 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2258 
2259 	xprt->bind_timeout = XS_BIND_TO;
2260 	xprt->connect_timeout = XS_UDP_CONN_TO;
2261 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
2262 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2263 
2264 	xprt->ops = &xs_udp_ops;
2265 
2266 	xprt->timeout = &xs_udp_default_timeout;
2267 
2268 	switch (addr->sa_family) {
2269 	case AF_INET:
2270 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2271 			xprt_set_bound(xprt);
2272 
2273 		INIT_DELAYED_WORK(&transport->connect_worker,
2274 					xs_udp_connect_worker4);
2275 		xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2276 		break;
2277 	case AF_INET6:
2278 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2279 			xprt_set_bound(xprt);
2280 
2281 		INIT_DELAYED_WORK(&transport->connect_worker,
2282 					xs_udp_connect_worker6);
2283 		xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2284 		break;
2285 	default:
2286 		kfree(xprt);
2287 		return ERR_PTR(-EAFNOSUPPORT);
2288 	}
2289 
2290 	dprintk("RPC:       set up transport to address %s\n",
2291 			xprt->address_strings[RPC_DISPLAY_ALL]);
2292 
2293 	if (try_module_get(THIS_MODULE))
2294 		return xprt;
2295 
2296 	kfree(xprt->slot);
2297 	kfree(xprt);
2298 	return ERR_PTR(-EINVAL);
2299 }
2300 
2301 static const struct rpc_timeout xs_tcp_default_timeout = {
2302 	.to_initval = 60 * HZ,
2303 	.to_maxval = 60 * HZ,
2304 	.to_retries = 2,
2305 };
2306 
2307 /**
2308  * xs_setup_tcp - Set up transport to use a TCP socket
2309  * @args: rpc transport creation arguments
2310  *
2311  */
2312 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2313 {
2314 	struct sockaddr *addr = args->dstaddr;
2315 	struct rpc_xprt *xprt;
2316 	struct sock_xprt *transport;
2317 
2318 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2319 	if (IS_ERR(xprt))
2320 		return xprt;
2321 	transport = container_of(xprt, struct sock_xprt, xprt);
2322 
2323 	xprt->prot = IPPROTO_TCP;
2324 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2325 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2326 
2327 	xprt->bind_timeout = XS_BIND_TO;
2328 	xprt->connect_timeout = XS_TCP_CONN_TO;
2329 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2330 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2331 
2332 	xprt->ops = &xs_tcp_ops;
2333 	xprt->timeout = &xs_tcp_default_timeout;
2334 
2335 	switch (addr->sa_family) {
2336 	case AF_INET:
2337 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2338 			xprt_set_bound(xprt);
2339 
2340 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2341 		xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2342 		break;
2343 	case AF_INET6:
2344 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2345 			xprt_set_bound(xprt);
2346 
2347 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2348 		xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2349 		break;
2350 	default:
2351 		kfree(xprt);
2352 		return ERR_PTR(-EAFNOSUPPORT);
2353 	}
2354 
2355 	dprintk("RPC:       set up transport to address %s\n",
2356 			xprt->address_strings[RPC_DISPLAY_ALL]);
2357 
2358 	if (try_module_get(THIS_MODULE))
2359 		return xprt;
2360 
2361 	kfree(xprt->slot);
2362 	kfree(xprt);
2363 	return ERR_PTR(-EINVAL);
2364 }
2365 
2366 static struct xprt_class	xs_udp_transport = {
2367 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
2368 	.name		= "udp",
2369 	.owner		= THIS_MODULE,
2370 	.ident		= IPPROTO_UDP,
2371 	.setup		= xs_setup_udp,
2372 };
2373 
2374 static struct xprt_class	xs_tcp_transport = {
2375 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
2376 	.name		= "tcp",
2377 	.owner		= THIS_MODULE,
2378 	.ident		= IPPROTO_TCP,
2379 	.setup		= xs_setup_tcp,
2380 };
2381 
2382 /**
2383  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2384  *
2385  */
2386 int init_socket_xprt(void)
2387 {
2388 #ifdef RPC_DEBUG
2389 	if (!sunrpc_table_header)
2390 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
2391 #endif
2392 
2393 	xprt_register_transport(&xs_udp_transport);
2394 	xprt_register_transport(&xs_tcp_transport);
2395 
2396 	return 0;
2397 }
2398 
2399 /**
2400  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2401  *
2402  */
2403 void cleanup_socket_xprt(void)
2404 {
2405 #ifdef RPC_DEBUG
2406 	if (sunrpc_table_header) {
2407 		unregister_sysctl_table(sunrpc_table_header);
2408 		sunrpc_table_header = NULL;
2409 	}
2410 #endif
2411 
2412 	xprt_unregister_transport(&xs_udp_transport);
2413 	xprt_unregister_transport(&xs_tcp_transport);
2414 }
2415