xref: /linux/net/sunrpc/xprtrdma/svc_rdma_rw.c (revision 18755b8c2f241648b951d3772e0742cc59834d5a)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2016-2018 Oracle.  All rights reserved.
4  *
5  * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
6  */
7 
8 #include <linux/bvec.h>
9 #include <linux/overflow.h>
10 #include <rdma/rw.h>
11 
12 #include <linux/sunrpc/xdr.h>
13 #include <linux/sunrpc/rpc_rdma.h>
14 #include <linux/sunrpc/svc_rdma.h>
15 
16 #include "xprt_rdma.h"
17 #include <trace/events/rpcrdma.h>
18 
19 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
20 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
21 
22 /* Each R/W context contains state for one chain of RDMA Read or
23  * Write Work Requests.
24  *
25  * Each WR chain handles a single contiguous server-side buffer.
26  * - each xdr_buf iovec is a single contiguous buffer
27  * - the xdr_buf pages array is a single contiguous buffer because the
28  *   second through the last element always start on a page boundary
29  *
30  * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
31  * from a client may contain a unique R_key, so each WR chain moves
32  * up to one segment at a time.
33  *
34  * The inline bvec array is sized to handle most I/O requests without
35  * additional allocation. Larger requests fall back to dynamic allocation.
36  * These contexts are created on demand, but cached and reused until
37  * the controlling svcxprt_rdma is destroyed.
38  */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;	/* linkage on rdma->sc_rw_ctxts free cache */
	struct list_head	rw_list;	/* linkage on a chunk_ctxt's cc_rwctxts */
	struct rdma_rw_ctx	rw_ctx;		/* core R/W API per-chain state */
	unsigned int		rw_nents;	/* number of populated rw_bvec entries */
	unsigned int		rw_first_bvec_nents; /* capacity of rw_first_bvec[] */
	struct bio_vec		*rw_bvec;	/* rw_first_bvec, or a kmalloc'd array
						 * when more entries are needed */
	struct bio_vec		rw_first_bvec[]; /* inline array; sized to the
						  * device's max_send_sge at
						  * allocation time */
};
48 
49 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
50 				 struct svc_rdma_rw_ctxt *ctxt);
51 
52 static inline struct svc_rdma_rw_ctxt *
53 svc_rdma_next_ctxt(struct list_head *list)
54 {
55 	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
56 					rw_list);
57 }
58 
/* Acquire an R/W context with room for @nr_bvec bio_vec entries.
 *
 * A cached context is reused when available; otherwise a fresh one
 * is allocated on the device's NUMA node with an inline bvec array
 * sized to the device's max_send_sge. If @nr_bvec exceeds the inline
 * capacity, a separate bvec array is allocated.
 *
 * Returns a ready-to-fill context, or NULL on allocation failure.
 */
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int nr_bvec)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_bvec_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	/* llist_del_first() consumers must serialize among themselves;
	 * sc_rw_ctxt_lock provides that serialization.
	 */
	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		/* Allocate near the device to avoid cross-node DMA setup */
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_bvec,
						first_bvec_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_bvec_nents = first_bvec_nents;
	}

	/* Prefer the inline array; fall back to a dynamic allocation
	 * only for requests that exceed its capacity.
	 */
	if (nr_bvec <= ctxt->rw_first_bvec_nents) {
		ctxt->rw_bvec = ctxt->rw_first_bvec;
	} else {
		ctxt->rw_bvec = kmalloc_array_node(nr_bvec,
						   sizeof(*ctxt->rw_bvec),
						   GFP_KERNEL,
						   ibdev_to_node(dev));
		if (!ctxt->rw_bvec)
			goto out_free;
	}
	return ctxt;

out_free:
	/* Return cached contexts to cache; free freshly allocated ones */
	if (node)
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	else
		kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, nr_bvec);
	return NULL;
}
105 
/* Return @ctxt to free-context cache @list. A dynamically allocated
 * bvec array is released first; rw_bvec is left stale but is always
 * reassigned by svc_rdma_get_rw_ctxt() before the next use.
 */
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	if (ctxt->rw_bvec != ctxt->rw_first_bvec)
		kfree(ctxt->rw_bvec);
	llist_add(&ctxt->rw_node, list);
}
113 
/* Return @ctxt to @rdma's free-context cache for later reuse. */
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}
119 
120 /**
121  * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
122  * @rdma: transport about to be destroyed
123  *
124  */
125 void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
126 {
127 	struct svc_rdma_rw_ctxt *ctxt;
128 	struct llist_node *node;
129 
130 	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
131 		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
132 		kfree(ctxt);
133 	}
134 }
135 
/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @length: total number of bytes in the bvec array
 * @direction: I/O direction
 *
 * On failure, @ctxt is returned to the transport's free-context
 * cache before this function returns; the caller must not touch
 * it again.
 *
 * Returns on success, the number of Send Queue entries (SQEs) that
 * posting this context's WR chain will consume, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle, unsigned int length,
				enum dma_data_direction direction)
{
	struct bvec_iter iter = {
		.bi_size = length,
	};
	int ret;

