xref: /linux/net/sunrpc/clnt.c (revision 606d099cdd1080bbb50ea50dc52d98252f8f10a1)
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -	RPC header generation and argument serialization.
9  *  -	Credential refresh.
10  *  -	TCP connect handling.
11  *  -	Retry of operation when it is suspected the operation failed because
12  *	of uid squashing on the server, or when the credentials were stale
13  *	and need to be refreshed, or when a packet was damaged in transit.
14  *	This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23 
24 #include <asm/system.h>
25 
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/smp_lock.h>
31 #include <linux/utsname.h>
32 #include <linux/workqueue.h>
33 
34 #include <linux/sunrpc/clnt.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36 #include <linux/sunrpc/metrics.h>
37 
38 
39 #define RPC_SLACK_SPACE		(1024)	/* total overkill */
40 
41 #ifdef RPC_DEBUG
42 # define RPCDBG_FACILITY	RPCDBG_CALL
43 #endif
44 
45 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
46 
47 
48 static void	call_start(struct rpc_task *task);
49 static void	call_reserve(struct rpc_task *task);
50 static void	call_reserveresult(struct rpc_task *task);
51 static void	call_allocate(struct rpc_task *task);
52 static void	call_encode(struct rpc_task *task);
53 static void	call_decode(struct rpc_task *task);
54 static void	call_bind(struct rpc_task *task);
55 static void	call_bind_status(struct rpc_task *task);
56 static void	call_transmit(struct rpc_task *task);
57 static void	call_status(struct rpc_task *task);
58 static void	call_transmit_status(struct rpc_task *task);
59 static void	call_refresh(struct rpc_task *task);
60 static void	call_refreshresult(struct rpc_task *task);
61 static void	call_timeout(struct rpc_task *task);
62 static void	call_connect(struct rpc_task *task);
63 static void	call_connect_status(struct rpc_task *task);
64 static __be32 *	call_header(struct rpc_task *task);
65 static __be32 *	call_verify(struct rpc_task *task);
66 
67 
68 static int
69 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
70 {
71 	static uint32_t clntid;
72 	int error;
73 
74 	clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
75 	clnt->cl_dentry = ERR_PTR(-ENOENT);
76 	if (dir_name == NULL)
77 		return 0;
78 
79 	clnt->cl_vfsmnt = rpc_get_mount();
80 	if (IS_ERR(clnt->cl_vfsmnt))
81 		return PTR_ERR(clnt->cl_vfsmnt);
82 
83 	for (;;) {
84 		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
85 				"%s/clnt%x", dir_name,
86 				(unsigned int)clntid++);
87 		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
88 		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
89 		if (!IS_ERR(clnt->cl_dentry))
90 			return 0;
91 		error = PTR_ERR(clnt->cl_dentry);
92 		if (error != -EEXIST) {
93 			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
94 					clnt->cl_pathname, error);
95 			rpc_put_mount();
96 			return error;
97 		}
98 	}
99 }
100 
101 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
102 {
103 	struct rpc_version	*version;
104 	struct rpc_clnt		*clnt = NULL;
105 	struct rpc_auth		*auth;
106 	int err;
107 	int len;
108 
109 	dprintk("RPC: creating %s client for %s (xprt %p)\n",
110 		program->name, servname, xprt);
111 
112 	err = -EINVAL;
113 	if (!xprt)
114 		goto out_no_xprt;
115 	if (vers >= program->nrvers || !(version = program->version[vers]))
116 		goto out_err;
117 
118 	err = -ENOMEM;
119 	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
120 	if (!clnt)
121 		goto out_err;
122 	atomic_set(&clnt->cl_users, 0);
123 	atomic_set(&clnt->cl_count, 1);
124 	clnt->cl_parent = clnt;
125 
126 	clnt->cl_server = clnt->cl_inline_name;
127 	len = strlen(servname) + 1;
128 	if (len > sizeof(clnt->cl_inline_name)) {
129 		char *buf = kmalloc(len, GFP_KERNEL);
130 		if (buf != 0)
131 			clnt->cl_server = buf;
132 		else
133 			len = sizeof(clnt->cl_inline_name);
134 	}
135 	strlcpy(clnt->cl_server, servname, len);
136 
137 	clnt->cl_xprt     = xprt;
138 	clnt->cl_procinfo = version->procs;
139 	clnt->cl_maxproc  = version->nrprocs;
140 	clnt->cl_protname = program->name;
141 	clnt->cl_prog     = program->number;
142 	clnt->cl_vers     = version->number;
143 	clnt->cl_stats    = program->stats;
144 	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
145 	err = -ENOMEM;
146 	if (clnt->cl_metrics == NULL)
147 		goto out_no_stats;
148 	clnt->cl_program  = program;
149 
150 	if (!xprt_bound(clnt->cl_xprt))
151 		clnt->cl_autobind = 1;
152 
153 	clnt->cl_rtt = &clnt->cl_rtt_default;
154 	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
155 
156 	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
157 	if (err < 0)
158 		goto out_no_path;
159 
160 	auth = rpcauth_create(flavor, clnt);
161 	if (IS_ERR(auth)) {
162 		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
163 				flavor);
164 		err = PTR_ERR(auth);
165 		goto out_no_auth;
166 	}
167 
168 	/* save the nodename */
169 	clnt->cl_nodelen = strlen(utsname()->nodename);
170 	if (clnt->cl_nodelen > UNX_MAXNODENAME)
171 		clnt->cl_nodelen = UNX_MAXNODENAME;
172 	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
173 	return clnt;
174 
175 out_no_auth:
176 	if (!IS_ERR(clnt->cl_dentry)) {
177 		rpc_rmdir(clnt->cl_dentry);
178 		rpc_put_mount();
179 	}
180 out_no_path:
181 	rpc_free_iostats(clnt->cl_metrics);
182 out_no_stats:
183 	if (clnt->cl_server != clnt->cl_inline_name)
184 		kfree(clnt->cl_server);
185 	kfree(clnt);
186 out_err:
187 	xprt_put(xprt);
188 out_no_xprt:
189 	return ERR_PTR(err);
190 }
191 
/**
 * rpc_create - create an RPC client and transport with one call
 * @args: rpc_clnt create argument structure
 *
 * Creates and initializes an RPC transport and an RPC client.
 *
 * It can ping the server in order to determine if it is up, and to see if
 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 * this behavior so asynchronous tasks can also use rpc_create.
 */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *clnt;

	xprt = xprt_create_transport(args->protocol, args->address,
					args->addrsize, args->timeout);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;	/* propagate the ERR_PTR */

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;

	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
		args->program->name, args->servername, xprt);

	/* rpc_new_client() consumes the xprt reference, even on failure */
	clnt = rpc_new_client(xprt, args->servername, args->program,
				args->version, args->authflavor);
	if (IS_ERR(clnt))
		return clnt;

	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		/* NULLPROC ping: a failure means the server, program or
		 * version is unavailable, so tear the client down again */
		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	/* Apply caller-requested behavior flags (soft retry is default) */
	clnt->cl_softrtry = 1;
	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
		clnt->cl_softrtry = 0;

	if (args->flags & RPC_CLNT_CREATE_INTR)
		clnt->cl_intr = 1;
	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_ONESHOT)
		clnt->cl_oneshot = 1;

	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
