/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code is developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements to contributions from developers:
 *	Ranjit Noronha: noronha@cse.ohio-state.edu
 *	Lei Chai      : chail@cse.ohio-state.edu
 *	Weikuan Yu    : yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

static bool_t	xdrrdma_getint32(XDR *, int32_t *);
static bool_t	xdrrdma_putint32(XDR *, int32_t *);
static bool_t	xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t	xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t		xdrrdma_getpos(XDR *);
bool_t		xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void		xdrrdma_destroy(XDR *);
static bool_t	xdrrdma_control(XDR *, int, void *);
static bool_t	xdrrdma_read_a_chunk(XDR *, CONN **);
static void	xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops xdrrdmablk_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

struct xdr_ops xdrrdma_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk = 0 is a
 * special case for ENCODING, which means do not chunk the incoming stream
 * of data.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message
 * covering both the inline and the chunk.
 * This is used only in the case of decoding and is useful for comparing
 * read chunk 'c_xdroff' offsets.
 *
 * An example of a read chunk containing an XDR message,
 * an NFSv4 compound as follows:
 *
 * PUTFH
 * WRITE [4109 bytes]
 * GETATTR
 *
 * Solaris Encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *                                      |
 *                                      v
 * [RDMA_READ chunks]:             [write data]
 *
 *
 * Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *                                  |
 *                                  v
 * [RDMA_READ chunks]: [Write data] [Write data2] [Getattr chunk]
 *                        chunk1       chunk2         chunk3
 *
 * where the READ chunks are:
 *
 *             - chunk1 - 4k
 * write data  |
 *             - chunk2 - 13 bytes (4109 - 4k)
 * getattr op  - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 *
 */

typedef struct {
	caddr_t		xp_offp;
	int		xp_min_chunk;
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;	/* size of xdr buffer */
	int		xp_off;		/* overall offset */
	struct clist	*xp_rcl;	/* head of chunk list */
	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
	struct clist	*xp_wcl;	/* head of write chunk list */
	CONN		*xp_conn;	/* connection for chunk data xfer */
	uint_t		xp_reply_chunk_len;
	/* used to track length for security modes: integrity/privacy */
	uint_t		xp_reply_chunk_len_alt;
} xrdma_private_t;

extern kmem_cache_t *clist_cache;

bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rdclist = NULL, *prev = NULL;
	bool_t		retval = TRUE;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	alen;
	uint_t		total_len;

	ASSERT(xdrs->x_op != XDR_FREE);

	/*
	 * first deal with the length since xdr bytes are counted
	 */
	if (!xdr_u_int(xdrs, sizep)) {
		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
		return (FALSE);
	}
	total_len = *sizep;
	if (total_len > maxsize) {
		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
		    int, total_len, int, maxsize);
		return (FALSE);
	}
	(*conn) = xdrp->xp_conn;

	/*
	 * if no data we are done
	 */
	if (total_len == 0)
		return (TRUE);

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
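	 *
	 * If no chunk list was supplied, or the first chunk's XDR offset
	 * does not line up with the current decode position, there is
	 * nothing to set up and the call fails.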
	 */
	if (cle == NULL)
		return (FALSE);

	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
		return (FALSE);

	/*
	 * Setup the chunk list with appropriate
	 * address (offset) and length
	 */
	for (actual_segments = 0;
	    actual_segments < total_segments; actual_segments++) {

		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
		    uint32_t, total_len, uint32_t, cle->c_xdroff);

		if (total_len <= 0)
			break;

		/*
		 * not the first time in the loop
		 */
		if (actual_segments > 0)
			cle = cle->c_next;

		cle->u.c_daddr = (uint64) cur_offset;
		alen = 0;
		if (cle->c_len > total_len) {
			alen = cle->c_len;
			cle->c_len = total_len;
		}
		if (!alen)
			xdrp->xp_rcl_next = &cle->c_next;

		cur_offset += cle->c_len;
		total_len -= cle->c_len;

		if ((total_segments - actual_segments - 1) == 0 &&
		    total_len > 0) {
			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
			retval = FALSE;
		}

		if ((total_segments - actual_segments - 1) > 0 &&
		    total_len == 0) {
			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
			    int, total_segments, int, actual_segments);
		}

		rdclist = clist_alloc();
		(*rdclist) = (*cle);
		if ((*rlist) == NULL)
			(*rlist) = rdclist;
		if (prev == NULL)
			prev = rdclist;
		else {
			prev->c_next = rdclist;
			prev = rdclist;
		}

	}

out:
	if (prev != NULL)
		prev->c_next = NULL;

	/*
	 * Adjust the chunk length, if we read only a part of
	 * a chunk.
	 */

	if (alen) {
		cle->w.c_saddr =
		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
		cle->c_len = alen - cle->c_len;
	}

	return (retval);
}

/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
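 *
 * addr/size describe the inline buffer, min_chunk is the threshold at
 * which data is moved as a separate chunk (0 disables chunking), cl is
 * an already decoded chunk list (if any), op is the XDR operation, and
 * conn is the RDMA connection used for chunk data transfer.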
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
	xrdma_private_t	*xdrp;
	struct clist	*cle;

	xdrs->x_op = op;
	xdrs->x_ops = &xdrrdma_ops;
	xdrs->x_base = addr;
	xdrs->x_handy = size;
	xdrs->x_public = NULL;

	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
	    KM_SLEEP);
	xdrs->x_private = (caddr_t)xdrp;
	xdrp->xp_offp = addr;
	xdrp->xp_min_chunk = min_chunk;
	xdrp->xp_flags = 0;
	xdrp->xp_buf_size = size;
	xdrp->xp_rcl = cl;
	xdrp->xp_reply_chunk_len = 0;
	xdrp->xp_reply_chunk_len_alt = 0;

	if (op == XDR_ENCODE && cl != NULL) {
		/* Find last element in chunk list and set xp_rcl_next */
		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
			continue;

		xdrp->xp_rcl_next = &(cle->c_next);
	} else {
		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
	}

	xdrp->xp_wcl = NULL;

	xdrp->xp_conn = conn;
	if (xdrp->xp_min_chunk != 0)
		xdrp->xp_flags |= XDR_RDMA_CHUNK;
}

/* ARGSUSED */
void
xdrrdma_destroy(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if (xdrp == NULL)
		return;

	if (xdrp->xp_wcl) {
		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_wcl->rb_longbuf);
		}
		clist_free(xdrp->xp_wcl);
	}

	if (xdrp->xp_rcl) {
		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_rcl->rb_longbuf);
		}
		clist_free(xdrp->xp_rcl);
	}

	if (xdrp->xp_rcl_xdr)
		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
	xdrs->x_private = NULL;
}

static bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	int chunked = 0;

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
		/*
		 * check if rest of the rpc message is in a chunk
		 */
		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
			return (FALSE);
		}
		chunked = 1;
	}

	/* LINTED pointer alignment */
	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

	xdrp->xp_offp += sizeof (int32_t);

	if (chunked)
		xdrs->x_handy -= (int)sizeof (int32_t);

	if (xdrp->xp_off != 0) {
		xdrp->xp_off += sizeof (int32_t);
	}

	return (TRUE);
}

static bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
		return (FALSE);

	/* LINTED pointer alignment */
	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
	xdrp->xp_offp += sizeof (int32_t);

	return (TRUE);
}

/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
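 *
 * If a read chunk matches the current XDR offset, the destination
 * buffer is registered, the chunk data is pulled over with RDMA READ,
 * and the buffer is then synced for the CPU and deregistered.
 * Otherwise the bytes are simply copied out of the inline buffer.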
 */
static bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*cls = *(xdrp->xp_rcl_next);
	struct clist	cl;
	bool_t		retval = TRUE;
	uint32_t	total_len = len;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	status;
	uint32_t	alen;
	uint32_t	xpoff;

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	if (xdrp->xp_off) {
		xpoff = xdrp->xp_off;
	} else {
		xpoff = (xdrp->xp_offp - xdrs->x_base);
	}

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */

	if (cle != NULL && cle->c_xdroff == xpoff) {
		for (actual_segments = 0;
		    actual_segments < total_segments; actual_segments++) {
			if (total_len <= 0)
				break;
			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
			alen = 0;
			if (cle->c_len > total_len) {
				alen = cle->c_len;
				cle->c_len = total_len;
			}
			if (!alen)
				xdrp->xp_rcl_next = &cle->c_next;

			cur_offset += cle->c_len;
			total_len -= cle->c_len;

			if ((total_segments - actual_segments - 1) == 0 &&
			    total_len > 0) {
				DTRACE_PROBE(
				    krpc__e__xdrrdma_getbytes_chunktooshort);
				retval = FALSE;
			}

			if ((total_segments - actual_segments - 1) > 0 &&
			    total_len == 0) {
				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
				    int, total_segments, int, actual_segments);
			}

			/*
			 * RDMA READ the chunk data from the remote end.
			 * First prep the destination buffer by registering
			 * it, then RDMA READ the chunk data. Since we are
			 * doing streaming memory, sync the destination
			 * buffer to CPU and deregister the buffer.
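			 *
			 * The last chunk (or the chunk that exhausts the
			 * requested length) is read with WAIT; the chunks
			 * before it are issued with NOWAIT.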
			 */
			if (xdrp->xp_conn == NULL) {
				return (FALSE);
			}
			cl = *cle;
			cl.c_next = NULL;
			if (clist_register(xdrp->xp_conn, &cl, CLIST_REG_DST)
			    != RDMA_SUCCESS) {
				return (FALSE);
			}
			cle->c_dmemhandle = cl.c_dmemhandle;
			cle->c_dsynchandle = cl.c_dsynchandle;

			/*
			 * Now read the chunk in
			 */
			if ((total_segments - actual_segments - 1) == 0 ||
			    total_len == 0) {
				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
			} else {
				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
			}
			if (status != RDMA_SUCCESS) {
				DTRACE_PROBE1(
				    krpc__i__xdrrdma_getblk_readfailed,
				    int, status);
				retval = FALSE;
				goto out;
			}
			cle = cle->c_next;
		}

		/*
		 * sync the memory for cpu
		 */
		cl = *cls;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		if (clist_syncmem(
		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
			retval = FALSE;
		}
out:
		/*
		 * Deregister the chunks
		 */
		cle = cls;
		cl = *cle;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		(void) clist_deregister(xdrp->xp_conn, &cl);
		if (alen) {
			cle->w.c_saddr =
			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
			cle->c_len = alen - cle->c_len;
		}
		return (retval);
	}

	if ((xdrs->x_handy -= len) < 0)
		return (FALSE);

	bcopy(xdrp->xp_offp, addr, len);

	xdrp->xp_offp += len;

	if (xdrp->xp_off != 0)
		xdrp->xp_off += len;

	return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream. xp_min_chunk = 0 means the stream
 * of bytes contains no chunks to separate out, and if the bytes do not fit
 * in the supplied buffer, grow the buffer and free the old buffer.
 */
static bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	/*
	 * Is this stream accepting chunks?
	 * If so, does either of the two following conditions exist?
	 * - length of bytes to encode is greater than the min chunk size?
	 * - remaining space in this stream is shorter than length of
	 *   bytes to encode?
	 *
	 * If the above exists, then create a chunk for this encoding
	 * and save the addresses, etc.
	 */
	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
	    ((xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk) ||
	    (xdrs->x_handy - len < 0))) {
		struct clist	*cle;
		int		offset = xdrp->xp_offp - xdrs->x_base;

		cle = clist_alloc();
		cle->c_xdroff = offset;
		cle->c_len = len;
		cle->w.c_saddr = (uint64)(uintptr_t)addr;
		cle->c_next = NULL;

		*(xdrp->xp_rcl_next) = cle;
		xdrp->xp_rcl_next = &(cle->c_next);

		return (TRUE);
	}
	/* Is there enough space to encode what is left? */
	if ((xdrs->x_handy -= len) < 0) {
		return (FALSE);
	}
	bcopy(addr, xdrp->xp_offp, len);
	xdrp->xp_offp += len;

	return (TRUE);
}

uint_t
xdrrdma_getpos(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	caddr_t		newaddr = xdrs->x_base + pos;
	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
	ptrdiff_t	diff;

	if (newaddr > lastaddr)
		return (FALSE);

	xdrp->xp_offp = newaddr;
	diff = lastaddr - newaddr;
	xdrs->x_handy = (int)diff;

	return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
	rpc_inline_t	*buf = NULL;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);

	if (xdrs->x_op == XDR_DECODE) {
		/*
		 * Since chunks aren't in-line, check to see whether there is
		 * a chunk in the inline range.
		 */
		if (cle != NULL &&
		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
			return (NULL);
	}

	/* LINTED pointer alignment */
	buf = (rpc_inline_t *)xdrp->xp_offp;
	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
		return (NULL);

	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk)) {
		return (NULL);
	} else {
		xdrs->x_handy -= len;
		xdrp->xp_offp += len;
		return (buf);
	}
}

static bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
	int32_t		*int32p;
	int		len, i;
	uint_t		in_flags;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	rdma_chunkinfo_t *rcip = NULL;
	rdma_wlist_conn_info_t *rwcip = NULL;
	rdma_chunkinfo_lengths_t *rcilp = NULL;
	struct uio *uiop;
	struct clist	*rwl = NULL;
	struct clist	*prev = NULL;

	switch (request) {
	case XDR_PEEK:
		/*
		 * Return the next 4 byte unit in the XDR stream.
		 */
		if (xdrs->x_handy < sizeof (int32_t))
			return (FALSE);

		int32p = (int32_t *)info;
		*int32p = (int32_t)ntohl((uint32_t)
		    (*((int32_t *)(xdrp->xp_offp))));

		return (TRUE);

	case XDR_SKIPBYTES:
		/*
		 * Skip the next N bytes in the XDR stream.
		 */
		int32p = (int32_t *)info;
		len = RNDUP((int)(*int32p));
		if ((xdrs->x_handy -= len) < 0)
			return (FALSE);
		xdrp->xp_offp += len;

		return (TRUE);

	case XDR_RDMA_SET_FLAGS:
		/*
		 * Set the flags provided in the *info in xp_flags for rdma
		 * xdr stream control.
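		 * The new flags are OR-ed into xp_flags rather than
		 * replacing the existing value.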
		 */
		int32p = (int32_t *)info;
		in_flags = (uint_t)(*int32p);

		xdrp->xp_flags |= in_flags;
		return (TRUE);

	case XDR_RDMA_GET_FLAGS:
		/*
		 * Get the flags from xp_flags and return them through *info
		 */
		int32p = (int32_t *)info;

		*int32p = (int32_t)xdrp->xp_flags;
		return (TRUE);

	case XDR_RDMA_GET_CHUNK_LEN:
		rcilp = (rdma_chunkinfo_lengths_t *)info;
		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

		return (TRUE);

	case XDR_RDMA_ADD_CHUNK:
		/*
		 * Store wlist information
		 */

		rcip = (rdma_chunkinfo_t *)info;

		switch (rcip->rci_type) {
		case RCI_WRITE_UIO_CHUNK:
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			if (rcip->rci_len < xdrp->xp_min_chunk) {
				xdrp->xp_wcl = NULL;
				*(rcip->rci_clpp) = NULL;
				return (TRUE);
			}
			uiop = rcip->rci_a.rci_uiop;

			for (i = 0; i < uiop->uio_iovcnt; i++) {
				rwl = clist_alloc();
				rwl->c_len = uiop->uio_iov[i].iov_len;
				rwl->u.c_daddr =
				    (uint64)(uintptr_t)
				    (uiop->uio_iov[i].iov_base);
				/*
				 * if userspace address, put adspace ptr in
				 * clist. If not, then do nothing since it's
				 * already set to NULL (from kmem_zalloc)
				 */
				if (uiop->uio_segflg == UIO_USERSPACE) {
					rwl->c_adspc = ttoproc(curthread)->p_as;
				}

				if (prev == NULL)
					prev = rwl;
				else {
					prev->c_next = rwl;
					prev = rwl;
				}
			}

			rwl->c_next = NULL;
			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_WRITE_ADDR_CHUNK:
			rwl = clist_alloc();

			rwl->c_len = rcip->rci_len;
			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
			rwl->c_next = NULL;
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_REPLY_CHUNK:
			xdrp->xp_reply_chunk_len += rcip->rci_len;
			break;
		}
		return (TRUE);

	case XDR_RDMA_GET_WLIST:
		*((struct clist **)info) = xdrp->xp_wcl;
		return (TRUE);

	case XDR_RDMA_SET_WLIST:
		xdrp->xp_wcl = (struct clist *)info;
		return (TRUE);

	case XDR_RDMA_GET_RLIST:
		*((struct clist **)info) = xdrp->xp_rcl;
		return (TRUE);

	case XDR_RDMA_GET_WCINFO:
		rwcip = (rdma_wlist_conn_info_t *)info;

		rwcip->rwci_wlist = xdrp->xp_wcl;
		rwcip->rwci_conn = xdrp->xp_conn;

		return (TRUE);

	default:
		return (FALSE);
	}
}

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
	if (!xdr_uint32(xdrs, &objp->c_xdroff))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_len))
		return (FALSE);
	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
		return (FALSE);
	if (!xdr_do_clist(xdrs, &objp->c_next))
		return (FALSE);
	return (TRUE);
}

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
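 *
 * On XDR_DECODE an entry is allocated with clist_alloc(); on XDR_FREE
 * it is handed back to the cache with kmem_cache_free() after the
 * embedded c_next pointer has been walked.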
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
	caddr_t loc = *pp;
	bool_t stat;

	if (loc == NULL) {
		switch (xdrs->x_op) {
		case XDR_FREE:
			return (TRUE);

		case XDR_DECODE:
			*pp = loc = (caddr_t)clist_alloc();
			break;

		case XDR_ENCODE:
			ASSERT(loc);
			break;
		}
	}

	stat = xdr_clist(xdrs, (struct clist *)loc);

	if (xdrs->x_op == XDR_FREE) {
		kmem_cache_free(clist_cache, loc);
		*pp = NULL;
	}
	return (stat);
}

/*
 * XDR a pointer to a possibly recursive clist. This differs
 * from xdr_reference in that it can serialize/deserialize
 * trees correctly.
 *
 *  What is sent is actually a union:
 *
 *  union object_pointer switch (boolean b) {
 *  case TRUE: object_data data;
 *  case FALSE: void nothing;
 *  }
 *
 * > objpp: Pointer to the pointer to the object.
 *
 */

bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
	bool_t more_data;

	more_data = (*objpp != NULL);
	if (!xdr_bool(xdrs, &more_data))
		return (FALSE);
	if (!more_data) {
		*objpp = NULL;
		return (TRUE);
	}
	return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}

uint_t
xdr_getbufsize(XDR *xdrs)
{
	xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)xdrp->xp_buf_size);
}

/* ARGSUSED */
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
	bool_t	vfalse = FALSE;

	ASSERT(rlist == NULL);
	return (xdr_bool(xdrs, &vfalse));
}

bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
	bool_t		vfalse = FALSE, vtrue = TRUE;
	int		i;
	uint_t		num_segment = 0;
	struct clist	*cl;

	/* does a wlist exist? */
	if (w == NULL) {
		return (xdr_bool(xdrs, &vfalse));
	}
	/* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
	if (!xdr_bool(xdrs, &vtrue))
		return (FALSE);

	for (cl = w; cl != NULL; cl = cl->c_next) {
		num_segment++;
	}

	if (!xdr_uint32(xdrs, &num_segment))
		return (FALSE);
	for (i = 0; i < num_segment; i++) {

		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
			return (FALSE);

		if (!xdr_uint32(xdrs, &w->c_len))
			return (FALSE);

		if (!xdr_uint64(xdrs, &w->u.c_daddr))
			return (FALSE);

		w = w->c_next;
	}

	if (!xdr_bool(xdrs, &vfalse))
		return (FALSE);

	return (TRUE);
}


/*
 * Conditionally decode a RDMA WRITE chunk list from XDR stream.
 *
 * If the next boolean in the XDR stream is false there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry: allocate a struct clist and decode.
 * Pass back an indication via wlist_exists if we have seen a
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
	struct clist	*tmp;
	bool_t		more = FALSE;
	uint32_t	seg_array_len;
	uint32_t	i;

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	/* is there a wlist? */
	if (more == FALSE) {
		*wlist_exists = FALSE;
		return (TRUE);
	}
	*wlist_exists = TRUE;

	if (!xdr_uint32(xdrs, &seg_array_len))
		return (FALSE);

	tmp = *w = clist_alloc();
	for (i = 0; i < seg_array_len; i++) {

		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &tmp->c_len))
			return (FALSE);

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
		    uint_t, tmp->c_len);

		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
			return (FALSE);
		if (i < seg_array_len - 1) {
			tmp->c_next = clist_alloc();
			tmp = tmp->c_next;
		} else {
			tmp->c_next = NULL;
		}
	}

	more = FALSE;
	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	return (TRUE);
}

/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
	struct clist	*first, *ncl;
	char		*memp;
	uint32_t	num_wclist;
	uint32_t	wcl_length = 0;
	uint32_t	i;
	bool_t		more = FALSE;

	*wclp = NULL;
	*wwl = FALSE;
	*total_length = 0;

	if (!xdr_bool(xdrs, &more)) {
		return (FALSE);
	}

	if (more == FALSE) {
		return (TRUE);
	}

	*wwl = TRUE;

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
		    uint_t, ncl->c_len);

		wcl_length += ncl->c_len;

		if (i < num_wclist - 1) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}
	}

	if (!xdr_bool(xdrs, &more))
		goto err_out;

	first->rb_longbuf.type = RDMA_LONG_BUFFER;
	first->rb_longbuf.len =
	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
		clist_free(first);
		return (FALSE);
	}

	memp = first->rb_longbuf.addr;

	ncl = first;
	for (i = 0; i < num_wclist; i++) {
		ncl->w.c_saddr3 = (caddr_t)memp;
		memp += ncl->c_len;
		ncl = ncl->c_next;
	}

	*wclp = first;
	*total_length = wcl_length;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}

/*
 * XDR decode the long reply write chunk.
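 *
 * The encoding is a boolean (chunk present), a segment count, and then
 * one (handle, length, offset) triplet per segment.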
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
	bool_t		have_rchunk = FALSE;
	struct clist	*first = NULL, *ncl = NULL;
	uint32_t	num_wclist;
	uint32_t	i;

	if (!xdr_bool(xdrs, &have_rchunk))
		return (FALSE);

	if (have_rchunk == FALSE)
		return (TRUE);

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
		return (FALSE);
	}

	if (num_wclist == 0) {
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (i > 0) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}
		if (!(ncl->c_dmemhandle.mrc_rmr &&
		    (ncl->c_len > 0) && ncl->u.c_daddr))
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);

		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
		    uint32_t, ncl->c_len);

	}
	*clist = first;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}


bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
	int		i;
	bool_t		long_reply_exists = TRUE;
	uint32_t	length;
	uint64		offset;

	if (seg_array_len > 0) {
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
		if (!xdr_uint32(xdrs, &seg_array_len))
			return (FALSE);

		for (i = 0; i < seg_array_len; i++) {
			if (!cl_longreply)
				return (FALSE);
			length = cl_longreply->c_len;
			offset = (uint64) cl_longreply->u.c_daddr;

			DTRACE_PROBE1(
			    krpc__i__xdr_encode_reply_wchunk_c_len,
			    uint32_t, length);

			if (!xdr_uint32(xdrs,
			    &cl_longreply->c_dmemhandle.mrc_rmr))
				return (FALSE);
			if (!xdr_uint32(xdrs, &length))
				return (FALSE);
			if (!xdr_uint64(xdrs, &offset))
				return (FALSE);
			cl_longreply = cl_longreply->c_next;
		}
	} else {
		long_reply_exists = FALSE;
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
	}
	return (TRUE);
}
bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
	struct clist	*rdclist;
	struct clist	cl;
	uint_t		total_len = 0;
	uint32_t	status;
	bool_t		retval = TRUE;

	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
	rlist->rb_longbuf.len =
	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
		return (FALSE);
	}

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
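	 *
	 * Each chunk's destination address is converted from an offset
	 * within rb_longbuf to a real address before it is read in.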
	 */

	cl = *rlist;
	cl.c_next = NULL;
	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		rdma_buf_free(*conn, &rlist->rb_longbuf);
		DTRACE_PROBE(
		    krpc__e__xdrrdma__readfromclient__clist__reg);
		return (FALSE);
	}

	rlist->c_regtype = CLIST_REG_DST;
	rlist->c_dmemhandle = cl.c_dmemhandle;
	rlist->c_dsynchandle = cl.c_dsynchandle;

	for (rdclist = rlist;
	    rdclist != NULL; rdclist = rdclist->c_next) {
		total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint32) rdclist->u.c_daddr3);
#else
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint64) rdclist->u.c_daddr);

#endif
		cl = (*rdclist);
		cl.c_next = NULL;

		/*
		 * Use the same memory handle for all the chunks
		 */
		cl.c_dmemhandle = rlist->c_dmemhandle;
		cl.c_dsynchandle = rlist->c_dsynchandle;


		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
		    int, rdclist->c_len);

		/*
		 * Now read the chunk in
		 */
		if (rdclist->c_next == NULL) {
			status = RDMA_READ(*conn, &cl, WAIT);
		} else {
			status = RDMA_READ(*conn, &cl, NOWAIT);
		}
		if (status != RDMA_SUCCESS) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__readfromclient__readfailed);
			rdma_buf_free(*conn, &rlist->rb_longbuf);
			return (FALSE);
		}
	}

	cl = (*rlist);
	cl.c_next = NULL;
	cl.c_len = total_len;
	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		retval = FALSE;
	}
	return (retval);
}

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
	rdma_buf_free(conn, &clp->rb_longbuf);
	clist_free(clp);
	return (TRUE);
}

bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
	int status;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct xdr_ops *xops = xdrrdma_xops();
	struct clist *tcl, *wrcl, *cl;
	struct clist fcl;
	int rndup_present, rnduplen;

	rndup_present = 0;
	wrcl = NULL;

	/* caller is doing a sizeof */
	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
		return (TRUE);

	/* copy of the first chunk */
	fcl = *wcl;
	fcl.c_next = NULL;

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */

	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	wcl->c_regtype = CLIST_REG_SOURCE;
	wcl->c_smemhandle = fcl.c_smemhandle;
	wcl->c_ssynchandle = fcl.c_ssynchandle;

	/*
	 * Only transfer the read data ignoring any trailing
	 * roundup chunks. A bit of work, but it saves an
	 * unnecessary extra RDMA_WRITE containing only
	 * roundup bytes.
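	 *
	 * A trailing roundup chunk, if one exists, is the last entry in
	 * the list and its length equals clist_len(wcl) - data_len.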
1376 */ 1377 1378 rnduplen = clist_len(wcl) - data_len; 1379 1380 if (rnduplen) { 1381 1382 tcl = wcl->c_next; 1383 1384 /* 1385 * Check if there is a trailing roundup chunk 1386 */ 1387 while (tcl) { 1388 if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) { 1389 rndup_present = 1; 1390 break; 1391 } 1392 tcl = tcl->c_next; 1393 } 1394 1395 /* 1396 * Make a copy chunk list skipping the last chunk 1397 */ 1398 if (rndup_present) { 1399 cl = wcl; 1400 tcl = NULL; 1401 while (cl) { 1402 if (tcl == NULL) { 1403 tcl = clist_alloc(); 1404 wrcl = tcl; 1405 } else { 1406 tcl->c_next = clist_alloc(); 1407 tcl = tcl->c_next; 1408 } 1409 1410 *tcl = *cl; 1411 cl = cl->c_next; 1412 /* last chunk */ 1413 if (cl->c_next == NULL) 1414 break; 1415 } 1416 tcl->c_next = NULL; 1417 } 1418 } 1419 1420 if (wrcl == NULL) { 1421 /* No roundup chunks */ 1422 wrcl = wcl; 1423 } 1424 1425 /* 1426 * Set the registered memory handles for the 1427 * rest of the chunks same as the first chunk. 1428 */ 1429 tcl = wrcl->c_next; 1430 while (tcl) { 1431 tcl->c_smemhandle = fcl.c_smemhandle; 1432 tcl->c_ssynchandle = fcl.c_ssynchandle; 1433 tcl = tcl->c_next; 1434 } 1435 1436 /* 1437 * Sync the total len beginning from the first chunk. 1438 */ 1439 fcl.c_len = clist_len(wrcl); 1440 status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE); 1441 if (status != RDMA_SUCCESS) { 1442 return (FALSE); 1443 } 1444 1445 status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT); 1446 1447 if (rndup_present) 1448 clist_free(wrcl); 1449 1450 if (status != RDMA_SUCCESS) { 1451 return (FALSE); 1452 } 1453 1454 return (TRUE); 1455 } 1456 1457 1458 /* 1459 * Reads one chunk at a time 1460 */ 1461 1462 static bool_t 1463 xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn) 1464 { 1465 int status; 1466 int32_t len = 0; 1467 xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); 1468 struct clist *cle = *(xdrp->xp_rcl_next); 1469 struct clist *rclp = xdrp->xp_rcl; 1470 struct clist *clp; 1471 1472 /* 1473 * len is used later to decide xdr offset in 1474 * the chunk factoring any 4-byte XDR alignment 1475 * (See read chunk example top of this file) 1476 */ 1477 while (rclp != cle) { 1478 len += rclp->c_len; 1479 rclp = rclp->c_next; 1480 } 1481 1482 len = RNDUP(len) - len; 1483 1484 ASSERT(xdrs->x_handy <= 0); 1485 1486 /* 1487 * If this is the first chunk to contain the RPC 1488 * message set xp_off to the xdr offset of the 1489 * inline message. 1490 */ 1491 if (xdrp->xp_off == 0) 1492 xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base); 1493 1494 if (cle == NULL || (cle->c_xdroff != xdrp->xp_off)) 1495 return (FALSE); 1496 1497 /* 1498 * Make a copy of the chunk to read from client. 1499 * Chunks are read on demand, so read only one 1500 * for now. 1501 */ 1502 1503 rclp = clist_alloc(); 1504 *rclp = *cle; 1505 rclp->c_next = NULL; 1506 1507 xdrp->xp_rcl_next = &cle->c_next; 1508 1509 /* 1510 * If there is a roundup present, then skip those 1511 * bytes when reading. 
	 */
	if (len) {
		rclp->w.c_saddr =
		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
		rclp->c_len = rclp->c_len - len;
	}

	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

	if (status == FALSE) {
		clist_free(rclp);
		return (status);
	}

	xdrp->xp_offp = rclp->rb_longbuf.addr;
	xdrs->x_base = xdrp->xp_offp;
	xdrs->x_handy = rclp->c_len;

	/*
	 * This copy of read chunks containing the XDR
	 * message is freed later in xdrrdma_destroy()
	 */

	if (xdrp->xp_rcl_xdr) {
		/* Add the chunk to end of the list */
		clp = xdrp->xp_rcl_xdr;
		while (clp->c_next != NULL)
			clp = clp->c_next;
		clp->c_next = rclp;
	} else {
		xdrp->xp_rcl_xdr = rclp;
	}
	return (TRUE);
}

static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
	struct clist *cl;

	(void) clist_deregister(conn, xdr_rcl);

	/*
	 * Read chunks containing parts of the XDR message are
	 * special: in case of multiple chunks each has
	 * its own buffer.
	 */

	cl = xdr_rcl;
	while (cl) {
		rdma_buf_free(conn, &cl->rb_longbuf);
		cl = cl->c_next;
	}

	clist_free(xdr_rcl);
}