xref: /titanic_51/usr/src/uts/common/rpc/clnt_rdma.c (revision cfb7311ca709ca6f4d930977c7498455556b5312)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
/*
 * Credit value this client places in every outgoing RPC/RDMA header
 * (see clnt_compose_rdma_header()).
 */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

/* Helpers that build the outgoing call message and its RDMA header. */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
/* Helpers that set up the read, write and long-reply chunk lists. */
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
/* Credit and reply helpers; their definitions are not in this part of the file. */
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);

/* Entry points installed in the rdma_clnt_ops operations vector. */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
86 
/*
 * Operations vector for RDMA based RPC
 *
 * Installed as cl_ops on each client handle by clnt_rdma_kcreate() and
 * clnt_rdma_kinit(). The initializer is positional, so the entries must
 * stay in the member order of struct clnt_ops.
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
99 
/*
 * The size of the preserialized RPC header information.
 */
#define	CKU_HDRSIZE	20

/* Status codes returned by the static clnt_*() helpers in this file. */
#define	CLNT_RDMA_SUCCESS 0
#define	CLNT_RDMA_FAIL (-1)

/* Initial value of the per-call AUTH_REFRESH retry counter. */
#define	AUTH_REFRESH_COUNT 2

/*
 * True when the auth flavor of client handle authh is RPCSEC_GSS.
 * The argument is parenthesized so that any pointer-valued expression
 * (for example &handle) can be passed safely.
 */
