// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	/* Linkage on the transport's cache of idle contexts (sc_rw_ctxts) */
	struct llist_node	rw_node;
	/* Linkage on a chunk context's list of in-use contexts (cc_rwctxts) */
	struct list_head	rw_list;
	/* Core R/W API state handed to rdma_rw_ctx_init() and friends */
	struct rdma_rw_ctx	rw_ctx;
	/* Number of scatterlist entries filled in for the current I/O */
	unsigned int		rw_nents;
	/* Number of entries in the inline rw_first_sgl[] array */
	unsigned int		rw_first_sgl_nents;
	/* Chained sg table; its first chunk is rw_first_sgl[] */
	struct sg_table		rw_sg_table;
	/* Inline first chunk of the scatterlist, sized at allocation time */
	struct scatterlist	rw_first_sgl[];
};

/* Return the first R/W context on @list, or NULL if @list is empty. */
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

/* Obtain an R/W context whose sg table has room for @sges entries.
 *
 * A cached context is taken from rdma->sc_rw_ctxts when one is
 * available; otherwise a new one is allocated with GFP_KERNEL on the
 * node reported by ibdev_to_node(). The inline scatterlist is sized
 * from the device's max_send_sge attribute.
 *
 * Returns the context, or NULL if either the context or its chained
 * sg table could not be allocated.
 */
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	/* Removal from the cache is serialized by sc_rw_ctxt_lock */
	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_sgl_nents = first_sgl_nents;
	}

	/* (Re)build the sg table on top of the inline first chunk */
	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   first_sgl_nents))
		goto out_free;
	return ctxt;

out_free:
	/* The context is freed rather than returned to the cache */
	kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, sges);
	return NULL;
}

/* Release @ctxt's chained sg table and push the context onto @list. */
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
	llist_add(&ctxt->rw_node, list);
}

/* Return @ctxt to @rdma's cache of idle R/W contexts. */
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	/* Drain the cache of idle contexts, freeing each one */
	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * On failure, @ctxt has already been returned to the transport's
 * cache of idle contexts; the caller must not use it again.
 *
 * Returns on success, the number of WQEs that will be needed
 * on the workqueue, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	/* Assign a completion ID only if one has not been set yet */
	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 *
 * Every R/W context on @cc is destroyed via rdma_rw_ctx_destroy() and
 * then returned to @rdma's cache of idle contexts in a single batch.
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		/* Frees the chained sg table; the on-stack list "free" is
		 * not read again after this call.
		 */
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		/* Build the first..last chain that is handed to
		 * llist_add_batch() below.
		 */
		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

/* Allocate and initialize the state needed to send one Write chunk.
 *
 * The zeroed allocation is made on the node reported by ibdev_to_node().
 * Returns the new svc_rdma_write_info, or NULL on allocation failure.
 */
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

