xref: /linux/net/sunrpc/xprtrdma/verbs.c (revision 170aafe35cb98e0f3fbacb446ea86389fbce22ea)
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41 
42 /*
43  * verbs.c
44  *
45  * Encapsulates the major functions managing:
46  *  o adapters
47  *  o endpoints
48  *  o connections
49  *  o buffer memory
50  */
51 
52 #include <linux/bitops.h>
53 #include <linux/interrupt.h>
54 #include <linux/slab.h>
55 #include <linux/sunrpc/addr.h>
56 #include <linux/sunrpc/svc_rdma.h>
57 #include <linux/log2.h>
58 
59 #include <asm/barrier.h>
60 
61 #include <rdma/ib_cm.h>
62 
63 #include "xprt_rdma.h"
64 #include <trace/events/rpcrdma.h>
65 
66 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
67 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
68 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
69 				       struct rpcrdma_sendctx *sc);
70 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
71 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
72 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
73 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
74 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
75 static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
76 static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
77 static struct rpcrdma_regbuf *
78 rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
79 			  int node);
80 static struct rpcrdma_regbuf *
81 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction);
82 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
83 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
84 
85 /* Wait for outstanding transport work to finish. ib_drain_qp
86  * handles the drains in the wrong order for us, so open code
87  * them here.
88  */
89 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
90 {
91 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
92 	struct rdma_cm_id *id = ep->re_id;
93 
94 	/* Wait for rpcrdma_post_recvs() to leave its critical
95 	 * section.
96 	 */
97 	if (atomic_inc_return(&ep->re_receiving) > 1)
98 		wait_for_completion(&ep->re_done);
99 
100 	/* Flush Receives, then wait for deferred Reply work
101 	 * to complete.
102 	 */
103 	ib_drain_rq(id->qp);
104 
105 	/* Deferred Reply processing might have scheduled
106 	 * local invalidations.
107 	 */
108 	ib_drain_sq(id->qp);
109 
110 	rpcrdma_ep_put(ep);
111 }
112 
113 /* Ensure xprt_force_disconnect() is invoked exactly once when a
114  * connection is closed or lost. (The important thing is it needs
115  * to be invoked "at least" once).
116  */
117 void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
118 {
119 	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
120 		xprt_force_disconnect(ep->re_xprt);
121 }
122 
123 /**
124  * rpcrdma_flush_disconnect - Disconnect on flushed completion
125  * @r_xprt: transport to disconnect
126  * @wc: work completion entry
127  *
128  * Must be called in process context.
129  */
130 void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
131 {
132 	if (wc->status != IB_WC_SUCCESS)
133 		rpcrdma_force_disconnect(r_xprt->rx_ep);
134 }
135 
136 /**
137  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
138  * @cq:	completion queue
139  * @wc:	WCE for a completed Send WR
140  *
141  */
142 static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
143 {
144 	struct ib_cqe *cqe = wc->wr_cqe;
145 	struct rpcrdma_sendctx *sc =
146 		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
147 	struct rpcrdma_xprt *r_xprt = cq->cq_context;
148 
149 	/* WARNING: Only wr_cqe and status are reliable at this point */
150 	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
151 	rpcrdma_sendctx_put_locked(r_xprt, sc);
152 	rpcrdma_flush_disconnect(r_xprt, wc);
153 }
154 
155 /**
156  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
157  * @cq:	completion queue
158  * @wc:	WCE for a completed Receive WR
159  *
160  */
161 static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
162 {
163 	struct ib_cqe *cqe = wc->wr_cqe;
164 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
165 					       rr_cqe);
166 	struct rpcrdma_xprt *r_xprt = cq->cq_context;
167 
168 	/* WARNING: Only wr_cqe and status are reliable at this point */
169 	trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
170 	--r_xprt->rx_ep->re_receive_count;
171 	if (wc->status != IB_WC_SUCCESS)
172 		goto out_flushed;
173 
174 	/* status == SUCCESS means all fields in wc are trustworthy */
175 	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
176 	rep->rr_wc_flags = wc->wc_flags;
177 	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
178 
179 	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
180 				   rdmab_addr(rep->rr_rdmabuf),
181 				   wc->byte_len, DMA_FROM_DEVICE);
182 
183 	rpcrdma_reply_handler(rep);
184 	return;
185 
186 out_flushed:
187 	rpcrdma_flush_disconnect(r_xprt, wc);
188 	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
189 }
190 
191 static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
192 				      struct rdma_conn_param *param)
193 {
194 	const struct rpcrdma_connect_private *pmsg = param->private_data;
195 	unsigned int rsize, wsize;
196 
197 	/* Default settings for RPC-over-RDMA Version One */
198 	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
199 	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
200 
201 	if (pmsg &&
202 	    pmsg->cp_magic == rpcrdma_cmp_magic &&
203 	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
204 		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
205 		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
206 	}
207 
208 	if (rsize < ep->re_inline_recv)
209 		ep->re_inline_recv = rsize;
210 	if (wsize < ep->re_inline_send)
211 		ep->re_inline_send = wsize;
212 
213 	rpcrdma_set_max_header_sizes(ep);
214 }
215 
/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Resolution events record their outcome in re_async_rc and complete
 * re_done. Connection events record their outcome in
 * re_connect_status and wake waiters on re_connect_wait.
 * Disconnection events force a transport disconnect and drop an
 * endpoint reference.
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_ep *ep = id->context;

	might_sleep();

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ep->re_async_rc = 0;
		goto resolve_done;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ep->re_async_rc = -EPROTO;
		goto resolve_done;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ep->re_async_rc = -ENETUNREACH;
		goto resolve_done;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		ep->re_connect_status = -ENODEV;
		goto disconnected;
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Take a reference; the disconnected path below is
		 * where a reference is put again.
		 */
		rpcrdma_ep_get(ep);
		ep->re_connect_status = 1;
		rpcrdma_update_cm_private(ep, &event->param.conn);
		trace_xprtrdma_inline_thresh(ep);
		goto wake_connect_worker;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->re_connect_status = -ENOTCONN;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->re_connect_status = -ENETUNREACH;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_REJECTED:
		ep->re_connect_status = -ECONNREFUSED;
		/* A stale-connection reject is reported as -ENOTCONN */
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->re_connect_status = -ENOTCONN;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->re_connect_status = -ECONNABORTED;
		goto disconnected;
	default:
		return 0;
	}

