1 /* 2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the BSD-type 8 * license below: 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 17 * Redistributions in binary form must reproduce the above 18 * copyright notice, this list of conditions and the following 19 * disclaimer in the documentation and/or other materials provided 20 * with the distribution. 21 * 22 * Neither the name of the Network Appliance, Inc. nor the names of 23 * its contributors may be used to endorse or promote products 24 * derived from this software without specific prior written 25 * permission. 26 * 27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 */ 39 40 /* 41 * transport.c 42 * 43 * This file contains the top-level implementation of an RPC RDMA 44 * transport. 45 * 46 * Naming convention: functions beginning with xprt_ are part of the 47 * transport switch. All others are RPC RDMA internal. 48 */ 49 50 #include <linux/module.h> 51 #include <linux/init.h> 52 #include <linux/seq_file.h> 53 54 #include "xprt_rdma.h" 55 56 #ifdef RPC_DEBUG 57 # define RPCDBG_FACILITY RPCDBG_TRANS 58 #endif 59 60 MODULE_LICENSE("Dual BSD/GPL"); 61 62 MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS"); 63 MODULE_AUTHOR("Network Appliance, Inc."); 64 65 /* 66 * tunables 67 */ 68 69 static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE; 70 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE; 71 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE; 72 static unsigned int xprt_rdma_inline_write_padding; 73 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; 74 int xprt_rdma_pad_optimize = 0; 75 76 #ifdef RPC_DEBUG 77 78 static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; 79 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; 80 static unsigned int zero; 81 static unsigned int max_padding = PAGE_SIZE; 82 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; 83 static unsigned int max_memreg = RPCRDMA_LAST - 1; 84 85 static struct ctl_table_header *sunrpc_table_header; 86 87 static ctl_table xr_tunables_table[] = { 88 { 89 .procname = "rdma_slot_table_entries", 90 .data = &xprt_rdma_slot_table_entries, 91 .maxlen = sizeof(unsigned int), 92 .mode = 0644, 93 .proc_handler = proc_dointvec_minmax, 94 .extra1 = &min_slot_table_size, 95 .extra2 = &max_slot_table_size 96 }, 97 { 98 .procname = "rdma_max_inline_read", 99 .data = &xprt_rdma_max_inline_read, 100 .maxlen = sizeof(unsigned int), 101 .mode = 0644, 102 .proc_handler = proc_dointvec, 103 }, 104 { 105 .procname = "rdma_max_inline_write", 106 .data = &xprt_rdma_max_inline_write, 107 .maxlen = sizeof(unsigned int), 108 .mode = 0644, 109 .proc_handler = proc_dointvec, 110 }, 111 { 112 .procname = "rdma_inline_write_padding", 113 .data = &xprt_rdma_inline_write_padding, 114 .maxlen = sizeof(unsigned int), 115 .mode = 0644, 116 .proc_handler = proc_dointvec_minmax, 117 .extra1 = &zero, 118 .extra2 = &max_padding, 119 }, 120 { 121 .procname = "rdma_memreg_strategy", 122 .data = &xprt_rdma_memreg_strategy, 123 .maxlen = sizeof(unsigned int), 124 .mode = 0644, 125 .proc_handler = proc_dointvec_minmax, 126 .extra1 = &min_memreg, 127 .extra2 = &max_memreg, 128 }, 129 { 130 .procname = "rdma_pad_optimize", 131 .data = &xprt_rdma_pad_optimize, 132 .maxlen = sizeof(unsigned int), 133 .mode = 0644, 134 .proc_handler = proc_dointvec, 135 }, 136 { }, 137 }; 138 139 static ctl_table sunrpc_table[] = { 140 { 141 .procname = "sunrpc", 142 .mode = 0555, 143 .child = xr_tunables_table 144 }, 145 { }, 146 }; 147 148 #endif 149 150 static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */ 151 152 static void 153 xprt_rdma_format_addresses(struct rpc_xprt *xprt) 154 { 155 struct sockaddr *sap = (struct sockaddr *) 156 &rpcx_to_rdmad(xprt).addr; 157 struct sockaddr_in *sin = (struct sockaddr_in *)sap; 158 char buf[64]; 159 160 (void)rpc_ntop(sap, buf, sizeof(buf)); 161 xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL); 162 163 (void)snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap)); 164 xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL); 165 166 xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma"; 167 168 (void)snprintf(buf, sizeof(buf), "%02x%02x%02x%02x", 169 NIPQUAD(sin->sin_addr.s_addr)); 170 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); 171 172 (void)snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap)); 173 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); 174 175 /* netid */ 176 xprt->address_strings[RPC_DISPLAY_NETID] = "rdma"; 177 } 178 179 static void 180 xprt_rdma_free_addresses(struct rpc_xprt *xprt) 181 { 182 unsigned int i; 183 184 for (i = 0; i < RPC_DISPLAY_MAX; i++) 185 switch (i) { 186 case RPC_DISPLAY_PROTO: 187 case RPC_DISPLAY_NETID: 188 continue; 189 default: 190 kfree(xprt->address_strings[i]); 191 } 192 } 193 194 static void 195 xprt_rdma_connect_worker(struct work_struct *work) 196 { 197 struct rpcrdma_xprt *r_xprt = 198 container_of(work, struct rpcrdma_xprt, rdma_connect.work); 199 struct rpc_xprt *xprt = &r_xprt->xprt; 200 int rc = 0; 201 202 if (!xprt->shutdown) { 203 xprt_clear_connected(xprt); 204 205 dprintk("RPC: %s: %sconnect\n", __func__, 206 r_xprt->rx_ep.rep_connected != 0 ? "re" : ""); 207 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); 208 if (rc) 209 goto out; 210 } 211 goto out_clear; 212 213 out: 214 xprt_wake_pending_tasks(xprt, rc); 215 216 out_clear: 217 dprintk("RPC: %s: exit\n", __func__); 218 xprt_clear_connecting(xprt); 219 } 220 221 /* 222 * xprt_rdma_destroy 223 * 224 * Destroy the xprt. 225 * Free all memory associated with the object, including its own. 226 * NOTE: none of the *destroy methods free memory for their top-level 227 * objects, even though they may have allocated it (they do free 228 * private memory). It's up to the caller to handle it. In this 229 * case (RDMA transport), all structure memory is inlined with the 230 * struct rpcrdma_xprt. 231 */ 232 static void 233 xprt_rdma_destroy(struct rpc_xprt *xprt) 234 { 235 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 236 int rc; 237 238 dprintk("RPC: %s: called\n", __func__); 239 240 cancel_delayed_work(&r_xprt->rdma_connect); 241 flush_scheduled_work(); 242 243 xprt_clear_connected(xprt); 244 245 rpcrdma_buffer_destroy(&r_xprt->rx_buf); 246 rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); 247 if (rc) 248 dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n", 249 __func__, rc); 250 rpcrdma_ia_close(&r_xprt->rx_ia); 251 252 xprt_rdma_free_addresses(xprt); 253 254 kfree(xprt->slot); 255 xprt->slot = NULL; 256 kfree(xprt); 257 258 dprintk("RPC: %s: returning\n", __func__); 259 260 module_put(THIS_MODULE); 261 } 262 263 static const struct rpc_timeout xprt_rdma_default_timeout = { 264 .to_initval = 60 * HZ, 265 .to_maxval = 60 * HZ, 266 }; 267 268 /** 269 * xprt_setup_rdma - Set up transport to use RDMA 270 * 271 * @args: rpc transport arguments 272 */ 273 static struct rpc_xprt * 274 xprt_setup_rdma(struct xprt_create *args) 275 { 276 struct rpcrdma_create_data_internal cdata; 277 struct rpc_xprt *xprt; 278 struct rpcrdma_xprt *new_xprt; 279 struct rpcrdma_ep *new_ep; 280 struct sockaddr_in *sin; 281 int rc; 282 283 if (args->addrlen > sizeof(xprt->addr)) { 284 dprintk("RPC: %s: address too large\n", __func__); 285 return ERR_PTR(-EBADF); 286 } 287 288 xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL); 289 if (xprt == NULL) { 290 dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n", 291 __func__); 292 return ERR_PTR(-ENOMEM); 293 } 294 295 xprt->max_reqs = xprt_rdma_slot_table_entries; 296 xprt->slot = kcalloc(xprt->max_reqs, 297 sizeof(struct rpc_rqst), GFP_KERNEL); 298 if (xprt->slot == NULL) { 299 dprintk("RPC: %s: couldn't allocate %d slots\n", 300 __func__, xprt->max_reqs); 301 kfree(xprt); 302 return ERR_PTR(-ENOMEM); 303 } 304 305 /* 60 second timeout, no retries */ 306 xprt->timeout = &xprt_rdma_default_timeout; 307 xprt->bind_timeout = (60U * HZ); 308 xprt->connect_timeout = (60U * HZ); 309 xprt->reestablish_timeout = (5U * HZ); 310 xprt->idle_timeout = (5U * 60 * HZ); 311 312 xprt->resvport = 0; /* privileged port not needed */ 313 xprt->tsh_size = 0; /* RPC-RDMA handles framing */ 314 xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE; 315 xprt->ops = &xprt_rdma_procs; 316 317 /* 318 * Set up RDMA-specific connect data. 319 */ 320 321 /* Put server RDMA address in local cdata */ 322 memcpy(&cdata.addr, args->dstaddr, args->addrlen); 323 324 /* Ensure xprt->addr holds valid server TCP (not RDMA) 325 * address, for any side protocols which peek at it */ 326 xprt->prot = IPPROTO_TCP; 327 xprt->addrlen = args->addrlen; 328 memcpy(&xprt->addr, &cdata.addr, xprt->addrlen); 329 330 sin = (struct sockaddr_in *)&cdata.addr; 331 if (ntohs(sin->sin_port) != 0) 332 xprt_set_bound(xprt); 333 334 dprintk("RPC: %s: %pI4:%u\n", 335 __func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port)); 336 337 /* Set max requests */ 338 cdata.max_requests = xprt->max_reqs; 339 340 /* Set some length limits */ 341 cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */ 342 cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */ 343 344 cdata.inline_wsize = xprt_rdma_max_inline_write; 345 if (cdata.inline_wsize > cdata.wsize) 346 cdata.inline_wsize = cdata.wsize; 347 348 cdata.inline_rsize = xprt_rdma_max_inline_read; 349 if (cdata.inline_rsize > cdata.rsize) 350 cdata.inline_rsize = cdata.rsize; 351 352 cdata.padding = xprt_rdma_inline_write_padding; 353 354 /* 355 * Create new transport instance, which includes initialized 356 * o ia 357 * o endpoint 358 * o buffers 359 */ 360 361 new_xprt = rpcx_to_rdmax(xprt); 362 363 rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr, 364 xprt_rdma_memreg_strategy); 365 if (rc) 366 goto out1; 367 368 /* 369 * initialize and create ep 370 */ 371 new_xprt->rx_data = cdata; 372 new_ep = &new_xprt->rx_ep; 373 new_ep->rep_remote_addr = cdata.addr; 374 375 rc = rpcrdma_ep_create(&new_xprt->rx_ep, 376 &new_xprt->rx_ia, &new_xprt->rx_data); 377 if (rc) 378 goto out2; 379 380 /* 381 * Allocate pre-registered send and receive buffers for headers and 382 * any inline data. Also specify any padding which will be provided 383 * from a preregistered zero buffer. 384 */ 385 rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia, 386 &new_xprt->rx_data); 387 if (rc) 388 goto out3; 389 390 /* 391 * Register a callback for connection events. This is necessary because 392 * connection loss notification is async. We also catch connection loss 393 * when reaping receives. 394 */ 395 INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker); 396 new_ep->rep_func = rpcrdma_conn_func; 397 new_ep->rep_xprt = xprt; 398 399 xprt_rdma_format_addresses(xprt); 400 401 if (!try_module_get(THIS_MODULE)) 402 goto out4; 403 404 return xprt; 405 406 out4: 407 xprt_rdma_free_addresses(xprt); 408 rc = -EINVAL; 409 out3: 410 (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); 411 out2: 412 rpcrdma_ia_close(&new_xprt->rx_ia); 413 out1: 414 kfree(xprt->slot); 415 kfree(xprt); 416 return ERR_PTR(rc); 417 } 418 419 /* 420 * Close a connection, during shutdown or timeout/reconnect 421 */ 422 static void 423 xprt_rdma_close(struct rpc_xprt *xprt) 424 { 425 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 426 427 dprintk("RPC: %s: closing\n", __func__); 428 if (r_xprt->rx_ep.rep_connected > 0) 429 xprt->reestablish_timeout = 0; 430 xprt_disconnect_done(xprt); 431 (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia); 432 } 433 434 static void 435 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) 436 { 437 struct sockaddr_in *sap; 438 439 sap = (struct sockaddr_in *)&xprt->addr; 440 sap->sin_port = htons(port); 441 sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr; 442 sap->sin_port = htons(port); 443 dprintk("RPC: %s: %u\n", __func__, port); 444 } 445 446 static void 447 xprt_rdma_connect(struct rpc_task *task) 448 { 449 struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt; 450 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 451 452 if (!xprt_test_and_set_connecting(xprt)) { 453 if (r_xprt->rx_ep.rep_connected != 0) { 454 /* Reconnect */ 455 schedule_delayed_work(&r_xprt->rdma_connect, 456 xprt->reestablish_timeout); 457 xprt->reestablish_timeout <<= 1; 458 if (xprt->reestablish_timeout > (30 * HZ)) 459 xprt->reestablish_timeout = (30 * HZ); 460 else if (xprt->reestablish_timeout < (5 * HZ)) 461 xprt->reestablish_timeout = (5 * HZ); 462 } else { 463 schedule_delayed_work(&r_xprt->rdma_connect, 0); 464 if (!RPC_IS_ASYNC(task)) 465 flush_scheduled_work(); 466 } 467 } 468 } 469 470 static int 471 xprt_rdma_reserve_xprt(struct rpc_task *task) 472 { 473 struct rpc_xprt *xprt = task->tk_xprt; 474 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 475 int credits = atomic_read(&r_xprt->rx_buf.rb_credits); 476 477 /* == RPC_CWNDSCALE @ init, but *after* setup */ 478 if (r_xprt->rx_buf.rb_cwndscale == 0UL) { 479 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; 480 dprintk("RPC: %s: cwndscale %lu\n", __func__, 481 r_xprt->rx_buf.rb_cwndscale); 482 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); 483 } 484 xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; 485 return xprt_reserve_xprt_cong(task); 486 } 487 488 /* 489 * The RDMA allocate/free functions need the task structure as a place 490 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv 491 * sequence. For this reason, the recv buffers are attached to send 492 * buffers for portions of the RPC. Note that the RPC layer allocates 493 * both send and receive buffers in the same call. We may register 494 * the receive buffer portion when using reply chunks. 495 */ 496 static void * 497 xprt_rdma_allocate(struct rpc_task *task, size_t size) 498 { 499 struct rpc_xprt *xprt = task->tk_xprt; 500 struct rpcrdma_req *req, *nreq; 501 502 req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); 503 BUG_ON(NULL == req); 504 505 if (size > req->rl_size) { 506 dprintk("RPC: %s: size %zd too large for buffer[%zd]: " 507 "prog %d vers %d proc %d\n", 508 __func__, size, req->rl_size, 509 task->tk_client->cl_prog, task->tk_client->cl_vers, 510 task->tk_msg.rpc_proc->p_proc); 511 /* 512 * Outgoing length shortage. Our inline write max must have 513 * been configured to perform direct i/o. 514 * 515 * This is therefore a large metadata operation, and the 516 * allocate call was made on the maximum possible message, 517 * e.g. containing long filename(s) or symlink data. In 518 * fact, while these metadata operations *might* carry 519 * large outgoing payloads, they rarely *do*. However, we 520 * have to commit to the request here, so reallocate and 521 * register it now. The data path will never require this 522 * reallocation. 523 * 524 * If the allocation or registration fails, the RPC framework 525 * will (doggedly) retry. 526 */ 527 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == 528 RPCRDMA_BOUNCEBUFFERS) { 529 /* forced to "pure inline" */ 530 dprintk("RPC: %s: too much data (%zd) for inline " 531 "(r/w max %d/%d)\n", __func__, size, 532 rpcx_to_rdmad(xprt).inline_rsize, 533 rpcx_to_rdmad(xprt).inline_wsize); 534 size = req->rl_size; 535 rpc_exit(task, -EIO); /* fail the operation */ 536 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; 537 goto out; 538 } 539 if (task->tk_flags & RPC_TASK_SWAPPER) 540 nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); 541 else 542 nreq = kmalloc(sizeof *req + size, GFP_NOFS); 543 if (nreq == NULL) 544 goto outfail; 545 546 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia, 547 nreq->rl_base, size + sizeof(struct rpcrdma_req) 548 - offsetof(struct rpcrdma_req, rl_base), 549 &nreq->rl_handle, &nreq->rl_iov)) { 550 kfree(nreq); 551 goto outfail; 552 } 553 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size; 554 nreq->rl_size = size; 555 nreq->rl_niovs = 0; 556 nreq->rl_nchunks = 0; 557 nreq->rl_buffer = (struct rpcrdma_buffer *)req; 558 nreq->rl_reply = req->rl_reply; 559 memcpy(nreq->rl_segments, 560 req->rl_segments, sizeof nreq->rl_segments); 561 /* flag the swap with an unused field */ 562 nreq->rl_iov.length = 0; 563 req->rl_reply = NULL; 564 req = nreq; 565 } 566 dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); 567 out: 568 req->rl_connect_cookie = 0; /* our reserved value */ 569 return req->rl_xdr_buf; 570 571 outfail: 572 rpcrdma_buffer_put(req); 573 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; 574 return NULL; 575 } 576 577 /* 578 * This function returns all RDMA resources to the pool. 579 */ 580 static void 581 xprt_rdma_free(void *buffer) 582 { 583 struct rpcrdma_req *req; 584 struct rpcrdma_xprt *r_xprt; 585 struct rpcrdma_rep *rep; 586 int i; 587 588 if (buffer == NULL) 589 return; 590 591 req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]); 592 if (req->rl_iov.length == 0) { /* see allocate above */ 593 r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer, 594 struct rpcrdma_xprt, rx_buf); 595 } else 596 r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); 597 rep = req->rl_reply; 598 599 dprintk("RPC: %s: called on 0x%p%s\n", 600 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); 601 602 /* 603 * Finish the deregistration. When using mw bind, this was 604 * begun in rpcrdma_reply_handler(). In all other modes, we 605 * do it here, in thread context. The process is considered 606 * complete when the rr_func vector becomes NULL - this 607 * was put in place during rpcrdma_reply_handler() - the wait 608 * call below will not block if the dereg is "done". If 609 * interrupted, our framework will clean up. 610 */ 611 for (i = 0; req->rl_nchunks;) { 612 --req->rl_nchunks; 613 i += rpcrdma_deregister_external( 614 &req->rl_segments[i], r_xprt, NULL); 615 } 616 617 if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { 618 rep->rr_func = NULL; /* abandon the callback */ 619 req->rl_reply = NULL; 620 } 621 622 if (req->rl_iov.length == 0) { /* see allocate above */ 623 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer; 624 oreq->rl_reply = req->rl_reply; 625 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia, 626 req->rl_handle, 627 &req->rl_iov); 628 kfree(req); 629 req = oreq; 630 } 631 632 /* Put back request+reply buffers */ 633 rpcrdma_buffer_put(req); 634 } 635 636 /* 637 * send_request invokes the meat of RPC RDMA. It must do the following: 638 * 1. Marshal the RPC request into an RPC RDMA request, which means 639 * putting a header in front of data, and creating IOVs for RDMA 640 * from those in the request. 641 * 2. In marshaling, detect opportunities for RDMA, and use them. 642 * 3. Post a recv message to set up asynch completion, then send 643 * the request (rpcrdma_ep_post). 644 * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). 645 */ 646 647 static int 648 xprt_rdma_send_request(struct rpc_task *task) 649 { 650 struct rpc_rqst *rqst = task->tk_rqstp; 651 struct rpc_xprt *xprt = task->tk_xprt; 652 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 653 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 654 655 /* marshal the send itself */ 656 if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { 657 r_xprt->rx_stats.failed_marshal_count++; 658 dprintk("RPC: %s: rpcrdma_marshal_req failed\n", 659 __func__); 660 return -EIO; 661 } 662 663 if (req->rl_reply == NULL) /* e.g. reconnection */ 664 rpcrdma_recv_buffer_get(req); 665 666 if (req->rl_reply) { 667 req->rl_reply->rr_func = rpcrdma_reply_handler; 668 /* this need only be done once, but... */ 669 req->rl_reply->rr_xprt = xprt; 670 } 671 672 /* Must suppress retransmit to maintain credits */ 673 if (req->rl_connect_cookie == xprt->connect_cookie) 674 goto drop_connection; 675 req->rl_connect_cookie = xprt->connect_cookie; 676 677 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) 678 goto drop_connection; 679 680 task->tk_bytes_sent += rqst->rq_snd_buf.len; 681 rqst->rq_bytes_sent = 0; 682 return 0; 683 684 drop_connection: 685 xprt_disconnect_done(xprt); 686 return -ENOTCONN; /* implies disconnect */ 687 } 688 689 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 690 { 691 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 692 long idle_time = 0; 693 694 if (xprt_connected(xprt)) 695 idle_time = (long)(jiffies - xprt->last_used) / HZ; 696 697 seq_printf(seq, 698 "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu " 699 "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n", 700 701 0, /* need a local port? */ 702 xprt->stat.bind_count, 703 xprt->stat.connect_count, 704 xprt->stat.connect_time, 705 idle_time, 706 xprt->stat.sends, 707 xprt->stat.recvs, 708 xprt->stat.bad_xids, 709 xprt->stat.req_u, 710 xprt->stat.bklog_u, 711 712 r_xprt->rx_stats.read_chunk_count, 713 r_xprt->rx_stats.write_chunk_count, 714 r_xprt->rx_stats.reply_chunk_count, 715 r_xprt->rx_stats.total_rdma_request, 716 r_xprt->rx_stats.total_rdma_reply, 717 r_xprt->rx_stats.pullup_copy_count, 718 r_xprt->rx_stats.fixup_copy_count, 719 r_xprt->rx_stats.hardway_register_count, 720 r_xprt->rx_stats.failed_marshal_count, 721 r_xprt->rx_stats.bad_reply_count); 722 } 723 724 /* 725 * Plumbing for rpc transport switch and kernel module 726 */ 727 728 static struct rpc_xprt_ops xprt_rdma_procs = { 729 .reserve_xprt = xprt_rdma_reserve_xprt, 730 .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ 731 .release_request = xprt_release_rqst_cong, /* ditto */ 732 .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */ 733 .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */ 734 .set_port = xprt_rdma_set_port, 735 .connect = xprt_rdma_connect, 736 .buf_alloc = xprt_rdma_allocate, 737 .buf_free = xprt_rdma_free, 738 .send_request = xprt_rdma_send_request, 739 .close = xprt_rdma_close, 740 .destroy = xprt_rdma_destroy, 741 .print_stats = xprt_rdma_print_stats 742 }; 743 744 static struct xprt_class xprt_rdma = { 745 .list = LIST_HEAD_INIT(xprt_rdma.list), 746 .name = "rdma", 747 .owner = THIS_MODULE, 748 .ident = XPRT_TRANSPORT_RDMA, 749 .setup = xprt_setup_rdma, 750 }; 751 752 static void __exit xprt_rdma_cleanup(void) 753 { 754 int rc; 755 756 dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n"); 757 #ifdef RPC_DEBUG 758 if (sunrpc_table_header) { 759 unregister_sysctl_table(sunrpc_table_header); 760 sunrpc_table_header = NULL; 761 } 762 #endif 763 rc = xprt_unregister_transport(&xprt_rdma); 764 if (rc) 765 dprintk("RPC: %s: xprt_unregister returned %i\n", 766 __func__, rc); 767 } 768 769 static int __init xprt_rdma_init(void) 770 { 771 int rc; 772 773 rc = xprt_register_transport(&xprt_rdma); 774 775 if (rc) 776 return rc; 777 778 dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n"); 779 780 dprintk(KERN_INFO "Defaults:\n"); 781 dprintk(KERN_INFO "\tSlots %d\n" 782 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n", 783 xprt_rdma_slot_table_entries, 784 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); 785 dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n", 786 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy); 787 788 #ifdef RPC_DEBUG 789 if (!sunrpc_table_header) 790 sunrpc_table_header = register_sysctl_table(sunrpc_table); 791 #endif 792 return 0; 793 } 794 795 module_init(xprt_rdma_init); 796 module_exit(xprt_rdma_cleanup); 797