1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 4 (c) 2007 Network Appliance, Inc. All Rights Reserved. 5 (c) 2009 NetApp. All Rights Reserved. 6 7 8 ******************************************************************************/ 9 10 #include <linux/tcp.h> 11 #include <linux/slab.h> 12 #include <linux/sunrpc/xprt.h> 13 #include <linux/export.h> 14 #include <linux/sunrpc/bc_xprt.h> 15 16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 17 #define RPCDBG_FACILITY RPCDBG_TRANS 18 #endif 19 20 #define BC_MAX_SLOTS 64U 21 22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt) 23 { 24 return BC_MAX_SLOTS; 25 } 26 27 /* 28 * Helper function to nullify backchannel server pointer in transport. 29 * We need to synchronize setting the pointer to NULL (done so after 30 * the backchannel server is shutdown) with the usage of that pointer 31 * by the backchannel request processing routines 32 * xprt_complete_bc_request() and rpcrdma_bc_receive_call(). 33 */ 34 void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv) 35 { 36 spin_lock(&xprt->bc_pa_lock); 37 svc_destroy(serv); 38 xprt->bc_serv = NULL; 39 spin_unlock(&xprt->bc_pa_lock); 40 } 41 EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc); 42 43 /* 44 * Helper routines that track the number of preallocation elements 45 * on the transport. 46 */ 47 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) 48 { 49 return xprt->bc_alloc_count < xprt->bc_alloc_max; 50 } 51 52 /* 53 * Free the preallocated rpc_rqst structure and the memory 54 * buffers hanging off of it. 55 */ 56 static void xprt_free_allocation(struct rpc_rqst *req) 57 { 58 struct xdr_buf *xbufp; 59 60 dprintk("RPC: free allocations for req= %p\n", req); 61 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); 62 xbufp = &req->rq_rcv_buf; 63 free_page((unsigned long)xbufp->head[0].iov_base); 64 xbufp = &req->rq_snd_buf; 65 free_page((unsigned long)xbufp->head[0].iov_base); 66 kfree(req); 67 } 68 69 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf) 70 { 71 buf->head[0].iov_len = PAGE_SIZE; 72 buf->tail[0].iov_len = 0; 73 buf->pages = NULL; 74 buf->page_len = 0; 75 buf->flags = 0; 76 buf->len = 0; 77 buf->buflen = PAGE_SIZE; 78 } 79 80 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) 81 { 82 struct page *page; 83 /* Preallocate one XDR receive buffer */ 84 page = alloc_page(gfp_flags); 85 if (page == NULL) 86 return -ENOMEM; 87 xdr_buf_init(buf, page_address(page), PAGE_SIZE); 88 return 0; 89 } 90 91 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt) 92 { 93 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; 94 struct rpc_rqst *req; 95 96 /* Pre-allocate one backchannel rpc_rqst */ 97 req = kzalloc(sizeof(*req), gfp_flags); 98 if (req == NULL) 99 return NULL; 100 101 req->rq_xprt = xprt; 102 103 /* Preallocate one XDR receive buffer */ 104 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) { 105 printk(KERN_ERR "Failed to create bc receive xbuf\n"); 106 goto out_free; 107 } 108 req->rq_rcv_buf.len = PAGE_SIZE; 109 110 /* Preallocate one XDR send buffer */ 111 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) { 112 printk(KERN_ERR "Failed to create bc snd xbuf\n"); 113 goto out_free; 114 } 115 return req; 116 out_free: 117 xprt_free_allocation(req); 118 return NULL; 119 } 120 121 /* 122 * Preallocate up to min_reqs structures and related buffers for use 123 * by the backchannel. This function can be called multiple times 124 * when creating new sessions that use the same rpc_xprt. The 125 * preallocated buffers are added to the pool of resources used by 126 * the rpc_xprt. Any one of these resources may be used by an 127 * incoming callback request. It's up to the higher levels in the 128 * stack to enforce that the maximum number of session slots is not 129 * being exceeded. 130 * 131 * Some callback arguments can be large. For example, a pNFS server 132 * using multiple deviceids. The list can be unbound, but the client 133 * has the ability to tell the server the maximum size of the callback 134 * requests. Each deviceID is 16 bytes, so allocate one page 135 * for the arguments to have enough room to receive a number of these 136 * deviceIDs. The NFS client indicates to the pNFS server that its 137 * callback requests can be up to 4096 bytes in size. 138 */ 139 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 140 { 141 if (!xprt->ops->bc_setup) 142 return 0; 143 return xprt->ops->bc_setup(xprt, min_reqs); 144 } 145 EXPORT_SYMBOL_GPL(xprt_setup_backchannel); 146 147 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) 148 { 149 struct rpc_rqst *req; 150 struct list_head tmp_list; 151 int i; 152 153 dprintk("RPC: setup backchannel transport\n"); 154 155 if (min_reqs > BC_MAX_SLOTS) 156 min_reqs = BC_MAX_SLOTS; 157 158 /* 159 * We use a temporary list to keep track of the preallocated 160 * buffers. Once we're done building the list we splice it 161 * into the backchannel preallocation list off of the rpc_xprt 162 * struct. This helps minimize the amount of time the list 163 * lock is held on the rpc_xprt struct. It also makes cleanup 164 * easier in case of memory allocation errors. 165 */ 166 INIT_LIST_HEAD(&tmp_list); 167 for (i = 0; i < min_reqs; i++) { 168 /* Pre-allocate one backchannel rpc_rqst */ 169 req = xprt_alloc_bc_req(xprt); 170 if (req == NULL) { 171 printk(KERN_ERR "Failed to create bc rpc_rqst\n"); 172 goto out_free; 173 } 174 175 /* Add the allocated buffer to the tmp list */ 176 dprintk("RPC: adding req= %p\n", req); 177 list_add(&req->rq_bc_pa_list, &tmp_list); 178 } 179 180 /* 181 * Add the temporary list to the backchannel preallocation list 182 */ 183 spin_lock(&xprt->bc_pa_lock); 184 list_splice(&tmp_list, &xprt->bc_pa_list); 185 xprt->bc_alloc_count += min_reqs; 186 xprt->bc_alloc_max += min_reqs; 187 atomic_add(min_reqs, &xprt->bc_slot_count); 188 spin_unlock(&xprt->bc_pa_lock); 189 190 dprintk("RPC: setup backchannel transport done\n"); 191 return 0; 192 193 out_free: 194 /* 195 * Memory allocation failed, free the temporary list 196 */ 197 while (!list_empty(&tmp_list)) { 198 req = list_first_entry(&tmp_list, 199 struct rpc_rqst, 200 rq_bc_pa_list); 201 list_del(&req->rq_bc_pa_list); 202 xprt_free_allocation(req); 203 } 204 205 dprintk("RPC: setup backchannel transport failed\n"); 206 return -ENOMEM; 207 } 208 209 /** 210 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 211 * @xprt: the transport holding the preallocated strucures 212 * @max_reqs: the maximum number of preallocated structures to destroy 213 * 214 * Since these structures may have been allocated by multiple calls 215 * to xprt_setup_backchannel, we only destroy up to the maximum number 216 * of reqs specified by the caller. 217 */ 218 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 219 { 220 if (xprt->ops->bc_destroy) 221 xprt->ops->bc_destroy(xprt, max_reqs); 222 } 223 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); 224 225 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) 226 { 227 struct rpc_rqst *req = NULL, *tmp = NULL; 228 229 dprintk("RPC: destroy backchannel transport\n"); 230 231 if (max_reqs == 0) 232 goto out; 233 234 spin_lock_bh(&xprt->bc_pa_lock); 235 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max); 236 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 237 dprintk("RPC: req=%p\n", req); 238 list_del(&req->rq_bc_pa_list); 239 xprt_free_allocation(req); 240 xprt->bc_alloc_count--; 241 atomic_dec(&xprt->bc_slot_count); 242 if (--max_reqs == 0) 243 break; 244 } 245 spin_unlock_bh(&xprt->bc_pa_lock); 246 247 out: 248 dprintk("RPC: backchannel list empty= %s\n", 249 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 250 } 251 252 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, 253 struct rpc_rqst *new) 254 { 255 struct rpc_rqst *req = NULL; 256 257 dprintk("RPC: allocate a backchannel request\n"); 258 if (list_empty(&xprt->bc_pa_list)) { 259 if (!new) 260 goto not_found; 261 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS) 262 goto not_found; 263 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); 264 xprt->bc_alloc_count++; 265 atomic_inc(&xprt->bc_slot_count); 266 } 267 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, 268 rq_bc_pa_list); 269 req->rq_reply_bytes_recvd = 0; 270 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 271 sizeof(req->rq_private_buf)); 272 req->rq_xid = xid; 273 req->rq_connect_cookie = xprt->connect_cookie; 274 dprintk("RPC: backchannel req=%p\n", req); 275 not_found: 276 return req; 277 } 278 279 /* 280 * Return the preallocated rpc_rqst structure and XDR buffers 281 * associated with this rpc_task. 282 */ 283 void xprt_free_bc_request(struct rpc_rqst *req) 284 { 285 struct rpc_xprt *xprt = req->rq_xprt; 286 287 xprt->ops->bc_free_rqst(req); 288 } 289 290 void xprt_free_bc_rqst(struct rpc_rqst *req) 291 { 292 struct rpc_xprt *xprt = req->rq_xprt; 293 294 dprintk("RPC: free backchannel req=%p\n", req); 295 296 req->rq_connect_cookie = xprt->connect_cookie - 1; 297 smp_mb__before_atomic(); 298 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 299 smp_mb__after_atomic(); 300 301 /* 302 * Return it to the list of preallocations so that it 303 * may be reused by a new callback request. 304 */ 305 spin_lock_bh(&xprt->bc_pa_lock); 306 if (xprt_need_to_requeue(xprt)) { 307 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf); 308 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf); 309 req->rq_rcv_buf.len = PAGE_SIZE; 310 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); 311 xprt->bc_alloc_count++; 312 atomic_inc(&xprt->bc_slot_count); 313 req = NULL; 314 } 315 spin_unlock_bh(&xprt->bc_pa_lock); 316 if (req != NULL) { 317 /* 318 * The last remaining session was destroyed while this 319 * entry was in use. Free the entry and don't attempt 320 * to add back to the list because there is no need to 321 * have anymore preallocated entries. 322 */ 323 dprintk("RPC: Last session removed req=%p\n", req); 324 xprt_free_allocation(req); 325 } 326 xprt_put(xprt); 327 } 328 329 /* 330 * One or more rpc_rqst structure have been preallocated during the 331 * backchannel setup. Buffer space for the send and private XDR buffers 332 * has been preallocated as well. Use xprt_alloc_bc_request to allocate 333 * to this request. Use xprt_free_bc_request to return it. 334 * 335 * We know that we're called in soft interrupt context, grab the spin_lock 336 * since there is no need to grab the bottom half spin_lock. 337 * 338 * Return an available rpc_rqst, otherwise NULL if non are available. 339 */ 340 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid) 341 { 342 struct rpc_rqst *req, *new = NULL; 343 344 do { 345 spin_lock(&xprt->bc_pa_lock); 346 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) { 347 if (req->rq_connect_cookie != xprt->connect_cookie) 348 continue; 349 if (req->rq_xid == xid) 350 goto found; 351 } 352 req = xprt_get_bc_request(xprt, xid, new); 353 found: 354 spin_unlock(&xprt->bc_pa_lock); 355 if (new) { 356 if (req != new) 357 xprt_free_allocation(new); 358 break; 359 } else if (req) 360 break; 361 new = xprt_alloc_bc_req(xprt); 362 } while (new); 363 return req; 364 } 365 366 /* 367 * Add callback request to callback list. Wake a thread 368 * on the first pool (usually the only pool) to handle it. 369 */ 370 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) 371 { 372 struct rpc_xprt *xprt = req->rq_xprt; 373 374 spin_lock(&xprt->bc_pa_lock); 375 list_del(&req->rq_bc_pa_list); 376 xprt->bc_alloc_count--; 377 spin_unlock(&xprt->bc_pa_lock); 378 379 req->rq_private_buf.len = copied; 380 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 381 382 dprintk("RPC: add callback request to list\n"); 383 xprt_enqueue_bc_request(req); 384 } 385 386 void xprt_enqueue_bc_request(struct rpc_rqst *req) 387 { 388 struct rpc_xprt *xprt = req->rq_xprt; 389 struct svc_serv *bc_serv; 390 391 xprt_get(xprt); 392 spin_lock(&xprt->bc_pa_lock); 393 bc_serv = xprt->bc_serv; 394 if (bc_serv) { 395 lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list); 396 svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]); 397 } 398 spin_unlock(&xprt->bc_pa_lock); 399 } 400 EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request); 401