/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>
#include <linux/module.h>		/* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
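 * The peer advertises its credit grant in the rm_credit field of each
 * reply header. The value is clamped to [1, rb_max_requests] so that a
 * bogus value can neither stall the send path nor exceed the size of
 * the receive buffer pool.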
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
dprintk("RPC: %s: %sconnected\n", 264 __func__, connstate > 0 ? "" : "dis"); 265 atomic_set(&xprt->rx_buf.rb_credits, 1); 266 ep->rep_connected = connstate; 267 rpcrdma_conn_func(ep); 268 wake_up_all(&ep->rep_connect_wait); 269 /*FALLTHROUGH*/ 270 default: 271 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n", 272 __func__, sap, rpc_get_port(sap), ep, 273 rdma_event_msg(event->event)); 274 break; 275 } 276 277 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 278 if (connstate == 1) { 279 int ird = attr->max_dest_rd_atomic; 280 int tird = ep->rep_remote_cma.responder_resources; 281 282 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n", 283 sap, rpc_get_port(sap), 284 ia->ri_device->name, 285 ia->ri_ops->ro_displayname, 286 xprt->rx_buf.rb_max_requests, 287 ird, ird < 4 && ird < tird / 2 ? " (low!)" : ""); 288 } else if (connstate < 0) { 289 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 290 sap, rpc_get_port(sap), connstate); 291 } 292 #endif 293 294 return 0; 295 } 296 297 static void rpcrdma_destroy_id(struct rdma_cm_id *id) 298 { 299 if (id) { 300 module_put(id->device->owner); 301 rdma_destroy_id(id); 302 } 303 } 304 305 static struct rdma_cm_id * 306 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 307 struct rpcrdma_ia *ia, struct sockaddr *addr) 308 { 309 struct rdma_cm_id *id; 310 int rc; 311 312 init_completion(&ia->ri_done); 313 314 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 315 IB_QPT_RC); 316 if (IS_ERR(id)) { 317 rc = PTR_ERR(id); 318 dprintk("RPC: %s: rdma_create_id() failed %i\n", 319 __func__, rc); 320 return id; 321 } 322 323 ia->ri_async_rc = -ETIMEDOUT; 324 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 325 if (rc) { 326 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 327 __func__, rc); 328 goto out; 329 } 330 wait_for_completion_interruptible_timeout(&ia->ri_done, 331 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 332 333 /* FIXME: 334 * Until xprtrdma supports DEVICE_REMOVAL, the provider must 335 * be pinned while there are active NFS/RDMA mounts to prevent 336 * hangs and crashes at umount time. 337 */ 338 if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { 339 dprintk("RPC: %s: Failed to get device module\n", 340 __func__); 341 ia->ri_async_rc = -ENODEV; 342 } 343 rc = ia->ri_async_rc; 344 if (rc) 345 goto out; 346 347 ia->ri_async_rc = -ETIMEDOUT; 348 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 349 if (rc) { 350 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 351 __func__, rc); 352 goto put; 353 } 354 wait_for_completion_interruptible_timeout(&ia->ri_done, 355 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 356 rc = ia->ri_async_rc; 357 if (rc) 358 goto put; 359 360 return id; 361 put: 362 module_put(id->device->owner); 363 out: 364 rdma_destroy_id(id); 365 return ERR_PTR(rc); 366 } 367 368 /* 369 * Exported functions. 370 */ 371 372 /* 373 * Open and initialize an Interface Adapter. 374 * o initializes fields of struct rpcrdma_ia, including 375 * interface and provider attributes and protection zone. 
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC: %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
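		 * For now, reconnecting on a different device simply
		 * fails with -ENETUNREACH (see the check below).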
		 */
		if (ia->ri_device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
		     ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC: %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
"still " : "dis"); 743 } else { 744 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 745 ep->rep_connected = rc; 746 } 747 748 ib_drain_qp(ia->ri_id->qp); 749 } 750 751 static void 752 rpcrdma_mr_recovery_worker(struct work_struct *work) 753 { 754 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 755 rb_recovery_worker.work); 756 struct rpcrdma_mw *mw; 757 758 spin_lock(&buf->rb_recovery_lock); 759 while (!list_empty(&buf->rb_stale_mrs)) { 760 mw = list_first_entry(&buf->rb_stale_mrs, 761 struct rpcrdma_mw, mw_list); 762 list_del_init(&mw->mw_list); 763 spin_unlock(&buf->rb_recovery_lock); 764 765 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 766 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 767 768 spin_lock(&buf->rb_recovery_lock); 769 } 770 spin_unlock(&buf->rb_recovery_lock); 771 } 772 773 void 774 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 775 { 776 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 777 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 778 779 spin_lock(&buf->rb_recovery_lock); 780 list_add(&mw->mw_list, &buf->rb_stale_mrs); 781 spin_unlock(&buf->rb_recovery_lock); 782 783 schedule_delayed_work(&buf->rb_recovery_worker, 0); 784 } 785 786 static void 787 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 788 { 789 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 790 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 791 unsigned int count; 792 LIST_HEAD(free); 793 LIST_HEAD(all); 794 795 for (count = 0; count < 32; count++) { 796 struct rpcrdma_mw *mw; 797 int rc; 798 799 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 800 if (!mw) 801 break; 802 803 rc = ia->ri_ops->ro_init_mr(ia, mw); 804 if (rc) { 805 kfree(mw); 806 break; 807 } 808 809 mw->mw_xprt = r_xprt; 810 811 list_add(&mw->mw_list, &free); 812 list_add(&mw->mw_all, &all); 813 } 814 815 spin_lock(&buf->rb_mwlock); 816 list_splice(&free, &buf->rb_mws); 817 list_splice(&all, &buf->rb_all); 818 r_xprt->rx_stats.mrs_allocated += count; 819 spin_unlock(&buf->rb_mwlock); 820 821 dprintk("RPC: %s: created %u MRs\n", __func__, count); 822 } 823 824 static void 825 rpcrdma_mr_refresh_worker(struct work_struct *work) 826 { 827 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 828 rb_refresh_worker.work); 829 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 830 rx_buf); 831 832 rpcrdma_create_mrs(r_xprt); 833 } 834 835 struct rpcrdma_req * 836 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 837 { 838 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 839 struct rpcrdma_req *req; 840 841 req = kzalloc(sizeof(*req), GFP_KERNEL); 842 if (req == NULL) 843 return ERR_PTR(-ENOMEM); 844 845 INIT_LIST_HEAD(&req->rl_free); 846 spin_lock(&buffer->rb_reqslock); 847 list_add(&req->rl_all, &buffer->rb_allreqs); 848 spin_unlock(&buffer->rb_reqslock); 849 req->rl_cqe.done = rpcrdma_wc_send; 850 req->rl_buffer = &r_xprt->rx_buf; 851 INIT_LIST_HEAD(&req->rl_registered); 852 return req; 853 } 854 855 struct rpcrdma_rep * 856 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 857 { 858 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 859 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 860 struct rpcrdma_rep *rep; 861 int rc; 862 863 rc = -ENOMEM; 864 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 865 if (rep == NULL) 866 goto out; 867 868 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, 869 GFP_KERNEL); 870 if (IS_ERR(rep->rr_rdmabuf)) { 871 rc = PTR_ERR(rep->rr_rdmabuf); 872 goto out_free; 873 } 874 875 rep->rr_device = ia->ri_device; 876 rep->rr_cqe.done = rpcrdma_receive_wc; 877 
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
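 *
 * rb_send_count and rb_recv_count are adjusted under rb_lock so that
 * rpcrdma_buffer_get_rep() can throttle how many Receives are handed out.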
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
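 *
 * A Send WR is signaled only after rep_cqinit unsignaled Sends have
 * been posted (see DECR_CQCOUNT below), which limits the rate of Send
 * completions the provider must deliver.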
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}