xref: /linux/net/sunrpc/xprtrdma/svc_rdma_sendto.c (revision d16f060f3ee297424c0aba047b1d49208adb9318)
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2016-2018 Oracle. All rights reserved.
4  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
5  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
6  *
7  * This software is available to you under a choice of one of two
8  * licenses.  You may choose to be licensed under the terms of the GNU
9  * General Public License (GPL) Version 2, available from the file
10  * COPYING in the main directory of this source tree, or the BSD-type
11  * license below:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions
15  * are met:
16  *
17  *      Redistributions of source code must retain the above copyright
18  *      notice, this list of conditions and the following disclaimer.
19  *
20  *      Redistributions in binary form must reproduce the above
21  *      copyright notice, this list of conditions and the following
22  *      disclaimer in the documentation and/or other materials provided
23  *      with the distribution.
24  *
25  *      Neither the name of the Network Appliance, Inc. nor the names of
26  *      its contributors may be used to endorse or promote products
27  *      derived from this software without specific prior written
28  *      permission.
29  *
30  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
31  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
32  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
33  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
34  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
36  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
37  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
38  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41  *
42  * Author: Tom Tucker <tom@opengridcomputing.com>
43  */
44 
45 /* Operation
46  *
47  * The main entry point is svc_rdma_sendto. This is called by the
48  * RPC server when an RPC Reply is ready to be transmitted to a client.
49  *
50  * The passed-in svc_rqst contains a struct xdr_buf which holds an
51  * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
52  * transport header, post all Write WRs needed for this Reply, then post
53  * a Send WR conveying the transport header and the RPC message itself to
54  * the client.
55  *
56  * svc_rdma_sendto must fully transmit the Reply before returning, as
57  * the svc_rqst will be recycled as soon as sendto returns. Remaining
58  * resources referred to by the svc_rqst are also recycled at that time.
59  * Therefore any resources that must remain longer must be detached
60  * from the svc_rqst and released later.
61  *
62  * Page Management
63  *
64  * The I/O that performs Reply transmission is asynchronous, and may
65  * complete well after sendto returns. Thus pages under I/O must be
66  * removed from the svc_rqst before sendto returns.
67  *
68  * The logic here depends on Send Queue and completion ordering. Since
69  * the Send WR is always posted last, it will always complete last. Thus
70  * when it completes, it is guaranteed that all previous Write WRs have
71  * also completed.
72  *
73  * Write WRs are constructed and posted. Each Write segment gets its own
74  * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
75  * DMA-unmap the pages under I/O for that Write segment. The Write
76  * completion handler does not release any pages.
77  *
78  * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
79  * The ownership of all of the Reply's pages are transferred into that
80  * ctxt, the Send WR is posted, and sendto returns.
81  *
82  * The svc_rdma_send_ctxt is presented when the Send WR completes. The
83  * Send completion handler finally releases the Reply's pages.
84  *
85  * This mechanism also assumes that completions on the transport's Send
86  * Completion Queue do not run in parallel. Otherwise a Write completion
87  * and Send completion running at the same time could release pages that
88  * are still DMA-mapped.
89  *
90  * Error Handling
91  *
92  * - If the Send WR is posted successfully, it will either complete
93  *   successfully, or get flushed. Either way, the Send completion
94  *   handler releases the Reply's pages.
95  * - If the Send WR cannot be posted, the forward path releases
96  *   the Reply's pages.
97  *
98  * This handles the case, without the use of page reference counting,
99  * where two different Write segments send portions of the same page.
100  */
101 
102 #include <linux/spinlock.h>
103 #include <linux/unaligned.h>
104 
105 #include <rdma/ib_verbs.h>
106 #include <rdma/rdma_cm.h>
107 
108 #include <linux/sunrpc/debug.h>
109 #include <linux/sunrpc/svc_rdma.h>
110 
111 #include "xprt_rdma.h"
112 #include <trace/events/rpcrdma.h>
113 
114 static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
115 
/* svc_rdma_send_ctxt_alloc - Allocate and initialize one send_ctxt
 * @rdma: controlling transport
 *
 * Allocates the ctxt (with a flexible sc_sges array sized to the
 * device's SGE limit), a page-pointer array, and a persistently
 * DMA-mapped buffer for the RPC-over-RDMA transport header. The
 * header mapping in sc_sges[0] lasts for the lifetime of the ctxt
 * and is undone only in svc_rdma_send_ctxts_destroy().
 *
 * Returns the new ctxt, or NULL on allocation or DMA-mapping failure.
 */
