// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/bvec.h>
#include <linux/overflow.h>
#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer.
 * - each xdr_buf iovec is a single contiguous buffer
 * - the xdr_buf pages array is a single contiguous buffer because the
 *   second through the last element always start on a page boundary
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The inline bvec array is sized to handle most I/O requests without
 * additional allocation. Larger requests fall back to dynamic allocation.
 * These contexts are created on demand, but cached and reused until
 * the controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	/* Links a free context onto the transport's sc_rw_ctxts llist */
	struct llist_node	rw_node;
	/* Links an in-use context onto a chunk_ctxt's cc_rwctxts list */
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	/* Number of populated entries in the rw_bvec array */
	unsigned int		rw_nents;
	/* Capacity of the inline rw_first_bvec[] array */
	unsigned int		rw_first_bvec_nents;
	/* Points at rw_first_bvec[], or at a dynamically-allocated
	 * array when the I/O needs more entries than fit inline.
	 */
	struct bio_vec		*rw_bvec;
	struct bio_vec		rw_first_bvec[];
};

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt);

/* Return the first R/W context on @list, or NULL if @list is empty. */
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

/* Pop a cached R/W context from the transport's free list, or allocate
 * a fresh one. In either case, ensure ctxt->rw_bvec has room for at
 * least @nr_bvec entries, falling back to a dynamically-allocated
 * bvec array when the inline array is too small.
 *
 * Returns NULL if no context could be prepared.
 */
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int nr_bvec)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_bvec_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	/* sc_rw_ctxt_lock serializes llist_del_first() callers */
	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		/* Inline bvec array is sized to the device's SGE limit */
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_bvec,
						first_bvec_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_bvec_nents = first_bvec_nents;
	}

	if (nr_bvec <= ctxt->rw_first_bvec_nents) {
		ctxt->rw_bvec = ctxt->rw_first_bvec;
	} else {
		ctxt->rw_bvec = kmalloc_array_node(nr_bvec,
						   sizeof(*ctxt->rw_bvec),
						   GFP_KERNEL,
						   ibdev_to_node(dev));
		if (!ctxt->rw_bvec)
			goto out_free;
	}
	return ctxt;

out_free:
	/* Return cached contexts to the cache; free freshly allocated ones */
	if (node)
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	else
		kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, nr_bvec);
	return NULL;
}

/* Release @ctxt's dynamically-allocated bvec array, if any, and push
 * the context onto the free llist @list.
 */
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	if (ctxt->rw_bvec != ctxt->rw_first_bvec)
		kfree(ctxt->rw_bvec);

	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	/* Contexts on the free list have already released any
	 * dynamically-allocated bvec arrays (see __svc_rdma_put_rw_ctxt),
	 * so a bare kfree() suffices here.
	 */
	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @length: total number of bytes in the bvec array
 * @direction: I/O direction
 *
 * Returns on success, the number of WQEs that will be needed
 * on the Send Queue, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle, unsigned int length,
				enum dma_data_direction direction)
{
	struct bvec_iter iter = {
		.bi_size	= length,
	};
	int ret;

