/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

enum rpcrdma_chunktype {
	rpcrdma_noch = 0,
	rpcrdma_readch,
	rpcrdma_areadch,
	rpcrdma_writech,
	rpcrdma_replych
};

static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}

void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
				  struct rpcrdma_create_data_internal *cdata,
				  unsigned int maxsegs)
{
	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

static int
rpcrdma_tail_pullup(struct xdr_buf *buf)
{
	size_t tlen = buf->tail[0].iov_len;
	size_t skip = tlen & 3;

	/* Do not include the tail if it is only an XDR pad */
	if (tlen < 4)
		return 0;

	/* xdr_write_pages() adds a pad at the beginning of the tail
	 * if the content in "buf->pages" is unaligned. Force the
	 * tail's actual content to land at the next XDR position
	 * after the head instead.
	 */
	if (skip) {
		unsigned char *src, *dst;
		unsigned int count;

		src = buf->tail[0].iov_base;
		dst = buf->head[0].iov_base;
		dst += buf->head[0].iov_len;

		src += skip;
		tlen -= skip;

		dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
			__func__, skip, dst, src, tlen);

		for (count = tlen; count; count--)
			*dst++ = *src++;
	}

	return tlen;
}

/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     int n, int nsegs)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < nsegs) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}

/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
{
	int len, n = 0, p;
	int page_base;
	struct page **ppages;

	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
		if (n == nsegs)
			return -EIO;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < nsegs) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -ENOMEM;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			return -EIO;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == nsegs)
		return -EIO;

	/* When encoding the read list, the tail is always sent inline */
	if (type == rpcrdma_readch)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
		if (n == nsegs)
			return -EIO;
	}

	return n;
}

static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
{
	*iptr++ = cpu_to_be32(seg->mr_rkey);
	*iptr++ = cpu_to_be32(seg->mr_len);
	return xdr_encode_hyper(iptr, seg->mr_base);
}

/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
		if (n <= 0)
			return ERR_PTR(n);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: read segment pos %u "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		req->rl_nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}

/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
		if (n <= 0)
			return ERR_PTR(n);

		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: write segment "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		req->rl_nchunks++;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}

/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
		if (n <= 0)
			return ERR_PTR(n);

		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: reply segment "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		req->rl_nchunks++;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}

/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int page_base;
	struct page **ppages;

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;

	dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
		__func__, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;

	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
			memmove(destp + copy_len,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC: %s: tail destp 0x%p len %d\n",
			__func__, destp + copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	r_xprt->rx_stats.pullup_copy_count += copy_len;

	page_base = rqst->rq_snd_buf.page_base;
	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	npages = PAGE_ALIGN(page_base + copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		curlen = PAGE_SIZE - page_base;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(ppages[i]);
		memcpy(destp, srcp + page_base, curlen);
		kunmap_atomic(srcp);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
		page_base = 0;
	}
	/* header now contains entire send message */
}

/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Prepares up to two IOVs per Call message:
 *
 *  [0] -- RPC RDMA header
 *  [1] -- the RPC header/data
 *
 * Returns zero on success, otherwise a negative errno.
 */

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpcrdma_inline_pullup(rqst);
		rpclen = rqst->rq_svec[0].iov_len;
	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_svec[0].iov_len;
		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	req->rl_nchunks = 0;
	req->rl_nextseg = req->rl_segments;
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		goto out_overflow;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_niovs = 1;
	if (rtype == rpcrdma_areadch)
		return 0;

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;
	return 0;

out_overflow:
	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
	       hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
	/* Terminate this RPC. Chunks registered above will be
	 * released by xprt_release -> xprt_rdma_free.
	 */
	return -EIO;

out_unmap:
	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
	return PTR_ERR(iptr);
}

/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See the encoding key at rpcrdma_encode_write_list()! :-)
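 *
 * Returns the total of the advertised segment lengths (the number of
 * bytes the server claims to have RDMA'd), or -1 if the chunk list is
 * malformed. On success, *iptrp is advanced to just past the decoded
 * list.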
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	if (i > max)
		return -1;
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;
			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;
		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}

/*
 * Scatter inline received data back into provided iov's.
 */
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	int i, npages, curlen, olen;
	char *destp;
	struct page **ppages;
	int page_base;

	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len) {	/* write chunk header fixup */
		curlen = copy_len;
		rqst->rq_rcv_buf.head[0].iov_len = curlen;
	}

	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);

	/* Shift pointer for first receive segment only */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	srcp += curlen;
	copy_len -= curlen;

	olen = copy_len;
	i = 0;
	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;

	if (copy_len && rqst->rq_rcv_buf.page_len) {
		npages = PAGE_ALIGN(page_base +
			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
		for (; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > copy_len)
				curlen = copy_len;
			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			if (copy_len == 0)
				break;
			page_base = 0;
		}
	}

	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
		curlen = copy_len;
		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
		dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
			__func__, srcp, copy_len, curlen);
		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
		copy_len -= curlen; ++i;
	} else
		rqst->rq_rcv_buf.tail[0].iov_len = 0;

	if (pad) {
		/* implicit padding on terminal chunk */
		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
		while (pad--)
			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
	}

	if (copy_len)
		dprintk("RPC: %s: %d bytes in"
			" %d extra segments (%d lost)\n",
			__func__, olen, i, copy_len);

	/* TBD avoid a warning from call_decode() */
	rqst->rq_private_buf = rqst->rq_rcv_buf;
}
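
/* Scheduled by rpcrdma_conn_func() when the state of the underlying
 * connection changes: update the generic rpc_xprt's connected state
 * and wake any RPC tasks waiting on this transport.
 */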
void
rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
		++xprt->connect_cookie;
	if (ep->rep_connected > 0) {
		if (!xprt_test_and_set_connected(xprt))
			xprt_wake_pending_tasks(xprt, 0);
	} else {
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, -ENOTCONN);
	}
	spin_unlock_bh(&xprt->transport_lock);
}

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	/* sanity */
	if (p[7] != headerp->rm_xid)
		return false;
	/* call direction */
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
	schedule_delayed_work(&ep->rep_connect_worker, 0);
}

/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     req->rl_nchunks == 0))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep,
						req->rl_nchunks, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}
		/* Fix up the rpc results for upper layer */
		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    req->rl_nchunks == 0)
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("%s: invalid rpcrdma reply header (type %d):"
				" chunks[012] == %d %d %d"
				" expected chunks <= %d\n",
				__func__, be32_to_cpu(headerp->rm_type),
				headerp->rm_body.rm_chunks[0],
				headerp->rm_body.rm_chunks[1],
				headerp->rm_body.rm_chunks[2],
				req->rl_nchunks);
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (req->rl_nchunks)
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
		rpcrdma_recv_buffer_put(rep);
}