xref: /linux/net/sunrpc/xprtrdma/svc_rdma_sendto.c (revision ca55b2fef3a9373fcfc30f82fd26bc7fccbda732)
1 /*
2  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
3  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the BSD-type
9  * license below:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  *
15  *      Redistributions of source code must retain the above copyright
16  *      notice, this list of conditions and the following disclaimer.
17  *
18  *      Redistributions in binary form must reproduce the above
19  *      copyright notice, this list of conditions and the following
20  *      disclaimer in the documentation and/or other materials provided
21  *      with the distribution.
22  *
23  *      Neither the name of the Network Appliance, Inc. nor the names of
24  *      its contributors may be used to endorse or promote products
25  *      derived from this software without specific prior written
26  *      permission.
27  *
28  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39  *
40  * Author: Tom Tucker <tom@opengridcomputing.com>
41  */
42 
43 #include <linux/sunrpc/debug.h>
44 #include <linux/sunrpc/rpc_rdma.h>
45 #include <linux/spinlock.h>
46 #include <asm/unaligned.h>
47 #include <rdma/ib_verbs.h>
48 #include <rdma/rdma_cm.h>
49 #include <linux/sunrpc/svc_rdma.h>
50 
51 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
52 
53 static int map_xdr(struct svcxprt_rdma *xprt,
54 		   struct xdr_buf *xdr,
55 		   struct svc_rdma_req_map *vec)
56 {
57 	int sge_no;
58 	u32 sge_bytes;
59 	u32 page_bytes;
60 	u32 page_off;
61 	int page_no;
62 
63 	if (xdr->len !=
64 	    (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
65 		pr_err("svcrdma: map_xdr: XDR buffer length error\n");
66 		return -EIO;
67 	}
68 
69 	/* Skip the first sge, this is for the RPCRDMA header */
70 	sge_no = 1;
71 
72 	/* Head SGE */
73 	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
74 	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
75 	sge_no++;
76 
77 	/* pages SGE */
78 	page_no = 0;
79 	page_bytes = xdr->page_len;
80 	page_off = xdr->page_base;
81 	while (page_bytes) {
82 		vec->sge[sge_no].iov_base =
83 			page_address(xdr->pages[page_no]) + page_off;
84 		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
85 		page_bytes -= sge_bytes;
86 		vec->sge[sge_no].iov_len = sge_bytes;
87 
88 		sge_no++;
89 		page_no++;
90 		page_off = 0; /* reset for next time through loop */
91 	}
92 
93 	/* Tail SGE */
94 	if (xdr->tail[0].iov_len) {
95 		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
96 		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
97 		sge_no++;
98 	}
99 
100 	dprintk("svcrdma: map_xdr: sge_no %d page_no %d "
101 		"page_base %u page_len %u head_len %zu tail_len %zu\n",
102 		sge_no, page_no, xdr->page_base, xdr->page_len,
103 		xdr->head[0].iov_len, xdr->tail[0].iov_len);
104 
105 	vec->count = sge_no;
106 	return 0;
107 }
108 
109 static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
110 			      struct xdr_buf *xdr,
111 			      u32 xdr_off, size_t len, int dir)
112 {
113 	struct page *page;
114 	dma_addr_t dma_addr;
115 	if (xdr_off < xdr->head[0].iov_len) {
116 		/* This offset is in the head */
117 		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
118 		page = virt_to_page(xdr->head[0].iov_base);
119 	} else {
120 		xdr_off -= xdr->head[0].iov_len;
121 		if (xdr_off < xdr->page_len) {
122 			/* This offset is in the page list */
123 			xdr_off += xdr->page_base;
124 			page = xdr->pages[xdr_off >> PAGE_SHIFT];
125 			xdr_off &= ~PAGE_MASK;
126 		} else {
127 			/* This offset is in the tail */
128 			xdr_off -= xdr->page_len;
129 			xdr_off += (unsigned long)
130 				xdr->tail[0].iov_base & ~PAGE_MASK;
131 			page = virt_to_page(xdr->tail[0].iov_base);
132 		}
133 	}
134 	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
135 				   min_t(size_t, PAGE_SIZE, len), dir);
136 	return dma_addr;
137 }
138 
139 /* Returns the address of the first read chunk or <nul> if no read chunk
140  * is present
141  */
142 struct rpcrdma_read_chunk *
143 svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
144 {
145 	struct rpcrdma_read_chunk *ch =
146 		(struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
147 
148 	if (ch->rc_discrim == xdr_zero)
149 		return NULL;
150 	return ch;
151 }
152 
153 /* Returns the address of the first read write array element or <nul>
154  * if no write array list is present
155  */
156 static struct rpcrdma_write_array *
157 svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
158 {
159 	if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
160 	    rmsgp->rm_body.rm_chunks[1] == xdr_zero)
161 		return NULL;
162 	return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1];
163 }
164 
165 /* Returns the address of the first reply array element or <nul> if no
166  * reply array is present
167  */
168 static struct rpcrdma_write_array *
169 svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
170 {
171 	struct rpcrdma_read_chunk *rch;
172 	struct rpcrdma_write_array *wr_ary;
173 	struct rpcrdma_write_array *rp_ary;
174 
175 	/* XXX: Need to fix when reply chunk may occur with read list
176 	 *	and/or write list.
177 	 */
178 	if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
179 	    rmsgp->rm_body.rm_chunks[1] != xdr_zero)
180 		return NULL;
181 
182 	rch = svc_rdma_get_read_chunk(rmsgp);
183 	if (rch) {
184 		while (rch->rc_discrim != xdr_zero)
185 			rch++;
186 
187 		/* The reply chunk follows an empty write array located
188 		 * at 'rc_position' here. The reply array is at rc_target.
189 		 */
190 		rp_ary = (struct rpcrdma_write_array *)&rch->rc_target;
191 		goto found_it;
192 	}
193 
194 	wr_ary = svc_rdma_get_write_array(rmsgp);
195 	if (wr_ary) {
196 		int chunk = be32_to_cpu(wr_ary->wc_nchunks);
197 
198 		rp_ary = (struct rpcrdma_write_array *)
199 			 &wr_ary->wc_array[chunk].wc_target.rs_length;
200 		goto found_it;
201 	}
202 
203 	/* No read list, no write list */
204 	rp_ary = (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[2];
205 
206  found_it:
207 	if (rp_ary->wc_discrim == xdr_zero)
208 		return NULL;
209 	return rp_ary;
210 }
211 
212 /* Assumptions:
213  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
214  */
215 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
216 		      u32 rmr, u64 to,
217 		      u32 xdr_off, int write_len,
218 		      struct svc_rdma_req_map *vec)
219 {
220 	struct ib_send_wr write_wr;
221 	struct ib_sge *sge;
222 	int xdr_sge_no;
223 	int sge_no;
224 	int sge_bytes;
225 	int sge_off;
226 	int bc;
227 	struct svc_rdma_op_ctxt *ctxt;
228 
229 	if (vec->count > RPCSVC_MAXPAGES) {
230 		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
231 		return -EIO;
232 	}
233 
234 	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
235 		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
236 		rmr, (unsigned long long)to, xdr_off,
237 		write_len, vec->sge, vec->count);
238 
239 	ctxt = svc_rdma_get_context(xprt);
240 	ctxt->direction = DMA_TO_DEVICE;
241 	sge = ctxt->sge;
242 
243 	/* Find the SGE associated with xdr_off */
244 	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
245 	     xdr_sge_no++) {
246 		if (vec->sge[xdr_sge_no].iov_len > bc)
247 			break;
248 		bc -= vec->sge[xdr_sge_no].iov_len;
249 	}
250 
251 	sge_off = bc;
252 	bc = write_len;
253 	sge_no = 0;
254 
255 	/* Copy the remaining SGE */
256 	while (bc != 0) {
257 		sge_bytes = min_t(size_t,
258 			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
259 		sge[sge_no].length = sge_bytes;
260 		sge[sge_no].addr =
261 			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
262 				    sge_bytes, DMA_TO_DEVICE);
263 		xdr_off += sge_bytes;
264 		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
265 					 sge[sge_no].addr))
266 			goto err;
267 		atomic_inc(&xprt->sc_dma_used);
268 		sge[sge_no].lkey = xprt->sc_dma_lkey;
269 		ctxt->count++;
270 		sge_off = 0;
271 		sge_no++;
272 		xdr_sge_no++;
273 		if (xdr_sge_no > vec->count) {
274 			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
275 			goto err;
276 		}
277 		bc -= sge_bytes;
278 		if (sge_no == xprt->sc_max_sge)
279 			break;
280 	}
281 
282 	/* Prepare WRITE WR */
283 	memset(&write_wr, 0, sizeof write_wr);
284 	ctxt->wr_op = IB_WR_RDMA_WRITE;
285 	write_wr.wr_id = (unsigned long)ctxt;
286 	write_wr.sg_list = &sge[0];
287 	write_wr.num_sge = sge_no;
288 	write_wr.opcode = IB_WR_RDMA_WRITE;
289 	write_wr.send_flags = IB_SEND_SIGNALED;
290 	write_wr.wr.rdma.rkey = rmr;
291 	write_wr.wr.rdma.remote_addr = to;
292 
293 	/* Post It */
294 	atomic_inc(&rdma_stat_write);
295 	if (svc_rdma_send(xprt, &write_wr))
296 		goto err;
297 	return write_len - bc;
298  err:
299 	svc_rdma_unmap_dma(ctxt);
300 	svc_rdma_put_context(ctxt, 0);
301 	/* Fatal error, close transport */
302 	return -EIO;
303 }
304 
305 static int send_write_chunks(struct svcxprt_rdma *xprt,
306 			     struct rpcrdma_msg *rdma_argp,
307 			     struct rpcrdma_msg *rdma_resp,
308 			     struct svc_rqst *rqstp,
309 			     struct svc_rdma_req_map *vec)
310 {
311 	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
312 	int write_len;
313 	u32 xdr_off;
314 	int chunk_off;
315 	int chunk_no;
316 	int nchunks;
317 	struct rpcrdma_write_array *arg_ary;
318 	struct rpcrdma_write_array *res_ary;
319 	int ret;
320 
321 	arg_ary = svc_rdma_get_write_array(rdma_argp);
322 	if (!arg_ary)
323 		return 0;
324 	res_ary = (struct rpcrdma_write_array *)
325 		&rdma_resp->rm_body.rm_chunks[1];
326 
327 	/* Write chunks start at the pagelist */
328 	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
329 	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
330 	     xfer_len && chunk_no < nchunks;
331 	     chunk_no++) {
332 		struct rpcrdma_segment *arg_ch;
333 		u64 rs_offset;
334 
335 		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
336 		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
337 
338 		/* Prepare the response chunk given the length actually
339 		 * written */
340 		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
341 		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
342 						arg_ch->rs_handle,
343 						arg_ch->rs_offset,
344 						write_len);
345 		chunk_off = 0;
346 		while (write_len) {
347 			ret = send_write(xprt, rqstp,
348 					 be32_to_cpu(arg_ch->rs_handle),
349 					 rs_offset + chunk_off,
350 					 xdr_off,
351 					 write_len,
352 					 vec);
353 			if (ret <= 0) {
354 				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
355 					ret);
356 				return -EIO;
357 			}
358 			chunk_off += ret;
359 			xdr_off += ret;
360 			xfer_len -= ret;
361 			write_len -= ret;
362 		}
363 	}
364 	/* Update the req with the number of chunks actually used */
365 	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
366 
367 	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
368 }
369 
370 static int send_reply_chunks(struct svcxprt_rdma *xprt,
371 			     struct rpcrdma_msg *rdma_argp,
372 			     struct rpcrdma_msg *rdma_resp,
373 			     struct svc_rqst *rqstp,
374 			     struct svc_rdma_req_map *vec)
375 {
376 	u32 xfer_len = rqstp->rq_res.len;
377 	int write_len;
378 	u32 xdr_off;
379 	int chunk_no;
380 	int chunk_off;
381 	int nchunks;
382 	struct rpcrdma_segment *ch;
383 	struct rpcrdma_write_array *arg_ary;
384 	struct rpcrdma_write_array *res_ary;
385 	int ret;
386 
387 	arg_ary = svc_rdma_get_reply_array(rdma_argp);
388 	if (!arg_ary)
389 		return 0;
390 	/* XXX: need to fix when reply lists occur with read-list and or
391 	 * write-list */
392 	res_ary = (struct rpcrdma_write_array *)
393 		&rdma_resp->rm_body.rm_chunks[2];
394 
395 	/* xdr offset starts at RPC message */
396 	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
397 	for (xdr_off = 0, chunk_no = 0;
398 	     xfer_len && chunk_no < nchunks;
399 	     chunk_no++) {
400 		u64 rs_offset;
401 		ch = &arg_ary->wc_array[chunk_no].wc_target;
402 		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
403 
404 		/* Prepare the reply chunk given the length actually
405 		 * written */
406 		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
407 		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
408 						ch->rs_handle, ch->rs_offset,
409 						write_len);
410 		chunk_off = 0;
411 		while (write_len) {
412 			ret = send_write(xprt, rqstp,
413 					 be32_to_cpu(ch->rs_handle),
414 					 rs_offset + chunk_off,
415 					 xdr_off,
416 					 write_len,
417 					 vec);
418 			if (ret <= 0) {
419 				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
420 					ret);
421 				return -EIO;
422 			}
423 			chunk_off += ret;
424 			xdr_off += ret;
425 			xfer_len -= ret;
426 			write_len -= ret;
427 		}
428 	}
429 	/* Update the req with the number of chunks actually used */
430 	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
431 
432 	return rqstp->rq_res.len;
433 }
434 
435 /* This function prepares the portion of the RPCRDMA message to be
436  * sent in the RDMA_SEND. This function is called after data sent via
437  * RDMA has already been transmitted. There are three cases:
438  * - The RPCRDMA header, RPC header, and payload are all sent in a
439  *   single RDMA_SEND. This is the "inline" case.
440  * - The RPCRDMA header and some portion of the RPC header and data
441  *   are sent via this RDMA_SEND and another portion of the data is
442  *   sent via RDMA.
443  * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
444  *   header and data are all transmitted via RDMA.
445  * In all three cases, this function prepares the RPCRDMA header in
446  * sge[0], the 'type' parameter indicates the type to place in the
447  * RPCRDMA header, and the 'byte_count' field indicates how much of
448  * the XDR to include in this RDMA_SEND. NB: The offset of the payload
449  * to send is zero in the XDR.
450  */
451 static int send_reply(struct svcxprt_rdma *rdma,
452 		      struct svc_rqst *rqstp,
453 		      struct page *page,
454 		      struct rpcrdma_msg *rdma_resp,
455 		      struct svc_rdma_op_ctxt *ctxt,
456 		      struct svc_rdma_req_map *vec,
457 		      int byte_count)
458 {
459 	struct ib_send_wr send_wr;
460 	u32 xdr_off;
461 	int sge_no;
462 	int sge_bytes;
463 	int page_no;
464 	int pages;
465 	int ret;
466 
467 	/* Post a recv buffer to handle another request. */
468 	ret = svc_rdma_post_recv(rdma);
469 	if (ret) {
470 		printk(KERN_INFO
471 		       "svcrdma: could not post a receive buffer, err=%d."
472 		       "Closing transport %p.\n", ret, rdma);
473 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
474 		svc_rdma_put_context(ctxt, 0);
475 		return -ENOTCONN;
476 	}
477 
478 	/* Prepare the context */
479 	ctxt->pages[0] = page;
480 	ctxt->count = 1;
481 
482 	/* Prepare the SGE for the RPCRDMA Header */
483 	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
484 	ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
485 	ctxt->sge[0].addr =
486 	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
487 			    ctxt->sge[0].length, DMA_TO_DEVICE);
488 	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
489 		goto err;
490 	atomic_inc(&rdma->sc_dma_used);
491 
492 	ctxt->direction = DMA_TO_DEVICE;
493 
494 	/* Map the payload indicated by 'byte_count' */
495 	xdr_off = 0;
496 	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
497 		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
498 		byte_count -= sge_bytes;
499 		ctxt->sge[sge_no].addr =
500 			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
501 				    sge_bytes, DMA_TO_DEVICE);
502 		xdr_off += sge_bytes;
503 		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
504 					 ctxt->sge[sge_no].addr))
505 			goto err;
506 		atomic_inc(&rdma->sc_dma_used);
507 		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
508 		ctxt->sge[sge_no].length = sge_bytes;
509 	}
510 	if (byte_count != 0) {
511 		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
512 		goto err;
513 	}
514 
515 	/* Save all respages in the ctxt and remove them from the
516 	 * respages array. They are our pages until the I/O
517 	 * completes.
518 	 */
519 	pages = rqstp->rq_next_page - rqstp->rq_respages;
520 	for (page_no = 0; page_no < pages; page_no++) {
521 		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
522 		ctxt->count++;
523 		rqstp->rq_respages[page_no] = NULL;
524 		/*
525 		 * If there are more pages than SGE, terminate SGE
526 		 * list so that svc_rdma_unmap_dma doesn't attempt to
527 		 * unmap garbage.
528 		 */
529 		if (page_no+1 >= sge_no)
530 			ctxt->sge[page_no+1].length = 0;
531 	}
532 	rqstp->rq_next_page = rqstp->rq_respages + 1;
533 
534 	/* The loop above bumps sc_dma_used for each sge. The
535 	 * xdr_buf.tail gets a separate sge, but resides in the
536 	 * same page as xdr_buf.head. Don't count it twice.
537 	 */
538 	if (sge_no > ctxt->count)
539 		atomic_dec(&rdma->sc_dma_used);
540 
541 	if (sge_no > rdma->sc_max_sge) {
542 		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
543 		goto err;
544 	}
545 	memset(&send_wr, 0, sizeof send_wr);
546 	ctxt->wr_op = IB_WR_SEND;
547 	send_wr.wr_id = (unsigned long)ctxt;
548 	send_wr.sg_list = ctxt->sge;
549 	send_wr.num_sge = sge_no;
550 	send_wr.opcode = IB_WR_SEND;
551 	send_wr.send_flags =  IB_SEND_SIGNALED;
552 
553 	ret = svc_rdma_send(rdma, &send_wr);
554 	if (ret)
555 		goto err;
556 
557 	return 0;
558 
559  err:
560 	svc_rdma_unmap_dma(ctxt);
561 	svc_rdma_put_context(ctxt, 1);
562 	return -EIO;
563 }
564 
/* Intentionally empty: nothing is done to @rqstp here.
 *
 * NOTE(review): presumably this exists so that a caller outside this
 * file has a function to call for this transport; the caller is not
 * visible here -- confirm before removing.
 */
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}
568 
569 int svc_rdma_sendto(struct svc_rqst *rqstp)
570 {
571 	struct svc_xprt *xprt = rqstp->rq_xprt;
572 	struct svcxprt_rdma *rdma =
573 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
574 	struct rpcrdma_msg *rdma_argp;
575 	struct rpcrdma_msg *rdma_resp;
576 	struct rpcrdma_write_array *reply_ary;
577 	enum rpcrdma_proc reply_type;
578 	int ret;
579 	int inline_bytes;
580 	struct page *res_page;
581 	struct svc_rdma_op_ctxt *ctxt;
582 	struct svc_rdma_req_map *vec;
583 
584 	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
585 
586 	/* Get the RDMA request header. The receive logic always
587 	 * places this at the start of page 0.
588 	 */
589 	rdma_argp = page_address(rqstp->rq_pages[0]);
590 
591 	/* Build an req vec for the XDR */
592 	ctxt = svc_rdma_get_context(rdma);
593 	ctxt->direction = DMA_TO_DEVICE;
594 	vec = svc_rdma_get_req_map();
595 	ret = map_xdr(rdma, &rqstp->rq_res, vec);
596 	if (ret)
597 		goto err0;
598 	inline_bytes = rqstp->rq_res.len;
599 
600 	/* Create the RDMA response header */
601 	res_page = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
602 	rdma_resp = page_address(res_page);
603 	reply_ary = svc_rdma_get_reply_array(rdma_argp);
604 	if (reply_ary)
605 		reply_type = RDMA_NOMSG;
606 	else
607 		reply_type = RDMA_MSG;
608 	svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
609 					 rdma_resp, reply_type);
610 
611 	/* Send any write-chunk data and build resp write-list */
612 	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
613 				rqstp, vec);
614 	if (ret < 0) {
615 		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
616 		       ret);
617 		goto err1;
618 	}
619 	inline_bytes -= ret;
620 
621 	/* Send any reply-list data and update resp reply-list */
622 	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
623 				rqstp, vec);
624 	if (ret < 0) {
625 		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
626 		       ret);
627 		goto err1;
628 	}
629 	inline_bytes -= ret;
630 
631 	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
632 			 inline_bytes);
633 	svc_rdma_put_req_map(vec);
634 	dprintk("svcrdma: send_reply returns %d\n", ret);
635 	return ret;
636 
637  err1:
638 	put_page(res_page);
639  err0:
640 	svc_rdma_put_req_map(vec);
641 	svc_rdma_put_context(ctxt, 0);
642 	return ret;
643 }
644