/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code were developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements for contributions from developers:
 *	Ranjit Noronha: noronha@cse.ohio-state.edu
 *	Lei Chai:       chail@cse.ohio-state.edu
 *	Weikuan Yu:     yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

static bool_t	xdrrdma_getint32(XDR *, int32_t *);
static bool_t	xdrrdma_putint32(XDR *, int32_t *);
static bool_t	xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t	xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t		xdrrdma_getpos(XDR *);
bool_t		xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void		xdrrdma_destroy(XDR *);
static bool_t	xdrrdma_control(XDR *, int, void *);
static bool_t	xdrrdma_read_a_chunk(XDR *, CONN **);
static void	xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops xdrrdmablk_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

struct xdr_ops xdrrdma_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk = 0 is a
 * special case for ENCODING, which means do not chunk the incoming stream
 * of data.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message
 * covering both the inline and the chunk. This is used only in the case
 * of decoding and is useful to compare read chunk 'c_xdroff' offsets.
 *
 * An example of a read chunk containing an XDR message:
 * An NFSv4 compound as follows:
 *
 *	PUTFH
 *	WRITE [4109 bytes]
 *	GETATTR
 *
 * Solaris Encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *			|
 *			v
 * [RDMA_READ chunks]:	[write data]
 *
 *
 * Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *			|
 *			v
 * [RDMA_READ chunks]:	[Write data] [Write data2] [Getattr chunk]
 *			   chunk1	chunk2	     chunk3
 *
 * where the READ chunks are as:
 *
 *		 - chunk1 - 4k
 * write data	|
 *		 - chunk2 - 13 bytes (4109 - 4k)
 * getattr op	 - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 *
 */

typedef struct {
	caddr_t		xp_offp;
	int		xp_min_chunk;
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;	/* size of xdr buffer */
	int		xp_off;		/* overall offset */
	struct clist	*xp_rcl;	/* head of chunk list */
	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
	struct clist	*xp_wcl;	/* head of write chunk list */
	CONN		*xp_conn;	/* connection for chunk data xfer */
	uint_t		xp_reply_chunk_len;
	/* used to track length for security modes: integrity/privacy */
	uint_t		xp_reply_chunk_len_alt;
} xrdma_private_t;

extern kmem_cache_t *clist_cache;

bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rdclist = NULL, *prev = NULL;
	bool_t		retval = TRUE;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	alen;
	uint_t		total_len;

	ASSERT(xdrs->x_op != XDR_FREE);

	/*
	 * first deal with the length since xdr bytes are counted
	 */
	if (!xdr_u_int(xdrs, sizep)) {
		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
		return (FALSE);
	}
	total_len = *sizep;
	if (total_len > maxsize) {
		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
		    int, total_len, int, maxsize);
		return (FALSE);
	}
	(*conn) = xdrp->xp_conn;

	/*
	 * if no data we are done
	 */
	if (total_len == 0)
		return (TRUE);

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle == NULL)
		return (FALSE);

	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
		return (FALSE);

	/*
	 * Setup the chunk list with appropriate
	 * address (offset) and length
	 */
	for (actual_segments = 0;
	    actual_segments < total_segments; actual_segments++) {

		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
		    uint32_t, total_len, uint32_t, cle->c_xdroff);

		if (total_len <= 0)
			break;

		/*
		 * not the first time in the loop
		 */
		if (actual_segments > 0)
			cle = cle->c_next;

		cle->u.c_daddr = (uint64) cur_offset;
		alen = 0;
		if (cle->c_len > total_len) {
			alen = cle->c_len;
			cle->c_len = total_len;
		}
		if (!alen)
			xdrp->xp_rcl_next = &cle->c_next;

		cur_offset += cle->c_len;
		total_len -= cle->c_len;

		if ((total_segments - actual_segments - 1) == 0 &&
		    total_len > 0) {
			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
			retval = FALSE;
		}

		if ((total_segments - actual_segments - 1) > 0 &&
		    total_len == 0) {
			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
			    int, total_segments, int, actual_segments);
		}

		rdclist = clist_alloc();
		(*rdclist) = (*cle);
		if ((*rlist) == NULL)
			(*rlist) = rdclist;
		if (prev == NULL)
			prev = rdclist;
		else {
			prev->c_next = rdclist;
			prev = rdclist;
		}

	}

out:
	if (prev != NULL)
		prev->c_next = NULL;

	/*
	 * Adjust the chunk length, if we read only a part of
	 * a chunk.
	 */

	if (alen) {
		cle->w.c_saddr =
		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
		cle->c_len = alen - cle->c_len;
	}

	return (retval);
}

/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
	xrdma_private_t	*xdrp;
	struct clist	*cle;

	xdrs->x_op = op;
	xdrs->x_ops = &xdrrdma_ops;
	xdrs->x_base = addr;
	xdrs->x_handy = size;
	xdrs->x_public = NULL;

	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
	    KM_SLEEP);
	xdrs->x_private = (caddr_t)xdrp;
	xdrp->xp_offp = addr;
	xdrp->xp_min_chunk = min_chunk;
	xdrp->xp_flags = 0;
	xdrp->xp_buf_size = size;
	xdrp->xp_rcl = cl;
	xdrp->xp_reply_chunk_len = 0;
	xdrp->xp_reply_chunk_len_alt = 0;

	if (op == XDR_ENCODE && cl != NULL) {
		/* Find last element in chunk list and set xp_rcl_next */
		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
			continue;

		xdrp->xp_rcl_next = &(cle->c_next);
	} else {
		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
	}

	xdrp->xp_wcl = NULL;

	xdrp->xp_conn = conn;
	if (xdrp->xp_min_chunk != 0)
		xdrp->xp_flags |= XDR_RDMA_CHUNK;
}
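
/*
 * Illustrative sketch (added commentary, not part of the original code):
 * a caller might set up and tear down an RDMA XDR stream roughly as
 * follows, where "buf", "bufsize", "minchunk", "conn" and a previously
 * decoded read chunk list "rcl" are assumed to come from elsewhere, and
 * xdr_some_args() stands in for any ordinary XDR routine:
 *
 *	XDR xdrs;
 *
 *	xdrrdma_create(&xdrs, buf, bufsize, minchunk, rcl, XDR_DECODE, conn);
 *	if (!xdr_some_args(&xdrs, &args)) {
 *		... error handling ...
 *	}
 *	xdrrdma_destroy(&xdrs);
 *
 * Passing min_chunk as 0 disables chunking for an ENCODE stream, per the
 * xp_min_chunk note at the top of this file.
 */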

/* ARGSUSED */
void
xdrrdma_destroy(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if (xdrp == NULL)
		return;

	if (xdrp->xp_wcl) {
		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_wcl->rb_longbuf);
		}
		clist_free(xdrp->xp_wcl);
	}

	if (xdrp->xp_rcl) {
		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_rcl->rb_longbuf);
		}
		clist_free(xdrp->xp_rcl);
	}

	if (xdrp->xp_rcl_xdr)
		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
	xdrs->x_private = NULL;
}

static bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	int chunked = 0;

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
		/*
		 * check if rest of the rpc message is in a chunk
		 */
		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
			return (FALSE);
		}
		chunked = 1;
	}

	/* LINTED pointer alignment */
	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

	xdrp->xp_offp += sizeof (int32_t);

	if (chunked)
		xdrs->x_handy -= (int)sizeof (int32_t);

	if (xdrp->xp_off != 0) {
		xdrp->xp_off += sizeof (int32_t);
	}

	return (TRUE);
}

static bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
		return (FALSE);

	/* LINTED pointer alignment */
	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
	xdrp->xp_offp += sizeof (int32_t);

	return (TRUE);
}

/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
 */
static bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*cls = *(xdrp->xp_rcl_next);
	struct clist	cl;
	bool_t		retval = TRUE;
	uint32_t	total_len = len;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	status = RDMA_SUCCESS;
	uint32_t	alen = 0;
	uint32_t	xpoff;

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	if (xdrp->xp_off) {
		xpoff = xdrp->xp_off;
	} else {
		xpoff = (xdrp->xp_offp - xdrs->x_base);
	}

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */

	if (cle != NULL && cle->c_xdroff == xpoff) {
		for (actual_segments = 0;
		    actual_segments < total_segments; actual_segments++) {

			if (total_len <= 0)
				break;

			if (status != RDMA_SUCCESS)
				goto out;

			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
			alen = 0;
			if (cle->c_len > total_len) {
				alen = cle->c_len;
				cle->c_len = total_len;
			}
			if (!alen)
				xdrp->xp_rcl_next = &cle->c_next;

			cur_offset += cle->c_len;
			total_len -= cle->c_len;

			if ((total_segments - actual_segments - 1) == 0 &&
			    total_len > 0) {
				DTRACE_PROBE(
				    krpc__e__xdrrdma_getbytes_chunktooshort);
				retval = FALSE;
			}

			if ((total_segments - actual_segments - 1) > 0 &&
			    total_len == 0) {
				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
				    int, total_segments, int, actual_segments);
			}

			/*
			 * RDMA READ the chunk data from the remote end.
			 * First prep the destination buffer by registering
			 * it, then RDMA READ the chunk data. Since we are
			 * doing streaming memory, sync the destination
			 * buffer to CPU and deregister the buffer.
			 */
			if (xdrp->xp_conn == NULL) {
				return (FALSE);
			}
			cl = *cle;
			cl.c_next = NULL;
			status = clist_register(xdrp->xp_conn, &cl,
			    CLIST_REG_DST);
			if (status != RDMA_SUCCESS) {
				retval = FALSE;
				/*
				 * Deregister the previous chunks
				 * before return
				 */
				goto out;
			}

			cle->c_dmemhandle = cl.c_dmemhandle;
			cle->c_dsynchandle = cl.c_dsynchandle;

			/*
			 * Now read the chunk in
			 */
			if ((total_segments - actual_segments - 1) == 0 ||
			    total_len == 0) {
				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
			} else {
				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
			}
			if (status != RDMA_SUCCESS) {
				DTRACE_PROBE1(
				    krpc__i__xdrrdma_getblk_readfailed,
				    int, status);
				retval = FALSE;
			}

			cle = cle->c_next;

		}

		/*
		 * sync the memory for cpu
		 */
		cl = *cls;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		if (clist_syncmem(
		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
			retval = FALSE;
		}
out:

		/*
		 * Deregister the chunks
		 */
		cle = cls;
		while (actual_segments != 0) {
			cl = *cle;
			cl.c_next = NULL;

			cl.c_regtype = CLIST_REG_DST;
			(void) clist_deregister(xdrp->xp_conn, &cl);

			cle = cle->c_next;
			actual_segments--;
		}

		if (alen) {
			cle = *(xdrp->xp_rcl_next);
			cle->w.c_saddr =
			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
			cle->c_len = alen - cle->c_len;
		}

		return (retval);
	}

	if ((xdrs->x_handy -= len) < 0)
		return (FALSE);

	bcopy(xdrp->xp_offp, addr, len);

	xdrp->xp_offp += len;

	if (xdrp->xp_off != 0)
		xdrp->xp_off += len;

	return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream. xp_min_chunk = 0 means the stream
 * of bytes contains no chunks to separate out, and if the bytes do not fit
 * in the supplied buffer, grow the buffer and free the old buffer.
 */
static bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	/*
	 * Is this stream accepting chunks?
	 * If so, does either of the two following conditions exist?
	 * - length of bytes to encode is greater than the min chunk size?
	 * - remaining space in this stream is shorter than length of
	 *   bytes to encode?
	 *
	 * If the above exists, then create a chunk for this encoding
	 * and save the addresses, etc.
	 */
	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
	    ((xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk) ||
	    (xdrs->x_handy - len < 0))) {
		struct clist	*cle;
		int		offset = xdrp->xp_offp - xdrs->x_base;

		cle = clist_alloc();
		cle->c_xdroff = offset;
		cle->c_len = len;
		cle->w.c_saddr = (uint64)(uintptr_t)addr;
		cle->c_next = NULL;

		*(xdrp->xp_rcl_next) = cle;
		xdrp->xp_rcl_next = &(cle->c_next);

		return (TRUE);
	}
	/* Is there enough space to encode what is left? */
	if ((xdrs->x_handy -= len) < 0) {
		return (FALSE);
	}
	bcopy(addr, xdrp->xp_offp, len);
	xdrp->xp_offp += len;

	return (TRUE);
}
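
/*
 * Illustrative note (added commentary): with XDR_RDMA_CHUNK set and, for
 * example, xp_min_chunk = 1024, encoding a 32 KB opaque through
 * xdrrdma_putbytes() copies nothing inline; it only appends a chunk list
 * entry of the form
 *
 *	cle->c_xdroff  = xdrp->xp_offp - xdrs->x_base;	(current XDR offset)
 *	cle->c_len     = 32768;
 *	cle->w.c_saddr = (uint64)(uintptr_t)addr;	(caller's data buffer)
 *
 * so the peer can move those bytes with RDMA instead of inline XDR. The
 * 1 KB and 32 KB figures are only an example.
 */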

uint_t
xdrrdma_getpos(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	caddr_t		newaddr = xdrs->x_base + pos;
	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
	ptrdiff_t	diff;

	if (newaddr > lastaddr)
		return (FALSE);

	xdrp->xp_offp = newaddr;
	diff = lastaddr - newaddr;
	xdrs->x_handy = (int)diff;

	return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
	rpc_inline_t	*buf = NULL;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);

	if (xdrs->x_op == XDR_DECODE) {
		/*
		 * Since chunks aren't in-line, check to see whether there is
		 * a chunk in the inline range.
		 */
		if (cle != NULL &&
		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
			return (NULL);
	}

	/* LINTED pointer alignment */
	buf = (rpc_inline_t *)xdrp->xp_offp;
	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
		return (NULL);

	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk)) {
		return (NULL);
	} else {
		xdrs->x_handy -= len;
		xdrp->xp_offp += len;
		return (buf);
	}
}
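
/*
 * Illustrative sketch (added commentary): callers drive the RDMA-specific
 * requests handled by xdrrdma_control() below through XDR_CONTROL(). For
 * example, to advertise a write chunk that will be filled from a uio (the
 * field names are those used by the RCI_WRITE_UIO_CHUNK case below):
 *
 *	rdma_chunkinfo_t rci;
 *	struct clist *wcl = NULL;
 *
 *	rci.rci_type = RCI_WRITE_UIO_CHUNK;
 *	rci.rci_len = count;			(bytes expected back)
 *	rci.rci_a.rci_uiop = uiop;		(destination uio)
 *	rci.rci_clpp = &wcl;
 *	(void) XDR_CONTROL(xdrs, XDR_RDMA_ADD_CHUNK, &rci);
 *
 * "xdrs", "count" and "uiop" are assumed to come from the surrounding
 * caller; this is a sketch, not a verbatim caller from the source tree.
 */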

static bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
	int32_t		*int32p;
	int		len, i;
	uint_t		in_flags;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	rdma_chunkinfo_t *rcip = NULL;
	rdma_wlist_conn_info_t *rwcip = NULL;
	rdma_chunkinfo_lengths_t *rcilp = NULL;
	struct uio *uiop;
	struct clist	*rwl = NULL;
	struct clist	*prev = NULL;

	switch (request) {
	case XDR_PEEK:
		/*
		 * Return the next 4 byte unit in the XDR stream.
		 */
		if (xdrs->x_handy < sizeof (int32_t))
			return (FALSE);

		int32p = (int32_t *)info;
		*int32p = (int32_t)ntohl((uint32_t)
		    (*((int32_t *)(xdrp->xp_offp))));

		return (TRUE);

	case XDR_SKIPBYTES:
		/*
		 * Skip the next N bytes in the XDR stream.
		 */
		int32p = (int32_t *)info;
		len = RNDUP((int)(*int32p));
		if ((xdrs->x_handy -= len) < 0)
			return (FALSE);
		xdrp->xp_offp += len;

		return (TRUE);

	case XDR_RDMA_SET_FLAGS:
		/*
		 * Set the flags provided in the *info in xp_flags for rdma
		 * xdr stream control.
		 */
		int32p = (int32_t *)info;
		in_flags = (uint_t)(*int32p);

		xdrp->xp_flags |= in_flags;
		return (TRUE);

	case XDR_RDMA_GET_FLAGS:
		/*
		 * Get the flags in xp_flags and return them through *info.
		 */
		int32p = (int32_t *)info;

		*int32p = (int32_t)xdrp->xp_flags;
		return (TRUE);

	case XDR_RDMA_GET_CHUNK_LEN:
		rcilp = (rdma_chunkinfo_lengths_t *)info;
		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

		return (TRUE);

	case XDR_RDMA_ADD_CHUNK:
		/*
		 * Store wlist information
		 */

		rcip = (rdma_chunkinfo_t *)info;

		DTRACE_PROBE2(krpc__i__xdrrdma__control__add__chunk,
		    rci_type_t, rcip->rci_type, uint32, rcip->rci_len);
		switch (rcip->rci_type) {
		case RCI_WRITE_UIO_CHUNK:
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			if (rcip->rci_len < xdrp->xp_min_chunk) {
				xdrp->xp_wcl = NULL;
				*(rcip->rci_clpp) = NULL;
				return (TRUE);
			}
			uiop = rcip->rci_a.rci_uiop;

			for (i = 0; i < uiop->uio_iovcnt; i++) {
				rwl = clist_alloc();
				rwl->c_len = uiop->uio_iov[i].iov_len;
				rwl->u.c_daddr =
				    (uint64)(uintptr_t)
				    (uiop->uio_iov[i].iov_base);
				/*
				 * if userspace address, put adspace ptr in
				 * clist. If not, then do nothing since it's
				 * already set to NULL (from kmem_zalloc)
				 */
				if (uiop->uio_segflg == UIO_USERSPACE) {
					rwl->c_adspc = ttoproc(curthread)->p_as;
				}

				if (prev == NULL)
					prev = rwl;
				else {
					prev->c_next = rwl;
					prev = rwl;
				}
			}

			rwl->c_next = NULL;
			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_WRITE_ADDR_CHUNK:
			rwl = clist_alloc();

			rwl->c_len = rcip->rci_len;
			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
			rwl->c_next = NULL;
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_REPLY_CHUNK:
			xdrp->xp_reply_chunk_len += rcip->rci_len;
			break;
		}
		return (TRUE);

	case XDR_RDMA_GET_WLIST:
		*((struct clist **)info) = xdrp->xp_wcl;
		return (TRUE);

	case XDR_RDMA_SET_WLIST:
		xdrp->xp_wcl = (struct clist *)info;
		return (TRUE);

	case XDR_RDMA_GET_RLIST:
		*((struct clist **)info) = xdrp->xp_rcl;
		return (TRUE);

	case XDR_RDMA_GET_WCINFO:
		rwcip = (rdma_wlist_conn_info_t *)info;

		rwcip->rwci_wlist = xdrp->xp_wcl;
		rwcip->rwci_conn = xdrp->xp_conn;

		return (TRUE);

	default:
		return (FALSE);
	}
}

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
	if (!xdr_uint32(xdrs, &objp->c_xdroff))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_len))
		return (FALSE);
	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
		return (FALSE);
	if (!xdr_do_clist(xdrs, &objp->c_next))
		return (FALSE);
	return (TRUE);
}

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
	caddr_t loc = *pp;
	bool_t stat;

	if (loc == NULL) {
		switch (xdrs->x_op) {
		case XDR_FREE:
			return (TRUE);

		case XDR_DECODE:
			*pp = loc = (caddr_t)clist_alloc();
			break;

		case XDR_ENCODE:
			ASSERT(loc);
			break;
		}
	}

	stat = xdr_clist(xdrs, (struct clist *)loc);

	if (xdrs->x_op == XDR_FREE) {
		kmem_cache_free(clist_cache, loc);
		*pp = NULL;
	}
	return (stat);
}

/*
 * XDR a pointer to a possibly recursive clist. This differs
 * from xdr_reference in that it can serialize/deserialize
 * trees correctly.
 *
 *  What is sent is actually a union:
 *
 *  union object_pointer switch (boolean b) {
 *  case TRUE: object_data data;
 *  case FALSE: void nothing;
 *  }
 *
 * > objpp: Pointer to the pointer to the object.
 *
 */

bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
	bool_t more_data;

	more_data = (*objpp != NULL);
	if (!xdr_bool(xdrs, &more_data))
		return (FALSE);
	if (!more_data) {
		*objpp = NULL;
		return (TRUE);
	}
	return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}

uint_t
xdr_getbufsize(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)xdrp->xp_buf_size);
}

/* ARGSUSED */
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
	bool_t	vfalse = FALSE;

	ASSERT(rlist == NULL);
	return (xdr_bool(xdrs, &vfalse));
}

bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
	bool_t		vfalse = FALSE, vtrue = TRUE;
	int		i;
	uint_t		num_segment = 0;
	struct clist	*cl;

	/* does a wlist exist? */
	if (w == NULL) {
		return (xdr_bool(xdrs, &vfalse));
	}
	/* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
	if (!xdr_bool(xdrs, &vtrue))
		return (FALSE);

	for (cl = w; cl != NULL; cl = cl->c_next) {
		num_segment++;
	}

	if (!xdr_uint32(xdrs, &num_segment))
		return (FALSE);
	for (i = 0; i < num_segment; i++) {

		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
			return (FALSE);

		if (!xdr_uint32(xdrs, &w->c_len))
			return (FALSE);

		if (!xdr_uint64(xdrs, &w->u.c_daddr))
			return (FALSE);

		w = w->c_next;
	}

	if (!xdr_bool(xdrs, &vfalse))
		return (FALSE);

	return (TRUE);
}
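
/*
 * Illustrative note (added commentary): on the wire the write list encoded
 * by xdr_encode_wlist() above is simply
 *
 *	bool   TRUE		(a write list follows)
 *	uint32 N		(number of segments)
 *	{ uint32 handle; uint32 length; uint64 offset; }   N times (HLOO)
 *	bool   FALSE		(no further write list)
 *
 * which is the layout xdr_decode_wlist() below walks when rebuilding the
 * segments into a clist.
 */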

/*
 * Conditionally decode a RDMA WRITE chunk list from XDR stream.
 *
 * If the next boolean in the XDR stream is false there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry: allocate a struct clist and decode.
 * Pass back an indication via wlist_exists if we have seen a
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
	struct clist	*tmp;
	bool_t		more = FALSE;
	uint32_t	seg_array_len;
	uint32_t	i;

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	/* is there a wlist? */
	if (more == FALSE) {
		*wlist_exists = FALSE;
		return (TRUE);
	}
	*wlist_exists = TRUE;

	if (!xdr_uint32(xdrs, &seg_array_len))
		return (FALSE);

	tmp = *w = clist_alloc();
	for (i = 0; i < seg_array_len; i++) {

		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &tmp->c_len))
			return (FALSE);

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
		    uint_t, tmp->c_len);

		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
			return (FALSE);
		if (i < seg_array_len - 1) {
			tmp->c_next = clist_alloc();
			tmp = tmp->c_next;
		} else {
			tmp->c_next = NULL;
		}
	}

	more = FALSE;
	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	return (TRUE);
}

/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
	struct clist	*first, *ncl;
	char		*memp;
	uint32_t	num_wclist;
	uint32_t	wcl_length = 0;
	uint32_t	i;
	bool_t		more = FALSE;

	*wclp = NULL;
	*wwl = FALSE;
	*total_length = 0;

	if (!xdr_bool(xdrs, &more)) {
		return (FALSE);
	}

	if (more == FALSE) {
		return (TRUE);
	}

	*wwl = TRUE;

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
		    uint_t, ncl->c_len);

		wcl_length += ncl->c_len;

		if (i < num_wclist - 1) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}
	}

	if (!xdr_bool(xdrs, &more))
		goto err_out;

	first->rb_longbuf.type = RDMA_LONG_BUFFER;
	first->rb_longbuf.len =
	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
		clist_free(first);
		return (FALSE);
	}

	memp = first->rb_longbuf.addr;

	ncl = first;
	for (i = 0; i < num_wclist; i++) {
		ncl->w.c_saddr3 = (caddr_t)memp;
		memp += ncl->c_len;
		ncl = ncl->c_next;
	}

	*wclp = first;
	*total_length = wcl_length;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}

/*
 * XDR decode the long reply write chunk.
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
	bool_t		have_rchunk = FALSE;
	struct clist	*first = NULL, *ncl = NULL;
	uint32_t	num_wclist;
	uint32_t	i;

	if (!xdr_bool(xdrs, &have_rchunk))
		return (FALSE);

	if (have_rchunk == FALSE)
		return (TRUE);

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
		return (FALSE);
	}

	if (num_wclist == 0) {
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (i > 0) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}
		if (!(ncl->c_dmemhandle.mrc_rmr &&
		    (ncl->c_len > 0) && ncl->u.c_daddr))
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);

		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
		    uint32_t, ncl->c_len);

	}
	*clist = first;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}


bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
	int		i;
	bool_t		long_reply_exists = TRUE;
	uint32_t	length;
	uint64		offset;

	if (seg_array_len > 0) {
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
		if (!xdr_uint32(xdrs, &seg_array_len))
			return (FALSE);

		for (i = 0; i < seg_array_len; i++) {
			if (!cl_longreply)
				return (FALSE);
			length = cl_longreply->c_len;
			offset = (uint64) cl_longreply->u.c_daddr;

			DTRACE_PROBE1(
			    krpc__i__xdr_encode_reply_wchunk_c_len,
			    uint32_t, length);

			if (!xdr_uint32(xdrs,
			    &cl_longreply->c_dmemhandle.mrc_rmr))
				return (FALSE);
			if (!xdr_uint32(xdrs, &length))
				return (FALSE);
			if (!xdr_uint64(xdrs, &offset))
				return (FALSE);
			cl_longreply = cl_longreply->c_next;
		}
	} else {
		long_reply_exists = FALSE;
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
	}
	return (TRUE);
}
bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
	struct clist	*rdclist;
	struct clist	cl;
	uint_t		total_len = 0;
	uint32_t	status;
	bool_t		retval = TRUE;

	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
	rlist->rb_longbuf.len =
	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
		return (FALSE);
	}

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */

	cl = *rlist;
	cl.c_next = NULL;
	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		rdma_buf_free(*conn, &rlist->rb_longbuf);
		DTRACE_PROBE(
		    krpc__e__xdrrdma__readfromclient__clist__reg);
		return (FALSE);
	}

	rlist->c_regtype = CLIST_REG_DST;
	rlist->c_dmemhandle = cl.c_dmemhandle;
	rlist->c_dsynchandle = cl.c_dsynchandle;

	for (rdclist = rlist;
	    rdclist != NULL; rdclist = rdclist->c_next) {
		total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint32) rdclist->u.c_daddr3);
#else
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint64) rdclist->u.c_daddr);

#endif
		cl = (*rdclist);
		cl.c_next = NULL;

		/*
		 * Use the same memory handle for all the chunks
		 */
		cl.c_dmemhandle = rlist->c_dmemhandle;
		cl.c_dsynchandle = rlist->c_dsynchandle;


		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
		    int, rdclist->c_len);

		/*
		 * Now read the chunk in
		 */
		if (rdclist->c_next == NULL) {
			status = RDMA_READ(*conn, &cl, WAIT);
		} else {
			status = RDMA_READ(*conn, &cl, NOWAIT);
		}
		if (status != RDMA_SUCCESS) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__readfromclient__readfailed);
			rdma_buf_free(*conn, &rlist->rb_longbuf);
			return (FALSE);
		}
	}

	cl = (*rlist);
	cl.c_next = NULL;
	cl.c_len = total_len;
	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		retval = FALSE;
	}
	return (retval);
}

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
	rdma_buf_free(conn, &clp->rb_longbuf);
	clist_free(clp);
	return (TRUE);
}

bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
	int status;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct xdr_ops *xops = xdrrdma_xops();
	struct clist *tcl, *wrcl, *cl;
	struct clist fcl;
	int rndup_present, rnduplen;

	rndup_present = 0;
	wrcl = NULL;

	/* caller is doing a sizeof */
	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
		return (TRUE);

	/* copy of the first chunk */
	fcl = *wcl;
	fcl.c_next = NULL;

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */

	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	wcl->c_regtype = CLIST_REG_SOURCE;
	wcl->c_smemhandle = fcl.c_smemhandle;
	wcl->c_ssynchandle = fcl.c_ssynchandle;

	/*
	 * Only transfer the read data ignoring any trailing
	 * roundup chunks. A bit of work, but it saves an
	 * unnecessary extra RDMA_WRITE containing only
	 * roundup bytes.
	 */

	rnduplen = clist_len(wcl) - data_len;

	if (rnduplen) {

		tcl = wcl->c_next;

		/*
		 * Check if there is a trailing roundup chunk
		 */
		while (tcl) {
			if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
				rndup_present = 1;
				break;
			}
			tcl = tcl->c_next;
		}

		/*
		 * Make a copy of the chunk list, skipping the last chunk
		 */
		if (rndup_present) {
			cl = wcl;
			tcl = NULL;
			while (cl) {
				if (tcl == NULL) {
					tcl = clist_alloc();
					wrcl = tcl;
				} else {
					tcl->c_next = clist_alloc();
					tcl = tcl->c_next;
				}

				*tcl = *cl;
				cl = cl->c_next;
				/* last chunk */
				if (cl->c_next == NULL)
					break;
			}
			tcl->c_next = NULL;
		}
	}

	if (wrcl == NULL) {
		/* No roundup chunks */
		wrcl = wcl;
	}

	/*
	 * Set the registered memory handles for the
	 * rest of the chunks same as the first chunk.
	 */
	tcl = wrcl->c_next;
	while (tcl) {
		tcl->c_smemhandle = fcl.c_smemhandle;
		tcl->c_ssynchandle = fcl.c_ssynchandle;
		tcl = tcl->c_next;
	}

	/*
	 * Sync the total len beginning from the first chunk.
	 */
	fcl.c_len = clist_len(wrcl);
	status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);

	if (rndup_present)
		clist_free(wrcl);

	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	return (TRUE);
}


/*
 * Reads one chunk at a time
 */

static bool_t
xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
{
	int status;
	int32_t len = 0;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist *cle = *(xdrp->xp_rcl_next);
	struct clist *rclp = xdrp->xp_rcl;
	struct clist *clp;

	/*
	 * len is used later to decide xdr offset in
	 * the chunk factoring any 4-byte XDR alignment
	 * (See read chunk example at the top of this file)
	 */
	while (rclp != cle) {
		len += rclp->c_len;
		rclp = rclp->c_next;
	}

	len = RNDUP(len) - len;

	ASSERT(xdrs->x_handy <= 0);

	/*
	 * If this is the first chunk to contain the RPC
	 * message set xp_off to the xdr offset of the
	 * inline message.
	 */
	if (xdrp->xp_off == 0)
		xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);

	if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
		return (FALSE);

	/*
	 * Make a copy of the chunk to read from client.
	 * Chunks are read on demand, so read only one
	 * for now.
	 */

	rclp = clist_alloc();
	*rclp = *cle;
	rclp->c_next = NULL;

	xdrp->xp_rcl_next = &cle->c_next;

	/*
	 * If there is a roundup present, then skip those
	 * bytes when reading.
	 */
	if (len) {
		rclp->w.c_saddr =
		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
		rclp->c_len = rclp->c_len - len;
	}

	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

	if (status == FALSE) {
		clist_free(rclp);
		return (status);
	}

	xdrp->xp_offp = rclp->rb_longbuf.addr;
	xdrs->x_base = xdrp->xp_offp;
	xdrs->x_handy = rclp->c_len;

	/*
	 * This copy of read chunks containing the XDR
	 * message is freed later in xdrrdma_destroy()
	 */

	if (xdrp->xp_rcl_xdr) {
		/* Add the chunk to end of the list */
		clp = xdrp->xp_rcl_xdr;
		while (clp->c_next != NULL)
			clp = clp->c_next;
		clp->c_next = rclp;
	} else {
		xdrp->xp_rcl_xdr = rclp;
	}
	return (TRUE);
}

static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
	struct clist *cl;

	(void) clist_deregister(conn, xdr_rcl);

	/*
	 * Read chunks containing parts of the XDR message are
	 * special: in case of multiple chunks each has
	 * its own buffer.
	 */

	cl = xdr_rcl;
	while (cl) {
		rdma_buf_free(conn, &cl->rb_longbuf);
		cl = cl->c_next;
	}

	clist_free(xdr_rcl);
}