	ret = rdma_rw_ctx_init_bvec(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num,
				    ctxt->rw_bvec, ctxt->rw_nents,
				    iter, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}
169 
/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 *
 * The completion ID is assigned only once per @cc, so re-initializing
 * a recycled chunk_ctxt preserves its original ID.
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	/* Lazily assign a completion ID the first time @cc is used */
	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}
186 
/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 *
 * Destroys each R/W context on @cc's list, then returns all of them
 * to the transport's free-context cache with a single llist operation.
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		/* Unmap and release the core R/W API resources */
		rdma_rw_ctx_destroy_bvec(&ctxt->rw_ctx, rdma->sc_qp,
					 rdma->sc_port_num,
					 ctxt->rw_bvec, ctxt->rw_nents, dir);
		/* Free a dynamically allocated bvec array here rather
		 * than via __svc_rdma_put_rw_ctxt(), since the contexts
		 * are batched onto a private chain below.
		 */
		if (ctxt->rw_bvec != ctxt->rw_first_bvec)
			kfree(ctxt->rw_bvec);

		/* Build a singly-linked chain (first..last) by hand so
		 * the whole batch can be cached with one llist_add_batch().
		 */
		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
220 
/* Allocate and initialize a write_info for moving one Write chunk.
 *
 * The allocation is zeroed and placed on the device's NUMA node.
 * Returns NULL on allocation failure.
 */
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}
238 
/* Work item: release a write_info's R/W resources, then free it.
 * Runs from svcrdma_wq so the release happens in process context
 * rather than in the caller's (possibly completion) context.
 */
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}
247 
/* Schedule asynchronous release of @info on svcrdma_wq. */
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}
253 
254 /**
255  * svc_rdma_write_chunk_release - Release Write chunk I/O resources
256  * @rdma: controlling transport
257  * @ctxt: Send context that is being released
258  *
259  * Write chunk resources remain live until Send completion because
260  * Write WRs are chained to the Send WR. This function releases all
261  * write_info structures accumulated on @ctxt->sc_write_info_list.
262  */
263 void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
264 				  struct svc_rdma_send_ctxt *ctxt)
265 {
266 	struct svc_rdma_write_info *info;
267 
268 	while (!list_empty(&ctxt->sc_write_info_list)) {
269 		info = list_first_entry(&ctxt->sc_write_info_list,
270 					struct svc_rdma_write_info, wi_list);
271 		list_del(&info->wi_list);
272 		svc_rdma_write_info_free(info);
273 	}
274 }
275 
/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 *
 * A zero cc_sqecount indicates no Reply chunk WRs were constructed
 * for this Send, so there is nothing to release.
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}
290 
/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	/* Any non-success completion means the Reply chunk payload was
	 * not delivered; close the connection to signal the loss.
	 */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
318 
/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	/* The RDMA Write has flushed, so the client won't get
	 * some of the outgoing RPC message. Signal the loss
	 * to the client by closing the connection.
	 */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
350 
/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * On success, hand the fully-assembled receive context to the
 * transport's read-complete queue and wake svc_xprt processing.
 * On failure, release all Read resources and close the connection.
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	/* The SQ entries consumed by this Read chain are now free */
	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
