1 /* 2 * Copyright (c) 2014-2017 Oracle. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the BSD-type 9 * license below: 10 * 11 * Redistribution and use in source and binary forms, with or without 12 * modification, are permitted provided that the following conditions 13 * are met: 14 * 15 * Redistributions of source code must retain the above copyright 16 * notice, this list of conditions and the following disclaimer. 17 * 18 * Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials provided 21 * with the distribution. 22 * 23 * Neither the name of the Network Appliance, Inc. nor the names of 24 * its contributors may be used to endorse or promote products 25 * derived from this software without specific prior written 26 * permission. 27 * 28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 39 */ 40 41 /* 42 * verbs.c 43 * 44 * Encapsulates the major functions managing: 45 * o adapters 46 * o endpoints 47 * o connections 48 * o buffer memory 49 */ 50 51 #include <linux/interrupt.h> 52 #include <linux/slab.h> 53 #include <linux/sunrpc/addr.h> 54 #include <linux/sunrpc/svc_rdma.h> 55 56 #include <asm-generic/barrier.h> 57 #include <asm/bitops.h> 58 59 #include <rdma/ib_cm.h> 60 61 #include "xprt_rdma.h" 62 63 /* 64 * Globals/Macros 65 */ 66 67 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 68 # define RPCDBG_FACILITY RPCDBG_TRANS 69 #endif 70 71 /* 72 * internal functions 73 */ 74 static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt); 75 static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf); 76 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); 77 78 struct workqueue_struct *rpcrdma_receive_wq __read_mostly; 79 80 int 81 rpcrdma_alloc_wq(void) 82 { 83 struct workqueue_struct *recv_wq; 84 85 recv_wq = alloc_workqueue("xprtrdma_receive", 86 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 87 0); 88 if (!recv_wq) 89 return -ENOMEM; 90 91 rpcrdma_receive_wq = recv_wq; 92 return 0; 93 } 94 95 void 96 rpcrdma_destroy_wq(void) 97 { 98 struct workqueue_struct *wq; 99 100 if (rpcrdma_receive_wq) { 101 wq = rpcrdma_receive_wq; 102 rpcrdma_receive_wq = NULL; 103 destroy_workqueue(wq); 104 } 105 } 106 107 static void 108 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) 109 { 110 struct rpcrdma_ep *ep = context; 111 112 pr_err("rpcrdma: %s on device %s ep %p\n", 113 ib_event_msg(event->event), event->device->name, context); 114 115 if (ep->rep_connected == 1) { 116 ep->rep_connected = -EIO; 117 rpcrdma_conn_func(ep); 118 wake_up_all(&ep->rep_connect_wait); 119 } 120 } 121 122 /** 123 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC 124 * @cq: completion queue (ignored) 125 * @wc: completed WR 126 * 127 */ 128 static void 129 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 130 { 131 struct ib_cqe *cqe = wc->wr_cqe; 132 struct rpcrdma_sendctx *sc = 133 container_of(cqe, struct rpcrdma_sendctx, sc_cqe); 134 135 /* WARNING: Only wr_cqe and status are reliable at this point */ 136 if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) 137 pr_err("rpcrdma: Send: %s (%u/0x%x)\n", 138 ib_wc_status_msg(wc->status), 139 wc->status, wc->vendor_err); 140 141 rpcrdma_sendctx_put_locked(sc); 142 } 143 144 /** 145 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC 146 * @cq: completion queue (ignored) 147 * @wc: completed WR 148 * 149 */ 150 static void 151 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) 152 { 153 struct ib_cqe *cqe = wc->wr_cqe; 154 struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, 155 rr_cqe); 156 157 /* WARNING: Only wr_id and status are reliable at this point */ 158 if (wc->status != IB_WC_SUCCESS) 159 goto out_fail; 160 161 /* status == SUCCESS means all fields in wc are trustworthy */ 162 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", 163 __func__, rep, wc->byte_len); 164 165 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len); 166 rep->rr_wc_flags = wc->wc_flags; 167 rep->rr_inv_rkey = wc->ex.invalidate_rkey; 168 169 ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf), 170 rdmab_addr(rep->rr_rdmabuf), 171 wc->byte_len, DMA_FROM_DEVICE); 172 173 out_schedule: 174 rpcrdma_reply_handler(rep); 175 return; 176 177 out_fail: 178 if (wc->status != IB_WC_WR_FLUSH_ERR) 179 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", 180 ib_wc_status_msg(wc->status), 181 wc->status, wc->vendor_err); 182 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0); 183 goto out_schedule; 184 } 185 186 static void 187 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, 188 struct rdma_conn_param *param) 189 { 190 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 191 const struct rpcrdma_connect_private *pmsg = param->private_data; 192 unsigned int rsize, wsize; 193 194 /* Default settings for RPC-over-RDMA Version One */ 195 r_xprt->rx_ia.ri_reminv_expected = false; 196 r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize; 197 rsize = RPCRDMA_V1_DEF_INLINE_SIZE; 198 wsize = RPCRDMA_V1_DEF_INLINE_SIZE; 199 200 if (pmsg && 201 pmsg->cp_magic == rpcrdma_cmp_magic && 202 pmsg->cp_version == RPCRDMA_CMP_VERSION) { 203 r_xprt->rx_ia.ri_reminv_expected = true; 204 r_xprt->rx_ia.ri_implicit_roundup = true; 205 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size); 206 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); 207 } 208 209 if (rsize < cdata->inline_rsize) 210 cdata->inline_rsize = rsize; 211 if (wsize < cdata->inline_wsize) 212 cdata->inline_wsize = wsize; 213 dprintk("RPC: %s: max send %u, max recv %u\n", 214 __func__, cdata->inline_wsize, cdata->inline_rsize); 215 rpcrdma_set_max_header_sizes(r_xprt); 216 } 217 218 static int 219 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) 220 { 221 struct rpcrdma_xprt *xprt = id->context; 222 struct rpcrdma_ia *ia = &xprt->rx_ia; 223 struct rpcrdma_ep *ep = &xprt->rx_ep; 224 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 225 struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr; 226 #endif 227 int connstate = 0; 228 229 switch (event->event) { 230 case RDMA_CM_EVENT_ADDR_RESOLVED: 231 case RDMA_CM_EVENT_ROUTE_RESOLVED: 232 ia->ri_async_rc = 0; 233 complete(&ia->ri_done); 234 break; 235 case RDMA_CM_EVENT_ADDR_ERROR: 236 ia->ri_async_rc = -EHOSTUNREACH; 237 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n", 238 __func__, ep); 239 complete(&ia->ri_done); 240 break; 241 case RDMA_CM_EVENT_ROUTE_ERROR: 242 ia->ri_async_rc = -ENETUNREACH; 243 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n", 244 __func__, ep); 245 complete(&ia->ri_done); 246 break; 247 case RDMA_CM_EVENT_DEVICE_REMOVAL: 248 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 249 pr_info("rpcrdma: removing device %s for %pIS:%u\n", 250 ia->ri_device->name, 251 sap, rpc_get_port(sap)); 252 #endif 253 set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags); 254 ep->rep_connected = -ENODEV; 255 xprt_force_disconnect(&xprt->rx_xprt); 256 wait_for_completion(&ia->ri_remove_done); 257 258 ia->ri_id = NULL; 259 ia->ri_pd = NULL; 260 ia->ri_device = NULL; 261 /* Return 1 to ensure the core destroys the id. */ 262 return 1; 263 case RDMA_CM_EVENT_ESTABLISHED: 264 connstate = 1; 265 rpcrdma_update_connect_private(xprt, &event->param.conn); 266 goto connected; 267 case RDMA_CM_EVENT_CONNECT_ERROR: 268 connstate = -ENOTCONN; 269 goto connected; 270 case RDMA_CM_EVENT_UNREACHABLE: 271 connstate = -ENETDOWN; 272 goto connected; 273 case RDMA_CM_EVENT_REJECTED: 274 dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n", 275 sap, rpc_get_port(sap), 276 rdma_reject_msg(id, event->status)); 277 connstate = -ECONNREFUSED; 278 if (event->status == IB_CM_REJ_STALE_CONN) 279 connstate = -EAGAIN; 280 goto connected; 281 case RDMA_CM_EVENT_DISCONNECTED: 282 connstate = -ECONNABORTED; 283 connected: 284 xprt->rx_buf.rb_credits = 1; 285 ep->rep_connected = connstate; 286 rpcrdma_conn_func(ep); 287 wake_up_all(&ep->rep_connect_wait); 288 /*FALLTHROUGH*/ 289 default: 290 dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n", 291 __func__, sap, rpc_get_port(sap), 292 ia->ri_device->name, ia->ri_ops->ro_displayname, 293 ep, rdma_event_msg(event->event)); 294 break; 295 } 296 297 return 0; 298 } 299 300 static struct rdma_cm_id * 301 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 302 struct rpcrdma_ia *ia, struct sockaddr *addr) 303 { 304 unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1; 305 struct rdma_cm_id *id; 306 int rc; 307 308 init_completion(&ia->ri_done); 309 init_completion(&ia->ri_remove_done); 310 311 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 312 IB_QPT_RC); 313 if (IS_ERR(id)) { 314 rc = PTR_ERR(id); 315 dprintk("RPC: %s: rdma_create_id() failed %i\n", 316 __func__, rc); 317 return id; 318 } 319 320 ia->ri_async_rc = -ETIMEDOUT; 321 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 322 if (rc) { 323 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 324 __func__, rc); 325 goto out; 326 } 327 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 328 if (rc < 0) { 329 dprintk("RPC: %s: wait() exited: %i\n", 330 __func__, rc); 331 goto out; 332 } 333 334 rc = ia->ri_async_rc; 335 if (rc) 336 goto out; 337 338 ia->ri_async_rc = -ETIMEDOUT; 339 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 340 if (rc) { 341 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 342 __func__, rc); 343 goto out; 344 } 345 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 346 if (rc < 0) { 347 dprintk("RPC: %s: wait() exited: %i\n", 348 __func__, rc); 349 goto out; 350 } 351 rc = ia->ri_async_rc; 352 if (rc) 353 goto out; 354 355 return id; 356 357 out: 358 rdma_destroy_id(id); 359 return ERR_PTR(rc); 360 } 361 362 /* 363 * Exported functions. 364 */ 365 366 /** 367 * rpcrdma_ia_open - Open and initialize an Interface Adapter. 368 * @xprt: controlling transport 369 * @addr: IP address of remote peer 370 * 371 * Returns 0 on success, negative errno if an appropriate 372 * Interface Adapter could not be found and opened. 373 */ 374 int 375 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr) 376 { 377 struct rpcrdma_ia *ia = &xprt->rx_ia; 378 int rc; 379 380 ia->ri_id = rpcrdma_create_id(xprt, ia, addr); 381 if (IS_ERR(ia->ri_id)) { 382 rc = PTR_ERR(ia->ri_id); 383 goto out_err; 384 } 385 ia->ri_device = ia->ri_id->device; 386 387 ia->ri_pd = ib_alloc_pd(ia->ri_device, 0); 388 if (IS_ERR(ia->ri_pd)) { 389 rc = PTR_ERR(ia->ri_pd); 390 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); 391 goto out_err; 392 } 393 394 switch (xprt_rdma_memreg_strategy) { 395 case RPCRDMA_FRMR: 396 if (frwr_is_supported(ia)) { 397 ia->ri_ops = &rpcrdma_frwr_memreg_ops; 398 break; 399 } 400 /*FALLTHROUGH*/ 401 case RPCRDMA_MTHCAFMR: 402 if (fmr_is_supported(ia)) { 403 ia->ri_ops = &rpcrdma_fmr_memreg_ops; 404 break; 405 } 406 /*FALLTHROUGH*/ 407 default: 408 pr_err("rpcrdma: Device %s does not support memreg mode %d\n", 409 ia->ri_device->name, xprt_rdma_memreg_strategy); 410 rc = -EINVAL; 411 goto out_err; 412 } 413 414 return 0; 415 416 out_err: 417 rpcrdma_ia_close(ia); 418 return rc; 419 } 420 421 /** 422 * rpcrdma_ia_remove - Handle device driver unload 423 * @ia: interface adapter being removed 424 * 425 * Divest transport H/W resources associated with this adapter, 426 * but allow it to be restored later. 427 */ 428 void 429 rpcrdma_ia_remove(struct rpcrdma_ia *ia) 430 { 431 struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, 432 rx_ia); 433 struct rpcrdma_ep *ep = &r_xprt->rx_ep; 434 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 435 struct rpcrdma_req *req; 436 struct rpcrdma_rep *rep; 437 438 cancel_delayed_work_sync(&buf->rb_refresh_worker); 439 440 /* This is similar to rpcrdma_ep_destroy, but: 441 * - Don't cancel the connect worker. 442 * - Don't call rpcrdma_ep_disconnect, which waits 443 * for another conn upcall, which will deadlock. 444 * - rdma_disconnect is unneeded, the underlying 445 * connection is already gone. 446 */ 447 if (ia->ri_id->qp) { 448 ib_drain_qp(ia->ri_id->qp); 449 rdma_destroy_qp(ia->ri_id); 450 ia->ri_id->qp = NULL; 451 } 452 ib_free_cq(ep->rep_attr.recv_cq); 453 ib_free_cq(ep->rep_attr.send_cq); 454 455 /* The ULP is responsible for ensuring all DMA 456 * mappings and MRs are gone. 457 */ 458 list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) 459 rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf); 460 list_for_each_entry(req, &buf->rb_allreqs, rl_all) { 461 rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf); 462 rpcrdma_dma_unmap_regbuf(req->rl_sendbuf); 463 rpcrdma_dma_unmap_regbuf(req->rl_recvbuf); 464 } 465 rpcrdma_destroy_mrs(buf); 466 467 /* Allow waiters to continue */ 468 complete(&ia->ri_remove_done); 469 } 470 471 /** 472 * rpcrdma_ia_close - Clean up/close an IA. 473 * @ia: interface adapter to close 474 * 475 */ 476 void 477 rpcrdma_ia_close(struct rpcrdma_ia *ia) 478 { 479 dprintk("RPC: %s: entering\n", __func__); 480 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { 481 if (ia->ri_id->qp) 482 rdma_destroy_qp(ia->ri_id); 483 rdma_destroy_id(ia->ri_id); 484 } 485 ia->ri_id = NULL; 486 ia->ri_device = NULL; 487 488 /* If the pd is still busy, xprtrdma missed freeing a resource */ 489 if (ia->ri_pd && !IS_ERR(ia->ri_pd)) 490 ib_dealloc_pd(ia->ri_pd); 491 ia->ri_pd = NULL; 492 } 493 494 /* 495 * Create unconnected endpoint. 496 */ 497 int 498 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, 499 struct rpcrdma_create_data_internal *cdata) 500 { 501 struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; 502 unsigned int max_qp_wr, max_sge; 503 struct ib_cq *sendcq, *recvcq; 504 int rc; 505 506 max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge, 507 RPCRDMA_MAX_SEND_SGES); 508 if (max_sge < RPCRDMA_MIN_SEND_SGES) { 509 pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge); 510 return -ENOMEM; 511 } 512 ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES; 513 514 if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) { 515 dprintk("RPC: %s: insufficient wqe's available\n", 516 __func__); 517 return -ENOMEM; 518 } 519 max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; 520 521 /* check provider's send/recv wr limits */ 522 if (cdata->max_requests > max_qp_wr) 523 cdata->max_requests = max_qp_wr; 524 525 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 526 ep->rep_attr.qp_context = ep; 527 ep->rep_attr.srq = NULL; 528 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 529 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; 530 ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ 531 rc = ia->ri_ops->ro_open(ia, ep, cdata); 532 if (rc) 533 return rc; 534 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 535 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; 536 ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ 537 ep->rep_attr.cap.max_send_sge = max_sge; 538 ep->rep_attr.cap.max_recv_sge = 1; 539 ep->rep_attr.cap.max_inline_data = 0; 540 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 541 ep->rep_attr.qp_type = IB_QPT_RC; 542 ep->rep_attr.port_num = ~0; 543 544 dprintk("RPC: %s: requested max: dtos: send %d recv %d; " 545 "iovs: send %d recv %d\n", 546 __func__, 547 ep->rep_attr.cap.max_send_wr, 548 ep->rep_attr.cap.max_recv_wr, 549 ep->rep_attr.cap.max_send_sge, 550 ep->rep_attr.cap.max_recv_sge); 551 552 /* set trigger for requesting send completion */ 553 ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH, 554 cdata->max_requests >> 2); 555 ep->rep_send_count = ep->rep_send_batch; 556 init_waitqueue_head(&ep->rep_connect_wait); 557 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); 558 559 sendcq = ib_alloc_cq(ia->ri_device, NULL, 560 ep->rep_attr.cap.max_send_wr + 1, 561 1, IB_POLL_WORKQUEUE); 562 if (IS_ERR(sendcq)) { 563 rc = PTR_ERR(sendcq); 564 dprintk("RPC: %s: failed to create send CQ: %i\n", 565 __func__, rc); 566 goto out1; 567 } 568 569 recvcq = ib_alloc_cq(ia->ri_device, NULL, 570 ep->rep_attr.cap.max_recv_wr + 1, 571 0, IB_POLL_WORKQUEUE); 572 if (IS_ERR(recvcq)) { 573 rc = PTR_ERR(recvcq); 574 dprintk("RPC: %s: failed to create recv CQ: %i\n", 575 __func__, rc); 576 goto out2; 577 } 578 579 ep->rep_attr.send_cq = sendcq; 580 ep->rep_attr.recv_cq = recvcq; 581 582 /* Initialize cma parameters */ 583 memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); 584 585 /* Prepare RDMA-CM private message */ 586 pmsg->cp_magic = rpcrdma_cmp_magic; 587 pmsg->cp_version = RPCRDMA_CMP_VERSION; 588 pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok; 589 pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize); 590 pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize); 591 ep->rep_remote_cma.private_data = pmsg; 592 ep->rep_remote_cma.private_data_len = sizeof(*pmsg); 593 594 /* Client offers RDMA Read but does not initiate */ 595 ep->rep_remote_cma.initiator_depth = 0; 596 if (ia->ri_device->attrs.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ 597 ep->rep_remote_cma.responder_resources = 32; 598 else 599 ep->rep_remote_cma.responder_resources = 600 ia->ri_device->attrs.max_qp_rd_atom; 601 602 /* Limit transport retries so client can detect server 603 * GID changes quickly. RPC layer handles re-establishing 604 * transport connection and retransmission. 605 */ 606 ep->rep_remote_cma.retry_count = 6; 607 608 /* RPC-over-RDMA handles its own flow control. In addition, 609 * make all RNR NAKs visible so we know that RPC-over-RDMA 610 * flow control is working correctly (no NAKs should be seen). 611 */ 612 ep->rep_remote_cma.flow_control = 0; 613 ep->rep_remote_cma.rnr_retry_count = 0; 614 615 return 0; 616 617 out2: 618 ib_free_cq(sendcq); 619 out1: 620 return rc; 621 } 622 623 /* 624 * rpcrdma_ep_destroy 625 * 626 * Disconnect and destroy endpoint. After this, the only 627 * valid operations on the ep are to free it (if dynamically 628 * allocated) or re-create it. 629 */ 630 void 631 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 632 { 633 dprintk("RPC: %s: entering, connected is %d\n", 634 __func__, ep->rep_connected); 635 636 cancel_delayed_work_sync(&ep->rep_connect_worker); 637 638 if (ia->ri_id->qp) { 639 rpcrdma_ep_disconnect(ep, ia); 640 rdma_destroy_qp(ia->ri_id); 641 ia->ri_id->qp = NULL; 642 } 643 644 ib_free_cq(ep->rep_attr.recv_cq); 645 ib_free_cq(ep->rep_attr.send_cq); 646 } 647 648 /* Re-establish a connection after a device removal event. 649 * Unlike a normal reconnection, a fresh PD and a new set 650 * of MRs and buffers is needed. 651 */ 652 static int 653 rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, 654 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 655 { 656 struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr; 657 int rc, err; 658 659 pr_info("%s: r_xprt = %p\n", __func__, r_xprt); 660 661 rc = -EHOSTUNREACH; 662 if (rpcrdma_ia_open(r_xprt, sap)) 663 goto out1; 664 665 rc = -ENOMEM; 666 err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data); 667 if (err) { 668 pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err); 669 goto out2; 670 } 671 672 rc = -ENETUNREACH; 673 err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); 674 if (err) { 675 pr_err("rpcrdma: rdma_create_qp returned %d\n", err); 676 goto out3; 677 } 678 679 rpcrdma_create_mrs(r_xprt); 680 return 0; 681 682 out3: 683 rpcrdma_ep_destroy(ep, ia); 684 out2: 685 rpcrdma_ia_close(ia); 686 out1: 687 return rc; 688 } 689 690 static int 691 rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, 692 struct rpcrdma_ia *ia) 693 { 694 struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr; 695 struct rdma_cm_id *id, *old; 696 int err, rc; 697 698 dprintk("RPC: %s: reconnecting...\n", __func__); 699 700 rpcrdma_ep_disconnect(ep, ia); 701 702 rc = -EHOSTUNREACH; 703 id = rpcrdma_create_id(r_xprt, ia, sap); 704 if (IS_ERR(id)) 705 goto out; 706 707 /* As long as the new ID points to the same device as the 708 * old ID, we can reuse the transport's existing PD and all 709 * previously allocated MRs. Also, the same device means 710 * the transport's previous DMA mappings are still valid. 711 * 712 * This is a sanity check only. There should be no way these 713 * point to two different devices here. 714 */ 715 old = id; 716 rc = -ENETUNREACH; 717 if (ia->ri_device != id->device) { 718 pr_err("rpcrdma: can't reconnect on different device!\n"); 719 goto out_destroy; 720 } 721 722 err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); 723 if (err) { 724 dprintk("RPC: %s: rdma_create_qp returned %d\n", 725 __func__, err); 726 goto out_destroy; 727 } 728 729 /* Atomically replace the transport's ID and QP. */ 730 rc = 0; 731 old = ia->ri_id; 732 ia->ri_id = id; 733 rdma_destroy_qp(old); 734 735 out_destroy: 736 rdma_destroy_id(old); 737 out: 738 return rc; 739 } 740 741 /* 742 * Connect unconnected endpoint. 743 */ 744 int 745 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 746 { 747 struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, 748 rx_ia); 749 unsigned int extras; 750 int rc; 751 752 retry: 753 switch (ep->rep_connected) { 754 case 0: 755 dprintk("RPC: %s: connecting...\n", __func__); 756 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); 757 if (rc) { 758 dprintk("RPC: %s: rdma_create_qp failed %i\n", 759 __func__, rc); 760 rc = -ENETUNREACH; 761 goto out_noupdate; 762 } 763 break; 764 case -ENODEV: 765 rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia); 766 if (rc) 767 goto out_noupdate; 768 break; 769 default: 770 rc = rpcrdma_ep_reconnect(r_xprt, ep, ia); 771 if (rc) 772 goto out; 773 } 774 775 ep->rep_connected = 0; 776 777 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); 778 if (rc) { 779 dprintk("RPC: %s: rdma_connect() failed with %i\n", 780 __func__, rc); 781 goto out; 782 } 783 784 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); 785 if (ep->rep_connected <= 0) { 786 if (ep->rep_connected == -EAGAIN) 787 goto retry; 788 rc = ep->rep_connected; 789 goto out; 790 } 791 792 dprintk("RPC: %s: connected\n", __func__); 793 extras = r_xprt->rx_buf.rb_bc_srv_max_requests; 794 if (extras) 795 rpcrdma_ep_post_extra_recv(r_xprt, extras); 796 797 out: 798 if (rc) 799 ep->rep_connected = rc; 800 801 out_noupdate: 802 return rc; 803 } 804 805 /* 806 * rpcrdma_ep_disconnect 807 * 808 * This is separate from destroy to facilitate the ability 809 * to reconnect without recreating the endpoint. 810 * 811 * This call is not reentrant, and must not be made in parallel 812 * on the same endpoint. 813 */ 814 void 815 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 816 { 817 int rc; 818 819 rc = rdma_disconnect(ia->ri_id); 820 if (!rc) { 821 /* returns without wait if not connected */ 822 wait_event_interruptible(ep->rep_connect_wait, 823 ep->rep_connected != 1); 824 dprintk("RPC: %s: after wait, %sconnected\n", __func__, 825 (ep->rep_connected == 1) ? "still " : "dis"); 826 } else { 827 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 828 ep->rep_connected = rc; 829 } 830 831 ib_drain_qp(ia->ri_id->qp); 832 } 833 834 /* Fixed-size circular FIFO queue. This implementation is wait-free and 835 * lock-free. 836 * 837 * Consumer is the code path that posts Sends. This path dequeues a 838 * sendctx for use by a Send operation. Multiple consumer threads 839 * are serialized by the RPC transport lock, which allows only one 840 * ->send_request call at a time. 841 * 842 * Producer is the code path that handles Send completions. This path 843 * enqueues a sendctx that has been completed. Multiple producer 844 * threads are serialized by the ib_poll_cq() function. 845 */ 846 847 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced 848 * queue activity, and ib_drain_qp has flushed all remaining Send 849 * requests. 850 */ 851 static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) 852 { 853 unsigned long i; 854 855 for (i = 0; i <= buf->rb_sc_last; i++) 856 kfree(buf->rb_sc_ctxs[i]); 857 kfree(buf->rb_sc_ctxs); 858 } 859 860 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) 861 { 862 struct rpcrdma_sendctx *sc; 863 864 sc = kzalloc(sizeof(*sc) + 865 ia->ri_max_send_sges * sizeof(struct ib_sge), 866 GFP_KERNEL); 867 if (!sc) 868 return NULL; 869 870 sc->sc_wr.wr_cqe = &sc->sc_cqe; 871 sc->sc_wr.sg_list = sc->sc_sges; 872 sc->sc_wr.opcode = IB_WR_SEND; 873 sc->sc_cqe.done = rpcrdma_wc_send; 874 return sc; 875 } 876 877 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) 878 { 879 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 880 struct rpcrdma_sendctx *sc; 881 unsigned long i; 882 883 /* Maximum number of concurrent outstanding Send WRs. Capping 884 * the circular queue size stops Send Queue overflow by causing 885 * the ->send_request call to fail temporarily before too many 886 * Sends are posted. 887 */ 888 i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; 889 dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); 890 buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); 891 if (!buf->rb_sc_ctxs) 892 return -ENOMEM; 893 894 buf->rb_sc_last = i - 1; 895 for (i = 0; i <= buf->rb_sc_last; i++) { 896 sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); 897 if (!sc) 898 goto out_destroy; 899 900 sc->sc_xprt = r_xprt; 901 buf->rb_sc_ctxs[i] = sc; 902 } 903 904 return 0; 905 906 out_destroy: 907 rpcrdma_sendctxs_destroy(buf); 908 return -ENOMEM; 909 } 910 911 /* The sendctx queue is not guaranteed to have a size that is a 912 * power of two, thus the helpers in circ_buf.h cannot be used. 913 * The other option is to use modulus (%), which can be expensive. 914 */ 915 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf, 916 unsigned long item) 917 { 918 return likely(item < buf->rb_sc_last) ? item + 1 : 0; 919 } 920 921 /** 922 * rpcrdma_sendctx_get_locked - Acquire a send context 923 * @buf: transport buffers from which to acquire an unused context 924 * 925 * Returns pointer to a free send completion context; or NULL if 926 * the queue is empty. 927 * 928 * Usage: Called to acquire an SGE array before preparing a Send WR. 929 * 930 * The caller serializes calls to this function (per rpcrdma_buffer), 931 * and provides an effective memory barrier that flushes the new value 932 * of rb_sc_head. 933 */ 934 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf) 935 { 936 struct rpcrdma_xprt *r_xprt; 937 struct rpcrdma_sendctx *sc; 938 unsigned long next_head; 939 940 next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head); 941 942 if (next_head == READ_ONCE(buf->rb_sc_tail)) 943 goto out_emptyq; 944 945 /* ORDER: item must be accessed _before_ head is updated */ 946 sc = buf->rb_sc_ctxs[next_head]; 947 948 /* Releasing the lock in the caller acts as a memory 949 * barrier that flushes rb_sc_head. 950 */ 951 buf->rb_sc_head = next_head; 952 953 return sc; 954 955 out_emptyq: 956 /* The queue is "empty" if there have not been enough Send 957 * completions recently. This is a sign the Send Queue is 958 * backing up. Cause the caller to pause and try again. 959 */ 960 dprintk("RPC: %s: empty sendctx queue\n", __func__); 961 r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); 962 r_xprt->rx_stats.empty_sendctx_q++; 963 return NULL; 964 } 965 966 /** 967 * rpcrdma_sendctx_put_locked - Release a send context 968 * @sc: send context to release 969 * 970 * Usage: Called from Send completion to return a sendctxt 971 * to the queue. 972 * 973 * The caller serializes calls to this function (per rpcrdma_buffer). 974 */ 975 void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) 976 { 977 struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; 978 unsigned long next_tail; 979 980 /* Unmap SGEs of previously completed by unsignaled 981 * Sends by walking up the queue until @sc is found. 982 */ 983 next_tail = buf->rb_sc_tail; 984 do { 985 next_tail = rpcrdma_sendctx_next(buf, next_tail); 986 987 /* ORDER: item must be accessed _before_ tail is updated */ 988 rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]); 989 990 } while (buf->rb_sc_ctxs[next_tail] != sc); 991 992 /* Paired with READ_ONCE */ 993 smp_store_release(&buf->rb_sc_tail, next_tail); 994 } 995 996 static void 997 rpcrdma_mr_recovery_worker(struct work_struct *work) 998 { 999 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 1000 rb_recovery_worker.work); 1001 struct rpcrdma_mw *mw; 1002 1003 spin_lock(&buf->rb_recovery_lock); 1004 while (!list_empty(&buf->rb_stale_mrs)) { 1005 mw = rpcrdma_pop_mw(&buf->rb_stale_mrs); 1006 spin_unlock(&buf->rb_recovery_lock); 1007 1008 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 1009 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 1010 1011 spin_lock(&buf->rb_recovery_lock); 1012 } 1013 spin_unlock(&buf->rb_recovery_lock); 1014 } 1015 1016 void 1017 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 1018 { 1019 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 1020 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1021 1022 spin_lock(&buf->rb_recovery_lock); 1023 rpcrdma_push_mw(mw, &buf->rb_stale_mrs); 1024 spin_unlock(&buf->rb_recovery_lock); 1025 1026 schedule_delayed_work(&buf->rb_recovery_worker, 0); 1027 } 1028 1029 static void 1030 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 1031 { 1032 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1033 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 1034 unsigned int count; 1035 LIST_HEAD(free); 1036 LIST_HEAD(all); 1037 1038 for (count = 0; count < 32; count++) { 1039 struct rpcrdma_mw *mw; 1040 int rc; 1041 1042 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 1043 if (!mw) 1044 break; 1045 1046 rc = ia->ri_ops->ro_init_mr(ia, mw); 1047 if (rc) { 1048 kfree(mw); 1049 break; 1050 } 1051 1052 mw->mw_xprt = r_xprt; 1053 1054 list_add(&mw->mw_list, &free); 1055 list_add(&mw->mw_all, &all); 1056 } 1057 1058 spin_lock(&buf->rb_mwlock); 1059 list_splice(&free, &buf->rb_mws); 1060 list_splice(&all, &buf->rb_all); 1061 r_xprt->rx_stats.mrs_allocated += count; 1062 spin_unlock(&buf->rb_mwlock); 1063 1064 dprintk("RPC: %s: created %u MRs\n", __func__, count); 1065 } 1066 1067 static void 1068 rpcrdma_mr_refresh_worker(struct work_struct *work) 1069 { 1070 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 1071 rb_refresh_worker.work); 1072 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 1073 rx_buf); 1074 1075 rpcrdma_create_mrs(r_xprt); 1076 } 1077 1078 struct rpcrdma_req * 1079 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 1080 { 1081 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 1082 struct rpcrdma_req *req; 1083 1084 req = kzalloc(sizeof(*req), GFP_KERNEL); 1085 if (req == NULL) 1086 return ERR_PTR(-ENOMEM); 1087 1088 spin_lock(&buffer->rb_reqslock); 1089 list_add(&req->rl_all, &buffer->rb_allreqs); 1090 spin_unlock(&buffer->rb_reqslock); 1091 req->rl_buffer = &r_xprt->rx_buf; 1092 INIT_LIST_HEAD(&req->rl_registered); 1093 return req; 1094 } 1095 1096 struct rpcrdma_rep * 1097 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 1098 { 1099 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 1100 struct rpcrdma_rep *rep; 1101 int rc; 1102 1103 rc = -ENOMEM; 1104 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 1105 if (rep == NULL) 1106 goto out; 1107 1108 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, 1109 DMA_FROM_DEVICE, GFP_KERNEL); 1110 if (IS_ERR(rep->rr_rdmabuf)) { 1111 rc = PTR_ERR(rep->rr_rdmabuf); 1112 goto out_free; 1113 } 1114 xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base, 1115 rdmab_length(rep->rr_rdmabuf)); 1116 1117 rep->rr_cqe.done = rpcrdma_wc_receive; 1118 rep->rr_rxprt = r_xprt; 1119 INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion); 1120 rep->rr_recv_wr.next = NULL; 1121 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; 1122 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 1123 rep->rr_recv_wr.num_sge = 1; 1124 return rep; 1125 1126 out_free: 1127 kfree(rep); 1128 out: 1129 return ERR_PTR(rc); 1130 } 1131 1132 int 1133 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) 1134 { 1135 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1136 int i, rc; 1137 1138 buf->rb_max_requests = r_xprt->rx_data.max_requests; 1139 buf->rb_bc_srv_max_requests = 0; 1140 spin_lock_init(&buf->rb_mwlock); 1141 spin_lock_init(&buf->rb_lock); 1142 spin_lock_init(&buf->rb_recovery_lock); 1143 INIT_LIST_HEAD(&buf->rb_mws); 1144 INIT_LIST_HEAD(&buf->rb_all); 1145 INIT_LIST_HEAD(&buf->rb_stale_mrs); 1146 INIT_DELAYED_WORK(&buf->rb_refresh_worker, 1147 rpcrdma_mr_refresh_worker); 1148 INIT_DELAYED_WORK(&buf->rb_recovery_worker, 1149 rpcrdma_mr_recovery_worker); 1150 1151 rpcrdma_create_mrs(r_xprt); 1152 1153 INIT_LIST_HEAD(&buf->rb_send_bufs); 1154 INIT_LIST_HEAD(&buf->rb_allreqs); 1155 spin_lock_init(&buf->rb_reqslock); 1156 for (i = 0; i < buf->rb_max_requests; i++) { 1157 struct rpcrdma_req *req; 1158 1159 req = rpcrdma_create_req(r_xprt); 1160 if (IS_ERR(req)) { 1161 dprintk("RPC: %s: request buffer %d alloc" 1162 " failed\n", __func__, i); 1163 rc = PTR_ERR(req); 1164 goto out; 1165 } 1166 list_add(&req->rl_list, &buf->rb_send_bufs); 1167 } 1168 1169 INIT_LIST_HEAD(&buf->rb_recv_bufs); 1170 for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) { 1171 struct rpcrdma_rep *rep; 1172 1173 rep = rpcrdma_create_rep(r_xprt); 1174 if (IS_ERR(rep)) { 1175 dprintk("RPC: %s: reply buffer %d alloc failed\n", 1176 __func__, i); 1177 rc = PTR_ERR(rep); 1178 goto out; 1179 } 1180 list_add(&rep->rr_list, &buf->rb_recv_bufs); 1181 } 1182 1183 rc = rpcrdma_sendctxs_create(r_xprt); 1184 if (rc) 1185 goto out; 1186 1187 return 0; 1188 out: 1189 rpcrdma_buffer_destroy(buf); 1190 return rc; 1191 } 1192 1193 static struct rpcrdma_req * 1194 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) 1195 { 1196 struct rpcrdma_req *req; 1197 1198 req = list_first_entry(&buf->rb_send_bufs, 1199 struct rpcrdma_req, rl_list); 1200 list_del_init(&req->rl_list); 1201 return req; 1202 } 1203 1204 static struct rpcrdma_rep * 1205 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) 1206 { 1207 struct rpcrdma_rep *rep; 1208 1209 rep = list_first_entry(&buf->rb_recv_bufs, 1210 struct rpcrdma_rep, rr_list); 1211 list_del(&rep->rr_list); 1212 return rep; 1213 } 1214 1215 static void 1216 rpcrdma_destroy_rep(struct rpcrdma_rep *rep) 1217 { 1218 rpcrdma_free_regbuf(rep->rr_rdmabuf); 1219 kfree(rep); 1220 } 1221 1222 void 1223 rpcrdma_destroy_req(struct rpcrdma_req *req) 1224 { 1225 rpcrdma_free_regbuf(req->rl_recvbuf); 1226 rpcrdma_free_regbuf(req->rl_sendbuf); 1227 rpcrdma_free_regbuf(req->rl_rdmabuf); 1228 kfree(req); 1229 } 1230 1231 static void 1232 rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) 1233 { 1234 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 1235 rx_buf); 1236 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1237 struct rpcrdma_mw *mw; 1238 unsigned int count; 1239 1240 count = 0; 1241 spin_lock(&buf->rb_mwlock); 1242 while (!list_empty(&buf->rb_all)) { 1243 mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); 1244 list_del(&mw->mw_all); 1245 1246 spin_unlock(&buf->rb_mwlock); 1247 ia->ri_ops->ro_release_mr(mw); 1248 count++; 1249 spin_lock(&buf->rb_mwlock); 1250 } 1251 spin_unlock(&buf->rb_mwlock); 1252 r_xprt->rx_stats.mrs_allocated = 0; 1253 1254 dprintk("RPC: %s: released %u MRs\n", __func__, count); 1255 } 1256 1257 void 1258 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 1259 { 1260 cancel_delayed_work_sync(&buf->rb_recovery_worker); 1261 cancel_delayed_work_sync(&buf->rb_refresh_worker); 1262 1263 rpcrdma_sendctxs_destroy(buf); 1264 1265 while (!list_empty(&buf->rb_recv_bufs)) { 1266 struct rpcrdma_rep *rep; 1267 1268 rep = rpcrdma_buffer_get_rep_locked(buf); 1269 rpcrdma_destroy_rep(rep); 1270 } 1271 buf->rb_send_count = 0; 1272 1273 spin_lock(&buf->rb_reqslock); 1274 while (!list_empty(&buf->rb_allreqs)) { 1275 struct rpcrdma_req *req; 1276 1277 req = list_first_entry(&buf->rb_allreqs, 1278 struct rpcrdma_req, rl_all); 1279 list_del(&req->rl_all); 1280 1281 spin_unlock(&buf->rb_reqslock); 1282 rpcrdma_destroy_req(req); 1283 spin_lock(&buf->rb_reqslock); 1284 } 1285 spin_unlock(&buf->rb_reqslock); 1286 buf->rb_recv_count = 0; 1287 1288 rpcrdma_destroy_mrs(buf); 1289 } 1290 1291 struct rpcrdma_mw * 1292 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) 1293 { 1294 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1295 struct rpcrdma_mw *mw = NULL; 1296 1297 spin_lock(&buf->rb_mwlock); 1298 if (!list_empty(&buf->rb_mws)) 1299 mw = rpcrdma_pop_mw(&buf->rb_mws); 1300 spin_unlock(&buf->rb_mwlock); 1301 1302 if (!mw) 1303 goto out_nomws; 1304 mw->mw_flags = 0; 1305 return mw; 1306 1307 out_nomws: 1308 dprintk("RPC: %s: no MWs available\n", __func__); 1309 if (r_xprt->rx_ep.rep_connected != -ENODEV) 1310 schedule_delayed_work(&buf->rb_refresh_worker, 0); 1311 1312 /* Allow the reply handler and refresh worker to run */ 1313 cond_resched(); 1314 1315 return NULL; 1316 } 1317 1318 void 1319 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) 1320 { 1321 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1322 1323 spin_lock(&buf->rb_mwlock); 1324 rpcrdma_push_mw(mw, &buf->rb_mws); 1325 spin_unlock(&buf->rb_mwlock); 1326 } 1327 1328 static struct rpcrdma_rep * 1329 rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers) 1330 { 1331 /* If an RPC previously completed without a reply (say, a 1332 * credential problem or a soft timeout occurs) then hold off 1333 * on supplying more Receive buffers until the number of new 1334 * pending RPCs catches up to the number of posted Receives. 1335 */ 1336 if (unlikely(buffers->rb_send_count < buffers->rb_recv_count)) 1337 return NULL; 1338 1339 if (unlikely(list_empty(&buffers->rb_recv_bufs))) 1340 return NULL; 1341 buffers->rb_recv_count++; 1342 return rpcrdma_buffer_get_rep_locked(buffers); 1343 } 1344 1345 /* 1346 * Get a set of request/reply buffers. 1347 * 1348 * Reply buffer (if available) is attached to send buffer upon return. 1349 */ 1350 struct rpcrdma_req * 1351 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1352 { 1353 struct rpcrdma_req *req; 1354 1355 spin_lock(&buffers->rb_lock); 1356 if (list_empty(&buffers->rb_send_bufs)) 1357 goto out_reqbuf; 1358 buffers->rb_send_count++; 1359 req = rpcrdma_buffer_get_req_locked(buffers); 1360 req->rl_reply = rpcrdma_buffer_get_rep(buffers); 1361 spin_unlock(&buffers->rb_lock); 1362 return req; 1363 1364 out_reqbuf: 1365 spin_unlock(&buffers->rb_lock); 1366 pr_warn("RPC: %s: out of request buffers\n", __func__); 1367 return NULL; 1368 } 1369 1370 /* 1371 * Put request/reply buffers back into pool. 1372 * Pre-decrement counter/array index. 1373 */ 1374 void 1375 rpcrdma_buffer_put(struct rpcrdma_req *req) 1376 { 1377 struct rpcrdma_buffer *buffers = req->rl_buffer; 1378 struct rpcrdma_rep *rep = req->rl_reply; 1379 1380 req->rl_reply = NULL; 1381 1382 spin_lock(&buffers->rb_lock); 1383 buffers->rb_send_count--; 1384 list_add_tail(&req->rl_list, &buffers->rb_send_bufs); 1385 if (rep) { 1386 buffers->rb_recv_count--; 1387 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1388 } 1389 spin_unlock(&buffers->rb_lock); 1390 } 1391 1392 /* 1393 * Recover reply buffers from pool. 1394 * This happens when recovering from disconnect. 1395 */ 1396 void 1397 rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1398 { 1399 struct rpcrdma_buffer *buffers = req->rl_buffer; 1400 1401 spin_lock(&buffers->rb_lock); 1402 req->rl_reply = rpcrdma_buffer_get_rep(buffers); 1403 spin_unlock(&buffers->rb_lock); 1404 } 1405 1406 /* 1407 * Put reply buffers back into pool when not attached to 1408 * request. This happens in error conditions. 1409 */ 1410 void 1411 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1412 { 1413 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1414 1415 spin_lock(&buffers->rb_lock); 1416 buffers->rb_recv_count--; 1417 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1418 spin_unlock(&buffers->rb_lock); 1419 } 1420 1421 /** 1422 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers 1423 * @size: size of buffer to be allocated, in bytes 1424 * @direction: direction of data movement 1425 * @flags: GFP flags 1426 * 1427 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that 1428 * can be persistently DMA-mapped for I/O. 1429 * 1430 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for 1431 * receiving the payload of RDMA RECV operations. During Long Calls 1432 * or Replies they may be registered externally via ro_map. 1433 */ 1434 struct rpcrdma_regbuf * 1435 rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction, 1436 gfp_t flags) 1437 { 1438 struct rpcrdma_regbuf *rb; 1439 1440 rb = kmalloc(sizeof(*rb) + size, flags); 1441 if (rb == NULL) 1442 return ERR_PTR(-ENOMEM); 1443 1444 rb->rg_device = NULL; 1445 rb->rg_direction = direction; 1446 rb->rg_iov.length = size; 1447 1448 return rb; 1449 } 1450 1451 /** 1452 * __rpcrdma_map_regbuf - DMA-map a regbuf 1453 * @ia: controlling rpcrdma_ia 1454 * @rb: regbuf to be mapped 1455 */ 1456 bool 1457 __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) 1458 { 1459 struct ib_device *device = ia->ri_device; 1460 1461 if (rb->rg_direction == DMA_NONE) 1462 return false; 1463 1464 rb->rg_iov.addr = ib_dma_map_single(device, 1465 (void *)rb->rg_base, 1466 rdmab_length(rb), 1467 rb->rg_direction); 1468 if (ib_dma_mapping_error(device, rdmab_addr(rb))) 1469 return false; 1470 1471 rb->rg_device = device; 1472 rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey; 1473 return true; 1474 } 1475 1476 static void 1477 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb) 1478 { 1479 if (!rpcrdma_regbuf_is_mapped(rb)) 1480 return; 1481 1482 ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), 1483 rdmab_length(rb), rb->rg_direction); 1484 rb->rg_device = NULL; 1485 } 1486 1487 /** 1488 * rpcrdma_free_regbuf - deregister and free registered buffer 1489 * @rb: regbuf to be deregistered and freed 1490 */ 1491 void 1492 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb) 1493 { 1494 if (!rb) 1495 return; 1496 1497 rpcrdma_dma_unmap_regbuf(rb); 1498 kfree(rb); 1499 } 1500 1501 /* 1502 * Prepost any receive buffer, then post send. 1503 * 1504 * Receive buffer is donated to hardware, reclaimed upon recv completion. 1505 */ 1506 int 1507 rpcrdma_ep_post(struct rpcrdma_ia *ia, 1508 struct rpcrdma_ep *ep, 1509 struct rpcrdma_req *req) 1510 { 1511 struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; 1512 struct ib_send_wr *send_wr_fail; 1513 int rc; 1514 1515 if (req->rl_reply) { 1516 rc = rpcrdma_ep_post_recv(ia, req->rl_reply); 1517 if (rc) 1518 return rc; 1519 req->rl_reply = NULL; 1520 } 1521 1522 dprintk("RPC: %s: posting %d s/g entries\n", 1523 __func__, send_wr->num_sge); 1524 1525 if (!ep->rep_send_count || 1526 test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { 1527 send_wr->send_flags |= IB_SEND_SIGNALED; 1528 ep->rep_send_count = ep->rep_send_batch; 1529 } else { 1530 send_wr->send_flags &= ~IB_SEND_SIGNALED; 1531 --ep->rep_send_count; 1532 } 1533 rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); 1534 if (rc) 1535 goto out_postsend_err; 1536 return 0; 1537 1538 out_postsend_err: 1539 pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc); 1540 return -ENOTCONN; 1541 } 1542 1543 int 1544 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, 1545 struct rpcrdma_rep *rep) 1546 { 1547 struct ib_recv_wr *recv_wr_fail; 1548 int rc; 1549 1550 if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf)) 1551 goto out_map; 1552 rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail); 1553 if (rc) 1554 goto out_postrecv; 1555 return 0; 1556 1557 out_map: 1558 pr_err("rpcrdma: failed to DMA map the Receive buffer\n"); 1559 return -EIO; 1560 1561 out_postrecv: 1562 pr_err("rpcrdma: ib_post_recv returned %i\n", rc); 1563 return -ENOTCONN; 1564 } 1565 1566 /** 1567 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests 1568 * @r_xprt: transport associated with these backchannel resources 1569 * @min_reqs: minimum number of incoming requests expected 1570 * 1571 * Returns zero if all requested buffers were posted, or a negative errno. 1572 */ 1573 int 1574 rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) 1575 { 1576 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; 1577 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 1578 struct rpcrdma_rep *rep; 1579 int rc; 1580 1581 while (count--) { 1582 spin_lock(&buffers->rb_lock); 1583 if (list_empty(&buffers->rb_recv_bufs)) 1584 goto out_reqbuf; 1585 rep = rpcrdma_buffer_get_rep_locked(buffers); 1586 spin_unlock(&buffers->rb_lock); 1587 1588 rc = rpcrdma_ep_post_recv(ia, rep); 1589 if (rc) 1590 goto out_rc; 1591 } 1592 1593 return 0; 1594 1595 out_reqbuf: 1596 spin_unlock(&buffers->rb_lock); 1597 pr_warn("%s: no extra receive buffers\n", __func__); 1598 return -ENOMEM; 1599 1600 out_rc: 1601 rpcrdma_recv_buffer_put(rep); 1602 return rc; 1603 } 1604