xref: /linux/net/sunrpc/xprtrdma/rpc_rdma.c (revision 0883c2c06fb5bcf5b9e008270827e63c09a88c1e)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * rpc_rdma.c
42  *
43  * This file implements the guts of the RPC-over-RDMA protocol:
44  * marshaling and unmarshaling of RPC/RDMA headers, and the
45  * interfaces to the Linux RPC framework.
46  */
47 
48 #include "xprt_rdma.h"
49 
50 #include <linux/highmem.h>
51 
52 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
53 # define RPCDBG_FACILITY	RPCDBG_TRANS
54 #endif
55 
56 enum rpcrdma_chunktype {
57 	rpcrdma_noch = 0,
58 	rpcrdma_readch,
59 	rpcrdma_areadch,
60 	rpcrdma_writech,
61 	rpcrdma_replych
62 };
63 
64 static const char transfertypes[][12] = {
65 	"inline",	/* no chunks */
66 	"read list",	/* some argument via rdma read */
67 	"*read list",	/* entire request via rdma read */
68 	"write list",	/* some result via rdma write */
69 	"reply chunk"	/* entire reply via rdma write */
70 };
71 
72 /* Returns size of largest RPC-over-RDMA header in a Call message
73  *
74  * The largest Call header contains a full-size Read list and a
75  * minimal Reply chunk.
76  */
77 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
78 {
79 	unsigned int size;
80 
81 	/* Fixed header fields and list discriminators */
82 	size = RPCRDMA_HDRLEN_MIN;
83 
84 	/* Maximum Read list size */
85 	maxsegs += 2;	/* segments for head and tail buffers */
86 	size += maxsegs * sizeof(struct rpcrdma_read_chunk);
87 
88 	/* Minimal Reply chunk size */
89 	size += sizeof(__be32);	/* segment count */
90 	size += sizeof(struct rpcrdma_segment);
91 	size += sizeof(__be32);	/* list discriminator */
92 
93 	dprintk("RPC:       %s: max call header size = %u\n",
94 		__func__, size);
95 	return size;
96 }
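
/* An illustrative sketch of the calculation above, assuming the usual
 * on-the-wire sizes (RPCRDMA_HDRLEN_MIN = 7 XDR words = 28 bytes,
 * struct rpcrdma_segment = 16 bytes, struct rpcrdma_read_chunk = 24
 * bytes) and a hypothetical maxsegs of 8:
 *
 *	fixed header fields and discriminators:	 28
 *	Read list, (8 + 2) segments * 24:	240
 *	minimal Reply chunk, 4 + 16 + 4:	 24
 *						---
 *	max call header size:			292
 *
 * This value is subtracted from the inline threshold in
 * rpcrdma_set_max_header_sizes() to produce ri_max_inline_write.
 */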
97 
98 /* Returns size of largest RPC-over-RDMA header in a Reply message
99  *
100  * There is only one Write list or one Reply chunk per Reply
101  * message.  The larger list is the Write list.
102  */
103 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
104 {
105 	unsigned int size;
106 
107 	/* Fixed header fields and list discriminators */
108 	size = RPCRDMA_HDRLEN_MIN;
109 
110 	/* Maximum Write list size */
111 	maxsegs += 2;	/* segments for head and tail buffers */
112 	size += sizeof(__be32);		/* segment count */
113 	size += maxsegs * sizeof(struct rpcrdma_segment);
114 	size += sizeof(__be32);	/* list discriminator */
115 
116 	dprintk("RPC:       %s: max reply header size = %u\n",
117 		__func__, size);
118 	return size;
119 }
120 
121 void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
122 				  struct rpcrdma_create_data_internal *cdata,
123 				  unsigned int maxsegs)
124 {
125 	ia->ri_max_inline_write = cdata->inline_wsize -
126 				  rpcrdma_max_call_header_size(maxsegs);
127 	ia->ri_max_inline_read = cdata->inline_rsize -
128 				 rpcrdma_max_reply_header_size(maxsegs);
129 }
130 
131 /* The client can send a request inline as long as the RPCRDMA header
132  * plus the RPC call fit under the transport's inline limit. If the
133  * combined call message size exceeds that limit, the client must use
134  * the read chunk list for this operation.
135  */
136 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
137 				struct rpc_rqst *rqst)
138 {
139 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
140 
141 	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
142 }
143 
144 /* The client can't know how large the actual reply will be. Thus it
145  * plans for the largest possible reply for that particular ULP
146  * operation. If the largest possible combined reply size exceeds the
147  * transport's inline limit, the client must provide a Write list or a
148  * Reply chunk for this request.
149  */
150 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
151 				   struct rpc_rqst *rqst)
152 {
153 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
154 
155 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
156 }
157 
158 static int
159 rpcrdma_tail_pullup(struct xdr_buf *buf)
160 {
161 	size_t tlen = buf->tail[0].iov_len;
162 	size_t skip = tlen & 3;
163 
164 	/* Do not include the tail if it is only an XDR pad */
165 	if (tlen < 4)
166 		return 0;
167 
168 	/* xdr_write_pages() adds a pad at the beginning of the tail
169 	 * if the content in "buf->pages" is unaligned. Force the
170 	 * tail's actual content to land at the next XDR position
171 	 * after the head instead.
172 	 */
173 	if (skip) {
174 		unsigned char *src, *dst;
175 		unsigned int count;
176 
177 		src = buf->tail[0].iov_base;
178 		dst = buf->head[0].iov_base;
179 		dst += buf->head[0].iov_len;
180 
181 		src += skip;
182 		tlen -= skip;
183 
184 		dprintk("RPC:       %s: skip=%zu, memmove(%p, %p, %zu)\n",
185 			__func__, skip, dst, src, tlen);
186 
187 		for (count = tlen; count; count--)
188 			*dst++ = *src++;
189 	}
190 
191 	return tlen;
192 }
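
/* A hypothetical case for the pullup above: with 1021 bytes of page
 * data, xdr_write_pages() places 3 pad bytes at the start of the tail.
 * A 20-byte tail payload then makes tlen = 23 and skip = 23 & 3 = 3,
 * so the 20 payload bytes are copied to sit immediately after the
 * head and the XDR pad is never transmitted.
 */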
193 
194 /* Split "vec" on page boundaries into segments. FMR registers pages,
195  * not a byte range. Other modes coalesce these segments into a single
196  * MR when they can.
197  */
198 static int
199 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
200 		     int n, int nsegs)
201 {
202 	size_t page_offset;
203 	u32 remaining;
204 	char *base;
205 
206 	base = vec->iov_base;
207 	page_offset = offset_in_page(base);
208 	remaining = vec->iov_len;
209 	while (remaining && n < nsegs) {
210 		seg[n].mr_page = NULL;
211 		seg[n].mr_offset = base;
212 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
213 		remaining -= seg[n].mr_len;
214 		base += seg[n].mr_len;
215 		++n;
216 		page_offset = 0;
217 	}
218 	return n;
219 }
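
/* A sketch of the split above, using hypothetical numbers and 4KB
 * pages: a 6000-byte kvec whose iov_base begins 100 bytes into a page
 * becomes two segments of 3996 and 2004 bytes, each confined to a
 * single page so that page-based registration (FMR) can handle it.
 */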
220 
221 /*
222  * Chunk assembly from upper layer xdr_buf.
223  *
224  * Convert the passed-in xdr_buf into an array of RPC/RDMA chunk
225  * segments. Segments are then coalesced during registration when the
226  * selected memory registration mode allows it.
227  *
228  * Returns positive number of segments converted, or a negative errno.
229  */
230 
231 static int
232 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
233 	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
234 {
235 	int len, n = 0, p;
236 	int page_base;
237 	struct page **ppages;
238 
239 	if (pos == 0) {
240 		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
241 		if (n == nsegs)
242 			return -EIO;
243 	}
244 
245 	len = xdrbuf->page_len;
246 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
247 	page_base = xdrbuf->page_base & ~PAGE_MASK;
248 	p = 0;
249 	while (len && n < nsegs) {
250 		if (!ppages[p]) {
251 			/* allocate pages for the receive buffer on demand */
252 			ppages[p] = alloc_page(GFP_ATOMIC);
253 			if (!ppages[p])
254 				return -ENOMEM;
255 		}
256 		seg[n].mr_page = ppages[p];
257 		seg[n].mr_offset = (void *)(unsigned long) page_base;
258 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
259 		if (seg[n].mr_len > PAGE_SIZE)
260 			return -EIO;
261 		len -= seg[n].mr_len;
262 		++n;
263 		++p;
264 		page_base = 0;	/* page offset only applies to first page */
265 	}
266 
267 	/* Message overflows the seg array */
268 	if (len && n == nsegs)
269 		return -EIO;
270 
271 	/* When encoding the read list, the tail is always sent inline */
272 	if (type == rpcrdma_readch)
273 		return n;
274 
275 	if (xdrbuf->tail[0].iov_len) {
276 		/* the rpcrdma protocol allows us to omit any trailing
277 		 * xdr pad bytes, saving the server an RDMA operation. */
278 		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
279 			return n;
280 		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
281 		if (n == nsegs)
282 			return -EIO;
283 	}
284 
285 	return n;
286 }
287 
288 static inline __be32 *
289 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
290 {
291 	*iptr++ = cpu_to_be32(seg->mr_rkey);
292 	*iptr++ = cpu_to_be32(seg->mr_len);
293 	return xdr_encode_hyper(iptr, seg->mr_base);
294 }
295 
296 /* XDR-encode the Read list. Supports encoding a list of read
297  * segments that belong to a single read chunk.
298  *
299  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
300  *
301  *  Read chunklist (a linked list):
302  *   N elements, position P (same P for all chunks of same arg!):
303  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
304  *
305  * Returns a pointer to the XDR word in the RDMA header following
306  * the end of the Read list, or an error pointer.
307  */
308 static __be32 *
309 rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
310 			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
311 			 __be32 *iptr, enum rpcrdma_chunktype rtype)
312 {
313 	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
314 	unsigned int pos;
315 	int n, nsegs;
316 
317 	if (rtype == rpcrdma_noch) {
318 		*iptr++ = xdr_zero;	/* item not present */
319 		return iptr;
320 	}
321 
322 	pos = rqst->rq_snd_buf.head[0].iov_len;
323 	if (rtype == rpcrdma_areadch)
324 		pos = 0;
325 	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
326 				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
327 	if (nsegs < 0)
328 		return ERR_PTR(nsegs);
329 
330 	do {
331 		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
332 		if (n <= 0)
333 			return ERR_PTR(n);
334 
335 		*iptr++ = xdr_one;	/* item present */
336 
337 		/* All read segments in this chunk
338 		 * have the same "position".
339 		 */
340 		*iptr++ = cpu_to_be32(pos);
341 		iptr = xdr_encode_rdma_segment(iptr, seg);
342 
343 		dprintk("RPC: %5u %s: read segment pos %u "
344 			"%d@0x%016llx:0x%08x (%s)\n",
345 			rqst->rq_task->tk_pid, __func__, pos,
346 			seg->mr_len, (unsigned long long)seg->mr_base,
347 			seg->mr_rkey, n < nsegs ? "more" : "last");
348 
349 		r_xprt->rx_stats.read_chunk_count++;
350 		req->rl_nchunks++;
351 		seg += n;
352 		nsegs -= n;
353 	} while (nsegs);
354 	req->rl_nextseg = seg;
355 
356 	/* Finish Read list */
357 	*iptr++ = xdr_zero;	/* Next item not present */
358 	return iptr;
359 }
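
/* A sketch of what the encoder above emits for one Read chunk that was
 * registered as two segments at XDR position P (handles, lengths, and
 * offsets are hypothetical):
 *
 *	1, P, H1, L1, O1	first read segment
 *	1, P, H2, L2, O2	second read segment, same position
 *	0			list terminator
 *
 * Each line is one "1 - PHLOO" element from the encoding key in the
 * comment preceding rpcrdma_encode_read_list().
 */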
360 
361 /* XDR-encode the Write list. Supports encoding a list containing
362  * one array of plain segments that belong to a single write chunk.
363  *
364  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
365  *
366  *  Write chunklist (a list of (one) counted array):
367  *   N elements:
368  *    1 - N - HLOO - HLOO - ... - HLOO - 0
369  *
370  * Returns a pointer to the XDR word in the RDMA header following
371  * the end of the Write list, or an error pointer.
372  */
373 static __be32 *
374 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
375 			  struct rpc_rqst *rqst, __be32 *iptr,
376 			  enum rpcrdma_chunktype wtype)
377 {
378 	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
379 	int n, nsegs, nchunks;
380 	__be32 *segcount;
381 
382 	if (wtype != rpcrdma_writech) {
383 		*iptr++ = xdr_zero;	/* no Write list present */
384 		return iptr;
385 	}
386 
387 	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
388 				     rqst->rq_rcv_buf.head[0].iov_len,
389 				     wtype, seg,
390 				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
391 	if (nsegs < 0)
392 		return ERR_PTR(nsegs);
393 
394 	*iptr++ = xdr_one;	/* Write list present */
395 	segcount = iptr++;	/* save location of segment count */
396 
397 	nchunks = 0;
398 	do {
399 		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
400 		if (n <= 0)
401 			return ERR_PTR(n);
402 
403 		iptr = xdr_encode_rdma_segment(iptr, seg);
404 
405 		dprintk("RPC: %5u %s: write segment "
406 			"%d@0x%016llx:0x%08x (%s)\n",
407 			rqst->rq_task->tk_pid, __func__,
408 			seg->mr_len, (unsigned long long)seg->mr_base,
409 			seg->mr_rkey, n < nsegs ? "more" : "last");
410 
411 		r_xprt->rx_stats.write_chunk_count++;
412 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
413 		req->rl_nchunks++;
414 		nchunks++;
415 		seg   += n;
416 		nsegs -= n;
417 	} while (nsegs);
418 	req->rl_nextseg = seg;
419 
420 	/* Update count of segments in this Write chunk */
421 	*segcount = cpu_to_be32(nchunks);
422 
423 	/* Finish Write list */
424 	*iptr++ = xdr_zero;	/* Next item not present */
425 	return iptr;
426 }
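
/* The corresponding sketch for the Write list encoder above, for one
 * Write chunk registered as two segments (handles, lengths, and
 * offsets are hypothetical):
 *
 *	1			Write list present
 *	2			segment count, backfilled via "segcount"
 *	H1, L1, O1, H2, L2, O2	the two HLOO segments
 *	0			list terminator
 *
 * The count is written last because it is not known until the
 * registration loop finishes.
 */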
427 
428 /* XDR-encode the Reply chunk. Supports encoding an array of plain
429  * segments that belong to a single write (reply) chunk.
430  *
431  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
432  *
433  *  Reply chunk (a counted array):
434  *   N elements:
435  *    1 - N - HLOO - HLOO - ... - HLOO
436  *
437  * Returns a pointer to the XDR word in the RDMA header following
438  * the end of the Reply chunk, or an error pointer.
439  */
440 static __be32 *
441 rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
442 			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
443 			   __be32 *iptr, enum rpcrdma_chunktype wtype)
444 {
445 	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
446 	int n, nsegs, nchunks;
447 	__be32 *segcount;
448 
449 	if (wtype != rpcrdma_replych) {
450 		*iptr++ = xdr_zero;	/* no Reply chunk present */
451 		return iptr;
452 	}
453 
454 	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
455 				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
456 	if (nsegs < 0)
457 		return ERR_PTR(nsegs);
458 
459 	*iptr++ = xdr_one;	/* Reply chunk present */
460 	segcount = iptr++;	/* save location of segment count */
461 
462 	nchunks = 0;
463 	do {
464 		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
465 		if (n <= 0)
466 			return ERR_PTR(n);
467 
468 		iptr = xdr_encode_rdma_segment(iptr, seg);
469 
470 		dprintk("RPC: %5u %s: reply segment "
471 			"%d@0x%016llx:0x%08x (%s)\n",
472 			rqst->rq_task->tk_pid, __func__,
473 			seg->mr_len, (unsigned long long)seg->mr_base,
474 			seg->mr_rkey, n < nsegs ? "more" : "last");
475 
476 		r_xprt->rx_stats.reply_chunk_count++;
477 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
478 		req->rl_nchunks++;
479 		nchunks++;
480 		seg   += n;
481 		nsegs -= n;
482 	} while (nsegs);
483 	req->rl_nextseg = seg;
484 
485 	/* Update count of segments in the Reply chunk */
486 	*segcount = cpu_to_be32(nchunks);
487 
488 	return iptr;
489 }
490 
491 /*
492  * Copy write data inline.
493  * This function is used for "small" requests. Data which is passed
494  * to RPC via iovecs (or page list) is copied directly into the
495  * pre-registered memory buffer for this request. For small amounts
496  * of data, this is efficient. The cutoff value is tunable.
497  */
498 static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
499 {
500 	int i, npages, curlen;
501 	int copy_len;
502 	unsigned char *srcp, *destp;
503 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
504 	int page_base;
505 	struct page **ppages;
506 
507 	destp = rqst->rq_svec[0].iov_base;
508 	curlen = rqst->rq_svec[0].iov_len;
509 	destp += curlen;
510 
511 	dprintk("RPC:       %s: destp 0x%p len %d hdrlen %d\n",
512 		__func__, destp, rqst->rq_slen, curlen);
513 
514 	copy_len = rqst->rq_snd_buf.page_len;
515 
516 	if (rqst->rq_snd_buf.tail[0].iov_len) {
517 		curlen = rqst->rq_snd_buf.tail[0].iov_len;
518 		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
519 			memmove(destp + copy_len,
520 				rqst->rq_snd_buf.tail[0].iov_base, curlen);
521 			r_xprt->rx_stats.pullup_copy_count += curlen;
522 		}
523 		dprintk("RPC:       %s: tail destp 0x%p len %d\n",
524 			__func__, destp + copy_len, curlen);
525 		rqst->rq_svec[0].iov_len += curlen;
526 	}
527 	r_xprt->rx_stats.pullup_copy_count += copy_len;
528 
529 	page_base = rqst->rq_snd_buf.page_base;
530 	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
531 	page_base &= ~PAGE_MASK;
532 	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
533 	for (i = 0; copy_len && i < npages; i++) {
534 		curlen = PAGE_SIZE - page_base;
535 		if (curlen > copy_len)
536 			curlen = copy_len;
537 		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
538 			__func__, i, destp, copy_len, curlen);
539 		srcp = kmap_atomic(ppages[i]);
540 		memcpy(destp, srcp+page_base, curlen);
541 		kunmap_atomic(srcp);
542 		rqst->rq_svec[0].iov_len += curlen;
543 		destp += curlen;
544 		copy_len -= curlen;
545 		page_base = 0;
546 	}
547 	/* header now contains entire send message */
548 }
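
/* A hypothetical example of the pullup above: a Call whose rq_snd_buf
 * carries a 300-byte payload in its page list has those bytes (plus
 * any tail) copied directly behind the RPC header in rq_svec[0], so
 * the whole Call goes out as a single inline RDMA Send with no Read
 * chunk.
 */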
549 
550 /*
551  * Marshal a request: the primary job of this routine is to choose
552  * the transfer modes. See comments below.
553  *
554  * Prepares up to two IOVs per Call message:
555  *
556  *  [0] -- RPC RDMA header
557  *  [1] -- the RPC header/data
558  *
559  * Returns zero on success, otherwise a negative errno.
560  */
561 
562 int
563 rpcrdma_marshal_req(struct rpc_rqst *rqst)
564 {
565 	struct rpc_xprt *xprt = rqst->rq_xprt;
566 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
567 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
568 	enum rpcrdma_chunktype rtype, wtype;
569 	struct rpcrdma_msg *headerp;
570 	ssize_t hdrlen;
571 	size_t rpclen;
572 	__be32 *iptr;
573 
574 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
575 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
576 		return rpcrdma_bc_marshal_reply(rqst);
577 #endif
578 
579 	headerp = rdmab_to_msg(req->rl_rdmabuf);
580 	/* don't byte-swap XID, it's already done in request */
581 	headerp->rm_xid = rqst->rq_xid;
582 	headerp->rm_vers = rpcrdma_version;
583 	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
584 	headerp->rm_type = rdma_msg;
585 
586 	/*
587 	 * Chunks needed for results?
588 	 *
589 	 * o If the expected result is under the inline threshold, all ops
590 	 *   return as inline.
591 	 * o Large read ops return data as write chunk(s), header as
592 	 *   inline.
593 	 * o Large non-read ops return as a single reply chunk.
594 	 */
595 	if (rpcrdma_results_inline(r_xprt, rqst))
596 		wtype = rpcrdma_noch;
597 	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
598 		wtype = rpcrdma_writech;
599 	else
600 		wtype = rpcrdma_replych;
601 
602 	/*
603 	 * Chunks needed for arguments?
604 	 *
605 	 * o If the total request is under the inline threshold, all ops
606 	 *   are sent as inline.
607 	 * o Large write ops transmit data as read chunk(s), header as
608 	 *   inline.
609 	 * o Large non-write ops are sent with the entire message as a
610 	 *   single read chunk (protocol 0-position special case).
611 	 *
612 	 * This assumes that the upper layer does not present a request
613 	 * that both has a data payload, and whose non-data arguments
614 	 * by themselves are larger than the inline threshold.
615 	 */
616 	if (rpcrdma_args_inline(r_xprt, rqst)) {
617 		rtype = rpcrdma_noch;
618 		rpcrdma_inline_pullup(rqst);
619 		rpclen = rqst->rq_svec[0].iov_len;
620 	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
621 		rtype = rpcrdma_readch;
622 		rpclen = rqst->rq_svec[0].iov_len;
623 		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
624 	} else {
625 		r_xprt->rx_stats.nomsg_call_count++;
626 		headerp->rm_type = rdma_nomsg;
627 		rtype = rpcrdma_areadch;
628 		rpclen = 0;
629 	}
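
	/* Typical outcomes of the two decisions above, using NFS as a
	 * hypothetical ULP: a large NFS READ marks rq_rcv_buf with
	 * XDRBUF_READ and receives its data via a Write list (wtype ==
	 * rpcrdma_writech); a large NFS WRITE marks rq_snd_buf with
	 * XDRBUF_WRITE and sends its payload via a Read list (rtype ==
	 * rpcrdma_readch); a small GETATTR stays inline both ways.
	 */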
630 
631 	/* This implementation supports the following combinations
632 	 * of chunk lists in one RPC-over-RDMA Call message:
633 	 *
634 	 *   - Read list
635 	 *   - Write list
636 	 *   - Reply chunk
637 	 *   - Read list + Reply chunk
638 	 *
639 	 * It might not yet support the following combinations:
640 	 *
641 	 *   - Read list + Write list
642 	 *
643 	 * It does not support the following combinations:
644 	 *
645 	 *   - Write list + Reply chunk
646 	 *   - Read list + Write list + Reply chunk
647 	 *
648 	 * This implementation supports only a single chunk in each
649 	 * Read or Write list. Thus for example the client cannot
650 	 * send a Call message with a Position Zero Read chunk and a
651 	 * regular Read chunk at the same time.
652 	 */
653 	req->rl_nchunks = 0;
654 	req->rl_nextseg = req->rl_segments;
655 	iptr = headerp->rm_body.rm_chunks;
656 	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
657 	if (IS_ERR(iptr))
658 		goto out_unmap;
659 	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
660 	if (IS_ERR(iptr))
661 		goto out_unmap;
662 	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
663 	if (IS_ERR(iptr))
664 		goto out_unmap;
665 	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
666 
667 	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
668 		goto out_overflow;
669 
670 	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
671 		rqst->rq_task->tk_pid, __func__,
672 		transfertypes[rtype], transfertypes[wtype],
673 		hdrlen, rpclen);
674 
675 	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
676 	req->rl_send_iov[0].length = hdrlen;
677 	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
678 
679 	req->rl_niovs = 1;
680 	if (rtype == rpcrdma_areadch)
681 		return 0;
682 
683 	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
684 	req->rl_send_iov[1].length = rpclen;
685 	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
686 
687 	req->rl_niovs = 2;
688 	return 0;
689 
690 out_overflow:
691 	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
692 		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
693 	/* Terminate this RPC. Chunks registered above will be
694 	 * released by xprt_release -> xprt_rdma_free.
695 	 */
696 	return -EIO;
697 
698 out_unmap:
699 	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
700 	return PTR_ERR(iptr);
701 }
702 
703 /*
704  * Chase down a received Write or Reply chunk list to get the length
705  * RDMA'd by the server. See the maps built by the chunk encoders above.
706  */
707 static int
708 rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
709 {
710 	unsigned int i, total_len;
711 	struct rpcrdma_write_chunk *cur_wchunk;
712 	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
713 
714 	i = be32_to_cpu(**iptrp);
715 	if (i > max)
716 		return -1;
717 	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
718 	total_len = 0;
719 	while (i--) {
720 		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
721 		ifdebug(FACILITY) {
722 			u64 off;
723 			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
724 			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
725 				__func__,
726 				be32_to_cpu(seg->rs_length),
727 				(unsigned long long)off,
728 				be32_to_cpu(seg->rs_handle));
729 		}
730 		total_len += be32_to_cpu(seg->rs_length);
731 		++cur_wchunk;
732 	}
733 	/* check and adjust for properly terminated write chunk */
734 	if (wrchunk) {
735 		__be32 *w = (__be32 *) cur_wchunk;
736 		if (*w++ != xdr_zero)
737 			return -1;
738 		cur_wchunk = (struct rpcrdma_write_chunk *) w;
739 	}
740 	if ((char *)cur_wchunk > base + rep->rr_len)
741 		return -1;
742 
743 	*iptrp = (__be32 *) cur_wchunk;
744 	return total_len;
745 }
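
/* For instance, given a Write list holding one two-segment chunk (a
 * hypothetical but typical layout), the decoder above reads the
 * segment count (2), sums the two rs_length values into total_len,
 * verifies and skips the list terminator, and leaves *iptrp pointing
 * at the XDR word that follows.
 */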
746 
747 /*
748  * Scatter inline received data back into the provided iovs.
749  */
750 static void
751 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
752 {
753 	int i, npages, curlen, olen;
754 	char *destp;
755 	struct page **ppages;
756 	int page_base;
757 
758 	curlen = rqst->rq_rcv_buf.head[0].iov_len;
759 	if (curlen > copy_len) {	/* write chunk header fixup */
760 		curlen = copy_len;
761 		rqst->rq_rcv_buf.head[0].iov_len = curlen;
762 	}
763 
764 	dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
765 		__func__, srcp, copy_len, curlen);
766 
767 	/* Shift pointer for first receive segment only */
768 	rqst->rq_rcv_buf.head[0].iov_base = srcp;
769 	srcp += curlen;
770 	copy_len -= curlen;
771 
772 	olen = copy_len;
773 	i = 0;
774 	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
775 	page_base = rqst->rq_rcv_buf.page_base;
776 	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
777 	page_base &= ~PAGE_MASK;
778 
779 	if (copy_len && rqst->rq_rcv_buf.page_len) {
780 		npages = PAGE_ALIGN(page_base +
781 			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
782 		for (; i < npages; i++) {
783 			curlen = PAGE_SIZE - page_base;
784 			if (curlen > copy_len)
785 				curlen = copy_len;
786 			dprintk("RPC:       %s: page %d"
787 				" srcp 0x%p len %d curlen %d\n",
788 				__func__, i, srcp, copy_len, curlen);
789 			destp = kmap_atomic(ppages[i]);
790 			memcpy(destp + page_base, srcp, curlen);
791 			flush_dcache_page(ppages[i]);
792 			kunmap_atomic(destp);
793 			srcp += curlen;
794 			copy_len -= curlen;
795 			if (copy_len == 0)
796 				break;
797 			page_base = 0;
798 		}
799 	}
800 
801 	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
802 		curlen = copy_len;
803 		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
804 			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
805 		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
806 			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
807 		dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
808 			__func__, srcp, copy_len, curlen);
809 		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
810 		copy_len -= curlen; ++i;
811 	} else
812 		rqst->rq_rcv_buf.tail[0].iov_len = 0;
813 
814 	if (pad) {
815 		/* implicit padding on terminal chunk */
816 		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
817 		while (pad--)
818 			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
819 	}
820 
821 	if (copy_len)
822 		dprintk("RPC:       %s: %d bytes in"
823 			" %d extra segments (%d lost)\n",
824 			__func__, olen, i, copy_len);
825 
826 	/* TBD avoid a warning from call_decode() */
827 	rqst->rq_private_buf = rqst->rq_rcv_buf;
828 }
829 
830 void
831 rpcrdma_connect_worker(struct work_struct *work)
832 {
833 	struct rpcrdma_ep *ep =
834 		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
835 	struct rpcrdma_xprt *r_xprt =
836 		container_of(ep, struct rpcrdma_xprt, rx_ep);
837 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
838 
839 	spin_lock_bh(&xprt->transport_lock);
840 	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
841 		++xprt->connect_cookie;
842 	if (ep->rep_connected > 0) {
843 		if (!xprt_test_and_set_connected(xprt))
844 			xprt_wake_pending_tasks(xprt, 0);
845 	} else {
846 		if (xprt_test_and_clear_connected(xprt))
847 			xprt_wake_pending_tasks(xprt, -ENOTCONN);
848 	}
849 	spin_unlock_bh(&xprt->transport_lock);
850 }
851 
852 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
853 /* By convention, backchannel calls arrive via rdma_msg type
854  * messages, and never populate the chunk lists. This makes
855  * the RPC/RDMA header small and fixed in size, so it is
856  * straightforward to check the RPC header's direction field.
857  */
858 static bool
859 rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
860 {
861 	__be32 *p = (__be32 *)headerp;
862 
863 	if (headerp->rm_type != rdma_msg)
864 		return false;
865 	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
866 		return false;
867 	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
868 		return false;
869 	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
870 		return false;
871 
872 	/* sanity: with no chunk lists, p[7] is the embedded RPC XID */
873 	if (p[7] != headerp->rm_xid)
874 		return false;
875 	/* call direction */
876 	if (p[8] != cpu_to_be32(RPC_CALL))
877 		return false;
878 
879 	return true;
880 }
881 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
882 
883 /*
884  * This function is called when an async event that changes the
885  * connection state is posted to the connection. All it does at
886  * this point is mark the connection up or down; the RPC timers
887  * do the rest.
888  */
889 void
890 rpcrdma_conn_func(struct rpcrdma_ep *ep)
891 {
892 	schedule_delayed_work(&ep->rep_connect_worker, 0);
893 }
894 
895 /* Process received RPC/RDMA messages.
896  *
897  * Errors must result in the RPC task either being awakened or
898  * allowed to time out, so that the error is discovered then.
899  */
900 void
901 rpcrdma_reply_handler(struct rpcrdma_rep *rep)
902 {
903 	struct rpcrdma_msg *headerp;
904 	struct rpcrdma_req *req;
905 	struct rpc_rqst *rqst;
906 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
907 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
908 	__be32 *iptr;
909 	int rdmalen, status, rmerr;
910 	unsigned long cwnd;
911 
912 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
913 
914 	if (rep->rr_len == RPCRDMA_BAD_LEN)
915 		goto out_badstatus;
916 	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
917 		goto out_shortreply;
918 
919 	headerp = rdmab_to_msg(rep->rr_rdmabuf);
920 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
921 	if (rpcrdma_is_bcall(headerp))
922 		goto out_bcall;
923 #endif
924 
925 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
926 	 * get context for handling any incoming chunks.
927 	 */
928 	spin_lock_bh(&xprt->transport_lock);
929 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
930 	if (!rqst)
931 		goto out_nomatch;
932 
933 	req = rpcr_to_rdmar(rqst);
934 	if (req->rl_reply)
935 		goto out_duplicate;
936 
937 	/* Sanity checking has passed. We are now committed
938 	 * to complete this transaction.
939 	 */
940 	list_del_init(&rqst->rq_list);
941 	spin_unlock_bh(&xprt->transport_lock);
942 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
943 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
944 
945 	/* from here on, the reply is no longer an orphan */
946 	req->rl_reply = rep;
947 	xprt->reestablish_timeout = 0;
948 
949 	if (headerp->rm_vers != rpcrdma_version)
950 		goto out_badversion;
951 
952 	/* check for expected message types */
953 	/* The order of some of these tests is important. */
954 	switch (headerp->rm_type) {
955 	case rdma_msg:
956 		/* never expect read chunks */
957 		/* never expect reply chunks (two ways to check) */
958 		/* never expect write chunks without having offered RDMA */
959 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
960 		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
961 		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
962 		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
963 		     req->rl_nchunks == 0))
964 			goto badheader;
965 		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
966 			/* count any expected write chunks in read reply */
967 			/* start at write chunk array count */
968 			iptr = &headerp->rm_body.rm_chunks[2];
969 			rdmalen = rpcrdma_count_chunks(rep,
970 						req->rl_nchunks, 1, &iptr);
971 			/* check for validity, and no reply chunk after */
972 			if (rdmalen < 0 || *iptr++ != xdr_zero)
973 				goto badheader;
974 			rep->rr_len -=
975 			    ((unsigned char *)iptr - (unsigned char *)headerp);
976 			status = rep->rr_len + rdmalen;
977 			r_xprt->rx_stats.total_rdma_reply += rdmalen;
978 			/* special case - last chunk may omit padding */
979 			if (rdmalen &= 3) {
980 				rdmalen = 4 - rdmalen;
981 				status += rdmalen;
982 			}
983 		} else {
984 			/* else ordinary inline */
985 			rdmalen = 0;
986 			iptr = (__be32 *)((unsigned char *)headerp +
987 							RPCRDMA_HDRLEN_MIN);
988 			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
989 			status = rep->rr_len;
990 		}
991 		/* Fix up the rpc results for upper layer */
992 		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
993 		break;
994 
995 	case rdma_nomsg:
996 		/* never expect read or write chunks, always reply chunks */
997 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
998 		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
999 		    headerp->rm_body.rm_chunks[2] != xdr_one ||
1000 		    req->rl_nchunks == 0)
1001 			goto badheader;
1002 		iptr = (__be32 *)((unsigned char *)headerp +
1003 							RPCRDMA_HDRLEN_MIN);
1004 		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
1005 		if (rdmalen < 0)
1006 			goto badheader;
1007 		r_xprt->rx_stats.total_rdma_reply += rdmalen;
1008 		/* Reply chunk buffer already is the reply vector - no fixup. */
1009 		status = rdmalen;
1010 		break;
1011 
1012 	case rdma_error:
1013 		goto out_rdmaerr;
1014 
1015 badheader:
1016 	default:
1017 		dprintk("%s: invalid rpcrdma reply header (type %d):"
1018 				" chunks[012] == %d %d %d"
1019 				" expected chunks <= %d\n",
1020 				__func__, be32_to_cpu(headerp->rm_type),
1021 				be32_to_cpu(headerp->rm_body.rm_chunks[0]),
1022 				be32_to_cpu(headerp->rm_body.rm_chunks[1]),
1023 				be32_to_cpu(headerp->rm_body.rm_chunks[2]),
1024 				req->rl_nchunks);
1025 		status = -EIO;
1026 		r_xprt->rx_stats.bad_reply_count++;
1027 		break;
1028 	}
1029 
1030 out:
1031 	/* Invalidate and flush the data payloads before waking the
1032 	 * waiting application. This guarantees the memory region is
1033 	 * properly fenced from the server before the application
1034 	 * accesses the data. It also ensures proper send flow
1035 	 * control: waking the next RPC waits until this RPC has
1036 	 * relinquished all its Send Queue entries.
1037 	 */
1038 	if (req->rl_nchunks)
1039 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
1040 
1041 	spin_lock_bh(&xprt->transport_lock);
1042 	cwnd = xprt->cwnd;
1043 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1044 	if (xprt->cwnd > cwnd)
1045 		xprt_release_rqst_cong(rqst->rq_task);
1046 
1047 	xprt_complete_rqst(rqst->rq_task, status);
1048 	spin_unlock_bh(&xprt->transport_lock);
1049 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1050 			__func__, xprt, rqst, status);
1051 	return;
1052 
1053 out_badstatus:
1054 	rpcrdma_recv_buffer_put(rep);
1055 	if (r_xprt->rx_ep.rep_connected == 1) {
1056 		r_xprt->rx_ep.rep_connected = -EIO;
1057 		rpcrdma_conn_func(&r_xprt->rx_ep);
1058 	}
1059 	return;
1060 
1061 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1062 out_bcall:
1063 	rpcrdma_bc_receive_call(r_xprt, rep);
1064 	return;
1065 #endif
1066 
1067 /* If the incoming reply terminated a pending RPC, the next
1068  * RPC call will post a replacement receive buffer as it is
1069  * being marshaled.
1070  */
1071 out_badversion:
1072 	dprintk("RPC:       %s: invalid version %d\n",
1073 		__func__, be32_to_cpu(headerp->rm_vers));
1074 	status = -EIO;
1075 	r_xprt->rx_stats.bad_reply_count++;
1076 	goto out;
1077 
1078 out_rdmaerr:
1079 	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
1080 	switch (rmerr) {
1081 	case ERR_VERS:
1082 		pr_err("%s: server reports header version error (%u-%u)\n",
1083 		       __func__,
1084 		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
1085 		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
1086 		break;
1087 	case ERR_CHUNK:
1088 		pr_err("%s: server reports header decoding error\n",
1089 		       __func__);
1090 		break;
1091 	default:
1092 		pr_err("%s: server reports unknown error %d\n",
1093 		       __func__, rmerr);
1094 	}
1095 	status = -EREMOTEIO;
1096 	r_xprt->rx_stats.bad_reply_count++;
1097 	goto out;
1098 
1099 /* If no pending RPC transaction was matched, post a replacement
1100  * receive buffer before returning.
1101  */
1102 out_shortreply:
1103 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
1104 	goto repost;
1105 
1106 out_nomatch:
1107 	spin_unlock_bh(&xprt->transport_lock);
1108 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
1109 		__func__, be32_to_cpu(headerp->rm_xid),
1110 		rep->rr_len);
1111 	goto repost;
1112 
1113 out_duplicate:
1114 	spin_unlock_bh(&xprt->transport_lock);
1115 	dprintk("RPC:       %s: "
1116 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
1117 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
1118 
1119 repost:
1120 	r_xprt->rx_stats.bad_reply_count++;
1121 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
1122 		rpcrdma_recv_buffer_put(rep);
1123 }
1124