/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
dprintk("RPC: %s: %sconnected\n", 263 __func__, connstate > 0 ? "" : "dis"); 264 atomic_set(&xprt->rx_buf.rb_credits, 1); 265 ep->rep_connected = connstate; 266 rpcrdma_conn_func(ep); 267 wake_up_all(&ep->rep_connect_wait); 268 /*FALLTHROUGH*/ 269 default: 270 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n", 271 __func__, sap, rpc_get_port(sap), ep, 272 rdma_event_msg(event->event)); 273 break; 274 } 275 276 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 277 if (connstate == 1) { 278 int ird = attr->max_dest_rd_atomic; 279 int tird = ep->rep_remote_cma.responder_resources; 280 281 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n", 282 sap, rpc_get_port(sap), 283 ia->ri_device->name, 284 ia->ri_ops->ro_displayname, 285 xprt->rx_buf.rb_max_requests, 286 ird, ird < 4 && ird < tird / 2 ? " (low!)" : ""); 287 } else if (connstate < 0) { 288 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 289 sap, rpc_get_port(sap), connstate); 290 } 291 #endif 292 293 return 0; 294 } 295 296 static void rpcrdma_destroy_id(struct rdma_cm_id *id) 297 { 298 if (id) { 299 module_put(id->device->owner); 300 rdma_destroy_id(id); 301 } 302 } 303 304 static struct rdma_cm_id * 305 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 306 struct rpcrdma_ia *ia, struct sockaddr *addr) 307 { 308 struct rdma_cm_id *id; 309 int rc; 310 311 init_completion(&ia->ri_done); 312 313 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 314 IB_QPT_RC); 315 if (IS_ERR(id)) { 316 rc = PTR_ERR(id); 317 dprintk("RPC: %s: rdma_create_id() failed %i\n", 318 __func__, rc); 319 return id; 320 } 321 322 ia->ri_async_rc = -ETIMEDOUT; 323 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 324 if (rc) { 325 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 326 __func__, rc); 327 goto out; 328 } 329 wait_for_completion_interruptible_timeout(&ia->ri_done, 330 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 331 332 /* FIXME: 333 * Until xprtrdma supports DEVICE_REMOVAL, the provider must 334 * be pinned while there are active NFS/RDMA mounts to prevent 335 * hangs and crashes at umount time. 336 */ 337 if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { 338 dprintk("RPC: %s: Failed to get device module\n", 339 __func__); 340 ia->ri_async_rc = -ENODEV; 341 } 342 rc = ia->ri_async_rc; 343 if (rc) 344 goto out; 345 346 ia->ri_async_rc = -ETIMEDOUT; 347 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 348 if (rc) { 349 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 350 __func__, rc); 351 goto put; 352 } 353 wait_for_completion_interruptible_timeout(&ia->ri_done, 354 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 355 rc = ia->ri_async_rc; 356 if (rc) 357 goto put; 358 359 return id; 360 put: 361 module_put(id->device->owner); 362 out: 363 rdma_destroy_id(id); 364 return ERR_PTR(rc); 365 } 366 367 /* 368 * Exported functions. 369 */ 370 371 /* 372 * Open and initialize an Interface Adapter. 373 * o initializes fields of struct rpcrdma_ia, including 374 * interface and provider attributes and protection zone. 
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_dma_mr = NULL;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(ia->ri_device->attrs.device_cap_flags &
				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
			dprintk("RPC: %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC: %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC: %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *  o if event handles and PD have been initialized, free them.
 *  o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC: %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
					ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	if (ia->ri_dma_mr)
		ib_dereg_mr(ia->ri_dma_mr);
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	if (ia->ri_dma_mr) {
		rc = ib_dereg_mr(ia->ri_dma_mr);
		dprintk("RPC: %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC: %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ?
"still " : "dis"); 772 } else { 773 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 774 ep->rep_connected = rc; 775 } 776 777 ib_drain_qp(ia->ri_id->qp); 778 } 779 780 struct rpcrdma_req * 781 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 782 { 783 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 784 struct rpcrdma_req *req; 785 786 req = kzalloc(sizeof(*req), GFP_KERNEL); 787 if (req == NULL) 788 return ERR_PTR(-ENOMEM); 789 790 INIT_LIST_HEAD(&req->rl_free); 791 spin_lock(&buffer->rb_reqslock); 792 list_add(&req->rl_all, &buffer->rb_allreqs); 793 spin_unlock(&buffer->rb_reqslock); 794 req->rl_cqe.done = rpcrdma_wc_send; 795 req->rl_buffer = &r_xprt->rx_buf; 796 return req; 797 } 798 799 struct rpcrdma_rep * 800 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 801 { 802 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 803 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 804 struct rpcrdma_rep *rep; 805 int rc; 806 807 rc = -ENOMEM; 808 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 809 if (rep == NULL) 810 goto out; 811 812 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, 813 GFP_KERNEL); 814 if (IS_ERR(rep->rr_rdmabuf)) { 815 rc = PTR_ERR(rep->rr_rdmabuf); 816 goto out_free; 817 } 818 819 rep->rr_device = ia->ri_device; 820 rep->rr_cqe.done = rpcrdma_receive_wc; 821 rep->rr_rxprt = r_xprt; 822 INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); 823 return rep; 824 825 out_free: 826 kfree(rep); 827 out: 828 return ERR_PTR(rc); 829 } 830 831 int 832 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) 833 { 834 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 835 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 836 int i, rc; 837 838 buf->rb_max_requests = r_xprt->rx_data.max_requests; 839 buf->rb_bc_srv_max_requests = 0; 840 spin_lock_init(&buf->rb_lock); 841 atomic_set(&buf->rb_credits, 1); 842 843 rc = ia->ri_ops->ro_init(r_xprt); 844 if (rc) 845 goto out; 846 847 INIT_LIST_HEAD(&buf->rb_send_bufs); 848 INIT_LIST_HEAD(&buf->rb_allreqs); 849 spin_lock_init(&buf->rb_reqslock); 850 for (i = 0; i < buf->rb_max_requests; i++) { 851 struct rpcrdma_req *req; 852 853 req = rpcrdma_create_req(r_xprt); 854 if (IS_ERR(req)) { 855 dprintk("RPC: %s: request buffer %d alloc" 856 " failed\n", __func__, i); 857 rc = PTR_ERR(req); 858 goto out; 859 } 860 req->rl_backchannel = false; 861 list_add(&req->rl_free, &buf->rb_send_bufs); 862 } 863 864 INIT_LIST_HEAD(&buf->rb_recv_bufs); 865 for (i = 0; i < buf->rb_max_requests + 2; i++) { 866 struct rpcrdma_rep *rep; 867 868 rep = rpcrdma_create_rep(r_xprt); 869 if (IS_ERR(rep)) { 870 dprintk("RPC: %s: reply buffer %d alloc failed\n", 871 __func__, i); 872 rc = PTR_ERR(rep); 873 goto out; 874 } 875 list_add(&rep->rr_list, &buf->rb_recv_bufs); 876 } 877 878 return 0; 879 out: 880 rpcrdma_buffer_destroy(buf); 881 return rc; 882 } 883 884 static struct rpcrdma_req * 885 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) 886 { 887 struct rpcrdma_req *req; 888 889 req = list_first_entry(&buf->rb_send_bufs, 890 struct rpcrdma_req, rl_free); 891 list_del(&req->rl_free); 892 return req; 893 } 894 895 static struct rpcrdma_rep * 896 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) 897 { 898 struct rpcrdma_rep *rep; 899 900 rep = list_first_entry(&buf->rb_recv_bufs, 901 struct rpcrdma_rep, rr_list); 902 list_del(&rep->rr_list); 903 return rep; 904 } 905 906 static void 907 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) 908 { 909 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); 910 kfree(rep); 911 } 912 913 void 914 
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC: %s: no MWs available\n", __func__);
	return mw;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}