1 /*
2  *  linux/net/sunrpc/xprt.c
3  *
4  *  This is a generic RPC call interface supporting congestion avoidance,
5  *  and asynchronous calls.
6  *
7  *  The interface works like this:
8  *
9  *  -	When a process places a call, it allocates a request slot if
10  *	one is available. Otherwise, it sleeps on the backlog queue
11  *	(xprt_reserve).
12  *  -	Next, the caller puts together the RPC message, stuffs it into
 13  *	the request struct, and calls xprt_transmit().
 14  *  -	xprt_transmit sends the message and installs the caller on the
15  *	socket's wait list. At the same time, it installs a timer that
16  *	is run after the packet's timeout has expired.
17  *  -	When a packet arrives, the data_ready handler walks the list of
18  *	pending requests for that socket. If a matching XID is found, the
19  *	caller is woken up, and the timer removed.
20  *  -	When no reply arrives within the timeout interval, the timer is
21  *	fired by the kernel and runs xprt_timer(). It either adjusts the
22  *	timeout values (minor timeout) or wakes up the caller with a status
23  *	of -ETIMEDOUT.
24  *  -	When the caller receives a notification from RPC that a reply arrived,
25  *	it should release the RPC slot, and process the reply.
26  *	If the call timed out, it may choose to retry the operation by
27  *	adjusting the initial timeout value, and simply calling rpc_call
28  *	again.
29  *
30  *  Support for async RPC is done through a set of RPC-specific scheduling
31  *  primitives that `transparently' work for processes as well as async
32  *  tasks that rely on callbacks.
33  *
34  *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
35  *
36  *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37  *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38  *  TCP NFS related read + write fixes
39  *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
40  *
 41  *  Rewrite of large parts of the code in order to stabilize TCP stuff.
42  *  Fix behaviour when socket buffer is full.
43  *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
44  */
45 
46 #include <linux/types.h>
47 #include <linux/slab.h>
48 #include <linux/capability.h>
49 #include <linux/sched.h>
50 #include <linux/errno.h>
51 #include <linux/socket.h>
52 #include <linux/in.h>
53 #include <linux/net.h>
54 #include <linux/mm.h>
55 #include <linux/udp.h>
56 #include <linux/tcp.h>
57 #include <linux/sunrpc/clnt.h>
58 #include <linux/file.h>
59 #include <linux/workqueue.h>
60 #include <linux/random.h>
61 
62 #include <net/sock.h>
63 #include <net/checksum.h>
64 #include <net/udp.h>
65 #include <net/tcp.h>
66 
67 /*
68  * Local variables
69  */
70 
71 #ifdef RPC_DEBUG
72 # undef  RPC_DEBUG_DATA
73 # define RPCDBG_FACILITY	RPCDBG_XPRT
74 #endif
75 
76 #define XPRT_MAX_BACKOFF	(8)
77 #define XPRT_IDLE_TIMEOUT	(5*60*HZ)
78 #define XPRT_MAX_RESVPORT	(800)
79 
80 /*
81  * Local functions
82  */
83 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
84 static inline void	do_xprt_reserve(struct rpc_task *);
85 static void	xprt_disconnect(struct rpc_xprt *);
86 static void	xprt_connect_status(struct rpc_task *task);
87 static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
88 						struct rpc_timeout *to);
89 static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
90 static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
91 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
92 
93 static int	xprt_clear_backlog(struct rpc_xprt *xprt);
94 
95 #ifdef RPC_DEBUG_DATA
96 /*
97  * Print the buffer contents (first 128 bytes only--just enough for
98  * diropres return).
99  */
100 static void
101 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
102 {
103 	u8	*buf = (u8 *) packet;
104 	int	j;
105 
106 	dprintk("RPC:      %s\n", msg);
107 	for (j = 0; j < count && j < 128; j += 4) {
108 		if (!(j & 31)) {
109 			if (j)
110 				dprintk("\n");
111 			dprintk("0x%04x ", j);
112 		}
113 		dprintk("%02x%02x%02x%02x ",
114 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
115 	}
116 	dprintk("\n");
117 }
118 #else
119 static inline void
120 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
121 {
122 	/* NOP */
123 }
124 #endif
125 
126 /*
127  * Look up RPC transport given an INET socket
128  */
129 static inline struct rpc_xprt *
130 xprt_from_sock(struct sock *sk)
131 {
132 	return (struct rpc_xprt *) sk->sk_user_data;
133 }
134 
135 /*
136  * Serialize write access to sockets, in order to prevent different
137  * requests from interfering with each other.
138  * Also prevents TCP socket connects from colliding with writes.
139  */
140 static int
141 __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
142 {
143 	struct rpc_rqst *req = task->tk_rqstp;
144 
145 	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
146 		if (task == xprt->snd_task)
147 			return 1;
148 		if (task == NULL)
149 			return 0;
150 		goto out_sleep;
151 	}
152 	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
153 		xprt->snd_task = task;
154 		if (req) {
155 			req->rq_bytes_sent = 0;
156 			req->rq_ntrans++;
157 		}
158 		return 1;
159 	}
160 	smp_mb__before_clear_bit();
161 	clear_bit(XPRT_LOCKED, &xprt->sockstate);
162 	smp_mb__after_clear_bit();
163 out_sleep:
164 	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
165 	task->tk_timeout = 0;
166 	task->tk_status = -EAGAIN;
167 	if (req && req->rq_ntrans)
168 		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
169 	else
170 		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
171 	return 0;
172 }
173 
174 static inline int
175 xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
176 {
177 	int retval;
178 
179 	spin_lock_bh(&xprt->sock_lock);
180 	retval = __xprt_lock_write(xprt, task);
181 	spin_unlock_bh(&xprt->sock_lock);
182 	return retval;
183 }
184 
185 
186 static void
187 __xprt_lock_write_next(struct rpc_xprt *xprt)
188 {
189 	struct rpc_task *task;
190 
191 	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
192 		return;
193 	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
194 		goto out_unlock;
195 	task = rpc_wake_up_next(&xprt->resend);
196 	if (!task) {
197 		task = rpc_wake_up_next(&xprt->sending);
198 		if (!task)
199 			goto out_unlock;
200 	}
201 	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
202 		struct rpc_rqst *req = task->tk_rqstp;
203 		xprt->snd_task = task;
204 		if (req) {
205 			req->rq_bytes_sent = 0;
206 			req->rq_ntrans++;
207 		}
208 		return;
209 	}
210 out_unlock:
211 	smp_mb__before_clear_bit();
212 	clear_bit(XPRT_LOCKED, &xprt->sockstate);
213 	smp_mb__after_clear_bit();
214 }
215 
216 /*
217  * Releases the socket for use by other requests.
218  */
219 static void
220 __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
221 {
222 	if (xprt->snd_task == task) {
223 		xprt->snd_task = NULL;
224 		smp_mb__before_clear_bit();
225 		clear_bit(XPRT_LOCKED, &xprt->sockstate);
226 		smp_mb__after_clear_bit();
227 		__xprt_lock_write_next(xprt);
228 	}
229 }
230 
231 static inline void
232 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
233 {
234 	spin_lock_bh(&xprt->sock_lock);
235 	__xprt_release_write(xprt, task);
236 	spin_unlock_bh(&xprt->sock_lock);
237 }
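
/*
 * Locking summary: the XPRT_LOCKED bit in xprt->sockstate serializes
 * senders, while xprt->sock_lock protects snd_task and the congestion
 * state.  Tasks that have already transmitted at least once
 * (rq_ntrans != 0) sleep on xprt->resend and are woken ahead of the
 * new senders queued on xprt->sending, so retransmissions are not
 * starved by fresh requests.
 */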
238 
239 /*
240  * Write data to socket.
241  */
242 static inline int
243 xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
244 {
245 	struct socket	*sock = xprt->sock;
246 	struct xdr_buf	*xdr = &req->rq_snd_buf;
247 	struct sockaddr *addr = NULL;
248 	int addrlen = 0;
249 	unsigned int	skip;
250 	int		result;
251 
252 	if (!sock)
253 		return -ENOTCONN;
254 
255 	xprt_pktdump("packet data:",
256 				req->rq_svec->iov_base,
257 				req->rq_svec->iov_len);
258 
259 	/* For UDP, we need to provide an address */
260 	if (!xprt->stream) {
261 		addr = (struct sockaddr *) &xprt->addr;
262 		addrlen = sizeof(xprt->addr);
263 	}
 264 	/* Don't repeat bytes */
265 	skip = req->rq_bytes_sent;
266 
267 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
268 	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
269 
270 	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);
271 
272 	if (result >= 0)
273 		return result;
274 
275 	switch (result) {
276 	case -ECONNREFUSED:
277 		/* When the server has died, an ICMP port unreachable message
278 		 * prompts ECONNREFUSED.
279 		 */
280 	case -EAGAIN:
281 		break;
282 	case -ECONNRESET:
283 	case -ENOTCONN:
284 	case -EPIPE:
285 		/* connection broken */
286 		if (xprt->stream)
287 			result = -ENOTCONN;
288 		break;
289 	default:
290 		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
291 	}
292 	return result;
293 }
294 
295 /*
 296  * Van Jacobson congestion avoidance. Check whether the congestion window
 297  * is full; if it is, the caller is expected to put the task to sleep.
298  */
299 static int
300 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
301 {
302 	struct rpc_rqst *req = task->tk_rqstp;
303 
304 	if (req->rq_cong)
305 		return 1;
306 	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
307 			task->tk_pid, xprt->cong, xprt->cwnd);
308 	if (RPCXPRT_CONGESTED(xprt))
309 		return 0;
310 	req->rq_cong = 1;
311 	xprt->cong += RPC_CWNDSCALE;
312 	return 1;
313 }
314 
315 /*
316  * Adjust the congestion window, and wake up the next task
317  * that has been sleeping due to congestion
318  */
319 static void
320 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
321 {
322 	if (!req->rq_cong)
323 		return;
324 	req->rq_cong = 0;
325 	xprt->cong -= RPC_CWNDSCALE;
326 	__xprt_lock_write_next(xprt);
327 }
328 
329 /*
330  * Adjust RPC congestion window
331  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
332  */
333 static void
334 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
335 {
336 	unsigned long	cwnd;
337 
338 	cwnd = xprt->cwnd;
339 	if (result >= 0 && cwnd <= xprt->cong) {
340 		/* The (cwnd >> 1) term makes sure
341 		 * the result gets rounded properly. */
342 		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
343 		if (cwnd > RPC_MAXCWND(xprt))
344 			cwnd = RPC_MAXCWND(xprt);
345 		__xprt_lock_write_next(xprt);
346 	} else if (result == -ETIMEDOUT) {
347 		cwnd >>= 1;
348 		if (cwnd < RPC_CWNDSCALE)
349 			cwnd = RPC_CWNDSCALE;
350 	}
351 	dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
352 			xprt->cong, xprt->cwnd, cwnd);
353 	xprt->cwnd = cwnd;
354 }
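
/*
 * A worked example of the update above, assuming RPC_CWNDSCALE is 256
 * (one request slot per RPC_CWNDSCALE units of congestion window):
 * with cwnd = 512, a successful reply grows the window by
 * (256 * 256 + 256) / 512 = 128, i.e. roughly one slot per round trip
 * (additive increase), while a timeout halves it to 256
 * (multiplicative decrease), never dropping below RPC_CWNDSCALE.
 */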
355 
356 /*
357  * Reset the major timeout value
358  */
359 static void xprt_reset_majortimeo(struct rpc_rqst *req)
360 {
361 	struct rpc_timeout *to = &req->rq_xprt->timeout;
362 
363 	req->rq_majortimeo = req->rq_timeout;
364 	if (to->to_exponential)
365 		req->rq_majortimeo <<= to->to_retries;
366 	else
367 		req->rq_majortimeo += to->to_increment * to->to_retries;
368 	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
369 		req->rq_majortimeo = to->to_maxval;
370 	req->rq_majortimeo += jiffies;
371 }
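
/*
 * For example, with the UDP defaults set up by xprt_default_timeout()
 * below (to_initval = to_increment = 5s, to_retries = 5, linear
 * backoff), the raw major timeout is 5 + 5 * 5 = 30 seconds, which is
 * then clamped to the 25 second to_maxval before jiffies is added.
 */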
372 
373 /*
374  * Adjust timeout values etc for next retransmit
375  */
376 int xprt_adjust_timeout(struct rpc_rqst *req)
377 {
378 	struct rpc_xprt *xprt = req->rq_xprt;
379 	struct rpc_timeout *to = &xprt->timeout;
380 	int status = 0;
381 
382 	if (time_before(jiffies, req->rq_majortimeo)) {
383 		if (to->to_exponential)
384 			req->rq_timeout <<= 1;
385 		else
386 			req->rq_timeout += to->to_increment;
387 		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
388 			req->rq_timeout = to->to_maxval;
389 		req->rq_retries++;
390 		pprintk("RPC: %lu retrans\n", jiffies);
391 	} else {
392 		req->rq_timeout = to->to_initval;
393 		req->rq_retries = 0;
394 		xprt_reset_majortimeo(req);
395 		/* Reset the RTT counters == "slow start" */
396 		spin_lock_bh(&xprt->sock_lock);
397 		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
398 		spin_unlock_bh(&xprt->sock_lock);
399 		pprintk("RPC: %lu timeout\n", jiffies);
400 		status = -ETIMEDOUT;
401 	}
402 
403 	if (req->rq_timeout == 0) {
404 		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
405 		req->rq_timeout = 5 * HZ;
406 	}
407 	return status;
408 }
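
/*
 * With those same defaults the minor retransmit timeouts therefore run
 * 5s, 10s, 15s, 20s, 25s; once rq_majortimeo has passed, the timeout
 * is reset to to_initval, the RTT estimator is restarted, and
 * -ETIMEDOUT is returned so the caller can report a major timeout.
 */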
409 
410 /*
411  * Close down a transport socket
412  */
413 static void
414 xprt_close(struct rpc_xprt *xprt)
415 {
416 	struct socket	*sock = xprt->sock;
417 	struct sock	*sk = xprt->inet;
418 
419 	if (!sk)
420 		return;
421 
422 	write_lock_bh(&sk->sk_callback_lock);
423 	xprt->inet = NULL;
424 	xprt->sock = NULL;
425 
426 	sk->sk_user_data    = NULL;
427 	sk->sk_data_ready   = xprt->old_data_ready;
428 	sk->sk_state_change = xprt->old_state_change;
429 	sk->sk_write_space  = xprt->old_write_space;
430 	write_unlock_bh(&sk->sk_callback_lock);
431 
432 	sk->sk_no_check	 = 0;
433 
434 	sock_release(sock);
435 }
436 
437 static void
438 xprt_socket_autoclose(void *args)
439 {
440 	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
441 
442 	xprt_disconnect(xprt);
443 	xprt_close(xprt);
444 	xprt_release_write(xprt, NULL);
445 }
446 
447 /*
448  * Mark a transport as disconnected
449  */
450 static void
451 xprt_disconnect(struct rpc_xprt *xprt)
452 {
453 	dprintk("RPC:      disconnected transport %p\n", xprt);
454 	spin_lock_bh(&xprt->sock_lock);
455 	xprt_clear_connected(xprt);
456 	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
457 	spin_unlock_bh(&xprt->sock_lock);
458 }
459 
460 /*
461  * Used to allow disconnection when we've been idle
462  */
463 static void
464 xprt_init_autodisconnect(unsigned long data)
465 {
466 	struct rpc_xprt *xprt = (struct rpc_xprt *)data;
467 
468 	spin_lock(&xprt->sock_lock);
469 	if (!list_empty(&xprt->recv) || xprt->shutdown)
470 		goto out_abort;
471 	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
472 		goto out_abort;
473 	spin_unlock(&xprt->sock_lock);
474 	/* Let keventd close the socket */
475 	if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
476 		xprt_release_write(xprt, NULL);
477 	else
478 		schedule_work(&xprt->task_cleanup);
479 	return;
480 out_abort:
481 	spin_unlock(&xprt->sock_lock);
482 }
483 
484 static void xprt_socket_connect(void *args)
485 {
486 	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
487 	struct socket *sock = xprt->sock;
488 	int status = -EIO;
489 
490 	if (xprt->shutdown || xprt->addr.sin_port == 0)
491 		goto out;
492 
493 	/*
494 	 * Start by resetting any existing state
495 	 */
496 	xprt_close(xprt);
497 	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
498 	if (sock == NULL) {
499 		/* couldn't create socket or bind to reserved port;
500 		 * this is likely a permanent error, so cause an abort */
501 		goto out;
502 	}
503 	xprt_bind_socket(xprt, sock);
504 	xprt_sock_setbufsize(xprt);
505 
506 	status = 0;
507 	if (!xprt->stream)
508 		goto out;
509 
510 	/*
511 	 * Tell the socket layer to start connecting...
512 	 */
513 	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
514 			sizeof(xprt->addr), O_NONBLOCK);
515 	dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
516 			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
517 	if (status < 0) {
518 		switch (status) {
519 			case -EINPROGRESS:
520 			case -EALREADY:
521 				goto out_clear;
522 		}
523 	}
524 out:
525 	if (status < 0)
526 		rpc_wake_up_status(&xprt->pending, status);
527 	else
528 		rpc_wake_up(&xprt->pending);
529 out_clear:
530 	smp_mb__before_clear_bit();
531 	clear_bit(XPRT_CONNECTING, &xprt->sockstate);
532 	smp_mb__after_clear_bit();
533 }
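
/*
 * Note that the connect above is issued with O_NONBLOCK, so for TCP
 * the expected result is -EINPROGRESS (or -EALREADY); completion is
 * reported asynchronously via tcp_state_change() when the socket
 * reaches TCP_ESTABLISHED, which wakes the tasks sleeping on
 * xprt->pending.
 */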
534 
535 /*
 536  * Attempt to connect a transport. The socket work itself runs in the
 537  * keventd worker; only stream (TCP) transports need a true connect.
538  */
539 void xprt_connect(struct rpc_task *task)
540 {
541 	struct rpc_xprt	*xprt = task->tk_xprt;
542 
543 	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
544 			xprt, (xprt_connected(xprt) ? "is" : "is not"));
545 
546 	if (xprt->shutdown) {
547 		task->tk_status = -EIO;
548 		return;
549 	}
550 	if (!xprt->addr.sin_port) {
551 		task->tk_status = -EIO;
552 		return;
553 	}
554 	if (!xprt_lock_write(xprt, task))
555 		return;
556 	if (xprt_connected(xprt))
557 		goto out_write;
558 
559 	if (task->tk_rqstp)
560 		task->tk_rqstp->rq_bytes_sent = 0;
561 
562 	task->tk_timeout = RPC_CONNECT_TIMEOUT;
563 	rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
564 	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
565 		/* Note: if we are here due to a dropped connection
566 		 * 	 we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
567 		 * 	 seconds
568 		 */
569 		if (xprt->sock != NULL)
570 			schedule_delayed_work(&xprt->sock_connect,
571 					RPC_REESTABLISH_TIMEOUT);
572 		else {
573 			schedule_work(&xprt->sock_connect);
574 			if (!RPC_IS_ASYNC(task))
575 				flush_scheduled_work();
576 		}
577 	}
578 	return;
579  out_write:
580 	xprt_release_write(xprt, task);
581 }
582 
583 /*
584  * We arrive here when awoken from waiting on connection establishment.
585  */
586 static void
587 xprt_connect_status(struct rpc_task *task)
588 {
589 	struct rpc_xprt	*xprt = task->tk_xprt;
590 
591 	if (task->tk_status >= 0) {
592 		dprintk("RPC: %4d xprt_connect_status: connection established\n",
593 				task->tk_pid);
594 		return;
595 	}
596 
597 	/* if soft mounted, just cause this RPC to fail */
598 	if (RPC_IS_SOFT(task))
599 		task->tk_status = -EIO;
600 
601 	switch (task->tk_status) {
602 	case -ECONNREFUSED:
603 	case -ECONNRESET:
604 	case -ENOTCONN:
605 		return;
606 	case -ETIMEDOUT:
607 		dprintk("RPC: %4d xprt_connect_status: timed out\n",
608 				task->tk_pid);
609 		break;
610 	default:
611 		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
612 				-task->tk_status, task->tk_client->cl_server);
613 	}
614 	xprt_release_write(xprt, task);
615 }
616 
617 /*
 618  * Look up the RPC request corresponding to a reply. The caller holds sock_lock.
619  */
620 static inline struct rpc_rqst *
621 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
622 {
623 	struct list_head *pos;
624 	struct rpc_rqst	*req = NULL;
625 
626 	list_for_each(pos, &xprt->recv) {
627 		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
628 		if (entry->rq_xid == xid) {
629 			req = entry;
630 			break;
631 		}
632 	}
633 	return req;
634 }
635 
636 /*
637  * Complete reply received.
 638  * The TCP code relies on us to remove the request from xprt->recv.
639  */
640 static void
641 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
642 {
643 	struct rpc_task	*task = req->rq_task;
644 	struct rpc_clnt *clnt = task->tk_client;
645 
646 	/* Adjust congestion window */
647 	if (!xprt->nocong) {
648 		unsigned timer = task->tk_msg.rpc_proc->p_timer;
649 		xprt_adjust_cwnd(xprt, copied);
650 		__xprt_put_cong(xprt, req);
651 		if (timer) {
652 			if (req->rq_ntrans == 1)
653 				rpc_update_rtt(clnt->cl_rtt, timer,
654 						(long)jiffies - req->rq_xtime);
655 			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
656 		}
657 	}
658 
659 #ifdef RPC_PROFILE
660 	/* Profile only reads for now */
661 	if (copied > 1024) {
662 		static unsigned long	nextstat;
663 		static unsigned long	pkt_rtt, pkt_len, pkt_cnt;
664 
665 		pkt_cnt++;
666 		pkt_len += req->rq_slen + copied;
667 		pkt_rtt += jiffies - req->rq_xtime;
668 		if (time_before(nextstat, jiffies)) {
669 			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
670 			printk("RPC: %ld %ld %ld %ld stat\n",
671 					jiffies, pkt_cnt, pkt_len, pkt_rtt);
672 			pkt_rtt = pkt_len = pkt_cnt = 0;
673 			nextstat = jiffies + 5 * HZ;
674 		}
675 	}
676 #endif
677 
678 	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
679 	list_del_init(&req->rq_list);
680 	req->rq_received = req->rq_private_buf.len = copied;
681 
682 	/* ... and wake up the process. */
683 	rpc_wake_up_task(task);
684 	return;
685 }
686 
687 static size_t
688 skb_read_bits(skb_reader_t *desc, void *to, size_t len)
689 {
690 	if (len > desc->count)
691 		len = desc->count;
692 	if (skb_copy_bits(desc->skb, desc->offset, to, len))
693 		return 0;
694 	desc->count -= len;
695 	desc->offset += len;
696 	return len;
697 }
698 
699 static size_t
700 skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
701 {
702 	unsigned int csum2, pos;
703 
704 	if (len > desc->count)
705 		len = desc->count;
706 	pos = desc->offset;
707 	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
708 	desc->csum = csum_block_add(desc->csum, csum2, pos);
709 	desc->count -= len;
710 	desc->offset += len;
711 	return len;
712 }
713 
714 /*
715  * We have set things up such that we perform the checksum of the UDP
716  * packet in parallel with the copies into the RPC client iovec.  -DaveM
717  */
718 int
719 csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
720 {
721 	skb_reader_t desc;
722 
723 	desc.skb = skb;
724 	desc.offset = sizeof(struct udphdr);
725 	desc.count = skb->len - desc.offset;
726 
727 	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
728 		goto no_checksum;
729 
730 	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
731 	if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0)
732 		return -1;
733 	if (desc.offset != skb->len) {
734 		unsigned int csum2;
735 		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
736 		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
737 	}
738 	if (desc.count)
739 		return -1;
740 	if ((unsigned short)csum_fold(desc.csum))
741 		return -1;
742 	return 0;
743 no_checksum:
744 	if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0)
745 		return -1;
746 	if (desc.count)
747 		return -1;
748 	return 0;
749 }
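
/*
 * The piecewise checksums computed above are merged with
 * csum_block_add(), which folds a checksum taken at a given offset
 * into the running sum, and csum_fold() then reduces the 32-bit sum
 * to the final 16-bit ones-complement value.  A nonzero folded result
 * means the UDP checksum failed, so the reply is discarded.
 */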
750 
751 /*
752  * Input handler for RPC replies. Called from a bottom half and hence
753  * atomic.
754  */
755 static void
756 udp_data_ready(struct sock *sk, int len)
757 {
758 	struct rpc_task	*task;
759 	struct rpc_xprt	*xprt;
760 	struct rpc_rqst *rovr;
761 	struct sk_buff	*skb;
762 	int err, repsize, copied;
763 	u32 _xid, *xp;
764 
765 	read_lock(&sk->sk_callback_lock);
766 	dprintk("RPC:      udp_data_ready...\n");
767 	if (!(xprt = xprt_from_sock(sk))) {
 768 		printk("RPC:      udp_data_ready transport not found!\n");
769 		goto out;
770 	}
771 
772 	dprintk("RPC:      udp_data_ready client %p\n", xprt);
773 
774 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
775 		goto out;
776 
777 	if (xprt->shutdown)
778 		goto dropit;
779 
780 	repsize = skb->len - sizeof(struct udphdr);
781 	if (repsize < 4) {
782 		printk("RPC: impossible RPC reply size %d!\n", repsize);
783 		goto dropit;
784 	}
785 
786 	/* Copy the XID from the skb... */
787 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
788 				sizeof(_xid), &_xid);
789 	if (xp == NULL)
790 		goto dropit;
791 
792 	/* Look up and lock the request corresponding to the given XID */
793 	spin_lock(&xprt->sock_lock);
794 	rovr = xprt_lookup_rqst(xprt, *xp);
795 	if (!rovr)
796 		goto out_unlock;
797 	task = rovr->rq_task;
798 
799 	dprintk("RPC: %4d received reply\n", task->tk_pid);
800 
801 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
802 		copied = repsize;
803 
804 	/* Suck it into the iovec, verify checksum if not done by hw. */
805 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
806 		goto out_unlock;
807 
808 	/* Something worked... */
809 	dst_confirm(skb->dst);
810 
811 	xprt_complete_rqst(xprt, rovr, copied);
812 
813  out_unlock:
814 	spin_unlock(&xprt->sock_lock);
815  dropit:
816 	skb_free_datagram(sk, skb);
817  out:
818 	read_unlock(&sk->sk_callback_lock);
819 }
820 
821 /*
 822  * Copy from an skb into memory and advance the reader position.
823  */
824 static inline size_t
825 tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
826 {
827 	if (len > desc->count)
828 		len = desc->count;
829 	if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
830 		dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
831 				len, desc->count);
832 		return 0;
833 	}
834 	desc->offset += len;
835 	desc->count -= len;
836 	dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
837 			len, desc->count);
838 	return len;
839 }
840 
841 /*
842  * TCP read fragment marker
843  */
844 static inline void
845 tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
846 {
847 	size_t len, used;
848 	char *p;
849 
850 	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
851 	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
852 	used = tcp_copy_data(desc, p, len);
853 	xprt->tcp_offset += used;
854 	if (used != len)
855 		return;
856 	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
857 	if (xprt->tcp_reclen & 0x80000000)
858 		xprt->tcp_flags |= XPRT_LAST_FRAG;
859 	else
860 		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
861 	xprt->tcp_reclen &= 0x7fffffff;
862 	xprt->tcp_flags &= ~XPRT_COPY_RECM;
863 	xprt->tcp_offset = 0;
864 	/* Sanity check of the record length */
865 	if (xprt->tcp_reclen < 4) {
866 		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
867 		xprt_disconnect(xprt);
868 	}
869 	dprintk("RPC:      reading TCP record fragment of length %d\n",
870 			xprt->tcp_reclen);
871 }
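
/*
 * The record marker parsed above is a 4-byte big-endian word: the top
 * bit flags the last fragment of a record and the low 31 bits give the
 * fragment length.  A marker of 0x8000001c, for instance, announces a
 * final fragment carrying 28 bytes.
 */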
872 
873 static void
874 tcp_check_recm(struct rpc_xprt *xprt)
875 {
876 	dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
877 			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
878 	if (xprt->tcp_offset == xprt->tcp_reclen) {
879 		xprt->tcp_flags |= XPRT_COPY_RECM;
880 		xprt->tcp_offset = 0;
881 		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
882 			xprt->tcp_flags &= ~XPRT_COPY_DATA;
883 			xprt->tcp_flags |= XPRT_COPY_XID;
884 			xprt->tcp_copied = 0;
885 		}
886 	}
887 }
888 
889 /*
890  * TCP read xid
891  */
892 static inline void
893 tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
894 {
895 	size_t len, used;
896 	char *p;
897 
898 	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
899 	dprintk("RPC:      reading XID (%Zu bytes)\n", len);
900 	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
901 	used = tcp_copy_data(desc, p, len);
902 	xprt->tcp_offset += used;
903 	if (used != len)
904 		return;
905 	xprt->tcp_flags &= ~XPRT_COPY_XID;
906 	xprt->tcp_flags |= XPRT_COPY_DATA;
907 	xprt->tcp_copied = 4;
908 	dprintk("RPC:      reading reply for XID %08x\n",
909 						ntohl(xprt->tcp_xid));
910 	tcp_check_recm(xprt);
911 }
912 
913 /*
914  * TCP read and complete request
915  */
916 static inline void
917 tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
918 {
919 	struct rpc_rqst *req;
920 	struct xdr_buf *rcvbuf;
921 	size_t len;
922 	ssize_t r;
923 
924 	/* Find and lock the request corresponding to this xid */
925 	spin_lock(&xprt->sock_lock);
926 	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
927 	if (!req) {
928 		xprt->tcp_flags &= ~XPRT_COPY_DATA;
929 		dprintk("RPC:      XID %08x request not found!\n",
930 				ntohl(xprt->tcp_xid));
931 		spin_unlock(&xprt->sock_lock);
932 		return;
933 	}
934 
935 	rcvbuf = &req->rq_private_buf;
936 	len = desc->count;
937 	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
938 		skb_reader_t my_desc;
939 
940 		len = xprt->tcp_reclen - xprt->tcp_offset;
941 		memcpy(&my_desc, desc, sizeof(my_desc));
942 		my_desc.count = len;
943 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
944 					  &my_desc, tcp_copy_data);
945 		desc->count -= r;
946 		desc->offset += r;
947 	} else
948 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
949 					  desc, tcp_copy_data);
950 
951 	if (r > 0) {
952 		xprt->tcp_copied += r;
953 		xprt->tcp_offset += r;
954 	}
955 	if (r != len) {
956 		/* Error when copying to the receive buffer,
957 		 * usually because we weren't able to allocate
958 		 * additional buffer pages. All we can do now
959 		 * is turn off XPRT_COPY_DATA, so the request
960 		 * will not receive any additional updates,
961 		 * and time out.
962 		 * Any remaining data from this record will
963 		 * be discarded.
964 		 */
965 		xprt->tcp_flags &= ~XPRT_COPY_DATA;
966 		dprintk("RPC:      XID %08x truncated request\n",
967 				ntohl(xprt->tcp_xid));
968 		dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
969 				xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
970 		goto out;
971 	}
972 
973 	dprintk("RPC:      XID %08x read %Zd bytes\n",
974 			ntohl(xprt->tcp_xid), r);
975 	dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
976 			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
977 
978 	if (xprt->tcp_copied == req->rq_private_buf.buflen)
979 		xprt->tcp_flags &= ~XPRT_COPY_DATA;
980 	else if (xprt->tcp_offset == xprt->tcp_reclen) {
981 		if (xprt->tcp_flags & XPRT_LAST_FRAG)
982 			xprt->tcp_flags &= ~XPRT_COPY_DATA;
983 	}
984 
985 out:
986 	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
987 		dprintk("RPC: %4d received reply complete\n",
988 				req->rq_task->tk_pid);
989 		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
990 	}
991 	spin_unlock(&xprt->sock_lock);
992 	tcp_check_recm(xprt);
993 }
994 
995 /*
996  * TCP discard extra bytes from a short read
997  */
998 static inline void
999 tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
1000 {
1001 	size_t len;
1002 
1003 	len = xprt->tcp_reclen - xprt->tcp_offset;
1004 	if (len > desc->count)
1005 		len = desc->count;
1006 	desc->count -= len;
1007 	desc->offset += len;
1008 	xprt->tcp_offset += len;
1009 	dprintk("RPC:      discarded %Zu bytes\n", len);
1010 	tcp_check_recm(xprt);
1011 }
1012 
1013 /*
1014  * TCP record receive routine
1015  * We first have to grab the record marker, then the XID, then the data.
1016  */
1017 static int
1018 tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
1019 		unsigned int offset, size_t len)
1020 {
1021 	struct rpc_xprt *xprt = rd_desc->arg.data;
1022 	skb_reader_t desc = {
1023 		.skb	= skb,
1024 		.offset	= offset,
1025 		.count	= len,
1026 		.csum	= 0
1027        	};
1028 
1029 	dprintk("RPC:      tcp_data_recv\n");
1030 	do {
1031 		/* Read in a new fragment marker if necessary */
1032 		/* Can we ever really expect to get completely empty fragments? */
1033 		if (xprt->tcp_flags & XPRT_COPY_RECM) {
1034 			tcp_read_fraghdr(xprt, &desc);
1035 			continue;
1036 		}
1037 		/* Read in the xid if necessary */
1038 		if (xprt->tcp_flags & XPRT_COPY_XID) {
1039 			tcp_read_xid(xprt, &desc);
1040 			continue;
1041 		}
1042 		/* Read in the request data */
1043 		if (xprt->tcp_flags & XPRT_COPY_DATA) {
1044 			tcp_read_request(xprt, &desc);
1045 			continue;
1046 		}
1047 		/* Skip over any trailing bytes on short reads */
1048 		tcp_read_discard(xprt, &desc);
1049 	} while (desc.count);
1050 	dprintk("RPC:      tcp_data_recv done\n");
1051 	return len - desc.count;
1052 }
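
/*
 * tcp_data_recv() is thus a small state machine driven by
 * xprt->tcp_flags: XPRT_COPY_RECM (read the record marker), then
 * XPRT_COPY_XID (read the XID), then XPRT_COPY_DATA (copy the payload
 * into the matching request's receive buffer); whatever remains of the
 * fragment is discarded.  tcp_check_recm() advances the state at each
 * fragment boundary.
 */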
1053 
1054 static void tcp_data_ready(struct sock *sk, int bytes)
1055 {
1056 	struct rpc_xprt *xprt;
1057 	read_descriptor_t rd_desc;
1058 
1059 	read_lock(&sk->sk_callback_lock);
1060 	dprintk("RPC:      tcp_data_ready...\n");
1061 	if (!(xprt = xprt_from_sock(sk))) {
1062 		printk("RPC:      tcp_data_ready socket info not found!\n");
1063 		goto out;
1064 	}
1065 	if (xprt->shutdown)
1066 		goto out;
1067 
 1068 	/* We use rd_desc to pass struct rpc_xprt to tcp_data_recv */
1069 	rd_desc.arg.data = xprt;
1070 	rd_desc.count = 65536;
1071 	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
1072 out:
1073 	read_unlock(&sk->sk_callback_lock);
1074 }
1075 
1076 static void
1077 tcp_state_change(struct sock *sk)
1078 {
1079 	struct rpc_xprt	*xprt;
1080 
1081 	read_lock(&sk->sk_callback_lock);
1082 	if (!(xprt = xprt_from_sock(sk)))
1083 		goto out;
1084 	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
1085 	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
1086 				sk->sk_state, xprt_connected(xprt),
1087 				sock_flag(sk, SOCK_DEAD),
1088 				sock_flag(sk, SOCK_ZAPPED));
1089 
1090 	switch (sk->sk_state) {
1091 	case TCP_ESTABLISHED:
1092 		spin_lock_bh(&xprt->sock_lock);
1093 		if (!xprt_test_and_set_connected(xprt)) {
1094 			/* Reset TCP record info */
1095 			xprt->tcp_offset = 0;
1096 			xprt->tcp_reclen = 0;
1097 			xprt->tcp_copied = 0;
1098 			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
1099 			rpc_wake_up(&xprt->pending);
1100 		}
1101 		spin_unlock_bh(&xprt->sock_lock);
1102 		break;
1103 	case TCP_SYN_SENT:
1104 	case TCP_SYN_RECV:
1105 		break;
1106 	default:
1107 		xprt_disconnect(xprt);
1108 		break;
1109 	}
1110  out:
1111 	read_unlock(&sk->sk_callback_lock);
1112 }
1113 
1114 /*
1115  * Called when more output buffer space is available for this socket.
1116  * We try not to wake our writers until they can make "significant"
1117  * progress, otherwise we'll waste resources thrashing sock_sendmsg
1118  * with a bunch of small requests.
1119  */
1120 static void
1121 xprt_write_space(struct sock *sk)
1122 {
1123 	struct rpc_xprt	*xprt;
1124 	struct socket	*sock;
1125 
1126 	read_lock(&sk->sk_callback_lock);
1127 	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
1128 		goto out;
1129 	if (xprt->shutdown)
1130 		goto out;
1131 
1132 	/* Wait until we have enough socket memory */
1133 	if (xprt->stream) {
1134 		/* from net/core/stream.c:sk_stream_write_space */
1135 		if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
1136 			goto out;
1137 	} else {
1138 		/* from net/core/sock.c:sock_def_write_space */
1139 		if (!sock_writeable(sk))
1140 			goto out;
1141 	}
1142 
1143 	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
1144 		goto out;
1145 
1146 	spin_lock_bh(&xprt->sock_lock);
1147 	if (xprt->snd_task)
1148 		rpc_wake_up_task(xprt->snd_task);
1149 	spin_unlock_bh(&xprt->sock_lock);
1150 out:
1151 	read_unlock(&sk->sk_callback_lock);
1152 }
1153 
1154 /*
1155  * RPC receive timeout handler.
1156  */
1157 static void
1158 xprt_timer(struct rpc_task *task)
1159 {
1160 	struct rpc_rqst	*req = task->tk_rqstp;
1161 	struct rpc_xprt *xprt = req->rq_xprt;
1162 
1163 	spin_lock(&xprt->sock_lock);
1164 	if (req->rq_received)
1165 		goto out;
1166 
1167 	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
1168 	__xprt_put_cong(xprt, req);
1169 
1170 	dprintk("RPC: %4d xprt_timer (%s request)\n",
1171 		task->tk_pid, req ? "pending" : "backlogged");
1172 
1173 	task->tk_status  = -ETIMEDOUT;
1174 out:
1175 	task->tk_timeout = 0;
1176 	rpc_wake_up_task(task);
1177 	spin_unlock(&xprt->sock_lock);
1178 }
1179 
1180 /*
 1181  * Prepare to place the actual RPC call: serialize access to the
 1182  * socket and verify that the transport is connected.
1183  */
1184 int
1185 xprt_prepare_transmit(struct rpc_task *task)
1186 {
1187 	struct rpc_rqst	*req = task->tk_rqstp;
1188 	struct rpc_xprt	*xprt = req->rq_xprt;
1189 	int err = 0;
1190 
1191 	dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);
1192 
1193 	if (xprt->shutdown)
1194 		return -EIO;
1195 
1196 	spin_lock_bh(&xprt->sock_lock);
1197 	if (req->rq_received && !req->rq_bytes_sent) {
1198 		err = req->rq_received;
1199 		goto out_unlock;
1200 	}
1201 	if (!__xprt_lock_write(xprt, task)) {
1202 		err = -EAGAIN;
1203 		goto out_unlock;
1204 	}
1205 
1206 	if (!xprt_connected(xprt)) {
1207 		err = -ENOTCONN;
1208 		goto out_unlock;
1209 	}
1210 out_unlock:
1211 	spin_unlock_bh(&xprt->sock_lock);
1212 	return err;
1213 }
1214 
1215 void
1216 xprt_transmit(struct rpc_task *task)
1217 {
1218 	struct rpc_clnt *clnt = task->tk_client;
1219 	struct rpc_rqst	*req = task->tk_rqstp;
1220 	struct rpc_xprt	*xprt = req->rq_xprt;
1221 	int status, retry = 0;
1222 
1223 
1224 	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
1225 
1226 	/* set up everything as needed. */
1227 	/* Write the record marker */
1228 	if (xprt->stream) {
1229 		u32	*marker = req->rq_svec[0].iov_base;
1230 
1231 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
1232 	}
1233 
1234 	smp_rmb();
1235 	if (!req->rq_received) {
1236 		if (list_empty(&req->rq_list)) {
1237 			spin_lock_bh(&xprt->sock_lock);
1238 			/* Update the softirq receive buffer */
1239 			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1240 					sizeof(req->rq_private_buf));
1241 			/* Add request to the receive list */
1242 			list_add_tail(&req->rq_list, &xprt->recv);
1243 			spin_unlock_bh(&xprt->sock_lock);
1244 			xprt_reset_majortimeo(req);
1245 			/* Turn off autodisconnect */
1246 			del_singleshot_timer_sync(&xprt->timer);
1247 		}
1248 	} else if (!req->rq_bytes_sent)
1249 		return;
1250 
1251 	/* Continue transmitting the packet/record. We must be careful
1252 	 * to cope with writespace callbacks arriving _after_ we have
1253 	 * called xprt_sendmsg().
1254 	 */
1255 	while (1) {
1256 		req->rq_xtime = jiffies;
1257 		status = xprt_sendmsg(xprt, req);
1258 
1259 		if (status < 0)
1260 			break;
1261 
1262 		if (xprt->stream) {
1263 			req->rq_bytes_sent += status;
1264 
1265 			/* If we've sent the entire packet, immediately
1266 			 * reset the count of bytes sent. */
1267 			if (req->rq_bytes_sent >= req->rq_slen) {
1268 				req->rq_bytes_sent = 0;
1269 				goto out_receive;
1270 			}
1271 		} else {
1272 			if (status >= req->rq_slen)
1273 				goto out_receive;
1274 			status = -EAGAIN;
1275 			break;
1276 		}
1277 
1278 		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1279 				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
1280 				req->rq_slen);
1281 
1282 		status = -EAGAIN;
1283 		if (retry++ > 50)
1284 			break;
1285 	}
1286 
1287 	/* Note: at this point, task->tk_sleeping has not yet been set,
1288 	 *	 hence there is no danger of the waking up task being put on
1289 	 *	 schedq, and being picked up by a parallel run of rpciod().
1290 	 */
1291 	task->tk_status = status;
1292 
1293 	switch (status) {
1294 	case -EAGAIN:
1295 		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
1296 			/* Protect against races with xprt_write_space */
1297 			spin_lock_bh(&xprt->sock_lock);
1298 			/* Don't race with disconnect */
1299 			if (!xprt_connected(xprt))
1300 				task->tk_status = -ENOTCONN;
1301 			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
1302 				task->tk_timeout = req->rq_timeout;
1303 				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
1304 			}
1305 			spin_unlock_bh(&xprt->sock_lock);
1306 			return;
1307 		}
1308 		/* Keep holding the socket if it is blocked */
1309 		rpc_delay(task, HZ>>4);
1310 		return;
1311 	case -ECONNREFUSED:
1312 		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
1313 		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
1314 	case -ENOTCONN:
1315 		return;
1316 	default:
1317 		if (xprt->stream)
1318 			xprt_disconnect(xprt);
1319 	}
1320 	xprt_release_write(xprt, task);
1321 	return;
1322  out_receive:
1323 	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
1324 	/* Set the task's receive timeout value */
1325 	spin_lock_bh(&xprt->sock_lock);
1326 	if (!xprt->nocong) {
1327 		int timer = task->tk_msg.rpc_proc->p_timer;
1328 		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
1329 		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
1330 		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
1331 			task->tk_timeout = xprt->timeout.to_maxval;
1332 	} else
1333 		task->tk_timeout = req->rq_timeout;
1334 	/* Don't race with disconnect */
1335 	if (!xprt_connected(xprt))
1336 		task->tk_status = -ENOTCONN;
1337 	else if (!req->rq_received)
1338 		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
1339 	__xprt_release_write(xprt, task);
1340 	spin_unlock_bh(&xprt->sock_lock);
1341 }
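
/*
 * For congestion-controlled (UDP) transports, the receive timeout set
 * above is derived from the smoothed round-trip estimate via
 * rpc_calc_rto(), then doubled once for each backoff step recorded by
 * rpc_ntimeo() and each prior retransmission of this request; stream
 * transports simply reuse the fixed rq_timeout.
 */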
1342 
1343 /*
1344  * Reserve an RPC call slot.
1345  */
1346 static inline void
1347 do_xprt_reserve(struct rpc_task *task)
1348 {
1349 	struct rpc_xprt	*xprt = task->tk_xprt;
1350 
1351 	task->tk_status = 0;
1352 	if (task->tk_rqstp)
1353 		return;
1354 	if (!list_empty(&xprt->free)) {
1355 		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1356 		list_del_init(&req->rq_list);
1357 		task->tk_rqstp = req;
1358 		xprt_request_init(task, xprt);
1359 		return;
1360 	}
1361 	dprintk("RPC:      waiting for request slot\n");
1362 	task->tk_status = -EAGAIN;
1363 	task->tk_timeout = 0;
1364 	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
1365 }
1366 
1367 void
1368 xprt_reserve(struct rpc_task *task)
1369 {
1370 	struct rpc_xprt	*xprt = task->tk_xprt;
1371 
1372 	task->tk_status = -EIO;
1373 	if (!xprt->shutdown) {
1374 		spin_lock(&xprt->xprt_lock);
1375 		do_xprt_reserve(task);
1376 		spin_unlock(&xprt->xprt_lock);
1377 	}
1378 }
1379 
1380 /*
1381  * Allocate a 'unique' XID
1382  */
1383 static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
1384 {
1385 	return xprt->xid++;
1386 }
1387 
1388 static inline void xprt_init_xid(struct rpc_xprt *xprt)
1389 {
1390 	get_random_bytes(&xprt->xid, sizeof(xprt->xid));
1391 }
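
/*
 * XIDs are therefore handed out sequentially from a counter seeded
 * with random bytes when the transport is created: unique within a
 * transport, but unpredictable across transports and reboots.
 */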
1392 
1393 /*
1394  * Initialize RPC request
1395  */
1396 static void
1397 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1398 {
1399 	struct rpc_rqst	*req = task->tk_rqstp;
1400 
1401 	req->rq_timeout = xprt->timeout.to_initval;
1402 	req->rq_task	= task;
1403 	req->rq_xprt    = xprt;
1404 	req->rq_xid     = xprt_alloc_xid(xprt);
1405 	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
1406 			req, ntohl(req->rq_xid));
1407 }
1408 
1409 /*
1410  * Release an RPC call slot
1411  */
1412 void
1413 xprt_release(struct rpc_task *task)
1414 {
1415 	struct rpc_xprt	*xprt = task->tk_xprt;
1416 	struct rpc_rqst	*req;
1417 
1418 	if (!(req = task->tk_rqstp))
1419 		return;
1420 	spin_lock_bh(&xprt->sock_lock);
1421 	__xprt_release_write(xprt, task);
1422 	__xprt_put_cong(xprt, req);
1423 	if (!list_empty(&req->rq_list))
1424 		list_del(&req->rq_list);
1425 	xprt->last_used = jiffies;
1426 	if (list_empty(&xprt->recv) && !xprt->shutdown)
1427 		mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);
1428 	spin_unlock_bh(&xprt->sock_lock);
1429 	task->tk_rqstp = NULL;
1430 	memset(req, 0, sizeof(*req));	/* mark unused */
1431 
1432 	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
1433 
1434 	spin_lock(&xprt->xprt_lock);
1435 	list_add(&req->rq_list, &xprt->free);
1436 	xprt_clear_backlog(xprt);
1437 	spin_unlock(&xprt->xprt_lock);
1438 }
1439 
1440 /*
1441  * Set default timeout parameters
1442  */
1443 static void
1444 xprt_default_timeout(struct rpc_timeout *to, int proto)
1445 {
1446 	if (proto == IPPROTO_UDP)
1447 		xprt_set_timeout(to, 5,  5 * HZ);
1448 	else
1449 		xprt_set_timeout(to, 5, 60 * HZ);
1450 }
1451 
1452 /*
1453  * Set constant timeout
1454  */
1455 void
1456 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1457 {
1458 	to->to_initval   =
1459 	to->to_increment = incr;
1460 	to->to_maxval    = incr * retr;
1461 	to->to_retries   = retr;
1462 	to->to_exponential = 0;
1463 }
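
/*
 * For instance, xprt_set_timeout(to, 5, 5 * HZ) (the UDP default)
 * yields to_initval = to_increment = 5 seconds, to_retries = 5 and
 * to_maxval = 25 seconds, i.e. a linear backoff capped at 25 seconds.
 */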
1464 
1465 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
1466 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
1467 
1468 /*
 1469  * Initialize an RPC transport
1470  */
1471 static struct rpc_xprt *
1472 xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
1473 {
1474 	struct rpc_xprt	*xprt;
1475 	unsigned int entries;
1476 	size_t slot_table_size;
1477 	struct rpc_rqst	*req;
1478 
1479 	dprintk("RPC:      setting up %s transport...\n",
1480 				proto == IPPROTO_UDP? "UDP" : "TCP");
1481 
1482 	entries = (proto == IPPROTO_TCP)?
1483 		xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;
1484 
1485 	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1486 		return ERR_PTR(-ENOMEM);
1487 	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1488 	xprt->max_reqs = entries;
1489 	slot_table_size = entries * sizeof(xprt->slot[0]);
1490 	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
1491 	if (xprt->slot == NULL) {
1492 		kfree(xprt);
1493 		return ERR_PTR(-ENOMEM);
1494 	}
1495 	memset(xprt->slot, 0, slot_table_size);
1496 
1497 	xprt->addr = *ap;
1498 	xprt->prot = proto;
1499 	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
1500 	if (xprt->stream) {
1501 		xprt->cwnd = RPC_MAXCWND(xprt);
1502 		xprt->nocong = 1;
1503 		xprt->max_payload = (1U << 31) - 1;
1504 	} else {
1505 		xprt->cwnd = RPC_INITCWND;
1506 		xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1507 	}
1508 	spin_lock_init(&xprt->sock_lock);
1509 	spin_lock_init(&xprt->xprt_lock);
1510 	init_waitqueue_head(&xprt->cong_wait);
1511 
1512 	INIT_LIST_HEAD(&xprt->free);
1513 	INIT_LIST_HEAD(&xprt->recv);
1514 	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
1515 	INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
1516 	init_timer(&xprt->timer);
1517 	xprt->timer.function = xprt_init_autodisconnect;
1518 	xprt->timer.data = (unsigned long) xprt;
1519 	xprt->last_used = jiffies;
1520 	xprt->port = XPRT_MAX_RESVPORT;
1521 
1522 	/* Set timeout parameters */
1523 	if (to) {
1524 		xprt->timeout = *to;
1525 	} else
1526 		xprt_default_timeout(&xprt->timeout, xprt->prot);
1527 
1528 	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1529 	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
1530 	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
1531 	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1532 
1533 	/* initialize free list */
1534 	for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
1535 		list_add(&req->rq_list, &xprt->free);
1536 
1537 	xprt_init_xid(xprt);
1538 
1539 	/* Check whether we want to use a reserved port */
1540 	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1541 
1542 	dprintk("RPC:      created transport %p with %u slots\n", xprt,
1543 			xprt->max_reqs);
1544 
1545 	return xprt;
1546 }
1547 
1548 /*
1549  * Bind to a reserved port
1550  */
1551 static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
1552 {
1553 	struct sockaddr_in myaddr = {
1554 		.sin_family = AF_INET,
1555 	};
1556 	int		err, port;
1557 
1558 	/* Were we already bound to a given port? Try to reuse it */
1559 	port = xprt->port;
1560 	do {
1561 		myaddr.sin_port = htons(port);
1562 		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1563 						sizeof(myaddr));
1564 		if (err == 0) {
1565 			xprt->port = port;
1566 			return 0;
1567 		}
1568 		if (--port == 0)
1569 			port = XPRT_MAX_RESVPORT;
1570 	} while (err == -EADDRINUSE && port != xprt->port);
1571 
1572 	printk("RPC: Can't bind to reserved port (%d).\n", -err);
1573 	return err;
1574 }
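
/*
 * The search above walks downward from the last successfully bound
 * port, wraps from 1 back up to XPRT_MAX_RESVPORT (800), and only
 * gives up on -EADDRINUSE after the whole 1..800 range has been
 * tried; any other bind error aborts immediately.
 */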
1575 
1576 static void
1577 xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
1578 {
1579 	struct sock	*sk = sock->sk;
1580 
1581 	if (xprt->inet)
1582 		return;
1583 
1584 	write_lock_bh(&sk->sk_callback_lock);
1585 	sk->sk_user_data = xprt;
1586 	xprt->old_data_ready = sk->sk_data_ready;
1587 	xprt->old_state_change = sk->sk_state_change;
1588 	xprt->old_write_space = sk->sk_write_space;
1589 	if (xprt->prot == IPPROTO_UDP) {
1590 		sk->sk_data_ready = udp_data_ready;
1591 		sk->sk_no_check = UDP_CSUM_NORCV;
1592 		xprt_set_connected(xprt);
1593 	} else {
1594 		tcp_sk(sk)->nonagle = 1;	/* disable Nagle's algorithm */
1595 		sk->sk_data_ready = tcp_data_ready;
1596 		sk->sk_state_change = tcp_state_change;
1597 		xprt_clear_connected(xprt);
1598 	}
1599 	sk->sk_write_space = xprt_write_space;
1600 
1601 	/* Reset to new socket */
1602 	xprt->sock = sock;
1603 	xprt->inet = sk;
1604 	write_unlock_bh(&sk->sk_callback_lock);
1605 
1606 	return;
1607 }
1608 
1609 /*
1610  * Set socket buffer length
1611  */
1612 void
1613 xprt_sock_setbufsize(struct rpc_xprt *xprt)
1614 {
1615 	struct sock *sk = xprt->inet;
1616 
1617 	if (xprt->stream)
1618 		return;
1619 	if (xprt->rcvsize) {
1620 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1621 		sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
1622 	}
1623 	if (xprt->sndsize) {
1624 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1625 		sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
1626 		sk->sk_write_space(sk);
1627 	}
1628 }
1629 
1630 /*
 1631  * Datagram and stream sockets are created and bound here; connecting
 1632  * a stream socket is deferred to the connect worker.
1633  */
1634 static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
1635 {
1636 	struct socket	*sock;
1637 	int		type, err;
1638 
1639 	dprintk("RPC:      xprt_create_socket(%s %d)\n",
1640 			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1641 
1642 	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1643 
1644 	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
1645 		printk("RPC: can't create socket (%d).\n", -err);
1646 		return NULL;
1647 	}
1648 
1649 	/* If the caller has the capability, bind to a reserved port */
1650 	if (resvport && xprt_bindresvport(xprt, sock) < 0) {
1651 		printk("RPC: can't bind to reserved port.\n");
1652 		goto failed;
1653 	}
1654 
1655 	return sock;
1656 
1657 failed:
1658 	sock_release(sock);
1659 	return NULL;
1660 }
1661 
1662 /*
1663  * Create an RPC client transport given the protocol and peer address.
1664  */
1665 struct rpc_xprt *
1666 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1667 {
1668 	struct rpc_xprt	*xprt;
1669 
1670 	xprt = xprt_setup(proto, sap, to);
1671 	if (IS_ERR(xprt))
1672 		dprintk("RPC:      xprt_create_proto failed\n");
1673 	else
1674 		dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
1675 	return xprt;
1676 }
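
/*
 * A minimal usage sketch, not built into this file: how a caller might
 * create a TCP transport with the default timeouts. The NFS port and
 * loopback address below are purely illustrative.
 */
#if 0
static struct rpc_xprt *example_create_transport(void)
{
	struct sockaddr_in addr = {
		.sin_family = AF_INET,
		.sin_port   = htons(2049),	/* illustrative: NFS */
		.sin_addr   = { .s_addr = htonl(INADDR_LOOPBACK) },
	};
	struct rpc_xprt *xprt;

	/* A NULL timeout pointer selects the per-protocol defaults */
	xprt = xprt_create_proto(IPPROTO_TCP, &addr, NULL);
	if (IS_ERR(xprt))
		return NULL;
	return xprt;
}
#endif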
1677 
1678 /*
1679  * Prepare for transport shutdown.
1680  */
1681 static void
1682 xprt_shutdown(struct rpc_xprt *xprt)
1683 {
1684 	xprt->shutdown = 1;
1685 	rpc_wake_up(&xprt->sending);
1686 	rpc_wake_up(&xprt->resend);
1687 	rpc_wake_up(&xprt->pending);
1688 	rpc_wake_up(&xprt->backlog);
1689 	wake_up(&xprt->cong_wait);
1690 	del_timer_sync(&xprt->timer);
1691 
1692 	/* synchronously wait for connect worker to finish */
1693 	cancel_delayed_work(&xprt->sock_connect);
1694 	flush_scheduled_work();
1695 }
1696 
1697 /*
1698  * Clear the xprt backlog queue
1699  */
1700 static int
1701 xprt_clear_backlog(struct rpc_xprt *xprt) {
1702 	rpc_wake_up_next(&xprt->backlog);
1703 	wake_up(&xprt->cong_wait);
1704 	return 1;
1705 }
1706 
1707 /*
1708  * Destroy an RPC transport, killing off all requests.
1709  */
1710 int
1711 xprt_destroy(struct rpc_xprt *xprt)
1712 {
1713 	dprintk("RPC:      destroying transport %p\n", xprt);
1714 	xprt_shutdown(xprt);
1715 	xprt_disconnect(xprt);
1716 	xprt_close(xprt);
1717 	kfree(xprt->slot);
1718 	kfree(xprt);
1719 
1720 	return 0;
1721 }
1722