395 
396 /*
397  * Assumptions:
398  * - If ib_post_send() succeeds, only one completion is expected,
399  *   even if one or more WRs are flushed. This is true when posting
400  *   an rdma_rw_ctx or when posting a single signaled WR.
401  */
402 static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
403 				    struct svc_rdma_chunk_ctxt *cc)
404 {
405 	struct ib_send_wr *first_wr;
406 	const struct ib_send_wr *bad_wr;
407 	struct list_head *tmp;
408 	struct ib_cqe *cqe;
409 	int ret;
410 
411 	might_sleep();
412 
413 	if (cc->cc_sqecount > rdma->sc_sq_depth)
414 		return -EINVAL;
415 
416 	first_wr = NULL;
417 	cqe = &cc->cc_cqe;
418 	list_for_each(tmp, &cc->cc_rwctxts) {
419 		struct svc_rdma_rw_ctxt *ctxt;
420 
421 		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
422 		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
423 					   rdma->sc_port_num, cqe, first_wr);
424 		cqe = NULL;
425 	}
426 
427 	ret = svc_rdma_sq_wait(rdma, &cc->cc_cid, cc->cc_sqecount);
428 	if (ret < 0)
429 		return ret;
430 
431 	cc->cc_posttime = ktime_get();
432 	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
433 	if (ret)
434 		return svc_rdma_post_send_err(rdma, &cc->cc_cid, bad_wr,
435 					      first_wr, cc->cc_sqecount,
436 					      ret);
437 	return 0;
438 }
439 
440 /* Build a bvec that covers one kvec in an xdr_buf.
441  */
442 static void svc_rdma_vec_to_bvec(struct svc_rdma_write_info *info,
443 				 unsigned int len,
444 				 struct svc_rdma_rw_ctxt *ctxt)
445 {
446 	bvec_set_virt(&ctxt->rw_bvec[0], info->wi_base, len);
447 	info->wi_base += len;
448 
449 	ctxt->rw_nents = 1;
450 }
451 
452 /* Build a bvec array that covers part of an xdr_buf's pagelist.
453  */
454 static void svc_rdma_pagelist_to_bvec(struct svc_rdma_write_info *info,
455 				      unsigned int remaining,
456 				      struct svc_rdma_rw_ctxt *ctxt)
457 {
458 	unsigned int bvec_idx, bvec_len, page_off, page_no;
459 	const struct xdr_buf *xdr = info->wi_xdr;
460 	struct page **page;
461 
462 	page_off = info->wi_next_off + xdr->page_base;
463 	page_no = page_off >> PAGE_SHIFT;
464 	page_off = offset_in_page(page_off);
465 	page = xdr->pages + page_no;
466 	info->wi_next_off += remaining;
467 	bvec_idx = 0;
468 	do {
469 		bvec_len = min_t(unsigned int, remaining,
470 				 PAGE_SIZE - page_off);
471 		bvec_set_page(&ctxt->rw_bvec[bvec_idx], *page, bvec_len,
472 			      page_off);
473 		remaining -= bvec_len;
474 		page_off = 0;
475 		bvec_idx++;
476 		page++;
477 	} while (remaining);
478 
479 	ctxt->rw_nents = bvec_idx;
480 }
481 
/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 *
 * @constructor fills one R/W context's bvec array from the xdr_buf
 * (kvec or pagelist form). One R/W context is built per chunk segment
 * crossed, since each segment may carry its own R_key.
 *
 * Returns zero on success, %-ENOMEM if no R/W context is available,
 * %-EIO on a DMA mapping failure, or %-E2BIG if the client-provided
 * chunk is too small for @remaining bytes.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		/* Write no more than what fits in the current segment */
		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		/* +2 covers a payload that starts and ends mid-page */
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   write_len, DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		/* Advance to the next segment, or continue within this one */
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}
540 
/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	/* wi_base is the cursor svc_rdma_vec_to_bvec() advances */
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_bvec,
				     iov->iov_len);
}
559 
/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	/* Convert @offset (relative to the whole xdr_buf) into an
	 * offset relative to the start of the pagelist.
	 */
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_bvec,
				     length);
}
583 
584 /**
585  * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
586  * @xdr: xdr_buf to write
587  * @data: pointer to write arguments
588  *
589  * Returns:
590  *   On success, returns zero
591  *   %-E2BIG if the client-provided Write chunk is too small
592  *   %-ENOMEM if a resource has been exhausted
593  *   %-EIO if an rdma-rw error occurred
594  */
595 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
596 {
597 	struct svc_rdma_write_info *info = data;
598 	int ret;
599 
600 	if (xdr->head[0].iov_len) {
601 		ret = svc_rdma_iov_write(info, &xdr->head[0]);
602 		if (ret < 0)
603 			return ret;
604 	}
605 
606 	if (xdr->page_len) {
607 		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
608 					   xdr->page_len);
609 		if (ret < 0)
610 			return ret;
611 	}
612 
613 	if (xdr->tail[0].iov_len) {
614 		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
615 		if (ret < 0)
616 			return ret;
617 	}
618 
619 	return xdr->len;
620 }
621 
622 /* Link chunk WRs onto @sctxt's WR chain. Completion is requested
623  * for the tail WR, which is posted first.
624  */
625 static void svc_rdma_cc_link_wrs(struct svcxprt_rdma *rdma,
626 				 struct svc_rdma_send_ctxt *sctxt,
627 				 struct svc_rdma_chunk_ctxt *cc)
628 {
629 	struct ib_send_wr *first_wr;
630 	struct list_head *pos;
631 	struct ib_cqe *cqe;
632 
633 	first_wr = sctxt->sc_wr_chain;
634 	cqe = &cc->cc_cqe;
635 	list_for_each(pos, &cc->cc_rwctxts) {
636 		struct svc_rdma_rw_ctxt *rwc;
637 
638 		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
639 		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
640 					   rdma->sc_port_num, cqe, first_wr);
641 		cqe = NULL;
642 	}
643 	sctxt->sc_wr_chain = first_wr;
644 	sctxt->sc_sqecount += cc->cc_sqecount;
645 }
646 
/* Link Write WRs for @chunk onto @sctxt's WR chain.
 *
 * Builds the Write WRs for one chunk and, when they fit on the Send
 * Queue alongside @sctxt's existing chain, links them in and records
 * the write_info on sc_write_info_list for release at Send completion.
 *
 * Returns zero on success; %-EMSGSIZE if the chunk's payload lies
 * outside @xdr, %-ENOMEM on allocation failure, %-EINVAL if the
 * combined chain would exceed the Send Queue depth, or the error
 * from svc_rdma_xb_write().
 */