252 
/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;
	int err = -ENOMEM;

	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	/* The clone gets its own reference counts and iostats... */
	atomic_set(&new->cl_count, 1);
	atomic_set(&new->cl_users, 0);
	new->cl_metrics = rpc_alloc_iostats(clnt);
	if (new->cl_metrics == NULL)
		goto out_no_stats;
	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
	if (err != 0)
		goto out_no_path;
	/* ...and pins its parent client and the shared transport */
	new->cl_parent = clnt;
	atomic_inc(&clnt->cl_count);
	new->cl_xprt = xprt_get(clnt->cl_xprt);
	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_oneshot = 0;
	new->cl_dead = 0;
	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
	/* The auth handle was copied by kmemdup; take an extra ref on it */
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	return new;
out_no_path:
	rpc_free_iostats(new->cl_metrics);
out_no_stats:
	kfree(new);
out_no_clnt:
	dprintk("RPC: %s returned error %d\n", __FUNCTION__, err);
	return ERR_PTR(err);
}
294 
/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
	dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
			clnt->cl_protname, clnt->cl_server,
			atomic_read(&clnt->cl_users));

	/* Kill outstanding tasks and wait (re-checking every second)
	 * until the last user has gone away */
	while (atomic_read(&clnt->cl_users) > 0) {
		/* Don't let rpc_release_client destroy us */
		clnt->cl_oneshot = 0;
		clnt->cl_dead = 0;
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			!atomic_read(&clnt->cl_users), 1*HZ);
	}

	/* A negative user count indicates a refcounting bug somewhere */
	if (atomic_read(&clnt->cl_users) < 0) {
		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
				clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
		rpc_show_tasks();
#endif
		BUG();
	}

	return rpc_destroy_client(clnt);
}
328 
/*
 * Delete an RPC client
 *
 * Drops one cl_count reference; only the last reference actually
 * tears the client down.  Returns 0 when the client was freed,
 * 1 when other references remain.
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
	if (!atomic_dec_and_test(&clnt->cl_count))
		return 1;
	BUG_ON(atomic_read(&clnt->cl_users) != 0);

	dprintk("RPC: destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (clnt->cl_auth) {
		rpcauth_destroy(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
	/* A clone shares cl_server with its parent: release the parent
	 * reference instead of freeing the (shared) name */
	if (clnt->cl_parent != clnt) {
		rpc_destroy_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_put(clnt->cl_xprt);
	kfree(clnt);
	return 0;
}
362 
/*
 * Release an RPC client
 *
 * Drops one cl_users reference.  The last user wakes any
 * rpc_shutdown_client() waiter and, if the client was marked
 * one-shot or dead, destroys it.
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:      rpc_release_client(%p, %d)\n",
				clnt, atomic_read(&clnt->cl_users));

	if (!atomic_dec_and_test(&clnt->cl_users))
		return;
	wake_up(&destroy_wait);
	if (clnt->cl_oneshot || clnt->cl_dead)
		rpc_destroy_client(clnt);
}
378 
/**
 * rpc_bind_new_program - bind a new RPC program to an existing client
 * @old: old rpc_client
 * @program: rpc program to set
 * @vers: rpc program version
 *
 * Clones the rpc client and sets up a new RPC program. This is mainly
 * of use for enabling different RPC programs to share the same transport.
 * The Sun NFSv2/v3 ACL protocol can do this.
 */
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
				      struct rpc_program *program,
				      int vers)
{
	struct rpc_clnt *clnt;
	struct rpc_version *version;
	int err;

	BUG_ON(vers >= program->nrvers || !program->version[vers]);
	version = program->version[vers];
	clnt = rpc_clone_client(old);
	if (IS_ERR(clnt))
		goto out;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	/* Probe the new program/version with a NULLPROC ping */
	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
	if (err != 0) {
		rpc_shutdown_client(clnt);
		clnt = ERR_PTR(err);
	}
out:
	return clnt;
}
416 
/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* Intentionally empty: used when the caller has no completion work */
}

