1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3
4 (c) 2007 Network Appliance, Inc. All Rights Reserved.
5 (c) 2009 NetApp. All Rights Reserved.
6
7
8 ******************************************************************************/
9
10 #include <linux/tcp.h>
11 #include <linux/slab.h>
12 #include <linux/sunrpc/xprt.h>
13 #include <linux/export.h>
14 #include <linux/sunrpc/bc_xprt.h>
15
16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17 #define RPCDBG_FACILITY RPCDBG_TRANS
18 #endif
19
20 #define BC_MAX_SLOTS 64U
21
/*
 * xprt_bc_max_slots - report the upper bound on backchannel slots
 * @xprt: transport (unused; the limit is a compile-time constant)
 *
 * Every transport shares the same fixed cap of BC_MAX_SLOTS
 * preallocated backchannel requests.
 */
unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
	return BC_MAX_SLOTS;
}
26
/*
 * Helper function to nullify backchannel server pointer in transport.
 * We need to synchronize setting the pointer to NULL (done so after
 * the backchannel server is shutdown) with the usage of that pointer
 * by the backchannel request processing routines
 * xprt_complete_bc_request() and rpcrdma_bc_receive_call().
 */
void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv)
{
	/*
	 * Holding bc_pa_lock across the teardown means readers of
	 * xprt->bc_serv that take this lock (see
	 * xprt_enqueue_bc_request()) either see the server before
	 * destruction starts or see NULL afterwards - never a
	 * half-destroyed svc_serv.
	 *
	 * NOTE(review): svc_destroy() is invoked with a spinlock held;
	 * confirm it cannot sleep on this path.
	 */
	spin_lock(&xprt->bc_pa_lock);
	svc_destroy(serv);
	xprt->bc_serv = NULL;
	spin_unlock(&xprt->bc_pa_lock);
}
EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc);
42
/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
xprt_need_to_requeue(struct rpc_xprt * xprt)47 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
48 {
49 return xprt->bc_alloc_count < xprt->bc_alloc_max;
50 }
51
/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
xprt_free_allocation(struct rpc_rqst * req)56 static void xprt_free_allocation(struct rpc_rqst *req)
57 {
58 struct xdr_buf *xbufp;
59
60 dprintk("RPC: free allocations for req= %p\n", req);
61 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
62 xbufp = &req->rq_rcv_buf;
63 free_page((unsigned long)xbufp->head[0].iov_base);
64 xbufp = &req->rq_snd_buf;
65 free_page((unsigned long)xbufp->head[0].iov_base);
66 kfree(req);
67 }
68
xprt_bc_reinit_xdr_buf(struct xdr_buf * buf)69 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
70 {
71 buf->head[0].iov_len = PAGE_SIZE;
72 buf->tail[0].iov_len = 0;
73 buf->pages = NULL;
74 buf->page_len = 0;
75 buf->flags = 0;
76 buf->len = 0;
77 buf->buflen = PAGE_SIZE;
78 }
79
/*
 * Back @buf with one freshly allocated page as its head buffer.
 * Returns 0 on success, -ENOMEM if the page allocation fails.
 */
static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
	struct page *page = alloc_page(gfp_flags);

	if (!page)
		return -ENOMEM;
	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
	return 0;
}
90
xprt_alloc_bc_req(struct rpc_xprt * xprt)91 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
92 {
93 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
94 struct rpc_rqst *req;
95
96 /* Pre-allocate one backchannel rpc_rqst */
97 req = kzalloc_obj(*req, gfp_flags);
98 if (req == NULL)
99 return NULL;
100
101 req->rq_xprt = xprt;
102
103 /* Preallocate one XDR receive buffer */
104 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
105 printk(KERN_ERR "Failed to create bc receive xbuf\n");
106 goto out_free;
107 }
108 req->rq_rcv_buf.len = PAGE_SIZE;
109
110 /* Preallocate one XDR send buffer */
111 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
112 printk(KERN_ERR "Failed to create bc snd xbuf\n");
113 goto out_free;
114 }
115 return req;
116 out_free:
117 xprt_free_allocation(req);
118 return NULL;
119 }
120
/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel. This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt. The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt. Any one of these resources may be used by an
 * incoming callback request. It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large. For example, a pNFS server
 * using multiple deviceids. The list can be unbound, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests. Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs. The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