resolve_done:
	complete(&ep->re_done);
	return 0;

wake_connect_worker:
	wake_up_all(&ep->re_connect_wait);
	return 0;

disconnected:
	rpcrdma_force_disconnect(ep);
	/* Non-zero here means @ep was destroyed by this put */
	return rpcrdma_ep_put(ep);
}
279 
280 static void rpcrdma_ep_removal_done(struct rpcrdma_notification *rn)
281 {
282 	struct rpcrdma_ep *ep = container_of(rn, struct rpcrdma_ep, re_rn);
283 
284 	trace_xprtrdma_device_removal(ep->re_id);
285 	xprt_force_disconnect(ep->re_xprt);
286 }
287 
288 static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
289 					    struct rpcrdma_ep *ep)
290 {
291 	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
292 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
293 	struct rdma_cm_id *id;
294 	int rc;
295 
296 	init_completion(&ep->re_done);
297 
298 	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
299 			    RDMA_PS_TCP, IB_QPT_RC);
300 	if (IS_ERR(id))
301 		return id;
302 
303 	ep->re_async_rc = -ETIMEDOUT;
304 	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
305 			       RDMA_RESOLVE_TIMEOUT);
306 	if (rc)
307 		goto out;
308 	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
309 	if (rc < 0)
310 		goto out;
311 
312 	rc = ep->re_async_rc;
313 	if (rc)
314 		goto out;
315 
316 	ep->re_async_rc = -ETIMEDOUT;
317 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
318 	if (rc)
319 		goto out;
320 	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
321 	if (rc < 0)
322 		goto out;
323 	rc = ep->re_async_rc;
324 	if (rc)
325 		goto out;
326 
327 	rc = rpcrdma_rn_register(id->device, &ep->re_rn, rpcrdma_ep_removal_done);
328 	if (rc)
329 		goto out;
330 
331 	return id;
332 
333 out:
334 	rdma_destroy_id(id);
335 	return ERR_PTR(rc);
336 }
337 
338 static void rpcrdma_ep_destroy(struct kref *kref)
339 {
340 	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
341 
342 	if (ep->re_id->qp) {
343 		rdma_destroy_qp(ep->re_id);
344 		ep->re_id->qp = NULL;
345 	}
346 
347 	if (ep->re_attr.recv_cq)
348 		ib_free_cq(ep->re_attr.recv_cq);
349 	ep->re_attr.recv_cq = NULL;
350 	if (ep->re_attr.send_cq)
351 		ib_free_cq(ep->re_attr.send_cq);
352 	ep->re_attr.send_cq = NULL;
353 
354 	if (ep->re_pd)
355 		ib_dealloc_pd(ep->re_pd);
356 	ep->re_pd = NULL;
357 
358 	rpcrdma_rn_unregister(ep->re_id->device, &ep->re_rn);
359 
360 	kfree(ep);
361 	module_put(THIS_MODULE);
362 }
363 
364 static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
365 {
366 	kref_get(&ep->re_kref);
367 }
368 
369 /* Returns:
370  *     %0 if @ep still has a positive kref count, or
371  *     %1 if @ep was destroyed successfully.
372  */
373 static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
374 {
375 	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
376 }
377 
378 static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
379 {
380 	struct rpcrdma_connect_private *pmsg;
381 	struct ib_device *device;
382 	struct rdma_cm_id *id;
383 	struct rpcrdma_ep *ep;
384 	int rc;
385 
386 	ep = kzalloc(sizeof(*ep), XPRTRDMA_GFP_FLAGS);
387 	if (!ep)
388 		return -ENOTCONN;
389 	ep->re_xprt = &r_xprt->rx_xprt;
390 	kref_init(&ep->re_kref);
391 
392 	id = rpcrdma_create_id(r_xprt, ep);
393 	if (IS_ERR(id)) {
394 		kfree(ep);
395 		return PTR_ERR(id);
396 	}
397 	__module_get(THIS_MODULE);
398 	device = id->device;
399 	ep->re_id = id;
400 	reinit_completion(&ep->re_done);
401 
402 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
403 	ep->re_inline_send = xprt_rdma_max_inline_write;
404 	ep->re_inline_recv = xprt_rdma_max_inline_read;
405 	rc = frwr_query_device(ep, device);
406 	if (rc)
407 		goto out_destroy;
408 
409 	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
410 
411 	ep->re_attr.srq = NULL;
412 	ep->re_attr.cap.max_inline_data = 0;
413 	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
414 	ep->re_attr.qp_type = IB_QPT_RC;
415 	ep->re_attr.port_num = ~0;
416 
417 	ep->re_send_batch = ep->re_max_requests >> 3;
418 	ep->re_send_count = ep->re_send_batch;
419 	init_waitqueue_head(&ep->re_connect_wait);
420 
421 	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
422 					      ep->re_attr.cap.max_send_wr,
423 					      IB_POLL_WORKQUEUE);
424 	if (IS_ERR(ep->re_attr.send_cq)) {
425 		rc = PTR_ERR(ep->re_attr.send_cq);
426 		ep->re_attr.send_cq = NULL;
427 		goto out_destroy;
428 	}
429 
430 	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
431 					      ep->re_attr.cap.max_recv_wr,
432 					      IB_POLL_WORKQUEUE);
433 	if (IS_ERR(ep->re_attr.recv_cq)) {
434 		rc = PTR_ERR(ep->re_attr.recv_cq);
435 		ep->re_attr.recv_cq = NULL;
436 		goto out_destroy;
437 	}
438 	ep->re_receive_count = 0;
439 
440 	/* Initialize cma parameters */
441 	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
442 
443 	/* Prepare RDMA-CM private message */
444 	pmsg = &ep->re_cm_private;
445 	pmsg->cp_magic = rpcrdma_cmp_magic;
446 	pmsg->cp_version = RPCRDMA_CMP_VERSION;
447 	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
448 	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
449 	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
450 	ep->re_remote_cma.private_data = pmsg;
451 	ep->re_remote_cma.private_data_len = sizeof(*pmsg);
452 
453 	/* Client offers RDMA Read but does not initiate */
454 	ep->re_remote_cma.initiator_depth = 0;
455 	ep->re_remote_cma.responder_resources =
456 		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
457 
458 	/* Limit transport retries so client can detect server
459 	 * GID changes quickly. RPC layer handles re-establishing
460 	 * transport connection and retransmission.
461 	 */
462 	ep->re_remote_cma.retry_count = 6;
463 
464 	/* RPC-over-RDMA handles its own flow control. In addition,
465 	 * make all RNR NAKs visible so we know that RPC-over-RDMA
466 	 * flow control is working correctly (no NAKs should be seen).
467 	 */
468 	ep->re_remote_cma.flow_control = 0;
469 	ep->re_remote_cma.rnr_retry_count = 0;
470 
471 	ep->re_pd = ib_alloc_pd(device, 0);
472 	if (IS_ERR(ep->re_pd)) {
473 		rc = PTR_ERR(ep->re_pd);
474 		ep->re_pd = NULL;
475 		goto out_destroy;
476 	}
477 
478 	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
479 	if (rc)
480 		goto out_destroy;
481 
482 	r_xprt->rx_ep = ep;
483 	return 0;
484 
485 out_destroy:
486 	rpcrdma_ep_put(ep);
487 	rdma_destroy_id(id);
488 	return rc;
489 }
490 
491 /**
492  * rpcrdma_xprt_connect - Connect an unconnected transport
493  * @r_xprt: controlling transport instance
494  *
495  * Returns 0 on success or a negative errno.
496  */
497 int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
498 {
499 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
500 	struct rpcrdma_ep *ep;
501 	int rc;
502 
503 	rc = rpcrdma_ep_create(r_xprt);
504 	if (rc)
505 		return rc;
506 	ep = r_xprt->rx_ep;
507 
508 	xprt_clear_connected(xprt);
509 	rpcrdma_reset_cwnd(r_xprt);
510 
511 	/* Bump the ep's reference count while there are
512 	 * outstanding Receives.
513 	 */
514 	rpcrdma_ep_get(ep);
515 	rpcrdma_post_recvs(r_xprt, 1);
516 
517 	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
518 	if (rc)
519 		goto out;
520 
521 	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
522 		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
523 	wait_event_interruptible(ep->re_connect_wait,
524 				 ep->re_connect_status != 0);
525 	if (ep->re_connect_status <= 0) {
526 		rc = ep->re_connect_status;
527 		goto out;
528 	}
529 
530 	rc = rpcrdma_sendctxs_create(r_xprt);
531 	if (rc) {
532 		rc = -ENOTCONN;
533 		goto out;
534 	}
535 
536 	rc = rpcrdma_reqs_setup(r_xprt);
537 	if (rc) {
538 		rc = -ENOTCONN;
539 		goto out;
540 	}
541 	rpcrdma_mrs_create(r_xprt);
542 	frwr_wp_create(r_xprt);
543 
544 out:
545 	trace_xprtrdma_connect(r_xprt, rc);
546 	return rc;
547 }
548 
549 /**
550  * rpcrdma_xprt_disconnect - Disconnect underlying transport
551  * @r_xprt: controlling transport instance
552  *
553  * Caller serializes. Either the transport send lock is held,
554  * or we're being called to destroy the transport.
555  *
556  * On return, @r_xprt is completely divested of all hardware
557  * resources and prepared for the next ->connect operation.
558  */
559 void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
560 {
561 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
562 	struct rdma_cm_id *id;
563 	int rc;
564 
565 	if (!ep)
566 		return;
567 
568 	id = ep->re_id;
569 	rc = rdma_disconnect(id);
570 	trace_xprtrdma_disconnect(r_xprt, rc);
571 
572 	rpcrdma_xprt_drain(r_xprt);
573 	rpcrdma_reps_unmap(r_xprt);
574 	rpcrdma_reqs_reset(r_xprt);
575 	rpcrdma_mrs_destroy(r_xprt);
576 	rpcrdma_sendctxs_destroy(r_xprt);
577 
578 	if (rpcrdma_ep_put(ep))
579 		rdma_destroy_id(id);
580 
581 	r_xprt->rx_ep = NULL;
582 }
583 
584 /* Fixed-size circular FIFO queue. This implementation is wait-free and
585  * lock-free.
586  *
587  * Consumer is the code path that posts Sends. This path dequeues a
588  * sendctx for use by a Send operation. Multiple consumer threads
589  * are serialized by the RPC transport lock, which allows only one
590  * ->send_request call at a time.
591  *
592  * Producer is the code path that handles Send completions. This path
593  * enqueues a sendctx that has been completed. Multiple producer
594  * threads are serialized by the ib_poll_cq() function.
595  */
596 
597 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
598  * queue activity, and rpcrdma_xprt_drain has flushed all remaining
599  * Send requests.
600  */
601 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
602 {
603 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
604 	unsigned long i;
605 
606 	if (!buf->rb_sc_ctxs)
607 		return;
608 	for (i = 0; i <= buf->rb_sc_last; i++)
609 		kfree(buf->rb_sc_ctxs[i]);
610 	kfree(buf->rb_sc_ctxs);
611 	buf->rb_sc_ctxs = NULL;
612 }
613 
614 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
615 {
616 	struct rpcrdma_sendctx *sc;
617 
618 	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
619 		     XPRTRDMA_GFP_FLAGS);
620 	if (!sc)
621 		return NULL;
622 
623 	sc->sc_cqe.done = rpcrdma_wc_send;
624 	sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
625 	sc->sc_cid.ci_completion_id =
626 		atomic_inc_return(&ep->re_completion_ids);
627 	return sc;
628 }
629 
630 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
631 {
632 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
633 	struct rpcrdma_sendctx *sc;
634 	unsigned long i;
635 
636 	/* Maximum number of concurrent outstanding Send WRs. Capping
637 	 * the circular queue size stops Send Queue overflow by causing
638 	 * the ->send_request call to fail temporarily before too many
639 	 * Sends are posted.
640 	 */
641 	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
642 	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), XPRTRDMA_GFP_FLAGS);
643 	if (!buf->rb_sc_ctxs)
644 		return -ENOMEM;
645 
646 	buf->rb_sc_last = i - 1;
647 	for (i = 0; i <= buf->rb_sc_last; i++) {
648 		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
649 		if (!sc)
650 			return -ENOMEM;
651 
652 		buf->rb_sc_ctxs[i] = sc;
653 	}
654 
655 	buf->rb_sc_head = 0;
656 	buf->rb_sc_tail = 0;
657 	return 0;
658 }
659 
660 /* The sendctx queue is not guaranteed to have a size that is a
661  * power of two, thus the helpers in circ_buf.h cannot be used.
662  * The other option is to use modulus (%), which can be expensive.
663  */
664 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
665 					  unsigned long item)
666 {
667 	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
668 }
669 
670 /**
671  * rpcrdma_sendctx_get_locked - Acquire a send context
672  * @r_xprt: controlling transport instance
673  *
674  * Returns pointer to a free send completion context; or NULL if
675  * the queue is empty.
676  *
677  * Usage: Called to acquire an SGE array before preparing a Send WR.
678  *
679  * The caller serializes calls to this function (per transport), and
680  * provides an effective memory barrier that flushes the new value
681  * of rb_sc_head.
682  */
683 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
684 {
685 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
686 	struct rpcrdma_sendctx *sc;
687 	unsigned long next_head;
688 
689 	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
690 
691 	if (next_head == READ_ONCE(buf->rb_sc_tail))
692 		goto out_emptyq;
693 
694 	/* ORDER: item must be accessed _before_ head is updated */
695 	sc = buf->rb_sc_ctxs[next_head];
696 
697 	/* Releasing the lock in the caller acts as a memory
698 	 * barrier that flushes rb_sc_head.
699 	 */
700 	buf->rb_sc_head = next_head;
701 
702 	return sc;
703 
704 out_emptyq:
705 	/* The queue is "empty" if there have not been enough Send
706 	 * completions recently. This is a sign the Send Queue is
707 	 * backing up. Cause the caller to pause and try again.
708 	 */
709 	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
710 	r_xprt->rx_stats.empty_sendctx_q++;
711 	return NULL;
712 }
713 
714 /**
715  * rpcrdma_sendctx_put_locked - Release a send context
716  * @r_xprt: controlling transport instance
717  * @sc: send context to release
718  *
719  * Usage: Called from Send completion to return a sendctxt
720  * to the queue.
721  *
722  * The caller serializes calls to this function (per transport).
723  */
724 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
725 				       struct rpcrdma_sendctx *sc)
726 {
727 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
728 	unsigned long next_tail;
729 
730 	/* Unmap SGEs of previously completed but unsignaled
731 	 * Sends by walking up the queue until @sc is found.
732 	 */
733 	next_tail = buf->rb_sc_tail;
734 	do {
735 		next_tail = rpcrdma_sendctx_next(buf, next_tail);
736 
737 		/* ORDER: item must be accessed _before_ tail is updated */
738 		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
739 
740 	} while (buf->rb_sc_ctxs[next_tail] != sc);
741 
742 	/* Paired with READ_ONCE */
743 	smp_store_release(&buf->rb_sc_tail, next_tail);
744 
745 	xprt_write_space(&r_xprt->rx_xprt);
746 }
747 
748 static void
749 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
750 {
751 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
752 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
753 	struct ib_device *device = ep->re_id->device;
754 	unsigned int count;
755 
756 	/* Try to allocate enough to perform one full-sized I/O */
757 	for (count = 0; count < ep->re_max_rdma_segs; count++) {
758 		struct rpcrdma_mr *mr;
759 		int rc;
760 
761 		mr = kzalloc_node(sizeof(*mr), XPRTRDMA_GFP_FLAGS,
762 				  ibdev_to_node(device));
763 		if (!mr)
764 			break;
765 
766 		rc = frwr_mr_init(r_xprt, mr);
767 		if (rc) {
768 			kfree(mr);
769 			break;
770 		}
771 
772 		spin_lock(&buf->rb_lock);
773 		rpcrdma_mr_push(mr, &buf->rb_mrs);
774 		list_add(&mr->mr_all, &buf->rb_all_mrs);
775 		spin_unlock(&buf->rb_lock);
776 	}
777 
778 	r_xprt->rx_stats.mrs_allocated += count;
779 	trace_xprtrdma_createmrs(r_xprt, count);
780 }
781 
782 static void
783 rpcrdma_mr_refresh_worker(struct work_struct *work)
784 {
785 	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
786 						  rb_refresh_worker);
787 	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
788 						   rx_buf);
789 
790 	rpcrdma_mrs_create(r_xprt);
791 	xprt_write_space(&r_xprt->rx_xprt);
792 }
793 
794 /**
795  * rpcrdma_mrs_refresh - Wake the MR refresh worker
796  * @r_xprt: controlling transport instance
797  *
798  */
799 void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
800 {
801 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
802 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
803 
804 	/* If there is no underlying connection, it's no use
805 	 * to wake the refresh worker.
806 	 */
807 	if (ep->re_connect_status != 1)
808 		return;
809 	queue_work(system_highpri_wq, &buf->rb_refresh_worker);
810 }
811 
812 /**
813  * rpcrdma_req_create - Allocate an rpcrdma_req object
814  * @r_xprt: controlling r_xprt
815  * @size: initial size, in bytes, of send and receive buffers
816  *
817  * Returns an allocated and fully initialized rpcrdma_req or NULL.
818  */
819 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt,
820 				       size_t size)
821 {
822 	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
823 	struct rpcrdma_req *req;
824 
825 	req = kzalloc(sizeof(*req), XPRTRDMA_GFP_FLAGS);
826 	if (req == NULL)
827 		goto out1;
828 
829 	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE);
830 	if (!req->rl_sendbuf)
831 		goto out2;
832 
833 	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE);
834 	if (!req->rl_recvbuf)
835 		goto out3;
836 
837 	INIT_LIST_HEAD(&req->rl_free_mrs);
838 	INIT_LIST_HEAD(&req->rl_registered);
839 	spin_lock(&buffer->rb_lock);
840 	list_add(&req->rl_all, &buffer->rb_allreqs);
841 	spin_unlock(&buffer->rb_lock);
842 	return req;
843 
844 out3:
845 	rpcrdma_regbuf_free(req->rl_sendbuf);
846 out2:
847 	kfree(req);
848 out1:
849 	return NULL;
850 }
851 
852 /**
853  * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
854  * @r_xprt: controlling transport instance
855  * @req: rpcrdma_req object to set up
856  *
857  * Returns zero on success, and a negative errno on failure.
858  */
859 int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
860 {
861 	struct rpcrdma_regbuf *rb;
862 	size_t maxhdrsize;
863 
864 	/* Compute maximum header buffer size in bytes */
865 	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
866 		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
867 	maxhdrsize *= sizeof(__be32);
868 	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
869 				  DMA_TO_DEVICE);
870 	if (!rb)
871 		goto out;
872 
873 	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
874 		goto out_free;
875 
876 	req->rl_rdmabuf = rb;
877 	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
878 	return 0;
879 
880 out_free:
881 	rpcrdma_regbuf_free(rb);
882 out:
883 	return -ENOMEM;
884 }
885 
886 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
887  * and thus can be walked without holding rb_lock. Eg. the
888  * caller is holding the transport send lock to exclude
889  * device removal or disconnection.
890  */
891 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
892 {
893 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
894 	struct rpcrdma_req *req;
895 	int rc;
896 
897 	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
898 		rc = rpcrdma_req_setup(r_xprt, req);
899 		if (rc)
900 			return rc;
901 	}
902 	return 0;
903 }
904 
905 static void rpcrdma_req_reset(struct rpcrdma_req *req)
906 {
907 	struct rpcrdma_mr *mr;
908 
909 	/* Credits are valid for only one connection */
910 	req->rl_slot.rq_cong = 0;
911 
912 	rpcrdma_regbuf_free(req->rl_rdmabuf);
913 	req->rl_rdmabuf = NULL;
914 
915 	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
916 	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
917 
918 	/* The verbs consumer can't know the state of an MR on the
919 	 * req->rl_registered list unless a successful completion
920 	 * has occurred, so they cannot be re-used.
921 	 */
922 	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
923 		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
924 
925 		spin_lock(&buf->rb_lock);
926 		list_del(&mr->mr_all);
927 		spin_unlock(&buf->rb_lock);
928 
929 		frwr_mr_release(mr);
930 	}
931 }
932 
933 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
934  * and thus can be walked without holding rb_lock. Eg. the
935  * caller is holding the transport send lock to exclude
936  * device removal or disconnection.
937  */
938 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
939 {
940 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
941 	struct rpcrdma_req *req;
942 
943 	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
944 		rpcrdma_req_reset(req);
945 }
946 
947 static noinline
948 struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt)
949 {
950 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
951 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
952 	struct ib_device *device = ep->re_id->device;
953 	struct rpcrdma_rep *rep;
954 
955 	rep = kzalloc(sizeof(*rep), XPRTRDMA_GFP_FLAGS);
956 	if (rep == NULL)
957 		goto out;
958 
959 	rep->rr_rdmabuf = rpcrdma_regbuf_alloc_node(ep->re_inline_recv,
960 						    DMA_FROM_DEVICE,
961 						    ibdev_to_node(device));
962 	if (!rep->rr_rdmabuf)
963 		goto out_free;
964 
965 	rep->rr_cid.ci_completion_id =
966 		atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);
967 
968 	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
969 		     rdmab_length(rep->rr_rdmabuf));
970 	rep->rr_cqe.done = rpcrdma_wc_receive;
971 	rep->rr_rxprt = r_xprt;
972 	rep->rr_recv_wr.next = NULL;
973 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
974 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
975 	rep->rr_recv_wr.num_sge = 1;
976 
977 	spin_lock(&buf->rb_lock);
978 	list_add(&rep->rr_all, &buf->rb_all_reps);
979 	spin_unlock(&buf->rb_lock);
980 	return rep;
981 
982 out_free:
983 	kfree(rep);
984 out:
985 	return NULL;
986 }
987 
988 static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
989 {
990 	rpcrdma_regbuf_free(rep->rr_rdmabuf);
991 	kfree(rep);
992 }
993 
994 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
995 {
996 	struct llist_node *node;
997 
998 	/* Calls to llist_del_first are required to be serialized */
999 	node = llist_del_first(&buf->rb_free_reps);
1000 	if (!node)
1001 		return NULL;
1002 	return llist_entry(node, struct rpcrdma_rep, rr_node);
1003 }
1004 
1005 /**
1006  * rpcrdma_rep_put - Release rpcrdma_rep back to free list
1007  * @buf: buffer pool
1008  * @rep: rep to release
1009  *
1010  */
1011 void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
1012 {
1013 	llist_add(&rep->rr_node, &buf->rb_free_reps);
1014 }
1015 
1016 /* Caller must ensure the QP is quiescent (RQ is drained) before
1017  * invoking this function, to guarantee rb_all_reps is not
1018  * changing.
1019  */
1020 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1021 {
1022 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1023 	struct rpcrdma_rep *rep;
1024 
1025 	list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
1026 		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1027 }
1028 
1029 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1030 {
1031 	struct rpcrdma_rep *rep;
1032 
1033 	spin_lock(&buf->rb_lock);
1034 	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
1035 					       struct rpcrdma_rep,
1036 					       rr_all)) != NULL) {
1037 		list_del(&rep->rr_all);
1038 		spin_unlock(&buf->rb_lock);
1039 
1040 		rpcrdma_rep_free(rep);
1041 
1042 		spin_lock(&buf->rb_lock);
1043 	}
1044 	spin_unlock(&buf->rb_lock);
1045 }
1046 
1047 /**
1048  * rpcrdma_buffer_create - Create initial set of req/rep objects
1049  * @r_xprt: transport instance to (re)initialize
1050  *
1051  * Returns zero on success, otherwise a negative errno.
1052  */
1053 int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1054 {
1055 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1056 	int i, rc;
1057 
1058 	buf->rb_bc_srv_max_requests = 0;
1059 	spin_lock_init(&buf->rb_lock);
1060 	INIT_LIST_HEAD(&buf->rb_mrs);
1061 	INIT_LIST_HEAD(&buf->rb_all_mrs);
1062 	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1063 
1064 	INIT_LIST_HEAD(&buf->rb_send_bufs);
1065 	INIT_LIST_HEAD(&buf->rb_allreqs);
1066 	INIT_LIST_HEAD(&buf->rb_all_reps);
1067 
1068 	rc = -ENOMEM;
1069 	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
1070 		struct rpcrdma_req *req;
1071 
1072 		req = rpcrdma_req_create(r_xprt,
1073 					 RPCRDMA_V1_DEF_INLINE_SIZE * 2);
1074 		if (!req)
1075 			goto out;
1076 		list_add(&req->rl_list, &buf->rb_send_bufs);
1077 	}
1078 
1079 	init_llist_head(&buf->rb_free_reps);
1080 
1081 	return 0;
1082 out:
1083 	rpcrdma_buffer_destroy(buf);
1084 	return rc;
1085 }
1086 
1087 /**
1088  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1089  * @req: unused object to be destroyed
1090  *
1091  * Relies on caller holding the transport send lock to protect
1092  * removing req->rl_all from buf->rb_all_reqs safely.
1093  */
1094 void rpcrdma_req_destroy(struct rpcrdma_req *req)
1095 {
1096 	struct rpcrdma_mr *mr;
1097 
1098 	list_del(&req->rl_all);
1099 
1100 	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1101 		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1102 
1103 		spin_lock(&buf->rb_lock);
1104 		list_del(&mr->mr_all);
1105 		spin_unlock(&buf->rb_lock);
1106 
1107 		frwr_mr_release(mr);
1108 	}
1109 
1110 	rpcrdma_regbuf_free(req->rl_recvbuf);
1111 	rpcrdma_regbuf_free(req->rl_sendbuf);
1112 	rpcrdma_regbuf_free(req->rl_rdmabuf);
1113 	kfree(req);
1114 }
1115 
1116 /**
1117  * rpcrdma_mrs_destroy - Release all of a transport's MRs
1118  * @r_xprt: controlling transport instance
1119  *
1120  * Relies on caller holding the transport send lock to protect
1121  * removing mr->mr_list from req->rl_free_mrs safely.
1122  */
1123 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1124 {
1125 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1126 	struct rpcrdma_mr *mr;
1127 
1128 	cancel_work_sync(&buf->rb_refresh_worker);
1129 
1130 	spin_lock(&buf->rb_lock);
1131 	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1132 					      struct rpcrdma_mr,
1133 					      mr_all)) != NULL) {
1134 		list_del(&mr->mr_list);
1135 		list_del(&mr->mr_all);
1136 		spin_unlock(&buf->rb_lock);
1137 
1138 		frwr_mr_release(mr);
1139 
1140 		spin_lock(&buf->rb_lock);
1141 	}
1142 	spin_unlock(&buf->rb_lock);
1143 }
1144 
1145 /**
1146  * rpcrdma_buffer_destroy - Release all hw resources
1147  * @buf: root control block for resources
1148  *
1149  * ORDERING: relies on a prior rpcrdma_xprt_drain :
1150  * - No more Send or Receive completions can occur
1151  * - All MRs, reps, and reqs are returned to their free lists
1152  */
1153 void
1154 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1155 {
1156 	rpcrdma_reps_destroy(buf);
1157 
1158 	while (!list_empty(&buf->rb_send_bufs)) {
1159 		struct rpcrdma_req *req;
1160 
1161 		req = list_first_entry(&buf->rb_send_bufs,
1162 				       struct rpcrdma_req, rl_list);
1163 		list_del(&req->rl_list);
1164 		rpcrdma_req_destroy(req);
1165 	}
1166 }
1167 
1168 /**
1169  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1170  * @r_xprt: controlling transport
1171  *
1172  * Returns an initialized rpcrdma_mr or NULL if no free
1173  * rpcrdma_mr objects are available.
1174  */
1175 struct rpcrdma_mr *
1176 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1177 {
1178 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1179 	struct rpcrdma_mr *mr;
1180 
1181 	spin_lock(&buf->rb_lock);
1182 	mr = rpcrdma_mr_pop(&buf->rb_mrs);
1183 	spin_unlock(&buf->rb_lock);
1184 	return mr;
1185 }
1186 
1187 /**
1188  * rpcrdma_reply_put - Put reply buffers back into pool
1189  * @buffers: buffer pool
1190  * @req: object to return
1191  *
1192  */
1193 void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1194 {
1195 	if (req->rl_reply) {
1196 		rpcrdma_rep_put(buffers, req->rl_reply);
1197 		req->rl_reply = NULL;
1198 	}
1199 }
1200 
1201 /**
1202  * rpcrdma_buffer_get - Get a request buffer
1203  * @buffers: Buffer pool from which to obtain a buffer
1204  *
1205  * Returns a fresh rpcrdma_req, or NULL if none are available.
1206  */
1207 struct rpcrdma_req *
1208 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1209 {
1210 	struct rpcrdma_req *req;
1211 
1212 	spin_lock(&buffers->rb_lock);
1213 	req = list_first_entry_or_null(&buffers->rb_send_bufs,
1214 				       struct rpcrdma_req, rl_list);
1215 	if (req)
1216 		list_del_init(&req->rl_list);
1217 	spin_unlock(&buffers->rb_lock);
1218 	return req;
1219 }
1220 
1221 /**
1222  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1223  * @buffers: buffer pool
1224  * @req: object to return
1225  *
1226  */
1227 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1228 {
1229 	rpcrdma_reply_put(buffers, req);
1230 
1231 	spin_lock(&buffers->rb_lock);
1232 	list_add(&req->rl_list, &buffers->rb_send_bufs);
1233 	spin_unlock(&buffers->rb_lock);
1234 }
1235 
1236 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1237  *
1238  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1239  * receiving the payload of RDMA RECV operations. During Long Calls
1240  * or Replies they may be registered externally via frwr_map.
1241  */
1242 static struct rpcrdma_regbuf *
1243 rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
1244 			  int node)
1245 {
1246 	struct rpcrdma_regbuf *rb;
1247 
1248 	rb = kmalloc_node(sizeof(*rb), XPRTRDMA_GFP_FLAGS, node);
1249 	if (!rb)
1250 		return NULL;
1251 	rb->rg_data = kmalloc_node(size, XPRTRDMA_GFP_FLAGS, node);
1252 	if (!rb->rg_data) {
1253 		kfree(rb);
1254 		return NULL;
1255 	}
1256 
1257 	rb->rg_device = NULL;
1258 	rb->rg_direction = direction;
1259 	rb->rg_iov.length = size;
1260 	return rb;
1261 }
1262 
1263 static struct rpcrdma_regbuf *
1264 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction)
1265 {
1266 	return rpcrdma_regbuf_alloc_node(size, direction, NUMA_NO_NODE);
1267 }
1268 
1269 /**
1270  * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1271  * @rb: regbuf to reallocate
1272  * @size: size of buffer to be allocated, in bytes
1273  * @flags: GFP flags
1274  *
1275  * Returns true if reallocation was successful. If false is
1276  * returned, @rb is left untouched.
1277  */
1278 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1279 {
1280 	void *buf;
1281 
1282 	buf = kmalloc(size, flags);
1283 	if (!buf)
1284 		return false;
1285 
1286 	rpcrdma_regbuf_dma_unmap(rb);
1287 	kfree(rb->rg_data);
1288 
1289 	rb->rg_data = buf;
1290 	rb->rg_iov.length = size;
1291 	return true;
1292 }
1293 
1294 /**
1295  * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1296  * @r_xprt: controlling transport instance
1297  * @rb: regbuf to be mapped
1298  *
1299  * Returns true if the buffer is now DMA mapped to @r_xprt's device
1300  */
1301 bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1302 			      struct rpcrdma_regbuf *rb)
1303 {
1304 	struct ib_device *device = r_xprt->rx_ep->re_id->device;
1305 
1306 	if (rb->rg_direction == DMA_NONE)
1307 		return false;
1308 
1309 	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1310 					    rdmab_length(rb), rb->rg_direction);
1311 	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1312 		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1313 		return false;
1314 	}
1315 
1316 	rb->rg_device = device;
1317 	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
1318 	return true;
1319 }
1320 
1321 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1322 {
1323 	if (!rb)
1324 		return;
1325 
1326 	if (!rpcrdma_regbuf_is_mapped(rb))
1327 		return;
1328 
1329 	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1330 			    rb->rg_direction);
1331 	rb->rg_device = NULL;
1332 }
1333 
1334 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1335 {
1336 	rpcrdma_regbuf_dma_unmap(rb);
1337 	if (rb)
1338 		kfree(rb->rg_data);
1339 	kfree(rb);
1340 }
1341 
1342 /**
1343  * rpcrdma_post_recvs - Refill the Receive Queue
1344  * @r_xprt: controlling transport instance
1345  * @needed: current credit grant
1346  *
1347  */
1348 void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed)
1349 {
1350 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1351 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
1352 	struct ib_recv_wr *wr, *bad_wr;
1353 	struct rpcrdma_rep *rep;
1354 	int count, rc;
1355 
1356 	rc = 0;
1357 	count = 0;
1358 
1359 	if (likely(ep->re_receive_count > needed))
1360 		goto out;
1361 	needed -= ep->re_receive_count;
1362 	needed += RPCRDMA_MAX_RECV_BATCH;
1363 
1364 	if (atomic_inc_return(&ep->re_receiving) > 1)
1365 		goto out;
1366 
1367 	/* fast path: all needed reps can be found on the free list */
1368 	wr = NULL;
1369 	while (needed) {
1370 		rep = rpcrdma_rep_get_locked(buf);
1371 		if (!rep)
1372 			rep = rpcrdma_rep_create(r_xprt);
1373 		if (!rep)
1374 			break;
1375 		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) {
1376 			rpcrdma_rep_put(buf, rep);
1377 			break;
1378 		}
1379 
1380 		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
1381 		trace_xprtrdma_post_recv(&rep->rr_cid);
1382 		rep->rr_recv_wr.next = wr;
1383 		wr = &rep->rr_recv_wr;
1384 		--needed;
1385 		++count;
1386 	}
1387 	if (!wr)
1388 		goto out;
1389 
1390 	rc = ib_post_recv(ep->re_id->qp, wr,
1391 			  (const struct ib_recv_wr **)&bad_wr);
1392 	if (rc) {
1393 		trace_xprtrdma_post_recvs_err(r_xprt, rc);
1394 		for (wr = bad_wr; wr;) {
1395 			struct rpcrdma_rep *rep;
1396 
1397 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1398 			wr = wr->next;
1399 			rpcrdma_rep_put(buf, rep);
1400 			--count;
1401 		}
1402 	}
1403 	if (atomic_dec_return(&ep->re_receiving) > 0)
1404 		complete(&ep->re_done);
1405 
1406 out:
1407 	trace_xprtrdma_post_recvs(r_xprt, count);
1408 	ep->re_receive_count += count;
1409 	return;
1410 }
1411