xref: /titanic_52/usr/src/uts/common/rpc/clnt_rdma.c (revision 9e39c5ba00a55fa05777cc94b148296af305e135)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
60 static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;
61 
62 static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
63 			    XDR *, xdrproc_t, caddr_t);
64 static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
65 		    XDR **, uint_t *);
66 static int clnt_setup_rlist(CONN *, XDR *, XDR *);
67 static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
68 static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
69 static void clnt_check_credit(CONN *);
70 static void clnt_return_credit(CONN *);
71 static void clnt_decode_long_reply(CONN *, struct clist *,
72 		struct clist *, XDR *, XDR **, struct clist *,
73 		struct clist *, uint_t, uint_t);
74 
75 static void clnt_update_credit(CONN *, uint32_t);
76 
77 static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
78     caddr_t, xdrproc_t, caddr_t, struct timeval);
79 static void	clnt_rdma_kabort(CLIENT *);
80 static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
81 static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
82 static void	clnt_rdma_kdestroy(CLIENT *);
83 static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
84 static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
85     struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
86 
87 /*
88  * Operations vector for RDMA based RPC
89  */
90 static struct clnt_ops rdma_clnt_ops = {
91 	clnt_rdma_kcallit,	/* do rpc call */
92 	clnt_rdma_kabort,	/* abort call */
93 	clnt_rdma_kerror,	/* return error status */
94 	clnt_rdma_kfreeres,	/* free results */
95 	clnt_rdma_kdestroy,	/* destroy rpc handle */
96 	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
97 	clnt_rdma_ksettimers,	/* set retry timers */
98 };
99 
100 /*
101  * The size of the preserialized RPC header information.
102  */
103 #define	CKU_HDRSIZE	20
104 #define	CLNT_RDMA_SUCCESS 0
105 #define	CLNT_RDMA_FAIL (-1)
106 
107 #define	AUTH_REFRESH_COUNT 2
108 
109 #define	IS_RPCSEC_GSS(authh)			\
110 	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
111 
112 /*
113  * Per RPC RDMA endpoint details
114  */
115 typedef struct cku_private {
116 	CLIENT			cku_client;	/* client handle */
117 	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
118 	void			*cku_rd_handle;	/* underlying RDMA device */
119 	struct netbuf		cku_addr;	/* remote netbuf address */
120 	int			cku_addrfmly;	/* for finding addr_type */
121 	struct rpc_err		cku_err;	/* error status */
122 	struct cred		*cku_cred;	/* credentials */
123 	XDR			cku_outxdr;	/* xdr stream for output */
124 	uint32_t		cku_outsz;
125 	XDR			cku_inxdr;	/* xdr stream for input */
126 	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
127 	uint32_t		cku_xid;	/* current XID */
128 } cku_private_t;
129 
130 #define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
131 static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
132 
133 struct {
134 	kstat_named_t	rccalls;
135 	kstat_named_t	rcbadcalls;
136 	kstat_named_t	rcbadxids;
137 	kstat_named_t	rctimeouts;
138 	kstat_named_t	rcnewcreds;
139 	kstat_named_t	rcbadverfs;
140 	kstat_named_t	rctimers;
141 	kstat_named_t	rccantconn;
142 	kstat_named_t	rcnomem;
143 	kstat_named_t	rcintrs;
144 	kstat_named_t	rclongrpcs;
145 } rdmarcstat = {
146 	{ "calls",	KSTAT_DATA_UINT64 },
147 	{ "badcalls",	KSTAT_DATA_UINT64 },
148 	{ "badxids",	KSTAT_DATA_UINT64 },
149 	{ "timeouts",	KSTAT_DATA_UINT64 },
150 	{ "newcreds",	KSTAT_DATA_UINT64 },
151 	{ "badverfs",	KSTAT_DATA_UINT64 },
152 	{ "timers",	KSTAT_DATA_UINT64 },
153 	{ "cantconn",	KSTAT_DATA_UINT64 },
154 	{ "nomem",	KSTAT_DATA_UINT64 },
155 	{ "interrupts", KSTAT_DATA_UINT64 },
156 	{ "longrpc", 	KSTAT_DATA_UINT64 }
157 };
158 
159 kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
160 uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
161 
162 #ifdef DEBUG
163 int rdma_clnt_debug = 0;
164 #endif
165 
166 #ifdef accurate_stats
167 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
168 
169 #define	RCSTAT_INCR(x)			\
170 	mutex_enter(&rdmarcstat_lock);	\
171 	rdmarcstat.x.value.ui64++;	\
172 	mutex_exit(&rdmarcstat_lock);
173 #else
174 #define	RCSTAT_INCR(x)			\
175 	rdmarcstat.x.value.ui64++;
176 #endif
177 
178 #define	ptoh(p)		(&((p)->cku_client))
179 #define	htop(h)		((cku_private_t *)((h)->cl_private))
180 
181 uint_t
182 calc_length(uint_t len)
183 {
184 	len = RNDUP(len);
185 
186 	if (len <= 64 * 1024) {
187 		if (len > 32 * 1024) {
188 			len = 64 * 1024;
189 		} else {
190 			if (len > 16 * 1024) {
191 				len = 32 * 1024;
192 			} else {
193 				if (len > 8 * 1024) {
194 					len = 16 * 1024;
195 				} else {
196 					len = 8 * 1024;
197 				}
198 			}
199 		}
200 	}
201 	return (len);
202 }
203 int
204 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
205     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
206 {
207 	CLIENT *h;
208 	struct cku_private *p;
209 	struct rpc_msg call_msg;
210 	rdma_registry_t *rp;
211 
212 	ASSERT(INGLOBALZONE(curproc));
213 
214 	if (cl == NULL)
215 		return (EINVAL);
216 	*cl = NULL;
217 
218 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
219 
220 	/*
221 	 * Find underlying RDMATF plugin
222 	 */
223 	rw_enter(&rdma_lock, RW_READER);
224 	rp = rdma_mod_head;
225 	while (rp != NULL) {
226 		if (strcmp(rp->r_mod->rdma_api, proto))
227 			rp = rp->r_next;
228 		else {
229 			p->cku_rd_mod = rp->r_mod;
230 			p->cku_rd_handle = handle;
231 			break;
232 		}
233 	}
234 	rw_exit(&rdma_lock);
235 
236 	if (p->cku_rd_mod == NULL) {
237 		/*
238 		 * Should not happen.
239 		 * No matching RDMATF plugin.
240 		 */
241 		kmem_free(p, sizeof (struct cku_private));
242 		return (EINVAL);
243 	}
244 
245 	h = ptoh(p);
246 	h->cl_ops = &rdma_clnt_ops;
247 	h->cl_private = (caddr_t)p;
248 	h->cl_auth = authkern_create();
249 
250 	/* call message, just used to pre-serialize below */
251 	call_msg.rm_xid = 0;
252 	call_msg.rm_direction = CALL;
253 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
254 	call_msg.rm_call.cb_prog = pgm;
255 	call_msg.rm_call.cb_vers = vers;
256 
257 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
258 	/* pre-serialize call message header */
259 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
260 		XDR_DESTROY(&p->cku_outxdr);
261 		auth_destroy(h->cl_auth);
262 		kmem_free(p, sizeof (struct cku_private));
263 		return (EINVAL);
264 	}
265 
266 	/*
267 	 * Set up the rpc information
268 	 */
269 	p->cku_cred = cred;
270 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
271 	p->cku_addr.maxlen = raddr->maxlen;
272 	p->cku_addr.len = raddr->len;
273 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
274 	p->cku_addrfmly = family;
275 
276 	*cl = h;
277 	return (0);
278 }
279 
280 static void
281 clnt_rdma_kdestroy(CLIENT *h)
282 {
283 	struct cku_private *p = htop(h);
284 
285 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
286 	kmem_free(p, sizeof (*p));
287 }
288 
289 void
290 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
291     struct cred *cred)
292 {
293 	struct cku_private *p = htop(h);
294 	rdma_registry_t *rp;
295 
296 	ASSERT(INGLOBALZONE(curproc));
297 	/*
298 	 * Find underlying RDMATF plugin
299 	 */
300 	p->cku_rd_mod = NULL;
301 	rw_enter(&rdma_lock, RW_READER);
302 	rp = rdma_mod_head;
303 	while (rp != NULL) {
304 		if (strcmp(rp->r_mod->rdma_api, proto))
305 			rp = rp->r_next;
306 		else {
307 			p->cku_rd_mod = rp->r_mod;
308 			p->cku_rd_handle = handle;
309 			break;
310 		}
311 
312 	}
313 	rw_exit(&rdma_lock);
314 
315 	/*
316 	 * Set up the rpc information
317 	 */
318 	p->cku_cred = cred;
319 	p->cku_xid = 0;
320 
321 	if (p->cku_addr.maxlen < raddr->len) {
322 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
323 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
324 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
325 		p->cku_addr.maxlen = raddr->maxlen;
326 	}
327 
328 	p->cku_addr.len = raddr->len;
329 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
330 	h->cl_ops = &rdma_clnt_ops;
331 }
332 
333 static int
334 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
335     rdma_buf_t *rpcmsg, XDR *xdrs,
336     xdrproc_t xdr_args, caddr_t argsp)
337 {
338 	cku_private_t *p = htop(h);
339 
340 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
341 		/*
342 		 * Copy in the preserialized RPC header
343 		 * information.
344 		 */
345 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
346 
347 		/*
348 		 * transaction id is the 1st thing in the output
349 		 * buffer.
350 		 */
351 		/* LINTED pointer alignment */
352 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
353 
354 		/* Skip the preserialized stuff. */
355 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
356 
357 		/* Serialize dynamic stuff into the output buffer. */
358 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
359 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
360 		    (!(*xdr_args)(xdrs, argsp))) {
361 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
362 			return (CLNT_RDMA_FAIL);
363 		}
364 		p->cku_outsz = XDR_GETPOS(xdrs);
365 	} else {
366 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
367 		IXDR_PUT_U_INT32(uproc, procnum);
368 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
369 		XDR_SETPOS(xdrs, 0);
370 
371 		/* Serialize the procedure number and the arguments. */
372 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
373 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
374 			if (rpcmsg->addr != xdrs->x_base) {
375 				rpcmsg->addr = xdrs->x_base;
376 				rpcmsg->len = xdr_getbufsize(xdrs);
377 			}
378 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
379 			return (CLNT_RDMA_FAIL);
380 		}
381 		/*
382 		 * If we had to allocate a new buffer while encoding
383 		 * then update the addr and len.
384 		 */
385 		if (rpcmsg->addr != xdrs->x_base) {
386 			rpcmsg->addr = xdrs->x_base;
387 			rpcmsg->len = xdr_getbufsize(xdrs);
388 		}
389 
390 		p->cku_outsz = XDR_GETPOS(xdrs);
391 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
392 	}
393 
394 	return (CLNT_RDMA_SUCCESS);
395 }
396 
397 static int
398 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
399     XDR **xdrs, uint_t *op)
400 {
401 	cku_private_t *p = htop(h);
402 	uint_t vers;
403 	uint32_t rdma_credit = rdma_bufs_rqst;
404 
405 	vers = RPCRDMA_VERS;
406 	clmsg->type = SEND_BUFFER;
407 
408 	if (rdma_buf_alloc(conn, clmsg)) {
409 		return (CLNT_RDMA_FAIL);
410 	}
411 
412 	*xdrs = &p->cku_outxdr;
413 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
414 
415 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
416 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
417 	(void) xdr_u_int(*xdrs, &vers);
418 	(void) xdr_u_int(*xdrs, &rdma_credit);
419 	(void) xdr_u_int(*xdrs, op);
420 
421 	return (CLNT_RDMA_SUCCESS);
422 }
423 
424 /*
425  * If xp_cl is NULL value, then the RPC payload will NOT carry
426  * an RDMA READ chunk list, in this case we insert FALSE into
427  * the XDR stream. Otherwise we use the clist and RDMA register
428  * the memory and encode the clist into the outbound XDR stream.
429  */
430 static int
431 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
432 {
433 	int status;
434 	struct clist *rclp;
435 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
436 
437 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
438 
439 	if (rclp != NULL) {
440 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
441 		if (status != RDMA_SUCCESS) {
442 			return (CLNT_RDMA_FAIL);
443 		}
444 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
445 	}
446 	(void) xdr_do_clist(xdrs, &rclp);
447 
448 	return (CLNT_RDMA_SUCCESS);
449 }
450 
451 /*
452  * If xp_wcl is NULL value, then the RPC payload will NOT carry
453  * an RDMA WRITE chunk list, in this case we insert FALSE into
454  * the XDR stream. Otherwise we use the clist and  RDMA register
455  * the memory and encode the clist into the outbound XDR stream.
456  */
457 static int
458 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
459 {
460 	int status;
461 	struct clist *wlist, *rndcl;
462 	int wlen, rndlen;
463 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
464 
465 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
466 
467 	if (wlist != NULL) {
468 		/*
469 		 * If we are sending a non 4-byte alligned length
470 		 * the server will roundup the length to 4-byte
471 		 * boundary. In such a case, a trailing chunk is
472 		 * added to take any spill over roundup bytes.
473 		 */
474 		wlen = clist_len(wlist);
475 		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
476 		if (rndlen) {
477 			rndcl = clist_alloc();
478 			/*
479 			 * calc_length() will allocate a PAGESIZE
480 			 * buffer below.
481 			 */
482 			rndcl->c_len = calc_length(rndlen);
483 			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
484 			rndcl->rb_longbuf.len = rndcl->c_len;
485 			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
486 				clist_free(rndcl);
487 				return (CLNT_RDMA_FAIL);
488 			}
489 
490 			/* Roundup buffer freed back in caller */
491 			*rndbuf = rndcl->rb_longbuf;
492 
493 			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
494 			rndcl->c_next = NULL;
495 			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
496 			wlist->c_next = rndcl;
497 		}
498 
499 		status = clist_register(conn, wlist, CLIST_REG_DST);
500 		if (status != RDMA_SUCCESS) {
501 			rdma_buf_free(conn, rndbuf);
502 			bzero(rndbuf, sizeof (rdma_buf_t));
503 			return (CLNT_RDMA_FAIL);
504 		}
505 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
506 	}
507 
508 	if (!xdr_encode_wlist(xdrs, wlist)) {
509 		if (rndlen) {
510 			rdma_buf_free(conn, rndbuf);
511 			bzero(rndbuf, sizeof (rdma_buf_t));
512 		}
513 		return (CLNT_RDMA_FAIL);
514 	}
515 
516 	return (CLNT_RDMA_SUCCESS);
517 }
518 
519 static int
520 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
521 {
522 	if (length == 0) {
523 		*clpp = NULL;
524 		return (CLNT_RDMA_SUCCESS);
525 	}
526 
527 	*clpp = clist_alloc();
528 
529 	(*clpp)->rb_longbuf.len = calc_length(length);
530 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
531 
532 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
533 		clist_free(*clpp);
534 		*clpp = NULL;
535 		return (CLNT_RDMA_FAIL);
536 	}
537 
538 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
539 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
540 	(*clpp)->c_next = NULL;
541 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
542 
543 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
544 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
545 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
546 		clist_free(*clpp);
547 		return (CLNT_RDMA_FAIL);
548 	}
549 
550 	return (CLNT_RDMA_SUCCESS);
551 }
552 
553 /* ARGSUSED */
554 static enum clnt_stat
555 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
556     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
557     struct timeval wait)
558 {
559 	cku_private_t *p = htop(h);
560 
561 	int 	try_call_again;
562 	int	refresh_attempt = AUTH_REFRESH_COUNT;
563 	int 	status;
564 	int 	msglen;
565 
566 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
567 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
568 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
569 
570 	struct rpc_msg 	reply_msg;
571 	rdma_registry_t	*m;
572 
573 	struct clist *cl_sendlist;
574 	struct clist *cl_recvlist;
575 	struct clist *cl;
576 	struct clist *cl_rpcmsg;
577 	struct clist *cl_rdma_reply;
578 	struct clist *cl_rpcreply_wlist;
579 	struct clist *cl_long_reply;
580 	rdma_buf_t  rndup;
581 
582 	uint_t vers;
583 	uint_t op;
584 	uint_t off;
585 	uint32_t seg_array_len;
586 	uint_t long_reply_len;
587 	uint_t rpcsec_gss;
588 	uint_t gss_i_or_p;
589 
590 	CONN *conn = NULL;
591 	rdma_buf_t clmsg;
592 	rdma_buf_t rpcmsg;
593 	rdma_chunkinfo_lengths_t rcil;
594 
595 	clock_t	ticks;
596 	bool_t wlist_exists_reply;
597 
598 	uint32_t rdma_credit = rdma_bufs_rqst;
599 
600 	RCSTAT_INCR(rccalls);
601 
602 call_again:
603 
604 	bzero(&clmsg, sizeof (clmsg));
605 	bzero(&rpcmsg, sizeof (rpcmsg));
606 	bzero(&rndup, sizeof (rndup));
607 	try_call_again = 0;
608 	cl_sendlist = NULL;
609 	cl_recvlist = NULL;
610 	cl = NULL;
611 	cl_rpcmsg = NULL;
612 	cl_rdma_reply = NULL;
613 	call_xdrp = NULL;
614 	reply_xdrp = NULL;
615 	wlist_exists_reply  = FALSE;
616 	cl_rpcreply_wlist = NULL;
617 	cl_long_reply = NULL;
618 	rcil.rcil_len = 0;
619 	rcil.rcil_len_alt = 0;
620 	long_reply_len = 0;
621 
622 	rw_enter(&rdma_lock, RW_READER);
623 	m = (rdma_registry_t *)p->cku_rd_handle;
624 	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
625 		/*
626 		 * If we didn't find a matching RDMA module in the registry
627 		 * then there is no transport.
628 		 */
629 		rw_exit(&rdma_lock);
630 		p->cku_err.re_status = RPC_CANTSEND;
631 		p->cku_err.re_errno = EIO;
632 		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
633 		if (h->cl_nosignal == TRUE) {
634 			delay(ticks);
635 		} else {
636 			if (delay_sig(ticks) == EINTR) {
637 				p->cku_err.re_status = RPC_INTR;
638 				p->cku_err.re_errno = EINTR;
639 			}
640 		}
641 		return (RPC_CANTSEND);
642 	}
643 	/*
644 	 * Get unique xid
645 	 */
646 	if (p->cku_xid == 0)
647 		p->cku_xid = alloc_xid();
648 
649 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
650 	    p->cku_addrfmly, p->cku_rd_handle, &conn);
651 	rw_exit(&rdma_lock);
652 
653 	/*
654 	 * If there is a problem with the connection reflect the issue
655 	 * back to the higher level to address, we MAY delay for a short
656 	 * period so that we are kind to the transport.
657 	 */
658 	if (conn == NULL) {
659 		/*
660 		 * Connect failed to server. Could be because of one
661 		 * of several things. In some cases we don't want
662 		 * the caller to retry immediately - delay before
663 		 * returning to caller.
664 		 */
665 		switch (status) {
666 		case RDMA_TIMEDOUT:
667 			/*
668 			 * Already timed out. No need to delay
669 			 * some more.
670 			 */
671 			p->cku_err.re_status = RPC_TIMEDOUT;
672 			p->cku_err.re_errno = ETIMEDOUT;
673 			break;
674 		case RDMA_INTR:
675 			/*
676 			 * Failed because of an signal. Very likely
677 			 * the caller will not retry.
678 			 */
679 			p->cku_err.re_status = RPC_INTR;
680 			p->cku_err.re_errno = EINTR;
681 			break;
682 		default:
683 			/*
684 			 * All other failures - server down or service
685 			 * down or temporary resource failure. Delay before
686 			 * returning to caller.
687 			 */
688 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
689 			p->cku_err.re_status = RPC_CANTCONNECT;
690 			p->cku_err.re_errno = EIO;
691 
692 			if (h->cl_nosignal == TRUE) {
693 				delay(ticks);
694 			} else {
695 				if (delay_sig(ticks) == EINTR) {
696 					p->cku_err.re_status = RPC_INTR;
697 					p->cku_err.re_errno = EINTR;
698 				}
699 			}
700 			break;
701 		}
702 
703 		return (p->cku_err.re_status);
704 	}
705 
706 	clnt_check_credit(conn);
707 
708 	status = CLNT_RDMA_FAIL;
709 
710 	rpcsec_gss = gss_i_or_p = FALSE;
711 
712 	if (IS_RPCSEC_GSS(h)) {
713 		rpcsec_gss = TRUE;
714 		if (rpc_gss_get_service_type(h->cl_auth) ==
715 		    rpc_gss_svc_integrity ||
716 		    rpc_gss_get_service_type(h->cl_auth) ==
717 		    rpc_gss_svc_privacy)
718 			gss_i_or_p = TRUE;
719 	}
720 
721 	/*
722 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
723 	 * or if RPCSEC_GSS is being used for authentication only.
724 	 */
725 	if (rpcsec_gss == FALSE ||
726 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
727 		/*
728 		 * Grab a send buffer for the request.  Try to
729 		 * encode it to see if it fits. If not, then it
730 		 * needs to be sent in a chunk.
731 		 */
732 		rpcmsg.type = SEND_BUFFER;
733 		if (rdma_buf_alloc(conn, &rpcmsg)) {
734 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
735 			goto done;
736 		}
737 
738 		/* First try to encode into regular send buffer */
739 		op = RDMA_MSG;
740 
741 		call_xdrp = &callxdr;
742 
743 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
744 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
745 
746 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
747 		    xdr_args, argsp);
748 
749 		if (status != CLNT_RDMA_SUCCESS) {
750 			/* Clean up from previous encode attempt */
751 			rdma_buf_free(conn, &rpcmsg);
752 			XDR_DESTROY(call_xdrp);
753 		} else {
754 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
755 		}
756 	}
757 
758 	/* If the encode didn't work, then try a NOMSG */
759 	if (status != CLNT_RDMA_SUCCESS) {
760 
761 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
762 		    xdr_sizeof(xdr_args, argsp);
763 
764 		msglen = calc_length(msglen);
765 
766 		/* pick up the lengths for the reply buffer needed */
767 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
768 		    &rcil.rcil_len, &rcil.rcil_len_alt);
769 
770 		/*
771 		 * Construct a clist to describe the CHUNK_BUFFER
772 		 * for the rpcmsg.
773 		 */
774 		cl_rpcmsg = clist_alloc();
775 		cl_rpcmsg->c_len = msglen;
776 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
777 		cl_rpcmsg->rb_longbuf.len = msglen;
778 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
779 			clist_free(cl_rpcmsg);
780 			goto done;
781 		}
782 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
783 
784 		op = RDMA_NOMSG;
785 		call_xdrp = &callxdr;
786 
787 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
788 		    cl_rpcmsg->rb_longbuf.len, 0,
789 		    cl_rpcmsg, XDR_ENCODE, conn);
790 
791 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
792 		    xdr_args, argsp);
793 
794 		if (status != CLNT_RDMA_SUCCESS) {
795 			p->cku_err.re_status = RPC_CANTENCODEARGS;
796 			p->cku_err.re_errno = EIO;
797 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
798 			goto done;
799 		}
800 	}
801 
802 	/*
803 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
804 	 * RDMA WRITE clist.
805 	 *
806 	 * First pull the RDMA READ chunk list from the XDR private
807 	 * area to keep it handy.
808 	 */
809 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
810 
811 	if (gss_i_or_p) {
812 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
813 		long_reply_len += MAX_AUTH_BYTES;
814 	} else {
815 		long_reply_len = rcil.rcil_len;
816 	}
817 
818 	/*
819 	 * Update the chunk size information for the Long RPC msg.
820 	 */
821 	if (cl && op == RDMA_NOMSG)
822 		cl->c_len = p->cku_outsz;
823 
824 	/*
825 	 * Prepare the RDMA header. On success xdrs will hold the result
826 	 * of xdrmem_create() for a SEND_BUFFER.
827 	 */
828 	status = clnt_compose_rdma_header(conn, h, &clmsg,
829 	    &rdmahdr_o_xdrs, &op);
830 
831 	if (status != CLNT_RDMA_SUCCESS) {
832 		p->cku_err.re_status = RPC_CANTSEND;
833 		p->cku_err.re_errno = EIO;
834 		RCSTAT_INCR(rcnomem);
835 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
836 		goto done;
837 	}
838 
839 	/*
840 	 * Now insert the RDMA READ list iff present
841 	 */
842 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
843 	if (status != CLNT_RDMA_SUCCESS) {
844 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
845 		rdma_buf_free(conn, &clmsg);
846 		p->cku_err.re_status = RPC_CANTSEND;
847 		p->cku_err.re_errno = EIO;
848 		goto done;
849 	}
850 
851 	/*
852 	 * Setup RDMA WRITE chunk list for nfs read operation
853 	 * other operations will have a NULL which will result
854 	 * as a NULL list in the XDR stream.
855 	 */
856 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
857 	if (status != CLNT_RDMA_SUCCESS) {
858 		rdma_buf_free(conn, &clmsg);
859 		p->cku_err.re_status = RPC_CANTSEND;
860 		p->cku_err.re_errno = EIO;
861 		goto done;
862 	}
863 
864 	/*
865 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
866 	 * large responses can flow back to the client.
867 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
868 	 */
869 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
870 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
871 		long_reply_len += 1024;
872 
873 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
874 
875 	if (status != CLNT_RDMA_SUCCESS) {
876 		rdma_buf_free(conn, &clmsg);
877 		p->cku_err.re_status = RPC_CANTSEND;
878 		p->cku_err.re_errno = EIO;
879 		goto done;
880 	}
881 
882 	/*
883 	 * XDR encode the RDMA_REPLY write chunk
884 	 */
885 	seg_array_len = (cl_long_reply ? 1 : 0);
886 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
887 	    seg_array_len);
888 
889 	/*
890 	 * Construct a clist in "sendlist" that represents what we
891 	 * will push over the wire.
892 	 *
893 	 * Start with the RDMA header and clist (if any)
894 	 */
895 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
896 	    clmsg.addr, NULL, NULL);
897 
898 	/*
899 	 * Put the RPC call message in  sendlist if small RPC
900 	 */
901 	if (op == RDMA_MSG) {
902 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
903 		    rpcmsg.addr, NULL, NULL);
904 	} else {
905 		/* Long RPC already in chunk list */
906 		RCSTAT_INCR(rclongrpcs);
907 	}
908 
909 	/*
910 	 * Set up a reply buffer ready for the reply
911 	 */
912 	status = rdma_clnt_postrecv(conn, p->cku_xid);
913 	if (status != RDMA_SUCCESS) {
914 		rdma_buf_free(conn, &clmsg);
915 		p->cku_err.re_status = RPC_CANTSEND;
916 		p->cku_err.re_errno = EIO;
917 		goto done;
918 	}
919 
920 	/*
921 	 * sync the memory for dma
922 	 */
923 	if (cl != NULL) {
924 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
925 		if (status != RDMA_SUCCESS) {
926 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
927 			rdma_buf_free(conn, &clmsg);
928 			p->cku_err.re_status = RPC_CANTSEND;
929 			p->cku_err.re_errno = EIO;
930 			goto done;
931 		}
932 	}
933 
934 	/*
935 	 * Send the RDMA Header and RPC call message to the server
936 	 */
937 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
938 	if (status != RDMA_SUCCESS) {
939 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
940 		p->cku_err.re_status = RPC_CANTSEND;
941 		p->cku_err.re_errno = EIO;
942 		goto done;
943 	}
944 
945 	/*
946 	 * RDMA plugin now owns the send msg buffers.
947 	 * Clear them out and don't free them.
948 	 */
949 	clmsg.addr = NULL;
950 	if (rpcmsg.type == SEND_BUFFER)
951 		rpcmsg.addr = NULL;
952 
953 	/*
954 	 * Recv rpc reply
955 	 */
956 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
957 
958 	/*
959 	 * Now check recv status
960 	 */
961 	if (status != 0) {
962 		if (status == RDMA_INTR) {
963 			p->cku_err.re_status = RPC_INTR;
964 			p->cku_err.re_errno = EINTR;
965 			RCSTAT_INCR(rcintrs);
966 		} else if (status == RPC_TIMEDOUT) {
967 			p->cku_err.re_status = RPC_TIMEDOUT;
968 			p->cku_err.re_errno = ETIMEDOUT;
969 			RCSTAT_INCR(rctimeouts);
970 		} else {
971 			p->cku_err.re_status = RPC_CANTRECV;
972 			p->cku_err.re_errno = EIO;
973 		}
974 		goto done;
975 	}
976 
977 	/*
978 	 * Process the reply message.
979 	 *
980 	 * First the chunk list (if any)
981 	 */
982 	rdmahdr_i_xdrs = &(p->cku_inxdr);
983 	xdrmem_create(rdmahdr_i_xdrs,
984 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
985 	    cl_recvlist->c_len, XDR_DECODE);
986 
987 	/*
988 	 * Treat xid as opaque (xid is the first entity
989 	 * in the rpc rdma message).
990 	 * Skip xid and set the xdr position accordingly.
991 	 */
992 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
993 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
994 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
995 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
996 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
997 
998 	clnt_update_credit(conn, rdma_credit);
999 
1000 	wlist_exists_reply = FALSE;
1001 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
1002 	    &wlist_exists_reply)) {
1003 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
1004 		p->cku_err.re_status = RPC_CANTDECODERES;
1005 		p->cku_err.re_errno = EIO;
1006 		goto done;
1007 	}
1008 
1009 	/*
1010 	 * The server shouldn't have sent a RDMA_SEND that
1011 	 * the client needs to RDMA_WRITE a reply back to
1012 	 * the server.  So silently ignoring what the
1013 	 * server returns in the rdma_reply section of the
1014 	 * header.
1015 	 */
1016 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
1017 	off = xdr_getpos(rdmahdr_i_xdrs);
1018 
1019 	clnt_decode_long_reply(conn, cl_long_reply,
1020 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
1021 	    cl, cl_recvlist, op, off);
1022 
1023 	if (reply_xdrp == NULL)
1024 		goto done;
1025 
1026 	if (wlist_exists_reply) {
1027 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
1028 	}
1029 
1030 	reply_msg.rm_direction = REPLY;
1031 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
1032 	reply_msg.acpted_rply.ar_stat = SUCCESS;
1033 	reply_msg.acpted_rply.ar_verf = _null_auth;
1034 
1035 	/*
1036 	 *  xdr_results will be done in AUTH_UNWRAP.
1037 	 */
1038 	reply_msg.acpted_rply.ar_results.where = NULL;
1039 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
1040 
1041 	/*
1042 	 * Decode and validate the response.
1043 	 */
1044 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
1045 		enum clnt_stat re_status;
1046 
1047 		_seterr_reply(&reply_msg, &(p->cku_err));
1048 
1049 		re_status = p->cku_err.re_status;
1050 		if (re_status == RPC_SUCCESS) {
1051 			/*
1052 			 * Reply is good, check auth.
1053 			 */
1054 			if (!AUTH_VALIDATE(h->cl_auth,
1055 			    &reply_msg.acpted_rply.ar_verf)) {
1056 				p->cku_err.re_status = RPC_AUTHERROR;
1057 				p->cku_err.re_why = AUTH_INVALIDRESP;
1058 				RCSTAT_INCR(rcbadverfs);
1059 				DTRACE_PROBE(
1060 				    krpc__e__clntrdma__callit__authvalidate);
1061 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
1062 			    xdr_results, resultsp)) {
1063 				p->cku_err.re_status = RPC_CANTDECODERES;
1064 				p->cku_err.re_errno = EIO;
1065 				DTRACE_PROBE(
1066 				    krpc__e__clntrdma__callit__authunwrap);
1067 			}
1068 		} else {
1069 			/* set errno in case we can't recover */
1070 			if (re_status != RPC_VERSMISMATCH &&
1071 			    re_status != RPC_AUTHERROR &&
1072 			    re_status != RPC_PROGVERSMISMATCH)
1073 				p->cku_err.re_errno = EIO;
1074 
1075 			if (re_status == RPC_AUTHERROR) {
1076 				if ((refresh_attempt > 0) &&
1077 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
1078 				    p->cku_cred)) {
1079 					refresh_attempt--;
1080 					try_call_again = 1;
1081 					goto done;
1082 				}
1083 
1084 				try_call_again = 0;
1085 
1086 				/*
1087 				 * We have used the client handle to
1088 				 * do an AUTH_REFRESH and the RPC status may
1089 				 * be set to RPC_SUCCESS; Let's make sure to
1090 				 * set it to RPC_AUTHERROR.
1091 				 */
1092 				p->cku_err.re_status = RPC_AUTHERROR;
1093 
1094 				/*
1095 				 * Map recoverable and unrecoverable
1096 				 * authentication errors to appropriate
1097 				 * errno
1098 				 */
1099 				switch (p->cku_err.re_why) {
1100 				case AUTH_BADCRED:
1101 				case AUTH_BADVERF:
1102 				case AUTH_INVALIDRESP:
1103 				case AUTH_TOOWEAK:
1104 				case AUTH_FAILED:
1105 				case RPCSEC_GSS_NOCRED:
1106 				case RPCSEC_GSS_FAILED:
1107 					p->cku_err.re_errno = EACCES;
1108 					break;
1109 				case AUTH_REJECTEDCRED:
1110 				case AUTH_REJECTEDVERF:
1111 				default:
1112 					p->cku_err.re_errno = EIO;
1113 					break;
1114 				}
1115 			}
1116 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1117 			    int, p->cku_err.re_why);
1118 		}
1119 	} else {
1120 		p->cku_err.re_status = RPC_CANTDECODERES;
1121 		p->cku_err.re_errno = EIO;
1122 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1123 	}
1124 
1125 done:
1126 	clnt_return_credit(conn);
1127 
1128 	if (cl_sendlist != NULL)
1129 		clist_free(cl_sendlist);
1130 
1131 	/*
1132 	 * If rpc reply is in a chunk, free it now.
1133 	 */
1134 	if (cl_long_reply) {
1135 		(void) clist_deregister(conn, cl_long_reply);
1136 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1137 		clist_free(cl_long_reply);
1138 	}
1139 
1140 	if (call_xdrp)
1141 		XDR_DESTROY(call_xdrp);
1142 
1143 	if (rndup.rb_private) {
1144 		rdma_buf_free(conn, &rndup);
1145 	}
1146 
1147 	if (reply_xdrp) {
1148 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1149 		XDR_DESTROY(reply_xdrp);
1150 	}
1151 
1152 	if (cl_rdma_reply) {
1153 		clist_free(cl_rdma_reply);
1154 	}
1155 
1156 	if (cl_recvlist) {
1157 		rdma_buf_t	recvmsg = {0};
1158 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1159 		recvmsg.type = RECV_BUFFER;
1160 		RDMA_BUF_FREE(conn, &recvmsg);
1161 		clist_free(cl_recvlist);
1162 	}
1163 
1164 	RDMA_REL_CONN(conn);
1165 
1166 	if (try_call_again)
1167 		goto call_again;
1168 
1169 	if (p->cku_err.re_status != RPC_SUCCESS) {
1170 		RCSTAT_INCR(rcbadcalls);
1171 	}
1172 	return (p->cku_err.re_status);
1173 }
1174 
1175 
1176 static void
1177 clnt_decode_long_reply(CONN *conn,
1178     struct clist *cl_long_reply,
1179     struct clist *cl_rdma_reply, XDR *xdrs,
1180     XDR **rxdrp, struct clist *cl,
1181     struct clist *cl_recvlist,
1182     uint_t  op, uint_t off)
1183 {
1184 	if (op != RDMA_NOMSG) {
1185 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1186 		    int, cl_recvlist->c_len - off);
1187 		xdrrdma_create(xdrs,
1188 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1189 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1190 		*rxdrp = xdrs;
1191 		return;
1192 	}
1193 
1194 	/* op must be RDMA_NOMSG */
1195 	if (cl) {
1196 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1197 		return;
1198 	}
1199 
1200 	if (cl_long_reply->u.c_daddr) {
1201 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1202 		    int, cl_rdma_reply->c_len);
1203 
1204 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1205 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1206 
1207 		*rxdrp = xdrs;
1208 	}
1209 }
1210 
1211 static void
1212 clnt_return_credit(CONN *conn)
1213 {
1214 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1215 
1216 	mutex_enter(&conn->c_lock);
1217 	cc_info->clnt_cc_in_flight_ops--;
1218 	cv_signal(&cc_info->clnt_cc_cv);
1219 	mutex_exit(&conn->c_lock);
1220 }
1221 
1222 static void
1223 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1224 {
1225 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1226 
1227 	/*
1228 	 * If the granted has not altered, avoid taking the
1229 	 * mutex, to essentially do nothing..
1230 	 */
1231 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1232 		return;
1233 	/*
1234 	 * Get the granted number of buffers for credit control.
1235 	 */
1236 	mutex_enter(&conn->c_lock);
1237 	cc_info->clnt_cc_granted_ops = rdma_credit;
1238 	mutex_exit(&conn->c_lock);
1239 }
1240 
1241 static void
1242 clnt_check_credit(CONN *conn)
1243 {
1244 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1245 
1246 	/*
1247 	 * Make sure we are not going over our allowed buffer use
1248 	 * (and make sure we have gotten a granted value before).
1249 	 */
1250 	mutex_enter(&conn->c_lock);
1251 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1252 	    cc_info->clnt_cc_granted_ops != 0) {
1253 		/*
1254 		 * Client has maxed out its granted buffers due to
1255 		 * credit control.  Current handling is to block and wait.
1256 		 */
1257 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1258 	}
1259 	cc_info->clnt_cc_in_flight_ops++;
1260 	mutex_exit(&conn->c_lock);
1261 }
1262 
1263 /* ARGSUSED */
1264 static void
1265 clnt_rdma_kabort(CLIENT *h)
1266 {
1267 }
1268 
1269 static void
1270 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1271 {
1272 	struct cku_private *p = htop(h);
1273 	*err = p->cku_err;
1274 }
1275 
1276 static bool_t
1277 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1278 {
1279 	struct cku_private *p = htop(h);
1280 	XDR *xdrs;
1281 
1282 	xdrs = &(p->cku_outxdr);
1283 	xdrs->x_op = XDR_FREE;
1284 	return ((*xdr_res)(xdrs, res_ptr));
1285 }
1286 
1287 /* ARGSUSED */
1288 static bool_t
1289 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1290 {
1291 	return (TRUE);
1292 }
1293 
1294 /* ARGSUSED */
1295 static int
1296 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1297 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1298 	uint32_t xid)
1299 {
1300 	RCSTAT_INCR(rctimers);
1301 	return (0);
1302 }
1303 
1304 int
1305 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1306 {
1307 	rdma_registry_t	*rp;
1308 	void *handle = NULL;
1309 	struct knetconfig *knc;
1310 	char *pf, *p;
1311 	rdma_stat status;
1312 	int error = 0;
1313 
1314 	if (!INGLOBALZONE(curproc))
1315 		return (-1);
1316 
1317 	/*
1318 	 * modload the RDMA plugins if not already done.
1319 	 */
1320 	if (!rdma_modloaded) {
1321 		mutex_enter(&rdma_modload_lock);
1322 		if (!rdma_modloaded) {
1323 			error = rdma_modload();
1324 		}
1325 		mutex_exit(&rdma_modload_lock);
1326 		if (error)
1327 			return (-1);
1328 	}
1329 
1330 	if (!rdma_dev_available)
1331 		return (-1);
1332 
1333 	rw_enter(&rdma_lock, RW_READER);
1334 	rp = rdma_mod_head;
1335 	while (rp != NULL) {
1336 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1337 			rp = rp->r_next;
1338 			continue;
1339 		}
1340 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1341 		    &handle);
1342 		if (status == RDMA_SUCCESS) {
1343 			knc = kmem_zalloc(sizeof (struct knetconfig),
1344 			    KM_SLEEP);
1345 			knc->knc_semantics = NC_TPI_RDMA;
1346 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1347 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1348 			if (addr_type == AF_INET)
1349 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1350 			else if (addr_type == AF_INET6)
1351 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1352 			pf[KNC_STRSIZE - 1] = '\0';
1353 
1354 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1355 			p[KNC_STRSIZE - 1] = '\0';
1356 
1357 			knc->knc_protofmly = pf;
1358 			knc->knc_proto = p;
1359 			knc->knc_rdev = (dev_t)rp;
1360 			*knconf = knc;
1361 			rw_exit(&rdma_lock);
1362 			return (0);
1363 		}
1364 		rp = rp->r_next;
1365 	}
1366 	rw_exit(&rdma_lock);
1367 	return (-1);
1368 }
1369