// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2020, Oracle and/or its affiliates.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}
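
/* For illustration, assuming the usual protocol constants (these values
 * come from rpc_rdma.h and may differ): RPCRDMA_HDRLEN_MIN is 7 XDR
 * words (28 bytes), rpcrdma_readchunk_maxsz is 6 words (presence
 * discriminator, position, then a 4-word handle/length/offset segment),
 * and rpcrdma_segment_maxsz is 4 words. A device limit of maxsegs = 8
 * then gives:
 *
 *	28 + (8 * 6 * 4) + (4 + 16 + 4) = 244 bytes
 *
 * which is what rpcrdma_set_max_header_sizes() below subtracts from
 * the inline send threshold.
 */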

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
	unsigned int maxsegs = ep->re_max_rdma_segs;

	ep->re_max_inline_send =
		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
	ep->re_max_inline_recv =
		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count, remaining, offset;

	if (xdr->len > ep->re_max_inline_send)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > ep->re_attr.cap.max_send_sge)
				return false;
		}
	}

	return true;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds the
 * inline threshold, the client must provide a Write list or a Reply
 * chunk for this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}

/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
			  const struct rpc_rqst *rqst)
{
	const struct xdr_buf *buf = &rqst->rq_rcv_buf;

	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
		r_xprt->rx_ep->re_max_inline_recv;
}
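
/* A rough example of the two send-side tests above, assuming a 4096-byte
 * inline threshold and the ~244-byte worst-case Call header estimated
 * earlier: re_max_inline_send is about 3852 bytes, so a small call such
 * as an NFS GETATTR is sent inline, while a 32KB WRITE payload exceeds
 * the limit (or the device's max_send_sge) and is carried in a Read
 * chunk by rpcrdma_encode_read_list() instead.
 */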

/* ACL likes to be lazy in allocating pages. For TCP, these
 * pages can be allocated during receive processing. Not true
 * for RDMA, which must always provision receive buffers
 * up front.
 */
static noinline int
rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
{
	struct page **ppages;
	int len;

	len = buf->page_len;
	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
	while (len > 0) {
		if (!*ppages)
			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
		if (!*ppages)
			return -ENOBUFS;
		ppages++;
		len -= PAGE_SIZE;
	}

	return 0;
}

/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 * a byte range. Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}
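
/* An example of the split above, using illustrative numbers and 4KB
 * pages: a kvec whose iov_base starts 3500 bytes into a page and whose
 * iov_len is 1000 bytes becomes two segments, one covering the last
 * 596 bytes of that page and one covering the first 404 bytes of the
 * next page. Both segments carry mr_page == NULL because they describe
 * kernel virtual addresses rather than page list pages.
 */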

/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;		/* Item present */
	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
				mr->mr_offset);
	return 0;
}

static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
						 struct rpcrdma_req *req,
						 struct rpcrdma_mr_seg *seg,
						 int nsegs, bool writing,
						 struct rpcrdma_mr **mr)
{
	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
	if (!*mr) {
		*mr = rpcrdma_mr_get(r_xprt);
		if (!*mr)
			goto out_getmr_err;
		(*mr)->mr_req = req;
	}

	rpcrdma_mr_push(*mr, &req->rl_registered);
	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);

out_getmr_err:
	trace_xprtrdma_nomrs_err(r_xprt, req);
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	rpcrdma_mrs_refresh(r_xprt);
	return ERR_PTR(-EAGAIN);
}

/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct rpc_rqst *rqst,
				    enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
		goto done;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req,
				     struct rpc_rqst *rqst,
				     enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech)
		goto done;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
				      struct rpcrdma_req *req,
				      struct rpc_rqst *rqst,
				      enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		if (xdr_stream_encode_item_absent(xdr) < 0)
			return -EMSGSIZE;
		return 0;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

static void rpcrdma_sendctx_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);
	struct rpcrdma_rep *rep = req->rl_reply;

	rpcrdma_complete_rqst(rep);
	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}

/**
 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 * @sc: sendctx containing SGEs to unmap
 *
 */
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
	struct ib_sge *sge;

	if (!sc->sc_unmap_count)
		return;

	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
	     ++sge, --sc->sc_unmap_count)
		ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
				  DMA_TO_DEVICE);

	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req, u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
}
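
/* A sketch of the Send SGE array that the helpers here build (the
 * exact number of entries depends on the chunk type chosen during
 * marshaling):
 *
 *   sc_sges[0]  - transport header out of rl_rdmabuf, always mapped
 *   sc_sges[1]  - RPC call header, or the whole pulled-up message,
 *		   out of rl_sendbuf, always mapped
 *   sc_sges[2+] - page list and/or tail pages, DMA-mapped per Send
 *		   and unmapped again by rpcrdma_sendctx_unmap()
 */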

/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
		return false;

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
	return true;
}

/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
				     struct xdr_buf *xdr)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	unsigned int page_base, len, remaining;
	struct page **ppages;
	struct ib_sge *sge;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		sge = &sc->sc_sges[req->rl_wr.num_sge++];
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
					    page_base, len, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
			goto out_mapping_err;

		sge->length = len;
		sge->lkey = rdmab_lkey(rb);

		sc->sc_unmap_count++;
		ppages++;
		remaining -= len;
		page_base = 0;
	}

	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
				     struct xdr_buf *xdr,
				     unsigned int page_base, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct page *page = virt_to_page(xdr->tail[0].iov_base);

	sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
				    DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
		goto out_mapping_err;

	sge->length = len;
	sge->lkey = rdmab_lkey(rb);
	++sc->sc_unmap_count;
	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned char *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len + xdr->page_len;
	memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
	r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}

/* Copy pagelist content into the head buffer.
 */
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned int len, page_base, remaining;
	struct page **ppages;
	unsigned char *src, *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len;
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		src = page_address(*ppages);
		src += page_base;
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		memcpy(dst, src, len);
		r_xprt->rx_stats.pullup_copy_count += len;

		ppages++;
		dst += len;
		remaining -= len;
		page_base = 0;
	}
}

/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	if (unlikely(xdr->tail[0].iov_len))
		rpcrdma_pullup_tail_iov(r_xprt, req, xdr);

	if (unlikely(xdr->page_len))
		rpcrdma_pullup_pagelist(r_xprt, req, xdr);

	/* The whole RPC message resides in the head iovec now */
	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}

static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	struct kvec *tail = &xdr->tail[0];

	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;
	if (xdr->page_len)
		if (!rpcrdma_prepare_pagelist(req, xdr))
			return false;
	if (tail->iov_len)
		if (!rpcrdma_prepare_tail_iov(req, xdr,
					      offset_in_page(tail->iov_base),
					      tail->iov_len))
			return false;

	if (req->rl_sendctx->sc_unmap_count)
		kref_get(&req->rl_kref);
	return true;
}

static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
				   struct rpcrdma_req *req,
				   struct xdr_buf *xdr)
{
	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.
	 */

	/* Do not include the tail if it is only an XDR pad */
	if (xdr->tail[0].iov_len > 3) {
		unsigned int page_base, len;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() adds a pad at the beginning of
		 * the tail iovec. Force the tail's non-pad content to
		 * land at the next XDR position in the Send message.
		 */
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;
		page_base += len & 3;
		len -= len & 3;
		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
			return false;
		kref_get(&req->rl_kref);
	}

	return true;
}
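
/* A worked example of the tail arithmetic above, with made-up sizes:
 * if a Read chunk carries 1021 bytes of page-list payload,
 * xdr_write_pages() puts a 3-byte XDR pad at the front of the tail.
 * Should the tail then hold that pad plus 8 bytes of trailing call
 * arguments (iov_len == 11), the code advances page_base by
 * (11 & 3) == 3 and trims len to 8, so only the trailing arguments
 * are added to the Send SGE list.
 */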

/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, u32 hdrlen,
				     struct xdr_buf *xdr,
				     enum rpcrdma_chunktype rtype)
{
	int ret;

	ret = -EAGAIN;
	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
	if (!req->rl_sendctx)
		goto out_nosc;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	kref_init(&req->rl_kref);
	req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
	req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
	req->rl_wr.num_sge = 0;
	req->rl_wr.opcode = IB_WR_SEND;

	rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);

	ret = -EIO;
	switch (rtype) {
	case rpcrdma_noch_pullup:
		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_noch_mapped:
		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_readch:
		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_areadch:
		break;
	default:
		goto out_unmap;
	}

	return 0;

out_unmap:
	rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
	return ret;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	struct xdr_buf *buf = &rqst->rq_snd_buf;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
		ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
		if (ret)
			return ret;
	}

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
			rqst);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = r_xprt->rx_buf.rb_max_requests;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
				&rqst->rq_cred->cr_auth->au_flags);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
		 rpcrdma_nonpayload_inline(r_xprt, rqst))
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer never presents a request
	 * that both carries a data payload and has non-data arguments
	 * that by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
			rpcrdma_noch_pullup : rpcrdma_noch_mapped;
	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
					buf, rtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(req, rtype, wtype);
	return 0;

out_err:
	trace_xprtrdma_marshal_failed(rqst, ret);
	r_xprt->rx_stats.failed_marshal_count++;
	frwr_reset(req);
	return ret;
}

static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
					 struct rpcrdma_buffer *buf,
					 u32 grant)
{
	buf->rb_credits = grant;
	xprt->cwnd = grant << RPC_CWNDSHIFT;
}

static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
	spin_unlock(&xprt->transport_lock);
}
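
/* A note on the shift above: assuming the generic RPC layer's usual
 * RPC_CWNDSHIFT of 8, a credit grant of 128 becomes a congestion
 * window of 128 << 8 == 32768, which the RPC scheduler consumes in
 * RPC_CWNDSCALE units, one per request in flight.
 */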

/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	xprt->cong = 0;
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	if (fixup_copy_count)
		trace_xprtrdma_fixup(rqst, fixup_copy_count);
	return fixup_copy_count;
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	xdr_decode_rdma_segment(p, &handle, length, &offset);
	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}
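
/* On the wire, the chunk decoded above is a counted array of HLOO
 * segments (Handle32 Length32 Offset64). As a made-up example, a
 * two-segment Write chunk returning 8192 bytes carries a segment
 * count of 2 followed by two 4096-byte segments, and
 * decode_write_chunk() reports *length == 8192.
 */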

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(xdr_item_is_present(p)))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (xdr_item_is_absent(p))
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (xdr_item_is_present(p))
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		trace_xprtrdma_err_vers(rqst, p, p + 1);
		break;
	case err_chunk:
		trace_xprtrdma_err_chunk(rqst);
		break;
	default:
		trace_xprtrdma_err_unrecognized(rqst, p);
	}

	return -EIO;
}
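
/* For reference, an ERR_VERS body in an rdma_error reply carries two
 * additional XDR words: the lowest and the highest RPC-over-RDMA
 * version the responder supports. Those are the two words handed to
 * trace_xprtrdma_err_vers() in the decoder above.
 */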

/* Perform XID lookup, reconstruction of the RPC reply, and
 * RPC completion while holding the transport lock to ensure
 * the rep, rqst, and rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
	return;

out_badheader:
	trace_xprtrdma_reply_hdr_err(rep);
	r_xprt->rx_stats.bad_reply_count++;
	rqst->rq_task->tk_status = status;
	status = 0;
	goto out;
}

static void rpcrdma_reply_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);

	rpcrdma_complete_rqst(req->rl_reply);
}

/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	/* Any data means we had a useful conversation, so
	 * we don't need to delay the next reconnect.
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base, NULL);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_ep->re_max_requests)
		credits = r_xprt->rx_ep->re_max_requests;
	if (buf->rb_credits != credits)
		rpcrdma_update_cwnd(r_xprt, credits);
	rpcrdma_post_recvs(r_xprt, false);

	req = rpcr_to_rdmar(rqst);
	if (unlikely(req->rl_reply))
		rpcrdma_recv_buffer_put(req->rl_reply);
	req->rl_reply = rep;
	rep->rr_rqst = rqst;

	trace_xprtrdma_reply(rqst->rq_task, rep, credits);

	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	if (!list_empty(&req->rl_registered))
		frwr_unmap_async(r_xprt, req);
		/* LocalInv completion will complete the RPC */
	else
		kref_put(&req->rl_kref, rpcrdma_reply_done);
	return;

out_badversion:
	trace_xprtrdma_reply_vers_err(rep);
	goto out;

out_norqst:
	spin_unlock(&xprt->queue_lock);
	trace_xprtrdma_reply_rqst_err(rep);
	goto out;

out_shortreply:
	trace_xprtrdma_reply_short_err(rep);

out:
	rpcrdma_recv_buffer_put(rep);
}