// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/bvec.h>
#include <linux/overflow.h>
#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer.
 * - each xdr_buf iovec is a single contiguous buffer
 * - the xdr_buf pages array is a single contiguous buffer because the
 *   second through the last element always start on a page boundary
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The inline bvec array is sized to handle most I/O requests without
 * additional allocation. Larger requests fall back to dynamic allocation.
 * These contexts are created on demand, but cached and reused until
 * the controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_bvec_nents;
	struct bio_vec		*rw_bvec;
	struct bio_vec		rw_first_bvec[];
};

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt);

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int nr_bvec)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_bvec_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_bvec,
						first_bvec_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_bvec_nents = first_bvec_nents;
	}

	if (nr_bvec <= ctxt->rw_first_bvec_nents) {
		ctxt->rw_bvec = ctxt->rw_first_bvec;
	} else {
		ctxt->rw_bvec = kmalloc_array_node(nr_bvec,
						   sizeof(*ctxt->rw_bvec),
						   GFP_KERNEL,
						   ibdev_to_node(dev));
		if (!ctxt->rw_bvec)
			goto out_free;
	}
	return ctxt;

out_free:
	/* Return a recycled context to the cache; free a newly allocated one */
	if (node)
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	else
		kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, nr_bvec);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	if (ctxt->rw_bvec != ctxt->rw_first_bvec)
		kfree(ctxt->rw_bvec);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}
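
/* Contexts on sc_rw_ctxts have already had any kmalloc'd rw_bvec
 * array released (see __svc_rdma_put_rw_ctxt() and
 * svc_rdma_cc_release()), so tearing down the cache requires only
 * freeing each context itself.
 */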

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare an R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @length: total number of bytes described by the bvec array
 * @direction: I/O direction
 *
 * Returns the number of WQEs that will be needed on the
 * work queue, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle, unsigned int length,
				enum dma_data_direction direction)
{
	struct bvec_iter iter = {
		.bi_size	= length,
	};
	int ret;

	ret = rdma_rw_ctx_init_bvec(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num,
				    ctxt->rw_bvec, ctxt->rw_nents,
				    iter, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy_bvec(&ctxt->rw_ctx, rdma->sc_qp,
					 rdma->sc_port_num,
					 ctxt->rw_bvec, ctxt->rw_nents, dir);
		if (ctxt->rw_bvec != ctxt->rw_first_bvec)
			kfree(ctxt->rw_bvec);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}
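
/* Releasing a Write chunk's rdma_rw contexts involves DMA unmapping.
 * Hand that work to a workqueue so it is done outside the completion
 * handler that retires the Write WR chain.
 */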

static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}
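
/* Note the asymmetry with svc_rdma_reply_done(): a Write chunk's
 * rdma_rw resources belong to the write_info that is freed below,
 * while a Reply chunk's resources live in the Send context and are
 * released later via svc_rdma_reply_chunk_release().
 */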

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_write_info *info =
		container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
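
/* The caller has already counted the SQEs that the WR chain in @cc
 * will consume. svc_rdma_post_chunk_ctxt() reserves that many entries
 * from sc_sq_avail before posting, waits if the Send Queue is
 * congested, and returns the reservation if the post fails.
 */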

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build a bvec that covers one kvec in an xdr_buf.
 */
static void svc_rdma_vec_to_bvec(struct svc_rdma_write_info *info,
				 unsigned int len,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	bvec_set_virt(&ctxt->rw_bvec[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build a bvec array that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_bvec(struct svc_rdma_write_info *info,
				      unsigned int remaining,
				      struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int bvec_idx, bvec_len, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	bvec_idx = 0;
	do {
		bvec_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - page_off);
		bvec_set_page(&ctxt->rw_bvec[bvec_idx], *page, bvec_len,
			      page_off);
		remaining -= bvec_len;
		page_off = 0;
		bvec_idx++;
		page++;
	} while (remaining);

	ctxt->rw_nents = bvec_idx;
}
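
/* Each segment of a Write chunk may carry its own R_key, so one
 * rdma_rw context is prepared per segment. Its bvec array is sized
 * for the worst case: write_len >> PAGE_SHIFT full pages plus one
 * partial page at the start and one at the end of the payload.
 */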

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   write_len, DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_bvec,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_bvec,
				     length);
}
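
/* svc_rdma_xb_write() serves both as the engine for Write chunks
 * (svc_rdma_send_write_chunk) and as the pcl_process_nonpayloads()
 * callback used when constructing the Reply chunk. Note that it
 * returns the number of bytes consumed on success, not zero.
 */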

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the number of bytes consumed (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
				     const struct svc_rdma_chunk *chunk,
				     const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct xdr_buf payload;
	int ret;

	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	if (ret < 0)
		goto out_err;
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_write_list - Send all chunks on the Write list
 * @rdma: controlling RDMA transport
 * @rctxt: Write list provisioned by the client
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if one or more
 * Write chunks could not be sent.
 */
int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
			     const struct svc_rdma_recv_ctxt *rctxt,
			     const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}
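
/* Unlike Write list chunks, the Reply chunk's Write WRs are not
 * posted here. They are prepended to the Send context's WR chain
 * and posted later along with the Send WR that carries the RPC
 * Reply.
 */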

/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int bvec_idx, nr_bvec, seg_len, len, total;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	len = segment->rs_length;
	if (check_add_overflow(head->rc_pageoff, len, &total))
		return -EINVAL;
	nr_bvec = PAGE_ALIGN(total) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = nr_bvec;

	for (bvec_idx = 0; bvec_idx < ctxt->rw_nents; bvec_idx++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		bvec_set_page(&ctxt->rw_bvec[bvec_idx],
			      rqstp->rq_pages[head->rc_curpage],
			      seg_len, head->rc_pageoff);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > rqstp->rq_maxpages))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}
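
/* Read payload is packed into rqstp->rq_pages. head->rc_curpage and
 * head->rc_pageoff persist across segments and chunks so that each
 * segment's data lands contiguously after whatever preceded it,
 * including any inline content copied from the Receive buffer.
 */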

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		if (head->rc_curpage >= rqstp->rq_maxpages)
			return -EINVAL;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_pageoff, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}
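
/* When a message carries more than one data-item Read chunk, the
 * inline content between and around the chunk positions is copied
 * from the Receive buffer, and RDMA Reads are built for each chunk,
 * so that the reconstructed message is contiguous in rq_arg.
 */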

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}
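
/* The helpers below handle a Long Message: the RPC Call itself is
 * conveyed in a Read chunk (the call chunk). Portions of that chunk
 * are pulled in ranges so that data-item Read chunks can be
 * interleaved at their XDR positions.
 */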

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}

/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}