1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3
4 (c) 2007 Network Appliance, Inc. All Rights Reserved.
5 (c) 2009 NetApp. All Rights Reserved.
6
7
8 ******************************************************************************/
9
10 #include <linux/tcp.h>
11 #include <linux/slab.h>
12 #include <linux/sunrpc/xprt.h>
13 #include <linux/export.h>
14 #include <linux/sunrpc/bc_xprt.h>
15
16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17 #define RPCDBG_FACILITY RPCDBG_TRANS
18 #endif
19
20 #define BC_MAX_SLOTS 64U
21
xprt_bc_max_slots(struct rpc_xprt * xprt)22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
23 {
24 return BC_MAX_SLOTS;
25 }
26
27 /*
28 * Helper function to nullify backchannel server pointer in transport.
29 * We need to synchronize setting the pointer to NULL (done so after
30 * the backchannel server is shutdown) with the usage of that pointer
31 * by the backchannel request processing routines
32 * xprt_complete_bc_request() and rpcrdma_bc_receive_call().
33 */
xprt_svc_destroy_nullify_bc(struct rpc_xprt * xprt,struct svc_serv ** serv)34 void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv)
35 {
36 spin_lock(&xprt->bc_pa_lock);
37 svc_destroy(serv);
38 xprt->bc_serv = NULL;
39 spin_unlock(&xprt->bc_pa_lock);
40 }
41 EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc);
42
43 /*
44 * Helper routines that track the number of preallocation elements
45 * on the transport.
46 */
xprt_need_to_requeue(struct rpc_xprt * xprt)47 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
48 {
49 return xprt->bc_alloc_count < xprt->bc_alloc_max;
50 }
51
52 /*
53 * Free the preallocated rpc_rqst structure and the memory
54 * buffers hanging off of it.
55 */
xprt_free_allocation(struct rpc_rqst * req)56 static void xprt_free_allocation(struct rpc_rqst *req)
57 {
58 struct xdr_buf *xbufp;
59
60 dprintk("RPC: free allocations for req= %p\n", req);
61 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
62 xbufp = &req->rq_rcv_buf;
63 free_page((unsigned long)xbufp->head[0].iov_base);
64 xbufp = &req->rq_snd_buf;
65 free_page((unsigned long)xbufp->head[0].iov_base);
66 kfree(req);
67 }
68
xprt_bc_reinit_xdr_buf(struct xdr_buf * buf)69 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
70 {
71 buf->head[0].iov_len = PAGE_SIZE;
72 buf->tail[0].iov_len = 0;
73 buf->pages = NULL;
74 buf->page_len = 0;
75 buf->flags = 0;
76 buf->len = 0;
77 buf->buflen = PAGE_SIZE;
78 }
79
/*
 * Back @buf with one freshly allocated page (used for both the
 * receive and the send buffer of a backchannel request).
 *
 * Returns 0 on success or -ENOMEM if the page allocation fails.
 */
