xref: /linux/net/sunrpc/xprtrdma/rpc_rdma.c (revision 4ab5a5d2a4a2289c2af07accbec7170ca5671f41)
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41 
42 /*
43  * rpc_rdma.c
44  *
45  * This file contains the guts of the RPC RDMA protocol, and
46  * does marshaling/unmarshaling, etc. It is also where interfacing
47  * to the Linux RPC framework lives.
48  */
49 
50 #include <linux/highmem.h>
51 
52 #include <linux/sunrpc/svc_rdma.h>
53 
54 #include "xprt_rdma.h"
55 #include <trace/events/rpcrdma.h>
56 
57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
58 # define RPCDBG_FACILITY	RPCDBG_TRANS
59 #endif
60 
61 /* Returns size of largest RPC-over-RDMA header in a Call message
62  *
63  * The largest Call header contains a full-size Read list and a
64  * minimal Reply chunk.
65  */
66 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
67 {
68 	unsigned int size;
69 
70 	/* Fixed header fields and list discriminators */
71 	size = RPCRDMA_HDRLEN_MIN;
72 
73 	/* Maximum Read list size */
74 	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
75 
76 	/* Minimal Read chunk size */
77 	size += sizeof(__be32);	/* segment count */
78 	size += rpcrdma_segment_maxsz * sizeof(__be32);
79 	size += sizeof(__be32);	/* list discriminator */
80 
81 	dprintk("RPC:       %s: max call header size = %u\n",
82 		__func__, size);
83 	return size;
84 }
85 
86 /* Returns size of largest RPC-over-RDMA header in a Reply message
87  *
88  * There is only one Write list or one Reply chunk per Reply
89  * message.  The larger list is the Write list.
90  */
91 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
92 {
93 	unsigned int size;
94 
95 	/* Fixed header fields and list discriminators */
96 	size = RPCRDMA_HDRLEN_MIN;
97 
98 	/* Maximum Write list size */
99 	size = sizeof(__be32);		/* segment count */
100 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
101 	size += sizeof(__be32);	/* list discriminator */
102 
103 	dprintk("RPC:       %s: max reply header size = %u\n",
104 		__func__, size);
105 	return size;
106 }
107 
108 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
109 {
110 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
111 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
112 	unsigned int maxsegs = ia->ri_max_segs;
113 
114 	ia->ri_max_inline_write = cdata->inline_wsize -
115 				  rpcrdma_max_call_header_size(maxsegs);
116 	ia->ri_max_inline_read = cdata->inline_rsize -
117 				 rpcrdma_max_reply_header_size(maxsegs);
118 }
119 
120 /* The client can send a request inline as long as the RPCRDMA header
121  * plus the RPC call fit under the transport's inline limit. If the
122  * combined call message size exceeds that limit, the client must use
123  * a Read chunk for this operation.
124  *
125  * A Read chunk is also required if sending the RPC call inline would
126  * exceed this device's max_sge limit.
127  */
128 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
129 				struct rpc_rqst *rqst)
130 {
131 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
132 	unsigned int count, remaining, offset;
133 
134 	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
135 		return false;
136 
137 	if (xdr->page_len) {
138 		remaining = xdr->page_len;
139 		offset = offset_in_page(xdr->page_base);
140 		count = RPCRDMA_MIN_SEND_SGES;
141 		while (remaining) {
142 			remaining -= min_t(unsigned int,
143 					   PAGE_SIZE - offset, remaining);
144 			offset = 0;
145 			if (++count > r_xprt->rx_ia.ri_max_send_sges)
146 				return false;
147 		}
148 	}
149 
150 	return true;
151 }
152 
153 /* The client can't know how large the actual reply will be. Thus it
154  * plans for the largest possible reply for that particular ULP
155  * operation. If the maximum combined reply message size exceeds that
156  * limit, the client must provide a write list or a reply chunk for
157  * this request.
158  */
159 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
160 				   struct rpc_rqst *rqst)
161 {
162 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
163 
164 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
165 }
166 
167 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
168  * a byte range. Other modes coalesce these SGEs into a single MR
169  * when they can.
170  *
171  * Returns pointer to next available SGE, and bumps the total number
172  * of SGEs consumed.
173  */
174 static struct rpcrdma_mr_seg *
175 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
176 		     unsigned int *n)
177 {
178 	u32 remaining, page_offset;
179 	char *base;
180 
181 	base = vec->iov_base;
182 	page_offset = offset_in_page(base);
183 	remaining = vec->iov_len;
184 	while (remaining) {
185 		seg->mr_page = NULL;
186 		seg->mr_offset = base;
187 		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
188 		remaining -= seg->mr_len;
189 		base += seg->mr_len;
190 		++seg;
191 		++(*n);
192 		page_offset = 0;
193 	}
194 	return seg;
195 }
196 
197 /* Convert @xdrbuf into SGEs no larger than a page each. As they
198  * are registered, these SGEs are then coalesced into RDMA segments
199  * when the selected memreg mode supports it.
200  *
201  * Returns positive number of SGEs consumed, or a negative errno.
202  */
203 
204 static int
205 rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
206 		     unsigned int pos, enum rpcrdma_chunktype type,
207 		     struct rpcrdma_mr_seg *seg)
208 {
209 	unsigned long page_base;
210 	unsigned int len, n;
211 	struct page **ppages;
212 
213 	n = 0;
214 	if (pos == 0)
215 		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
216 
217 	len = xdrbuf->page_len;
218 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
219 	page_base = offset_in_page(xdrbuf->page_base);
220 	while (len) {
221 		if (unlikely(!*ppages)) {
222 			/* XXX: Certain upper layer operations do
223 			 *	not provide receive buffer pages.
224 			 */
225 			*ppages = alloc_page(GFP_ATOMIC);
226 			if (!*ppages)
227 				return -ENOBUFS;
228 		}
229 		seg->mr_page = *ppages;
230 		seg->mr_offset = (char *)page_base;
231 		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
232 		len -= seg->mr_len;
233 		++ppages;
234 		++seg;
235 		++n;
236 		page_base = 0;
237 	}
238 
239 	/* When encoding a Read chunk, the tail iovec contains an
240 	 * XDR pad and may be omitted.
241 	 */
242 	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
243 		goto out;
244 
245 	/* When encoding a Write chunk, some servers need to see an
246 	 * extra segment for non-XDR-aligned Write chunks. The upper
247 	 * layer provides space in the tail iovec that may be used
248 	 * for this purpose.
249 	 */
250 	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
251 		goto out;
252 
253 	if (xdrbuf->tail[0].iov_len)
254 		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
255 
256 out:
257 	if (unlikely(n > RPCRDMA_MAX_SEGS))
258 		return -EIO;
259 	return n;
260 }
261 
262 static inline int
263 encode_item_present(struct xdr_stream *xdr)
264 {
265 	__be32 *p;
266 
267 	p = xdr_reserve_space(xdr, sizeof(*p));
268 	if (unlikely(!p))
269 		return -EMSGSIZE;
270 
271 	*p = xdr_one;
272 	return 0;
273 }
274 
275 static inline int
276 encode_item_not_present(struct xdr_stream *xdr)
277 {
278 	__be32 *p;
279 
280 	p = xdr_reserve_space(xdr, sizeof(*p));
281 	if (unlikely(!p))
282 		return -EMSGSIZE;
283 
284 	*p = xdr_zero;
285 	return 0;
286 }
287 
288 static void
289 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
290 {
291 	*iptr++ = cpu_to_be32(mr->mr_handle);
292 	*iptr++ = cpu_to_be32(mr->mr_length);
293 	xdr_encode_hyper(iptr, mr->mr_offset);
294 }
295 
296 static int
297 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
298 {
299 	__be32 *p;
300 
301 	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
302 	if (unlikely(!p))
303 		return -EMSGSIZE;
304 
305 	xdr_encode_rdma_segment(p, mr);
306 	return 0;
307 }
308 
309 static int
310 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
311 		    u32 position)
312 {
313 	__be32 *p;
314 
315 	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
316 	if (unlikely(!p))
317 		return -EMSGSIZE;
318 
319 	*p++ = xdr_one;			/* Item present */
320 	*p++ = cpu_to_be32(position);
321 	xdr_encode_rdma_segment(p, mr);
322 	return 0;
323 }
324 
325 /* Register and XDR encode the Read list. Supports encoding a list of read
326  * segments that belong to a single read chunk.
327  *
328  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
329  *
330  *  Read chunklist (a linked list):
331  *   N elements, position P (same P for all chunks of same arg!):
332  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
333  *
334  * Returns zero on success, or a negative errno if a failure occurred.
335  * @xdr is advanced to the next position in the stream.
336  *
337  * Only a single @pos value is currently supported.
338  */
339 static noinline int
340 rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
341 			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
342 {
343 	struct xdr_stream *xdr = &req->rl_stream;
344 	struct rpcrdma_mr_seg *seg;
345 	struct rpcrdma_mr *mr;
346 	unsigned int pos;
347 	int nsegs;
348 
349 	pos = rqst->rq_snd_buf.head[0].iov_len;
350 	if (rtype == rpcrdma_areadch)
351 		pos = 0;
352 	seg = req->rl_segments;
353 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
354 				     rtype, seg);
355 	if (nsegs < 0)
356 		return nsegs;
357 
358 	do {
359 		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
360 						   false, &mr);
361 		if (IS_ERR(seg))
362 			return PTR_ERR(seg);
363 		rpcrdma_mr_push(mr, &req->rl_registered);
364 
365 		if (encode_read_segment(xdr, mr, pos) < 0)
366 			return -EMSGSIZE;
367 
368 		trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
369 		r_xprt->rx_stats.read_chunk_count++;
370 		nsegs -= mr->mr_nents;
371 	} while (nsegs);
372 
373 	return 0;
374 }
375 
376 /* Register and XDR encode the Write list. Supports encoding a list
377  * containing one array of plain segments that belong to a single
378  * write chunk.
379  *
380  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
381  *
382  *  Write chunklist (a list of (one) counted array):
383  *   N elements:
384  *    1 - N - HLOO - HLOO - ... - HLOO - 0
385  *
386  * Returns zero on success, or a negative errno if a failure occurred.
387  * @xdr is advanced to the next position in the stream.
388  *
389  * Only a single Write chunk is currently supported.
390  */
391 static noinline int
392 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
393 			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
394 {
395 	struct xdr_stream *xdr = &req->rl_stream;
396 	struct rpcrdma_mr_seg *seg;
397 	struct rpcrdma_mr *mr;
398 	int nsegs, nchunks;
399 	__be32 *segcount;
400 
401 	seg = req->rl_segments;
402 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
403 				     rqst->rq_rcv_buf.head[0].iov_len,
404 				     wtype, seg);
405 	if (nsegs < 0)
406 		return nsegs;
407 
408 	if (encode_item_present(xdr) < 0)
409 		return -EMSGSIZE;
410 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
411 	if (unlikely(!segcount))
412 		return -EMSGSIZE;
413 	/* Actual value encoded below */
414 
415 	nchunks = 0;
416 	do {
417 		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
418 						   true, &mr);
419 		if (IS_ERR(seg))
420 			return PTR_ERR(seg);
421 		rpcrdma_mr_push(mr, &req->rl_registered);
422 
423 		if (encode_rdma_segment(xdr, mr) < 0)
424 			return -EMSGSIZE;
425 
426 		trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
427 		r_xprt->rx_stats.write_chunk_count++;
428 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
429 		nchunks++;
430 		nsegs -= mr->mr_nents;
431 	} while (nsegs);
432 
433 	/* Update count of segments in this Write chunk */
434 	*segcount = cpu_to_be32(nchunks);
435 
436 	return 0;
437 }
438 
439 /* Register and XDR encode the Reply chunk. Supports encoding an array
440  * of plain segments that belong to a single write (reply) chunk.
441  *
442  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
443  *
444  *  Reply chunk (a counted array):
445  *   N elements:
446  *    1 - N - HLOO - HLOO - ... - HLOO
447  *
448  * Returns zero on success, or a negative errno if a failure occurred.
449  * @xdr is advanced to the next position in the stream.
450  */
451 static noinline int
452 rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
453 			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
454 {
455 	struct xdr_stream *xdr = &req->rl_stream;
456 	struct rpcrdma_mr_seg *seg;
457 	struct rpcrdma_mr *mr;
458 	int nsegs, nchunks;
459 	__be32 *segcount;
460 
461 	seg = req->rl_segments;
462 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
463 	if (nsegs < 0)
464 		return nsegs;
465 
466 	if (encode_item_present(xdr) < 0)
467 		return -EMSGSIZE;
468 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
469 	if (unlikely(!segcount))
470 		return -EMSGSIZE;
471 	/* Actual value encoded below */
472 
473 	nchunks = 0;
474 	do {
475 		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
476 						   true, &mr);
477 		if (IS_ERR(seg))
478 			return PTR_ERR(seg);
479 		rpcrdma_mr_push(mr, &req->rl_registered);
480 
481 		if (encode_rdma_segment(xdr, mr) < 0)
482 			return -EMSGSIZE;
483 
484 		trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
485 		r_xprt->rx_stats.reply_chunk_count++;
486 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
487 		nchunks++;
488 		nsegs -= mr->mr_nents;
489 	} while (nsegs);
490 
491 	/* Update count of segments in the Reply chunk */
492 	*segcount = cpu_to_be32(nchunks);
493 
494 	return 0;
495 }
496 
497 /**
498  * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
499  * @sc: sendctx containing SGEs to unmap
500  *
501  */
502 void
503 rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
504 {
505 	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
506 	struct ib_sge *sge;
507 	unsigned int count;
508 
509 	/* The first two SGEs contain the transport header and
510 	 * the inline buffer. These are always left mapped so
511 	 * they can be cheaply re-used.
512 	 */
513 	sge = &sc->sc_sges[2];
514 	for (count = sc->sc_unmap_count; count; ++sge, --count)
515 		ib_dma_unmap_page(ia->ri_device,
516 				  sge->addr, sge->length, DMA_TO_DEVICE);
517 
518 	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
519 		smp_mb__after_atomic();
520 		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
521 	}
522 }
523 
524 /* Prepare an SGE for the RPC-over-RDMA transport header.
525  */
526 static bool
527 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
528 			u32 len)
529 {
530 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
531 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
532 	struct ib_sge *sge = sc->sc_sges;
533 
534 	if (!rpcrdma_dma_map_regbuf(ia, rb))
535 		goto out_regbuf;
536 	sge->addr = rdmab_addr(rb);
537 	sge->length = len;
538 	sge->lkey = rdmab_lkey(rb);
539 
540 	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
541 				      sge->length, DMA_TO_DEVICE);
542 	sc->sc_wr.num_sge++;
543 	return true;
544 
545 out_regbuf:
546 	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
547 	return false;
548 }
549 
550 /* Prepare the Send SGEs. The head and tail iovec, and each entry
551  * in the page list, gets its own SGE.
552  */
553 static bool
554 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
555 			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
556 {
557 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
558 	unsigned int sge_no, page_base, len, remaining;
559 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
560 	struct ib_device *device = ia->ri_device;
561 	struct ib_sge *sge = sc->sc_sges;
562 	u32 lkey = ia->ri_pd->local_dma_lkey;
563 	struct page *page, **ppages;
564 
565 	/* The head iovec is straightforward, as it is already
566 	 * DMA-mapped. Sync the content that has changed.
567 	 */
568 	if (!rpcrdma_dma_map_regbuf(ia, rb))
569 		goto out_regbuf;
570 	sge_no = 1;
571 	sge[sge_no].addr = rdmab_addr(rb);
572 	sge[sge_no].length = xdr->head[0].iov_len;
573 	sge[sge_no].lkey = rdmab_lkey(rb);
574 	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
575 				      sge[sge_no].length, DMA_TO_DEVICE);
576 
577 	/* If there is a Read chunk, the page list is being handled
578 	 * via explicit RDMA, and thus is skipped here. However, the
579 	 * tail iovec may include an XDR pad for the page list, as
580 	 * well as additional content, and may not reside in the
581 	 * same page as the head iovec.
582 	 */
583 	if (rtype == rpcrdma_readch) {
584 		len = xdr->tail[0].iov_len;
585 
586 		/* Do not include the tail if it is only an XDR pad */
587 		if (len < 4)
588 			goto out;
589 
590 		page = virt_to_page(xdr->tail[0].iov_base);
591 		page_base = offset_in_page(xdr->tail[0].iov_base);
592 
593 		/* If the content in the page list is an odd length,
594 		 * xdr_write_pages() has added a pad at the beginning
595 		 * of the tail iovec. Force the tail's non-pad content
596 		 * to land at the next XDR position in the Send message.
597 		 */
598 		page_base += len & 3;
599 		len -= len & 3;
600 		goto map_tail;
601 	}
602 
603 	/* If there is a page list present, temporarily DMA map
604 	 * and prepare an SGE for each page to be sent.
605 	 */
606 	if (xdr->page_len) {
607 		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
608 		page_base = offset_in_page(xdr->page_base);
609 		remaining = xdr->page_len;
610 		while (remaining) {
611 			sge_no++;
612 			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
613 				goto out_mapping_overflow;
614 
615 			len = min_t(u32, PAGE_SIZE - page_base, remaining);
616 			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
617 							   page_base, len,
618 							   DMA_TO_DEVICE);
619 			if (ib_dma_mapping_error(device, sge[sge_no].addr))
620 				goto out_mapping_err;
621 			sge[sge_no].length = len;
622 			sge[sge_no].lkey = lkey;
623 
624 			sc->sc_unmap_count++;
625 			ppages++;
626 			remaining -= len;
627 			page_base = 0;
628 		}
629 	}
630 
631 	/* The tail iovec is not always constructed in the same
632 	 * page where the head iovec resides (see, for example,
633 	 * gss_wrap_req_priv). To neatly accommodate that case,
634 	 * DMA map it separately.
635 	 */
636 	if (xdr->tail[0].iov_len) {
637 		page = virt_to_page(xdr->tail[0].iov_base);
638 		page_base = offset_in_page(xdr->tail[0].iov_base);
639 		len = xdr->tail[0].iov_len;
640 
641 map_tail:
642 		sge_no++;
643 		sge[sge_no].addr = ib_dma_map_page(device, page,
644 						   page_base, len,
645 						   DMA_TO_DEVICE);
646 		if (ib_dma_mapping_error(device, sge[sge_no].addr))
647 			goto out_mapping_err;
648 		sge[sge_no].length = len;
649 		sge[sge_no].lkey = lkey;
650 		sc->sc_unmap_count++;
651 	}
652 
653 out:
654 	sc->sc_wr.num_sge += sge_no;
655 	if (sc->sc_unmap_count)
656 		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
657 	return true;
658 
659 out_regbuf:
660 	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
661 	return false;
662 
663 out_mapping_overflow:
664 	rpcrdma_unmap_sendctx(sc);
665 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
666 	return false;
667 
668 out_mapping_err:
669 	rpcrdma_unmap_sendctx(sc);
670 	pr_err("rpcrdma: Send mapping error\n");
671 	return false;
672 }
673 
674 /**
675  * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
676  * @r_xprt: controlling transport
677  * @req: context of RPC Call being marshalled
678  * @hdrlen: size of transport header, in bytes
679  * @xdr: xdr_buf containing RPC Call
680  * @rtype: chunk type being encoded
681  *
682  * Returns 0 on success; otherwise a negative errno is returned.
683  */
684 int
685 rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
686 			  struct rpcrdma_req *req, u32 hdrlen,
687 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
688 {
689 	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
690 	if (!req->rl_sendctx)
691 		return -EAGAIN;
692 	req->rl_sendctx->sc_wr.num_sge = 0;
693 	req->rl_sendctx->sc_unmap_count = 0;
694 	req->rl_sendctx->sc_req = req;
695 	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
696 
697 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
698 		return -EIO;
699 
700 	if (rtype != rpcrdma_areadch)
701 		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
702 			return -EIO;
703 
704 	return 0;
705 }
706 
707 /**
708  * rpcrdma_marshal_req - Marshal and send one RPC request
709  * @r_xprt: controlling transport
710  * @rqst: RPC request to be marshaled
711  *
712  * For the RPC in "rqst", this function:
713  *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
714  *  - Registers Read, Write, and Reply chunks
715  *  - Constructs the transport header
716  *  - Posts a Send WR to send the transport header and request
717  *
718  * Returns:
719  *	%0 if the RPC was sent successfully,
720  *	%-ENOTCONN if the connection was lost,
721  *	%-EAGAIN if the caller should call again with the same arguments,
722  *	%-ENOBUFS if the caller should call again after a delay,
723  *	%-EMSGSIZE if the transport header is too small,
724  *	%-EIO if a permanent problem occurred while marshaling.
725  */
726 int
727 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
728 {
729 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
730 	struct xdr_stream *xdr = &req->rl_stream;
731 	enum rpcrdma_chunktype rtype, wtype;
732 	bool ddp_allowed;
733 	__be32 *p;
734 	int ret;
735 
736 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
737 	xdr_init_encode(xdr, &req->rl_hdrbuf,
738 			req->rl_rdmabuf->rg_base);
739 
740 	/* Fixed header fields */
741 	ret = -EMSGSIZE;
742 	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
743 	if (!p)
744 		goto out_err;
745 	*p++ = rqst->rq_xid;
746 	*p++ = rpcrdma_version;
747 	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
748 
749 	/* When the ULP employs a GSS flavor that guarantees integrity
750 	 * or privacy, direct data placement of individual data items
751 	 * is not allowed.
752 	 */
753 	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
754 						RPCAUTH_AUTH_DATATOUCH);
755 
756 	/*
757 	 * Chunks needed for results?
758 	 *
759 	 * o If the expected result is under the inline threshold, all ops
760 	 *   return as inline.
761 	 * o Large read ops return data as write chunk(s), header as
762 	 *   inline.
763 	 * o Large non-read ops return as a single reply chunk.
764 	 */
765 	if (rpcrdma_results_inline(r_xprt, rqst))
766 		wtype = rpcrdma_noch;
767 	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
768 		wtype = rpcrdma_writech;
769 	else
770 		wtype = rpcrdma_replych;
771 
772 	/*
773 	 * Chunks needed for arguments?
774 	 *
775 	 * o If the total request is under the inline threshold, all ops
776 	 *   are sent as inline.
777 	 * o Large write ops transmit data as read chunk(s), header as
778 	 *   inline.
779 	 * o Large non-write ops are sent with the entire message as a
780 	 *   single read chunk (protocol 0-position special case).
781 	 *
782 	 * This assumes that the upper layer does not present a request
783 	 * that both has a data payload, and whose non-data arguments
784 	 * by themselves are larger than the inline threshold.
785 	 */
786 	if (rpcrdma_args_inline(r_xprt, rqst)) {
787 		*p++ = rdma_msg;
788 		rtype = rpcrdma_noch;
789 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
790 		*p++ = rdma_msg;
791 		rtype = rpcrdma_readch;
792 	} else {
793 		r_xprt->rx_stats.nomsg_call_count++;
794 		*p++ = rdma_nomsg;
795 		rtype = rpcrdma_areadch;
796 	}
797 
798 	/* If this is a retransmit, discard previously registered
799 	 * chunks. Very likely the connection has been replaced,
800 	 * so these registrations are invalid and unusable.
801 	 */
802 	while (unlikely(!list_empty(&req->rl_registered))) {
803 		struct rpcrdma_mr *mr;
804 
805 		mr = rpcrdma_mr_pop(&req->rl_registered);
806 		rpcrdma_mr_recycle(mr);
807 	}
808 
809 	/* This implementation supports the following combinations
810 	 * of chunk lists in one RPC-over-RDMA Call message:
811 	 *
812 	 *   - Read list
813 	 *   - Write list
814 	 *   - Reply chunk
815 	 *   - Read list + Reply chunk
816 	 *
817 	 * It might not yet support the following combinations:
818 	 *
819 	 *   - Read list + Write list
820 	 *
821 	 * It does not support the following combinations:
822 	 *
823 	 *   - Write list + Reply chunk
824 	 *   - Read list + Write list + Reply chunk
825 	 *
826 	 * This implementation supports only a single chunk in each
827 	 * Read or Write list. Thus for example the client cannot
828 	 * send a Call message with a Position Zero Read chunk and a
829 	 * regular Read chunk at the same time.
830 	 */
831 	if (rtype != rpcrdma_noch) {
832 		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
833 		if (ret)
834 			goto out_err;
835 	}
836 	ret = encode_item_not_present(xdr);
837 	if (ret)
838 		goto out_err;
839 
840 	if (wtype == rpcrdma_writech) {
841 		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
842 		if (ret)
843 			goto out_err;
844 	}
845 	ret = encode_item_not_present(xdr);
846 	if (ret)
847 		goto out_err;
848 
849 	if (wtype != rpcrdma_replych)
850 		ret = encode_item_not_present(xdr);
851 	else
852 		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
853 	if (ret)
854 		goto out_err;
855 
856 	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
857 
858 	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
859 					&rqst->rq_snd_buf, rtype);
860 	if (ret)
861 		goto out_err;
862 	return 0;
863 
864 out_err:
865 	switch (ret) {
866 	case -EAGAIN:
867 		xprt_wait_for_buffer_space(rqst->rq_xprt);
868 		break;
869 	case -ENOBUFS:
870 		break;
871 	default:
872 		r_xprt->rx_stats.failed_marshal_count++;
873 	}
874 	return ret;
875 }
876 
877 /**
878  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
879  * @rqst: controlling RPC request
880  * @srcp: points to RPC message payload in receive buffer
881  * @copy_len: remaining length of receive buffer content
882  * @pad: Write chunk pad bytes needed (zero for pure inline)
883  *
884  * The upper layer has set the maximum number of bytes it can
885  * receive in each component of rq_rcv_buf. These values are set in
886  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
887  *
888  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
889  * many cases this function simply updates iov_base pointers in
890  * rq_rcv_buf to point directly to the received reply data, to
891  * avoid copying reply data.
892  *
893  * Returns the count of bytes which had to be memcopied.
894  */
895 static unsigned long
896 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
897 {
898 	unsigned long fixup_copy_count;
899 	int i, npages, curlen;
900 	char *destp;
901 	struct page **ppages;
902 	int page_base;
903 
904 	/* The head iovec is redirected to the RPC reply message
905 	 * in the receive buffer, to avoid a memcopy.
906 	 */
907 	rqst->rq_rcv_buf.head[0].iov_base = srcp;
908 	rqst->rq_private_buf.head[0].iov_base = srcp;
909 
910 	/* The contents of the receive buffer that follow
911 	 * head.iov_len bytes are copied into the page list.
912 	 */
913 	curlen = rqst->rq_rcv_buf.head[0].iov_len;
914 	if (curlen > copy_len)
915 		curlen = copy_len;
916 	trace_xprtrdma_fixup(rqst, copy_len, curlen);
917 	srcp += curlen;
918 	copy_len -= curlen;
919 
920 	ppages = rqst->rq_rcv_buf.pages +
921 		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
922 	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
923 	fixup_copy_count = 0;
924 	if (copy_len && rqst->rq_rcv_buf.page_len) {
925 		int pagelist_len;
926 
927 		pagelist_len = rqst->rq_rcv_buf.page_len;
928 		if (pagelist_len > copy_len)
929 			pagelist_len = copy_len;
930 		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
931 		for (i = 0; i < npages; i++) {
932 			curlen = PAGE_SIZE - page_base;
933 			if (curlen > pagelist_len)
934 				curlen = pagelist_len;
935 
936 			trace_xprtrdma_fixup_pg(rqst, i, srcp,
937 						copy_len, curlen);
938 			destp = kmap_atomic(ppages[i]);
939 			memcpy(destp + page_base, srcp, curlen);
940 			flush_dcache_page(ppages[i]);
941 			kunmap_atomic(destp);
942 			srcp += curlen;
943 			copy_len -= curlen;
944 			fixup_copy_count += curlen;
945 			pagelist_len -= curlen;
946 			if (!pagelist_len)
947 				break;
948 			page_base = 0;
949 		}
950 
951 		/* Implicit padding for the last segment in a Write
952 		 * chunk is inserted inline at the front of the tail
953 		 * iovec. The upper layer ignores the content of
954 		 * the pad. Simply ensure inline content in the tail
955 		 * that follows the Write chunk is properly aligned.
956 		 */
957 		if (pad)
958 			srcp -= pad;
959 	}
960 
961 	/* The tail iovec is redirected to the remaining data
962 	 * in the receive buffer, to avoid a memcopy.
963 	 */
964 	if (copy_len || pad) {
965 		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
966 		rqst->rq_private_buf.tail[0].iov_base = srcp;
967 	}
968 
969 	return fixup_copy_count;
970 }
971 
972 /* By convention, backchannel calls arrive via rdma_msg type
973  * messages, and never populate the chunk lists. This makes
974  * the RPC/RDMA header small and fixed in size, so it is
975  * straightforward to check the RPC header's direction field.
976  */
977 static bool
978 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
979 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
980 {
981 	struct xdr_stream *xdr = &rep->rr_stream;
982 	__be32 *p;
983 
984 	if (rep->rr_proc != rdma_msg)
985 		return false;
986 
987 	/* Peek at stream contents without advancing. */
988 	p = xdr_inline_decode(xdr, 0);
989 
990 	/* Chunk lists */
991 	if (*p++ != xdr_zero)
992 		return false;
993 	if (*p++ != xdr_zero)
994 		return false;
995 	if (*p++ != xdr_zero)
996 		return false;
997 
998 	/* RPC header */
999 	if (*p++ != rep->rr_xid)
1000 		return false;
1001 	if (*p != cpu_to_be32(RPC_CALL))
1002 		return false;
1003 
1004 	/* Now that we are sure this is a backchannel call,
1005 	 * advance to the RPC header.
1006 	 */
1007 	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1008 	if (unlikely(!p))
1009 		goto out_short;
1010 
1011 	rpcrdma_bc_receive_call(r_xprt, rep);
1012 	return true;
1013 
1014 out_short:
1015 	pr_warn("RPC/RDMA short backward direction call\n");
1016 	return true;
1017 }
1018 #else	/* CONFIG_SUNRPC_BACKCHANNEL */
1019 {
1020 	return false;
1021 }
1022 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
1023 
1024 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1025 {
1026 	u32 handle;
1027 	u64 offset;
1028 	__be32 *p;
1029 
1030 	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1031 	if (unlikely(!p))
1032 		return -EIO;
1033 
1034 	handle = be32_to_cpup(p++);
1035 	*length = be32_to_cpup(p++);
1036 	xdr_decode_hyper(p, &offset);
1037 
1038 	trace_xprtrdma_decode_seg(handle, *length, offset);
1039 	return 0;
1040 }
1041 
1042 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1043 {
1044 	u32 segcount, seglength;
1045 	__be32 *p;
1046 
1047 	p = xdr_inline_decode(xdr, sizeof(*p));
1048 	if (unlikely(!p))
1049 		return -EIO;
1050 
1051 	*length = 0;
1052 	segcount = be32_to_cpup(p);
1053 	while (segcount--) {
1054 		if (decode_rdma_segment(xdr, &seglength))
1055 			return -EIO;
1056 		*length += seglength;
1057 	}
1058 
1059 	return 0;
1060 }
1061 
1062 /* In RPC-over-RDMA Version One replies, a Read list is never
1063  * expected. This decoder is a stub that returns an error if
1064  * a Read list is present.
1065  */
1066 static int decode_read_list(struct xdr_stream *xdr)
1067 {
1068 	__be32 *p;
1069 
1070 	p = xdr_inline_decode(xdr, sizeof(*p));
1071 	if (unlikely(!p))
1072 		return -EIO;
1073 	if (unlikely(*p != xdr_zero))
1074 		return -EIO;
1075 	return 0;
1076 }
1077 
1078 /* Supports only one Write chunk in the Write list
1079  */
1080 static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1081 {
1082 	u32 chunklen;
1083 	bool first;
1084 	__be32 *p;
1085 
1086 	*length = 0;
1087 	first = true;
1088 	do {
1089 		p = xdr_inline_decode(xdr, sizeof(*p));
1090 		if (unlikely(!p))
1091 			return -EIO;
1092 		if (*p == xdr_zero)
1093 			break;
1094 		if (!first)
1095 			return -EIO;
1096 
1097 		if (decode_write_chunk(xdr, &chunklen))
1098 			return -EIO;
1099 		*length += chunklen;
1100 		first = false;
1101 	} while (true);
1102 	return 0;
1103 }
1104 
1105 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1106 {
1107 	__be32 *p;
1108 
1109 	p = xdr_inline_decode(xdr, sizeof(*p));
1110 	if (unlikely(!p))
1111 		return -EIO;
1112 
1113 	*length = 0;
1114 	if (*p != xdr_zero)
1115 		if (decode_write_chunk(xdr, length))
1116 			return -EIO;
1117 	return 0;
1118 }
1119 
1120 static int
1121 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1122 		   struct rpc_rqst *rqst)
1123 {
1124 	struct xdr_stream *xdr = &rep->rr_stream;
1125 	u32 writelist, replychunk, rpclen;
1126 	char *base;
1127 
1128 	/* Decode the chunk lists */
1129 	if (decode_read_list(xdr))
1130 		return -EIO;
1131 	if (decode_write_list(xdr, &writelist))
1132 		return -EIO;
1133 	if (decode_reply_chunk(xdr, &replychunk))
1134 		return -EIO;
1135 
1136 	/* RDMA_MSG sanity checks */
1137 	if (unlikely(replychunk))
1138 		return -EIO;
1139 
1140 	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1141 	base = (char *)xdr_inline_decode(xdr, 0);
1142 	rpclen = xdr_stream_remaining(xdr);
1143 	r_xprt->rx_stats.fixup_copy_count +=
1144 		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1145 
1146 	r_xprt->rx_stats.total_rdma_reply += writelist;
1147 	return rpclen + xdr_align_size(writelist);
1148 }
1149 
1150 static noinline int
1151 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1152 {
1153 	struct xdr_stream *xdr = &rep->rr_stream;
1154 	u32 writelist, replychunk;
1155 
1156 	/* Decode the chunk lists */
1157 	if (decode_read_list(xdr))
1158 		return -EIO;
1159 	if (decode_write_list(xdr, &writelist))
1160 		return -EIO;
1161 	if (decode_reply_chunk(xdr, &replychunk))
1162 		return -EIO;
1163 
1164 	/* RDMA_NOMSG sanity checks */
1165 	if (unlikely(writelist))
1166 		return -EIO;
1167 	if (unlikely(!replychunk))
1168 		return -EIO;
1169 
1170 	/* Reply chunk buffer already is the reply vector */
1171 	r_xprt->rx_stats.total_rdma_reply += replychunk;
1172 	return replychunk;
1173 }
1174 
1175 static noinline int
1176 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1177 		     struct rpc_rqst *rqst)
1178 {
1179 	struct xdr_stream *xdr = &rep->rr_stream;
1180 	__be32 *p;
1181 
1182 	p = xdr_inline_decode(xdr, sizeof(*p));
1183 	if (unlikely(!p))
1184 		return -EIO;
1185 
1186 	switch (*p) {
1187 	case err_vers:
1188 		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1189 		if (!p)
1190 			break;
1191 		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
1192 			rqst->rq_task->tk_pid, __func__,
1193 			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
1194 		break;
1195 	case err_chunk:
1196 		dprintk("RPC: %5u: %s: server reports header decoding error\n",
1197 			rqst->rq_task->tk_pid, __func__);
1198 		break;
1199 	default:
1200 		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
1201 			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
1202 	}
1203 
1204 	r_xprt->rx_stats.bad_reply_count++;
1205 	return -EREMOTEIO;
1206 }
1207 
1208 /* Perform XID lookup, reconstruction of the RPC reply, and
1209  * RPC completion while holding the transport lock to ensure
1210  * the rep, rqst, and rq_task pointers remain stable.
1211  */
1212 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1213 {
1214 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1215 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1216 	struct rpc_rqst *rqst = rep->rr_rqst;
1217 	int status;
1218 
1219 	xprt->reestablish_timeout = 0;
1220 
1221 	switch (rep->rr_proc) {
1222 	case rdma_msg:
1223 		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1224 		break;
1225 	case rdma_nomsg:
1226 		status = rpcrdma_decode_nomsg(r_xprt, rep);
1227 		break;
1228 	case rdma_error:
1229 		status = rpcrdma_decode_error(r_xprt, rep, rqst);
1230 		break;
1231 	default:
1232 		status = -EIO;
1233 	}
1234 	if (status < 0)
1235 		goto out_badheader;
1236 
1237 out:
1238 	spin_lock(&xprt->queue_lock);
1239 	xprt_complete_rqst(rqst->rq_task, status);
1240 	xprt_unpin_rqst(rqst);
1241 	spin_unlock(&xprt->queue_lock);
1242 	return;
1243 
1244 /* If the incoming reply terminated a pending RPC, the next
1245  * RPC call will post a replacement receive buffer as it is
1246  * being marshaled.
1247  */
1248 out_badheader:
1249 	trace_xprtrdma_reply_hdr(rep);
1250 	r_xprt->rx_stats.bad_reply_count++;
1251 	status = -EIO;
1252 	goto out;
1253 }
1254 
1255 void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1256 {
1257 	/* Invalidate and unmap the data payloads before waking
1258 	 * the waiting application. This guarantees the memory
1259 	 * regions are properly fenced from the server before the
1260 	 * application accesses the data. It also ensures proper
1261 	 * send flow control: waking the next RPC waits until this
1262 	 * RPC has relinquished all its Send Queue entries.
1263 	 */
1264 	if (!list_empty(&req->rl_registered))
1265 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
1266 						    &req->rl_registered);
1267 
1268 	/* Ensure that any DMA mapped pages associated with
1269 	 * the Send of the RPC Call have been unmapped before
1270 	 * allowing the RPC to complete. This protects argument
1271 	 * memory not controlled by the RPC client from being
1272 	 * re-used before we're done with it.
1273 	 */
1274 	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1275 		r_xprt->rx_stats.reply_waits_for_send++;
1276 		out_of_line_wait_on_bit(&req->rl_flags,
1277 					RPCRDMA_REQ_F_TX_RESOURCES,
1278 					bit_wait,
1279 					TASK_UNINTERRUPTIBLE);
1280 	}
1281 }
1282 
1283 /* Reply handling runs in the poll worker thread. Anything that
1284  * might wait is deferred to a separate workqueue.
1285  */
1286 void rpcrdma_deferred_completion(struct work_struct *work)
1287 {
1288 	struct rpcrdma_rep *rep =
1289 			container_of(work, struct rpcrdma_rep, rr_work);
1290 	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1291 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1292 
1293 	trace_xprtrdma_defer_cmp(rep);
1294 	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1295 		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
1296 	rpcrdma_release_rqst(r_xprt, req);
1297 	rpcrdma_complete_rqst(rep);
1298 }
1299 
1300 /* Process received RPC/RDMA messages.
1301  *
1302  * Errors must result in the RPC task either being awakened, or
1303  * allowed to timeout, to discover the errors at that time.
1304  */
1305 void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1306 {
1307 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1308 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1309 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1310 	struct rpcrdma_req *req;
1311 	struct rpc_rqst *rqst;
1312 	u32 credits;
1313 	__be32 *p;
1314 
1315 	--buf->rb_posted_receives;
1316 
1317 	if (rep->rr_hdrbuf.head[0].iov_len == 0)
1318 		goto out_badstatus;
1319 
1320 	/* Fixed transport header fields */
1321 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1322 			rep->rr_hdrbuf.head[0].iov_base);
1323 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1324 	if (unlikely(!p))
1325 		goto out_shortreply;
1326 	rep->rr_xid = *p++;
1327 	rep->rr_vers = *p++;
1328 	credits = be32_to_cpu(*p++);
1329 	rep->rr_proc = *p++;
1330 
1331 	if (rep->rr_vers != rpcrdma_version)
1332 		goto out_badversion;
1333 
1334 	if (rpcrdma_is_bcall(r_xprt, rep))
1335 		return;
1336 
1337 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
1338 	 * get context for handling any incoming chunks.
1339 	 */
1340 	spin_lock(&xprt->queue_lock);
1341 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1342 	if (!rqst)
1343 		goto out_norqst;
1344 	xprt_pin_rqst(rqst);
1345 	spin_unlock(&xprt->queue_lock);
1346 
1347 	if (credits == 0)
1348 		credits = 1;	/* don't deadlock */
1349 	else if (credits > buf->rb_max_requests)
1350 		credits = buf->rb_max_requests;
1351 	if (buf->rb_credits != credits) {
1352 		spin_lock_bh(&xprt->transport_lock);
1353 		buf->rb_credits = credits;
1354 		xprt->cwnd = credits << RPC_CWNDSHIFT;
1355 		spin_unlock_bh(&xprt->transport_lock);
1356 	}
1357 
1358 	req = rpcr_to_rdmar(rqst);
1359 	req->rl_reply = rep;
1360 	rep->rr_rqst = rqst;
1361 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1362 
1363 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1364 
1365 	rpcrdma_post_recvs(r_xprt, false);
1366 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
1367 	return;
1368 
1369 out_badversion:
1370 	trace_xprtrdma_reply_vers(rep);
1371 	goto repost;
1372 
1373 /* The RPC transaction has already been terminated, or the header
1374  * is corrupt.
1375  */
1376 out_norqst:
1377 	spin_unlock(&xprt->queue_lock);
1378 	trace_xprtrdma_reply_rqst(rep);
1379 	goto repost;
1380 
1381 out_shortreply:
1382 	trace_xprtrdma_reply_short(rep);
1383 
1384 /* If no pending RPC transaction was matched, post a replacement
1385  * receive buffer before returning.
1386  */
1387 repost:
1388 	rpcrdma_post_recvs(r_xprt, false);
1389 out_badstatus:
1390 	rpcrdma_recv_buffer_put(rep);
1391 }
1392