static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
428 
/*
 *	Export the signal mask handling for synchronous code that
 *	sleeps on RPC calls
 */
#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))

/*
 * Block every signal except SIGKILL (plus, when @intr is set, the
 * RPC_INTR_SIGNALS set), saving the previous mask in @oldset so it
 * can be restored later with rpc_restore_sigmask().
 */
static void rpc_save_sigmask(sigset_t *oldset, int intr)
{
	unsigned long	sigallow = sigmask(SIGKILL);
	sigset_t sigmask;

	/* Block all signals except those listed in sigallow */
	if (intr)
		sigallow |= RPC_INTR_SIGNALS;
	siginitsetinv(&sigmask, sigallow);
	sigprocmask(SIG_BLOCK, &sigmask, oldset);
}
446 
447 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
448 {
449 	rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
450 }
451 
/* Restore the signal mask previously saved by rpc_save_sigmask(). */
static inline void rpc_restore_sigmask(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
456 
/* Apply the RPC signal mask on behalf of @clnt, honouring cl_intr. */
void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, clnt->cl_intr);
}
461 
/* Undo rpc_clnt_sigmask(): restore the caller's saved signal mask. */
void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_restore_sigmask(oldset);
}
466 
/*
 * Perform a synchronous RPC call described by @msg and wait for the
 * reply.  @flags must not include RPC_TASK_ASYNC.  Returns 0 or a
 * negative errno (the task status on RPC-level failure).
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	BUG_ON(flags & RPC_TASK_ASYNC);

	task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
	if (task == NULL)
		return -ENOMEM;

	/* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	status = task->tk_status;
	if (status != 0) {
		/* setup failed (e.g. cred binding): drop the task */
		rpc_release_task(task);
		goto out;
	}
	/* Take an extra reference so the task survives rpc_execute()
	 * long enough for us to read its final tk_status */
	atomic_inc(&task->tk_count);
	status = rpc_execute(task);
	if (status == 0)
		status = task->tk_status;
	rpc_put_task(task);
