// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>
#include "rtrs-srv-trace.h"

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};
const struct class rtrs_dev_class = {
	.name = "rtrs-server",
};
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Maximum size of each I/O request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ")");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session.
Maximum: " 54 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 55 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 56 57 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 58 59 static struct workqueue_struct *rtrs_wq; 60 61 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 62 { 63 return container_of(c, struct rtrs_srv_con, c); 64 } 65 66 static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path, 67 enum rtrs_srv_state new_state) 68 { 69 enum rtrs_srv_state old_state; 70 bool changed = false; 71 unsigned long flags; 72 73 spin_lock_irqsave(&srv_path->state_lock, flags); 74 old_state = srv_path->state; 75 switch (new_state) { 76 case RTRS_SRV_CONNECTED: 77 if (old_state == RTRS_SRV_CONNECTING) 78 changed = true; 79 break; 80 case RTRS_SRV_CLOSING: 81 if (old_state == RTRS_SRV_CONNECTING || 82 old_state == RTRS_SRV_CONNECTED) 83 changed = true; 84 break; 85 case RTRS_SRV_CLOSED: 86 if (old_state == RTRS_SRV_CLOSING) 87 changed = true; 88 break; 89 default: 90 break; 91 } 92 if (changed) 93 srv_path->state = new_state; 94 spin_unlock_irqrestore(&srv_path->state_lock, flags); 95 96 return changed; 97 } 98 99 static void free_id(struct rtrs_srv_op *id) 100 { 101 if (!id) 102 return; 103 kfree(id); 104 } 105 106 static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path) 107 { 108 struct rtrs_srv_sess *srv = srv_path->srv; 109 int i; 110 111 if (srv_path->ops_ids) { 112 for (i = 0; i < srv->queue_depth; i++) 113 free_id(srv_path->ops_ids[i]); 114 kfree(srv_path->ops_ids); 115 srv_path->ops_ids = NULL; 116 } 117 } 118 119 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 120 121 static struct ib_cqe io_comp_cqe = { 122 .done = rtrs_srv_rdma_done 123 }; 124 125 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref) 126 { 127 struct rtrs_srv_path *srv_path = container_of(ref, 128 struct rtrs_srv_path, 129 ids_inflight_ref); 130 131 percpu_ref_exit(&srv_path->ids_inflight_ref); 132 complete(&srv_path->complete_done); 133 } 134 135 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path) 136 { 137 struct rtrs_srv_sess *srv = srv_path->srv; 138 struct rtrs_srv_op *id; 139 int i, ret; 140 141 srv_path->ops_ids = kcalloc(srv->queue_depth, 142 sizeof(*srv_path->ops_ids), 143 GFP_KERNEL); 144 if (!srv_path->ops_ids) 145 goto err; 146 147 for (i = 0; i < srv->queue_depth; ++i) { 148 id = kzalloc(sizeof(*id), GFP_KERNEL); 149 if (!id) 150 goto err; 151 152 srv_path->ops_ids[i] = id; 153 } 154 155 ret = percpu_ref_init(&srv_path->ids_inflight_ref, 156 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 157 if (ret) { 158 pr_err("Percpu reference init failed\n"); 159 goto err; 160 } 161 init_completion(&srv_path->complete_done); 162 163 return 0; 164 165 err: 166 rtrs_srv_free_ops_ids(srv_path); 167 return -ENOMEM; 168 } 169 170 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path) 171 { 172 percpu_ref_get(&srv_path->ids_inflight_ref); 173 } 174 175 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path) 176 { 177 percpu_ref_put(&srv_path->ids_inflight_ref); 178 } 179 180 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 181 { 182 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 183 struct rtrs_path *s = con->c.path; 184 struct rtrs_srv_path *srv_path = to_srv_path(s); 185 186 if (wc->status != IB_WC_SUCCESS) { 187 rtrs_err_rl(s, "REG MR failed: %s\n", 188 ib_wc_status_msg(wc->status)); 189 close_path(srv_path); 190 return; 191 } 192 } 193 194 static struct ib_cqe local_reg_cqe = { 
195 .done = rtrs_srv_reg_mr_done 196 }; 197 198 static int rdma_write_sg(struct rtrs_srv_op *id) 199 { 200 struct rtrs_path *s = id->con->c.path; 201 struct rtrs_srv_path *srv_path = to_srv_path(s); 202 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id]; 203 struct rtrs_srv_mr *srv_mr; 204 struct ib_send_wr inv_wr; 205 struct ib_rdma_wr imm_wr; 206 struct ib_rdma_wr *wr = NULL; 207 enum ib_send_flags flags; 208 size_t sg_cnt; 209 int err, offset; 210 bool need_inval; 211 struct ib_reg_wr rwr; 212 struct ib_sge *plist; 213 struct ib_sge list; 214 215 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 216 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 217 if (sg_cnt != 1) 218 return -EINVAL; 219 220 offset = 0; 221 222 wr = &id->tx_wr; 223 plist = &id->tx_sg; 224 plist->addr = dma_addr + offset; 225 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 226 227 /* WR will fail with length error 228 * if this is 0 229 */ 230 if (plist->length == 0) { 231 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 232 return -EINVAL; 233 } 234 235 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 236 offset += plist->length; 237 238 wr->wr.sg_list = plist; 239 wr->wr.num_sge = 1; 240 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 241 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 242 243 wr->wr.opcode = IB_WR_RDMA_WRITE; 244 wr->wr.wr_cqe = &io_comp_cqe; 245 wr->wr.ex.imm_data = 0; 246 wr->wr.send_flags = 0; 247 248 if (need_inval && always_invalidate) { 249 wr->wr.next = &rwr.wr; 250 rwr.wr.next = &inv_wr; 251 inv_wr.next = &imm_wr.wr; 252 } else if (always_invalidate) { 253 wr->wr.next = &rwr.wr; 254 rwr.wr.next = &imm_wr.wr; 255 } else if (need_inval) { 256 wr->wr.next = &inv_wr; 257 inv_wr.next = &imm_wr.wr; 258 } else { 259 wr->wr.next = &imm_wr.wr; 260 } 261 /* 262 * From time to time we have to post signaled sends, 263 * or send queue will fill up and only QP reset can help. 264 */ 265 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 
266 0 : IB_SEND_SIGNALED; 267 268 if (need_inval) { 269 inv_wr.sg_list = NULL; 270 inv_wr.num_sge = 0; 271 inv_wr.opcode = IB_WR_SEND_WITH_INV; 272 inv_wr.wr_cqe = &io_comp_cqe; 273 inv_wr.send_flags = 0; 274 inv_wr.ex.invalidate_rkey = wr->rkey; 275 } 276 277 imm_wr.wr.next = NULL; 278 if (always_invalidate) { 279 struct rtrs_msg_rkey_rsp *msg; 280 281 srv_mr = &srv_path->mrs[id->msg_id]; 282 rwr.wr.opcode = IB_WR_REG_MR; 283 rwr.wr.wr_cqe = &local_reg_cqe; 284 rwr.wr.num_sge = 0; 285 rwr.mr = srv_mr->mr; 286 rwr.wr.send_flags = 0; 287 rwr.key = srv_mr->mr->rkey; 288 rwr.access = (IB_ACCESS_LOCAL_WRITE | 289 IB_ACCESS_REMOTE_WRITE); 290 msg = srv_mr->iu->buf; 291 msg->buf_id = cpu_to_le16(id->msg_id); 292 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 293 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 294 295 list.addr = srv_mr->iu->dma_addr; 296 list.length = sizeof(*msg); 297 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 298 imm_wr.wr.sg_list = &list; 299 imm_wr.wr.num_sge = 1; 300 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 301 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 302 srv_mr->iu->dma_addr, 303 srv_mr->iu->size, DMA_TO_DEVICE); 304 } else { 305 imm_wr.wr.sg_list = NULL; 306 imm_wr.wr.num_sge = 0; 307 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 308 } 309 imm_wr.wr.send_flags = flags; 310 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 311 0, need_inval)); 312 313 imm_wr.wr.wr_cqe = &io_comp_cqe; 314 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr, 315 offset, DMA_BIDIRECTIONAL); 316 317 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 318 if (err) 319 rtrs_err(s, 320 "Posting RDMA-Write-Request to QP failed, err: %pe\n", 321 ERR_PTR(err)); 322 323 return err; 324 } 325 326 /** 327 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 328 * requests or on successful WRITE request. 329 * @con: the connection to send back result 330 * @id: the id associated with the IO 331 * @errno: the error number of the IO. 332 * 333 * Return 0 on success, errno otherwise. 
334 */ 335 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 336 int errno) 337 { 338 struct rtrs_path *s = con->c.path; 339 struct rtrs_srv_path *srv_path = to_srv_path(s); 340 struct ib_send_wr inv_wr, *wr = NULL; 341 struct ib_rdma_wr imm_wr; 342 struct ib_reg_wr rwr; 343 struct rtrs_srv_mr *srv_mr; 344 bool need_inval = false; 345 enum ib_send_flags flags; 346 struct ib_sge list; 347 u32 imm; 348 int err; 349 350 if (id->dir == READ) { 351 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 352 size_t sg_cnt; 353 354 need_inval = le16_to_cpu(rd_msg->flags) & 355 RTRS_MSG_NEED_INVAL_F; 356 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 357 358 if (need_inval) { 359 if (sg_cnt) { 360 inv_wr.wr_cqe = &io_comp_cqe; 361 inv_wr.sg_list = NULL; 362 inv_wr.num_sge = 0; 363 inv_wr.opcode = IB_WR_SEND_WITH_INV; 364 inv_wr.send_flags = 0; 365 /* Only one key is actually used */ 366 inv_wr.ex.invalidate_rkey = 367 le32_to_cpu(rd_msg->desc[0].key); 368 } else { 369 WARN_ON_ONCE(1); 370 need_inval = false; 371 } 372 } 373 } 374 375 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno); 376 377 if (need_inval && always_invalidate) { 378 wr = &inv_wr; 379 inv_wr.next = &rwr.wr; 380 rwr.wr.next = &imm_wr.wr; 381 } else if (always_invalidate) { 382 wr = &rwr.wr; 383 rwr.wr.next = &imm_wr.wr; 384 } else if (need_inval) { 385 wr = &inv_wr; 386 inv_wr.next = &imm_wr.wr; 387 } else { 388 wr = &imm_wr.wr; 389 } 390 /* 391 * From time to time we have to post signalled sends, 392 * or send queue will fill up and only QP reset can help. 393 */ 394 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 395 0 : IB_SEND_SIGNALED; 396 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 397 imm_wr.wr.next = NULL; 398 if (always_invalidate) { 399 struct rtrs_msg_rkey_rsp *msg; 400 401 srv_mr = &srv_path->mrs[id->msg_id]; 402 rwr.wr.next = &imm_wr.wr; 403 rwr.wr.opcode = IB_WR_REG_MR; 404 rwr.wr.wr_cqe = &local_reg_cqe; 405 rwr.wr.num_sge = 0; 406 rwr.wr.send_flags = 0; 407 rwr.mr = srv_mr->mr; 408 rwr.key = srv_mr->mr->rkey; 409 rwr.access = (IB_ACCESS_LOCAL_WRITE | 410 IB_ACCESS_REMOTE_WRITE); 411 msg = srv_mr->iu->buf; 412 msg->buf_id = cpu_to_le16(id->msg_id); 413 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 414 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 415 416 list.addr = srv_mr->iu->dma_addr; 417 list.length = sizeof(*msg); 418 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 419 imm_wr.wr.sg_list = &list; 420 imm_wr.wr.num_sge = 1; 421 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 422 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 423 srv_mr->iu->dma_addr, 424 srv_mr->iu->size, DMA_TO_DEVICE); 425 } else { 426 imm_wr.wr.sg_list = NULL; 427 imm_wr.wr.num_sge = 0; 428 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 429 } 430 imm_wr.wr.send_flags = flags; 431 imm_wr.wr.wr_cqe = &io_comp_cqe; 432 433 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 434 435 err = ib_post_send(id->con->c.qp, wr, NULL); 436 if (err) 437 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %pe\n", 438 ERR_PTR(err)); 439 440 return err; 441 } 442 443 void close_path(struct rtrs_srv_path *srv_path) 444 { 445 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING)) 446 queue_work(rtrs_wq, &srv_path->close_work); 447 WARN_ON(srv_path->state != RTRS_SRV_CLOSING); 448 } 449 450 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 451 { 452 switch (state) { 453 case RTRS_SRV_CONNECTING: 454 return "RTRS_SRV_CONNECTING"; 455 case RTRS_SRV_CONNECTED: 456 return "RTRS_SRV_CONNECTED"; 457 
	case RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response code sent to the other side for this operation.
 *		0 = success, < 0 = error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_path *srv_path;
	struct rtrs_srv_con *con;
	struct rtrs_path *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.path;
	srv_path = to_srv_path(s);

	id->status = status;

	if (srv_path->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, server path %s is disconnected, path state %s\n",
			    kobject_name(&srv_path->kobj),
			    rtrs_srv_state_str(srv_path->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
			 kobject_name(&srv_path->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %pe: srv_path=%s\n",
			    ERR_PTR(err), kobject_name(&srv_path->kobj));
		close_path(srv_path);
	}
out:
	rtrs_srv_put_ops_ids(srv_path);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
{
	int i;

	for (i = 0; i < srv_path->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &srv_path->mrs[i];

		if (always_invalidate)
			rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);

		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(srv_path->mrs);
}

static int map_cont_bufs(struct rtrs_srv_path *srv_path)
{
	struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *ss = &srv_path->s;
	int i, err, mrs_num;
	unsigned int chunk_bits;
	enum ib_mr_type mr_type;
	int chunks_per_mr = 1;
	struct sg_table *sgt;
	struct ib_mr *mr;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to
	 * figure out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory separately,
		 * we need more memory regions (one MR per chunk).
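		 * Otherwise up to max_fast_reg_page_list_len chunks can be
		 * packed into a single MR, as computed in the else branch
		 * below.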
583 */ 584 mrs_num = srv->queue_depth; 585 } else { 586 chunks_per_mr = 587 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 588 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 589 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 590 } 591 592 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL); 593 if (!srv_path->mrs) 594 return -ENOMEM; 595 596 for (srv_path->mrs_num = 0; srv_path->mrs_num < mrs_num; 597 srv_path->mrs_num++) { 598 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[srv_path->mrs_num]; 599 struct scatterlist *s; 600 int nr, nr_sgt, chunks, ind; 601 602 sgt = &srv_mr->sgt; 603 chunks = chunks_per_mr * srv_path->mrs_num; 604 if (!always_invalidate) 605 chunks_per_mr = min_t(int, chunks_per_mr, 606 srv->queue_depth - chunks); 607 608 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 609 if (err) 610 goto err; 611 612 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 613 sg_set_page(s, srv->chunks[chunks + i], 614 max_chunk_size, 0); 615 616 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl, 617 sgt->nents, DMA_BIDIRECTIONAL); 618 if (!nr_sgt) { 619 err = -EINVAL; 620 goto free_sg; 621 } 622 623 if (ib_dev->attrs.kernel_cap_flags & IBK_SG_GAPS_REG) 624 mr_type = IB_MR_TYPE_SG_GAPS; 625 else 626 mr_type = IB_MR_TYPE_MEM_REG; 627 628 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, mr_type, nr_sgt); 629 if (IS_ERR(mr)) { 630 err = PTR_ERR(mr); 631 goto unmap_sg; 632 } 633 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt, 634 NULL, max_chunk_size); 635 if (nr < nr_sgt) { 636 err = nr < 0 ? nr : -EINVAL; 637 goto dereg_mr; 638 } 639 640 if (always_invalidate) { 641 srv_mr->iu = rtrs_iu_alloc(1, 642 sizeof(struct rtrs_msg_rkey_rsp), 643 GFP_KERNEL, srv_path->s.dev->ib_dev, 644 DMA_TO_DEVICE, rtrs_srv_rdma_done); 645 if (!srv_mr->iu) { 646 err = -ENOMEM; 647 rtrs_err(ss, "rtrs_iu_alloc(), err: %pe\n", ERR_PTR(err)); 648 goto dereg_mr; 649 } 650 } 651 652 /* 653 * Cache DMA addresses by traversing sg entries. If 654 * regions were merged, an inner loop is required to 655 * populate the DMA address array by traversing larger 656 * regions. 
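		 * Each chunk ends up with its own srv_path->dma_addr[] entry,
		 * indexed by the msg_id later used in rdma_write_sg() and
		 * process_io_req().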
657 */ 658 ind = chunks; 659 for_each_sg(sgt->sgl, s, nr_sgt, i) { 660 unsigned int dma_len = sg_dma_len(s); 661 u64 dma_addr = sg_dma_address(s); 662 u64 dma_addr_end = dma_addr + dma_len; 663 664 do { 665 srv_path->dma_addr[ind++] = dma_addr; 666 dma_addr += max_chunk_size; 667 } while (dma_addr < dma_addr_end); 668 } 669 670 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 671 srv_mr->mr = mr; 672 } 673 674 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 675 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 676 677 return 0; 678 679 dereg_mr: 680 ib_dereg_mr(mr); 681 unmap_sg: 682 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl, 683 sgt->nents, DMA_BIDIRECTIONAL); 684 free_sg: 685 sg_free_table(sgt); 686 err: 687 unmap_cont_bufs(srv_path); 688 689 return err; 690 } 691 692 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 693 { 694 struct rtrs_srv_con *con = container_of(c, typeof(*con), c); 695 struct rtrs_srv_path *srv_path = to_srv_path(con->c.path); 696 697 rtrs_err(con->c.path, "HB err handler for path=%s\n", kobject_name(&srv_path->kobj)); 698 close_path(to_srv_path(c->path)); 699 } 700 701 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path) 702 { 703 rtrs_init_hb(&srv_path->s, &io_comp_cqe, 704 RTRS_HB_INTERVAL_MS, 705 RTRS_HB_MISSED_MAX, 706 rtrs_srv_hb_err_handler, 707 rtrs_wq); 708 } 709 710 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path) 711 { 712 rtrs_start_hb(&srv_path->s); 713 } 714 715 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path) 716 { 717 rtrs_stop_hb(&srv_path->s); 718 } 719 720 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 721 { 722 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 723 struct rtrs_path *s = con->c.path; 724 struct rtrs_srv_path *srv_path = to_srv_path(s); 725 struct rtrs_iu *iu; 726 727 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 728 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 729 730 if (wc->status != IB_WC_SUCCESS) { 731 rtrs_err(s, "Sess info response send failed: %s\n", 732 ib_wc_status_msg(wc->status)); 733 close_path(srv_path); 734 return; 735 } 736 WARN_ON(wc->opcode != IB_WC_SEND); 737 } 738 739 static int rtrs_srv_path_up(struct rtrs_srv_path *srv_path) 740 { 741 struct rtrs_srv_sess *srv = srv_path->srv; 742 struct rtrs_srv_ctx *ctx = srv->ctx; 743 int up, ret = 0; 744 745 mutex_lock(&srv->paths_ev_mutex); 746 up = ++srv->paths_up; 747 if (up == 1) 748 ret = ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 749 mutex_unlock(&srv->paths_ev_mutex); 750 751 /* Mark session as established */ 752 if (!ret) 753 srv_path->established = true; 754 755 return ret; 756 } 757 758 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path) 759 { 760 struct rtrs_srv_sess *srv = srv_path->srv; 761 struct rtrs_srv_ctx *ctx = srv->ctx; 762 763 if (!srv_path->established) 764 return; 765 766 srv_path->established = false; 767 mutex_lock(&srv->paths_ev_mutex); 768 WARN_ON(!srv->paths_up); 769 if (--srv->paths_up == 0) 770 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 771 mutex_unlock(&srv->paths_ev_mutex); 772 } 773 774 static bool exist_pathname(struct rtrs_srv_ctx *ctx, 775 const char *pathname, const uuid_t *path_uuid) 776 { 777 struct rtrs_srv_sess *srv; 778 struct rtrs_srv_path *srv_path; 779 bool found = false; 780 781 mutex_lock(&ctx->srv_mutex); 782 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 783 mutex_lock(&srv->paths_mutex); 784 785 /* when a client with same uuid and same sessname tried to add a path */ 786 if 
(uuid_equal(&srv->paths_uuid, path_uuid)) { 787 mutex_unlock(&srv->paths_mutex); 788 continue; 789 } 790 791 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 792 if (strlen(srv_path->s.sessname) == strlen(pathname) && 793 !strcmp(srv_path->s.sessname, pathname)) { 794 found = true; 795 break; 796 } 797 } 798 mutex_unlock(&srv->paths_mutex); 799 if (found) 800 break; 801 } 802 mutex_unlock(&ctx->srv_mutex); 803 return found; 804 } 805 806 static int post_recv_path(struct rtrs_srv_path *srv_path); 807 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 808 809 static int process_info_req(struct rtrs_srv_con *con, 810 struct rtrs_msg_info_req *msg) 811 { 812 struct rtrs_path *s = con->c.path; 813 struct rtrs_srv_path *srv_path = to_srv_path(s); 814 struct ib_send_wr *reg_wr = NULL; 815 struct rtrs_msg_info_rsp *rsp; 816 struct rtrs_iu *tx_iu; 817 struct ib_reg_wr *rwr; 818 int mri, err; 819 size_t tx_sz; 820 821 err = post_recv_path(srv_path); 822 if (err) { 823 rtrs_err(s, "post_recv_path(), err: %pe\n", ERR_PTR(err)); 824 return err; 825 } 826 827 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) { 828 rtrs_err(s, "pathname cannot contain / and .\n"); 829 return -EINVAL; 830 } 831 832 if (exist_pathname(srv_path->srv->ctx, 833 msg->pathname, &srv_path->srv->paths_uuid)) { 834 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname); 835 return -EPERM; 836 } 837 strscpy(srv_path->s.sessname, msg->pathname, 838 sizeof(srv_path->s.sessname)); 839 840 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL); 841 if (!rwr) 842 return -ENOMEM; 843 844 tx_sz = sizeof(*rsp); 845 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num; 846 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev, 847 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 848 if (!tx_iu) { 849 err = -ENOMEM; 850 goto rwr_free; 851 } 852 853 rsp = tx_iu->buf; 854 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 855 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num); 856 857 for (mri = 0; mri < srv_path->mrs_num; mri++) { 858 struct ib_mr *mr = srv_path->mrs[mri].mr; 859 860 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 861 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 862 rsp->desc[mri].len = cpu_to_le32(mr->length); 863 864 /* 865 * Fill in reg MR request and chain them *backwards* 866 */ 867 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 868 rwr[mri].wr.opcode = IB_WR_REG_MR; 869 rwr[mri].wr.wr_cqe = &local_reg_cqe; 870 rwr[mri].wr.num_sge = 0; 871 rwr[mri].wr.send_flags = 0; 872 rwr[mri].mr = mr; 873 rwr[mri].key = mr->rkey; 874 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 875 IB_ACCESS_REMOTE_WRITE); 876 reg_wr = &rwr[mri].wr; 877 } 878 879 err = rtrs_srv_create_path_files(srv_path); 880 if (err) 881 goto iu_free; 882 kobject_get(&srv_path->kobj); 883 get_device(&srv_path->srv->dev); 884 err = rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED); 885 if (!err) { 886 rtrs_err(s, "rtrs_srv_change_state() failed\n"); 887 goto iu_free; 888 } 889 890 rtrs_srv_start_hb(srv_path); 891 892 /* 893 * We do not account number of established connections at the current 894 * moment, we rely on the client, which should send info request when 895 * all connections are successfully established. Thus, simply notify 896 * listener with a proper event if we are the first path. 
897 */ 898 err = rtrs_srv_path_up(srv_path); 899 if (err) { 900 rtrs_err(s, "rtrs_srv_path_up(), err: %pe\n", ERR_PTR(err)); 901 goto iu_free; 902 } 903 904 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 905 tx_iu->dma_addr, 906 tx_iu->size, DMA_TO_DEVICE); 907 908 /* 909 * Now disable zombie connection closing. Since from the logs and code, 910 * we know that it can never be in CONNECTED state. 911 */ 912 srv_path->connection_timeout = 0; 913 914 /* Send info response */ 915 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 916 if (err) { 917 rtrs_err(s, "rtrs_iu_post_send(), err: %pe\n", ERR_PTR(err)); 918 iu_free: 919 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1); 920 } 921 rwr_free: 922 kfree(rwr); 923 924 return err; 925 } 926 927 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 928 { 929 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 930 struct rtrs_path *s = con->c.path; 931 struct rtrs_srv_path *srv_path = to_srv_path(s); 932 struct rtrs_msg_info_req *msg; 933 struct rtrs_iu *iu; 934 int err; 935 936 WARN_ON(con->c.cid); 937 938 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 939 if (wc->status != IB_WC_SUCCESS) { 940 rtrs_err(s, "Sess info request receive failed: %s\n", 941 ib_wc_status_msg(wc->status)); 942 goto close; 943 } 944 WARN_ON(wc->opcode != IB_WC_RECV); 945 946 if (wc->byte_len < sizeof(*msg)) { 947 rtrs_err(s, "Sess info request is malformed: size %d\n", 948 wc->byte_len); 949 goto close; 950 } 951 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr, 952 iu->size, DMA_FROM_DEVICE); 953 msg = iu->buf; 954 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 955 rtrs_err(s, "Sess info request is malformed: type %d\n", 956 le16_to_cpu(msg->type)); 957 goto close; 958 } 959 err = process_info_req(con, msg); 960 if (err) 961 goto close; 962 963 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 964 return; 965 close: 966 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 967 close_path(srv_path); 968 } 969 970 static int post_recv_info_req(struct rtrs_srv_con *con) 971 { 972 struct rtrs_path *s = con->c.path; 973 struct rtrs_srv_path *srv_path = to_srv_path(s); 974 struct rtrs_iu *rx_iu; 975 int err; 976 977 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 978 GFP_KERNEL, srv_path->s.dev->ib_dev, 979 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 980 if (!rx_iu) 981 return -ENOMEM; 982 /* Prepare for getting info response */ 983 err = rtrs_iu_post_recv(&con->c, rx_iu); 984 if (err) { 985 rtrs_err(s, "rtrs_iu_post_recv(), err: %pe\n", ERR_PTR(err)); 986 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1); 987 return err; 988 } 989 990 return 0; 991 } 992 993 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 994 { 995 int i, err; 996 997 for (i = 0; i < q_size; i++) { 998 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 999 if (err) 1000 return err; 1001 } 1002 1003 return 0; 1004 } 1005 1006 static int post_recv_path(struct rtrs_srv_path *srv_path) 1007 { 1008 struct rtrs_srv_sess *srv = srv_path->srv; 1009 struct rtrs_path *s = &srv_path->s; 1010 size_t q_size; 1011 int err, cid; 1012 1013 for (cid = 0; cid < srv_path->s.con_num; cid++) { 1014 if (cid == 0) 1015 q_size = SERVICE_CON_QUEUE_DEPTH; 1016 else 1017 q_size = srv->queue_depth; 1018 if (srv_path->state != RTRS_SRV_CONNECTING) { 1019 rtrs_err(s, "Path state invalid. 
state %s\n", 1020 rtrs_srv_state_str(srv_path->state)); 1021 return -EIO; 1022 } 1023 1024 if (!srv_path->s.con[cid]) { 1025 rtrs_err(s, "Conn not set for %d\n", cid); 1026 return -EIO; 1027 } 1028 1029 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size); 1030 if (err) { 1031 rtrs_err(s, "post_recv_io(), err: %pe\n", ERR_PTR(err)); 1032 return err; 1033 } 1034 } 1035 1036 return 0; 1037 } 1038 1039 static void process_read(struct rtrs_srv_con *con, 1040 struct rtrs_msg_rdma_read *msg, 1041 u32 buf_id, u32 off) 1042 { 1043 struct rtrs_path *s = con->c.path; 1044 struct rtrs_srv_path *srv_path = to_srv_path(s); 1045 struct rtrs_srv_sess *srv = srv_path->srv; 1046 struct rtrs_srv_ctx *ctx = srv->ctx; 1047 struct rtrs_srv_op *id; 1048 1049 size_t usr_len, data_len; 1050 void *data; 1051 int ret; 1052 1053 if (srv_path->state != RTRS_SRV_CONNECTED) { 1054 rtrs_err_rl(s, 1055 "Processing read request failed, session is disconnected, sess state %s\n", 1056 rtrs_srv_state_str(srv_path->state)); 1057 return; 1058 } 1059 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1060 rtrs_err_rl(s, 1061 "Processing read request failed, invalid message\n"); 1062 return; 1063 } 1064 rtrs_srv_get_ops_ids(srv_path); 1065 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ); 1066 id = srv_path->ops_ids[buf_id]; 1067 id->con = con; 1068 id->dir = READ; 1069 id->msg_id = buf_id; 1070 id->rd_msg = msg; 1071 usr_len = le16_to_cpu(msg->usr_len); 1072 data_len = off - usr_len; 1073 data = page_address(srv->chunks[buf_id]); 1074 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1075 data + data_len, usr_len); 1076 1077 if (ret) { 1078 rtrs_err_rl(s, 1079 "Processing read request failed, user module cb reported for msg_id %d, err: %pe\n", 1080 buf_id, ERR_PTR(ret)); 1081 goto send_err_msg; 1082 } 1083 1084 return; 1085 1086 send_err_msg: 1087 ret = send_io_resp_imm(con, id, ret); 1088 if (ret < 0) { 1089 rtrs_err_rl(s, 1090 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %pe\n", 1091 buf_id, ERR_PTR(ret)); 1092 close_path(srv_path); 1093 } 1094 rtrs_srv_put_ops_ids(srv_path); 1095 } 1096 1097 static void process_write(struct rtrs_srv_con *con, 1098 struct rtrs_msg_rdma_write *req, 1099 u32 buf_id, u32 off) 1100 { 1101 struct rtrs_path *s = con->c.path; 1102 struct rtrs_srv_path *srv_path = to_srv_path(s); 1103 struct rtrs_srv_sess *srv = srv_path->srv; 1104 struct rtrs_srv_ctx *ctx = srv->ctx; 1105 struct rtrs_srv_op *id; 1106 1107 size_t data_len, usr_len; 1108 void *data; 1109 int ret; 1110 1111 if (srv_path->state != RTRS_SRV_CONNECTED) { 1112 rtrs_err_rl(s, 1113 "Processing write request failed, session is disconnected, sess state %s\n", 1114 rtrs_srv_state_str(srv_path->state)); 1115 return; 1116 } 1117 rtrs_srv_get_ops_ids(srv_path); 1118 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE); 1119 id = srv_path->ops_ids[buf_id]; 1120 id->con = con; 1121 id->dir = WRITE; 1122 id->msg_id = buf_id; 1123 1124 usr_len = le16_to_cpu(req->usr_len); 1125 data_len = off - usr_len; 1126 data = page_address(srv->chunks[buf_id]); 1127 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1128 data + data_len, usr_len); 1129 if (ret) { 1130 rtrs_err_rl(s, 1131 "Processing write request failed, user module callback reports err: %pe\n", 1132 ERR_PTR(ret)); 1133 goto send_err_msg; 1134 } 1135 1136 return; 1137 1138 send_err_msg: 1139 ret = send_io_resp_imm(con, id, ret); 1140 if (ret < 0) { 1141 rtrs_err_rl(s, 1142 "Processing write request failed, sending I/O response failed, msg_id %d, err: %pe\n", 
1143 buf_id, ERR_PTR(ret)); 1144 close_path(srv_path); 1145 } 1146 rtrs_srv_put_ops_ids(srv_path); 1147 } 1148 1149 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1150 u32 id, u32 off) 1151 { 1152 struct rtrs_path *s = con->c.path; 1153 struct rtrs_srv_path *srv_path = to_srv_path(s); 1154 struct rtrs_msg_rdma_hdr *hdr; 1155 unsigned int type; 1156 1157 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, 1158 srv_path->dma_addr[id], 1159 max_chunk_size, DMA_BIDIRECTIONAL); 1160 hdr = msg; 1161 type = le16_to_cpu(hdr->type); 1162 1163 switch (type) { 1164 case RTRS_MSG_WRITE: 1165 process_write(con, msg, id, off); 1166 break; 1167 case RTRS_MSG_READ: 1168 process_read(con, msg, id, off); 1169 break; 1170 default: 1171 rtrs_err(s, 1172 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1173 type); 1174 goto err; 1175 } 1176 1177 return; 1178 1179 err: 1180 close_path(srv_path); 1181 } 1182 1183 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1184 { 1185 struct rtrs_srv_mr *mr = 1186 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1187 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1188 struct rtrs_path *s = con->c.path; 1189 struct rtrs_srv_path *srv_path = to_srv_path(s); 1190 struct rtrs_srv_sess *srv = srv_path->srv; 1191 u32 msg_id, off; 1192 void *data; 1193 1194 if (wc->status != IB_WC_SUCCESS) { 1195 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1196 ib_wc_status_msg(wc->status)); 1197 close_path(srv_path); 1198 } 1199 msg_id = mr->msg_id; 1200 off = mr->msg_off; 1201 data = page_address(srv->chunks[msg_id]) + off; 1202 process_io_req(con, data, msg_id, off); 1203 } 1204 1205 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1206 struct rtrs_srv_mr *mr) 1207 { 1208 struct ib_send_wr wr = { 1209 .opcode = IB_WR_LOCAL_INV, 1210 .wr_cqe = &mr->inv_cqe, 1211 .send_flags = IB_SEND_SIGNALED, 1212 .ex.invalidate_rkey = mr->mr->rkey, 1213 }; 1214 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1215 1216 return ib_post_send(con->c.qp, &wr, NULL); 1217 } 1218 1219 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1220 { 1221 spin_lock(&con->rsp_wr_wait_lock); 1222 while (!list_empty(&con->rsp_wr_wait_list)) { 1223 struct rtrs_srv_op *id; 1224 int ret; 1225 1226 id = list_entry(con->rsp_wr_wait_list.next, 1227 struct rtrs_srv_op, wait_list); 1228 list_del(&id->wait_list); 1229 1230 spin_unlock(&con->rsp_wr_wait_lock); 1231 ret = rtrs_srv_resp_rdma(id, id->status); 1232 spin_lock(&con->rsp_wr_wait_lock); 1233 1234 if (!ret) { 1235 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1236 break; 1237 } 1238 } 1239 spin_unlock(&con->rsp_wr_wait_lock); 1240 } 1241 1242 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1243 { 1244 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1245 struct rtrs_path *s = con->c.path; 1246 struct rtrs_srv_path *srv_path = to_srv_path(s); 1247 struct rtrs_srv_sess *srv = srv_path->srv; 1248 u32 imm_type, imm_payload; 1249 int err; 1250 1251 if (wc->status != IB_WC_SUCCESS) { 1252 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1253 rtrs_err(s, 1254 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1255 ib_wc_status_msg(wc->status), wc->wr_cqe, 1256 wc->opcode, wc->vendor_err, wc->byte_len); 1257 close_path(srv_path); 1258 } 1259 return; 1260 } 1261 1262 switch (wc->opcode) { 1263 case IB_WC_RECV_RDMA_WITH_IMM: 1264 /* 1265 * post_recv() RDMA write completions of IO reqs (read/write) 1266 * and hb 1267 */ 1268 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1269 
return; 1270 srv_path->s.hb_missed_cnt = 0; 1271 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1272 if (err) { 1273 rtrs_err(s, "rtrs_post_recv(), err: %pe\n", 1274 ERR_PTR(err)); 1275 close_path(srv_path); 1276 break; 1277 } 1278 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1279 &imm_type, &imm_payload); 1280 if (imm_type == RTRS_IO_REQ_IMM) { 1281 u32 msg_id, off; 1282 void *data; 1283 1284 msg_id = imm_payload >> srv_path->mem_bits; 1285 off = imm_payload & ((1 << srv_path->mem_bits) - 1); 1286 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1287 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1288 msg_id, off); 1289 close_path(srv_path); 1290 return; 1291 } 1292 if (always_invalidate) { 1293 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id]; 1294 1295 mr->msg_off = off; 1296 mr->msg_id = msg_id; 1297 err = rtrs_srv_inv_rkey(con, mr); 1298 if (err) { 1299 rtrs_err(s, "rtrs_post_recv(), err: %pe\n", 1300 ERR_PTR(err)); 1301 close_path(srv_path); 1302 break; 1303 } 1304 } else { 1305 data = page_address(srv->chunks[msg_id]) + off; 1306 process_io_req(con, data, msg_id, off); 1307 } 1308 } else if (imm_type == RTRS_HB_MSG_IMM) { 1309 WARN_ON(con->c.cid); 1310 rtrs_send_hb_ack(&srv_path->s); 1311 } else if (imm_type == RTRS_HB_ACK_IMM) { 1312 WARN_ON(con->c.cid); 1313 srv_path->s.hb_missed_cnt = 0; 1314 } else { 1315 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1316 } 1317 break; 1318 case IB_WC_RDMA_WRITE: 1319 case IB_WC_SEND: 1320 /* 1321 * post_send() RDMA write completions of IO reqs (read/write) 1322 * and hb. 1323 */ 1324 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1325 1326 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1327 rtrs_rdma_process_wr_wait_list(con); 1328 1329 break; 1330 default: 1331 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1332 return; 1333 } 1334 } 1335 1336 /** 1337 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname. 1338 * @srv: Session 1339 * @pathname: Pathname buffer 1340 * @len: Length of sessname buffer 1341 */ 1342 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname, 1343 size_t len) 1344 { 1345 struct rtrs_srv_path *srv_path; 1346 int err = -ENOTCONN; 1347 1348 mutex_lock(&srv->paths_mutex); 1349 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1350 if (srv_path->state != RTRS_SRV_CONNECTED) 1351 continue; 1352 strscpy(pathname, srv_path->s.sessname, 1353 min_t(size_t, sizeof(srv_path->s.sessname), len)); 1354 err = 0; 1355 break; 1356 } 1357 mutex_unlock(&srv->paths_mutex); 1358 1359 return err; 1360 } 1361 EXPORT_SYMBOL(rtrs_srv_get_path_name); 1362 1363 /** 1364 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1365 * @srv: Session 1366 */ 1367 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv) 1368 { 1369 return srv->queue_depth; 1370 } 1371 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1372 1373 static int find_next_bit_ring(struct rtrs_srv_path *srv_path) 1374 { 1375 struct ib_device *ib_dev = srv_path->s.dev->ib_dev; 1376 int v; 1377 1378 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask); 1379 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1380 v = cpumask_first(&cq_affinity_mask); 1381 return v; 1382 } 1383 1384 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path) 1385 { 1386 srv_path->cur_cq_vector = find_next_bit_ring(srv_path); 1387 1388 return srv_path->cur_cq_vector; 1389 } 1390 1391 static void rtrs_srv_dev_release(struct device *dev) 1392 { 1393 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess, 1394 dev); 1395 1396 kfree(srv); 1397 } 1398 1399 static void free_srv(struct rtrs_srv_sess *srv) 1400 { 1401 int i; 1402 1403 WARN_ON(refcount_read(&srv->refcount)); 1404 for (i = 0; i < srv->queue_depth; i++) 1405 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1406 kfree(srv->chunks); 1407 mutex_destroy(&srv->paths_mutex); 1408 mutex_destroy(&srv->paths_ev_mutex); 1409 /* last put to release the srv structure */ 1410 put_device(&srv->dev); 1411 } 1412 1413 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1414 const uuid_t *paths_uuid, 1415 bool first_conn) 1416 { 1417 struct rtrs_srv_sess *srv; 1418 int i; 1419 1420 mutex_lock(&ctx->srv_mutex); 1421 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1422 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1423 refcount_inc_not_zero(&srv->refcount)) { 1424 mutex_unlock(&ctx->srv_mutex); 1425 return srv; 1426 } 1427 } 1428 mutex_unlock(&ctx->srv_mutex); 1429 /* 1430 * If this request is not the first connection request from the 1431 * client for this session then fail and return error. 
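	 * Only the very first connection is allowed to create the
	 * rtrs_srv_sess; any later connection must join an existing
	 * session, found above by its paths_uuid.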
1432 */ 1433 if (!first_conn) { 1434 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1435 return ERR_PTR(-ENXIO); 1436 } 1437 1438 /* need to allocate a new srv */ 1439 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1440 if (!srv) 1441 return ERR_PTR(-ENOMEM); 1442 1443 INIT_LIST_HEAD(&srv->paths_list); 1444 mutex_init(&srv->paths_mutex); 1445 mutex_init(&srv->paths_ev_mutex); 1446 uuid_copy(&srv->paths_uuid, paths_uuid); 1447 srv->queue_depth = sess_queue_depth; 1448 srv->ctx = ctx; 1449 device_initialize(&srv->dev); 1450 srv->dev.release = rtrs_srv_dev_release; 1451 1452 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1453 GFP_KERNEL); 1454 if (!srv->chunks) 1455 goto err_free_srv; 1456 1457 for (i = 0; i < srv->queue_depth; i++) { 1458 srv->chunks[i] = alloc_pages(GFP_KERNEL, 1459 get_order(max_chunk_size)); 1460 if (!srv->chunks[i]) 1461 goto err_free_chunks; 1462 } 1463 refcount_set(&srv->refcount, 1); 1464 mutex_lock(&ctx->srv_mutex); 1465 list_add(&srv->ctx_list, &ctx->srv_list); 1466 mutex_unlock(&ctx->srv_mutex); 1467 1468 return srv; 1469 1470 err_free_chunks: 1471 while (i--) 1472 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1473 kfree(srv->chunks); 1474 1475 err_free_srv: 1476 put_device(&srv->dev); 1477 return ERR_PTR(-ENOMEM); 1478 } 1479 1480 static void put_srv(struct rtrs_srv_sess *srv) 1481 { 1482 if (refcount_dec_and_test(&srv->refcount)) { 1483 struct rtrs_srv_ctx *ctx = srv->ctx; 1484 1485 WARN_ON(srv->dev.kobj.state_in_sysfs); 1486 1487 mutex_lock(&ctx->srv_mutex); 1488 list_del(&srv->ctx_list); 1489 mutex_unlock(&ctx->srv_mutex); 1490 free_srv(srv); 1491 } 1492 } 1493 1494 static void __add_path_to_srv(struct rtrs_srv_sess *srv, 1495 struct rtrs_srv_path *srv_path) 1496 { 1497 list_add_tail(&srv_path->s.entry, &srv->paths_list); 1498 srv->paths_num++; 1499 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1500 } 1501 1502 static void del_path_from_srv(struct rtrs_srv_path *srv_path) 1503 { 1504 struct rtrs_srv_sess *srv = srv_path->srv; 1505 1506 if (WARN_ON(!srv)) 1507 return; 1508 1509 mutex_lock(&srv->paths_mutex); 1510 list_del(&srv_path->s.entry); 1511 WARN_ON(!srv->paths_num); 1512 srv->paths_num--; 1513 mutex_unlock(&srv->paths_mutex); 1514 } 1515 1516 /* return true if addresses are the same, error other wise */ 1517 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1518 { 1519 switch (a->sa_family) { 1520 case AF_IB: 1521 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1522 &((struct sockaddr_ib *)b)->sib_addr, 1523 sizeof(struct ib_addr)) && 1524 (b->sa_family == AF_IB); 1525 case AF_INET: 1526 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1527 &((struct sockaddr_in *)b)->sin_addr, 1528 sizeof(struct in_addr)) && 1529 (b->sa_family == AF_INET); 1530 case AF_INET6: 1531 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1532 &((struct sockaddr_in6 *)b)->sin6_addr, 1533 sizeof(struct in6_addr)) && 1534 (b->sa_family == AF_INET6); 1535 default: 1536 return -ENOENT; 1537 } 1538 } 1539 1540 /* Let's close connections which have been waiting for more than 30 seconds */ 1541 #define RTRS_MAX_CONN_TIMEOUT 30000 1542 1543 static void rtrs_srv_check_close_path(struct rtrs_srv_path *srv_path) 1544 { 1545 struct rtrs_path *s = &srv_path->s; 1546 1547 if (srv_path->state == RTRS_SRV_CONNECTING && srv_path->connection_timeout && 1548 (jiffies_to_msecs(jiffies - srv_path->connection_timeout) > RTRS_MAX_CONN_TIMEOUT)) { 1549 rtrs_err(s, "Closing zombie path\n"); 1550 close_path(srv_path); 
1551 } 1552 } 1553 1554 static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv, 1555 struct rdma_addr *addr) 1556 { 1557 struct rtrs_srv_path *srv_path; 1558 1559 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1560 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr, 1561 (struct sockaddr *)&addr->dst_addr) && 1562 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr, 1563 (struct sockaddr *)&addr->src_addr)) { 1564 rtrs_err((&srv_path->s), 1565 "Path (%s) with same addr exists (lifetime %u)\n", 1566 rtrs_srv_state_str(srv_path->state), 1567 (jiffies_to_msecs(jiffies - srv_path->connection_timeout))); 1568 rtrs_srv_check_close_path(srv_path); 1569 return true; 1570 } 1571 } 1572 1573 return false; 1574 } 1575 1576 static void free_path(struct rtrs_srv_path *srv_path) 1577 { 1578 if (srv_path->kobj.state_in_sysfs) { 1579 kobject_del(&srv_path->kobj); 1580 kobject_put(&srv_path->kobj); 1581 } else { 1582 free_percpu(srv_path->stats->rdma_stats); 1583 kfree(srv_path->stats); 1584 kfree(srv_path); 1585 } 1586 } 1587 1588 static void rtrs_srv_close_work(struct work_struct *work) 1589 { 1590 struct rtrs_srv_path *srv_path; 1591 struct rtrs_srv_con *con; 1592 int i; 1593 1594 srv_path = container_of(work, typeof(*srv_path), close_work); 1595 1596 rtrs_srv_stop_hb(srv_path); 1597 1598 for (i = 0; i < srv_path->s.con_num; i++) { 1599 if (!srv_path->s.con[i]) 1600 continue; 1601 con = to_srv_con(srv_path->s.con[i]); 1602 rdma_disconnect(con->c.cm_id); 1603 ib_drain_qp(con->c.qp); 1604 } 1605 1606 /* 1607 * Degrade ref count to the usual model with a single shared 1608 * atomic_t counter 1609 */ 1610 percpu_ref_kill(&srv_path->ids_inflight_ref); 1611 1612 /* Wait for all completion */ 1613 wait_for_completion(&srv_path->complete_done); 1614 1615 rtrs_srv_destroy_path_files(srv_path); 1616 1617 /* Notify upper layer if we are the last path */ 1618 rtrs_srv_path_down(srv_path); 1619 1620 unmap_cont_bufs(srv_path); 1621 rtrs_srv_free_ops_ids(srv_path); 1622 1623 for (i = 0; i < srv_path->s.con_num; i++) { 1624 if (!srv_path->s.con[i]) 1625 continue; 1626 con = to_srv_con(srv_path->s.con[i]); 1627 rtrs_cq_qp_destroy(&con->c); 1628 rdma_destroy_id(con->c.cm_id); 1629 kfree(con); 1630 } 1631 rtrs_ib_dev_put(srv_path->s.dev); 1632 1633 del_path_from_srv(srv_path); 1634 put_srv(srv_path->srv); 1635 srv_path->srv = NULL; 1636 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED); 1637 1638 kfree(srv_path->dma_addr); 1639 kfree(srv_path->s.con); 1640 free_path(srv_path); 1641 } 1642 1643 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path, 1644 struct rdma_cm_id *cm_id) 1645 { 1646 struct rtrs_srv_sess *srv = srv_path->srv; 1647 struct rtrs_msg_conn_rsp msg; 1648 struct rdma_conn_param param; 1649 int err; 1650 1651 param = (struct rdma_conn_param) { 1652 .rnr_retry_count = 7, 1653 .private_data = &msg, 1654 .private_data_len = sizeof(msg), 1655 }; 1656 1657 msg = (struct rtrs_msg_conn_rsp) { 1658 .magic = cpu_to_le16(RTRS_MAGIC), 1659 .version = cpu_to_le16(RTRS_PROTO_VER), 1660 .queue_depth = cpu_to_le16(srv->queue_depth), 1661 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1662 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1663 }; 1664 1665 if (always_invalidate) 1666 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1667 1668 err = rdma_accept(cm_id, ¶m); 1669 if (err) 1670 pr_err("rdma_accept(), err: %pe\n", ERR_PTR(err)); 1671 1672 return err; 1673 } 1674 1675 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1676 { 1677 struct rtrs_msg_conn_rsp 
msg;
	int err;

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.errno = cpu_to_le16(errno),
	};

	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
	if (err)
		pr_err("rdma_reject(), err: %pe\n", ERR_PTR(err));

	/* Bounce errno back */
	return errno;
}

static struct rtrs_srv_path *
__find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
{
	struct rtrs_srv_path *srv_path;

	list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
		if (uuid_equal(&srv_path->s.uuid, sess_uuid))
			return srv_path;
	}

	return NULL;
}

static int create_con(struct rtrs_srv_path *srv_path,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *s = &srv_path->s;
	struct rtrs_srv_con *con;

	u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.path = &srv_path->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(srv_path);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %pe\n", ERR_PTR(err));
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(srv_path->s.con[cid]);
	srv_path->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection. The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
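	 * The CM event handler (rtrs_srv_rdma_cm_handler) reads this
	 * context back to resolve the rtrs connection for all later
	 * events on this cm_id.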
1778 */ 1779 cm_id->context = &con->c; 1780 1781 return 0; 1782 1783 free_cqqp: 1784 rtrs_cq_qp_destroy(&con->c); 1785 free_con: 1786 kfree(con); 1787 1788 err: 1789 return err; 1790 } 1791 1792 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv, 1793 struct rdma_cm_id *cm_id, 1794 unsigned int con_num, 1795 unsigned int recon_cnt, 1796 const uuid_t *uuid) 1797 { 1798 struct rtrs_srv_path *srv_path; 1799 int err = -ENOMEM; 1800 char str[NAME_MAX]; 1801 struct rtrs_addr path; 1802 1803 if (srv->paths_num >= MAX_PATHS_NUM) { 1804 err = -ECONNRESET; 1805 goto err; 1806 } 1807 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1808 err = -EEXIST; 1809 goto err; 1810 } 1811 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL); 1812 if (!srv_path) 1813 goto err; 1814 1815 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL); 1816 if (!srv_path->stats) 1817 goto err_free_sess; 1818 1819 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats); 1820 if (!srv_path->stats->rdma_stats) 1821 goto err_free_stats; 1822 1823 srv_path->stats->srv_path = srv_path; 1824 1825 srv_path->dma_addr = kcalloc(srv->queue_depth, 1826 sizeof(*srv_path->dma_addr), 1827 GFP_KERNEL); 1828 if (!srv_path->dma_addr) 1829 goto err_free_percpu; 1830 1831 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con), 1832 GFP_KERNEL); 1833 if (!srv_path->s.con) 1834 goto err_free_dma_addr; 1835 1836 srv_path->state = RTRS_SRV_CONNECTING; 1837 srv_path->srv = srv; 1838 srv_path->cur_cq_vector = -1; 1839 srv_path->s.dst_addr = cm_id->route.addr.dst_addr; 1840 srv_path->s.src_addr = cm_id->route.addr.src_addr; 1841 1842 /* temporary until receiving session-name from client */ 1843 path.src = &srv_path->s.src_addr; 1844 path.dst = &srv_path->s.dst_addr; 1845 rtrs_addr_to_str(&path, str, sizeof(str)); 1846 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname)); 1847 1848 srv_path->s.con_num = con_num; 1849 srv_path->s.irq_con_num = con_num; 1850 srv_path->s.recon_cnt = recon_cnt; 1851 uuid_copy(&srv_path->s.uuid, uuid); 1852 spin_lock_init(&srv_path->state_lock); 1853 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work); 1854 rtrs_srv_init_hb(srv_path); 1855 srv_path->connection_timeout = 0; 1856 1857 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1858 if (!srv_path->s.dev) { 1859 err = -ENOMEM; 1860 goto err_free_con; 1861 } 1862 err = map_cont_bufs(srv_path); 1863 if (err) 1864 goto err_put_dev; 1865 1866 err = rtrs_srv_alloc_ops_ids(srv_path); 1867 if (err) 1868 goto err_unmap_bufs; 1869 1870 __add_path_to_srv(srv, srv_path); 1871 1872 return srv_path; 1873 1874 err_unmap_bufs: 1875 unmap_cont_bufs(srv_path); 1876 err_put_dev: 1877 rtrs_ib_dev_put(srv_path->s.dev); 1878 err_free_con: 1879 kfree(srv_path->s.con); 1880 err_free_dma_addr: 1881 kfree(srv_path->dma_addr); 1882 err_free_percpu: 1883 free_percpu(srv_path->stats->rdma_stats); 1884 err_free_stats: 1885 kfree(srv_path->stats); 1886 err_free_sess: 1887 kfree(srv_path); 1888 err: 1889 return ERR_PTR(err); 1890 } 1891 1892 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1893 const struct rtrs_msg_conn_req *msg, 1894 size_t len) 1895 { 1896 struct rtrs_srv_ctx *ctx = cm_id->context; 1897 struct rtrs_srv_path *srv_path; 1898 struct rtrs_srv_sess *srv; 1899 1900 u16 version, con_num, cid; 1901 u16 recon_cnt; 1902 int err = -ECONNRESET; 1903 1904 if (len < sizeof(*msg)) { 1905 pr_err("Invalid RTRS connection request\n"); 1906 goto reject_w_err; 1907 } 1908 if (le16_to_cpu(msg->magic) != 
RTRS_MAGIC) { 1909 pr_err("Invalid RTRS magic\n"); 1910 goto reject_w_err; 1911 } 1912 version = le16_to_cpu(msg->version); 1913 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1914 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1915 version >> 8, RTRS_PROTO_VER_MAJOR); 1916 goto reject_w_err; 1917 } 1918 con_num = le16_to_cpu(msg->cid_num); 1919 if (con_num > 4096) { 1920 /* Sanity check */ 1921 pr_err("Too many connections requested: %d\n", con_num); 1922 goto reject_w_err; 1923 } 1924 cid = le16_to_cpu(msg->cid); 1925 if (cid >= con_num) { 1926 /* Sanity check */ 1927 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1928 goto reject_w_err; 1929 } 1930 recon_cnt = le16_to_cpu(msg->recon_cnt); 1931 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1932 if (IS_ERR(srv)) { 1933 err = PTR_ERR(srv); 1934 pr_err("get_or_create_srv(), error %d\n", err); 1935 goto reject_w_err; 1936 } 1937 mutex_lock(&srv->paths_mutex); 1938 srv_path = __find_path(srv, &msg->sess_uuid); 1939 if (srv_path) { 1940 struct rtrs_path *s = &srv_path->s; 1941 1942 /* Session already holds a reference */ 1943 put_srv(srv); 1944 1945 if (srv_path->state != RTRS_SRV_CONNECTING) { 1946 rtrs_err(s, "Session in wrong state: %s\n", 1947 rtrs_srv_state_str(srv_path->state)); 1948 mutex_unlock(&srv->paths_mutex); 1949 goto reject_w_err; 1950 } 1951 /* 1952 * Sanity checks 1953 */ 1954 if (con_num != s->con_num || cid >= s->con_num) { 1955 rtrs_err(s, "Incorrect request: %d, %d\n", 1956 cid, con_num); 1957 mutex_unlock(&srv->paths_mutex); 1958 goto reject_w_err; 1959 } 1960 if (s->con[cid]) { 1961 rtrs_err(s, "Connection (%s) already exists: %d (lifetime %u)\n", 1962 rtrs_srv_state_str(srv_path->state), cid, 1963 (jiffies_to_msecs(jiffies - srv_path->connection_timeout))); 1964 rtrs_srv_check_close_path(srv_path); 1965 mutex_unlock(&srv->paths_mutex); 1966 goto reject_w_err; 1967 } 1968 } else { 1969 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt, 1970 &msg->sess_uuid); 1971 if (IS_ERR(srv_path)) { 1972 mutex_unlock(&srv->paths_mutex); 1973 put_srv(srv); 1974 err = PTR_ERR(srv_path); 1975 pr_err("RTRS server session allocation failed: %d\n", err); 1976 goto reject_w_err; 1977 } 1978 } 1979 1980 /* 1981 * Start of any connection creation resets the timeout for the path. 1982 */ 1983 srv_path->connection_timeout = jiffies; 1984 1985 err = create_con(srv_path, cm_id, cid); 1986 if (err) { 1987 rtrs_err((&srv_path->s), "create_con(), error %pe\n", ERR_PTR(err)); 1988 rtrs_rdma_do_reject(cm_id, err); 1989 /* 1990 * Since session has other connections we follow normal way 1991 * through workqueue, but still return an error to tell cma.c 1992 * to call rdma_destroy_id() for current connection. 1993 */ 1994 goto close_and_return_err; 1995 } 1996 err = rtrs_rdma_do_accept(srv_path, cm_id); 1997 if (err) { 1998 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %pe\n", 1999 ERR_PTR(err)); 2000 rtrs_rdma_do_reject(cm_id, err); 2001 /* 2002 * Since current connection was successfully added to the 2003 * session we follow normal way through workqueue to close the 2004 * session, thus return 0 to tell cma.c we call 2005 * rdma_destroy_id() ourselves. 
static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_srv_path *srv_path = NULL;
	struct rtrs_path *s = NULL;
	struct rtrs_con *c = NULL;

	if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
		/*
		 * In case of error cma.c will destroy cm_id,
		 * see cma_process_remove()
		 */
		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
					 ev->param.conn.private_data_len);

	c = cm_id->context;
	s = c->path;
	srv_path = to_srv_path(s);

	switch (ev->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Nothing here */
		break;
	case RDMA_CM_EVENT_REJECTED:
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		if (ev->status < 0) {
			rtrs_err(s, "CM error (CM event: %s, err: %pe)\n",
				 rdma_event_msg(ev->event),
				 ERR_PTR(ev->status));
		} else if (ev->status > 0) {
			rtrs_err(s, "CM error (CM event: %s, err: %s)\n",
				 rdma_event_msg(ev->event),
				 rdma_reject_msg(cm_id, ev->status));
		}
		fallthrough;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		close_path(srv_path);
		break;
	default:
		if (ev->status < 0) {
			pr_err("Ignoring unexpected CM event %s, err %pe\n",
			       rdma_event_msg(ev->event),
			       ERR_PTR(ev->status));
		} else if (ev->status > 0) {
			pr_err("Ignoring unexpected CM event %s, err %s\n",
			       rdma_event_msg(ev->event),
			       rdma_reject_msg(cm_id, ev->status));
		}
		break;
	}

	return 0;
}

static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
					   struct sockaddr *addr,
					   enum rdma_ucm_port_space ps)
{
	struct rdma_cm_id *cm_id;
	int ret;

	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
			       ctx, ps, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		ret = PTR_ERR(cm_id);
		pr_err("Creating id for RDMA connection failed, err: %d\n",
		       ret);
		goto err_out;
	}
	ret = rdma_bind_addr(cm_id, addr);
	if (ret) {
		pr_err("Binding RDMA address failed, err: %pe\n", ERR_PTR(ret));
		goto err_cm;
	}
	ret = rdma_listen(cm_id, 64);
	if (ret) {
		pr_err("Listening on RDMA connection failed, err: %pe\n",
		       ERR_PTR(ret));
		goto err_cm;
	}

	return cm_id;

err_cm:
	rdma_destroy_id(cm_id);
err_out:

	return ERR_PTR(ret);
}
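/*
 * Both listeners set up below share the same event handler and server
 * context, so connect requests arriving via either port space end up in
 * rtrs_rdma_connect() above.
 */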
static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
{
	struct sockaddr_in6 sin = {
		.sin6_family	= AF_INET6,
		.sin6_addr	= IN6ADDR_ANY_INIT,
		.sin6_port	= htons(port),
	};
	struct sockaddr_ib sib = {
		.sib_family	= AF_IB,
		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB | port),
		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
		.sib_pkey	= cpu_to_be16(0xffff),
	};
	struct rdma_cm_id *cm_ip, *cm_ib;
	int ret;

	/*
	 * We accept both IPoIB and IB connections, so we need to keep
	 * two cm id's, one for each socket type and port space.
	 * If the cm initialization of one of the id's fails, we abort
	 * everything.
	 */
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create them
	 * only once
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * We errored out here.
		 * According to the ib code, if we encounter an error here then the
		 * error code is ignored, and no more calls to our ops are made.
		 */
		pr_err("Failed to initialize RDMA connection\n");
		goto err_out;
	}

out:
	/*
	 * Keep track of the number of ib devices added
	 */
	ib_ctx.ib_dev_count++;

err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove them
	 * only once, when the last device is removed
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};
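/*
 * Usage sketch for the exported entry points below (my_srv_ops, my_rdma_ev
 * and my_link_ev are illustrative caller-side names, not defined here); a
 * user such as rnbd-srv wires its I/O and link-event callbacks into
 * struct rtrs_srv_ops:
 *
 *	static struct rtrs_srv_ops my_srv_ops = {
 *		.rdma_ev = my_rdma_ev,
 *		.link_ev = my_link_ev,
 *	};
 *
 *	ctx = rtrs_srv_open(&my_srv_ops, port);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */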
/**
 * rtrs_srv_open() - open RTRS server context
 * @ops: callback functions
 * @port: port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	mutex_init(&ib_ctx.ib_dev_mutex);
	ib_ctx.srv_ctx = ctx;
	ib_ctx.port = port;

	err = ib_register_client(&rtrs_srv_client);
	if (err) {
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);

static void close_paths(struct rtrs_srv_sess *srv)
{
	struct rtrs_srv_path *srv_path;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(srv_path, &srv->paths_list, s.entry)
		close_path(srv_path);
	mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv_sess *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_paths(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	ib_unregister_client(&rtrs_srv_client);
	mutex_destroy(&ib_ctx.ib_dev_mutex);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
		       max_chunk_size, MIN_CHUNK_SIZE);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and the
	 * offset inside the memory chunk
	 */
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}
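/*
 * Worked example for the immediate-data check above, using the defaults of
 * sess_queue_depth = 512 and max_chunk_size = 128 KiB: the buffer id needs
 * ilog2(511) + 1 = 9 bits and the chunk offset ilog2(131071) + 1 = 17 bits,
 * i.e. 26 bits in total, which fits as long as MAX_IMM_PAYL_BITS is at
 * least 26.
 */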
void rtrs_srv_ib_event_handler(struct ib_event_handler *handler,
			       struct ib_event *ibevent)
{
	struct ib_device *idev = ibevent->device;
	u32 port_num = ibevent->element.port_num;

	pr_info("Handling event: %s (%d). HCA name: %s, port num: %u\n",
		ib_event_msg(ibevent->event), ibevent->event, idev->name,
		port_num);
}

static int rtrs_srv_ib_dev_init(struct rtrs_ib_dev *dev)
{
	INIT_IB_EVENT_HANDLER(&dev->event_handler, dev->ib_dev,
			      rtrs_srv_ib_event_handler);
	ib_register_event_handler(&dev->event_handler);

	return 0;
}

static void rtrs_srv_ib_dev_deinit(struct rtrs_ib_dev *dev)
{
	ib_unregister_event_handler(&dev->event_handler);
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init	= rtrs_srv_ib_dev_init,
	.deinit	= rtrs_srv_ib_dev_deinit
};

static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		pr_err("Failed to load module, invalid module parameters, err: %pe\n",
		       ERR_PTR(err));
		return err;
	}
	err = class_register(&rtrs_dev_class);
	if (err)
		goto out_err;

	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_unregister(&rtrs_dev_class);
out_err:
	return err;
}

static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_unregister(&rtrs_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);
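/*
 * Loading sketch (the module name is assumed from the build setup; dashes and
 * underscores are interchangeable for modprobe). Any non-default values must
 * still pass check_module_params() above, e.g.:
 *
 *	modprobe rtrs_server sess_queue_depth=256
 */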