// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size of each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ")");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
	return container_of(s, struct rtrs_srv_sess, s);
}
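
/*
 * Session state machine. Allowed transitions (sess->state_lock must be
 * held, see the lockdep assertion below):
 *
 *   CONNECTING -> CONNECTED
 *   CONNECTING -> CLOSING
 *   CONNECTED  -> CLOSING
 *   CLOSING    -> CLOSED
 *
 * Returns true if the state was changed, false otherwise.
 */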
static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				    enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_lock);
	old_state = sess->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		switch (old_state) {
		case RTRS_SRV_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_SRV_CLOSING:
		switch (old_state) {
		case RTRS_SRV_CONNECTING:
		case RTRS_SRV_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_SRV_CLOSED:
		switch (old_state) {
		case RTRS_SRV_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed)
		sess->state = new_state;

	return changed;
}

static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				  enum rtrs_srv_state new_state)
{
	bool changed;

	spin_lock_irq(&sess->state_lock);
	changed = __rtrs_srv_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	int i;

	WARN_ON(atomic_read(&sess->ids_inflight));
	if (sess->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(sess->ops_ids[i]);
		kfree(sess->ops_ids);
		sess->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_op *id;
	int i;

	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
				GFP_KERNEL);
	if (!sess->ops_ids)
		goto err;

	for (i = 0; i < srv->queue_depth; ++i) {
		id = kzalloc(sizeof(*id), GFP_KERNEL);
		if (!id)
			goto err;

		sess->ops_ids[i] = id;
	}
	init_waitqueue_head(&sess->ids_waitq);
	atomic_set(&sess->ids_inflight, 0);

	return 0;

err:
	rtrs_srv_free_ops_ids(sess);
	return -ENOMEM;
}

static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
	atomic_inc(&sess->ids_inflight);
}

static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
	if (atomic_dec_and_test(&sess->ids_inflight))
		wake_up(&sess->ids_waitq);
}

static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess)
{
	wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight));
}

static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = cq->cq_context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "REG MR failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
}

static struct ib_cqe local_reg_cqe = {
	.done = rtrs_srv_reg_mr_done
};
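
/*
 * rdma_write_sg() - reply to a read request by writing the data back
 *
 * Posts an RDMA WRITE of the chunk payload to the single buffer the client
 * advertised in its read message. Depending on always_invalidate and on the
 * NEED_INVAL flag from the client, the WRITE is chained with a REG_MR of the
 * server MR, a SEND_WITH_INV of the client's rkey and a final WR carrying
 * the immediate data that encodes the completed msg_id.
 */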
static int rdma_write_sg(struct rtrs_srv_op *id)
{
	struct rtrs_sess *s = id->con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
	struct rtrs_srv_mr *srv_mr;
	struct rtrs_srv *srv = sess->srv;
	struct ib_send_wr inv_wr, imm_wr;
	struct ib_rdma_wr *wr = NULL;
	enum ib_send_flags flags;
	size_t sg_cnt;
	int err, offset;
	bool need_inval;
	u32 rkey = 0;
	struct ib_reg_wr rwr;
	struct ib_sge *plist;
	struct ib_sge list;

	sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
	need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
	if (unlikely(sg_cnt != 1))
		return -EINVAL;

	offset = 0;

	wr = &id->tx_wr;
	plist = &id->tx_sg;
	plist->addr = dma_addr + offset;
	plist->length = le32_to_cpu(id->rd_msg->desc[0].len);

	/* WR will fail with length error
	 * if this is 0
	 */
	if (unlikely(plist->length == 0)) {
		rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
		return -EINVAL;
	}

	plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
	offset += plist->length;

	wr->wr.sg_list = plist;
	wr->wr.num_sge = 1;
	wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
	wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
	if (rkey == 0)
		rkey = wr->rkey;
	else
		/* Only one key is actually used */
		WARN_ON_ONCE(rkey != wr->rkey);

	wr->wr.opcode = IB_WR_RDMA_WRITE;
	wr->wr.ex.imm_data = 0;
	wr->wr.send_flags = 0;

	if (need_inval && always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &inv_wr;
		inv_wr.next = &imm_wr;
	} else if (always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr;
	} else if (need_inval) {
		wr->wr.next = &inv_wr;
		inv_wr.next = &imm_wr;
	} else {
		wr->wr.next = &imm_wr;
	}
	/*
	 * From time to time we have to post signaled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ?
		0 : IB_SEND_SIGNALED;

	if (need_inval) {
		inv_wr.sg_list = NULL;
		inv_wr.num_sge = 0;
		inv_wr.opcode = IB_WR_SEND_WITH_INV;
		inv_wr.send_flags = 0;
		inv_wr.ex.invalidate_rkey = rkey;
	}

	imm_wr.next = NULL;
	if (always_invalidate) {
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.num_sge = 0;
		rwr.mr = srv_mr->mr;
		rwr.wr.send_flags = 0;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.sg_list = &list;
		imm_wr.num_sge = 1;
		imm_wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.sg_list = NULL;
		imm_wr.num_sge = 0;
		imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.send_flags = flags;
	imm_wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
							    0, need_inval));

	imm_wr.wr_cqe = &io_comp_cqe;
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
				      offset, DMA_BIDIRECTIONAL);

	err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
	if (unlikely(err))
		rtrs_err(s,
			 "Posting RDMA-Write-Request to QP failed, err: %d\n",
			 err);

	return err;
}

/**
 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
 *                      requests or on successful WRITE request.
 * @con:	the connection to send back result
 * @id:		the id associated with the IO
 * @errno:	the error number of the IO.
 *
 * Return 0 on success, errno otherwise.
 */
static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
			    int errno)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr inv_wr, imm_wr, *wr = NULL;
	struct ib_reg_wr rwr;
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_mr *srv_mr;
	bool need_inval = false;
	enum ib_send_flags flags;
	u32 imm;
	int err;

	if (id->dir == READ) {
		struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
		size_t sg_cnt;

		need_inval = le16_to_cpu(rd_msg->flags) &
				RTRS_MSG_NEED_INVAL_F;
		sg_cnt = le16_to_cpu(rd_msg->sg_cnt);

		if (need_inval) {
			if (likely(sg_cnt)) {
				inv_wr.sg_list = NULL;
				inv_wr.num_sge = 0;
				inv_wr.opcode = IB_WR_SEND_WITH_INV;
				inv_wr.send_flags = 0;
				/* Only one key is actually used */
				inv_wr.ex.invalidate_rkey =
					le32_to_cpu(rd_msg->desc[0].key);
			} else {
				WARN_ON_ONCE(1);
				need_inval = false;
			}
		}
	}

	if (need_inval && always_invalidate) {
		wr = &inv_wr;
		inv_wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr;
	} else if (always_invalidate) {
		wr = &rwr.wr;
		rwr.wr.next = &imm_wr;
	} else if (need_inval) {
		wr = &inv_wr;
		inv_wr.next = &imm_wr;
	} else {
		wr = &imm_wr;
	}
	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ?
		0 : IB_SEND_SIGNALED;
	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
	imm_wr.next = NULL;
	if (always_invalidate) {
		struct ib_sge list;
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.next = &imm_wr;
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.num_sge = 0;
		rwr.wr.send_flags = 0;
		rwr.mr = srv_mr->mr;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.sg_list = &list;
		imm_wr.num_sge = 1;
		imm_wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.sg_list = NULL;
		imm_wr.num_sge = 0;
		imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.send_flags = flags;
	imm_wr.wr_cqe = &io_comp_cqe;

	imm_wr.ex.imm_data = cpu_to_be32(imm);

	err = ib_post_send(id->con->c.qp, wr, NULL);
	if (unlikely(err))
		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
			    err);

	return err;
}

void close_sess(struct rtrs_srv_sess *sess)
{
	if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	WARN_ON(sess->state != RTRS_SRV_CLOSING);
}

static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
{
	switch (state) {
	case RTRS_SRV_CONNECTING:
		return "RTRS_SRV_CONNECTING";
	case RTRS_SRV_CONNECTED:
		return "RTRS_SRV_CONNECTED";
	case RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response Code sent to the other side for this operation.
 *		0 = success, < 0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (unlikely(atomic_sub_return(1,
				       &con->sq_wr_avail) < 0)) {
		pr_err("IB send queue full\n");
		atomic_add(1, &con->sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (unlikely(err)) {
		rtrs_err_rl(s, "IO response failed: %d\n", err);
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}
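
/*
 * map_cont_bufs() - map the per-session buffer chunks into memory regions
 *
 * With always_invalidate each chunk gets its own MR (so that its rkey can be
 * refreshed per I/O), otherwise queue_depth chunks are packed into as few MRs
 * as max_fast_reg_page_list_len allows. The immediate payload of an I/O
 * request encodes the chunk id in the upper bits and the byte offset inside
 * the chunk in the lower sess->mem_bits bits, see the chunk_bits calculation
 * at the end of this function.
 */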
static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MR. Firstly we have to
	 * figure out how many chunks can we map per MR.
	 */
	if (always_invalidate) {
		/*
		 * in order to invalidate each chunk of memory, we need
		 * more memory regions.
		 */
		mrs_num = srv->queue_depth;
	} else {
		chunks_per_mr =
			sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
		mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
		chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
	}

	sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
	if (!sess->mrs)
		return -ENOMEM;

	sess->mrs_num = mrs_num;

	for (mri = 0; mri < mrs_num; mri++) {
		struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
		struct sg_table *sgt = &srv_mr->sgt;
		struct scatterlist *s;
		struct ib_mr *mr;
		int nr, chunks;

		chunks = chunks_per_mr * mri;
		if (!always_invalidate)
			chunks_per_mr = min_t(int, chunks_per_mr,
					      srv->queue_depth - chunks);

		err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
		if (err)
			goto err;

		for_each_sg(sgt->sgl, s, chunks_per_mr, i)
			sg_set_page(s, srv->chunks[chunks + i],
				    max_chunk_size, 0);

		nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
				   sgt->nents, DMA_BIDIRECTIONAL);
		if (nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto free_sg;
		}
		mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				 sgt->nents);
		if (IS_ERR(mr)) {
			err = PTR_ERR(mr);
			goto unmap_sg;
		}
		nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
				  NULL, max_chunk_size);
		if (nr < 0 || nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto dereg_mr;
		}

		if (always_invalidate) {
			srv_mr->iu = rtrs_iu_alloc(1,
					sizeof(struct rtrs_msg_rkey_rsp),
					GFP_KERNEL, sess->s.dev->ib_dev,
					DMA_TO_DEVICE, rtrs_srv_rdma_done);
			if (!srv_mr->iu) {
				err = -ENOMEM;
				rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
				goto free_iu;
			}
		}
		/* Eventually dma addr for each chunk can be cached */
		for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
			sess->dma_addr[chunks + i] = sg_dma_address(s);

		ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
		srv_mr->mr = mr;

		continue;
err:
		while (mri--) {
			srv_mr = &sess->mrs[mri];
			sgt = &srv_mr->sgt;
			mr = srv_mr->mr;
free_iu:
			rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
dereg_mr:
			ib_dereg_mr(mr);
unmap_sg:
			ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
					sgt->nents, DMA_BIDIRECTIONAL);
free_sg:
			sg_free_table(sgt);
		}
		kfree(sess->mrs);

		return err;
	}

	chunk_bits = ilog2(srv->queue_depth - 1) + 1;
	sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);

	return 0;
}

static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
{
	close_sess(to_srv_sess(c->sess));
}

static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_srv_hb_err_handler,
		     rtrs_wq);
}

static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
{
	rtrs_start_hb(&sess->s);
}

static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
{
	rtrs_stop_hb(&sess->s);
}

static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = cq->cq_context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Sess info response send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
	WARN_ON(wc->opcode != IB_WC_SEND);
}

static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	int up;

	mutex_lock(&srv->paths_ev_mutex);
	up = ++srv->paths_up;
	if (up == 1)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
	mutex_unlock(&srv->paths_ev_mutex);

	/* Mark session as established */
	sess->established = true;
}

static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;

	if (!sess->established)
		return;

	sess->established = false;
	mutex_lock(&srv->paths_ev_mutex);
	WARN_ON(!srv->paths_up);
	if (--srv->paths_up == 0)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
	mutex_unlock(&srv->paths_ev_mutex);
}

static int post_recv_sess(struct rtrs_srv_sess *sess);
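
/*
 * process_info_req() - handle the client's info request on the service
 * connection: post the I/O receive buffers on all connections, build a chain
 * of REG_MR WRs for all session MRs and reply with an info response that
 * advertises addr/rkey/len of every memory region to the client.
 */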
static int process_info_req(struct rtrs_srv_con *con,
			    struct rtrs_msg_info_req *msg)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr *reg_wr = NULL;
	struct rtrs_msg_info_rsp *rsp;
	struct rtrs_iu *tx_iu;
	struct ib_reg_wr *rwr;
	int mri, err;
	size_t tx_sz;

	err = post_recv_sess(sess);
	if (unlikely(err)) {
		rtrs_err(s, "post_recv_sess(), err: %d\n", err);
		return err;
	}
	rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
	if (unlikely(!rwr))
		return -ENOMEM;
	strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));

	tx_sz = sizeof(*rsp);
	tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
	tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
	if (unlikely(!tx_iu)) {
		err = -ENOMEM;
		goto rwr_free;
	}

	rsp = tx_iu->buf;
	rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
	rsp->sg_cnt = cpu_to_le16(sess->mrs_num);

	for (mri = 0; mri < sess->mrs_num; mri++) {
		struct ib_mr *mr = sess->mrs[mri].mr;

		rsp->desc[mri].addr = cpu_to_le64(mr->iova);
		rsp->desc[mri].key = cpu_to_le32(mr->rkey);
		rsp->desc[mri].len = cpu_to_le32(mr->length);

		/*
		 * Fill in reg MR request and chain them *backwards*
		 */
		rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
		rwr[mri].wr.opcode = IB_WR_REG_MR;
		rwr[mri].wr.wr_cqe = &local_reg_cqe;
		rwr[mri].wr.num_sge = 0;
		rwr[mri].wr.send_flags = mri ? 0 : IB_SEND_SIGNALED;
		rwr[mri].mr = mr;
		rwr[mri].key = mr->rkey;
		rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE);
		reg_wr = &rwr[mri].wr;
	}

	err = rtrs_srv_create_sess_files(sess);
	if (unlikely(err))
		goto iu_free;
	kobject_get(&sess->kobj);
	get_device(&sess->srv->dev);
	rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
	rtrs_srv_start_hb(sess);

	/*
	 * We do not account for the number of established connections at the
	 * current moment, we rely on the client, which should send the info
	 * request when all connections are successfully established. Thus,
	 * simply notify the listener with a proper event if we are the first
	 * path.
	 */
	rtrs_srv_sess_up(sess);

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info response */
	err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
	if (unlikely(err)) {
		rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
iu_free:
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	}
rwr_free:
	kfree(rwr);

	return err;
}

static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = cq->cq_context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *iu;
	int err;

	WARN_ON(con->c.cid);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Sess info request receive failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto close;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(s, "Sess info request is malformed: size %d\n",
			 wc->byte_len);
		goto close;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
		rtrs_err(s, "Sess info request is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto close;
	}
	err = process_info_req(con, msg);
	if (unlikely(err))
		goto close;

out:
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	return;
close:
	close_sess(sess);
	goto out;
}

static int post_recv_info_req(struct rtrs_srv_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *rx_iu;
	int err;

	rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
			      GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_srv_info_req_done);
	if (unlikely(!rx_iu))
		return -ENOMEM;
	/* Prepare for getting the info request */
	err = rtrs_iu_post_recv(&con->c, rx_iu);
	if (unlikely(err)) {
		rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
		return err;
	}

	return 0;
}

static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
{
	int i, err;

	for (i = 0; i < q_size; i++) {
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err))
			return err;
	}

	return 0;
}

static int post_recv_sess(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	size_t q_size;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = srv->queue_depth;

		err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
		if (unlikely(err)) {
			rtrs_err(s, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}
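
/*
 * process_read() - hand a client READ request over to the ULP
 *
 * Splits the received chunk into payload and user message and passes both to
 * the rdma_ev() callback; the ULP later completes the request via
 * rtrs_srv_resp_rdma(). On an immediate ULP error an error response with
 * empty immediate data is sent right away.
 */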
static void process_read(struct rtrs_srv_con *con,
			 struct rtrs_msg_rdma_read *msg,
			 u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t usr_len, data_len;
	void *data;
	int ret;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Processing read request failed, session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) {
		rtrs_err_rl(s,
			    "Processing read request failed, invalid message\n");
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, READ);
	id = sess->ops_ids[buf_id];
	id->con = con;
	id->dir = READ;
	id->msg_id = buf_id;
	id->rd_msg = msg;
	usr_len = le16_to_cpu(msg->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv, srv->priv, id, READ, data, data_len,
			       data + data_len, usr_len);

	if (unlikely(ret)) {
		rtrs_err_rl(s,
			    "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
			    buf_id, ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

static void process_write(struct rtrs_srv_con *con,
			  struct rtrs_msg_rdma_write *req,
			  u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t data_len, usr_len;
	void *data;
	int ret;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Processing write request failed, session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
	id = sess->ops_ids[buf_id];
	id->con = con;
	id->dir = WRITE;
	id->msg_id = buf_id;

	usr_len = le16_to_cpu(req->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv, srv->priv, id, WRITE, data, data_len,
			       data + data_len, usr_len);
	if (unlikely(ret)) {
		rtrs_err_rl(s,
			    "Processing write request failed, user module callback reports err: %d\n",
			    ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

static void process_io_req(struct rtrs_srv_con *con, void *msg,
			   u32 id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_rdma_hdr *hdr;
	unsigned int type;

	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
				   max_chunk_size, DMA_BIDIRECTIONAL);
	hdr = msg;
	type = le16_to_cpu(hdr->type);

	switch (type) {
	case RTRS_MSG_WRITE:
		process_write(con, msg, id, off);
		break;
	case RTRS_MSG_READ:
		process_read(con, msg, id, off);
		break;
	default:
		rtrs_err(s,
			 "Processing I/O request failed, unknown message type received: 0x%02x\n",
			 type);
		goto err;
	}

	return;

err:
	close_sess(sess);
}
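
/*
 * With always_invalidate the rkey of the chunk MR is invalidated with a
 * LOCAL_INV WR before the request is handed to the ULP; the completion
 * handler below then resumes processing of the deferred I/O request.
 */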
static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_mr *mr =
		container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
	struct rtrs_srv_con *con = cq->cq_context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 msg_id, off;
	void *data;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
	}
	msg_id = mr->msg_id;
	off = mr->msg_off;
	data = page_address(srv->chunks[msg_id]) + off;
	process_io_req(con, data, msg_id, off);
}

static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
			     struct rtrs_srv_mr *mr)
{
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &mr->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = mr->mr->rkey,
	};
	mr->inv_cqe.done = rtrs_srv_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
{
	spin_lock(&con->rsp_wr_wait_lock);
	while (!list_empty(&con->rsp_wr_wait_list)) {
		struct rtrs_srv_op *id;
		int ret;

		id = list_entry(con->rsp_wr_wait_list.next,
				struct rtrs_srv_op, wait_list);
		list_del(&id->wait_list);

		spin_unlock(&con->rsp_wr_wait_lock);
		ret = rtrs_srv_resp_rdma(id, id->status);
		spin_lock(&con->rsp_wr_wait_lock);

		if (!ret) {
			list_add(&id->wait_list, &con->rsp_wr_wait_list);
			break;
		}
	}
	spin_unlock(&con->rsp_wr_wait_lock);
}
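
/*
 * rtrs_srv_rdma_done() - CQE handler for all I/O traffic of a connection
 *
 * RECV completions with immediate data carry either an I/O request (the
 * immediate encodes msg_id and offset) or a heartbeat message/ack; send-side
 * completions only replenish sq_wr_avail and kick the response wait list.
 */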
static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = cq->cq_context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 imm_type, imm_payload;
	int err;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(s,
				 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
				 ib_wc_status_msg(wc->status), wc->wr_cqe,
				 wc->opcode, wc->vendor_err, wc->byte_len);
			close_sess(sess);
		}
		return;
	}

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
			return;
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err)) {
			rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
			close_sess(sess);
			break;
		}
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (likely(imm_type == RTRS_IO_REQ_IMM)) {
			u32 msg_id, off;
			void *data;

			msg_id = imm_payload >> sess->mem_bits;
			off = imm_payload & ((1 << sess->mem_bits) - 1);
			if (unlikely(msg_id >= srv->queue_depth ||
				     off >= max_chunk_size)) {
				rtrs_err(s, "Wrong msg_id %u, off %u\n",
					 msg_id, off);
				close_sess(sess);
				return;
			}
			if (always_invalidate) {
				struct rtrs_srv_mr *mr = &sess->mrs[msg_id];

				mr->msg_off = off;
				mr->msg_id = msg_id;
				err = rtrs_srv_inv_rkey(con, mr);
				if (unlikely(err)) {
					rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
						 err);
					close_sess(sess);
					break;
				}
			} else {
				data = page_address(srv->chunks[msg_id]) + off;
				process_io_req(con, data, msg_id, off);
			}
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
		} else {
			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
		}
		break;
	case IB_WC_RDMA_WRITE:
	case IB_WC_SEND:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		atomic_add(srv->queue_depth, &con->sq_wr_avail);

		if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
			rtrs_rdma_process_wr_wait_list(con);

		break;
	default:
		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

/**
 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
 * @srv:	Session
 * @sessname:	Sessname buffer
 * @len:	Length of sessname buffer
 */
int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOTCONN;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (sess->state != RTRS_SRV_CONNECTED)
			continue;
		strlcpy(sessname, sess->s.sessname,
			min_t(size_t, sizeof(sess->s.sessname), len));
		err = 0;
		break;
	}
	mutex_unlock(&srv->paths_mutex);

	return err;
}
EXPORT_SYMBOL(rtrs_srv_get_sess_name);

/**
 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
 * @srv:	Session
 */
int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
{
	return srv->queue_depth;
}
EXPORT_SYMBOL(rtrs_srv_get_queue_depth);

static int find_next_bit_ring(struct rtrs_srv_sess *sess)
{
	struct ib_device *ib_dev = sess->s.dev->ib_dev;
	int v;

	v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
		v = cpumask_first(&cq_affinity_mask);
	return v;
}

static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
{
	sess->cur_cq_vector = find_next_bit_ring(sess);

	return sess->cur_cq_vector;
}

static void rtrs_srv_dev_release(struct device *dev)
{
	struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);

	kfree(srv);
}

static void free_srv(struct rtrs_srv *srv)
{
	int i;

	WARN_ON(refcount_read(&srv->refcount));
	for (i = 0; i < srv->queue_depth; i++)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);
	mutex_destroy(&srv->paths_mutex);
	mutex_destroy(&srv->paths_ev_mutex);
	/* last put to release the srv structure */
	put_device(&srv->dev);
}

static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
					  const uuid_t *paths_uuid)
{
	struct rtrs_srv *srv;
	int i;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
		if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
		    refcount_inc_not_zero(&srv->refcount)) {
			mutex_unlock(&ctx->srv_mutex);
			return srv;
		}
	}

	/* need to allocate a new srv */
	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
	if (!srv) {
		mutex_unlock(&ctx->srv_mutex);
		return NULL;
	}

	INIT_LIST_HEAD(&srv->paths_list);
	mutex_init(&srv->paths_mutex);
	mutex_init(&srv->paths_ev_mutex);
	uuid_copy(&srv->paths_uuid, paths_uuid);
	srv->queue_depth = sess_queue_depth;
	srv->ctx = ctx;
	device_initialize(&srv->dev);
	srv->dev.release = rtrs_srv_dev_release;
	list_add(&srv->ctx_list, &ctx->srv_list);
	mutex_unlock(&ctx->srv_mutex);

	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
			      GFP_KERNEL);
	if (!srv->chunks)
		goto err_free_srv;

	for (i = 0; i < srv->queue_depth; i++) {
		srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
		if (!srv->chunks[i])
			goto err_free_chunks;
	}
	refcount_set(&srv->refcount, 1);

	return srv;

err_free_chunks:
	while (i--)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);

err_free_srv:
	kfree(srv);
	return NULL;
}

static void put_srv(struct rtrs_srv *srv)
{
	if (refcount_dec_and_test(&srv->refcount)) {
		struct rtrs_srv_ctx *ctx = srv->ctx;

		WARN_ON(srv->dev.kobj.state_in_sysfs);

		mutex_lock(&ctx->srv_mutex);
		list_del(&srv->ctx_list);
		mutex_unlock(&ctx->srv_mutex);
		free_srv(srv);
	}
}

static void __add_path_to_srv(struct rtrs_srv *srv,
			      struct rtrs_srv_sess *sess)
{
	list_add_tail(&sess->s.entry, &srv->paths_list);
	srv->paths_num++;
	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
}

static void del_path_from_srv(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;

	if (WARN_ON(!srv))
		return;

	mutex_lock(&srv->paths_mutex);
	list_del(&sess->s.entry);
	WARN_ON(!srv->paths_num);
	srv->paths_num--;
	mutex_unlock(&srv->paths_mutex);
}

/* Return zero if the two addresses match, non-zero otherwise */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
{
	switch (a->sa_family) {
	case AF_IB:
		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
			      &((struct sockaddr_ib *)b)->sib_addr,
			      sizeof(struct ib_addr)) &&
			(b->sa_family == AF_IB);
	case AF_INET:
		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
			      &((struct sockaddr_in *)b)->sin_addr,
			      sizeof(struct in_addr)) &&
			(b->sa_family == AF_INET);
	case AF_INET6:
		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
			      &((struct sockaddr_in6 *)b)->sin6_addr,
			      sizeof(struct in6_addr)) &&
			(b->sa_family == AF_INET6);
	default:
		return -ENOENT;
	}
}

static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
				    struct rdma_addr *addr)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry)
		if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
				  (struct sockaddr *)&addr->dst_addr) &&
		    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
				  (struct sockaddr *)&addr->src_addr))
			return true;

	return false;
}

static void free_sess(struct rtrs_srv_sess *sess)
{
	if (sess->kobj.state_in_sysfs)
		kobject_put(&sess->kobj);
	else
		kfree(sess);
}
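
/*
 * rtrs_srv_close_work() - the workqueue half of close_sess()
 *
 * Disconnects and drains all connections of the path, waits for inflight
 * operations to finish, notifies the ULP if this was the last path, and
 * frees all per-session resources.
 */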
static void rtrs_srv_close_work(struct work_struct *work)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	int i;

	sess = container_of(work, typeof(*sess), close_work);

	rtrs_srv_destroy_sess_files(sess);
	rtrs_srv_stop_hb(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rdma_disconnect(con->c.cm_id);
		ib_drain_qp(con->c.qp);
	}
	/* Wait for all inflights */
	rtrs_srv_wait_ops_ids(sess);

	/* Notify upper layer if we are the last path */
	rtrs_srv_sess_down(sess);

	unmap_cont_bufs(sess);
	rtrs_srv_free_ops_ids(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rtrs_cq_qp_destroy(&con->c);
		rdma_destroy_id(con->c.cm_id);
		kfree(con);
	}
	rtrs_ib_dev_put(sess->s.dev);

	del_path_from_srv(sess);
	put_srv(sess->srv);
	sess->srv = NULL;
	rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);

	kfree(sess->dma_addr);
	kfree(sess->s.con);
	free_sess(sess);
}

static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
			       struct rdma_cm_id *cm_id)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_msg_conn_rsp msg;
	struct rdma_conn_param param;
	int err;

	param = (struct rdma_conn_param) {
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.queue_depth = cpu_to_le16(srv->queue_depth),
		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
	};

	if (always_invalidate)
		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);

	err = rdma_accept(cm_id, &param);
	if (err)
		pr_err("rdma_accept(), err: %d\n", err);

	return err;
}

static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
{
	struct rtrs_msg_conn_rsp msg;
	int err;

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.errno = cpu_to_le16(errno),
	};

	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
	if (err)
		pr_err("rdma_reject(), err: %d\n", err);

	/* Bounce errno back */
	return errno;
}

static struct rtrs_srv_sess *
__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (uuid_equal(&sess->s.uuid, sess_uuid))
			return sess;
	}

	return NULL;
}
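
/*
 * create_con() - create one RDMA connection of a session
 *
 * Sizes the CQ and send queue depending on whether this is the service
 * connection (cid 0) or an I/O connection, creates the CQ/QP pair and, for
 * the service connection, posts the receive for the client's info request.
 */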
static int create_con(struct rtrs_srv_sess *sess,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	struct rtrs_srv_con *con;

	u16 cq_size, wr_queue_size;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->wr_cnt, 0);

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
		cq_size = wr_queue_size;
	} else {
		/*
		 * The CQ has to cover all posted receive requests, all
		 * posted write requests and one invalidate per read
		 * request, plus one extra entry for draining the QP when
		 * it goes into the error state.
		 */
		cq_size = srv->queue_depth * 3 + 1;
		/*
		 * In theory we might have queue_depth * 32
		 * outstanding requests if an unsafe global key is used
		 * and we have queue_depth read requests each consisting
		 * of 32 different addresses. div 3 for mlx5.
		 */
		wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3;
	}
	atomic_set(&con->sq_wr_avail, wr_queue_size);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size,
				wr_queue_size, IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection. The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
	 */
	cm_id->context = &con->c;

	return 0;

free_cqqp:
	rtrs_cq_qp_destroy(&con->c);
free_con:
	kfree(con);

err:
	return err;
}

static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
					  struct rdma_cm_id *cm_id,
					  unsigned int con_num,
					  unsigned int recon_cnt,
					  const uuid_t *uuid)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOMEM;

	if (srv->paths_num >= MAX_PATHS_NUM) {
		err = -ECONNRESET;
		goto err;
	}
	if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
		err = -EEXIST;
		pr_err("Path with same addr exists\n");
		goto err;
	}
	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_sess;

	sess->stats->sess = sess;

	sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
				 GFP_KERNEL);
	if (!sess->dma_addr)
		goto err_free_stats;

	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_dma_addr;

	sess->state = RTRS_SRV_CONNECTING;
	sess->srv = srv;
	sess->cur_cq_vector = -1;
	sess->s.dst_addr = cm_id->route.addr.dst_addr;
	sess->s.src_addr = cm_id->route.addr.src_addr;
	sess->s.con_num = con_num;
	sess->s.recon_cnt = recon_cnt;
	uuid_copy(&sess->s.uuid, uuid);
	spin_lock_init(&sess->state_lock);
	INIT_WORK(&sess->close_work, rtrs_srv_close_work);
	rtrs_srv_init_hb(sess);

	sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
	if (!sess->s.dev) {
		err = -ENOMEM;
		goto err_free_con;
	}
	err = map_cont_bufs(sess);
	if (err)
		goto err_put_dev;

	err = rtrs_srv_alloc_ops_ids(sess);
	if (err)
		goto err_unmap_bufs;

	__add_path_to_srv(srv, sess);

	return sess;

err_unmap_bufs:
	unmap_cont_bufs(sess);
err_put_dev:
	rtrs_ib_dev_put(sess->s.dev);
err_free_con:
	kfree(sess->s.con);
err_free_dma_addr:
	kfree(sess->dma_addr);
err_free_stats:
	kfree(sess->stats);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}
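
/*
 * rtrs_rdma_connect() - handle an RDMA_CM_EVENT_CONNECT_REQUEST
 *
 * Validates the connection request, looks up or creates the rtrs_srv and the
 * session for the given UUIDs, creates the requested connection and either
 * accepts it or rejects it with an error carried in the private data.
 */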
static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
			     const struct rtrs_msg_conn_req *msg,
			     size_t len)
{
	struct rtrs_srv_ctx *ctx = cm_id->context;
	struct rtrs_srv_sess *sess;
	struct rtrs_srv *srv;

	u16 version, con_num, cid;
	u16 recon_cnt;
	int err;

	if (len < sizeof(*msg)) {
		pr_err("Invalid RTRS connection request\n");
		goto reject_w_econnreset;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		pr_err("Invalid RTRS magic\n");
		goto reject_w_econnreset;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		pr_err("Unsupported major RTRS version: %d, expected %d\n",
		       version >> 8, RTRS_PROTO_VER_MAJOR);
		goto reject_w_econnreset;
	}
	con_num = le16_to_cpu(msg->cid_num);
	if (con_num > 4096) {
		/* Sanity check */
		pr_err("Too many connections requested: %d\n", con_num);
		goto reject_w_econnreset;
	}
	cid = le16_to_cpu(msg->cid);
	if (cid >= con_num) {
		/* Sanity check */
		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
		goto reject_w_econnreset;
	}
	recon_cnt = le16_to_cpu(msg->recon_cnt);
	srv = get_or_create_srv(ctx, &msg->paths_uuid);
	/*
	 * "refcount == 0" happens if a previous thread called
	 * get_or_create_srv and allocated srv, but the chunks of srv are
	 * not allocated yet.
	 */
	if (!srv || refcount_read(&srv->refcount) == 0) {
		err = -ENOMEM;
		goto reject_w_err;
	}
	mutex_lock(&srv->paths_mutex);
	sess = __find_sess(srv, &msg->sess_uuid);
	if (sess) {
		struct rtrs_sess *s = &sess->s;

		/* Session already holds a reference */
		put_srv(srv);

		if (sess->state != RTRS_SRV_CONNECTING) {
			rtrs_err(s, "Session in wrong state: %s\n",
				 rtrs_srv_state_str(sess->state));
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
		/*
		 * Sanity checks
		 */
		if (con_num != s->con_num || cid >= s->con_num) {
			rtrs_err(s, "Incorrect request: %d, %d\n",
				 cid, con_num);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
		if (s->con[cid]) {
			rtrs_err(s, "Connection already exists: %d\n",
				 cid);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
	} else {
		sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
				    &msg->sess_uuid);
		if (IS_ERR(sess)) {
			mutex_unlock(&srv->paths_mutex);
			put_srv(srv);
			err = PTR_ERR(sess);
			goto reject_w_err;
		}
	}
	err = create_con(sess, cm_id, cid);
	if (err) {
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since session has other connections we follow normal way
		 * through workqueue, but still return an error to tell cma.c
		 * to call rdma_destroy_id() for current connection.
		 */
		goto close_and_return_err;
	}
	err = rtrs_rdma_do_accept(sess, cm_id);
	if (err) {
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since current connection was successfully added to the
		 * session we follow normal way through workqueue to close the
		 * session, thus return 0 to tell cma.c we call
		 * rdma_destroy_id() ourselves.
		 */
		err = 0;
		goto close_and_return_err;
	}
	mutex_unlock(&srv->paths_mutex);

	return 0;

reject_w_err:
	return rtrs_rdma_do_reject(cm_id, err);

reject_w_econnreset:
	return rtrs_rdma_do_reject(cm_id, -ECONNRESET);

close_and_return_err:
	close_sess(sess);
	mutex_unlock(&srv->paths_mutex);

	return err;
}

static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_srv_sess *sess = NULL;
	struct rtrs_sess *s = NULL;

	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
		struct rtrs_con *c = cm_id->context;

		s = c->sess;
		sess = to_srv_sess(s);
	}

	switch (ev->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		/*
		 * In case of error cma.c will destroy cm_id,
		 * see cma_process_remove()
		 */
		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
					 ev->param.conn.private_data_len);
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Nothing here */
		break;
	case RDMA_CM_EVENT_REJECTED:
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		close_sess(sess);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
		close_sess(sess);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		close_sess(sess);
		break;
	default:
		pr_err("Ignoring unexpected CM event %s, err %d\n",
		       rdma_event_msg(ev->event), ev->status);
		break;
	}

	return 0;
}

static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
					   struct sockaddr *addr,
					   enum rdma_ucm_port_space ps)
{
	struct rdma_cm_id *cm_id;
	int ret;

	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
			       ctx, ps, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		ret = PTR_ERR(cm_id);
		pr_err("Creating id for RDMA connection failed, err: %d\n",
		       ret);
		goto err_out;
	}
	ret = rdma_bind_addr(cm_id, addr);
	if (ret) {
		pr_err("Binding RDMA address failed, err: %d\n", ret);
		goto err_cm;
	}
	ret = rdma_listen(cm_id, 64);
	if (ret) {
		pr_err("Listening on RDMA connection failed, err: %d\n",
		       ret);
		goto err_cm;
	}

	return cm_id;

err_cm:
	rdma_destroy_id(cm_id);
err_out:

	return ERR_PTR(ret);
}

static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
{
	struct sockaddr_in6 sin = {
		.sin6_family = AF_INET6,
		.sin6_addr = IN6ADDR_ANY_INIT,
		.sin6_port = htons(port),
	};
	struct sockaddr_ib sib = {
		.sib_family = AF_IB,
		.sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
		.sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
		.sib_pkey = cpu_to_be16(0xffff),
	};
	struct rdma_cm_id *cm_ip, *cm_ib;
	int ret;

	/*
	 * We accept both IPoIB and IB connections, so we need to keep
	 * two cm id's, one for each socket type and port space.
	 * If the cm initialization of one of the id's fails, we abort
	 * everything.
	 */
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create them
	 * only once
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * We errored out here.
		 * According to the ib code, if we encounter an error here then
		 * the error code is ignored, and no more calls to our ops are
		 * made.
		 */
		pr_err("Failed to initialize RDMA connection\n");
		goto err_out;
	}

out:
	/*
	 * Keep track of the number of ib devices added
	 */
	ib_ctx.ib_dev_count++;

err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove them
	 * only once, when the last device is removed
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops:	callback functions
 * @port:	port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return a valid pointer on success, an ERR_PTR otherwise.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	mutex_init(&ib_ctx.ib_dev_mutex);
	ib_ctx.srv_ctx = ctx;
	ib_ctx.port = port;

	err = ib_register_client(&rtrs_srv_client);
	if (err) {
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);

static void close_sessions(struct rtrs_srv *srv)
{
	struct rtrs_srv_sess *sess;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry)
		close_sess(sess);
	mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_sessions(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	ib_unregister_client(&rtrs_srv_client);
	mutex_destroy(&ib_ctx.ib_dev_mutex);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
		       max_chunk_size, 4096);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and the
	 * offset inside the memory chunk
	 */
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}

static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		pr_err("Failed to load module, invalid module parameters, err: %d\n",
		       err);
		return err;
	}
	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
					      get_order(max_chunk_size));
	if (!chunk_pool)
		return -ENOMEM;
	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
	if (IS_ERR(rtrs_dev_class)) {
		err = PTR_ERR(rtrs_dev_class);
		goto out_chunk_pool;
	}
	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_destroy(rtrs_dev_class);
out_chunk_pool:
	mempool_destroy(chunk_pool);

	return err;
}

static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_dev_class);
	mempool_destroy(chunk_pool);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);