static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
					struct svc_rdma_send_ctxt *sctxt,
					const struct svc_rdma_chunk *chunk,
					const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct xdr_buf payload;
	int ret;

	/* Carve the chunk's payload out of @xdr */
	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	/* A short count indicates construction stopped early */
	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	ret = -EINVAL;
	if (unlikely(sctxt->sc_sqecount + cc->cc_sqecount > rdma->sc_sq_depth))
		goto out_err;

	svc_rdma_cc_link_wrs(rdma, sctxt, cc);
	list_add(&info->wi_list, &sctxt->sc_write_info_list);

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	return 0;

out_err:
	/* Asynchronously releases any R/W contexts already built */
	svc_rdma_write_info_free(info);
	return ret;
}
686 
/**
 * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
 * @rdma: controlling RDMA transport
 * @rctxt: Write list provisioned by the client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if WR chain
 * construction fails for one or more Write chunks.
 */
int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
				const struct svc_rdma_recv_ctxt *rctxt,
				struct svc_rdma_send_ctxt *sctxt,
				const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		/* An unused chunk terminates the list */
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}
714 
/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	/* The write_info is embedded in @sctxt, so its lifetime follows
	 * the Send context; see svc_rdma_reply_chunk_release().
	 */
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	/* Write everything in @xdr that is not already conveyed by a
	 * chunk in @write_pcl.
	 */
	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

	svc_rdma_cc_link_wrs(rdma, sctxt, cc);

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}
756 
757 /*
758  * Cap contiguous RDMA Read sink allocations at order-4.
759  * Higher orders risk allocation failure under
760  * __GFP_NORETRY, which would negate the benefit of the
761  * contiguous fast path.
762  */
763 #define SVC_RDMA_CONTIG_MAX_ORDER	4
764 
/**
 * svc_rdma_alloc_read_pages - Allocate physically contiguous pages
 * @nr_pages: number of pages needed
 * @order: on success, set to the allocation order
 *
 * Attempts a higher-order allocation, falling back to smaller orders.
 * The returned pages are split immediately so each sub-page has its
 * own refcount and can be freed independently.
 *
 * Returns a pointer to the first page on success, or NULL if even
 * order-1 allocation fails.
 */
static struct page *
svc_rdma_alloc_read_pages(unsigned int nr_pages, unsigned int *order)
{
	unsigned int o;
	struct page *page;

	o = min(get_order(nr_pages << PAGE_SHIFT),
		SVC_RDMA_CONTIG_MAX_ORDER);

	/* Order-0 is never attempted here: a single page offers no
	 * contiguity benefit, and the caller already has rq_pages.
	 */
	while (o >= 1) {
		/* __GFP_NORETRY keeps this a best-effort fast path */
		page = alloc_pages(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN,
				   o);
		if (page) {
			split_page(page, o);
			*order = o;
			return page;
		}
		o--;
	}
	return NULL;
}
798 
/*
 * svc_rdma_fill_contig_bvec - Replace rq_pages with a contiguous allocation
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @bv: bvec entry to fill
 * @pages_left: number of data pages remaining in the segment
 * @len_left: bytes remaining in the segment
 *
 * On success, fills @bv with a bvec spanning the contiguous range and
 * advances rc_curpage/rc_page_count. Returns the byte length covered,
 * or zero if the allocation failed or would overrun rq_maxpages.
 */
