xref: /illumos-gate/usr/src/uts/common/rpc/clnt_rdma.c (revision 814a60b13c0ad90e5d2edfd29a7a84bbf416cc1a)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
28 /*
29  * Portions of this source code were derived from Berkeley
30  * 4.3 BSD under license from the Regents of the University of
31  * California.
32  */
33 
34 #pragma ident	"%Z%%M%	%I%	%E% SMI"
35 
36 #include <sys/param.h>
37 #include <sys/types.h>
38 #include <sys/user.h>
39 #include <sys/systm.h>
40 #include <sys/sysmacros.h>
41 #include <sys/errno.h>
42 #include <sys/kmem.h>
43 #include <sys/debug.h>
44 #include <sys/systm.h>
45 #include <sys/kstat.h>
46 #include <sys/t_lock.h>
47 #include <sys/ddi.h>
48 #include <sys/cmn_err.h>
49 #include <sys/time.h>
50 #include <sys/isa_defs.h>
51 #include <sys/zone.h>
52 
53 #include <rpc/types.h>
54 #include <rpc/xdr.h>
55 #include <rpc/auth.h>
56 #include <rpc/clnt.h>
57 #include <rpc/rpc_msg.h>
58 #include <rpc/rpc_rdma.h>
59 
60 
/*
 * Forward declarations of the CLIENT handle operations implemented in
 * this file.  They are all file-local and are only reached through the
 * rdma_clnt_ops vector defined below.
 */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
70 
/*
 * Operations vector for RDMA based RPC.
 *
 * clnt_rdma_kcreate() and clnt_rdma_kinit() install this vector in the
 * cl_ops field of every handle they set up.  The initializer is
 * positional, so the order of the entries must match the member order of
 * struct clnt_ops.
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
83 
/*
 * The size, in bytes, of the preserialized RPC header information.
 *
 * clnt_rdma_kcreate() encodes the call header once into cku_rpchdr using
 * an XDR stream limited to this many bytes, and clnt_rdma_kcallit() copies
 * (or wraps) that many bytes at the front of every call message.
 * NOTE(review): 20 presumably corresponds to the five 4-byte fields set up
 * in clnt_rdma_kcreate() (xid, direction, rpc version, program, version);
 * confirm against xdr_callhdr().
 */
#define	CKU_HDRSIZE	20
88 
/*
 * Per RPC RDMA endpoint details.
 *
 * One of these is allocated per client handle by clnt_rdma_kcreate().
 * The CLIENT handed back to callers is the embedded cku_client member,
 * whose cl_private field points back at the enclosing structure (see the
 * ptoh() and htop() macros).
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	/*
	 * Besides pre-serializing the call header, cku_outxdr is reused for
	 * encoding the RPC/RDMA headers and by clnt_rdma_kfreeres().
	 */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* encoded length of the last RPC call message (XDR_GETPOS result) */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/*
	 * The extra 4 bytes hold the procedure number, which the RPCSEC_GSS
	 * path appends to the preserialized header before AUTH_WRAP().
	 */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
106 
/*
 * When clnt_rdma_kcallit() cannot get a connection for a reason other
 * than a timeout or an interrupt, it sleeps for clnt_rdma_min_delay
 * seconds before returning, so that callers do not retry immediately.
 * The value lives in a variable rather than being used as a constant
 * directly; CLNT_RDMA_DELAY is only its initial value.
 */
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
109 
/*
 * Client side RPC/RDMA statistics, laid out as an array of named kstat
 * entries.  Each counter is a 64-bit unsigned value and is bumped through
 * the RCSTAT_INCR() macro.  Of these, this file increments rccalls,
 * rcbadcalls, rctimeouts, rcbadverfs, rctimers, rcnomem, rcintrs and
 * rclongrpcs; the remaining counters are not touched here.
 */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * Exported view of the statistics above: a pointer to the first entry and
 * the number of entries, computed from the structure size so that it
 * tracks any counters added to rdmarcstat.
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
138 
#ifdef DEBUG
/*
 * Set to non-zero to make clnt_rdma_kcallit() trace requests, replies and
 * chunk reads.  Only present in DEBUG builds.
 */
int rdma_clnt_debug = 0;
#endif
142 
/*
 * RCSTAT_INCR(x) bumps the rdmarcstat counter named x by one.
 *
 * When accurate_stats is defined the update is done under
 * rdmarcstat_lock; otherwise it is a plain, unlocked increment.
 *
 * Both variants are wrapped in do { } while (0) and carry no trailing
 * semicolon, so the macro expands to exactly one statement and can be
 * used safely as the body of an unbraced if/else.  Callers supply the
 * terminating semicolon.
 */
#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)	do {			\
	mutex_enter(&rdmarcstat_lock);		\
	rdmarcstat.x.value.ui64++;		\
	mutex_exit(&rdmarcstat_lock);		\
} while (0)
#else
#define	RCSTAT_INCR(x)	do {			\
	rdmarcstat.x.value.ui64++;		\
} while (0)
#endif
154 
/*
 * Conversions between the private endpoint state and the CLIENT handle:
 * ptoh() yields the CLIENT embedded in a cku_private, and htop() recovers
 * the cku_private from a CLIENT via its cl_private back pointer.
 */
