xref: /titanic_50/usr/src/uts/common/rpc/clnt_rdma.c (revision cd997836b08639dc4d44a032cadd0c7d526f960c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
/*
 * Credit value advertised to the server.  clnt_compose_rdma_header()
 * encodes it as the credit word of every outbound RPC/RDMA header, and
 * clnt_rdma_kcallit() uses it as the initial value of its local credit
 * variable before decoding the server's reply header.
 */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;
61 
/*
 * Helpers used by clnt_rdma_kcallit() to build the outbound call: the
 * RPC message itself, the RPC/RDMA transport header, and the read,
 * write and long-reply chunk lists that go into that header.
 */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
/* Credit accounting and long-reply decoding; defined later in the file. */
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);

/*
 * Client handle operations; these are the entries of rdma_clnt_ops.
 */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
86 
/*
 * Operations vector for RDMA based RPC.
 *
 * Installed as cl_ops on every handle set up by clnt_rdma_kcreate() and
 * clnt_rdma_kinit().  The initializer is positional, so the entries
 * must stay in the member order of struct clnt_ops (declared outside
 * this file -- verify against its definition before reordering).
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
99 
/*
 * The size of the preserialized RPC header information.
 */
#define	CKU_HDRSIZE	20

/* Status codes returned by the file-local clnt_* helper routines. */
#define	CLNT_RDMA_SUCCESS 0
#define	CLNT_RDMA_FAIL (-1)

/* Initial refresh_attempt budget used by clnt_rdma_kcallit(). */
#define	AUTH_REFRESH_COUNT 2

/*
 * Evaluates to non-zero when the auth flavor of client handle authh is
 * RPCSEC_GSS.  The argument is parenthesized so that any pointer-valued
 * expression (not just a plain identifier) can be passed.
 */