out:
	rpc_restore_sigmask(&oldset);
	return status;
}
506 
/*
 * Start an asynchronous RPC call described by @msg; completion is
 * reported through @tk_ops with @data as calldata.  On failure the
 * calldata is released via rpc_release_calldata() before returning.
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       const struct rpc_call_ops *tk_ops, void *data)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	status = -EIO;
	if (clnt->cl_dead)
		goto out_release;

	flags |= RPC_TASK_ASYNC;

	/* Create/initialize a new RPC task */
	status = -ENOMEM;
	if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
		goto out_release;

	/* Mask signals on GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	status = task->tk_status;
	if (status == 0)
		rpc_execute(task);
	else
		rpc_release_task(task);

	rpc_restore_sigmask(&oldset);
	return status;
out_release:
	rpc_release_calldata(tk_ops, data);
	return status;
}
548 
549 
550 void
551 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
552 {
553 	task->tk_msg   = *msg;
554 	task->tk_flags |= flags;
555 	/* Bind the user cred */
556 	if (task->tk_msg.rpc_cred != NULL)
557 		rpcauth_holdcred(task);
558 	else
559 		rpcauth_bindcred(task);
560 
561 	if (task->tk_status == 0)
562 		task->tk_action = call_start;
563 	else
564 		task->tk_action = rpc_exit_task;
565 }
566 
/**
 * rpc_peeraddr - extract remote peer address from clnt's xprt
 * @clnt: RPC client structure
 * @buf: target buffer
 * @bufsize: length of target buffer
 *
 * Returns the number of bytes that are actually in the stored address.
 */
size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
{
	size_t bytes;
	struct rpc_xprt *xprt = clnt->cl_xprt;

	bytes = sizeof(xprt->addr);
	if (bytes > bufsize)
		bytes = bufsize;	/* copy is truncated to fit @buf */
	memcpy(buf, &clnt->cl_xprt->addr, bytes);
	/* NOTE: the full stored length is returned even if the copy
	 * above was truncated */
	return xprt->addrlen;
}
EXPORT_SYMBOL_GPL(rpc_peeraddr);
587 
588 /**
589  * rpc_peeraddr2str - return remote peer address in printable format
590  * @clnt: RPC client structure
591  * @format: address format
592  *
593  */
594 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
595 {
596 	struct rpc_xprt *xprt = clnt->cl_xprt;
597 
598 	if (xprt->address_strings[format] != NULL)
599 		return xprt->address_strings[format];
600 	else
601 		return "unprintable";
602 }
603 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
604 
/*
 * Adjust the transport's send/receive buffer sizes, if the transport
 * implementation provides a set_buffer_size operation.
 */
void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;
	if (xprt->ops->set_buffer_size)
		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
}
612 
/*
 * Return size of largest payload RPC client can support, in bytes
 *
 * For stream transports, this is one RPC record fragment (see RFC
 * 1831), as we don't support multi-record requests yet.  For datagram
 * transports, this is the size of an IP packet minus the IP, UDP, and
 * RPC header sizes.
 */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
	return clnt->cl_xprt->max_payload;
}
EXPORT_SYMBOL_GPL(rpc_max_payload);
626 
/**
 * rpc_force_rebind - force transport to check that remote port is unchanged
 * @clnt: client to rebind
 *
 * Only has an effect when the client is autobinding; otherwise the
 * bound state is left alone.
 */