/* Work item: release the R/W contexts held by a write_info, then free it. */
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/* Defer release of @info to svcrdma_wq rather than freeing it inline. */
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	/* Nothing to release if no Reply chunk WRs were counted */
	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	/* Any status other than success closes the connection */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	/* Hand the chunk's Send Queue entries back, whatever the status */
	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	/* Release is deferred to a workqueue; see svc_rdma_write_info_free() */
	svc_rdma_write_info_free(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	/* Hand the chunk's Send Queue entries back, whatever the status */
	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		/* Queue the completed receive context and flag the
		 * transport as having data, then enqueue the transport.
		 */
		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/*
 * Post the WR chain built from every R/W context on @cc.
 *
 * May sleep while waiting for Send Queue space.
 *
 * Returns zero if at least one WR was posted (a completion will
 * follow), %-EINVAL if @cc needs more SQ entries than the queue is
 * deep, or %-ENOTCONN if nothing could be posted.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	/* This chain could never fit in the Send Queue */
	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	/* Link the WRs of every R/W context into one chain. Only the
	 * first rdma_rw_ctx_wrs() call is given the completion entry.
	 */
	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		/* Reserve SQ entries; post only if space remains */
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		/* Not enough room: undo the reservation and wait */
		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	/* Nothing was posted: return the reserved SQ entries */
	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build an SGL that covers one kvec in an xdr_buf: a single entry
 * of @len bytes starting at info->wi_base, which is then advanced.
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build an SGL that covers part of an xdr_buf's pagelist: @remaining
 * bytes starting at info->wi_next_off, one entry per page touched.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	/* Split the starting position into a page index and an offset */
	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		/* Only the first page can start at a non-zero offset */
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
466 */ 467 static int 468 svc_rdma_build_writes(struct svc_rdma_write_info *info, 469 void (*constructor)(struct svc_rdma_write_info *info, 470 unsigned int len, 471 struct svc_rdma_rw_ctxt *ctxt), 472 unsigned int remaining) 473 { 474 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; 475 struct svcxprt_rdma *rdma = info->wi_rdma; 476 const struct svc_rdma_segment *seg; 477 struct svc_rdma_rw_ctxt *ctxt; 478 int ret; 479 480 do { 481 unsigned int write_len; 482 u64 offset; 483 484 if (info->wi_seg_no >= info->wi_chunk->ch_segcount) 485 goto out_overflow; 486 487 seg = &info->wi_chunk->ch_segments[info->wi_seg_no]; 488 write_len = min(remaining, seg->rs_length - info->wi_seg_off); 489 if (!write_len) 490 goto out_overflow; 491 ctxt = svc_rdma_get_rw_ctxt(rdma, 492 (write_len >> PAGE_SHIFT) + 2); 493 if (!ctxt) 494 return -ENOMEM; 495 496 constructor(info, write_len, ctxt); 497 offset = seg->rs_offset + info->wi_seg_off; 498 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle, 499 DMA_TO_DEVICE); 500 if (ret < 0) 501 return -EIO; 502 percpu_counter_inc(&svcrdma_stat_write); 503 504 list_add(&ctxt->rw_list, &cc->cc_rwctxts); 505 cc->cc_sqecount += ret; 506 if (write_len == seg->rs_length - info->wi_seg_off) { 507 info->wi_seg_no++; 508 info->wi_seg_off = 0; 509 } else { 510 info->wi_seg_off += write_len; 511 } 512 remaining -= write_len; 513 } while (remaining); 514 515 return 0; 516 517 out_overflow: 518 trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no, 519 info->wi_chunk->ch_segcount); 520 return -E2BIG; 521 } 522 523 /** 524 * svc_rdma_iov_write - Construct RDMA Writes from an iov 525 * @info: pointer to write arguments 526 * @iov: kvec to write 527 * 528 * Returns: 529 * On success, returns zero 530 * %-E2BIG if the client-provided Write chunk is too small 531 * %-ENOMEM if a resource has been exhausted 532 * %-EIO if an rdma-rw error occurred 533 */ 534 static int svc_rdma_iov_write(struct svc_rdma_write_info *info, 535 const struct 
kvec *iov) 536 { 537 info->wi_base = iov->iov_base; 538 return svc_rdma_build_writes(info, svc_rdma_vec_to_sg, 539 iov->iov_len); 540 } 541 542 /** 543 * svc_rdma_pages_write - Construct RDMA Writes from pages 544 * @info: pointer to write arguments 545 * @xdr: xdr_buf with pages to write 546 * @offset: offset into the content of @xdr 547 * @length: number of bytes to write 548 * 549 * Returns: 550 * On success, returns zero 551 * %-E2BIG if the client-provided Write chunk is too small 552 * %-ENOMEM if a resource has been exhausted 553 * %-EIO if an rdma-rw error occurred 554 */ 555 static int svc_rdma_pages_write(struct svc_rdma_write_info *info, 556 const struct xdr_buf *xdr, 557 unsigned int offset, 558 unsigned long length) 559 { 560 info->wi_xdr = xdr; 561 info->wi_next_off = offset - xdr->head[0].iov_len; 562 return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg, 563 length); 564 } 565 566 /** 567 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf 568 * @xdr: xdr_buf to write 569 * @data: pointer to write arguments 570 * 571 * Returns: 572 * On success, returns zero 573 * %-E2BIG if the client-provided Write chunk is too small 574 * %-ENOMEM if a resource has been exhausted 575 * %-EIO if an rdma-rw error occurred 576 */ 577 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data) 578 { 579 struct svc_rdma_write_info *info = data; 580 int ret; 581 582 if (xdr->head[0].iov_len) { 583 ret = svc_rdma_iov_write(info, &xdr->head[0]); 584 if (ret < 0) 585 return ret; 586 } 587 588 if (xdr->page_len) { 589 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len, 590 xdr->page_len); 591 if (ret < 0) 592 return ret; 593 } 594 595 if (xdr->tail[0].iov_len) { 596 ret = svc_rdma_iov_write(info, &xdr->tail[0]); 597 if (ret < 0) 598 return ret; 599 } 600 601 return xdr->len; 602 } 603 604 static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, 605 const struct svc_rdma_chunk *chunk, 606 const struct xdr_buf *xdr) 607 { 608 struct 
svc_rdma_write_info *info; 609 struct svc_rdma_chunk_ctxt *cc; 610 struct xdr_buf payload; 611 int ret; 612 613 if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position, 614 chunk->ch_payload_length)) 615 return -EMSGSIZE; 616 617 info = svc_rdma_write_info_alloc(rdma, chunk); 618 if (!info) 619 return -ENOMEM; 620 cc = &info->wi_cc; 621 622 ret = svc_rdma_xb_write(&payload, info); 623 if (ret != payload.len) 624 goto out_err; 625 626 trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); 627 ret = svc_rdma_post_chunk_ctxt(rdma, cc); 628 if (ret < 0) 629 goto out_err; 630 return 0; 631 632 out_err: 633 svc_rdma_write_info_free(info); 634 return ret; 635 } 636 637 /** 638 * svc_rdma_send_write_list - Send all chunks on the Write list 639 * @rdma: controlling RDMA transport 640 * @rctxt: Write list provisioned by the client 641 * @xdr: xdr_buf containing an RPC Reply message 642 * 643 * Returns zero on success, or a negative errno if one or more 644 * Write chunks could not be sent. 
645 */ 646 int svc_rdma_send_write_list(struct svcxprt_rdma *rdma, 647 const struct svc_rdma_recv_ctxt *rctxt, 648 const struct xdr_buf *xdr) 649 { 650 struct svc_rdma_chunk *chunk; 651 int ret; 652 653 pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) { 654 if (!chunk->ch_payload_length) 655 break; 656 ret = svc_rdma_send_write_chunk(rdma, chunk, xdr); 657 if (ret < 0) 658 return ret; 659 } 660 return 0; 661 } 662 663 /** 664 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk 665 * @rdma: controlling RDMA transport 666 * @write_pcl: Write chunk list provided by client 667 * @reply_pcl: Reply chunk provided by client 668 * @sctxt: Send WR resources 669 * @xdr: xdr_buf containing an RPC Reply 670 * 671 * Returns a non-negative number of bytes the chunk consumed, or 672 * %-E2BIG if the payload was larger than the Reply chunk, 673 * %-EINVAL if client provided too many segments, 674 * %-ENOMEM if rdma_rw context pool was exhausted, 675 * %-ENOTCONN if posting failed (connection is lost), 676 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 
677 */ 678 int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma, 679 const struct svc_rdma_pcl *write_pcl, 680 const struct svc_rdma_pcl *reply_pcl, 681 struct svc_rdma_send_ctxt *sctxt, 682 const struct xdr_buf *xdr) 683 { 684 struct svc_rdma_write_info *info = &sctxt->sc_reply_info; 685 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; 686 struct ib_send_wr *first_wr; 687 struct list_head *pos; 688 struct ib_cqe *cqe; 689 int ret; 690 691 info->wi_rdma = rdma; 692 info->wi_chunk = pcl_first_chunk(reply_pcl); 693 info->wi_seg_off = 0; 694 info->wi_seg_no = 0; 695 info->wi_cc.cc_cqe.done = svc_rdma_reply_done; 696 697 ret = pcl_process_nonpayloads(write_pcl, xdr, 698 svc_rdma_xb_write, info); 699 if (ret < 0) 700 return ret; 701 702 first_wr = sctxt->sc_wr_chain; 703 cqe = &cc->cc_cqe; 704 list_for_each(pos, &cc->cc_rwctxts) { 705 struct svc_rdma_rw_ctxt *rwc; 706 707 rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list); 708 first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp, 709 rdma->sc_port_num, cqe, first_wr); 710 cqe = NULL; 711 } 712 sctxt->sc_wr_chain = first_wr; 713 sctxt->sc_sqecount += cc->cc_sqecount; 714 715 trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); 716 return xdr->len; 717 } 718 719 /** 720 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment 721 * @rqstp: RPC transaction context 722 * @head: context for ongoing I/O 723 * @segment: co-ordinates of remote memory to be read 724 * 725 * Returns: 726 * %0: the Read WR chain was constructed successfully 727 * %-EINVAL: there were not enough rq_pages to finish 728 * %-ENOMEM: allocating a local resources failed 729 * %-EIO: a DMA mapping error occurred 730 */ 731 static int svc_rdma_build_read_segment(struct svc_rqst *rqstp, 732 struct svc_rdma_recv_ctxt *head, 733 const struct svc_rdma_segment *segment) 734 { 735 struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp); 736 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc; 737 unsigned int sge_no, seg_len, len; 
738 struct svc_rdma_rw_ctxt *ctxt; 739 struct scatterlist *sg; 740 int ret; 741 742 len = segment->rs_length; 743 sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT; 744 ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no); 745 if (!ctxt) 746 return -ENOMEM; 747 ctxt->rw_nents = sge_no; 748 749 sg = ctxt->rw_sg_table.sgl; 750 for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) { 751 seg_len = min_t(unsigned int, len, 752 PAGE_SIZE - head->rc_pageoff); 753 754 if (!head->rc_pageoff) 755 head->rc_page_count++; 756 757 sg_set_page(sg, rqstp->rq_pages[head->rc_curpage], 758 seg_len, head->rc_pageoff); 759 sg = sg_next(sg); 760 761 head->rc_pageoff += seg_len; 762 if (head->rc_pageoff == PAGE_SIZE) { 763 head->rc_curpage++; 764 head->rc_pageoff = 0; 765 } 766 len -= seg_len; 767 768 if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages))) 769 goto out_overrun; 770 } 771 772 ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset, 773 segment->rs_handle, DMA_FROM_DEVICE); 774 if (ret < 0) 775 return -EIO; 776 percpu_counter_inc(&svcrdma_stat_read); 777 778 list_add(&ctxt->rw_list, &cc->cc_rwctxts); 779 cc->cc_sqecount += ret; 780 return 0; 781 782 out_overrun: 783 trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage); 784 return -EINVAL; 785 } 786 787 /** 788 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk 789 * @rqstp: RPC transaction context 790 * @head: context for ongoing I/O 791 * @chunk: Read chunk to pull 792 * 793 * Return values: 794 * %0: the Read WR chain was constructed successfully 795 * %-EINVAL: there were not enough resources to finish 796 * %-ENOMEM: allocating a local resources failed 797 * %-EIO: a DMA mapping error occurred 798 */ 799 static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, 800 struct svc_rdma_recv_ctxt *head, 801 const struct svc_rdma_chunk *chunk) 802 { 803 const struct svc_rdma_segment *segment; 804 int ret; 805 806 ret = -EINVAL; 807 pcl_for_each_segment(segment, chunk) { 808 ret = 
svc_rdma_build_read_segment(rqstp, head, segment); 809 if (ret < 0) 810 break; 811 head->rc_readbytes += segment->rs_length; 812 } 813 return ret; 814 } 815 816 /** 817 * svc_rdma_copy_inline_range - Copy part of the inline content into pages 818 * @rqstp: RPC transaction context 819 * @head: context for ongoing I/O 820 * @offset: offset into the Receive buffer of region to copy 821 * @remaining: length of region to copy 822 * 823 * Take a page at a time from rqstp->rq_pages and copy the inline 824 * content from the Receive buffer into that page. Update 825 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read 826 * result will land contiguously with the copied content. 827 * 828 * Return values: 829 * %0: Inline content was successfully copied 830 * %-EINVAL: offset or length was incorrect 831 */ 832 static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp, 833 struct svc_rdma_recv_ctxt *head, 834 unsigned int offset, 835 unsigned int remaining) 836 { 837 unsigned char *dst, *src = head->rc_recv_buf; 838 unsigned int page_no, numpages; 839 840 numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT; 841 for (page_no = 0; page_no < numpages; page_no++) { 842 unsigned int page_len; 843 844 page_len = min_t(unsigned int, remaining, 845 PAGE_SIZE - head->rc_pageoff); 846 847 if (!head->rc_pageoff) 848 head->rc_page_count++; 849 850 dst = page_address(rqstp->rq_pages[head->rc_curpage]); 851 memcpy(dst + head->rc_curpage, src + offset, page_len); 852 853 head->rc_readbytes += page_len; 854 head->rc_pageoff += page_len; 855 if (head->rc_pageoff == PAGE_SIZE) { 856 head->rc_curpage++; 857 head->rc_pageoff = 0; 858 } 859 remaining -= page_len; 860 offset += page_len; 861 } 862 863 return -EINVAL; 864 } 865 866 /** 867 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks 868 * @rqstp: RPC transaction context 869 * @head: context for ongoing I/O 870 * 871 * The chunk data lands in rqstp->rq_arg as a series of 
contiguous pages, 872 * like an incoming TCP call. 873 * 874 * Return values: 875 * %0: RDMA Read WQEs were successfully built 876 * %-EINVAL: client provided too many chunks or segments, 877 * %-ENOMEM: rdma_rw context pool was exhausted, 878 * %-ENOTCONN: posting failed (connection is lost), 879 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 880 */ 881 static noinline int 882 svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp, 883 struct svc_rdma_recv_ctxt *head) 884 { 885 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; 886 struct svc_rdma_chunk *chunk, *next; 887 unsigned int start, length; 888 int ret; 889 890 start = 0; 891 chunk = pcl_first_chunk(pcl); 892 length = chunk->ch_position; 893 ret = svc_rdma_copy_inline_range(rqstp, head, start, length); 894 if (ret < 0) 895 return ret; 896 897 pcl_for_each_chunk(chunk, pcl) { 898 ret = svc_rdma_build_read_chunk(rqstp, head, chunk); 899 if (ret < 0) 900 return ret; 901 902 next = pcl_next_chunk(pcl, chunk); 903 if (!next) 904 break; 905 906 start += length; 907 length = next->ch_position - head->rc_readbytes; 908 ret = svc_rdma_copy_inline_range(rqstp, head, start, length); 909 if (ret < 0) 910 return ret; 911 } 912 913 start += length; 914 length = head->rc_byte_len - start; 915 return svc_rdma_copy_inline_range(rqstp, head, start, length); 916 } 917 918 /** 919 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks 920 * @rqstp: RPC transaction context 921 * @head: context for ongoing I/O 922 * 923 * The chunk data lands in the page list of rqstp->rq_arg.pages. 924 * 925 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec. 926 * Therefore, XDR round-up of the Read chunk and trailing 927 * inline content must both be added at the end of the pagelist. 
928 * 929 * Return values: 930 * %0: RDMA Read WQEs were successfully built 931 * %-EINVAL: client provided too many chunks or segments, 932 * %-ENOMEM: rdma_rw context pool was exhausted, 933 * %-ENOTCONN: posting failed (connection is lost), 934 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 935 */ 936 static int svc_rdma_read_data_item(struct svc_rqst *rqstp, 937 struct svc_rdma_recv_ctxt *head) 938 { 939 return svc_rdma_build_read_chunk(rqstp, head, 940 pcl_first_chunk(&head->rc_read_pcl)); 941 } 942 943 /** 944 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk 945 * @rqstp: RPC transaction context 946 * @head: context for ongoing I/O 947 * @chunk: parsed Call chunk to pull 948 * @offset: offset of region to pull 949 * @length: length of region to pull 950 * 951 * Return values: 952 * %0: RDMA Read WQEs were successfully built 953 * %-EINVAL: there were not enough resources to finish 954 * %-ENOMEM: rdma_rw context pool was exhausted, 955 * %-ENOTCONN: posting failed (connection is lost), 956 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 
957 */ 958 static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp, 959 struct svc_rdma_recv_ctxt *head, 960 const struct svc_rdma_chunk *chunk, 961 unsigned int offset, unsigned int length) 962 { 963 const struct svc_rdma_segment *segment; 964 int ret; 965 966 ret = -EINVAL; 967 pcl_for_each_segment(segment, chunk) { 968 struct svc_rdma_segment dummy; 969 970 if (offset > segment->rs_length) { 971 offset -= segment->rs_length; 972 continue; 973 } 974 975 dummy.rs_handle = segment->rs_handle; 976 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset; 977 dummy.rs_offset = segment->rs_offset + offset; 978 979 ret = svc_rdma_build_read_segment(rqstp, head, &dummy); 980 if (ret < 0) 981 break; 982 983 head->rc_readbytes += dummy.rs_length; 984 length -= dummy.rs_length; 985 offset = 0; 986 } 987 return ret; 988 } 989 990 /** 991 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message 992 * @rqstp: RPC transaction context 993 * @head: context for ongoing I/O 994 * 995 * Return values: 996 * %0: RDMA Read WQEs were successfully built 997 * %-EINVAL: there were not enough resources to finish 998 * %-ENOMEM: rdma_rw context pool was exhausted, 999 * %-ENOTCONN: posting failed (connection is lost), 1000 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 
1001 */ 1002 static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp, 1003 struct svc_rdma_recv_ctxt *head) 1004 { 1005 const struct svc_rdma_chunk *call_chunk = 1006 pcl_first_chunk(&head->rc_call_pcl); 1007 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; 1008 struct svc_rdma_chunk *chunk, *next; 1009 unsigned int start, length; 1010 int ret; 1011 1012 if (pcl_is_empty(pcl)) 1013 return svc_rdma_build_read_chunk(rqstp, head, call_chunk); 1014 1015 start = 0; 1016 chunk = pcl_first_chunk(pcl); 1017 length = chunk->ch_position; 1018 ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk, 1019 start, length); 1020 if (ret < 0) 1021 return ret; 1022 1023 pcl_for_each_chunk(chunk, pcl) { 1024 ret = svc_rdma_build_read_chunk(rqstp, head, chunk); 1025 if (ret < 0) 1026 return ret; 1027 1028 next = pcl_next_chunk(pcl, chunk); 1029 if (!next) 1030 break; 1031 1032 start += length; 1033 length = next->ch_position - head->rc_readbytes; 1034 ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk, 1035 start, length); 1036 if (ret < 0) 1037 return ret; 1038 } 1039 1040 start += length; 1041 length = call_chunk->ch_length - start; 1042 return svc_rdma_read_chunk_range(rqstp, head, call_chunk, 1043 start, length); 1044 } 1045 1046 /** 1047 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message 1048 * @rqstp: RPC transaction context 1049 * @head: context for ongoing I/O 1050 * 1051 * The start of the data lands in the first page just after the 1052 * Transport header, and the rest lands in rqstp->rq_arg.pages. 1053 * 1054 * Assumptions: 1055 * - A PZRC is never sent in an RDMA_MSG message, though it's 1056 * allowed by spec. 1057 * 1058 * Return values: 1059 * %0: RDMA Read WQEs were successfully built 1060 * %-EINVAL: client provided too many chunks or segments, 1061 * %-ENOMEM: rdma_rw context pool was exhausted, 1062 * %-ENOTCONN: posting failed (connection is lost), 1063 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 
1064 */ 1065 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp, 1066 struct svc_rdma_recv_ctxt *head) 1067 { 1068 return svc_rdma_read_call_chunk(rqstp, head); 1069 } 1070 1071 /* Pages under I/O have been copied to head->rc_pages. Ensure that 1072 * svc_xprt_release() does not put them when svc_rdma_recvfrom() 1073 * returns. This has to be done after all Read WRs are constructed 1074 * to properly handle a page that happens to be part of I/O on behalf 1075 * of two different RDMA segments. 1076 * 1077 * Note: if the subsequent post_send fails, these pages have already 1078 * been moved to head->rc_pages and thus will be cleaned up by 1079 * svc_rdma_recv_ctxt_put(). 1080 */ 1081 static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp, 1082 struct svc_rdma_recv_ctxt *head) 1083 { 1084 unsigned int i; 1085 1086 for (i = 0; i < head->rc_page_count; i++) { 1087 head->rc_pages[i] = rqstp->rq_pages[i]; 1088 rqstp->rq_pages[i] = NULL; 1089 } 1090 } 1091 1092 /** 1093 * svc_rdma_process_read_list - Pull list of Read chunks from the client 1094 * @rdma: controlling RDMA transport 1095 * @rqstp: set of pages to use as Read sink buffers 1096 * @head: pages under I/O collect here 1097 * 1098 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders 1099 * pull each Read chunk as they decode an incoming RPC message. 1100 * 1101 * On Linux, however, the server needs to have a fully-constructed RPC 1102 * message in rqstp->rq_arg when there is a positive return code from 1103 * ->xpo_recvfrom. So the Read list is safety-checked immediately when 1104 * it is received, then here the whole Read list is pulled all at once. 1105 * The ingress RPC message is fully reconstructed once all associated 1106 * RDMA Reads have completed. 
1107 * 1108 * Return values: 1109 * %1: all needed RDMA Reads were posted successfully, 1110 * %-EINVAL: client provided too many chunks or segments, 1111 * %-ENOMEM: rdma_rw context pool was exhausted, 1112 * %-ENOTCONN: posting failed (connection is lost), 1113 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 1114 */ 1115 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, 1116 struct svc_rqst *rqstp, 1117 struct svc_rdma_recv_ctxt *head) 1118 { 1119 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc; 1120 int ret; 1121 1122 cc->cc_cqe.done = svc_rdma_wc_read_done; 1123 cc->cc_sqecount = 0; 1124 head->rc_pageoff = 0; 1125 head->rc_curpage = 0; 1126 head->rc_readbytes = 0; 1127 1128 if (pcl_is_empty(&head->rc_call_pcl)) { 1129 if (head->rc_read_pcl.cl_count == 1) 1130 ret = svc_rdma_read_data_item(rqstp, head); 1131 else 1132 ret = svc_rdma_read_multiple_chunks(rqstp, head); 1133 } else 1134 ret = svc_rdma_read_special(rqstp, head); 1135 svc_rdma_clear_rqst_pages(rqstp, head); 1136 if (ret < 0) 1137 return ret; 1138 1139 trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount); 1140 ret = svc_rdma_post_chunk_ctxt(rdma, cc); 1141 return ret < 0 ? ret : 1; 1142 } 1143