xref: /illumos-gate/usr/src/uts/common/rpc/clnt_rdma.c (revision 9ffca3735a6d60d1994f185af63e16705e87d2c5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
60 static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;
61 
62 static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
63 			    XDR *, xdrproc_t, caddr_t);
64 static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
65 		    XDR **, uint_t *);
66 static int clnt_setup_rlist(CONN *, XDR *, XDR *);
67 static int clnt_setup_wlist(CONN *, XDR *, XDR *);
68 static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
69 static void clnt_check_credit(CONN *);
70 static void clnt_return_credit(CONN *);
71 static void clnt_decode_long_reply(CONN *, struct clist *,
72 		struct clist *, XDR *, XDR **, struct clist *,
73 		struct clist *, uint_t, uint_t);
74 
75 static void clnt_update_credit(CONN *, uint32_t);
76 static void check_dereg_wlist(CONN *, struct clist *);
77 
78 static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
79     caddr_t, xdrproc_t, caddr_t, struct timeval);
80 static void	clnt_rdma_kabort(CLIENT *);
81 static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
82 static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
83 static void	clnt_rdma_kdestroy(CLIENT *);
84 static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
85 static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
86     struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
87 
88 /*
89  * Operations vector for RDMA based RPC
90  */
91 static struct clnt_ops rdma_clnt_ops = {
92 	clnt_rdma_kcallit,	/* do rpc call */
93 	clnt_rdma_kabort,	/* abort call */
94 	clnt_rdma_kerror,	/* return error status */
95 	clnt_rdma_kfreeres,	/* free results */
96 	clnt_rdma_kdestroy,	/* destroy rpc handle */
97 	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
98 	clnt_rdma_ksettimers,	/* set retry timers */
99 };
100 
101 /*
102  * The size of the preserialized RPC header information.
103  */
104 #define	CKU_HDRSIZE	20
105 #define	CLNT_RDMA_SUCCESS 0
106 #define	CLNT_RDMA_FAIL (-1)
107 
108 #define	AUTH_REFRESH_COUNT 2
109 
110 #define	IS_RPCSEC_GSS(authh)			\
111 	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
112 
113 /*
114  * Per RPC RDMA endpoint details
115  */
116 typedef struct cku_private {
117 	CLIENT			cku_client;	/* client handle */
118 	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
119 	void			*cku_rd_handle;	/* underlying RDMA device */
120 	struct netbuf		cku_addr;	/* remote netbuf address */
121 	int			cku_addrfmly;	/* for finding addr_type */
122 	struct rpc_err		cku_err;	/* error status */
123 	struct cred		*cku_cred;	/* credentials */
124 	XDR			cku_outxdr;	/* xdr stream for output */
125 	uint32_t		cku_outsz;
126 	XDR			cku_inxdr;	/* xdr stream for input */
127 	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
128 	uint32_t		cku_xid;	/* current XID */
129 } cku_private_t;
130 
131 #define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
132 static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
133 
134 struct {
135 	kstat_named_t	rccalls;
136 	kstat_named_t	rcbadcalls;
137 	kstat_named_t	rcbadxids;
138 	kstat_named_t	rctimeouts;
139 	kstat_named_t	rcnewcreds;
140 	kstat_named_t	rcbadverfs;
141 	kstat_named_t	rctimers;
142 	kstat_named_t	rccantconn;
143 	kstat_named_t	rcnomem;
144 	kstat_named_t	rcintrs;
145 	kstat_named_t	rclongrpcs;
146 } rdmarcstat = {
147 	{ "calls",	KSTAT_DATA_UINT64 },
148 	{ "badcalls",	KSTAT_DATA_UINT64 },
149 	{ "badxids",	KSTAT_DATA_UINT64 },
150 	{ "timeouts",	KSTAT_DATA_UINT64 },
151 	{ "newcreds",	KSTAT_DATA_UINT64 },
152 	{ "badverfs",	KSTAT_DATA_UINT64 },
153 	{ "timers",	KSTAT_DATA_UINT64 },
154 	{ "cantconn",	KSTAT_DATA_UINT64 },
155 	{ "nomem",	KSTAT_DATA_UINT64 },
156 	{ "interrupts", KSTAT_DATA_UINT64 },
157 	{ "longrpc", 	KSTAT_DATA_UINT64 }
158 };
159 
160 kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
161 uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
162 
163 #ifdef DEBUG
164 int rdma_clnt_debug = 0;
165 #endif
166 
167 #ifdef accurate_stats
168 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
169 
170 #define	RCSTAT_INCR(x)			\
171 	mutex_enter(&rdmarcstat_lock);	\
172 	rdmarcstat.x.value.ui64++;	\
173 	mutex_exit(&rdmarcstat_lock);
174 #else
175 #define	RCSTAT_INCR(x)			\
176 	rdmarcstat.x.value.ui64++;
177 #endif
178 
179 #define	ptoh(p)		(&((p)->cku_client))
180 #define	htop(h)		((cku_private_t *)((h)->cl_private))
181 
182 uint_t
183 calc_length(uint_t len)
184 {
185 	len = RNDUP(len);
186 
187 	if (len <= 64 * 1024) {
188 		if (len > 32 * 1024) {
189 			len = 64 * 1024;
190 		} else {
191 			if (len > 16 * 1024) {
192 				len = 32 * 1024;
193 			} else {
194 				if (len > 8 * 1024) {
195 					len = 16 * 1024;
196 				} else {
197 					len = 8 * 1024;
198 				}
199 			}
200 		}
201 	}
202 	return (len);
203 }
204 int
205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
207 {
208 	CLIENT *h;
209 	struct cku_private *p;
210 	struct rpc_msg call_msg;
211 	rdma_registry_t *rp;
212 
213 	ASSERT(INGLOBALZONE(curproc));
214 
215 	if (cl == NULL)
216 		return (EINVAL);
217 	*cl = NULL;
218 
219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
220 
221 	/*
222 	 * Find underlying RDMATF plugin
223 	 */
224 	rw_enter(&rdma_lock, RW_READER);
225 	rp = rdma_mod_head;
226 	while (rp != NULL) {
227 		if (strcmp(rp->r_mod->rdma_api, proto))
228 			rp = rp->r_next;
229 		else {
230 			p->cku_rd_mod = rp->r_mod;
231 			p->cku_rd_handle = handle;
232 			break;
233 		}
234 	}
235 	rw_exit(&rdma_lock);
236 
237 	if (p->cku_rd_mod == NULL) {
238 		/*
239 		 * Should not happen.
240 		 * No matching RDMATF plugin.
241 		 */
242 		kmem_free(p, sizeof (struct cku_private));
243 		return (EINVAL);
244 	}
245 
246 	h = ptoh(p);
247 	h->cl_ops = &rdma_clnt_ops;
248 	h->cl_private = (caddr_t)p;
249 	h->cl_auth = authkern_create();
250 
251 	/* call message, just used to pre-serialize below */
252 	call_msg.rm_xid = 0;
253 	call_msg.rm_direction = CALL;
254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
255 	call_msg.rm_call.cb_prog = pgm;
256 	call_msg.rm_call.cb_vers = vers;
257 
258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
259 	/* pre-serialize call message header */
260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
261 		XDR_DESTROY(&p->cku_outxdr);
262 		auth_destroy(h->cl_auth);
263 		kmem_free(p, sizeof (struct cku_private));
264 		return (EINVAL);
265 	}
266 
267 	/*
268 	 * Set up the rpc information
269 	 */
270 	p->cku_cred = cred;
271 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
272 	p->cku_addr.maxlen = raddr->maxlen;
273 	p->cku_addr.len = raddr->len;
274 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
275 	p->cku_addrfmly = family;
276 
277 	*cl = h;
278 	return (0);
279 }
280 
281 static void
282 clnt_rdma_kdestroy(CLIENT *h)
283 {
284 	struct cku_private *p = htop(h);
285 
286 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
287 	kmem_free(p, sizeof (*p));
288 }
289 
290 void
291 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
292     struct cred *cred)
293 {
294 	struct cku_private *p = htop(h);
295 	rdma_registry_t *rp;
296 
297 	ASSERT(INGLOBALZONE(curproc));
298 	/*
299 	 * Find underlying RDMATF plugin
300 	 */
301 	p->cku_rd_mod = NULL;
302 	rw_enter(&rdma_lock, RW_READER);
303 	rp = rdma_mod_head;
304 	while (rp != NULL) {
305 		if (strcmp(rp->r_mod->rdma_api, proto))
306 			rp = rp->r_next;
307 		else {
308 			p->cku_rd_mod = rp->r_mod;
309 			p->cku_rd_handle = handle;
310 			break;
311 		}
312 
313 	}
314 	rw_exit(&rdma_lock);
315 
316 	/*
317 	 * Set up the rpc information
318 	 */
319 	p->cku_cred = cred;
320 	p->cku_xid = 0;
321 
322 	if (p->cku_addr.maxlen < raddr->len) {
323 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
324 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
325 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
326 		p->cku_addr.maxlen = raddr->maxlen;
327 	}
328 
329 	p->cku_addr.len = raddr->len;
330 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
331 	h->cl_ops = &rdma_clnt_ops;
332 }
333 
334 static int
335 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
336     rdma_buf_t *rpcmsg, XDR *xdrs,
337     xdrproc_t xdr_args, caddr_t argsp)
338 {
339 	cku_private_t *p = htop(h);
340 
341 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
342 		/*
343 		 * Copy in the preserialized RPC header
344 		 * information.
345 		 */
346 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
347 
348 		/*
349 		 * transaction id is the 1st thing in the output
350 		 * buffer.
351 		 */
352 		/* LINTED pointer alignment */
353 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
354 
355 		/* Skip the preserialized stuff. */
356 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
357 
358 		/* Serialize dynamic stuff into the output buffer. */
359 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
360 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
361 		    (!(*xdr_args)(xdrs, argsp))) {
362 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
363 			return (CLNT_RDMA_FAIL);
364 		}
365 		p->cku_outsz = XDR_GETPOS(xdrs);
366 	} else {
367 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
368 		IXDR_PUT_U_INT32(uproc, procnum);
369 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
370 		XDR_SETPOS(xdrs, 0);
371 
372 		/* Serialize the procedure number and the arguments. */
373 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
374 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
375 			if (rpcmsg->addr != xdrs->x_base) {
376 				rpcmsg->addr = xdrs->x_base;
377 				rpcmsg->len = xdr_getbufsize(xdrs);
378 			}
379 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
380 			return (CLNT_RDMA_FAIL);
381 		}
382 		/*
383 		 * If we had to allocate a new buffer while encoding
384 		 * then update the addr and len.
385 		 */
386 		if (rpcmsg->addr != xdrs->x_base) {
387 			rpcmsg->addr = xdrs->x_base;
388 			rpcmsg->len = xdr_getbufsize(xdrs);
389 		}
390 
391 		p->cku_outsz = XDR_GETPOS(xdrs);
392 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
393 	}
394 
395 	return (CLNT_RDMA_SUCCESS);
396 }
397 
398 static int
399 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
400     XDR **xdrs, uint_t *op)
401 {
402 	cku_private_t *p = htop(h);
403 	uint_t vers;
404 	uint32_t rdma_credit = rdma_bufs_rqst;
405 
406 	vers = RPCRDMA_VERS;
407 	clmsg->type = SEND_BUFFER;
408 
409 	if (rdma_buf_alloc(conn, clmsg)) {
410 		return (CLNT_RDMA_FAIL);
411 	}
412 
413 	*xdrs = &p->cku_outxdr;
414 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
415 
416 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
417 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
418 	(void) xdr_u_int(*xdrs, &vers);
419 	(void) xdr_u_int(*xdrs, &rdma_credit);
420 	(void) xdr_u_int(*xdrs, op);
421 
422 	return (CLNT_RDMA_SUCCESS);
423 }
424 
425 /*
426  * If xp_cl is NULL value, then the RPC payload will NOT carry
427  * an RDMA READ chunk list, in this case we insert FALSE into
428  * the XDR stream. Otherwise we use the clist and RDMA register
429  * the memory and encode the clist into the outbound XDR stream.
430  */
431 static int
432 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
433 {
434 	int status;
435 	struct clist *rclp;
436 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
437 
438 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
439 
440 	if (rclp != NULL) {
441 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
442 		if (status != RDMA_SUCCESS) {
443 			return (CLNT_RDMA_FAIL);
444 		}
445 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
446 	}
447 	(void) xdr_do_clist(xdrs, &rclp);
448 
449 	return (CLNT_RDMA_SUCCESS);
450 }
451 
452 /*
453  * If xp_wcl is NULL value, then the RPC payload will NOT carry
454  * an RDMA WRITE chunk list, in this case we insert FALSE into
455  * the XDR stream. Otherwise we use the clist and  RDMA register
456  * the memory and encode the clist into the outbound XDR stream.
457  */
458 static int
459 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
460 {
461 	int status;
462 	struct clist *wlist;
463 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
464 
465 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
466 
467 	if (wlist != NULL) {
468 		status = clist_register(conn, wlist, CLIST_REG_DST);
469 		if (status != RDMA_SUCCESS) {
470 			return (CLNT_RDMA_FAIL);
471 		}
472 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
473 	}
474 
475 	if (!xdr_encode_wlist(xdrs, wlist))
476 		return (CLNT_RDMA_FAIL);
477 
478 	return (CLNT_RDMA_SUCCESS);
479 }
480 
481 static int
482 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
483 {
484 	if (length == 0) {
485 		*clpp = NULL;
486 		return (CLNT_RDMA_SUCCESS);
487 	}
488 
489 	*clpp = clist_alloc();
490 
491 	(*clpp)->rb_longbuf.len = calc_length(length);
492 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
493 
494 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
495 		clist_free(*clpp);
496 		*clpp = NULL;
497 		return (CLNT_RDMA_FAIL);
498 	}
499 
500 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
501 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
502 	(*clpp)->c_next = NULL;
503 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
504 
505 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
506 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
507 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
508 		clist_free(*clpp);
509 		return (CLNT_RDMA_FAIL);
510 	}
511 
512 	return (CLNT_RDMA_SUCCESS);
513 }
514 
515 /* ARGSUSED */
516 static enum clnt_stat
517 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
518     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
519     struct timeval wait)
520 {
521 	cku_private_t *p = htop(h);
522 
523 	int 	try_call_again;
524 	int	refresh_attempt = AUTH_REFRESH_COUNT;
525 	int 	status;
526 	int 	msglen;
527 
528 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
529 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
530 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
531 
532 	struct rpc_msg 	reply_msg;
533 
534 	struct clist *cl_sendlist;
535 	struct clist *cl_recvlist;
536 	struct clist *cl;
537 	struct clist *cl_rpcmsg;
538 	struct clist *cl_rdma_reply;
539 	struct clist *cl_rpcreply_wlist;
540 	struct clist *cl_long_reply;
541 
542 	uint_t vers;
543 	uint_t op;
544 	uint_t off;
545 	uint32_t seg_array_len;
546 	uint_t long_reply_len;
547 	uint_t rpcsec_gss;
548 	uint_t gss_i_or_p;
549 
550 	CONN *conn = NULL;
551 	rdma_buf_t clmsg;
552 	rdma_buf_t rpcmsg;
553 	rdma_chunkinfo_lengths_t rcil;
554 
555 	clock_t	ticks;
556 	bool_t wlist_exists_reply;
557 
558 	uint32_t rdma_credit = rdma_bufs_rqst;
559 
560 	RCSTAT_INCR(rccalls);
561 
562 call_again:
563 
564 	bzero(&clmsg, sizeof (clmsg));
565 	bzero(&rpcmsg, sizeof (rpcmsg));
566 	try_call_again = 0;
567 	cl_sendlist = NULL;
568 	cl_recvlist = NULL;
569 	cl = NULL;
570 	cl_rpcmsg = NULL;
571 	cl_rdma_reply = NULL;
572 	call_xdrp = NULL;
573 	reply_xdrp = NULL;
574 	wlist_exists_reply  = FALSE;
575 	cl_rpcreply_wlist = NULL;
576 	cl_long_reply = NULL;
577 	rcil.rcil_len = 0;
578 	rcil.rcil_len_alt = 0;
579 	long_reply_len = 0;
580 
581 	/*
582 	 * Get unique xid
583 	 */
584 	if (p->cku_xid == 0)
585 		p->cku_xid = alloc_xid();
586 
587 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
588 	    p->cku_addrfmly, p->cku_rd_handle, &conn);
589 
590 	/*
591 	 * If there is a problem with the connection reflect the issue
592 	 * back to the higher level to address, we MAY delay for a short
593 	 * period so that we are kind to the transport.
594 	 */
595 	if (conn == NULL) {
596 		/*
597 		 * Connect failed to server. Could be because of one
598 		 * of several things. In some cases we don't want
599 		 * the caller to retry immediately - delay before
600 		 * returning to caller.
601 		 */
602 		switch (status) {
603 		case RDMA_TIMEDOUT:
604 			/*
605 			 * Already timed out. No need to delay
606 			 * some more.
607 			 */
608 			p->cku_err.re_status = RPC_TIMEDOUT;
609 			p->cku_err.re_errno = ETIMEDOUT;
610 			break;
611 		case RDMA_INTR:
612 			/*
613 			 * Failed because of an signal. Very likely
614 			 * the caller will not retry.
615 			 */
616 			p->cku_err.re_status = RPC_INTR;
617 			p->cku_err.re_errno = EINTR;
618 			break;
619 		default:
620 			/*
621 			 * All other failures - server down or service
622 			 * down or temporary resource failure. Delay before
623 			 * returning to caller.
624 			 */
625 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
626 			p->cku_err.re_status = RPC_CANTCONNECT;
627 			p->cku_err.re_errno = EIO;
628 
629 			if (h->cl_nosignal == TRUE) {
630 				delay(ticks);
631 			} else {
632 				if (delay_sig(ticks) == EINTR) {
633 					p->cku_err.re_status = RPC_INTR;
634 					p->cku_err.re_errno = EINTR;
635 				}
636 			}
637 			break;
638 		}
639 
640 		return (p->cku_err.re_status);
641 	}
642 
643 	clnt_check_credit(conn);
644 
645 	status = CLNT_RDMA_FAIL;
646 
647 	rpcsec_gss = gss_i_or_p = FALSE;
648 
649 	if (IS_RPCSEC_GSS(h)) {
650 		rpcsec_gss = TRUE;
651 		if (rpc_gss_get_service_type(h->cl_auth) ==
652 		    rpc_gss_svc_integrity ||
653 		    rpc_gss_get_service_type(h->cl_auth) ==
654 		    rpc_gss_svc_privacy)
655 			gss_i_or_p = TRUE;
656 	}
657 
658 	/*
659 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
660 	 * or if RPCSEC_GSS is being used for authentication only.
661 	 */
662 	if (rpcsec_gss == FALSE ||
663 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
664 		/*
665 		 * Grab a send buffer for the request.  Try to
666 		 * encode it to see if it fits. If not, then it
667 		 * needs to be sent in a chunk.
668 		 */
669 		rpcmsg.type = SEND_BUFFER;
670 		if (rdma_buf_alloc(conn, &rpcmsg)) {
671 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
672 			goto done;
673 		}
674 
675 		/* First try to encode into regular send buffer */
676 		op = RDMA_MSG;
677 
678 		call_xdrp = &callxdr;
679 
680 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
681 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
682 
683 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
684 		    xdr_args, argsp);
685 
686 		if (status != CLNT_RDMA_SUCCESS) {
687 			/* Clean up from previous encode attempt */
688 			rdma_buf_free(conn, &rpcmsg);
689 			XDR_DESTROY(call_xdrp);
690 		} else {
691 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
692 		}
693 	}
694 
695 	/* If the encode didn't work, then try a NOMSG */
696 	if (status != CLNT_RDMA_SUCCESS) {
697 
698 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
699 		    xdr_sizeof(xdr_args, argsp);
700 
701 		msglen = calc_length(msglen);
702 
703 		/* pick up the lengths for the reply buffer needed */
704 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
705 		    &rcil.rcil_len, &rcil.rcil_len_alt);
706 
707 		/*
708 		 * Construct a clist to describe the CHUNK_BUFFER
709 		 * for the rpcmsg.
710 		 */
711 		cl_rpcmsg = clist_alloc();
712 		cl_rpcmsg->c_len = msglen;
713 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
714 		cl_rpcmsg->rb_longbuf.len = msglen;
715 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
716 			clist_free(cl_rpcmsg);
717 			goto done;
718 		}
719 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
720 
721 		op = RDMA_NOMSG;
722 		call_xdrp = &callxdr;
723 
724 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
725 		    cl_rpcmsg->rb_longbuf.len, 0,
726 		    cl_rpcmsg, XDR_ENCODE, conn);
727 
728 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
729 		    xdr_args, argsp);
730 
731 		if (status != CLNT_RDMA_SUCCESS) {
732 			p->cku_err.re_status = RPC_CANTENCODEARGS;
733 			p->cku_err.re_errno = EIO;
734 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
735 			goto done;
736 		}
737 	}
738 
739 	/*
740 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
741 	 * RDMA WRITE clist.
742 	 *
743 	 * First pull the RDMA READ chunk list from the XDR private
744 	 * area to keep it handy.
745 	 */
746 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
747 
748 	if (gss_i_or_p) {
749 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
750 		long_reply_len += MAX_AUTH_BYTES;
751 	} else {
752 		long_reply_len = rcil.rcil_len;
753 	}
754 
755 	/*
756 	 * Update the chunk size information for the Long RPC msg.
757 	 */
758 	if (cl && op == RDMA_NOMSG)
759 		cl->c_len = p->cku_outsz;
760 
761 	/*
762 	 * Prepare the RDMA header. On success xdrs will hold the result
763 	 * of xdrmem_create() for a SEND_BUFFER.
764 	 */
765 	status = clnt_compose_rdma_header(conn, h, &clmsg,
766 	    &rdmahdr_o_xdrs, &op);
767 
768 	if (status != CLNT_RDMA_SUCCESS) {
769 		p->cku_err.re_status = RPC_CANTSEND;
770 		p->cku_err.re_errno = EIO;
771 		RCSTAT_INCR(rcnomem);
772 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
773 		goto done;
774 	}
775 
776 	/*
777 	 * Now insert the RDMA READ list iff present
778 	 */
779 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
780 	if (status != CLNT_RDMA_SUCCESS) {
781 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
782 		rdma_buf_free(conn, &clmsg);
783 		p->cku_err.re_status = RPC_CANTSEND;
784 		p->cku_err.re_errno = EIO;
785 		goto done;
786 	}
787 
788 	/*
789 	 * Setup RDMA WRITE chunk list for nfs read operation
790 	 * other operations will have a NULL which will result
791 	 * as a NULL list in the XDR stream.
792 	 */
793 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp);
794 	if (status != CLNT_RDMA_SUCCESS) {
795 		rdma_buf_free(conn, &clmsg);
796 		p->cku_err.re_status = RPC_CANTSEND;
797 		p->cku_err.re_errno = EIO;
798 		goto done;
799 	}
800 
801 	/*
802 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
803 	 * large responses can flow back to the client.
804 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
805 	 */
806 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
807 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
808 		long_reply_len += 1024;
809 
810 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
811 
812 	if (status != CLNT_RDMA_SUCCESS) {
813 		rdma_buf_free(conn, &clmsg);
814 		p->cku_err.re_status = RPC_CANTSEND;
815 		p->cku_err.re_errno = EIO;
816 		goto done;
817 	}
818 
819 	/*
820 	 * XDR encode the RDMA_REPLY write chunk
821 	 */
822 	seg_array_len = (cl_long_reply ? 1 : 0);
823 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
824 	    seg_array_len);
825 
826 	/*
827 	 * Construct a clist in "sendlist" that represents what we
828 	 * will push over the wire.
829 	 *
830 	 * Start with the RDMA header and clist (if any)
831 	 */
832 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
833 	    clmsg.addr, NULL, NULL);
834 
835 	/*
836 	 * Put the RPC call message in  sendlist if small RPC
837 	 */
838 	if (op == RDMA_MSG) {
839 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
840 		    rpcmsg.addr, NULL, NULL);
841 	} else {
842 		/* Long RPC already in chunk list */
843 		RCSTAT_INCR(rclongrpcs);
844 	}
845 
846 	/*
847 	 * Set up a reply buffer ready for the reply
848 	 */
849 	status = rdma_clnt_postrecv(conn, p->cku_xid);
850 	if (status != RDMA_SUCCESS) {
851 		rdma_buf_free(conn, &clmsg);
852 		p->cku_err.re_status = RPC_CANTSEND;
853 		p->cku_err.re_errno = EIO;
854 		goto done;
855 	}
856 
857 	/*
858 	 * sync the memory for dma
859 	 */
860 	if (cl != NULL) {
861 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
862 		if (status != RDMA_SUCCESS) {
863 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
864 			rdma_buf_free(conn, &clmsg);
865 			p->cku_err.re_status = RPC_CANTSEND;
866 			p->cku_err.re_errno = EIO;
867 			goto done;
868 		}
869 	}
870 
871 	/*
872 	 * Send the RDMA Header and RPC call message to the server
873 	 */
874 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
875 	if (status != RDMA_SUCCESS) {
876 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
877 		p->cku_err.re_status = RPC_CANTSEND;
878 		p->cku_err.re_errno = EIO;
879 		goto done;
880 	}
881 
882 	/*
883 	 * RDMA plugin now owns the send msg buffers.
884 	 * Clear them out and don't free them.
885 	 */
886 	clmsg.addr = NULL;
887 	if (rpcmsg.type == SEND_BUFFER)
888 		rpcmsg.addr = NULL;
889 
890 	/*
891 	 * Recv rpc reply
892 	 */
893 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
894 
895 	/*
896 	 * Now check recv status
897 	 */
898 	if (status != 0) {
899 		if (status == RDMA_INTR) {
900 			p->cku_err.re_status = RPC_INTR;
901 			p->cku_err.re_errno = EINTR;
902 			RCSTAT_INCR(rcintrs);
903 		} else if (status == RPC_TIMEDOUT) {
904 			p->cku_err.re_status = RPC_TIMEDOUT;
905 			p->cku_err.re_errno = ETIMEDOUT;
906 			RCSTAT_INCR(rctimeouts);
907 		} else {
908 			p->cku_err.re_status = RPC_CANTRECV;
909 			p->cku_err.re_errno = EIO;
910 		}
911 		goto done;
912 	}
913 
914 	/*
915 	 * Process the reply message.
916 	 *
917 	 * First the chunk list (if any)
918 	 */
919 	rdmahdr_i_xdrs = &(p->cku_inxdr);
920 	xdrmem_create(rdmahdr_i_xdrs,
921 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
922 	    cl_recvlist->c_len, XDR_DECODE);
923 
924 	/*
925 	 * Treat xid as opaque (xid is the first entity
926 	 * in the rpc rdma message).
927 	 * Skip xid and set the xdr position accordingly.
928 	 */
929 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
930 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
931 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
932 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
933 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
934 
935 	clnt_update_credit(conn, rdma_credit);
936 
937 	wlist_exists_reply = FALSE;
938 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
939 	    &wlist_exists_reply)) {
940 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
941 		p->cku_err.re_status = RPC_CANTDECODERES;
942 		p->cku_err.re_errno = EIO;
943 		goto done;
944 	}
945 
946 	/*
947 	 * The server shouldn't have sent a RDMA_SEND that
948 	 * the client needs to RDMA_WRITE a reply back to
949 	 * the server.  So silently ignoring what the
950 	 * server returns in the rdma_reply section of the
951 	 * header.
952 	 */
953 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
954 	off = xdr_getpos(rdmahdr_i_xdrs);
955 
956 	clnt_decode_long_reply(conn, cl_long_reply,
957 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
958 	    cl, cl_recvlist, op, off);
959 
960 	if (reply_xdrp == NULL)
961 		goto done;
962 
963 	if (wlist_exists_reply) {
964 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
965 	}
966 
967 	reply_msg.rm_direction = REPLY;
968 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
969 	reply_msg.acpted_rply.ar_stat = SUCCESS;
970 	reply_msg.acpted_rply.ar_verf = _null_auth;
971 
972 	/*
973 	 *  xdr_results will be done in AUTH_UNWRAP.
974 	 */
975 	reply_msg.acpted_rply.ar_results.where = NULL;
976 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
977 
978 	/*
979 	 * Decode and validate the response.
980 	 */
981 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
982 		enum clnt_stat re_status;
983 
984 		_seterr_reply(&reply_msg, &(p->cku_err));
985 
986 		re_status = p->cku_err.re_status;
987 		if (re_status == RPC_SUCCESS) {
988 			/*
989 			 * Reply is good, check auth.
990 			 */
991 			if (!AUTH_VALIDATE(h->cl_auth,
992 			    &reply_msg.acpted_rply.ar_verf)) {
993 				p->cku_err.re_status = RPC_AUTHERROR;
994 				p->cku_err.re_why = AUTH_INVALIDRESP;
995 				RCSTAT_INCR(rcbadverfs);
996 				DTRACE_PROBE(
997 				    krpc__e__clntrdma__callit__authvalidate);
998 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
999 			    xdr_results, resultsp)) {
1000 				p->cku_err.re_status = RPC_CANTDECODERES;
1001 				p->cku_err.re_errno = EIO;
1002 				DTRACE_PROBE(
1003 				    krpc__e__clntrdma__callit__authunwrap);
1004 			}
1005 		} else {
1006 			/* set errno in case we can't recover */
1007 			if (re_status != RPC_VERSMISMATCH &&
1008 			    re_status != RPC_AUTHERROR &&
1009 			    re_status != RPC_PROGVERSMISMATCH)
1010 				p->cku_err.re_errno = EIO;
1011 
1012 			if (re_status == RPC_AUTHERROR) {
1013 				if ((refresh_attempt > 0) &&
1014 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
1015 				    p->cku_cred)) {
1016 					refresh_attempt--;
1017 					try_call_again = 1;
1018 					goto done;
1019 				}
1020 
1021 				try_call_again = 0;
1022 
1023 				/*
1024 				 * We have used the client handle to
1025 				 * do an AUTH_REFRESH and the RPC status may
1026 				 * be set to RPC_SUCCESS; Let's make sure to
1027 				 * set it to RPC_AUTHERROR.
1028 				 */
1029 				p->cku_err.re_status = RPC_AUTHERROR;
1030 
1031 				/*
1032 				 * Map recoverable and unrecoverable
1033 				 * authentication errors to appropriate
1034 				 * errno
1035 				 */
1036 				switch (p->cku_err.re_why) {
1037 				case AUTH_BADCRED:
1038 				case AUTH_BADVERF:
1039 				case AUTH_INVALIDRESP:
1040 				case AUTH_TOOWEAK:
1041 				case AUTH_FAILED:
1042 				case RPCSEC_GSS_NOCRED:
1043 				case RPCSEC_GSS_FAILED:
1044 					p->cku_err.re_errno = EACCES;
1045 					break;
1046 				case AUTH_REJECTEDCRED:
1047 				case AUTH_REJECTEDVERF:
1048 				default:
1049 					p->cku_err.re_errno = EIO;
1050 					break;
1051 				}
1052 			}
1053 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1054 			    int, p->cku_err.re_why);
1055 		}
1056 	} else {
1057 		p->cku_err.re_status = RPC_CANTDECODERES;
1058 		p->cku_err.re_errno = EIO;
1059 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1060 	}
1061 
1062 done:
1063 	clnt_return_credit(conn);
1064 
1065 	if (cl_sendlist != NULL)
1066 		clist_free(cl_sendlist);
1067 
1068 	/*
1069 	 * If rpc reply is in a chunk, free it now.
1070 	 */
1071 	if (cl_long_reply) {
1072 		(void) clist_deregister(conn, cl_long_reply, CLIST_REG_DST);
1073 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1074 		clist_free(cl_long_reply);
1075 	}
1076 
1077 	if (call_xdrp)
1078 		XDR_DESTROY(call_xdrp);
1079 
1080 	if (reply_xdrp) {
1081 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1082 		XDR_DESTROY(reply_xdrp);
1083 	}
1084 
1085 	if (cl_rdma_reply) {
1086 		clist_free(cl_rdma_reply);
1087 	}
1088 
1089 	if (cl_recvlist) {
1090 		rdma_buf_t	recvmsg = {0};
1091 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1092 		recvmsg.type = RECV_BUFFER;
1093 		RDMA_BUF_FREE(conn, &recvmsg);
1094 		clist_free(cl_recvlist);
1095 	}
1096 
1097 	RDMA_REL_CONN(conn);
1098 
1099 	if (try_call_again)
1100 		goto call_again;
1101 
1102 	if (p->cku_err.re_status != RPC_SUCCESS) {
1103 		RCSTAT_INCR(rcbadcalls);
1104 	}
1105 	return (p->cku_err.re_status);
1106 }
1107 
1108 
1109 static void
1110 clnt_decode_long_reply(CONN *conn,
1111     struct clist *cl_long_reply,
1112     struct clist *cl_rdma_reply, XDR *xdrs,
1113     XDR **rxdrp, struct clist *cl,
1114     struct clist *cl_recvlist,
1115     uint_t  op, uint_t off)
1116 {
1117 	if (op != RDMA_NOMSG) {
1118 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1119 		    int, cl_recvlist->c_len - off);
1120 		xdrrdma_create(xdrs,
1121 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1122 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1123 		*rxdrp = xdrs;
1124 		return;
1125 	}
1126 
1127 	/* op must be RDMA_NOMSG */
1128 	if (cl) {
1129 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1130 		return;
1131 	}
1132 
1133 	if (cl_long_reply->u.c_daddr) {
1134 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1135 		    int, cl_rdma_reply->c_len);
1136 
1137 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1138 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1139 
1140 		*rxdrp = xdrs;
1141 	}
1142 }
1143 
1144 static void
1145 clnt_return_credit(CONN *conn)
1146 {
1147 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1148 
1149 	mutex_enter(&conn->c_lock);
1150 	cc_info->clnt_cc_in_flight_ops--;
1151 	cv_signal(&cc_info->clnt_cc_cv);
1152 	mutex_exit(&conn->c_lock);
1153 }
1154 
1155 static void
1156 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1157 {
1158 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1159 
1160 	/*
1161 	 * If the granted has not altered, avoid taking the
1162 	 * mutex, to essentially do nothing..
1163 	 */
1164 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1165 		return;
1166 	/*
1167 	 * Get the granted number of buffers for credit control.
1168 	 */
1169 	mutex_enter(&conn->c_lock);
1170 	cc_info->clnt_cc_granted_ops = rdma_credit;
1171 	mutex_exit(&conn->c_lock);
1172 }
1173 
1174 static void
1175 clnt_check_credit(CONN *conn)
1176 {
1177 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1178 
1179 	/*
1180 	 * Make sure we are not going over our allowed buffer use
1181 	 * (and make sure we have gotten a granted value before).
1182 	 */
1183 	mutex_enter(&conn->c_lock);
1184 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1185 	    cc_info->clnt_cc_granted_ops != 0) {
1186 		/*
1187 		 * Client has maxed out its granted buffers due to
1188 		 * credit control.  Current handling is to block and wait.
1189 		 */
1190 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1191 	}
1192 	cc_info->clnt_cc_in_flight_ops++;
1193 	mutex_exit(&conn->c_lock);
1194 }
1195 
1196 /* ARGSUSED */
1197 static void
1198 clnt_rdma_kabort(CLIENT *h)
1199 {
1200 }
1201 
1202 static void
1203 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1204 {
1205 	struct cku_private *p = htop(h);
1206 	*err = p->cku_err;
1207 }
1208 
1209 static bool_t
1210 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1211 {
1212 	struct cku_private *p = htop(h);
1213 	XDR *xdrs;
1214 
1215 	xdrs = &(p->cku_outxdr);
1216 	xdrs->x_op = XDR_FREE;
1217 	return ((*xdr_res)(xdrs, res_ptr));
1218 }
1219 
1220 /* ARGSUSED */
1221 static bool_t
1222 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1223 {
1224 	return (TRUE);
1225 }
1226 
1227 /* ARGSUSED */
1228 static int
1229 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1230 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1231 	uint32_t xid)
1232 {
1233 	RCSTAT_INCR(rctimers);
1234 	return (0);
1235 }
1236 
1237 int
1238 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1239 {
1240 	rdma_registry_t	*rp;
1241 	void *handle = NULL;
1242 	struct knetconfig *knc;
1243 	char *pf, *p;
1244 	rdma_stat status;
1245 	int error = 0;
1246 
1247 	if (!INGLOBALZONE(curproc))
1248 		return (-1);
1249 
1250 	/*
1251 	 * modload the RDMA plugins if not already done.
1252 	 */
1253 	if (!rdma_modloaded) {
1254 		mutex_enter(&rdma_modload_lock);
1255 		if (!rdma_modloaded) {
1256 			error = rdma_modload();
1257 		}
1258 		mutex_exit(&rdma_modload_lock);
1259 		if (error)
1260 			return (-1);
1261 	}
1262 
1263 	if (!rdma_dev_available)
1264 		return (-1);
1265 
1266 	rw_enter(&rdma_lock, RW_READER);
1267 	rp = rdma_mod_head;
1268 	while (rp != NULL) {
1269 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1270 		    &handle);
1271 		if (status == RDMA_SUCCESS) {
1272 			knc = kmem_zalloc(sizeof (struct knetconfig),
1273 			    KM_SLEEP);
1274 			knc->knc_semantics = NC_TPI_RDMA;
1275 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1276 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1277 			if (addr_type == AF_INET)
1278 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1279 			else if (addr_type == AF_INET6)
1280 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1281 			pf[KNC_STRSIZE - 1] = '\0';
1282 
1283 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1284 			p[KNC_STRSIZE - 1] = '\0';
1285 
1286 			knc->knc_protofmly = pf;
1287 			knc->knc_proto = p;
1288 			knc->knc_rdev = (dev_t)handle;
1289 			*knconf = knc;
1290 			rw_exit(&rdma_lock);
1291 			return (0);
1292 		}
1293 		rp = rp->r_next;
1294 	}
1295 	rw_exit(&rdma_lock);
1296 	return (-1);
1297 }
1298 
1299 static void
1300 check_dereg_wlist(CONN *conn, clist *rwc)
1301 {
1302 	int status;
1303 
1304 	if (rwc == NULL)
1305 		return;
1306 
1307 	if (rwc->c_dmemhandle.mrc_rmr && rwc->c_len) {
1308 
1309 		status = clist_deregister(conn, rwc, CLIST_REG_DST);
1310 
1311 		if (status != RDMA_SUCCESS) {
1312 			DTRACE_PROBE1(krpc__e__clntrdma__dereg_wlist,
1313 			    int, status);
1314 		}
1315 	}
1316 }
1317