1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2 /* 3 * Copyright (c) 2015-2018 Oracle. All rights reserved. 4 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 5 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. 6 * 7 * This software is available to you under a choice of one of two 8 * licenses. You may choose to be licensed under the terms of the GNU 9 * General Public License (GPL) Version 2, available from the file 10 * COPYING in the main directory of this source tree, or the BSD-type 11 * license below: 12 * 13 * Redistribution and use in source and binary forms, with or without 14 * modification, are permitted provided that the following conditions 15 * are met: 16 * 17 * Redistributions of source code must retain the above copyright 18 * notice, this list of conditions and the following disclaimer. 19 * 20 * Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials provided 23 * with the distribution. 24 * 25 * Neither the name of the Network Appliance, Inc. nor the names of 26 * its contributors may be used to endorse or promote products 27 * derived from this software without specific prior written 28 * permission. 29 * 30 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 31 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 32 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 33 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 34 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 36 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 37 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 38 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 39 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 40 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 41 * 42 * Author: Tom Tucker <tom@opengridcomputing.com> 43 */ 44 45 #include <linux/interrupt.h> 46 #include <linux/sched.h> 47 #include <linux/slab.h> 48 #include <linux/spinlock.h> 49 #include <linux/workqueue.h> 50 #include <linux/export.h> 51 52 #include <rdma/ib_verbs.h> 53 #include <rdma/rdma_cm.h> 54 #include <rdma/rw.h> 55 56 #include <linux/sunrpc/addr.h> 57 #include <linux/sunrpc/debug.h> 58 #include <linux/sunrpc/svc_xprt.h> 59 #include <linux/sunrpc/svc_rdma.h> 60 61 #include "xprt_rdma.h" 62 #include <trace/events/rpcrdma.h> 63 64 #define RPCDBG_FACILITY RPCDBG_SVCXPRT 65 66 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, 67 struct net *net, int node); 68 static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id, 69 struct rdma_cm_event *event); 70 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 71 struct net *net, 72 struct sockaddr *sa, int salen, 73 int flags); 74 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt); 75 static void svc_rdma_detach(struct svc_xprt *xprt); 76 static void svc_rdma_free(struct svc_xprt *xprt); 77 static int svc_rdma_has_wspace(struct svc_xprt *xprt); 78 static void svc_rdma_kill_temp_xprt(struct svc_xprt *); 79 80 static const struct svc_xprt_ops svc_rdma_ops = { 81 .xpo_create = svc_rdma_create, 82 .xpo_recvfrom = svc_rdma_recvfrom, 83 .xpo_sendto = svc_rdma_sendto, 84 .xpo_result_payload = svc_rdma_result_payload, 85 .xpo_release_ctxt = svc_rdma_release_ctxt, 86 .xpo_detach = svc_rdma_detach, 87 .xpo_free = svc_rdma_free, 88 .xpo_has_wspace = svc_rdma_has_wspace, 89 .xpo_accept = svc_rdma_accept, 90 .xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt, 91 }; 92 93 struct svc_xprt_class svc_rdma_class = { 94 .xcl_name = "rdma", 95 .xcl_owner = THIS_MODULE, 96 .xcl_ops = &svc_rdma_ops, 97 .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA, 98 .xcl_ident = XPRT_TRANSPORT_RDMA, 99 }; 100 101 /* QP event handler */ 102 static void qp_event_handler(struct ib_event *event, void *context) 103 { 104 struct svc_xprt *xprt = context; 105 106 trace_svcrdma_qp_error(event, (struct sockaddr *)&xprt->xpt_remote); 107 switch (event->event) { 108 /* These are considered benign events */ 109 case IB_EVENT_PATH_MIG: 110 case IB_EVENT_COMM_EST: 111 case IB_EVENT_SQ_DRAINED: 112 case IB_EVENT_QP_LAST_WQE_REACHED: 113 break; 114 115 /* These are considered fatal events */ 116 case IB_EVENT_PATH_MIG_ERR: 117 case IB_EVENT_QP_FATAL: 118 case IB_EVENT_QP_REQ_ERR: 119 case IB_EVENT_QP_ACCESS_ERR: 120 case IB_EVENT_DEVICE_FATAL: 121 default: 122 svc_xprt_deferred_close(xprt); 123 break; 124 } 125 } 126 127 static struct rdma_cm_id * 128 svc_rdma_create_listen_id(struct net *net, struct sockaddr *sap, 129 void *context) 130 { 131 struct rdma_cm_id *listen_id; 132 int ret; 133 134 listen_id = rdma_create_id(net, svc_rdma_listen_handler, context, 135 RDMA_PS_TCP, IB_QPT_RC); 136 if (IS_ERR(listen_id)) 137 return listen_id; 138 139 /* Allow both IPv4 and IPv6 sockets to bind a single port 140 * at the same time. 141 */ 142 #if IS_ENABLED(CONFIG_IPV6) 143 ret = rdma_set_afonly(listen_id, 1); 144 if (ret) 145 goto out_destroy; 146 #endif 147 ret = rdma_bind_addr(listen_id, sap); 148 if (ret) 149 goto out_destroy; 150 151 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 152 if (ret) 153 goto out_destroy; 154 155 return listen_id; 156 157 out_destroy: 158 rdma_destroy_id(listen_id); 159 return ERR_PTR(ret); 160 } 161 162 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, 163 struct net *net, int node) 164 { 165 static struct lock_class_key svcrdma_rwctx_lock; 166 static struct lock_class_key svcrdma_sctx_lock; 167 static struct lock_class_key svcrdma_dto_lock; 168 struct svcxprt_rdma *cma_xprt; 169 170 cma_xprt = kzalloc_node(sizeof(*cma_xprt), GFP_KERNEL, node); 171 if (!cma_xprt) 172 return NULL; 173 174 svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); 175 INIT_LIST_HEAD(&cma_xprt->sc_accept_q); 176 INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); 177 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); 178 init_llist_head(&cma_xprt->sc_send_ctxts); 179 init_llist_head(&cma_xprt->sc_recv_ctxts); 180 init_llist_head(&cma_xprt->sc_rw_ctxts); 181 init_waitqueue_head(&cma_xprt->sc_send_wait); 182 init_waitqueue_head(&cma_xprt->sc_sq_ticket_wait); 183 184 spin_lock_init(&cma_xprt->sc_lock); 185 spin_lock_init(&cma_xprt->sc_rq_dto_lock); 186 lockdep_set_class(&cma_xprt->sc_rq_dto_lock, &svcrdma_dto_lock); 187 spin_lock_init(&cma_xprt->sc_send_lock); 188 lockdep_set_class(&cma_xprt->sc_send_lock, &svcrdma_sctx_lock); 189 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); 190 lockdep_set_class(&cma_xprt->sc_rw_ctxt_lock, &svcrdma_rwctx_lock); 191 192 /* 193 * Note that this implies that the underlying transport support 194 * has some form of congestion control (see RFC 7530 section 3.1 195 * paragraph 2). For now, we assume that all supported RDMA 196 * transports are suitable here. 197 */ 198 set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags); 199 200 return cma_xprt; 201 } 202 203 static void 204 svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, 205 struct rdma_conn_param *param) 206 { 207 const struct rpcrdma_connect_private *pmsg = param->private_data; 208 209 if (pmsg && 210 pmsg->cp_magic == rpcrdma_cmp_magic && 211 pmsg->cp_version == RPCRDMA_CMP_VERSION) { 212 newxprt->sc_snd_w_inv = pmsg->cp_flags & 213 RPCRDMA_CMP_F_SND_W_INV_OK; 214 215 dprintk("svcrdma: client send_size %u, recv_size %u " 216 "remote inv %ssupported\n", 217 rpcrdma_decode_buffer_size(pmsg->cp_send_size), 218 rpcrdma_decode_buffer_size(pmsg->cp_recv_size), 219 newxprt->sc_snd_w_inv ? "" : "un"); 220 } 221 } 222 223 /* 224 * This function handles the CONNECT_REQUEST event on a listening 225 * endpoint. It is passed the cma_id for the _new_ connection. The context in 226 * this cma_id is inherited from the listening cma_id and is the svc_xprt 227 * structure for the listening endpoint. 228 * 229 * This function creates a new xprt for the new connection and enqueues it on 230 * the accept queue for the listent xprt. When the listen thread is kicked, it 231 * will call the recvfrom method on the listen xprt which will accept the new 232 * connection. 233 */ 234 static void handle_connect_req(struct rdma_cm_id *new_cma_id, 235 struct rdma_conn_param *param) 236 { 237 struct svcxprt_rdma *listen_xprt = new_cma_id->context; 238 struct svcxprt_rdma *newxprt; 239 struct sockaddr *sa; 240 241 newxprt = svc_rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 242 listen_xprt->sc_xprt.xpt_net, 243 ibdev_to_node(new_cma_id->device)); 244 if (!newxprt) 245 return; 246 newxprt->sc_cm_id = new_cma_id; 247 new_cma_id->context = newxprt; 248 svc_rdma_parse_connect_private(newxprt, param); 249 250 /* Save client advertised inbound read limit for use later in accept. */ 251 newxprt->sc_ord = param->initiator_depth; 252 253 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 254 newxprt->sc_xprt.xpt_remotelen = svc_addr_len(sa); 255 memcpy(&newxprt->sc_xprt.xpt_remote, sa, 256 newxprt->sc_xprt.xpt_remotelen); 257 snprintf(newxprt->sc_xprt.xpt_remotebuf, 258 sizeof(newxprt->sc_xprt.xpt_remotebuf) - 1, "%pISc", sa); 259 260 /* The remote port is arbitrary and not under the control of the 261 * client ULP. Set it to a fixed value so that the DRC continues 262 * to be effective after a reconnect. 263 */ 264 rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0); 265 266 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 267 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 268 269 /* 270 * Enqueue the new transport on the accept queue of the listening 271 * transport 272 */ 273 spin_lock(&listen_xprt->sc_lock); 274 list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q); 275 spin_unlock(&listen_xprt->sc_lock); 276 277 set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags); 278 svc_xprt_enqueue(&listen_xprt->sc_xprt); 279 } 280 281 /** 282 * svc_rdma_listen_handler - Handle CM events generated on a listening endpoint 283 * @cma_id: the server's listener rdma_cm_id 284 * @event: details of the event 285 * 286 * Return values: 287 * %0: Do not destroy @cma_id 288 * %1: Destroy @cma_id 289 * 290 * NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners. 291 */ 292 static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id, 293 struct rdma_cm_event *event) 294 { 295 struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.src_addr; 296 struct svcxprt_rdma *cma_xprt = cma_id->context; 297 struct svc_xprt *cma_rdma = &cma_xprt->sc_xprt; 298 struct rdma_cm_id *listen_id; 299 300 switch (event->event) { 301 case RDMA_CM_EVENT_CONNECT_REQUEST: 302 handle_connect_req(cma_id, &event->param.conn); 303 break; 304 case RDMA_CM_EVENT_ADDR_CHANGE: 305 listen_id = svc_rdma_create_listen_id(cma_rdma->xpt_net, 306 sap, cma_xprt); 307 if (IS_ERR(listen_id)) { 308 pr_err("Listener dead, address change failed for device %s\n", 309 cma_id->device->name); 310 } else 311 cma_xprt->sc_cm_id = listen_id; 312 return 1; 313 default: 314 break; 315 } 316 return 0; 317 } 318 319 /** 320 * svc_rdma_cma_handler - Handle CM events on client connections 321 * @cma_id: the server's listener rdma_cm_id 322 * @event: details of the event 323 * 324 * Return values: 325 * %0: Do not destroy @cma_id 326 * %1: Destroy @cma_id (never returned here) 327 */ 328 static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id, 329 struct rdma_cm_event *event) 330 { 331 struct svcxprt_rdma *rdma = cma_id->context; 332 struct svc_xprt *xprt = &rdma->sc_xprt; 333 334 switch (event->event) { 335 case RDMA_CM_EVENT_ESTABLISHED: 336 clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags); 337 338 /* Handle any requests that were received while 339 * CONN_PENDING was set. */ 340 svc_xprt_enqueue(xprt); 341 break; 342 case RDMA_CM_EVENT_DISCONNECTED: 343 svc_xprt_deferred_close(xprt); 344 break; 345 default: 346 break; 347 } 348 return 0; 349 } 350 351 /* 352 * Create a listening RDMA service endpoint. 353 */ 354 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 355 struct net *net, 356 struct sockaddr *sa, int salen, 357 int flags) 358 { 359 struct rdma_cm_id *listen_id; 360 struct svcxprt_rdma *cma_xprt; 361 362 if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) 363 return ERR_PTR(-EAFNOSUPPORT); 364 cma_xprt = svc_rdma_create_xprt(serv, net, NUMA_NO_NODE); 365 if (!cma_xprt) 366 return ERR_PTR(-ENOMEM); 367 set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); 368 strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener"); 369 370 listen_id = svc_rdma_create_listen_id(net, sa, cma_xprt); 371 if (IS_ERR(listen_id)) { 372 kfree(cma_xprt); 373 return ERR_CAST(listen_id); 374 } 375 cma_xprt->sc_cm_id = listen_id; 376 377 /* 378 * We need to use the address from the cm_id in case the 379 * caller specified 0 for the port number. 380 */ 381 sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr; 382 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 383 384 return &cma_xprt->sc_xprt; 385 } 386 387 static void svc_rdma_xprt_done(struct rpcrdma_notification *rn) 388 { 389 struct svcxprt_rdma *rdma = container_of(rn, struct svcxprt_rdma, 390 sc_rn); 391 struct rdma_cm_id *id = rdma->sc_cm_id; 392 393 trace_svcrdma_device_removal(id); 394 svc_xprt_close(&rdma->sc_xprt); 395 } 396 397 /* 398 * This is the xpo_recvfrom function for listening endpoints. Its 399 * purpose is to accept incoming connections. The CMA callback handler 400 * has already created a new transport and attached it to the new CMA 401 * ID. 402 * 403 * There is a queue of pending connections hung on the listening 404 * transport. This queue contains the new svc_xprt structure. This 405 * function takes svc_xprt structures off the accept_q and completes 406 * the connection. 407 */ 408 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) 409 { 410 unsigned int ctxts, rq_depth, maxpayload; 411 struct svcxprt_rdma *listen_rdma; 412 struct svcxprt_rdma *newxprt = NULL; 413 struct rdma_conn_param conn_param; 414 struct rpcrdma_connect_private pmsg; 415 struct ib_qp_init_attr qp_attr; 416 struct ib_device *dev; 417 int ret = 0; 418 419 listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); 420 clear_bit(XPT_CONN, &xprt->xpt_flags); 421 /* Get the next entry off the accept list */ 422 spin_lock(&listen_rdma->sc_lock); 423 if (!list_empty(&listen_rdma->sc_accept_q)) { 424 newxprt = list_entry(listen_rdma->sc_accept_q.next, 425 struct svcxprt_rdma, sc_accept_q); 426 list_del_init(&newxprt->sc_accept_q); 427 } 428 if (!list_empty(&listen_rdma->sc_accept_q)) 429 set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags); 430 spin_unlock(&listen_rdma->sc_lock); 431 if (!newxprt) 432 return NULL; 433 434 dev = newxprt->sc_cm_id->device; 435 newxprt->sc_port_num = newxprt->sc_cm_id->port_num; 436 437 if (rpcrdma_rn_register(dev, &newxprt->sc_rn, svc_rdma_xprt_done)) 438 goto errout; 439 440 newxprt->sc_max_req_size = svcrdma_max_req_size; 441 newxprt->sc_max_requests = svcrdma_max_requests; 442 newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; 443 newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH; 444 newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); 445 446 /* Qualify the transport's resource defaults with the 447 * capabilities of this particular device. 448 */ 449 450 /* Transport header, head iovec, tail iovec */ 451 newxprt->sc_max_send_sges = 3; 452 /* Add one SGE per page list entry */ 453 newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1; 454 if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge) 455 newxprt->sc_max_send_sges = dev->attrs.max_send_sge; 456 rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests + 457 newxprt->sc_recv_batch + 1 /* drain */; 458 if (rq_depth > dev->attrs.max_qp_wr) { 459 rq_depth = dev->attrs.max_qp_wr; 460 newxprt->sc_recv_batch = 1; 461 newxprt->sc_max_requests = rq_depth - 2; 462 newxprt->sc_max_bc_requests = 2; 463 } 464 465 /* Estimate the needed number of rdma_rw contexts. The maximum 466 * Read and Write chunks have one segment each. Each request 467 * can involve one Read chunk and either a Write chunk or Reply 468 * chunk; thus a factor of three. 469 */ 470 maxpayload = min(xprt->xpt_server->sv_max_payload, 471 RPCSVC_MAXPAYLOAD_RDMA); 472 ctxts = newxprt->sc_max_requests * 3 * 473 rdma_rw_mr_factor(dev, newxprt->sc_port_num, 474 maxpayload >> PAGE_SHIFT); 475 476 newxprt->sc_sq_depth = rq_depth + 477 rdma_rw_max_send_wr(dev, newxprt->sc_port_num, ctxts, 0); 478 if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) 479 newxprt->sc_sq_depth = dev->attrs.max_qp_wr; 480 atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); 481 atomic_set(&newxprt->sc_sq_ticket_head, 0); 482 atomic_set(&newxprt->sc_sq_ticket_tail, 0); 483 484 newxprt->sc_pd = ib_alloc_pd(dev, 0); 485 if (IS_ERR(newxprt->sc_pd)) { 486 trace_svcrdma_pd_err(newxprt, PTR_ERR(newxprt->sc_pd)); 487 goto errout; 488 } 489 newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth, 490 IB_POLL_WORKQUEUE); 491 if (IS_ERR(newxprt->sc_sq_cq)) 492 goto errout; 493 newxprt->sc_rq_cq = 494 ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE); 495 if (IS_ERR(newxprt->sc_rq_cq)) 496 goto errout; 497 498 memset(&qp_attr, 0, sizeof qp_attr); 499 qp_attr.event_handler = qp_event_handler; 500 qp_attr.qp_context = &newxprt->sc_xprt; 501 qp_attr.port_num = newxprt->sc_port_num; 502 qp_attr.cap.max_rdma_ctxs = ctxts; 503 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts; 504 qp_attr.cap.max_recv_wr = rq_depth; 505 qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges; 506 qp_attr.cap.max_recv_sge = 1; 507 qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 508 qp_attr.qp_type = IB_QPT_RC; 509 qp_attr.send_cq = newxprt->sc_sq_cq; 510 qp_attr.recv_cq = newxprt->sc_rq_cq; 511 dprintk(" cap.max_send_wr = %d, cap.max_recv_wr = %d\n", 512 qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr); 513 dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n", 514 qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge); 515 dprintk(" send CQ depth = %u, recv CQ depth = %u\n", 516 newxprt->sc_sq_depth, rq_depth); 517 ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr); 518 if (ret) { 519 trace_svcrdma_qp_err(newxprt, ret); 520 goto errout; 521 } 522 newxprt->sc_max_send_sges = qp_attr.cap.max_send_sge; 523 newxprt->sc_qp = newxprt->sc_cm_id->qp; 524 525 if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) 526 newxprt->sc_snd_w_inv = false; 527 if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) && 528 !rdma_ib_or_roce(dev, newxprt->sc_port_num)) { 529 trace_svcrdma_fabric_err(newxprt, -EINVAL); 530 goto errout; 531 } 532 533 if (!svc_rdma_post_recvs(newxprt)) 534 goto errout; 535 536 /* Construct RDMA-CM private message */ 537 pmsg.cp_magic = rpcrdma_cmp_magic; 538 pmsg.cp_version = RPCRDMA_CMP_VERSION; 539 pmsg.cp_flags = 0; 540 pmsg.cp_send_size = pmsg.cp_recv_size = 541 rpcrdma_encode_buffer_size(newxprt->sc_max_req_size); 542 543 /* Accept Connection */ 544 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); 545 memset(&conn_param, 0, sizeof conn_param); 546 conn_param.responder_resources = 0; 547 conn_param.initiator_depth = min_t(int, newxprt->sc_ord, 548 dev->attrs.max_qp_init_rd_atom); 549 if (!conn_param.initiator_depth) { 550 ret = -EINVAL; 551 trace_svcrdma_initdepth_err(newxprt, ret); 552 goto errout; 553 } 554 conn_param.private_data = &pmsg; 555 conn_param.private_data_len = sizeof(pmsg); 556 rdma_lock_handler(newxprt->sc_cm_id); 557 newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler; 558 ret = rdma_accept(newxprt->sc_cm_id, &conn_param); 559 rdma_unlock_handler(newxprt->sc_cm_id); 560 if (ret) { 561 trace_svcrdma_accept_err(newxprt, ret); 562 goto errout; 563 } 564 565 if (IS_ENABLED(CONFIG_SUNRPC_DEBUG)) { 566 struct sockaddr *sap; 567 568 dprintk("svcrdma: new connection accepted on device %s:\n", dev->name); 569 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 570 dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap)); 571 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 572 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); 573 dprintk(" max_sge : %d\n", newxprt->sc_max_send_sges); 574 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); 575 dprintk(" rdma_rw_ctxs : %d\n", ctxts); 576 dprintk(" max_requests : %d\n", newxprt->sc_max_requests); 577 dprintk(" ord : %d\n", conn_param.initiator_depth); 578 } 579 580 return &newxprt->sc_xprt; 581 582 errout: 583 /* Take a reference in case the DTO handler runs */ 584 svc_xprt_get(&newxprt->sc_xprt); 585 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) 586 ib_destroy_qp(newxprt->sc_qp); 587 rdma_destroy_id(newxprt->sc_cm_id); 588 rpcrdma_rn_unregister(dev, &newxprt->sc_rn); 589 /* This call to put will destroy the transport */ 590 svc_xprt_put(&newxprt->sc_xprt); 591 return NULL; 592 } 593 594 static void svc_rdma_detach(struct svc_xprt *xprt) 595 { 596 struct svcxprt_rdma *rdma = 597 container_of(xprt, struct svcxprt_rdma, sc_xprt); 598 599 rdma_disconnect(rdma->sc_cm_id); 600 } 601 602 /** 603 * svc_rdma_free - Release class-specific transport resources 604 * @xprt: Generic svc transport object 605 */ 606 static void svc_rdma_free(struct svc_xprt *xprt) 607 { 608 struct svcxprt_rdma *rdma = 609 container_of(xprt, struct svcxprt_rdma, sc_xprt); 610 struct ib_device *device = rdma->sc_cm_id->device; 611 612 might_sleep(); 613 614 /* This blocks until the Completion Queues are empty */ 615 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 616 ib_drain_qp(rdma->sc_qp); 617 flush_workqueue(svcrdma_wq); 618 619 svc_rdma_flush_recv_queues(rdma); 620 621 svc_rdma_destroy_rw_ctxts(rdma); 622 svc_rdma_send_ctxts_destroy(rdma); 623 svc_rdma_recv_ctxts_destroy(rdma); 624 625 /* Destroy the QP if present (not a listener) */ 626 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 627 ib_destroy_qp(rdma->sc_qp); 628 629 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 630 ib_free_cq(rdma->sc_sq_cq); 631 632 if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq)) 633 ib_free_cq(rdma->sc_rq_cq); 634 635 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 636 ib_dealloc_pd(rdma->sc_pd); 637 638 /* Destroy the CM ID */ 639 rdma_destroy_id(rdma->sc_cm_id); 640 641 if (!test_bit(XPT_LISTENER, &rdma->sc_xprt.xpt_flags)) 642 rpcrdma_rn_unregister(device, &rdma->sc_rn); 643 kfree(rdma); 644 } 645 646 static int svc_rdma_has_wspace(struct svc_xprt *xprt) 647 { 648 struct svcxprt_rdma *rdma = 649 container_of(xprt, struct svcxprt_rdma, sc_xprt); 650 651 /* 652 * If there are already waiters on the SQ, 653 * return false. 654 */ 655 if (waitqueue_active(&rdma->sc_send_wait) || 656 waitqueue_active(&rdma->sc_sq_ticket_wait)) 657 return 0; 658 659 /* Otherwise return true. */ 660 return 1; 661 } 662 663 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) 664 { 665 } 666