static unsigned int
svc_rdma_fill_contig_bvec(struct svc_rqst *rqstp,
			  struct svc_rdma_recv_ctxt *head,
			  struct bio_vec *bv, unsigned int pages_left,
			  unsigned int len_left)
{
	unsigned int order, npages, chunk_pages, chunk_len, i;
	struct page *page;

	page = svc_rdma_alloc_read_pages(pages_left, &order);
	if (!page)
		return 0;
	npages = 1 << order;

	/* The allocation may be larger than requested; make sure all
	 * of it fits within rq_pages[] before committing to it.
	 */
	if (head->rc_curpage + npages > rqstp->rq_maxpages) {
		for (i = 0; i < npages; i++)
			__free_page(page + i);
		return 0;
	}

	/*
	 * Replace rq_pages[] entries with pages from the contiguous
	 * allocation. If npages exceeds chunk_pages, the extra pages
	 * stay in rq_pages[] for later reuse or normal rqst teardown.
	 */
	for (i = 0; i < npages; i++) {
		svc_rqst_page_release(rqstp,
				      rqstp->rq_pages[head->rc_curpage + i]);
		rqstp->rq_pages[head->rc_curpage + i] = page + i;
	}

	/* The bvec covers only the portion this segment needs */
	chunk_pages = min(npages, pages_left);
	chunk_len = min_t(unsigned int, chunk_pages << PAGE_SHIFT, len_left);
	bvec_set_page(bv, page, chunk_len, 0);
	head->rc_page_count += chunk_pages;
	head->rc_curpage += chunk_pages;
	return chunk_len;
}
849 
/*
 * svc_rdma_fill_page_bvec - Add a single rq_page to the bvec array
 * @head: context for ongoing I/O
 * @ctxt: R/W context whose bvec array is being filled
 * @cur: page to add
 * @bvec_idx: pointer to current bvec index, not advanced on merge
 * @len_left: bytes remaining in the segment
 *
 * If @cur is physically contiguous with the preceding bvec, it is
 * merged by extending that bvec's length. Otherwise a new bvec
 * entry is created. Returns the byte length covered.
 */
static unsigned int
svc_rdma_fill_page_bvec(struct svc_rdma_recv_ctxt *head,
			struct svc_rdma_rw_ctxt *ctxt, struct page *cur,
			unsigned int *bvec_idx, unsigned int len_left)
{
	unsigned int chunk_len = min_t(unsigned int, PAGE_SIZE, len_left);

	head->rc_page_count++;
	head->rc_curpage++;

	/* Merge with the previous bvec when @cur begins exactly where
	 * that bvec's physical coverage ends.
	 */
	if (*bvec_idx > 0) {
		struct bio_vec *prev = &ctxt->rw_bvec[*bvec_idx - 1];

		if (page_to_phys(prev->bv_page) + prev->bv_offset +
		    prev->bv_len == page_to_phys(cur)) {
			prev->bv_len += chunk_len;
			return chunk_len;
		}
	}

	bvec_set_page(&ctxt->rw_bvec[*bvec_idx], cur, chunk_len, 0);
	(*bvec_idx)++;
	return chunk_len;
}
886 
/**
 * svc_rdma_build_read_segment_contig - Build RDMA Read WR with contiguous pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Greedily allocates higher-order pages to cover the segment,
 * building one bvec per contiguous chunk. Each allocation is
 * split so sub-pages have independent refcounts. When a
 * higher-order allocation fails, remaining pages are covered
 * individually, merging adjacent pages into the preceding bvec
 * when they are physically contiguous. The split sub-pages
 * replace entries in rq_pages[] so downstream cleanup is
 * unchanged.
 *
 * Returns:
 *   %0: the Read WR was constructed successfully
 *   %-ENOMEM: allocation failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment_contig(struct svc_rqst *rqstp,
					      struct svc_rdma_recv_ctxt *head,
					      const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int nr_data_pages, bvec_idx;
	struct svc_rdma_rw_ctxt *ctxt;
	unsigned int len_left;
	int ret;

	nr_data_pages = PAGE_ALIGN(segment->rs_length) >> PAGE_SHIFT;
	if (head->rc_curpage + nr_data_pages > rqstp->rq_maxpages)
		return -ENOMEM;

	/* Worst case: no merging happens, one bvec per data page */
	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_data_pages);
	if (!ctxt)
		return -ENOMEM;

	bvec_idx = 0;
	len_left = segment->rs_length;
	while (len_left) {
		unsigned int pages_left = PAGE_ALIGN(len_left) >> PAGE_SHIFT;
		unsigned int chunk_len = 0;

		/* Try a multi-page contiguous chunk first; fall back to
		 * covering one rq_page at a time when that fails.
		 */
		if (pages_left >= 2)
			chunk_len = svc_rdma_fill_contig_bvec(rqstp, head,
							      &ctxt->rw_bvec[bvec_idx],
							      pages_left, len_left);
		if (chunk_len) {
			bvec_idx++;
		} else {
			struct page *cur =
				rqstp->rq_pages[head->rc_curpage];
			chunk_len = svc_rdma_fill_page_bvec(head, ctxt, cur,
							    &bvec_idx,
							    len_left);
		}

		len_left -= chunk_len;
	}

	ctxt->rw_nents = bvec_idx;

	/* If the segment ends mid-page, the final page is only
	 * partially filled: step rc_curpage back so the next segment
	 * continues in that page at rc_pageoff.
	 */
	head->rc_pageoff = offset_in_page(segment->rs_length);
	if (head->rc_pageoff)
		head->rc_curpage--;

	/* On failure, svc_rdma_rw_ctx_init() has already recycled @ctxt */
	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;
}
966 
/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Tries the contiguous fast path first when the segment starts on a
 * page boundary and spans at least two pages; on -ENOMEM from that
 * path, falls back to mapping the existing rq_pages one at a time.
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating a local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int bvec_idx, nr_bvec, seg_len, len, total;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	len = segment->rs_length;
	/* Guard the page count computation against u32 wrap-around */
	if (check_add_overflow(head->rc_pageoff, len, &total))
		return -EINVAL;
	nr_bvec = PAGE_ALIGN(total) >> PAGE_SHIFT;

	/* Contiguous fast path: only attempted for page-aligned,
	 * multi-page segments. Any result other than -ENOMEM is final.
	 */
	if (head->rc_pageoff == 0 && nr_bvec >= 2) {
		ret = svc_rdma_build_read_segment_contig(rqstp, head,
							 segment);
		if (ret != -ENOMEM)
			return ret;
	}

	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = nr_bvec;

	for (bvec_idx = 0; bvec_idx < ctxt->rw_nents; bvec_idx++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		/* Each rq_page is counted once, when first touched */
		if (!head->rc_pageoff)
			head->rc_page_count++;

		bvec_set_page(&ctxt->rw_bvec[bvec_idx],
			      rqstp->rq_pages[head->rc_curpage],
			      seg_len, head->rc_pageoff);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > rqstp->rq_maxpages))
			goto out_overrun;
	}

	/* On failure, svc_rdma_rw_ctx_init() has already recycled @ctxt */
	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}