	ret = rdma_rw_ctx_init_bvec(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num,
				    ctxt->rw_bvec, ctxt->rw_nents,
				    iter, offset, handle, direction);
	if (unlikely(ret < 0)) {
		/* On error, ownership of @ctxt returns to the free
		 * list; callers must not touch it again.
		 */
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	/* The completion ID is assigned once per chunk_ctxt and then
	 * reused for the lifetime of the underlying object.
	 */
	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	/* Unmap each context, then thread the freed contexts into a
	 * private singly-linked batch so they can all be returned to
	 * the transport's free llist with a single llist_add_batch().
	 */
	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy_bvec(&ctxt->rw_ctx, rdma->sc_qp,
					 rdma->sc_port_num,
					 ctxt->rw_bvec, ctxt->rw_nents, dir);
		if (ctxt->rw_bvec != ctxt->rw_first_bvec)
			kfree(ctxt->rw_bvec);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

/* Allocate a write_info structure on the ib_device's NUMA node and
 * initialize its chunk_ctxt for RDMA Write completions.
 *
 * Returns NULL on allocation failure.
 */
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

/* Work item: release a write_info's R/W contexts and free it. */
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/* Defer write_info release to a workqueue; callers may be in a
 * context where DMA unmapping is not appropriate.
 */
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_write_chunk_release - Release Write chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 *
 * Write chunk resources remain live until Send completion because
 * Write WRs are chained to the Send WR. This function releases all
 * write_info structures accumulated on @ctxt->sc_write_info_list.
 */
void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_write_info *info;

	while (!list_empty(&ctxt->sc_write_info_list)) {
		info = list_first_entry(&ctxt->sc_write_info_list,
					struct svc_rdma_write_info, wi_list);
		list_del(&info->wi_list);
		svc_rdma_write_info_free(info);
	}
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	/* A zero cc_sqecount means no Reply chunk WRs were posted */
	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	/* The RDMA Write has flushed, so the client won't get
	 * some of the outgoing RPC message. Signal the loss
	 * to the client by closing the connection.
	 */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	/* SQ slots reserved for this Read chain are now free */
	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	/* Chain every R/W context's WRs together. Only the first
	 * context receives the cqe, so exactly one completion is
	 * generated for the whole chain.
	 */
	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	/* Wait for enough Send Queue entries to post the whole chain */
	ret = svc_rdma_sq_wait(rdma, &cc->cc_cid, cc->cc_sqecount);
	if (ret < 0)
		return ret;

	cc->cc_posttime = ktime_get();
	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
	if (ret)
		return svc_rdma_post_send_err(rdma, &cc->cc_cid, bad_wr,
					      first_wr, cc->cc_sqecount,
					      ret);
	return 0;
}

/* Build a bvec that covers one kvec in an xdr_buf.
 */
static void svc_rdma_vec_to_bvec(struct svc_rdma_write_info *info,
				 unsigned int len,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	bvec_set_virt(&ctxt->rw_bvec[0], info->wi_base, len);
	/* Advance the cursor for the next portion of this kvec */
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build a bvec array that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_bvec(struct svc_rdma_write_info *info,
				      unsigned int remaining,
				      struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int bvec_idx, bvec_len, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct page **page;

	/* Locate the starting page and intra-page offset, accounting
	 * for the pagelist's own base offset.
	 */
	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	bvec_idx = 0;
	do {
		/* One bvec per page; first and last may be partial */
		bvec_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - page_off);
		bvec_set_page(&ctxt->rw_bvec[bvec_idx], *page, bvec_len,
			      page_off);
		remaining -= bvec_len;
		page_off = 0;
		bvec_idx++;
		page++;
	} while (remaining);

	ctxt->rw_nents = bvec_idx;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		/* Each iteration fills at most the remainder of the
		 * current segment; a chunk may span multiple segments.
		 */
		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		/* +2 allows for partial first and last pages */
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   write_len, DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			/* Segment consumed; start the next one */
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_bvec,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	/* wi_next_off is relative to the start of the pagelist */
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_bvec,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

/* Link chunk WRs onto @sctxt's WR chain. Completion is requested
 * for the tail WR, which is posted first.
 */
static void svc_rdma_cc_link_wrs(struct svcxprt_rdma *rdma,
				 struct svc_rdma_send_ctxt *sctxt,
				 struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		/* Only the first context's WRs carry the cqe */
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;
}

/* Link Write WRs for @chunk onto @sctxt's WR chain.
 */
static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
					struct svc_rdma_send_ctxt *sctxt,
					const struct svc_rdma_chunk *chunk,
					const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct xdr_buf payload;
	int ret;

	/* Carve out the portion of @xdr that this chunk conveys */
	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	/* Refuse to overcommit the Send Queue */
	ret = -EINVAL;
	if (unlikely(sctxt->sc_sqecount + cc->cc_sqecount > rdma->sc_sq_depth))
		goto out_err;

	svc_rdma_cc_link_wrs(rdma, sctxt, cc);
	/* Resources are released at Send completion time */
	list_add(&info->wi_list, &sctxt->sc_write_info_list);

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
 * @rdma: controlling RDMA transport
 * @rctxt: Write list provisioned by the client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if WR chain
 * construction fails for one or more Write chunks.
 */
int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
				const struct svc_rdma_recv_ctxt *rctxt,
				struct svc_rdma_send_ctxt *sctxt,
				const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		/* A zero payload length marks an unused chunk */
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}

/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	/* Only the parts of the Reply not already conveyed by Write
	 * chunks are written into the Reply chunk.
	 */
	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