static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
	struct ib_device *device = rdma->sc_cm_id->device;
	int node = ibdev_to_node(device);
	struct svc_rdma_send_ctxt *ctxt;
	unsigned long pages;
	dma_addr_t addr;
	void *buffer;
	int i;

	ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
			    GFP_KERNEL, node);
	if (!ctxt)
		goto fail0;
	pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server);
	ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *),
				      GFP_KERNEL, node);
	if (!ctxt->sc_pages)
		goto fail1;
	ctxt->sc_maxpages = pages;
	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
	if (!buffer)
		goto fail2;
	addr = ib_dma_map_single(device, buffer, rdma->sc_max_req_size,
				 DMA_TO_DEVICE);
	if (ib_dma_mapping_error(device, addr))
		goto fail3;

	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);

	ctxt->sc_rdma = rdma;
	ctxt->sc_send_wr.next = NULL;
	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
	ctxt->sc_cqe.done = svc_rdma_wc_send;
	INIT_LIST_HEAD(&ctxt->sc_write_info_list);
	ctxt->sc_xprt_buf = buffer;
	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
		     rdma->sc_max_req_size);
	/* SGE 0 always refers to the persistently-mapped header buffer */
	ctxt->sc_sges[0].addr = addr;

	/* Every SGE uses the PD's local DMA lkey */
	for (i = 0; i < rdma->sc_max_send_sges; i++)
		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
	return ctxt;

fail3:
	kfree(buffer);
fail2:
	kfree(ctxt->sc_pages);
fail1:
	kfree(ctxt);
fail0:
	return NULL;
}
172 
173 /**
174  * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
175  * @rdma: svcxprt_rdma being torn down
176  *
177  */
178 void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
179 {
180 	struct ib_device *device = rdma->sc_cm_id->device;
181 	struct svc_rdma_send_ctxt *ctxt;
182 	struct llist_node *node;
183 
184 	while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
185 		ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
186 		ib_dma_unmap_single(device, ctxt->sc_sges[0].addr,
187 				    rdma->sc_max_req_size, DMA_TO_DEVICE);
188 		kfree(ctxt->sc_xprt_buf);
189 		kfree(ctxt->sc_pages);
190 		kfree(ctxt);
191 	}
192 }
193 
194 /**
195  * svc_rdma_send_ctxt_get - Get a free send_ctxt
196  * @rdma: controlling svcxprt_rdma
197  *
198  * Returns a ready-to-use send_ctxt, or NULL if none are
199  * available and a fresh one cannot be allocated.
200  */
201 struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
202 {
203 	struct svc_rdma_send_ctxt *ctxt;
204 	struct llist_node *node;
205 
206 	spin_lock(&rdma->sc_send_lock);
207 	node = llist_del_first(&rdma->sc_send_ctxts);
208 	spin_unlock(&rdma->sc_send_lock);
209 	if (!node)
210 		goto out_empty;
211 
212 	ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
213 
214 out:
215 	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
216 	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
217 			ctxt->sc_xprt_buf, NULL);
218 
219 	svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
220 	ctxt->sc_send_wr.num_sge = 0;
221 	ctxt->sc_cur_sge_no = 0;
222 	ctxt->sc_page_count = 0;
223 	ctxt->sc_wr_chain = &ctxt->sc_send_wr;
224 	ctxt->sc_sqecount = 1;
225 
226 	return ctxt;
227 
228 out_empty:
229 	ctxt = svc_rdma_send_ctxt_alloc(rdma);
230 	if (!ctxt)
231 		return NULL;
232 	goto out;
233 }
234 
235 static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
236 				       struct svc_rdma_send_ctxt *ctxt)
237 {
238 	struct ib_device *device = rdma->sc_cm_id->device;
239 	unsigned int i;
240 
241 	svc_rdma_write_chunk_release(rdma, ctxt);
242 	svc_rdma_reply_chunk_release(rdma, ctxt);
243 
244 	if (ctxt->sc_page_count)
245 		release_pages(ctxt->sc_pages, ctxt->sc_page_count);
246 
247 	/* The first SGE contains the transport header, which
248 	 * remains mapped until @ctxt is destroyed.
249 	 */
250 	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
251 		trace_svcrdma_dma_unmap_page(&ctxt->sc_cid,
252 					     ctxt->sc_sges[i].addr,
253 					     ctxt->sc_sges[i].length);
254 		ib_dma_unmap_page(device,
255 				  ctxt->sc_sges[i].addr,
256 				  ctxt->sc_sges[i].length,
257 				  DMA_TO_DEVICE);
258 	}
259 
260 	llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
261 }
262 
263 static void svc_rdma_send_ctxt_put_async(struct work_struct *work)
264 {
265 	struct svc_rdma_send_ctxt *ctxt;
266 
267 	ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work);
268 	svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt);
269 }
270 
/**
 * svc_rdma_send_ctxt_put - Return send_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 * Pages left in sc_pages are DMA unmapped and released.
 *
 * The actual release work is handed off to svcrdma_wq so that it
 * runs in process context rather than directly in the caller's
 * (e.g. Send completion) context.
 */
void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
			    struct svc_rdma_send_ctxt *ctxt)
{
	INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async);
	queue_work(svcrdma_wq, &ctxt->sc_work);
}
284 
/**
 * svc_rdma_wake_send_waiters - manage Send Queue accounting
 * @rdma: controlling transport
 * @avail: Number of additional SQEs that are now available
 *
 */
