xref: /illumos-gate/usr/src/uts/common/rpc/xdr_rdma.c (revision 24da5b34f49324ed742a340010ed5bd3d4e06625)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * xdr_rdma.c, XDR implementation using RDMA to move large chunks
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/systm.h>
36 #include <sys/kmem.h>
37 
38 #include <rpc/types.h>
39 #include <rpc/xdr.h>
40 #include <sys/cmn_err.h>
41 #include <rpc/rpc_sztypes.h>
42 #include <rpc/rpc_rdma.h>
43 
44 static struct xdr_ops *xdrrdma_ops(void);
45 
46 /*
47  * A chunk list entry identifies a chunk
48  * of opaque data to be moved separately
49  * from the rest of the RPC message.
50  * xp_min_chunk = 0, is a special case for ENCODING, which means
51  * do not chunk the incoming stream of data.
52  */
53 
54 struct private {
55 	caddr_t		xp_offp;
56 	int		xp_min_chunk;
57 	uint_t		xp_flags;	/* Controls setting for rdma xdr */
58 	int		xp_buf_size;		/* size of xdr buffer */
59 	struct clist	*xp_cl;			/* head of chunk list */
60 	struct clist	**xp_cl_next;	/* location to place/find next chunk */
61 	CONN		*xp_conn;	/* connection for chunk data xfer */
62 };
63 
64 
65 /*
66  * The procedure xdrrdma_create initializes a stream descriptor for a
67  * memory buffer.
68  */
69 void
70 xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
71 	int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
72 {
73 	struct private *xdrp;
74 	struct clist *cle;
75 
76 	xdrs->x_op = op;
77 	xdrs->x_ops = xdrrdma_ops();
78 	xdrs->x_base = addr;
79 	xdrs->x_handy = size;
80 	xdrs->x_public = NULL;
81 
82 	xdrp = (struct private *)kmem_zalloc(sizeof (struct private), KM_SLEEP);
83 	xdrs->x_private = (caddr_t)xdrp;
84 	xdrp->xp_offp = addr;
85 	xdrp->xp_min_chunk = min_chunk;
86 	xdrp->xp_flags = 0;
87 	xdrp->xp_buf_size = size;
88 	xdrp->xp_cl = cl;
89 	if (op == XDR_ENCODE && cl != NULL) {
90 		/* Find last element in chunk list and set xp_cl_next */
91 		for (cle = cl; cle->c_next != NULL; cle = cle->c_next);
92 		xdrp->xp_cl_next = &(cle->c_next);
93 	} else
94 		xdrp->xp_cl_next = &(xdrp->xp_cl);
95 	xdrp->xp_conn = conn;
96 	if (xdrp->xp_min_chunk == 0)
97 		xdrp->xp_flags |= RDMA_NOCHUNK;
98 }
99 
100 /* ARGSUSED */
101 void
102 xdrrdma_destroy(XDR *xdrs)
103 {
104 	(void) kmem_free(xdrs->x_private, sizeof (struct private));
105 }
106 
107 struct clist *
108 xdrrdma_clist(XDR *xdrs) {
109 	return (((struct private *)(xdrs->x_private))->xp_cl);
110 }
111 
112 static bool_t
113 xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
114 {
115 	struct private *xdrp = (struct private *)(xdrs->x_private);
116 
117 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
118 		return (FALSE);
119 
120 	/* LINTED pointer alignment */
121 	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
122 	xdrp->xp_offp += sizeof (int32_t);
123 
124 	return (TRUE);
125 }
126 
127 static bool_t
128 xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
129 {
130 	struct private *xdrp = (struct private *)(xdrs->x_private);
131 
132 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
133 		return (FALSE);
134 
135 	/* LINTED pointer alignment */
136 	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
137 	xdrp->xp_offp += sizeof (int32_t);
138 
139 	return (TRUE);
140 }
141 
142 /*
143  * DECODE some bytes from an XDR stream
144  */
145 static bool_t
146 xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
147 {
148 	struct private *xdrp = (struct private *)(xdrs->x_private);
149 	struct clist *cle = *(xdrp->xp_cl_next);
150 	struct clist cl;
151 	bool_t  retval = TRUE;
152 
153 	/*
154 	 * If there was a chunk at the current offset
155 	 * first record the destination address and length
156 	 * in the chunk list that came with the message, then
157 	 * RDMA READ the chunk data.
158 	 */
159 	if (cle != NULL &&
160 		cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
161 		cle->c_daddr = (uint64)(uintptr_t)addr;
162 		cle->c_len  = len;
163 		xdrp->xp_cl_next = &cle->c_next;
164 
165 		/*
166 		 * RDMA READ the chunk data from the remote end.
167 		 * First prep the destination buffer by registering
168 		 * it, then RDMA READ the chunk data. Since we are
169 		 * doing streaming memory, sync the destination buffer
170 		 * to CPU and deregister the buffer.
171 		 */
172 		if (xdrp->xp_conn == NULL) {
173 			return (FALSE);
174 		}
175 
176 		cl = *cle;
177 		cl.c_next = NULL;
178 		if (clist_register(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
179 			return (FALSE);
180 		}
181 
182 		/*
183 		 * Now read the chunk in
184 		 */
185 		if (RDMA_READ(xdrp->xp_conn, &cl, WAIT) != RDMA_SUCCESS) {
186 #ifdef DEBUG
187 			cmn_err(CE_WARN,
188 				"xdrrdma_getbytes: RDMA_READ failed\n");
189 #endif
190 			retval = FALSE;
191 			goto out;
192 		}
193 		/*
194 		 * sync the memory for cpu
195 		 */
196 		if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
197 			retval = FALSE;
198 			goto out;
199 		}
200 
201 out:
202 		/*
203 		 * Deregister the chunks
204 		 */
205 		(void) clist_deregister(xdrp->xp_conn, &cl, 0);
206 		return (retval);
207 	}
208 
209 	if ((xdrs->x_handy -= len) < 0)
210 		return (FALSE);
211 
212 	bcopy(xdrp->xp_offp, addr, len);
213 	xdrp->xp_offp += len;
214 
215 	return (TRUE);
216 }
217 
218 /*
219  * ENCODE some bytes into an XDR stream
220  * xp_min_chunk = 0, means the stream of bytes contain no chunks
221  * to seperate out, and if the bytes do not fit in the supplied
222  * buffer, grow the buffer and free the old buffer.
223  */
224 static bool_t
225 xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
226 {
227 	struct private *xdrp = (struct private *)(xdrs->x_private);
228 	struct clist *clzero = xdrp->xp_cl;
229 
230 	/*
231 	 * If this chunk meets the minimum chunk size
232 	 * then don't encode it.  Just record its address
233 	 * and length in a chunk list entry so that it
234 	 * can be moved separately via RDMA.
235 	 */
236 	if (!(xdrp->xp_flags & RDMA_NOCHUNK) && xdrp->xp_min_chunk != 0 &&
237 	    len >= xdrp->xp_min_chunk) {
238 		struct clist *cle;
239 		int offset = xdrp->xp_offp - xdrs->x_base;
240 
241 		cle = (struct clist *)kmem_zalloc(sizeof (struct clist),
242 				KM_SLEEP);
243 		cle->c_xdroff = offset;
244 		cle->c_len  = len;
245 		cle->c_saddr = (uint64)(uintptr_t)addr;
246 		cle->c_next = NULL;
247 
248 		*(xdrp->xp_cl_next) = cle;
249 		xdrp->xp_cl_next = &(cle->c_next);
250 
251 		return (TRUE);
252 	}
253 
254 	if ((xdrs->x_handy -= len) < 0) {
255 		if (xdrp->xp_min_chunk == 0) {
256 			int  newbuflen, encodelen;
257 			caddr_t newbuf;
258 
259 			xdrs->x_handy += len;
260 			encodelen = xdrp->xp_offp - xdrs->x_base;
261 			newbuflen = xdrp->xp_buf_size + len;
262 			newbuf = kmem_zalloc(newbuflen, KM_SLEEP);
263 			bcopy(xdrs->x_base, newbuf, encodelen);
264 			(void) kmem_free(xdrs->x_base, xdrp->xp_buf_size);
265 			xdrs->x_base = newbuf;
266 			xdrp->xp_offp = newbuf + encodelen;
267 			xdrp->xp_buf_size = newbuflen;
268 			if (xdrp->xp_min_chunk == 0 && clzero->c_xdroff == 0) {
269 				clzero->c_len = newbuflen;
270 				clzero->c_saddr = (uint64)(uintptr_t)newbuf;
271 			}
272 		} else
273 			return (FALSE);
274 	}
275 
276 	bcopy(addr, xdrp->xp_offp, len);
277 	xdrp->xp_offp += len;
278 
279 	return (TRUE);
280 }
281 
282 uint_t
283 xdrrdma_getpos(XDR *xdrs)
284 {
285 	struct private *xdrp = (struct private *)(xdrs->x_private);
286 
287 	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
288 }
289 
290 bool_t
291 xdrrdma_setpos(XDR *xdrs, uint_t pos)
292 {
293 	struct private *xdrp = (struct private *)(xdrs->x_private);
294 
295 	caddr_t newaddr = xdrs->x_base + pos;
296 	caddr_t lastaddr = xdrp->xp_offp + xdrs->x_handy;
297 	ptrdiff_t diff;
298 
299 	if (newaddr > lastaddr)
300 		return (FALSE);
301 
302 	xdrp->xp_offp = newaddr;
303 	diff = lastaddr - newaddr;
304 	xdrs->x_handy = (int)diff;
305 
306 	return (TRUE);
307 }
308 
309 /* ARGSUSED */
310 static rpc_inline_t *
311 xdrrdma_inline(XDR *xdrs, int len)
312 {
313 	rpc_inline_t *buf = NULL;
314 	struct private *xdrp = (struct private *)(xdrs->x_private);
315 	struct clist *cle = *(xdrp->xp_cl_next);
316 
317 	if (xdrs->x_op == XDR_DECODE) {
318 		/*
319 		 * Since chunks aren't in-line, check to see whether
320 		 * there is a chunk in the inline range.
321 		 */
322 		if (cle != NULL &&
323 			cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
324 		return (NULL);
325 	}
326 
327 	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
328 	    len >= xdrp->xp_min_chunk)) {
329 		return (NULL);
330 	} else {
331 		xdrs->x_handy -= len;
332 		/* LINTED pointer alignment */
333 		buf = (rpc_inline_t *)xdrp->xp_offp;
334 		xdrp->xp_offp += len;
335 		return (buf);
336 	}
337 }
338 
339 static bool_t
340 xdrrdma_control(XDR *xdrs, int request, void *info)
341 {
342 	int32_t *int32p;
343 	int len;
344 	uint_t in_flags;
345 	struct private *xdrp = (struct private *)(xdrs->x_private);
346 
347 	switch (request) {
348 	case XDR_PEEK:
349 		/*
350 		 * Return the next 4 byte unit in the XDR stream.
351 		 */
352 		if (xdrs->x_handy < sizeof (int32_t))
353 			return (FALSE);
354 
355 		int32p = (int32_t *)info;
356 		*int32p = (int32_t)ntohl((uint32_t)
357 		    (*((int32_t *)(xdrp->xp_offp))));
358 
359 		return (TRUE);
360 
361 	case XDR_SKIPBYTES:
362 		/*
363 		 * Skip the next N bytes in the XDR stream.
364 		 */
365 		int32p = (int32_t *)info;
366 		len = RNDUP((int)(*int32p));
367 		if ((xdrs->x_handy -= len) < 0)
368 			return (FALSE);
369 		xdrp->xp_offp += len;
370 
371 		return (TRUE);
372 
373 	case XDR_RDMASET:
374 		/*
375 		 * Set the flags provided in the *info in xp_flags for rdma xdr
376 		 * stream control.
377 		 */
378 		int32p = (int32_t *)info;
379 		in_flags = (uint_t)(*int32p);
380 
381 		xdrp->xp_flags |= in_flags;
382 		return (TRUE);
383 
384 	case XDR_RDMAGET:
385 		/*
386 		 * Get the flags provided in xp_flags return through *info
387 		 */
388 		int32p = (int32_t *)info;
389 
390 		*int32p = (int32_t)xdrp->xp_flags;
391 		return (TRUE);
392 
393 	default:
394 		return (FALSE);
395 	}
396 }
397 
398 static struct xdr_ops *
399 xdrrdma_ops(void)
400 {
401 	static struct xdr_ops ops;
402 
403 	if (ops.x_getint32 == NULL) {
404 		ops.x_getbytes = xdrrdma_getbytes;
405 		ops.x_putbytes = xdrrdma_putbytes;
406 		ops.x_getpostn = xdrrdma_getpos;
407 		ops.x_setpostn = xdrrdma_setpos;
408 		ops.x_inline = xdrrdma_inline;
409 		ops.x_destroy = xdrrdma_destroy;
410 		ops.x_control = xdrrdma_control;
411 		ops.x_getint32 = xdrrdma_getint32;
412 		ops.x_putint32 = xdrrdma_putint32;
413 	}
414 	return (&ops);
415 }
416 
417 /*
418  * Not all fields in struct clist are interesting to the
419  * RPC over RDMA protocol. Only XDR the interesting fields.
420  */
421 bool_t
422 xdr_clist(XDR *xdrs, clist *objp)
423 {
424 
425 	if (!xdr_uint32(xdrs, &objp->c_xdroff))
426 		return (FALSE);
427 	if (!xdr_uint32(xdrs, &objp->c_len))
428 		return (FALSE);
429 	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
430 		return (FALSE);
431 	if (!xdr_uint64(xdrs, &objp->c_saddr))
432 		return (FALSE);
433 	if (!xdr_pointer(xdrs, (char **)&objp->c_next, sizeof (clist),
434 		(xdrproc_t)xdr_clist))
435 		return (FALSE);
436 	return (TRUE);
437 }
438 
439 bool_t
440 xdr_do_clist(XDR *xdrs, clist **clp)
441 {
442 	return (xdr_pointer(xdrs, (char **)clp,
443 		sizeof (clist), (xdrproc_t)xdr_clist));
444 }
445 
446 uint_t
447 xdr_getbufsize(XDR *xdrs)
448 {
449 	struct private *xdrp = (struct private *)(xdrs->x_private);
450 
451 	return ((uint_t)xdrp->xp_buf_size);
452 }
453