#define	IS_RPCSEC_GSS(authh)			\
	((authh)->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
111 
/*
 * Per RPC RDMA endpoint details.
 *
 * The CLIENT handed back to callers is the cku_client member embedded
 * here; clnt_rdma_kcreate() points its cl_private back at the enclosing
 * structure, and the ptoh()/htop() macros convert between the two.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_srcaddr;	/* source address for retries */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* encoded size of the call, set by clnt_compose_rpcmsg() */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/*
	 * Preserialized call header (CKU_HDRSIZE bytes).  The 4 extra
	 * bytes hold the procedure number that clnt_compose_rpcmsg()
	 * appends on the RPCSEC_GSS path before calling AUTH_WRAP.
	 */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
130 
/*
 * clnt_rdma_kcallit() sleeps for clnt_rdma_min_delay seconds (converted
 * to ticks with drv_usectohz) before returning when the RDMA module is
 * inactive or the connection attempt fails for a reason other than a
 * timeout or a signal.
 */
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
133 
/*
 * Client-side RPC/RDMA counters.  The structure consists solely of
 * kstat_named_t members, so it is also exported as a flat array through
 * rdmarcstat_ptr with rdmarcstat_ndata entries.  Counters are bumped
 * with RCSTAT_INCR(); the per-member notes below describe only the
 * updates made by the code visible in clnt_rdma_kcallit().
 */
struct {
	kstat_named_t	rccalls;	/* every clnt_rdma_kcallit() entry */
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;	/* receive returned RPC_TIMEDOUT */
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;	/* AUTH_VALIDATE rejected a reply */
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;	/* no buffer for the RDMA header */
	kstat_named_t	rcintrs;	/* receive returned RDMA_INTR */
	kstat_named_t	rclongrpcs;	/* call sent as a long (chunked) RPC */
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/* Array view of rdmarcstat and the number of kstat_named_t entries in it. */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
162 
#ifdef DEBUG
/*
 * Only present in DEBUG builds.  Nothing in the visible part of this
 * file reads it -- NOTE(review): confirm where it is consumed.
 */
int rdma_clnt_debug = 0;
#endif
166 
/*
 * RCSTAT_INCR(x) increments the 64-bit value of rdmarcstat member x.
 *
 * When accurate_stats is defined the update is made while holding
 * rdmarcstat_lock; otherwise it is a plain, unlocked increment.
 *
 * Both variants are wrapped in do { } while (0) and carry no trailing
 * semicolon, so "RCSTAT_INCR(x);" expands to exactly one statement and
 * can safely be used as the unbraced body of an if/else.
 */
#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)				\
	do {					\
		mutex_enter(&rdmarcstat_lock);	\
		rdmarcstat.x.value.ui64++;	\
		mutex_exit(&rdmarcstat_lock);	\
	} while (0)
#else
#define	RCSTAT_INCR(x)				\
	do {					\
		rdmarcstat.x.value.ui64++;	\
	} while (0)
#endif
178 
/*
 * ptoh: private data -> address of the CLIENT embedded in it.
 * htop: CLIENT -> its private data, recovered from cl_private.
 */
#define	ptoh(p)		(&((p)->cku_client))
#define	htop(h)		((cku_private_t *)((h)->cl_private))
181 
182 uint_t
183 calc_length(uint_t len)
184 {
185 	len = RNDUP(len);
186 
187 	if (len <= 64 * 1024) {
188 		if (len > 32 * 1024) {
189 			len = 64 * 1024;
190 		} else {
191 			if (len > 16 * 1024) {
192 				len = 32 * 1024;
193 			} else {
194 				if (len > 8 * 1024) {
195 					len = 16 * 1024;
196 				} else {
197 					len = 8 * 1024;
198 				}
199 			}
200 		}
201 	}
202 	return (len);
203 }
204 int
205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
207 {
208 	CLIENT *h;
209 	struct cku_private *p;
210 	struct rpc_msg call_msg;
211 	rdma_registry_t *rp;
212 
213 	ASSERT(INGLOBALZONE(curproc));
214 
215 	if (cl == NULL)
216 		return (EINVAL);
217 	*cl = NULL;
218 
219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
220 
221 	/*
222 	 * Find underlying RDMATF plugin
223 	 */
224 	rw_enter(&rdma_lock, RW_READER);
225 	rp = rdma_mod_head;
226 	while (rp != NULL) {
227 		if (strcmp(rp->r_mod->rdma_api, proto))
228 			rp = rp->r_next;
229 		else {
230 			p->cku_rd_mod = rp->r_mod;
231 			p->cku_rd_handle = handle;
232 			break;
233 		}
234 	}
235 	rw_exit(&rdma_lock);
236 
237 	if (p->cku_rd_mod == NULL) {
238 		/*
239 		 * Should not happen.
240 		 * No matching RDMATF plugin.
241 		 */
242 		kmem_free(p, sizeof (struct cku_private));
243 		return (EINVAL);
244 	}
245 
246 	h = ptoh(p);
247 	h->cl_ops = &rdma_clnt_ops;
248 	h->cl_private = (caddr_t)p;
249 	h->cl_auth = authkern_create();
250 
251 	/* call message, just used to pre-serialize below */
252 	call_msg.rm_xid = 0;
253 	call_msg.rm_direction = CALL;
254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
255 	call_msg.rm_call.cb_prog = pgm;
256 	call_msg.rm_call.cb_vers = vers;
257 
258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
259 	/* pre-serialize call message header */
260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
261 		XDR_DESTROY(&p->cku_outxdr);
262 		auth_destroy(h->cl_auth);
263 		kmem_free(p, sizeof (struct cku_private));
264 		return (EINVAL);
265 	}
266 
267 	/*
268 	 * Set up the rpc information
269 	 */
270 	p->cku_cred = cred;
271 	p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
272 	p->cku_srcaddr.maxlen = raddr->maxlen;
273 	p->cku_srcaddr.len = 0;
274 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
275 	p->cku_addr.maxlen = raddr->maxlen;
276 	p->cku_addr.len = raddr->len;
277 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
278 	p->cku_addrfmly = family;
279 
280 	*cl = h;
281 	return (0);
282 }
283 
284 static void
285 clnt_rdma_kdestroy(CLIENT *h)
286 {
287 	struct cku_private *p = htop(h);
288 
289 	kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
290 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
291 	kmem_free(p, sizeof (*p));
292 }
293 
294 void
295 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
296     struct cred *cred)
297 {
298 	struct cku_private *p = htop(h);
299 	rdma_registry_t *rp;
300 
301 	ASSERT(INGLOBALZONE(curproc));
302 	/*
303 	 * Find underlying RDMATF plugin
304 	 */
305 	p->cku_rd_mod = NULL;
306 	rw_enter(&rdma_lock, RW_READER);
307 	rp = rdma_mod_head;
308 	while (rp != NULL) {
309 		if (strcmp(rp->r_mod->rdma_api, proto))
310 			rp = rp->r_next;
311 		else {
312 			p->cku_rd_mod = rp->r_mod;
313 			p->cku_rd_handle = handle;
314 			break;
315 		}
316 
317 	}
318 	rw_exit(&rdma_lock);
319 
320 	/*
321 	 * Set up the rpc information
322 	 */
323 	p->cku_cred = cred;
324 	p->cku_xid = 0;
325 
326 	if (p->cku_addr.maxlen < raddr->len) {
327 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
328 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
329 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
330 		p->cku_addr.maxlen = raddr->maxlen;
331 	}
332 
333 	p->cku_srcaddr.len = 0;
334 
335 	p->cku_addr.len = raddr->len;
336 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
337 	h->cl_ops = &rdma_clnt_ops;
338 }
339 
340 static int
341 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
342     rdma_buf_t *rpcmsg, XDR *xdrs,
343     xdrproc_t xdr_args, caddr_t argsp)
344 {
345 	cku_private_t *p = htop(h);
346 
347 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
348 		/*
349 		 * Copy in the preserialized RPC header
350 		 * information.
351 		 */
352 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
353 
354 		/*
355 		 * transaction id is the 1st thing in the output
356 		 * buffer.
357 		 */
358 		/* LINTED pointer alignment */
359 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
360 
361 		/* Skip the preserialized stuff. */
362 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
363 
364 		/* Serialize dynamic stuff into the output buffer. */
365 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
366 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
367 		    (!(*xdr_args)(xdrs, argsp))) {
368 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
369 			return (CLNT_RDMA_FAIL);
370 		}
371 		p->cku_outsz = XDR_GETPOS(xdrs);
372 	} else {
373 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
374 		IXDR_PUT_U_INT32(uproc, procnum);
375 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
376 		XDR_SETPOS(xdrs, 0);
377 
378 		/* Serialize the procedure number and the arguments. */
379 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
380 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
381 			if (rpcmsg->addr != xdrs->x_base) {
382 				rpcmsg->addr = xdrs->x_base;
383 				rpcmsg->len = xdr_getbufsize(xdrs);
384 			}
385 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
386 			return (CLNT_RDMA_FAIL);
387 		}
388 		/*
389 		 * If we had to allocate a new buffer while encoding
390 		 * then update the addr and len.
391 		 */
392 		if (rpcmsg->addr != xdrs->x_base) {
393 			rpcmsg->addr = xdrs->x_base;
394 			rpcmsg->len = xdr_getbufsize(xdrs);
395 		}
396 
397 		p->cku_outsz = XDR_GETPOS(xdrs);
398 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
399 	}
400 
401 	return (CLNT_RDMA_SUCCESS);
402 }
403 
404 static int
405 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
406     XDR **xdrs, uint_t *op)
407 {
408 	cku_private_t *p = htop(h);
409 	uint_t vers;
410 	uint32_t rdma_credit = rdma_bufs_rqst;
411 
412 	vers = RPCRDMA_VERS;
413 	clmsg->type = SEND_BUFFER;
414 
415 	if (rdma_buf_alloc(conn, clmsg)) {
416 		return (CLNT_RDMA_FAIL);
417 	}
418 
419 	*xdrs = &p->cku_outxdr;
420 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
421 
422 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
423 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
424 	(void) xdr_u_int(*xdrs, &vers);
425 	(void) xdr_u_int(*xdrs, &rdma_credit);
426 	(void) xdr_u_int(*xdrs, op);
427 
428 	return (CLNT_RDMA_SUCCESS);
429 }
430 
431 /*
432  * If xp_cl is NULL value, then the RPC payload will NOT carry
433  * an RDMA READ chunk list, in this case we insert FALSE into
434  * the XDR stream. Otherwise we use the clist and RDMA register
435  * the memory and encode the clist into the outbound XDR stream.
436  */
437 static int
438 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
439 {
440 	int status;
441 	struct clist *rclp;
442 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
443 
444 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
445 
446 	if (rclp != NULL) {
447 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
448 		if (status != RDMA_SUCCESS) {
449 			return (CLNT_RDMA_FAIL);
450 		}
451 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
452 	}
453 	(void) xdr_do_clist(xdrs, &rclp);
454 
455 	return (CLNT_RDMA_SUCCESS);
456 }
457 
458 /*
459  * If xp_wcl is NULL value, then the RPC payload will NOT carry
460  * an RDMA WRITE chunk list, in this case we insert FALSE into
461  * the XDR stream. Otherwise we use the clist and  RDMA register
462  * the memory and encode the clist into the outbound XDR stream.
463  */
464 static int
465 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
466 {
467 	int status;
468 	struct clist *wlist, *rndcl;
469 	int wlen, rndlen;
470 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
471 
472 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
473 
474 	if (wlist != NULL) {
475 		/*
476 		 * If we are sending a non 4-byte alligned length
477 		 * the server will roundup the length to 4-byte
478 		 * boundary. In such a case, a trailing chunk is
479 		 * added to take any spill over roundup bytes.
480 		 */
481 		wlen = clist_len(wlist);
482 		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
483 		if (rndlen) {
484 			rndcl = clist_alloc();
485 			/*
486 			 * calc_length() will allocate a PAGESIZE
487 			 * buffer below.
488 			 */
489 			rndcl->c_len = calc_length(rndlen);
490 			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
491 			rndcl->rb_longbuf.len = rndcl->c_len;
492 			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
493 				clist_free(rndcl);
494 				return (CLNT_RDMA_FAIL);
495 			}
496 
497 			/* Roundup buffer freed back in caller */
498 			*rndbuf = rndcl->rb_longbuf;
499 
500 			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
501 			rndcl->c_next = NULL;
502 			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
503 			wlist->c_next = rndcl;
504 		}
505 
506 		status = clist_register(conn, wlist, CLIST_REG_DST);
507 		if (status != RDMA_SUCCESS) {
508 			rdma_buf_free(conn, rndbuf);
509 			bzero(rndbuf, sizeof (rdma_buf_t));
510 			return (CLNT_RDMA_FAIL);
511 		}
512 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
513 	}
514 
515 	if (!xdr_encode_wlist(xdrs, wlist)) {
516 		if (rndlen) {
517 			rdma_buf_free(conn, rndbuf);
518 			bzero(rndbuf, sizeof (rdma_buf_t));
519 		}
520 		return (CLNT_RDMA_FAIL);
521 	}
522 
523 	return (CLNT_RDMA_SUCCESS);
524 }
525 
526 static int
527 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
528 {
529 	if (length == 0) {
530 		*clpp = NULL;
531 		return (CLNT_RDMA_SUCCESS);
532 	}
533 
534 	*clpp = clist_alloc();
535 
536 	(*clpp)->rb_longbuf.len = calc_length(length);
537 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
538 
539 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
540 		clist_free(*clpp);
541 		*clpp = NULL;
542 		return (CLNT_RDMA_FAIL);
543 	}
544 
545 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
546 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
547 	(*clpp)->c_next = NULL;
548 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
549 
550 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
551 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
552 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
553 		clist_free(*clpp);
554 		return (CLNT_RDMA_FAIL);
555 	}
556 
557 	return (CLNT_RDMA_SUCCESS);
558 }
559 
560 /* ARGSUSED */
561 static enum clnt_stat
562 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
563     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
564     struct timeval wait)
565 {
566 	cku_private_t *p = htop(h);
567 
568 	int 	try_call_again;
569 	int	refresh_attempt = AUTH_REFRESH_COUNT;
570 	int 	status;
571 	int 	msglen;
572 
573 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
574 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
575 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
576 
577 	struct rpc_msg 	reply_msg;
578 	rdma_registry_t	*m;
579 
580 	struct clist *cl_sendlist;
581 	struct clist *cl_recvlist;
582 	struct clist *cl;
583 	struct clist *cl_rpcmsg;
584 	struct clist *cl_rdma_reply;
585 	struct clist *cl_rpcreply_wlist;
586 	struct clist *cl_long_reply;
587 	rdma_buf_t  rndup;
588 
589 	uint_t vers;
590 	uint_t op;
591 	uint_t off;
592 	uint32_t seg_array_len;
593 	uint_t long_reply_len;
594 	uint_t rpcsec_gss;
595 	uint_t gss_i_or_p;
596 
597 	CONN *conn = NULL;
598 	rdma_buf_t clmsg;
599 	rdma_buf_t rpcmsg;
600 	rdma_chunkinfo_lengths_t rcil;
601 
602 	clock_t	ticks;
603 	bool_t wlist_exists_reply;
604 
605 	uint32_t rdma_credit = rdma_bufs_rqst;
606 
607 	RCSTAT_INCR(rccalls);
608 
609 call_again:
610 
611 	bzero(&clmsg, sizeof (clmsg));
612 	bzero(&rpcmsg, sizeof (rpcmsg));
613 	bzero(&rndup, sizeof (rndup));
614 	try_call_again = 0;
615 	cl_sendlist = NULL;
616 	cl_recvlist = NULL;
617 	cl = NULL;
618 	cl_rpcmsg = NULL;
619 	cl_rdma_reply = NULL;
620 	call_xdrp = NULL;
621 	reply_xdrp = NULL;
622 	wlist_exists_reply  = FALSE;
623 	cl_rpcreply_wlist = NULL;
624 	cl_long_reply = NULL;
625 	rcil.rcil_len = 0;
626 	rcil.rcil_len_alt = 0;
627 	long_reply_len = 0;
628 
629 	rw_enter(&rdma_lock, RW_READER);
630 	m = (rdma_registry_t *)p->cku_rd_handle;
631 	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
632 		/*
633 		 * If we didn't find a matching RDMA module in the registry
634 		 * then there is no transport.
635 		 */
636 		rw_exit(&rdma_lock);
637 		p->cku_err.re_status = RPC_CANTSEND;
638 		p->cku_err.re_errno = EIO;
639 		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
640 		if (h->cl_nosignal == TRUE) {
641 			delay(ticks);
642 		} else {
643 			if (delay_sig(ticks) == EINTR) {
644 				p->cku_err.re_status = RPC_INTR;
645 				p->cku_err.re_errno = EINTR;
646 			}
647 		}
648 		return (RPC_CANTSEND);
649 	}
650 	/*
651 	 * Get unique xid
652 	 */
653 	if (p->cku_xid == 0)
654 		p->cku_xid = alloc_xid();
655 
656 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
657 	    &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
658 	rw_exit(&rdma_lock);
659 
660 	/*
661 	 * If there is a problem with the connection reflect the issue
662 	 * back to the higher level to address, we MAY delay for a short
663 	 * period so that we are kind to the transport.
664 	 */
665 	if (conn == NULL) {
666 		/*
667 		 * Connect failed to server. Could be because of one
668 		 * of several things. In some cases we don't want
669 		 * the caller to retry immediately - delay before
670 		 * returning to caller.
671 		 */
672 		switch (status) {
673 		case RDMA_TIMEDOUT:
674 			/*
675 			 * Already timed out. No need to delay
676 			 * some more.
677 			 */
678 			p->cku_err.re_status = RPC_TIMEDOUT;
679 			p->cku_err.re_errno = ETIMEDOUT;
680 			break;
681 		case RDMA_INTR:
682 			/*
683 			 * Failed because of an signal. Very likely
684 			 * the caller will not retry.
685 			 */
686 			p->cku_err.re_status = RPC_INTR;
687 			p->cku_err.re_errno = EINTR;
688 			break;
689 		default:
690 			/*
691 			 * All other failures - server down or service
692 			 * down or temporary resource failure. Delay before
693 			 * returning to caller.
694 			 */
695 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
696 			p->cku_err.re_status = RPC_CANTCONNECT;
697 			p->cku_err.re_errno = EIO;
698 
699 			if (h->cl_nosignal == TRUE) {
700 				delay(ticks);
701 			} else {
702 				if (delay_sig(ticks) == EINTR) {
703 					p->cku_err.re_status = RPC_INTR;
704 					p->cku_err.re_errno = EINTR;
705 				}
706 			}
707 			break;
708 		}
709 
710 		return (p->cku_err.re_status);
711 	}
712 
713 	if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
714 		if ((p->cku_srcaddr.maxlen != 0) &&
715 		    (p->cku_srcaddr.buf != NULL))
716 			kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
717 		p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
718 		    KM_SLEEP);
719 		p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
720 	}
721 
722 	p->cku_srcaddr.len = conn->c_laddr.len;
723 	bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);
724 
725 	clnt_check_credit(conn);
726 
727 	status = CLNT_RDMA_FAIL;
728 
729 	rpcsec_gss = gss_i_or_p = FALSE;
730 
731 	if (IS_RPCSEC_GSS(h)) {
732 		rpcsec_gss = TRUE;
733 		if (rpc_gss_get_service_type(h->cl_auth) ==
734 		    rpc_gss_svc_integrity ||
735 		    rpc_gss_get_service_type(h->cl_auth) ==
736 		    rpc_gss_svc_privacy)
737 			gss_i_or_p = TRUE;
738 	}
739 
740 	/*
741 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
742 	 * or if RPCSEC_GSS is being used for authentication only.
743 	 */
744 	if (rpcsec_gss == FALSE ||
745 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
746 		/*
747 		 * Grab a send buffer for the request.  Try to
748 		 * encode it to see if it fits. If not, then it
749 		 * needs to be sent in a chunk.
750 		 */
751 		rpcmsg.type = SEND_BUFFER;
752 		if (rdma_buf_alloc(conn, &rpcmsg)) {
753 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
754 			goto done;
755 		}
756 
757 		/* First try to encode into regular send buffer */
758 		op = RDMA_MSG;
759 
760 		call_xdrp = &callxdr;
761 
762 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
763 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
764 
765 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
766 		    xdr_args, argsp);
767 
768 		if (status != CLNT_RDMA_SUCCESS) {
769 			/* Clean up from previous encode attempt */
770 			rdma_buf_free(conn, &rpcmsg);
771 			XDR_DESTROY(call_xdrp);
772 		} else {
773 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
774 		}
775 	}
776 
777 	/* If the encode didn't work, then try a NOMSG */
778 	if (status != CLNT_RDMA_SUCCESS) {
779 
780 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
781 		    xdr_sizeof(xdr_args, argsp);
782 
783 		msglen = calc_length(msglen);
784 
785 		/* pick up the lengths for the reply buffer needed */
786 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
787 		    &rcil.rcil_len, &rcil.rcil_len_alt);
788 
789 		/*
790 		 * Construct a clist to describe the CHUNK_BUFFER
791 		 * for the rpcmsg.
792 		 */
793 		cl_rpcmsg = clist_alloc();
794 		cl_rpcmsg->c_len = msglen;
795 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
796 		cl_rpcmsg->rb_longbuf.len = msglen;
797 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
798 			clist_free(cl_rpcmsg);
799 			goto done;
800 		}
801 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
802 
803 		op = RDMA_NOMSG;
804 		call_xdrp = &callxdr;
805 
806 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
807 		    cl_rpcmsg->rb_longbuf.len, 0,
808 		    cl_rpcmsg, XDR_ENCODE, conn);
809 
810 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
811 		    xdr_args, argsp);
812 
813 		if (status != CLNT_RDMA_SUCCESS) {
814 			p->cku_err.re_status = RPC_CANTENCODEARGS;
815 			p->cku_err.re_errno = EIO;
816 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
817 			goto done;
818 		}
819 	}
820 
821 	/*
822 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
823 	 * RDMA WRITE clist.
824 	 *
825 	 * First pull the RDMA READ chunk list from the XDR private
826 	 * area to keep it handy.
827 	 */
828 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
829 
830 	if (gss_i_or_p) {
831 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
832 		long_reply_len += MAX_AUTH_BYTES;
833 	} else {
834 		long_reply_len = rcil.rcil_len;
835 	}
836 
837 	/*
838 	 * Update the chunk size information for the Long RPC msg.
839 	 */
840 	if (cl && op == RDMA_NOMSG)
841 		cl->c_len = p->cku_outsz;
842 
843 	/*
844 	 * Prepare the RDMA header. On success xdrs will hold the result
845 	 * of xdrmem_create() for a SEND_BUFFER.
846 	 */
847 	status = clnt_compose_rdma_header(conn, h, &clmsg,
848 	    &rdmahdr_o_xdrs, &op);
849 
850 	if (status != CLNT_RDMA_SUCCESS) {
851 		p->cku_err.re_status = RPC_CANTSEND;
852 		p->cku_err.re_errno = EIO;
853 		RCSTAT_INCR(rcnomem);
854 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
855 		goto done;
856 	}
857 
858 	/*
859 	 * Now insert the RDMA READ list iff present
860 	 */
861 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
862 	if (status != CLNT_RDMA_SUCCESS) {
863 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
864 		rdma_buf_free(conn, &clmsg);
865 		p->cku_err.re_status = RPC_CANTSEND;
866 		p->cku_err.re_errno = EIO;
867 		goto done;
868 	}
869 
870 	/*
871 	 * Setup RDMA WRITE chunk list for nfs read operation
872 	 * other operations will have a NULL which will result
873 	 * as a NULL list in the XDR stream.
874 	 */
875 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
876 	if (status != CLNT_RDMA_SUCCESS) {
877 		rdma_buf_free(conn, &clmsg);
878 		p->cku_err.re_status = RPC_CANTSEND;
879 		p->cku_err.re_errno = EIO;
880 		goto done;
881 	}
882 
883 	/*
884 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
885 	 * large responses can flow back to the client.
886 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
887 	 */
888 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
889 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
890 		long_reply_len += 1024;
891 
892 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
893 
894 	if (status != CLNT_RDMA_SUCCESS) {
895 		rdma_buf_free(conn, &clmsg);
896 		p->cku_err.re_status = RPC_CANTSEND;
897 		p->cku_err.re_errno = EIO;
898 		goto done;
899 	}
900 
901 	/*
902 	 * XDR encode the RDMA_REPLY write chunk
903 	 */
904 	seg_array_len = (cl_long_reply ? 1 : 0);
905 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
906 	    seg_array_len);
907 
908 	/*
909 	 * Construct a clist in "sendlist" that represents what we
910 	 * will push over the wire.
911 	 *
912 	 * Start with the RDMA header and clist (if any)
913 	 */
914 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
915 	    clmsg.addr, NULL, NULL);
916 
917 	/*
918 	 * Put the RPC call message in  sendlist if small RPC
919 	 */
920 	if (op == RDMA_MSG) {
921 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
922 		    rpcmsg.addr, NULL, NULL);
923 	} else {
924 		/* Long RPC already in chunk list */
925 		RCSTAT_INCR(rclongrpcs);
926 	}
927 
928 	/*
929 	 * Set up a reply buffer ready for the reply
930 	 */
931 	status = rdma_clnt_postrecv(conn, p->cku_xid);
932 	if (status != RDMA_SUCCESS) {
933 		rdma_buf_free(conn, &clmsg);
934 		p->cku_err.re_status = RPC_CANTSEND;
935 		p->cku_err.re_errno = EIO;
936 		goto done;
937 	}
938 
939 	/*
940 	 * sync the memory for dma
941 	 */
942 	if (cl != NULL) {
943 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
944 		if (status != RDMA_SUCCESS) {
945 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
946 			rdma_buf_free(conn, &clmsg);
947 			p->cku_err.re_status = RPC_CANTSEND;
948 			p->cku_err.re_errno = EIO;
949 			goto done;
950 		}
951 	}
952 
953 	/*
954 	 * Send the RDMA Header and RPC call message to the server
955 	 */
956 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
957 	if (status != RDMA_SUCCESS) {
958 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
959 		p->cku_err.re_status = RPC_CANTSEND;
960 		p->cku_err.re_errno = EIO;
961 		goto done;
962 	}
963 
964 	/*
965 	 * RDMA plugin now owns the send msg buffers.
966 	 * Clear them out and don't free them.
967 	 */
968 	clmsg.addr = NULL;
969 	if (rpcmsg.type == SEND_BUFFER)
970 		rpcmsg.addr = NULL;
971 
972 	/*
973 	 * Recv rpc reply
974 	 */
975 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
976 
977 	/*
978 	 * Now check recv status
979 	 */
980 	if (status != 0) {
981 		if (status == RDMA_INTR) {
982 			p->cku_err.re_status = RPC_INTR;
983 			p->cku_err.re_errno = EINTR;
984 			RCSTAT_INCR(rcintrs);
985 		} else if (status == RPC_TIMEDOUT) {
986 			p->cku_err.re_status = RPC_TIMEDOUT;
987 			p->cku_err.re_errno = ETIMEDOUT;
988 			RCSTAT_INCR(rctimeouts);
989 		} else {
990 			p->cku_err.re_status = RPC_CANTRECV;
991 			p->cku_err.re_errno = EIO;
992 		}
993 		goto done;
994 	}
995 
996 	/*
997 	 * Process the reply message.
998 	 *
999 	 * First the chunk list (if any)
1000 	 */
1001 	rdmahdr_i_xdrs = &(p->cku_inxdr);
1002 	xdrmem_create(rdmahdr_i_xdrs,
1003 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
1004 	    cl_recvlist->c_len, XDR_DECODE);
1005 
1006 	/*
1007 	 * Treat xid as opaque (xid is the first entity
1008 	 * in the rpc rdma message).
1009 	 * Skip xid and set the xdr position accordingly.
1010 	 */
1011 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
1012 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
1013 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
1014 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
1015 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
1016 
1017 	clnt_update_credit(conn, rdma_credit);
1018 
1019 	wlist_exists_reply = FALSE;
1020 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
1021 	    &wlist_exists_reply)) {
1022 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
1023 		p->cku_err.re_status = RPC_CANTDECODERES;
1024 		p->cku_err.re_errno = EIO;
1025 		goto done;
1026 	}
1027 
1028 	/*
1029 	 * The server shouldn't have sent a RDMA_SEND that
1030 	 * the client needs to RDMA_WRITE a reply back to
1031 	 * the server.  So silently ignoring what the
1032 	 * server returns in the rdma_reply section of the
1033 	 * header.
1034 	 */
1035 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
1036 	off = xdr_getpos(rdmahdr_i_xdrs);
1037 
1038 	clnt_decode_long_reply(conn, cl_long_reply,
1039 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
1040 	    cl, cl_recvlist, op, off);
1041 
1042 	if (reply_xdrp == NULL)
1043 		goto done;
1044 
1045 	if (wlist_exists_reply) {
1046 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
1047 	}
1048 
1049 	reply_msg.rm_direction = REPLY;
1050 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
1051 	reply_msg.acpted_rply.ar_stat = SUCCESS;
1052 	reply_msg.acpted_rply.ar_verf = _null_auth;
1053 
1054 	/*
1055 	 *  xdr_results will be done in AUTH_UNWRAP.
1056 	 */
1057 	reply_msg.acpted_rply.ar_results.where = NULL;
1058 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
1059 
1060 	/*
1061 	 * Decode and validate the response.
1062 	 */
1063 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
1064 		enum clnt_stat re_status;
1065 
1066 		_seterr_reply(&reply_msg, &(p->cku_err));
1067 
1068 		re_status = p->cku_err.re_status;
1069 		if (re_status == RPC_SUCCESS) {
1070 			/*
1071 			 * Reply is good, check auth.
1072 			 */
1073 			if (!AUTH_VALIDATE(h->cl_auth,
1074 			    &reply_msg.acpted_rply.ar_verf)) {
1075 				p->cku_err.re_status = RPC_AUTHERROR;
1076 				p->cku_err.re_why = AUTH_INVALIDRESP;
1077 				RCSTAT_INCR(rcbadverfs);
1078 				DTRACE_PROBE(
1079 				    krpc__e__clntrdma__callit__authvalidate);
1080 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
1081 			    xdr_results, resultsp)) {
1082 				p->cku_err.re_status = RPC_CANTDECODERES;
1083 				p->cku_err.re_errno = EIO;
1084 				DTRACE_PROBE(
1085 				    krpc__e__clntrdma__callit__authunwrap);
1086 			}
1087 		} else {
1088 			/* set errno in case we can't recover */
1089 			if (re_status != RPC_VERSMISMATCH &&
1090 			    re_status != RPC_AUTHERROR &&
1091 			    re_status != RPC_PROGVERSMISMATCH)
1092 				p->cku_err.re_errno = EIO;
1093 
1094 			if (re_status == RPC_AUTHERROR) {
1095 				if ((refresh_attempt > 0) &&
1096 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
1097 				    p->cku_cred)) {
1098 					refresh_attempt--;
1099 					try_call_again = 1;
1100 					goto done;
1101 				}
1102 
1103 				try_call_again = 0;
1104 
1105 				/*
1106 				 * We have used the client handle to
1107 				 * do an AUTH_REFRESH and the RPC status may
1108 				 * be set to RPC_SUCCESS; Let's make sure to
1109 				 * set it to RPC_AUTHERROR.
1110 				 */
1111 				p->cku_err.re_status = RPC_AUTHERROR;
1112 
1113 				/*
1114 				 * Map recoverable and unrecoverable
1115 				 * authentication errors to appropriate
1116 				 * errno
1117 				 */
1118 				switch (p->cku_err.re_why) {
1119 				case AUTH_BADCRED:
1120 				case AUTH_BADVERF:
1121 				case AUTH_INVALIDRESP:
1122 				case AUTH_TOOWEAK:
1123 				case AUTH_FAILED:
1124 				case RPCSEC_GSS_NOCRED:
1125 				case RPCSEC_GSS_FAILED:
1126 					p->cku_err.re_errno = EACCES;
1127 					break;
1128 				case AUTH_REJECTEDCRED:
1129 				case AUTH_REJECTEDVERF:
1130 				default:
1131 					p->cku_err.re_errno = EIO;
1132 					break;
1133 				}
1134 			}
1135 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1136 			    int, p->cku_err.re_why);
1137 		}
1138 	} else {
1139 		p->cku_err.re_status = RPC_CANTDECODERES;
1140 		p->cku_err.re_errno = EIO;
1141 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1142 	}
1143 
1144 done:
1145 	clnt_return_credit(conn);
1146 
1147 	if (cl_sendlist != NULL)
1148 		clist_free(cl_sendlist);
1149 
1150 	/*
1151 	 * If rpc reply is in a chunk, free it now.
1152 	 */
1153 	if (cl_long_reply) {
1154 		(void) clist_deregister(conn, cl_long_reply);
1155 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1156 		clist_free(cl_long_reply);
1157 	}
1158 
1159 	if (call_xdrp)
1160 		XDR_DESTROY(call_xdrp);
1161 
1162 	if (rndup.rb_private) {
1163 		rdma_buf_free(conn, &rndup);
1164 	}
1165 
1166 	if (reply_xdrp) {
1167 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1168 		XDR_DESTROY(reply_xdrp);
1169 	}
1170 
1171 	if (cl_rdma_reply) {
1172 		clist_free(cl_rdma_reply);
1173 	}
1174 
1175 	if (cl_recvlist) {
1176 		rdma_buf_t	recvmsg = {0};
1177 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1178 		recvmsg.type = RECV_BUFFER;
1179 		RDMA_BUF_FREE(conn, &recvmsg);
1180 		clist_free(cl_recvlist);
1181 	}
1182 
1183 	RDMA_REL_CONN(conn);
1184 
1185 	if (try_call_again)
1186 		goto call_again;
1187 
1188 	if (p->cku_err.re_status != RPC_SUCCESS) {
1189 		RCSTAT_INCR(rcbadcalls);
1190 	}
1191 	return (p->cku_err.re_status);
1192 }
1193 
1194 
1195 static void
1196 clnt_decode_long_reply(CONN *conn,
1197     struct clist *cl_long_reply,
1198     struct clist *cl_rdma_reply, XDR *xdrs,
1199     XDR **rxdrp, struct clist *cl,
1200     struct clist *cl_recvlist,
1201     uint_t  op, uint_t off)
1202 {
1203 	if (op != RDMA_NOMSG) {
1204 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1205 		    int, cl_recvlist->c_len - off);
1206 		xdrrdma_create(xdrs,
1207 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1208 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1209 		*rxdrp = xdrs;
1210 		return;
1211 	}
1212 
1213 	/* op must be RDMA_NOMSG */
1214 	if (cl) {
1215 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1216 		return;
1217 	}
1218 
1219 	if (cl_long_reply->u.c_daddr) {
1220 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1221 		    int, cl_rdma_reply->c_len);
1222 
1223 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1224 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1225 
1226 		*rxdrp = xdrs;
1227 	}
1228 }
1229 
1230 static void
1231 clnt_return_credit(CONN *conn)
1232 {
1233 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1234 
1235 	mutex_enter(&conn->c_lock);
1236 	cc_info->clnt_cc_in_flight_ops--;
1237 	cv_signal(&cc_info->clnt_cc_cv);
1238 	mutex_exit(&conn->c_lock);
1239 }
1240 
1241 static void
1242 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1243 {
1244 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1245 
1246 	/*
1247 	 * If the granted has not altered, avoid taking the
1248 	 * mutex, to essentially do nothing..
1249 	 */
1250 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1251 		return;
1252 	/*
1253 	 * Get the granted number of buffers for credit control.
1254 	 */
1255 	mutex_enter(&conn->c_lock);
1256 	cc_info->clnt_cc_granted_ops = rdma_credit;
1257 	mutex_exit(&conn->c_lock);
1258 }
1259 
1260 static void
1261 clnt_check_credit(CONN *conn)
1262 {
1263 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1264 
1265 	/*
1266 	 * Make sure we are not going over our allowed buffer use
1267 	 * (and make sure we have gotten a granted value before).
1268 	 */
1269 	mutex_enter(&conn->c_lock);
1270 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1271 	    cc_info->clnt_cc_granted_ops != 0) {
1272 		/*
1273 		 * Client has maxed out its granted buffers due to
1274 		 * credit control.  Current handling is to block and wait.
1275 		 */
1276 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1277 	}
1278 	cc_info->clnt_cc_in_flight_ops++;
1279 	mutex_exit(&conn->c_lock);
1280 }
1281 
1282 /* ARGSUSED */
1283 static void
1284 clnt_rdma_kabort(CLIENT *h)
1285 {
1286 }
1287 
1288 static void
1289 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1290 {
1291 	struct cku_private *p = htop(h);
1292 	*err = p->cku_err;
1293 }
1294 
1295 static bool_t
1296 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1297 {
1298 	struct cku_private *p = htop(h);
1299 	XDR *xdrs;
1300 
1301 	xdrs = &(p->cku_outxdr);
1302 	xdrs->x_op = XDR_FREE;
1303 	return ((*xdr_res)(xdrs, res_ptr));
1304 }
1305 
1306 /* ARGSUSED */
1307 static bool_t
1308 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1309 {
1310 	return (TRUE);
1311 }
1312 
1313 /* ARGSUSED */
1314 static int
1315 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1316 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1317 	uint32_t xid)
1318 {
1319 	RCSTAT_INCR(rctimers);
1320 	return (0);
1321 }
1322 
1323 int
1324 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1325 {
1326 	rdma_registry_t	*rp;
1327 	void *handle = NULL;
1328 	struct knetconfig *knc;
1329 	char *pf, *p;
1330 	rdma_stat status;
1331 	int error = 0;
1332 
1333 	if (!INGLOBALZONE(curproc))
1334 		return (-1);
1335 
1336 	/*
1337 	 * modload the RDMA plugins if not already done.
1338 	 */
1339 	if (!rdma_modloaded) {
1340 		mutex_enter(&rdma_modload_lock);
1341 		if (!rdma_modloaded) {
1342 			error = rdma_modload();
1343 		}
1344 		mutex_exit(&rdma_modload_lock);
1345 		if (error)
1346 			return (-1);
1347 	}
1348 
1349 	if (!rdma_dev_available)
1350 		return (-1);
1351 
1352 	rw_enter(&rdma_lock, RW_READER);
1353 	rp = rdma_mod_head;
1354 	while (rp != NULL) {
1355 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1356 			rp = rp->r_next;
1357 			continue;
1358 		}
1359 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1360 		    &handle);
1361 		if (status == RDMA_SUCCESS) {
1362 			knc = kmem_zalloc(sizeof (struct knetconfig),
1363 			    KM_SLEEP);
1364 			knc->knc_semantics = NC_TPI_RDMA;
1365 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1366 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1367 			if (addr_type == AF_INET)
1368 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1369 			else if (addr_type == AF_INET6)
1370 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1371 			pf[KNC_STRSIZE - 1] = '\0';
1372 
1373 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1374 			p[KNC_STRSIZE - 1] = '\0';
1375 
1376 			knc->knc_protofmly = pf;
1377 			knc->knc_proto = p;
1378 			knc->knc_rdev = (dev_t)rp;
1379 			*knconf = knc;
1380 			rw_exit(&rdma_lock);
1381 			return (0);
1382 		}
1383 		rp = rp->r_next;
1384 	}
1385 	rw_exit(&rdma_lock);
1386 	return (-1);
1387 }
1388