#define	ptoh(p)		(&((p)->cku_client))
#define	htop(h)		((cku_private_t *)((h)->cl_private))
157 
158 int
159 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
160     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
161 {
162 	CLIENT *h;
163 	struct cku_private *p;
164 	struct rpc_msg call_msg;
165 	rdma_registry_t *rp;
166 
167 	ASSERT(INGLOBALZONE(curproc));
168 
169 	if (cl == NULL)
170 		return (EINVAL);
171 	*cl = NULL;
172 
173 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
174 
175 	/*
176 	 * Find underlying RDMATF plugin
177 	 */
178 	rw_enter(&rdma_lock, RW_READER);
179 	rp = rdma_mod_head;
180 	while (rp != NULL) {
181 		if (strcmp(rp->r_mod->rdma_api, proto))
182 			rp = rp->r_next;
183 		else {
184 			p->cku_rd_mod = rp->r_mod;
185 			p->cku_rd_handle = handle;
186 			break;
187 		}
188 	}
189 	rw_exit(&rdma_lock);
190 
191 	if (p->cku_rd_mod == NULL) {
192 		/*
193 		 * Should not happen.
194 		 * No matching RDMATF plugin.
195 		 */
196 		kmem_free(p, sizeof (struct cku_private));
197 		return (EINVAL);
198 	}
199 
200 	h = ptoh(p);
201 	h->cl_ops = &rdma_clnt_ops;
202 	h->cl_private = (caddr_t)p;
203 	h->cl_auth = authkern_create();
204 
205 	/* call message, just used to pre-serialize below */
206 	call_msg.rm_xid = 0;
207 	call_msg.rm_direction = CALL;
208 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
209 	call_msg.rm_call.cb_prog = pgm;
210 	call_msg.rm_call.cb_vers = vers;
211 
212 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
213 	/* pre-serialize call message header */
214 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
215 		XDR_DESTROY(&p->cku_outxdr);
216 		auth_destroy(h->cl_auth);
217 		kmem_free(p, sizeof (struct cku_private));
218 		return (EINVAL);
219 	}
220 
221 	/*
222 	 * Set up the rpc information
223 	 */
224 	p->cku_cred = cred;
225 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
226 	p->cku_addr.maxlen = raddr->maxlen;
227 	p->cku_addr.len = raddr->len;
228 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
229 	p->cku_addrfmly = family;
230 
231 	*cl = h;
232 	return (0);
233 }
234 
235 static void
236 clnt_rdma_kdestroy(CLIENT *h)
237 {
238 	struct cku_private *p = htop(h);
239 
240 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
241 	kmem_free(p, sizeof (*p));
242 }
243 
244 void
245 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
246     struct cred *cred)
247 {
248 	struct cku_private *p = htop(h);
249 	rdma_registry_t *rp;
250 
251 	ASSERT(INGLOBALZONE(curproc));
252 	/*
253 	 * Find underlying RDMATF plugin
254 	 */
255 	p->cku_rd_mod = NULL;
256 	rw_enter(&rdma_lock, RW_READER);
257 	rp = rdma_mod_head;
258 	while (rp != NULL) {
259 		if (strcmp(rp->r_mod->rdma_api, proto))
260 			rp = rp->r_next;
261 		else {
262 			p->cku_rd_mod = rp->r_mod;
263 			p->cku_rd_handle = handle;
264 			break;
265 		}
266 
267 	}
268 	rw_exit(&rdma_lock);
269 
270 	/*
271 	 * Set up the rpc information
272 	 */
273 	p->cku_cred = cred;
274 	p->cku_xid = 0;
275 
276 	if (p->cku_addr.maxlen < raddr->len) {
277 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
278 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
279 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
280 		p->cku_addr.maxlen = raddr->maxlen;
281 	}
282 
283 	p->cku_addr.len = raddr->len;
284 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
285 	h->cl_ops = &rdma_clnt_ops;
286 }
287 
288 /* ARGSUSED */
289 static enum clnt_stat
290 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
291     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp, struct timeval wait)
292 {
293 	cku_private_t *p = htop(h);
294 	int 	status;
295 	XDR 	*xdrs;
296 	XDR	*cxdrp = NULL, callxdr;	/* for xdrrdma encoding the RPC call */
297 	XDR	*rxdrp = NULL, replxdr;	/* for xdrrdma decoding the RPC reply */
298 	struct rpc_msg 	reply_msg;
299 	struct clist *sendlist, *recvlist = NULL;
300 	struct clist *cl = NULL, *cle = NULL;
301 	uint_t vers, op;
302 	uint_t off;
303 	uint32_t xid;
304 	CONN *conn = NULL;
305 	rdma_buf_t clmsg, rpcmsg, longmsg, rpcreply;
306 	int msglen;
307 	clock_t	ticks;
308 
309 	RCSTAT_INCR(rccalls);
310 	/*
311 	 * Get unique xid
312 	 */
313 	if (p->cku_xid == 0)
314 		p->cku_xid = alloc_xid();
315 
316 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
317 	    p->cku_addrfmly, p->cku_rd_handle, &conn);
318 
319 	if (conn == NULL) {
320 		/*
321 		 * Connect failed to server. Could be because of one
322 		 * of several things. In some cases we don't want
323 		 * the caller to retry immediately - delay before
324 		 * returning to caller.
325 		 */
326 		switch (status) {
327 		case RDMA_TIMEDOUT:
328 			/*
329 			 * Already timed out. No need to delay
330 			 * some more.
331 			 */
332 			p->cku_err.re_status = RPC_TIMEDOUT;
333 			p->cku_err.re_errno = ETIMEDOUT;
334 			break;
335 		case RDMA_INTR:
336 			/*
337 			 * Failed because of an signal. Very likely
338 			 * the caller will not retry.
339 			 */
340 			p->cku_err.re_status = RPC_INTR;
341 			p->cku_err.re_errno = EINTR;
342 			break;
343 		default:
344 			/*
345 			 * All other failures - server down or service
346 			 * down or temporary resource failure. Delay before
347 			 * returning to caller.
348 			 */
349 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
350 			p->cku_err.re_status = RPC_CANTCONNECT;
351 			p->cku_err.re_errno = EIO;
352 
353 			if (h->cl_nosignal == TRUE) {
354 				delay(ticks);
355 			} else {
356 				if (delay_sig(ticks) == EINTR) {
357 					p->cku_err.re_status = RPC_INTR;
358 					p->cku_err.re_errno = EINTR;
359 				}
360 			}
361 			break;
362 		}
363 
364 		return (p->cku_err.re_status);
365 	}
366 	/*
367 	 * Get the size of the rpc call message. Need this
368 	 * to determine if the rpc call message will fit in
369 	 * the pre-allocated RDMA buffers. If the rpc call
370 	 * message length is greater that the pre-allocated
371 	 * buffers then, it is a Long RPC. A one time use
372 	 * buffer is allocated and registered for the Long
373 	 * RPC call.
374 	 */
375 	xdrs = &callxdr;
376 	msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT;
377 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
378 		msglen += xdrrdma_authsize(h->cl_auth, p->cku_cred,
379 				rdma_minchunk);
380 		msglen += xdrrdma_sizeof(xdr_args, argsp, rdma_minchunk);
381 
382 		if (msglen > RPC_MSG_SZ) {
383 
384 			/*
385 			 * Long RPC. Allocate one time use custom buffer.
386 			 */
387 			rpcmsg.type = CHUNK_BUFFER;
388 			rpcmsg.addr = kmem_zalloc(msglen, KM_SLEEP);
389 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
390 			cle->c_xdroff = 0;
391 			cle->c_len  = rpcmsg.len = msglen;
392 			cle->c_saddr = (uint64)(uintptr_t)rpcmsg.addr;
393 			cle->c_next = NULL;
394 			xdrrdma_create(xdrs, rpcmsg.addr, msglen,
395 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
396 			cxdrp = xdrs;
397 			op = RDMA_NOMSG;
398 		} else {
399 			/*
400 			 * Get a pre-allocated buffer for rpc call
401 			 */
402 			rpcmsg.type = SEND_BUFFER;
403 			if (RDMA_BUF_ALLOC(conn, &rpcmsg)) {
404 				p->cku_err.re_status = RPC_CANTSEND;
405 				p->cku_err.re_errno = EIO;
406 				RCSTAT_INCR(rcnomem);
407 				cmn_err(CE_WARN,
408 				    "clnt_rdma_kcallit: no buffers!");
409 				goto done;
410 			}
411 			xdrrdma_create(xdrs, rpcmsg.addr, rpcmsg.len,
412 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
413 			cxdrp = xdrs;
414 			op = RDMA_MSG;
415 		}
416 	} else {
417 		/*
418 		 * For RPCSEC_GSS since we cannot accurately presize the
419 		 * buffer required for encoding, we assume that its going
420 		 * to be a Long RPC to start with. We also create the
421 		 * the XDR stream with min_chunk set to 0 which instructs
422 		 * the XDR layer to not chunk the incoming byte stream.
423 		 */
424 
425 		msglen += 2 * MAX_AUTH_BYTES + 2 * sizeof (struct opaque_auth);
426 		msglen += xdr_sizeof(xdr_args, argsp);
427 
428 		/*
429 		 * Long RPC. Allocate one time use custom buffer.
430 		 */
431 		longmsg.type = CHUNK_BUFFER;
432 		longmsg.addr = kmem_zalloc(msglen, KM_SLEEP);
433 		cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
434 		cle->c_xdroff = 0;
435 		cle->c_len  = longmsg.len = msglen;
436 		cle->c_saddr = (uint64)(uintptr_t)longmsg.addr;
437 		cle->c_next = NULL;
438 		xdrrdma_create(xdrs, longmsg.addr, msglen, 0, cle,
439 		    XDR_ENCODE, NULL);
440 		cxdrp = xdrs;
441 		op = RDMA_NOMSG;
442 	}
443 
444 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
445 		/*
446 		 * Copy in the preserialized RPC header
447 		 * information.
448 		 */
449 		bcopy(p->cku_rpchdr, rpcmsg.addr, CKU_HDRSIZE);
450 
451 		/*
452 		 * transaction id is the 1st thing in the output
453 		 * buffer.
454 		 */
455 		/* LINTED pointer alignment */
456 		(*(uint32_t *)(rpcmsg.addr)) = p->cku_xid;
457 
458 		/* Skip the preserialized stuff. */
459 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
460 
461 		/* Serialize dynamic stuff into the output buffer. */
462 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
463 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
464 		    (!(*xdr_args)(xdrs, argsp))) {
465 			rdma_buf_free(conn, &rpcmsg);
466 			if (cle)
467 				clist_free(cle);
468 			p->cku_err.re_status = RPC_CANTENCODEARGS;
469 			p->cku_err.re_errno = EIO;
470 			cmn_err(CE_WARN,
471 	"clnt_rdma_kcallit: XDR_PUTINT32/AUTH_MARSHAL/xdr_args failed");
472 			goto done;
473 		}
474 		p->cku_outsz = XDR_GETPOS(xdrs);
475 	} else {
476 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
477 		IXDR_PUT_U_INT32(uproc, procnum);
478 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
479 		XDR_SETPOS(xdrs, 0);
480 
481 		/* Serialize the procedure number and the arguments. */
482 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
483 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
484 			if (longmsg.addr != xdrs->x_base) {
485 				longmsg.addr = xdrs->x_base;
486 				longmsg.len = xdr_getbufsize(xdrs);
487 			}
488 			rdma_buf_free(conn, &longmsg);
489 			clist_free(cle);
490 			p->cku_err.re_status = RPC_CANTENCODEARGS;
491 			p->cku_err.re_errno = EIO;
492 			cmn_err(CE_WARN,
493 		"clnt_rdma_kcallit: AUTH_WRAP failed");
494 			goto done;
495 		}
496 		/*
497 		 * If we had to allocate a new buffer while encoding
498 		 * then update the addr and len.
499 		 */
500 		if (longmsg.addr != xdrs->x_base) {
501 			longmsg.addr = xdrs->x_base;
502 			longmsg.len = xdr_getbufsize(xdrs);
503 		}
504 
505 		/*
506 		 * If it so happens that the encoded message is after all
507 		 * not long enough to be a Long RPC then allocate a
508 		 * SEND_BUFFER and copy the encoded message into it.
509 		 */
510 		p->cku_outsz = XDR_GETPOS(xdrs);
511 		if (p->cku_outsz > RPC_MSG_SZ) {
512 			rpcmsg.type = CHUNK_BUFFER;
513 			rpcmsg.addr = longmsg.addr;
514 			rpcmsg.len = longmsg.len;
515 		} else {
516 			clist_free(cle);
517 			XDR_DESTROY(cxdrp);
518 			cxdrp = NULL;
519 			/*
520 			 * Get a pre-allocated buffer for rpc call
521 			 */
522 			rpcmsg.type = SEND_BUFFER;
523 			if (RDMA_BUF_ALLOC(conn, &rpcmsg)) {
524 				p->cku_err.re_status = RPC_CANTSEND;
525 				p->cku_err.re_errno = EIO;
526 				RCSTAT_INCR(rcnomem);
527 				cmn_err(CE_WARN,
528 				    "clnt_rdma_kcallit: no buffers!");
529 				rdma_buf_free(conn, &longmsg);
530 				goto done;
531 			}
532 			bcopy(longmsg.addr, rpcmsg.addr, p->cku_outsz);
533 			xdrrdma_create(xdrs, rpcmsg.addr, p->cku_outsz, 0,
534 			    NULL, XDR_ENCODE, NULL);
535 			cxdrp = xdrs;
536 			rdma_buf_free(conn, &longmsg);
537 			op = RDMA_MSG;
538 		}
539 	}
540 
541 	cl = xdrrdma_clist(xdrs);
542 
543 	/*
544 	 * Update the chunk size information for the Long RPC msg.
545 	 */
546 	if (cl && op == RDMA_NOMSG)
547 		cl->c_len = p->cku_outsz;
548 
549 	/*
550 	 * Set up the RDMA chunk message
551 	 */
552 	vers = RPCRDMA_VERS;
553 	clmsg.type = SEND_BUFFER;
554 	if (RDMA_BUF_ALLOC(conn, &clmsg)) {
555 		p->cku_err.re_status = RPC_CANTSEND;
556 		p->cku_err.re_errno = EIO;
557 		rdma_buf_free(conn, &rpcmsg);
558 		RCSTAT_INCR(rcnomem);
559 		cmn_err(CE_WARN, "clnt_rdma_kcallit: no free buffers!!");
560 		goto done;
561 	}
562 	xdrs = &p->cku_outxdr;
563 	xdrmem_create(xdrs, clmsg.addr, clmsg.len, XDR_ENCODE);
564 	/*
565 	 * Treat xid as opaque (xid is the first entity
566 	 * in the rpc rdma message).
567 	 */
568 	(*(uint32_t *)clmsg.addr) = p->cku_xid;
569 	/* Skip xid and set the xdr position accordingly. */
570 	XDR_SETPOS(xdrs, sizeof (uint32_t));
571 	(void) xdr_u_int(xdrs, &vers);
572 	(void) xdr_u_int(xdrs, &op);
573 
574 	/*
575 	 * Now XDR the chunk list
576 	 */
577 	if (cl != NULL) {
578 
579 		/*
580 		 * Register the chunks in the list
581 		 */
582 		status = clist_register(conn, cl, 1);
583 		if (status != RDMA_SUCCESS) {
584 			cmn_err(CE_WARN,
585 		"clnt_rdma_kcallit: clist register failed");
586 			rdma_buf_free(conn, &clmsg);
587 			rdma_buf_free(conn, &rpcmsg);
588 			clist_free(cl);
589 			p->cku_err.re_status = RPC_CANTSEND;
590 			p->cku_err.re_errno = EIO;
591 			goto done;
592 		}
593 
594 	}
595 	(void) xdr_do_clist(xdrs, &cl);
596 
597 	/*
598 	 * Start with the RDMA header and clist (if any)
599 	 */
600 	sendlist = NULL;
601 	clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &clmsg.handle,
602 		clmsg.addr, NULL, NULL);
603 
604 	/*
605 	 * Put the RPC call message in the send list if small RPC
606 	 */
607 	if (op == RDMA_MSG) {
608 		clist_add(&sendlist, 0, p->cku_outsz, &rpcmsg.handle,
609 			rpcmsg.addr, NULL, NULL);
610 	} else {
611 		/* Long RPC already in chunk list */
612 		RCSTAT_INCR(rclongrpcs);
613 	}
614 
615 	/*
616 	 * Set up a reply buffer ready for the reply
617 	 */
618 	status = rdma_clnt_postrecv(conn, p->cku_xid);
619 	if (status != RDMA_SUCCESS) {
620 		rdma_buf_free(conn, &clmsg);
621 		rdma_buf_free(conn, &rpcmsg);
622 		if (cl) {
623 			(void) clist_deregister(conn, cl, 1);
624 			clist_free(cl);
625 		}
626 		clist_free(sendlist);
627 		p->cku_err.re_status = RPC_CANTSEND;
628 		p->cku_err.re_errno = EIO;
629 		goto done;
630 	}
631 	/*
632 	 * sync the memory for dma
633 	 */
634 	if (cl != NULL) {
635 		status = clist_syncmem(conn, cl, 1);
636 		if (status != RDMA_SUCCESS) {
637 			rdma_buf_free(conn, &clmsg);
638 			rdma_buf_free(conn, &rpcmsg);
639 			(void) clist_deregister(conn, cl, 1);
640 			clist_free(cl);
641 			clist_free(sendlist);
642 			p->cku_err.re_status = RPC_CANTSEND;
643 			p->cku_err.re_errno = EIO;
644 			goto done;
645 		}
646 	}
647 
648 	/*
649 	 * Send the call message to the server
650 	 */
651 	status = RDMA_SEND(conn, sendlist, p->cku_xid);
652 	if (status != RDMA_SUCCESS) {
653 		if (cl) {
654 			(void) clist_deregister(conn, cl, 1);
655 			clist_free(cl);
656 			/*
657 			 * If this was a long RPC message, need
658 			 * to free that buffer.
659 			 */
660 			if (rpcmsg.type == CHUNK_BUFFER)
661 				rdma_buf_free(conn, &rpcmsg);
662 		}
663 		clist_free(sendlist);
664 		p->cku_err.re_status = RPC_CANTSEND;
665 		p->cku_err.re_errno = EIO;
666 		goto done;
667 	} else {
668 		/*
669 		 * RDMA plugin now owns the send msg buffers.
670 		 * Clear them out and don't free them here.
671 		 */
672 		clmsg.addr = NULL;
673 		if (rpcmsg.type == SEND_BUFFER)
674 			rpcmsg.addr = NULL;
675 	}
676 	clist_free(sendlist);
677 #ifdef DEBUG
678 if (rdma_clnt_debug) {
679 		printf("clnt_rdma_kcallit: send request xid %u\n", p->cku_xid);
680 	}
681 #endif
682 
683 	/*
684 	 * Recv rpc reply
685 	 */
686 	status = RDMA_RECV(conn, &recvlist, p->cku_xid);
687 
688 	/*
689 	 * Deregister chunks sent. Do this only after the reply
690 	 * is received as that is a sure indication that the
691 	 * remote end has completed RDMA of the chunks.
692 	 */
693 	if (cl != NULL) {
694 		/*
695 		 * Deregister the chunks
696 		 */
697 		(void) clist_deregister(conn, cl, 1);
698 		clist_free(cl);
699 		/*
700 		 * If long RPC free chunk
701 		 */
702 		rdma_buf_free(conn, &rpcmsg);
703 	}
704 
705 	/*
706 	 * Now check recv status
707 	 */
708 	if (status != 0) {
709 #ifdef DEBUG
710 		if (rdma_clnt_debug)
711 			cmn_err(CE_NOTE,
712 			    "clnt_rdma_kcallit: reply failed %u status %d",
713 			    p->cku_xid, status);
714 #endif
715 		if (status == RDMA_INTR) {
716 			p->cku_err.re_status = RPC_INTR;
717 			p->cku_err.re_errno = EINTR;
718 			RCSTAT_INCR(rcintrs);
719 		} else if (status == RPC_TIMEDOUT) {
720 			p->cku_err.re_status = RPC_TIMEDOUT;
721 			p->cku_err.re_errno = ETIMEDOUT;
722 			RCSTAT_INCR(rctimeouts);
723 		} else {
724 			p->cku_err.re_status = RPC_CANTRECV;
725 			p->cku_err.re_errno = EIO;
726 		}
727 		goto done;
728 	}
729 #ifdef DEBUG
730 	if (rdma_clnt_debug)
731 		printf("clnt_rdma_kcallit: got response xid %u\n", p->cku_xid);
732 #endif
733 	/*
734 	 * Process the reply message.
735 	 *
736 	 * First the chunk list (if any)
737 	 */
738 	xdrs = &(p->cku_inxdr);
739 	xdrmem_create(xdrs, (caddr_t)(uintptr_t)recvlist->c_saddr,
740 	    recvlist->c_len, XDR_DECODE);
741 	/*
742 	 * Treat xid as opaque (xid is the first entity
743 	 * in the rpc rdma message).
744 	 */
745 	xid = *(uint32_t *)(uintptr_t)recvlist->c_saddr;
746 	/* Skip xid and set the xdr position accordingly. */
747 	XDR_SETPOS(xdrs, sizeof (uint32_t));
748 	(void) xdr_u_int(xdrs, &vers);
749 	(void) xdr_u_int(xdrs, &op);
750 	(void) xdr_do_clist(xdrs, &cl);
751 	off = xdr_getpos(xdrs);
752 
753 	/*
754 	 * Now the RPC reply message itself. If the reply
755 	 * came as a chunk item, then RDMA the reply over.
756 	 */
757 	xdrs = &replxdr;
758 	if (cl && op == RDMA_NOMSG) {
759 		struct clist		*cle = cl;
760 
761 		rpcreply.type = CHUNK_BUFFER;
762 		rpcreply.addr = kmem_alloc(cle->c_len, KM_SLEEP);
763 		rpcreply.len = cle->c_len;
764 		cle->c_daddr = (uint64)(uintptr_t)rpcreply.addr;
765 		cl = cl->c_next;
766 		cle->c_next = NULL;
767 
768 		/*
769 		 * Register the rpc reply chunk destination
770 		 */
771 		status = clist_register(conn, cle, 0);
772 		if (status) {
773 			rdma_buf_free(conn, &rpcreply);
774 			clist_free(cle);
775 			p->cku_err.re_status = RPC_CANTDECODERES;
776 			p->cku_err.re_errno = EIO;
777 			cmn_err(CE_WARN,
778 			    "clnt_rdma_kcallit: clist_register failed");
779 			goto rdma_done;
780 		}
781 
782 		/*
783 		 * Now read rpc reply in
784 		 */
785 #ifdef DEBUG
786 	if (rdma_clnt_debug)
787 		printf("clnt_rdma_kcallit: read chunk, len %d, xid %u, \
788 			reply xid %u\n", cle->c_len, p->cku_xid, xid);
789 #endif
790 		status = RDMA_READ(conn, cle, WAIT);
791 		if (status) {
792 			(void) clist_deregister(conn, cle, 0);
793 			rdma_buf_free(conn, &rpcreply);
794 			clist_free(cle);
795 			p->cku_err.re_status = RPC_CANTDECODERES;
796 			p->cku_err.re_errno = EIO;
797 			cmn_err(CE_WARN,
798 				"clnt_rdma_kcallit: RDMA_READ failed");
799 			goto rdma_done;
800 		}
801 
802 		/*
803 		 * sync the memory for dma
804 		 */
805 		status = clist_syncmem(conn, cle, 0);
806 		if (status != RDMA_SUCCESS) {
807 			(void) clist_deregister(conn, cle, 0);
808 			rdma_buf_free(conn, &rpcreply);
809 			clist_free(cle);
810 			p->cku_err.re_status = RPC_CANTDECODERES;
811 			p->cku_err.re_errno = EIO;
812 			goto rdma_done;
813 		}
814 
815 		/*
816 		 * Deregister the Long RPC chunk
817 		 */
818 		(void) clist_deregister(conn, cle, 0);
819 		clist_free(cle);
820 		xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len, 0, cl,
821 			XDR_DECODE, conn);
822 		rxdrp = xdrs;
823 	} else {
824 		rpcreply.addr = NULL;
825 		xdrrdma_create(xdrs,
826 		    (caddr_t)(uintptr_t)(recvlist->c_saddr + off),
827 		    recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
828 		rxdrp = xdrs;
829 	}
830 
831 	reply_msg.rm_direction = REPLY;
832 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
833 	reply_msg.acpted_rply.ar_stat = SUCCESS;
834 	reply_msg.acpted_rply.ar_verf = _null_auth;
835 	/*
836 	 *  xdr_results will be done in AUTH_UNWRAP.
837 	 */
838 	reply_msg.acpted_rply.ar_results.where = NULL;
839 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
840 
841 	/*
842 	 * Decode and validate the response.
843 	 */
844 	if (xdr_replymsg(xdrs, &reply_msg)) {
845 		enum clnt_stat re_status;
846 
847 		_seterr_reply(&reply_msg, &(p->cku_err));
848 
849 		re_status = p->cku_err.re_status;
850 		if (re_status == RPC_SUCCESS) {
851 			/*
852 			 * Reply is good, check auth.
853 			 */
854 			if (!AUTH_VALIDATE(h->cl_auth,
855 			    &reply_msg.acpted_rply.ar_verf)) {
856 				p->cku_err.re_status = RPC_AUTHERROR;
857 				p->cku_err.re_why = AUTH_INVALIDRESP;
858 				RCSTAT_INCR(rcbadverfs);
859 				cmn_err(CE_WARN,
860 			    "clnt_rdma_kcallit: AUTH_VALIDATE failed");
861 			} else if (!AUTH_UNWRAP(h->cl_auth, xdrs,
862 			    xdr_results, resultsp)) {
863 				p->cku_err.re_status = RPC_CANTDECODERES;
864 				p->cku_err.re_errno = EIO;
865 				cmn_err(CE_WARN,
866 				    "clnt_rdma_kcallit: AUTH_UNWRAP failed");
867 			}
868 		} else {
869 			/* set errno in case we can't recover */
870 			if (re_status != RPC_VERSMISMATCH &&
871 			    re_status != RPC_AUTHERROR &&
872 			    re_status != RPC_PROGVERSMISMATCH)
873 				p->cku_err.re_errno = EIO;
874 
875 			if (re_status == RPC_AUTHERROR) {
876 				/*
877 				 * Map recoverable and unrecoverable
878 				 * authentication errors to appropriate
879 				 * errno
880 				 */
881 				switch (p->cku_err.re_why) {
882 				case AUTH_BADCRED:
883 				case AUTH_BADVERF:
884 				case AUTH_INVALIDRESP:
885 				case AUTH_TOOWEAK:
886 				case AUTH_FAILED:
887 				case RPCSEC_GSS_NOCRED:
888 				case RPCSEC_GSS_FAILED:
889 					p->cku_err.re_errno = EACCES;
890 					break;
891 				case AUTH_REJECTEDCRED:
892 				case AUTH_REJECTEDVERF:
893 				default:
894 					p->cku_err.re_errno = EIO;
895 					break;
896 				}
897 				RPCLOG(1, "clnt_rdma_kcallit : "
898 				    "authentication failed with "
899 				    "RPC_AUTHERROR of type %d\n",
900 				    p->cku_err.re_why);
901 			}
902 			cmn_err(CE_WARN,
903 				    "clnt_rdma_kcallit: RPC failed");
904 
905 		}
906 	} else {
907 		p->cku_err.re_status = RPC_CANTDECODERES;
908 		p->cku_err.re_errno = EIO;
909 		cmn_err(CE_WARN, "clnt_rdma_kcallit: xdr_replymsg failed");
910 	}
911 
912 	/*
913 	 * If rpc reply is in a chunk, free it now.
914 	 */
915 	if (rpcreply.addr != NULL)
916 		rdma_buf_free(conn, &rpcreply);
917 
918 rdma_done:
919 	if ((cl != NULL) || (op == RDMA_NOMSG)) {
920 		rdma_buf_t	donemsg;
921 
922 		/*
923 		 * Free the list holding the chunk info
924 		 */
925 		if (cl) {
926 			clist_free(cl);
927 			cl = NULL;
928 		}
929 
930 		/*
931 		 * Tell the server that the reads are done
932 		 */
933 		donemsg.type = SEND_BUFFER;
934 		if (RDMA_BUF_ALLOC(conn, &donemsg)) {
935 			p->cku_err.re_status = RPC_CANTSEND;
936 			p->cku_err.re_errno = EIO;
937 			RCSTAT_INCR(rcnomem);
938 			cmn_err(CE_WARN, "clnt_rdma_kcallit: no free buffer");
939 			goto done;
940 		}
941 		xdrs = &p->cku_outxdr;
942 		xdrmem_create(xdrs, donemsg.addr, donemsg.len, XDR_ENCODE);
943 		vers = RPCRDMA_VERS;
944 		op = RDMA_DONE;
945 
946 		/*
947 		 * Treat xid as opaque (xid is the first entity
948 		 * in the rpc rdma message).
949 		 */
950 		(*(uint32_t *)donemsg.addr) = p->cku_xid;
951 		/* Skip xid and set the xdr position accordingly. */
952 		XDR_SETPOS(xdrs, sizeof (uint32_t));
953 		if (!xdr_u_int(xdrs, &vers) ||
954 		    !xdr_u_int(xdrs, &op)) {
955 			cmn_err(CE_WARN,
956 				"clnt_rdma_kcallit: xdr_u_int failed");
957 			rdma_buf_free(conn, &donemsg);
958 			goto done;
959 		}
960 
961 		sendlist = NULL;
962 		clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &donemsg.handle,
963 			donemsg.addr, NULL, NULL);
964 
965 		status = RDMA_SEND(conn, sendlist, p->cku_xid);
966 		if (status != RDMA_SUCCESS) {
967 			cmn_err(CE_WARN,
968 				"clnt_rdma_kcallit: RDMA_SEND failed xid %u",
969 					p->cku_xid);
970 		}
971 #ifdef DEBUG
972 		else {
973 		if (rdma_clnt_debug)
974 			printf("clnt_rdma_kcallit: sent RDMA_DONE xid %u\n",
975 				p->cku_xid);
976 		}
977 #endif
978 		clist_free(sendlist);
979 	}
980 
981 done:
982 	if (cxdrp)
983 		XDR_DESTROY(cxdrp);
984 	if (rxdrp) {
985 		(void) xdr_rpc_free_verifier(rxdrp, &reply_msg);
986 		XDR_DESTROY(rxdrp);
987 	}
988 
989 	if (recvlist) {
990 		rdma_buf_t	recvmsg;
991 
992 		recvmsg.addr = (caddr_t)(uintptr_t)recvlist->c_saddr;
993 		recvmsg.type = RECV_BUFFER;
994 		RDMA_BUF_FREE(conn, &recvmsg);
995 		clist_free(recvlist);
996 	}
997 	RDMA_REL_CONN(conn);
998 	if (p->cku_err.re_status != RPC_SUCCESS) {
999 		RCSTAT_INCR(rcbadcalls);
1000 	}
1001 	return (p->cku_err.re_status);
1002 }
1003 
1004 /* ARGSUSED */
1005 static void
1006 clnt_rdma_kabort(CLIENT *h)
1007 {
1008 }
1009 
1010 static void
1011 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1012 {
1013 	struct cku_private *p = htop(h);
1014 
1015 	*err = p->cku_err;
1016 }
1017 
1018 static bool_t
1019 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1020 {
1021 	struct cku_private *p = htop(h);
1022 	XDR *xdrs;
1023 
1024 	xdrs = &(p->cku_outxdr);
1025 	xdrs->x_op = XDR_FREE;
1026 	return ((*xdr_res)(xdrs, res_ptr));
1027 }
1028 
1029 /* ARGSUSED */
1030 static bool_t
1031 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1032 {
1033 	return (TRUE);
1034 }
1035 
1036 /* ARGSUSED */
1037 static int
1038 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1039 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1040 	uint32_t xid)
1041 {
1042 	RCSTAT_INCR(rctimers);
1043 	return (0);
1044 }
1045 
1046 int
1047 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1048 {
1049 	rdma_registry_t	*rp;
1050 	void *handle = NULL;
1051 	struct knetconfig *knc;
1052 	char *pf, *p;
1053 	rdma_stat status;
1054 	int error = 0;
1055 
1056 	if (!INGLOBALZONE(curproc))
1057 		return (-1);
1058 	/*
1059 	 * modload the RDMA plugins if not already done.
1060 	 */
1061 	if (!rdma_modloaded) {
1062 		mutex_enter(&rdma_modload_lock);
1063 		if (!rdma_modloaded) {
1064 			error = rdma_modload();
1065 		}
1066 		mutex_exit(&rdma_modload_lock);
1067 		if (error)
1068 			return (-1);
1069 	}
1070 
1071 	if (!rdma_dev_available)
1072 		return (-1);
1073 
1074 	rw_enter(&rdma_lock, RW_READER);
1075 	rp = rdma_mod_head;
1076 	while (rp != NULL) {
1077 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1078 		    &handle);
1079 		if (status == RDMA_SUCCESS) {
1080 			knc = kmem_zalloc(sizeof (struct knetconfig),
1081 				KM_SLEEP);
1082 			knc->knc_semantics = NC_TPI_RDMA;
1083 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1084 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1085 			if (addr_type == AF_INET)
1086 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1087 			else if (addr_type == AF_INET6)
1088 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1089 			pf[KNC_STRSIZE - 1] = '\0';
1090 
1091 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1092 			p[KNC_STRSIZE - 1] = '\0';
1093 
1094 			knc->knc_protofmly = pf;
1095 			knc->knc_proto = p;
1096 			knc->knc_rdev = (dev_t)handle;
1097 			*knconf = knc;
1098 			rw_exit(&rdma_lock);
1099 			return (0);
1100 		}
1101 		rp = rp->r_next;
1102 	}
1103 	rw_exit(&rdma_lock);
1104 	return (-1);
1105 }
1106