/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

/*
 * handle replies in tasklet context, using a single, global list
 * rdma tasklet function -- just turn around and call the func
 * for all replies on the list
 */

static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

static void
rpcrdma_run_tasklet(unsigned long data)
{
	struct rpcrdma_rep *rep;
	void (*func)(struct rpcrdma_rep *);
	unsigned long flags;

	data = data;
	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	while (!list_empty(&rpcrdma_tasklets_g)) {
		rep = list_entry(rpcrdma_tasklets_g.next,
				 struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		func = rep->rr_func;
		rep->rr_func = NULL;
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

		if (func)
			func(rep);
		else
			rpcrdma_recv_buffer_put(rep);

		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	}
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);

static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
	unsigned long flags;

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
	tasklet_schedule(&rpcrdma_tasklet_g);
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
		if (wc->status != IB_WC_SUCCESS &&
		    wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("RPC: %s: SEND: %s\n",
			       __func__, ib_wc_status_msg(wc->status));
	} else {
		struct rpcrdma_mw *r;

		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
		r->mw_sendcompletion(wc);
	}
}

static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct ib_wc *wcs;
	int budget, count, rc;

	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_send_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			return rc;

		count = rc;
		while (count-- > 0)
			rpcrdma_sendcq_process_wc(wcs++);
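		/* A full batch suggests more completions may be waiting;
		 * poll again until a short batch arrives or the per-upcall
		 * budget is exhausted.
		 */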
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	return 0;
}

/*
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_sendcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_sendcq_poll(cq, ep);
}

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
	prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
	list_add_tail(&rep->rr_list, sched_list);
	return;
out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC: %s: rep %p: %s\n",
		       __func__, rep, ib_wc_status_msg(wc->status));
	rep->rr_len = ~0U;
	goto out_schedule;
}

static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct list_head sched_list;
	struct ib_wc *wcs;
	int budget, count, rc;

	INIT_LIST_HEAD(&sched_list);
	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_recv_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			goto out_schedule;

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	rc = 0;

out_schedule:
	rpcrdma_schedule_tasklet(&sched_list);
	return rc;
}

/*
 * Handle receive completions.
 *
 * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_recvcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_recvcq_poll(cq, ep);
}

static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;
	LIST_HEAD(sched_list);

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc, &sched_list);
	if (!list_empty(&sched_list))
		rpcrdma_schedule_tasklet(&sched_list);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_id->device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Drain any CQ prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
	struct ib_wc wc;
	int count = 0;

	while (1 == ib_poll_cq(cq, 1, &wc))
		++count;

	if (count)
		dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
			__func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection domain (PD).
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	int rc, mem_priv;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct ib_device_attr *devattr = &ia->ri_devattr;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}

	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_query_device(ia->ri_id->device, devattr);
	if (rc) {
		dprintk("RPC: %s: ib_query_device failed %d\n",
			__func__, rc);
		goto out3;
	}

	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
		ia->ri_have_dma_lkey = 1;
		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
	}

	if (memreg == RPCRDMA_FRMR) {
		/* Requires both frmr reg and local dma lkey */
		if (((devattr->device_cap_flags &
		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
		      (devattr->max_fast_reg_page_list_len == 0)) {
			dprintk("RPC: %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_id->device->alloc_fmr) {
			dprintk("RPC: %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_ALLPHYSICAL;
		}
	}

	/*
	 * Optionally obtain an underlying physical identity mapping in
	 * order to do a memory window-based bind. This base registration
	 * is protected from remote access - that is enabled only by binding
	 * for the specific bytes targeted during each RPC operation, and
	 * revoked after the corresponding completion similar to a storage
	 * adapter.
	 */
	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		mem_priv = IB_ACCESS_LOCAL_WRITE |
				IB_ACCESS_REMOTE_WRITE |
				IB_ACCESS_REMOTE_READ;
		goto register_setup;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		if (ia->ri_have_dma_lkey)
			break;
		mem_priv = IB_ACCESS_LOCAL_WRITE;
	register_setup:
		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
		if (IS_ERR(ia->ri_bind_mem)) {
			printk(KERN_ALERT "%s: ib_get_dma_mr for "
				"phys register failed with %lX\n",
				__func__, PTR_ERR(ia->ri_bind_mem));
			rc = -ENOMEM;
			goto out3;
		}
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC: %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	/* Else will do memory reg/dereg for each chunk */
	ia->ri_memreg_strategy = memreg;

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *  o if event handles and PD have been initialized, free them.
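 *  o deregister the DMA MR (ri_bind_mem) used for internal
 *    registrations, if one was allocated.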
 *  o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_bind_mem != NULL) {
		rc = ib_dereg_mr(ia->ri_bind_mem);
		dprintk("RPC: %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}
	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
		rc = ib_dealloc_pd(ia->ri_pd);
		dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
			__func__, rc);
	}
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_device_attr *devattr = &ia->ri_devattr;
	struct ib_cq *sendcq, *recvcq;
	struct ib_cq_init_attr cq_attr = {};
	int rc, err;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > devattr->max_qp_wr)
		cdata->max_requests = devattr->max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	if (cdata->padding) {
		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
						      GFP_KERNEL);
		if (IS_ERR(ep->rep_padbuf))
			return PTR_ERR(ep->rep_padbuf);
	} else
		ep->rep_padbuf = NULL;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
	else if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		goto out2;
	}

	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		ib_destroy_cq(recvcq);
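		/* recvcq is already destroyed; out2 tears down sendcq */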
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						devattr->max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	err = ib_destroy_cq(sendcq);
	if (err)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, err);
out1:
	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	rpcrdma_free_regbuf(ia, ep->rep_padbuf);

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	if (rc)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, rc);

	rpcrdma_clean_cq(ep->rep_attr.send_cq);
	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	if (rc)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, rc);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		ia->ri_ops->ro_reset(xprt);

		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 *    Deregister/remarshal *all* requests!
		 *    Close and recreate adapter, pd, etc!
		 *    Re-determine all attributes still sane!
		 *    More stuff I haven't thought of!
		 *    Rrrgh!
		 */
		if (ia->ri_id->device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort connection attempt.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		dprintk("RPC: %s: connected\n", __func__);
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}
}

static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_buffer = &r_xprt->rx_buf;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	char *p;
	size_t len;
	int i, rc;

	buf->rb_max_requests = cdata->max_requests;
	spin_lock_init(&buf->rb_lock);

	/* Need to allocate:
	 *   1. arrays for send and recv pointers
	 *   2. arrays of struct rpcrdma_req to fill in pointers
	 *   3. array of struct rpcrdma_rep for replies
	 * Send/recv buffers in req/rep need to be registered
	 */
	len = buf->rb_max_requests *
		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

	p = kzalloc(len, GFP_KERNEL);
	if (p == NULL) {
		dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
			__func__, len);
		rc = -ENOMEM;
		goto out;
	}
	buf->rb_pool = p;	/* for freeing it later */

	buf->rb_send_bufs = (struct rpcrdma_req **) p;
	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;
		struct rpcrdma_rep *rep;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		buf->rb_send_bufs[i] = req;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		buf->rb_recv_bufs[i] = rep;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	if (!rep)
		return;

	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	if (!req)
		return;

	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	int i;

	/* clean up in reverse order from create
	 *   1. recv mr memory (mr free, then kfree)
	 *   2. send mr memory (mr free, then kfree)
	 *   3. MWs
	 */
	dprintk("RPC: %s: entering\n", __func__);

	for (i = 0; i < buf->rb_max_requests; i++) {
		if (buf->rb_recv_bufs)
			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
		if (buf->rb_send_bufs)
			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
	}

	ia->ri_ops->ro_destroy(buf);

	kfree(buf->rb_pool);
}

/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
 * some req segments uninitialized.
 */
static void
rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
{
	if (*mw) {
		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
		*mw = NULL;
	}
}

/* Cycle mw's back in reverse order, and "spin" them.
 * This delays and scrambles reuse as much as possible.
 */
static void
rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_mr_seg *seg1 = seg;
	int i;

	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
}

static void
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	buf->rb_send_bufs[--buf->rb_send_index] = req;
	req->rl_niovs = 0;
	if (req->rl_reply) {
		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
		req->rl_reply->rr_func = NULL;
		req->rl_reply = NULL;
	}
}

/* rpcrdma_unmap_one() was already done during deregistration.
 * Redo only the ib_post_send().
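 *
 * The FRMR is marked FRMR_IS_INVALID before the LOCAL_INV is posted;
 * if the post fails it is flagged stale again so that
 * rpcrdma_buffer_get() will retry it.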
 */
static void
rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt =
				container_of(ia, struct rpcrdma_xprt, rx_ia);
	struct ib_send_wr invalidate_wr, *bad_wr;
	int rc;

	dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);

	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
	r->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
	invalidate_wr.wr_id = (unsigned long)(void *)r;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
		__func__, r, r->r.frmr.fr_mr->rkey);

	read_lock(&ia->ri_qplock);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);
	if (rc) {
		/* Force rpcrdma_buffer_get() to retry */
		r->r.frmr.fr_state = FRMR_IS_STALE;
		dprintk("RPC: %s: ib_post_send failed, %i\n",
			__func__, rc);
	}
}

static void
rpcrdma_retry_flushed_linv(struct list_head *stale,
			   struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct list_head *pos;
	struct rpcrdma_mw *r;
	unsigned long flags;

	list_for_each(pos, stale) {
		r = list_entry(pos, struct rpcrdma_mw, mw_list);
		rpcrdma_retry_local_inv(r, ia);
	}

	spin_lock_irqsave(&buf->rb_lock, flags);
	list_splice_tail(stale, &buf->rb_mws);
	spin_unlock_irqrestore(&buf->rb_lock, flags);
}

static struct rpcrdma_req *
rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
			 struct list_head *stale)
{
	struct rpcrdma_mw *r;
	int i;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
			list_add(&r->mw_list, stale);
			continue;
		}
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */
	}

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
	return NULL;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mw *r;
	int i;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */
	}

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
	return NULL;
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 * Rule:
 *    rb_send_index and rb_recv_index MUST always be pointing to the
 *    *next* available buffer (non-NULL). They are incremented after
 *    removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	struct list_head stale;
	struct rpcrdma_req *req;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_send_index == buffers->rb_max_requests) {
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
		dprintk("RPC: %s: out of request buffers\n", __func__);
		return ((struct rpcrdma_req *)NULL);
	}

	req = buffers->rb_send_bufs[buffers->rb_send_index];
	if (buffers->rb_send_index < buffers->rb_recv_index) {
		dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
			__func__,
			buffers->rb_recv_index - buffers->rb_send_index);
		req->rl_reply = NULL;
	} else {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

	INIT_LIST_HEAD(&stale);
	switch (ia->ri_memreg_strategy) {
	case RPCRDMA_FRMR:
		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
		break;
	case RPCRDMA_MTHCAFMR:
		req = rpcrdma_buffer_get_fmrs(req, buffers);
		break;
	default:
		break;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
	if (!list_empty(&stale))
		rpcrdma_retry_flushed_linv(&stale, buffers);
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	rpcrdma_buffer_put_sendbuf(req, buffers);
	switch (ia->ri_memreg_strategy) {
	case RPCRDMA_FRMR:
	case RPCRDMA_MTHCAFMR:
		rpcrdma_buffer_put_mrs(req, buffers);
		break;
	default:
		break;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_recv_index < buffers->rb_max_requests) {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = rep->rr_buffer;
	unsigned long flags;

	rep->rr_func = NULL;
	spin_lock_irqsave(&buffers->rb_lock, flags);
	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
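 *
 * If the device supplies a local DMA lkey, or a DMA MR was set up at
 * IA open time, buffers are only DMA-mapped; otherwise each buffer is
 * registered with ib_reg_phys_mr() for LOCAL_WRITE access.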
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
				struct ib_mr **mrp, struct ib_sge *iov)
{
	struct ib_phys_buf ipb;
	struct ib_mr *mr;
	int rc;

	/*
	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
	 */
	iov->addr = ib_dma_map_single(ia->ri_id->device,
			va, len, DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
		return -ENOMEM;

	iov->length = len;

	if (ia->ri_have_dma_lkey) {
		*mrp = NULL;
		iov->lkey = ia->ri_dma_lkey;
		return 0;
	} else if (ia->ri_bind_mem != NULL) {
		*mrp = NULL;
		iov->lkey = ia->ri_bind_mem->lkey;
		return 0;
	}

	ipb.addr = iov->addr;
	ipb.size = iov->length;
	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
			IB_ACCESS_LOCAL_WRITE, &iov->addr);

	dprintk("RPC: %s: phys convert: 0x%llx "
			"registered 0x%llx length %d\n",
			__func__, (unsigned long long)ipb.addr,
			(unsigned long long)iov->addr, len);

	if (IS_ERR(mr)) {
		*mrp = NULL;
		rc = PTR_ERR(mr);
		dprintk("RPC: %s: failed with %i\n", __func__, rc);
	} else {
		*mrp = mr;
		iov->lkey = mr->lkey;
		rc = 0;
	}

	return rc;
}

static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
				struct ib_mr *mr, struct ib_sge *iov)
{
	int rc;

	ib_dma_unmap_single(ia->ri_id->device,
			iov->addr, iov->length, DMA_BIDIRECTIONAL);

	if (NULL == mr)
		return 0;

	rc = ib_dereg_mr(mr);
	if (rc)
		dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
	return rc;
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	int rc;

	rc = -ENOMEM;
	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	rb->rg_size = size;
	rb->rg_owner = NULL;
	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
				       &rb->rg_mr, &rb->rg_iov);
	if (rc)
		goto out_free;

	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(rc);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb) {
		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
		kfree(rb);
	}
}

/*
 * Prepost any receive buffer, then post send.
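 * Send completions are normally unsignaled; every rep_cqinit-th send
 * is signaled so the provider's send CQ cannot overflow.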
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	int rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
	send_wr.sg_list = req->rl_send_iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;
	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
		ib_dma_sync_single_for_device(ia->ri_id->device,
			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
			DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
		DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
		DMA_TO_DEVICE);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	int bytes, segments;

	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
	bytes -= RPCRDMA_HDRLEN_MIN;
	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
		pr_warn("RPC: %s: inline threshold too small\n",
			__func__);
		return 0;
	}

	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
	dprintk("RPC: %s: max chunk list size = %d segments\n",
		__func__, segments);
	return segments;
}