void rpc_force_rebind(struct rpc_clnt *clnt)
{
	if (clnt->cl_autobind)
		xprt_clear_bound(clnt->cl_xprt);
}
EXPORT_SYMBOL_GPL(rpc_force_rebind);
638 
639 /*
640  * Restart an (async) RPC call. Usually called from within the
641  * exit handler.
642  */
643 void
644 rpc_restart_call(struct rpc_task *task)
645 {
646 	if (RPC_ASSASSINATED(task))
647 		return;
648 
649 	task->tk_action = call_start;
650 }
651 
652 /*
653  * 0.  Initial state
654  *
655  *     Other FSM states can be visited zero or more times, but
656  *     this state is visited exactly once for each RPC.
657  */
658 static void
659 call_start(struct rpc_task *task)
660 {
661 	struct rpc_clnt	*clnt = task->tk_client;
662 
663 	dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
664 		clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
665 		(RPC_IS_ASYNC(task) ? "async" : "sync"));
666 
667 	/* Increment call count */
668 	task->tk_msg.rpc_proc->p_count++;
669 	clnt->cl_stats->rpccnt++;
670 	task->tk_action = call_reserve;
671 }
672 
/*
 * 1.	Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
	dprintk("RPC: %4d call_reserve\n", task->tk_pid);

	/* A stale credential must be refreshed before reserving a slot */
	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
690 
/*
 * 1b.	Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprintk("RPC: %4d call_reserveresult (status %d)\n",
				task->tk_pid, task->tk_status);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		/* success status without a slot: internal inconsistency */
		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}
742 
/*
 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in xprt_release).
 */
