/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code were developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements for contributions from developers:
 *   Ranjit Noronha: noronha@cse.ohio-state.edu
 *   Lei Chai      : chail@cse.ohio-state.edu
 *   Weikuan Yu    : yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

static bool_t   xdrrdma_getint32(XDR *, int32_t *);
static bool_t   xdrrdma_putint32(XDR *, int32_t *);
static bool_t   xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t   xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t		xdrrdma_getpos(XDR *);
bool_t		xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void		xdrrdma_destroy(XDR *);
static bool_t   xdrrdma_control(XDR *, int, void *);
static bool_t  xdrrdma_read_a_chunk(XDR *, CONN **);
static void xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops  xdrrdmablk_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

struct xdr_ops  xdrrdma_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};
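
/*
 * Note: the two ops vectors above are identical.  What matters is that
 * they have distinct addresses: routines such as xdrrdma_send_read_data()
 * compare xdrs->x_ops against both &xdrrdma_ops and xdrrdma_xops() to
 * tell a live RDMA stream apart from one that is only being used to
 * compute a length.
 */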

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message.  xp_min_chunk == 0 is a
 * special case for ENCODING, meaning the incoming data stream is not
 * chunked.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message.  In that case, (xp_offp - x_base) does not give the
 * correct XDR offset into the message as a whole, so xp_off is used to
 * track the current position in the overall message, covering both the
 * inline portion and the chunk.  It is used only while decoding, and is
 * useful for comparing read chunk 'c_xdroff' offsets.
 *
 * An example of a read chunk containing part of an XDR message, for
 * an NFSv4 compound such as:
 *
 * PUTFH
 * WRITE [4109 bytes]
 * GETATTR
 *
 * The Solaris encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *                                   |
 *                                   v
 * [RDMA_READ chunks]:               [write data]
 *
 *
 * The Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *                                    |
 *                                    v
 * [RDMA_READ chunks]:                [Write data] [Write data2] [Getattr chunk]
 *                                     chunk1       chunk2         chunk3
 *
 * where the read chunks are:
 *
 *             - chunk1 - 4k
 * write data |
 *             - chunk2 - 13 bytes (4109 - 4k)
 * getattr op  - chunk3 - 19 bytes
 * (the getattr op starts at byte 4, after 3 bytes of roundup)
 *
 */

typedef struct {
	caddr_t		xp_offp;
	int		xp_min_chunk;
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;	/* size of xdr buffer */
	int		xp_off;		/* overall offset */
	struct clist	*xp_rcl;	/* head of chunk list */
	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
	struct clist	*xp_wcl;	/* head of write chunk list */
	CONN		*xp_conn;	/* connection for chunk data xfer */
	uint_t		xp_reply_chunk_len;
	/* used to track length for security modes: integrity/privacy */
	uint_t		xp_reply_chunk_len_alt;
} xrdma_private_t;
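
/*
 * Illustrative offset bookkeeping, as a sketch (the numbers are
 * hypothetical): assume an inline section of I bytes followed by a read
 * chunk carrying the rest of the RPC message.  While decoding inside the
 * chunk, (xp_offp - x_base) is only the position within the chunk's own
 * buffer, whereas xp_off is I plus the bytes consumed from the chunk,
 * which is the value compared against read chunk c_xdroff offsets.
 */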

extern kmem_cache_t *clist_cache;

bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rdclist = NULL, *prev = NULL;
	bool_t		retval = TRUE;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	alen;
	uint_t		total_len;

	ASSERT(xdrs->x_op != XDR_FREE);

	/*
	 * first deal with the length since xdr bytes are counted
	 */
	if (!xdr_u_int(xdrs, sizep)) {
		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
		return (FALSE);
	}
	total_len = *sizep;
	if (total_len > maxsize) {
		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
		    int, total_len, int, maxsize);
		return (FALSE);
	}
	(*conn) = xdrp->xp_conn;

	/*
	 * if no data we are done
	 */
	if (total_len == 0)
		return (TRUE);

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	/*
	 * If there was a chunk at the current offset, then set up a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle == NULL)
		return (FALSE);

	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
		return (FALSE);

	/*
	 * Setup the chunk list with appropriate
	 * address (offset) and length
	 */
	for (actual_segments = 0;
	    actual_segments < total_segments; actual_segments++) {

		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
		    uint32_t, total_len, uint32_t, cle->c_xdroff);

		if (total_len <= 0)
			break;

		/*
		 * not the first time in the loop
		 */
		if (actual_segments > 0)
			cle = cle->c_next;

		cle->u.c_daddr = (uint64) cur_offset;
		alen = 0;
		if (cle->c_len > total_len) {
			alen = cle->c_len;
			cle->c_len = total_len;
		}
		if (!alen)
			xdrp->xp_rcl_next = &cle->c_next;

		cur_offset += cle->c_len;
		total_len -= cle->c_len;

		if ((total_segments - actual_segments - 1) == 0 &&
		    total_len > 0) {
			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
			retval = FALSE;
		}

		if ((total_segments - actual_segments - 1) > 0 &&
		    total_len == 0) {
			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
			    int, total_segments, int, actual_segments);
		}

		rdclist = clist_alloc();
		(*rdclist) = (*cle);
		if ((*rlist) == NULL)
			(*rlist) = rdclist;
		if (prev == NULL)
			prev = rdclist;
		else {
			prev->c_next = rdclist;
			prev = rdclist;
		}

	}

out:
	if (prev != NULL)
		prev->c_next = NULL;

	/*
	 * Adjust the chunk length if we read only part of a chunk.
	 */

	if (alen) {
		cle->w.c_saddr =
		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
		cle->c_len = alen - cle->c_len;
	}

	return (retval);
}
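
/*
 * A sketch of the expected caller flow (hypothetical; real consumers
 * differ in details): decode up to the opaque data, collect the read
 * chunk list describing it, then pull the data over with
 * xdrrdma_read_from_client():
 *
 *	struct clist	*rlist = NULL;
 *	CONN		*conn;
 *	uint_t		len;
 *
 *	if (!xdrrdma_getrdmablk(xdrs, &rlist, &len, &conn, maxsize))
 *		return (FALSE);
 *	if (rlist != NULL &&
 *	    !xdrrdma_read_from_client(rlist, &conn, len))
 *		return (FALSE);
 */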

/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
	xrdma_private_t *xdrp;
	struct clist   *cle;

	xdrs->x_op = op;
	xdrs->x_ops = &xdrrdma_ops;
	xdrs->x_base = addr;
	xdrs->x_handy = size;
	xdrs->x_public = NULL;

	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
	    KM_SLEEP);
	xdrs->x_private = (caddr_t)xdrp;
	xdrp->xp_offp = addr;
	xdrp->xp_min_chunk = min_chunk;
	xdrp->xp_flags = 0;
	xdrp->xp_buf_size = size;
	xdrp->xp_rcl = cl;
	xdrp->xp_reply_chunk_len = 0;
	xdrp->xp_reply_chunk_len_alt = 0;

	if (op == XDR_ENCODE && cl != NULL) {
		/* Find last element in chunk list and set xp_rcl_next */
		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
			continue;

		xdrp->xp_rcl_next = &(cle->c_next);
	} else {
		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
	}

	xdrp->xp_wcl = NULL;

	xdrp->xp_conn = conn;
	if (xdrp->xp_min_chunk != 0)
		xdrp->xp_flags |= XDR_RDMA_CHUNK;
}
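
/*
 * A minimal usage sketch; the buffer, chunk list and min_chunk value
 * are caller-supplied (hypothetical here):
 *
 *	XDR	xdrs;
 *
 *	xdrrdma_create(&xdrs, rcvbuf, rcvlen, 0, cl, XDR_DECODE, conn);
 *	...decode with the generic xdr_*() routines...
 *	xdrrdma_destroy(&xdrs);
 *
 * Passing min_chunk == 0 leaves XDR_RDMA_CHUNK clear, so the stream is
 * not chunked.
 */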

/* ARGSUSED */
void
xdrrdma_destroy(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if (xdrp == NULL)
		return;

	if (xdrp->xp_wcl) {
		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_wcl->rb_longbuf);
		}
		clist_free(xdrp->xp_wcl);
	}

	if (xdrp->xp_rcl) {
		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_rcl->rb_longbuf);
		}
		clist_free(xdrp->xp_rcl);
	}

	if (xdrp->xp_rcl_xdr)
		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
	xdrs->x_private = NULL;
}

static	bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	int chunked = 0;

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
		/*
		 * check if rest of the rpc message is in a chunk
		 */
		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
			return (FALSE);
		}
		chunked = 1;
	}

	/* LINTED pointer alignment */
	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

	xdrp->xp_offp += sizeof (int32_t);

	if (chunked)
		xdrs->x_handy -= (int)sizeof (int32_t);

	if (xdrp->xp_off != 0) {
		xdrp->xp_off += sizeof (int32_t);
	}

	return (TRUE);
}

static	bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
		return (FALSE);

	/* LINTED pointer alignment */
	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
	xdrp->xp_offp += sizeof (int32_t);

	return (TRUE);
}

/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
 */
static	bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*cls = *(xdrp->xp_rcl_next);
	struct clist	cl;
	bool_t		retval = TRUE;
	uint32_t	total_len = len;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	status = RDMA_SUCCESS;
	uint32_t	alen = 0;
	uint32_t	xpoff;

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	if (xdrp->xp_off) {
		xpoff = xdrp->xp_off;
	} else {
		xpoff = (xdrp->xp_offp - xdrs->x_base);
	}

	/*
	 * If there was a chunk at the current offset, then set up a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */

	if (cle != NULL && cle->c_xdroff == xpoff) {
		for (actual_segments = 0;
		    actual_segments < total_segments; actual_segments++) {

			if (total_len <= 0)
				break;

			if (status != RDMA_SUCCESS)
				goto out;

			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
			alen = 0;
			if (cle->c_len > total_len) {
				alen = cle->c_len;
				cle->c_len = total_len;
			}
			if (!alen)
				xdrp->xp_rcl_next = &cle->c_next;

			cur_offset += cle->c_len;
			total_len -= cle->c_len;

			if ((total_segments - actual_segments - 1) == 0 &&
			    total_len > 0) {
				DTRACE_PROBE(
				    krpc__e__xdrrdma_getbytes_chunktooshort);
				retval = FALSE;
			}

			if ((total_segments - actual_segments - 1) > 0 &&
			    total_len == 0) {
				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
				    int, total_segments, int, actual_segments);
			}

			/*
			 * RDMA READ the chunk data from the remote end.
			 * First prep the destination buffer by registering
			 * it, then RDMA READ the chunk data. Since we are
			 * doing streaming memory, sync the destination
			 * buffer to CPU and deregister the buffer.
			 */
			if (xdrp->xp_conn == NULL) {
				return (FALSE);
			}
			cl = *cle;
			cl.c_next = NULL;
			status = clist_register(xdrp->xp_conn, &cl,
			    CLIST_REG_DST);
			if (status != RDMA_SUCCESS) {
				retval = FALSE;
				/*
				 * Deregister the previous chunks
				 * before return
				 */
				goto out;
			}

			cle->c_dmemhandle = cl.c_dmemhandle;
			cle->c_dsynchandle = cl.c_dsynchandle;

			/*
			 * Now read the chunk in
			 */
			if ((total_segments - actual_segments - 1) == 0 ||
			    total_len == 0) {
				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
			} else {
				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
			}
			if (status != RDMA_SUCCESS) {
				DTRACE_PROBE1(
				    krpc__i__xdrrdma_getblk_readfailed,
				    int, status);
				retval = FALSE;
			}

			cle = cle->c_next;

		}

		/*
		 * sync the memory for cpu
		 */
		cl = *cls;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		if (clist_syncmem(
		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
			retval = FALSE;
		}
out:

		/*
		 * Deregister the chunks
		 */
		cle = cls;
		while (actual_segments != 0) {
			cl = *cle;
			cl.c_next = NULL;

			cl.c_regtype = CLIST_REG_DST;
			(void) clist_deregister(xdrp->xp_conn, &cl);

			cle = cle->c_next;
			actual_segments--;
		}

		if (alen) {
			cle = *(xdrp->xp_rcl_next);
			cle->w.c_saddr =
			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
			cle->c_len = alen - cle->c_len;
		}

		return (retval);
	}

	if ((xdrs->x_handy -= len) < 0)
		return (FALSE);

	bcopy(xdrp->xp_offp, addr, len);

	xdrp->xp_offp += len;

	if (xdrp->xp_off != 0)
		xdrp->xp_off += len;

	return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream.  xp_min_chunk == 0 means the
 * stream of bytes contains no chunks to separate out; if the bytes do
 * not fit in the supplied buffer, the encode fails.
 */
static	bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	/*
	 * Is this stream accepting chunks?
	 * If so, does either of the following conditions hold?
	 * - the length of bytes to encode is greater than the min chunk size
	 * - the remaining space in this stream is shorter than the length
	 *   of bytes to encode
	 *
	 * If either holds, create a chunk for this encoding and save the
	 * addresses, etc.
	 */
	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
	    ((xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk) ||
	    (xdrs->x_handy - len  < 0))) {
		struct clist	*cle;
		int		offset = xdrp->xp_offp - xdrs->x_base;

		cle = clist_alloc();
		cle->c_xdroff = offset;
		cle->c_len = len;
		cle->w.c_saddr = (uint64)(uintptr_t)addr;
		cle->c_next = NULL;

		*(xdrp->xp_rcl_next) = cle;
		xdrp->xp_rcl_next = &(cle->c_next);

		return (TRUE);
	}
	/* Is there enough space to encode what is left? */
	if ((xdrs->x_handy -= len) < 0) {
		return (FALSE);
	}
	bcopy(addr, xdrp->xp_offp, len);
	xdrp->xp_offp += len;

	return (TRUE);
}
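
/*
 * For example (hypothetical numbers): with XDR_RDMA_CHUNK set and
 * xp_min_chunk == 1024, an 8K buffer passed to xdrrdma_putbytes() is not
 * copied inline; a clist entry recording its address and length is
 * appended to the chunk list instead, and the data moves later by RDMA.
 */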

uint_t
xdrrdma_getpos(XDR *xdrs)
{
	xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	caddr_t		newaddr = xdrs->x_base + pos;
	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
	ptrdiff_t	diff;

	if (newaddr > lastaddr)
		return (FALSE);

	xdrp->xp_offp = newaddr;
	diff = lastaddr - newaddr;
	xdrs->x_handy = (int)diff;

	return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
	rpc_inline_t	*buf = NULL;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);

	if (xdrs->x_op == XDR_DECODE) {
		/*
		 * Since chunks aren't in-line, check to see whether there is
		 * a chunk in the inline range.
		 */
		if (cle != NULL &&
		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
			return (NULL);
	}

	/* LINTED pointer alignment */
	buf = (rpc_inline_t *)xdrp->xp_offp;
	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
		return (NULL);

	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk)) {
		return (NULL);
	} else {
		xdrs->x_handy -= len;
		xdrp->xp_offp += len;
		return (buf);
	}
}

static	bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
	int32_t		*int32p;
	int		len, i;
	uint_t		in_flags;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	rdma_chunkinfo_t *rcip = NULL;
	rdma_wlist_conn_info_t *rwcip = NULL;
	rdma_chunkinfo_lengths_t *rcilp = NULL;
	struct uio *uiop;
	struct clist	*rwl = NULL;
	struct clist	*prev = NULL;

	switch (request) {
	case XDR_PEEK:
		/*
		 * Return the next 4-byte unit in the XDR stream without
		 * consuming it.
		 */
		if (xdrs->x_handy < sizeof (int32_t))
			return (FALSE);

		int32p = (int32_t *)info;
		*int32p = (int32_t)ntohl((uint32_t)
		    (*((int32_t *)(xdrp->xp_offp))));

		return (TRUE);

	case XDR_SKIPBYTES:
		/*
		 * Skip the next N bytes in the XDR stream.
		 */
		int32p = (int32_t *)info;
		len = RNDUP((int)(*int32p));
		if ((xdrs->x_handy -= len) < 0)
			return (FALSE);
		xdrp->xp_offp += len;

		return (TRUE);

	case XDR_RDMA_SET_FLAGS:
		/*
		 * Set the flags provided via *info in xp_flags, for rdma
		 * xdr stream control.
		 */
		int32p = (int32_t *)info;
		in_flags = (uint_t)(*int32p);

		xdrp->xp_flags |= in_flags;
		return (TRUE);

	case XDR_RDMA_GET_FLAGS:
		/*
		 * Return the flags held in xp_flags through *info.
		 */
		int32p = (int32_t *)info;

		*int32p = (int32_t)xdrp->xp_flags;
		return (TRUE);

	case XDR_RDMA_GET_CHUNK_LEN:
		rcilp = (rdma_chunkinfo_lengths_t *)info;
		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

		return (TRUE);

	case XDR_RDMA_ADD_CHUNK:
		/*
		 * Store wlist information
		 */

		rcip = (rdma_chunkinfo_t *)info;

		DTRACE_PROBE2(krpc__i__xdrrdma__control__add__chunk,
		    rci_type_t, rcip->rci_type, uint32, rcip->rci_len);
		switch (rcip->rci_type) {
		case RCI_WRITE_UIO_CHUNK:
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			if (rcip->rci_len < xdrp->xp_min_chunk) {
				xdrp->xp_wcl = NULL;
				*(rcip->rci_clpp) = NULL;
				return (TRUE);
			}
			uiop = rcip->rci_a.rci_uiop;

			for (i = 0; i < uiop->uio_iovcnt; i++) {
				rwl = clist_alloc();
				rwl->c_len = uiop->uio_iov[i].iov_len;
				rwl->u.c_daddr =
				    (uint64)(uintptr_t)
				    (uiop->uio_iov[i].iov_base);
				/*
				 * if userspace address, put adspace ptr in
				 * clist. If not, then do nothing since it's
				 * already set to NULL (from kmem_zalloc)
				 */
				if (uiop->uio_segflg == UIO_USERSPACE) {
					rwl->c_adspc = ttoproc(curthread)->p_as;
				}

				if (prev == NULL)
					prev = rwl;
				else {
					prev->c_next = rwl;
					prev = rwl;
				}
			}

			rwl->c_next = NULL;
			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_WRITE_ADDR_CHUNK:
			rwl = clist_alloc();

			rwl->c_len = rcip->rci_len;
			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
			rwl->c_next = NULL;
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_REPLY_CHUNK:
			xdrp->xp_reply_chunk_len += rcip->rci_len;
			break;
		}
		return (TRUE);

	case XDR_RDMA_GET_WLIST:
		*((struct clist **)info) = xdrp->xp_wcl;
		return (TRUE);

	case XDR_RDMA_SET_WLIST:
		xdrp->xp_wcl = (struct clist *)info;
		return (TRUE);

	case XDR_RDMA_GET_RLIST:
		*((struct clist **)info) = xdrp->xp_rcl;
		return (TRUE);

	case XDR_RDMA_GET_WCINFO:
		rwcip = (rdma_wlist_conn_info_t *)info;

		rwcip->rwci_wlist = xdrp->xp_wcl;
		rwcip->rwci_conn = xdrp->xp_conn;

		return (TRUE);

	default:
		return (FALSE);
	}
}
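
/*
 * Control requests arrive through the generic XDR_CONTROL() macro.
 * A sketch of a hypothetical caller fetching the write chunk list:
 *
 *	struct clist	*wcl;
 *
 *	if (!XDR_CONTROL(xdrs, XDR_RDMA_GET_WLIST, &wcl))
 *		return (FALSE);
 */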

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
	if (!xdr_uint32(xdrs, &objp->c_xdroff))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_len))
		return (FALSE);
	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
		return (FALSE);
	if (!xdr_do_clist(xdrs, &objp->c_next))
		return (FALSE);
	return (TRUE);
}
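
/*
 * On the wire, each entry that xdr_clist() produces is laid out as:
 *
 *	c_xdroff		uint32	position in the XDR stream
 *	c_smemhandle.mrc_rmr	uint32	registered memory handle
 *	c_len			uint32	segment length
 *	w.c_saddr		uint64	source address (offset)
 *	<continuation>		bool	TRUE if another entry follows
 *
 * i.e. the handle/length/offset data the wlist routines below refer to
 * as HLOO, prefixed with the XDR position.
 */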

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
	caddr_t loc = *pp;
	bool_t stat;

	if (loc == NULL) {
		switch (xdrs->x_op) {
		case XDR_FREE:
			return (TRUE);

		case XDR_DECODE:
			*pp = loc = (caddr_t)clist_alloc();
			break;

		case XDR_ENCODE:
			ASSERT(loc);
			break;
		}
	}

	stat = xdr_clist(xdrs, (struct clist *)loc);

	if (xdrs->x_op == XDR_FREE) {
		kmem_cache_free(clist_cache, loc);
		*pp = NULL;
	}
	return (stat);
}

/*
 * XDR a pointer to a possibly recursive clist. This differs
 * from xdr_reference() in that it can serialize/deserialize
 * trees correctly.
 *
 *  What is sent is actually a union:
 *
 *  union object_pointer switch (boolean b) {
 *  case TRUE: object_data data;
 *  case FALSE: void nothing;
 *  }
 *
 * > objpp: Pointer to the pointer to the object.
 *
 */

bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
	bool_t more_data;

	more_data = (*objpp != NULL);
	if (!xdr_bool(xdrs, &more_data))
		return (FALSE);
	if (!more_data) {
		*objpp = NULL;
		return (TRUE);
	}
	return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}

uint_t
xdr_getbufsize(XDR *xdrs)
{
	xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)xdrp->xp_buf_size);
}

/* ARGSUSED */
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
	bool_t	vfalse = FALSE;

	ASSERT(rlist == NULL);
	return (xdr_bool(xdrs, &vfalse));
}

bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
	bool_t		vfalse = FALSE, vtrue = TRUE;
	int		i;
	uint_t		num_segment = 0;
	struct clist	*cl;

	/* does a wlist exist? */
	if (w == NULL) {
		return (xdr_bool(xdrs, &vfalse));
	}
	/* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
	if (!xdr_bool(xdrs, &vtrue))
		return (FALSE);

	for (cl = w; cl != NULL; cl = cl->c_next) {
		num_segment++;
	}

	if (!xdr_uint32(xdrs, &num_segment))
		return (FALSE);
	for (i = 0; i < num_segment; i++) {

		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
			return (FALSE);

		if (!xdr_uint32(xdrs, &w->c_len))
			return (FALSE);

		if (!xdr_uint64(xdrs, &w->u.c_daddr))
			return (FALSE);

		w = w->c_next;
	}

	if (!xdr_bool(xdrs, &vfalse))
		return (FALSE);

	return (TRUE);
}


/*
 * Conditionally decode an RDMA WRITE chunk list from the XDR stream.
 *
 * If the next boolean in the XDR stream is false, there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry allocate a struct clist and decode it.
 * Pass back an indication via wlist_exists if we have seen an
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
	struct clist	*tmp;
	bool_t		more = FALSE;
	uint32_t	seg_array_len;
	uint32_t	i;

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	/* is there a wlist? */
	if (more == FALSE) {
		*wlist_exists = FALSE;
		return (TRUE);
	}
	*wlist_exists = TRUE;

	if (!xdr_uint32(xdrs, &seg_array_len))
		return (FALSE);

	tmp = *w = clist_alloc();
	for (i = 0; i < seg_array_len; i++) {

		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &tmp->c_len))
			return (FALSE);

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
		    uint_t, tmp->c_len);

		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
			return (FALSE);
		if (i < seg_array_len - 1) {
			tmp->c_next = clist_alloc();
			tmp = tmp->c_next;
		} else {
			tmp->c_next = NULL;
		}
	}

	more = FALSE;
	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	return (TRUE);
}

/*
 * Server-side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
	struct clist	*first, *ncl;
	char		*memp;
	uint32_t	num_wclist;
	uint32_t	wcl_length = 0;
	uint32_t	i;
	bool_t		more = FALSE;

	*wclp = NULL;
	*wwl = FALSE;
	*total_length = 0;

	if (!xdr_bool(xdrs, &more)) {
		return (FALSE);
	}

	if (more == FALSE) {
		return (TRUE);
	}

	*wwl = TRUE;

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
		    uint_t, ncl->c_len);

		wcl_length += ncl->c_len;

		if (i < num_wclist - 1) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}
	}

	if (!xdr_bool(xdrs, &more))
		goto err_out;

	first->rb_longbuf.type = RDMA_LONG_BUFFER;
	first->rb_longbuf.len =
	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
		clist_free(first);
		return (FALSE);
	}

	memp = first->rb_longbuf.addr;

	ncl = first;
	for (i = 0; i < num_wclist; i++) {
		ncl->w.c_saddr3 = (caddr_t)memp;
		memp += ncl->c_len;
		ncl = ncl->c_next;
	}

	*wclp = first;
	*total_length = wcl_length;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}
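
/*
 * Note the layout used above: one long buffer of at least wcl_length
 * bytes is allocated and carved up across the decoded segments, so the
 * write data lands contiguously and a single registration covers it.
 */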

/*
 * XDR decode the long reply write chunk.
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
	bool_t		have_rchunk = FALSE;
	struct clist	*first = NULL, *ncl = NULL;
	uint32_t	num_wclist;
	uint32_t	i;

	if (!xdr_bool(xdrs, &have_rchunk))
		return (FALSE);

	if (have_rchunk == FALSE)
		return (TRUE);

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
		return (FALSE);
	}

	if (num_wclist == 0) {
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (i > 0) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}
		if (!(ncl->c_dmemhandle.mrc_rmr &&
		    (ncl->c_len > 0) && ncl->u.c_daddr))
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);

		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
		    uint32_t, ncl->c_len);

	}
	*clist = first;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}


bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
	int		i;
	bool_t		long_reply_exists = TRUE;
	uint32_t	length;
	uint64		offset;

	if (seg_array_len > 0) {
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
		if (!xdr_uint32(xdrs, &seg_array_len))
			return (FALSE);

		for (i = 0; i < seg_array_len; i++) {
			if (!cl_longreply)
				return (FALSE);
			length = cl_longreply->c_len;
			offset = (uint64) cl_longreply->u.c_daddr;

			DTRACE_PROBE1(
			    krpc__i__xdr_encode_reply_wchunk_c_len,
			    uint32_t, length);

			if (!xdr_uint32(xdrs,
			    &cl_longreply->c_dmemhandle.mrc_rmr))
				return (FALSE);
			if (!xdr_uint32(xdrs, &length))
				return (FALSE);
			if (!xdr_uint64(xdrs, &offset))
				return (FALSE);
			cl_longreply = cl_longreply->c_next;
		}
	} else {
		long_reply_exists = FALSE;
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
	}
	return (TRUE);
}

bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
	struct clist	*rdclist;
	struct clist	cl;
	uint_t		total_len = 0;
	uint32_t	status;
	bool_t		retval = TRUE;

	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
	rlist->rb_longbuf.len =
	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
		return (FALSE);
	}

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */

	cl = *rlist;
	cl.c_next = NULL;
	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		rdma_buf_free(*conn, &rlist->rb_longbuf);
		DTRACE_PROBE(
		    krpc__e__xdrrdma__readfromclient__clist__reg);
		return (FALSE);
	}

	rlist->c_regtype = CLIST_REG_DST;
	rlist->c_dmemhandle = cl.c_dmemhandle;
	rlist->c_dsynchandle = cl.c_dsynchandle;

	for (rdclist = rlist;
	    rdclist != NULL; rdclist = rdclist->c_next) {
		total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint32) rdclist->u.c_daddr3);
#else
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint64) rdclist->u.c_daddr);

#endif
		cl = (*rdclist);
		cl.c_next = NULL;

		/*
		 * Use the same memory handle for all the chunks
		 */
		cl.c_dmemhandle = rlist->c_dmemhandle;
		cl.c_dsynchandle = rlist->c_dsynchandle;

		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
		    int, rdclist->c_len);

		/*
		 * Now read the chunk in
		 */
		if (rdclist->c_next == NULL) {
			status = RDMA_READ(*conn, &cl, WAIT);
		} else {
			status = RDMA_READ(*conn, &cl, NOWAIT);
		}
		if (status != RDMA_SUCCESS) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__readfromclient__readfailed);
			rdma_buf_free(*conn, &rlist->rb_longbuf);
			return (FALSE);
		}
	}

	cl = (*rlist);
	cl.c_next = NULL;
	cl.c_len = total_len;
	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		retval = FALSE;
	}
	return (retval);
}
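
/*
 * Design note: all but the last chunk are read NOWAIT and only the final
 * RDMA_READ specifies WAIT, so the reads go out back to back and waiting
 * on the last one covers the whole list (RDMA READs complete in order on
 * a connection).  A single clist_syncmem() then makes the entire buffer
 * visible to the CPU.
 */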

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
	rdma_buf_free(conn, &clp->rb_longbuf);
	clist_free(clp);
	return (TRUE);
}

bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
	int status;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct xdr_ops *xops = xdrrdma_xops();
	struct clist *tcl, *wrcl, *cl;
	struct clist fcl;
	int rndup_present, rnduplen;

	rndup_present = 0;
	wrcl = NULL;

	/* caller is doing a sizeof */
	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
		return (TRUE);

	/* copy of the first chunk */
	fcl = *wcl;
	fcl.c_next = NULL;

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */

	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	wcl->c_regtype = CLIST_REG_SOURCE;
	wcl->c_smemhandle = fcl.c_smemhandle;
	wcl->c_ssynchandle = fcl.c_ssynchandle;

	/*
	 * Only transfer the read data, ignoring any trailing
	 * roundup chunks. A bit of work, but it saves an
	 * unnecessary extra RDMA_WRITE containing only
	 * roundup bytes.
	 */

	rnduplen = clist_len(wcl) - data_len;

	if (rnduplen) {

		tcl = wcl->c_next;

		/*
		 * Check if there is a trailing roundup chunk
		 */
		while (tcl) {
			if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
				rndup_present = 1;
				break;
			}
			tcl = tcl->c_next;
		}

		/*
		 * Make a copy of the chunk list, skipping the last chunk
		 */
		if (rndup_present) {
			cl = wcl;
			tcl = NULL;
			while (cl) {
				if (tcl == NULL) {
					tcl = clist_alloc();
					wrcl = tcl;
				} else {
					tcl->c_next = clist_alloc();
					tcl = tcl->c_next;
				}

				*tcl = *cl;
				cl = cl->c_next;
				/* last chunk */
				if (cl->c_next == NULL)
					break;
			}
			tcl->c_next = NULL;
		}
	}

	if (wrcl == NULL) {
		/* No roundup chunks */
		wrcl = wcl;
	}

	/*
	 * Set the registered memory handles for the
	 * rest of the chunks to those of the first chunk.
	 */
	tcl = wrcl->c_next;
	while (tcl) {
		tcl->c_smemhandle = fcl.c_smemhandle;
		tcl->c_ssynchandle = fcl.c_ssynchandle;
		tcl = tcl->c_next;
	}

	/*
	 * Sync the total len beginning from the first chunk.
	 */
	fcl.c_len = clist_len(wrcl);
	status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);

	if (rndup_present)
		clist_free(wrcl);

	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	return (TRUE);
}
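
/*
 * For example (hypothetical lengths): if data_len is 4097 and the write
 * chunk list carries 4096 + 1 + 3 bytes, the trailing 3-byte entry is
 * pure XDR roundup; the copy made above drops it, so only 4097 bytes go
 * out in the RDMA_WRITE.
 */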


/*
 * Reads one chunk at a time
 */

static bool_t
xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
{
	int status;
	int32_t len = 0;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist *cle = *(xdrp->xp_rcl_next);
	struct clist *rclp = xdrp->xp_rcl;
	struct clist *clp;

	/*
	 * len is used later to compute the XDR offset into the chunk,
	 * factoring in any 4-byte XDR roundup
	 * (see the read chunk example at the top of this file)
	 */
	while (rclp != cle) {
		len += rclp->c_len;
		rclp = rclp->c_next;
	}

	len = RNDUP(len) - len;

	ASSERT(xdrs->x_handy <= 0);

	/*
	 * If this is the first chunk to contain the RPC
	 * message set xp_off to the xdr offset of the
	 * inline message.
	 */
	if (xdrp->xp_off == 0)
		xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);

	if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
		return (FALSE);

	/*
	 * Make a copy of the chunk to read from client.
	 * Chunks are read on demand, so read only one
	 * for now.
	 */

	rclp = clist_alloc();
	*rclp = *cle;
	rclp->c_next = NULL;

	xdrp->xp_rcl_next = &cle->c_next;

	/*
	 * If there is a roundup present, then skip those
	 * bytes when reading.
	 */
	if (len) {
		rclp->w.c_saddr =
		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
		rclp->c_len = rclp->c_len - len;
	}

	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

	if (status == FALSE) {
		clist_free(rclp);
		return (status);
	}

	xdrp->xp_offp = rclp->rb_longbuf.addr;
	xdrs->x_base = xdrp->xp_offp;
	xdrs->x_handy = rclp->c_len;

	/*
	 * This copy of read chunks containing the XDR
	 * message is freed later in xdrrdma_destroy()
	 */

	if (xdrp->xp_rcl_xdr) {
		/* Add the chunk to end of the list */
		clp = xdrp->xp_rcl_xdr;
		while (clp->c_next != NULL)
			clp = clp->c_next;
		clp->c_next = rclp;
	} else {
		xdrp->xp_rcl_xdr = rclp;
	}
	return (TRUE);
}

static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
	struct clist *cl;

	(void) clist_deregister(conn, xdr_rcl);

	/*
	 * Read chunks containing parts of the XDR message are
	 * special: when there are multiple chunks, each has
	 * its own buffer.
	 */

	cl = xdr_rcl;
	while (cl) {
		rdma_buf_free(conn, &cl->rb_longbuf);
		cl = cl->c_next;
	}

	clist_free(xdr_rcl);
}