/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code is developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements to contributions from developers:
 *   Ranjit Noronha: noronha@cse.ohio-state.edu
 *   Lei Chai      : chail@cse.ohio-state.edu
 *   Weikuan Yu    : yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

static bool_t   xdrrdma_getint32(XDR *, int32_t *);
static bool_t   xdrrdma_putint32(XDR *, int32_t *);
static bool_t   xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t   xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t          xdrrdma_getpos(XDR *);
bool_t          xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void            xdrrdma_destroy(XDR *);
static bool_t   xdrrdma_control(XDR *, int, void *);
static bool_t   xdrrdma_read_a_chunk(XDR *, CONN **);
static void     xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops xdrrdmablk_ops = {
    xdrrdma_getbytes,
    xdrrdma_putbytes,
    xdrrdma_getpos,
    xdrrdma_setpos,
    xdrrdma_inline,
    xdrrdma_destroy,
    xdrrdma_control,
    xdrrdma_getint32,
    xdrrdma_putint32
};

struct xdr_ops xdrrdma_ops = {
    xdrrdma_getbytes,
    xdrrdma_putbytes,
    xdrrdma_getpos,
    xdrrdma_setpos,
    xdrrdma_inline,
    xdrrdma_destroy,
    xdrrdma_control,
    xdrrdma_getint32,
    xdrrdma_putint32
};

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk == 0 is a
 * special case for ENCODING, which means do not chunk the incoming stream
 * of data.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message
 * covering both the inline and the chunk. This is used only in the case
 * of decoding and is useful to compare read chunk 'c_xdroff' offsets.
 *
 * An example for a read chunk containing an XDR message:
 * An NFSv4 compound as following:
 *
 * PUTFH
 * WRITE [4109 bytes]
 * GETATTR
 *
 * Solaris Encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *                                        |
 *                                        v
 * [RDMA_READ chunks]:              [write data]
 *
 *
 * Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *                                    |
 *                                    v
 * [RDMA_READ chunks]: [Write data] [Write data2] [Getattr chunk]
 *                       chunk1       chunk2        chunk3
 *
 * where the READ chunks are as:
 *
 *              - chunk1 - 4k
 * write data  |
 *              - chunk2 - 13 bytes (4109 - 4k)
 * getattr op   - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 *
 */
typedef struct {
    caddr_t     xp_offp;
    int         xp_min_chunk;
    uint_t      xp_flags;           /* Controls setting for rdma xdr */
    int         xp_buf_size;        /* size of xdr buffer */
    int         xp_off;             /* overall offset */
    struct clist *xp_rcl;           /* head of chunk list */
    struct clist **xp_rcl_next;     /* location to place/find next chunk */
    struct clist *xp_rcl_xdr;       /* copy of rcl containing RPC message */
    struct clist *xp_wcl;           /* head of write chunk list */
    CONN        *xp_conn;           /* connection for chunk data xfer */
    uint_t      xp_reply_chunk_len;
    /* used to track length for security modes: integrity/privacy */
    uint_t      xp_reply_chunk_len_alt;
} xrdma_private_t;

extern kmem_cache_t *clist_cache;
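/*
 * Decode the length of an RDMA block of data and build a chunk list
 * describing where it should land: the read chunk entries found at the
 * current XDR offset are copied into *rlist with their destination
 * offsets and (possibly clipped) lengths filled in, so the caller can
 * RDMA READ the data in later.  The connection to use for the transfer
 * is returned through *conn.
 */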
bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    struct clist *cle = *(xdrp->xp_rcl_next);
    struct clist *rdclist = NULL, *prev = NULL;
    bool_t retval = TRUE;
    uint32_t cur_offset = 0;
    uint32_t total_segments = 0;
    uint32_t actual_segments = 0;
    uint32_t alen;
    uint_t total_len;

    ASSERT(xdrs->x_op != XDR_FREE);

    /*
     * first deal with the length since xdr bytes are counted
     */
    if (!xdr_u_int(xdrs, sizep)) {
        DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
        return (FALSE);
    }
    total_len = *sizep;
    if (total_len > maxsize) {
        DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
            int, total_len, int, maxsize);
        return (FALSE);
    }
    (*conn) = xdrp->xp_conn;

    /*
     * if no data we are done
     */
    if (total_len == 0)
        return (TRUE);

    while (cle) {
        total_segments++;
        cle = cle->c_next;
    }

    cle = *(xdrp->xp_rcl_next);

    /*
     * If there was a chunk at the current offset, then setup a read
     * chunk list which records the destination address and length
     * and will RDMA READ the data in later.
     */
    if (cle == NULL)
        return (FALSE);

    if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
        return (FALSE);

    /*
     * Setup the chunk list with appropriate
     * address (offset) and length
     */
    for (actual_segments = 0;
        actual_segments < total_segments; actual_segments++) {

        DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
            uint32_t, total_len, uint32_t, cle->c_xdroff);

        if (total_len <= 0)
            break;

        /*
         * not the first time in the loop
         */
        if (actual_segments > 0)
            cle = cle->c_next;

        cle->u.c_daddr = (uint64)cur_offset;
        alen = 0;
        if (cle->c_len > total_len) {
            alen = cle->c_len;
            cle->c_len = total_len;
        }
        if (!alen)
            xdrp->xp_rcl_next = &cle->c_next;

        cur_offset += cle->c_len;
        total_len -= cle->c_len;

        if ((total_segments - actual_segments - 1) == 0 &&
            total_len > 0) {
            DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
            retval = FALSE;
        }

        if ((total_segments - actual_segments - 1) > 0 &&
            total_len == 0) {
            DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
                int, total_segments, int, actual_segments);
        }

        rdclist = clist_alloc();
        (*rdclist) = (*cle);
        if ((*rlist) == NULL)
            (*rlist) = rdclist;
        if (prev == NULL)
            prev = rdclist;
        else {
            prev->c_next = rdclist;
            prev = rdclist;
        }

    }

out:
    if (prev != NULL)
        prev->c_next = NULL;

    /*
     * Adjust the chunk length, if we read only a part of
     * a chunk.
     */

    if (alen) {
        cle->w.c_saddr =
            (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
        cle->c_len = alen - cle->c_len;
    }

    return (retval);
}
/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
    xrdma_private_t *xdrp;
    struct clist *cle;

    xdrs->x_op = op;
    xdrs->x_ops = &xdrrdma_ops;
    xdrs->x_base = addr;
    xdrs->x_handy = size;
    xdrs->x_public = NULL;

    xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
        KM_SLEEP);
    xdrs->x_private = (caddr_t)xdrp;
    xdrp->xp_offp = addr;
    xdrp->xp_min_chunk = min_chunk;
    xdrp->xp_flags = 0;
    xdrp->xp_buf_size = size;
    xdrp->xp_rcl = cl;
    xdrp->xp_reply_chunk_len = 0;
    xdrp->xp_reply_chunk_len_alt = 0;

    if (op == XDR_ENCODE && cl != NULL) {
        /* Find last element in chunk list and set xp_rcl_next */
        for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
            continue;

        xdrp->xp_rcl_next = &(cle->c_next);
    } else {
        xdrp->xp_rcl_next = &(xdrp->xp_rcl);
    }

    xdrp->xp_wcl = NULL;

    xdrp->xp_conn = conn;
    if (xdrp->xp_min_chunk != 0)
        xdrp->xp_flags |= XDR_RDMA_CHUNK;
}

/* ARGSUSED */
void
xdrrdma_destroy(XDR *xdrs)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

    if (xdrp == NULL)
        return;

    if (xdrp->xp_wcl) {
        if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
            (void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
            rdma_buf_free(xdrp->xp_conn,
                &xdrp->xp_wcl->rb_longbuf);
        }
        clist_free(xdrp->xp_wcl);
    }

    if (xdrp->xp_rcl) {
        if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
            (void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
            rdma_buf_free(xdrp->xp_conn,
                &xdrp->xp_rcl->rb_longbuf);
        }
        clist_free(xdrp->xp_rcl);
    }

    if (xdrp->xp_rcl_xdr)
        xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

    (void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
    xdrs->x_private = NULL;
}
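/*
 * Decode a 32-bit integer from the inline buffer.  If the inline
 * portion of the message has been consumed (x_handy goes negative),
 * the rest of the RPC message lives in a read chunk; pull the next
 * chunk in with xdrrdma_read_a_chunk() and continue decoding from it.
 */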
static bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    int chunked = 0;

    if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
        /*
         * check if rest of the rpc message is in a chunk
         */
        if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
            return (FALSE);
        }
        chunked = 1;
    }

    /* LINTED pointer alignment */
    *int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

    DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

    xdrp->xp_offp += sizeof (int32_t);

    if (chunked)
        xdrs->x_handy -= (int)sizeof (int32_t);

    if (xdrp->xp_off != 0) {
        xdrp->xp_off += sizeof (int32_t);
    }

    return (TRUE);
}

static bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

    if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
        return (FALSE);

    /* LINTED pointer alignment */
    *(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
    xdrp->xp_offp += sizeof (int32_t);

    return (TRUE);
}
/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
 */
static bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    struct clist *cle = *(xdrp->xp_rcl_next);
    struct clist *cls = *(xdrp->xp_rcl_next);
    struct clist cl;
    bool_t retval = TRUE;
    uint32_t total_len = len;
    uint32_t cur_offset = 0;
    uint32_t total_segments = 0;
    uint32_t actual_segments = 0;
    uint32_t status = RDMA_SUCCESS;
    uint32_t alen = 0;
    uint32_t xpoff;

    while (cle) {
        total_segments++;
        cle = cle->c_next;
    }

    cle = *(xdrp->xp_rcl_next);

    if (xdrp->xp_off) {
        xpoff = xdrp->xp_off;
    } else {
        xpoff = (xdrp->xp_offp - xdrs->x_base);
    }

    /*
     * If there was a chunk at the current offset, then setup a read
     * chunk list which records the destination address and length
     * and will RDMA READ the data in later.
     */

    if (cle != NULL && cle->c_xdroff == xpoff) {
        for (actual_segments = 0;
            actual_segments < total_segments; actual_segments++) {

            if (total_len <= 0)
                break;

            if (status != RDMA_SUCCESS)
                goto out;

            cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
            alen = 0;
            if (cle->c_len > total_len) {
                alen = cle->c_len;
                cle->c_len = total_len;
            }
            if (!alen)
                xdrp->xp_rcl_next = &cle->c_next;

            cur_offset += cle->c_len;
            total_len -= cle->c_len;

            if ((total_segments - actual_segments - 1) == 0 &&
                total_len > 0) {
                DTRACE_PROBE(
                    krpc__e__xdrrdma_getbytes_chunktooshort);
                retval = FALSE;
            }

            if ((total_segments - actual_segments - 1) > 0 &&
                total_len == 0) {
                DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
                    int, total_segments, int, actual_segments);
            }

            /*
             * RDMA READ the chunk data from the remote end.
             * First prep the destination buffer by registering
             * it, then RDMA READ the chunk data. Since we are
             * doing streaming memory, sync the destination
             * buffer to CPU and deregister the buffer.
             */
            if (xdrp->xp_conn == NULL) {
                return (FALSE);
            }
            cl = *cle;
            cl.c_next = NULL;
            status = clist_register(xdrp->xp_conn, &cl,
                CLIST_REG_DST);
            if (status != RDMA_SUCCESS) {
                retval = FALSE;
                /*
                 * Deregister the previous chunks
                 * before return
                 */
                goto out;
            }

            cle->c_dmemhandle = cl.c_dmemhandle;
            cle->c_dsynchandle = cl.c_dsynchandle;

            /*
             * Now read the chunk in
             */
            if ((total_segments - actual_segments - 1) == 0 ||
                total_len == 0) {
                status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
            } else {
                status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
            }
            if (status != RDMA_SUCCESS) {
                DTRACE_PROBE1(
                    krpc__i__xdrrdma_getblk_readfailed,
                    int, status);
                retval = FALSE;
            }

            cle = cle->c_next;

        }

        /*
         * sync the memory for cpu
         */
        cl = *cls;
        cl.c_next = NULL;
        cl.c_len = cur_offset;
        if (clist_syncmem(
            xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
            retval = FALSE;
        }
out:

        /*
         * Deregister the chunks
         */
        cle = cls;
        while (actual_segments != 0) {
            cl = *cle;
            cl.c_next = NULL;

            cl.c_regtype = CLIST_REG_DST;
            (void) clist_deregister(xdrp->xp_conn, &cl);

            cle = cle->c_next;
            actual_segments--;
        }

        if (alen) {
            cle = *(xdrp->xp_rcl_next);
            cle->w.c_saddr =
                (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
            cle->c_len = alen - cle->c_len;
        }

        return (retval);
    }

    if ((xdrs->x_handy -= len) < 0)
        return (FALSE);

    bcopy(xdrp->xp_offp, addr, len);

    xdrp->xp_offp += len;

    if (xdrp->xp_off != 0)
        xdrp->xp_off += len;

    return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream.  xp_min_chunk == 0 means the
 * stream of bytes contains no chunks to separate out, and if the bytes
 * do not fit in the supplied buffer, grow the buffer and free the old
 * buffer.
 */
static bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    /*
     * Is this stream accepting chunks?
     * If so, do either of the two following conditions exist?
     * - length of bytes to encode is greater than the min chunk size?
     * - remaining space in this stream is shorter than length of
     *   bytes to encode?
     *
     * If the above exists, then create a chunk for this encoding
     * and save the addresses, etc.
     */
    if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
        ((xdrp->xp_min_chunk != 0 &&
        len >= xdrp->xp_min_chunk) ||
        (xdrs->x_handy - len < 0))) {
        struct clist *cle;
        int offset = xdrp->xp_offp - xdrs->x_base;

        cle = clist_alloc();
        cle->c_xdroff = offset;
        cle->c_len = len;
        cle->w.c_saddr = (uint64)(uintptr_t)addr;
        cle->c_next = NULL;

        *(xdrp->xp_rcl_next) = cle;
        xdrp->xp_rcl_next = &(cle->c_next);

        return (TRUE);
    }
    /* Is there enough space to encode what is left? */
    if ((xdrs->x_handy -= len) < 0) {
        return (FALSE);
    }
    bcopy(addr, xdrp->xp_offp, len);
    xdrp->xp_offp += len;

    return (TRUE);
}
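/*
 * Get or set the current position in the XDR stream, expressed as a
 * byte offset from the start of the inline buffer (x_base).
 */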
uint_t
xdrrdma_getpos(XDR *xdrs)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

    return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

    caddr_t newaddr = xdrs->x_base + pos;
    caddr_t lastaddr = xdrp->xp_offp + xdrs->x_handy;
    ptrdiff_t diff;

    if (newaddr > lastaddr)
        return (FALSE);

    xdrp->xp_offp = newaddr;
    diff = lastaddr - newaddr;
    xdrs->x_handy = (int)diff;

    return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
    rpc_inline_t *buf = NULL;
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    struct clist *cle = *(xdrp->xp_rcl_next);

    if (xdrs->x_op == XDR_DECODE) {
        /*
         * Since chunks aren't in-line, check to see whether there is
         * a chunk in the inline range.
         */
        if (cle != NULL &&
            cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
            return (NULL);
    }

    /* LINTED pointer alignment */
    buf = (rpc_inline_t *)xdrp->xp_offp;
    if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
        return (NULL);

    if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
        len >= xdrp->xp_min_chunk)) {
        return (NULL);
    } else {
        xdrs->x_handy -= len;
        xdrp->xp_offp += len;
        return (buf);
    }
}
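/*
 * Control operations on the RDMA XDR stream: peek at or skip inline
 * bytes, get/set the stream flags, and record or retrieve the read,
 * write and reply chunk information kept in the stream's private data.
 */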
static bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
    int32_t *int32p;
    int len, i;
    uint_t in_flags;
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    rdma_chunkinfo_t *rcip = NULL;
    rdma_wlist_conn_info_t *rwcip = NULL;
    rdma_chunkinfo_lengths_t *rcilp = NULL;
    struct uio *uiop;
    struct clist *rwl = NULL;
    struct clist *prev = NULL;

    switch (request) {
    case XDR_PEEK:
        /*
         * Return the next 4 byte unit in the XDR stream.
         */
        if (xdrs->x_handy < sizeof (int32_t))
            return (FALSE);

        int32p = (int32_t *)info;
        *int32p = (int32_t)ntohl((uint32_t)
            (*((int32_t *)(xdrp->xp_offp))));

        return (TRUE);

    case XDR_SKIPBYTES:
        /*
         * Skip the next N bytes in the XDR stream.
         */
        int32p = (int32_t *)info;
        len = RNDUP((int)(*int32p));
        if ((xdrs->x_handy -= len) < 0)
            return (FALSE);
        xdrp->xp_offp += len;

        return (TRUE);

    case XDR_RDMA_SET_FLAGS:
        /*
         * Set the flags provided in the *info in xp_flags for rdma
         * xdr stream control.
         */
        int32p = (int32_t *)info;
        in_flags = (uint_t)(*int32p);

        xdrp->xp_flags |= in_flags;
        return (TRUE);

    case XDR_RDMA_GET_FLAGS:
        /*
         * Get the flags from xp_flags and return them through *info.
         */
        int32p = (int32_t *)info;

        *int32p = (int32_t)xdrp->xp_flags;
        return (TRUE);

    case XDR_RDMA_GET_CHUNK_LEN:
        rcilp = (rdma_chunkinfo_lengths_t *)info;
        rcilp->rcil_len = xdrp->xp_reply_chunk_len;
        rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

        return (TRUE);

    case XDR_RDMA_ADD_CHUNK:
        /*
         * Store wlist information
         */

        rcip = (rdma_chunkinfo_t *)info;

        switch (rcip->rci_type) {
        case RCI_WRITE_UIO_CHUNK:
            xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

            if (rcip->rci_len < xdrp->xp_min_chunk) {
                xdrp->xp_wcl = NULL;
                *(rcip->rci_clpp) = NULL;
                return (TRUE);
            }
            uiop = rcip->rci_a.rci_uiop;

            for (i = 0; i < uiop->uio_iovcnt; i++) {
                rwl = clist_alloc();
                rwl->c_len = uiop->uio_iov[i].iov_len;
                rwl->u.c_daddr =
                    (uint64)(uintptr_t)
                    (uiop->uio_iov[i].iov_base);
                /*
                 * if userspace address, put adspace ptr in
                 * clist. If not, then do nothing since it's
                 * already set to NULL (from kmem_zalloc)
                 */
                if (uiop->uio_segflg == UIO_USERSPACE) {
                    rwl->c_adspc = ttoproc(curthread)->p_as;
                }

                if (prev == NULL)
                    prev = rwl;
                else {
                    prev->c_next = rwl;
                    prev = rwl;
                }
            }

            rwl->c_next = NULL;
            xdrp->xp_wcl = rwl;
            *(rcip->rci_clpp) = rwl;

            break;

        case RCI_WRITE_ADDR_CHUNK:
            rwl = clist_alloc();

            rwl->c_len = rcip->rci_len;
            rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
            rwl->c_next = NULL;
            xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

            xdrp->xp_wcl = rwl;
            *(rcip->rci_clpp) = rwl;

            break;

        case RCI_REPLY_CHUNK:
            xdrp->xp_reply_chunk_len += rcip->rci_len;
            break;
        }
        return (TRUE);

    case XDR_RDMA_GET_WLIST:
        *((struct clist **)info) = xdrp->xp_wcl;
        return (TRUE);

    case XDR_RDMA_SET_WLIST:
        xdrp->xp_wcl = (struct clist *)info;
        return (TRUE);

    case XDR_RDMA_GET_RLIST:
        *((struct clist **)info) = xdrp->xp_rcl;
        return (TRUE);

    case XDR_RDMA_GET_WCINFO:
        rwcip = (rdma_wlist_conn_info_t *)info;

        rwcip->rwci_wlist = xdrp->xp_wcl;
        rwcip->rwci_conn = xdrp->xp_conn;

        return (TRUE);

    default:
        return (FALSE);
    }
}

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
    if (!xdr_uint32(xdrs, &objp->c_xdroff))
        return (FALSE);
    if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
        return (FALSE);
    if (!xdr_uint32(xdrs, &objp->c_len))
        return (FALSE);
    if (!xdr_uint64(xdrs, &objp->w.c_saddr))
        return (FALSE);
    if (!xdr_do_clist(xdrs, &objp->c_next))
        return (FALSE);
    return (TRUE);
}

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
    caddr_t loc = *pp;
    bool_t stat;

    if (loc == NULL) {
        switch (xdrs->x_op) {
        case XDR_FREE:
            return (TRUE);

        case XDR_DECODE:
            *pp = loc = (caddr_t)clist_alloc();
            break;

        case XDR_ENCODE:
            ASSERT(loc);
            break;
        }
    }

    stat = xdr_clist(xdrs, (struct clist *)loc);

    if (xdrs->x_op == XDR_FREE) {
        kmem_cache_free(clist_cache, loc);
        *pp = NULL;
    }
    return (stat);
}
/*
 * XDR a pointer to a possibly recursive clist. This differs
 * from xdr_reference() in that it can serialize/deserialize
 * trees correctly.
 *
 * What is sent is actually a union:
 *
 * union object_pointer switch (boolean b) {
 * case TRUE: object_data data;
 * case FALSE: void nothing;
 * }
 *
 * > objpp: Pointer to the pointer to the object.
 *
 */

bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
    bool_t more_data;

    more_data = (*objpp != NULL);
    if (!xdr_bool(xdrs, &more_data))
        return (FALSE);
    if (!more_data) {
        *objpp = NULL;
        return (TRUE);
    }
    return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}

uint_t
xdr_getbufsize(XDR *xdrs)
{
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

    return ((uint_t)xdrp->xp_buf_size);
}

/* ARGSUSED */
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
    bool_t vfalse = FALSE;

    ASSERT(rlist == NULL);
    return (xdr_bool(xdrs, &vfalse));
}
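/*
 * Encode a write chunk list: a TRUE discriminator, the number of
 * segments, then one (handle, length, 64-bit offset) triple per
 * segment, followed by a FALSE terminator.  A NULL list encodes as a
 * single FALSE.
 */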
bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
    bool_t vfalse = FALSE, vtrue = TRUE;
    int i;
    uint_t num_segment = 0;
    struct clist *cl;

    /* does a wlist exist? */
    if (w == NULL) {
        return (xdr_bool(xdrs, &vfalse));
    }
    /* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
    if (!xdr_bool(xdrs, &vtrue))
        return (FALSE);

    for (cl = w; cl != NULL; cl = cl->c_next) {
        num_segment++;
    }

    if (!xdr_uint32(xdrs, &num_segment))
        return (FALSE);
    for (i = 0; i < num_segment; i++) {

        DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

        if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
            return (FALSE);

        if (!xdr_uint32(xdrs, &w->c_len))
            return (FALSE);

        if (!xdr_uint64(xdrs, &w->u.c_daddr))
            return (FALSE);

        w = w->c_next;
    }

    if (!xdr_bool(xdrs, &vfalse))
        return (FALSE);

    return (TRUE);
}


/*
 * Conditionally decode a RDMA WRITE chunk list from XDR stream.
 *
 * If the next boolean in the XDR stream is false there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry: allocate a struct clist and decode.
 * Pass back an indication via wlist_exists if we have seen a
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
    struct clist *tmp;
    bool_t more = FALSE;
    uint32_t seg_array_len;
    uint32_t i;

    if (!xdr_bool(xdrs, &more))
        return (FALSE);

    /* is there a wlist? */
    if (more == FALSE) {
        *wlist_exists = FALSE;
        return (TRUE);
    }
    *wlist_exists = TRUE;

    if (!xdr_uint32(xdrs, &seg_array_len))
        return (FALSE);

    tmp = *w = clist_alloc();
    for (i = 0; i < seg_array_len; i++) {

        if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
            return (FALSE);
        if (!xdr_uint32(xdrs, &tmp->c_len))
            return (FALSE);

        DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
            uint_t, tmp->c_len);

        if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
            return (FALSE);
        if (i < seg_array_len - 1) {
            tmp->c_next = clist_alloc();
            tmp = tmp->c_next;
        } else {
            tmp->c_next = NULL;
        }
    }

    more = FALSE;
    if (!xdr_bool(xdrs, &more))
        return (FALSE);

    return (TRUE);
}

/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
    struct clist *first, *ncl;
    char *memp;
    uint32_t num_wclist;
    uint32_t wcl_length = 0;
    uint32_t i;
    bool_t more = FALSE;

    *wclp = NULL;
    *wwl = FALSE;
    *total_length = 0;

    if (!xdr_bool(xdrs, &more)) {
        return (FALSE);
    }

    if (more == FALSE) {
        return (TRUE);
    }

    *wwl = TRUE;

    if (!xdr_uint32(xdrs, &num_wclist)) {
        DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
        return (FALSE);
    }

    first = ncl = clist_alloc();

    for (i = 0; i < num_wclist; i++) {

        if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
            goto err_out;
        if (!xdr_uint32(xdrs, &ncl->c_len))
            goto err_out;
        if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
            goto err_out;

        if (ncl->c_len > MAX_SVC_XFER_SIZE) {
            DTRACE_PROBE(
                krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
            ncl->c_len = MAX_SVC_XFER_SIZE;
        }

        DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
            uint_t, ncl->c_len);

        wcl_length += ncl->c_len;

        if (i < num_wclist - 1) {
            ncl->c_next = clist_alloc();
            ncl = ncl->c_next;
        }
    }

    if (!xdr_bool(xdrs, &more))
        goto err_out;

    first->rb_longbuf.type = RDMA_LONG_BUFFER;
    first->rb_longbuf.len =
        wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

    if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
        clist_free(first);
        return (FALSE);
    }

    memp = first->rb_longbuf.addr;

    ncl = first;
    for (i = 0; i < num_wclist; i++) {
        ncl->w.c_saddr3 = (caddr_t)memp;
        memp += ncl->c_len;
        ncl = ncl->c_next;
    }

    *wclp = first;
    *total_length = wcl_length;
    return (TRUE);

err_out:
    clist_free(first);
    return (FALSE);
}
/*
 * XDR decode the long reply write chunk.
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
    bool_t have_rchunk = FALSE;
    struct clist *first = NULL, *ncl = NULL;
    uint32_t num_wclist;
    uint32_t i;

    if (!xdr_bool(xdrs, &have_rchunk))
        return (FALSE);

    if (have_rchunk == FALSE)
        return (TRUE);

    if (!xdr_uint32(xdrs, &num_wclist)) {
        DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
        return (FALSE);
    }

    if (num_wclist == 0) {
        return (FALSE);
    }

    first = ncl = clist_alloc();

    for (i = 0; i < num_wclist; i++) {

        if (i > 0) {
            ncl->c_next = clist_alloc();
            ncl = ncl->c_next;
        }

        if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
            goto err_out;
        if (!xdr_uint32(xdrs, &ncl->c_len))
            goto err_out;
        if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
            goto err_out;

        if (ncl->c_len > MAX_SVC_XFER_SIZE) {
            DTRACE_PROBE(
                krpc__e__xdrrdma__replywchunk__chunklist_toobig);
            ncl->c_len = MAX_SVC_XFER_SIZE;
        }
        if (!(ncl->c_dmemhandle.mrc_rmr &&
            (ncl->c_len > 0) && ncl->u.c_daddr))
            DTRACE_PROBE(
                krpc__e__xdrrdma__replywchunk__invalid_segaddr);

        DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
            uint32_t, ncl->c_len);

    }
    *clist = first;
    return (TRUE);

err_out:
    clist_free(first);
    return (FALSE);
}


bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
    int i;
    bool_t long_reply_exists = TRUE;
    uint32_t length;
    uint64 offset;

    if (seg_array_len > 0) {
        if (!xdr_bool(xdrs, &long_reply_exists))
            return (FALSE);
        if (!xdr_uint32(xdrs, &seg_array_len))
            return (FALSE);

        for (i = 0; i < seg_array_len; i++) {
            if (!cl_longreply)
                return (FALSE);
            length = cl_longreply->c_len;
            offset = (uint64) cl_longreply->u.c_daddr;

            DTRACE_PROBE1(
                krpc__i__xdr_encode_reply_wchunk_c_len,
                uint32_t, length);

            if (!xdr_uint32(xdrs,
                &cl_longreply->c_dmemhandle.mrc_rmr))
                return (FALSE);
            if (!xdr_uint32(xdrs, &length))
                return (FALSE);
            if (!xdr_uint64(xdrs, &offset))
                return (FALSE);
            cl_longreply = cl_longreply->c_next;
        }
    } else {
        long_reply_exists = FALSE;
        if (!xdr_bool(xdrs, &long_reply_exists))
            return (FALSE);
    }
    return (TRUE);
}
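/*
 * Allocate a long buffer covering the whole read chunk list, register
 * it once via the first chunk, then RDMA READ each chunk from the
 * client into its portion of the buffer and sync the memory for the
 * CPU.
 */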
bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
    struct clist *rdclist;
    struct clist cl;
    uint_t total_len = 0;
    uint32_t status;
    bool_t retval = TRUE;

    rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
    rlist->rb_longbuf.len =
        count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

    if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
        return (FALSE);
    }

    /*
     * The entire buffer is registered with the first chunk.
     * Later chunks will use the same registered memory handle.
     */

    cl = *rlist;
    cl.c_next = NULL;
    if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
        rdma_buf_free(*conn, &rlist->rb_longbuf);
        DTRACE_PROBE(
            krpc__e__xdrrdma__readfromclient__clist__reg);
        return (FALSE);
    }

    rlist->c_regtype = CLIST_REG_DST;
    rlist->c_dmemhandle = cl.c_dmemhandle;
    rlist->c_dsynchandle = cl.c_dsynchandle;

    for (rdclist = rlist;
        rdclist != NULL; rdclist = rdclist->c_next) {
        total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
        rdclist->u.c_daddr3 =
            (caddr_t)((char *)rlist->rb_longbuf.addr +
            (uint32) rdclist->u.c_daddr3);
#else
        rdclist->u.c_daddr3 =
            (caddr_t)((char *)rlist->rb_longbuf.addr +
            (uint64) rdclist->u.c_daddr);

#endif
        cl = (*rdclist);
        cl.c_next = NULL;

        /*
         * Use the same memory handle for all the chunks
         */
        cl.c_dmemhandle = rlist->c_dmemhandle;
        cl.c_dsynchandle = rlist->c_dsynchandle;


        DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
            int, rdclist->c_len);

        /*
         * Now read the chunk in
         */
        if (rdclist->c_next == NULL) {
            status = RDMA_READ(*conn, &cl, WAIT);
        } else {
            status = RDMA_READ(*conn, &cl, NOWAIT);
        }
        if (status != RDMA_SUCCESS) {
            DTRACE_PROBE(
                krpc__e__xdrrdma__readfromclient__readfailed);
            rdma_buf_free(*conn, &rlist->rb_longbuf);
            return (FALSE);
        }
    }

    cl = (*rlist);
    cl.c_next = NULL;
    cl.c_len = total_len;
    if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
        retval = FALSE;
    }
    return (retval);
}

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
    rdma_buf_free(conn, &clp->rb_longbuf);
    clist_free(clp);
    return (TRUE);
}
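/*
 * RDMA WRITE the read results back to the client.  The source buffer
 * is registered once through the first chunk and the same handles are
 * used for the rest; any trailing roundup-only chunk is skipped so we
 * do not issue an extra RDMA_WRITE that carries nothing but padding.
 */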
bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
    int status;
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    struct xdr_ops *xops = xdrrdma_xops();
    struct clist *tcl, *wrcl, *cl;
    struct clist fcl;
    int rndup_present, rnduplen;

    rndup_present = 0;
    wrcl = NULL;

    /* caller is doing a sizeof */
    if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
        return (TRUE);

    /* copy of the first chunk */
    fcl = *wcl;
    fcl.c_next = NULL;

    /*
     * The entire buffer is registered with the first chunk.
     * Later chunks will use the same registered memory handle.
     */

    status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
    if (status != RDMA_SUCCESS) {
        return (FALSE);
    }

    wcl->c_regtype = CLIST_REG_SOURCE;
    wcl->c_smemhandle = fcl.c_smemhandle;
    wcl->c_ssynchandle = fcl.c_ssynchandle;

    /*
     * Only transfer the read data ignoring any trailing
     * roundup chunks. A bit of work, but it saves an
     * unnecessary extra RDMA_WRITE containing only
     * roundup bytes.
     */

    rnduplen = clist_len(wcl) - data_len;

    if (rnduplen) {

        tcl = wcl->c_next;

        /*
         * Check if there is a trailing roundup chunk
         */
        while (tcl) {
            if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
                rndup_present = 1;
                break;
            }
            tcl = tcl->c_next;
        }

        /*
         * Make a copy of the chunk list, skipping the last chunk
         */
        if (rndup_present) {
            cl = wcl;
            tcl = NULL;
            while (cl) {
                if (tcl == NULL) {
                    tcl = clist_alloc();
                    wrcl = tcl;
                } else {
                    tcl->c_next = clist_alloc();
                    tcl = tcl->c_next;
                }

                *tcl = *cl;
                cl = cl->c_next;
                /* last chunk */
                if (cl->c_next == NULL)
                    break;
            }
            tcl->c_next = NULL;
        }
    }

    if (wrcl == NULL) {
        /* No roundup chunks */
        wrcl = wcl;
    }

    /*
     * Set the registered memory handles for the
     * rest of the chunks same as the first chunk.
     */
    tcl = wrcl->c_next;
    while (tcl) {
        tcl->c_smemhandle = fcl.c_smemhandle;
        tcl->c_ssynchandle = fcl.c_ssynchandle;
        tcl = tcl->c_next;
    }

    /*
     * Sync the total len beginning from the first chunk.
     */
    fcl.c_len = clist_len(wrcl);
    status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
    if (status != RDMA_SUCCESS) {
        return (FALSE);
    }

    status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);

    if (rndup_present)
        clist_free(wrcl);

    if (status != RDMA_SUCCESS) {
        return (FALSE);
    }

    return (TRUE);
}


/*
 * Reads one chunk at a time
 */

static bool_t
xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
{
    int status;
    int32_t len = 0;
    xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
    struct clist *cle = *(xdrp->xp_rcl_next);
    struct clist *rclp = xdrp->xp_rcl;
    struct clist *clp;

    /*
     * len is used later to decide xdr offset in
     * the chunk factoring any 4-byte XDR alignment
     * (See read chunk example top of this file)
     */
    while (rclp != cle) {
        len += rclp->c_len;
        rclp = rclp->c_next;
    }

    len = RNDUP(len) - len;

    ASSERT(xdrs->x_handy <= 0);

    /*
     * If this is the first chunk to contain the RPC
     * message set xp_off to the xdr offset of the
     * inline message.
     */
    if (xdrp->xp_off == 0)
        xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);

    if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
        return (FALSE);

    /*
     * Make a copy of the chunk to read from client.
     * Chunks are read on demand, so read only one
     * for now.
     */

    rclp = clist_alloc();
    *rclp = *cle;
    rclp->c_next = NULL;

    xdrp->xp_rcl_next = &cle->c_next;

    /*
     * If there is a roundup present, then skip those
     * bytes when reading.
     */
    if (len) {
        rclp->w.c_saddr =
            (uint64)(uintptr_t)rclp->w.c_saddr + len;
        rclp->c_len = rclp->c_len - len;
    }

    status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

    if (status == FALSE) {
        clist_free(rclp);
        return (status);
    }

    xdrp->xp_offp = rclp->rb_longbuf.addr;
    xdrs->x_base = xdrp->xp_offp;
    xdrs->x_handy = rclp->c_len;

    /*
     * This copy of read chunks containing the XDR
     * message is freed later in xdrrdma_destroy()
     */

    if (xdrp->xp_rcl_xdr) {
        /* Add the chunk to end of the list */
        clp = xdrp->xp_rcl_xdr;
        while (clp->c_next != NULL)
            clp = clp->c_next;
        clp->c_next = rclp;
    } else {
        xdrp->xp_rcl_xdr = rclp;
    }
    return (TRUE);
}
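/*
 * Deregister and free the private copies of read chunks that carried
 * parts of the XDR message (see xdrrdma_read_a_chunk()).
 */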
static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
    struct clist *cl;

    (void) clist_deregister(conn, xdr_rcl);

    /*
     * Read chunks containing parts of the XDR message are
     * special: in case of multiple chunks each has
     * its own buffer.
     */

    cl = xdr_rcl;
    while (cl) {
        rdma_buf_free(conn, &cl->rb_longbuf);
        cl = cl->c_next;
    }

    clist_free(xdr_rcl);
}