xprt_setup_backchannel(struct rpc_xprt * xprt,unsigned int min_reqs)139 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140 {
141 if (!xprt->ops->bc_setup)
142 return 0;
143 return xprt->ops->bc_setup(xprt, min_reqs);
144 }
145 EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
146
xprt_setup_bc(struct rpc_xprt * xprt,unsigned int min_reqs)147 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148 {
149 struct rpc_rqst *req;
150 LIST_HEAD(tmp_list);
151 int i;
152
153 dprintk("RPC: setup backchannel transport\n");
154
155 if (min_reqs > BC_MAX_SLOTS)
156 min_reqs = BC_MAX_SLOTS;
157
158 /*
159 * We use a temporary list to keep track of the preallocated
160 * buffers. Once we're done building the list we splice it
161 * into the backchannel preallocation list off of the rpc_xprt
162 * struct. This helps minimize the amount of time the list
163 * lock is held on the rpc_xprt struct. It also makes cleanup
164 * easier in case of memory allocation errors.
165 */
166 for (i = 0; i < min_reqs; i++) {
167 /* Pre-allocate one backchannel rpc_rqst */
168 req = xprt_alloc_bc_req(xprt);
169 if (req == NULL) {
170 printk(KERN_ERR "Failed to create bc rpc_rqst\n");
171 goto out_free;
172 }
173
174 /* Add the allocated buffer to the tmp list */
175 dprintk("RPC: adding req= %p\n", req);
176 list_add(&req->rq_bc_pa_list, &tmp_list);
177 }
178
179 /*
180 * Add the temporary list to the backchannel preallocation list
181 */
182 spin_lock(&xprt->bc_pa_lock);
183 list_splice(&tmp_list, &xprt->bc_pa_list);
184 xprt->bc_alloc_count += min_reqs;
185 xprt->bc_alloc_max += min_reqs;
186 atomic_add(min_reqs, &xprt->bc_slot_count);
187 spin_unlock(&xprt->bc_pa_lock);
188
189 dprintk("RPC: setup backchannel transport done\n");
190 return 0;
191
192 out_free:
193 /*
194 * Memory allocation failed, free the temporary list
195 */
196 while (!list_empty(&tmp_list)) {
197 req = list_first_entry(&tmp_list,
198 struct rpc_rqst,
199 rq_bc_pa_list);
200 list_del(&req->rq_bc_pa_list);
201 xprt_free_allocation(req);
202 }
203
204 dprintk("RPC: setup backchannel transport failed\n");
205 return -ENOMEM;
206 }
207
/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt: the transport holding the preallocated structures
 * @max_reqs: the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
xprt_destroy_backchannel(struct rpc_xprt * xprt,unsigned int max_reqs)217 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
218 {
219 if (xprt->ops->bc_destroy)
220 xprt->ops->bc_destroy(xprt, max_reqs);
221 }
222 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
223
/*
 * Transport-level worker for xprt_destroy_backchannel(): free up to
 * @max_reqs preallocated requests sitting on xprt->bc_pa_list and
 * lower the requeue ceiling accordingly.  Requests currently in use
 * are not on the list and are handled later by xprt_free_bc_rqst().
 */
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	struct rpc_rqst *req = NULL, *tmp = NULL;

	dprintk("RPC: destroy backchannel transport\n");

	if (max_reqs == 0)
		goto out;

	spin_lock_bh(&xprt->bc_pa_lock);
	/*
	 * Lower the ceiling first (clamped so it cannot underflow);
	 * in-flight requests freed after this see a smaller
	 * bc_alloc_max and are released instead of requeued.
	 */
	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		dprintk("RPC: req=%p\n", req);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
		xprt->bc_alloc_count--;
		atomic_dec(&xprt->bc_slot_count);
		/* Only destroy up to the number the caller asked for */
		if (--max_reqs == 0)
			break;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);

out:
	dprintk("RPC: backchannel list empty= %s\n",
		list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
250
/*
 * Find or create a free preallocated rpc_rqst and bind it to @xid.
 *
 * Caller must hold xprt->bc_pa_lock (see xprt_lookup_bc_request()).
 *
 * @new, if non-NULL, is a spare request the caller allocated without
 * the lock held; it is used to grow the pool when bc_pa_list is empty
 * and the BC_MAX_SLOTS ceiling has not yet been reached.
 *
 * Returns the request (still linked on bc_pa_list) with its receive
 * state reset, or NULL if no slot is available.
 */
static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
		struct rpc_rqst *new)
{
	struct rpc_rqst *req = NULL;

	dprintk("RPC: allocate a backchannel request\n");
	if (list_empty(&xprt->bc_pa_list)) {
		if (!new)
			goto not_found;
		/* Never grow the pool past the global slot limit */
		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
			goto not_found;
		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
	}
	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
				rq_bc_pa_list);
	req->rq_reply_bytes_recvd = 0;
	/* Start from a pristine copy of the preallocated receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));
	req->rq_xid = xid;
	req->rq_connect_cookie = xprt->connect_cookie;
	dprintk("RPC: backchannel req=%p\n", req);
not_found:
	return req;
}
277
/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
/*
 * Hand a finished backchannel request back to the transport via its
 * bc_free_rqst method, letting each transport apply its own reuse
 * policy.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->ops->bc_free_rqst(req);
}
288
/*
 * Return a backchannel rpc_rqst after its callback has been processed:
 * either requeue it on bc_pa_list for reuse, or free it outright if
 * the preallocation ceiling has dropped in the meantime (the session
 * went away - see xprt_destroy_bc()).
 */
