// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc.  All Rights Reserved.
(c) 2009 NetApp.  All Rights Reserved.


******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS 64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
        return BC_MAX_SLOTS;
}

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
        return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
        struct xdr_buf *xbufp;

        dprintk("RPC: free allocations for req= %p\n", req);
        WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
        xbufp = &req->rq_rcv_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        xbufp = &req->rq_snd_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        kfree(req);
}

static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
{
        buf->head[0].iov_len = PAGE_SIZE;
        buf->tail[0].iov_len = 0;
        buf->pages = NULL;
        buf->page_len = 0;
        buf->flags = 0;
        buf->len = 0;
        buf->buflen = PAGE_SIZE;
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
        struct page *page;
        /* Preallocate one XDR receive buffer */
        page = alloc_page(gfp_flags);
        if (page == NULL)
                return -ENOMEM;
        xdr_buf_init(buf, page_address(page), PAGE_SIZE);
        return 0;
}

static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
        gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
        struct rpc_rqst *req;

        /* Pre-allocate one backchannel rpc_rqst */
        req = kzalloc(sizeof(*req), gfp_flags);
        if (req == NULL)
                return NULL;

        req->rq_xprt = xprt;

        /* Preallocate one XDR receive buffer */
        if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc receive xbuf\n");
                goto out_free;
        }
        req->rq_rcv_buf.len = PAGE_SIZE;

        /* Preallocate one XDR send buffer */
        if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc snd xbuf\n");
                goto out_free;
        }
        return req;
out_free:
        xprt_free_allocation(req);
        return NULL;
}
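
/*
 * Illustrative sketch, not part of the original file: what
 * xprt_alloc_xdr_buf() leaves behind.  Each backchannel xdr_buf is
 * backed by a single page wired into head[0], and this is exactly the
 * layout that xprt_bc_reinit_xdr_buf() restores when a request is
 * recycled.  A minimal check, assuming only the helpers above:
 *
 *      struct xdr_buf buf;
 *
 *      if (xprt_alloc_xdr_buf(&buf, GFP_KERNEL) == 0) {
 *              WARN_ON(buf.head[0].iov_len != PAGE_SIZE);
 *              WARN_ON(buf.buflen != PAGE_SIZE || buf.len != 0);
 *              free_page((unsigned long)buf.head[0].iov_base);
 *      }
 */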

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large.  For example, a pNFS server
 * using multiple deviceids.  The list can be unbounded, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests.  Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs.  The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        if (!xprt->ops->bc_setup)
                return 0;
        return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);

int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        struct rpc_rqst *req;
        struct list_head tmp_list;
        int i;

        dprintk("RPC: setup backchannel transport\n");

        if (min_reqs > BC_MAX_SLOTS)
                min_reqs = BC_MAX_SLOTS;

        /*
         * We use a temporary list to keep track of the preallocated
         * buffers.  Once we're done building the list we splice it
         * into the backchannel preallocation list off of the rpc_xprt
         * struct.  This helps minimize the amount of time the list
         * lock is held on the rpc_xprt struct.  It also makes cleanup
         * easier in case of memory allocation errors.
         */
        INIT_LIST_HEAD(&tmp_list);
        for (i = 0; i < min_reqs; i++) {
                /* Pre-allocate one backchannel rpc_rqst */
                req = xprt_alloc_bc_req(xprt);
                if (req == NULL) {
                        printk(KERN_ERR "Failed to create bc rpc_rqst\n");
                        goto out_free;
                }

                /* Add the allocated buffer to the tmp list */
                dprintk("RPC: adding req= %p\n", req);
                list_add(&req->rq_bc_pa_list, &tmp_list);
        }

        /*
         * Add the temporary list to the backchannel preallocation list
         */
        spin_lock(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count += min_reqs;
        xprt->bc_alloc_max += min_reqs;
        atomic_add(min_reqs, &xprt->bc_slot_count);
        spin_unlock(&xprt->bc_pa_lock);

        dprintk("RPC: setup backchannel transport done\n");
        return 0;

out_free:
        /*
         * Memory allocation failed, free the temporary list
         */
        while (!list_empty(&tmp_list)) {
                req = list_first_entry(&tmp_list,
                                       struct rpc_rqst,
                                       rq_bc_pa_list);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
        }

        dprintk("RPC: setup backchannel transport failed\n");
        return -ENOMEM;
}
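
/*
 * Illustrative usage sketch, not part of the original file: how a
 * hypothetical NFSv4.1-style caller might pair the setup and teardown
 * entry points around the lifetime of a backchannel session.  The
 * NR_BC_SLOTS constant and the example_* function names are made up
 * for this sketch only.
 *
 *      #define NR_BC_SLOTS 1
 *
 *      static int example_bc_start(struct rpc_xprt *xprt)
 *      {
 *              return xprt_setup_backchannel(xprt, NR_BC_SLOTS);
 *      }
 *
 *      static void example_bc_stop(struct rpc_xprt *xprt)
 *      {
 *              xprt_destroy_backchannel(xprt, NR_BC_SLOTS);
 *      }
 */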

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt: the transport holding the preallocated structures
 * @max_reqs: the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        if (xprt->ops->bc_destroy)
                xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);

void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        struct rpc_rqst *req = NULL, *tmp = NULL;

        dprintk("RPC: destroy backchannel transport\n");

        if (max_reqs == 0)
                goto out;

        spin_lock_bh(&xprt->bc_pa_lock);
        xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
        list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                dprintk("RPC: req=%p\n", req);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
                xprt->bc_alloc_count--;
                atomic_dec(&xprt->bc_slot_count);
                if (--max_reqs == 0)
                        break;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);

out:
        dprintk("RPC: backchannel list empty= %s\n",
                list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
                struct rpc_rqst *new)
{
        struct rpc_rqst *req = NULL;

        dprintk("RPC: allocate a backchannel request\n");
        if (list_empty(&xprt->bc_pa_list)) {
                if (!new)
                        goto not_found;
                if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
                        goto not_found;
                list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
        }
        req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                               rq_bc_pa_list);
        req->rq_reply_bytes_recvd = 0;
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
               sizeof(req->rq_private_buf));
        req->rq_xid = xid;
        req->rq_connect_cookie = xprt->connect_cookie;
        dprintk("RPC: backchannel req=%p\n", req);
not_found:
        return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->ops->bc_free_rqst(req);
}

void xprt_free_bc_rqst(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: free backchannel req=%p\n", req);

        req->rq_connect_cookie = xprt->connect_cookie - 1;
        smp_mb__before_atomic();
        clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
        smp_mb__after_atomic();

        /*
         * Return it to the list of preallocations so that it
         * may be reused by a new callback request.
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        if (xprt_need_to_requeue(xprt)) {
                xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
                xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
                req->rq_rcv_buf.len = PAGE_SIZE;
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
                req = NULL;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
        if (req != NULL) {
                /*
                 * The last remaining session was destroyed while this
                 * entry was in use.  Free the entry and don't attempt
                 * to add it back to the list because there is no need
                 * to have any more preallocated entries.
                 */
                dprintk("RPC: Last session removed req=%p\n", req);
                xprt_free_allocation(req);
        }
        xprt_put(xprt);
}
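
/*
 * Illustrative sketch, not part of the original file: how a socket-style
 * transport is expected to plug the generic helpers in this file into
 * its rpc_xprt_ops, so that xprt_free_bc_request() above dispatches back
 * to xprt_free_bc_rqst().  The structure name is a placeholder, and a
 * real transport would fill in its remaining callbacks as well.
 *
 *      static const struct rpc_xprt_ops example_bc_ops = {
 *              .bc_setup       = xprt_setup_bc,
 *              .bc_free_rqst   = xprt_free_bc_rqst,
 *              .bc_destroy     = xprt_destroy_bc,
 *      };
 */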

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_lookup_bc_request to obtain
 * one of these preallocated requests; use xprt_free_bc_request to
 * return it.
 *
 * We know that we're called in soft interrupt context, so take the
 * plain spin_lock; there is no need to take the bottom-half variant.
 *
 * Return an available rpc_rqst, or NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req, *new = NULL;

        do {
                spin_lock(&xprt->bc_pa_lock);
                list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
                        if (req->rq_connect_cookie != xprt->connect_cookie)
                                continue;
                        if (req->rq_xid == xid)
                                goto found;
                }
                req = xprt_get_bc_request(xprt, xid, new);
found:
                spin_unlock(&xprt->bc_pa_lock);
                if (new) {
                        if (req != new)
                                xprt_free_allocation(new);
                        break;
                } else if (req)
                        break;
                new = xprt_alloc_bc_req(xprt);
        } while (new);
        return req;
}

/*
 * Add callback request to callback list.  Wake a thread
 * on the first pool (usually the only pool) to handle it.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct svc_serv *bc_serv = xprt->bc_serv;

        spin_lock(&xprt->bc_pa_lock);
        list_del(&req->rq_bc_pa_list);
        xprt->bc_alloc_count--;
        spin_unlock(&xprt->bc_pa_lock);

        req->rq_private_buf.len = copied;
        set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

        dprintk("RPC: add callback request to list\n");
        xprt_get(xprt);
        lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
        svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
}
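
/*
 * Illustrative receive-path sketch, not part of the original file: a
 * hypothetical transport, after decoding an incoming callback, would
 * look up a preallocated slot by XID and hand it to the callback
 * service.  Here "copied" stands for the number of bytes the transport
 * placed in req->rq_rcv_buf, and returning -ESHUTDOWN when no slot is
 * available is only an example policy.
 *
 *      struct rpc_rqst *req;
 *
 *      req = xprt_lookup_bc_request(xprt, xid);
 *      if (!req)
 *              return -ESHUTDOWN;
 *      xprt_complete_bc_request(req, copied);
 */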