/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code were developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements to contributions from developers:
 *	Ranjit Noronha: noronha@cse.ohio-state.edu
 *	Lei Chai      : chail@cse.ohio-state.edu
 *	Weikuan Yu    : yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

/*
 * RPC header and xdr encoding overhead. The number was determined by
 * tracing the msglen in svc_rdma_ksend for sec=sys,krb5,krb5i and krb5p.
 * If XDR_RDMA_BUF_OVERHEAD is not large enough, the result is the firing
 * of the dtrace probe "krpc-e-svcrdma-ksend-noreplycl" on the server in
 * svc_rdma_ksend.
 */
#define	XDR_RDMA_BUF_OVERHEAD	300

static bool_t	xdrrdma_getint32(XDR *, int32_t *);
static bool_t	xdrrdma_putint32(XDR *, int32_t *);
static bool_t	xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t	xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t		xdrrdma_getpos(XDR *);
bool_t		xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void		xdrrdma_destroy(XDR *);
static bool_t	xdrrdma_control(XDR *, int, void *);
static bool_t	xdrrdma_read_a_chunk(XDR *, CONN **);
static void	xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops xdrrdmablk_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

struct xdr_ops xdrrdma_ops = {
	xdrrdma_getbytes,
	xdrrdma_putbytes,
	xdrrdma_getpos,
	xdrrdma_setpos,
	xdrrdma_inline,
	xdrrdma_destroy,
	xdrrdma_control,
	xdrrdma_getint32,
	xdrrdma_putint32
};

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk = 0 is a
 * special case for ENCODING, which means do not chunk the incoming stream
 * of data.
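 *
 * As a sketch of how this plays out in xdrrdma_putbytes() below (a
 * restatement of that code, not a separate rule): with XDR_RDMA_CHUNK
 * set, an opaque run of len bytes is put on the chunk list instead of
 * being copied inline when
 *
 *	(xp_min_chunk != 0 && len >= xp_min_chunk) ||
 *	(x_handy - len < 0)
 *
 * so with xp_min_chunk = 0 only the "does not fit inline" test remains.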
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message,
 * covering both the inline portion and the chunk. This is used only when
 * decoding and is useful for comparing read chunk 'c_xdroff' offsets.
 *
 * An example of a read chunk containing an XDR message:
 * An NFSv4 compound as follows:
 *
 *	PUTFH
 *	WRITE [4109 bytes]
 *	GETATTR
 *
 * Solaris encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *                               |
 *                               v
 * [RDMA_READ chunks]:     [write data]
 *
 *
 * Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *                               |
 *                               v
 * [RDMA_READ chunks]:     [Write data] [Write data2] [Getattr chunk]
 *                           chunk1       chunk2        chunk3
 *
 * where the READ chunks are as:
 *
 *              - chunk1 - 4k
 * write data   |
 *              - chunk2 - 13 bytes (4109 - 4k)
 * getattr op   - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 *
 */

typedef struct {
	caddr_t		xp_offp;
	int		xp_min_chunk;
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;	/* size of xdr buffer */
	int		xp_off;		/* overall offset */
	struct clist	*xp_rcl;	/* head of chunk list */
	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
	struct clist	*xp_wcl;	/* head of write chunk list */
	CONN		*xp_conn;	/* connection for chunk data xfer */
	uint_t		xp_reply_chunk_len;
	/* used to track length for security modes: integrity/privacy */
	uint_t		xp_reply_chunk_len_alt;
} xrdma_private_t;

extern kmem_cache_t *clist_cache;

bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rdclist = NULL, *prev = NULL;
	bool_t		retval = TRUE;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	alen;
	uint_t		total_len;

	ASSERT(xdrs->x_op != XDR_FREE);

	/*
	 * first deal with the length since xdr bytes are counted
	 */
	if (!xdr_u_int(xdrs, sizep)) {
		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
		return (FALSE);
	}
	total_len = *sizep;
	if (total_len > maxsize) {
		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
		    int, total_len, int, maxsize);
		return (FALSE);
	}
	(*conn) = xdrp->xp_conn;

	/*
	 * if no data we are done
	 */
	if (total_len == 0)
		return (TRUE);

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	/*
	 * If there was a chunk at the current offset, then set up a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle == NULL)
		return (FALSE);

	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
		return (FALSE);

	/*
	 * Setup the chunk list with appropriate
	 * address (offset) and length
	 */
	for (actual_segments = 0;
	    actual_segments < total_segments; actual_segments++) {

		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
		    uint32_t, total_len, uint32_t, cle->c_xdroff);

		if (total_len <= 0)
			break;

		/*
		 * not the first time in the loop
		 */
		if (actual_segments > 0)
			cle = cle->c_next;

		cle->u.c_daddr = (uint64) cur_offset;
		alen = 0;
		if (cle->c_len > total_len) {
			alen = cle->c_len;
			cle->c_len = total_len;
		}
		if (!alen)
			xdrp->xp_rcl_next = &cle->c_next;

		cur_offset += cle->c_len;
		total_len -= cle->c_len;

		if ((total_segments - actual_segments - 1) == 0 &&
		    total_len > 0) {
			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
			retval = FALSE;
		}

		if ((total_segments - actual_segments - 1) > 0 &&
		    total_len == 0) {
			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
			    int, total_segments, int, actual_segments);
		}

		rdclist = clist_alloc();
		(*rdclist) = (*cle);
		if ((*rlist) == NULL)
			(*rlist) = rdclist;
		if (prev == NULL)
			prev = rdclist;
		else {
			prev->c_next = rdclist;
			prev = rdclist;
		}
	}

	if (prev != NULL)
		prev->c_next = NULL;

	/*
	 * Adjust the chunk length, if we read only a part of
	 * a chunk.
	 */
	if (alen) {
		cle->w.c_saddr =
		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
		cle->c_len = alen - cle->c_len;
	}

	return (retval);
}

/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
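 *
 * A minimal usage sketch (hypothetical caller; the argument values are
 * illustrative and are not taken from this file):
 *
 *	XDR		xdrs;
 *	struct clist	*rcl;		(read chunk list from the call header)
 *
 *	xdrrdma_create(&xdrs, msg_addr, msg_len, 0, rcl, XDR_DECODE, conn);
 *	... decode with the usual xdr_*() routines ...
 *	xdrrdma_destroy(&xdrs);
 *
 * A nonzero min_chunk additionally sets XDR_RDMA_CHUNK in xp_flags, as
 * done at the end of this routine.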
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
	xrdma_private_t *xdrp;
	struct clist	*cle;

	xdrs->x_op = op;
	xdrs->x_ops = &xdrrdma_ops;
	xdrs->x_base = addr;
	xdrs->x_handy = size;
	xdrs->x_public = NULL;

	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
	    KM_SLEEP);
	xdrs->x_private = (caddr_t)xdrp;
	xdrp->xp_offp = addr;
	xdrp->xp_min_chunk = min_chunk;
	xdrp->xp_flags = 0;
	xdrp->xp_buf_size = size;
	xdrp->xp_rcl = cl;
	xdrp->xp_reply_chunk_len = 0;
	xdrp->xp_reply_chunk_len_alt = 0;

	if (op == XDR_ENCODE && cl != NULL) {
		/* Find last element in chunk list and set xp_rcl_next */
		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
			continue;

		xdrp->xp_rcl_next = &(cle->c_next);
	} else {
		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
	}

	xdrp->xp_wcl = NULL;

	xdrp->xp_conn = conn;
	if (xdrp->xp_min_chunk != 0)
		xdrp->xp_flags |= XDR_RDMA_CHUNK;
}

/* ARGSUSED */
void
xdrrdma_destroy(XDR * xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if (xdrp == NULL)
		return;

	if (xdrp->xp_wcl) {
		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_wcl->rb_longbuf);
		}
		clist_free(xdrp->xp_wcl);
	}

	if (xdrp->xp_rcl) {
		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
			rdma_buf_free(xdrp->xp_conn,
			    &xdrp->xp_rcl->rb_longbuf);
		}
		clist_free(xdrp->xp_rcl);
	}

	if (xdrp->xp_rcl_xdr)
		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
	xdrs->x_private = NULL;
}

static bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	int chunked = 0;

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
		/*
		 * check if rest of the rpc message is in a chunk
		 */
		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
			return (FALSE);
		}
		chunked = 1;
	}

	/* LINTED pointer alignment */
	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

	xdrp->xp_offp += sizeof (int32_t);

	if (chunked)
		xdrs->x_handy -= (int)sizeof (int32_t);

	if (xdrp->xp_off != 0) {
		xdrp->xp_off += sizeof (int32_t);
	}

	return (TRUE);
}

static bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
		return (FALSE);

	/* LINTED pointer alignment */
	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
	xdrp->xp_offp += sizeof (int32_t);

	return (TRUE);
}

/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
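 *
 * In outline (a condensed restatement of the code below, not additional
 * behavior): the current xdr offset is compared with the first pending
 * read chunk, and only an exact match takes the RDMA path:
 *
 *	xpoff = xp_off ? xp_off : (xp_offp - x_base);
 *	if (cle != NULL && cle->c_xdroff == xpoff)
 *		register each segment, RDMA_READ it, sync, then deregister
 *	else
 *		bcopy() len bytes from the inline buffer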
 */
static bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*cls = *(xdrp->xp_rcl_next);
	struct clist	cl;
	bool_t		retval = TRUE;
	uint32_t	total_len = len;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	status = RDMA_SUCCESS;
	uint32_t	alen = 0;
	uint32_t	xpoff;

	while (cle) {
		total_segments++;
		cle = cle->c_next;
	}

	cle = *(xdrp->xp_rcl_next);

	if (xdrp->xp_off) {
		xpoff = xdrp->xp_off;
	} else {
		xpoff = (xdrp->xp_offp - xdrs->x_base);
	}

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle != NULL && cle->c_xdroff == xpoff) {
		for (actual_segments = 0;
		    actual_segments < total_segments; actual_segments++) {

			if (total_len <= 0)
				break;

			if (status != RDMA_SUCCESS)
				goto out;

			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
			alen = 0;
			if (cle->c_len > total_len) {
				alen = cle->c_len;
				cle->c_len = total_len;
			}
			if (!alen)
				xdrp->xp_rcl_next = &cle->c_next;

			cur_offset += cle->c_len;
			total_len -= cle->c_len;

			if ((total_segments - actual_segments - 1) == 0 &&
			    total_len > 0) {
				DTRACE_PROBE(
				    krpc__e__xdrrdma_getbytes_chunktooshort);
				retval = FALSE;
			}

			if ((total_segments - actual_segments - 1) > 0 &&
			    total_len == 0) {
				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
				    int, total_segments, int, actual_segments);
			}

			/*
			 * RDMA READ the chunk data from the remote end.
			 * First prep the destination buffer by registering
			 * it, then RDMA READ the chunk data. Since we are
			 * doing streaming memory, sync the destination
			 * buffer to CPU and deregister the buffer.
			 */
			if (xdrp->xp_conn == NULL) {
				return (FALSE);
			}
			cl = *cle;
			cl.c_next = NULL;
			status = clist_register(xdrp->xp_conn, &cl,
			    CLIST_REG_DST);
			if (status != RDMA_SUCCESS) {
				retval = FALSE;
				/*
				 * Deregister the previous chunks
				 * before return
				 */
				goto out;
			}

			cle->c_dmemhandle = cl.c_dmemhandle;
			cle->c_dsynchandle = cl.c_dsynchandle;

			/*
			 * Now read the chunk in
			 */
			if ((total_segments - actual_segments - 1) == 0 ||
			    total_len == 0) {
				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
			} else {
				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
			}
			if (status != RDMA_SUCCESS) {
				DTRACE_PROBE1(
				    krpc__i__xdrrdma_getblk_readfailed,
				    int, status);
				retval = FALSE;
			}

			cle = cle->c_next;
		}

		/*
		 * sync the memory for cpu
		 */
		cl = *cls;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		if (clist_syncmem(
		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
			retval = FALSE;
		}
out:
		/*
		 * Deregister the chunks
		 */
		cle = cls;
		while (actual_segments != 0) {
			cl = *cle;
			cl.c_next = NULL;

			cl.c_regtype = CLIST_REG_DST;
			(void) clist_deregister(xdrp->xp_conn, &cl);

			cle = cle->c_next;
			actual_segments--;
		}

		if (alen) {
			cle = *(xdrp->xp_rcl_next);
			cle->w.c_saddr =
			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
			cle->c_len = alen - cle->c_len;
		}

		return (retval);
	}

	if ((xdrs->x_handy -= len) < 0)
		return (FALSE);

	bcopy(xdrp->xp_offp, addr, len);

	xdrp->xp_offp += len;

	if (xdrp->xp_off != 0)
		xdrp->xp_off += len;

	return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream. xp_min_chunk = 0 means the stream
 * of bytes contains no chunks to separate out; if the bytes do not fit in
 * the supplied buffer, grow the buffer and free the old buffer.
 */
static bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	/*
	 * Is this stream accepting chunks?
	 * If so, does either of the following two conditions exist?
	 * - length of bytes to encode is greater than the min chunk size?
	 * - remaining space in this stream is shorter than length of
	 *   bytes to encode?
	 *
	 * If the above exists, then create a chunk for this encoding
	 * and save the addresses, etc.
	 */
	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
	    ((xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk) ||
	    (xdrs->x_handy - len < 0))) {
		struct clist	*cle;
		int		offset = xdrp->xp_offp - xdrs->x_base;

		cle = clist_alloc();
		cle->c_xdroff = offset;
		cle->c_len = len;
		cle->w.c_saddr = (uint64)(uintptr_t)addr;
		cle->c_next = NULL;

		*(xdrp->xp_rcl_next) = cle;
		xdrp->xp_rcl_next = &(cle->c_next);

		return (TRUE);
	}
	/* Is there enough space to encode what is left? */
	if ((xdrs->x_handy -= len) < 0) {
		return (FALSE);
	}
	bcopy(addr, xdrp->xp_offp, len);
	xdrp->xp_offp += len;

	return (TRUE);
}

uint_t
xdrrdma_getpos(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	caddr_t		newaddr = xdrs->x_base + pos;
	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
	ptrdiff_t	diff;

	if (newaddr > lastaddr)
		return (FALSE);

	xdrp->xp_offp = newaddr;
	diff = lastaddr - newaddr;
	xdrs->x_handy = (int)diff;

	return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
	rpc_inline_t	*buf = NULL;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);

	if (xdrs->x_op == XDR_DECODE) {
		/*
		 * Since chunks aren't in-line, check to see whether there is
		 * a chunk in the inline range.
		 */
		if (cle != NULL &&
		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
			return (NULL);
	}

	/* LINTED pointer alignment */
	buf = (rpc_inline_t *)xdrp->xp_offp;
	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
		return (NULL);

	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk)) {
		return (NULL);
	} else {
		xdrs->x_handy -= len;
		xdrp->xp_offp += len;
		return (buf);
	}
}

static bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
	int32_t		*int32p;
	int		len, i;
	uint_t		in_flags;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	rdma_chunkinfo_t *rcip = NULL;
	rdma_wlist_conn_info_t *rwcip = NULL;
	rdma_chunkinfo_lengths_t *rcilp = NULL;
	struct uio *uiop;
	struct clist	*rwl = NULL, *first = NULL;
	struct clist	*prev = NULL;

	switch (request) {
	case XDR_PEEK:
		/*
		 * Return the next 4 byte unit in the XDR stream.
		 */
		if (xdrs->x_handy < sizeof (int32_t))
			return (FALSE);

		int32p = (int32_t *)info;
		*int32p = (int32_t)ntohl((uint32_t)
		    (*((int32_t *)(xdrp->xp_offp))));

		return (TRUE);

	case XDR_SKIPBYTES:
		/*
		 * Skip the next N bytes in the XDR stream.
		 */
		int32p = (int32_t *)info;
		len = RNDUP((int)(*int32p));
		if ((xdrs->x_handy -= len) < 0)
			return (FALSE);
		xdrp->xp_offp += len;

		return (TRUE);

	case XDR_RDMA_SET_FLAGS:
		/*
		 * Set the flags provided in the *info in xp_flags for rdma
		 * xdr stream control.
		 */
		int32p = (int32_t *)info;
		in_flags = (uint_t)(*int32p);

		xdrp->xp_flags |= in_flags;
		return (TRUE);

	case XDR_RDMA_GET_FLAGS:
		/*
		 * Get the flags in xp_flags and return them through *info.
		 */
		int32p = (int32_t *)info;

		*int32p = (int32_t)xdrp->xp_flags;
		return (TRUE);

	case XDR_RDMA_GET_CHUNK_LEN:
		rcilp = (rdma_chunkinfo_lengths_t *)info;
		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

		return (TRUE);

	case XDR_RDMA_ADD_CHUNK:
		/*
		 * Store wlist information
		 */
		rcip = (rdma_chunkinfo_t *)info;

		DTRACE_PROBE2(krpc__i__xdrrdma__control__add__chunk,
		    rci_type_t, rcip->rci_type, uint32, rcip->rci_len);
		switch (rcip->rci_type) {
		case RCI_WRITE_UIO_CHUNK:
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			if ((rcip->rci_len + XDR_RDMA_BUF_OVERHEAD) <
			    xdrp->xp_min_chunk) {
				xdrp->xp_wcl = NULL;
				*(rcip->rci_clpp) = NULL;
				return (TRUE);
			}
			uiop = rcip->rci_a.rci_uiop;

			for (i = 0; i < uiop->uio_iovcnt; i++) {
				rwl = clist_alloc();
				if (first == NULL)
					first = rwl;
				rwl->c_len = uiop->uio_iov[i].iov_len;
				rwl->u.c_daddr =
				    (uint64)(uintptr_t)
				    (uiop->uio_iov[i].iov_base);
				/*
				 * if userspace address, put adspace ptr in
				 * clist. If not, then do nothing since it's
				 * already set to NULL (from kmem_zalloc)
				 */
				if (uiop->uio_segflg == UIO_USERSPACE) {
					rwl->c_adspc = ttoproc(curthread)->p_as;
				}

				if (prev == NULL)
					prev = rwl;
				else {
					prev->c_next = rwl;
					prev = rwl;
				}
			}

			rwl->c_next = NULL;
			xdrp->xp_wcl = first;
			*(rcip->rci_clpp) = first;

			break;

		case RCI_WRITE_ADDR_CHUNK:
			rwl = clist_alloc();

			rwl->c_len = rcip->rci_len;
			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
			rwl->c_next = NULL;
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_REPLY_CHUNK:
			xdrp->xp_reply_chunk_len += rcip->rci_len;
			break;
		}
		return (TRUE);

	case XDR_RDMA_GET_WLIST:
		*((struct clist **)info) = xdrp->xp_wcl;
		return (TRUE);

	case XDR_RDMA_SET_WLIST:
		xdrp->xp_wcl = (struct clist *)info;
		return (TRUE);

	case XDR_RDMA_GET_RLIST:
		*((struct clist **)info) = xdrp->xp_rcl;
		return (TRUE);

	case XDR_RDMA_GET_WCINFO:
		rwcip = (rdma_wlist_conn_info_t *)info;

		rwcip->rwci_wlist = xdrp->xp_wcl;
		rwcip->rwci_conn = xdrp->xp_conn;

		return (TRUE);

	default:
		return (FALSE);
	}
}

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
	if (!xdr_uint32(xdrs, &objp->c_xdroff))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_len))
		return (FALSE);
	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
		return (FALSE);
	if (!xdr_do_clist(xdrs, &objp->c_next))
		return (FALSE);
	return (TRUE);
}

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
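 *
 * For reference, decoding a two-entry chunk list with these routines
 * consumes the following XDR items in order (a sketch of the layout
 * implied by xdr_clist()/xdr_do_clist(), not a separate definition):
 *
 *	TRUE,  c_xdroff, c_smemhandle.mrc_rmr, c_len, w.c_saddr,
 *	TRUE,  c_xdroff, c_smemhandle.mrc_rmr, c_len, w.c_saddr,
 *	FALSE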
891 */ 892 893 /* 894 * Generate or free a clist structure from the 895 * kmem_cache "rdma_clist" 896 */ 897 bool_t 898 xdr_ref_clist(XDR *xdrs, caddr_t *pp) 899 { 900 caddr_t loc = *pp; 901 bool_t stat; 902 903 if (loc == NULL) { 904 switch (xdrs->x_op) { 905 case XDR_FREE: 906 return (TRUE); 907 908 case XDR_DECODE: 909 *pp = loc = (caddr_t)clist_alloc(); 910 break; 911 912 case XDR_ENCODE: 913 ASSERT(loc); 914 break; 915 } 916 } 917 918 stat = xdr_clist(xdrs, (struct clist *)loc); 919 920 if (xdrs->x_op == XDR_FREE) { 921 kmem_cache_free(clist_cache, loc); 922 *pp = NULL; 923 } 924 return (stat); 925 } 926 927 /* 928 * XDR a pointer to a possibly recursive clist. This differs 929 * with xdr_reference in that it can serialize/deserialiaze 930 * trees correctly. 931 * 932 * What is sent is actually a union: 933 * 934 * union object_pointer switch (boolean b) { 935 * case TRUE: object_data data; 936 * case FALSE: void nothing; 937 * } 938 * 939 * > objpp: Pointer to the pointer to the object. 940 * 941 */ 942 943 bool_t 944 xdr_do_clist(XDR *xdrs, clist **objpp) 945 { 946 bool_t more_data; 947 948 more_data = (*objpp != NULL); 949 if (!xdr_bool(xdrs, &more_data)) 950 return (FALSE); 951 if (!more_data) { 952 *objpp = NULL; 953 return (TRUE); 954 } 955 return (xdr_ref_clist(xdrs, (caddr_t *)objpp)); 956 } 957 958 uint_t 959 xdr_getbufsize(XDR *xdrs) 960 { 961 xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); 962 963 return ((uint_t)xdrp->xp_buf_size); 964 } 965 966 /* ARGSUSED */ 967 bool_t 968 xdr_encode_rlist_svc(XDR *xdrs, clist *rlist) 969 { 970 bool_t vfalse = FALSE; 971 972 ASSERT(rlist == NULL); 973 return (xdr_bool(xdrs, &vfalse)); 974 } 975 976 bool_t 977 xdr_encode_wlist(XDR *xdrs, clist *w) 978 { 979 bool_t vfalse = FALSE, vtrue = TRUE; 980 int i; 981 uint_t num_segment = 0; 982 struct clist *cl; 983 984 /* does a wlist exist? */ 985 if (w == NULL) { 986 return (xdr_bool(xdrs, &vfalse)); 987 } 988 /* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */ 989 if (!xdr_bool(xdrs, &vtrue)) 990 return (FALSE); 991 992 for (cl = w; cl != NULL; cl = cl->c_next) { 993 num_segment++; 994 } 995 996 if (!xdr_uint32(xdrs, &num_segment)) 997 return (FALSE); 998 for (i = 0; i < num_segment; i++) { 999 1000 DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len); 1001 1002 if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr)) 1003 return (FALSE); 1004 1005 if (!xdr_uint32(xdrs, &w->c_len)) 1006 return (FALSE); 1007 1008 if (!xdr_uint64(xdrs, &w->u.c_daddr)) 1009 return (FALSE); 1010 1011 w = w->c_next; 1012 } 1013 1014 if (!xdr_bool(xdrs, &vfalse)) 1015 return (FALSE); 1016 1017 return (TRUE); 1018 } 1019 1020 1021 /* 1022 * Conditionally decode a RDMA WRITE chunk list from XDR stream. 1023 * 1024 * If the next boolean in the XDR stream is false there is no 1025 * RDMA WRITE chunk list present. Otherwise iterate over the 1026 * array and for each entry: allocate a struct clist and decode. 1027 * Pass back an indication via wlist_exists if we have seen a 1028 * RDMA WRITE chunk list. 1029 */ 1030 bool_t 1031 xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists) 1032 { 1033 struct clist *tmp; 1034 bool_t more = FALSE; 1035 uint32_t seg_array_len; 1036 uint32_t i; 1037 1038 if (!xdr_bool(xdrs, &more)) 1039 return (FALSE); 1040 1041 /* is there a wlist? 
	if (more == FALSE) {
		*wlist_exists = FALSE;
		return (TRUE);
	}
	*wlist_exists = TRUE;

	if (!xdr_uint32(xdrs, &seg_array_len))
		return (FALSE);

	tmp = *w = clist_alloc();
	for (i = 0; i < seg_array_len; i++) {

		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &tmp->c_len))
			return (FALSE);

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
		    uint_t, tmp->c_len);

		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
			return (FALSE);
		if (i < seg_array_len - 1) {
			tmp->c_next = clist_alloc();
			tmp = tmp->c_next;
		} else {
			tmp->c_next = NULL;
		}
	}

	more = FALSE;
	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	return (TRUE);
}

/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
	struct clist	*first, *ncl;
	char		*memp;
	uint32_t	num_wclist;
	uint32_t	wcl_length = 0;
	uint32_t	i;
	bool_t		more = FALSE;

	*wclp = NULL;
	*wwl = FALSE;
	*total_length = 0;

	if (!xdr_bool(xdrs, &more)) {
		return (FALSE);
	}

	if (more == FALSE) {
		return (TRUE);
	}

	*wwl = TRUE;

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
		    uint_t, ncl->c_len);

		wcl_length += ncl->c_len;

		if (i < num_wclist - 1) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}
	}

	if (!xdr_bool(xdrs, &more))
		goto err_out;

	first->rb_longbuf.type = RDMA_LONG_BUFFER;
	first->rb_longbuf.len =
	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
		clist_free(first);
		return (FALSE);
	}

	memp = first->rb_longbuf.addr;

	ncl = first;
	for (i = 0; i < num_wclist; i++) {
		ncl->w.c_saddr3 = (caddr_t)memp;
		memp += ncl->c_len;
		ncl = ncl->c_next;
	}

	*wclp = first;
	*total_length = wcl_length;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}

/*
 * XDR decode the long reply write chunk.
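 *
 * The expected layout mirrors what xdr_encode_reply_wchunk() further
 * down produces (a sketch, not a separate specification):
 *
 *	bool   TRUE			(reply write chunk present)
 *	uint32 segment count
 *	{ uint32 mrc_rmr, uint32 c_len, uint64 c_daddr }   per segment
 *
 * or a single FALSE boolean when no reply write chunk is present.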
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
	bool_t		have_rchunk = FALSE;
	struct clist	*first = NULL, *ncl = NULL;
	uint32_t	num_wclist;
	uint32_t	i;

	if (!xdr_bool(xdrs, &have_rchunk))
		return (FALSE);

	if (have_rchunk == FALSE)
		return (TRUE);

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
		return (FALSE);
	}

	if (num_wclist == 0) {
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (i > 0) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			goto err_out;
		if (!xdr_uint32(xdrs, &ncl->c_len))
			goto err_out;
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			goto err_out;

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}
		if (!(ncl->c_dmemhandle.mrc_rmr &&
		    (ncl->c_len > 0) && ncl->u.c_daddr))
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);

		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
		    uint32_t, ncl->c_len);
	}
	*clist = first;
	return (TRUE);

err_out:
	clist_free(first);
	return (FALSE);
}

bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
	int		i;
	bool_t		long_reply_exists = TRUE;
	uint32_t	length;
	uint64		offset;

	if (seg_array_len > 0) {
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
		if (!xdr_uint32(xdrs, &seg_array_len))
			return (FALSE);

		for (i = 0; i < seg_array_len; i++) {
			if (!cl_longreply)
				return (FALSE);
			length = cl_longreply->c_len;
			offset = (uint64) cl_longreply->u.c_daddr;

			DTRACE_PROBE1(
			    krpc__i__xdr_encode_reply_wchunk_c_len,
			    uint32_t, length);

			if (!xdr_uint32(xdrs,
			    &cl_longreply->c_dmemhandle.mrc_rmr))
				return (FALSE);
			if (!xdr_uint32(xdrs, &length))
				return (FALSE);
			if (!xdr_uint64(xdrs, &offset))
				return (FALSE);
			cl_longreply = cl_longreply->c_next;
		}
	} else {
		long_reply_exists = FALSE;
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
	}
	return (TRUE);
}

bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
	struct clist	*rdclist;
	struct clist	cl;
	uint_t		total_len = 0;
	uint32_t	status;
	bool_t		retval = TRUE;

	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
	rlist->rb_longbuf.len =
	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
		return (FALSE);
	}

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */
	cl = *rlist;
	cl.c_next = NULL;
	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		rdma_buf_free(*conn, &rlist->rb_longbuf);
		DTRACE_PROBE(
		    krpc__e__xdrrdma__readfromclient__clist__reg);
		return (FALSE);
	}

	rlist->c_regtype = CLIST_REG_DST;
	rlist->c_dmemhandle = cl.c_dmemhandle;
	rlist->c_dsynchandle = cl.c_dsynchandle;

	for (rdclist = rlist;
	    rdclist != NULL; rdclist = rdclist->c_next) {
		total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint32) rdclist->u.c_daddr3);
#else
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint64) rdclist->u.c_daddr);
#endif
		cl = (*rdclist);
		cl.c_next = NULL;

		/*
		 * Use the same memory handle for all the chunks
		 */
		cl.c_dmemhandle = rlist->c_dmemhandle;
		cl.c_dsynchandle = rlist->c_dsynchandle;

		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
		    int, rdclist->c_len);

		/*
		 * Now read the chunk in
		 */
		if (rdclist->c_next == NULL) {
			status = RDMA_READ(*conn, &cl, WAIT);
		} else {
			status = RDMA_READ(*conn, &cl, NOWAIT);
		}
		if (status != RDMA_SUCCESS) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__readfromclient__readfailed);
			rdma_buf_free(*conn, &rlist->rb_longbuf);
			return (FALSE);
		}
	}

	cl = (*rlist);
	cl.c_next = NULL;
	cl.c_len = total_len;
	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		retval = FALSE;
	}
	return (retval);
}

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
	rdma_buf_free(conn, &clp->rb_longbuf);
	clist_free(clp);
	return (TRUE);
}

bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
	int status;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct xdr_ops *xops = xdrrdma_xops();
	struct clist *tcl, *wrcl, *cl;
	struct clist fcl;
	int rndup_present, rnduplen;

	rndup_present = 0;
	wrcl = NULL;

	/* caller is doing a sizeof */
	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
		return (TRUE);

	/* copy of the first chunk */
	fcl = *wcl;
	fcl.c_next = NULL;

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */
	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	wcl->c_regtype = CLIST_REG_SOURCE;
	wcl->c_smemhandle = fcl.c_smemhandle;
	wcl->c_ssynchandle = fcl.c_ssynchandle;

	/*
	 * Only transfer the read data, ignoring any trailing
	 * roundup chunks. A bit of work, but it saves an
	 * unnecessary extra RDMA_WRITE containing only
	 * roundup bytes.
1411 */ 1412 1413 rnduplen = clist_len(wcl) - data_len; 1414 1415 if (rnduplen) { 1416 1417 tcl = wcl->c_next; 1418 1419 /* 1420 * Check if there is a trailing roundup chunk 1421 */ 1422 while (tcl) { 1423 if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) { 1424 rndup_present = 1; 1425 break; 1426 } 1427 tcl = tcl->c_next; 1428 } 1429 1430 /* 1431 * Make a copy chunk list skipping the last chunk 1432 */ 1433 if (rndup_present) { 1434 cl = wcl; 1435 tcl = NULL; 1436 while (cl) { 1437 if (tcl == NULL) { 1438 tcl = clist_alloc(); 1439 wrcl = tcl; 1440 } else { 1441 tcl->c_next = clist_alloc(); 1442 tcl = tcl->c_next; 1443 } 1444 1445 *tcl = *cl; 1446 cl = cl->c_next; 1447 /* last chunk */ 1448 if (cl->c_next == NULL) 1449 break; 1450 } 1451 tcl->c_next = NULL; 1452 } 1453 } 1454 1455 if (wrcl == NULL) { 1456 /* No roundup chunks */ 1457 wrcl = wcl; 1458 } 1459 1460 /* 1461 * Set the registered memory handles for the 1462 * rest of the chunks same as the first chunk. 1463 */ 1464 tcl = wrcl->c_next; 1465 while (tcl) { 1466 tcl->c_smemhandle = fcl.c_smemhandle; 1467 tcl->c_ssynchandle = fcl.c_ssynchandle; 1468 tcl = tcl->c_next; 1469 } 1470 1471 /* 1472 * Sync the total len beginning from the first chunk. 1473 */ 1474 fcl.c_len = clist_len(wrcl); 1475 status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE); 1476 if (status != RDMA_SUCCESS) { 1477 return (FALSE); 1478 } 1479 1480 status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT); 1481 1482 if (rndup_present) 1483 clist_free(wrcl); 1484 1485 if (status != RDMA_SUCCESS) { 1486 return (FALSE); 1487 } 1488 1489 return (TRUE); 1490 } 1491 1492 1493 /* 1494 * Reads one chunk at a time 1495 */ 1496 1497 static bool_t 1498 xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn) 1499 { 1500 int status; 1501 int32_t len = 0; 1502 xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private); 1503 struct clist *cle = *(xdrp->xp_rcl_next); 1504 struct clist *rclp = xdrp->xp_rcl; 1505 struct clist *clp; 1506 1507 /* 1508 * len is used later to decide xdr offset in 1509 * the chunk factoring any 4-byte XDR alignment 1510 * (See read chunk example top of this file) 1511 */ 1512 while (rclp != cle) { 1513 len += rclp->c_len; 1514 rclp = rclp->c_next; 1515 } 1516 1517 len = RNDUP(len) - len; 1518 1519 ASSERT(xdrs->x_handy <= 0); 1520 1521 /* 1522 * If this is the first chunk to contain the RPC 1523 * message set xp_off to the xdr offset of the 1524 * inline message. 1525 */ 1526 if (xdrp->xp_off == 0) 1527 xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base); 1528 1529 if (cle == NULL || (cle->c_xdroff != xdrp->xp_off)) 1530 return (FALSE); 1531 1532 /* 1533 * Make a copy of the chunk to read from client. 1534 * Chunks are read on demand, so read only one 1535 * for now. 1536 */ 1537 1538 rclp = clist_alloc(); 1539 *rclp = *cle; 1540 rclp->c_next = NULL; 1541 1542 xdrp->xp_rcl_next = &cle->c_next; 1543 1544 /* 1545 * If there is a roundup present, then skip those 1546 * bytes when reading. 
	 */
	if (len) {
		rclp->w.c_saddr =
		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
		rclp->c_len = rclp->c_len - len;
	}

	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

	if (status == FALSE) {
		clist_free(rclp);
		return (status);
	}

	xdrp->xp_offp = rclp->rb_longbuf.addr;
	xdrs->x_base = xdrp->xp_offp;
	xdrs->x_handy = rclp->c_len;

	/*
	 * This copy of the read chunks containing the XDR
	 * message is freed later in xdrrdma_destroy()
	 */
	if (xdrp->xp_rcl_xdr) {
		/* Add the chunk to end of the list */
		clp = xdrp->xp_rcl_xdr;
		while (clp->c_next != NULL)
			clp = clp->c_next;
		clp->c_next = rclp;
	} else {
		xdrp->xp_rcl_xdr = rclp;
	}
	return (TRUE);
}

static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
	struct clist *cl;

	(void) clist_deregister(conn, xdr_rcl);

	/*
	 * Read chunks containing parts of the XDR message are
	 * special: in case of multiple chunks each has
	 * its own buffer.
	 */
	cl = xdr_rcl;
	while (cl) {
		rdma_buf_free(conn, &cl->rb_longbuf);
		cl = cl->c_next;
	}

	clist_free(xdr_rcl);
}