static void
call_allocate(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	unsigned int	bufsiz;

	dprintk("RPC: %4d call_allocate (status %d)\n",
				task->tk_pid, task->tk_status);
	task->tk_action = call_bind;
	if (req->rq_buffer)
		return;		/* buffer already allocated (e.g. on retry) */

	/* FIXME: compute buffer requirements more exactly using
	 * auth->au_wslack */
	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

	/* Double the size: call_encode() splits the buffer into a
	 * send half and a receive half */
	if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
		return;
	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

	/* On allocation failure, release the slot and retry shortly —
	 * unless a synchronous caller has been signalled */
	if (RPC_IS_ASYNC(task) || !signalled()) {
		xprt_release(task);
		task->tk_action = call_reserve;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}
777 
778 static inline int
779 rpc_task_need_encode(struct rpc_task *task)
780 {
781 	return task->tk_rqstp->rq_snd_buf.len == 0;
782 }
783 
/* Zap the send buffer length so call_encode() runs again before the
 * next transmission (see rpc_task_need_encode()). */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
}
789 
/*
 * 3.	Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
	unsigned int	bufsiz;
	kxdrproc_t	encode;
	__be32		*p;

	dprintk("RPC: %4d call_encode (status %d)\n",
				task->tk_pid, task->tk_status);

	/* Default buffer setup: carve rq_buffer into two halves, the
	 * first for the send buffer and the second for the receive
	 * buffer (sized in call_allocate()). */
	bufsiz = req->rq_bufsize >> 1;
	sndbuf->head[0].iov_base = (void *)req->rq_buffer;
	sndbuf->head[0].iov_len  = bufsiz;
	sndbuf->tail[0].iov_len  = 0;
	sndbuf->page_len	 = 0;
	sndbuf->len		 = 0;
	sndbuf->buflen		 = bufsiz;
	rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
	rcvbuf->head[0].iov_len  = bufsiz;
	rcvbuf->tail[0].iov_len  = 0;
	rcvbuf->page_len	 = 0;
	rcvbuf->len		 = 0;
	rcvbuf->buflen		 = bufsiz;

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	if (encode == NULL)
		return;		/* procedure has no arguments to encode */

	lock_kernel();
	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
	unlock_kernel();
	if (task->tk_status == -ENOMEM) {
		/* XXX: Is this sane? */
		rpc_delay(task, 3*HZ);
		task->tk_status = -EAGAIN;
	}
}
841 
842 /*
843  * 4.	Get the server port number if not yet set
844  */
845 static void
846 call_bind(struct rpc_task *task)
847 {
848 	struct rpc_xprt *xprt = task->tk_xprt;
849 
850 	dprintk("RPC: %4d call_bind (status %d)\n",
851 				task->tk_pid, task->tk_status);
852 
853 	task->tk_action = call_connect;
854 	if (!xprt_bound(xprt)) {
855 		task->tk_action = call_bind_status;
856 		task->tk_timeout = xprt->bind_timeout;
857 		xprt->ops->rpcbind(task);
858 	}
859 }
860 
/*
 * 4a.	Sort out bind result
 */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EACCES;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d call_bind_status (status %d)\n",
					task->tk_pid, task->tk_status);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -EACCES:
		/* Program/version not registered yet: back off and retry */
		dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
				task->tk_pid);
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %4d rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* fatal: exit with -EACCES (the initial value of status) */
		dprintk("RPC: %4d remote rpcbind service unavailable\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %4d remote rpcbind version 2 unavailable\n",
				task->tk_pid);
		break;
	default:
		dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		status = -EIO;
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	/* Let the timeout logic decide whether to retry or give up */
	task->tk_action = call_timeout;
}
907 
908 /*
909  * 4b.	Connect to the RPC server
910  */
911 static void
912 call_connect(struct rpc_task *task)
913 {
914 	struct rpc_xprt *xprt = task->tk_xprt;
915 
916 	dprintk("RPC: %4d call_connect xprt %p %s connected\n",
917 			task->tk_pid, xprt,
918 			(xprt_connected(xprt) ? "is" : "is not"));
919 
920 	task->tk_action = call_transmit;
921 	if (!xprt_connected(xprt)) {
922 		task->tk_action = call_connect_status;
923 		if (task->tk_status < 0)
924 			return;
925 		xprt_connect(task);
926 	}
927 }
928 
/*
 * 4c.	Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprintk("RPC: %5u call_connect_status (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: remote service port may have changed */
	rpc_force_rebind(clnt);

	switch (status) {
	case -ENOTCONN:
	case -EAGAIN:
		/* Hard tasks retry the bind+connect sequence forever */
		task->tk_action = call_bind;
		if (!RPC_IS_SOFT(task))
			return;
		/* if soft mounted, test if we've timed out */
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	}
	rpc_exit(task, -EIO);
}
964 
/*
 * 5.	Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
	dprintk("RPC: %4d call_transmit (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
		call_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0)
			return;
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	/* Procedures without a decode function expect no reply: finish now */
	if (task->tk_msg.rpc_proc->p_decode != NULL)
		return;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_task(task);
}
1002 
1003 /*
1004  * 5a.	Handle cleanup after a transmission
1005  */
1006 static void
1007 call_transmit_status(struct rpc_task *task)
1008 {
1009 	task->tk_action = call_status;
1010 	/*
1011 	 * Special case: if we've been waiting on the socket's write_space()
1012 	 * callback, then don't call xprt_end_transmit().
1013 	 */
1014 	if (task->tk_status == -EAGAIN)
1015 		return;
1016 	xprt_end_transmit(task);
1017 	rpc_task_force_reencode(task);
1018 }
1019 
/*
 * 6.	Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* A fully-received reply overrides any transmit-side status */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
				task->tk_pid, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		/* port may have changed: rebind and reconnect */
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1073 
/*
 * 6a.	Handle RPC timeout
 * 	We do not release the request slot, so we keep using the
 *	same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	/* Minor timeout: the per-request timeout was merely bumped */
	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
	task->tk_timeouts++;

	/* Soft tasks give up on a major timeout */
	if (RPC_IS_SOFT(task)) {
		printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname, clnt->cl_server);
		rpc_exit(task, -EIO);
		return;
	}

	/* Warn once per major-timeout episode (cleared in call_decode()) */
	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname, clnt->cl_server);
	}
	rpc_force_rebind(clnt);

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}
1111 
/*
 * 7.	Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprintk("RPC: %4d call_decode (status %d)\n",
				task->tk_pid, task->tk_status);

	/* Server answered after a major timeout: report recovery */
	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* 12 bytes is too short for even a minimal RPC reply header;
	 * hard tasks retransmit, soft tasks go to timeout handling */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("%s: too small RPC reply size (%d bytes)\n",
			clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_received.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	p = call_verify(task);
	if (IS_ERR(p)) {
		/* -EAGAIN means call_verify() already rescheduled a retry;
		 * any other error has exited the task */
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		return;
	}

	task->tk_action = rpc_exit_task;

	if (decode) {
		lock_kernel();
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
		unlock_kernel();
	}
	dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
					task->tk_status);
	return;