static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
	struct page *pg = alloc_page(gfp_flags);

	if (!pg)
		return -ENOMEM;

	xdr_buf_init(buf, page_address(pg), PAGE_SIZE);
	return 0;
}
90
xprt_alloc_bc_req(struct rpc_xprt * xprt)91 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
92 {
93 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
94 struct rpc_rqst *req;
95
96 /* Pre-allocate one backchannel rpc_rqst */
97 req = kzalloc(sizeof(*req), gfp_flags);
98 if (req == NULL)
99 return NULL;
100
101 req->rq_xprt = xprt;
102
103 /* Preallocate one XDR receive buffer */
104 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
105 printk(KERN_ERR "Failed to create bc receive xbuf\n");
106 goto out_free;
107 }
108 req->rq_rcv_buf.len = PAGE_SIZE;
109
110 /* Preallocate one XDR send buffer */
111 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
112 printk(KERN_ERR "Failed to create bc snd xbuf\n");
113 goto out_free;
114 }
115 return req;
116 out_free:
117 xprt_free_allocation(req);
118 return NULL;
119 }
120
121 /*
122 * Preallocate up to min_reqs structures and related buffers for use
123 * by the backchannel. This function can be called multiple times
124 * when creating new sessions that use the same rpc_xprt. The
125 * preallocated buffers are added to the pool of resources used by
126 * the rpc_xprt. Any one of these resources may be used by an
127 * incoming callback request. It's up to the higher levels in the
128 * stack to enforce that the maximum number of session slots is not
129 * being exceeded.
130 *
131 * Some callback arguments can be large. For example, a pNFS server
132 * using multiple deviceids. The list can be unbound, but the client
133 * has the ability to tell the server the maximum size of the callback
134 * requests. Each deviceID is 16 bytes, so allocate one page
135 * for the arguments to have enough room to receive a number of these
136 * deviceIDs. The NFS client indicates to the pNFS server that its
137 * callback requests can be up to 4096 bytes in size.
138 */
xprt_setup_backchannel(struct rpc_xprt * xprt,unsigned int min_reqs)139 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140 {
141 if (!xprt->ops->bc_setup)
142 return 0;
143 return xprt->ops->bc_setup(xprt, min_reqs);
144 }
145 EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
146
xprt_setup_bc(struct rpc_xprt * xprt,unsigned int min_reqs)147 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148 {
149 struct rpc_rqst *req;
150 struct list_head tmp_list;
151 int i;
152
153 dprintk("RPC: setup backchannel transport\n");
154
155 if (min_reqs > BC_MAX_SLOTS)
156 min_reqs = BC_MAX_SLOTS;
157
158 /*
159 * We use a temporary list to keep track of the preallocated
160 * buffers. Once we're done building the list we splice it
161 * into the backchannel preallocation list off of the rpc_xprt
162 * struct. This helps minimize the amount of time the list
163 * lock is held on the rpc_xprt struct. It also makes cleanup
164 * easier in case of memory allocation errors.
165 */
166 INIT_LIST_HEAD(&tmp_list);
167 for (i = 0; i < min_reqs; i++) {
168 /* Pre-allocate one backchannel rpc_rqst */
169 req = xprt_alloc_bc_req(xprt);
170 if (req == NULL) {
171 printk(KERN_ERR "Failed to create bc rpc_rqst\n");
172 goto out_free;
173 }
174
175 /* Add the allocated buffer to the tmp list */
176 dprintk("RPC: adding req= %p\n", req);
177 list_add(&req->rq_bc_pa_list, &tmp_list);
178 }
179
180 /*
181 * Add the temporary list to the backchannel preallocation list
182 */
183 spin_lock(&xprt->bc_pa_lock);
184 list_splice(&tmp_list, &xprt->bc_pa_list);
185 xprt->bc_alloc_count += min_reqs;
186 xprt->bc_alloc_max += min_reqs;
187 atomic_add(min_reqs, &xprt->bc_slot_count);
188 spin_unlock(&xprt->bc_pa_lock);
189
190 dprintk("RPC: setup backchannel transport done\n");
191 return 0;
192
193 out_free:
194 /*
195 * Memory allocation failed, free the temporary list
196 */
197 while (!list_empty(&tmp_list)) {
198 req = list_first_entry(&tmp_list,
199 struct rpc_rqst,
200 rq_bc_pa_list);
201 list_del(&req->rq_bc_pa_list);
202 xprt_free_allocation(req);
203 }
204
205 dprintk("RPC: setup backchannel transport failed\n");
206 return -ENOMEM;
207 }
208
209 /**
210 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
211 * @xprt: the transport holding the preallocated strucures
212 * @max_reqs: the maximum number of preallocated structures to destroy
213 *
214 * Since these structures may have been allocated by multiple calls
215 * to xprt_setup_backchannel, we only destroy up to the maximum number
216 * of reqs specified by the caller.
217 */
xprt_destroy_backchannel(struct rpc_xprt * xprt,unsigned int max_reqs)218 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
219 {
220 if (xprt->ops->bc_destroy)
221 xprt->ops->bc_destroy(xprt, max_reqs);
222 }
223 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
224
xprt_destroy_bc(struct rpc_xprt * xprt,unsigned int max_reqs)225 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
226 {
227 struct rpc_rqst *req = NULL, *tmp = NULL;
228
229 dprintk("RPC: destroy backchannel transport\n");
230
231 if (max_reqs == 0)
232 goto out;
233
234 spin_lock_bh(&xprt->bc_pa_lock);
235 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
236 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
237 dprintk("RPC: req=%p\n", req);
238 list_del(&req->rq_bc_pa_list);
239 xprt_free_allocation(req);
240 xprt->bc_alloc_count--;
241 atomic_dec(&xprt->bc_slot_count);
242 if (--max_reqs == 0)
243 break;
244 }
245 spin_unlock_bh(&xprt->bc_pa_lock);
246
247 out:
248 dprintk("RPC: backchannel list empty= %s\n",
249 list_empty(&xprt->bc_pa_list) ? "true" : "false");
250 }
251
/*
 * Claim the first entry of xprt->bc_pa_list for the callback @xid.
 *
 * If the list is empty, @new (when supplied) is appended first,
 * provided bc_slot_count has not yet reached BC_MAX_SLOTS.  The chosen
 * entry gets its received-bytes counter cleared, its private buffer
 * reloaded from rq_rcv_buf, and is stamped with @xid and the
 * transport's current connect cookie.
 *
 * The only caller visible here, xprt_lookup_bc_request(), invokes this
 * with xprt->bc_pa_lock held.
 *
 * Returns the claimed request, or NULL when the list is empty and no
 * usable @new was provided.
 */
static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
					    struct rpc_rqst *new)
{
	struct rpc_rqst *rqst;

	dprintk("RPC: allocate a backchannel request\n");

