1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2 /* 3 * Copyright (c) 2014-2020, Oracle and/or its affiliates. 4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the BSD-type 10 * license below: 11 * 12 * Redistribution and use in source and binary forms, with or without 13 * modification, are permitted provided that the following conditions 14 * are met: 15 * 16 * Redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer. 18 * 19 * Redistributions in binary form must reproduce the above 20 * copyright notice, this list of conditions and the following 21 * disclaimer in the documentation and/or other materials provided 22 * with the distribution. 23 * 24 * Neither the name of the Network Appliance, Inc. nor the names of 25 * its contributors may be used to endorse or promote products 26 * derived from this software without specific prior written 27 * permission. 28 * 29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 32 * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT 33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 40 */ 41 42 /* 43 * rpc_rdma.c 44 * 45 * This file contains the guts of the RPC RDMA protocol, and 46 * does marshaling/unmarshaling, etc. It is also where interfacing 47 * to the Linux RPC framework lives. 48 */ 49 50 #include <linux/highmem.h> 51 52 #include <linux/sunrpc/svc_rdma.h> 53 54 #include "xprt_rdma.h" 55 #include <trace/events/rpcrdma.h> 56 57 /* Returns size of largest RPC-over-RDMA header in a Call message 58 * 59 * The largest Call header contains a full-size Read list and a 60 * minimal Reply chunk. 61 */ 62 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) 63 { 64 unsigned int size; 65 66 /* Fixed header fields and list discriminators */ 67 size = RPCRDMA_HDRLEN_MIN; 68 69 /* Maximum Read list size */ 70 size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32); 71 72 /* Minimal Read chunk size */ 73 size += sizeof(__be32); /* segment count */ 74 size += rpcrdma_segment_maxsz * sizeof(__be32); 75 size += sizeof(__be32); /* list discriminator */ 76 77 return size; 78 } 79 80 /* Returns size of largest RPC-over-RDMA header in a Reply message 81 * 82 * There is only one Write list or one Reply chunk per Reply 83 * message. The larger list is the Write list. 
84 */ 85 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) 86 { 87 unsigned int size; 88 89 /* Fixed header fields and list discriminators */ 90 size = RPCRDMA_HDRLEN_MIN; 91 92 /* Maximum Write list size */ 93 size += sizeof(__be32); /* segment count */ 94 size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); 95 size += sizeof(__be32); /* list discriminator */ 96 97 return size; 98 } 99 100 /** 101 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes 102 * @ep: endpoint to initialize 103 * 104 * The max_inline fields contain the maximum size of an RPC message 105 * so the marshaling code doesn't have to repeat this calculation 106 * for every RPC. 107 */ 108 void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep) 109 { 110 unsigned int maxsegs = ep->re_max_rdma_segs; 111 112 ep->re_max_inline_send = 113 ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs); 114 ep->re_max_inline_recv = 115 ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs); 116 } 117 118 /* The client can send a request inline as long as the RPCRDMA header 119 * plus the RPC call fit under the transport's inline limit. If the 120 * combined call message size exceeds that limit, the client must use 121 * a Read chunk for this operation. 122 * 123 * A Read chunk is also required if sending the RPC call inline would 124 * exceed this device's max_sge limit. 
125 */ 126 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, 127 struct rpc_rqst *rqst) 128 { 129 struct xdr_buf *xdr = &rqst->rq_snd_buf; 130 struct rpcrdma_ep *ep = r_xprt->rx_ep; 131 unsigned int count, remaining, offset; 132 133 if (xdr->len > ep->re_max_inline_send) 134 return false; 135 136 if (xdr->page_len) { 137 remaining = xdr->page_len; 138 offset = offset_in_page(xdr->page_base); 139 count = RPCRDMA_MIN_SEND_SGES; 140 while (remaining) { 141 remaining -= min_t(unsigned int, 142 PAGE_SIZE - offset, remaining); 143 offset = 0; 144 if (++count > ep->re_attr.cap.max_send_sge) 145 return false; 146 } 147 } 148 149 return true; 150 } 151 152 /* The client can't know how large the actual reply will be. Thus it 153 * plans for the largest possible reply for that particular ULP 154 * operation. If the maximum combined reply message size exceeds that 155 * limit, the client must provide a write list or a reply chunk for 156 * this request. 157 */ 158 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, 159 struct rpc_rqst *rqst) 160 { 161 return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv; 162 } 163 164 /* The client is required to provide a Reply chunk if the maximum 165 * size of the non-payload part of the RPC Reply is larger than 166 * the inline threshold. 167 */ 168 static bool 169 rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt, 170 const struct rpc_rqst *rqst) 171 { 172 const struct xdr_buf *buf = &rqst->rq_rcv_buf; 173 174 return (buf->head[0].iov_len + buf->tail[0].iov_len) < 175 r_xprt->rx_ep->re_max_inline_recv; 176 } 177 178 /* ACL likes to be lazy in allocating pages. For TCP, these 179 * pages can be allocated during receive processing. Not true 180 * for RDMA, which must always provision receive buffers 181 * up front. 
182 */ 183 static noinline int 184 rpcrdma_alloc_sparse_pages(struct xdr_buf *buf) 185 { 186 struct page **ppages; 187 int len; 188 189 len = buf->page_len; 190 ppages = buf->pages + (buf->page_base >> PAGE_SHIFT); 191 while (len > 0) { 192 if (!*ppages) 193 *ppages = alloc_page(GFP_NOWAIT); 194 if (!*ppages) 195 return -ENOBUFS; 196 ppages++; 197 len -= PAGE_SIZE; 198 } 199 200 return 0; 201 } 202 203 static void 204 rpcrdma_xdr_cursor_init(struct rpcrdma_xdr_cursor *cur, 205 const struct xdr_buf *xdrbuf, 206 unsigned int pos, enum rpcrdma_chunktype type) 207 { 208 cur->xc_buf = xdrbuf; 209 cur->xc_page_offset = 0; 210 cur->xc_flags = 0; 211 212 if (pos != 0) 213 cur->xc_flags |= XC_HEAD_DONE; 214 if (!xdrbuf->page_len) 215 cur->xc_flags |= XC_PAGES_DONE; 216 if (type == rpcrdma_readch || type == rpcrdma_writech || 217 !xdrbuf->tail[0].iov_len) 218 cur->xc_flags |= XC_TAIL_DONE; 219 } 220 221 static bool 222 rpcrdma_xdr_cursor_done(const struct rpcrdma_xdr_cursor *cur) 223 { 224 return (cur->xc_flags & (XC_HEAD_DONE | XC_PAGES_DONE | 225 XC_TAIL_DONE)) == 226 (XC_HEAD_DONE | XC_PAGES_DONE | XC_TAIL_DONE); 227 } 228 229 static int 230 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr) 231 { 232 __be32 *p; 233 234 p = xdr_reserve_space(xdr, 4 * sizeof(*p)); 235 if (unlikely(!p)) 236 return -EMSGSIZE; 237 238 xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset); 239 return 0; 240 } 241 242 static int 243 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, 244 u32 position) 245 { 246 __be32 *p; 247 248 p = xdr_reserve_space(xdr, 6 * sizeof(*p)); 249 if (unlikely(!p)) 250 return -EMSGSIZE; 251 252 *p++ = xdr_one; /* Item present */ 253 xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length, 254 mr->mr_offset); 255 return 0; 256 } 257 258 static int rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, 259 struct rpcrdma_req *req, 260 struct rpcrdma_xdr_cursor *cur, 261 bool writing, struct rpcrdma_mr **mr) 262 { 
263 *mr = rpcrdma_mr_pop(&req->rl_free_mrs); 264 if (!*mr) { 265 *mr = rpcrdma_mr_get(r_xprt); 266 if (!*mr) 267 goto out_getmr_err; 268 (*mr)->mr_req = req; 269 } 270 271 rpcrdma_mr_push(*mr, &req->rl_registered); 272 return frwr_map(r_xprt, cur, writing, req->rl_slot.rq_xid, *mr); 273 274 out_getmr_err: 275 trace_xprtrdma_nomrs_err(r_xprt, req); 276 xprt_wait_for_buffer_space(&r_xprt->rx_xprt); 277 rpcrdma_mrs_refresh(r_xprt); 278 return -EAGAIN; 279 } 280 281 /* Register and XDR encode the Read list. Supports encoding a list of read 282 * segments that belong to a single read chunk. 283 * 284 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): 285 * 286 * Read chunklist (a linked list): 287 * N elements, position P (same P for all chunks of same arg!): 288 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 289 * 290 * Returns zero on success, or a negative errno if a failure occurred. 291 * @xdr is advanced to the next position in the stream. 292 * 293 * Only a single @pos value is currently supported. 
294 */ 295 static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, 296 struct rpcrdma_req *req, 297 struct rpc_rqst *rqst, 298 enum rpcrdma_chunktype rtype) 299 { 300 struct xdr_stream *xdr = &req->rl_stream; 301 struct rpcrdma_xdr_cursor cur; 302 struct rpcrdma_mr *mr; 303 unsigned int pos; 304 int ret; 305 306 if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped) 307 goto done; 308 309 pos = rqst->rq_snd_buf.head[0].iov_len; 310 if (rtype == rpcrdma_areadch) 311 pos = 0; 312 rpcrdma_xdr_cursor_init(&cur, &rqst->rq_snd_buf, pos, rtype); 313 314 do { 315 ret = rpcrdma_mr_prepare(r_xprt, req, &cur, false, &mr); 316 if (ret) 317 return ret; 318 319 if (encode_read_segment(xdr, mr, pos) < 0) 320 return -EMSGSIZE; 321 322 trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, 323 rpcrdma_xdr_cursor_done(&cur)); 324 r_xprt->rx_stats.read_chunk_count++; 325 } while (!rpcrdma_xdr_cursor_done(&cur)); 326 327 done: 328 if (xdr_stream_encode_item_absent(xdr) < 0) 329 return -EMSGSIZE; 330 return 0; 331 } 332 333 /* Register and XDR encode the Write list. Supports encoding a list 334 * containing one array of plain segments that belong to a single 335 * write chunk. 336 * 337 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): 338 * 339 * Write chunklist (a list of (one) counted array): 340 * N elements: 341 * 1 - N - HLOO - HLOO - ... - HLOO - 0 342 * 343 * Returns zero on success, or a negative errno if a failure occurred. 344 * @xdr is advanced to the next position in the stream. 345 * 346 * Only a single Write chunk is currently supported. 
347 */ 348 static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, 349 struct rpcrdma_req *req, 350 struct rpc_rqst *rqst, 351 enum rpcrdma_chunktype wtype) 352 { 353 struct xdr_stream *xdr = &req->rl_stream; 354 struct rpcrdma_ep *ep = r_xprt->rx_ep; 355 struct rpcrdma_xdr_cursor cur; 356 struct rpcrdma_mr *mr; 357 int nchunks, ret; 358 __be32 *segcount; 359 360 if (wtype != rpcrdma_writech) 361 goto done; 362 363 rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf, 364 rqst->rq_rcv_buf.head[0].iov_len, wtype); 365 366 if (xdr_stream_encode_item_present(xdr) < 0) 367 return -EMSGSIZE; 368 segcount = xdr_reserve_space(xdr, sizeof(*segcount)); 369 if (unlikely(!segcount)) 370 return -EMSGSIZE; 371 /* Actual value encoded below */ 372 373 nchunks = 0; 374 do { 375 ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr); 376 if (ret) 377 return ret; 378 379 if (encode_rdma_segment(xdr, mr) < 0) 380 return -EMSGSIZE; 381 382 trace_xprtrdma_chunk_write(rqst->rq_task, mr, 383 rpcrdma_xdr_cursor_done(&cur)); 384 r_xprt->rx_stats.write_chunk_count++; 385 r_xprt->rx_stats.total_rdma_request += mr->mr_length; 386 nchunks++; 387 } while (!rpcrdma_xdr_cursor_done(&cur)); 388 389 if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) { 390 if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0) 391 return -EMSGSIZE; 392 393 trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr, 394 true); 395 r_xprt->rx_stats.write_chunk_count++; 396 r_xprt->rx_stats.total_rdma_request += 397 ep->re_write_pad_mr->mr_length; 398 nchunks++; 399 } 400 401 /* Update count of segments in this Write chunk */ 402 *segcount = cpu_to_be32(nchunks); 403 404 done: 405 if (xdr_stream_encode_item_absent(xdr) < 0) 406 return -EMSGSIZE; 407 return 0; 408 } 409 410 /* Register and XDR encode the Reply chunk. Supports encoding an array 411 * of plain segments that belong to a single write (reply) chunk. 
412 * 413 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): 414 * 415 * Reply chunk (a counted array): 416 * N elements: 417 * 1 - N - HLOO - HLOO - ... - HLOO 418 * 419 * Returns zero on success, or a negative errno if a failure occurred. 420 * @xdr is advanced to the next position in the stream. 421 */ 422 static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, 423 struct rpcrdma_req *req, 424 struct rpc_rqst *rqst, 425 enum rpcrdma_chunktype wtype) 426 { 427 struct xdr_stream *xdr = &req->rl_stream; 428 struct rpcrdma_xdr_cursor cur; 429 struct rpcrdma_mr *mr; 430 int nchunks, ret; 431 __be32 *segcount; 432 433 if (wtype != rpcrdma_replych) { 434 if (xdr_stream_encode_item_absent(xdr) < 0) 435 return -EMSGSIZE; 436 return 0; 437 } 438 439 rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf, 0, wtype); 440 441 if (xdr_stream_encode_item_present(xdr) < 0) 442 return -EMSGSIZE; 443 segcount = xdr_reserve_space(xdr, sizeof(*segcount)); 444 if (unlikely(!segcount)) 445 return -EMSGSIZE; 446 /* Actual value encoded below */ 447 448 nchunks = 0; 449 do { 450 ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr); 451 if (ret) 452 return ret; 453 454 if (encode_rdma_segment(xdr, mr) < 0) 455 return -EMSGSIZE; 456 457 trace_xprtrdma_chunk_reply(rqst->rq_task, mr, 458 rpcrdma_xdr_cursor_done(&cur)); 459 r_xprt->rx_stats.reply_chunk_count++; 460 r_xprt->rx_stats.total_rdma_request += mr->mr_length; 461 nchunks++; 462 } while (!rpcrdma_xdr_cursor_done(&cur)); 463 464 /* Update count of segments in the Reply chunk */ 465 *segcount = cpu_to_be32(nchunks); 466 467 return 0; 468 } 469 470 static void rpcrdma_sendctx_dma_unmap(struct rpcrdma_sendctx *sc) 471 { 472 struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf; 473 struct ib_sge *sge; 474 475 /* The first two SGEs contain the transport header and 476 * the inline buffer. These are always left mapped so 477 * they can be cheaply re-used. 
478 */ 479 for (sge = &sc->sc_sges[2]; sc->sc_unmap_count; 480 ++sge, --sc->sc_unmap_count) 481 ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length, 482 DMA_TO_DEVICE); 483 } 484 485 /** 486 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer and release Send owner 487 * @sc: sendctx containing SGEs to unmap 488 * 489 */ 490 void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) 491 { 492 struct rpcrdma_req *req = sc->sc_req; 493 494 rpcrdma_sendctx_dma_unmap(sc); 495 sc->sc_req = NULL; 496 req->rl_sendctx = NULL; 497 rpcrdma_req_put(req); 498 } 499 500 /* No Send was posted. Release DMA mappings prepared for this 501 * sendctx, but leave the request reference count alone. 502 */ 503 static void rpcrdma_sendctx_cancel(struct rpcrdma_sendctx *sc) 504 { 505 struct rpcrdma_req *req = sc->sc_req; 506 507 rpcrdma_sendctx_dma_unmap(sc); 508 sc->sc_req = NULL; 509 req->rl_sendctx = NULL; 510 } 511 512 /* Prepare an SGE for the RPC-over-RDMA transport header. 513 */ 514 static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, 515 struct rpcrdma_req *req, u32 len) 516 { 517 struct rpcrdma_sendctx *sc = req->rl_sendctx; 518 struct rpcrdma_regbuf *rb = req->rl_rdmabuf; 519 struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; 520 521 sge->addr = rdmab_addr(rb); 522 sge->length = len; 523 sge->lkey = rdmab_lkey(rb); 524 525 ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, 526 DMA_TO_DEVICE); 527 } 528 529 /* The head iovec is straightforward, as it is usually already 530 * DMA-mapped. Sync the content that has changed. 
531 */ 532 static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt, 533 struct rpcrdma_req *req, unsigned int len) 534 { 535 struct rpcrdma_sendctx *sc = req->rl_sendctx; 536 struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; 537 struct rpcrdma_regbuf *rb = req->rl_sendbuf; 538 539 if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) 540 return false; 541 542 sge->addr = rdmab_addr(rb); 543 sge->length = len; 544 sge->lkey = rdmab_lkey(rb); 545 546 ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, 547 DMA_TO_DEVICE); 548 return true; 549 } 550 551 /* If there is a page list present, DMA map and prepare an 552 * SGE for each page to be sent. 553 */ 554 static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req, 555 struct xdr_buf *xdr) 556 { 557 struct rpcrdma_sendctx *sc = req->rl_sendctx; 558 struct rpcrdma_regbuf *rb = req->rl_sendbuf; 559 unsigned int page_base, len, remaining; 560 struct page **ppages; 561 struct ib_sge *sge; 562 563 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 564 page_base = offset_in_page(xdr->page_base); 565 remaining = xdr->page_len; 566 while (remaining) { 567 sge = &sc->sc_sges[req->rl_wr.num_sge++]; 568 len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); 569 sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages, 570 page_base, len, DMA_TO_DEVICE); 571 if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) 572 goto out_mapping_err; 573 574 sge->length = len; 575 sge->lkey = rdmab_lkey(rb); 576 577 sc->sc_unmap_count++; 578 ppages++; 579 remaining -= len; 580 page_base = 0; 581 } 582 583 return true; 584 585 out_mapping_err: 586 trace_xprtrdma_dma_maperr(sge->addr); 587 return false; 588 } 589 590 /* The tail iovec may include an XDR pad for the page list, 591 * as well as additional content, and may not reside in the 592 * same page as the head iovec. 
593 */ 594 static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req, 595 struct xdr_buf *xdr, 596 unsigned int page_base, unsigned int len) 597 { 598 struct rpcrdma_sendctx *sc = req->rl_sendctx; 599 struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; 600 struct rpcrdma_regbuf *rb = req->rl_sendbuf; 601 struct page *page = virt_to_page(xdr->tail[0].iov_base); 602 603 sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len, 604 DMA_TO_DEVICE); 605 if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) 606 goto out_mapping_err; 607 608 sge->length = len; 609 sge->lkey = rdmab_lkey(rb); 610 ++sc->sc_unmap_count; 611 return true; 612 613 out_mapping_err: 614 trace_xprtrdma_dma_maperr(sge->addr); 615 return false; 616 } 617 618 /* Copy the tail to the end of the head buffer. 619 */ 620 static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt, 621 struct rpcrdma_req *req, 622 struct xdr_buf *xdr) 623 { 624 unsigned char *dst; 625 626 dst = (unsigned char *)xdr->head[0].iov_base; 627 dst += xdr->head[0].iov_len + xdr->page_len; 628 memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len); 629 r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len; 630 } 631 632 /* Copy pagelist content into the head buffer. 
633 */ 634 static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt, 635 struct rpcrdma_req *req, 636 struct xdr_buf *xdr) 637 { 638 unsigned int len, page_base, remaining; 639 struct page **ppages; 640 unsigned char *src, *dst; 641 642 dst = (unsigned char *)xdr->head[0].iov_base; 643 dst += xdr->head[0].iov_len; 644 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 645 page_base = offset_in_page(xdr->page_base); 646 remaining = xdr->page_len; 647 while (remaining) { 648 src = page_address(*ppages); 649 src += page_base; 650 len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); 651 memcpy(dst, src, len); 652 r_xprt->rx_stats.pullup_copy_count += len; 653 654 ppages++; 655 dst += len; 656 remaining -= len; 657 page_base = 0; 658 } 659 } 660 661 /* Copy the contents of @xdr into @rl_sendbuf and DMA sync it. 662 * When the head, pagelist, and tail are small, a pull-up copy 663 * is considerably less costly than DMA mapping the components 664 * of @xdr. 665 * 666 * Assumptions: 667 * - the caller has already verified that the total length 668 * of the RPC Call body will fit into @rl_sendbuf. 
669 */ 670 static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt, 671 struct rpcrdma_req *req, 672 struct xdr_buf *xdr) 673 { 674 if (unlikely(xdr->tail[0].iov_len)) 675 rpcrdma_pullup_tail_iov(r_xprt, req, xdr); 676 677 if (unlikely(xdr->page_len)) 678 rpcrdma_pullup_pagelist(r_xprt, req, xdr); 679 680 /* The whole RPC message resides in the head iovec now */ 681 return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len); 682 } 683 684 static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt, 685 struct rpcrdma_req *req, 686 struct xdr_buf *xdr) 687 { 688 struct kvec *tail = &xdr->tail[0]; 689 690 if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) 691 return false; 692 if (xdr->page_len) 693 if (!rpcrdma_prepare_pagelist(req, xdr)) 694 return false; 695 if (tail->iov_len) 696 if (!rpcrdma_prepare_tail_iov(req, xdr, 697 offset_in_page(tail->iov_base), 698 tail->iov_len)) 699 return false; 700 701 return true; 702 } 703 704 static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt, 705 struct rpcrdma_req *req, 706 struct xdr_buf *xdr) 707 { 708 if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) 709 return false; 710 711 /* If there is a Read chunk, the page list is being handled 712 * via explicit RDMA, and thus is skipped here. 713 */ 714 715 /* Do not include the tail if it is only an XDR pad */ 716 if (xdr->tail[0].iov_len > 3) { 717 unsigned int page_base, len; 718 719 /* If the content in the page list is an odd length, 720 * xdr_write_pages() adds a pad at the beginning of 721 * the tail iovec. Force the tail's non-pad content to 722 * land at the next XDR position in the Send message. 
723 */ 724 page_base = offset_in_page(xdr->tail[0].iov_base); 725 len = xdr->tail[0].iov_len; 726 page_base += len & 3; 727 len -= len & 3; 728 if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len)) 729 return false; 730 } 731 732 return true; 733 } 734 735 /** 736 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR 737 * @r_xprt: controlling transport 738 * @req: context of RPC Call being marshalled 739 * @hdrlen: size of transport header, in bytes 740 * @xdr: xdr_buf containing RPC Call 741 * @rtype: chunk type being encoded 742 * 743 * Returns 0 on success; otherwise a negative errno is returned. 744 */ 745 inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, 746 struct rpcrdma_req *req, u32 hdrlen, 747 struct xdr_buf *xdr, 748 enum rpcrdma_chunktype rtype) 749 { 750 struct rpcrdma_sendctx *sc; 751 int ret; 752 753 ret = -EAGAIN; 754 req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); 755 if (!req->rl_sendctx) 756 goto out_nosc; 757 req->rl_sendctx->sc_unmap_count = 0; 758 req->rl_sendctx->sc_req = req; 759 req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe; 760 req->rl_wr.sg_list = req->rl_sendctx->sc_sges; 761 req->rl_wr.num_sge = 0; 762 req->rl_wr.opcode = IB_WR_SEND; 763 764 rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen); 765 766 ret = -EIO; 767 switch (rtype) { 768 case rpcrdma_noch_pullup: 769 if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr)) 770 goto out_unmap; 771 break; 772 case rpcrdma_noch_mapped: 773 if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr)) 774 goto out_unmap; 775 break; 776 case rpcrdma_readch: 777 if (!rpcrdma_prepare_readch(r_xprt, req, xdr)) 778 goto out_unmap; 779 break; 780 case rpcrdma_areadch: 781 break; 782 default: 783 goto out_unmap; 784 } 785 786 /* The Send-side owner releases this reference when the 787 * Send has completed. 
788 */ 789 kref_get(&req->rl_kref); 790 return 0; 791 792 out_unmap: 793 sc = req->rl_sendctx; 794 rpcrdma_sendctx_cancel(sc); 795 rpcrdma_sendctx_unget_locked(r_xprt, sc); 796 out_nosc: 797 trace_xprtrdma_prepsend_failed(&req->rl_slot, ret); 798 return ret; 799 } 800 801 /** 802 * rpcrdma_marshal_req - Marshal and send one RPC request 803 * @r_xprt: controlling transport 804 * @rqst: RPC request to be marshaled 805 * 806 * For the RPC in "rqst", this function: 807 * - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG) 808 * - Registers Read, Write, and Reply chunks 809 * - Constructs the transport header 810 * - Posts a Send WR to send the transport header and request 811 * 812 * Returns: 813 * %0 if the RPC was sent successfully, 814 * %-ENOTCONN if the connection was lost, 815 * %-EAGAIN if the caller should call again with the same arguments, 816 * %-ENOBUFS if the caller should call again after a delay, 817 * %-EMSGSIZE if the transport header is too small, 818 * %-EIO if a permanent problem occurred while marshaling. 
819 */ 820 int 821 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) 822 { 823 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 824 struct xdr_stream *xdr = &req->rl_stream; 825 enum rpcrdma_chunktype rtype, wtype; 826 struct xdr_buf *buf = &rqst->rq_snd_buf; 827 bool ddp_allowed; 828 __be32 *p; 829 int ret; 830 831 if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) { 832 ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf); 833 if (ret) 834 return ret; 835 } 836 837 rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); 838 xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf), 839 rqst); 840 841 /* Fixed header fields */ 842 ret = -EMSGSIZE; 843 p = xdr_reserve_space(xdr, 4 * sizeof(*p)); 844 if (!p) 845 goto out_err; 846 *p++ = rqst->rq_xid; 847 *p++ = rpcrdma_version; 848 *p++ = r_xprt->rx_buf.rb_max_requests; 849 850 /* When the ULP employs a GSS flavor that guarantees integrity 851 * or privacy, direct data placement of individual data items 852 * is not allowed. 853 */ 854 ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH, 855 &rqst->rq_cred->cr_auth->au_flags); 856 857 /* 858 * Chunks needed for results? 859 * 860 * o If the expected result is under the inline threshold, all ops 861 * return as inline. 862 * o Large read ops return data as write chunk(s), header as 863 * inline. 864 * o Large non-read ops return as a single reply chunk. 865 */ 866 if (rpcrdma_results_inline(r_xprt, rqst)) 867 wtype = rpcrdma_noch; 868 else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) && 869 rpcrdma_nonpayload_inline(r_xprt, rqst)) 870 wtype = rpcrdma_writech; 871 else 872 wtype = rpcrdma_replych; 873 874 /* 875 * Chunks needed for arguments? 876 * 877 * o If the total request is under the inline threshold, all ops 878 * are sent as inline. 879 * o Large write ops transmit data as read chunk(s), header as 880 * inline. 
881 * o Large non-write ops are sent with the entire message as a 882 * single read chunk (protocol 0-position special case). 883 * 884 * This assumes that the upper layer does not present a request 885 * that both has a data payload, and whose non-data arguments 886 * by themselves are larger than the inline threshold. 887 */ 888 if (rpcrdma_args_inline(r_xprt, rqst)) { 889 *p++ = rdma_msg; 890 rtype = buf->len < rdmab_length(req->rl_sendbuf) ? 891 rpcrdma_noch_pullup : rpcrdma_noch_mapped; 892 } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) { 893 *p++ = rdma_msg; 894 rtype = rpcrdma_readch; 895 } else { 896 r_xprt->rx_stats.nomsg_call_count++; 897 *p++ = rdma_nomsg; 898 rtype = rpcrdma_areadch; 899 } 900 901 /* This implementation supports the following combinations 902 * of chunk lists in one RPC-over-RDMA Call message: 903 * 904 * - Read list 905 * - Write list 906 * - Reply chunk 907 * - Read list + Reply chunk 908 * 909 * It might not yet support the following combinations: 910 * 911 * - Read list + Write list 912 * 913 * It does not support the following combinations: 914 * 915 * - Write list + Reply chunk 916 * - Read list + Write list + Reply chunk 917 * 918 * This implementation supports only a single chunk in each 919 * Read or Write list. Thus for example the client cannot 920 * send a Call message with a Position Zero Read chunk and a 921 * regular Read chunk at the same time. 
922 */ 923 ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); 924 if (ret) 925 goto out_err; 926 ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); 927 if (ret) 928 goto out_err; 929 ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); 930 if (ret) 931 goto out_err; 932 933 ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len, 934 buf, rtype); 935 if (ret) 936 goto out_err; 937 938 trace_xprtrdma_marshal(req, rtype, wtype); 939 return 0; 940 941 out_err: 942 trace_xprtrdma_marshal_failed(rqst, ret); 943 r_xprt->rx_stats.failed_marshal_count++; 944 frwr_reset(req); 945 return ret; 946 } 947 948 static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt, 949 struct rpcrdma_buffer *buf, 950 u32 grant) 951 { 952 buf->rb_credits = grant; 953 xprt->cwnd = grant << RPC_CWNDSHIFT; 954 } 955 956 static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant) 957 { 958 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 959 960 spin_lock(&xprt->transport_lock); 961 __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant); 962 spin_unlock(&xprt->transport_lock); 963 } 964 965 /** 966 * rpcrdma_reset_cwnd - Reset the xprt's congestion window 967 * @r_xprt: controlling transport instance 968 * 969 * Prepare @r_xprt for the next connection by reinitializing 970 * its credit grant to one (see RFC 8166, Section 3.3.3). 
971 */ 972 void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt) 973 { 974 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 975 976 spin_lock(&xprt->transport_lock); 977 xprt->cong = 0; 978 __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1); 979 spin_unlock(&xprt->transport_lock); 980 } 981 982 /** 983 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs 984 * @rqst: controlling RPC request 985 * @srcp: points to RPC message payload in receive buffer 986 * @copy_len: remaining length of receive buffer content 987 * @pad: Write chunk pad bytes needed (zero for pure inline) 988 * 989 * The upper layer has set the maximum number of bytes it can 990 * receive in each component of rq_rcv_buf. These values are set in 991 * the head.iov_len, page_len, tail.iov_len, and buflen fields. 992 * 993 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in 994 * many cases this function simply updates iov_base pointers in 995 * rq_rcv_buf to point directly to the received reply data, to 996 * avoid copying reply data. 997 * 998 * Returns the count of bytes which had to be memcopied. 999 */ 1000 static unsigned long 1001 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) 1002 { 1003 unsigned long fixup_copy_count; 1004 int i, npages, curlen; 1005 char *destp; 1006 struct page **ppages; 1007 int page_base; 1008 1009 /* The head iovec is redirected to the RPC reply message 1010 * in the receive buffer, to avoid a memcopy. 1011 */ 1012 rqst->rq_rcv_buf.head[0].iov_base = srcp; 1013 rqst->rq_private_buf.head[0].iov_base = srcp; 1014 1015 /* The contents of the receive buffer that follow 1016 * head.iov_len bytes are copied into the page list. 
1017 */ 1018 curlen = rqst->rq_rcv_buf.head[0].iov_len; 1019 if (curlen > copy_len) 1020 curlen = copy_len; 1021 srcp += curlen; 1022 copy_len -= curlen; 1023 1024 ppages = rqst->rq_rcv_buf.pages + 1025 (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT); 1026 page_base = offset_in_page(rqst->rq_rcv_buf.page_base); 1027 fixup_copy_count = 0; 1028 if (copy_len && rqst->rq_rcv_buf.page_len) { 1029 int pagelist_len; 1030 1031 pagelist_len = rqst->rq_rcv_buf.page_len; 1032 if (pagelist_len > copy_len) 1033 pagelist_len = copy_len; 1034 npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT; 1035 for (i = 0; i < npages; i++) { 1036 curlen = PAGE_SIZE - page_base; 1037 if (curlen > pagelist_len) 1038 curlen = pagelist_len; 1039 1040 destp = kmap_atomic(ppages[i]); 1041 memcpy(destp + page_base, srcp, curlen); 1042 flush_dcache_page(ppages[i]); 1043 kunmap_atomic(destp); 1044 srcp += curlen; 1045 copy_len -= curlen; 1046 fixup_copy_count += curlen; 1047 pagelist_len -= curlen; 1048 if (!pagelist_len) 1049 break; 1050 page_base = 0; 1051 } 1052 1053 /* Implicit padding for the last segment in a Write 1054 * chunk is inserted inline at the front of the tail 1055 * iovec. The upper layer ignores the content of 1056 * the pad. Simply ensure inline content in the tail 1057 * that follows the Write chunk is properly aligned. 1058 */ 1059 if (pad) 1060 srcp -= pad; 1061 } 1062 1063 /* The tail iovec is redirected to the remaining data 1064 * in the receive buffer, to avoid a memcopy. 1065 */ 1066 if (copy_len || pad) { 1067 rqst->rq_rcv_buf.tail[0].iov_base = srcp; 1068 rqst->rq_private_buf.tail[0].iov_base = srcp; 1069 } 1070 1071 if (fixup_copy_count) 1072 trace_xprtrdma_fixup(rqst, fixup_copy_count); 1073 return fixup_copy_count; 1074 } 1075 1076 /* By convention, backchannel calls arrive via rdma_msg type 1077 * messages, and never populate the chunk lists. 
This makes 1078 * the RPC/RDMA header small and fixed in size, so it is 1079 * straightforward to check the RPC header's direction field. 1080 */ 1081 static bool 1082 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) 1083 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1084 { 1085 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 1086 struct xdr_stream *xdr = &rep->rr_stream; 1087 __be32 *p; 1088 1089 if (rep->rr_proc != rdma_msg) 1090 return false; 1091 1092 /* Peek at stream contents without advancing. */ 1093 p = xdr_inline_decode(xdr, 0); 1094 if ((char *)xdr->end - (char *)p < 5 * XDR_UNIT) 1095 return false; 1096 1097 /* Chunk lists */ 1098 if (xdr_item_is_present(p++)) 1099 return false; 1100 if (xdr_item_is_present(p++)) 1101 return false; 1102 if (xdr_item_is_present(p++)) 1103 return false; 1104 1105 /* RPC header */ 1106 if (*p++ != rep->rr_xid) 1107 return false; 1108 if (*p != cpu_to_be32(RPC_CALL)) 1109 return false; 1110 1111 /* No bc service. */ 1112 if (xprt->bc_serv == NULL) 1113 return false; 1114 1115 /* Now that we are sure this is a backchannel call, 1116 * advance to the RPC header. 
1117 */ 1118 p = xdr_inline_decode(xdr, 3 * sizeof(*p)); 1119 if (unlikely(!p)) 1120 return false; 1121 1122 rpcrdma_bc_receive_call(r_xprt, rep); 1123 return true; 1124 } 1125 #else /* CONFIG_SUNRPC_BACKCHANNEL */ 1126 { 1127 return false; 1128 } 1129 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1130 1131 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length) 1132 { 1133 u32 handle; 1134 u64 offset; 1135 __be32 *p; 1136 1137 p = xdr_inline_decode(xdr, 4 * sizeof(*p)); 1138 if (unlikely(!p)) 1139 return -EIO; 1140 1141 xdr_decode_rdma_segment(p, &handle, length, &offset); 1142 trace_xprtrdma_decode_seg(handle, *length, offset); 1143 return 0; 1144 } 1145 1146 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length) 1147 { 1148 u32 segcount, seglength; 1149 __be32 *p; 1150 1151 p = xdr_inline_decode(xdr, sizeof(*p)); 1152 if (unlikely(!p)) 1153 return -EIO; 1154 1155 *length = 0; 1156 segcount = be32_to_cpup(p); 1157 while (segcount--) { 1158 if (decode_rdma_segment(xdr, &seglength)) 1159 return -EIO; 1160 *length += seglength; 1161 } 1162 1163 return 0; 1164 } 1165 1166 /* In RPC-over-RDMA Version One replies, a Read list is never 1167 * expected. This decoder is a stub that returns an error if 1168 * a Read list is present. 
1169 */ 1170 static int decode_read_list(struct xdr_stream *xdr) 1171 { 1172 __be32 *p; 1173 1174 p = xdr_inline_decode(xdr, sizeof(*p)); 1175 if (unlikely(!p)) 1176 return -EIO; 1177 if (unlikely(xdr_item_is_present(p))) 1178 return -EIO; 1179 return 0; 1180 } 1181 1182 /* Supports only one Write chunk in the Write list 1183 */ 1184 static int decode_write_list(struct xdr_stream *xdr, u32 *length) 1185 { 1186 u32 chunklen; 1187 bool first; 1188 __be32 *p; 1189 1190 *length = 0; 1191 first = true; 1192 do { 1193 p = xdr_inline_decode(xdr, sizeof(*p)); 1194 if (unlikely(!p)) 1195 return -EIO; 1196 if (xdr_item_is_absent(p)) 1197 break; 1198 if (!first) 1199 return -EIO; 1200 1201 if (decode_write_chunk(xdr, &chunklen)) 1202 return -EIO; 1203 *length += chunklen; 1204 first = false; 1205 } while (true); 1206 return 0; 1207 } 1208 1209 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length) 1210 { 1211 __be32 *p; 1212 1213 p = xdr_inline_decode(xdr, sizeof(*p)); 1214 if (unlikely(!p)) 1215 return -EIO; 1216 1217 *length = 0; 1218 if (xdr_item_is_present(p)) 1219 if (decode_write_chunk(xdr, length)) 1220 return -EIO; 1221 return 0; 1222 } 1223 1224 static int 1225 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, 1226 struct rpc_rqst *rqst) 1227 { 1228 struct xdr_stream *xdr = &rep->rr_stream; 1229 u32 writelist, replychunk, rpclen; 1230 char *base; 1231 1232 /* Decode the chunk lists */ 1233 if (decode_read_list(xdr)) 1234 return -EIO; 1235 if (decode_write_list(xdr, &writelist)) 1236 return -EIO; 1237 if (decode_reply_chunk(xdr, &replychunk)) 1238 return -EIO; 1239 1240 /* RDMA_MSG sanity checks */ 1241 if (unlikely(replychunk)) 1242 return -EIO; 1243 1244 /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */ 1245 base = (char *)xdr_inline_decode(xdr, 0); 1246 rpclen = xdr_stream_remaining(xdr); 1247 r_xprt->rx_stats.fixup_copy_count += 1248 rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3); 1249 1250 
r_xprt->rx_stats.total_rdma_reply += writelist; 1251 return rpclen + xdr_align_size(writelist); 1252 } 1253 1254 static noinline int 1255 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) 1256 { 1257 struct xdr_stream *xdr = &rep->rr_stream; 1258 u32 writelist, replychunk; 1259 1260 /* Decode the chunk lists */ 1261 if (decode_read_list(xdr)) 1262 return -EIO; 1263 if (decode_write_list(xdr, &writelist)) 1264 return -EIO; 1265 if (decode_reply_chunk(xdr, &replychunk)) 1266 return -EIO; 1267 1268 /* RDMA_NOMSG sanity checks */ 1269 if (unlikely(writelist)) 1270 return -EIO; 1271 if (unlikely(!replychunk)) 1272 return -EIO; 1273 1274 /* Reply chunk buffer already is the reply vector */ 1275 r_xprt->rx_stats.total_rdma_reply += replychunk; 1276 return replychunk; 1277 } 1278 1279 static noinline int 1280 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, 1281 struct rpc_rqst *rqst) 1282 { 1283 struct xdr_stream *xdr = &rep->rr_stream; 1284 __be32 *p; 1285 1286 p = xdr_inline_decode(xdr, sizeof(*p)); 1287 if (unlikely(!p)) 1288 return -EIO; 1289 1290 switch (*p) { 1291 case err_vers: 1292 p = xdr_inline_decode(xdr, 2 * sizeof(*p)); 1293 if (!p) 1294 break; 1295 trace_xprtrdma_err_vers(rqst, p, p + 1); 1296 break; 1297 case err_chunk: 1298 trace_xprtrdma_err_chunk(rqst); 1299 break; 1300 default: 1301 trace_xprtrdma_err_unrecognized(rqst, p); 1302 } 1303 1304 return -EIO; 1305 } 1306 1307 /** 1308 * rpcrdma_unpin_rqst - Release rqst without completing it 1309 * @rep: RPC/RDMA Receive context 1310 * 1311 * This is done when a connection is lost so that a Reply 1312 * can be dropped and its matching Call can be subsequently 1313 * retransmitted on a new connection. 
1314 */ 1315 void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep) 1316 { 1317 struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt; 1318 struct rpc_rqst *rqst = rep->rr_rqst; 1319 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 1320 1321 req->rl_reply = NULL; 1322 rep->rr_rqst = NULL; 1323 1324 spin_lock(&xprt->queue_lock); 1325 xprt_unpin_rqst(rqst); 1326 spin_unlock(&xprt->queue_lock); 1327 } 1328 1329 /** 1330 * rpcrdma_complete_rqst - Pass completed rqst back to RPC 1331 * @rep: RPC/RDMA Receive context 1332 * 1333 * Reconstruct the RPC reply and complete the transaction 1334 * while @rqst is still pinned to ensure the rep, rqst, and 1335 * rq_task pointers remain stable. 1336 */ 1337 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) 1338 { 1339 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; 1340 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 1341 struct rpc_rqst *rqst = rep->rr_rqst; 1342 int status; 1343 1344 /* I3: rl_registered has been drained by frwr_unmap before 1345 * complete_rqst runs. 1346 */ 1347 WARN_ON_ONCE(!list_empty(&rpcr_to_rdmar(rqst)->rl_registered)); 1348 1349 switch (rep->rr_proc) { 1350 case rdma_msg: 1351 status = rpcrdma_decode_msg(r_xprt, rep, rqst); 1352 break; 1353 case rdma_nomsg: 1354 status = rpcrdma_decode_nomsg(r_xprt, rep); 1355 break; 1356 case rdma_error: 1357 status = rpcrdma_decode_error(r_xprt, rep, rqst); 1358 break; 1359 default: 1360 status = -EIO; 1361 } 1362 if (status < 0) 1363 goto out_badheader; 1364 1365 out: 1366 spin_lock(&xprt->queue_lock); 1367 xprt_complete_rqst(rqst->rq_task, status); 1368 xprt_unpin_rqst(rqst); 1369 spin_unlock(&xprt->queue_lock); 1370 return; 1371 1372 out_badheader: 1373 trace_xprtrdma_reply_hdr_err(rep); 1374 r_xprt->rx_stats.bad_reply_count++; 1375 rqst->rq_task->tk_status = status; 1376 status = 0; 1377 goto out; 1378 } 1379 1380 /* Reply-side ownership invariants 1381 * 1382 * I1 (Receive WR ownership). 
A struct rpcrdma_rep is owned by the
 * HCA between ib_post_recv() and the matching Receive completion.
 * After ib_dma_sync_single_for_cpu() in rpcrdma_wc_receive() it is
 * owned by the CPU until rpcrdma_rep_put() returns it to
 * rb_free_reps; a rep on rb_free_reps is not re-posted until
 * rpcrdma_post_recvs() pulls it off. Asserted: rpcrdma_post_recvs()
 * WARNs that a pulled rep has rr_rqst == NULL.
 *
 * I2 (rep attachment). While req->rl_reply == rep, the rep cannot be
 * re-posted. rpcrdma_reply_put() NULLs req->rl_reply before handing
 * the rep to rpcrdma_rep_put(). Asserted: rpcrdma_reply_put() WARNs
 * that rl_reply is NULL after the put.
 *
 * I3 (Registered-MR fence). On entry to rpcrdma_complete_rqst() every
 * MR that was on req->rl_registered has had its rkey invalidated
 * (remotely via IB_WC_WITH_INVALIDATE or locally via IB_WR_LOCAL_INV)
 * and its pages ib_dma_unmap_sg()'d. The LocalInv chain is posted
 * on a single QP; strong send-queue ordering makes the last
 * completion (frwr_wc_localinv_done) observe the
 * ib_dma_unmap_sg() that ran from each earlier completion's
 * frwr_mr_put() before complete_rqst is called. The inline
 * frwr_reminv() path unmaps its one MR synchronously before
 * rpcrdma_reply_handler() reaches complete_rqst. Asserted:
 * rpcrdma_complete_rqst() WARNs that rl_registered is empty.
 *
 * I4 (Send-buffer release). req->rl_kref carries two unconditional
 * owners while a Send is outstanding: the RPC-layer reference (set
 * at xprt_rdma_alloc_slot / xprt_rdma_bc_rqst_get / rpcrdma_req_release
 * pool-entry) and the Send-side reference (kref_get() in
 * rpcrdma_prepare_send_sges()). rpcrdma_req_release() runs only
 * after both have dropped, so the req does not return to its free
 * pool until rpcrdma_sendctx_unmap() has fired -- the HCA has
 * released the send buffer before the req can be reused. Asserted:
 * rpcrdma_req_release() WARNs that rl_sendctx is NULL.
 *
 * I5 (req lifecycle). A req is owned by the RPC layer between slot
 * acquisition and the matching xprt_rdma_free_slot() (or, for the
 * backchannel, xprt_rdma_bc_free_rqst()). While owned, rl_kref >= 1.
 * The pools (rb_send_bufs, bc_pa_list, backlog wake target) never
 * contain a req with outstanding Send-side or Reply-side work.
 *
 * Non-hazards. The following claims have been raised by adversarial
 * review and are each closed by the invariants above:
 *
 *  * "Reply completes the RPC while the HCA still holds the send
 *    buffer" -- excluded by I4. The Send-side kref reference is held
 *    until rpcrdma_sendctx_unmap() runs from Send completion.
 *
 *  * "Signal-driven release races the in-flight Send" -- same
 *    resolution. xprt_rdma_free() does not touch rl_kref; the
 *    Send-side reference keeps the req out of its pool until Send
 *    completion fires.
 *
 *  * "Receive completion races rep reuse" -- excluded by I1. A rep
 *    is on rb_free_reps only after rpcrdma_rep_put() has been called
 *    and rpcrdma_post_recvs() owns the next transition back to the HCA.
 *
 *  * "Pages still DMA-mapped when call_decode reads them" -- excluded
 *    by I3. The matching ib_dma_unmap_sg() for every MR has run on
 *    the same CPU thread that calls rpcrdma_complete_rqst().
 */