void xprt_free_bc_rqst(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: free backchannel req=%p\n", req);

	/*
	 * Invalidate the cookie so that a stale xid from a previous
	 * connection can no longer match this entry in
	 * xprt_lookup_bc_request().
	 */
	req->rq_connect_cookie = xprt->connect_cookie - 1;
	/* Order all prior stores to the rqst before the slot reads as free */
	smp_mb__before_atomic();
	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
	smp_mb__after_atomic();

	/*
	 * Return it to the list of preallocations so that it
	 * may be reused by a new callback request.
	 */
	spin_lock_bh(&xprt->bc_pa_lock);
	if (xprt_need_to_requeue(xprt)) {
		/* Reset both XDR buffers back to a single empty page */
		xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
		xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
		req->rq_rcv_buf.len = PAGE_SIZE;
		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
		req = NULL;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
	if (req != NULL) {
		/*
		 * The last remaining session was destroyed while this
		 * entry was in use. Free the entry and don't attempt
		 * to add back to the list because there is no need to
		 * have anymore preallocated entries.
		 */
		dprintk("RPC: Last session removed req=%p\n", req);
		xprt_free_allocation(req);
	}
	/* Drop the reference taken in xprt_enqueue_bc_request() */
	xprt_put(xprt);
}
327
/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup. Buffer space for the send and private XDR buffers
 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
 * to this request. Use xprt_free_bc_request to return it.
 *
 * We know that we're called in soft interrupt context, grab the spin_lock
 * since there is no need to grab the bottom half spin_lock.
 *
 * Return an available rpc_rqst, otherwise NULL if none are available.
 */
/*
 * Look up the backchannel request matching @xid on the current
 * connection, or claim/grow a free slot for it.
 *
 * Runs at most two passes: if the first pass finds no match and no
 * free slot, the lock is dropped, a spare rpc_rqst is allocated, and
 * the lookup is retried so xprt_get_bc_request() can add the spare
 * to the pool.  Returns NULL if no request can be found or created.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *req, *new = NULL;

	do {
		spin_lock(&xprt->bc_pa_lock);
		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
			/* Skip entries left over from an older connection */
			if (req->rq_connect_cookie != xprt->connect_cookie)
				continue;
			if (req->rq_xid == xid)
				goto found;
		}
		req = xprt_get_bc_request(xprt, xid, new);
found:
		spin_unlock(&xprt->bc_pa_lock);
		if (new) {
			/* Second pass: discard the spare if it wasn't used */
			if (req != new)
				xprt_free_allocation(new);
			break;
		} else if (req)
			break;
		new = xprt_alloc_bc_req(xprt);
	} while (new);
	return req;
}
364
/*
 * Add callback request to callback list. Wake a thread
 * on the first pool (usually the only pool) to handle it.
 */
/*
 * A complete callback request has arrived for @req: mark it in use,
 * record how many bytes were received, and hand it to the backchannel
 * server threads for processing.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	/* Unlink from the preallocation pool while it is being used */
	spin_lock(&xprt->bc_pa_lock);
	list_del(&req->rq_bc_pa_list);
	xprt->bc_alloc_count--;
	spin_unlock(&xprt->bc_pa_lock);

	req->rq_private_buf.len = copied;
	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

	dprintk("RPC: add callback request to list\n");
	xprt_enqueue_bc_request(req);
}
384
/*
 * Queue @req on the backchannel server's callback list and wake an
 * idle server thread to process it.  If the backchannel server has
 * already been torn down (bc_serv is NULL), the request is silently
 * not queued.
 */
void xprt_enqueue_bc_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct svc_serv *bc_serv;

	/* Pin the transport; released by xprt_put() in xprt_free_bc_rqst() */
	xprt_get(xprt);
	/*
	 * bc_pa_lock also guards xprt->bc_serv: see
	 * xprt_svc_destroy_nullify_bc(), which NULLs the pointer under
	 * this same lock when the backchannel server is shut down.
	 */
	spin_lock(&xprt->bc_pa_lock);
	bc_serv = xprt->bc_serv;
	if (bc_serv) {
		lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
		/* Wake a thread on the first (usually only) pool */
		svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
	}
	spin_unlock(&xprt->bc_pa_lock);
}
EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request);
400