/*
 * Copyright (c) 2003-2007 Network Appliance, Inc.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
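	/* CM events either complete address/route resolution, tear the
	 * transport down when the underlying device is removed, or drive
	 * the connection state machine via the "connected" label below.
	 */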
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device for %pIS:%u\n",
			sap, rpc_get_port(sap));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_pd = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: connection to %pIS:%u on %s rejected: %s\n",
			sap, rpc_get_port(sap), ia->ri_device->name,
			rdma_reject_msg(id, event->status));
#endif
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
connected:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ?
" (low!)" : ""); 338 } else if (connstate < 0) { 339 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 340 sap, rpc_get_port(sap), connstate); 341 } 342 #endif 343 344 return 0; 345 } 346 347 static struct rdma_cm_id * 348 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 349 struct rpcrdma_ia *ia, struct sockaddr *addr) 350 { 351 unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1; 352 struct rdma_cm_id *id; 353 int rc; 354 355 init_completion(&ia->ri_done); 356 init_completion(&ia->ri_remove_done); 357 358 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 359 IB_QPT_RC); 360 if (IS_ERR(id)) { 361 rc = PTR_ERR(id); 362 dprintk("RPC: %s: rdma_create_id() failed %i\n", 363 __func__, rc); 364 return id; 365 } 366 367 ia->ri_async_rc = -ETIMEDOUT; 368 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 369 if (rc) { 370 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 371 __func__, rc); 372 goto out; 373 } 374 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 375 if (rc < 0) { 376 dprintk("RPC: %s: wait() exited: %i\n", 377 __func__, rc); 378 goto out; 379 } 380 381 rc = ia->ri_async_rc; 382 if (rc) 383 goto out; 384 385 ia->ri_async_rc = -ETIMEDOUT; 386 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 387 if (rc) { 388 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 389 __func__, rc); 390 goto out; 391 } 392 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 393 if (rc < 0) { 394 dprintk("RPC: %s: wait() exited: %i\n", 395 __func__, rc); 396 goto out; 397 } 398 rc = ia->ri_async_rc; 399 if (rc) 400 goto out; 401 402 return id; 403 404 out: 405 rdma_destroy_id(id); 406 return ERR_PTR(rc); 407 } 408 409 /* 410 * Exported functions. 411 */ 412 413 /** 414 * rpcrdma_ia_open - Open and initialize an Interface Adapter. 415 * @xprt: controlling transport 416 * @addr: IP address of remote peer 417 * 418 * Returns 0 on success, negative errno if an appropriate 419 * Interface Adapter could not be found and opened. 420 */ 421 int 422 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr) 423 { 424 struct rpcrdma_ia *ia = &xprt->rx_ia; 425 int rc; 426 427 ia->ri_id = rpcrdma_create_id(xprt, ia, addr); 428 if (IS_ERR(ia->ri_id)) { 429 rc = PTR_ERR(ia->ri_id); 430 goto out_err; 431 } 432 ia->ri_device = ia->ri_id->device; 433 434 ia->ri_pd = ib_alloc_pd(ia->ri_device, 0); 435 if (IS_ERR(ia->ri_pd)) { 436 rc = PTR_ERR(ia->ri_pd); 437 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); 438 goto out_err; 439 } 440 441 switch (xprt_rdma_memreg_strategy) { 442 case RPCRDMA_FRMR: 443 if (frwr_is_supported(ia)) { 444 ia->ri_ops = &rpcrdma_frwr_memreg_ops; 445 break; 446 } 447 /*FALLTHROUGH*/ 448 case RPCRDMA_MTHCAFMR: 449 if (fmr_is_supported(ia)) { 450 ia->ri_ops = &rpcrdma_fmr_memreg_ops; 451 break; 452 } 453 /*FALLTHROUGH*/ 454 default: 455 pr_err("rpcrdma: Device %s does not support memreg mode %d\n", 456 ia->ri_device->name, xprt_rdma_memreg_strategy); 457 rc = -EINVAL; 458 goto out_err; 459 } 460 461 return 0; 462 463 out_err: 464 rpcrdma_ia_close(ia); 465 return rc; 466 } 467 468 /** 469 * rpcrdma_ia_remove - Handle device driver unload 470 * @ia: interface adapter being removed 471 * 472 * Divest transport H/W resources associated with this adapter, 473 * but allow it to be restored later. 
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_destroy_mrs(buf);

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	rpcrdma_init_cqcount(ep, 0);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	int rc, err;

	pr_info("%s: r_xprt = %p\n", __func__, r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt, sap))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_create_mrs(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	struct rdma_cm_id *id, *old;
	int err, rc;

	dprintk("RPC: %s: reconnecting...\n", __func__);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia, sap);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
"still " : "dis"); 874 } else { 875 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 876 ep->rep_connected = rc; 877 } 878 879 ib_drain_qp(ia->ri_id->qp); 880 } 881 882 static void 883 rpcrdma_mr_recovery_worker(struct work_struct *work) 884 { 885 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 886 rb_recovery_worker.work); 887 struct rpcrdma_mw *mw; 888 889 spin_lock(&buf->rb_recovery_lock); 890 while (!list_empty(&buf->rb_stale_mrs)) { 891 mw = rpcrdma_pop_mw(&buf->rb_stale_mrs); 892 spin_unlock(&buf->rb_recovery_lock); 893 894 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 895 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 896 897 spin_lock(&buf->rb_recovery_lock); 898 } 899 spin_unlock(&buf->rb_recovery_lock); 900 } 901 902 void 903 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 904 { 905 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 906 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 907 908 spin_lock(&buf->rb_recovery_lock); 909 rpcrdma_push_mw(mw, &buf->rb_stale_mrs); 910 spin_unlock(&buf->rb_recovery_lock); 911 912 schedule_delayed_work(&buf->rb_recovery_worker, 0); 913 } 914 915 static void 916 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 917 { 918 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 919 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 920 unsigned int count; 921 LIST_HEAD(free); 922 LIST_HEAD(all); 923 924 for (count = 0; count < 32; count++) { 925 struct rpcrdma_mw *mw; 926 int rc; 927 928 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 929 if (!mw) 930 break; 931 932 rc = ia->ri_ops->ro_init_mr(ia, mw); 933 if (rc) { 934 kfree(mw); 935 break; 936 } 937 938 mw->mw_xprt = r_xprt; 939 940 list_add(&mw->mw_list, &free); 941 list_add(&mw->mw_all, &all); 942 } 943 944 spin_lock(&buf->rb_mwlock); 945 list_splice(&free, &buf->rb_mws); 946 list_splice(&all, &buf->rb_all); 947 r_xprt->rx_stats.mrs_allocated += count; 948 spin_unlock(&buf->rb_mwlock); 949 950 dprintk("RPC: %s: created %u MRs\n", __func__, count); 951 } 952 953 static void 954 rpcrdma_mr_refresh_worker(struct work_struct *work) 955 { 956 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 957 rb_refresh_worker.work); 958 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 959 rx_buf); 960 961 rpcrdma_create_mrs(r_xprt); 962 } 963 964 struct rpcrdma_req * 965 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 966 { 967 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 968 struct rpcrdma_req *req; 969 970 req = kzalloc(sizeof(*req), GFP_KERNEL); 971 if (req == NULL) 972 return ERR_PTR(-ENOMEM); 973 974 INIT_LIST_HEAD(&req->rl_free); 975 spin_lock(&buffer->rb_reqslock); 976 list_add(&req->rl_all, &buffer->rb_allreqs); 977 spin_unlock(&buffer->rb_reqslock); 978 req->rl_cqe.done = rpcrdma_wc_send; 979 req->rl_buffer = &r_xprt->rx_buf; 980 INIT_LIST_HEAD(&req->rl_registered); 981 req->rl_send_wr.next = NULL; 982 req->rl_send_wr.wr_cqe = &req->rl_cqe; 983 req->rl_send_wr.sg_list = req->rl_send_sge; 984 req->rl_send_wr.opcode = IB_WR_SEND; 985 return req; 986 } 987 988 struct rpcrdma_rep * 989 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 990 { 991 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 992 struct rpcrdma_rep *rep; 993 int rc; 994 995 rc = -ENOMEM; 996 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 997 if (rep == NULL) 998 goto out; 999 1000 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, 1001 DMA_FROM_DEVICE, GFP_KERNEL); 1002 if (IS_ERR(rep->rr_rdmabuf)) { 1003 rc = PTR_ERR(rep->rr_rdmabuf); 1004 goto out_free; 1005 } 1006 1007 
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_send_wr.num_sge = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	rpcrdma_set_signaled(ep, send_wr);
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}