xref: /linux/net/sunrpc/backchannel_rqst.c (revision 6bb34aff1ebdd4ee8ea1721068f74d476d707f01)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 
4 (c) 2007 Network Appliance, Inc.  All Rights Reserved.
5 (c) 2009 NetApp.  All Rights Reserved.
6 
7 
8 ******************************************************************************/
9 
10 #include <linux/tcp.h>
11 #include <linux/slab.h>
12 #include <linux/sunrpc/xprt.h>
13 #include <linux/export.h>
14 #include <linux/sunrpc/bc_xprt.h>
15 
16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17 #define RPCDBG_FACILITY	RPCDBG_TRANS
18 #endif
19 
20 #define BC_MAX_SLOTS	64U
21 
22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
23 {
24 	return BC_MAX_SLOTS;
25 }
26 
27 /*
28  * Helper function to nullify the backchannel server pointer in the transport.
29  * We need to synchronize setting the pointer to NULL (done after
30  * the backchannel server is shut down) with the use of that pointer
31  * by the backchannel request processing routines
32  * xprt_complete_bc_request() and rpcrdma_bc_receive_call().
33  */
34 void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv)
35 {
36 	spin_lock(&xprt->bc_pa_lock);
37 	svc_destroy(serv);
38 	xprt->bc_serv = NULL;
39 	spin_unlock(&xprt->bc_pa_lock);
40 }
41 EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc);
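/*
 * Illustrative sketch, not part of the original file: a backchannel user
 * shutting down its callback service (the NFSv4.1 client, for instance)
 * could use the helper above to drop the svc_serv and detach it from the
 * transport in one synchronized step.  The wrapper name is hypothetical.
 */
static inline void example_bc_svc_shutdown(struct rpc_xprt *xprt,
					   struct svc_serv **serv)
{
	/* Destroys *serv and clears xprt->bc_serv under bc_pa_lock, so
	 * xprt_enqueue_bc_request() can never see a stale pointer. */
	xprt_svc_destroy_nullify_bc(xprt, serv);
}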
42 
43 /*
44  * Helper routines that track the number of preallocation elements
45  * on the transport.
46  */
47 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
48 {
49 	return xprt->bc_alloc_count < xprt->bc_alloc_max;
50 }
51 
52 /*
53  * Free the preallocated rpc_rqst structure and the memory
54  * buffers hanging off of it.
55  */
56 static void xprt_free_allocation(struct rpc_rqst *req)
57 {
58 	struct xdr_buf *xbufp;
59 
60 	dprintk("RPC:        free allocations for req= %p\n", req);
61 	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
62 	xbufp = &req->rq_rcv_buf;
63 	free_page((unsigned long)xbufp->head[0].iov_base);
64 	xbufp = &req->rq_snd_buf;
65 	free_page((unsigned long)xbufp->head[0].iov_base);
66 	kfree(req);
67 }
68 
69 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
70 {
71 	buf->head[0].iov_len = PAGE_SIZE;
72 	buf->tail[0].iov_len = 0;
73 	buf->pages = NULL;
74 	buf->page_len = 0;
75 	buf->flags = 0;
76 	buf->len = 0;
77 	buf->buflen = PAGE_SIZE;
78 }
79 
80 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
81 {
82 	struct page *page;
83 	/* Preallocate one XDR receive buffer */
84 	page = alloc_page(gfp_flags);
85 	if (page == NULL)
86 		return -ENOMEM;
87 	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
88 	return 0;
89 }
90 
91 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
92 {
93 	gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
94 	struct rpc_rqst *req;
95 
96 	/* Pre-allocate one backchannel rpc_rqst */
97 	req = kzalloc(sizeof(*req), gfp_flags);
98 	if (req == NULL)
99 		return NULL;
100 
101 	req->rq_xprt = xprt;
102 
103 	/* Preallocate one XDR receive buffer */
104 	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
105 		printk(KERN_ERR "Failed to create bc receive xbuf\n");
106 		goto out_free;
107 	}
108 	req->rq_rcv_buf.len = PAGE_SIZE;
109 
110 	/* Preallocate one XDR send buffer */
111 	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
112 		printk(KERN_ERR "Failed to create bc snd xbuf\n");
113 		goto out_free;
114 	}
115 	return req;
116 out_free:
117 	xprt_free_allocation(req);
118 	return NULL;
119 }
120 
121 /*
122  * Preallocate up to min_reqs structures and related buffers for use
123  * by the backchannel.  This function can be called multiple times
124  * when creating new sessions that use the same rpc_xprt.  The
125  * preallocated buffers are added to the pool of resources used by
126  * the rpc_xprt.  Any one of these resources may be used by an
127  * incoming callback request.  It's up to the higher levels in the
128  * stack to enforce that the maximum number of session slots is not
129  * being exceeded.
130  *
131  * Some callback arguments can be large; for example, a pNFS server
132  * may return multiple deviceids.  The list is unbounded, but the client
133  * can tell the server the maximum size of its callback
134  * requests.  Each deviceID is 16 bytes, so allocate one page
135  * for the arguments to have enough room to receive a number of these
136  * deviceIDs.  The NFS client indicates to the pNFS server that its
137  * callback requests can be up to 4096 bytes in size.
138  */
139 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140 {
141 	if (!xprt->ops->bc_setup)
142 		return 0;
143 	return xprt->ops->bc_setup(xprt, min_reqs);
144 }
145 EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
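/*
 * Illustrative sketch, not part of the original file: a session-based
 * caller (e.g. the NFSv4.1 client) preallocates one slot per callback it
 * is prepared to handle concurrently, typically once the session has been
 * negotiated.  The wrapper name and slot count below are hypothetical.
 */
static inline int example_bc_setup(struct rpc_xprt *xprt)
{
	/* Returns 0 when the transport has no backchannel support. */
	return xprt_setup_backchannel(xprt, 1);
}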
146 
147 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148 {
149 	struct rpc_rqst *req;
150 	struct list_head tmp_list;
151 	int i;
152 
153 	dprintk("RPC:       setup backchannel transport\n");
154 
155 	if (min_reqs > BC_MAX_SLOTS)
156 		min_reqs = BC_MAX_SLOTS;
157 
158 	/*
159 	 * We use a temporary list to keep track of the preallocated
160 	 * buffers.  Once we're done building the list we splice it
161 	 * into the backchannel preallocation list off of the rpc_xprt
162 	 * struct.  This helps minimize the amount of time the list
163 	 * lock is held on the rpc_xprt struct.  It also makes cleanup
164 	 * easier in case of memory allocation errors.
165 	 */
166 	INIT_LIST_HEAD(&tmp_list);
167 	for (i = 0; i < min_reqs; i++) {
168 		/* Pre-allocate one backchannel rpc_rqst */
169 		req = xprt_alloc_bc_req(xprt);
170 		if (req == NULL) {
171 			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
172 			goto out_free;
173 		}
174 
175 		/* Add the allocated buffer to the tmp list */
176 		dprintk("RPC:       adding req= %p\n", req);
177 		list_add(&req->rq_bc_pa_list, &tmp_list);
178 	}
179 
180 	/*
181 	 * Add the temporary list to the backchannel preallocation list
182 	 */
183 	spin_lock(&xprt->bc_pa_lock);
184 	list_splice(&tmp_list, &xprt->bc_pa_list);
185 	xprt->bc_alloc_count += min_reqs;
186 	xprt->bc_alloc_max += min_reqs;
187 	atomic_add(min_reqs, &xprt->bc_slot_count);
188 	spin_unlock(&xprt->bc_pa_lock);
189 
190 	dprintk("RPC:       setup backchannel transport done\n");
191 	return 0;
192 
193 out_free:
194 	/*
195 	 * Memory allocation failed, free the temporary list
196 	 */
197 	while (!list_empty(&tmp_list)) {
198 		req = list_first_entry(&tmp_list,
199 				struct rpc_rqst,
200 				rq_bc_pa_list);
201 		list_del(&req->rq_bc_pa_list);
202 		xprt_free_allocation(req);
203 	}
204 
205 	dprintk("RPC:       setup backchannel transport failed\n");
206 	return -ENOMEM;
207 }
208 
209 /**
210  * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
211  * @xprt:	the transport holding the preallocated structures
212  * @max_reqs:	the maximum number of preallocated structures to destroy
213  *
214  * Since these structures may have been allocated by multiple calls
215  * to xprt_setup_backchannel, we only destroy up to the maximum number
216  * of reqs specified by the caller.
217  */
218 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
219 {
220 	if (xprt->ops->bc_destroy)
221 		xprt->ops->bc_destroy(xprt, max_reqs);
222 }
223 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
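/*
 * Illustrative sketch, not part of the original file: teardown mirrors
 * setup, releasing at most the number of slots this caller preallocated.
 * The wrapper name is hypothetical.
 */
static inline void example_bc_teardown(struct rpc_xprt *xprt,
				       unsigned int nr_slots)
{
	/* A no-op on transports without backchannel support. */
	xprt_destroy_backchannel(xprt, nr_slots);
}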
224 
225 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
226 {
227 	struct rpc_rqst *req = NULL, *tmp = NULL;
228 
229 	dprintk("RPC:        destroy backchannel transport\n");
230 
231 	if (max_reqs == 0)
232 		goto out;
233 
234 	spin_lock_bh(&xprt->bc_pa_lock);
235 	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
236 	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
237 		dprintk("RPC:        req=%p\n", req);
238 		list_del(&req->rq_bc_pa_list);
239 		xprt_free_allocation(req);
240 		xprt->bc_alloc_count--;
241 		atomic_dec(&xprt->bc_slot_count);
242 		if (--max_reqs == 0)
243 			break;
244 	}
245 	spin_unlock_bh(&xprt->bc_pa_lock);
246 
247 out:
248 	dprintk("RPC:        backchannel list empty= %s\n",
249 		list_empty(&xprt->bc_pa_list) ? "true" : "false");
250 }
251 
252 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
253 		struct rpc_rqst *new)
254 {
255 	struct rpc_rqst *req = NULL;
256 
257 	dprintk("RPC:       allocate a backchannel request\n");
258 	if (list_empty(&xprt->bc_pa_list)) {
259 		if (!new)
260 			goto not_found;
261 		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
262 			goto not_found;
263 		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
264 		xprt->bc_alloc_count++;
265 		atomic_inc(&xprt->bc_slot_count);
266 	}
267 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
268 				rq_bc_pa_list);
269 	req->rq_reply_bytes_recvd = 0;
270 	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
271 			sizeof(req->rq_private_buf));
272 	req->rq_xid = xid;
273 	req->rq_connect_cookie = xprt->connect_cookie;
274 	dprintk("RPC:       backchannel req=%p\n", req);
275 not_found:
276 	return req;
277 }
278 
279 /*
280  * Return the preallocated rpc_rqst structure and XDR buffers
281  * associated with this rpc_task.
282  */
283 void xprt_free_bc_request(struct rpc_rqst *req)
284 {
285 	struct rpc_xprt *xprt = req->rq_xprt;
286 
287 	xprt->ops->bc_free_rqst(req);
288 }
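/*
 * Illustrative sketch, not part of the original file: once the callback
 * server has finished processing a backchannel request, it hands the slot
 * back through the transport.  For the socket transports bc_free_rqst is
 * xprt_free_bc_rqst() below, which requeues or frees the slot and drops
 * the xprt reference taken at enqueue time.  The wrapper is hypothetical.
 */
static inline void example_bc_done(struct rpc_rqst *req)
{
	xprt_free_bc_request(req);
}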
289 
290 void xprt_free_bc_rqst(struct rpc_rqst *req)
291 {
292 	struct rpc_xprt *xprt = req->rq_xprt;
293 
294 	dprintk("RPC:       free backchannel req=%p\n", req);
295 
296 	req->rq_connect_cookie = xprt->connect_cookie - 1;
297 	smp_mb__before_atomic();
298 	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
299 	smp_mb__after_atomic();
300 
301 	/*
302 	 * Return it to the list of preallocations so that it
303 	 * may be reused by a new callback request.
304 	 */
305 	spin_lock_bh(&xprt->bc_pa_lock);
306 	if (xprt_need_to_requeue(xprt)) {
307 		xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
308 		xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
309 		req->rq_rcv_buf.len = PAGE_SIZE;
310 		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
311 		xprt->bc_alloc_count++;
312 		atomic_inc(&xprt->bc_slot_count);
313 		req = NULL;
314 	}
315 	spin_unlock_bh(&xprt->bc_pa_lock);
316 	if (req != NULL) {
317 		/*
318 		 * The last remaining session was destroyed while this
319 		 * entry was in use.  Free the entry and don't attempt
320 		 * to add back to the list because there is no need to
321 		 * have any more preallocated entries.
322 		 */
323 		dprintk("RPC:       Last session removed req=%p\n", req);
324 		xprt_free_allocation(req);
325 	}
326 	xprt_put(xprt);
327 }
328 
329 /*
330  * One or more rpc_rqst structures have been preallocated during the
331  * backchannel setup.  Buffer space for the send and private XDR buffers
332  * has been preallocated as well.  Use xprt_lookup_bc_request() to obtain
333  * one of these requests and xprt_free_bc_request() to return it.
334  *
335  * We know that we're called in soft interrupt context, so take the plain
336  * spin_lock; there is no need for the bottom-half variant.
337  *
338  * Return an available rpc_rqst, or NULL if none are available.
339  */
340 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
341 {
342 	struct rpc_rqst *req, *new = NULL;
343 
344 	do {
345 		spin_lock(&xprt->bc_pa_lock);
346 		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
347 			if (req->rq_connect_cookie != xprt->connect_cookie)
348 				continue;
349 			if (req->rq_xid == xid)
350 				goto found;
351 		}
352 		req = xprt_get_bc_request(xprt, xid, new);
353 found:
354 		spin_unlock(&xprt->bc_pa_lock);
355 		if (new) {
356 			if (req != new)
357 				xprt_free_allocation(new);
358 			break;
359 		} else if (req)
360 			break;
361 		new = xprt_alloc_bc_req(xprt);
362 	} while (new);
363 	return req;
364 }
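/*
 * Illustrative sketch, not part of the original file: a transport receive
 * path that detects a call-direction RPC on a client connection might hand
 * it to the backchannel like this.  The wrapper name is hypothetical and
 * the copy into rq_rcv_buf is elided.
 */
static inline void example_bc_receive(struct rpc_xprt *xprt, __be32 xid,
				      uint32_t copied)
{
	struct rpc_rqst *req;

	req = xprt_lookup_bc_request(xprt, xid);
	if (!req)
		return;	/* no free slot: the callback is dropped */

	/* ...copy the incoming call data into req->rq_rcv_buf here... */

	xprt_complete_bc_request(req, copied);
}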
365 
366 /*
367  * Add callback request to callback list.  Wake a thread
368  * on the first pool (usually the only pool) to handle it.
369  */
370 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
371 {
372 	struct rpc_xprt *xprt = req->rq_xprt;
373 
374 	spin_lock(&xprt->bc_pa_lock);
375 	list_del(&req->rq_bc_pa_list);
376 	xprt->bc_alloc_count--;
377 	spin_unlock(&xprt->bc_pa_lock);
378 
379 	req->rq_private_buf.len = copied;
380 	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
381 
382 	dprintk("RPC:       add callback request to list\n");
383 	xprt_enqueue_bc_request(req);
384 }
385 
386 void xprt_enqueue_bc_request(struct rpc_rqst *req)
387 {
388 	struct rpc_xprt *xprt = req->rq_xprt;
389 	struct svc_serv *bc_serv;
390 
391 	xprt_get(xprt);
392 	spin_lock(&xprt->bc_pa_lock);
393 	bc_serv = xprt->bc_serv;
394 	if (bc_serv) {
395 		lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
396 		svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
397 	}
398 	spin_unlock(&xprt->bc_pa_lock);
399 }
400 EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request);
401