	if (list_empty(&xprt->bc_pa_list)) {
		/* Nothing preallocated: fall back to the caller's spare. */
		if (!new)
			return NULL;
		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
			return NULL;
		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
	}

	rqst = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
				rq_bc_pa_list);
	rqst->rq_reply_bytes_recvd = 0;
	memcpy(&rqst->rq_private_buf, &rqst->rq_rcv_buf,
	       sizeof(rqst->rq_private_buf));
	rqst->rq_xid = xid;
	rqst->rq_connect_cookie = xprt->connect_cookie;
	dprintk("RPC: backchannel req=%p\n", rqst);

	return rqst;
}
278
279 /*
280 * Return the preallocated rpc_rqst structure and XDR buffers
281 * associated with this rpc_task.
282 */
xprt_free_bc_request(struct rpc_rqst * req)283 void xprt_free_bc_request(struct rpc_rqst *req)
284 {
285 struct rpc_xprt *xprt = req->rq_xprt;
286
287 xprt->ops->bc_free_rqst(req);
288 }
289
xprt_free_bc_rqst(struct rpc_rqst * req)290 void xprt_free_bc_rqst(struct rpc_rqst *req)
291 {
292 struct rpc_xprt *xprt = req->rq_xprt;
293
294 dprintk("RPC: free backchannel req=%p\n", req);
295
296 req->rq_connect_cookie = xprt->connect_cookie - 1;
297 smp_mb__before_atomic();
298 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
299 smp_mb__after_atomic();
300
301 /*
302 * Return it to the list of preallocations so that it
303 * may be reused by a new callback request.
304 */
305 spin_lock_bh(&xprt->bc_pa_lock);
306 if (xprt_need_to_requeue(xprt)) {
307 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
308 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
309 req->rq_rcv_buf.len = PAGE_SIZE;
310 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
311 xprt->bc_alloc_count++;
312 atomic_inc(&xprt->bc_slot_count);
313 req = NULL;
314 }
315 spin_unlock_bh(&xprt->bc_pa_lock);
316 if (req != NULL) {
317 /*
318 * The last remaining session was destroyed while this
319 * entry was in use. Free the entry and don't attempt
320 * to add back to the list because there is no need to
321 * have anymore preallocated entries.
322 */
323 dprintk("RPC: Last session removed req=%p\n", req);
324 xprt_free_allocation(req);
325 }
326 xprt_put(xprt);
327 }
328
329 /*
330 * One or more rpc_rqst structure have been preallocated during the
331 * backchannel setup. Buffer space for the send and private XDR buffers
332 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
333 * to this request. Use xprt_free_bc_request to return it.
334 *
335 * We know that we're called in soft interrupt context, grab the spin_lock
336 * since there is no need to grab the bottom half spin_lock.
337 *
338 * Return an available rpc_rqst, otherwise NULL if non are available.
339 */
xprt_lookup_bc_request(struct rpc_xprt * xprt,__be32 xid)340 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
341 {
342 struct rpc_rqst *req, *new = NULL;
343
344 do {
345 spin_lock(&xprt->bc_pa_lock);
346 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
347 if (req->rq_connect_cookie != xprt->connect_cookie)
348 continue;
349 if (req->rq_xid == xid)
350 goto found;
351 }
352 req = xprt_get_bc_request(xprt, xid, new);
353 found:
354 spin_unlock(&xprt->bc_pa_lock);
355 if (new) {
356 if (req != new)
357 xprt_free_allocation(new);
358 break;
359 } else if (req)
360 break;
361 new = xprt_alloc_bc_req(xprt);
362 } while (new);
363 return req;
364 }
365
366 /*
367 * Add callback request to callback list. Wake a thread
368 * on the first pool (usually the only pool) to handle it.
369 */
xprt_complete_bc_request(struct rpc_rqst * req,uint32_t copied)370 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
371 {
372 struct rpc_xprt *xprt = req->rq_xprt;
373
374 spin_lock(&xprt->bc_pa_lock);
375 list_del(&req->rq_bc_pa_list);
376 xprt->bc_alloc_count--;
377 spin_unlock(&xprt->bc_pa_lock);
378
379 req->rq_private_buf.len = copied;
380 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
381
382 dprintk("RPC: add callback request to list\n");
383 xprt_enqueue_bc_request(req);
384 }
385
xprt_enqueue_bc_request(struct rpc_rqst * req)386 void xprt_enqueue_bc_request(struct rpc_rqst *req)
387 {
388 struct rpc_xprt *xprt = req->rq_xprt;
389 struct svc_serv *bc_serv;
390
391 xprt_get(xprt);
392 spin_lock(&xprt->bc_pa_lock);
393 bc_serv = xprt->bc_serv;
394 if (bc_serv) {
395 lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
396 svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
397 }
398 spin_unlock(&xprt->bc_pa_lock);
399 }
400 EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request);
401