void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
{
	atomic_add(avail, &rdma->sc_sq_avail);
	/* Order the sc_sq_avail update before the waitqueue_active()
	 * check so a waiter that is just enqueuing is not missed.
	 */
	smp_mb__after_atomic();
	if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
		wake_up(&rdma->sc_send_wait);
}
298 
/**
 * svc_rdma_sq_wait - Wait for SQ slots using fair queuing
 * @rdma: controlling transport
 * @cid: completion ID for tracing
 * @sqecount: number of SQ entries needed
 *
 * A ticket-based system ensures fair ordering when multiple threads
 * wait for Send Queue capacity. Each waiter takes a ticket and is
 * served in order, preventing starvation.
 *
 * Protocol invariant: every ticket holder must increment
 * sc_sq_ticket_tail exactly once, whether the reservation
 * succeeds or the connection closes. Failing to advance the
 * tail stalls all subsequent waiters.
 *
 * The ticket counters are signed 32-bit atomics. After
 * wrapping through INT_MAX, the equality check
 * (tail == ticket) remains correct because both counters
 * advance monotonically and the comparison uses exact
 * equality rather than relational operators.
 *
 * Return values:
 *   %0: SQ slots were reserved successfully
 *   %-ENOTCONN: The connection was lost
 */
int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
		     const struct rpc_rdma_cid *cid, int sqecount)
{
	int ticket;

	/* Fast path: try to reserve SQ slots without waiting.
	 *
	 * A failed reservation temporarily understates sc_sq_avail
	 * until the compensating atomic_add restores it. A Send
	 * completion arriving in that window sees a lower count
	 * than reality, but the value self-corrects once the add
	 * completes. No ordering guarantee is needed here because
	 * the slow path serializes all contended waiters.
	 */
	if (likely(atomic_sub_return(sqecount, &rdma->sc_sq_avail) >= 0))
		return 0;
	atomic_add(sqecount, &rdma->sc_sq_avail);

	/* Slow path: take a ticket and wait in line */
	ticket = atomic_fetch_inc(&rdma->sc_sq_ticket_head);

	percpu_counter_inc(&svcrdma_stat_sq_starve);
	trace_svcrdma_sq_full(rdma, cid);

	/* Wait until all earlier tickets have been served */
	wait_event(rdma->sc_sq_ticket_wait,
		   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
		   atomic_read(&rdma->sc_sq_ticket_tail) == ticket);
	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
		goto out_close;

	/* It's our turn. Wait for enough SQ slots to be available.
	 * Fast-path callers can still consume slots concurrently,
	 * so re-check after each wakeup.
	 */
	while (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
		atomic_add(sqecount, &rdma->sc_sq_avail);

		wait_event(rdma->sc_send_wait,
			   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
			   atomic_read(&rdma->sc_sq_avail) >= sqecount);
		if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
			goto out_close;
	}

	/* Slots reserved successfully. Let the next waiter proceed. */
	atomic_inc(&rdma->sc_sq_ticket_tail);
	wake_up(&rdma->sc_sq_ticket_wait);
	trace_svcrdma_sq_retry(rdma, cid);
	return 0;

out_close:
	/* Advance the tail even on failure so that later ticket
	 * holders are not stalled (see invariant above).
	 */
	atomic_inc(&rdma->sc_sq_ticket_tail);
	wake_up(&rdma->sc_sq_ticket_wait);
	return -ENOTCONN;
}
377 
378 /**
379  * svc_rdma_post_send_err - Handle ib_post_send failure
380  * @rdma: controlling transport
381  * @cid: completion ID for tracing
382  * @bad_wr: first WR that was not posted
383  * @first_wr: first WR in the chain
384  * @sqecount: number of SQ entries that were reserved
385  * @ret: error code from ib_post_send
386  *
387  * Return values:
388  *   %0: At least one WR was posted; a completion handles cleanup
389  *   %-ENOTCONN: No WRs were posted; SQ slots are released
390  */
391 int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
392 			   const struct rpc_rdma_cid *cid,
393 			   const struct ib_send_wr *bad_wr,
394 			   const struct ib_send_wr *first_wr,
395 			   int sqecount, int ret)
396 {
397 	trace_svcrdma_sq_post_err(rdma, cid, ret);
398 	svc_xprt_deferred_close(&rdma->sc_xprt);
399 
400 	/* If even one WR was posted, a Send completion will
401 	 * return the reserved SQ slots.
402 	 */
403 	if (bad_wr != first_wr)
404 		return 0;
405 
406 	svc_rdma_wake_send_waiters(rdma, sqecount);
407 	return -ENOTCONN;
408 }
409 
410 /**
411  * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
412  * @cq: Completion Queue context
413  * @wc: Work Completion object
414  *
415  * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
416  * the Send completion handler could be running.
417  */
418 static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
419 {
420 	struct svcxprt_rdma *rdma = cq->cq_context;
421 	struct ib_cqe *cqe = wc->wr_cqe;
422 	struct svc_rdma_send_ctxt *ctxt =
423 		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
424 
425 	svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);
426 
427 	if (unlikely(wc->status != IB_WC_SUCCESS))
428 		goto flushed;
429 
430 	trace_svcrdma_wc_send(&ctxt->sc_cid);
431 	svc_rdma_send_ctxt_put(rdma, ctxt);
432 	return;
433 
434 flushed:
435 	if (wc->status != IB_WC_WR_FLUSH_ERR)
436 		trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
437 	else
438 		trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
439 	svc_rdma_send_ctxt_put(rdma, ctxt);
440 	svc_xprt_deferred_close(&rdma->sc_xprt);
441 }
442 
/**
 * svc_rdma_post_send - Post a WR chain to the Send Queue
 * @rdma: transport context
 * @ctxt: WR chain to post
 *
 * Copy fields in @ctxt to stack variables in order to guarantee
 * that these values remain available after the ib_post_send() call.
 * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
 *
 * Return values:
 *   %0: @ctxt's WR chain was posted successfully
 *   %-ENOTCONN: The connection was lost
 */
