xref: /linux/net/sunrpc/xprtrdma/rpc_rdma.c (revision 1f2367a39f17bd553a75e179a747f9b257bc9478)
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41 
42 /*
43  * rpc_rdma.c
44  *
45  * This file contains the guts of the RPC RDMA protocol, and
46  * does marshaling/unmarshaling, etc. It is also where interfacing
47  * to the Linux RPC framework lives.
48  */
49 
50 #include <linux/highmem.h>
51 
52 #include <linux/sunrpc/svc_rdma.h>
53 
54 #include "xprt_rdma.h"
55 #include <trace/events/rpcrdma.h>
56 
57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
58 # define RPCDBG_FACILITY	RPCDBG_TRANS
59 #endif
60 
61 /* Returns size of largest RPC-over-RDMA header in a Call message
62  *
63  * The largest Call header contains a full-size Read list and a
64  * minimal Reply chunk.
65  */
66 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
67 {
68 	unsigned int size;
69 
70 	/* Fixed header fields and list discriminators */
71 	size = RPCRDMA_HDRLEN_MIN;
72 
73 	/* Maximum Read list size */
74 	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
75 
76 	/* Minimal Read chunk size */
77 	size += sizeof(__be32);	/* segment count */
78 	size += rpcrdma_segment_maxsz * sizeof(__be32);
79 	size += sizeof(__be32);	/* list discriminator */
80 
81 	dprintk("RPC:       %s: max call header size = %u\n",
82 		__func__, size);
83 	return size;
84 }
85 
86 /* Returns size of largest RPC-over-RDMA header in a Reply message
87  *
88  * There is only one Write list or one Reply chunk per Reply
89  * message.  The larger list is the Write list.
90  */
91 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
92 {
93 	unsigned int size;
94 
95 	/* Fixed header fields and list discriminators */
96 	size = RPCRDMA_HDRLEN_MIN;
97 
98 	/* Maximum Write list size */
99 	size = sizeof(__be32);		/* segment count */
100 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
101 	size += sizeof(__be32);	/* list discriminator */
102 
103 	dprintk("RPC:       %s: max reply header size = %u\n",
104 		__func__, size);
105 	return size;
106 }
107 
108 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
109 {
110 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
111 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
112 	unsigned int maxsegs = ia->ri_max_segs;
113 
114 	ia->ri_max_inline_write = cdata->inline_wsize -
115 				  rpcrdma_max_call_header_size(maxsegs);
116 	ia->ri_max_inline_read = cdata->inline_rsize -
117 				 rpcrdma_max_reply_header_size(maxsegs);
118 }
119 
120 /* The client can send a request inline as long as the RPCRDMA header
121  * plus the RPC call fit under the transport's inline limit. If the
122  * combined call message size exceeds that limit, the client must use
123  * a Read chunk for this operation.
124  *
125  * A Read chunk is also required if sending the RPC call inline would
126  * exceed this device's max_sge limit.
127  */
128 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
129 				struct rpc_rqst *rqst)
130 {
131 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
132 	unsigned int count, remaining, offset;
133 
134 	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
135 		return false;
136 
137 	if (xdr->page_len) {
138 		remaining = xdr->page_len;
139 		offset = offset_in_page(xdr->page_base);
140 		count = RPCRDMA_MIN_SEND_SGES;
141 		while (remaining) {
142 			remaining -= min_t(unsigned int,
143 					   PAGE_SIZE - offset, remaining);
144 			offset = 0;
145 			if (++count > r_xprt->rx_ia.ri_max_send_sges)
146 				return false;
147 		}
148 	}
149 
150 	return true;
151 }
152 
153 /* The client can't know how large the actual reply will be. Thus it
154  * plans for the largest possible reply for that particular ULP
155  * operation. If the maximum combined reply message size exceeds that
156  * limit, the client must provide a write list or a reply chunk for
157  * this request.
158  */
159 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
160 				   struct rpc_rqst *rqst)
161 {
162 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
163 
164 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
165 }
166 
167 /* The client is required to provide a Reply chunk if the maximum
168  * size of the non-payload part of the RPC Reply is larger than
169  * the inline threshold.
170  */
171 static bool
172 rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
173 			  const struct rpc_rqst *rqst)
174 {
175 	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
176 	const struct rpcrdma_ia *ia = &r_xprt->rx_ia;
177 
178 	return buf->head[0].iov_len + buf->tail[0].iov_len <
179 		ia->ri_max_inline_read;
180 }
181 
182 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
183  * a byte range. Other modes coalesce these SGEs into a single MR
184  * when they can.
185  *
186  * Returns pointer to next available SGE, and bumps the total number
187  * of SGEs consumed.
188  */
189 static struct rpcrdma_mr_seg *
190 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
191 		     unsigned int *n)
192 {
193 	u32 remaining, page_offset;
194 	char *base;
195 
196 	base = vec->iov_base;
197 	page_offset = offset_in_page(base);
198 	remaining = vec->iov_len;
199 	while (remaining) {
200 		seg->mr_page = NULL;
201 		seg->mr_offset = base;
202 		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
203 		remaining -= seg->mr_len;
204 		base += seg->mr_len;
205 		++seg;
206 		++(*n);
207 		page_offset = 0;
208 	}
209 	return seg;
210 }
211 
212 /* Convert @xdrbuf into SGEs no larger than a page each. As they
213  * are registered, these SGEs are then coalesced into RDMA segments
214  * when the selected memreg mode supports it.
215  *
216  * Returns positive number of SGEs consumed, or a negative errno.
217  */
218 
219 static int
220 rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
221 		     unsigned int pos, enum rpcrdma_chunktype type,
222 		     struct rpcrdma_mr_seg *seg)
223 {
224 	unsigned long page_base;
225 	unsigned int len, n;
226 	struct page **ppages;
227 
228 	n = 0;
229 	if (pos == 0)
230 		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
231 
232 	len = xdrbuf->page_len;
233 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
234 	page_base = offset_in_page(xdrbuf->page_base);
235 	while (len) {
236 		/* ACL likes to be lazy in allocating pages - ACLs
237 		 * are small by default but can get huge.
238 		 */
239 		if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
240 			if (!*ppages)
241 				*ppages = alloc_page(GFP_ATOMIC);
242 			if (!*ppages)
243 				return -ENOBUFS;
244 		}
245 		seg->mr_page = *ppages;
246 		seg->mr_offset = (char *)page_base;
247 		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
248 		len -= seg->mr_len;
249 		++ppages;
250 		++seg;
251 		++n;
252 		page_base = 0;
253 	}
254 
255 	/* When encoding a Read chunk, the tail iovec contains an
256 	 * XDR pad and may be omitted.
257 	 */
258 	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
259 		goto out;
260 
261 	/* When encoding a Write chunk, some servers need to see an
262 	 * extra segment for non-XDR-aligned Write chunks. The upper
263 	 * layer provides space in the tail iovec that may be used
264 	 * for this purpose.
265 	 */
266 	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
267 		goto out;
268 
269 	if (xdrbuf->tail[0].iov_len)
270 		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
271 
272 out:
273 	if (unlikely(n > RPCRDMA_MAX_SEGS))
274 		return -EIO;
275 	return n;
276 }
277 
278 static inline int
279 encode_item_present(struct xdr_stream *xdr)
280 {
281 	__be32 *p;
282 
283 	p = xdr_reserve_space(xdr, sizeof(*p));
284 	if (unlikely(!p))
285 		return -EMSGSIZE;
286 
287 	*p = xdr_one;
288 	return 0;
289 }
290 
291 static inline int
292 encode_item_not_present(struct xdr_stream *xdr)
293 {
294 	__be32 *p;
295 
296 	p = xdr_reserve_space(xdr, sizeof(*p));
297 	if (unlikely(!p))
298 		return -EMSGSIZE;
299 
300 	*p = xdr_zero;
301 	return 0;
302 }
303 
304 static void
305 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
306 {
307 	*iptr++ = cpu_to_be32(mr->mr_handle);
308 	*iptr++ = cpu_to_be32(mr->mr_length);
309 	xdr_encode_hyper(iptr, mr->mr_offset);
310 }
311 
312 static int
313 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
314 {
315 	__be32 *p;
316 
317 	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
318 	if (unlikely(!p))
319 		return -EMSGSIZE;
320 
321 	xdr_encode_rdma_segment(p, mr);
322 	return 0;
323 }
324 
325 static int
326 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
327 		    u32 position)
328 {
329 	__be32 *p;
330 
331 	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
332 	if (unlikely(!p))
333 		return -EMSGSIZE;
334 
335 	*p++ = xdr_one;			/* Item present */
336 	*p++ = cpu_to_be32(position);
337 	xdr_encode_rdma_segment(p, mr);
338 	return 0;
339 }
340 
341 /* Register and XDR encode the Read list. Supports encoding a list of read
342  * segments that belong to a single read chunk.
343  *
344  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
345  *
346  *  Read chunklist (a linked list):
347  *   N elements, position P (same P for all chunks of same arg!):
348  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
349  *
350  * Returns zero on success, or a negative errno if a failure occurred.
351  * @xdr is advanced to the next position in the stream.
352  *
353  * Only a single @pos value is currently supported.
354  */
355 static noinline int
356 rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
357 			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
358 {
359 	struct xdr_stream *xdr = &req->rl_stream;
360 	struct rpcrdma_mr_seg *seg;
361 	struct rpcrdma_mr *mr;
362 	unsigned int pos;
363 	int nsegs;
364 
365 	pos = rqst->rq_snd_buf.head[0].iov_len;
366 	if (rtype == rpcrdma_areadch)
367 		pos = 0;
368 	seg = req->rl_segments;
369 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
370 				     rtype, seg);
371 	if (nsegs < 0)
372 		return nsegs;
373 
374 	do {
375 		seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
376 		if (IS_ERR(seg))
377 			return PTR_ERR(seg);
378 		rpcrdma_mr_push(mr, &req->rl_registered);
379 
380 		if (encode_read_segment(xdr, mr, pos) < 0)
381 			return -EMSGSIZE;
382 
383 		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
384 		r_xprt->rx_stats.read_chunk_count++;
385 		nsegs -= mr->mr_nents;
386 	} while (nsegs);
387 
388 	return 0;
389 }
390 
391 /* Register and XDR encode the Write list. Supports encoding a list
392  * containing one array of plain segments that belong to a single
393  * write chunk.
394  *
395  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
396  *
397  *  Write chunklist (a list of (one) counted array):
398  *   N elements:
399  *    1 - N - HLOO - HLOO - ... - HLOO - 0
400  *
401  * Returns zero on success, or a negative errno if a failure occurred.
402  * @xdr is advanced to the next position in the stream.
403  *
404  * Only a single Write chunk is currently supported.
405  */
406 static noinline int
407 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
408 			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
409 {
410 	struct xdr_stream *xdr = &req->rl_stream;
411 	struct rpcrdma_mr_seg *seg;
412 	struct rpcrdma_mr *mr;
413 	int nsegs, nchunks;
414 	__be32 *segcount;
415 
416 	seg = req->rl_segments;
417 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
418 				     rqst->rq_rcv_buf.head[0].iov_len,
419 				     wtype, seg);
420 	if (nsegs < 0)
421 		return nsegs;
422 
423 	if (encode_item_present(xdr) < 0)
424 		return -EMSGSIZE;
425 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
426 	if (unlikely(!segcount))
427 		return -EMSGSIZE;
428 	/* Actual value encoded below */
429 
430 	nchunks = 0;
431 	do {
432 		seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
433 		if (IS_ERR(seg))
434 			return PTR_ERR(seg);
435 		rpcrdma_mr_push(mr, &req->rl_registered);
436 
437 		if (encode_rdma_segment(xdr, mr) < 0)
438 			return -EMSGSIZE;
439 
440 		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
441 		r_xprt->rx_stats.write_chunk_count++;
442 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
443 		nchunks++;
444 		nsegs -= mr->mr_nents;
445 	} while (nsegs);
446 
447 	/* Update count of segments in this Write chunk */
448 	*segcount = cpu_to_be32(nchunks);
449 
450 	return 0;
451 }
452 
453 /* Register and XDR encode the Reply chunk. Supports encoding an array
454  * of plain segments that belong to a single write (reply) chunk.
455  *
456  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
457  *
458  *  Reply chunk (a counted array):
459  *   N elements:
460  *    1 - N - HLOO - HLOO - ... - HLOO
461  *
462  * Returns zero on success, or a negative errno if a failure occurred.
463  * @xdr is advanced to the next position in the stream.
464  */
465 static noinline int
466 rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
467 			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
468 {
469 	struct xdr_stream *xdr = &req->rl_stream;
470 	struct rpcrdma_mr_seg *seg;
471 	struct rpcrdma_mr *mr;
472 	int nsegs, nchunks;
473 	__be32 *segcount;
474 
475 	seg = req->rl_segments;
476 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
477 	if (nsegs < 0)
478 		return nsegs;
479 
480 	if (encode_item_present(xdr) < 0)
481 		return -EMSGSIZE;
482 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
483 	if (unlikely(!segcount))
484 		return -EMSGSIZE;
485 	/* Actual value encoded below */
486 
487 	nchunks = 0;
488 	do {
489 		seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
490 		if (IS_ERR(seg))
491 			return PTR_ERR(seg);
492 		rpcrdma_mr_push(mr, &req->rl_registered);
493 
494 		if (encode_rdma_segment(xdr, mr) < 0)
495 			return -EMSGSIZE;
496 
497 		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
498 		r_xprt->rx_stats.reply_chunk_count++;
499 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
500 		nchunks++;
501 		nsegs -= mr->mr_nents;
502 	} while (nsegs);
503 
504 	/* Update count of segments in the Reply chunk */
505 	*segcount = cpu_to_be32(nchunks);
506 
507 	return 0;
508 }
509 
510 /**
511  * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
512  * @sc: sendctx containing SGEs to unmap
513  *
514  */
515 void
516 rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
517 {
518 	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
519 	struct ib_sge *sge;
520 	unsigned int count;
521 
522 	/* The first two SGEs contain the transport header and
523 	 * the inline buffer. These are always left mapped so
524 	 * they can be cheaply re-used.
525 	 */
526 	sge = &sc->sc_sges[2];
527 	for (count = sc->sc_unmap_count; count; ++sge, --count)
528 		ib_dma_unmap_page(ia->ri_device,
529 				  sge->addr, sge->length, DMA_TO_DEVICE);
530 
531 	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
532 		smp_mb__after_atomic();
533 		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
534 	}
535 }
536 
537 /* Prepare an SGE for the RPC-over-RDMA transport header.
538  */
539 static bool
540 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
541 			u32 len)
542 {
543 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
544 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
545 	struct ib_sge *sge = sc->sc_sges;
546 
547 	if (!rpcrdma_dma_map_regbuf(ia, rb))
548 		goto out_regbuf;
549 	sge->addr = rdmab_addr(rb);
550 	sge->length = len;
551 	sge->lkey = rdmab_lkey(rb);
552 
553 	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
554 				      sge->length, DMA_TO_DEVICE);
555 	sc->sc_wr.num_sge++;
556 	return true;
557 
558 out_regbuf:
559 	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
560 	return false;
561 }
562 
563 /* Prepare the Send SGEs. The head and tail iovec, and each entry
564  * in the page list, gets its own SGE.
565  */
566 static bool
567 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
568 			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
569 {
570 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
571 	unsigned int sge_no, page_base, len, remaining;
572 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
573 	struct ib_device *device = ia->ri_device;
574 	struct ib_sge *sge = sc->sc_sges;
575 	u32 lkey = ia->ri_pd->local_dma_lkey;
576 	struct page *page, **ppages;
577 
578 	/* The head iovec is straightforward, as it is already
579 	 * DMA-mapped. Sync the content that has changed.
580 	 */
581 	if (!rpcrdma_dma_map_regbuf(ia, rb))
582 		goto out_regbuf;
583 	sge_no = 1;
584 	sge[sge_no].addr = rdmab_addr(rb);
585 	sge[sge_no].length = xdr->head[0].iov_len;
586 	sge[sge_no].lkey = rdmab_lkey(rb);
587 	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
588 				      sge[sge_no].length, DMA_TO_DEVICE);
589 
590 	/* If there is a Read chunk, the page list is being handled
591 	 * via explicit RDMA, and thus is skipped here. However, the
592 	 * tail iovec may include an XDR pad for the page list, as
593 	 * well as additional content, and may not reside in the
594 	 * same page as the head iovec.
595 	 */
596 	if (rtype == rpcrdma_readch) {
597 		len = xdr->tail[0].iov_len;
598 
599 		/* Do not include the tail if it is only an XDR pad */
600 		if (len < 4)
601 			goto out;
602 
603 		page = virt_to_page(xdr->tail[0].iov_base);
604 		page_base = offset_in_page(xdr->tail[0].iov_base);
605 
606 		/* If the content in the page list is an odd length,
607 		 * xdr_write_pages() has added a pad at the beginning
608 		 * of the tail iovec. Force the tail's non-pad content
609 		 * to land at the next XDR position in the Send message.
610 		 */
611 		page_base += len & 3;
612 		len -= len & 3;
613 		goto map_tail;
614 	}
615 
616 	/* If there is a page list present, temporarily DMA map
617 	 * and prepare an SGE for each page to be sent.
618 	 */
619 	if (xdr->page_len) {
620 		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
621 		page_base = offset_in_page(xdr->page_base);
622 		remaining = xdr->page_len;
623 		while (remaining) {
624 			sge_no++;
625 			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
626 				goto out_mapping_overflow;
627 
628 			len = min_t(u32, PAGE_SIZE - page_base, remaining);
629 			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
630 							   page_base, len,
631 							   DMA_TO_DEVICE);
632 			if (ib_dma_mapping_error(device, sge[sge_no].addr))
633 				goto out_mapping_err;
634 			sge[sge_no].length = len;
635 			sge[sge_no].lkey = lkey;
636 
637 			sc->sc_unmap_count++;
638 			ppages++;
639 			remaining -= len;
640 			page_base = 0;
641 		}
642 	}
643 
644 	/* The tail iovec is not always constructed in the same
645 	 * page where the head iovec resides (see, for example,
646 	 * gss_wrap_req_priv). To neatly accommodate that case,
647 	 * DMA map it separately.
648 	 */
649 	if (xdr->tail[0].iov_len) {
650 		page = virt_to_page(xdr->tail[0].iov_base);
651 		page_base = offset_in_page(xdr->tail[0].iov_base);
652 		len = xdr->tail[0].iov_len;
653 
654 map_tail:
655 		sge_no++;
656 		sge[sge_no].addr = ib_dma_map_page(device, page,
657 						   page_base, len,
658 						   DMA_TO_DEVICE);
659 		if (ib_dma_mapping_error(device, sge[sge_no].addr))
660 			goto out_mapping_err;
661 		sge[sge_no].length = len;
662 		sge[sge_no].lkey = lkey;
663 		sc->sc_unmap_count++;
664 	}
665 
666 out:
667 	sc->sc_wr.num_sge += sge_no;
668 	if (sc->sc_unmap_count)
669 		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
670 	return true;
671 
672 out_regbuf:
673 	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
674 	return false;
675 
676 out_mapping_overflow:
677 	rpcrdma_unmap_sendctx(sc);
678 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
679 	return false;
680 
681 out_mapping_err:
682 	rpcrdma_unmap_sendctx(sc);
683 	trace_xprtrdma_dma_maperr(sge[sge_no].addr);
684 	return false;
685 }
686 
687 /**
688  * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
689  * @r_xprt: controlling transport
690  * @req: context of RPC Call being marshalled
691  * @hdrlen: size of transport header, in bytes
692  * @xdr: xdr_buf containing RPC Call
693  * @rtype: chunk type being encoded
694  *
695  * Returns 0 on success; otherwise a negative errno is returned.
696  */
697 int
698 rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
699 			  struct rpcrdma_req *req, u32 hdrlen,
700 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
701 {
702 	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
703 	if (!req->rl_sendctx)
704 		return -EAGAIN;
705 	req->rl_sendctx->sc_wr.num_sge = 0;
706 	req->rl_sendctx->sc_unmap_count = 0;
707 	req->rl_sendctx->sc_req = req;
708 	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
709 
710 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
711 		return -EIO;
712 
713 	if (rtype != rpcrdma_areadch)
714 		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
715 			return -EIO;
716 
717 	return 0;
718 }
719 
720 /**
721  * rpcrdma_marshal_req - Marshal and send one RPC request
722  * @r_xprt: controlling transport
723  * @rqst: RPC request to be marshaled
724  *
725  * For the RPC in "rqst", this function:
726  *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
727  *  - Registers Read, Write, and Reply chunks
728  *  - Constructs the transport header
729  *  - Posts a Send WR to send the transport header and request
730  *
731  * Returns:
732  *	%0 if the RPC was sent successfully,
733  *	%-ENOTCONN if the connection was lost,
734  *	%-EAGAIN if the caller should call again with the same arguments,
735  *	%-ENOBUFS if the caller should call again after a delay,
736  *	%-EMSGSIZE if the transport header is too small,
737  *	%-EIO if a permanent problem occurred while marshaling.
738  */
739 int
740 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
741 {
742 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
743 	struct xdr_stream *xdr = &req->rl_stream;
744 	enum rpcrdma_chunktype rtype, wtype;
745 	bool ddp_allowed;
746 	__be32 *p;
747 	int ret;
748 
749 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
750 	xdr_init_encode(xdr, &req->rl_hdrbuf,
751 			req->rl_rdmabuf->rg_base, rqst);
752 
753 	/* Fixed header fields */
754 	ret = -EMSGSIZE;
755 	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
756 	if (!p)
757 		goto out_err;
758 	*p++ = rqst->rq_xid;
759 	*p++ = rpcrdma_version;
760 	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
761 
762 	/* When the ULP employs a GSS flavor that guarantees integrity
763 	 * or privacy, direct data placement of individual data items
764 	 * is not allowed.
765 	 */
766 	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
767 						RPCAUTH_AUTH_DATATOUCH);
768 
769 	/*
770 	 * Chunks needed for results?
771 	 *
772 	 * o If the expected result is under the inline threshold, all ops
773 	 *   return as inline.
774 	 * o Large read ops return data as write chunk(s), header as
775 	 *   inline.
776 	 * o Large non-read ops return as a single reply chunk.
777 	 */
778 	if (rpcrdma_results_inline(r_xprt, rqst))
779 		wtype = rpcrdma_noch;
780 	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
781 		 rpcrdma_nonpayload_inline(r_xprt, rqst))
782 		wtype = rpcrdma_writech;
783 	else
784 		wtype = rpcrdma_replych;
785 
786 	/*
787 	 * Chunks needed for arguments?
788 	 *
789 	 * o If the total request is under the inline threshold, all ops
790 	 *   are sent as inline.
791 	 * o Large write ops transmit data as read chunk(s), header as
792 	 *   inline.
793 	 * o Large non-write ops are sent with the entire message as a
794 	 *   single read chunk (protocol 0-position special case).
795 	 *
796 	 * This assumes that the upper layer does not present a request
797 	 * that both has a data payload, and whose non-data arguments
798 	 * by themselves are larger than the inline threshold.
799 	 */
800 	if (rpcrdma_args_inline(r_xprt, rqst)) {
801 		*p++ = rdma_msg;
802 		rtype = rpcrdma_noch;
803 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
804 		*p++ = rdma_msg;
805 		rtype = rpcrdma_readch;
806 	} else {
807 		r_xprt->rx_stats.nomsg_call_count++;
808 		*p++ = rdma_nomsg;
809 		rtype = rpcrdma_areadch;
810 	}
811 
812 	/* If this is a retransmit, discard previously registered
813 	 * chunks. Very likely the connection has been replaced,
814 	 * so these registrations are invalid and unusable.
815 	 */
816 	while (unlikely(!list_empty(&req->rl_registered))) {
817 		struct rpcrdma_mr *mr;
818 
819 		mr = rpcrdma_mr_pop(&req->rl_registered);
820 		rpcrdma_mr_recycle(mr);
821 	}
822 
823 	/* This implementation supports the following combinations
824 	 * of chunk lists in one RPC-over-RDMA Call message:
825 	 *
826 	 *   - Read list
827 	 *   - Write list
828 	 *   - Reply chunk
829 	 *   - Read list + Reply chunk
830 	 *
831 	 * It might not yet support the following combinations:
832 	 *
833 	 *   - Read list + Write list
834 	 *
835 	 * It does not support the following combinations:
836 	 *
837 	 *   - Write list + Reply chunk
838 	 *   - Read list + Write list + Reply chunk
839 	 *
840 	 * This implementation supports only a single chunk in each
841 	 * Read or Write list. Thus for example the client cannot
842 	 * send a Call message with a Position Zero Read chunk and a
843 	 * regular Read chunk at the same time.
844 	 */
845 	if (rtype != rpcrdma_noch) {
846 		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
847 		if (ret)
848 			goto out_err;
849 	}
850 	ret = encode_item_not_present(xdr);
851 	if (ret)
852 		goto out_err;
853 
854 	if (wtype == rpcrdma_writech) {
855 		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
856 		if (ret)
857 			goto out_err;
858 	}
859 	ret = encode_item_not_present(xdr);
860 	if (ret)
861 		goto out_err;
862 
863 	if (wtype != rpcrdma_replych)
864 		ret = encode_item_not_present(xdr);
865 	else
866 		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
867 	if (ret)
868 		goto out_err;
869 
870 	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
871 
872 	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
873 					&rqst->rq_snd_buf, rtype);
874 	if (ret)
875 		goto out_err;
876 	return 0;
877 
878 out_err:
879 	switch (ret) {
880 	case -EAGAIN:
881 		xprt_wait_for_buffer_space(rqst->rq_xprt);
882 		break;
883 	case -ENOBUFS:
884 		break;
885 	default:
886 		r_xprt->rx_stats.failed_marshal_count++;
887 	}
888 	return ret;
889 }
890 
891 /**
892  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
893  * @rqst: controlling RPC request
894  * @srcp: points to RPC message payload in receive buffer
895  * @copy_len: remaining length of receive buffer content
896  * @pad: Write chunk pad bytes needed (zero for pure inline)
897  *
898  * The upper layer has set the maximum number of bytes it can
899  * receive in each component of rq_rcv_buf. These values are set in
900  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
901  *
902  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
903  * many cases this function simply updates iov_base pointers in
904  * rq_rcv_buf to point directly to the received reply data, to
905  * avoid copying reply data.
906  *
907  * Returns the count of bytes which had to be memcopied.
908  */
909 static unsigned long
910 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
911 {
912 	unsigned long fixup_copy_count;
913 	int i, npages, curlen;
914 	char *destp;
915 	struct page **ppages;
916 	int page_base;
917 
918 	/* The head iovec is redirected to the RPC reply message
919 	 * in the receive buffer, to avoid a memcopy.
920 	 */
921 	rqst->rq_rcv_buf.head[0].iov_base = srcp;
922 	rqst->rq_private_buf.head[0].iov_base = srcp;
923 
924 	/* The contents of the receive buffer that follow
925 	 * head.iov_len bytes are copied into the page list.
926 	 */
927 	curlen = rqst->rq_rcv_buf.head[0].iov_len;
928 	if (curlen > copy_len)
929 		curlen = copy_len;
930 	trace_xprtrdma_fixup(rqst, copy_len, curlen);
931 	srcp += curlen;
932 	copy_len -= curlen;
933 
934 	ppages = rqst->rq_rcv_buf.pages +
935 		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
936 	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
937 	fixup_copy_count = 0;
938 	if (copy_len && rqst->rq_rcv_buf.page_len) {
939 		int pagelist_len;
940 
941 		pagelist_len = rqst->rq_rcv_buf.page_len;
942 		if (pagelist_len > copy_len)
943 			pagelist_len = copy_len;
944 		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
945 		for (i = 0; i < npages; i++) {
946 			curlen = PAGE_SIZE - page_base;
947 			if (curlen > pagelist_len)
948 				curlen = pagelist_len;
949 
950 			trace_xprtrdma_fixup_pg(rqst, i, srcp,
951 						copy_len, curlen);
952 			destp = kmap_atomic(ppages[i]);
953 			memcpy(destp + page_base, srcp, curlen);
954 			flush_dcache_page(ppages[i]);
955 			kunmap_atomic(destp);
956 			srcp += curlen;
957 			copy_len -= curlen;
958 			fixup_copy_count += curlen;
959 			pagelist_len -= curlen;
960 			if (!pagelist_len)
961 				break;
962 			page_base = 0;
963 		}
964 
965 		/* Implicit padding for the last segment in a Write
966 		 * chunk is inserted inline at the front of the tail
967 		 * iovec. The upper layer ignores the content of
968 		 * the pad. Simply ensure inline content in the tail
969 		 * that follows the Write chunk is properly aligned.
970 		 */
971 		if (pad)
972 			srcp -= pad;
973 	}
974 
975 	/* The tail iovec is redirected to the remaining data
976 	 * in the receive buffer, to avoid a memcopy.
977 	 */
978 	if (copy_len || pad) {
979 		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
980 		rqst->rq_private_buf.tail[0].iov_base = srcp;
981 	}
982 
983 	return fixup_copy_count;
984 }
985 
986 /* By convention, backchannel calls arrive via rdma_msg type
987  * messages, and never populate the chunk lists. This makes
988  * the RPC/RDMA header small and fixed in size, so it is
989  * straightforward to check the RPC header's direction field.
990  */
991 static bool
992 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
993 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
994 {
995 	struct xdr_stream *xdr = &rep->rr_stream;
996 	__be32 *p;
997 
998 	if (rep->rr_proc != rdma_msg)
999 		return false;
1000 
1001 	/* Peek at stream contents without advancing. */
1002 	p = xdr_inline_decode(xdr, 0);
1003 
1004 	/* Chunk lists */
1005 	if (*p++ != xdr_zero)
1006 		return false;
1007 	if (*p++ != xdr_zero)
1008 		return false;
1009 	if (*p++ != xdr_zero)
1010 		return false;
1011 
1012 	/* RPC header */
1013 	if (*p++ != rep->rr_xid)
1014 		return false;
1015 	if (*p != cpu_to_be32(RPC_CALL))
1016 		return false;
1017 
1018 	/* Now that we are sure this is a backchannel call,
1019 	 * advance to the RPC header.
1020 	 */
1021 	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1022 	if (unlikely(!p))
1023 		goto out_short;
1024 
1025 	rpcrdma_bc_receive_call(r_xprt, rep);
1026 	return true;
1027 
1028 out_short:
1029 	pr_warn("RPC/RDMA short backward direction call\n");
1030 	return true;
1031 }
1032 #else	/* CONFIG_SUNRPC_BACKCHANNEL */
1033 {
1034 	return false;
1035 }
1036 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
1037 
1038 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1039 {
1040 	u32 handle;
1041 	u64 offset;
1042 	__be32 *p;
1043 
1044 	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1045 	if (unlikely(!p))
1046 		return -EIO;
1047 
1048 	handle = be32_to_cpup(p++);
1049 	*length = be32_to_cpup(p++);
1050 	xdr_decode_hyper(p, &offset);
1051 
1052 	trace_xprtrdma_decode_seg(handle, *length, offset);
1053 	return 0;
1054 }
1055 
1056 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1057 {
1058 	u32 segcount, seglength;
1059 	__be32 *p;
1060 
1061 	p = xdr_inline_decode(xdr, sizeof(*p));
1062 	if (unlikely(!p))
1063 		return -EIO;
1064 
1065 	*length = 0;
1066 	segcount = be32_to_cpup(p);
1067 	while (segcount--) {
1068 		if (decode_rdma_segment(xdr, &seglength))
1069 			return -EIO;
1070 		*length += seglength;
1071 	}
1072 
1073 	return 0;
1074 }
1075 
1076 /* In RPC-over-RDMA Version One replies, a Read list is never
1077  * expected. This decoder is a stub that returns an error if
1078  * a Read list is present.
1079  */
1080 static int decode_read_list(struct xdr_stream *xdr)
1081 {
1082 	__be32 *p;
1083 
1084 	p = xdr_inline_decode(xdr, sizeof(*p));
1085 	if (unlikely(!p))
1086 		return -EIO;
1087 	if (unlikely(*p != xdr_zero))
1088 		return -EIO;
1089 	return 0;
1090 }
1091 
1092 /* Supports only one Write chunk in the Write list
1093  */
1094 static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1095 {
1096 	u32 chunklen;
1097 	bool first;
1098 	__be32 *p;
1099 
1100 	*length = 0;
1101 	first = true;
1102 	do {
1103 		p = xdr_inline_decode(xdr, sizeof(*p));
1104 		if (unlikely(!p))
1105 			return -EIO;
1106 		if (*p == xdr_zero)
1107 			break;
1108 		if (!first)
1109 			return -EIO;
1110 
1111 		if (decode_write_chunk(xdr, &chunklen))
1112 			return -EIO;
1113 		*length += chunklen;
1114 		first = false;
1115 	} while (true);
1116 	return 0;
1117 }
1118 
1119 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1120 {
1121 	__be32 *p;
1122 
1123 	p = xdr_inline_decode(xdr, sizeof(*p));
1124 	if (unlikely(!p))
1125 		return -EIO;
1126 
1127 	*length = 0;
1128 	if (*p != xdr_zero)
1129 		if (decode_write_chunk(xdr, length))
1130 			return -EIO;
1131 	return 0;
1132 }
1133 
1134 static int
1135 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1136 		   struct rpc_rqst *rqst)
1137 {
1138 	struct xdr_stream *xdr = &rep->rr_stream;
1139 	u32 writelist, replychunk, rpclen;
1140 	char *base;
1141 
1142 	/* Decode the chunk lists */
1143 	if (decode_read_list(xdr))
1144 		return -EIO;
1145 	if (decode_write_list(xdr, &writelist))
1146 		return -EIO;
1147 	if (decode_reply_chunk(xdr, &replychunk))
1148 		return -EIO;
1149 
1150 	/* RDMA_MSG sanity checks */
1151 	if (unlikely(replychunk))
1152 		return -EIO;
1153 
1154 	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1155 	base = (char *)xdr_inline_decode(xdr, 0);
1156 	rpclen = xdr_stream_remaining(xdr);
1157 	r_xprt->rx_stats.fixup_copy_count +=
1158 		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1159 
1160 	r_xprt->rx_stats.total_rdma_reply += writelist;
1161 	return rpclen + xdr_align_size(writelist);
1162 }
1163 
1164 static noinline int
1165 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1166 {
1167 	struct xdr_stream *xdr = &rep->rr_stream;
1168 	u32 writelist, replychunk;
1169 
1170 	/* Decode the chunk lists */
1171 	if (decode_read_list(xdr))
1172 		return -EIO;
1173 	if (decode_write_list(xdr, &writelist))
1174 		return -EIO;
1175 	if (decode_reply_chunk(xdr, &replychunk))
1176 		return -EIO;
1177 
1178 	/* RDMA_NOMSG sanity checks */
1179 	if (unlikely(writelist))
1180 		return -EIO;
1181 	if (unlikely(!replychunk))
1182 		return -EIO;
1183 
1184 	/* Reply chunk buffer already is the reply vector */
1185 	r_xprt->rx_stats.total_rdma_reply += replychunk;
1186 	return replychunk;
1187 }
1188 
1189 static noinline int
1190 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1191 		     struct rpc_rqst *rqst)
1192 {
1193 	struct xdr_stream *xdr = &rep->rr_stream;
1194 	__be32 *p;
1195 
1196 	p = xdr_inline_decode(xdr, sizeof(*p));
1197 	if (unlikely(!p))
1198 		return -EIO;
1199 
1200 	switch (*p) {
1201 	case err_vers:
1202 		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1203 		if (!p)
1204 			break;
1205 		dprintk("RPC:       %s: server reports "
1206 			"version error (%u-%u), xid %08x\n", __func__,
1207 			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
1208 			be32_to_cpu(rep->rr_xid));
1209 		break;
1210 	case err_chunk:
1211 		dprintk("RPC:       %s: server reports "
1212 			"header decoding error, xid %08x\n", __func__,
1213 			be32_to_cpu(rep->rr_xid));
1214 		break;
1215 	default:
1216 		dprintk("RPC:       %s: server reports "
1217 			"unrecognized error %d, xid %08x\n", __func__,
1218 			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
1219 	}
1220 
1221 	r_xprt->rx_stats.bad_reply_count++;
1222 	return -EREMOTEIO;
1223 }
1224 
1225 /* Perform XID lookup, reconstruction of the RPC reply, and
1226  * RPC completion while holding the transport lock to ensure
1227  * the rep, rqst, and rq_task pointers remain stable.
1228  */
1229 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1230 {
1231 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1232 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1233 	struct rpc_rqst *rqst = rep->rr_rqst;
1234 	int status;
1235 
1236 	xprt->reestablish_timeout = 0;
1237 
1238 	switch (rep->rr_proc) {
1239 	case rdma_msg:
1240 		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1241 		break;
1242 	case rdma_nomsg:
1243 		status = rpcrdma_decode_nomsg(r_xprt, rep);
1244 		break;
1245 	case rdma_error:
1246 		status = rpcrdma_decode_error(r_xprt, rep, rqst);
1247 		break;
1248 	default:
1249 		status = -EIO;
1250 	}
1251 	if (status < 0)
1252 		goto out_badheader;
1253 
1254 out:
1255 	spin_lock(&xprt->queue_lock);
1256 	xprt_complete_rqst(rqst->rq_task, status);
1257 	xprt_unpin_rqst(rqst);
1258 	spin_unlock(&xprt->queue_lock);
1259 	return;
1260 
1261 /* If the incoming reply terminated a pending RPC, the next
1262  * RPC call will post a replacement receive buffer as it is
1263  * being marshaled.
1264  */
1265 out_badheader:
1266 	trace_xprtrdma_reply_hdr(rep);
1267 	r_xprt->rx_stats.bad_reply_count++;
1268 	goto out;
1269 }
1270 
1271 void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1272 {
1273 	/* Invalidate and unmap the data payloads before waking
1274 	 * the waiting application. This guarantees the memory
1275 	 * regions are properly fenced from the server before the
1276 	 * application accesses the data. It also ensures proper
1277 	 * send flow control: waking the next RPC waits until this
1278 	 * RPC has relinquished all its Send Queue entries.
1279 	 */
1280 	if (!list_empty(&req->rl_registered))
1281 		frwr_unmap_sync(r_xprt, &req->rl_registered);
1282 
1283 	/* Ensure that any DMA mapped pages associated with
1284 	 * the Send of the RPC Call have been unmapped before
1285 	 * allowing the RPC to complete. This protects argument
1286 	 * memory not controlled by the RPC client from being
1287 	 * re-used before we're done with it.
1288 	 */
1289 	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1290 		r_xprt->rx_stats.reply_waits_for_send++;
1291 		out_of_line_wait_on_bit(&req->rl_flags,
1292 					RPCRDMA_REQ_F_TX_RESOURCES,
1293 					bit_wait,
1294 					TASK_UNINTERRUPTIBLE);
1295 	}
1296 }
1297 
1298 /* Reply handling runs in the poll worker thread. Anything that
1299  * might wait is deferred to a separate workqueue.
1300  */
1301 void rpcrdma_deferred_completion(struct work_struct *work)
1302 {
1303 	struct rpcrdma_rep *rep =
1304 			container_of(work, struct rpcrdma_rep, rr_work);
1305 	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1306 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1307 
1308 	trace_xprtrdma_defer_cmp(rep);
1309 	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1310 		frwr_reminv(rep, &req->rl_registered);
1311 	rpcrdma_release_rqst(r_xprt, req);
1312 	rpcrdma_complete_rqst(rep);
1313 }
1314 
1315 /* Process received RPC/RDMA messages.
1316  *
1317  * Errors must result in the RPC task either being awakened, or
1318  * allowed to timeout, to discover the errors at that time.
1319  */
1320 void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1321 {
1322 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1323 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1324 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1325 	struct rpcrdma_req *req;
1326 	struct rpc_rqst *rqst;
1327 	u32 credits;
1328 	__be32 *p;
1329 
1330 	/* Fixed transport header fields */
1331 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1332 			rep->rr_hdrbuf.head[0].iov_base, NULL);
1333 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1334 	if (unlikely(!p))
1335 		goto out_shortreply;
1336 	rep->rr_xid = *p++;
1337 	rep->rr_vers = *p++;
1338 	credits = be32_to_cpu(*p++);
1339 	rep->rr_proc = *p++;
1340 
1341 	if (rep->rr_vers != rpcrdma_version)
1342 		goto out_badversion;
1343 
1344 	if (rpcrdma_is_bcall(r_xprt, rep))
1345 		return;
1346 
1347 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
1348 	 * get context for handling any incoming chunks.
1349 	 */
1350 	spin_lock(&xprt->queue_lock);
1351 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1352 	if (!rqst)
1353 		goto out_norqst;
1354 	xprt_pin_rqst(rqst);
1355 	spin_unlock(&xprt->queue_lock);
1356 
1357 	if (credits == 0)
1358 		credits = 1;	/* don't deadlock */
1359 	else if (credits > buf->rb_max_requests)
1360 		credits = buf->rb_max_requests;
1361 	if (buf->rb_credits != credits) {
1362 		spin_lock_bh(&xprt->transport_lock);
1363 		buf->rb_credits = credits;
1364 		xprt->cwnd = credits << RPC_CWNDSHIFT;
1365 		spin_unlock_bh(&xprt->transport_lock);
1366 	}
1367 
1368 	req = rpcr_to_rdmar(rqst);
1369 	if (req->rl_reply) {
1370 		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
1371 		rpcrdma_recv_buffer_put(req->rl_reply);
1372 	}
1373 	req->rl_reply = rep;
1374 	rep->rr_rqst = rqst;
1375 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1376 
1377 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1378 	queue_work(buf->rb_completion_wq, &rep->rr_work);
1379 	return;
1380 
1381 out_badversion:
1382 	trace_xprtrdma_reply_vers(rep);
1383 	goto out;
1384 
1385 out_norqst:
1386 	spin_unlock(&xprt->queue_lock);
1387 	trace_xprtrdma_reply_rqst(rep);
1388 	goto out;
1389 
1390 out_shortreply:
1391 	trace_xprtrdma_reply_short(rep);
1392 
1393 out:
1394 	rpcrdma_recv_buffer_put(rep);
1395 }
1396