// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_sgl_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_sgl_nents = first_sgl_nents;
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   first_sgl_nents))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, sges);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}
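
/* Illustrative sizing sketch (not part of the original file; the numbers
 * below assume a 4KB PAGE_SIZE and a device reporting
 * dev->attrs.max_send_sge == 32):
 *
 *	struct_size(ctxt, rw_first_sgl, 32)
 *		== sizeof(struct svc_rdma_rw_ctxt)
 *		   + 32 * sizeof(struct scatterlist)
 *
 * svc_rdma_get_rw_ctxt() therefore embeds only the first 32 scatterlist
 * entries in the context itself. A caller that needs more SGEs than that
 * (for example, a 256KB payload needing roughly 65 entries) relies on
 * sg_alloc_table_chained() to chain additional scatterlist pages onto
 * rw_first_sgl, and sg_free_table_chained() in __svc_rdma_put_rw_ctxt()
 * releases those extra pages before the context goes back on the cache.
 */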

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * On success, returns the number of WQEs that will be needed
 * on the work queue; otherwise, returns a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
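
/* Illustrative lifecycle sketch (not part of the original file): a chunk
 * context accumulates one R/W context per RDMA segment, and the whole
 * chain is posted and later released in one shot. Roughly:
 *
 *	svc_rdma_cc_init(rdma, cc);
 *	for each segment:
 *		ctxt = svc_rdma_get_rw_ctxt(rdma, sges);
 *		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
 *		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 *		cc->cc_sqecount += ret;
 *	svc_rdma_post_chunk_ctxt(rdma, cc);
 *	... completion handler runs ...
 *	svc_rdma_cc_release(rdma, cc, dir);
 *
 * cc_sqecount is the total number of SQEs the chain will consume, which
 * is what svc_rdma_post_chunk_ctxt() charges against rdma->sc_sq_avail.
 */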

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	struct svcxprt_rdma	*wi_rdma;

	const struct svc_rdma_chunk	*wi_chunk;

	/* write state of this chunk */
	unsigned int		wi_seg_off;
	unsigned int		wi_seg_no;

	/* SGL constructor arguments */
	const struct xdr_buf	*wi_xdr;
	unsigned char		*wi_base;
	unsigned int		wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
	struct work_struct	wi_work;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}
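
/* Illustrative accounting sketch (not part of the original file): suppose
 * sc_sq_avail is 10 and a chunk context needs cc_sqecount == 4 SQEs.
 *
 *	atomic_sub_return(4, &rdma->sc_sq_avail) -> 6, which is > 0, so post
 *
 * If instead sc_sq_avail were 3, the subtraction would yield -1: the
 * credits are restored with atomic_add(), svcrdma_stat_sq_starve is
 * bumped, and the caller sleeps on sc_send_wait until a completion
 * handler calls svc_rdma_wake_send_waiters() and enough SQEs are
 * available to try again.
 */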

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *	On success, returns zero
 *	%-E2BIG if the client-provided Write chunk is too small
 *	%-ENOMEM if a resource has been exhausted
 *	%-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}
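
/* Illustrative SGE estimate (not part of the original file; assumes a 4KB
 * PAGE_SIZE): svc_rdma_build_writes() asks for (write_len >> PAGE_SHIFT) + 2
 * scatterlist entries because an unaligned region of write_len bytes can
 * touch at most that many pages. For example, a 10000-byte write_len that
 * begins 100 bytes into a page spans pages as 3996 + 4096 + 1908 bytes,
 * which is 3 entries, while the worst-case bound (10000 >> 12) + 2 == 4.
 * Over-asking is harmless: sg_alloc_table_chained() only sizes the table,
 * and rw_nents records how many entries were actually filled in.
 */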

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *	On success, returns zero
 *	%-E2BIG if the client-provided Write chunk is too small
 *	%-ENOMEM if a resource has been exhausted
 *	%-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *	On success, returns zero
 *	%-E2BIG if the client-provided Write chunk is too small
 *	%-ENOMEM if a resource has been exhausted
 *	%-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @chunk: Write chunk provided by the client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_chunk *chunk,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(xdr, info);
	if (ret != xdr->len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	if (ret < 0)
		goto out_err;
	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
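
/* Illustrative decomposition (not part of the original file): for an
 * xdr_buf with a 124-byte head[0], an 8192-byte page_len, and a 4-byte
 * tail[0], svc_rdma_xb_write() builds three groups of Write WRs:
 *
 *	svc_rdma_iov_write(info, &xdr->head[0]);     124 bytes from the kvec
 *	svc_rdma_pages_write(info, xdr, 124, 8192);  the pagelist payload
 *	svc_rdma_iov_write(info, &xdr->tail[0]);     4 bytes from the tail
 *
 * and returns xdr->len (8320), which is how svc_rdma_send_write_chunk()
 * verifies that the entire payload was consumed.
 */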

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rctxt: Write and Reply chunks from client
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_recv_ctxt *rctxt,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct svc_rdma_chunk *chunk;
	int ret;

	if (pcl_is_empty(&rctxt->rc_reply_pcl))
		return 0;

	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	if (ret < 0)
		goto out_err;

	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
			    seg_len, head->rc_pageoff);
		sg = sg_next(sg);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}
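
/* Illustrative page accounting (not part of the original file; assumes a
 * 4KB PAGE_SIZE): a segment with rs_length == 6000 that arrives while
 * rc_pageoff == 3000 needs
 *
 *	PAGE_ALIGN(3000 + 6000) >> PAGE_SHIFT == 3
 *
 * scatterlist entries: 1096 bytes finish the current page, the next 4096
 * bytes fill a whole page, and the final 808 bytes start a third page,
 * leaving rc_pageoff == 808 so the next segment lands contiguously.
 */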

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_pageoff, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}
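
/* Illustrative interleaving (not part of the original file; assumes a 4KB
 * PAGE_SIZE): if 512 bytes of inline content precede a Read chunk,
 * svc_rdma_copy_inline_range() copies those 512 bytes into rq_pages[0]
 * and leaves rc_curpage == 0, rc_pageoff == 512. The subsequent
 * svc_rdma_build_read_segment() call then starts its first scatterlist
 * entry at offset 512 in the same page, so the pulled chunk data lands
 * immediately after the copied inline bytes and rq_arg appears as one
 * contiguous byte stream.
 */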

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}
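
/* Illustrative layout (not part of the original file): suppose the Receive
 * buffer carries 200 bytes of inline XDR, then a Read chunk whose
 * ch_position is 200, then more trailing inline bytes.
 * svc_rdma_read_multiple_chunks() copies inline bytes [0, 200), pulls the
 * chunk (advancing rc_readbytes by the chunk's length), and finally copies
 * the remaining inline region starting at offset 200 of the Receive
 * buffer. ch_position and rc_readbytes are offsets into the reconstructed
 * XDR stream, while start/length index the inline Receive buffer, which is
 * why the two are tracked separately.
 */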

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}
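
/* Illustrative example (not part of the original file): when asked for a
 * region starting at offset 5000 of a call chunk whose first segment is
 * 4096 bytes long, svc_rdma_read_chunk_range() skips that whole segment
 * (offset becomes 904), then pulls from the second segment through a
 * temporary svc_rdma_segment whose rs_offset is advanced by 904. Only the
 * requested region is read, and rc_readbytes still advances by the number
 * of bytes actually pulled.
 */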

/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}