int svc_rdma_post_send(struct svcxprt_rdma *rdma,
		       struct svc_rdma_send_ctxt *ctxt)
{
	/* These stack copies must be taken before posting; once
	 * ib_post_send() returns, @ctxt may already be released.
	 */
	struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
	struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
	const struct ib_send_wr *bad_wr = first_wr;
	struct rpc_rdma_cid cid = ctxt->sc_cid;
	int ret, sqecount = ctxt->sc_sqecount;

	might_sleep();

	/* Sync the transport header buffer */
	ib_dma_sync_single_for_device(rdma->sc_cm_id->device,
				      send_wr->sg_list[0].addr,
				      send_wr->sg_list[0].length,
				      DMA_TO_DEVICE);

	/* Reserve SQ capacity for the whole chain before posting */
	ret = svc_rdma_sq_wait(rdma, &cid, sqecount);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_send(ctxt);
	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
	if (ret)
		return svc_rdma_post_send_err(rdma, &cid, bad_wr,
					      first_wr, sqecount, ret);
	return 0;
}
484 
/**
 * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
 * @sctxt: Send context for the RPC Reply
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Reply Read list
 *   %-EMSGSIZE on XDR buffer overflow
 */
static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{
	/* RPC-over-RDMA version 1 replies never have a Read list,
	 * so this list is always encoded as a single "absent" item.
	 */
	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
}
499 
500 /**
501  * svc_rdma_encode_write_segment - Encode one Write segment
502  * @sctxt: Send context for the RPC Reply
503  * @chunk: Write chunk to push
504  * @remaining: remaining bytes of the payload left in the Write chunk
505  * @segno: which segment in the chunk
506  *
507  * Return values:
508  *   On success, returns length in bytes of the Reply XDR buffer
509  *   that was consumed by the Write segment, and updates @remaining
510  *   %-EMSGSIZE on XDR buffer overflow
511  */
512 static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
513 					     const struct svc_rdma_chunk *chunk,
514 					     u32 *remaining, unsigned int segno)
515 {
516 	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
517 	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
518 	u32 length;
519 	__be32 *p;
520 
521 	p = xdr_reserve_space(&sctxt->sc_stream, len);
522 	if (!p)
523 		return -EMSGSIZE;
524 
525 	length = min_t(u32, *remaining, segment->rs_length);
526 	*remaining -= length;
527 	xdr_encode_rdma_segment(p, segment->rs_handle, length,
528 				segment->rs_offset);
529 	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
530 				  segment->rs_offset);
531 	return len;
532 }
533 
534 /**
535  * svc_rdma_encode_write_chunk - Encode one Write chunk
536  * @sctxt: Send context for the RPC Reply
537  * @chunk: Write chunk to push
538  *
539  * Copy a Write chunk from the Call transport header to the
540  * Reply transport header. Update each segment's length field
541  * to reflect the number of bytes written in that segment.
542  *
543  * Return values:
544  *   On success, returns length in bytes of the Reply XDR buffer
545  *   that was consumed by the Write chunk
546  *   %-EMSGSIZE on XDR buffer overflow
547  */
548 static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
549 					   const struct svc_rdma_chunk *chunk)
550 {
551 	u32 remaining = chunk->ch_payload_length;
552 	unsigned int segno;
553 	ssize_t len, ret;
554 
555 	len = 0;
556 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
557 	if (ret < 0)
558 		return ret;
559 	len += ret;
560 
561 	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
562 	if (ret < 0)
563 		return ret;
564 	len += ret;
565 
566 	for (segno = 0; segno < chunk->ch_segcount; segno++) {
567 		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
568 		if (ret < 0)
569 			return ret;
570 		len += ret;
571 	}
572 
573 	return len;
574 }
575 
576 /**
577  * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
578  * @rctxt: Reply context with information about the RPC Call
579  * @sctxt: Send context for the RPC Reply
580  *
581  * Return values:
582  *   On success, returns length in bytes of the Reply XDR buffer
583  *   that was consumed by the Reply's Write list
584  *   %-EMSGSIZE on XDR buffer overflow
585  */
586 static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
587 					  struct svc_rdma_send_ctxt *sctxt)
588 {
589 	struct svc_rdma_chunk *chunk;
590 	ssize_t len, ret;
591 
592 	len = 0;
593 	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
594 		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
595 		if (ret < 0)
596 			return ret;
597 		len += ret;
598 	}
599 
600 	/* Terminate the Write list */
601 	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
602 	if (ret < 0)
603 		return ret;
604 
605 	return len + ret;
606 }
607 
608 /**
609  * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
610  * @rctxt: Reply context with information about the RPC Call
611  * @sctxt: Send context for the RPC Reply
612  * @length: size in bytes of the payload in the Reply chunk
613  *
614  * Return values:
615  *   On success, returns length in bytes of the Reply XDR buffer
616  *   that was consumed by the Reply's Reply chunk
617  *   %-EMSGSIZE on XDR buffer overflow
618  *   %-E2BIG if the RPC message is larger than the Reply chunk
619  */
620 static ssize_t
621 svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
622 			    struct svc_rdma_send_ctxt *sctxt,
623 			    unsigned int length)
624 {
625 	struct svc_rdma_chunk *chunk;
626 
627 	if (pcl_is_empty(&rctxt->rc_reply_pcl))
628 		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
629 
630 	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
631 	if (length > chunk->ch_length)
632 		return -E2BIG;
633 
634 	chunk->ch_payload_length = length;
635 	return svc_rdma_encode_write_chunk(sctxt, chunk);
636 }
637 
/* Argument bundle threaded through the xdr_buf iteration callbacks
 * while DMA-mapping an RPC Reply message.
 */
