Lines Matching +full:dma +full:- +full:info

1 // SPDX-License-Identifier: GPL-2.0
3 * Copyright (c) 2016-2018 Oracle. All rights reserved.
5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
23 * Each WR chain handles a single contiguous server-side buffer,
27 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
33 * smaller I/O requests without disabling bottom-halves, these
57 struct ib_device *dev = rdma->sc_cm_id->device;
58 unsigned int first_sgl_nents = dev->attrs.max_send_sge;
62 spin_lock(&rdma->sc_rw_ctxt_lock);
63 node = llist_del_first(&rdma->sc_rw_ctxts);
64 spin_unlock(&rdma->sc_rw_ctxt_lock);
73 INIT_LIST_HEAD(&ctxt->rw_list);
74 ctxt->rw_first_sgl_nents = first_sgl_nents;
77 ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
78 if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
79 ctxt->rw_sg_table.sgl,
94 sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
95 llist_add(&ctxt->rw_node, list);
101 __svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
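
The get/put pair above implements a free-list cache for R/W contexts: llist_del_first() must be serialized against concurrent deleters, so the get side takes sc_rw_ctxt_lock, while the put side can use a lock-free llist_add(). A minimal sketch of the same pattern, with hypothetical names rather than the svcrdma ones:

#include <linux/llist.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct demo_ctxt {
        struct llist_node node;
        /* per-I/O state would live here */
};

struct demo_cache {
        spinlock_t lock;                /* serializes llist_del_first() */
        struct llist_head free_list;
};

static struct demo_ctxt *demo_get_ctxt(struct demo_cache *cache)
{
        struct llist_node *node;

        spin_lock(&cache->lock);
        node = llist_del_first(&cache->free_list);
        spin_unlock(&cache->lock);
        if (node)
                return llist_entry(node, struct demo_ctxt, node);

        /* Cache miss: allocate on demand, as svc_rdma_get_rw_ctxt() does */
        return kzalloc(sizeof(struct demo_ctxt), GFP_KERNEL);
}

static void demo_put_ctxt(struct demo_cache *cache, struct demo_ctxt *ctxt)
{
        llist_add(&ctxt->node, &cache->free_list);      /* lock-free */
}
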
105 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
114 while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
121 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
138 ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
139 ctxt->rw_sg_table.sgl, ctxt->rw_nents,
143 ctxt->rw_nents, ret);
150 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
157 struct rpc_rdma_cid *cid = &cc->cc_cid;
159 if (unlikely(!cid->ci_completion_id))
162 INIT_LIST_HEAD(&cc->cc_rwctxts);
163 cc->cc_sqecount = 0;
167 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
170 * @dir: DMA direction
180 trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);
183 while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
184 list_del(&ctxt->rw_list);
186 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
187 rdma->sc_port_num, ctxt->rw_sg_table.sgl,
188 ctxt->rw_nents, dir);
191 ctxt->rw_node.next = first;
192 first = &ctxt->rw_node;
197 llist_add_batch(first, last, &rdma->sc_rw_ctxts);
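
The release path above strings the freed contexts onto a private chain and returns the whole chain with one llist_add_batch(), so the shared sc_rw_ctxts list is touched by a single atomic operation no matter how many contexts were held. A sketch of that batching idea with hypothetical names:

#include <linux/llist.h>

static void demo_put_batch(struct llist_head *shared,
                           struct llist_node **nodes, unsigned int n)
{
        struct llist_node *first = NULL, *last = NULL;
        unsigned int i;

        for (i = 0; i < n; i++) {
                /* prepend, mirroring lines 191-192 above */
                nodes[i]->next = first;
                first = nodes[i];
                if (!last)
                        last = nodes[i];
        }
        if (first)
                llist_add_batch(first, last, shared);
}
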
204 struct svc_rdma_write_info *info;
206 info = kzalloc_node(sizeof(*info), GFP_KERNEL,
207 ibdev_to_node(rdma->sc_cm_id->device));
208 if (!info)
209 return info;
211 info->wi_rdma = rdma;
212 info->wi_chunk = chunk;
213 svc_rdma_cc_init(rdma, &info->wi_cc);
214 info->wi_cc.cc_cqe.done = svc_rdma_write_done;
215 return info;
220 struct svc_rdma_write_info *info;
222 info = container_of(work, struct svc_rdma_write_info, wi_work);
223 svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
224 kfree(info);
227 static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
229 INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
230 queue_work(svcrdma_wq, &info->wi_work);
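
svc_rdma_write_info_free() above does not release anything directly: the Write completion runs in IB completion context, so unmapping the rdma-rw contexts and the kfree() are deferred to a workqueue. A generic sketch of that hand-off, with hypothetical names (the real code queues onto svcrdma_wq):

#include <linux/slab.h>
#include <linux/workqueue.h>

struct demo_info {
        struct work_struct work;
        /* resources that need process context to release */
};

static void demo_free_async(struct work_struct *work)
{
        struct demo_info *info = container_of(work, struct demo_info, work);

        /* unmap DMA, return R/W contexts, then free the wrapper */
        kfree(info);
}

static void demo_info_free(struct workqueue_struct *wq, struct demo_info *info)
{
        INIT_WORK(&info->work, demo_free_async);
        queue_work(wq, &info->work);
}
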
234 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
241 struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
243 if (!cc->cc_sqecount)
249 * svc_rdma_reply_done - Reply chunk Write completion handler
257 struct ib_cqe *cqe = wc->wr_cqe;
260 struct svcxprt_rdma *rdma = cq->cq_context;
262 switch (wc->status) {
264 trace_svcrdma_wc_reply(&cc->cc_cid);
267 trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
270 trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
273 svc_xprt_deferred_close(&rdma->sc_xprt);
277 * svc_rdma_write_done - Write chunk completion
285 struct svcxprt_rdma *rdma = cq->cq_context;
286 struct ib_cqe *cqe = wc->wr_cqe;
289 struct svc_rdma_write_info *info =
292 switch (wc->status) {
294 trace_svcrdma_wc_write(&cc->cc_cid);
297 trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
300 trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
303 svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
305 if (unlikely(wc->status != IB_WC_SUCCESS))
306 svc_xprt_deferred_close(&rdma->sc_xprt);
308 svc_rdma_write_info_free(info);
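
The completion handlers above share one shape: an ib_cqe embedded in the chunk context supplies the ->done callback, and container_of() on wc->wr_cqe recovers the per-chain state before send queue credits are returned and any error closes the transport. A stripped-down sketch with hypothetical types:

#include <rdma/ib_verbs.h>

struct demo_chunk_ctxt {
        struct ib_cqe cc_cqe;
        /* completion ID, SQE count, list of rdma-rw contexts, ... */
};

static void demo_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct demo_chunk_ctxt *cc =
                container_of(wc->wr_cqe, struct demo_chunk_ctxt, cc_cqe);

        /* return cc's send queue credits here */
        if (wc->status != IB_WC_SUCCESS) {
                /* flushed or failed: force a transport close */
        }
        /* release or recycle cc */
        (void)cc;
}

/* Set up before posting, as the alloc paths above do:
 *      cc->cc_cqe.done = demo_write_done;
 */
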
312 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
319 struct svcxprt_rdma *rdma = cq->cq_context;
320 struct ib_cqe *cqe = wc->wr_cqe;
325 svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
328 switch (wc->status) {
330 trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
331 cc->cc_posttime);
333 spin_lock(&rdma->sc_rq_dto_lock);
334 list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
336 set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
337 spin_unlock(&rdma->sc_rq_dto_lock);
338 svc_xprt_enqueue(&rdma->sc_xprt);
341 trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
344 trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
353 svc_xprt_deferred_close(&rdma->sc_xprt);
358 * - If ib_post_send() succeeds, only one completion is expected,
373 if (cc->cc_sqecount > rdma->sc_sq_depth)
374 return -EINVAL;
377 cqe = &cc->cc_cqe;
378 list_for_each(tmp, &cc->cc_rwctxts) {
382 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
383 rdma->sc_port_num, cqe, first_wr);
388 if (atomic_sub_return(cc->cc_sqecount,
389 &rdma->sc_sq_avail) > 0) {
390 cc->cc_posttime = ktime_get();
391 ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
398 trace_svcrdma_sq_full(rdma, &cc->cc_cid);
399 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
400 wait_event(rdma->sc_send_wait,
401 atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
402 trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
405 trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
406 svc_xprt_deferred_close(&rdma->sc_xprt);
412 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
413 wake_up(&rdma->sc_send_wait);
414 return -ENOTCONN;
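
The posting routine above budgets send queue space explicitly: it reserves cc_sqecount entries with atomic_sub_return(), posts the WR chain only if the reservation stayed positive, and otherwise returns the credits and sleeps on sc_send_wait until completions free enough room. A condensed sketch of that accounting, using a hypothetical transport structure:

#include <linux/atomic.h>
#include <linux/errno.h>
#include <linux/wait.h>
#include <rdma/ib_verbs.h>

struct demo_xprt {
        struct ib_qp *qp;
        atomic_t sq_avail;              /* free send queue entries */
        wait_queue_head_t send_wait;
};

static int demo_post_chain(struct demo_xprt *x, struct ib_send_wr *first_wr,
                           int sqecount)
{
        const struct ib_send_wr *bad_wr;
        int ret;

        do {
                if (atomic_sub_return(sqecount, &x->sq_avail) > 0) {
                        ret = ib_post_send(x->qp, first_wr, &bad_wr);
                        if (ret)
                                break;          /* connection is lost */
                        return 0;
                }

                /* Not enough room: undo the reservation and wait */
                atomic_add(sqecount, &x->sq_avail);
                wait_event(x->send_wait,
                           atomic_read(&x->sq_avail) > sqecount);
        } while (1);

        atomic_add(sqecount, &x->sq_avail);
        wake_up(&x->send_wait);
        return -ENOTCONN;
}
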
417 /* Build and DMA-map an SGL that covers one kvec in an xdr_buf
419 static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
423 struct scatterlist *sg = ctxt->rw_sg_table.sgl;
425 sg_set_buf(&sg[0], info->wi_base, len);
426 info->wi_base += len;
428 ctxt->rw_nents = 1;
431 /* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
433 static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
438 const struct xdr_buf *xdr = info->wi_xdr;
442 page_off = info->wi_next_off + xdr->page_base;
445 page = xdr->pages + page_no;
446 info->wi_next_off += remaining;
447 sg = ctxt->rw_sg_table.sgl;
451 PAGE_SIZE - page_off);
454 remaining -= sge_bytes;
461 ctxt->rw_nents = sge_no;
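
svc_rdma_pagelist_to_sg() above converts a byte range of the xdr_buf page list into scatterlist entries, each clipped to a page boundary; only the first entry may start mid-page. A self-contained sketch of that walk (hypothetical helper; the real code also advances info->wi_next_off and stores the count in ctxt->rw_nents):

#include <linux/minmax.h>
#include <linux/mm.h>
#include <linux/scatterlist.h>

static unsigned int demo_pages_to_sg(struct scatterlist *sgl,
                                     struct page **pages,
                                     unsigned int page_off,
                                     unsigned int remaining)
{
        struct scatterlist *sg = sgl;
        unsigned int sge_no = 0;

        while (remaining) {
                unsigned int sge_bytes = min_t(unsigned int, remaining,
                                               PAGE_SIZE - page_off);

                sg_set_page(sg, *pages, sge_bytes, page_off);

                remaining -= sge_bytes;
                sg = sg_next(sg);
                pages++;
                page_off = 0;   /* later entries start on a page boundary */
                sge_no++;
        }
        return sge_no;          /* becomes ctxt->rw_nents */
}
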
468 svc_rdma_build_writes(struct svc_rdma_write_info *info,
469 void (*constructor)(struct svc_rdma_write_info *info,
474 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
475 struct svcxprt_rdma *rdma = info->wi_rdma;
484 if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
487 seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
488 write_len = min(remaining, seg->rs_length - info->wi_seg_off);
494 return -ENOMEM;
496 constructor(info, write_len, ctxt);
497 offset = seg->rs_offset + info->wi_seg_off;
498 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
501 return -EIO;
504 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
505 cc->cc_sqecount += ret;
506 if (write_len == seg->rs_length - info->wi_seg_off) {
507 info->wi_seg_no++;
508 info->wi_seg_off = 0;
510 info->wi_seg_off += write_len;
512 remaining -= write_len;
518 trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
519 info->wi_chunk->ch_segcount);
520 return -E2BIG;
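
svc_rdma_build_writes() above spreads the payload over the chunk's segments: each pass is clamped to what remains of the current segment, then the loop either advances to the next segment or records a partial offset; running out of segments while bytes remain produces -E2BIG. The bookkeeping in isolation, with illustrative types only:

#include <linux/errno.h>
#include <linux/minmax.h>

struct demo_segment {
        unsigned int length;    /* stands in for rs_length */
};

static int demo_consume_segments(const struct demo_segment *segs,
                                 unsigned int segcount, unsigned int remaining)
{
        unsigned int seg_no = 0, seg_off = 0;

        while (remaining) {
                unsigned int write_len;

                if (seg_no >= segcount)
                        return -E2BIG;  /* chunk too small for the payload */

                write_len = min(remaining, segs[seg_no].length - seg_off);
                /* ... build and queue one WR chain of write_len bytes ... */

                if (write_len == segs[seg_no].length - seg_off) {
                        seg_no++;
                        seg_off = 0;
                } else {
                        seg_off += write_len;
                }
                remaining -= write_len;
        }
        return 0;
}
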
524 * svc_rdma_iov_write - Construct RDMA Writes from an iov
525 * @info: pointer to write arguments
530 * %-E2BIG if the client-provided Write chunk is too small
531 * %-ENOMEM if a resource has been exhausted
532 * %-EIO if an rdma-rw error occurred
534 static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
537 info->wi_base = iov->iov_base;
538 return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
539 iov->iov_len);
543 * svc_rdma_pages_write - Construct RDMA Writes from pages
544 * @info: pointer to write arguments
551 * %-E2BIG if the client-provided Write chunk is too small
552 * %-ENOMEM if a resource has been exhausted
553 * %-EIO if an rdma-rw error occurred
555 static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
560 info->wi_xdr = xdr;
561 info->wi_next_off = offset - xdr->head[0].iov_len;
562 return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
567 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
573 * %-E2BIG if the client-provided Write chunk is too small
574 * %-ENOMEM if a resource has been exhausted
575 * %-EIO if an rdma-rw error occurred
579 struct svc_rdma_write_info *info = data;
582 if (xdr->head[0].iov_len) {
583 ret = svc_rdma_iov_write(info, &xdr->head[0]);
588 if (xdr->page_len) {
589 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
590 xdr->page_len);
595 if (xdr->tail[0].iov_len) {
596 ret = svc_rdma_iov_write(info, &xdr->tail[0]);
601 return xdr->len;
608 struct svc_rdma_write_info *info;
613 if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
614 chunk->ch_payload_length))
615 return -EMSGSIZE;
617 info = svc_rdma_write_info_alloc(rdma, chunk);
618 if (!info)
619 return -ENOMEM;
620 cc = &info->wi_cc;
622 ret = svc_rdma_xb_write(&payload, info);
626 trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
633 svc_rdma_write_info_free(info);
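
The Write-chunk send path above first carves the result payload out of the complete reply with xdr_buf_subsegment(), which fails when the requested range does not fall inside the source buffer; only then is a write_info allocated and the Writes constructed and posted. A sketch of the carving step with a hypothetical wrapper:

#include <linux/errno.h>
#include <linux/sunrpc/xdr.h>

static int demo_extract_payload(const struct xdr_buf *reply,
                                unsigned int offset, unsigned int length,
                                struct xdr_buf *payload)
{
        /* Describe just [offset, offset + length) of @reply */
        if (xdr_buf_subsegment(reply, payload, offset, length))
                return -EMSGSIZE;
        return 0;
}
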
638 * svc_rdma_send_write_list - Send all chunks on the Write list
653 pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
654 if (!chunk->ch_payload_length)
664 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
671 * Returns a non-negative number of bytes the chunk consumed, or
672 * %-E2BIG if the payload was larger than the Reply chunk,
673 * %-EINVAL if client provided too many segments,
674 * %-ENOMEM if rdma_rw context pool was exhausted,
675 * %-ENOTCONN if posting failed (connection is lost),
676 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
684 struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
685 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
691 info->wi_rdma = rdma;
692 info->wi_chunk = pcl_first_chunk(reply_pcl);
693 info->wi_seg_off = 0;
694 info->wi_seg_no = 0;
695 info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
698 svc_rdma_xb_write, info);
702 first_wr = sctxt->sc_wr_chain;
703 cqe = &cc->cc_cqe;
704 list_for_each(pos, &cc->cc_rwctxts) {
708 first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
709 rdma->sc_port_num, cqe, first_wr);
712 sctxt->sc_wr_chain = first_wr;
713 sctxt->sc_sqecount += cc->cc_sqecount;
715 trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
716 return xdr->len;
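
svc_rdma_prepare_reply_chunk() above posts nothing itself: rdma_rw_ctx_wrs() links each context's Write WRs ahead of the chain it is given and returns the new head, which is stored back into sctxt->sc_wr_chain so the Reply chunk Writes and the closing Send later go out in one ib_post_send(). A sketch of the chaining step with a hypothetical context type:

#include <linux/list.h>
#include <rdma/rw.h>

struct demo_rw_ctxt {
        struct list_head list;
        struct rdma_rw_ctx ctx;
};

static struct ib_send_wr *demo_chain_writes(struct list_head *rw_ctxts,
                                            struct ib_qp *qp, u32 port_num,
                                            struct ib_cqe *cqe,
                                            struct ib_send_wr *chain)
{
        struct demo_rw_ctxt *ctxt;

        list_for_each_entry(ctxt, rw_ctxts, list)
                chain = rdma_rw_ctx_wrs(&ctxt->ctx, qp, port_num, cqe, chain);
        return chain;           /* becomes sctxt->sc_wr_chain */
}
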
720 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
723 * @segment: co-ordinates of remote memory to be read
727 * %-EINVAL: there were not enough rq_pages to finish
728 * %-ENOMEM: allocating local resources failed
729 * %-EIO: a DMA mapping error occurred
736 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
742 len = segment->rs_length;
743 sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
746 return -ENOMEM;
747 ctxt->rw_nents = sge_no;
749 sg = ctxt->rw_sg_table.sgl;
750 for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
752 PAGE_SIZE - head->rc_pageoff);
754 if (!head->rc_pageoff)
755 head->rc_page_count++;
757 sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
758 seg_len, head->rc_pageoff);
761 head->rc_pageoff += seg_len;
762 if (head->rc_pageoff == PAGE_SIZE) {
763 head->rc_curpage++;
764 head->rc_pageoff = 0;
766 len -= seg_len;
768 if (len && ((head->rc_curpage + 1) > rqstp->rq_maxpages))
772 ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
773 segment->rs_handle, DMA_FROM_DEVICE);
775 return -EIO;
778 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
779 cc->cc_sqecount += ret;
783 trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
784 return -EINVAL;
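
The Read-segment builder above sizes its scatterlist up front: PAGE_ALIGN(rc_pageoff + len) >> PAGE_SHIFT gives the number of page-bounded SGEs needed when the data lands rc_pageoff bytes into the current receive page. A small worked example:

#include <linux/mm.h>

static unsigned int demo_sges_needed(unsigned int pageoff, unsigned int len)
{
        return PAGE_ALIGN(pageoff + len) >> PAGE_SHIFT;
}

/* With 4KB pages, pageoff = 3000 and len = 5000:
 * PAGE_ALIGN(8000) = 8192, so two SGEs are needed, covering
 * 1096 bytes in the current page and 3904 bytes in the next one.
 */
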
788 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
795 * %-EINVAL: there were not enough resources to finish
796 * %-ENOMEM: allocating local resources failed
797 * %-EIO: a DMA mapping error occurred
806 ret = -EINVAL;
811 head->rc_readbytes += segment->rs_length;
817 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
823 * Take a page at a time from rqstp->rq_pages and copy the inline
825 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
830 * %-EINVAL: offset or length was incorrect
837 unsigned char *dst, *src = head->rc_recv_buf;
840 numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
845 PAGE_SIZE - head->rc_pageoff);
847 if (!head->rc_pageoff)
848 head->rc_page_count++;
850 dst = page_address(rqstp->rq_pages[head->rc_curpage]);
851 memcpy(dst + head->rc_curpage, src + offset, page_len);
853 head->rc_readbytes += page_len;
854 head->rc_pageoff += page_len;
855 if (head->rc_pageoff == PAGE_SIZE) {
856 head->rc_curpage++;
857 head->rc_pageoff = 0;
859 remaining -= page_len;
863 return -EINVAL;
867 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
871 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
876 * %-EINVAL: client provided too many chunks or segments,
877 * %-ENOMEM: rdma_rw context pool was exhausted,
878 * %-ENOTCONN: posting failed (connection is lost),
879 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
885 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
892 length = chunk->ch_position;
907 length = next->ch_position - head->rc_readbytes;
914 length = head->rc_byte_len - start;
919 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
923 * The chunk data lands in the page list of rqstp->rq_arg.pages.
925 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
926 * Therefore, XDR round-up of the Read chunk and trailing
931 * %-EINVAL: client provided too many chunks or segments,
932 * %-ENOMEM: rdma_rw context pool was exhausted,
933 * %-ENOTCONN: posting failed (connection is lost),
934 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
940 pcl_first_chunk(&head->rc_read_pcl));
944 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
953 * %-EINVAL: there were not enough resources to finish
954 * %-ENOMEM: rdma_rw context pool was exhausted,
955 * %-ENOTCONN: posting failed (connection is lost),
956 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
966 ret = -EINVAL;
970 if (offset > segment->rs_length) {
971 offset -= segment->rs_length;
975 dummy.rs_handle = segment->rs_handle;
976 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
977 dummy.rs_offset = segment->rs_offset + offset;
983 head->rc_readbytes += dummy.rs_length;
984 length -= dummy.rs_length;
991 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
997 * %-EINVAL: there were not enough resources to finish
998 * %-ENOMEM: rdma_rw context pool was exhausted,
999 * %-ENOTCONN: posting failed (connection is lost),
1000 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1006 pcl_first_chunk(&head->rc_call_pcl);
1007 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1017 length = chunk->ch_position;
1033 length = next->ch_position - head->rc_readbytes;
1041 length = call_chunk->ch_length - start;
1047 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1052 * Transport header, and the rest lands in rqstp->rq_arg.pages.
1055 * - A PZRC is never sent in an RDMA_MSG message, though it's
1060 * %-EINVAL: client provided too many chunks or segments,
1061 * %-ENOMEM: rdma_rw context pool was exhausted,
1062 * %-ENOTCONN: posting failed (connection is lost),
1063 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1071 /* Pages under I/O have been copied to head->rc_pages. Ensure that
1078 * been moved to head->rc_pages and thus will be cleaned up by
1086 for (i = 0; i < head->rc_page_count; i++) {
1087 head->rc_pages[i] = rqstp->rq_pages[i];
1088 rqstp->rq_pages[i] = NULL;
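
The loop above hands page ownership over for the duration of the I/O: pages that the RDMA Reads will fill move from rqstp->rq_pages, which the generic svc code recycles per request, into head->rc_pages, and the rq_pages slots are cleared so the same pages are not released twice. A sketch of that hand-off:

#include <linux/mm_types.h>

static void demo_steal_pages(struct page **rq_pages, struct page **rc_pages,
                             unsigned int count)
{
        unsigned int i;

        for (i = 0; i < count; i++) {
                rc_pages[i] = rq_pages[i];
                rq_pages[i] = NULL;     /* svc_xprt_release() must skip these */
        }
}
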
1093 * svc_rdma_process_read_list - Pull list of Read chunks from the client
1101 * On Linux, however, the server needs to have a fully-constructed RPC
1102 * message in rqstp->rq_arg when there is a positive return code from
1103 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1110 * %-EINVAL: client provided too many chunks or segments,
1111 * %-ENOMEM: rdma_rw context pool was exhausted,
1112 * %-ENOTCONN: posting failed (connection is lost),
1113 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1119 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
1122 cc->cc_cqe.done = svc_rdma_wc_read_done;
1123 cc->cc_sqecount = 0;
1124 head->rc_pageoff = 0;
1125 head->rc_curpage = 0;
1126 head->rc_readbytes = 0;
1128 if (pcl_is_empty(&head->rc_call_pcl)) {
1129 if (head->rc_read_pcl.cl_count == 1)
1139 trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);