xref: /titanic_51/usr/src/uts/common/rpc/clnt_rdma.c (revision 597bd30ba830d1e11c8efdb9a1b9de28e0599f5e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
/*
 * Value encoded into the credit field of every outgoing RPC/RDMA header
 * (see clnt_compose_rdma_header()).
 */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

/* File-local helpers. */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);

/* Client operations; these are the entries of the rdma_clnt_ops vector. */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
86 
/*
 * Operations vector for RDMA based RPC.  It is installed as cl_ops on the
 * client handle by both clnt_rdma_kcreate() and clnt_rdma_kinit().
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
99 
/*
 * The size of the preserialized RPC header information.
 */
#define	CKU_HDRSIZE	20

/* Status codes returned by the static clnt_* helpers in this file. */
#define	CLNT_RDMA_SUCCESS 0
#define	CLNT_RDMA_FAIL (-1)

/* Number of AUTH_REFRESH retries clnt_rdma_kcallit() allows per call. */
#define	AUTH_REFRESH_COUNT 2

/*
 * True when the client handle's auth flavor is RPCSEC_GSS.  The argument
 * is parenthesized so that any pointer-valued expression may be passed,
 * not just a plain identifier.
 */
#define	IS_RPCSEC_GSS(authh)			\
	((authh)->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
111 
/*
 * Per RPC RDMA endpoint details.
 *
 * The CLIENT handle is embedded as the first member; ptoh() and htop()
 * convert between the handle and this private structure.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_srcaddr;	/* source address for retries */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* XDR position after the call was encoded by clnt_compose_rpcmsg() */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/*
	 * Pre-serialized call header.  The 4 bytes past CKU_HDRSIZE hold the
	 * procedure number on the RPCSEC_GSS path of clnt_compose_rpcmsg().
	 */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	/* 0 means no xid assigned yet; clnt_rdma_kcallit() then allocates one */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
130 
/*
 * clnt_rdma_kcallit() converts clnt_rdma_min_delay (in seconds) to ticks
 * and sleeps that long before returning when the RDMA module is inactive
 * or a connection attempt fails for a reason other than timeout or signal.
 */
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
133 
/*
 * Client-side RPC/RDMA statistics.  Every member is a 64-bit named kstat
 * counter; the code in this file updates them through RCSTAT_INCR().
 */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * Flat view of rdmarcstat as an array of kstat_named_t plus its element
 * count.  Both are non-static; presumably consumed by kstat setup code
 * outside this file -- TODO confirm.
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
162 
#ifdef DEBUG
/* Debug flag, off by default. */
int rdma_clnt_debug = 0;
#endif

/*
 * RCSTAT_INCR(x) bumps counter x in rdmarcstat.  When accurate_stats is
 * defined the increment is done while holding rdmarcstat_lock; otherwise
 * it is a plain, unlocked increment.
 *
 * NOTE(review): both forms expand to bare statements that already end in
 * ';' instead of a do { } while (0) block.  The locked form is therefore
 * not safe as the unbraced body of an if/else or loop.
 */
#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)			\
	mutex_enter(&rdmarcstat_lock);	\
	rdmarcstat.x.value.ui64++;	\
	mutex_exit(&rdmarcstat_lock);
#else
#define	RCSTAT_INCR(x)			\
	rdmarcstat.x.value.ui64++;
#endif

/* ptoh: private data -> embedded CLIENT handle. */
#define	ptoh(p)		(&((p)->cku_client))
/* htop: CLIENT handle -> private data, via the handle's cl_private. */
#define	htop(h)		((cku_private_t *)((h)->cl_private))
181 
182 uint_t
183 calc_length(uint_t len)
184 {
185 	len = RNDUP(len);
186 
187 	if (len <= 64 * 1024) {
188 		if (len > 32 * 1024) {
189 			len = 64 * 1024;
190 		} else {
191 			if (len > 16 * 1024) {
192 				len = 32 * 1024;
193 			} else {
194 				if (len > 8 * 1024) {
195 					len = 16 * 1024;
196 				} else {
197 					len = 8 * 1024;
198 				}
199 			}
200 		}
201 	}
202 	return (len);
203 }
204 int
205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
207 {
208 	CLIENT *h;
209 	struct cku_private *p;
210 	struct rpc_msg call_msg;
211 	rdma_registry_t *rp;
212 
213 	ASSERT(INGLOBALZONE(curproc));
214 
215 	if (cl == NULL)
216 		return (EINVAL);
217 	*cl = NULL;
218 
219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
220 
221 	/*
222 	 * Find underlying RDMATF plugin
223 	 */
224 	rw_enter(&rdma_lock, RW_READER);
225 	rp = rdma_mod_head;
226 	while (rp != NULL) {
227 		if (strcmp(rp->r_mod->rdma_api, proto))
228 			rp = rp->r_next;
229 		else {
230 			p->cku_rd_mod = rp->r_mod;
231 			p->cku_rd_handle = handle;
232 			break;
233 		}
234 	}
235 	rw_exit(&rdma_lock);
236 
237 	if (p->cku_rd_mod == NULL) {
238 		/*
239 		 * Should not happen.
240 		 * No matching RDMATF plugin.
241 		 */
242 		kmem_free(p, sizeof (struct cku_private));
243 		return (EINVAL);
244 	}
245 
246 	h = ptoh(p);
247 	h->cl_ops = &rdma_clnt_ops;
248 	h->cl_private = (caddr_t)p;
249 	h->cl_auth = authkern_create();
250 
251 	/* call message, just used to pre-serialize below */
252 	call_msg.rm_xid = 0;
253 	call_msg.rm_direction = CALL;
254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
255 	call_msg.rm_call.cb_prog = pgm;
256 	call_msg.rm_call.cb_vers = vers;
257 
258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
259 	/* pre-serialize call message header */
260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
261 		XDR_DESTROY(&p->cku_outxdr);
262 		auth_destroy(h->cl_auth);
263 		kmem_free(p, sizeof (struct cku_private));
264 		return (EINVAL);
265 	}
266 
267 	/*
268 	 * Set up the rpc information
269 	 */
270 	p->cku_cred = cred;
271 	p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
272 	p->cku_srcaddr.maxlen = raddr->maxlen;
273 	p->cku_srcaddr.len = 0;
274 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
275 	p->cku_addr.maxlen = raddr->maxlen;
276 	p->cku_addr.len = raddr->len;
277 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
278 	p->cku_addrfmly = family;
279 
280 	*cl = h;
281 	return (0);
282 }
283 
284 static void
285 clnt_rdma_kdestroy(CLIENT *h)
286 {
287 	struct cku_private *p = htop(h);
288 
289 	kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
290 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
291 	kmem_free(p, sizeof (*p));
292 }
293 
294 void
295 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
296     struct cred *cred)
297 {
298 	struct cku_private *p = htop(h);
299 	rdma_registry_t *rp;
300 
301 	ASSERT(INGLOBALZONE(curproc));
302 	/*
303 	 * Find underlying RDMATF plugin
304 	 */
305 	p->cku_rd_mod = NULL;
306 	rw_enter(&rdma_lock, RW_READER);
307 	rp = rdma_mod_head;
308 	while (rp != NULL) {
309 		if (strcmp(rp->r_mod->rdma_api, proto))
310 			rp = rp->r_next;
311 		else {
312 			p->cku_rd_mod = rp->r_mod;
313 			p->cku_rd_handle = handle;
314 			break;
315 		}
316 
317 	}
318 	rw_exit(&rdma_lock);
319 
320 	/*
321 	 * Set up the rpc information
322 	 */
323 	p->cku_cred = cred;
324 	p->cku_xid = 0;
325 
326 	if (p->cku_addr.maxlen < raddr->len) {
327 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
328 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
329 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
330 		p->cku_addr.maxlen = raddr->maxlen;
331 	}
332 
333 	p->cku_srcaddr.len = 0;
334 
335 	p->cku_addr.len = raddr->len;
336 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
337 	h->cl_ops = &rdma_clnt_ops;
338 }
339 
340 static int
341 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
342     rdma_buf_t *rpcmsg, XDR *xdrs,
343     xdrproc_t xdr_args, caddr_t argsp)
344 {
345 	cku_private_t *p = htop(h);
346 
347 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
348 		/*
349 		 * Copy in the preserialized RPC header
350 		 * information.
351 		 */
352 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
353 
354 		/*
355 		 * transaction id is the 1st thing in the output
356 		 * buffer.
357 		 */
358 		/* LINTED pointer alignment */
359 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
360 
361 		/* Skip the preserialized stuff. */
362 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
363 
364 		/* Serialize dynamic stuff into the output buffer. */
365 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
366 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
367 		    (!(*xdr_args)(xdrs, argsp))) {
368 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
369 			return (CLNT_RDMA_FAIL);
370 		}
371 		p->cku_outsz = XDR_GETPOS(xdrs);
372 	} else {
373 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
374 		IXDR_PUT_U_INT32(uproc, procnum);
375 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
376 		XDR_SETPOS(xdrs, 0);
377 
378 		/* Serialize the procedure number and the arguments. */
379 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
380 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
381 			if (rpcmsg->addr != xdrs->x_base) {
382 				rpcmsg->addr = xdrs->x_base;
383 				rpcmsg->len = xdr_getbufsize(xdrs);
384 			}
385 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
386 			return (CLNT_RDMA_FAIL);
387 		}
388 		/*
389 		 * If we had to allocate a new buffer while encoding
390 		 * then update the addr and len.
391 		 */
392 		if (rpcmsg->addr != xdrs->x_base) {
393 			rpcmsg->addr = xdrs->x_base;
394 			rpcmsg->len = xdr_getbufsize(xdrs);
395 		}
396 
397 		p->cku_outsz = XDR_GETPOS(xdrs);
398 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
399 	}
400 
401 	return (CLNT_RDMA_SUCCESS);
402 }
403 
404 static int
405 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
406     XDR **xdrs, uint_t *op)
407 {
408 	cku_private_t *p = htop(h);
409 	uint_t vers;
410 	uint32_t rdma_credit = rdma_bufs_rqst;
411 
412 	vers = RPCRDMA_VERS;
413 	clmsg->type = SEND_BUFFER;
414 
415 	if (rdma_buf_alloc(conn, clmsg)) {
416 		return (CLNT_RDMA_FAIL);
417 	}
418 
419 	*xdrs = &p->cku_outxdr;
420 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
421 
422 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
423 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
424 	(void) xdr_u_int(*xdrs, &vers);
425 	(void) xdr_u_int(*xdrs, &rdma_credit);
426 	(void) xdr_u_int(*xdrs, op);
427 
428 	return (CLNT_RDMA_SUCCESS);
429 }
430 
431 /*
432  * If xp_cl is NULL value, then the RPC payload will NOT carry
433  * an RDMA READ chunk list, in this case we insert FALSE into
434  * the XDR stream. Otherwise we use the clist and RDMA register
435  * the memory and encode the clist into the outbound XDR stream.
436  */
437 static int
438 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
439 {
440 	int status;
441 	struct clist *rclp;
442 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
443 
444 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
445 
446 	if (rclp != NULL) {
447 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
448 		if (status != RDMA_SUCCESS) {
449 			return (CLNT_RDMA_FAIL);
450 		}
451 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
452 	}
453 	(void) xdr_do_clist(xdrs, &rclp);
454 
455 	return (CLNT_RDMA_SUCCESS);
456 }
457 
458 /*
459  * If xp_wcl is NULL value, then the RPC payload will NOT carry
460  * an RDMA WRITE chunk list, in this case we insert FALSE into
461  * the XDR stream. Otherwise we use the clist and  RDMA register
462  * the memory and encode the clist into the outbound XDR stream.
463  */
464 static int
465 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
466 {
467 	int status;
468 	struct clist *wlist, *rndcl;
469 	int wlen, rndlen;
470 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
471 
472 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
473 
474 	if (wlist != NULL) {
475 		/*
476 		 * If we are sending a non 4-byte alligned length
477 		 * the server will roundup the length to 4-byte
478 		 * boundary. In such a case, a trailing chunk is
479 		 * added to take any spill over roundup bytes.
480 		 */
481 		wlen = clist_len(wlist);
482 		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
483 		if (rndlen) {
484 			rndcl = clist_alloc();
485 			/*
486 			 * calc_length() will allocate a PAGESIZE
487 			 * buffer below.
488 			 */
489 			rndcl->c_len = calc_length(rndlen);
490 			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
491 			rndcl->rb_longbuf.len = rndcl->c_len;
492 			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
493 				clist_free(rndcl);
494 				return (CLNT_RDMA_FAIL);
495 			}
496 
497 			/* Roundup buffer freed back in caller */
498 			*rndbuf = rndcl->rb_longbuf;
499 
500 			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
501 			rndcl->c_next = NULL;
502 			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
503 			wlist->c_next = rndcl;
504 		}
505 
506 		status = clist_register(conn, wlist, CLIST_REG_DST);
507 		if (status != RDMA_SUCCESS) {
508 			rdma_buf_free(conn, rndbuf);
509 			bzero(rndbuf, sizeof (rdma_buf_t));
510 			return (CLNT_RDMA_FAIL);
511 		}
512 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
513 	}
514 
515 	if (!xdr_encode_wlist(xdrs, wlist)) {
516 		if (rndlen) {
517 			rdma_buf_free(conn, rndbuf);
518 			bzero(rndbuf, sizeof (rdma_buf_t));
519 		}
520 		return (CLNT_RDMA_FAIL);
521 	}
522 
523 	return (CLNT_RDMA_SUCCESS);
524 }
525 
526 static int
527 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
528 {
529 	if (length == 0) {
530 		*clpp = NULL;
531 		return (CLNT_RDMA_SUCCESS);
532 	}
533 
534 	*clpp = clist_alloc();
535 
536 	(*clpp)->rb_longbuf.len = calc_length(length);
537 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
538 
539 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
540 		clist_free(*clpp);
541 		*clpp = NULL;
542 		return (CLNT_RDMA_FAIL);
543 	}
544 
545 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
546 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
547 	(*clpp)->c_next = NULL;
548 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
549 
550 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
551 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
552 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
553 		clist_free(*clpp);
554 		*clpp = NULL;
555 		return (CLNT_RDMA_FAIL);
556 	}
557 
558 	return (CLNT_RDMA_SUCCESS);
559 }
560 
561 /* ARGSUSED */
562 static enum clnt_stat
563 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
564     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
565     struct timeval wait)
566 {
567 	cku_private_t *p = htop(h);
568 
569 	int 	try_call_again;
570 	int	refresh_attempt = AUTH_REFRESH_COUNT;
571 	int 	status;
572 	int 	msglen;
573 
574 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
575 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
576 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
577 
578 	struct rpc_msg 	reply_msg;
579 	rdma_registry_t	*m;
580 
581 	struct clist *cl_sendlist;
582 	struct clist *cl_recvlist;
583 	struct clist *cl;
584 	struct clist *cl_rpcmsg;
585 	struct clist *cl_rdma_reply;
586 	struct clist *cl_rpcreply_wlist;
587 	struct clist *cl_long_reply;
588 	rdma_buf_t  rndup;
589 
590 	uint_t vers;
591 	uint_t op;
592 	uint_t off;
593 	uint32_t seg_array_len;
594 	uint_t long_reply_len;
595 	uint_t rpcsec_gss;
596 	uint_t gss_i_or_p;
597 
598 	CONN *conn = NULL;
599 	rdma_buf_t clmsg;
600 	rdma_buf_t rpcmsg;
601 	rdma_chunkinfo_lengths_t rcil;
602 
603 	clock_t	ticks;
604 	bool_t wlist_exists_reply;
605 
606 	uint32_t rdma_credit = rdma_bufs_rqst;
607 
608 	RCSTAT_INCR(rccalls);
609 
610 call_again:
611 
612 	bzero(&clmsg, sizeof (clmsg));
613 	bzero(&rpcmsg, sizeof (rpcmsg));
614 	bzero(&rndup, sizeof (rndup));
615 	try_call_again = 0;
616 	cl_sendlist = NULL;
617 	cl_recvlist = NULL;
618 	cl = NULL;
619 	cl_rpcmsg = NULL;
620 	cl_rdma_reply = NULL;
621 	call_xdrp = NULL;
622 	reply_xdrp = NULL;
623 	wlist_exists_reply  = FALSE;
624 	cl_rpcreply_wlist = NULL;
625 	cl_long_reply = NULL;
626 	rcil.rcil_len = 0;
627 	rcil.rcil_len_alt = 0;
628 	long_reply_len = 0;
629 
630 	rw_enter(&rdma_lock, RW_READER);
631 	m = (rdma_registry_t *)p->cku_rd_handle;
632 	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
633 		/*
634 		 * If we didn't find a matching RDMA module in the registry
635 		 * then there is no transport.
636 		 */
637 		rw_exit(&rdma_lock);
638 		p->cku_err.re_status = RPC_CANTSEND;
639 		p->cku_err.re_errno = EIO;
640 		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
641 		if (h->cl_nosignal == TRUE) {
642 			delay(ticks);
643 		} else {
644 			if (delay_sig(ticks) == EINTR) {
645 				p->cku_err.re_status = RPC_INTR;
646 				p->cku_err.re_errno = EINTR;
647 			}
648 		}
649 		return (RPC_CANTSEND);
650 	}
651 	/*
652 	 * Get unique xid
653 	 */
654 	if (p->cku_xid == 0)
655 		p->cku_xid = alloc_xid();
656 
657 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
658 	    &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
659 	rw_exit(&rdma_lock);
660 
661 	/*
662 	 * If there is a problem with the connection reflect the issue
663 	 * back to the higher level to address, we MAY delay for a short
664 	 * period so that we are kind to the transport.
665 	 */
666 	if (conn == NULL) {
667 		/*
668 		 * Connect failed to server. Could be because of one
669 		 * of several things. In some cases we don't want
670 		 * the caller to retry immediately - delay before
671 		 * returning to caller.
672 		 */
673 		switch (status) {
674 		case RDMA_TIMEDOUT:
675 			/*
676 			 * Already timed out. No need to delay
677 			 * some more.
678 			 */
679 			p->cku_err.re_status = RPC_TIMEDOUT;
680 			p->cku_err.re_errno = ETIMEDOUT;
681 			break;
682 		case RDMA_INTR:
683 			/*
684 			 * Failed because of an signal. Very likely
685 			 * the caller will not retry.
686 			 */
687 			p->cku_err.re_status = RPC_INTR;
688 			p->cku_err.re_errno = EINTR;
689 			break;
690 		default:
691 			/*
692 			 * All other failures - server down or service
693 			 * down or temporary resource failure. Delay before
694 			 * returning to caller.
695 			 */
696 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
697 			p->cku_err.re_status = RPC_CANTCONNECT;
698 			p->cku_err.re_errno = EIO;
699 
700 			if (h->cl_nosignal == TRUE) {
701 				delay(ticks);
702 			} else {
703 				if (delay_sig(ticks) == EINTR) {
704 					p->cku_err.re_status = RPC_INTR;
705 					p->cku_err.re_errno = EINTR;
706 				}
707 			}
708 			break;
709 		}
710 
711 		return (p->cku_err.re_status);
712 	}
713 
714 	if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
715 		if ((p->cku_srcaddr.maxlen != 0) &&
716 		    (p->cku_srcaddr.buf != NULL))
717 			kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
718 		p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
719 		    KM_SLEEP);
720 		p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
721 	}
722 
723 	p->cku_srcaddr.len = conn->c_laddr.len;
724 	bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);
725 
726 	clnt_check_credit(conn);
727 
728 	status = CLNT_RDMA_FAIL;
729 
730 	rpcsec_gss = gss_i_or_p = FALSE;
731 
732 	if (IS_RPCSEC_GSS(h)) {
733 		rpcsec_gss = TRUE;
734 		if (rpc_gss_get_service_type(h->cl_auth) ==
735 		    rpc_gss_svc_integrity ||
736 		    rpc_gss_get_service_type(h->cl_auth) ==
737 		    rpc_gss_svc_privacy)
738 			gss_i_or_p = TRUE;
739 	}
740 
741 	/*
742 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
743 	 * or if RPCSEC_GSS is being used for authentication only.
744 	 */
745 	if (rpcsec_gss == FALSE ||
746 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
747 		/*
748 		 * Grab a send buffer for the request.  Try to
749 		 * encode it to see if it fits. If not, then it
750 		 * needs to be sent in a chunk.
751 		 */
752 		rpcmsg.type = SEND_BUFFER;
753 		if (rdma_buf_alloc(conn, &rpcmsg)) {
754 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
755 			goto done;
756 		}
757 
758 		/* First try to encode into regular send buffer */
759 		op = RDMA_MSG;
760 
761 		call_xdrp = &callxdr;
762 
763 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
764 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
765 
766 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
767 		    xdr_args, argsp);
768 
769 		if (status != CLNT_RDMA_SUCCESS) {
770 			/* Clean up from previous encode attempt */
771 			rdma_buf_free(conn, &rpcmsg);
772 			XDR_DESTROY(call_xdrp);
773 		} else {
774 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
775 		}
776 	}
777 
778 	/* If the encode didn't work, then try a NOMSG */
779 	if (status != CLNT_RDMA_SUCCESS) {
780 
781 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
782 		    xdr_sizeof(xdr_args, argsp);
783 
784 		msglen = calc_length(msglen);
785 
786 		/* pick up the lengths for the reply buffer needed */
787 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
788 		    &rcil.rcil_len, &rcil.rcil_len_alt);
789 
790 		/*
791 		 * Construct a clist to describe the CHUNK_BUFFER
792 		 * for the rpcmsg.
793 		 */
794 		cl_rpcmsg = clist_alloc();
795 		cl_rpcmsg->c_len = msglen;
796 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
797 		cl_rpcmsg->rb_longbuf.len = msglen;
798 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
799 			clist_free(cl_rpcmsg);
800 			goto done;
801 		}
802 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
803 
804 		op = RDMA_NOMSG;
805 		call_xdrp = &callxdr;
806 
807 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
808 		    cl_rpcmsg->rb_longbuf.len, 0,
809 		    cl_rpcmsg, XDR_ENCODE, conn);
810 
811 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
812 		    xdr_args, argsp);
813 
814 		if (status != CLNT_RDMA_SUCCESS) {
815 			p->cku_err.re_status = RPC_CANTENCODEARGS;
816 			p->cku_err.re_errno = EIO;
817 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
818 			goto done;
819 		}
820 	}
821 
822 	/*
823 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
824 	 * RDMA WRITE clist.
825 	 *
826 	 * First pull the RDMA READ chunk list from the XDR private
827 	 * area to keep it handy.
828 	 */
829 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
830 
831 	if (gss_i_or_p) {
832 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
833 		long_reply_len += MAX_AUTH_BYTES;
834 	} else {
835 		long_reply_len = rcil.rcil_len;
836 	}
837 
838 	/*
839 	 * Update the chunk size information for the Long RPC msg.
840 	 */
841 	if (cl && op == RDMA_NOMSG)
842 		cl->c_len = p->cku_outsz;
843 
844 	/*
845 	 * Prepare the RDMA header. On success xdrs will hold the result
846 	 * of xdrmem_create() for a SEND_BUFFER.
847 	 */
848 	status = clnt_compose_rdma_header(conn, h, &clmsg,
849 	    &rdmahdr_o_xdrs, &op);
850 
851 	if (status != CLNT_RDMA_SUCCESS) {
852 		p->cku_err.re_status = RPC_CANTSEND;
853 		p->cku_err.re_errno = EIO;
854 		RCSTAT_INCR(rcnomem);
855 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
856 		goto done;
857 	}
858 
859 	/*
860 	 * Now insert the RDMA READ list iff present
861 	 */
862 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
863 	if (status != CLNT_RDMA_SUCCESS) {
864 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
865 		rdma_buf_free(conn, &clmsg);
866 		p->cku_err.re_status = RPC_CANTSEND;
867 		p->cku_err.re_errno = EIO;
868 		goto done;
869 	}
870 
871 	/*
872 	 * Setup RDMA WRITE chunk list for nfs read operation
873 	 * other operations will have a NULL which will result
874 	 * as a NULL list in the XDR stream.
875 	 */
876 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
877 	if (status != CLNT_RDMA_SUCCESS) {
878 		rdma_buf_free(conn, &clmsg);
879 		p->cku_err.re_status = RPC_CANTSEND;
880 		p->cku_err.re_errno = EIO;
881 		goto done;
882 	}
883 
884 	/*
885 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
886 	 * large responses can flow back to the client.
887 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
888 	 */
889 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
890 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
891 		long_reply_len += 1024;
892 
893 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
894 
895 	if (status != CLNT_RDMA_SUCCESS) {
896 		rdma_buf_free(conn, &clmsg);
897 		p->cku_err.re_status = RPC_CANTSEND;
898 		p->cku_err.re_errno = EIO;
899 		goto done;
900 	}
901 
902 	/*
903 	 * XDR encode the RDMA_REPLY write chunk
904 	 */
905 	seg_array_len = (cl_long_reply ? 1 : 0);
906 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
907 	    seg_array_len);
908 
909 	/*
910 	 * Construct a clist in "sendlist" that represents what we
911 	 * will push over the wire.
912 	 *
913 	 * Start with the RDMA header and clist (if any)
914 	 */
915 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
916 	    clmsg.addr, NULL, NULL);
917 
918 	/*
919 	 * Put the RPC call message in  sendlist if small RPC
920 	 */
921 	if (op == RDMA_MSG) {
922 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
923 		    rpcmsg.addr, NULL, NULL);
924 	} else {
925 		/* Long RPC already in chunk list */
926 		RCSTAT_INCR(rclongrpcs);
927 	}
928 
929 	/*
930 	 * Set up a reply buffer ready for the reply
931 	 */
932 	status = rdma_clnt_postrecv(conn, p->cku_xid);
933 	if (status != RDMA_SUCCESS) {
934 		rdma_buf_free(conn, &clmsg);
935 		p->cku_err.re_status = RPC_CANTSEND;
936 		p->cku_err.re_errno = EIO;
937 		goto done;
938 	}
939 
940 	/*
941 	 * sync the memory for dma
942 	 */
943 	if (cl != NULL) {
944 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
945 		if (status != RDMA_SUCCESS) {
946 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
947 			rdma_buf_free(conn, &clmsg);
948 			p->cku_err.re_status = RPC_CANTSEND;
949 			p->cku_err.re_errno = EIO;
950 			goto done;
951 		}
952 	}
953 
954 	/*
955 	 * Send the RDMA Header and RPC call message to the server
956 	 */
957 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
958 	if (status != RDMA_SUCCESS) {
959 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
960 		p->cku_err.re_status = RPC_CANTSEND;
961 		p->cku_err.re_errno = EIO;
962 		goto done;
963 	}
964 
965 	/*
966 	 * RDMA plugin now owns the send msg buffers.
967 	 * Clear them out and don't free them.
968 	 */
969 	clmsg.addr = NULL;
970 	if (rpcmsg.type == SEND_BUFFER)
971 		rpcmsg.addr = NULL;
972 
973 	/*
974 	 * Recv rpc reply
975 	 */
976 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
977 
978 	/*
979 	 * Now check recv status
980 	 */
981 	if (status != 0) {
982 		if (status == RDMA_INTR) {
983 			p->cku_err.re_status = RPC_INTR;
984 			p->cku_err.re_errno = EINTR;
985 			RCSTAT_INCR(rcintrs);
986 		} else if (status == RPC_TIMEDOUT) {
987 			p->cku_err.re_status = RPC_TIMEDOUT;
988 			p->cku_err.re_errno = ETIMEDOUT;
989 			RCSTAT_INCR(rctimeouts);
990 		} else {
991 			p->cku_err.re_status = RPC_CANTRECV;
992 			p->cku_err.re_errno = EIO;
993 		}
994 		goto done;
995 	}
996 
997 	/*
998 	 * Process the reply message.
999 	 *
1000 	 * First the chunk list (if any)
1001 	 */
1002 	rdmahdr_i_xdrs = &(p->cku_inxdr);
1003 	xdrmem_create(rdmahdr_i_xdrs,
1004 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
1005 	    cl_recvlist->c_len, XDR_DECODE);
1006 
1007 	/*
1008 	 * Treat xid as opaque (xid is the first entity
1009 	 * in the rpc rdma message).
1010 	 * Skip xid and set the xdr position accordingly.
1011 	 */
1012 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
1013 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
1014 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
1015 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
1016 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
1017 
1018 	clnt_update_credit(conn, rdma_credit);
1019 
1020 	wlist_exists_reply = FALSE;
1021 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
1022 	    &wlist_exists_reply)) {
1023 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
1024 		p->cku_err.re_status = RPC_CANTDECODERES;
1025 		p->cku_err.re_errno = EIO;
1026 		goto done;
1027 	}
1028 
1029 	/*
1030 	 * The server shouldn't have sent a RDMA_SEND that
1031 	 * the client needs to RDMA_WRITE a reply back to
1032 	 * the server.  So silently ignoring what the
1033 	 * server returns in the rdma_reply section of the
1034 	 * header.
1035 	 */
1036 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
1037 	off = xdr_getpos(rdmahdr_i_xdrs);
1038 
1039 	clnt_decode_long_reply(conn, cl_long_reply,
1040 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
1041 	    cl, cl_recvlist, op, off);
1042 
1043 	if (reply_xdrp == NULL)
1044 		goto done;
1045 
1046 	if (wlist_exists_reply) {
1047 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
1048 	}
1049 
1050 	reply_msg.rm_direction = REPLY;
1051 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
1052 	reply_msg.acpted_rply.ar_stat = SUCCESS;
1053 	reply_msg.acpted_rply.ar_verf = _null_auth;
1054 
1055 	/*
1056 	 *  xdr_results will be done in AUTH_UNWRAP.
1057 	 */
1058 	reply_msg.acpted_rply.ar_results.where = NULL;
1059 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
1060 
1061 	/*
1062 	 * Decode and validate the response.
1063 	 */
1064 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
1065 		enum clnt_stat re_status;
1066 
1067 		_seterr_reply(&reply_msg, &(p->cku_err));
1068 
1069 		re_status = p->cku_err.re_status;
1070 		if (re_status == RPC_SUCCESS) {
1071 			/*
1072 			 * Reply is good, check auth.
1073 			 */
1074 			if (!AUTH_VALIDATE(h->cl_auth,
1075 			    &reply_msg.acpted_rply.ar_verf)) {
1076 				p->cku_err.re_status = RPC_AUTHERROR;
1077 				p->cku_err.re_why = AUTH_INVALIDRESP;
1078 				RCSTAT_INCR(rcbadverfs);
1079 				DTRACE_PROBE(
1080 				    krpc__e__clntrdma__callit__authvalidate);
1081 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
1082 			    xdr_results, resultsp)) {
1083 				p->cku_err.re_status = RPC_CANTDECODERES;
1084 				p->cku_err.re_errno = EIO;
1085 				DTRACE_PROBE(
1086 				    krpc__e__clntrdma__callit__authunwrap);
1087 			}
1088 		} else {
1089 			/* set errno in case we can't recover */
1090 			if (re_status != RPC_VERSMISMATCH &&
1091 			    re_status != RPC_AUTHERROR &&
1092 			    re_status != RPC_PROGVERSMISMATCH)
1093 				p->cku_err.re_errno = EIO;
1094 
1095 			if (re_status == RPC_AUTHERROR) {
1096 				if ((refresh_attempt > 0) &&
1097 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
1098 				    p->cku_cred)) {
1099 					refresh_attempt--;
1100 					try_call_again = 1;
1101 					goto done;
1102 				}
1103 
1104 				try_call_again = 0;
1105 
1106 				/*
1107 				 * We have used the client handle to
1108 				 * do an AUTH_REFRESH and the RPC status may
1109 				 * be set to RPC_SUCCESS; Let's make sure to
1110 				 * set it to RPC_AUTHERROR.
1111 				 */
1112 				p->cku_err.re_status = RPC_AUTHERROR;
1113 
1114 				/*
1115 				 * Map recoverable and unrecoverable
1116 				 * authentication errors to appropriate
1117 				 * errno
1118 				 */
1119 				switch (p->cku_err.re_why) {
1120 				case AUTH_BADCRED:
1121 				case AUTH_BADVERF:
1122 				case AUTH_INVALIDRESP:
1123 				case AUTH_TOOWEAK:
1124 				case AUTH_FAILED:
1125 				case RPCSEC_GSS_NOCRED:
1126 				case RPCSEC_GSS_FAILED:
1127 					p->cku_err.re_errno = EACCES;
1128 					break;
1129 				case AUTH_REJECTEDCRED:
1130 				case AUTH_REJECTEDVERF:
1131 				default:
1132 					p->cku_err.re_errno = EIO;
1133 					break;
1134 				}
1135 			}
1136 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1137 			    int, p->cku_err.re_why);
1138 		}
1139 	} else {
1140 		p->cku_err.re_status = RPC_CANTDECODERES;
1141 		p->cku_err.re_errno = EIO;
1142 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1143 	}
1144 
1145 done:
1146 	clnt_return_credit(conn);
1147 
1148 	if (cl_sendlist != NULL)
1149 		clist_free(cl_sendlist);
1150 
1151 	/*
1152 	 * If rpc reply is in a chunk, free it now.
1153 	 */
1154 	if (cl_long_reply) {
1155 		(void) clist_deregister(conn, cl_long_reply);
1156 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1157 		clist_free(cl_long_reply);
1158 	}
1159 
1160 	if (call_xdrp)
1161 		XDR_DESTROY(call_xdrp);
1162 
1163 	if (rndup.rb_private) {
1164 		rdma_buf_free(conn, &rndup);
1165 	}
1166 
1167 	if (reply_xdrp) {
1168 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1169 		XDR_DESTROY(reply_xdrp);
1170 	}
1171 
1172 	if (cl_rdma_reply) {
1173 		clist_free(cl_rdma_reply);
1174 	}
1175 
1176 	if (cl_recvlist) {
1177 		rdma_buf_t	recvmsg = {0};
1178 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1179 		recvmsg.type = RECV_BUFFER;
1180 		RDMA_BUF_FREE(conn, &recvmsg);
1181 		clist_free(cl_recvlist);
1182 	}
1183 
1184 	RDMA_REL_CONN(conn);
1185 
1186 	if (try_call_again)
1187 		goto call_again;
1188 
1189 	if (p->cku_err.re_status != RPC_SUCCESS) {
1190 		RCSTAT_INCR(rcbadcalls);
1191 	}
1192 	return (p->cku_err.re_status);
1193 }
1194 
1195 
1196 static void
1197 clnt_decode_long_reply(CONN *conn,
1198     struct clist *cl_long_reply,
1199     struct clist *cl_rdma_reply, XDR *xdrs,
1200     XDR **rxdrp, struct clist *cl,
1201     struct clist *cl_recvlist,
1202     uint_t  op, uint_t off)
1203 {
1204 	if (op != RDMA_NOMSG) {
1205 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1206 		    int, cl_recvlist->c_len - off);
1207 		xdrrdma_create(xdrs,
1208 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1209 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1210 		*rxdrp = xdrs;
1211 		return;
1212 	}
1213 
1214 	/* op must be RDMA_NOMSG */
1215 	if (cl) {
1216 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1217 		return;
1218 	}
1219 
1220 	if (cl_long_reply->u.c_daddr) {
1221 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1222 		    int, cl_rdma_reply->c_len);
1223 
1224 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1225 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1226 
1227 		*rxdrp = xdrs;
1228 	}
1229 }
1230 
1231 static void
1232 clnt_return_credit(CONN *conn)
1233 {
1234 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1235 
1236 	mutex_enter(&conn->c_lock);
1237 	cc_info->clnt_cc_in_flight_ops--;
1238 	cv_signal(&cc_info->clnt_cc_cv);
1239 	mutex_exit(&conn->c_lock);
1240 }
1241 
1242 static void
1243 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1244 {
1245 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1246 
1247 	/*
1248 	 * If the granted has not altered, avoid taking the
1249 	 * mutex, to essentially do nothing..
1250 	 */
1251 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1252 		return;
1253 	/*
1254 	 * Get the granted number of buffers for credit control.
1255 	 */
1256 	mutex_enter(&conn->c_lock);
1257 	cc_info->clnt_cc_granted_ops = rdma_credit;
1258 	mutex_exit(&conn->c_lock);
1259 }
1260 
1261 static void
1262 clnt_check_credit(CONN *conn)
1263 {
1264 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1265 
1266 	/*
1267 	 * Make sure we are not going over our allowed buffer use
1268 	 * (and make sure we have gotten a granted value before).
1269 	 */
1270 	mutex_enter(&conn->c_lock);
1271 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1272 	    cc_info->clnt_cc_granted_ops != 0) {
1273 		/*
1274 		 * Client has maxed out its granted buffers due to
1275 		 * credit control.  Current handling is to block and wait.
1276 		 */
1277 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1278 	}
1279 	cc_info->clnt_cc_in_flight_ops++;
1280 	mutex_exit(&conn->c_lock);
1281 }
1282 
1283 /* ARGSUSED */
1284 static void
1285 clnt_rdma_kabort(CLIENT *h)
1286 {
1287 }
1288 
1289 static void
1290 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1291 {
1292 	struct cku_private *p = htop(h);
1293 	*err = p->cku_err;
1294 }
1295 
1296 static bool_t
1297 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1298 {
1299 	struct cku_private *p = htop(h);
1300 	XDR *xdrs;
1301 
1302 	xdrs = &(p->cku_outxdr);
1303 	xdrs->x_op = XDR_FREE;
1304 	return ((*xdr_res)(xdrs, res_ptr));
1305 }
1306 
1307 /* ARGSUSED */
1308 static bool_t
1309 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1310 {
1311 	return (TRUE);
1312 }
1313 
1314 /* ARGSUSED */
1315 static int
1316 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1317 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1318 	uint32_t xid)
1319 {
1320 	RCSTAT_INCR(rctimers);
1321 	return (0);
1322 }
1323 
1324 int
1325 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1326 {
1327 	rdma_registry_t	*rp;
1328 	void *handle = NULL;
1329 	struct knetconfig *knc;
1330 	char *pf, *p;
1331 	rdma_stat status;
1332 	int error = 0;
1333 
1334 	if (!INGLOBALZONE(curproc))
1335 		return (-1);
1336 
1337 	/*
1338 	 * modload the RDMA plugins if not already done.
1339 	 */
1340 	if (!rdma_modloaded) {
1341 		mutex_enter(&rdma_modload_lock);
1342 		if (!rdma_modloaded) {
1343 			error = rdma_modload();
1344 		}
1345 		mutex_exit(&rdma_modload_lock);
1346 		if (error)
1347 			return (-1);
1348 	}
1349 
1350 	if (!rdma_dev_available)
1351 		return (-1);
1352 
1353 	rw_enter(&rdma_lock, RW_READER);
1354 	rp = rdma_mod_head;
1355 	while (rp != NULL) {
1356 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1357 			rp = rp->r_next;
1358 			continue;
1359 		}
1360 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1361 		    &handle);
1362 		if (status == RDMA_SUCCESS) {
1363 			knc = kmem_zalloc(sizeof (struct knetconfig),
1364 			    KM_SLEEP);
1365 			knc->knc_semantics = NC_TPI_RDMA;
1366 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1367 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1368 			if (addr_type == AF_INET)
1369 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1370 			else if (addr_type == AF_INET6)
1371 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1372 			pf[KNC_STRSIZE - 1] = '\0';
1373 
1374 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1375 			p[KNC_STRSIZE - 1] = '\0';
1376 
1377 			knc->knc_protofmly = pf;
1378 			knc->knc_proto = p;
1379 			knc->knc_rdev = (dev_t)rp;
1380 			*knconf = knc;
1381 			rw_exit(&rdma_lock);
1382 			return (0);
1383 		}
1384 		rp = rp->r_next;
1385 	}
1386 	rw_exit(&rdma_lock);
1387 	return (-1);
1388 }
1389