1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Copyright (c) 2015-2020, Oracle and/or its affiliates.
4 *
5 * Support for reverse-direction RPCs on RPC/RDMA.
6 */
7
8 #include <linux/sunrpc/xprt.h>
9 #include <linux/sunrpc/svc.h>
10 #include <linux/sunrpc/svc_xprt.h>
11 #include <linux/sunrpc/svc_rdma.h>
12 #include <linux/sunrpc/bc_xprt.h>
13
14 #include "xprt_rdma.h"
15 #include <trace/events/rpcrdma.h>
16
17 #undef RPCRDMA_BACKCHANNEL_DEBUG
18
19 /**
20 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
21 * @xprt: transport associated with these backchannel resources
22 * @reqs: number of concurrent incoming requests to expect
23 *
24 * Returns 0 on success; otherwise a negative errno
25 */
xprt_rdma_bc_setup(struct rpc_xprt * xprt,unsigned int reqs)26 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
27 {
28 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
29
30 r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
31 trace_xprtrdma_cb_setup(r_xprt, reqs);
32 return 0;
33 }
34
35 /**
36 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
37 * @xprt: transport
38 *
39 * Returns maximum size, in bytes, of a backchannel message
40 */
xprt_rdma_bc_maxpayload(struct rpc_xprt * xprt)41 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
42 {
43 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
44 struct rpcrdma_ep *ep = r_xprt->rx_ep;
45 size_t maxmsg;
46
47 maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv);
48 maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
49 return maxmsg - RPCRDMA_HDRLEN_MIN;
50 }
51
xprt_rdma_bc_max_slots(struct rpc_xprt * xprt)52 unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
53 {
54 return RPCRDMA_BACKWARD_WRS >> 1;
55 }
56
rpcrdma_bc_marshal_reply(struct rpc_rqst * rqst)57 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
58 {
59 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
60 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
61 __be32 *p;
62
63 rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
64 xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
65 rdmab_data(req->rl_rdmabuf), rqst);
66
67 p = xdr_reserve_space(&req->rl_stream, 28);
68 if (unlikely(!p))
69 return -EIO;
70 *p++ = rqst->rq_xid;
71 *p++ = rpcrdma_version;
72 *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
73 *p++ = rdma_msg;
74 *p++ = xdr_zero;
75 *p++ = xdr_zero;
76 *p = xdr_zero;
77
78 if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
79 &rqst->rq_snd_buf, rpcrdma_noch_pullup))
80 return -EIO;
81
82 trace_xprtrdma_cb_reply(r_xprt, rqst);
83 return 0;
84 }
85
86 /**
87 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
88 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
89 *
90 * Caller holds the transport's write lock.
91 *
92 * Returns:
93 * %0 if the RPC message has been sent
94 * %-ENOTCONN if the caller should reconnect and call again
95 * %-EIO if a permanent error occurred and the request was not
96 * sent. Do not try to send this message again.
97 */
xprt_rdma_bc_send_reply(struct rpc_rqst * rqst)98 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
99 {
100 struct rpc_xprt *xprt = rqst->rq_xprt;
101 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
102 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
103 int rc;
104
105 if (!xprt_connected(xprt))
106 return -ENOTCONN;
107
108 if (!xprt_request_get_cong(xprt, rqst))
109 return -EBADSLT;
110
111 rc = rpcrdma_bc_marshal_reply(rqst);
112 if (rc < 0)
113 goto failed_marshal;
114
115 if (frwr_send(r_xprt, req))
116 goto drop_connection;
117 return 0;
118
119 failed_marshal:
120 if (rc != -ENOTCONN)
121 return rc;
122 drop_connection:
123 xprt_rdma_close(xprt);
124 return -ENOTCONN;
125 }
126
127 /**
128 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
129 * @xprt: transport associated with these backchannel resources
130 * @reqs: number of incoming requests to destroy; ignored
131 */
xprt_rdma_bc_destroy(struct rpc_xprt * xprt,unsigned int reqs)132 void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
133 {
134 struct rpc_rqst *rqst, *tmp;
135
136 spin_lock(&xprt->bc_pa_lock);
137 list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
138 list_del(&rqst->rq_bc_pa_list);
139 spin_unlock(&xprt->bc_pa_lock);
140
141 rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
142
143 spin_lock(&xprt->bc_pa_lock);
144 }
145 spin_unlock(&xprt->bc_pa_lock);
146 }
147
148 /**
149 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
150 * @rqst: request to release
151 */
xprt_rdma_bc_free_rqst(struct rpc_rqst * rqst)152 void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
153 {
154 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
155 struct rpcrdma_rep *rep = req->rl_reply;
156 struct rpc_xprt *xprt = rqst->rq_xprt;
157 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
158
159 rpcrdma_rep_put(&r_xprt->rx_buf, rep);
160 req->rl_reply = NULL;
161
162 spin_lock(&xprt->bc_pa_lock);
163 list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
164 spin_unlock(&xprt->bc_pa_lock);
165 xprt_put(xprt);
166 }
167
rpcrdma_bc_rqst_get(struct rpcrdma_xprt * r_xprt)168 static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
169 {
170 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
171 struct rpcrdma_req *req;
172 struct rpc_rqst *rqst;
173 size_t size;
174
175 spin_lock(&xprt->bc_pa_lock);
176 rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
177 rq_bc_pa_list);
178 if (!rqst)
179 goto create_req;
180 list_del(&rqst->rq_bc_pa_list);
181 spin_unlock(&xprt->bc_pa_lock);
182 return rqst;
183
184 create_req:
185 spin_unlock(&xprt->bc_pa_lock);
186
187 /* Set a limit to prevent a remote from overrunning our resources.
188 */
189 if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
190 return NULL;
191
192 size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE);
193 req = rpcrdma_req_create(r_xprt, size);
194 if (!req)
195 return NULL;
196 if (rpcrdma_req_setup(r_xprt, req)) {
197 rpcrdma_req_destroy(req);
198 return NULL;
199 }
200
201 xprt->bc_alloc_count++;
202 rqst = &req->rl_slot;
203 rqst->rq_xprt = xprt;
204 __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
205 xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
206 return rqst;
207 }
208
209 /**
210 * rpcrdma_bc_receive_call - Handle a reverse-direction Call
211 * @r_xprt: transport receiving the call
212 * @rep: receive buffer containing the call
213 *
214 * Operational assumptions:
215 * o Backchannel credits are ignored, just as the NFS server
216 * forechannel currently does
217 * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
218 * No replay detection is done at the transport level
219 */
rpcrdma_bc_receive_call(struct rpcrdma_xprt * r_xprt,struct rpcrdma_rep * rep)220 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
221 struct rpcrdma_rep *rep)
222 {
223 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
224 struct rpcrdma_req *req;
225 struct rpc_rqst *rqst;
226 struct xdr_buf *buf;
227 size_t size;
228 __be32 *p;
229
230 p = xdr_inline_decode(&rep->rr_stream, 0);
231 size = xdr_stream_remaining(&rep->rr_stream);
232
233 #ifdef RPCRDMA_BACKCHANNEL_DEBUG
234 pr_info("RPC: %s: callback XID %08x, length=%u\n",
235 __func__, be32_to_cpup(p), size);
236 pr_info("RPC: %s: %*ph\n", __func__, size, p);
237 #endif
238
239 rqst = rpcrdma_bc_rqst_get(r_xprt);
240 if (!rqst)
241 goto out_overflow;
242
243 rqst->rq_reply_bytes_recvd = 0;
244 rqst->rq_xid = *p;
245
246 rqst->rq_private_buf.len = size;
247
248 buf = &rqst->rq_rcv_buf;
249 memset(buf, 0, sizeof(*buf));
250 buf->head[0].iov_base = p;
251 buf->head[0].iov_len = size;
252 buf->len = size;
253
254 /* The receive buffer has to be hooked to the rpcrdma_req
255 * so that it is not released while the req is pointing
256 * to its buffer, and so that it can be reposted after
257 * the Upper Layer is done decoding it.
258 */
259 req = rpcr_to_rdmar(rqst);
260 req->rl_reply = rep;
261 trace_xprtrdma_cb_call(r_xprt, rqst);
262
263 /* Queue rqst for ULP's callback service */
264 xprt_enqueue_bc_request(rqst);
265
266 r_xprt->rx_stats.bcall_count++;
267 return;
268
269 out_overflow:
270 pr_warn("RPC/RDMA backchannel overflow\n");
271 xprt_force_disconnect(xprt);
272 /* This receive buffer gets reposted automatically
273 * when the connection is re-established.
274 */
275 return;
276 }
277