xref: /linux/net/sunrpc/clnt.c (revision 8a5f956a9fb7d74fff681145082acfad5afa6bb8)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -	RPC header generation and argument serialization.
10  *  -	Credential refresh.
11  *  -	TCP connect handling.
12  *  -	Retry of operation when it is suspected the operation failed because
13  *	of uid squashing on the server, or when the credentials were stale
14  *	and need to be refreshed, or when a packet was damaged in transit.
15  *	This may be have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20 
21 
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35 
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42 
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46 
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY	RPCDBG_CALL
49 #endif
50 
51 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
52 
53 static void	call_start(struct rpc_task *task);
54 static void	call_reserve(struct rpc_task *task);
55 static void	call_reserveresult(struct rpc_task *task);
56 static void	call_allocate(struct rpc_task *task);
57 static void	call_encode(struct rpc_task *task);
58 static void	call_decode(struct rpc_task *task);
59 static void	call_bind(struct rpc_task *task);
60 static void	call_bind_status(struct rpc_task *task);
61 static void	call_transmit(struct rpc_task *task);
62 static void	call_status(struct rpc_task *task);
63 static void	call_transmit_status(struct rpc_task *task);
64 static void	call_refresh(struct rpc_task *task);
65 static void	call_refreshresult(struct rpc_task *task);
66 static void	call_connect(struct rpc_task *task);
67 static void	call_connect_status(struct rpc_task *task);
68 
69 static int	rpc_encode_header(struct rpc_task *task,
70 				  struct xdr_stream *xdr);
71 static int	rpc_decode_header(struct rpc_task *task,
72 				  struct xdr_stream *xdr);
73 static int	rpc_ping(struct rpc_clnt *clnt);
74 static int	rpc_ping_noreply(struct rpc_clnt *clnt);
75 static void	rpc_check_timeout(struct rpc_task *task);
76 
77 static void rpc_register_client(struct rpc_clnt *clnt)
78 {
79 	struct net *net = rpc_net_ns(clnt);
80 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
81 
82 	spin_lock(&sn->rpc_client_lock);
83 	list_add(&clnt->cl_clients, &sn->all_clients);
84 	spin_unlock(&sn->rpc_client_lock);
85 }
86 
87 static void rpc_unregister_client(struct rpc_clnt *clnt)
88 {
89 	struct net *net = rpc_net_ns(clnt);
90 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
91 
92 	spin_lock(&sn->rpc_client_lock);
93 	list_del(&clnt->cl_clients);
94 	spin_unlock(&sn->rpc_client_lock);
95 }
96 
97 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
98 {
99 	rpc_remove_client_dir(clnt);
100 }
101 
102 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103 {
104 	struct net *net = rpc_net_ns(clnt);
105 	struct super_block *pipefs_sb;
106 
107 	pipefs_sb = rpc_get_sb_net(net);
108 	if (pipefs_sb) {
109 		if (pipefs_sb == clnt->pipefs_sb)
110 			__rpc_clnt_remove_pipedir(clnt);
111 		rpc_put_sb_net(net);
112 	}
113 }
114 
115 static int rpc_setup_pipedir_sb(struct super_block *sb,
116 				    struct rpc_clnt *clnt)
117 {
118 	static uint32_t clntid;
119 	const char *dir_name = clnt->cl_program->pipe_dir_name;
120 	char name[15];
121 	struct dentry *dir;
122 	int err;
123 
124 	dir = rpc_d_lookup_sb(sb, dir_name);
125 	if (dir == NULL) {
126 		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
127 		return -ENOENT;
128 	}
129 	for (;;) {
130 		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
131 		name[sizeof(name) - 1] = '\0';
132 		err = rpc_create_client_dir(dir, name, clnt);
133 		if (!err)
134 			break;
135 		if (err == -EEXIST)
136 			continue;
137 		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
138 				" %s/%s, error %d\n",
139 				dir_name, name, err);
140 		break;
141 	}
142 	dput(dir);
143 	return err;
144 }
145 
146 static int
147 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
148 {
149 	clnt->pipefs_sb = pipefs_sb;
150 
151 	if (clnt->cl_program->pipe_dir_name != NULL) {
152 		int err = rpc_setup_pipedir_sb(pipefs_sb, clnt);
153 		if (err && err != -ENOENT)
154 			return err;
155 	}
156 	return 0;
157 }
158 
159 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
160 {
161 	if (clnt->cl_program->pipe_dir_name == NULL)
162 		return 1;
163 
164 	switch (event) {
165 	case RPC_PIPEFS_MOUNT:
166 		if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
167 			return 1;
168 		if (refcount_read(&clnt->cl_count) == 0)
169 			return 1;
170 		break;
171 	case RPC_PIPEFS_UMOUNT:
172 		if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
173 			return 1;
174 		break;
175 	}
176 	return 0;
177 }
178 
179 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
180 				   struct super_block *sb)
181 {
182 	switch (event) {
183 	case RPC_PIPEFS_MOUNT:
184 		return rpc_setup_pipedir_sb(sb, clnt);
185 	case RPC_PIPEFS_UMOUNT:
186 		__rpc_clnt_remove_pipedir(clnt);
187 		break;
188 	default:
189 		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
190 		return -ENOTSUPP;
191 	}
192 	return 0;
193 }
194 
195 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
196 				struct super_block *sb)
197 {
198 	int error = 0;
199 
200 	for (;; clnt = clnt->cl_parent) {
201 		if (!rpc_clnt_skip_event(clnt, event))
202 			error = __rpc_clnt_handle_event(clnt, event, sb);
203 		if (error || clnt == clnt->cl_parent)
204 			break;
205 	}
206 	return error;
207 }
208 
209 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
210 {
211 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
212 	struct rpc_clnt *clnt;
213 
214 	spin_lock(&sn->rpc_client_lock);
215 	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
216 		if (rpc_clnt_skip_event(clnt, event))
217 			continue;
218 		spin_unlock(&sn->rpc_client_lock);
219 		return clnt;
220 	}
221 	spin_unlock(&sn->rpc_client_lock);
222 	return NULL;
223 }
224 
225 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
226 			    void *ptr)
227 {
228 	struct super_block *sb = ptr;
229 	struct rpc_clnt *clnt;
230 	int error = 0;
231 
232 	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
233 		error = __rpc_pipefs_event(clnt, event, sb);
234 		if (error)
235 			break;
236 	}
237 	return error;
238 }
239 
240 static struct notifier_block rpc_clients_block = {
241 	.notifier_call	= rpc_pipefs_event,
242 	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
243 };
244 
245 int rpc_clients_notifier_register(void)
246 {
247 	return rpc_pipefs_notifier_register(&rpc_clients_block);
248 }
249 
250 void rpc_clients_notifier_unregister(void)
251 {
252 	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
253 }
254 
255 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
256 		struct rpc_xprt *xprt,
257 		const struct rpc_timeout *timeout)
258 {
259 	struct rpc_xprt *old;
260 
261 	spin_lock(&clnt->cl_lock);
262 	old = rcu_dereference_protected(clnt->cl_xprt,
263 			lockdep_is_held(&clnt->cl_lock));
264 
265 	clnt->cl_timeout = timeout;
266 	rcu_assign_pointer(clnt->cl_xprt, xprt);
267 	spin_unlock(&clnt->cl_lock);
268 
269 	return old;
270 }
271 
272 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
273 {
274 	ssize_t copied;
275 
276 	copied = strscpy(clnt->cl_nodename,
277 			 nodename, sizeof(clnt->cl_nodename));
278 
279 	clnt->cl_nodelen = copied < 0
280 				? sizeof(clnt->cl_nodename) - 1
281 				: copied;
282 }
283 
284 static int rpc_client_register(struct rpc_clnt *clnt,
285 			       rpc_authflavor_t pseudoflavor,
286 			       const char *client_name)
287 {
288 	struct rpc_auth_create_args auth_args = {
289 		.pseudoflavor = pseudoflavor,
290 		.target_name = client_name,
291 	};
292 	struct rpc_auth *auth;
293 	struct net *net = rpc_net_ns(clnt);
294 	struct super_block *pipefs_sb;
295 	int err;
296 
297 	rpc_clnt_debugfs_register(clnt);
298 
299 	pipefs_sb = rpc_get_sb_net(net);
300 	if (pipefs_sb) {
301 		err = rpc_setup_pipedir(pipefs_sb, clnt);
302 		if (err)
303 			goto out;
304 	}
305 
306 	rpc_register_client(clnt);
307 	if (pipefs_sb)
308 		rpc_put_sb_net(net);
309 
310 	auth = rpcauth_create(&auth_args, clnt);
311 	if (IS_ERR(auth)) {
312 		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
313 				pseudoflavor);
314 		err = PTR_ERR(auth);
315 		goto err_auth;
316 	}
317 	return 0;
318 err_auth:
319 	pipefs_sb = rpc_get_sb_net(net);
320 	rpc_unregister_client(clnt);
321 	__rpc_clnt_remove_pipedir(clnt);
322 out:
323 	if (pipefs_sb)
324 		rpc_put_sb_net(net);
325 	rpc_sysfs_client_destroy(clnt);
326 	rpc_clnt_debugfs_unregister(clnt);
327 	return err;
328 }
329 
330 static DEFINE_IDA(rpc_clids);
331 
332 void rpc_cleanup_clids(void)
333 {
334 	ida_destroy(&rpc_clids);
335 }
336 
337 static int rpc_alloc_clid(struct rpc_clnt *clnt)
338 {
339 	int clid;
340 
341 	clid = ida_alloc(&rpc_clids, GFP_KERNEL);
342 	if (clid < 0)
343 		return clid;
344 	clnt->cl_clid = clid;
345 	return 0;
346 }
347 
348 static void rpc_free_clid(struct rpc_clnt *clnt)
349 {
350 	ida_free(&rpc_clids, clnt->cl_clid);
351 }
352 
353 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
354 		struct rpc_xprt_switch *xps,
355 		struct rpc_xprt *xprt,
356 		struct rpc_clnt *parent)
357 {
358 	const struct rpc_program *program = args->program;
359 	const struct rpc_version *version;
360 	struct rpc_clnt *clnt = NULL;
361 	const struct rpc_timeout *timeout;
362 	const char *nodename = args->nodename;
363 	int err;
364 
365 	err = rpciod_up();
366 	if (err)
367 		goto out_no_rpciod;
368 
369 	err = -EINVAL;
370 	if (args->version >= program->nrvers)
371 		goto out_err;
372 	version = program->version[args->version];
373 	if (version == NULL)
374 		goto out_err;
375 
376 	err = -ENOMEM;
377 	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
378 	if (!clnt)
379 		goto out_err;
380 	clnt->cl_parent = parent ? : clnt;
381 	clnt->cl_xprtsec = args->xprtsec;
382 
383 	err = rpc_alloc_clid(clnt);
384 	if (err)
385 		goto out_no_clid;
386 
387 	clnt->cl_cred	  = get_cred(args->cred);
388 	clnt->cl_procinfo = version->procs;
389 	clnt->cl_maxproc  = version->nrprocs;
390 	clnt->cl_prog     = args->prognumber ? : program->number;
391 	clnt->cl_vers     = version->number;
392 	clnt->cl_stats    = args->stats ? : program->stats;
393 	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
394 	rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
395 	err = -ENOMEM;
396 	if (clnt->cl_metrics == NULL)
397 		goto out_no_stats;
398 	clnt->cl_program  = program;
399 	INIT_LIST_HEAD(&clnt->cl_tasks);
400 	spin_lock_init(&clnt->cl_lock);
401 
402 	timeout = xprt->timeout;
403 	if (args->timeout != NULL) {
404 		memcpy(&clnt->cl_timeout_default, args->timeout,
405 				sizeof(clnt->cl_timeout_default));
406 		timeout = &clnt->cl_timeout_default;
407 	}
408 
409 	rpc_clnt_set_transport(clnt, xprt, timeout);
410 	xprt->main = true;
411 	xprt_iter_init(&clnt->cl_xpi, xps);
412 	xprt_switch_put(xps);
413 
414 	clnt->cl_rtt = &clnt->cl_rtt_default;
415 	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
416 
417 	refcount_set(&clnt->cl_count, 1);
418 
419 	if (nodename == NULL)
420 		nodename = utsname()->nodename;
421 	/* save the nodename */
422 	rpc_clnt_set_nodename(clnt, nodename);
423 
424 	rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
425 	err = rpc_client_register(clnt, args->authflavor, args->client_name);
426 	if (err)
427 		goto out_no_path;
428 	if (parent)
429 		refcount_inc(&parent->cl_count);
430 
431 	trace_rpc_clnt_new(clnt, xprt, args);
432 	return clnt;
433 
434 out_no_path:
435 	rpc_free_iostats(clnt->cl_metrics);
436 out_no_stats:
437 	put_cred(clnt->cl_cred);
438 	rpc_free_clid(clnt);
439 out_no_clid:
440 	kfree(clnt);
441 out_err:
442 	rpciod_down();
443 out_no_rpciod:
444 	xprt_switch_put(xps);
445 	xprt_put(xprt);
446 	trace_rpc_clnt_new_err(program->name, args->servername, err);
447 	return ERR_PTR(err);
448 }
449 
450 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
451 					struct rpc_xprt *xprt)
452 {
453 	struct rpc_clnt *clnt = NULL;
454 	struct rpc_xprt_switch *xps;
455 
456 	if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
457 		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
458 		xps = args->bc_xprt->xpt_bc_xps;
459 		xprt_switch_get(xps);
460 	} else {
461 		xps = xprt_switch_alloc(xprt, GFP_KERNEL);
462 		if (xps == NULL) {
463 			xprt_put(xprt);
464 			return ERR_PTR(-ENOMEM);
465 		}
466 		if (xprt->bc_xprt) {
467 			xprt_switch_get(xps);
468 			xprt->bc_xprt->xpt_bc_xps = xps;
469 		}
470 	}
471 	clnt = rpc_new_client(args, xps, xprt, NULL);
472 	if (IS_ERR(clnt))
473 		return clnt;
474 
475 	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
476 		int err = rpc_ping(clnt);
477 		if (err != 0) {
478 			rpc_shutdown_client(clnt);
479 			return ERR_PTR(err);
480 		}
481 	} else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
482 		int err = rpc_ping_noreply(clnt);
483 		if (err != 0) {
484 			rpc_shutdown_client(clnt);
485 			return ERR_PTR(err);
486 		}
487 	}
488 
489 	clnt->cl_softrtry = 1;
490 	if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
491 		clnt->cl_softrtry = 0;
492 		if (args->flags & RPC_CLNT_CREATE_SOFTERR)
493 			clnt->cl_softerr = 1;
494 	}
495 
496 	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
497 		clnt->cl_autobind = 1;
498 	if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
499 		clnt->cl_noretranstimeo = 1;
500 	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
501 		clnt->cl_discrtry = 1;
502 	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
503 		clnt->cl_chatty = 1;
504 	if (args->flags & RPC_CLNT_CREATE_NETUNREACH_FATAL)
505 		clnt->cl_netunreach_fatal = 1;
506 
507 	return clnt;
508 }
509 
510 /**
511  * rpc_create - create an RPC client and transport with one call
512  * @args: rpc_clnt create argument structure
513  *
514  * Creates and initializes an RPC transport and an RPC client.
515  *
516  * It can ping the server in order to determine if it is up, and to see if
517  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
518  * this behavior so asynchronous tasks can also use rpc_create.
519  */
520 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
521 {
522 	struct rpc_xprt *xprt;
523 	struct xprt_create xprtargs = {
524 		.net = args->net,
525 		.ident = args->protocol,
526 		.srcaddr = args->saddress,
527 		.dstaddr = args->address,
528 		.addrlen = args->addrsize,
529 		.servername = args->servername,
530 		.bc_xprt = args->bc_xprt,
531 		.xprtsec = args->xprtsec,
532 		.connect_timeout = args->connect_timeout,
533 		.reconnect_timeout = args->reconnect_timeout,
534 	};
535 	char servername[RPC_MAXNETNAMELEN];
536 	struct rpc_clnt *clnt;
537 	int i;
538 
539 	if (args->bc_xprt) {
540 		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
541 		xprt = args->bc_xprt->xpt_bc_xprt;
542 		if (xprt) {
543 			xprt_get(xprt);
544 			return rpc_create_xprt(args, xprt);
545 		}
546 	}
547 
548 	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
549 		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
550 	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
551 		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
552 	/*
553 	 * If the caller chooses not to specify a hostname, whip
554 	 * up a string representation of the passed-in address.
555 	 */
556 	if (xprtargs.servername == NULL) {
557 		struct sockaddr_un *sun =
558 				(struct sockaddr_un *)args->address;
559 		struct sockaddr_in *sin =
560 				(struct sockaddr_in *)args->address;
561 		struct sockaddr_in6 *sin6 =
562 				(struct sockaddr_in6 *)args->address;
563 
564 		servername[0] = '\0';
565 		switch (args->address->sa_family) {
566 		case AF_LOCAL:
567 			if (sun->sun_path[0])
568 				snprintf(servername, sizeof(servername), "%s",
569 					 sun->sun_path);
570 			else
571 				snprintf(servername, sizeof(servername), "@%s",
572 					 sun->sun_path+1);
573 			break;
574 		case AF_INET:
575 			snprintf(servername, sizeof(servername), "%pI4",
576 				 &sin->sin_addr.s_addr);
577 			break;
578 		case AF_INET6:
579 			snprintf(servername, sizeof(servername), "%pI6",
580 				 &sin6->sin6_addr);
581 			break;
582 		default:
583 			/* caller wants default server name, but
584 			 * address family isn't recognized. */
585 			return ERR_PTR(-EINVAL);
586 		}
587 		xprtargs.servername = servername;
588 	}
589 
590 	xprt = xprt_create_transport(&xprtargs);
591 	if (IS_ERR(xprt))
592 		return (struct rpc_clnt *)xprt;
593 
594 	/*
595 	 * By default, kernel RPC client connects from a reserved port.
596 	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
597 	 * but it is always enabled for rpciod, which handles the connect
598 	 * operation.
599 	 */
600 	xprt->resvport = 1;
601 	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
602 		xprt->resvport = 0;
603 	xprt->reuseport = 0;
604 	if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
605 		xprt->reuseport = 1;
606 
607 	clnt = rpc_create_xprt(args, xprt);
608 	if (IS_ERR(clnt) || args->nconnect <= 1)
609 		return clnt;
610 
611 	for (i = 0; i < args->nconnect - 1; i++) {
612 		if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
613 			break;
614 	}
615 	return clnt;
616 }
617 EXPORT_SYMBOL_GPL(rpc_create);
618 
619 /*
620  * This function clones the RPC client structure. It allows us to share the
621  * same transport while varying parameters such as the authentication
622  * flavour.
623  */
624 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
625 					   struct rpc_clnt *clnt)
626 {
627 	struct rpc_xprt_switch *xps;
628 	struct rpc_xprt *xprt;
629 	struct rpc_clnt *new;
630 	int err;
631 
632 	err = -ENOMEM;
633 	rcu_read_lock();
634 	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
635 	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
636 	rcu_read_unlock();
637 	if (xprt == NULL || xps == NULL) {
638 		xprt_put(xprt);
639 		xprt_switch_put(xps);
640 		goto out_err;
641 	}
642 	args->servername = xprt->servername;
643 	args->nodename = clnt->cl_nodename;
644 
645 	new = rpc_new_client(args, xps, xprt, clnt);
646 	if (IS_ERR(new))
647 		return new;
648 
649 	/* Turn off autobind on clones */
650 	new->cl_autobind = 0;
651 	new->cl_softrtry = clnt->cl_softrtry;
652 	new->cl_softerr = clnt->cl_softerr;
653 	new->cl_noretranstimeo = clnt->cl_noretranstimeo;
654 	new->cl_discrtry = clnt->cl_discrtry;
655 	new->cl_chatty = clnt->cl_chatty;
656 	new->cl_netunreach_fatal = clnt->cl_netunreach_fatal;
657 	new->cl_principal = clnt->cl_principal;
658 	new->cl_max_connect = clnt->cl_max_connect;
659 	return new;
660 
661 out_err:
662 	trace_rpc_clnt_clone_err(clnt, err);
663 	return ERR_PTR(err);
664 }
665 
666 /**
667  * rpc_clone_client - Clone an RPC client structure
668  *
669  * @clnt: RPC client whose parameters are copied
670  *
671  * Returns a fresh RPC client or an ERR_PTR.
672  */
673 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
674 {
675 	struct rpc_create_args args = {
676 		.program	= clnt->cl_program,
677 		.prognumber	= clnt->cl_prog,
678 		.version	= clnt->cl_vers,
679 		.authflavor	= clnt->cl_auth->au_flavor,
680 		.cred		= clnt->cl_cred,
681 		.stats		= clnt->cl_stats,
682 	};
683 	return __rpc_clone_client(&args, clnt);
684 }
685 EXPORT_SYMBOL_GPL(rpc_clone_client);
686 
687 /**
688  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
689  *
690  * @clnt: RPC client whose parameters are copied
691  * @flavor: security flavor for new client
692  *
693  * Returns a fresh RPC client or an ERR_PTR.
694  */
695 struct rpc_clnt *
696 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
697 {
698 	struct rpc_create_args args = {
699 		.program	= clnt->cl_program,
700 		.prognumber	= clnt->cl_prog,
701 		.version	= clnt->cl_vers,
702 		.authflavor	= flavor,
703 		.cred		= clnt->cl_cred,
704 		.stats		= clnt->cl_stats,
705 	};
706 	return __rpc_clone_client(&args, clnt);
707 }
708 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
709 
710 /**
711  * rpc_switch_client_transport: switch the RPC transport on the fly
712  * @clnt: pointer to a struct rpc_clnt
713  * @args: pointer to the new transport arguments
714  * @timeout: pointer to the new timeout parameters
715  *
716  * This function allows the caller to switch the RPC transport for the
717  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
718  * server, for instance.  It assumes that the caller has ensured that
719  * there are no active RPC tasks by using some form of locking.
720  *
721  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
722  * negative errno is returned, and "clnt" continues to use the old
723  * xprt.
724  */
725 int rpc_switch_client_transport(struct rpc_clnt *clnt,
726 		struct xprt_create *args,
727 		const struct rpc_timeout *timeout)
728 {
729 	const struct rpc_timeout *old_timeo;
730 	rpc_authflavor_t pseudoflavor;
731 	struct rpc_xprt_switch *xps, *oldxps;
732 	struct rpc_xprt *xprt, *old;
733 	struct rpc_clnt *parent;
734 	int err;
735 
736 	args->xprtsec = clnt->cl_xprtsec;
737 	xprt = xprt_create_transport(args);
738 	if (IS_ERR(xprt))
739 		return PTR_ERR(xprt);
740 
741 	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
742 	if (xps == NULL) {
743 		xprt_put(xprt);
744 		return -ENOMEM;
745 	}
746 
747 	pseudoflavor = clnt->cl_auth->au_flavor;
748 
749 	old_timeo = clnt->cl_timeout;
750 	old = rpc_clnt_set_transport(clnt, xprt, timeout);
751 	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
752 
753 	rpc_unregister_client(clnt);
754 	__rpc_clnt_remove_pipedir(clnt);
755 	rpc_sysfs_client_destroy(clnt);
756 	rpc_clnt_debugfs_unregister(clnt);
757 
758 	/*
759 	 * A new transport was created.  "clnt" therefore
760 	 * becomes the root of a new cl_parent tree.  clnt's
761 	 * children, if it has any, still point to the old xprt.
762 	 */
763 	parent = clnt->cl_parent;
764 	clnt->cl_parent = clnt;
765 
766 	/*
767 	 * The old rpc_auth cache cannot be re-used.  GSS
768 	 * contexts in particular are between a single
769 	 * client and server.
770 	 */
771 	err = rpc_client_register(clnt, pseudoflavor, NULL);
772 	if (err)
773 		goto out_revert;
774 
775 	synchronize_rcu();
776 	if (parent != clnt)
777 		rpc_release_client(parent);
778 	xprt_switch_put(oldxps);
779 	xprt_put(old);
780 	trace_rpc_clnt_replace_xprt(clnt);
781 	return 0;
782 
783 out_revert:
784 	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
785 	rpc_clnt_set_transport(clnt, old, old_timeo);
786 	clnt->cl_parent = parent;
787 	rpc_client_register(clnt, pseudoflavor, NULL);
788 	xprt_switch_put(xps);
789 	xprt_put(xprt);
790 	trace_rpc_clnt_replace_xprt_err(clnt);
791 	return err;
792 }
793 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
794 
795 static struct rpc_xprt_switch *rpc_clnt_xprt_switch_get(struct rpc_clnt *clnt)
796 {
797 	struct rpc_xprt_switch *xps;
798 
799 	rcu_read_lock();
800 	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
801 	rcu_read_unlock();
802 
803 	return xps;
804 }
805 
806 static
807 int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
808 			     void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
809 {
810 	struct rpc_xprt_switch *xps;
811 
812 	xps = rpc_clnt_xprt_switch_get(clnt);
813 	if (xps == NULL)
814 		return -EAGAIN;
815 	func(xpi, xps);
816 	xprt_switch_put(xps);
817 	return 0;
818 }
819 
820 static
821 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
822 {
823 	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
824 }
825 
826 static
827 int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
828 				    struct rpc_xprt_iter *xpi)
829 {
830 	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
831 }
832 
833 /**
834  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
835  * @clnt: pointer to client
836  * @fn: function to apply
837  * @data: void pointer to function data
838  *
839  * Iterates through the list of RPC transports currently attached to the
840  * client and applies the function fn(clnt, xprt, data).
841  *
842  * On error, the iteration stops, and the function returns the error value.
843  */
844 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
845 		int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
846 		void *data)
847 {
848 	struct rpc_xprt_iter xpi;
849 	int ret;
850 
851 	ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
852 	if (ret)
853 		return ret;
854 	for (;;) {
855 		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
856 
857 		if (!xprt)
858 			break;
859 		ret = fn(clnt, xprt, data);
860 		xprt_put(xprt);
861 		if (ret < 0)
862 			break;
863 	}
864 	xprt_iter_destroy(&xpi);
865 	return ret;
866 }
867 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
868 
869 /*
870  * Kill all tasks for the given client.
871  * XXX: kill their descendants as well?
872  */
873 void rpc_killall_tasks(struct rpc_clnt *clnt)
874 {
875 	struct rpc_task	*rovr;
876 
877 
878 	if (list_empty(&clnt->cl_tasks))
879 		return;
880 
881 	/*
882 	 * Spin lock all_tasks to prevent changes...
883 	 */
884 	trace_rpc_clnt_killall(clnt);
885 	spin_lock(&clnt->cl_lock);
886 	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
887 		rpc_signal_task(rovr);
888 	spin_unlock(&clnt->cl_lock);
889 }
890 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
891 
892 /**
893  * rpc_cancel_tasks - try to cancel a set of RPC tasks
894  * @clnt: Pointer to RPC client
895  * @error: RPC task error value to set
896  * @fnmatch: Pointer to selector function
897  * @data: User data
898  *
899  * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
900  * The argument @error must be a negative error value.
901  */
902 unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
903 			       bool (*fnmatch)(const struct rpc_task *,
904 					       const void *),
905 			       const void *data)
906 {
907 	struct rpc_task *task;
908 	unsigned long count = 0;
909 
910 	if (list_empty(&clnt->cl_tasks))
911 		return 0;
912 	/*
913 	 * Spin lock all_tasks to prevent changes...
914 	 */
915 	spin_lock(&clnt->cl_lock);
916 	list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
917 		if (!RPC_IS_ACTIVATED(task))
918 			continue;
919 		if (!fnmatch(task, data))
920 			continue;
921 		rpc_task_try_cancel(task, error);
922 		count++;
923 	}
924 	spin_unlock(&clnt->cl_lock);
925 	return count;
926 }
927 EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
928 
929 static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
930 				    struct rpc_xprt *xprt, void *dummy)
931 {
932 	if (xprt_connected(xprt))
933 		xprt_force_disconnect(xprt);
934 	return 0;
935 }
936 
937 void rpc_clnt_disconnect(struct rpc_clnt *clnt)
938 {
939 	rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
940 }
941 EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
942 
943 /*
944  * Properly shut down an RPC client, terminating all outstanding
945  * requests.
946  */
947 void rpc_shutdown_client(struct rpc_clnt *clnt)
948 {
949 	might_sleep();
950 
951 	trace_rpc_clnt_shutdown(clnt);
952 
953 	clnt->cl_shutdown = 1;
954 	while (!list_empty(&clnt->cl_tasks)) {
955 		rpc_killall_tasks(clnt);
956 		wait_event_timeout(destroy_wait,
957 			list_empty(&clnt->cl_tasks), 1*HZ);
958 	}
959 
960 	/* wait for tasks still in workqueue or waitqueue */
961 	wait_event_timeout(destroy_wait,
962 			   atomic_read(&clnt->cl_task_count) == 0, 1 * HZ);
963 
964 	rpc_release_client(clnt);
965 }
966 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
967 
968 /*
969  * Free an RPC client
970  */
971 static void rpc_free_client_work(struct work_struct *work)
972 {
973 	struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
974 
975 	trace_rpc_clnt_free(clnt);
976 
977 	/* These might block on processes that might allocate memory,
978 	 * so they cannot be called in rpciod, so they are handled separately
979 	 * here.
980 	 */
981 	rpc_sysfs_client_destroy(clnt);
982 	rpc_clnt_debugfs_unregister(clnt);
983 	rpc_free_clid(clnt);
984 	rpc_clnt_remove_pipedir(clnt);
985 	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
986 
987 	kfree(clnt);
988 	rpciod_down();
989 }
990 static struct rpc_clnt *
991 rpc_free_client(struct rpc_clnt *clnt)
992 {
993 	struct rpc_clnt *parent = NULL;
994 
995 	trace_rpc_clnt_release(clnt);
996 	if (clnt->cl_parent != clnt)
997 		parent = clnt->cl_parent;
998 	rpc_unregister_client(clnt);
999 	rpc_free_iostats(clnt->cl_metrics);
1000 	clnt->cl_metrics = NULL;
1001 	xprt_iter_destroy(&clnt->cl_xpi);
1002 	put_cred(clnt->cl_cred);
1003 
1004 	INIT_WORK(&clnt->cl_work, rpc_free_client_work);
1005 	schedule_work(&clnt->cl_work);
1006 	return parent;
1007 }
1008 
1009 /*
1010  * Free an RPC client
1011  */
1012 static struct rpc_clnt *
1013 rpc_free_auth(struct rpc_clnt *clnt)
1014 {
1015 	/*
1016 	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
1017 	 *       release remaining GSS contexts. This mechanism ensures
1018 	 *       that it can do so safely.
1019 	 */
1020 	if (clnt->cl_auth != NULL) {
1021 		rpcauth_release(clnt->cl_auth);
1022 		clnt->cl_auth = NULL;
1023 	}
1024 	if (refcount_dec_and_test(&clnt->cl_count))
1025 		return rpc_free_client(clnt);
1026 	return NULL;
1027 }
1028 
1029 /*
1030  * Release reference to the RPC client
1031  */
1032 void
1033 rpc_release_client(struct rpc_clnt *clnt)
1034 {
1035 	do {
1036 		if (list_empty(&clnt->cl_tasks))
1037 			wake_up(&destroy_wait);
1038 		if (refcount_dec_not_one(&clnt->cl_count))
1039 			break;
1040 		clnt = rpc_free_auth(clnt);
1041 	} while (clnt != NULL);
1042 }
1043 EXPORT_SYMBOL_GPL(rpc_release_client);
1044 
1045 /**
1046  * rpc_bind_new_program - bind a new RPC program to an existing client
1047  * @old: old rpc_client
1048  * @program: rpc program to set
1049  * @vers: rpc program version
1050  *
1051  * Clones the rpc client and sets up a new RPC program. This is mainly
1052  * of use for enabling different RPC programs to share the same transport.
1053  * The Sun NFSv2/v3 ACL protocol can do this.
1054  */
1055 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1056 				      const struct rpc_program *program,
1057 				      u32 vers)
1058 {
1059 	struct rpc_create_args args = {
1060 		.program	= program,
1061 		.prognumber	= program->number,
1062 		.version	= vers,
1063 		.authflavor	= old->cl_auth->au_flavor,
1064 		.cred		= old->cl_cred,
1065 		.stats		= old->cl_stats,
1066 		.timeout	= old->cl_timeout,
1067 	};
1068 	struct rpc_clnt *clnt;
1069 	int err;
1070 
1071 	clnt = __rpc_clone_client(&args, old);
1072 	if (IS_ERR(clnt))
1073 		goto out;
1074 	err = rpc_ping(clnt);
1075 	if (err != 0) {
1076 		rpc_shutdown_client(clnt);
1077 		clnt = ERR_PTR(err);
1078 	}
1079 out:
1080 	return clnt;
1081 }
1082 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
1083 
1084 struct rpc_xprt *
1085 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1086 {
1087 	struct rpc_xprt_switch *xps;
1088 
1089 	if (!xprt)
1090 		return NULL;
1091 	rcu_read_lock();
1092 	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1093 	atomic_long_inc(&xps->xps_queuelen);
1094 	rcu_read_unlock();
1095 	atomic_long_inc(&xprt->queuelen);
1096 
1097 	return xprt;
1098 }
1099 
1100 static void
1101 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1102 {
1103 	struct rpc_xprt_switch *xps;
1104 
1105 	atomic_long_dec(&xprt->queuelen);
1106 	rcu_read_lock();
1107 	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1108 	atomic_long_dec(&xps->xps_queuelen);
1109 	rcu_read_unlock();
1110 
1111 	xprt_put(xprt);
1112 }
1113 
1114 void rpc_task_release_transport(struct rpc_task *task)
1115 {
1116 	struct rpc_xprt *xprt = task->tk_xprt;
1117 
1118 	if (xprt) {
1119 		task->tk_xprt = NULL;
1120 		if (task->tk_client)
1121 			rpc_task_release_xprt(task->tk_client, xprt);
1122 		else
1123 			xprt_put(xprt);
1124 	}
1125 }
1126 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1127 
1128 void rpc_task_release_client(struct rpc_task *task)
1129 {
1130 	struct rpc_clnt *clnt = task->tk_client;
1131 
1132 	rpc_task_release_transport(task);
1133 	if (clnt != NULL) {
1134 		/* Remove from client task list */
1135 		spin_lock(&clnt->cl_lock);
1136 		list_del(&task->tk_task);
1137 		spin_unlock(&clnt->cl_lock);
1138 		task->tk_client = NULL;
1139 		atomic_dec(&clnt->cl_task_count);
1140 
1141 		rpc_release_client(clnt);
1142 	}
1143 }
1144 
1145 static struct rpc_xprt *
1146 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1147 {
1148 	struct rpc_xprt *xprt;
1149 
1150 	rcu_read_lock();
1151 	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1152 	rcu_read_unlock();
1153 	return rpc_task_get_xprt(clnt, xprt);
1154 }
1155 
1156 static struct rpc_xprt *
1157 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1158 {
1159 	return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1160 }
1161 
1162 static
1163 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1164 {
1165 	if (task->tk_xprt) {
1166 		if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1167 		      (task->tk_flags & RPC_TASK_MOVEABLE)))
1168 			return;
1169 		xprt_release(task);
1170 		xprt_put(task->tk_xprt);
1171 	}
1172 	if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1173 		task->tk_xprt = rpc_task_get_first_xprt(clnt);
1174 	else
1175 		task->tk_xprt = rpc_task_get_next_xprt(clnt);
1176 }
1177 
1178 static
1179 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1180 {
1181 	rpc_task_set_transport(task, clnt);
1182 	task->tk_client = clnt;
1183 	refcount_inc(&clnt->cl_count);
1184 	if (clnt->cl_softrtry)
1185 		task->tk_flags |= RPC_TASK_SOFT;
1186 	if (clnt->cl_softerr)
1187 		task->tk_flags |= RPC_TASK_TIMEOUT;
1188 	if (clnt->cl_noretranstimeo)
1189 		task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1190 	if (clnt->cl_netunreach_fatal)
1191 		task->tk_flags |= RPC_TASK_NETUNREACH_FATAL;
1192 	atomic_inc(&clnt->cl_task_count);
1193 }
1194 
1195 static void
1196 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1197 {
1198 	if (msg != NULL) {
1199 		task->tk_msg.rpc_proc = msg->rpc_proc;
1200 		task->tk_msg.rpc_argp = msg->rpc_argp;
1201 		task->tk_msg.rpc_resp = msg->rpc_resp;
1202 		task->tk_msg.rpc_cred = msg->rpc_cred;
1203 		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1204 			get_cred(task->tk_msg.rpc_cred);
1205 	}
1206 }
1207 
1208 /*
1209  * Default callback for async RPC calls
1210  */
1211 static void
1212 rpc_default_callback(struct rpc_task *task, void *data)
1213 {
1214 }
1215 
1216 static const struct rpc_call_ops rpc_default_ops = {
1217 	.rpc_call_done = rpc_default_callback,
1218 };
1219 
1220 /**
1221  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1222  * @task_setup_data: pointer to task initialisation data
1223  */
1224 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1225 {
1226 	struct rpc_task *task;
1227 
1228 	task = rpc_new_task(task_setup_data);
1229 	if (IS_ERR(task))
1230 		return task;
1231 
1232 	if (!RPC_IS_ASYNC(task))
1233 		task->tk_flags |= RPC_TASK_CRED_NOREF;
1234 
1235 	rpc_task_set_client(task, task_setup_data->rpc_client);
1236 	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1237 
1238 	if (task->tk_action == NULL)
1239 		rpc_call_start(task);
1240 
1241 	atomic_inc(&task->tk_count);
1242 	rpc_execute(task);
1243 	return task;
1244 }
1245 EXPORT_SYMBOL_GPL(rpc_run_task);
1246 
1247 /**
1248  * rpc_call_sync - Perform a synchronous RPC call
1249  * @clnt: pointer to RPC client
1250  * @msg: RPC call parameters
1251  * @flags: RPC call flags
1252  */
1253 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1254 {
1255 	struct rpc_task	*task;
1256 	struct rpc_task_setup task_setup_data = {
1257 		.rpc_client = clnt,
1258 		.rpc_message = msg,
1259 		.callback_ops = &rpc_default_ops,
1260 		.flags = flags,
1261 	};
1262 	int status;
1263 
1264 	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1265 	if (flags & RPC_TASK_ASYNC) {
1266 		rpc_release_calldata(task_setup_data.callback_ops,
1267 			task_setup_data.callback_data);
1268 		return -EINVAL;
1269 	}
1270 
1271 	task = rpc_run_task(&task_setup_data);
1272 	if (IS_ERR(task))
1273 		return PTR_ERR(task);
1274 	status = task->tk_status;
1275 	rpc_put_task(task);
1276 	return status;
1277 }
1278 EXPORT_SYMBOL_GPL(rpc_call_sync);
1279 
1280 /**
1281  * rpc_call_async - Perform an asynchronous RPC call
1282  * @clnt: pointer to RPC client
1283  * @msg: RPC call parameters
1284  * @flags: RPC call flags
1285  * @tk_ops: RPC call ops
1286  * @data: user call data
1287  */
1288 int
1289 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1290 	       const struct rpc_call_ops *tk_ops, void *data)
1291 {
1292 	struct rpc_task	*task;
1293 	struct rpc_task_setup task_setup_data = {
1294 		.rpc_client = clnt,
1295 		.rpc_message = msg,
1296 		.callback_ops = tk_ops,
1297 		.callback_data = data,
1298 		.flags = flags|RPC_TASK_ASYNC,
1299 	};
1300 
1301 	task = rpc_run_task(&task_setup_data);
1302 	if (IS_ERR(task))
1303 		return PTR_ERR(task);
1304 	rpc_put_task(task);
1305 	return 0;
1306 }
1307 EXPORT_SYMBOL_GPL(rpc_call_async);
1308 
1309 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1310 static void call_bc_encode(struct rpc_task *task);
1311 
1312 /**
1313  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1314  * rpc_execute against it
1315  * @req: RPC request
1316  * @timeout: timeout values to use for this task
1317  */
1318 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
1319 		struct rpc_timeout *timeout)
1320 {
1321 	struct rpc_task *task;
1322 	struct rpc_task_setup task_setup_data = {
1323 		.callback_ops = &rpc_default_ops,
1324 		.flags = RPC_TASK_SOFTCONN |
1325 			RPC_TASK_NO_RETRANS_TIMEOUT,
1326 	};
1327 
1328 	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1329 	/*
1330 	 * Create an rpc_task to send the data
1331 	 */
1332 	task = rpc_new_task(&task_setup_data);
1333 	if (IS_ERR(task)) {
1334 		xprt_free_bc_request(req);
1335 		return task;
1336 	}
1337 
1338 	xprt_init_bc_request(req, task, timeout);
1339 
1340 	task->tk_action = call_bc_encode;
1341 	atomic_inc(&task->tk_count);
1342 	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1343 	rpc_execute(task);
1344 
1345 	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1346 	return task;
1347 }
1348 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1349 
1350 /**
1351  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1352  * @req: RPC request to prepare
1353  * @pages: vector of struct page pointers
1354  * @base: offset in first page where receive should start, in bytes
1355  * @len: expected size of the upper layer data payload, in bytes
1356  * @hdrsize: expected size of upper layer reply header, in XDR words
1357  *
1358  */
1359 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1360 			     unsigned int base, unsigned int len,
1361 			     unsigned int hdrsize)
1362 {
1363 	hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1364 
1365 	xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1366 	trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1367 }
1368 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1369 
1370 void
1371 rpc_call_start(struct rpc_task *task)
1372 {
1373 	task->tk_action = call_start;
1374 }
1375 EXPORT_SYMBOL_GPL(rpc_call_start);
1376 
1377 /**
1378  * rpc_peeraddr - extract remote peer address from clnt's xprt
1379  * @clnt: RPC client structure
1380  * @buf: target buffer
1381  * @bufsize: length of target buffer
1382  *
1383  * Returns the number of bytes that are actually in the stored address.
1384  */
1385 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1386 {
1387 	size_t bytes;
1388 	struct rpc_xprt *xprt;
1389 
1390 	rcu_read_lock();
1391 	xprt = rcu_dereference(clnt->cl_xprt);
1392 
1393 	bytes = xprt->addrlen;
1394 	if (bytes > bufsize)
1395 		bytes = bufsize;
1396 	memcpy(buf, &xprt->addr, bytes);
1397 	rcu_read_unlock();
1398 
1399 	return bytes;
1400 }
1401 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1402 
1403 /**
1404  * rpc_peeraddr2str - return remote peer address in printable format
1405  * @clnt: RPC client structure
1406  * @format: address format
1407  *
1408  * NB: the lifetime of the memory referenced by the returned pointer is
1409  * the same as the rpc_xprt itself.  As long as the caller uses this
1410  * pointer, it must hold the RCU read lock.
1411  */
1412 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1413 			     enum rpc_display_format_t format)
1414 {
1415 	struct rpc_xprt *xprt;
1416 
1417 	xprt = rcu_dereference(clnt->cl_xprt);
1418 
1419 	if (xprt->address_strings[format] != NULL)
1420 		return xprt->address_strings[format];
1421 	else
1422 		return "unprintable";
1423 }
1424 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1425 
1426 static const struct sockaddr_in rpc_inaddr_loopback = {
1427 	.sin_family		= AF_INET,
1428 	.sin_addr.s_addr	= htonl(INADDR_ANY),
1429 };
1430 
1431 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1432 	.sin6_family		= AF_INET6,
1433 	.sin6_addr		= IN6ADDR_ANY_INIT,
1434 };
1435 
1436 /*
1437  * Try a getsockname() on a connected datagram socket.  Using a
1438  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1439  * This conserves the ephemeral port number space.
1440  *
1441  * Returns zero and fills in "buf" if successful; otherwise, a
1442  * negative errno is returned.
1443  */
1444 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1445 			struct sockaddr *buf)
1446 {
1447 	struct socket *sock;
1448 	int err;
1449 
1450 	err = __sock_create(net, sap->sa_family,
1451 				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1452 	if (err < 0) {
1453 		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1454 		goto out;
1455 	}
1456 
1457 	switch (sap->sa_family) {
1458 	case AF_INET:
1459 		err = kernel_bind(sock,
1460 				(struct sockaddr *)&rpc_inaddr_loopback,
1461 				sizeof(rpc_inaddr_loopback));
1462 		break;
1463 	case AF_INET6:
1464 		err = kernel_bind(sock,
1465 				(struct sockaddr *)&rpc_in6addr_loopback,
1466 				sizeof(rpc_in6addr_loopback));
1467 		break;
1468 	default:
1469 		err = -EAFNOSUPPORT;
1470 		goto out_release;
1471 	}
1472 	if (err < 0) {
1473 		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1474 		goto out_release;
1475 	}
1476 
1477 	err = kernel_connect(sock, sap, salen, 0);
1478 	if (err < 0) {
1479 		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1480 		goto out_release;
1481 	}
1482 
1483 	err = kernel_getsockname(sock, buf);
1484 	if (err < 0) {
1485 		dprintk("RPC:       getsockname failed (%d)\n", err);
1486 		goto out_release;
1487 	}
1488 
1489 	err = 0;
1490 	if (buf->sa_family == AF_INET6) {
1491 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1492 		sin6->sin6_scope_id = 0;
1493 	}
1494 	dprintk("RPC:       %s succeeded\n", __func__);
1495 
1496 out_release:
1497 	sock_release(sock);
1498 out:
1499 	return err;
1500 }
1501 
1502 /*
1503  * Scraping a connected socket failed, so we don't have a useable
1504  * local address.  Fallback: generate an address that will prevent
1505  * the server from calling us back.
1506  *
1507  * Returns zero and fills in "buf" if successful; otherwise, a
1508  * negative errno is returned.
1509  */
1510 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1511 {
1512 	switch (family) {
1513 	case AF_INET:
1514 		if (buflen < sizeof(rpc_inaddr_loopback))
1515 			return -EINVAL;
1516 		memcpy(buf, &rpc_inaddr_loopback,
1517 				sizeof(rpc_inaddr_loopback));
1518 		break;
1519 	case AF_INET6:
1520 		if (buflen < sizeof(rpc_in6addr_loopback))
1521 			return -EINVAL;
1522 		memcpy(buf, &rpc_in6addr_loopback,
1523 				sizeof(rpc_in6addr_loopback));
1524 		break;
1525 	default:
1526 		dprintk("RPC:       %s: address family not supported\n",
1527 			__func__);
1528 		return -EAFNOSUPPORT;
1529 	}
1530 	dprintk("RPC:       %s: succeeded\n", __func__);
1531 	return 0;
1532 }
1533 
1534 /**
1535  * rpc_localaddr - discover local endpoint address for an RPC client
1536  * @clnt: RPC client structure
1537  * @buf: target buffer
1538  * @buflen: size of target buffer, in bytes
1539  *
1540  * Returns zero and fills in "buf" and "buflen" if successful;
1541  * otherwise, a negative errno is returned.
1542  *
1543  * This works even if the underlying transport is not currently connected,
1544  * or if the upper layer never previously provided a source address.
1545  *
1546  * The result of this function call is transient: multiple calls in
1547  * succession may give different results, depending on how local
1548  * networking configuration changes over time.
1549  */
1550 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1551 {
1552 	struct sockaddr_storage address;
1553 	struct sockaddr *sap = (struct sockaddr *)&address;
1554 	struct rpc_xprt *xprt;
1555 	struct net *net;
1556 	size_t salen;
1557 	int err;
1558 
1559 	rcu_read_lock();
1560 	xprt = rcu_dereference(clnt->cl_xprt);
1561 	salen = xprt->addrlen;
1562 	memcpy(sap, &xprt->addr, salen);
1563 	net = get_net(xprt->xprt_net);
1564 	rcu_read_unlock();
1565 
1566 	rpc_set_port(sap, 0);
1567 	err = rpc_sockname(net, sap, salen, buf);
1568 	put_net(net);
1569 	if (err != 0)
1570 		/* Couldn't discover local address, return ANYADDR */
1571 		return rpc_anyaddr(sap->sa_family, buf, buflen);
1572 	return 0;
1573 }
1574 EXPORT_SYMBOL_GPL(rpc_localaddr);
1575 
1576 void
1577 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1578 {
1579 	struct rpc_xprt *xprt;
1580 
1581 	rcu_read_lock();
1582 	xprt = rcu_dereference(clnt->cl_xprt);
1583 	if (xprt->ops->set_buffer_size)
1584 		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1585 	rcu_read_unlock();
1586 }
1587 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1588 
1589 /**
1590  * rpc_net_ns - Get the network namespace for this RPC client
1591  * @clnt: RPC client to query
1592  *
1593  */
1594 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1595 {
1596 	struct net *ret;
1597 
1598 	rcu_read_lock();
1599 	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1600 	rcu_read_unlock();
1601 	return ret;
1602 }
1603 EXPORT_SYMBOL_GPL(rpc_net_ns);
1604 
1605 /**
1606  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1607  * @clnt: RPC client to query
1608  *
1609  * For stream transports, this is one RPC record fragment (see RFC
1610  * 1831), as we don't support multi-record requests yet.  For datagram
1611  * transports, this is the size of an IP packet minus the IP, UDP, and
1612  * RPC header sizes.
1613  */
1614 size_t rpc_max_payload(struct rpc_clnt *clnt)
1615 {
1616 	size_t ret;
1617 
1618 	rcu_read_lock();
1619 	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1620 	rcu_read_unlock();
1621 	return ret;
1622 }
1623 EXPORT_SYMBOL_GPL(rpc_max_payload);
1624 
1625 /**
1626  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1627  * @clnt: RPC client to query
1628  */
1629 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1630 {
1631 	struct rpc_xprt *xprt;
1632 	size_t ret;
1633 
1634 	rcu_read_lock();
1635 	xprt = rcu_dereference(clnt->cl_xprt);
1636 	ret = xprt->ops->bc_maxpayload(xprt);
1637 	rcu_read_unlock();
1638 	return ret;
1639 }
1640 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1641 
1642 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1643 {
1644 	struct rpc_xprt *xprt;
1645 	unsigned int ret;
1646 
1647 	rcu_read_lock();
1648 	xprt = rcu_dereference(clnt->cl_xprt);
1649 	ret = xprt->ops->bc_num_slots(xprt);
1650 	rcu_read_unlock();
1651 	return ret;
1652 }
1653 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1654 
1655 /**
1656  * rpc_force_rebind - force transport to check that remote port is unchanged
1657  * @clnt: client to rebind
1658  *
1659  */
1660 void rpc_force_rebind(struct rpc_clnt *clnt)
1661 {
1662 	if (clnt->cl_autobind) {
1663 		rcu_read_lock();
1664 		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1665 		rcu_read_unlock();
1666 	}
1667 }
1668 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1669 
1670 static int
1671 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1672 {
1673 	task->tk_status = 0;
1674 	task->tk_rpc_status = 0;
1675 	task->tk_action = action;
1676 	return 1;
1677 }
1678 
1679 /*
1680  * Restart an (async) RPC call. Usually called from within the
1681  * exit handler.
1682  */
1683 int
1684 rpc_restart_call(struct rpc_task *task)
1685 {
1686 	return __rpc_restart_call(task, call_start);
1687 }
1688 EXPORT_SYMBOL_GPL(rpc_restart_call);
1689 
1690 /*
1691  * Restart an (async) RPC call from the call_prepare state.
1692  * Usually called from within the exit handler.
1693  */
1694 int
1695 rpc_restart_call_prepare(struct rpc_task *task)
1696 {
1697 	if (task->tk_ops->rpc_call_prepare != NULL)
1698 		return __rpc_restart_call(task, rpc_prepare_task);
1699 	return rpc_restart_call(task);
1700 }
1701 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1702 
1703 const char
1704 *rpc_proc_name(const struct rpc_task *task)
1705 {
1706 	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1707 
1708 	if (proc) {
1709 		if (proc->p_name)
1710 			return proc->p_name;
1711 		else
1712 			return "NULL";
1713 	} else
1714 		return "no proc";
1715 }
1716 
1717 static void
1718 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1719 {
1720 	trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1721 	rpc_task_set_rpc_status(task, rpc_status);
1722 	rpc_exit(task, tk_status);
1723 }
1724 
1725 static void
1726 rpc_call_rpcerror(struct rpc_task *task, int status)
1727 {
1728 	__rpc_call_rpcerror(task, status, status);
1729 }
1730 
1731 /*
1732  * 0.  Initial state
1733  *
1734  *     Other FSM states can be visited zero or more times, but
1735  *     this state is visited exactly once for each RPC.
1736  */
1737 static void
1738 call_start(struct rpc_task *task)
1739 {
1740 	struct rpc_clnt	*clnt = task->tk_client;
1741 	int idx = task->tk_msg.rpc_proc->p_statidx;
1742 
1743 	trace_rpc_request(task);
1744 
1745 	if (task->tk_client->cl_shutdown) {
1746 		rpc_call_rpcerror(task, -EIO);
1747 		return;
1748 	}
1749 
1750 	/* Increment call count (version might not be valid for ping) */
1751 	if (clnt->cl_program->version[clnt->cl_vers])
1752 		clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1753 	clnt->cl_stats->rpccnt++;
1754 	task->tk_action = call_reserve;
1755 	rpc_task_set_transport(task, clnt);
1756 }
1757 
1758 /*
1759  * 1.	Reserve an RPC call slot
1760  */
1761 static void
1762 call_reserve(struct rpc_task *task)
1763 {
1764 	task->tk_status  = 0;
1765 	task->tk_action  = call_reserveresult;
1766 	xprt_reserve(task);
1767 }
1768 
1769 static void call_retry_reserve(struct rpc_task *task);
1770 
1771 /*
1772  * 1b.	Grok the result of xprt_reserve()
1773  */
1774 static void
1775 call_reserveresult(struct rpc_task *task)
1776 {
1777 	int status = task->tk_status;
1778 
1779 	/*
1780 	 * After a call to xprt_reserve(), we must have either
1781 	 * a request slot or else an error status.
1782 	 */
1783 	task->tk_status = 0;
1784 	if (status >= 0) {
1785 		if (task->tk_rqstp) {
1786 			task->tk_action = call_refresh;
1787 
1788 			/* Add to the client's list of all tasks */
1789 			spin_lock(&task->tk_client->cl_lock);
1790 			if (list_empty(&task->tk_task))
1791 				list_add_tail(&task->tk_task, &task->tk_client->cl_tasks);
1792 			spin_unlock(&task->tk_client->cl_lock);
1793 			return;
1794 		}
1795 		rpc_call_rpcerror(task, -EIO);
1796 		return;
1797 	}
1798 
1799 	switch (status) {
1800 	case -ENOMEM:
1801 		rpc_delay(task, HZ >> 2);
1802 		fallthrough;
1803 	case -EAGAIN:	/* woken up; retry */
1804 		task->tk_action = call_retry_reserve;
1805 		return;
1806 	default:
1807 		rpc_call_rpcerror(task, status);
1808 	}
1809 }
1810 
1811 /*
1812  * 1c.	Retry reserving an RPC call slot
1813  */
1814 static void
1815 call_retry_reserve(struct rpc_task *task)
1816 {
1817 	task->tk_status  = 0;
1818 	task->tk_action  = call_reserveresult;
1819 	xprt_retry_reserve(task);
1820 }
1821 
1822 /*
1823  * 2.	Bind and/or refresh the credentials
1824  */
1825 static void
1826 call_refresh(struct rpc_task *task)
1827 {
1828 	task->tk_action = call_refreshresult;
1829 	task->tk_status = 0;
1830 	task->tk_client->cl_stats->rpcauthrefresh++;
1831 	rpcauth_refreshcred(task);
1832 }
1833 
1834 /*
1835  * 2a.	Process the results of a credential refresh
1836  */
1837 static void
1838 call_refreshresult(struct rpc_task *task)
1839 {
1840 	int status = task->tk_status;
1841 
1842 	task->tk_status = 0;
1843 	task->tk_action = call_refresh;
1844 	switch (status) {
1845 	case 0:
1846 		if (rpcauth_uptodatecred(task)) {
1847 			task->tk_action = call_allocate;
1848 			return;
1849 		}
1850 		/* Use rate-limiting and a max number of retries if refresh
1851 		 * had status 0 but failed to update the cred.
1852 		 */
1853 		fallthrough;
1854 	case -ETIMEDOUT:
1855 		rpc_delay(task, 3*HZ);
1856 		fallthrough;
1857 	case -EAGAIN:
1858 		status = -EACCES;
1859 		if (!task->tk_cred_retry)
1860 			break;
1861 		task->tk_cred_retry--;
1862 		trace_rpc_retry_refresh_status(task);
1863 		return;
1864 	case -EKEYEXPIRED:
1865 		break;
1866 	case -ENOMEM:
1867 		rpc_delay(task, HZ >> 4);
1868 		return;
1869 	}
1870 	trace_rpc_refresh_status(task);
1871 	rpc_call_rpcerror(task, status);
1872 }
1873 
1874 /*
1875  * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1876  *	(Note: buffer memory is freed in xprt_release).
1877  */
1878 static void
1879 call_allocate(struct rpc_task *task)
1880 {
1881 	const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1882 	struct rpc_rqst *req = task->tk_rqstp;
1883 	struct rpc_xprt *xprt = req->rq_xprt;
1884 	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1885 	int status;
1886 
1887 	task->tk_status = 0;
1888 	task->tk_action = call_encode;
1889 
1890 	if (req->rq_buffer)
1891 		return;
1892 
1893 	/*
1894 	 * Calculate the size (in quads) of the RPC call
1895 	 * and reply headers, and convert both values
1896 	 * to byte sizes.
1897 	 */
1898 	req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1899 			   proc->p_arglen;
1900 	req->rq_callsize <<= 2;
1901 	/*
1902 	 * Note: the reply buffer must at minimum allocate enough space
1903 	 * for the 'struct accepted_reply' from RFC5531.
1904 	 */
1905 	req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1906 			max_t(size_t, proc->p_replen, 2);
1907 	req->rq_rcvsize <<= 2;
1908 
1909 	status = xprt->ops->buf_alloc(task);
1910 	trace_rpc_buf_alloc(task, status);
1911 	if (status == 0)
1912 		return;
1913 	if (status != -ENOMEM) {
1914 		rpc_call_rpcerror(task, status);
1915 		return;
1916 	}
1917 
1918 	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1919 		task->tk_action = call_allocate;
1920 		rpc_delay(task, HZ>>4);
1921 		return;
1922 	}
1923 
1924 	rpc_call_rpcerror(task, -ERESTARTSYS);
1925 }
1926 
1927 static int
1928 rpc_task_need_encode(struct rpc_task *task)
1929 {
1930 	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1931 		(!(task->tk_flags & RPC_TASK_SENT) ||
1932 		 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1933 		 xprt_request_need_retransmit(task));
1934 }
1935 
1936 static void
1937 rpc_xdr_encode(struct rpc_task *task)
1938 {
1939 	struct rpc_rqst	*req = task->tk_rqstp;
1940 	struct xdr_stream xdr;
1941 
1942 	xdr_buf_init(&req->rq_snd_buf,
1943 		     req->rq_buffer,
1944 		     req->rq_callsize);
1945 	xdr_buf_init(&req->rq_rcv_buf,
1946 		     req->rq_rbuffer,
1947 		     req->rq_rcvsize);
1948 
1949 	req->rq_reply_bytes_recvd = 0;
1950 	req->rq_snd_buf.head[0].iov_len = 0;
1951 	xdr_init_encode(&xdr, &req->rq_snd_buf,
1952 			req->rq_snd_buf.head[0].iov_base, req);
1953 	if (rpc_encode_header(task, &xdr))
1954 		return;
1955 
1956 	task->tk_status = rpcauth_wrap_req(task, &xdr);
1957 }
1958 
1959 /*
1960  * 3.	Encode arguments of an RPC call
1961  */
1962 static void
1963 call_encode(struct rpc_task *task)
1964 {
1965 	if (!rpc_task_need_encode(task))
1966 		goto out;
1967 
1968 	/* Dequeue task from the receive queue while we're encoding */
1969 	xprt_request_dequeue_xprt(task);
1970 	/* Encode here so that rpcsec_gss can use correct sequence number. */
1971 	rpc_xdr_encode(task);
1972 	/* Add task to reply queue before transmission to avoid races */
1973 	if (task->tk_status == 0 && rpc_reply_expected(task))
1974 		task->tk_status = xprt_request_enqueue_receive(task);
1975 	/* Did the encode result in an error condition? */
1976 	if (task->tk_status != 0) {
1977 		/* Was the error nonfatal? */
1978 		switch (task->tk_status) {
1979 		case -EAGAIN:
1980 		case -ENOMEM:
1981 			rpc_delay(task, HZ >> 4);
1982 			break;
1983 		case -EKEYEXPIRED:
1984 			if (!task->tk_cred_retry) {
1985 				rpc_call_rpcerror(task, task->tk_status);
1986 			} else {
1987 				task->tk_action = call_refresh;
1988 				task->tk_cred_retry--;
1989 				trace_rpc_retry_refresh_status(task);
1990 			}
1991 			break;
1992 		default:
1993 			rpc_call_rpcerror(task, task->tk_status);
1994 		}
1995 		return;
1996 	}
1997 
1998 	xprt_request_enqueue_transmit(task);
1999 out:
2000 	task->tk_action = call_transmit;
2001 	/* Check that the connection is OK */
2002 	if (!xprt_bound(task->tk_xprt))
2003 		task->tk_action = call_bind;
2004 	else if (!xprt_connected(task->tk_xprt))
2005 		task->tk_action = call_connect;
2006 }
2007 
2008 /*
2009  * Helpers to check if the task was already transmitted, and
2010  * to take action when that is the case.
2011  */
2012 static bool
2013 rpc_task_transmitted(struct rpc_task *task)
2014 {
2015 	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
2016 }
2017 
2018 static void
2019 rpc_task_handle_transmitted(struct rpc_task *task)
2020 {
2021 	xprt_end_transmit(task);
2022 	task->tk_action = call_transmit_status;
2023 }
2024 
2025 /*
2026  * 4.	Get the server port number if not yet set
2027  */
2028 static void
2029 call_bind(struct rpc_task *task)
2030 {
2031 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2032 
2033 	if (rpc_task_transmitted(task)) {
2034 		rpc_task_handle_transmitted(task);
2035 		return;
2036 	}
2037 
2038 	if (xprt_bound(xprt)) {
2039 		task->tk_action = call_connect;
2040 		return;
2041 	}
2042 
2043 	task->tk_action = call_bind_status;
2044 	if (!xprt_prepare_transmit(task))
2045 		return;
2046 
2047 	xprt->ops->rpcbind(task);
2048 }
2049 
2050 /*
2051  * 4a.	Sort out bind result
2052  */
2053 static void
2054 call_bind_status(struct rpc_task *task)
2055 {
2056 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2057 	int status = -EIO;
2058 
2059 	if (rpc_task_transmitted(task)) {
2060 		rpc_task_handle_transmitted(task);
2061 		return;
2062 	}
2063 
2064 	if (task->tk_status >= 0)
2065 		goto out_next;
2066 	if (xprt_bound(xprt)) {
2067 		task->tk_status = 0;
2068 		goto out_next;
2069 	}
2070 
2071 	switch (task->tk_status) {
2072 	case -ENOMEM:
2073 		rpc_delay(task, HZ >> 2);
2074 		goto retry_timeout;
2075 	case -EACCES:
2076 		trace_rpcb_prog_unavail_err(task);
2077 		/* fail immediately if this is an RPC ping */
2078 		if (task->tk_msg.rpc_proc->p_proc == 0) {
2079 			status = -EOPNOTSUPP;
2080 			break;
2081 		}
2082 		rpc_delay(task, 3*HZ);
2083 		goto retry_timeout;
2084 	case -ENOBUFS:
2085 		rpc_delay(task, HZ >> 2);
2086 		goto retry_timeout;
2087 	case -EAGAIN:
2088 		goto retry_timeout;
2089 	case -ETIMEDOUT:
2090 		trace_rpcb_timeout_err(task);
2091 		goto retry_timeout;
2092 	case -EPFNOSUPPORT:
2093 		/* server doesn't support any rpcbind version we know of */
2094 		trace_rpcb_bind_version_err(task);
2095 		break;
2096 	case -EPROTONOSUPPORT:
2097 		trace_rpcb_bind_version_err(task);
2098 		goto retry_timeout;
2099 	case -ENETDOWN:
2100 	case -ENETUNREACH:
2101 		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
2102 			break;
2103 		fallthrough;
2104 	case -ECONNREFUSED:		/* connection problems */
2105 	case -ECONNRESET:
2106 	case -ECONNABORTED:
2107 	case -ENOTCONN:
2108 	case -EHOSTDOWN:
2109 	case -EHOSTUNREACH:
2110 	case -EPIPE:
2111 		trace_rpcb_unreachable_err(task);
2112 		if (!RPC_IS_SOFTCONN(task)) {
2113 			rpc_delay(task, 5*HZ);
2114 			goto retry_timeout;
2115 		}
2116 		status = task->tk_status;
2117 		break;
2118 	default:
2119 		trace_rpcb_unrecognized_err(task);
2120 	}
2121 
2122 	rpc_call_rpcerror(task, status);
2123 	return;
2124 out_next:
2125 	task->tk_action = call_connect;
2126 	return;
2127 retry_timeout:
2128 	task->tk_status = 0;
2129 	task->tk_action = call_bind;
2130 	rpc_check_timeout(task);
2131 }
2132 
2133 /*
2134  * 4b.	Connect to the RPC server
2135  */
2136 static void
2137 call_connect(struct rpc_task *task)
2138 {
2139 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2140 
2141 	if (rpc_task_transmitted(task)) {
2142 		rpc_task_handle_transmitted(task);
2143 		return;
2144 	}
2145 
2146 	if (xprt_connected(xprt)) {
2147 		task->tk_action = call_transmit;
2148 		return;
2149 	}
2150 
2151 	task->tk_action = call_connect_status;
2152 	if (task->tk_status < 0)
2153 		return;
2154 	if (task->tk_flags & RPC_TASK_NOCONNECT) {
2155 		rpc_call_rpcerror(task, -ENOTCONN);
2156 		return;
2157 	}
2158 	if (!xprt_prepare_transmit(task))
2159 		return;
2160 	xprt_connect(task);
2161 }
2162 
2163 /*
2164  * 4c.	Sort out connect result
2165  */
2166 static void
2167 call_connect_status(struct rpc_task *task)
2168 {
2169 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2170 	struct rpc_clnt *clnt = task->tk_client;
2171 	int status = task->tk_status;
2172 
2173 	if (rpc_task_transmitted(task)) {
2174 		rpc_task_handle_transmitted(task);
2175 		return;
2176 	}
2177 
2178 	trace_rpc_connect_status(task);
2179 
2180 	if (task->tk_status == 0) {
2181 		clnt->cl_stats->netreconn++;
2182 		goto out_next;
2183 	}
2184 	if (xprt_connected(xprt)) {
2185 		task->tk_status = 0;
2186 		goto out_next;
2187 	}
2188 
2189 	task->tk_status = 0;
2190 	switch (status) {
2191 	case -ENETDOWN:
2192 	case -ENETUNREACH:
2193 		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
2194 			break;
2195 		fallthrough;
2196 	case -ECONNREFUSED:
2197 	case -ECONNRESET:
2198 		/* A positive refusal suggests a rebind is needed. */
2199 		if (clnt->cl_autobind) {
2200 			rpc_force_rebind(clnt);
2201 			if (RPC_IS_SOFTCONN(task))
2202 				break;
2203 			goto out_retry;
2204 		}
2205 		fallthrough;
2206 	case -ECONNABORTED:
2207 	case -EHOSTUNREACH:
2208 	case -EPIPE:
2209 	case -EPROTO:
2210 		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2211 					    task->tk_rqstp->rq_connect_cookie);
2212 		if (RPC_IS_SOFTCONN(task))
2213 			break;
2214 		/* retry with existing socket, after a delay */
2215 		rpc_delay(task, 3*HZ);
2216 		fallthrough;
2217 	case -EADDRINUSE:
2218 	case -ENOTCONN:
2219 	case -EAGAIN:
2220 	case -ETIMEDOUT:
2221 		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2222 		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
2223 		    test_bit(XPRT_REMOVE, &xprt->state)) {
2224 			struct rpc_xprt *saved = task->tk_xprt;
2225 			struct rpc_xprt_switch *xps;
2226 
2227 			xps = rpc_clnt_xprt_switch_get(clnt);
2228 			if (xps->xps_nxprts > 1) {
2229 				long value;
2230 
2231 				xprt_release(task);
2232 				value = atomic_long_dec_return(&xprt->queuelen);
2233 				if (value == 0)
2234 					rpc_xprt_switch_remove_xprt(xps, saved,
2235 								    true);
2236 				xprt_put(saved);
2237 				task->tk_xprt = NULL;
2238 				task->tk_action = call_start;
2239 			}
2240 			xprt_switch_put(xps);
2241 			if (!task->tk_xprt)
2242 				goto out;
2243 		}
2244 		goto out_retry;
2245 	case -ENOBUFS:
2246 		rpc_delay(task, HZ >> 2);
2247 		goto out_retry;
2248 	}
2249 	rpc_call_rpcerror(task, status);
2250 	return;
2251 out_next:
2252 	task->tk_action = call_transmit;
2253 	return;
2254 out_retry:
2255 	/* Check for timeouts before looping back to call_bind */
2256 	task->tk_action = call_bind;
2257 out:
2258 	rpc_check_timeout(task);
2259 }
2260 
2261 /*
2262  * 5.	Transmit the RPC request, and wait for reply
2263  */
2264 static void
2265 call_transmit(struct rpc_task *task)
2266 {
2267 	if (rpc_task_transmitted(task)) {
2268 		rpc_task_handle_transmitted(task);
2269 		return;
2270 	}
2271 
2272 	task->tk_action = call_transmit_status;
2273 	if (!xprt_prepare_transmit(task))
2274 		return;
2275 	task->tk_status = 0;
2276 	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2277 		if (!xprt_connected(task->tk_xprt)) {
2278 			task->tk_status = -ENOTCONN;
2279 			return;
2280 		}
2281 		xprt_transmit(task);
2282 	}
2283 	xprt_end_transmit(task);
2284 }
2285 
2286 /*
2287  * 5a.	Handle cleanup after a transmission
2288  */
2289 static void
2290 call_transmit_status(struct rpc_task *task)
2291 {
2292 	task->tk_action = call_status;
2293 
2294 	/*
2295 	 * Common case: success.  Force the compiler to put this
2296 	 * test first.
2297 	 */
2298 	if (rpc_task_transmitted(task)) {
2299 		task->tk_status = 0;
2300 		xprt_request_wait_receive(task);
2301 		return;
2302 	}
2303 
2304 	switch (task->tk_status) {
2305 	default:
2306 		break;
2307 	case -EBADMSG:
2308 		task->tk_status = 0;
2309 		task->tk_action = call_encode;
2310 		break;
2311 		/*
2312 		 * Special cases: if we've been waiting on the
2313 		 * socket's write_space() callback, or if the
2314 		 * socket just returned a connection error,
2315 		 * then hold onto the transport lock.
2316 		 */
2317 	case -ENOMEM:
2318 	case -ENOBUFS:
2319 		rpc_delay(task, HZ>>2);
2320 		fallthrough;
2321 	case -EBADSLT:
2322 	case -EAGAIN:
2323 		task->tk_action = call_transmit;
2324 		task->tk_status = 0;
2325 		break;
2326 	case -EHOSTDOWN:
2327 	case -ENETDOWN:
2328 	case -EHOSTUNREACH:
2329 	case -ENETUNREACH:
2330 	case -EPERM:
2331 		break;
2332 	case -ECONNREFUSED:
2333 		if (RPC_IS_SOFTCONN(task)) {
2334 			if (!task->tk_msg.rpc_proc->p_proc)
2335 				trace_xprt_ping(task->tk_xprt,
2336 						task->tk_status);
2337 			rpc_call_rpcerror(task, task->tk_status);
2338 			return;
2339 		}
2340 		fallthrough;
2341 	case -ECONNRESET:
2342 	case -ECONNABORTED:
2343 	case -EADDRINUSE:
2344 	case -ENOTCONN:
2345 	case -EPIPE:
2346 		task->tk_action = call_bind;
2347 		task->tk_status = 0;
2348 		break;
2349 	}
2350 	rpc_check_timeout(task);
2351 }
2352 
2353 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2354 static void call_bc_transmit(struct rpc_task *task);
2355 static void call_bc_transmit_status(struct rpc_task *task);
2356 
2357 static void
2358 call_bc_encode(struct rpc_task *task)
2359 {
2360 	xprt_request_enqueue_transmit(task);
2361 	task->tk_action = call_bc_transmit;
2362 }
2363 
2364 /*
2365  * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
2366  * addition, disconnect on connectivity errors.
2367  */
2368 static void
2369 call_bc_transmit(struct rpc_task *task)
2370 {
2371 	task->tk_action = call_bc_transmit_status;
2372 	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2373 		if (!xprt_prepare_transmit(task))
2374 			return;
2375 		task->tk_status = 0;
2376 		xprt_transmit(task);
2377 	}
2378 	xprt_end_transmit(task);
2379 }
2380 
2381 static void
2382 call_bc_transmit_status(struct rpc_task *task)
2383 {
2384 	struct rpc_rqst *req = task->tk_rqstp;
2385 
2386 	if (rpc_task_transmitted(task))
2387 		task->tk_status = 0;
2388 
2389 	switch (task->tk_status) {
2390 	case 0:
2391 		/* Success */
2392 	case -ENETDOWN:
2393 	case -EHOSTDOWN:
2394 	case -EHOSTUNREACH:
2395 	case -ENETUNREACH:
2396 	case -ECONNRESET:
2397 	case -ECONNREFUSED:
2398 	case -EADDRINUSE:
2399 	case -ENOTCONN:
2400 	case -EPIPE:
2401 		break;
2402 	case -ENOMEM:
2403 	case -ENOBUFS:
2404 		rpc_delay(task, HZ>>2);
2405 		fallthrough;
2406 	case -EBADSLT:
2407 	case -EAGAIN:
2408 		task->tk_status = 0;
2409 		task->tk_action = call_bc_transmit;
2410 		return;
2411 	case -ETIMEDOUT:
2412 		/*
2413 		 * Problem reaching the server.  Disconnect and let the
2414 		 * forechannel reestablish the connection.  The server will
2415 		 * have to retransmit the backchannel request and we'll
2416 		 * reprocess it.  Since these ops are idempotent, there's no
2417 		 * need to cache our reply at this time.
2418 		 */
2419 		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2420 			"error: %d\n", task->tk_status);
2421 		xprt_conditional_disconnect(req->rq_xprt,
2422 			req->rq_connect_cookie);
2423 		break;
2424 	default:
2425 		/*
2426 		 * We were unable to reply and will have to drop the
2427 		 * request.  The server should reconnect and retransmit.
2428 		 */
2429 		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2430 			"error: %d\n", task->tk_status);
2431 		break;
2432 	}
2433 	task->tk_action = rpc_exit_task;
2434 }
2435 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2436 
2437 /*
2438  * 6.	Sort out the RPC call status
2439  */
2440 static void
2441 call_status(struct rpc_task *task)
2442 {
2443 	struct rpc_clnt	*clnt = task->tk_client;
2444 	int		status;
2445 
2446 	if (!task->tk_msg.rpc_proc->p_proc)
2447 		trace_xprt_ping(task->tk_xprt, task->tk_status);
2448 
2449 	status = task->tk_status;
2450 	if (status >= 0) {
2451 		task->tk_action = call_decode;
2452 		return;
2453 	}
2454 
2455 	trace_rpc_call_status(task);
2456 	task->tk_status = 0;
2457 	switch(status) {
2458 	case -ENETDOWN:
2459 	case -ENETUNREACH:
2460 		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
2461 			goto out_exit;
2462 		fallthrough;
2463 	case -EHOSTDOWN:
2464 	case -EHOSTUNREACH:
2465 	case -EPERM:
2466 		if (RPC_IS_SOFTCONN(task))
2467 			goto out_exit;
2468 		/*
2469 		 * Delay any retries for 3 seconds, then handle as if it
2470 		 * were a timeout.
2471 		 */
2472 		rpc_delay(task, 3*HZ);
2473 		fallthrough;
2474 	case -ETIMEDOUT:
2475 		break;
2476 	case -ECONNREFUSED:
2477 	case -ECONNRESET:
2478 	case -ECONNABORTED:
2479 	case -ENOTCONN:
2480 		rpc_force_rebind(clnt);
2481 		break;
2482 	case -EADDRINUSE:
2483 		rpc_delay(task, 3*HZ);
2484 		fallthrough;
2485 	case -EPIPE:
2486 	case -EAGAIN:
2487 		break;
2488 	case -ENFILE:
2489 	case -ENOBUFS:
2490 	case -ENOMEM:
2491 		rpc_delay(task, HZ>>2);
2492 		break;
2493 	case -EIO:
2494 		/* shutdown or soft timeout */
2495 		goto out_exit;
2496 	default:
2497 		if (clnt->cl_chatty)
2498 			printk("%s: RPC call returned error %d\n",
2499 			       clnt->cl_program->name, -status);
2500 		goto out_exit;
2501 	}
2502 	task->tk_action = call_encode;
2503 	rpc_check_timeout(task);
2504 	return;
2505 out_exit:
2506 	rpc_call_rpcerror(task, status);
2507 }
2508 
2509 static bool
2510 rpc_check_connected(const struct rpc_rqst *req)
2511 {
2512 	/* No allocated request or transport? return true */
2513 	if (!req || !req->rq_xprt)
2514 		return true;
2515 	return xprt_connected(req->rq_xprt);
2516 }
2517 
2518 static void
2519 rpc_check_timeout(struct rpc_task *task)
2520 {
2521 	struct rpc_clnt	*clnt = task->tk_client;
2522 
2523 	if (RPC_SIGNALLED(task))
2524 		return;
2525 
2526 	if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2527 		return;
2528 
2529 	trace_rpc_timeout_status(task);
2530 	task->tk_timeouts++;
2531 
2532 	if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2533 		rpc_call_rpcerror(task, -ETIMEDOUT);
2534 		return;
2535 	}
2536 
2537 	if (RPC_IS_SOFT(task)) {
2538 		/*
2539 		 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2540 		 * been sent, it should time out only if the transport
2541 		 * connection gets terminally broken.
2542 		 */
2543 		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2544 		    rpc_check_connected(task->tk_rqstp))
2545 			return;
2546 
2547 		if (clnt->cl_chatty) {
2548 			pr_notice_ratelimited(
2549 				"%s: server %s not responding, timed out\n",
2550 				clnt->cl_program->name,
2551 				task->tk_xprt->servername);
2552 		}
2553 		if (task->tk_flags & RPC_TASK_TIMEOUT)
2554 			rpc_call_rpcerror(task, -ETIMEDOUT);
2555 		else
2556 			__rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2557 		return;
2558 	}
2559 
2560 	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2561 		task->tk_flags |= RPC_CALL_MAJORSEEN;
2562 		if (clnt->cl_chatty) {
2563 			pr_notice_ratelimited(
2564 				"%s: server %s not responding, still trying\n",
2565 				clnt->cl_program->name,
2566 				task->tk_xprt->servername);
2567 		}
2568 	}
2569 	rpc_force_rebind(clnt);
2570 	/*
2571 	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
2572 	 * event? RFC2203 requires the server to drop all such requests.
2573 	 */
2574 	rpcauth_invalcred(task);
2575 }
2576 
2577 /*
2578  * 7.	Decode the RPC reply
2579  */
2580 static void
2581 call_decode(struct rpc_task *task)
2582 {
2583 	struct rpc_clnt	*clnt = task->tk_client;
2584 	struct rpc_rqst	*req = task->tk_rqstp;
2585 	struct xdr_stream xdr;
2586 	int err;
2587 
2588 	if (!task->tk_msg.rpc_proc->p_decode) {
2589 		task->tk_action = rpc_exit_task;
2590 		return;
2591 	}
2592 
2593 	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2594 		if (clnt->cl_chatty) {
2595 			pr_notice_ratelimited("%s: server %s OK\n",
2596 				clnt->cl_program->name,
2597 				task->tk_xprt->servername);
2598 		}
2599 		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2600 	}
2601 
2602 	/*
2603 	 * Did we ever call xprt_complete_rqst()? If not, we should assume
2604 	 * the message is incomplete.
2605 	 */
2606 	err = -EAGAIN;
2607 	if (!req->rq_reply_bytes_recvd)
2608 		goto out;
2609 
2610 	/* Ensure that we see all writes made by xprt_complete_rqst()
2611 	 * before it changed req->rq_reply_bytes_recvd.
2612 	 */
2613 	smp_rmb();
2614 
2615 	req->rq_rcv_buf.len = req->rq_private_buf.len;
2616 	trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2617 
2618 	/* Check that the softirq receive buffer is valid */
2619 	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2620 				sizeof(req->rq_rcv_buf)) != 0);
2621 
2622 	xdr_init_decode(&xdr, &req->rq_rcv_buf,
2623 			req->rq_rcv_buf.head[0].iov_base, req);
2624 	err = rpc_decode_header(task, &xdr);
2625 out:
2626 	switch (err) {
2627 	case 0:
2628 		task->tk_action = rpc_exit_task;
2629 		task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2630 		xdr_finish_decode(&xdr);
2631 		return;
2632 	case -EAGAIN:
2633 		task->tk_status = 0;
2634 		if (task->tk_client->cl_discrtry)
2635 			xprt_conditional_disconnect(req->rq_xprt,
2636 						    req->rq_connect_cookie);
2637 		task->tk_action = call_encode;
2638 		rpc_check_timeout(task);
2639 		break;
2640 	case -EKEYREJECTED:
2641 		task->tk_action = call_reserve;
2642 		rpc_check_timeout(task);
2643 		rpcauth_invalcred(task);
2644 		/* Ensure we obtain a new XID if we retry! */
2645 		xprt_release(task);
2646 	}
2647 }
2648 
2649 static int
2650 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2651 {
2652 	struct rpc_clnt *clnt = task->tk_client;
2653 	struct rpc_rqst	*req = task->tk_rqstp;
2654 	__be32 *p;
2655 	int error;
2656 
2657 	error = -EMSGSIZE;
2658 	p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2659 	if (!p)
2660 		goto out_fail;
2661 	*p++ = req->rq_xid;
2662 	*p++ = rpc_call;
2663 	*p++ = cpu_to_be32(RPC_VERSION);
2664 	*p++ = cpu_to_be32(clnt->cl_prog);
2665 	*p++ = cpu_to_be32(clnt->cl_vers);
2666 	*p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2667 
2668 	error = rpcauth_marshcred(task, xdr);
2669 	if (error < 0)
2670 		goto out_fail;
2671 	return 0;
2672 out_fail:
2673 	trace_rpc_bad_callhdr(task);
2674 	rpc_call_rpcerror(task, error);
2675 	return error;
2676 }
2677 
2678 static noinline int
2679 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2680 {
2681 	struct rpc_clnt *clnt = task->tk_client;
2682 	int error;
2683 	__be32 *p;
2684 
2685 	/* RFC-1014 says that the representation of XDR data must be a
2686 	 * multiple of four bytes
2687 	 * - if it isn't pointer subtraction in the NFS client may give
2688 	 *   undefined results
2689 	 */
2690 	if (task->tk_rqstp->rq_rcv_buf.len & 3)
2691 		goto out_unparsable;
2692 
2693 	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2694 	if (!p)
2695 		goto out_unparsable;
2696 	p++;	/* skip XID */
2697 	if (*p++ != rpc_reply)
2698 		goto out_unparsable;
2699 	if (*p++ != rpc_msg_accepted)
2700 		goto out_msg_denied;
2701 
2702 	error = rpcauth_checkverf(task, xdr);
2703 	if (error) {
2704 		struct rpc_cred *cred = task->tk_rqstp->rq_cred;
2705 
2706 		if (!test_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags)) {
2707 			rpcauth_invalcred(task);
2708 			if (!task->tk_cred_retry)
2709 				goto out_err;
2710 			task->tk_cred_retry--;
2711 			trace_rpc__stale_creds(task);
2712 			return -EKEYREJECTED;
2713 		}
2714 		goto out_verifier;
2715 	}
2716 
2717 	p = xdr_inline_decode(xdr, sizeof(*p));
2718 	if (!p)
2719 		goto out_unparsable;
2720 	switch (*p) {
2721 	case rpc_success:
2722 		return 0;
2723 	case rpc_prog_unavail:
2724 		trace_rpc__prog_unavail(task);
2725 		error = -EPFNOSUPPORT;
2726 		goto out_err;
2727 	case rpc_prog_mismatch:
2728 		trace_rpc__prog_mismatch(task);
2729 		error = -EPROTONOSUPPORT;
2730 		goto out_err;
2731 	case rpc_proc_unavail:
2732 		trace_rpc__proc_unavail(task);
2733 		error = -EOPNOTSUPP;
2734 		goto out_err;
2735 	case rpc_garbage_args:
2736 	case rpc_system_err:
2737 		trace_rpc__garbage_args(task);
2738 		error = -EIO;
2739 		break;
2740 	default:
2741 		goto out_unparsable;
2742 	}
2743 
2744 out_garbage:
2745 	clnt->cl_stats->rpcgarbage++;
2746 	if (task->tk_garb_retry) {
2747 		task->tk_garb_retry--;
2748 		task->tk_action = call_encode;
2749 		return -EAGAIN;
2750 	}
2751 out_err:
2752 	rpc_call_rpcerror(task, error);
2753 	return error;
2754 
2755 out_unparsable:
2756 	trace_rpc__unparsable(task);
2757 	error = -EIO;
2758 	goto out_garbage;
2759 
2760 out_verifier:
2761 	trace_rpc_bad_verifier(task);
2762 	switch (error) {
2763 	case -EPROTONOSUPPORT:
2764 		goto out_err;
2765 	case -EACCES:
2766 		/* possible RPCSEC_GSS out-of-sequence event (RFC2203),
2767 		 * reset recv state and keep waiting, don't retransmit
2768 		 */
2769 		task->tk_rqstp->rq_reply_bytes_recvd = 0;
2770 		task->tk_status = xprt_request_enqueue_receive(task);
2771 		task->tk_action = call_transmit_status;
2772 		return -EBADMSG;
2773 	default:
2774 		goto out_garbage;
2775 	}
2776 
2777 out_msg_denied:
2778 	error = -EACCES;
2779 	p = xdr_inline_decode(xdr, sizeof(*p));
2780 	if (!p)
2781 		goto out_unparsable;
2782 	switch (*p++) {
2783 	case rpc_auth_error:
2784 		break;
2785 	case rpc_mismatch:
2786 		trace_rpc__mismatch(task);
2787 		error = -EPROTONOSUPPORT;
2788 		goto out_err;
2789 	default:
2790 		goto out_unparsable;
2791 	}
2792 
2793 	p = xdr_inline_decode(xdr, sizeof(*p));
2794 	if (!p)
2795 		goto out_unparsable;
2796 	switch (*p++) {
2797 	case rpc_autherr_rejectedcred:
2798 	case rpc_autherr_rejectedverf:
2799 	case rpcsec_gsserr_credproblem:
2800 	case rpcsec_gsserr_ctxproblem:
2801 		rpcauth_invalcred(task);
2802 		if (!task->tk_cred_retry)
2803 			break;
2804 		task->tk_cred_retry--;
2805 		trace_rpc__stale_creds(task);
2806 		return -EKEYREJECTED;
2807 	case rpc_autherr_badcred:
2808 	case rpc_autherr_badverf:
2809 		/* possibly garbled cred/verf? */
2810 		if (!task->tk_garb_retry)
2811 			break;
2812 		task->tk_garb_retry--;
2813 		trace_rpc__bad_creds(task);
2814 		task->tk_action = call_encode;
2815 		return -EAGAIN;
2816 	case rpc_autherr_tooweak:
2817 		trace_rpc__auth_tooweak(task);
2818 		pr_warn("RPC: server %s requires stronger authentication.\n",
2819 			task->tk_xprt->servername);
2820 		break;
2821 	default:
2822 		goto out_unparsable;
2823 	}
2824 	goto out_err;
2825 }
2826 
2827 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2828 		const void *obj)
2829 {
2830 }
2831 
2832 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2833 		void *obj)
2834 {
2835 	return 0;
2836 }
2837 
2838 static const struct rpc_procinfo rpcproc_null = {
2839 	.p_encode = rpcproc_encode_null,
2840 	.p_decode = rpcproc_decode_null,
2841 };
2842 
2843 static const struct rpc_procinfo rpcproc_null_noreply = {
2844 	.p_encode = rpcproc_encode_null,
2845 };
2846 
2847 static void
2848 rpc_null_call_prepare(struct rpc_task *task, void *data)
2849 {
2850 	task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2851 	rpc_call_start(task);
2852 }
2853 
2854 static const struct rpc_call_ops rpc_null_ops = {
2855 	.rpc_call_prepare = rpc_null_call_prepare,
2856 	.rpc_call_done = rpc_default_callback,
2857 };
2858 
2859 static
2860 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2861 		struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2862 		const struct rpc_call_ops *ops, void *data)
2863 {
2864 	struct rpc_message msg = {
2865 		.rpc_proc = &rpcproc_null,
2866 	};
2867 	struct rpc_task_setup task_setup_data = {
2868 		.rpc_client = clnt,
2869 		.rpc_xprt = xprt,
2870 		.rpc_message = &msg,
2871 		.rpc_op_cred = cred,
2872 		.callback_ops = ops ?: &rpc_null_ops,
2873 		.callback_data = data,
2874 		.flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2875 			 RPC_TASK_NULLCREDS,
2876 	};
2877 
2878 	return rpc_run_task(&task_setup_data);
2879 }
2880 
2881 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2882 {
2883 	return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2884 }
2885 EXPORT_SYMBOL_GPL(rpc_call_null);
2886 
2887 static int rpc_ping(struct rpc_clnt *clnt)
2888 {
2889 	struct rpc_task	*task;
2890 	int status;
2891 
2892 	if (clnt->cl_auth->au_ops->ping)
2893 		return clnt->cl_auth->au_ops->ping(clnt);
2894 
2895 	task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2896 	if (IS_ERR(task))
2897 		return PTR_ERR(task);
2898 	status = task->tk_status;
2899 	rpc_put_task(task);
2900 	return status;
2901 }
2902 
2903 static int rpc_ping_noreply(struct rpc_clnt *clnt)
2904 {
2905 	struct rpc_message msg = {
2906 		.rpc_proc = &rpcproc_null_noreply,
2907 	};
2908 	struct rpc_task_setup task_setup_data = {
2909 		.rpc_client = clnt,
2910 		.rpc_message = &msg,
2911 		.callback_ops = &rpc_null_ops,
2912 		.flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2913 	};
2914 	struct rpc_task	*task;
2915 	int status;
2916 
2917 	task = rpc_run_task(&task_setup_data);
2918 	if (IS_ERR(task))
2919 		return PTR_ERR(task);
2920 	status = task->tk_status;
2921 	rpc_put_task(task);
2922 	return status;
2923 }
2924 
2925 struct rpc_cb_add_xprt_calldata {
2926 	struct rpc_xprt_switch *xps;
2927 	struct rpc_xprt *xprt;
2928 };
2929 
2930 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2931 {
2932 	struct rpc_cb_add_xprt_calldata *data = calldata;
2933 
2934 	if (task->tk_status == 0)
2935 		rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2936 }
2937 
2938 static void rpc_cb_add_xprt_release(void *calldata)
2939 {
2940 	struct rpc_cb_add_xprt_calldata *data = calldata;
2941 
2942 	xprt_put(data->xprt);
2943 	xprt_switch_put(data->xps);
2944 	kfree(data);
2945 }
2946 
2947 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2948 	.rpc_call_prepare = rpc_null_call_prepare,
2949 	.rpc_call_done = rpc_cb_add_xprt_done,
2950 	.rpc_release = rpc_cb_add_xprt_release,
2951 };
2952 
2953 /**
2954  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2955  * @clnt: pointer to struct rpc_clnt
2956  * @xps: pointer to struct rpc_xprt_switch,
2957  * @xprt: pointer struct rpc_xprt
2958  * @in_max_connect: pointer to the max_connect value for the passed in xprt transport
2959  */
2960 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2961 		struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2962 		void *in_max_connect)
2963 {
2964 	struct rpc_cb_add_xprt_calldata *data;
2965 	struct rpc_task *task;
2966 	int max_connect = clnt->cl_max_connect;
2967 
2968 	if (in_max_connect)
2969 		max_connect = *(int *)in_max_connect;
2970 	if (xps->xps_nunique_destaddr_xprts + 1 > max_connect) {
2971 		rcu_read_lock();
2972 		pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2973 			"transport to server: %s\n", max_connect,
2974 			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2975 		rcu_read_unlock();
2976 		return -EINVAL;
2977 	}
2978 
2979 	data = kmalloc(sizeof(*data), GFP_KERNEL);
2980 	if (!data)
2981 		return -ENOMEM;
2982 	data->xps = xprt_switch_get(xps);
2983 	data->xprt = xprt_get(xprt);
2984 	if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2985 		rpc_cb_add_xprt_release(data);
2986 		goto success;
2987 	}
2988 
2989 	task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2990 			&rpc_cb_add_xprt_call_ops, data);
2991 	if (IS_ERR(task))
2992 		return PTR_ERR(task);
2993 
2994 	data->xps->xps_nunique_destaddr_xprts++;
2995 	rpc_put_task(task);
2996 success:
2997 	return 1;
2998 }
2999 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
3000 
3001 static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
3002 				    struct rpc_xprt *xprt,
3003 				    struct rpc_add_xprt_test *data)
3004 {
3005 	struct rpc_task *task;
3006 	int status = -EADDRINUSE;
3007 
3008 	/* Test the connection */
3009 	task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
3010 	if (IS_ERR(task))
3011 		return PTR_ERR(task);
3012 
3013 	status = task->tk_status;
3014 	rpc_put_task(task);
3015 
3016 	if (status < 0)
3017 		return status;
3018 
3019 	/* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
3020 	data->add_xprt_test(clnt, xprt, data->data);
3021 
3022 	return 0;
3023 }
3024 
3025 /**
3026  * rpc_clnt_setup_test_and_add_xprt()
3027  *
3028  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
3029  *   1) caller of the test function must dereference the rpc_xprt_switch
3030  *   and the rpc_xprt.
3031  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
3032  *   the rpc_call_done routine.
3033  *
3034  * Upon success (return of 1), the test function adds the new
3035  * transport to the rpc_clnt xprt switch
3036  *
3037  * @clnt: struct rpc_clnt to get the new transport
3038  * @xps:  the rpc_xprt_switch to hold the new transport
3039  * @xprt: the rpc_xprt to test
3040  * @data: a struct rpc_add_xprt_test pointer that holds the test function
3041  *        and test function call data
3042  */
3043 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
3044 				     struct rpc_xprt_switch *xps,
3045 				     struct rpc_xprt *xprt,
3046 				     void *data)
3047 {
3048 	int status = -EADDRINUSE;
3049 
3050 	xprt = xprt_get(xprt);
3051 	xprt_switch_get(xps);
3052 
3053 	if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
3054 		goto out_err;
3055 
3056 	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3057 	if (status < 0)
3058 		goto out_err;
3059 
3060 	status = 1;
3061 out_err:
3062 	xprt_put(xprt);
3063 	xprt_switch_put(xps);
3064 	if (status < 0)
3065 		pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
3066 			"added\n", status,
3067 			xprt->address_strings[RPC_DISPLAY_ADDR]);
3068 	/* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3069 	return status;
3070 }
3071 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3072 
3073 /**
3074  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3075  * @clnt: pointer to struct rpc_clnt
3076  * @xprtargs: pointer to struct xprt_create
3077  * @setup: callback to test and/or set up the connection
3078  * @data: pointer to setup function data
3079  *
3080  * Creates a new transport using the parameters set in args and
3081  * adds it to clnt.
3082  * If ping is set, then test that connectivity succeeds before
3083  * adding the new transport.
3084  *
3085  */
3086 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3087 		struct xprt_create *xprtargs,
3088 		int (*setup)(struct rpc_clnt *,
3089 			struct rpc_xprt_switch *,
3090 			struct rpc_xprt *,
3091 			void *),
3092 		void *data)
3093 {
3094 	struct rpc_xprt_switch *xps;
3095 	struct rpc_xprt *xprt;
3096 	unsigned long connect_timeout;
3097 	unsigned long reconnect_timeout;
3098 	unsigned char resvport, reuseport;
3099 	int ret = 0, ident;
3100 
3101 	rcu_read_lock();
3102 	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3103 	xprt = xprt_iter_xprt(&clnt->cl_xpi);
3104 	if (xps == NULL || xprt == NULL) {
3105 		rcu_read_unlock();
3106 		xprt_switch_put(xps);
3107 		return -EAGAIN;
3108 	}
3109 	resvport = xprt->resvport;
3110 	reuseport = xprt->reuseport;
3111 	connect_timeout = xprt->connect_timeout;
3112 	reconnect_timeout = xprt->max_reconnect_timeout;
3113 	ident = xprt->xprt_class->ident;
3114 	rcu_read_unlock();
3115 
3116 	if (!xprtargs->ident)
3117 		xprtargs->ident = ident;
3118 	xprtargs->xprtsec = clnt->cl_xprtsec;
3119 	xprt = xprt_create_transport(xprtargs);
3120 	if (IS_ERR(xprt)) {
3121 		ret = PTR_ERR(xprt);
3122 		goto out_put_switch;
3123 	}
3124 	xprt->resvport = resvport;
3125 	xprt->reuseport = reuseport;
3126 
3127 	if (xprtargs->connect_timeout)
3128 		connect_timeout = xprtargs->connect_timeout;
3129 	if (xprtargs->reconnect_timeout)
3130 		reconnect_timeout = xprtargs->reconnect_timeout;
3131 	if (xprt->ops->set_connect_timeout != NULL)
3132 		xprt->ops->set_connect_timeout(xprt,
3133 				connect_timeout,
3134 				reconnect_timeout);
3135 
3136 	rpc_xprt_switch_set_roundrobin(xps);
3137 	if (setup) {
3138 		ret = setup(clnt, xps, xprt, data);
3139 		if (ret != 0)
3140 			goto out_put_xprt;
3141 	}
3142 	rpc_xprt_switch_add_xprt(xps, xprt);
3143 out_put_xprt:
3144 	xprt_put(xprt);
3145 out_put_switch:
3146 	xprt_switch_put(xps);
3147 	return ret;
3148 }
3149 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3150 
3151 static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3152 				  struct rpc_xprt *xprt,
3153 				  struct rpc_add_xprt_test *data)
3154 {
3155 	struct rpc_xprt *main_xprt;
3156 	int status = 0;
3157 
3158 	xprt_get(xprt);
3159 
3160 	rcu_read_lock();
3161 	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3162 	status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3163 				   (struct sockaddr *)&main_xprt->addr);
3164 	rcu_read_unlock();
3165 	xprt_put(main_xprt);
3166 	if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3167 		goto out;
3168 
3169 	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3170 out:
3171 	xprt_put(xprt);
3172 	return status;
3173 }
3174 
3175 /* rpc_clnt_probe_trunked_xprt -- probe offlined transport for session trunking
3176  * @clnt rpc_clnt structure
3177  *
3178  * For each offlined transport found in the rpc_clnt structure call
3179  * the function rpc_xprt_probe_trunked() which will determine if this
3180  * transport still belongs to the trunking group.
3181  */
3182 void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3183 				  struct rpc_add_xprt_test *data)
3184 {
3185 	struct rpc_xprt_iter xpi;
3186 	int ret;
3187 
3188 	ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3189 	if (ret)
3190 		return;
3191 	for (;;) {
3192 		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3193 
3194 		if (!xprt)
3195 			break;
3196 		ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3197 		xprt_put(xprt);
3198 		if (ret < 0)
3199 			break;
3200 		xprt_iter_rewind(&xpi);
3201 	}
3202 	xprt_iter_destroy(&xpi);
3203 }
3204 EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3205 
3206 static int rpc_xprt_offline(struct rpc_clnt *clnt,
3207 			    struct rpc_xprt *xprt,
3208 			    void *data)
3209 {
3210 	struct rpc_xprt *main_xprt;
3211 	struct rpc_xprt_switch *xps;
3212 	int err = 0;
3213 
3214 	xprt_get(xprt);
3215 
3216 	rcu_read_lock();
3217 	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3218 	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3219 	err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3220 				(struct sockaddr *)&main_xprt->addr);
3221 	rcu_read_unlock();
3222 	xprt_put(main_xprt);
3223 	if (err)
3224 		goto out;
3225 
3226 	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3227 		err = -EINTR;
3228 		goto out;
3229 	}
3230 	xprt_set_offline_locked(xprt, xps);
3231 
3232 	xprt_release_write(xprt, NULL);
3233 out:
3234 	xprt_put(xprt);
3235 	xprt_switch_put(xps);
3236 	return err;
3237 }
3238 
3239 /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3240  * @clnt rpc_clnt structure
3241  *
3242  * For each active transport found in the rpc_clnt structure call
3243  * the function rpc_xprt_offline() which will identify trunked transports
3244  * and will mark them offline.
3245  */
3246 void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3247 {
3248 	rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3249 }
3250 EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3251 
3252 struct connect_timeout_data {
3253 	unsigned long connect_timeout;
3254 	unsigned long reconnect_timeout;
3255 };
3256 
3257 static int
3258 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3259 		struct rpc_xprt *xprt,
3260 		void *data)
3261 {
3262 	struct connect_timeout_data *timeo = data;
3263 
3264 	if (xprt->ops->set_connect_timeout)
3265 		xprt->ops->set_connect_timeout(xprt,
3266 				timeo->connect_timeout,
3267 				timeo->reconnect_timeout);
3268 	return 0;
3269 }
3270 
3271 void
3272 rpc_set_connect_timeout(struct rpc_clnt *clnt,
3273 		unsigned long connect_timeout,
3274 		unsigned long reconnect_timeout)
3275 {
3276 	struct connect_timeout_data timeout = {
3277 		.connect_timeout = connect_timeout,
3278 		.reconnect_timeout = reconnect_timeout,
3279 	};
3280 	rpc_clnt_iterate_for_each_xprt(clnt,
3281 			rpc_xprt_set_connect_timeout,
3282 			&timeout);
3283 }
3284 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3285 
3286 void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3287 {
3288 	struct rpc_xprt_switch *xps;
3289 
3290 	xps = rpc_clnt_xprt_switch_get(clnt);
3291 	xprt_set_online_locked(xprt, xps);
3292 	xprt_switch_put(xps);
3293 }
3294 
3295 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3296 {
3297 	struct rpc_xprt_switch *xps;
3298 
3299 	if (rpc_clnt_xprt_switch_has_addr(clnt,
3300 		(const struct sockaddr *)&xprt->addr)) {
3301 		return rpc_clnt_xprt_set_online(clnt, xprt);
3302 	}
3303 
3304 	xps = rpc_clnt_xprt_switch_get(clnt);
3305 	rpc_xprt_switch_add_xprt(xps, xprt);
3306 	xprt_switch_put(xps);
3307 }
3308 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3309 
3310 void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3311 {
3312 	struct rpc_xprt_switch *xps;
3313 
3314 	rcu_read_lock();
3315 	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3316 	rpc_xprt_switch_remove_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3317 				    xprt, 0);
3318 	xps->xps_nunique_destaddr_xprts--;
3319 	rcu_read_unlock();
3320 }
3321 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3322 
3323 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3324 				   const struct sockaddr *sap)
3325 {
3326 	struct rpc_xprt_switch *xps;
3327 	bool ret;
3328 
3329 	rcu_read_lock();
3330 	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3331 	ret = rpc_xprt_switch_has_addr(xps, sap);
3332 	rcu_read_unlock();
3333 	return ret;
3334 }
3335 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3336 
3337 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3338 static void rpc_show_header(struct rpc_clnt *clnt)
3339 {
3340 	printk(KERN_INFO "clnt[%pISpc] RPC tasks[%d]\n",
3341 	       (struct sockaddr *)&clnt->cl_xprt->addr,
3342 	       atomic_read(&clnt->cl_task_count));
3343 	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3344 		"-timeout ---ops--\n");
3345 }
3346 
3347 static void rpc_show_task(const struct rpc_clnt *clnt,
3348 			  const struct rpc_task *task)
3349 {
3350 	const char *rpc_waitq = "none";
3351 
3352 	if (RPC_IS_QUEUED(task))
3353 		rpc_waitq = rpc_qname(task->tk_waitqueue);
3354 
3355 	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3356 		task->tk_pid, task->tk_flags, task->tk_status,
3357 		clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3358 		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3359 		task->tk_action, rpc_waitq);
3360 }
3361 
3362 void rpc_show_tasks(struct net *net)
3363 {
3364 	struct rpc_clnt *clnt;
3365 	struct rpc_task *task;
3366 	int header = 0;
3367 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3368 
3369 	spin_lock(&sn->rpc_client_lock);
3370 	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3371 		spin_lock(&clnt->cl_lock);
3372 		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3373 			if (!header) {
3374 				rpc_show_header(clnt);
3375 				header++;
3376 			}
3377 			rpc_show_task(clnt, task);
3378 		}
3379 		spin_unlock(&clnt->cl_lock);
3380 	}
3381 	spin_unlock(&sn->rpc_client_lock);
3382 }
3383 #endif
3384 
3385 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3386 static int
3387 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3388 		struct rpc_xprt *xprt,
3389 		void *dummy)
3390 {
3391 	return xprt_enable_swap(xprt);
3392 }
3393 
3394 int
3395 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3396 {
3397 	while (clnt != clnt->cl_parent)
3398 		clnt = clnt->cl_parent;
3399 	if (atomic_inc_return(&clnt->cl_swapper) == 1)
3400 		return rpc_clnt_iterate_for_each_xprt(clnt,
3401 				rpc_clnt_swap_activate_callback, NULL);
3402 	return 0;
3403 }
3404 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3405 
3406 static int
3407 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3408 		struct rpc_xprt *xprt,
3409 		void *dummy)
3410 {
3411 	xprt_disable_swap(xprt);
3412 	return 0;
3413 }
3414 
3415 void
3416 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3417 {
3418 	while (clnt != clnt->cl_parent)
3419 		clnt = clnt->cl_parent;
3420 	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3421 		rpc_clnt_iterate_for_each_xprt(clnt,
3422 				rpc_clnt_swap_deactivate_callback, NULL);
3423 }
3424 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3425 #endif /* CONFIG_SUNRPC_SWAP */
3426