// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_sgl_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_sgl_nents = first_sgl_nents;
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   first_sgl_nents))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, sges);
	return NULL;
}
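
/* Release the ctxt's scatterlist back to the chained SGL allocator
 * and stash the ctxt itself on @list so it can be reused.
 */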
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * Returns the number of WQEs that will be needed on the work
 * queue on success, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}
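
/* Releasing a write_info tears down its rdma_rw contexts and DMA
 * unmaps the payload, which can be relatively expensive. That work is
 * deferred to the svcrdma workqueue so that it does not run in the
 * Send completion path.
 */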
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_write_chunk_release - Release Write chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;

	while (!list_empty(&ctxt->sc_write_info_list)) {
		info = list_first_entry(&ctxt->sc_write_info_list,
					struct svc_rdma_write_info, wi_list);
		list_del(&info->wi_list);

		cc = &info->wi_cc;
		svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
		svc_rdma_write_info_free(info);
	}
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	/* The RDMA Write has flushed, so the client won't get
	 * some of the outgoing RPC message. Signal the loss
	 * to the client by closing the connection.
	 */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
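
/* Post the WR chain that has been gathered on @cc. Send Queue space
 * for the entire chain is reserved up front; if the SQ is currently
 * too full, the task sleeps until enough space becomes available.
 */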

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build an SGL that covers one kvec in an xdr_buf.
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}
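
/* The "constructor" argument to svc_rdma_build_writes() below is one
 * of the two SGL builders above: svc_rdma_vec_to_sg() for kvec-backed
 * content, or svc_rdma_pagelist_to_sg() for page-list content. Each
 * call fills ctxt->rw_sg_table.sgl with at most one Write chunk
 * segment's worth of payload; DMA mapping is done afterward by
 * svc_rdma_rw_ctx_init().
 */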

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}
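
/* Note: svc_rdma_pages_write() records the starting offset relative
 * to the beginning of @xdr's page list (hence the subtraction of
 * xdr->head[0].iov_len), and svc_rdma_pagelist_to_sg() later adds
 * xdr->page_base when locating the first page to pull from.
 */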

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the number of bytes consumed (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}
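
/* svc_rdma_xb_write() is called directly when sending each Write
 * chunk, and via pcl_process_nonpayloads() when marshaling the Reply
 * chunk (see svc_rdma_prepare_reply_chunk() below).
 */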

/* Link Write WRs for @chunk onto @sctxt's WR chain.
 */
static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
					struct svc_rdma_send_ctxt *sctxt,
					const struct svc_rdma_chunk *chunk,
					const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct ib_send_wr *first_wr;
	struct xdr_buf payload;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	ret = -EINVAL;
	if (unlikely(cc->cc_sqecount > rdma->sc_sq_depth))
		goto out_err;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;
	list_add(&info->wi_list, &sctxt->sc_write_info_list);

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
 * @rdma: controlling RDMA transport
 * @write_pcl: Write list provisioned by the client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if one or more
 * Write chunks could not be sent.
 */
int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
				const struct svc_rdma_pcl *write_pcl,
				struct svc_rdma_send_ctxt *sctxt,
				const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, write_pcl) {
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}
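
/* Unlike Write chunks, which each get a dynamically allocated
 * svc_rdma_write_info tracked on sctxt->sc_write_info_list, the
 * Reply chunk uses the write_info embedded in the Send ctxt
 * (sc_reply_info). Its rw_ctxts are released synchronously by
 * svc_rdma_reply_chunk_release() instead of through the deferred
 * svc_rdma_write_info_free() path.
 */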

/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *   %-E2BIG if the payload was larger than the Reply chunk,
 *   %-EINVAL if client provided too many segments,
 *   %-ENOMEM if rdma_rw context pool was exhausted,
 *   %-ENOTCONN if posting failed (connection is lost),
 *   %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
			    seg_len, head->rc_pageoff);
		sg = sg_next(sg);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}
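
/* Read sink pages are taken from rqstp->rq_pages. head->rc_curpage and
 * head->rc_pageoff track the current fill point so that consecutive
 * segments (and any copied inline content) land contiguously, while
 * head->rc_page_count counts the pages consumed so they can later be
 * transferred to head->rc_pages by svc_rdma_clear_rqst_pages().
 */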

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_pageoff, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *   - A PZRC is never sent in an RDMA_MSG message, though it's
 *     allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}

/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}