// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc. All Rights Reserved.
(c) 2009 NetApp. All Rights Reserved.


******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS   64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
        return BC_MAX_SLOTS;
}

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
        return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
        struct xdr_buf *xbufp;

        dprintk("RPC: free allocations for req= %p\n", req);
        WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
        xbufp = &req->rq_rcv_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        xbufp = &req->rq_snd_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        kfree(req);
}

static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
{
        buf->head[0].iov_len = PAGE_SIZE;
        buf->tail[0].iov_len = 0;
        buf->pages = NULL;
        buf->page_len = 0;
        buf->flags = 0;
        buf->len = 0;
        buf->buflen = PAGE_SIZE;
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
        struct page *page;
        /* Preallocate one XDR receive buffer */
        page = alloc_page(gfp_flags);
        if (page == NULL)
                return -ENOMEM;
        xdr_buf_init(buf, page_address(page), PAGE_SIZE);
        return 0;
}

static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
        gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
        struct rpc_rqst *req;

        /* Pre-allocate one backchannel rpc_rqst */
        req = kzalloc(sizeof(*req), gfp_flags);
        if (req == NULL)
                return NULL;

        req->rq_xprt = xprt;
        INIT_LIST_HEAD(&req->rq_bc_list);

        /* Preallocate one XDR receive buffer */
        if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc receive xbuf\n");
                goto out_free;
        }
        req->rq_rcv_buf.len = PAGE_SIZE;

        /* Preallocate one XDR send buffer */
        if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc snd xbuf\n");
                goto out_free;
        }
        return req;
out_free:
        xprt_free_allocation(req);
        return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel. This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt. The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt. Any one of these resources may be used by an
 * incoming callback request. It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large. For example, a pNFS server
 * using multiple deviceids. The list can be unbounded, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests. Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs. The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        if (!xprt->ops->bc_setup)
                return 0;
        return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);

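/**
 * xprt_setup_bc - generic backchannel preallocation
 * @xprt: the transport to preallocate backchannel resources for
 * @min_reqs: number of rpc_rqst structures to preallocate
 *
 * Preallocates up to @min_reqs rpc_rqst structures (capped at
 * BC_MAX_SLOTS) together with their send and receive buffers, and
 * splices them onto @xprt->bc_pa_list. This is typically wired up as a
 * transport's ->bc_setup operation (e.g. by the TCP transport), so that
 * xprt_setup_backchannel() above ends up here.
 *
 * Returns 0 on success, or -ENOMEM if allocation fails.
 */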
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        struct rpc_rqst *req;
        struct list_head tmp_list;
        int i;

        dprintk("RPC: setup backchannel transport\n");

        if (min_reqs > BC_MAX_SLOTS)
                min_reqs = BC_MAX_SLOTS;

        /*
         * We use a temporary list to keep track of the preallocated
         * buffers. Once we're done building the list we splice it
         * into the backchannel preallocation list off of the rpc_xprt
         * struct. This helps minimize the amount of time the list
         * lock is held on the rpc_xprt struct. It also makes cleanup
         * easier in case of memory allocation errors.
         */
        INIT_LIST_HEAD(&tmp_list);
        for (i = 0; i < min_reqs; i++) {
                /* Pre-allocate one backchannel rpc_rqst */
                req = xprt_alloc_bc_req(xprt);
                if (req == NULL) {
                        printk(KERN_ERR "Failed to create bc rpc_rqst\n");
                        goto out_free;
                }

                /* Add the allocated buffer to the tmp list */
                dprintk("RPC: adding req= %p\n", req);
                list_add(&req->rq_bc_pa_list, &tmp_list);
        }

        /*
         * Add the temporary list to the backchannel preallocation list
         */
        spin_lock(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count += min_reqs;
        xprt->bc_alloc_max += min_reqs;
        atomic_add(min_reqs, &xprt->bc_slot_count);
        spin_unlock(&xprt->bc_pa_lock);

        dprintk("RPC: setup backchannel transport done\n");
        return 0;

out_free:
        /*
         * Memory allocation failed, free the temporary list
         */
        while (!list_empty(&tmp_list)) {
                req = list_first_entry(&tmp_list,
                                struct rpc_rqst,
                                rq_bc_pa_list);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
        }

        dprintk("RPC: setup backchannel transport failed\n");
        return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt: the transport holding the preallocated structures
 * @max_reqs: the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        if (xprt->ops->bc_destroy)
                xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);

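/**
 * xprt_destroy_bc - generic backchannel teardown
 * @xprt: the transport holding the preallocated structures
 * @max_reqs: the maximum number of preallocated structures to destroy
 *
 * Counterpart to xprt_setup_bc(): releases up to @max_reqs unused
 * rpc_rqst structures from @xprt->bc_pa_list and lowers bc_alloc_max
 * accordingly. This is typically wired up as a transport's ->bc_destroy
 * operation, so that xprt_destroy_backchannel() above ends up here.
 */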
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        struct rpc_rqst *req = NULL, *tmp = NULL;

        dprintk("RPC: destroy backchannel transport\n");

        if (max_reqs == 0)
                goto out;

        spin_lock_bh(&xprt->bc_pa_lock);
        xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
        list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                dprintk("RPC: req=%p\n", req);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
                xprt->bc_alloc_count--;
                atomic_dec(&xprt->bc_slot_count);
                if (--max_reqs == 0)
                        break;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);

out:
        dprintk("RPC: backchannel list empty= %s\n",
                list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
                struct rpc_rqst *new)
{
        struct rpc_rqst *req = NULL;

        dprintk("RPC: allocate a backchannel request\n");
        if (list_empty(&xprt->bc_pa_list)) {
                if (!new)
                        goto not_found;
                if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
                        goto not_found;
                list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
        }
        req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                                rq_bc_pa_list);
        req->rq_reply_bytes_recvd = 0;
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                        sizeof(req->rq_private_buf));
        req->rq_xid = xid;
        req->rq_connect_cookie = xprt->connect_cookie;
        dprintk("RPC: backchannel req=%p\n", req);
not_found:
        return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->ops->bc_free_rqst(req);
}

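/**
 * xprt_free_bc_rqst - generic ->bc_free_rqst implementation
 * @req: the backchannel rpc_rqst being returned
 *
 * Clears RPC_BC_PA_IN_USE and, if the backchannel still needs
 * preallocated slots, reinitializes the XDR buffers and puts @req back
 * on @req->rq_xprt->bc_pa_list for reuse; otherwise the allocation is
 * freed. Also drops the transport reference taken in
 * xprt_complete_bc_request().
 */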
void xprt_free_bc_rqst(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: free backchannel req=%p\n", req);

        req->rq_connect_cookie = xprt->connect_cookie - 1;
        smp_mb__before_atomic();
        clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
        smp_mb__after_atomic();

        /*
         * Return it to the list of preallocations so that it
         * may be reused by a new callback request.
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        if (xprt_need_to_requeue(xprt)) {
                xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
                xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
                req->rq_rcv_buf.len = PAGE_SIZE;
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
                req = NULL;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
        if (req != NULL) {
                /*
                 * The last remaining session was destroyed while this
                 * entry was in use. Free the entry and don't attempt
                 * to add back to the list because there is no need to
                 * have anymore preallocated entries.
                 */
                dprintk("RPC: Last session removed req=%p\n", req);
                xprt_free_allocation(req);
        }
        xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup. Buffer space for the send and private XDR buffers
 * has been preallocated as well. Use xprt_lookup_bc_request to find or
 * allocate one of these preallocated requests; use xprt_free_bc_request
 * to return it.
 *
 * We know that we're called in soft interrupt context, so take the plain
 * spin_lock; there is no need to take the bottom-half spin_lock.
 *
 * Returns an available rpc_rqst, or NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req, *new = NULL;

        do {
                spin_lock(&xprt->bc_pa_lock);
                list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
                        if (req->rq_connect_cookie != xprt->connect_cookie)
                                continue;
                        if (req->rq_xid == xid)
                                goto found;
                }
                req = xprt_get_bc_request(xprt, xid, new);
found:
                spin_unlock(&xprt->bc_pa_lock);
                if (new) {
                        if (req != new)
                                xprt_free_allocation(new);
                        break;
                } else if (req)
                        break;
                new = xprt_alloc_bc_req(xprt);
        } while (new);
        return req;
}

/*
 * Add the callback request to the callback list. The callback
 * service sleeps on sv_cb_waitq waiting for new requests; wake it
 * up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct svc_serv *bc_serv = xprt->bc_serv;

        spin_lock(&xprt->bc_pa_lock);
        list_del(&req->rq_bc_pa_list);
        xprt->bc_alloc_count--;
        spin_unlock(&xprt->bc_pa_lock);

        req->rq_private_buf.len = copied;
        set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

        dprintk("RPC: add callback request to list\n");
        xprt_get(xprt);
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
        wake_up(&bc_serv->sv_cb_waitq);
        spin_unlock(&bc_serv->sv_cb_lock);
}
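
/*
 * Note: the generic helpers in this file are reached through the
 * transport's backchannel methods. A connection-oriented transport
 * such as TCP typically wires them up in its rpc_xprt_ops roughly as
 * follows (illustrative only; the exact set of callbacks is
 * transport-specific):
 *
 *      .bc_setup       = xprt_setup_bc,
 *      .bc_free_rqst   = xprt_free_bc_rqst,
 *      .bc_destroy     = xprt_destroy_bc,
 *
 * Transports that manage their own backchannel resources (e.g. RDMA)
 * provide their own implementations of these operations instead.
 */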