/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2009 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ /* * Copyright (c) 2007, The Ohio State University. All rights reserved. * * Portions of this source code is developed by the team members of * The Ohio State University's Network-Based Computing Laboratory (NBCL), * headed by Professor Dhabaleswar K. (DK) Panda. * * Acknowledgements to contributions from developors: * Ranjit Noronha: noronha@cse.ohio-state.edu * Lei Chai : chail@cse.ohio-state.edu * Weikuan Yu : yuw@cse.ohio-state.edu * */ /* * xdr_rdma.c, XDR implementation using RDMA to move large chunks */ #include #include #include #include #include #include #include #include #include #include #include #include static bool_t xdrrdma_getint32(XDR *, int32_t *); static bool_t xdrrdma_putint32(XDR *, int32_t *); static bool_t xdrrdma_getbytes(XDR *, caddr_t, int); static bool_t xdrrdma_putbytes(XDR *, caddr_t, int); uint_t xdrrdma_getpos(XDR *); bool_t xdrrdma_setpos(XDR *, uint_t); static rpc_inline_t *xdrrdma_inline(XDR *, int); void xdrrdma_destroy(XDR *); static bool_t xdrrdma_control(XDR *, int, void *); static bool_t xdrrdma_read_a_chunk(XDR *, CONN **); static void xdrrdma_free_xdr_chunks(CONN *, struct clist *); struct xdr_ops xdrrdmablk_ops = { xdrrdma_getbytes, xdrrdma_putbytes, xdrrdma_getpos, xdrrdma_setpos, xdrrdma_inline, xdrrdma_destroy, xdrrdma_control, xdrrdma_getint32, xdrrdma_putint32 }; struct xdr_ops xdrrdma_ops = { xdrrdma_getbytes, xdrrdma_putbytes, xdrrdma_getpos, xdrrdma_setpos, xdrrdma_inline, xdrrdma_destroy, xdrrdma_control, xdrrdma_getint32, xdrrdma_putint32 }; /* * A chunk list entry identifies a chunk of opaque data to be moved * separately from the rest of the RPC message. xp_min_chunk = 0, is a * special case for ENCODING, which means do not chunk the incoming stream of * data. * * A read chunk can contain part of the RPC message in addition to the * inline message. In such a case, (xp_offp - x_base) will not provide * the correct xdr offset of the entire message. xp_off is used in such * a case to denote the offset or current position in the overall message * covering both the inline and the chunk. This is used only in the case * of decoding and useful to compare read chunk 'c_xdroff' offsets. * * An example for a read chunk containing an XDR message: * An NFSv4 compound as following: * * PUTFH * WRITE [4109 bytes] * GETATTR * * Solaris Encoding is: * ------------------- * * : [PUTFH WRITE4args GETATTR] * | * v * [RDMA_READ chunks]: [write data] * * * Linux encoding is: * ----------------- * * : [PUTFH WRITE4args] * | * v * [RDMA_READ chunks]: [Write data] [Write data2] [Getattr chunk] * chunk1 chunk2 chunk3 * * where the READ chunks are as: * * - chunk1 - 4k * write data | * - chunk2 - 13 bytes(4109 - 4k) * getattr op - chunk3 - 19 bytes * (getattr op starts at byte 4 after 3 bytes of roundup) * */ typedef struct { caddr_t xp_offp; int xp_min_chunk; uint_t xp_flags; /* Controls setting for rdma xdr */ int xp_buf_size; /* size of xdr buffer */ int xp_off; /* overall offset */ struct clist *xp_rcl; /* head of chunk list */ struct clist **xp_rcl_next; /* location to place/find next chunk */ struct clist *xp_rcl_xdr; /* copy of rcl containing RPC message */ struct clist *xp_wcl; /* head of write chunk list */ CONN *xp_conn; /* connection for chunk data xfer */ uint_t xp_reply_chunk_len; /* used to track length for security modes: integrity/privacy */ uint_t xp_reply_chunk_len_alt; } xrdma_private_t; extern kmem_cache_t *clist_cache; bool_t xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep, CONN **conn, const uint_t maxsize) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); struct clist *cle = *(xdrp->xp_rcl_next); struct clist *rdclist = NULL, *prev = NULL; bool_t retval = TRUE; uint32_t cur_offset = 0; uint32_t total_segments = 0; uint32_t actual_segments = 0; uint32_t alen; uint_t total_len; ASSERT(xdrs->x_op != XDR_FREE); /* * first deal with the length since xdr bytes are counted */ if (!xdr_u_int(xdrs, sizep)) { DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail); return (FALSE); } total_len = *sizep; if (total_len > maxsize) { DTRACE_PROBE2(xdr__e__getrdmablk_bad_size, int, total_len, int, maxsize); return (FALSE); } (*conn) = xdrp->xp_conn; /* * if no data we are done */ if (total_len == 0) return (TRUE); while (cle) { total_segments++; cle = cle->c_next; } cle = *(xdrp->xp_rcl_next); /* * If there was a chunk at the current offset, then setup a read * chunk list which records the destination address and length * and will RDMA READ the data in later. */ if (cle == NULL) return (FALSE); if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base)) return (FALSE); /* * Setup the chunk list with appropriate * address (offset) and length */ for (actual_segments = 0; actual_segments < total_segments; actual_segments++) { DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len, uint32_t, total_len, uint32_t, cle->c_xdroff); if (total_len <= 0) break; /* * not the first time in the loop */ if (actual_segments > 0) cle = cle->c_next; cle->u.c_daddr = (uint64) cur_offset; alen = 0; if (cle->c_len > total_len) { alen = cle->c_len; cle->c_len = total_len; } if (!alen) xdrp->xp_rcl_next = &cle->c_next; cur_offset += cle->c_len; total_len -= cle->c_len; if ((total_segments - actual_segments - 1) == 0 && total_len > 0) { DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort); retval = FALSE; } if ((total_segments - actual_segments - 1) > 0 && total_len == 0) { DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig, int, total_segments, int, actual_segments); } rdclist = clist_alloc(); (*rdclist) = (*cle); if ((*rlist) == NULL) (*rlist) = rdclist; if (prev == NULL) prev = rdclist; else { prev->c_next = rdclist; prev = rdclist; } } out: if (prev != NULL) prev->c_next = NULL; /* * Adjust the chunk length, if we read only a part of * a chunk. */ if (alen) { cle->w.c_saddr = (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len; cle->c_len = alen - cle->c_len; } return (retval); } /* * The procedure xdrrdma_create initializes a stream descriptor for a memory * buffer. */ void xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size, int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn) { xrdma_private_t *xdrp; struct clist *cle; xdrs->x_op = op; xdrs->x_ops = &xdrrdma_ops; xdrs->x_base = addr; xdrs->x_handy = size; xdrs->x_public = NULL; xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t), KM_SLEEP); xdrs->x_private = (caddr_t)xdrp; xdrp->xp_offp = addr; xdrp->xp_min_chunk = min_chunk; xdrp->xp_flags = 0; xdrp->xp_buf_size = size; xdrp->xp_rcl = cl; xdrp->xp_reply_chunk_len = 0; xdrp->xp_reply_chunk_len_alt = 0; if (op == XDR_ENCODE && cl != NULL) { /* Find last element in chunk list and set xp_rcl_next */ for (cle = cl; cle->c_next != NULL; cle = cle->c_next) continue; xdrp->xp_rcl_next = &(cle->c_next); } else { xdrp->xp_rcl_next = &(xdrp->xp_rcl); } xdrp->xp_wcl = NULL; xdrp->xp_conn = conn; if (xdrp->xp_min_chunk != 0) xdrp->xp_flags |= XDR_RDMA_CHUNK; } /* ARGSUSED */ void xdrrdma_destroy(XDR * xdrs) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); if (xdrp == NULL) return; if (xdrp->xp_wcl) { if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) { (void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl); rdma_buf_free(xdrp->xp_conn, &xdrp->xp_wcl->rb_longbuf); } clist_free(xdrp->xp_wcl); } if (xdrp->xp_rcl) { if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) { (void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl); rdma_buf_free(xdrp->xp_conn, &xdrp->xp_rcl->rb_longbuf); } clist_free(xdrp->xp_rcl); } if (xdrp->xp_rcl_xdr) xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr); (void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t)); xdrs->x_private = NULL; } static bool_t xdrrdma_getint32(XDR *xdrs, int32_t *int32p) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); int chunked = 0; if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) { /* * check if rest of the rpc message is in a chunk */ if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) { return (FALSE); } chunked = 1; } /* LINTED pointer alignment */ *int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp)))); DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p); xdrp->xp_offp += sizeof (int32_t); if (chunked) xdrs->x_handy -= (int)sizeof (int32_t); if (xdrp->xp_off != 0) { xdrp->xp_off += sizeof (int32_t); } return (TRUE); } static bool_t xdrrdma_putint32(XDR *xdrs, int32_t *int32p) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) return (FALSE); /* LINTED pointer alignment */ *(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p)); xdrp->xp_offp += sizeof (int32_t); return (TRUE); } /* * DECODE bytes from XDR stream for rdma. * If the XDR stream contains a read chunk list, * it will go through xdrrdma_getrdmablk instead. */ static bool_t xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); struct clist *cle = *(xdrp->xp_rcl_next); struct clist *cls = *(xdrp->xp_rcl_next); struct clist cl; bool_t retval = TRUE; uint32_t total_len = len; uint32_t cur_offset = 0; uint32_t total_segments = 0; uint32_t actual_segments = 0; uint32_t status = RDMA_SUCCESS; uint32_t alen = 0; uint32_t xpoff; while (cle) { total_segments++; cle = cle->c_next; } cle = *(xdrp->xp_rcl_next); if (xdrp->xp_off) { xpoff = xdrp->xp_off; } else { xpoff = (xdrp->xp_offp - xdrs->x_base); } /* * If there was a chunk at the current offset, then setup a read * chunk list which records the destination address and length * and will RDMA READ the data in later. */ if (cle != NULL && cle->c_xdroff == xpoff) { for (actual_segments = 0; actual_segments < total_segments; actual_segments++) { if (total_len <= 0) break; if (status != RDMA_SUCCESS) goto out; cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset; alen = 0; if (cle->c_len > total_len) { alen = cle->c_len; cle->c_len = total_len; } if (!alen) xdrp->xp_rcl_next = &cle->c_next; cur_offset += cle->c_len; total_len -= cle->c_len; if ((total_segments - actual_segments - 1) == 0 && total_len > 0) { DTRACE_PROBE( krpc__e__xdrrdma_getbytes_chunktooshort); retval = FALSE; } if ((total_segments - actual_segments - 1) > 0 && total_len == 0) { DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig, int, total_segments, int, actual_segments); } /* * RDMA READ the chunk data from the remote end. * First prep the destination buffer by registering * it, then RDMA READ the chunk data. Since we are * doing streaming memory, sync the destination * buffer to CPU and deregister the buffer. */ if (xdrp->xp_conn == NULL) { return (FALSE); } cl = *cle; cl.c_next = NULL; status = clist_register(xdrp->xp_conn, &cl, CLIST_REG_DST); if (status != RDMA_SUCCESS) { retval = FALSE; /* * Deregister the previous chunks * before return */ goto out; } cle->c_dmemhandle = cl.c_dmemhandle; cle->c_dsynchandle = cl.c_dsynchandle; /* * Now read the chunk in */ if ((total_segments - actual_segments - 1) == 0 || total_len == 0) { status = RDMA_READ(xdrp->xp_conn, &cl, WAIT); } else { status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT); } if (status != RDMA_SUCCESS) { DTRACE_PROBE1( krpc__i__xdrrdma_getblk_readfailed, int, status); retval = FALSE; } cle = cle->c_next; } /* * sync the memory for cpu */ cl = *cls; cl.c_next = NULL; cl.c_len = cur_offset; if (clist_syncmem( xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) { retval = FALSE; } out: /* * Deregister the chunks */ cle = cls; while (actual_segments != 0) { cl = *cle; cl.c_next = NULL; cl.c_regtype = CLIST_REG_DST; (void) clist_deregister(xdrp->xp_conn, &cl); cle = cle->c_next; actual_segments--; } if (alen) { cle = *(xdrp->xp_rcl_next); cle->w.c_saddr = (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len; cle->c_len = alen - cle->c_len; } return (retval); } if ((xdrs->x_handy -= len) < 0) return (FALSE); bcopy(xdrp->xp_offp, addr, len); xdrp->xp_offp += len; if (xdrp->xp_off != 0) xdrp->xp_off += len; return (TRUE); } /* * ENCODE some bytes into an XDR stream xp_min_chunk = 0, means the stream of * bytes contain no chunks to seperate out, and if the bytes do not fit in * the supplied buffer, grow the buffer and free the old buffer. */ static bool_t xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); /* * Is this stream accepting chunks? * If so, does the either of the two following conditions exist? * - length of bytes to encode is greater than the min chunk size? * - remaining space in this stream is shorter than length of * bytes to encode? * * If the above exists, then create a chunk for this encoding * and save the addresses, etc. */ if (xdrp->xp_flags & XDR_RDMA_CHUNK && ((xdrp->xp_min_chunk != 0 && len >= xdrp->xp_min_chunk) || (xdrs->x_handy - len < 0))) { struct clist *cle; int offset = xdrp->xp_offp - xdrs->x_base; cle = clist_alloc(); cle->c_xdroff = offset; cle->c_len = len; cle->w.c_saddr = (uint64)(uintptr_t)addr; cle->c_next = NULL; *(xdrp->xp_rcl_next) = cle; xdrp->xp_rcl_next = &(cle->c_next); return (TRUE); } /* Is there enough space to encode what is left? */ if ((xdrs->x_handy -= len) < 0) { return (FALSE); } bcopy(addr, xdrp->xp_offp, len); xdrp->xp_offp += len; return (TRUE); } uint_t xdrrdma_getpos(XDR *xdrs) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base)); } bool_t xdrrdma_setpos(XDR *xdrs, uint_t pos) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); caddr_t newaddr = xdrs->x_base + pos; caddr_t lastaddr = xdrp->xp_offp + xdrs->x_handy; ptrdiff_t diff; if (newaddr > lastaddr) return (FALSE); xdrp->xp_offp = newaddr; diff = lastaddr - newaddr; xdrs->x_handy = (int)diff; return (TRUE); } /* ARGSUSED */ static rpc_inline_t * xdrrdma_inline(XDR *xdrs, int len) { rpc_inline_t *buf = NULL; xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); struct clist *cle = *(xdrp->xp_rcl_next); if (xdrs->x_op == XDR_DECODE) { /* * Since chunks aren't in-line, check to see whether there is * a chunk in the inline range. */ if (cle != NULL && cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len)) return (NULL); } /* LINTED pointer alignment */ buf = (rpc_inline_t *)xdrp->xp_offp; if (!IS_P2ALIGNED(buf, sizeof (int32_t))) return (NULL); if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 && len >= xdrp->xp_min_chunk)) { return (NULL); } else { xdrs->x_handy -= len; xdrp->xp_offp += len; return (buf); } } static bool_t xdrrdma_control(XDR *xdrs, int request, void *info) { int32_t *int32p; int len, i; uint_t in_flags; xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); rdma_chunkinfo_t *rcip = NULL; rdma_wlist_conn_info_t *rwcip = NULL; rdma_chunkinfo_lengths_t *rcilp = NULL; struct uio *uiop; struct clist *rwl = NULL; struct clist *prev = NULL; switch (request) { case XDR_PEEK: /* * Return the next 4 byte unit in the XDR stream. */ if (xdrs->x_handy < sizeof (int32_t)) return (FALSE); int32p = (int32_t *)info; *int32p = (int32_t)ntohl((uint32_t) (*((int32_t *)(xdrp->xp_offp)))); return (TRUE); case XDR_SKIPBYTES: /* * Skip the next N bytes in the XDR stream. */ int32p = (int32_t *)info; len = RNDUP((int)(*int32p)); if ((xdrs->x_handy -= len) < 0) return (FALSE); xdrp->xp_offp += len; return (TRUE); case XDR_RDMA_SET_FLAGS: /* * Set the flags provided in the *info in xp_flags for rdma * xdr stream control. */ int32p = (int32_t *)info; in_flags = (uint_t)(*int32p); xdrp->xp_flags |= in_flags; return (TRUE); case XDR_RDMA_GET_FLAGS: /* * Get the flags provided in xp_flags return through *info */ int32p = (int32_t *)info; *int32p = (int32_t)xdrp->xp_flags; return (TRUE); case XDR_RDMA_GET_CHUNK_LEN: rcilp = (rdma_chunkinfo_lengths_t *)info; rcilp->rcil_len = xdrp->xp_reply_chunk_len; rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt; return (TRUE); case XDR_RDMA_ADD_CHUNK: /* * Store wlist information */ rcip = (rdma_chunkinfo_t *)info; switch (rcip->rci_type) { case RCI_WRITE_UIO_CHUNK: xdrp->xp_reply_chunk_len_alt += rcip->rci_len; if (rcip->rci_len < xdrp->xp_min_chunk) { xdrp->xp_wcl = NULL; *(rcip->rci_clpp) = NULL; return (TRUE); } uiop = rcip->rci_a.rci_uiop; for (i = 0; i < uiop->uio_iovcnt; i++) { rwl = clist_alloc(); rwl->c_len = uiop->uio_iov[i].iov_len; rwl->u.c_daddr = (uint64)(uintptr_t) (uiop->uio_iov[i].iov_base); /* * if userspace address, put adspace ptr in * clist. If not, then do nothing since it's * already set to NULL (from kmem_zalloc) */ if (uiop->uio_segflg == UIO_USERSPACE) { rwl->c_adspc = ttoproc(curthread)->p_as; } if (prev == NULL) prev = rwl; else { prev->c_next = rwl; prev = rwl; } } rwl->c_next = NULL; xdrp->xp_wcl = rwl; *(rcip->rci_clpp) = rwl; break; case RCI_WRITE_ADDR_CHUNK: rwl = clist_alloc(); rwl->c_len = rcip->rci_len; rwl->u.c_daddr3 = rcip->rci_a.rci_addr; rwl->c_next = NULL; xdrp->xp_reply_chunk_len_alt += rcip->rci_len; xdrp->xp_wcl = rwl; *(rcip->rci_clpp) = rwl; break; case RCI_REPLY_CHUNK: xdrp->xp_reply_chunk_len += rcip->rci_len; break; } return (TRUE); case XDR_RDMA_GET_WLIST: *((struct clist **)info) = xdrp->xp_wcl; return (TRUE); case XDR_RDMA_SET_WLIST: xdrp->xp_wcl = (struct clist *)info; return (TRUE); case XDR_RDMA_GET_RLIST: *((struct clist **)info) = xdrp->xp_rcl; return (TRUE); case XDR_RDMA_GET_WCINFO: rwcip = (rdma_wlist_conn_info_t *)info; rwcip->rwci_wlist = xdrp->xp_wcl; rwcip->rwci_conn = xdrp->xp_conn; return (TRUE); default: return (FALSE); } } bool_t xdr_do_clist(XDR *, clist **); /* * Not all fields in struct clist are interesting to the RPC over RDMA * protocol. Only XDR the interesting fields. */ bool_t xdr_clist(XDR *xdrs, clist *objp) { if (!xdr_uint32(xdrs, &objp->c_xdroff)) return (FALSE); if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr)) return (FALSE); if (!xdr_uint32(xdrs, &objp->c_len)) return (FALSE); if (!xdr_uint64(xdrs, &objp->w.c_saddr)) return (FALSE); if (!xdr_do_clist(xdrs, &objp->c_next)) return (FALSE); return (TRUE); } /* * The following two functions are forms of xdr_pointer() * and xdr_reference(). Since the generic versions just * kmem_alloc() a new clist, we actually want to use the * rdma_clist kmem_cache. */ /* * Generate or free a clist structure from the * kmem_cache "rdma_clist" */ bool_t xdr_ref_clist(XDR *xdrs, caddr_t *pp) { caddr_t loc = *pp; bool_t stat; if (loc == NULL) { switch (xdrs->x_op) { case XDR_FREE: return (TRUE); case XDR_DECODE: *pp = loc = (caddr_t)clist_alloc(); break; case XDR_ENCODE: ASSERT(loc); break; } } stat = xdr_clist(xdrs, (struct clist *)loc); if (xdrs->x_op == XDR_FREE) { kmem_cache_free(clist_cache, loc); *pp = NULL; } return (stat); } /* * XDR a pointer to a possibly recursive clist. This differs * with xdr_reference in that it can serialize/deserialiaze * trees correctly. * * What is sent is actually a union: * * union object_pointer switch (boolean b) { * case TRUE: object_data data; * case FALSE: void nothing; * } * * > objpp: Pointer to the pointer to the object. * */ bool_t xdr_do_clist(XDR *xdrs, clist **objpp) { bool_t more_data; more_data = (*objpp != NULL); if (!xdr_bool(xdrs, &more_data)) return (FALSE); if (!more_data) { *objpp = NULL; return (TRUE); } return (xdr_ref_clist(xdrs, (caddr_t *)objpp)); } uint_t xdr_getbufsize(XDR *xdrs) { xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); return ((uint_t)xdrp->xp_buf_size); } /* ARGSUSED */ bool_t xdr_encode_rlist_svc(XDR *xdrs, clist *rlist) { bool_t vfalse = FALSE; ASSERT(rlist == NULL); return (xdr_bool(xdrs, &vfalse)); } bool_t xdr_encode_wlist(XDR *xdrs, clist *w) { bool_t vfalse = FALSE, vtrue = TRUE; int i; uint_t num_segment = 0; struct clist *cl; /* does a wlist exist? */ if (w == NULL) { return (xdr_bool(xdrs, &vfalse)); } /* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */ if (!xdr_bool(xdrs, &vtrue)) return (FALSE); for (cl = w; cl != NULL; cl = cl->c_next) { num_segment++; } if (!xdr_uint32(xdrs, &num_segment)) return (FALSE); for (i = 0; i < num_segment; i++) { DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len); if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr)) return (FALSE); if (!xdr_uint32(xdrs, &w->c_len)) return (FALSE); if (!xdr_uint64(xdrs, &w->u.c_daddr)) return (FALSE); w = w->c_next; } if (!xdr_bool(xdrs, &vfalse)) return (FALSE); return (TRUE); } /* * Conditionally decode a RDMA WRITE chunk list from XDR stream. * * If the next boolean in the XDR stream is false there is no * RDMA WRITE chunk list present. Otherwise iterate over the * array and for each entry: allocate a struct clist and decode. * Pass back an indication via wlist_exists if we have seen a * RDMA WRITE chunk list. */ bool_t xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists) { struct clist *tmp; bool_t more = FALSE; uint32_t seg_array_len; uint32_t i; if (!xdr_bool(xdrs, &more)) return (FALSE); /* is there a wlist? */ if (more == FALSE) { *wlist_exists = FALSE; return (TRUE); } *wlist_exists = TRUE; if (!xdr_uint32(xdrs, &seg_array_len)) return (FALSE); tmp = *w = clist_alloc(); for (i = 0; i < seg_array_len; i++) { if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr)) return (FALSE); if (!xdr_uint32(xdrs, &tmp->c_len)) return (FALSE); DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len, uint_t, tmp->c_len); if (!xdr_uint64(xdrs, &tmp->u.c_daddr)) return (FALSE); if (i < seg_array_len - 1) { tmp->c_next = clist_alloc(); tmp = tmp->c_next; } else { tmp->c_next = NULL; } } more = FALSE; if (!xdr_bool(xdrs, &more)) return (FALSE); return (TRUE); } /* * Server side RDMA WRITE list decode. * XDR context is memory ops */ bool_t xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl, uint32_t *total_length, CONN *conn) { struct clist *first, *ncl; char *memp; uint32_t num_wclist; uint32_t wcl_length = 0; uint32_t i; bool_t more = FALSE; *wclp = NULL; *wwl = FALSE; *total_length = 0; if (!xdr_bool(xdrs, &more)) { return (FALSE); } if (more == FALSE) { return (TRUE); } *wwl = TRUE; if (!xdr_uint32(xdrs, &num_wclist)) { DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength); return (FALSE); } first = ncl = clist_alloc(); for (i = 0; i < num_wclist; i++) { if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr)) goto err_out; if (!xdr_uint32(xdrs, &ncl->c_len)) goto err_out; if (!xdr_uint64(xdrs, &ncl->u.c_daddr)) goto err_out; if (ncl->c_len > MAX_SVC_XFER_SIZE) { DTRACE_PROBE( krpc__e__xdrrdma__wlistsvc__chunklist_toobig); ncl->c_len = MAX_SVC_XFER_SIZE; } DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len, uint_t, ncl->c_len); wcl_length += ncl->c_len; if (i < num_wclist - 1) { ncl->c_next = clist_alloc(); ncl = ncl->c_next; } } if (!xdr_bool(xdrs, &more)) goto err_out; first->rb_longbuf.type = RDMA_LONG_BUFFER; first->rb_longbuf.len = wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN; if (rdma_buf_alloc(conn, &first->rb_longbuf)) { clist_free(first); return (FALSE); } memp = first->rb_longbuf.addr; ncl = first; for (i = 0; i < num_wclist; i++) { ncl->w.c_saddr3 = (caddr_t)memp; memp += ncl->c_len; ncl = ncl->c_next; } *wclp = first; *total_length = wcl_length; return (TRUE); err_out: clist_free(first); return (FALSE); } /* * XDR decode the long reply write chunk. */ bool_t xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist) { bool_t have_rchunk = FALSE; struct clist *first = NULL, *ncl = NULL; uint32_t num_wclist; uint32_t i; if (!xdr_bool(xdrs, &have_rchunk)) return (FALSE); if (have_rchunk == FALSE) return (TRUE); if (!xdr_uint32(xdrs, &num_wclist)) { DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength); return (FALSE); } if (num_wclist == 0) { return (FALSE); } first = ncl = clist_alloc(); for (i = 0; i < num_wclist; i++) { if (i > 0) { ncl->c_next = clist_alloc(); ncl = ncl->c_next; } if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr)) goto err_out; if (!xdr_uint32(xdrs, &ncl->c_len)) goto err_out; if (!xdr_uint64(xdrs, &ncl->u.c_daddr)) goto err_out; if (ncl->c_len > MAX_SVC_XFER_SIZE) { DTRACE_PROBE( krpc__e__xdrrdma__replywchunk__chunklist_toobig); ncl->c_len = MAX_SVC_XFER_SIZE; } if (!(ncl->c_dmemhandle.mrc_rmr && (ncl->c_len > 0) && ncl->u.c_daddr)) DTRACE_PROBE( krpc__e__xdrrdma__replywchunk__invalid_segaddr); DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len, uint32_t, ncl->c_len); } *clist = first; return (TRUE); err_out: clist_free(first); return (FALSE); } bool_t xdr_encode_reply_wchunk(XDR *xdrs, struct clist *cl_longreply, uint32_t seg_array_len) { int i; bool_t long_reply_exists = TRUE; uint32_t length; uint64 offset; if (seg_array_len > 0) { if (!xdr_bool(xdrs, &long_reply_exists)) return (FALSE); if (!xdr_uint32(xdrs, &seg_array_len)) return (FALSE); for (i = 0; i < seg_array_len; i++) { if (!cl_longreply) return (FALSE); length = cl_longreply->c_len; offset = (uint64) cl_longreply->u.c_daddr; DTRACE_PROBE1( krpc__i__xdr_encode_reply_wchunk_c_len, uint32_t, length); if (!xdr_uint32(xdrs, &cl_longreply->c_dmemhandle.mrc_rmr)) return (FALSE); if (!xdr_uint32(xdrs, &length)) return (FALSE); if (!xdr_uint64(xdrs, &offset)) return (FALSE); cl_longreply = cl_longreply->c_next; } } else { long_reply_exists = FALSE; if (!xdr_bool(xdrs, &long_reply_exists)) return (FALSE); } return (TRUE); } bool_t xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count) { struct clist *rdclist; struct clist cl; uint_t total_len = 0; uint32_t status; bool_t retval = TRUE; rlist->rb_longbuf.type = RDMA_LONG_BUFFER; rlist->rb_longbuf.len = count > RCL_BUF_LEN ? count : RCL_BUF_LEN; if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) { return (FALSE); } /* * The entire buffer is registered with the first chunk. * Later chunks will use the same registered memory handle. */ cl = *rlist; cl.c_next = NULL; if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) { rdma_buf_free(*conn, &rlist->rb_longbuf); DTRACE_PROBE( krpc__e__xdrrdma__readfromclient__clist__reg); return (FALSE); } rlist->c_regtype = CLIST_REG_DST; rlist->c_dmemhandle = cl.c_dmemhandle; rlist->c_dsynchandle = cl.c_dsynchandle; for (rdclist = rlist; rdclist != NULL; rdclist = rdclist->c_next) { total_len += rdclist->c_len; #if (defined(OBJ32)||defined(DEBUG32)) rdclist->u.c_daddr3 = (caddr_t)((char *)rlist->rb_longbuf.addr + (uint32) rdclist->u.c_daddr3); #else rdclist->u.c_daddr3 = (caddr_t)((char *)rlist->rb_longbuf.addr + (uint64) rdclist->u.c_daddr); #endif cl = (*rdclist); cl.c_next = NULL; /* * Use the same memory handle for all the chunks */ cl.c_dmemhandle = rlist->c_dmemhandle; cl.c_dsynchandle = rlist->c_dsynchandle; DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen, int, rdclist->c_len); /* * Now read the chunk in */ if (rdclist->c_next == NULL) { status = RDMA_READ(*conn, &cl, WAIT); } else { status = RDMA_READ(*conn, &cl, NOWAIT); } if (status != RDMA_SUCCESS) { DTRACE_PROBE( krpc__e__xdrrdma__readfromclient__readfailed); rdma_buf_free(*conn, &rlist->rb_longbuf); return (FALSE); } } cl = (*rlist); cl.c_next = NULL; cl.c_len = total_len; if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) { retval = FALSE; } return (retval); } bool_t xdrrdma_free_clist(CONN *conn, struct clist *clp) { rdma_buf_free(conn, &clp->rb_longbuf); clist_free(clp); return (TRUE); } bool_t xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl) { int status; xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); struct xdr_ops *xops = xdrrdma_xops(); struct clist *tcl, *wrcl, *cl; struct clist fcl; int rndup_present, rnduplen; rndup_present = 0; wrcl = NULL; /* caller is doing a sizeof */ if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops) return (TRUE); /* copy of the first chunk */ fcl = *wcl; fcl.c_next = NULL; /* * The entire buffer is registered with the first chunk. * Later chunks will use the same registered memory handle. */ status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE); if (status != RDMA_SUCCESS) { return (FALSE); } wcl->c_regtype = CLIST_REG_SOURCE; wcl->c_smemhandle = fcl.c_smemhandle; wcl->c_ssynchandle = fcl.c_ssynchandle; /* * Only transfer the read data ignoring any trailing * roundup chunks. A bit of work, but it saves an * unnecessary extra RDMA_WRITE containing only * roundup bytes. */ rnduplen = clist_len(wcl) - data_len; if (rnduplen) { tcl = wcl->c_next; /* * Check if there is a trailing roundup chunk */ while (tcl) { if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) { rndup_present = 1; break; } tcl = tcl->c_next; } /* * Make a copy chunk list skipping the last chunk */ if (rndup_present) { cl = wcl; tcl = NULL; while (cl) { if (tcl == NULL) { tcl = clist_alloc(); wrcl = tcl; } else { tcl->c_next = clist_alloc(); tcl = tcl->c_next; } *tcl = *cl; cl = cl->c_next; /* last chunk */ if (cl->c_next == NULL) break; } tcl->c_next = NULL; } } if (wrcl == NULL) { /* No roundup chunks */ wrcl = wcl; } /* * Set the registered memory handles for the * rest of the chunks same as the first chunk. */ tcl = wrcl->c_next; while (tcl) { tcl->c_smemhandle = fcl.c_smemhandle; tcl->c_ssynchandle = fcl.c_ssynchandle; tcl = tcl->c_next; } /* * Sync the total len beginning from the first chunk. */ fcl.c_len = clist_len(wrcl); status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE); if (status != RDMA_SUCCESS) { return (FALSE); } status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT); if (rndup_present) clist_free(wrcl); if (status != RDMA_SUCCESS) { return (FALSE); } return (TRUE); } /* * Reads one chunk at a time */ static bool_t xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn) { int status; int32_t len = 0; xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); struct clist *cle = *(xdrp->xp_rcl_next); struct clist *rclp = xdrp->xp_rcl; struct clist *clp; /* * len is used later to decide xdr offset in * the chunk factoring any 4-byte XDR alignment * (See read chunk example top of this file) */ while (rclp != cle) { len += rclp->c_len; rclp = rclp->c_next; } len = RNDUP(len) - len; ASSERT(xdrs->x_handy <= 0); /* * If this is the first chunk to contain the RPC * message set xp_off to the xdr offset of the * inline message. */ if (xdrp->xp_off == 0) xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base); if (cle == NULL || (cle->c_xdroff != xdrp->xp_off)) return (FALSE); /* * Make a copy of the chunk to read from client. * Chunks are read on demand, so read only one * for now. */ rclp = clist_alloc(); *rclp = *cle; rclp->c_next = NULL; xdrp->xp_rcl_next = &cle->c_next; /* * If there is a roundup present, then skip those * bytes when reading. */ if (len) { rclp->w.c_saddr = (uint64)(uintptr_t)rclp->w.c_saddr + len; rclp->c_len = rclp->c_len - len; } status = xdrrdma_read_from_client(rclp, conn, rclp->c_len); if (status == FALSE) { clist_free(rclp); return (status); } xdrp->xp_offp = rclp->rb_longbuf.addr; xdrs->x_base = xdrp->xp_offp; xdrs->x_handy = rclp->c_len; /* * This copy of read chunks containing the XDR * message is freed later in xdrrdma_destroy() */ if (xdrp->xp_rcl_xdr) { /* Add the chunk to end of the list */ clp = xdrp->xp_rcl_xdr; while (clp->c_next != NULL) clp = clp->c_next; clp->c_next = rclp; } else { xdrp->xp_rcl_xdr = rclp; } return (TRUE); } static void xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl) { struct clist *cl; (void) clist_deregister(conn, xdr_rcl); /* * Read chunks containing parts XDR message are * special: in case of multiple chunks each has * its own buffer. */ cl = xdr_rcl; while (cl) { rdma_buf_free(conn, &cl->rb_longbuf); cl = cl->c_next; } clist_free(xdr_rcl); }