// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/bvec.h>
#include <linux/overflow.h>
#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer.
 * - each xdr_buf iovec is a single contiguous buffer
 * - the xdr_buf pages array is a single contiguous buffer because the
 *   second through the last element always start on a page boundary
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The inline bvec array is sized to handle most I/O requests without
 * additional allocation. Larger requests fall back to dynamic allocation.
 * These contexts are created on demand, but cached and reused until
 * the controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_bvec_nents;
	struct bio_vec		*rw_bvec;
	struct bio_vec		rw_first_bvec[];
};

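/* Illustrative sketch (not called anywhere): the typical life cycle of an
 * svc_rdma_rw_ctxt, pieced together from the functions below.
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);	// reuse or allocate
 *	// fill ctxt->rw_bvec[0..rw_nents-1] with payload fragments
 *	ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, len, dir);
 *	list_add(&ctxt->rw_list, &cc->cc_rwctxts);	// one ctxt per segment
 *	cc->cc_sqecount += ret;
 *	// later: svc_rdma_post_chunk_ctxt() posts the accumulated WR chains,
 *	// and svc_rdma_cc_release() unmaps and recycles the contexts.
 */
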
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt);

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int nr_bvec)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_bvec_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

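	/* llist_del_first() requires that concurrent consumers be
	 * serialized, so removal from the free list takes
	 * sc_rw_ctxt_lock. Returning contexts via llist_add() needs
	 * no lock.
	 */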
	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_bvec,
						first_bvec_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_bvec_nents = first_bvec_nents;
	}

	if (nr_bvec <= ctxt->rw_first_bvec_nents) {
		ctxt->rw_bvec = ctxt->rw_first_bvec;
	} else {
		ctxt->rw_bvec = kmalloc_array_node(nr_bvec,
						   sizeof(*ctxt->rw_bvec),
						   GFP_KERNEL,
						   ibdev_to_node(dev));
		if (!ctxt->rw_bvec)
			goto out_free;
	}
	return ctxt;

out_free:
	/* Return cached contexts to cache; free freshly allocated ones */
	if (node)
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	else
		kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, nr_bvec);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	if (ctxt->rw_bvec != ctxt->rw_first_bvec)
		kfree(ctxt->rw_bvec);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @length: total number of bytes in the bvec array
 * @direction: I/O direction
 *
 * On success, returns the number of WQEs that will be needed on the
 * work queue; otherwise a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle, unsigned int length,
				enum dma_data_direction direction)
{
	struct bvec_iter iter = {
		.bi_size = length,
	};
	int ret;

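	/* iter.bi_size carries @length, the total number of payload
	 * bytes described by the bvec array. rdma_rw_ctx_init_bvec()
	 * is assumed here to follow the rdma_rw_ctx_init() convention
	 * of returning the number of WQEs required to post this context.
	 */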
	ret = rdma_rw_ctx_init_bvec(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num,
				    ctxt->rw_bvec, ctxt->rw_nents,
				    iter, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

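	/* Chain the released contexts together by hand so that a single
	 * llist_add_batch() call can return them all to sc_rw_ctxts.
	 */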
	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy_bvec(&ctxt->rw_ctx, rdma->sc_qp,
					 rdma->sc_port_num,
					 ctxt->rw_bvec, ctxt->rw_nents, dir);
		if (ctxt->rw_bvec != ctxt->rw_first_bvec)
			kfree(ctxt->rw_bvec);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

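/* Releasing the write info unmaps DMA buffers via svc_rdma_cc_release().
 * That work is pushed to svcrdma_wq, presumably so it is not done in the
 * Write completion handler's interrupt context.
 */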
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_write_info *info =
		container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

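	/* Reserve cc_sqecount Send Queue entries before posting. If the
	 * reservation would overcommit the SQ, back it out, wait for
	 * Send completions to return space, and try again.
	 */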
	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build a bvec that covers one kvec in an xdr_buf.
 */
static void svc_rdma_vec_to_bvec(struct svc_rdma_write_info *info,
				 unsigned int len,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	bvec_set_virt(&ctxt->rw_bvec[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build a bvec array that covers part of an xdr_buf's pagelist.
 */
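/* Illustrative example (assuming 4KB pages): with xdr->page_base 512,
 * wi_next_off 0, and remaining 5000, the loop below emits two bvecs:
 * 3584 bytes of pages[0] starting at offset 512, then the remaining
 * 1416 bytes at the start of pages[1].
 */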
static void svc_rdma_pagelist_to_bvec(struct svc_rdma_write_info *info,
				      unsigned int remaining,
				      struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int bvec_idx, bvec_len, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	bvec_idx = 0;
	do {
		bvec_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - page_off);
		bvec_set_page(&ctxt->rw_bvec[bvec_idx], *page, bvec_len,
			      page_off);
		remaining -= bvec_len;
		page_off = 0;
		bvec_idx++;
		page++;
	} while (remaining);

	ctxt->rw_nents = bvec_idx;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
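		/* A payload that is page-misaligned at both ends can span
		 * (write_len >> PAGE_SHIFT) + 2 pages, so reserve that many
		 * bvec entries for the worst case.
		 */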
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   write_len, DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_bvec,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_bvec,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the number of bytes written (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
				     const struct svc_rdma_chunk *chunk,
				     const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct xdr_buf payload;
	int ret;

	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	if (ret < 0)
		goto out_err;
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_write_list - Send all chunks on the Write list
 * @rdma: controlling RDMA transport
 * @rctxt: Write list provisioned by the client
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if one or more
 * Write chunks could not be sent.
 */
int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
			     const struct svc_rdma_recv_ctxt *rctxt,
			     const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

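	/* Chunks that carry no payload were not used for this Reply;
	 * the first such chunk is assumed here to end the list of
	 * Write chunks that need to be sent.
	 */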
	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}

/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *   %-E2BIG if the payload was larger than the Reply chunk,
 *   %-EINVAL if client provided too many segments,
 *   %-ENOMEM if rdma_rw context pool was exhausted,
 *   %-ENOTCONN if posting failed (connection is lost),
 *   %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

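	/* Link the Reply chunk's Write WRs ahead of the WRs already in
	 * sc_wr_chain so the whole chain is handed to the provider with
	 * a single ib_post_send() call.
	 */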
	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int bvec_idx, nr_bvec, seg_len, len, total;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

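	/* The Read sink is rqstp->rq_pages. rc_curpage and rc_pageoff
	 * record where the previous segment's payload ended, so this
	 * segment's payload lands contiguously after it.
	 */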
	len = segment->rs_length;
	if (check_add_overflow(head->rc_pageoff, len, &total))
		return -EINVAL;
	nr_bvec = PAGE_ALIGN(total) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = nr_bvec;

	for (bvec_idx = 0; bvec_idx < ctxt->rw_nents; bvec_idx++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		bvec_set_page(&ctxt->rw_bvec[bvec_idx],
			      rqstp->rq_pages[head->rc_curpage],
			      seg_len, head->rc_pageoff);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > rqstp->rq_maxpages))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, segment->rs_length,
				   DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		if (head->rc_curpage >= rqstp->rq_maxpages)
			return -EINVAL;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_pageoff, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

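	/* Interleave the spans of inline content with the chunk payloads:
	 * each chunk's ch_position says where its payload belongs in the
	 * reassembled message, and the inline bytes before, between, and
	 * after the chunks are copied into the same sink pages.
	 */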
	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

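	/* Walk the chunk's segments, skipping whole segments that fall
	 * before @offset, then issue Reads for just the requested range.
	 */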
	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
		pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 * - A PZRC (Position Zero Read chunk) is never sent in an RDMA_MSG
 *   message, though it's allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}

/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

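	/* A non-empty rc_call_pcl means the client sent the RPC Call
	 * itself in a Read chunk (a Long Message); otherwise the Read
	 * list carries only data item payloads.
	 */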
	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}