out_retry:
	/* Discard the reply so the retransmit waits for a fresh one */
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
}
1178 
1179 /*
1180  * 8.	Refresh the credentials if rejected by the server
1181  */
1182 static void
1183 call_refresh(struct rpc_task *task)
1184 {
1185 	dprintk("RPC: %4d call_refresh\n", task->tk_pid);
1186 
1187 	xprt_release(task);	/* Must do to obtain new XID */
1188 	task->tk_action = call_refreshresult;
1189 	task->tk_status = 0;
1190 	task->tk_client->cl_stats->rpcauthrefresh++;
1191 	rpcauth_refreshcred(task);
1192 }
1193 
1194 /*
1195  * 8a.	Process the results of a credential refresh
1196  */
1197 static void
1198 call_refreshresult(struct rpc_task *task)
1199 {
1200 	int status = task->tk_status;
1201 	dprintk("RPC: %4d call_refreshresult (status %d)\n",
1202 				task->tk_pid, task->tk_status);
1203 
1204 	task->tk_status = 0;
1205 	task->tk_action = call_reserve;
1206 	if (status >= 0 && rpcauth_uptodatecred(task))
1207 		return;
1208 	if (status == -EACCES) {
1209 		rpc_exit(task, -EACCES);
1210 		return;
1211 	}
1212 	task->tk_action = call_refresh;
1213 	if (status != -ETIMEDOUT)
1214 		rpc_delay(task, 3*HZ);
1215 	return;
1216 }
1217 
1218 /*
1219  * Call header serialization
1220  */
1221 static __be32 *
1222 call_header(struct rpc_task *task)
1223 {
1224 	struct rpc_clnt *clnt = task->tk_client;
1225 	struct rpc_rqst	*req = task->tk_rqstp;
1226 	__be32		*p = req->rq_svec[0].iov_base;
1227 
1228 	/* FIXME: check buffer size? */
1229 
1230 	p = xprt_skip_transport_header(task->tk_xprt, p);
1231 	*p++ = req->rq_xid;		/* XID */
1232 	*p++ = htonl(RPC_CALL);		/* CALL */
1233 	*p++ = htonl(RPC_VERSION);	/* RPC version */
1234 	*p++ = htonl(clnt->cl_prog);	/* program number */
1235 	*p++ = htonl(clnt->cl_vers);	/* program version */
1236 	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
1237 	p = rpcauth_marshcred(task, p);
1238 	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1239 	return p;
1240 }
1241 
1242 /*
1243  * Reply header verification
1244  */
static __be32 *
call_verify(struct rpc_task *task)
{
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	/* remaining reply length in 32-bit XDR words */
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		printk(KERN_WARNING
		       "call_verify: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* need at least the xid, direction and reply-status words */
	if ((len -= 3) < 0)
		goto out_overflow;
	p += 1;	/* skip XID */

	/* the message direction must be REPLY */
	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto out_garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		/* MSG_DENIED: read the reject status word */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
			case RPC_AUTH_ERROR:
				break;
			case RPC_MISMATCH:
				dprintk("%s: RPC call version mismatch!\n", __FUNCTION__);
				error = -EPROTONOSUPPORT;
				goto out_err;
			default:
				dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
				goto out_eio;
		}
		/* AUTH_ERROR: read the auth status word */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			/* Stale credential: invalidate it, refresh and
			 * retry, at most tk_cred_retry times */
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %4d call_verify: retry stale creds\n",
							task->tk_pid);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %4d call_verify: retry garbled creds\n",
							task->tk_pid);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server %s requires stronger "
			       "authentication.\n", task->tk_client->cl_server);
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %4d call_verify: call rejected %d\n",
						task->tk_pid, n);
		goto out_err;
	}
	/* MSG_ACCEPTED: check the server's verifier */
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* read the accept status word */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		/* p now points at the procedure-specific results */
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

out_garbage:
	/* Garbled reply: retransmit up to tk_garb_retry times, then EIO.
	 * Note the out_retry label below is deliberately inside the if
	 * so auth retries share the EAGAIN return. */
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
	printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	return ERR_PTR(error);
out_overflow:
	printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
	goto out_garbage;
}
1382 
/* XDR encoder for the NULL procedure: there are no arguments to marshal */
static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1387 
/* XDR decoder for the NULL procedure: there are no results to unmarshal */
static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1392 
/* Procedure info for the NULL procedure, used by rpc_ping() below */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
1397 
1398 int rpc_ping(struct rpc_clnt *clnt, int flags)
1399 {
1400 	struct rpc_message msg = {
1401 		.rpc_proc = &rpcproc_null,
1402 	};
1403 	int err;
1404 	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1405 	err = rpc_call_sync(clnt, &msg, flags);
1406 	put_rpccred(msg.rpc_cred);
1407 	return err;
1408 }
1409