xref: /linux/net/sunrpc/xprtrdma/svc_rdma_sendto.c (revision e5248a7426030db1e126363f72afdb3b71339a5c)
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2016-2018 Oracle. All rights reserved.
4  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
5  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
6  *
7  * This software is available to you under a choice of one of two
8  * licenses.  You may choose to be licensed under the terms of the GNU
9  * General Public License (GPL) Version 2, available from the file
10  * COPYING in the main directory of this source tree, or the BSD-type
11  * license below:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions
15  * are met:
16  *
17  *      Redistributions of source code must retain the above copyright
18  *      notice, this list of conditions and the following disclaimer.
19  *
20  *      Redistributions in binary form must reproduce the above
21  *      copyright notice, this list of conditions and the following
22  *      disclaimer in the documentation and/or other materials provided
23  *      with the distribution.
24  *
25  *      Neither the name of the Network Appliance, Inc. nor the names of
26  *      its contributors may be used to endorse or promote products
27  *      derived from this software without specific prior written
28  *      permission.
29  *
30  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
31  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
32  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
33  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
34  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
36  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
37  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
38  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41  *
42  * Author: Tom Tucker <tom@opengridcomputing.com>
43  */
44 
45 /* Operation
46  *
47  * The main entry point is svc_rdma_sendto. This is called by the
48  * RPC server when an RPC Reply is ready to be transmitted to a client.
49  *
50  * The passed-in svc_rqst contains a struct xdr_buf which holds an
51  * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
52  * transport header, post all Write WRs needed for this Reply, then post
53  * a Send WR conveying the transport header and the RPC message itself to
54  * the client.
55  *
56  * svc_rdma_sendto must fully transmit the Reply before returning, as
57  * the svc_rqst will be recycled as soon as sendto returns. Remaining
58  * resources referred to by the svc_rqst are also recycled at that time.
59  * Therefore any resources that must remain longer must be detached
60  * from the svc_rqst and released later.
61  *
62  * Page Management
63  *
64  * The I/O that performs Reply transmission is asynchronous, and may
65  * complete well after sendto returns. Thus pages under I/O must be
66  * removed from the svc_rqst before sendto returns.
67  *
68  * The logic here depends on Send Queue and completion ordering. Since
69  * the Send WR is always posted last, it will always complete last. Thus
70  * when it completes, it is guaranteed that all previous Write WRs have
71  * also completed.
72  *
73  * Write WRs are constructed and posted. Each Write segment gets its own
74  * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
75  * DMA-unmap the pages under I/O for that Write segment. The Write
76  * completion handler does not release any pages.
77  *
78  * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
79  * Ownership of all of the Reply's pages is transferred into that
80  * ctxt, the Send WR is posted, and sendto returns.
81  *
82  * The svc_rdma_send_ctxt is presented when the Send WR completes.
83  * The Send completion handler queues the send_ctxt onto the
84  * per-transport sc_send_release_list (a lock-free llist). The
85  * nfsd thread drains sc_send_release_list in xpo_release_ctxt
86  * between RPCs, DMA-unmapping SGEs, releasing chunk I/O
87  * resources and pages, and returning send_ctxts to the free
88  * list in a batch.
89  *
90  * Error Handling
91  *
92  * - If the Send WR is posted successfully, it will either complete
93  *   successfully, or get flushed. Either way, the Send completion
94  *   handler queues the send_ctxt for deferred release.
95  * - If the Send WR cannot be posted, the forward path releases the
96  *   Reply's pages.
97  *
98  * This handles the case, without the use of page reference counting,
99  * where two different Write segments send portions of the same page.
100  */
101 
102 #include <linux/spinlock.h>
103 #include <linux/unaligned.h>
104 
105 #include <rdma/ib_verbs.h>
106 #include <rdma/rdma_cm.h>
107 
108 #include <linux/sunrpc/debug.h>
109 #include <linux/sunrpc/svc_rdma.h>
110 
111 #include "xprt_rdma.h"
112 #include <trace/events/rpcrdma.h>
113 
114 static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
115 
116 static struct svc_rdma_send_ctxt *
117 svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
118 {
119 	struct ib_device *device = rdma->sc_cm_id->device;
120 	int node = ibdev_to_node(device);
121 	struct svc_rdma_send_ctxt *ctxt;
122 	unsigned long pages;
123 	dma_addr_t addr;
124 	void *buffer;
125 	int i;
126 
127 	ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
128 			    GFP_KERNEL, node);
129 	if (!ctxt)
130 		goto fail0;
131 	pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server);
132 	ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *),
133 				      GFP_KERNEL, node);
134 	if (!ctxt->sc_pages)
135 		goto fail1;
136 	ctxt->sc_maxpages = pages;
137 	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
138 	if (!buffer)
139 		goto fail2;
140 	addr = ib_dma_map_single(device, buffer, rdma->sc_max_req_size,
141 				 DMA_TO_DEVICE);
142 	if (ib_dma_mapping_error(device, addr))
143 		goto fail3;
144 
145 	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
146 
147 	ctxt->sc_rdma = rdma;
148 	ctxt->sc_send_wr.next = NULL;
149 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
150 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
151 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
152 	ctxt->sc_cqe.done = svc_rdma_wc_send;
153 	INIT_LIST_HEAD(&ctxt->sc_write_info_list);
154 	ctxt->sc_xprt_buf = buffer;
155 	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
156 		     rdma->sc_max_req_size);
157 	ctxt->sc_sges[0].addr = addr;
158 
159 	for (i = 0; i < rdma->sc_max_send_sges; i++)
160 		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
161 	return ctxt;
162 
163 fail3:
164 	kfree(buffer);
165 fail2:
166 	kfree(ctxt->sc_pages);
167 fail1:
168 	kfree(ctxt);
169 fail0:
170 	return NULL;
171 }
172 
173 /**
174  * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
175  * @rdma: svcxprt_rdma being torn down
176  *
177  */
178 void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
179 {
180 	struct ib_device *device = rdma->sc_cm_id->device;
181 	struct svc_rdma_send_ctxt *ctxt;
182 	struct llist_node *node;
183 
184 	while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
185 		ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
186 		ib_dma_unmap_single(device, ctxt->sc_sges[0].addr,
187 				    rdma->sc_max_req_size, DMA_TO_DEVICE);
188 		kfree(ctxt->sc_xprt_buf);
189 		kfree(ctxt->sc_pages);
190 		kfree(ctxt);
191 	}
192 }
193 
194 /**
195  * svc_rdma_send_ctxt_get - Get a free send_ctxt
196  * @rdma: controlling svcxprt_rdma
197  *
198  * Returns a ready-to-use send_ctxt, or NULL if none are
199  * available and a fresh one cannot be allocated.
200  */
201 struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
202 {
203 	struct svc_rdma_send_ctxt *ctxt;
204 	struct llist_node *node;
205 
206 	spin_lock(&rdma->sc_send_lock);
207 	node = llist_del_first(&rdma->sc_send_ctxts);
208 	spin_unlock(&rdma->sc_send_lock);
209 	if (!node)
210 		goto out_empty;
211 
212 	ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
213 
214 out:
215 	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
216 	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
217 			ctxt->sc_xprt_buf, NULL);
218 
219 	svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
220 	ctxt->sc_send_wr.num_sge = 0;
221 	ctxt->sc_cur_sge_no = 0;
222 	ctxt->sc_page_count = 0;
223 	ctxt->sc_wr_chain = &ctxt->sc_send_wr;
224 	ctxt->sc_sqecount = 1;
225 
226 	return ctxt;
227 
228 out_empty:
229 	svc_rdma_send_ctxts_drain(rdma);
230 
231 	spin_lock(&rdma->sc_send_lock);
232 	node = llist_del_first(&rdma->sc_send_ctxts);
233 	spin_unlock(&rdma->sc_send_lock);
234 	if (node) {
235 		ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
236 		goto out;
237 	}
238 
239 	ctxt = svc_rdma_send_ctxt_alloc(rdma);
240 	if (!ctxt)
241 		return NULL;
242 	goto out;
243 }
244 
245 /* Release chunk I/O resources and DMA-unmap SGEs. */
246 static void svc_rdma_send_ctxt_unmap(struct svcxprt_rdma *rdma,
247 				     struct svc_rdma_send_ctxt *ctxt)
248 {
249 	struct ib_device *device = rdma->sc_cm_id->device;
250 	unsigned int i;
251 
252 	svc_rdma_write_chunk_release(rdma, ctxt);
253 	svc_rdma_reply_chunk_release(rdma, ctxt);
254 
255 	/* The first SGE contains the transport header, which
256 	 * remains mapped until @ctxt is destroyed.
257 	 */
258 	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
259 		trace_svcrdma_dma_unmap_page(&ctxt->sc_cid,
260 					     ctxt->sc_sges[i].addr,
261 					     ctxt->sc_sges[i].length);
262 		ib_dma_unmap_page(device,
263 				  ctxt->sc_sges[i].addr,
264 				  ctxt->sc_sges[i].length,
265 				  DMA_TO_DEVICE);
266 	}
267 }
268 
269 /* Unmap, release pages, and return send_ctxt to the free list. */
270 static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
271 				       struct svc_rdma_send_ctxt *ctxt)
272 {
273 	svc_rdma_send_ctxt_unmap(rdma, ctxt);
274 
275 	if (ctxt->sc_page_count)
276 		release_pages(ctxt->sc_pages, ctxt->sc_page_count);
277 
278 	llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
279 }
280 
281 /**
282  * svc_rdma_send_ctxts_drain - Release completed send_ctxts
283  * @rdma: controlling svcxprt_rdma
284  */
285 void svc_rdma_send_ctxts_drain(struct svcxprt_rdma *rdma)
286 {
287 	struct svc_rdma_send_ctxt *ctxt, *next;
288 	struct llist_node *node;
289 
290 	node = llist_del_all(&rdma->sc_send_release_list);
291 	llist_for_each_entry_safe(ctxt, next, node, sc_node)
292 		svc_rdma_send_ctxt_release(rdma, ctxt);
293 }
294 
295 /**
296  * svc_rdma_send_ctxt_put - Queue send_ctxt for deferred release
297  * @rdma: controlling svcxprt_rdma
298  * @ctxt: send_ctxt to queue for deferred release
299  *
300  * Queues @ctxt onto sc_send_release_list. DMA unmap and
301  * page release run later in svc_rdma_send_ctxts_drain(),
302  * typically from xpo_release_ctxt.
303  *
304  * On the empty-to-non-empty transition, set XPT_DATA and
305  * enqueue the transport. Without this self-trigger, a Send
306  * completion arriving after the last xpo_release_ctxt on an
307  * idle connection would leave the send_ctxt's DMA mappings
308  * and reply pages pinned until another drain occurred.
309  */
310 void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
311 			    struct svc_rdma_send_ctxt *ctxt)
312 {
313 	if (llist_add(&ctxt->sc_node, &rdma->sc_send_release_list)) {
314 		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
315 		svc_xprt_enqueue(&rdma->sc_xprt);
316 	}
317 }
318 
319 /**
320  * svc_rdma_wake_send_waiters - manage Send Queue accounting
321  * @rdma: controlling transport
322  * @avail: Number of additional SQEs that are now available
323  *
324  */
325 void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
326 {
327 	atomic_add(avail, &rdma->sc_sq_avail);
328 	smp_mb__after_atomic();
329 	if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
330 		wake_up(&rdma->sc_send_wait);
331 }
332 
333 /**
334  * svc_rdma_sq_wait - Wait for SQ slots using fair queuing
335  * @rdma: controlling transport
336  * @cid: completion ID for tracing
337  * @sqecount: number of SQ entries needed
338  *
339  * A ticket-based system ensures fair ordering when multiple threads
340  * wait for Send Queue capacity. Each waiter takes a ticket and is
341  * served in order, preventing starvation.
342  *
343  * Protocol invariant: every ticket holder must increment
344  * sc_sq_ticket_tail exactly once, whether the reservation
345  * succeeds or the connection closes. Failing to advance the
346  * tail stalls all subsequent waiters.
347  *
348  * The ticket counters are signed 32-bit atomics. After
349  * wrapping through INT_MAX, the equality check
350  * (tail == ticket) remains correct because both counters
351  * advance monotonically and the comparison uses exact
352  * equality rather than relational operators.
353  *
354  * Return values:
355  *   %0: SQ slots were reserved successfully
356  *   %-ENOTCONN: The connection was lost
357  */
358 int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
359 		     const struct rpc_rdma_cid *cid, int sqecount)
360 {
361 	int ticket;
362 
363 	/* Fast path: try to reserve SQ slots without waiting.
364 	 *
365 	 * A failed reservation temporarily understates sc_sq_avail
366 	 * until the compensating atomic_add restores it. A Send
367 	 * completion arriving in that window sees a lower count
368 	 * than reality, but the value self-corrects once the add
369 	 * completes. No ordering guarantee is needed here because
370 	 * the slow path serializes all contended waiters.
371 	 */
372 	if (likely(atomic_sub_return(sqecount, &rdma->sc_sq_avail) >= 0))
373 		return 0;
374 	atomic_add(sqecount, &rdma->sc_sq_avail);
375 
376 	/* Slow path: take a ticket and wait in line */
377 	ticket = atomic_fetch_inc(&rdma->sc_sq_ticket_head);
378 
379 	percpu_counter_inc(&svcrdma_stat_sq_starve);
380 	trace_svcrdma_sq_full(rdma, cid);
381 
382 	/* Wait until all earlier tickets have been served */
383 	wait_event(rdma->sc_sq_ticket_wait,
384 		   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
385 		   atomic_read(&rdma->sc_sq_ticket_tail) == ticket);
386 	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
387 		goto out_close;
388 
389 	/* It's our turn. Wait for enough SQ slots to be available. */
390 	while (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
391 		atomic_add(sqecount, &rdma->sc_sq_avail);
392 
393 		wait_event(rdma->sc_send_wait,
394 			   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
395 			   atomic_read(&rdma->sc_sq_avail) >= sqecount);
396 		if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
397 			goto out_close;
398 	}
399 
400 	/* Slots reserved successfully. Let the next waiter proceed. */
401 	atomic_inc(&rdma->sc_sq_ticket_tail);
402 	wake_up(&rdma->sc_sq_ticket_wait);
403 	trace_svcrdma_sq_retry(rdma, cid);
404 
405 	/*
406 	 * While this thread sat on sc_send_wait or sc_sq_ticket_wait,
407 	 * Send completions that tried to enqueue this transport for a
408 	 * release-list drain were rejected: svc_rdma_has_wspace returns
409 	 * 0 while either waitqueue is active, and svc_xprt_ready
410 	 * rejects the enqueue. Drain the release list now.
411 	 */
412 	svc_rdma_send_ctxts_drain(rdma);
413 	return 0;
414 
415 out_close:
416 	atomic_inc(&rdma->sc_sq_ticket_tail);
417 	wake_up(&rdma->sc_sq_ticket_wait);
418 	return -ENOTCONN;
419 }
420 
421 /**
422  * svc_rdma_post_send_err - Handle ib_post_send failure
423  * @rdma: controlling transport
424  * @cid: completion ID for tracing
425  * @bad_wr: first WR that was not posted
426  * @first_wr: first WR in the chain
427  * @sqecount: number of SQ entries that were reserved
428  * @ret: error code from ib_post_send
429  *
430  * Return values:
431  *   %0: At least one WR was posted; a completion handles cleanup
432  *   %-ENOTCONN: No WRs were posted; SQ slots are released
433  */
434 int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
435 			   const struct rpc_rdma_cid *cid,
436 			   const struct ib_send_wr *bad_wr,
437 			   const struct ib_send_wr *first_wr,
438 			   int sqecount, int ret)
439 {
440 	trace_svcrdma_sq_post_err(rdma, cid, ret);
441 	svc_rdma_xprt_deferred_close(rdma);
442 
443 	/* If even one WR was posted, a Send completion will
444 	 * return the reserved SQ slots.
445 	 */
446 	if (bad_wr != first_wr)
447 		return 0;
448 
449 	svc_rdma_wake_send_waiters(rdma, sqecount);
450 	return -ENOTCONN;
451 }
452 
453 /**
454  * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
455  * @cq: Completion Queue context
456  * @wc: Work Completion object
457  *
458  * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
459  * the Send completion handler could be running.
460  */
461 static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
462 {
463 	struct svcxprt_rdma *rdma = cq->cq_context;
464 	struct ib_cqe *cqe = wc->wr_cqe;
465 	struct svc_rdma_send_ctxt *ctxt =
466 		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
467 
468 	svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);
469 
470 	if (unlikely(wc->status != IB_WC_SUCCESS))
471 		goto flushed;
472 
473 	trace_svcrdma_wc_send(&ctxt->sc_cid);
474 	svc_rdma_send_ctxt_put(rdma, ctxt);
475 	return;
476 
477 flushed:
478 	if (wc->status != IB_WC_WR_FLUSH_ERR)
479 		trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
480 	else
481 		trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
482 	svc_rdma_send_ctxt_put(rdma, ctxt);
483 	svc_rdma_xprt_deferred_close(rdma);
484 }
485 
486 /**
487  * svc_rdma_post_send - Post a WR chain to the Send Queue
488  * @rdma: transport context
489  * @ctxt: WR chain to post
490  *
491  * Copy fields in @ctxt to stack variables in order to guarantee
492  * that these values remain available after the ib_post_send() call.
493  * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
494  *
495  * Return values:
496  *   %0: @ctxt's WR chain was posted successfully
497  *   %-ENOTCONN: The connection was lost
498  */
499 int svc_rdma_post_send(struct svcxprt_rdma *rdma,
500 		       struct svc_rdma_send_ctxt *ctxt)
501 {
502 	struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
503 	struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
504 	const struct ib_send_wr *bad_wr = first_wr;
505 	struct rpc_rdma_cid cid = ctxt->sc_cid;
506 	int ret, sqecount = ctxt->sc_sqecount;
507 
508 	might_sleep();
509 
510 	/* Sync the transport header buffer */
511 	ib_dma_sync_single_for_device(rdma->sc_cm_id->device,
512 				      send_wr->sg_list[0].addr,
513 				      send_wr->sg_list[0].length,
514 				      DMA_TO_DEVICE);
515 
516 	ret = svc_rdma_sq_wait(rdma, &cid, sqecount);
517 	if (ret < 0)
518 		return ret;
519 
520 	trace_svcrdma_post_send(ctxt);
521 	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
522 	if (ret)
523 		return svc_rdma_post_send_err(rdma, &cid, bad_wr,
524 					      first_wr, sqecount, ret);
525 	return 0;
526 }
527 
528 /**
529  * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
530  * @sctxt: Send context for the RPC Reply
531  *
532  * Return values:
533  *   On success, returns length in bytes of the Reply XDR buffer
534  *   that was consumed by the Reply Read list
535  *   %-EMSGSIZE on XDR buffer overflow
536  */
537 static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
538 {
539 	/* RPC-over-RDMA version 1 replies never have a Read list. */
540 	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
541 }
542 
543 /**
544  * svc_rdma_encode_write_segment - Encode one Write segment
545  * @sctxt: Send context for the RPC Reply
546  * @chunk: Write chunk to push
547  * @remaining: remaining bytes of the payload left in the Write chunk
548  * @segno: which segment in the chunk
549  *
550  * Return values:
551  *   On success, returns length in bytes of the Reply XDR buffer
552  *   that was consumed by the Write segment, and updates @remaining
553  *   %-EMSGSIZE on XDR buffer overflow
554  */
555 static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
556 					     const struct svc_rdma_chunk *chunk,
557 					     u32 *remaining, unsigned int segno)
558 {
559 	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
560 	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
561 	u32 length;
562 	__be32 *p;
563 
564 	p = xdr_reserve_space(&sctxt->sc_stream, len);
565 	if (!p)
566 		return -EMSGSIZE;
567 
568 	length = min_t(u32, *remaining, segment->rs_length);
569 	*remaining -= length;
570 	xdr_encode_rdma_segment(p, segment->rs_handle, length,
571 				segment->rs_offset);
572 	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
573 				  segment->rs_offset);
574 	return len;
575 }
576 
577 /**
578  * svc_rdma_encode_write_chunk - Encode one Write chunk
579  * @sctxt: Send context for the RPC Reply
580  * @chunk: Write chunk to push
581  *
582  * Copy a Write chunk from the Call transport header to the
583  * Reply transport header. Update each segment's length field
584  * to reflect the number of bytes written in that segment.
585  *
586  * Return values:
587  *   On success, returns length in bytes of the Reply XDR buffer
588  *   that was consumed by the Write chunk
589  *   %-EMSGSIZE on XDR buffer overflow
590  */
591 static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
592 					   const struct svc_rdma_chunk *chunk)
593 {
594 	u32 remaining = chunk->ch_payload_length;
595 	unsigned int segno;
596 	ssize_t len, ret;
597 
598 	len = 0;
599 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
600 	if (ret < 0)
601 		return ret;
602 	len += ret;
603 
604 	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
605 	if (ret < 0)
606 		return ret;
607 	len += ret;
608 
609 	for (segno = 0; segno < chunk->ch_segcount; segno++) {
610 		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
611 		if (ret < 0)
612 			return ret;
613 		len += ret;
614 	}
615 
616 	return len;
617 }
618 
619 /**
620  * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
621  * @rctxt: Reply context with information about the RPC Call
622  * @sctxt: Send context for the RPC Reply
623  *
624  * Return values:
625  *   On success, returns length in bytes of the Reply XDR buffer
626  *   that was consumed by the Reply's Write list
627  *   %-EMSGSIZE on XDR buffer overflow
628  */
629 static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
630 					  struct svc_rdma_send_ctxt *sctxt)
631 {
632 	struct svc_rdma_chunk *chunk;
633 	ssize_t len, ret;
634 
635 	len = 0;
636 	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
637 		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
638 		if (ret < 0)
639 			return ret;
640 		len += ret;
641 	}
642 
643 	/* Terminate the Write list */
644 	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
645 	if (ret < 0)
646 		return ret;
647 
648 	return len + ret;
649 }
650 
651 /**
652  * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
653  * @rctxt: Reply context with information about the RPC Call
654  * @sctxt: Send context for the RPC Reply
655  * @length: size in bytes of the payload in the Reply chunk
656  *
657  * Return values:
658  *   On success, returns length in bytes of the Reply XDR buffer
659  *   that was consumed by the Reply's Reply chunk
660  *   %-EMSGSIZE on XDR buffer overflow
661  *   %-E2BIG if the RPC message is larger than the Reply chunk
662  */
663 static ssize_t
664 svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
665 			    struct svc_rdma_send_ctxt *sctxt,
666 			    unsigned int length)
667 {
668 	struct svc_rdma_chunk *chunk;
669 
670 	if (pcl_is_empty(&rctxt->rc_reply_pcl))
671 		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
672 
673 	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
674 	if (length > chunk->ch_length)
675 		return -E2BIG;
676 
677 	chunk->ch_payload_length = length;
678 	return svc_rdma_encode_write_chunk(sctxt, chunk);
679 }
680 
681 struct svc_rdma_map_data {
682 	struct svcxprt_rdma		*md_rdma;
683 	struct svc_rdma_send_ctxt	*md_ctxt;
684 };
685 
686 /**
687  * svc_rdma_page_dma_map - DMA map one page
688  * @data: pointer to arguments
689  * @page: struct page to DMA map
690  * @offset: offset into the page
691  * @len: number of bytes to map
692  *
693  * Returns:
694  *   %0 if DMA mapping was successful
695  *   %-EIO if the page cannot be DMA mapped
696  */
697 static int svc_rdma_page_dma_map(void *data, struct page *page,
698 				 unsigned long offset, unsigned int len)
699 {
700 	struct svc_rdma_map_data *args = data;
701 	struct svcxprt_rdma *rdma = args->md_rdma;
702 	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
703 	struct ib_device *dev = rdma->sc_cm_id->device;
704 	dma_addr_t dma_addr;
705 
706 	++ctxt->sc_cur_sge_no;
707 
708 	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
709 	if (ib_dma_mapping_error(dev, dma_addr))
710 		goto out_maperr;
711 
712 	trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len);
713 	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
714 	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
715 	ctxt->sc_send_wr.num_sge++;
716 	return 0;
717 
718 out_maperr:
719 	trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len);
720 	return -EIO;
721 }
722 
723 /**
724  * svc_rdma_iov_dma_map - DMA map an iovec
725  * @data: pointer to arguments
726  * @iov: kvec to DMA map
727  *
728  * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
729  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
730  *
731  * Returns:
732  *   %0 if DMA mapping was successful
733  *   %-EIO if the iovec cannot be DMA mapped
734  */
735 static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
736 {
737 	if (!iov->iov_len)
738 		return 0;
739 	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
740 				     offset_in_page(iov->iov_base),
741 				     iov->iov_len);
742 }
743 
744 /**
745  * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
746  * @xdr: xdr_buf containing portion of an RPC message to transmit
747  * @data: pointer to arguments
748  *
749  * Returns:
750  *   %0 if DMA mapping was successful
751  *   %-EIO if DMA mapping failed
752  *
753  * On failure, any DMA mappings that have been already done must be
754  * unmapped by the caller.
755  */
756 static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
757 {
758 	unsigned int len, remaining;
759 	unsigned long pageoff;
760 	struct page **ppages;
761 	int ret;
762 
763 	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
764 	if (ret < 0)
765 		return ret;
766 
767 	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
768 	pageoff = offset_in_page(xdr->page_base);
769 	remaining = xdr->page_len;
770 	while (remaining) {
771 		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
772 
773 		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
774 		if (ret < 0)
775 			return ret;
776 
777 		remaining -= len;
778 		pageoff = 0;
779 	}
780 
781 	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
782 	if (ret < 0)
783 		return ret;
784 
785 	return xdr->len;
786 }
787 
788 struct svc_rdma_pullup_data {
789 	u8		*pd_dest;
790 	unsigned int	pd_length;
791 	unsigned int	pd_num_sges;
792 };
793 
794 /**
795  * svc_rdma_xb_count_sges - Count how many SGEs will be needed
796  * @xdr: xdr_buf containing portion of an RPC message to transmit
797  * @data: pointer to arguments
798  *
799  * Returns:
800  *   Number of SGEs needed to Send the contents of @xdr inline
801  */
802 static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
803 				  void *data)
804 {
805 	struct svc_rdma_pullup_data *args = data;
806 	unsigned int remaining;
807 	unsigned long offset;
808 
809 	if (xdr->head[0].iov_len)
810 		++args->pd_num_sges;
811 
812 	offset = offset_in_page(xdr->page_base);
813 	remaining = xdr->page_len;
814 	while (remaining) {
815 		++args->pd_num_sges;
816 		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
817 		offset = 0;
818 	}
819 
820 	if (xdr->tail[0].iov_len)
821 		++args->pd_num_sges;
822 
823 	args->pd_length += xdr->len;
824 	return 0;
825 }
826 
827 /**
828  * svc_rdma_pull_up_needed - Determine whether to use pull-up
829  * @rdma: controlling transport
830  * @sctxt: send_ctxt for the Send WR
831  * @write_pcl: Write chunk list provided by client
832  * @xdr: xdr_buf containing RPC message to transmit
833  *
834  * Returns:
835  *   %true if pull-up must be used
836  *   %false otherwise
837  */
838 static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
839 				    const struct svc_rdma_send_ctxt *sctxt,
840 				    const struct svc_rdma_pcl *write_pcl,
841 				    const struct xdr_buf *xdr)
842 {
843 	/* Resources needed for the transport header */
844 	struct svc_rdma_pullup_data args = {
845 		.pd_length	= sctxt->sc_hdrbuf.len,
846 		.pd_num_sges	= 1,
847 	};
848 	int ret;
849 
850 	ret = pcl_process_nonpayloads(write_pcl, xdr,
851 				      svc_rdma_xb_count_sges, &args);
852 	if (ret < 0)
853 		return false;
854 
855 	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
856 		return true;
857 	return args.pd_num_sges >= rdma->sc_max_send_sges;
858 }
859 
860 /**
861  * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
862  * @xdr: xdr_buf containing portion of an RPC message to copy
863  * @data: pointer to arguments
864  *
865  * Returns:
866  *   Always zero.
867  */
868 static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
869 				 void *data)
870 {
871 	struct svc_rdma_pullup_data *args = data;
872 	unsigned int len, remaining;
873 	unsigned long pageoff;
874 	struct page **ppages;
875 
876 	if (xdr->head[0].iov_len) {
877 		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
878 		args->pd_dest += xdr->head[0].iov_len;
879 	}
880 
881 	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
882 	pageoff = offset_in_page(xdr->page_base);
883 	remaining = xdr->page_len;
884 	while (remaining) {
885 		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
886 		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
887 		remaining -= len;
888 		args->pd_dest += len;
889 		pageoff = 0;
890 		ppages++;
891 	}
892 
893 	if (xdr->tail[0].iov_len) {
894 		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
895 		args->pd_dest += xdr->tail[0].iov_len;
896 	}
897 
898 	args->pd_length += xdr->len;
899 	return 0;
900 }
901 
902 /**
903  * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
904  * @rdma: controlling transport
905  * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
906  * @write_pcl: Write chunk list provided by client
907  * @xdr: prepared xdr_buf containing RPC message
908  *
909  * The device is not capable of sending the reply directly.
910  * Assemble the elements of @xdr into the transport header buffer.
911  *
912  * Assumptions:
913  *  pull_up_needed has determined that @xdr will fit in the buffer.
914  *
915  * Returns:
916  *   %0 if pull-up was successful
917  *   %-EMSGSIZE if a buffer manipulation problem occurred
918  */
919 static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
920 				      struct svc_rdma_send_ctxt *sctxt,
921 				      const struct svc_rdma_pcl *write_pcl,
922 				      const struct xdr_buf *xdr)
923 {
924 	struct svc_rdma_pullup_data args = {
925 		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
926 	};
927 	int ret;
928 
929 	ret = pcl_process_nonpayloads(write_pcl, xdr,
930 				      svc_rdma_xb_linearize, &args);
931 	if (ret < 0)
932 		return ret;
933 
934 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
935 	trace_svcrdma_send_pullup(sctxt, args.pd_length);
936 	return 0;
937 }
938 
939 /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
940  * @rdma: controlling transport
941  * @sctxt: send_ctxt for the Send WR
942  * @write_pcl: Write chunk list provided by client
943  * @reply_pcl: Reply chunk provided by client
944  * @xdr: prepared xdr_buf containing RPC message
945  *
946  * Returns:
947  *   %0 if DMA mapping was successful.
948  *   %-EMSGSIZE if a buffer manipulation problem occurred
949  *   %-EIO if DMA mapping failed
950  *
951  * The Send WR's num_sge field is set in all cases.
952  */
953 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
954 			   struct svc_rdma_send_ctxt *sctxt,
955 			   const struct svc_rdma_pcl *write_pcl,
956 			   const struct svc_rdma_pcl *reply_pcl,
957 			   const struct xdr_buf *xdr)
958 {
959 	struct svc_rdma_map_data args = {
960 		.md_rdma	= rdma,
961 		.md_ctxt	= sctxt,
962 	};
963 
964 	/* Set up the (persistently-mapped) transport header SGE. */
965 	sctxt->sc_send_wr.num_sge = 1;
966 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
967 
968 	/* If there is a Reply chunk, nothing follows the transport
969 	 * header, so there is nothing to map.
970 	 */
971 	if (!pcl_is_empty(reply_pcl))
972 		return 0;
973 
974 	/* For pull-up, svc_rdma_send() will sync the transport header.
975 	 * No additional DMA mapping is necessary.
976 	 */
977 	if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr))
978 		return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr);
979 
980 	return pcl_process_nonpayloads(write_pcl, xdr,
981 				       svc_rdma_xb_dma_map, &args);
982 }
983 
984 /* The svc_rqst and all resources it owns are released as soon as
985  * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
986  * so they are released only after Send completion, and not by
987  * svc_rqst_release_pages().
988  */
989 static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
990 				   struct svc_rdma_send_ctxt *ctxt)
991 {
992 	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
993 
994 	ctxt->sc_page_count += pages;
995 	for (i = 0; i < pages; i++) {
996 		ctxt->sc_pages[i] = rqstp->rq_respages[i];
997 		rqstp->rq_respages[i] = NULL;
998 	}
999 }
1000 
1001 /* Prepare the portion of the RPC Reply that will be transmitted
1002  * via RDMA Send. The RPC-over-RDMA transport header is prepared
1003  * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
1004  *
1005  * Depending on whether a Write list or Reply chunk is present,
1006  * the server may Send all, a portion of, or none of the xdr_buf.
1007  * In the latter case, only the transport header (sc_sges[0]) is
1008  * transmitted.
1009  *
1010  * Assumptions:
1011  * - The Reply's transport header will never be larger than a page.
1012  */
1013 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
1014 				   struct svc_rdma_send_ctxt *sctxt,
1015 				   const struct svc_rdma_recv_ctxt *rctxt,
1016 				   struct svc_rqst *rqstp)
1017 {
1018 	struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
1019 	int ret;
1020 
1021 	ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
1022 				     &rctxt->rc_reply_pcl, &rqstp->rq_res);
1023 	if (ret < 0)
1024 		return ret;
1025 
1026 	/* Transfer pages involved in RDMA Writes to the sctxt's
1027 	 * page array. Completion handling releases these pages.
1028 	 */
1029 	svc_rdma_save_io_pages(rqstp, sctxt);
1030 
1031 	if (rctxt->rc_inv_rkey) {
1032 		send_wr->opcode = IB_WR_SEND_WITH_INV;
1033 		send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
1034 	} else {
1035 		send_wr->opcode = IB_WR_SEND;
1036 	}
1037 
1038 	return svc_rdma_post_send(rdma, sctxt);
1039 }
1040 
1041 /**
1042  * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
1043  * @rdma: controlling transport context
1044  * @sctxt: Send context for the response
1045  * @rctxt: Receive context for incoming bad message
1046  * @status: negative errno indicating error that occurred
1047  *
1048  * Given the client-provided Read, Write, and Reply chunks, the
1049  * server was not able to parse the Call or form a complete Reply.
1050  * Return an RDMA_ERROR message so the client can retire the RPC
1051  * transaction.
1052  *
1053  * The caller does not have to release @sctxt. It is released by
1054  * Send completion, or by this function on error.
1055  */
1056 void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
1057 			     struct svc_rdma_send_ctxt *sctxt,
1058 			     struct svc_rdma_recv_ctxt *rctxt,
1059 			     int status)
1060 {
1061 	__be32 *rdma_argp = rctxt->rc_recv_buf;
1062 	__be32 *p;
1063 
1064 	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
1065 	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
1066 			sctxt->sc_xprt_buf, NULL);
1067 
1068 	p = xdr_reserve_space(&sctxt->sc_stream,
1069 			      rpcrdma_fixed_maxsz * sizeof(*p));
1070 	if (!p)
1071 		goto put_ctxt;
1072 
1073 	*p++ = *rdma_argp;
1074 	*p++ = *(rdma_argp + 1);
1075 	*p++ = rdma->sc_fc_credits;
1076 	*p = rdma_error;
1077 
1078 	switch (status) {
1079 	case -EPROTONOSUPPORT:
1080 		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
1081 		if (!p)
1082 			goto put_ctxt;
1083 
1084 		*p++ = err_vers;
1085 		*p++ = rpcrdma_version;
1086 		*p = rpcrdma_version;
1087 		trace_svcrdma_err_vers(*rdma_argp);
1088 		break;
1089 	default:
1090 		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
1091 		if (!p)
1092 			goto put_ctxt;
1093 
1094 		*p = err_chunk;
1095 		trace_svcrdma_err_chunk(*rdma_argp);
1096 	}
1097 
1098 	/* Remote Invalidation is skipped for simplicity. */
1099 	sctxt->sc_send_wr.num_sge = 1;
1100 	sctxt->sc_send_wr.opcode = IB_WR_SEND;
1101 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
1102 
1103 	/* Ensure only the error message is posted, not any previously
1104 	 * prepared Write chunk WRs.
1105 	 */
1106 	sctxt->sc_wr_chain = &sctxt->sc_send_wr;
1107 	sctxt->sc_sqecount = 1;
1108 	if (svc_rdma_post_send(rdma, sctxt))
1109 		goto put_ctxt;
1110 	return;
1111 
1112 put_ctxt:
1113 	svc_rdma_send_ctxt_put(rdma, sctxt);
1114 }
1115 
1116 /**
1117  * svc_rdma_sendto - Transmit an RPC reply
1118  * @rqstp: processed RPC request, reply XDR already in ::rq_res
1119  *
1120  * Any resources still associated with @rqstp are released upon return.
1121  * If no reply message was possible, the connection is closed.
1122  *
1123  * Returns:
1124  *	%0 if an RPC reply has been successfully posted,
1125  *	%-ENOMEM if a resource shortage occurred (connection is lost),
1126  *	%-ENOTCONN if posting failed (connection is lost).
1127  */
1128 int svc_rdma_sendto(struct svc_rqst *rqstp)
1129 {
1130 	struct svc_xprt *xprt = rqstp->rq_xprt;
1131 	struct svcxprt_rdma *rdma =
1132 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1133 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1134 	__be32 *rdma_argp = rctxt->rc_recv_buf;
1135 	struct svc_rdma_send_ctxt *sctxt;
1136 	unsigned int rc_size;
1137 	__be32 *p;
1138 	int ret;
1139 
1140 	ret = -ENOTCONN;
1141 	if (svc_xprt_is_dead(xprt))
1142 		goto drop_connection;
1143 
1144 	ret = -ENOMEM;
1145 	sctxt = svc_rdma_send_ctxt_get(rdma);
1146 	if (!sctxt)
1147 		goto drop_connection;
1148 
1149 	ret = -EMSGSIZE;
1150 	p = xdr_reserve_space(&sctxt->sc_stream,
1151 			      rpcrdma_fixed_maxsz * sizeof(*p));
1152 	if (!p)
1153 		goto put_ctxt;
1154 
1155 	ret = svc_rdma_prepare_write_list(rdma, rctxt, sctxt, &rqstp->rq_res);
1156 	if (ret < 0)
1157 		goto put_ctxt;
1158 
1159 	rc_size = 0;
1160 	if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
1161 		ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
1162 						   &rctxt->rc_reply_pcl, sctxt,
1163 						   &rqstp->rq_res);
1164 		if (ret < 0)
1165 			goto reply_chunk;
1166 		rc_size = ret;
1167 	}
1168 
1169 	*p++ = *rdma_argp;
1170 	*p++ = *(rdma_argp + 1);
1171 	*p++ = rdma->sc_fc_credits;
1172 	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
1173 
1174 	ret = svc_rdma_encode_read_list(sctxt);
1175 	if (ret < 0)
1176 		goto put_ctxt;
1177 	ret = svc_rdma_encode_write_list(rctxt, sctxt);
1178 	if (ret < 0)
1179 		goto put_ctxt;
1180 	ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size);
1181 	if (ret < 0)
1182 		goto put_ctxt;
1183 
1184 	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
1185 	if (ret < 0)
1186 		goto put_ctxt;
1187 	return 0;
1188 
1189 reply_chunk:
1190 	if (ret != -E2BIG && ret != -EINVAL)
1191 		goto put_ctxt;
1192 
1193 	/* Send completion releases payload pages that were part
1194 	 * of previously posted RDMA Writes.
1195 	 */
1196 	svc_rdma_save_io_pages(rqstp, sctxt);
1197 	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
1198 	return 0;
1199 
1200 put_ctxt:
1201 	svc_rdma_send_ctxt_put(rdma, sctxt);
1202 drop_connection:
1203 	trace_svcrdma_send_err(rqstp, ret);
1204 	svc_rdma_xprt_deferred_close(rdma);
1205 	return -ENOTCONN;
1206 }
1207 
1208 /**
1209  * svc_rdma_result_payload - special processing for a result payload
1210  * @rqstp: RPC transaction context
1211  * @offset: payload's byte offset in @rqstp->rq_res
1212  * @length: size of payload, in bytes
1213  *
1214  * Assign the passed-in result payload to the current Write chunk,
1215  * and advance to cur_result_payload to the next Write chunk, if
1216  * there is one.
1217  *
1218  * Return values:
1219  *   %0 if successful or nothing needed to be done
1220  *   %-E2BIG if the payload was larger than the Write chunk
1221  */
1222 int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
1223 			    unsigned int length)
1224 {
1225 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
1226 	struct svc_rdma_chunk *chunk;
1227 
1228 	chunk = rctxt->rc_cur_result_payload;
1229 	if (!length || !chunk)
1230 		return 0;
1231 	rctxt->rc_cur_result_payload =
1232 		pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
1233 
1234 	if (length > chunk->ch_length)
1235 		return -E2BIG;
1236 	chunk->ch_position = offset;
1237 	chunk->ch_payload_length = length;
1238 	return 0;
1239 }
1240