1043 
1044 /**
1045  * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
1046  * @rqstp: RPC transaction context
1047  * @head: context for ongoing I/O
1048  * @chunk: Read chunk to pull
1049  *
1050  * Return values:
1051  *   %0: the Read WR chain was constructed successfully
1052  *   %-EINVAL: there were not enough resources to finish
1053  *   %-ENOMEM: allocating a local resources failed
1054  *   %-EIO: a DMA mapping error occurred
1055  */
1056 static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
1057 				     struct svc_rdma_recv_ctxt *head,
1058 				     const struct svc_rdma_chunk *chunk)
1059 {
1060 	const struct svc_rdma_segment *segment;
1061 	int ret;
1062 
1063 	ret = -EINVAL;
1064 	pcl_for_each_segment(segment, chunk) {
1065 		ret = svc_rdma_build_read_segment(rqstp, head, segment);
1066 		if (ret < 0)
1067 			break;
1068 		head->rc_readbytes += segment->rs_length;
1069 	}
1070 	return ret;
1071 }
1072 
1073 /**
1074  * svc_rdma_copy_inline_range - Copy part of the inline content into pages
1075  * @rqstp: RPC transaction context
1076  * @head: context for ongoing I/O
1077  * @offset: offset into the Receive buffer of region to copy
1078  * @remaining: length of region to copy
1079  *
1080  * Take a page at a time from rqstp->rq_pages and copy the inline
1081  * content from the Receive buffer into that page. Update
1082  * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
1083  * result will land contiguously with the copied content.
1084  *
1085  * Return values:
1086  *   %0: Inline content was successfully copied
1087  *   %-EINVAL: offset or length was incorrect
1088  */
1089 static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
1090 				      struct svc_rdma_recv_ctxt *head,
1091 				      unsigned int offset,
1092 				      unsigned int remaining)
1093 {
1094 	unsigned char *dst, *src = head->rc_recv_buf;
1095 	unsigned int page_no, numpages;
1096 
1097 	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
1098 	for (page_no = 0; page_no < numpages; page_no++) {
1099 		unsigned int page_len;
1100 
1101 		if (head->rc_curpage >= rqstp->rq_maxpages)
1102 			return -EINVAL;
1103 
1104 		page_len = min_t(unsigned int, remaining,
1105 				 PAGE_SIZE - head->rc_pageoff);
1106 
1107 		if (!head->rc_pageoff)
1108 			head->rc_page_count++;
1109 
1110 		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
1111 		memcpy((unsigned char *)dst + head->rc_pageoff, src + offset, page_len);
1112 
1113 		head->rc_readbytes += page_len;
1114 		head->rc_pageoff += page_len;
1115 		if (head->rc_pageoff == PAGE_SIZE) {
1116 			head->rc_curpage++;
1117 			head->rc_pageoff = 0;
1118 		}
1119 		remaining -= page_len;
1120 		offset += page_len;
1121 	}
1122 
1123 	return 0;
1124 }
1125 
1126 /**
1127  * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
1128  * @rqstp: RPC transaction context
1129  * @head: context for ongoing I/O
1130  *
1131  * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
1132  * like an incoming TCP call.
1133  *
1134  * Return values:
1135  *   %0: RDMA Read WQEs were successfully built
1136  *   %-EINVAL: client provided too many chunks or segments,
1137  *   %-ENOMEM: rdma_rw context pool was exhausted,
1138  *   %-ENOTCONN: posting failed (connection is lost),
1139  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1140  */
1141 static noinline int
1142 svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
1143 			      struct svc_rdma_recv_ctxt *head)
1144 {
1145 	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1146 	struct svc_rdma_chunk *chunk, *next;
1147 	unsigned int start, length;
1148 	int ret;
1149 
1150 	start = 0;
1151 	chunk = pcl_first_chunk(pcl);
1152 	length = chunk->ch_position;
1153 	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
1154 	if (ret < 0)
1155 		return ret;
1156 
1157 	pcl_for_each_chunk(chunk, pcl) {
1158 		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
1159 		if (ret < 0)
1160 			return ret;
1161 
1162 		next = pcl_next_chunk(pcl, chunk);
1163 		if (!next)
1164 			break;
1165 
1166 		start += length;
1167 		length = next->ch_position - head->rc_readbytes;
1168 		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
1169 		if (ret < 0)
1170 			return ret;
1171 	}
1172 
1173 	start += length;
1174 	length = head->rc_byte_len - start;
1175 	return svc_rdma_copy_inline_range(rqstp, head, start, length);
1176 }
1177 
1178 /**
1179  * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
1180  * @rqstp: RPC transaction context
1181  * @head: context for ongoing I/O
1182  *
1183  * The chunk data lands in the page list of rqstp->rq_arg.pages.
1184  *
1185  * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
1186  * Therefore, XDR round-up of the Read chunk and trailing
1187  * inline content must both be added at the end of the pagelist.
1188  *
1189  * Return values:
1190  *   %0: RDMA Read WQEs were successfully built
1191  *   %-EINVAL: client provided too many chunks or segments,
1192  *   %-ENOMEM: rdma_rw context pool was exhausted,
1193  *   %-ENOTCONN: posting failed (connection is lost),
1194  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1195  */
1196 static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
1197 				   struct svc_rdma_recv_ctxt *head)
1198 {
1199 	return svc_rdma_build_read_chunk(rqstp, head,
1200 					 pcl_first_chunk(&head->rc_read_pcl));
1201 }
1202 
1203 /**
1204  * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
1205  * @rqstp: RPC transaction context
1206  * @head: context for ongoing I/O
1207  * @chunk: parsed Call chunk to pull
1208  * @offset: offset of region to pull
1209  * @length: length of region to pull
1210  *
1211  * Return values:
1212  *   %0: RDMA Read WQEs were successfully built
1213  *   %-EINVAL: there were not enough resources to finish
1214  *   %-ENOMEM: rdma_rw context pool was exhausted,
1215  *   %-ENOTCONN: posting failed (connection is lost),
1216  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1217  */
1218 static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
1219 				     struct svc_rdma_recv_ctxt *head,
1220 				     const struct svc_rdma_chunk *chunk,
1221 				     unsigned int offset, unsigned int length)
1222 {
1223 	const struct svc_rdma_segment *segment;
1224 	int ret;
1225 
1226 	ret = -EINVAL;
1227 	pcl_for_each_segment(segment, chunk) {
1228 		struct svc_rdma_segment dummy;
1229 
1230 		if (offset > segment->rs_length) {
1231 			offset -= segment->rs_length;
1232 			continue;
1233 		}
1234 
1235 		dummy.rs_handle = segment->rs_handle;
1236 		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
1237 		dummy.rs_offset = segment->rs_offset + offset;
1238 
1239 		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
1240 		if (ret < 0)
1241 			break;
1242 
1243 		head->rc_readbytes += dummy.rs_length;
1244 		length -= dummy.rs_length;
1245 		offset = 0;
1246 	}
1247 	return ret;
1248 }
1249 
1250 /**
1251  * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
1252  * @rqstp: RPC transaction context
1253  * @head: context for ongoing I/O
1254  *
1255  * Return values:
1256  *   %0: RDMA Read WQEs were successfully built
1257  *   %-EINVAL: there were not enough resources to finish
1258  *   %-ENOMEM: rdma_rw context pool was exhausted,
1259  *   %-ENOTCONN: posting failed (connection is lost),
1260  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1261  */
1262 static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
1263 				    struct svc_rdma_recv_ctxt *head)
1264 {
1265 	const struct svc_rdma_chunk *call_chunk =
1266 			pcl_first_chunk(&head->rc_call_pcl);
1267 	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1268 	struct svc_rdma_chunk *chunk, *next;
1269 	unsigned int start, length;
1270 	int ret;
1271 
1272 	if (pcl_is_empty(pcl))
1273 		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);
1274 
1275 	start = 0;
1276 	chunk = pcl_first_chunk(pcl);
1277 	length = chunk->ch_position;
1278 	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1279 					start, length);
1280 	if (ret < 0)
1281 		return ret;
1282 
1283 	pcl_for_each_chunk(chunk, pcl) {
1284 		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
1285 		if (ret < 0)
1286 			return ret;
1287 
1288 		next = pcl_next_chunk(pcl, chunk);
1289 		if (!next)
1290 			break;
1291 
1292 		start += length;
1293 		length = next->ch_position - head->rc_readbytes;
1294 		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1295 						start, length);
1296 		if (ret < 0)
1297 			return ret;
1298 	}
1299 
1300 	start += length;
1301 	length = call_chunk->ch_length - start;
1302 	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1303 					 start, length);
1304 }
1305 
1306 /**
1307  * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1308  * @rqstp: RPC transaction context
1309  * @head: context for ongoing I/O
1310  *
1311  * The start of the data lands in the first page just after the
1312  * Transport header, and the rest lands in rqstp->rq_arg.pages.
1313  *
1314  * Assumptions:
1315  *	- A PZRC is never sent in an RDMA_MSG message, though it's
1316  *	  allowed by spec.
1317  *
1318  * Return values:
1319  *   %0: RDMA Read WQEs were successfully built
1320  *   %-EINVAL: client provided too many chunks or segments,
1321  *   %-ENOMEM: rdma_rw context pool was exhausted,
1322  *   %-ENOTCONN: posting failed (connection is lost),
1323  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1324  */
1325 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
1326 					  struct svc_rdma_recv_ctxt *head)
1327 {
1328 	return svc_rdma_read_call_chunk(rqstp, head);
1329 }
1330 
1331 /* Pages under I/O have been copied to head->rc_pages. Ensure that
1332  * svc_xprt_release() does not put them when svc_rdma_recvfrom()
1333  * returns. This has to be done after all Read WRs are constructed
1334  * to properly handle a page that happens to be part of I/O on behalf
1335  * of two different RDMA segments.
1336  *
1337  * Note: if the subsequent post_send fails, these pages have already
1338  * been moved to head->rc_pages and thus will be cleaned up by
1339  * svc_rdma_recv_ctxt_put().
1340  */
1341 static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
1342 				      struct svc_rdma_recv_ctxt *head)
1343 {
1344 	unsigned int i;
1345 
1346 	/*
1347 	 * Move only pages containing RPC data into rc_pages[]. Pages
1348 	 * from a contiguous allocation that were not used for the
1349 	 * payload remain in rq_pages[] for subsequent reuse.
1350 	 */
1351 	for (i = 0; i < head->rc_page_count; i++) {
1352 		head->rc_pages[i] = rqstp->rq_pages[i];
1353 		rqstp->rq_pages[i] = NULL;
1354 	}
1355 	rqstp->rq_pages_nfree = head->rc_page_count;
1356 }
1357 
1358 /**
1359  * svc_rdma_process_read_list - Pull list of Read chunks from the client
1360  * @rdma: controlling RDMA transport
1361  * @rqstp: set of pages to use as Read sink buffers
1362  * @head: pages under I/O collect here
1363  *
1364  * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1365  * pull each Read chunk as they decode an incoming RPC message.
1366  *
1367  * On Linux, however, the server needs to have a fully-constructed RPC
1368  * message in rqstp->rq_arg when there is a positive return code from
1369  * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1370  * it is received, then here the whole Read list is pulled all at once.
1371  * The ingress RPC message is fully reconstructed once all associated
1372  * RDMA Reads have completed.
1373  *
1374  * Return values:
1375  *   %1: all needed RDMA Reads were posted successfully,
1376  *   %-EINVAL: client provided too many chunks or segments,
1377  *   %-ENOMEM: rdma_rw context pool was exhausted,
1378  *   %-ENOTCONN: posting failed (connection is lost),
1379  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1380  */
1381 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1382 			       struct svc_rqst *rqstp,
1383 			       struct svc_rdma_recv_ctxt *head)
1384 {
1385 	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
1386 	int ret;
1387 
1388 	cc->cc_cqe.done = svc_rdma_wc_read_done;
1389 	cc->cc_sqecount = 0;
1390 	head->rc_pageoff = 0;
1391 	head->rc_curpage = 0;
1392 	head->rc_readbytes = 0;
1393 
1394 	if (pcl_is_empty(&head->rc_call_pcl)) {
1395 		if (head->rc_read_pcl.cl_count == 1)
1396 			ret = svc_rdma_read_data_item(rqstp, head);
1397 		else
1398 			ret = svc_rdma_read_multiple_chunks(rqstp, head);
1399 	} else
1400 		ret = svc_rdma_read_special(rqstp, head);
1401 	svc_rdma_clear_rqst_pages(rqstp, head);
1402 	if (ret < 0)
1403 		return ret;
1404 
1405 	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1406 	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
1407 	return ret < 0 ? ret : 1;
1408 }
1409