struct svc_rdma_map_data {
	struct svcxprt_rdma		*md_rdma;	/* controlling transport */
	struct svc_rdma_send_ctxt	*md_ctxt;	/* Send WR being built */
};
642 
/**
 * svc_rdma_page_dma_map - DMA map one page
 * @data: pointer to arguments
 * @page: struct page to DMA map
 * @offset: offset into the page
 * @len: number of bytes to map
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if the page cannot be DMA mapped
 */
static int svc_rdma_page_dma_map(void *data, struct page *page,
				 unsigned long offset, unsigned int len)
{
	struct svc_rdma_map_data *args = data;
	struct svcxprt_rdma *rdma = args->md_rdma;
	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
	struct ib_device *dev = rdma->sc_cm_id->device;
	dma_addr_t dma_addr;

	/* Advance to the next SGE before mapping; SGE 0 is always
	 * the transport header, so payload mapping starts at SGE 1.
	 */
	++ctxt->sc_cur_sge_no;

	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(dev, dma_addr))
		goto out_maperr;

	trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len);
	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
	/* num_sge is bumped only on success, so a failed mapping is
	 * never included in the posted Send WR.
	 */
	ctxt->sc_send_wr.num_sge++;
	return 0;

out_maperr:
	trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len);
	return -EIO;
}
679 
680 /**
681  * svc_rdma_iov_dma_map - DMA map an iovec
682  * @data: pointer to arguments
683  * @iov: kvec to DMA map
684  *
685  * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
686  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
687  *
688  * Returns:
689  *   %0 if DMA mapping was successful
690  *   %-EIO if the iovec cannot be DMA mapped
691  */
692 static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
693 {
694 	if (!iov->iov_len)
695 		return 0;
696 	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
697 				     offset_in_page(iov->iov_base),
698 				     iov->iov_len);
699 }
700 
701 /**
702  * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
703  * @xdr: xdr_buf containing portion of an RPC message to transmit
704  * @data: pointer to arguments
705  *
706  * Returns:
707  *   %0 if DMA mapping was successful
708  *   %-EIO if DMA mapping failed
709  *
710  * On failure, any DMA mappings that have been already done must be
711  * unmapped by the caller.
712  */
713 static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
714 {
715 	unsigned int len, remaining;
716 	unsigned long pageoff;
717 	struct page **ppages;
718 	int ret;
719 
720 	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
721 	if (ret < 0)
722 		return ret;
723 
724 	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
725 	pageoff = offset_in_page(xdr->page_base);
726 	remaining = xdr->page_len;
727 	while (remaining) {
728 		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
729 
730 		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
731 		if (ret < 0)
732 			return ret;
733 
734 		remaining -= len;
735 		pageoff = 0;
736 	}
737 
738 	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
739 	if (ret < 0)
740 		return ret;
741 
742 	return xdr->len;
743 }
744 
/* Accumulator threaded through the xdr_buf iteration callbacks
 * when deciding on, and performing, message pull-up.
 */