	svc_rdma_cc_link_wrs(rdma, sctxt, cc);

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}

/*
 * Cap contiguous RDMA Read sink allocations at order-4.
 * Higher orders risk allocation failure under
 * __GFP_NORETRY, which would negate the benefit of the
 * contiguous fast path.
 */
#define SVC_RDMA_CONTIG_MAX_ORDER	4

/**
 * svc_rdma_alloc_read_pages - Allocate physically contiguous pages
 * @nr_pages: number of pages needed
 * @order: on success, set to the allocation order
 *
 * Attempts a higher-order allocation, falling back to smaller orders.
 * The returned pages are split immediately so each sub-page has its
 * own refcount and can be freed independently.
 *
 * Returns a pointer to the first page on success, or NULL if even
 * order-1 allocation fails.
 */
static struct page *
svc_rdma_alloc_read_pages(unsigned int nr_pages, unsigned int *order)
{
	unsigned int o;
	struct page *page;

	o = min(get_order(nr_pages << PAGE_SHIFT),
		SVC_RDMA_CONTIG_MAX_ORDER);

	/* Try successively smaller orders; order-0 is not attempted
	 * here because a single page brings no contiguity benefit.
	 */
	while (o >= 1) {
		page = alloc_pages(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN,
				   o);
		if (page) {
			/* Give each sub-page its own refcount so the
			 * pages can be released individually later.
			 */
			split_page(page, o);
			*order = o;
			return page;
		}
		o--;
	}
	return NULL;
}

/*
 * svc_rdma_fill_contig_bvec - Replace rq_pages with a contiguous allocation
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @bv: bvec entry to fill
 * @pages_left: number of data pages remaining in the segment
 * @len_left: bytes remaining in the segment
 *
 * On success, fills @bv with a bvec spanning the contiguous range and
 * advances rc_curpage/rc_page_count. Returns the byte length covered,
 * or zero if the allocation failed or would overrun rq_maxpages.
 */
static unsigned int
svc_rdma_fill_contig_bvec(struct svc_rqst *rqstp,
			  struct svc_rdma_recv_ctxt *head,
			  struct bio_vec *bv, unsigned int pages_left,
			  unsigned int len_left)
{
	unsigned int order, npages, chunk_pages, chunk_len, i;
	struct page *page;

	page = svc_rdma_alloc_read_pages(pages_left, &order);
	if (!page)
		return 0;
	npages = 1 << order;

	/* The allocation may be larger than pages_left; give it all
	 * back if the full run does not fit in rq_pages[].
	 */
	if (head->rc_curpage + npages > rqstp->rq_maxpages) {
		for (i = 0; i < npages; i++)
			__free_page(page + i);
		return 0;
	}

	/*
	 * Replace rq_pages[] entries with pages from the contiguous
	 * allocation. If npages exceeds chunk_pages, the extra pages
	 * stay in rq_pages[] for later reuse or normal rqst teardown.
	 */
	for (i = 0; i < npages; i++) {
		svc_rqst_page_release(rqstp,
				      rqstp->rq_pages[head->rc_curpage + i]);
		rqstp->rq_pages[head->rc_curpage + i] = page + i;
	}

	chunk_pages = min(npages, pages_left);
	chunk_len = min_t(unsigned int, chunk_pages << PAGE_SHIFT, len_left);
	bvec_set_page(bv, page, chunk_len, 0);
	head->rc_page_count += chunk_pages;
	head->rc_curpage += chunk_pages;
	return chunk_len;
}

/*
 * svc_rdma_fill_page_bvec - Add a single rq_page to the bvec array
 * @head: context for ongoing I/O
 * @ctxt: R/W context whose bvec array is being filled
 * @cur: page to add
 * @bvec_idx: pointer to current bvec index, not advanced on merge
 * @len_left: bytes remaining in the segment
 *
 * If @cur is physically contiguous with the preceding bvec, it is
 * merged by extending that bvec's length. Otherwise a new bvec
 * entry is created. Returns the byte length covered.
 */
