Lines Matching +full:dma +full:- +full:write

1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3 * Copyright (c) 2016-2018 Oracle. All rights reserved.
5 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
10 * COPYING in the main directory of this source tree, or the BSD-type
50 * The passed-in svc_rqst contains a struct xdr_buf which holds an
51 * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
52 * transport header, post all Write WRs needed for this Reply, then post
70 * when it completes, it is guaranteed that all previous Write WRs have
73 * Write WRs are constructed and posted. Each Write segment gets its own
74 * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
75 * DMA-unmap the pages under I/O for that Write segment. The Write
86 * Completion Queue do not run in parallel. Otherwise a Write completion
88 * are still DMA-mapped.
92 * - If the Send WR is posted successfully, it will either complete
95 * - If the Send WR cannot be posted, the forward path releases
99 * where two different Write segments send portions of the same page.
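
As a minimal sketch of the ordering rule described above (hypothetical helper, not code from this file): when the Write WRs are chained ahead of a single signaled Send WR on the same send queue, the Send completion implies that the earlier Writes have completed.

#include <rdma/ib_verbs.h>

/* Hypothetical sketch: only the trailing Send WR is signaled, so its
 * completion orders everything posted before it on this send queue
 * (assumes a QP created with IB_SIGNAL_REQ_WR; rkey, remote_addr and
 * sg_list setup omitted for brevity).
 */
static int example_post_write_then_send(struct ib_qp *qp,
					struct ib_rdma_wr *write_wr,
					struct ib_send_wr *send_wr)
{
	const struct ib_send_wr *bad_wr;

	write_wr->wr.opcode = IB_WR_RDMA_WRITE;
	write_wr->wr.send_flags = 0;		/* unsignaled */
	write_wr->wr.next = send_wr;

	send_wr->opcode = IB_WR_SEND;
	send_wr->send_flags = IB_SEND_SIGNALED;	/* covers the Write too */
	send_wr->next = NULL;

	return ib_post_send(qp, &write_wr->wr, &bad_wr);
}
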
119 int node = ibdev_to_node(rdma->sc_cm_id->device);
126 ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
130 pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server);
131 ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *),
133 if (!ctxt->sc_pages)
135 ctxt->sc_maxpages = pages;
136 buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
139 addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
140 rdma->sc_max_req_size, DMA_TO_DEVICE);
141 if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
144 svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
146 ctxt->sc_rdma = rdma;
147 ctxt->sc_send_wr.next = NULL;
148 ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
149 ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
150 ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
151 ctxt->sc_cqe.done = svc_rdma_wc_send;
152 ctxt->sc_xprt_buf = buffer;
153 xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
154 rdma->sc_max_req_size);
155 ctxt->sc_sges[0].addr = addr;
157 for (i = 0; i < rdma->sc_max_send_sges; i++)
158 ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
164 kfree(ctxt->sc_pages);
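
The struct_size() call above sizes the context and its trailing flexible array of SGEs in one allocation; a stand-alone sketch of that pattern, with hypothetical type and helper names:

#include <linux/slab.h>
#include <rdma/ib_verbs.h>

struct example_ctxt {
	unsigned int	nr_sges;
	struct ib_sge	sges[];		/* flexible array, sized at alloc time */
};

static struct example_ctxt *example_ctxt_alloc(unsigned int nr, int node)
{
	struct example_ctxt *c;

	/* struct_size() is sizeof(*c) + nr * sizeof(c->sges[0]),
	 * with overflow checking. */
	c = kzalloc_node(struct_size(c, sges, nr), GFP_KERNEL, node);
	if (c)
		c->nr_sges = nr;
	return c;
}
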
172 * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
181 while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
183 ib_dma_unmap_single(rdma->sc_pd->device,
184 ctxt->sc_sges[0].addr,
185 rdma->sc_max_req_size,
187 kfree(ctxt->sc_xprt_buf);
188 kfree(ctxt->sc_pages);
194 * svc_rdma_send_ctxt_get - Get a free send_ctxt
197 * Returns a ready-to-use send_ctxt, or NULL if none are
205 spin_lock(&rdma->sc_send_lock);
206 node = llist_del_first(&rdma->sc_send_ctxts);
207 spin_unlock(&rdma->sc_send_lock);
214 rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
215 xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
216 ctxt->sc_xprt_buf, NULL);
218 svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
219 ctxt->sc_send_wr.num_sge = 0;
220 ctxt->sc_cur_sge_no = 0;
221 ctxt->sc_page_count = 0;
222 ctxt->sc_wr_chain = &ctxt->sc_send_wr;
223 ctxt->sc_sqecount = 1;
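
A hypothetical caller-side sketch of the send_ctxt lifecycle (patterned on the error-reply path later in this file): reserve a context, encode into the pre-mapped header buffer through sc_stream, point SGE 0 at the encoded bytes, then post; on success the Send completion path releases the context.

static int example_send_header_only(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *sctxt;
	__be32 *p;

	sctxt = svc_rdma_send_ctxt_get(rdma);
	if (!sctxt)
		return -ENOMEM;

	/* Reserve four XDR words (arbitrary size for illustration). */
	p = xdr_reserve_space(&sctxt->sc_stream, 4 * sizeof(*p));
	if (!p)
		goto put_ctxt;
	/* ... encode an RPC-over-RDMA header into p ... */

	sctxt->sc_send_wr.num_sge = 1;
	sctxt->sc_send_wr.opcode = IB_WR_SEND;
	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
	return svc_rdma_post_send(rdma, sctxt);

put_ctxt:
	svc_rdma_send_ctxt_put(rdma, sctxt);
	return -EMSGSIZE;
}
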
237 struct ib_device *device = rdma->sc_cm_id->device;
242 if (ctxt->sc_page_count)
243 release_pages(ctxt->sc_pages, ctxt->sc_page_count);
248 for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
249 trace_svcrdma_dma_unmap_page(&ctxt->sc_cid,
250 ctxt->sc_sges[i].addr,
251 ctxt->sc_sges[i].length);
253 ctxt->sc_sges[i].addr,
254 ctxt->sc_sges[i].length,
258 llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
266 svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt);
270 * svc_rdma_send_ctxt_put - Return send_ctxt to free list
274 * Pages left in sc_pages are DMA unmapped and released.
279 INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async);
280 queue_work(svcrdma_wq, &ctxt->sc_work);
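
Punting the release to a workqueue, as above, keeps page release and DMA-unmap work out of the Send completion handler; a generic sketch of the pattern with hypothetical names:

#include <linux/workqueue.h>
#include <linux/slab.h>

struct example_item {
	struct work_struct work;
	/* resources that must be released in process context */
};

static void example_release_async(struct work_struct *work)
{
	struct example_item *item =
		container_of(work, struct example_item, work);

	kfree(item);
}

static void example_put(struct workqueue_struct *wq, struct example_item *item)
{
	INIT_WORK(&item->work, example_release_async);
	queue_work(wq, &item->work);
}
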
284 * svc_rdma_wake_send_waiters - manage Send Queue accounting
291 atomic_add(avail, &rdma->sc_sq_avail);
293 if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
294 wake_up(&rdma->sc_send_wait);
298 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
307 struct svcxprt_rdma *rdma = cq->cq_context;
308 struct ib_cqe *cqe = wc->wr_cqe;
312 svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);
314 if (unlikely(wc->status != IB_WC_SUCCESS))
317 trace_svcrdma_wc_send(&ctxt->sc_cid);
322 if (wc->status != IB_WC_WR_FLUSH_ERR)
323 trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
325 trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
327 svc_xprt_deferred_close(&rdma->sc_xprt);
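
The handler above recovers its send_ctxt from the work completion through the ib_cqe embedded in the context; the container_of() step falls outside the matched lines, so here is a sketch of that lookup, assuming the sc_cqe member initialized earlier:

static void example_wc_handler(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_send_ctxt *ctxt =
		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);

	/* ... return SQ credits, check wc->status, release ctxt ... */
}
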
331 * svc_rdma_post_send - Post a WR chain to the Send Queue
346 * %-ENOTCONN: The connection was lost
351 struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
352 struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
354 struct rpc_rdma_cid cid = ctxt->sc_cid;
355 int ret, sqecount = ctxt->sc_sqecount;
360 ib_dma_sync_single_for_device(rdma->sc_pd->device,
361 send_wr->sg_list[0].addr,
362 send_wr->sg_list[0].length,
366 while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
367 if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
378 wait_event(rdma->sc_send_wait,
379 atomic_read(&rdma->sc_sq_avail) > 0);
385 ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
388 svc_xprt_deferred_close(&rdma->sc_xprt);
400 return -ENOTCONN;
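
The elided part of the loop above puts the SQE reservation back and sleeps until Send completions return credits; a hedged sketch of that accounting, not the file's exact code:

static int example_reserve_sqes(struct svcxprt_rdma *rdma, int count)
{
	while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
		if (atomic_sub_return(count, &rdma->sc_sq_avail) >= 0)
			return 0;		/* reservation held */

		/* Overcommitted: give the reservation back and wait for
		 * the Send completion handler to return credits. */
		atomic_add(count, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > 0);
	}
	return -ENOTCONN;
}
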
404 * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
410 * %-EMSGSIZE on XDR buffer overflow
414 /* RPC-over-RDMA version 1 replies never have a Read list. */
415 return xdr_stream_encode_item_absent(&sctxt->sc_stream);
419 * svc_rdma_encode_write_segment - Encode one Write segment
421 * @chunk: Write chunk to push
422 * @remaining: remaining bytes of the payload left in the Write chunk
427 * that was consumed by the Write segment, and updates @remaining
428 * %-EMSGSIZE on XDR buffer overflow
434 const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
439 p = xdr_reserve_space(&sctxt->sc_stream, len);
441 return -EMSGSIZE;
443 length = min_t(u32, *remaining, segment->rs_length);
444 *remaining -= length;
445 xdr_encode_rdma_segment(p, segment->rs_handle, length,
446 segment->rs_offset);
447 trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
448 segment->rs_offset);
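
For reference, each Write segment on the wire is four XDR words: a 32-bit handle (rkey), a 32-bit length, and a 64-bit offset (RFC 8166). A sketch of what xdr_encode_rdma_segment() writes into the reserved space:

static void example_encode_segment(__be32 *p, u32 handle, u32 length, u64 offset)
{
	*p++ = cpu_to_be32(handle);	/* rdma_segment.handle */
	*p++ = cpu_to_be32(length);	/* rdma_segment.length */
	xdr_encode_hyper(p, offset);	/* rdma_segment.offset, 64 bits */
}
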
453 * svc_rdma_encode_write_chunk - Encode one Write chunk
455 * @chunk: Write chunk to push
457 * Copy a Write chunk from the Call transport header to the
463 * that was consumed by the Write chunk
464 * %-EMSGSIZE on XDR buffer overflow
469 u32 remaining = chunk->ch_payload_length;
474 ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
479 ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
484 for (segno = 0; segno < chunk->ch_segcount; segno++) {
495 * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
501 * that was consumed by the Reply's Write list
502 * %-EMSGSIZE on XDR buffer overflow
511 pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
518 /* Terminate the Write list */
519 ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
527 * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
535 * %-EMSGSIZE on XDR buffer overflow
536 * %-E2BIG if the RPC message is larger than the Reply chunk
545 if (pcl_is_empty(&rctxt->rc_reply_pcl))
546 return xdr_stream_encode_item_absent(&sctxt->sc_stream);
548 chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
549 if (length > chunk->ch_length)
550 return -E2BIG;
552 chunk->ch_payload_length = length;
562 * svc_rdma_page_dma_map - DMA map one page
564 * @page: struct page to DMA map
569 * %0 if DMA mapping was successful
570 * %-EIO if the page cannot be DMA mapped
576 struct svcxprt_rdma *rdma = args->md_rdma;
577 struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
578 struct ib_device *dev = rdma->sc_cm_id->device;
581 ++ctxt->sc_cur_sge_no;
587 trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len);
588 ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
589 ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
590 ctxt->sc_send_wr.num_sge++;
594 trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len);
595 return -EIO;
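
The mapping step itself falls outside the matched lines; it follows the usual ib_dma_map_page()/ib_dma_mapping_error() pattern, sketched here under that assumption:

	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(dev, dma_addr))
		goto out_maperr;
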
599 * svc_rdma_iov_dma_map - DMA map an iovec
601 * @iov: kvec to DMA map
604 * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
607 * %0 if DMA mapping was successful
608 * %-EIO if the iovec cannot be DMA mapped
612 if (!iov->iov_len)
614 return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
615 offset_in_page(iov->iov_base),
616 iov->iov_len);
620 * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
625 * %0 if DMA mapping was successful
626 * %-EIO if DMA mapping failed
628 * On failure, any DMA mappings that have already been done must be
638 ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
642 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
643 pageoff = offset_in_page(xdr->page_base);
644 remaining = xdr->page_len;
646 len = min_t(u32, PAGE_SIZE - pageoff, remaining);
652 remaining -= len;
656 ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
660 return xdr->len;
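
The page-list walk above is only partially matched; its body maps each page fragment and then advances to the next full page. A sketch of the whole loop, assuming svc_rdma_page_dma_map() behaves as documented earlier:

	while (remaining) {
		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
		ret = svc_rdma_page_dma_map(data, *ppages, pageoff, len);
		if (ret < 0)
			return ret;

		remaining -= len;
		pageoff = 0;		/* only the first page can start mid-page */
		ppages++;
	}
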
670 * svc_rdma_xb_count_sges - Count how many SGEs will be needed
684 if (xdr->head[0].iov_len)
685 ++args->pd_num_sges;
687 offset = offset_in_page(xdr->page_base);
688 remaining = xdr->page_len;
690 ++args->pd_num_sges;
691 remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
695 if (xdr->tail[0].iov_len)
696 ++args->pd_num_sges;
698 args->pd_length += xdr->len;
703 * svc_rdma_pull_up_needed - Determine whether to use pull-up
706 * @write_pcl: Write chunk list provided by client
710 * %true if pull-up must be used
720 .pd_length = sctxt->sc_hdrbuf.len,
732 return args.pd_num_sges >= rdma->sc_max_send_sges;
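
In other words, the estimate is roughly one SGE for the transport header, one for a non-empty head kvec, one per page touched by the page list, and one for a non-empty tail kvec; once that total reaches sc_max_send_sges, the Reply is pulled up (copied) into the single pre-mapped transport buffer rather than sent from many SGEs. As a hypothetical example, a device limit of 8 send SGEs cannot carry a Reply whose page list alone spans 9 pages, so such a Reply would be linearized.
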
736 * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
751 if (xdr->head[0].iov_len) {
752 memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
753 args->pd_dest += xdr->head[0].iov_len;
756 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
757 pageoff = offset_in_page(xdr->page_base);
758 remaining = xdr->page_len;
760 len = min_t(u32, PAGE_SIZE - pageoff, remaining);
761 memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
762 remaining -= len;
763 args->pd_dest += len;
768 if (xdr->tail[0].iov_len) {
769 memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
770 args->pd_dest += xdr->tail[0].iov_len;
773 args->pd_length += xdr->len;
778 * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
781 * @write_pcl: Write chunk list provided by client
791 * %0 if pull-up was successful
792 * %-EMSGSIZE if a buffer manipulation problem occurred
800 .pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
809 sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
814 /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
817 * @write_pcl: Write chunk list provided by client
822 * %0 if DMA mapping was successful.
823 * %-EMSGSIZE if a buffer manipulation problem occurred
824 * %-EIO if DMA mapping failed
839 /* Set up the (persistently-mapped) transport header SGE. */
840 sctxt->sc_send_wr.num_sge = 1;
841 sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
849 /* For pull-up, svc_rdma_post_send() will sync the transport header.
850 * No additional DMA mapping is necessary.
866 int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
868 ctxt->sc_page_count += pages;
870 ctxt->sc_pages[i] = rqstp->rq_respages[i];
871 rqstp->rq_respages[i] = NULL;
875 rqstp->rq_next_page = rqstp->rq_respages;
879 * via RDMA Send. The RPC-over-RDMA transport header is prepared
882 * Depending on whether a Write list or Reply chunk is present,
888 * - The Reply's transport header will never be larger than a page.
895 struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
898 ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
899 &rctxt->rc_reply_pcl, &rqstp->rq_res);
908 if (rctxt->rc_inv_rkey) {
909 send_wr->opcode = IB_WR_SEND_WITH_INV;
910 send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
912 send_wr->opcode = IB_WR_SEND;
919 * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
925 * Given the client-provided Read, Write, and Reply chunks, the
938 __be32 *rdma_argp = rctxt->rc_recv_buf;
941 rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
942 xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
943 sctxt->sc_xprt_buf, NULL);
945 p = xdr_reserve_space(&sctxt->sc_stream,
952 *p++ = rdma->sc_fc_credits;
956 case -EPROTONOSUPPORT:
957 p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
967 p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
976 sctxt->sc_send_wr.num_sge = 1;
977 sctxt->sc_send_wr.opcode = IB_WR_SEND;
978 sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
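
For reference, the error reply built above follows the RFC 8166 rdma_error encoding; a hypothetical layout sketch of the words involved:

/*
 * Hypothetical layout sketch (RFC 8166):
 *
 *	xid, vers, credits, rdma_error			<- fixed header words
 *	err_vers, rdma_vers_low, rdma_vers_high		<- 3 words for ERR_VERS
 * or
 *	err_chunk					<- 1 word for ERR_CHUNK
 */
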
988 * svc_rdma_sendto - Transmit an RPC reply
996 * %-ENOMEM if a resource shortage occurred (connection is lost),
997 * %-ENOTCONN if posting failed (connection is lost).
1001 struct svc_xprt *xprt = rqstp->rq_xprt;
1004 struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1005 __be32 *rdma_argp = rctxt->rc_recv_buf;
1011 ret = -ENOTCONN;
1015 ret = -ENOMEM;
1020 ret = -EMSGSIZE;
1021 p = xdr_reserve_space(&sctxt->sc_stream,
1026 ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
1031 if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
1032 ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
1033 &rctxt->rc_reply_pcl, sctxt,
1034 &rqstp->rq_res);
1042 *p++ = rdma->sc_fc_credits;
1043 *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
1061 if (ret != -E2BIG && ret != -EINVAL)
1075 svc_xprt_deferred_close(&rdma->sc_xprt);
1076 return -ENOTCONN;
1080 * svc_rdma_result_payload - special processing for a result payload
1082 * @offset: payload's byte offset in @rqstp->rq_res
1085 * Assign the passed-in result payload to the current Write chunk,
1086 * and advance cur_result_payload to the next Write chunk, if
1091 * %-E2BIG if the payload was larger than the Write chunk
1096 struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1099 chunk = rctxt->rc_cur_result_payload;
1102 rctxt->rc_cur_result_payload =
1103 pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
1105 if (length > chunk->ch_length)
1106 return -E2BIG;
1107 chunk->ch_position = offset;
1108 chunk->ch_payload_length = length;