#define	IS_RPCSEC_GSS(authh)			\
	((authh)->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
111 
/*
 * Per RPC RDMA endpoint details
 *
 * The CLIENT handle is embedded as the first member; ptoh() yields the
 * handle from this structure and htop() gets back here through the
 * handle's cl_private pointer.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_srcaddr;	/* source address for retries */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* encoded size of the call, set by clnt_compose_rpcmsg() */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/* the 4 extra bytes hold the procedure number on the RPCSEC_GSS path */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
130 
/*
 * clnt_rdma_min_delay is the value clnt_rdma_kcallit() actually reads when
 * it backs off; CLNT_RDMA_DELAY is only its initial setting.
 */
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
133 
/*
 * Client side RPC/RDMA statistics, updated through RCSTAT_INCR().
 *
 * The structure consists solely of kstat_named_t members so that it can
 * also be viewed as an array (see rdmarcstat_ptr below). The initializer
 * is positional: each { name, type } pair must line up with the member
 * declared at the same position.
 */
struct {
	kstat_named_t	rccalls;	/* calls entering clnt_rdma_kcallit() */
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;	/* receives that timed out */
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;	/* replies failing AUTH_VALIDATE */
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;	/* no buffer for the RDMA header */
	kstat_named_t	rcintrs;	/* receives interrupted */
	kstat_named_t	rclongrpcs;	/* calls sent as RDMA_NOMSG */
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * Array view of rdmarcstat and its element count. Both are global;
 * presumably consumed by kstat setup code outside this file (not visible
 * here).
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
162 
#ifdef DEBUG
/* Debug switch, off by default; no reader is visible in this part of the file. */
int rdma_clnt_debug = 0;
#endif
166 
/*
 * RCSTAT_INCR(x) increments the 64-bit counter of member x in rdmarcstat.
 * When accurate_stats is defined the update is made under
 * rdmarcstat_lock; otherwise it is a plain, unlocked increment.
 *
 * Both variants are wrapped in do { } while (0) and carry no trailing
 * semicolon, so an invocation is exactly one statement and is safe in an
 * unbraced if/else body. The caller supplies the semicolon.
 */
#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)				\
	do {					\
		mutex_enter(&rdmarcstat_lock);	\
		rdmarcstat.x.value.ui64++;	\
		mutex_exit(&rdmarcstat_lock);	\
	} while (0)
#else
#define	RCSTAT_INCR(x)				\
	do {					\
		rdmarcstat.x.value.ui64++;	\
	} while (0)
#endif
178 
/*
 * ptoh: private endpoint structure -> its embedded client handle.
 * htop: client handle -> private endpoint structure, via cl_private.
 */
#define	ptoh(p)		(&((p)->cku_client))
#define	htop(h)		((cku_private_t *)((h)->cl_private))
181 
182 uint_t
183 calc_length(uint_t len)
184 {
185 	len = RNDUP(len);
186 
187 	if (len <= 64 * 1024) {
188 		if (len > 32 * 1024) {
189 			len = 64 * 1024;
190 		} else {
191 			if (len > 16 * 1024) {
192 				len = 32 * 1024;
193 			} else {
194 				if (len > 8 * 1024) {
195 					len = 16 * 1024;
196 				} else {
197 					len = 8 * 1024;
198 				}
199 			}
200 		}
201 	}
202 	return (len);
203 }
204 int
205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
207 {
208 	CLIENT *h;
209 	struct cku_private *p;
210 	struct rpc_msg call_msg;
211 	rdma_registry_t *rp;
212 
213 	ASSERT(INGLOBALZONE(curproc));
214 
215 	if (cl == NULL)
216 		return (EINVAL);
217 	*cl = NULL;
218 
219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
220 
221 	/*
222 	 * Find underlying RDMATF plugin
223 	 */
224 	rw_enter(&rdma_lock, RW_READER);
225 	rp = rdma_mod_head;
226 	while (rp != NULL) {
227 		if (strcmp(rp->r_mod->rdma_api, proto))
228 			rp = rp->r_next;
229 		else {
230 			p->cku_rd_mod = rp->r_mod;
231 			p->cku_rd_handle = handle;
232 			break;
233 		}
234 	}
235 	rw_exit(&rdma_lock);
236 
237 	if (p->cku_rd_mod == NULL) {
238 		/*
239 		 * Should not happen.
240 		 * No matching RDMATF plugin.
241 		 */
242 		kmem_free(p, sizeof (struct cku_private));
243 		return (EINVAL);
244 	}
245 
246 	h = ptoh(p);
247 	h->cl_ops = &rdma_clnt_ops;
248 	h->cl_private = (caddr_t)p;
249 	h->cl_auth = authkern_create();
250 
251 	/* call message, just used to pre-serialize below */
252 	call_msg.rm_xid = 0;
253 	call_msg.rm_direction = CALL;
254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
255 	call_msg.rm_call.cb_prog = pgm;
256 	call_msg.rm_call.cb_vers = vers;
257 
258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
259 	/* pre-serialize call message header */
260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
261 		XDR_DESTROY(&p->cku_outxdr);
262 		auth_destroy(h->cl_auth);
263 		kmem_free(p, sizeof (struct cku_private));
264 		return (EINVAL);
265 	}
266 
267 	/*
268 	 * Set up the rpc information
269 	 */
270 	p->cku_cred = cred;
271 	p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
272 	p->cku_srcaddr.maxlen = raddr->maxlen;
273 	p->cku_srcaddr.len = 0;
274 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
275 	p->cku_addr.maxlen = raddr->maxlen;
276 	p->cku_addr.len = raddr->len;
277 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
278 	p->cku_addrfmly = family;
279 
280 	*cl = h;
281 	return (0);
282 }
283 
284 static void
285 clnt_rdma_kdestroy(CLIENT *h)
286 {
287 	struct cku_private *p = htop(h);
288 
289 	kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
290 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
291 	kmem_free(p, sizeof (*p));
292 }
293 
294 void
295 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
296     struct cred *cred)
297 {
298 	struct cku_private *p = htop(h);
299 	rdma_registry_t *rp;
300 
301 	ASSERT(INGLOBALZONE(curproc));
302 	/*
303 	 * Find underlying RDMATF plugin
304 	 */
305 	p->cku_rd_mod = NULL;
306 	rw_enter(&rdma_lock, RW_READER);
307 	rp = rdma_mod_head;
308 	while (rp != NULL) {
309 		if (strcmp(rp->r_mod->rdma_api, proto))
310 			rp = rp->r_next;
311 		else {
312 			p->cku_rd_mod = rp->r_mod;
313 			p->cku_rd_handle = handle;
314 			break;
315 		}
316 
317 	}
318 	rw_exit(&rdma_lock);
319 
320 	/*
321 	 * Set up the rpc information
322 	 */
323 	p->cku_cred = cred;
324 	p->cku_xid = 0;
325 
326 	if (p->cku_addr.maxlen < raddr->len) {
327 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
328 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
329 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
330 		p->cku_addr.maxlen = raddr->maxlen;
331 	}
332 
333 	p->cku_srcaddr.len = 0;
334 
335 	p->cku_addr.len = raddr->len;
336 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
337 	h->cl_ops = &rdma_clnt_ops;
338 }
339 
340 static int
341 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
342     rdma_buf_t *rpcmsg, XDR *xdrs,
343     xdrproc_t xdr_args, caddr_t argsp)
344 {
345 	cku_private_t *p = htop(h);
346 
347 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
348 		/*
349 		 * Copy in the preserialized RPC header
350 		 * information.
351 		 */
352 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
353 
354 		/*
355 		 * transaction id is the 1st thing in the output
356 		 * buffer.
357 		 */
358 		/* LINTED pointer alignment */
359 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
360 
361 		/* Skip the preserialized stuff. */
362 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
363 
364 		/* Serialize dynamic stuff into the output buffer. */
365 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
366 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
367 		    (!(*xdr_args)(xdrs, argsp))) {
368 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
369 			return (CLNT_RDMA_FAIL);
370 		}
371 		p->cku_outsz = XDR_GETPOS(xdrs);
372 	} else {
373 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
374 		IXDR_PUT_U_INT32(uproc, procnum);
375 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
376 		XDR_SETPOS(xdrs, 0);
377 
378 		/* Serialize the procedure number and the arguments. */
379 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
380 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
381 			if (rpcmsg->addr != xdrs->x_base) {
382 				rpcmsg->addr = xdrs->x_base;
383 				rpcmsg->len = xdr_getbufsize(xdrs);
384 			}
385 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
386 			return (CLNT_RDMA_FAIL);
387 		}
388 		/*
389 		 * If we had to allocate a new buffer while encoding
390 		 * then update the addr and len.
391 		 */
392 		if (rpcmsg->addr != xdrs->x_base) {
393 			rpcmsg->addr = xdrs->x_base;
394 			rpcmsg->len = xdr_getbufsize(xdrs);
395 		}
396 
397 		p->cku_outsz = XDR_GETPOS(xdrs);
398 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
399 	}
400 
401 	return (CLNT_RDMA_SUCCESS);
402 }
403 
404 static int
405 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
406     XDR **xdrs, uint_t *op)
407 {
408 	cku_private_t *p = htop(h);
409 	uint_t vers;
410 	uint32_t rdma_credit = rdma_bufs_rqst;
411 
412 	vers = RPCRDMA_VERS;
413 	clmsg->type = SEND_BUFFER;
414 
415 	if (rdma_buf_alloc(conn, clmsg)) {
416 		return (CLNT_RDMA_FAIL);
417 	}
418 
419 	*xdrs = &p->cku_outxdr;
420 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
421 
422 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
423 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
424 	(void) xdr_u_int(*xdrs, &vers);
425 	(void) xdr_u_int(*xdrs, &rdma_credit);
426 	(void) xdr_u_int(*xdrs, op);
427 
428 	return (CLNT_RDMA_SUCCESS);
429 }
430 
431 /*
432  * If xp_cl is NULL value, then the RPC payload will NOT carry
433  * an RDMA READ chunk list, in this case we insert FALSE into
434  * the XDR stream. Otherwise we use the clist and RDMA register
435  * the memory and encode the clist into the outbound XDR stream.
436  */
437 static int
438 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
439 {
440 	int status;
441 	struct clist *rclp;
442 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
443 
444 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
445 
446 	if (rclp != NULL) {
447 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
448 		if (status != RDMA_SUCCESS) {
449 			return (CLNT_RDMA_FAIL);
450 		}
451 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
452 	}
453 	(void) xdr_do_clist(xdrs, &rclp);
454 
455 	return (CLNT_RDMA_SUCCESS);
456 }
457 
458 /*
459  * If xp_wcl is NULL value, then the RPC payload will NOT carry
460  * an RDMA WRITE chunk list, in this case we insert FALSE into
461  * the XDR stream. Otherwise we use the clist and  RDMA register
462  * the memory and encode the clist into the outbound XDR stream.
463  */
464 static int
465 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
466 {
467 	int status;
468 	struct clist *wlist, *rndcl;
469 	int wlen, rndlen;
470 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
471 
472 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
473 
474 	if (wlist != NULL) {
475 		/*
476 		 * If we are sending a non 4-byte alligned length
477 		 * the server will roundup the length to 4-byte
478 		 * boundary. In such a case, a trailing chunk is
479 		 * added to take any spill over roundup bytes.
480 		 */
481 		wlen = clist_len(wlist);
482 		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
483 		if (rndlen) {
484 			rndcl = clist_alloc();
485 			/*
486 			 * calc_length() will allocate a PAGESIZE
487 			 * buffer below.
488 			 */
489 			rndcl->c_len = calc_length(rndlen);
490 			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
491 			rndcl->rb_longbuf.len = rndcl->c_len;
492 			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
493 				clist_free(rndcl);
494 				return (CLNT_RDMA_FAIL);
495 			}
496 
497 			/* Roundup buffer freed back in caller */
498 			*rndbuf = rndcl->rb_longbuf;
499 
500 			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
501 			rndcl->c_next = NULL;
502 			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
503 			wlist->c_next = rndcl;
504 		}
505 
506 		status = clist_register(conn, wlist, CLIST_REG_DST);
507 		if (status != RDMA_SUCCESS) {
508 			rdma_buf_free(conn, rndbuf);
509 			bzero(rndbuf, sizeof (rdma_buf_t));
510 			return (CLNT_RDMA_FAIL);
511 		}
512 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
513 	}
514 
515 	if (!xdr_encode_wlist(xdrs, wlist)) {
516 		if (rndlen) {
517 			rdma_buf_free(conn, rndbuf);
518 			bzero(rndbuf, sizeof (rdma_buf_t));
519 		}
520 		return (CLNT_RDMA_FAIL);
521 	}
522 
523 	return (CLNT_RDMA_SUCCESS);
524 }
525 
526 static int
527 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
528 {
529 	if (length == 0) {
530 		*clpp = NULL;
531 		return (CLNT_RDMA_SUCCESS);
532 	}
533 
534 	*clpp = clist_alloc();
535 
536 	(*clpp)->rb_longbuf.len = calc_length(length);
537 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
538 
539 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
540 		clist_free(*clpp);
541 		*clpp = NULL;
542 		return (CLNT_RDMA_FAIL);
543 	}
544 
545 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
546 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
547 	(*clpp)->c_next = NULL;
548 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
549 
550 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
551 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
552 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
553 		clist_free(*clpp);
554 		*clpp = NULL;
555 		return (CLNT_RDMA_FAIL);
556 	}
557 
558 	return (CLNT_RDMA_SUCCESS);
559 }
560 
561 /* ARGSUSED */
562 static enum clnt_stat
563 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
564     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
565     struct timeval wait)
566 {
567 	cku_private_t *p = htop(h);
568 
569 	int 	try_call_again;
570 	int	refresh_attempt = AUTH_REFRESH_COUNT;
571 	int 	status;
572 	int 	msglen;
573 
574 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
575 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
576 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
577 
578 	struct rpc_msg 	reply_msg;
579 	rdma_registry_t	*m;
580 
581 	struct clist *cl_sendlist;
582 	struct clist *cl_recvlist;
583 	struct clist *cl;
584 	struct clist *cl_rpcmsg;
585 	struct clist *cl_rdma_reply;
586 	struct clist *cl_rpcreply_wlist;
587 	struct clist *cl_long_reply;
588 	rdma_buf_t  rndup;
589 
590 	uint_t vers;
591 	uint_t op;
592 	uint_t off;
593 	uint32_t seg_array_len;
594 	uint_t long_reply_len;
595 	uint_t rpcsec_gss;
596 	uint_t gss_i_or_p;
597 
598 	CONN *conn = NULL;
599 	rdma_buf_t clmsg;
600 	rdma_buf_t rpcmsg;
601 	rdma_chunkinfo_lengths_t rcil;
602 
603 	clock_t	ticks;
604 	bool_t wlist_exists_reply;
605 
606 	uint32_t rdma_credit = rdma_bufs_rqst;
607 
608 	RCSTAT_INCR(rccalls);
609 
610 call_again:
611 
612 	bzero(&clmsg, sizeof (clmsg));
613 	bzero(&rpcmsg, sizeof (rpcmsg));
614 	bzero(&rndup, sizeof (rndup));
615 	try_call_again = 0;
616 	cl_sendlist = NULL;
617 	cl_recvlist = NULL;
618 	cl = NULL;
619 	cl_rpcmsg = NULL;
620 	cl_rdma_reply = NULL;
621 	call_xdrp = NULL;
622 	reply_xdrp = NULL;
623 	wlist_exists_reply  = FALSE;
624 	cl_rpcreply_wlist = NULL;
625 	cl_long_reply = NULL;
626 	rcil.rcil_len = 0;
627 	rcil.rcil_len_alt = 0;
628 	long_reply_len = 0;
629 
630 	rw_enter(&rdma_lock, RW_READER);
631 	m = (rdma_registry_t *)p->cku_rd_handle;
632 	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
633 		/*
634 		 * If we didn't find a matching RDMA module in the registry
635 		 * then there is no transport.
636 		 */
637 		rw_exit(&rdma_lock);
638 		p->cku_err.re_status = RPC_CANTSEND;
639 		p->cku_err.re_errno = EIO;
640 		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
641 		if (h->cl_nosignal == TRUE) {
642 			delay(ticks);
643 		} else {
644 			if (delay_sig(ticks) == EINTR) {
645 				p->cku_err.re_status = RPC_INTR;
646 				p->cku_err.re_errno = EINTR;
647 			}
648 		}
649 		return (RPC_CANTSEND);
650 	}
651 	/*
652 	 * Get unique xid
653 	 */
654 	if (p->cku_xid == 0)
655 		p->cku_xid = alloc_xid();
656 
657 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
658 	    &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
659 	rw_exit(&rdma_lock);
660 
661 	/*
662 	 * If there is a problem with the connection reflect the issue
663 	 * back to the higher level to address, we MAY delay for a short
664 	 * period so that we are kind to the transport.
665 	 */
666 	if (conn == NULL) {
667 		/*
668 		 * Connect failed to server. Could be because of one
669 		 * of several things. In some cases we don't want
670 		 * the caller to retry immediately - delay before
671 		 * returning to caller.
672 		 */
673 		switch (status) {
674 		case RDMA_TIMEDOUT:
675 			/*
676 			 * Already timed out. No need to delay
677 			 * some more.
678 			 */
679 			p->cku_err.re_status = RPC_TIMEDOUT;
680 			p->cku_err.re_errno = ETIMEDOUT;
681 			break;
682 		case RDMA_INTR:
683 			/*
684 			 * Failed because of an signal. Very likely
685 			 * the caller will not retry.
686 			 */
687 			p->cku_err.re_status = RPC_INTR;
688 			p->cku_err.re_errno = EINTR;
689 			break;
690 		default:
691 			/*
692 			 * All other failures - server down or service
693 			 * down or temporary resource failure. Delay before
694 			 * returning to caller.
695 			 */
696 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
697 			p->cku_err.re_status = RPC_CANTCONNECT;
698 			p->cku_err.re_errno = EIO;
699 
700 			if (h->cl_nosignal == TRUE) {
701 				delay(ticks);
702 			} else {
703 				if (delay_sig(ticks) == EINTR) {
704 					p->cku_err.re_status = RPC_INTR;
705 					p->cku_err.re_errno = EINTR;
706 				}
707 			}
708 			break;
709 		}
710 
711 		return (p->cku_err.re_status);
712 	}
713 
714 	if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
715 		if ((p->cku_srcaddr.maxlen != 0) &&
716 		    (p->cku_srcaddr.buf != NULL))
717 			kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
718 		p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
719 		    KM_SLEEP);
720 		p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
721 	}
722 
723 	p->cku_srcaddr.len = conn->c_laddr.len;
724 	bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);
725 
726 	clnt_check_credit(conn);
727 
728 	status = CLNT_RDMA_FAIL;
729 
730 	rpcsec_gss = gss_i_or_p = FALSE;
731 
732 	if (IS_RPCSEC_GSS(h)) {
733 		rpcsec_gss = TRUE;
734 		if (rpc_gss_get_service_type(h->cl_auth) ==
735 		    rpc_gss_svc_integrity ||
736 		    rpc_gss_get_service_type(h->cl_auth) ==
737 		    rpc_gss_svc_privacy)
738 			gss_i_or_p = TRUE;
739 	}
740 
741 	/*
742 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
743 	 * or if RPCSEC_GSS is being used for authentication only.
744 	 */
745 	if (rpcsec_gss == FALSE ||
746 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
747 		/*
748 		 * Grab a send buffer for the request.  Try to
749 		 * encode it to see if it fits. If not, then it
750 		 * needs to be sent in a chunk.
751 		 */
752 		rpcmsg.type = SEND_BUFFER;
753 		if (rdma_buf_alloc(conn, &rpcmsg)) {
754 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
755 			goto done;
756 		}
757 
758 		/* First try to encode into regular send buffer */
759 		op = RDMA_MSG;
760 
761 		call_xdrp = &callxdr;
762 
763 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
764 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
765 
766 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
767 		    xdr_args, argsp);
768 
769 		if (status != CLNT_RDMA_SUCCESS) {
770 			/* Clean up from previous encode attempt */
771 			rdma_buf_free(conn, &rpcmsg);
772 			XDR_DESTROY(call_xdrp);
773 		} else {
774 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
775 		}
776 	}
777 
778 	/* If the encode didn't work, then try a NOMSG */
779 	if (status != CLNT_RDMA_SUCCESS) {
780 
781 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
782 		    xdr_sizeof(xdr_args, argsp);
783 
784 		msglen = calc_length(msglen);
785 
786 		/* pick up the lengths for the reply buffer needed */
787 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
788 		    &rcil.rcil_len, &rcil.rcil_len_alt);
789 
790 		/*
791 		 * Construct a clist to describe the CHUNK_BUFFER
792 		 * for the rpcmsg.
793 		 */
794 		cl_rpcmsg = clist_alloc();
795 		cl_rpcmsg->c_len = msglen;
796 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
797 		cl_rpcmsg->rb_longbuf.len = msglen;
798 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
799 			clist_free(cl_rpcmsg);
800 			goto done;
801 		}
802 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
803 
804 		op = RDMA_NOMSG;
805 		call_xdrp = &callxdr;
806 
807 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
808 		    cl_rpcmsg->rb_longbuf.len, 0,
809 		    cl_rpcmsg, XDR_ENCODE, conn);
810 
811 		status = clnt_compose_rpcmsg(h, procnum, &cl_rpcmsg->rb_longbuf,
812 		    call_xdrp, xdr_args, argsp);
813 
814 		DTRACE_PROBE2(krpc__i__clntrdma__callit__longbuf, int, status,
815 		    int, msglen);
816 		if (status != CLNT_RDMA_SUCCESS) {
817 			p->cku_err.re_status = RPC_CANTENCODEARGS;
818 			p->cku_err.re_errno = EIO;
819 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
820 			goto done;
821 		}
822 	}
823 
824 	/*
825 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
826 	 * RDMA WRITE clist.
827 	 *
828 	 * First pull the RDMA READ chunk list from the XDR private
829 	 * area to keep it handy.
830 	 */
831 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
832 
833 	if (gss_i_or_p) {
834 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
835 		long_reply_len += MAX_AUTH_BYTES;
836 	} else {
837 		long_reply_len = rcil.rcil_len;
838 	}
839 
840 	/*
841 	 * Update the chunk size information for the Long RPC msg.
842 	 */
843 	if (cl && op == RDMA_NOMSG)
844 		cl->c_len = p->cku_outsz;
845 
846 	/*
847 	 * Prepare the RDMA header. On success xdrs will hold the result
848 	 * of xdrmem_create() for a SEND_BUFFER.
849 	 */
850 	status = clnt_compose_rdma_header(conn, h, &clmsg,
851 	    &rdmahdr_o_xdrs, &op);
852 
853 	if (status != CLNT_RDMA_SUCCESS) {
854 		p->cku_err.re_status = RPC_CANTSEND;
855 		p->cku_err.re_errno = EIO;
856 		RCSTAT_INCR(rcnomem);
857 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
858 		goto done;
859 	}
860 
861 	/*
862 	 * Now insert the RDMA READ list iff present
863 	 */
864 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
865 	if (status != CLNT_RDMA_SUCCESS) {
866 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
867 		rdma_buf_free(conn, &clmsg);
868 		p->cku_err.re_status = RPC_CANTSEND;
869 		p->cku_err.re_errno = EIO;
870 		goto done;
871 	}
872 
873 	/*
874 	 * Setup RDMA WRITE chunk list for nfs read operation
875 	 * other operations will have a NULL which will result
876 	 * as a NULL list in the XDR stream.
877 	 */
878 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
879 	if (status != CLNT_RDMA_SUCCESS) {
880 		rdma_buf_free(conn, &clmsg);
881 		p->cku_err.re_status = RPC_CANTSEND;
882 		p->cku_err.re_errno = EIO;
883 		goto done;
884 	}
885 
886 	/*
887 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
888 	 * large responses can flow back to the client.
889 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
890 	 */
891 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
892 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
893 		long_reply_len += 1024;
894 
895 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
896 
897 	DTRACE_PROBE2(krpc__i__clntrdma__callit__longreply, int, status,
898 	    int, long_reply_len);
899 
900 	if (status != CLNT_RDMA_SUCCESS) {
901 		rdma_buf_free(conn, &clmsg);
902 		p->cku_err.re_status = RPC_CANTSEND;
903 		p->cku_err.re_errno = EIO;
904 		goto done;
905 	}
906 
907 	/*
908 	 * XDR encode the RDMA_REPLY write chunk
909 	 */
910 	seg_array_len = (cl_long_reply ? 1 : 0);
911 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
912 	    seg_array_len);
913 
914 	/*
915 	 * Construct a clist in "sendlist" that represents what we
916 	 * will push over the wire.
917 	 *
918 	 * Start with the RDMA header and clist (if any)
919 	 */
920 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
921 	    clmsg.addr, NULL, NULL);
922 
923 	/*
924 	 * Put the RPC call message in  sendlist if small RPC
925 	 */
926 	if (op == RDMA_MSG) {
927 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
928 		    rpcmsg.addr, NULL, NULL);
929 	} else {
930 		/* Long RPC already in chunk list */
931 		RCSTAT_INCR(rclongrpcs);
932 	}
933 
934 	/*
935 	 * Set up a reply buffer ready for the reply
936 	 */
937 	status = rdma_clnt_postrecv(conn, p->cku_xid);
938 	if (status != RDMA_SUCCESS) {
939 		rdma_buf_free(conn, &clmsg);
940 		p->cku_err.re_status = RPC_CANTSEND;
941 		p->cku_err.re_errno = EIO;
942 		goto done;
943 	}
944 
945 	/*
946 	 * sync the memory for dma
947 	 */
948 	if (cl != NULL) {
949 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
950 		if (status != RDMA_SUCCESS) {
951 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
952 			rdma_buf_free(conn, &clmsg);
953 			p->cku_err.re_status = RPC_CANTSEND;
954 			p->cku_err.re_errno = EIO;
955 			goto done;
956 		}
957 	}
958 
959 	/*
960 	 * Send the RDMA Header and RPC call message to the server
961 	 */
962 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
963 	if (status != RDMA_SUCCESS) {
964 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
965 		p->cku_err.re_status = RPC_CANTSEND;
966 		p->cku_err.re_errno = EIO;
967 		goto done;
968 	}
969 
970 	/*
971 	 * RDMA plugin now owns the send msg buffers.
972 	 * Clear them out and don't free them.
973 	 */
974 	clmsg.addr = NULL;
975 	if (rpcmsg.type == SEND_BUFFER)
976 		rpcmsg.addr = NULL;
977 
978 	/*
979 	 * Recv rpc reply
980 	 */
981 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
982 
983 	/*
984 	 * Now check recv status
985 	 */
986 	if (status != 0) {
987 		if (status == RDMA_INTR) {
988 			p->cku_err.re_status = RPC_INTR;
989 			p->cku_err.re_errno = EINTR;
990 			RCSTAT_INCR(rcintrs);
991 		} else if (status == RPC_TIMEDOUT) {
992 			p->cku_err.re_status = RPC_TIMEDOUT;
993 			p->cku_err.re_errno = ETIMEDOUT;
994 			RCSTAT_INCR(rctimeouts);
995 		} else {
996 			p->cku_err.re_status = RPC_CANTRECV;
997 			p->cku_err.re_errno = EIO;
998 		}
999 		goto done;
1000 	}
1001 
1002 	/*
1003 	 * Process the reply message.
1004 	 *
1005 	 * First the chunk list (if any)
1006 	 */
1007 	rdmahdr_i_xdrs = &(p->cku_inxdr);
1008 	xdrmem_create(rdmahdr_i_xdrs,
1009 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
1010 	    cl_recvlist->c_len, XDR_DECODE);
1011 
1012 	/*
1013 	 * Treat xid as opaque (xid is the first entity
1014 	 * in the rpc rdma message).
1015 	 * Skip xid and set the xdr position accordingly.
1016 	 */
1017 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
1018 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
1019 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
1020 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
1021 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
1022 
1023 	clnt_update_credit(conn, rdma_credit);
1024 
1025 	wlist_exists_reply = FALSE;
1026 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
1027 	    &wlist_exists_reply)) {
1028 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
1029 		p->cku_err.re_status = RPC_CANTDECODERES;
1030 		p->cku_err.re_errno = EIO;
1031 		goto done;
1032 	}
1033 
1034 	/*
1035 	 * The server shouldn't have sent a RDMA_SEND that
1036 	 * the client needs to RDMA_WRITE a reply back to
1037 	 * the server.  So silently ignoring what the
1038 	 * server returns in the rdma_reply section of the
1039 	 * header.
1040 	 */
1041 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
1042 	off = xdr_getpos(rdmahdr_i_xdrs);
1043 
1044 	clnt_decode_long_reply(conn, cl_long_reply,
1045 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
1046 	    cl, cl_recvlist, op, off);
1047 
1048 	if (reply_xdrp == NULL)
1049 		goto done;
1050 
1051 	if (wlist_exists_reply) {
1052 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
1053 	}
1054 
1055 	reply_msg.rm_direction = REPLY;
1056 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
1057 	reply_msg.acpted_rply.ar_stat = SUCCESS;
1058 	reply_msg.acpted_rply.ar_verf = _null_auth;
1059 
1060 	/*
1061 	 *  xdr_results will be done in AUTH_UNWRAP.
1062 	 */
1063 	reply_msg.acpted_rply.ar_results.where = NULL;
1064 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
1065 
1066 	/*
1067 	 * Decode and validate the response.
1068 	 */
1069 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
1070 		enum clnt_stat re_status;
1071 
1072 		_seterr_reply(&reply_msg, &(p->cku_err));
1073 
1074 		re_status = p->cku_err.re_status;
1075 		if (re_status == RPC_SUCCESS) {
1076 			/*
1077 			 * Reply is good, check auth.
1078 			 */
1079 			if (!AUTH_VALIDATE(h->cl_auth,
1080 			    &reply_msg.acpted_rply.ar_verf)) {
1081 				p->cku_err.re_status = RPC_AUTHERROR;
1082 				p->cku_err.re_why = AUTH_INVALIDRESP;
1083 				RCSTAT_INCR(rcbadverfs);
1084 				DTRACE_PROBE(
1085 				    krpc__e__clntrdma__callit__authvalidate);
1086 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
1087 			    xdr_results, resultsp)) {
1088 				p->cku_err.re_status = RPC_CANTDECODERES;
1089 				p->cku_err.re_errno = EIO;
1090 				DTRACE_PROBE(
1091 				    krpc__e__clntrdma__callit__authunwrap);
1092 			}
1093 		} else {
1094 			/* set errno in case we can't recover */
1095 			if (re_status != RPC_VERSMISMATCH &&
1096 			    re_status != RPC_AUTHERROR &&
1097 			    re_status != RPC_PROGVERSMISMATCH)
1098 				p->cku_err.re_errno = EIO;
1099 
1100 			if (re_status == RPC_AUTHERROR) {
1101 				if ((refresh_attempt > 0) &&
1102 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
1103 				    p->cku_cred)) {
1104 					refresh_attempt--;
1105 					try_call_again = 1;
1106 					goto done;
1107 				}
1108 
1109 				try_call_again = 0;
1110 
1111 				/*
1112 				 * We have used the client handle to
1113 				 * do an AUTH_REFRESH and the RPC status may
1114 				 * be set to RPC_SUCCESS; Let's make sure to
1115 				 * set it to RPC_AUTHERROR.
1116 				 */
1117 				p->cku_err.re_status = RPC_AUTHERROR;
1118 
1119 				/*
1120 				 * Map recoverable and unrecoverable
1121 				 * authentication errors to appropriate
1122 				 * errno
1123 				 */
1124 				switch (p->cku_err.re_why) {
1125 				case AUTH_BADCRED:
1126 				case AUTH_BADVERF:
1127 				case AUTH_INVALIDRESP:
1128 				case AUTH_TOOWEAK:
1129 				case AUTH_FAILED:
1130 				case RPCSEC_GSS_NOCRED:
1131 				case RPCSEC_GSS_FAILED:
1132 					p->cku_err.re_errno = EACCES;
1133 					break;
1134 				case AUTH_REJECTEDCRED:
1135 				case AUTH_REJECTEDVERF:
1136 				default:
1137 					p->cku_err.re_errno = EIO;
1138 					break;
1139 				}
1140 			}
1141 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1142 			    int, p->cku_err.re_why);
1143 		}
1144 	} else {
1145 		p->cku_err.re_status = RPC_CANTDECODERES;
1146 		p->cku_err.re_errno = EIO;
1147 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1148 	}
1149 
1150 done:
1151 	clnt_return_credit(conn);
1152 
1153 	if (cl_sendlist != NULL)
1154 		clist_free(cl_sendlist);
1155 
1156 	/*
1157 	 * If rpc reply is in a chunk, free it now.
1158 	 */
1159 	if (cl_long_reply) {
1160 		(void) clist_deregister(conn, cl_long_reply);
1161 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1162 		clist_free(cl_long_reply);
1163 	}
1164 
1165 	if (call_xdrp)
1166 		XDR_DESTROY(call_xdrp);
1167 
1168 	if (rndup.rb_private) {
1169 		rdma_buf_free(conn, &rndup);
1170 	}
1171 
1172 	if (reply_xdrp) {
1173 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1174 		XDR_DESTROY(reply_xdrp);
1175 	}
1176 
1177 	if (cl_rdma_reply) {
1178 		clist_free(cl_rdma_reply);
1179 	}
1180 
1181 	if (cl_recvlist) {
1182 		rdma_buf_t	recvmsg = {0};
1183 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1184 		recvmsg.type = RECV_BUFFER;
1185 		RDMA_BUF_FREE(conn, &recvmsg);
1186 		clist_free(cl_recvlist);
1187 	}
1188 
1189 	RDMA_REL_CONN(conn);
1190 
1191 	if (try_call_again)
1192 		goto call_again;
1193 
1194 	if (p->cku_err.re_status != RPC_SUCCESS) {
1195 		RCSTAT_INCR(rcbadcalls);
1196 	}
1197 	return (p->cku_err.re_status);
1198 }
1199 
1200 
1201 static void
1202 clnt_decode_long_reply(CONN *conn,
1203     struct clist *cl_long_reply,
1204     struct clist *cl_rdma_reply, XDR *xdrs,
1205     XDR **rxdrp, struct clist *cl,
1206     struct clist *cl_recvlist,
1207     uint_t  op, uint_t off)
1208 {
1209 	if (op != RDMA_NOMSG) {
1210 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1211 		    int, cl_recvlist->c_len - off);
1212 		xdrrdma_create(xdrs,
1213 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1214 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1215 		*rxdrp = xdrs;
1216 		return;
1217 	}
1218 
1219 	/* op must be RDMA_NOMSG */
1220 	if (cl) {
1221 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1222 		return;
1223 	}
1224 
1225 	if (cl_long_reply->u.c_daddr) {
1226 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1227 		    int, cl_rdma_reply->c_len);
1228 
1229 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1230 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1231 
1232 		*rxdrp = xdrs;
1233 	}
1234 }
1235 
1236 static void
1237 clnt_return_credit(CONN *conn)
1238 {
1239 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1240 
1241 	mutex_enter(&conn->c_lock);
1242 	cc_info->clnt_cc_in_flight_ops--;
1243 	cv_signal(&cc_info->clnt_cc_cv);
1244 	mutex_exit(&conn->c_lock);
1245 }
1246 
1247 static void
1248 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1249 {
1250 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1251 
1252 	/*
1253 	 * If the granted has not altered, avoid taking the
1254 	 * mutex, to essentially do nothing..
1255 	 */
1256 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1257 		return;
1258 	/*
1259 	 * Get the granted number of buffers for credit control.
1260 	 */
1261 	mutex_enter(&conn->c_lock);
1262 	cc_info->clnt_cc_granted_ops = rdma_credit;
1263 	mutex_exit(&conn->c_lock);
1264 }
1265 
1266 static void
1267 clnt_check_credit(CONN *conn)
1268 {
1269 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1270 
1271 	/*
1272 	 * Make sure we are not going over our allowed buffer use
1273 	 * (and make sure we have gotten a granted value before).
1274 	 */
1275 	mutex_enter(&conn->c_lock);
1276 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1277 	    cc_info->clnt_cc_granted_ops != 0) {
1278 		/*
1279 		 * Client has maxed out its granted buffers due to
1280 		 * credit control.  Current handling is to block and wait.
1281 		 */
1282 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1283 	}
1284 	cc_info->clnt_cc_in_flight_ops++;
1285 	mutex_exit(&conn->c_lock);
1286 }
1287 
1288 /* ARGSUSED */
1289 static void
1290 clnt_rdma_kabort(CLIENT *h)
1291 {
1292 }
1293 
1294 static void
1295 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1296 {
1297 	struct cku_private *p = htop(h);
1298 	*err = p->cku_err;
1299 }
1300 
1301 static bool_t
1302 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1303 {
1304 	struct cku_private *p = htop(h);
1305 	XDR *xdrs;
1306 
1307 	xdrs = &(p->cku_outxdr);
1308 	xdrs->x_op = XDR_FREE;
1309 	return ((*xdr_res)(xdrs, res_ptr));
1310 }
1311 
1312 /* ARGSUSED */
1313 static bool_t
1314 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1315 {
1316 	return (TRUE);
1317 }
1318 
1319 /* ARGSUSED */
1320 static int
1321 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1322 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1323 	uint32_t xid)
1324 {
1325 	RCSTAT_INCR(rctimers);
1326 	return (0);
1327 }
1328 
1329 int
1330 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1331 {
1332 	rdma_registry_t	*rp;
1333 	void *handle = NULL;
1334 	struct knetconfig *knc;
1335 	char *pf, *p;
1336 	rdma_stat status;
1337 	int error = 0;
1338 
1339 	if (!INGLOBALZONE(curproc))
1340 		return (-1);
1341 
1342 	/*
1343 	 * modload the RDMA plugins if not already done.
1344 	 */
1345 	if (!rdma_modloaded) {
1346 		mutex_enter(&rdma_modload_lock);
1347 		if (!rdma_modloaded) {
1348 			error = rdma_modload();
1349 		}
1350 		mutex_exit(&rdma_modload_lock);
1351 		if (error)
1352 			return (-1);
1353 	}
1354 
1355 	if (!rdma_dev_available)
1356 		return (-1);
1357 
1358 	rw_enter(&rdma_lock, RW_READER);
1359 	rp = rdma_mod_head;
1360 	while (rp != NULL) {
1361 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1362 			rp = rp->r_next;
1363 			continue;
1364 		}
1365 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1366 		    &handle);
1367 		if (status == RDMA_SUCCESS) {
1368 			knc = kmem_zalloc(sizeof (struct knetconfig),
1369 			    KM_SLEEP);
1370 			knc->knc_semantics = NC_TPI_RDMA;
1371 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1372 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1373 			if (addr_type == AF_INET)
1374 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1375 			else if (addr_type == AF_INET6)
1376 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1377 			pf[KNC_STRSIZE - 1] = '\0';
1378 
1379 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1380 			p[KNC_STRSIZE - 1] = '\0';
1381 
1382 			knc->knc_protofmly = pf;
1383 			knc->knc_proto = p;
1384 			knc->knc_rdev = (dev_t)rp;
1385 			*knconf = knc;
1386 			rw_exit(&rdma_lock);
1387 			return (0);
1388 		}
1389 		rp = rp->r_next;
1390 	}
1391 	rw_exit(&rdma_lock);
1392 	return (-1);
1393 }
1394