/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Undo rpcrdma_bc_setup_rqst(): unlink the backing rpcrdma_req from
 * the transport's rb_reqslock-protected list, release the req (and
 * its attached regbufs), then free the rpc_rqst itself.
 */
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	/* NOTE(review): rpcr_to_rdmar() derives the req from
	 * rqst->rq_buffer -- presumably valid only after
	 * rpcrdma_bc_setup_rqst() has set rq_buffer; confirm for the
	 * early-failure path below.
	 */
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(&r_xprt->rx_ia, req);

	kfree(rqst);
}

/* Attach send-side resources to one pre-allocated backchannel
 * rpc_rqst: an rpcrdma_req, a registered buffer for the RDMA header,
 * and a registered send buffer. Also initializes rq_snd_buf so the
 * rqst is ready to carry a backward direction call.
 *
 * Returns 0 on success, or -ENOMEM on allocation failure.
 */
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	struct xdr_buf *buf;
	size_t size;

	req = rpcrdma_create_req(r_xprt);
	if (!req)
		return -ENOMEM;
	req->rl_backchannel = true;

	/* RDMA header buffer, sized to the inline write threshold */
	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

	/* Send buffer: write + read thresholds combined */
	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;
	req->rl_sendbuf = rb;
	/* so that rpcr_to_rdmar works when receiving a request */
	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

	/* Hand-initialize rq_snd_buf: empty, backed by rq_buffer */
	buf = &rqst->rq_snd_buf;
	buf->head[0].iov_base = rqst->rq_buffer;
	buf->head[0].iov_len = 0;
	buf->tail[0].iov_base = NULL;
	buf->tail[0].iov_len = 0;
	buf->page_len = 0;
	buf->len = 0;
	buf->buflen = size;

	return 0;

out_fail:
	/* NOTE(review): if the first regbuf allocation fails,
	 * rqst->rq_buffer is still NULL here, yet rpcrdma_bc_free_rqst()
	 * uses rpcr_to_rdmar(rqst) -- verify this error path.
	 */
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}
/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of rep's. These are released when the
 * transport is destroyed.
 *
 * Returns 0 on success; on allocation failure, the negative errno
 * from rpcrdma_create_rep(). Reps already added stay on the list.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	unsigned long flags;
	int rc = 0;

	while (count--) {
		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			pr_err("RPC:       %s: reply buffer alloc failed\n",
			       __func__);
			rc = PTR_ERR(rep);
			break;
		}

		/* rb_lock is taken irqsave elsewhere; match that here */
		spin_lock_irqsave(&buffers->rb_lock, flags);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
	}

	return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst) {
			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
			       __func__);
			goto out_free;
		}

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);

		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		/* bc_pa_lock serializes against the reply path (bh) */
		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	/* Only @reqs reps/receives are needed: replies and new calls
	 * share the same receive resources.
	 */
	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
	/* Pull in the server-side RPC/RDMA code that processes
	 * backward direction calls.
	 */
	request_module("svcrdma");

	return 0;

out_free:
	/* Releases every rqst added to bc_pa_list so far */
	xprt_rdma_bc_destroy(xprt, reqs);

	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	/* The lock is dropped around each free because
	 * rpcrdma_bc_free_rqst() can sleep; list_for_each_entry_safe
	 * keeps the walk valid across the unlock/relock.
	 */
	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 *
 * Clears RPC_BC_PA_IN_USE and returns @rqst to the transport's
 * bc_pa_list for reuse by the next incoming backward direction call.
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

	/* Barriers order prior stores against the bit clear, and the
	 * clear against the list insertion below.
	 */
	smp_mb__before_atomic();
	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	smp_mb__after_atomic();

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}