/*
 * linux/net/sunrpc/xprtrdma/backchannel.c
 *
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

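/* Tear down an rpc_rqst created by rpcrdma_bc_setup_rqst(): unlink
 * the rpcrdma_req from the buffer's list of all reqs, release its
 * registered buffers, then free the rqst itself.
 */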
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(&r_xprt->rx_ia, req);

	kfree(rqst);
}

static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	struct xdr_buf *buf;
	size_t size;

	req = rpcrdma_create_req(r_xprt);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->rl_backchannel = true;

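	/* rl_rdmabuf holds the RPC-over-RDMA transport header that is
	 * built when this rqst is used to send a backchannel reply.
	 */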
	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

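	/* Note that "size" still includes the write threshold from
	 * above, so rl_sendbuf is sized at both inline thresholds
	 * combined.
	 */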
	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;
	req->rl_sendbuf = rb;
	/* so that rpcr_to_rdmar works when receiving a request */
	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

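	/* Prime rq_snd_buf as an empty xdr_buf backed by the send
	 * buffer; the backchannel reply path fills it in later.
	 */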
	buf = &rqst->rq_snd_buf;
	buf->head[0].iov_base = rqst->rq_buffer;
	buf->head[0].iov_len = 0;
	buf->tail[0].iov_base = NULL;
	buf->tail[0].iov_len = 0;
	buf->page_len = 0;
	buf->len = 0;
	buf->buflen = size;

	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	unsigned long flags;
	int rc = 0;

	while (count--) {
		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			pr_err("RPC:       %s: reply buffer alloc failed\n",
			       __func__);
			rc = PTR_ERR(rep);
			break;
		}

		spin_lock_irqsave(&buffers->rb_lock, flags);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
	}

	return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst) {
			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
			       __func__);
			goto out_free;
		}

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);

		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
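	/* Pre-load the svcrdma module so that its facilities are
	 * present before the first backchannel request arrives;
	 * request_module() may sleep, which is safe in this context.
	 */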
	request_module("svcrdma");

	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

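	/* bc_pa_lock is dropped and reacquired around each free so
	 * that rpcrdma_bc_free_rqst() never runs under it; the _safe
	 * iterator tolerates deletion of the current entry.
	 */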
	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

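	/* The barrier pair orders all prior handling of this rqst
	 * before the IN_USE bit is cleared, so that once the rqst
	 * reappears on bc_pa_list it is seen as fully retired.
	 */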
	smp_mb__before_atomic();
	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	smp_mb__after_atomic();

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}