1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 4 (c) 2007 Network Appliance, Inc. All Rights Reserved. 5 (c) 2009 NetApp. All Rights Reserved. 6 7 8 ******************************************************************************/ 9 10 #include <linux/tcp.h> 11 #include <linux/slab.h> 12 #include <linux/sunrpc/xprt.h> 13 #include <linux/export.h> 14 #include <linux/sunrpc/bc_xprt.h> 15 16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 17 #define RPCDBG_FACILITY RPCDBG_TRANS 18 #endif 19 20 #define BC_MAX_SLOTS 64U 21 22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt) 23 { 24 return BC_MAX_SLOTS; 25 } 26 27 /* 28 * Helper function to nullify backchannel server pointer in transport. 29 * We need to synchronize setting the pointer to NULL (done so after 30 * the backchannel server is shutdown) with the usage of that pointer 31 * by the backchannel request processing routines 32 * xprt_complete_bc_request() and rpcrdma_bc_receive_call(). 33 */ 34 void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv) 35 { 36 spin_lock(&xprt->bc_pa_lock); 37 svc_destroy(serv); 38 xprt->bc_serv = NULL; 39 spin_unlock(&xprt->bc_pa_lock); 40 } 41 EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc); 42 43 /* 44 * Helper routines that track the number of preallocation elements 45 * on the transport. 46 */ 47 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) 48 { 49 return xprt->bc_alloc_count < xprt->bc_alloc_max; 50 } 51 52 /* 53 * Free the preallocated rpc_rqst structure and the memory 54 * buffers hanging off of it. 55 */ 56 static void xprt_free_allocation(struct rpc_rqst *req) 57 { 58 struct xdr_buf *xbufp; 59 60 dprintk("RPC: free allocations for req= %p\n", req); 61 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); 62 xbufp = &req->rq_rcv_buf; 63 free_page((unsigned long)xbufp->head[0].iov_base); 64 xbufp = &req->rq_snd_buf; 65 free_page((unsigned long)xbufp->head[0].iov_base); 66 kfree(req); 67 } 68 69 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf) 70 { 71 buf->head[0].iov_len = PAGE_SIZE; 72 buf->tail[0].iov_len = 0; 73 buf->pages = NULL; 74 buf->page_len = 0; 75 buf->flags = 0; 76 buf->len = 0; 77 buf->buflen = PAGE_SIZE; 78 } 79 80 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) 81 { 82 struct page *page; 83 /* Preallocate one XDR receive buffer */ 84 page = alloc_page(gfp_flags); 85 if (page == NULL) 86 return -ENOMEM; 87 xdr_buf_init(buf, page_address(page), PAGE_SIZE); 88 return 0; 89 } 90 91 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt) 92 { 93 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; 94 struct rpc_rqst *req; 95 96 /* Pre-allocate one backchannel rpc_rqst */ 97 req = kzalloc_obj(*req, gfp_flags); 98 if (req == NULL) 99 return NULL; 100 101 req->rq_xprt = xprt; 102 103 /* Preallocate one XDR receive buffer */ 104 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) { 105 printk(KERN_ERR "Failed to create bc receive xbuf\n"); 106 goto out_free; 107 } 108 req->rq_rcv_buf.len = PAGE_SIZE; 109 110 /* Preallocate one XDR send buffer */ 111 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) { 112 printk(KERN_ERR "Failed to create bc snd xbuf\n"); 113 goto out_free; 114 } 115 return req; 116 out_free: 117 xprt_free_allocation(req); 118 return NULL; 119 } 120 121 /* 122 * Preallocate up to min_reqs structures and related buffers for use 123 * by the backchannel. This function can be called multiple times 124 * when creating new sessions that use the same rpc_xprt. The 125 * preallocated buffers are added to the pool of resources used by 126 * the rpc_xprt. Any one of these resources may be used by an 127 * incoming callback request. It's up to the higher levels in the 128 * stack to enforce that the maximum number of session slots is not 129 * being exceeded. 130 * 131 * Some callback arguments can be large. For example, a pNFS server 132 * using multiple deviceids. The list can be unbound, but the client 133 * has the ability to tell the server the maximum size of the callback 134 * requests. Each deviceID is 16 bytes, so allocate one page 135 * for the arguments to have enough room to receive a number of these 136 * deviceIDs. The NFS client indicates to the pNFS server that its 137 * callback requests can be up to 4096 bytes in size. 138 */ 139 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 140 { 141 if (!xprt->ops->bc_setup) 142 return 0; 143 return xprt->ops->bc_setup(xprt, min_reqs); 144 } 145 EXPORT_SYMBOL_GPL(xprt_setup_backchannel); 146 147 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) 148 { 149 struct rpc_rqst *req; 150 LIST_HEAD(tmp_list); 151 int i; 152 153 dprintk("RPC: setup backchannel transport\n"); 154 155 if (min_reqs > BC_MAX_SLOTS) 156 min_reqs = BC_MAX_SLOTS; 157 158 /* 159 * We use a temporary list to keep track of the preallocated 160 * buffers. Once we're done building the list we splice it 161 * into the backchannel preallocation list off of the rpc_xprt 162 * struct. This helps minimize the amount of time the list 163 * lock is held on the rpc_xprt struct. It also makes cleanup 164 * easier in case of memory allocation errors. 165 */ 166 for (i = 0; i < min_reqs; i++) { 167 /* Pre-allocate one backchannel rpc_rqst */ 168 req = xprt_alloc_bc_req(xprt); 169 if (req == NULL) { 170 printk(KERN_ERR "Failed to create bc rpc_rqst\n"); 171 goto out_free; 172 } 173 174 /* Add the allocated buffer to the tmp list */ 175 dprintk("RPC: adding req= %p\n", req); 176 list_add(&req->rq_bc_pa_list, &tmp_list); 177 } 178 179 /* 180 * Add the temporary list to the backchannel preallocation list 181 */ 182 spin_lock(&xprt->bc_pa_lock); 183 list_splice(&tmp_list, &xprt->bc_pa_list); 184 xprt->bc_alloc_count += min_reqs; 185 xprt->bc_alloc_max += min_reqs; 186 atomic_add(min_reqs, &xprt->bc_slot_count); 187 spin_unlock(&xprt->bc_pa_lock); 188 189 dprintk("RPC: setup backchannel transport done\n"); 190 return 0; 191 192 out_free: 193 /* 194 * Memory allocation failed, free the temporary list 195 */ 196 while (!list_empty(&tmp_list)) { 197 req = list_first_entry(&tmp_list, 198 struct rpc_rqst, 199 rq_bc_pa_list); 200 list_del(&req->rq_bc_pa_list); 201 xprt_free_allocation(req); 202 } 203 204 dprintk("RPC: setup backchannel transport failed\n"); 205 return -ENOMEM; 206 } 207 208 /** 209 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 210 * @xprt: the transport holding the preallocated strucures 211 * @max_reqs: the maximum number of preallocated structures to destroy 212 * 213 * Since these structures may have been allocated by multiple calls 214 * to xprt_setup_backchannel, we only destroy up to the maximum number 215 * of reqs specified by the caller. 216 */ 217 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 218 { 219 if (xprt->ops->bc_destroy) 220 xprt->ops->bc_destroy(xprt, max_reqs); 221 } 222 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); 223 224 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) 225 { 226 struct rpc_rqst *req = NULL, *tmp = NULL; 227 228 dprintk("RPC: destroy backchannel transport\n"); 229 230 if (max_reqs == 0) 231 goto out; 232 233 spin_lock_bh(&xprt->bc_pa_lock); 234 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max); 235 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 236 dprintk("RPC: req=%p\n", req); 237 list_del(&req->rq_bc_pa_list); 238 xprt_free_allocation(req); 239 xprt->bc_alloc_count--; 240 atomic_dec(&xprt->bc_slot_count); 241 if (--max_reqs == 0) 242 break; 243 } 244 spin_unlock_bh(&xprt->bc_pa_lock); 245 246 out: 247 dprintk("RPC: backchannel list empty= %s\n", 248 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 249 } 250 251 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, 252 struct rpc_rqst *new) 253 { 254 struct rpc_rqst *req = NULL; 255 256 dprintk("RPC: allocate a backchannel request\n"); 257 if (list_empty(&xprt->bc_pa_list)) { 258 if (!new) 259 goto not_found; 260 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS) 261 goto not_found; 262 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); 263 xprt->bc_alloc_count++; 264 atomic_inc(&xprt->bc_slot_count); 265 } 266 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, 267 rq_bc_pa_list); 268 req->rq_reply_bytes_recvd = 0; 269 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 270 sizeof(req->rq_private_buf)); 271 req->rq_xid = xid; 272 req->rq_connect_cookie = xprt->connect_cookie; 273 dprintk("RPC: backchannel req=%p\n", req); 274 not_found: 275 return req; 276 } 277 278 /* 279 * Return the preallocated rpc_rqst structure and XDR buffers 280 * associated with this rpc_task. 281 */ 282 void xprt_free_bc_request(struct rpc_rqst *req) 283 { 284 struct rpc_xprt *xprt = req->rq_xprt; 285 286 xprt->ops->bc_free_rqst(req); 287 } 288 289 void xprt_free_bc_rqst(struct rpc_rqst *req) 290 { 291 struct rpc_xprt *xprt = req->rq_xprt; 292 293 dprintk("RPC: free backchannel req=%p\n", req); 294 295 req->rq_connect_cookie = xprt->connect_cookie - 1; 296 smp_mb__before_atomic(); 297 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 298 smp_mb__after_atomic(); 299 300 /* 301 * Return it to the list of preallocations so that it 302 * may be reused by a new callback request. 303 */ 304 spin_lock_bh(&xprt->bc_pa_lock); 305 if (xprt_need_to_requeue(xprt)) { 306 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf); 307 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf); 308 req->rq_rcv_buf.len = PAGE_SIZE; 309 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); 310 xprt->bc_alloc_count++; 311 atomic_inc(&xprt->bc_slot_count); 312 req = NULL; 313 } 314 spin_unlock_bh(&xprt->bc_pa_lock); 315 if (req != NULL) { 316 /* 317 * The last remaining session was destroyed while this 318 * entry was in use. Free the entry and don't attempt 319 * to add back to the list because there is no need to 320 * have anymore preallocated entries. 321 */ 322 dprintk("RPC: Last session removed req=%p\n", req); 323 xprt_free_allocation(req); 324 } 325 xprt_put(xprt); 326 } 327 328 /* 329 * One or more rpc_rqst structure have been preallocated during the 330 * backchannel setup. Buffer space for the send and private XDR buffers 331 * has been preallocated as well. Use xprt_alloc_bc_request to allocate 332 * to this request. Use xprt_free_bc_request to return it. 333 * 334 * We know that we're called in soft interrupt context, grab the spin_lock 335 * since there is no need to grab the bottom half spin_lock. 336 * 337 * Return an available rpc_rqst, otherwise NULL if non are available. 338 */ 339 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid) 340 { 341 struct rpc_rqst *req, *new = NULL; 342 343 do { 344 spin_lock(&xprt->bc_pa_lock); 345 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) { 346 if (req->rq_connect_cookie != xprt->connect_cookie) 347 continue; 348 if (req->rq_xid == xid) 349 goto found; 350 } 351 req = xprt_get_bc_request(xprt, xid, new); 352 found: 353 spin_unlock(&xprt->bc_pa_lock); 354 if (new) { 355 if (req != new) 356 xprt_free_allocation(new); 357 break; 358 } else if (req) 359 break; 360 new = xprt_alloc_bc_req(xprt); 361 } while (new); 362 return req; 363 } 364 365 /* 366 * Add callback request to callback list. Wake a thread 367 * on the first pool (usually the only pool) to handle it. 368 */ 369 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) 370 { 371 struct rpc_xprt *xprt = req->rq_xprt; 372 373 spin_lock(&xprt->bc_pa_lock); 374 list_del(&req->rq_bc_pa_list); 375 xprt->bc_alloc_count--; 376 spin_unlock(&xprt->bc_pa_lock); 377 378 req->rq_private_buf.len = copied; 379 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 380 381 dprintk("RPC: add callback request to list\n"); 382 xprt_enqueue_bc_request(req); 383 } 384 385 void xprt_enqueue_bc_request(struct rpc_rqst *req) 386 { 387 struct rpc_xprt *xprt = req->rq_xprt; 388 struct svc_serv *bc_serv; 389 390 xprt_get(xprt); 391 spin_lock(&xprt->bc_pa_lock); 392 bc_serv = xprt->bc_serv; 393 if (bc_serv) { 394 lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list); 395 svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]); 396 } 397 spin_unlock(&xprt->bc_pa_lock); 398 } 399 EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request); 400