struct svc_rdma_pullup_data {
	u8		*pd_dest;	/* next byte to write during linearize */
	unsigned int	pd_length;	/* total bytes accumulated so far */
	unsigned int	pd_num_sges;	/* SGEs needed to send without pull-up */
};
750 
751 /**
752  * svc_rdma_xb_count_sges - Count how many SGEs will be needed
753  * @xdr: xdr_buf containing portion of an RPC message to transmit
754  * @data: pointer to arguments
755  *
756  * Returns:
757  *   Number of SGEs needed to Send the contents of @xdr inline
758  */
759 static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
760 				  void *data)
761 {
762 	struct svc_rdma_pullup_data *args = data;
763 	unsigned int remaining;
764 	unsigned long offset;
765 
766 	if (xdr->head[0].iov_len)
767 		++args->pd_num_sges;
768 
769 	offset = offset_in_page(xdr->page_base);
770 	remaining = xdr->page_len;
771 	while (remaining) {
772 		++args->pd_num_sges;
773 		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
774 		offset = 0;
775 	}
776 
777 	if (xdr->tail[0].iov_len)
778 		++args->pd_num_sges;
779 
780 	args->pd_length += xdr->len;
781 	return 0;
782 }
783 
784 /**
785  * svc_rdma_pull_up_needed - Determine whether to use pull-up
786  * @rdma: controlling transport
787  * @sctxt: send_ctxt for the Send WR
788  * @write_pcl: Write chunk list provided by client
789  * @xdr: xdr_buf containing RPC message to transmit
790  *
791  * Returns:
792  *   %true if pull-up must be used
793  *   %false otherwise
794  */
795 static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
796 				    const struct svc_rdma_send_ctxt *sctxt,
797 				    const struct svc_rdma_pcl *write_pcl,
798 				    const struct xdr_buf *xdr)
799 {
800 	/* Resources needed for the transport header */
801 	struct svc_rdma_pullup_data args = {
802 		.pd_length	= sctxt->sc_hdrbuf.len,
803 		.pd_num_sges	= 1,
804 	};
805 	int ret;
806 
807 	ret = pcl_process_nonpayloads(write_pcl, xdr,
808 				      svc_rdma_xb_count_sges, &args);
809 	if (ret < 0)
810 		return false;
811 
812 	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
813 		return true;
814 	return args.pd_num_sges >= rdma->sc_max_send_sges;
815 }
816 
817 /**
818  * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
819  * @xdr: xdr_buf containing portion of an RPC message to copy
820  * @data: pointer to arguments
821  *
822  * Returns:
823  *   Always zero.
824  */
825 static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
826 				 void *data)
827 {
828 	struct svc_rdma_pullup_data *args = data;
829 	unsigned int len, remaining;
830 	unsigned long pageoff;
831 	struct page **ppages;
832 
833 	if (xdr->head[0].iov_len) {
834 		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
835 		args->pd_dest += xdr->head[0].iov_len;
836 	}
837 
838 	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
839 	pageoff = offset_in_page(xdr->page_base);
840 	remaining = xdr->page_len;
841 	while (remaining) {
842 		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
843 		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
844 		remaining -= len;
845 		args->pd_dest += len;
846 		pageoff = 0;
847 		ppages++;
848 	}
849 
850 	if (xdr->tail[0].iov_len) {
851 		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
852 		args->pd_dest += xdr->tail[0].iov_len;
853 	}
854 
855 	args->pd_length += xdr->len;
856 	return 0;
857 }
858 
859 /**
860  * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
861  * @rdma: controlling transport
862  * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
863  * @write_pcl: Write chunk list provided by client
864  * @xdr: prepared xdr_buf containing RPC message
865  *
866  * The device is not capable of sending the reply directly.
867  * Assemble the elements of @xdr into the transport header buffer.
868  *
869  * Assumptions:
870  *  pull_up_needed has determined that @xdr will fit in the buffer.
871  *
872  * Returns:
873  *   %0 if pull-up was successful
874  *   %-EMSGSIZE if a buffer manipulation problem occurred
875  */
876 static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
877 				      struct svc_rdma_send_ctxt *sctxt,
878 				      const struct svc_rdma_pcl *write_pcl,
879 				      const struct xdr_buf *xdr)
880 {
881 	struct svc_rdma_pullup_data args = {
882 		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
883 	};
884 	int ret;
885 
886 	ret = pcl_process_nonpayloads(write_pcl, xdr,
887 				      svc_rdma_xb_linearize, &args);
888 	if (ret < 0)
889 		return ret;
890 
891 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
892 	trace_svcrdma_send_pullup(sctxt, args.pd_length);
893 	return 0;
894 }
895 
/**
 * svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
 * @rdma: controlling transport
 * @sctxt: send_ctxt for the Send WR
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @xdr: prepared xdr_buf containing RPC message
 *
 * Returns:
 *   %0 if DMA mapping was successful.
 *   %-EMSGSIZE if a buffer manipulation problem occurred
 *   %-EIO if DMA mapping failed
 *
 * The Send WR's num_sge field is set in all cases.
 */
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
			   struct svc_rdma_send_ctxt *sctxt,
			   const struct svc_rdma_pcl *write_pcl,
			   const struct svc_rdma_pcl *reply_pcl,
			   const struct xdr_buf *xdr)
{
	struct svc_rdma_map_data args = {
		.md_rdma	= rdma,
		.md_ctxt	= sctxt,
	};

	/* Set up the (persistently-mapped) transport header SGE. */
	sctxt->sc_send_wr.num_sge = 1;
	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;

	/* If there is a Reply chunk, nothing follows the transport
	 * header, so there is nothing to map.
	 */
	if (!pcl_is_empty(reply_pcl))
		return 0;

	/* For pull-up, svc_rdma_send() will sync the transport header.
	 * No additional DMA mapping is necessary.
	 */
	if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr))
		return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr);

	/* Otherwise, DMA-map each non-payload region of @xdr into
	 * its own SGE following the header.
	 */
	return pcl_process_nonpayloads(write_pcl, xdr,
				       svc_rdma_xb_dma_map, &args);
}
940 
941 /* The svc_rqst and all resources it owns are released as soon as
942  * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
943  * so they are released only after Send completion, and not by
944  * svc_rqst_release_pages().
945  */
946 static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
947 				   struct svc_rdma_send_ctxt *ctxt)
948 {
949 	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
950 
951 	ctxt->sc_page_count += pages;
952 	for (i = 0; i < pages; i++) {
953 		ctxt->sc_pages[i] = rqstp->rq_respages[i];
954 		rqstp->rq_respages[i] = NULL;
955 	}
956 }
957 
958 /* Prepare the portion of the RPC Reply that will be transmitted
959  * via RDMA Send. The RPC-over-RDMA transport header is prepared
960  * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
961  *
962  * Depending on whether a Write list or Reply chunk is present,
963  * the server may Send all, a portion of, or none of the xdr_buf.
964  * In the latter case, only the transport header (sc_sges[0]) is
965  * transmitted.
966  *
967  * Assumptions:
968  * - The Reply's transport header will never be larger than a page.
969  */
970 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
971 				   struct svc_rdma_send_ctxt *sctxt,
972 				   const struct svc_rdma_recv_ctxt *rctxt,
973 				   struct svc_rqst *rqstp)
974 {
975 	struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
976 	int ret;
977 
978 	ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
979 				     &rctxt->rc_reply_pcl, &rqstp->rq_res);
980 	if (ret < 0)
981 		return ret;
982 
983 	/* Transfer pages involved in RDMA Writes to the sctxt's
984 	 * page array. Completion handling releases these pages.
985 	 */
986 	svc_rdma_save_io_pages(rqstp, sctxt);
987 
988 	if (rctxt->rc_inv_rkey) {
989 		send_wr->opcode = IB_WR_SEND_WITH_INV;
990 		send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
991 	} else {
992 		send_wr->opcode = IB_WR_SEND;
993 	}
994 
995 	return svc_rdma_post_send(rdma, sctxt);
996 }
997 
998 /**
999  * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
1000  * @rdma: controlling transport context
1001  * @sctxt: Send context for the response
1002  * @rctxt: Receive context for incoming bad message
1003  * @status: negative errno indicating error that occurred
1004  *
1005  * Given the client-provided Read, Write, and Reply chunks, the
1006  * server was not able to parse the Call or form a complete Reply.
1007  * Return an RDMA_ERROR message so the client can retire the RPC
1008  * transaction.
1009  *
1010  * The caller does not have to release @sctxt. It is released by
1011  * Send completion, or by this function on error.
1012  */
1013 void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
1014 			     struct svc_rdma_send_ctxt *sctxt,
1015 			     struct svc_rdma_recv_ctxt *rctxt,
1016 			     int status)
1017 {
1018 	__be32 *rdma_argp = rctxt->rc_recv_buf;
1019 	__be32 *p;
1020 
1021 	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
1022 	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
1023 			sctxt->sc_xprt_buf, NULL);
1024 
1025 	p = xdr_reserve_space(&sctxt->sc_stream,
1026 			      rpcrdma_fixed_maxsz * sizeof(*p));
1027 	if (!p)
1028 		goto put_ctxt;
1029 
1030 	*p++ = *rdma_argp;
1031 	*p++ = *(rdma_argp + 1);
1032 	*p++ = rdma->sc_fc_credits;
1033 	*p = rdma_error;
1034 
1035 	switch (status) {
1036 	case -EPROTONOSUPPORT:
1037 		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
1038 		if (!p)
1039 			goto put_ctxt;
1040 
1041 		*p++ = err_vers;
1042 		*p++ = rpcrdma_version;
1043 		*p = rpcrdma_version;
1044 		trace_svcrdma_err_vers(*rdma_argp);
1045 		break;
1046 	default:
1047 		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
1048 		if (!p)
1049 			goto put_ctxt;
1050 
1051 		*p = err_chunk;
1052 		trace_svcrdma_err_chunk(*rdma_argp);
1053 	}
1054 
1055 	/* Remote Invalidation is skipped for simplicity. */
1056 	sctxt->sc_send_wr.num_sge = 1;
1057 	sctxt->sc_send_wr.opcode = IB_WR_SEND;
1058 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
1059 
1060 	/* Ensure only the error message is posted, not any previously
1061 	 * prepared Write chunk WRs.
1062 	 */
1063 	sctxt->sc_wr_chain = &sctxt->sc_send_wr;
1064 	sctxt->sc_sqecount = 1;
1065 	if (svc_rdma_post_send(rdma, sctxt))
1066 		goto put_ctxt;
1067 	return;
1068 
1069 put_ctxt:
1070 	svc_rdma_send_ctxt_put(rdma, sctxt);
1071 }
1072 
1073 /**
1074  * svc_rdma_sendto - Transmit an RPC reply
1075  * @rqstp: processed RPC request, reply XDR already in ::rq_res
1076  *
1077  * Any resources still associated with @rqstp are released upon return.
1078  * If no reply message was possible, the connection is closed.
1079  *
1080  * Returns:
1081  *	%0 if an RPC reply has been successfully posted,
1082  *	%-ENOMEM if a resource shortage occurred (connection is lost),
1083  *	%-ENOTCONN if posting failed (connection is lost).
1084  */
1085 int svc_rdma_sendto(struct svc_rqst *rqstp)
1086 {
1087 	struct svc_xprt *xprt = rqstp->rq_xprt;
1088 	struct svcxprt_rdma *rdma =
1089 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1090 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1091 	__be32 *rdma_argp = rctxt->rc_recv_buf;
1092 	struct svc_rdma_send_ctxt *sctxt;
1093 	unsigned int rc_size;
1094 	__be32 *p;
1095 	int ret;
1096 
1097 	ret = -ENOTCONN;
1098 	if (svc_xprt_is_dead(xprt))
1099 		goto drop_connection;
1100 
1101 	ret = -ENOMEM;
1102 	sctxt = svc_rdma_send_ctxt_get(rdma);
1103 	if (!sctxt)
1104 		goto drop_connection;
1105 
1106 	ret = -EMSGSIZE;
1107 	p = xdr_reserve_space(&sctxt->sc_stream,
1108 			      rpcrdma_fixed_maxsz * sizeof(*p));
1109 	if (!p)
1110 		goto put_ctxt;
1111 
1112 	ret = svc_rdma_prepare_write_list(rdma, rctxt, sctxt, &rqstp->rq_res);
1113 	if (ret < 0)
1114 		goto put_ctxt;
1115 
1116 	rc_size = 0;
1117 	if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
1118 		ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
1119 						   &rctxt->rc_reply_pcl, sctxt,
1120 						   &rqstp->rq_res);
1121 		if (ret < 0)
1122 			goto reply_chunk;
1123 		rc_size = ret;
1124 	}
1125 
1126 	*p++ = *rdma_argp;
1127 	*p++ = *(rdma_argp + 1);
1128 	*p++ = rdma->sc_fc_credits;
1129 	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
1130 
1131 	ret = svc_rdma_encode_read_list(sctxt);
1132 	if (ret < 0)
1133 		goto put_ctxt;
1134 	ret = svc_rdma_encode_write_list(rctxt, sctxt);
1135 	if (ret < 0)
1136 		goto put_ctxt;
1137 	ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size);
1138 	if (ret < 0)
1139 		goto put_ctxt;
1140 
1141 	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
1142 	if (ret < 0)
1143 		goto put_ctxt;
1144 	return 0;
1145 
1146 reply_chunk:
1147 	if (ret != -E2BIG && ret != -EINVAL)
1148 		goto put_ctxt;
1149 
1150 	/* Send completion releases payload pages that were part
1151 	 * of previously posted RDMA Writes.
1152 	 */
1153 	svc_rdma_save_io_pages(rqstp, sctxt);
1154 	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
1155 	return 0;
1156 
1157 put_ctxt:
1158 	svc_rdma_send_ctxt_put(rdma, sctxt);
1159 drop_connection:
1160 	trace_svcrdma_send_err(rqstp, ret);
1161 	svc_xprt_deferred_close(&rdma->sc_xprt);
1162 	return -ENOTCONN;
1163 }
1164 
1165 /**
1166  * svc_rdma_result_payload - special processing for a result payload
1167  * @rqstp: RPC transaction context
1168  * @offset: payload's byte offset in @rqstp->rq_res
1169  * @length: size of payload, in bytes
1170  *
1171  * Assign the passed-in result payload to the current Write chunk,
1172  * and advance to cur_result_payload to the next Write chunk, if
1173  * there is one.
1174  *
1175  * Return values:
1176  *   %0 if successful or nothing needed to be done
1177  *   %-E2BIG if the payload was larger than the Write chunk
1178  */
1179 int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
1180 			    unsigned int length)
1181 {
1182 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1183 	struct svc_rdma_chunk *chunk;
1184 
1185 	chunk = rctxt->rc_cur_result_payload;
1186 	if (!length || !chunk)
1187 		return 0;
1188 	rctxt->rc_cur_result_payload =
1189 		pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
1190 
1191 	if (length > chunk->ch_length)
1192 		return -E2BIG;
1193 	chunk->ch_position = offset;
1194 	chunk->ch_payload_length = length;
1195 	return 0;
1196 }
1197