/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to time out, to discover the errors at that time.
1450 */ 1451 void rpcrdma_reply_handler(struct rpcrdma_rep *rep) 1452 { 1453 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; 1454 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 1455 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1456 struct rpcrdma_req *req; 1457 struct rpc_rqst *rqst; 1458 u32 credits; 1459 __be32 *p; 1460 1461 /* Any data means we had a useful conversation, so 1462 * then we don't need to delay the next reconnect. 1463 */ 1464 if (xprt->reestablish_timeout) 1465 xprt->reestablish_timeout = 0; 1466 1467 /* Fixed transport header fields */ 1468 xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, 1469 rep->rr_hdrbuf.head[0].iov_base, NULL); 1470 p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p)); 1471 if (unlikely(!p)) 1472 goto out_shortreply; 1473 rep->rr_xid = *p++; 1474 rep->rr_vers = *p++; 1475 credits = be32_to_cpu(*p++); 1476 rep->rr_proc = *p++; 1477 1478 /* The credit grant from the wire is not trustworthy; 1479 * sanitize it before any code path consumes it. 1480 */ 1481 if (credits == 0) 1482 credits = 1; /* don't deadlock */ 1483 else if (credits > r_xprt->rx_ep->re_max_requests) 1484 credits = r_xprt->rx_ep->re_max_requests; 1485 1486 if (rep->rr_vers != rpcrdma_version) 1487 goto out_badversion; 1488 1489 if (rpcrdma_is_bcall(r_xprt, rep)) 1490 return; 1491 1492 /* Match incoming rpcrdma_rep to an rpcrdma_req to 1493 * get context for handling any incoming chunks. 
1494 */ 1495 spin_lock(&xprt->queue_lock); 1496 rqst = xprt_lookup_rqst(xprt, rep->rr_xid); 1497 if (!rqst) 1498 goto out_norqst; 1499 xprt_pin_rqst(rqst); 1500 spin_unlock(&xprt->queue_lock); 1501 1502 if (buf->rb_credits != credits) 1503 rpcrdma_update_cwnd(r_xprt, credits); 1504 1505 req = rpcr_to_rdmar(rqst); 1506 if (unlikely(req->rl_reply)) 1507 rpcrdma_rep_put(buf, req->rl_reply); 1508 req->rl_reply = rep; 1509 rep->rr_rqst = rqst; 1510 1511 trace_xprtrdma_reply(rqst->rq_task, rep, credits); 1512 1513 if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) 1514 frwr_reminv(rep, &req->rl_registered); 1515 if (!list_empty(&req->rl_registered)) 1516 frwr_unmap_async(r_xprt, req); 1517 /* LocalInv completion will complete the RPC */ 1518 else 1519 rpcrdma_complete_rqst(rep); 1520 1521 out_post: 1522 rpcrdma_post_recvs(r_xprt, 1523 credits + (buf->rb_bc_srv_max_requests << 1)); 1524 return; 1525 1526 out_norqst: 1527 spin_unlock(&xprt->queue_lock); 1528 trace_xprtrdma_reply_rqst_err(rep); 1529 rpcrdma_rep_put(buf, rep); 1530 goto out_post; 1531 1532 out_badversion: 1533 trace_xprtrdma_reply_vers_err(rep); 1534 rpcrdma_rep_put(buf, rep); 1535 credits = buf->rb_credits; 1536 goto out_post; 1537 1538 out_shortreply: 1539 trace_xprtrdma_reply_short_err(rep); 1540 rpcrdma_rep_put(buf, rep); 1541 credits = buf->rb_credits; 1542 goto out_post; 1543 } 1544