static unsigned int
svc_rdma_fill_page_bvec(struct svc_rdma_recv_ctxt *head,
			struct svc_rdma_rw_ctxt *ctxt, struct page *cur,
			unsigned int *bvec_idx, unsigned int len_left)
{
	unsigned int chunk_len = min_t(unsigned int, PAGE_SIZE, len_left);

	head->rc_page_count++;
	head->rc_curpage++;

	if (*bvec_idx > 0) {
		struct bio_vec *prev = &ctxt->rw_bvec[*bvec_idx - 1];

		/* Merge when @cur starts where the previous bvec ends
		 * in physical memory.
		 */
		if (page_to_phys(prev->bv_page) + prev->bv_offset +
		    prev->bv_len == page_to_phys(cur)) {
			prev->bv_len += chunk_len;
			return chunk_len;
		}
	}

	bvec_set_page(&ctxt->rw_bvec[*bvec_idx], cur, chunk_len, 0);
	(*bvec_idx)++;
	return chunk_len;
}

/**
 * svc_rdma_build_read_segment_contig - Build RDMA Read WR with contiguous pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Greedily allocates higher-order pages to cover the segment,
 * building one bvec per
 * contiguous chunk. Each allocation is
 * split so sub-pages have independent refcounts. When a
 * higher-order allocation fails, remaining pages are covered
 * individually, merging adjacent pages into the preceding bvec
 * when they are physically contiguous. The split sub-pages
 * replace entries in rq_pages[] so downstream cleanup is
 * unchanged.
 *
 * Returns:
 *   %0: the Read WR was constructed successfully
 *   %-ENOMEM: allocation failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment_contig(struct svc_rqst *rqstp,
					      struct svc_rdma_recv_ctxt *head,
					      const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int nr_data_pages, bvec_idx;
	struct svc_rdma_rw_ctxt *ctxt;
	unsigned int len_left;
	int ret;

	nr_data_pages = PAGE_ALIGN(segment->rs_length) >> PAGE_SHIFT;
	if (head->rc_curpage + nr_data_pages > rqstp->rq_maxpages)
		return -ENOMEM;

	/* nr_data_pages is an upper bound; merging may use fewer bvecs */
	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_data_pages);
	if (!ctxt)
		return -ENOMEM;

	bvec_idx = 0;
	len_left = segment->rs_length;
	while (len_left) {
		unsigned int pages_left = PAGE_ALIGN(len_left) >> PAGE_SHIFT;
		unsigned int chunk_len = 0;

		/* Try the contiguous fast path only when it can cover
		 * more than one page; otherwise (or if it fails) fall
		 * back to adding a single rq_page.
		 */
		if (pages_left >= 2)
			chunk_len = svc_rdma_fill_contig_bvec(rqstp, head,
					&ctxt->rw_bvec[bvec_idx],
					pages_left, len_left);
		if (chunk_len) {
			bvec_idx++;
		} else {
			struct page *cur =
				rqstp->rq_pages[head->rc_curpage];

			chunk_len = svc_rdma_fill_page_bvec(head, ctxt, cur,
							    &bvec_idx,
							    len_left);
		}

		len_left -= chunk_len;
	}

	ctxt->rw_nents = bvec_idx;

	/* If the segment ends mid-page, back rc_curpage up so the next
	 * Read lands in the remainder of that page.
	 */
	head->rc_pageoff = offset_in_page(segment->rs_length);
	if (head->rc_pageoff)
		head->rc_curpage--;

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating a local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int bvec_idx, nr_bvec, seg_len, len, total;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	len = segment->rs_length;
	if (check_add_overflow(head->rc_pageoff, len, &total))
		return -EINVAL;
	nr_bvec = PAGE_ALIGN(total) >> PAGE_SHIFT;

	/* Prefer the contiguous fast path when the segment starts on
	 * a page boundary and spans multiple pages. A -ENOMEM result
	 * is not fatal: fall back to the per-page path below.
	 */
	if (head->rc_pageoff == 0 && nr_bvec >= 2) {
		ret = svc_rdma_build_read_segment_contig(rqstp, head,
							 segment);
		if (ret != -ENOMEM)
			return ret;
	}

	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = nr_bvec;

	for (bvec_idx = 0; bvec_idx < ctxt->rw_nents; bvec_idx++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		/* Count a page only when the first byte lands in it */
		if (!head->rc_pageoff)
			head->rc_page_count++;

		bvec_set_page(&ctxt->rw_bvec[bvec_idx],
			      rqstp->rq_pages[head->rc_curpage],
			      seg_len, head->rc_pageoff);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > rqstp->rq_maxpages))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating a local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	/* Count the sink pages this copy touches, starting from the
	 * current offset within the current page.
	 */
	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		/* No sink pages left in rq_pages[]. */
		if (head->rc_curpage >= rqstp->rq_maxpages)
			return -EINVAL;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		/* A fresh page is consumed only when writing starts
		 * at its beginning.
		 */
		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy((unsigned char *)dst + head->rc_pageoff, src + offset, page_len);

		/* Advance the cursor so subsequent copies and RDMA
		 * Read results land contiguously.
		 */
		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	/* Copy the inline content that precedes the first chunk. */
	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	/* Interleave: pull each Read chunk, then copy the inline
	 * content that sits between it and the next chunk.
	 */
	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		/* Inline bytes between this chunk's payload (already
		 * counted in rc_readbytes) and the next chunk's
		 * XDR position.
		 */
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	/* Copy the inline content that follows the last chunk. */
	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	/* Exactly one data-item Read chunk: pull the whole thing. */
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	/* A chunk with no segments yields -EINVAL. */
	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		/* Skip segments that lie wholly before the region. */
		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		/* Describe only the part of @segment that overlaps
		 * the requested region.
		 * NOTE(review): assumes @length >= @offset within the
		 * first overlapping segment, otherwise rs_length
		 * underflows -- presumably guaranteed by callers;
		 * verify against svc_rdma_read_call_chunk().
		 */
		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		/* Subsequent segments are consumed from their start. */
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
		pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	/* No data-item Read chunks: the entire Call message comes
	 * from the call chunk.
	 */
	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	/* Pull the portion of the call chunk that precedes the first
	 * data-item chunk.
	 */
	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	/* Interleave: pull each data-item chunk, then the range of
	 * the call chunk that sits between it and the next data item.
	 */
	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	/* Pull the remainder of the call chunk. */
	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
1317 * 1318 * Return values: 1319 * %0: RDMA Read WQEs were successfully built 1320 * %-EINVAL: client provided too many chunks or segments, 1321 * %-ENOMEM: rdma_rw context pool was exhausted, 1322 * %-ENOTCONN: posting failed (connection is lost), 1323 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 1324 */ 1325 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp, 1326 struct svc_rdma_recv_ctxt *head) 1327 { 1328 return svc_rdma_read_call_chunk(rqstp, head); 1329 } 1330 1331 /* Pages under I/O have been copied to head->rc_pages. Ensure that 1332 * svc_xprt_release() does not put them when svc_rdma_recvfrom() 1333 * returns. This has to be done after all Read WRs are constructed 1334 * to properly handle a page that happens to be part of I/O on behalf 1335 * of two different RDMA segments. 1336 * 1337 * Note: if the subsequent post_send fails, these pages have already 1338 * been moved to head->rc_pages and thus will be cleaned up by 1339 * svc_rdma_recv_ctxt_put(). 1340 */ 1341 static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp, 1342 struct svc_rdma_recv_ctxt *head) 1343 { 1344 unsigned int i; 1345 1346 /* 1347 * Move only pages containing RPC data into rc_pages[]. Pages 1348 * from a contiguous allocation that were not used for the 1349 * payload remain in rq_pages[] for subsequent reuse. 1350 */ 1351 for (i = 0; i < head->rc_page_count; i++) { 1352 head->rc_pages[i] = rqstp->rq_pages[i]; 1353 rqstp->rq_pages[i] = NULL; 1354 } 1355 rqstp->rq_pages_nfree = head->rc_page_count; 1356 } 1357 1358 /** 1359 * svc_rdma_process_read_list - Pull list of Read chunks from the client 1360 * @rdma: controlling RDMA transport 1361 * @rqstp: set of pages to use as Read sink buffers 1362 * @head: pages under I/O collect here 1363 * 1364 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders 1365 * pull each Read chunk as they decode an incoming RPC message. 
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	/* Reset the I/O cursor before any Read WRs are constructed. */
	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

	/* Dispatch on message form: an empty call PCL means the data
	 * items came in Read chunks; otherwise this is a Long Message
	 * pulled via the Position Zero (call) chunk.
	 */
	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	/* Pages move to rc_pages[] even on failure, so that
	 * svc_rdma_recv_ctxt_put() can release them.
	 */
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}