// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>
#include "rtrs-srv-trace.h"

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

static struct rtrs_rdma_dev_pd dev_pd;
const struct class rtrs_dev_class = {
	.name = "rtrs-server",
};
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ", i.e. 128KB)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
				  enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	spin_lock_irq(&srv_path->state_lock);
	old_state = srv_path->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		if (old_state == RTRS_SRV_CONNECTING)
			changed = true;
		break;
	case RTRS_SRV_CLOSING:
		if (old_state == RTRS_SRV_CONNECTING ||
		    old_state == RTRS_SRV_CONNECTED)
			changed = true;
		break;
	case RTRS_SRV_CLOSED:
		if (old_state == RTRS_SRV_CLOSING)
			changed = true;
		break;
	default:
		break;
	}
	if (changed)
		srv_path->state = new_state;
	spin_unlock_irq(&srv_path->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	int i;

	if (srv_path->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(srv_path->ops_ids[i]);
		kfree(srv_path->ops_ids);
		srv_path->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
	struct rtrs_srv_path
*srv_path = container_of(ref, 124 struct rtrs_srv_path, 125 ids_inflight_ref); 126 127 percpu_ref_exit(&srv_path->ids_inflight_ref); 128 complete(&srv_path->complete_done); 129 } 130 131 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path) 132 { 133 struct rtrs_srv_sess *srv = srv_path->srv; 134 struct rtrs_srv_op *id; 135 int i, ret; 136 137 srv_path->ops_ids = kcalloc(srv->queue_depth, 138 sizeof(*srv_path->ops_ids), 139 GFP_KERNEL); 140 if (!srv_path->ops_ids) 141 goto err; 142 143 for (i = 0; i < srv->queue_depth; ++i) { 144 id = kzalloc(sizeof(*id), GFP_KERNEL); 145 if (!id) 146 goto err; 147 148 srv_path->ops_ids[i] = id; 149 } 150 151 ret = percpu_ref_init(&srv_path->ids_inflight_ref, 152 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 153 if (ret) { 154 pr_err("Percpu reference init failed\n"); 155 goto err; 156 } 157 init_completion(&srv_path->complete_done); 158 159 return 0; 160 161 err: 162 rtrs_srv_free_ops_ids(srv_path); 163 return -ENOMEM; 164 } 165 166 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path) 167 { 168 percpu_ref_get(&srv_path->ids_inflight_ref); 169 } 170 171 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path) 172 { 173 percpu_ref_put(&srv_path->ids_inflight_ref); 174 } 175 176 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 177 { 178 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 179 struct rtrs_path *s = con->c.path; 180 struct rtrs_srv_path *srv_path = to_srv_path(s); 181 182 if (wc->status != IB_WC_SUCCESS) { 183 rtrs_err(s, "REG MR failed: %s\n", 184 ib_wc_status_msg(wc->status)); 185 close_path(srv_path); 186 return; 187 } 188 } 189 190 static struct ib_cqe local_reg_cqe = { 191 .done = rtrs_srv_reg_mr_done 192 }; 193 194 static int rdma_write_sg(struct rtrs_srv_op *id) 195 { 196 struct rtrs_path *s = id->con->c.path; 197 struct rtrs_srv_path *srv_path = to_srv_path(s); 198 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id]; 199 struct rtrs_srv_mr *srv_mr; 200 struct ib_send_wr inv_wr; 201 struct ib_rdma_wr imm_wr; 202 struct ib_rdma_wr *wr = NULL; 203 enum ib_send_flags flags; 204 size_t sg_cnt; 205 int err, offset; 206 bool need_inval; 207 u32 rkey = 0; 208 struct ib_reg_wr rwr; 209 struct ib_sge *plist; 210 struct ib_sge list; 211 212 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 213 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 214 if (sg_cnt != 1) 215 return -EINVAL; 216 217 offset = 0; 218 219 wr = &id->tx_wr; 220 plist = &id->tx_sg; 221 plist->addr = dma_addr + offset; 222 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 223 224 /* WR will fail with length error 225 * if this is 0 226 */ 227 if (plist->length == 0) { 228 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 229 return -EINVAL; 230 } 231 232 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 233 offset += plist->length; 234 235 wr->wr.sg_list = plist; 236 wr->wr.num_sge = 1; 237 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 238 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 239 if (rkey == 0) 240 rkey = wr->rkey; 241 else 242 /* Only one key is actually used */ 243 WARN_ON_ONCE(rkey != wr->rkey); 244 245 wr->wr.opcode = IB_WR_RDMA_WRITE; 246 wr->wr.wr_cqe = &io_comp_cqe; 247 wr->wr.ex.imm_data = 0; 248 wr->wr.send_flags = 0; 249 250 if (need_inval && always_invalidate) { 251 wr->wr.next = &rwr.wr; 252 rwr.wr.next = &inv_wr; 253 inv_wr.next = &imm_wr.wr; 254 } else if (always_invalidate) { 255 wr->wr.next = &rwr.wr; 256 rwr.wr.next = &imm_wr.wr; 257 } else if 
(need_inval) { 258 wr->wr.next = &inv_wr; 259 inv_wr.next = &imm_wr.wr; 260 } else { 261 wr->wr.next = &imm_wr.wr; 262 } 263 /* 264 * From time to time we have to post signaled sends, 265 * or send queue will fill up and only QP reset can help. 266 */ 267 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 268 0 : IB_SEND_SIGNALED; 269 270 if (need_inval) { 271 inv_wr.sg_list = NULL; 272 inv_wr.num_sge = 0; 273 inv_wr.opcode = IB_WR_SEND_WITH_INV; 274 inv_wr.wr_cqe = &io_comp_cqe; 275 inv_wr.send_flags = 0; 276 inv_wr.ex.invalidate_rkey = rkey; 277 } 278 279 imm_wr.wr.next = NULL; 280 if (always_invalidate) { 281 struct rtrs_msg_rkey_rsp *msg; 282 283 srv_mr = &srv_path->mrs[id->msg_id]; 284 rwr.wr.opcode = IB_WR_REG_MR; 285 rwr.wr.wr_cqe = &local_reg_cqe; 286 rwr.wr.num_sge = 0; 287 rwr.mr = srv_mr->mr; 288 rwr.wr.send_flags = 0; 289 rwr.key = srv_mr->mr->rkey; 290 rwr.access = (IB_ACCESS_LOCAL_WRITE | 291 IB_ACCESS_REMOTE_WRITE); 292 msg = srv_mr->iu->buf; 293 msg->buf_id = cpu_to_le16(id->msg_id); 294 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 295 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 296 297 list.addr = srv_mr->iu->dma_addr; 298 list.length = sizeof(*msg); 299 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 300 imm_wr.wr.sg_list = &list; 301 imm_wr.wr.num_sge = 1; 302 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 303 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 304 srv_mr->iu->dma_addr, 305 srv_mr->iu->size, DMA_TO_DEVICE); 306 } else { 307 imm_wr.wr.sg_list = NULL; 308 imm_wr.wr.num_sge = 0; 309 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 310 } 311 imm_wr.wr.send_flags = flags; 312 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 313 0, need_inval)); 314 315 imm_wr.wr.wr_cqe = &io_comp_cqe; 316 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr, 317 offset, DMA_BIDIRECTIONAL); 318 319 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 320 if (err) 321 rtrs_err(s, 322 "Posting RDMA-Write-Request to QP failed, err: %d\n", 323 err); 324 325 return err; 326 } 327 328 /** 329 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 330 * requests or on successful WRITE request. 331 * @con: the connection to send back result 332 * @id: the id associated with the IO 333 * @errno: the error number of the IO. 334 * 335 * Return 0 on success, errno otherwise. 
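 *
 * Note (reading from the code below): the buffer id and the completion
 * status are encoded into the RDMA immediate data via rtrs_to_io_rsp_imm(),
 * so the client learns the result from the IMM payload; when
 * always_invalidate is set an rkey response message is attached as well.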
336 */ 337 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 338 int errno) 339 { 340 struct rtrs_path *s = con->c.path; 341 struct rtrs_srv_path *srv_path = to_srv_path(s); 342 struct ib_send_wr inv_wr, *wr = NULL; 343 struct ib_rdma_wr imm_wr; 344 struct ib_reg_wr rwr; 345 struct rtrs_srv_mr *srv_mr; 346 bool need_inval = false; 347 enum ib_send_flags flags; 348 u32 imm; 349 int err; 350 351 if (id->dir == READ) { 352 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 353 size_t sg_cnt; 354 355 need_inval = le16_to_cpu(rd_msg->flags) & 356 RTRS_MSG_NEED_INVAL_F; 357 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 358 359 if (need_inval) { 360 if (sg_cnt) { 361 inv_wr.wr_cqe = &io_comp_cqe; 362 inv_wr.sg_list = NULL; 363 inv_wr.num_sge = 0; 364 inv_wr.opcode = IB_WR_SEND_WITH_INV; 365 inv_wr.send_flags = 0; 366 /* Only one key is actually used */ 367 inv_wr.ex.invalidate_rkey = 368 le32_to_cpu(rd_msg->desc[0].key); 369 } else { 370 WARN_ON_ONCE(1); 371 need_inval = false; 372 } 373 } 374 } 375 376 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno); 377 378 if (need_inval && always_invalidate) { 379 wr = &inv_wr; 380 inv_wr.next = &rwr.wr; 381 rwr.wr.next = &imm_wr.wr; 382 } else if (always_invalidate) { 383 wr = &rwr.wr; 384 rwr.wr.next = &imm_wr.wr; 385 } else if (need_inval) { 386 wr = &inv_wr; 387 inv_wr.next = &imm_wr.wr; 388 } else { 389 wr = &imm_wr.wr; 390 } 391 /* 392 * From time to time we have to post signalled sends, 393 * or send queue will fill up and only QP reset can help. 394 */ 395 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 396 0 : IB_SEND_SIGNALED; 397 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 398 imm_wr.wr.next = NULL; 399 if (always_invalidate) { 400 struct ib_sge list; 401 struct rtrs_msg_rkey_rsp *msg; 402 403 srv_mr = &srv_path->mrs[id->msg_id]; 404 rwr.wr.next = &imm_wr.wr; 405 rwr.wr.opcode = IB_WR_REG_MR; 406 rwr.wr.wr_cqe = &local_reg_cqe; 407 rwr.wr.num_sge = 0; 408 rwr.wr.send_flags = 0; 409 rwr.mr = srv_mr->mr; 410 rwr.key = srv_mr->mr->rkey; 411 rwr.access = (IB_ACCESS_LOCAL_WRITE | 412 IB_ACCESS_REMOTE_WRITE); 413 msg = srv_mr->iu->buf; 414 msg->buf_id = cpu_to_le16(id->msg_id); 415 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 416 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 417 418 list.addr = srv_mr->iu->dma_addr; 419 list.length = sizeof(*msg); 420 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 421 imm_wr.wr.sg_list = &list; 422 imm_wr.wr.num_sge = 1; 423 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 424 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 425 srv_mr->iu->dma_addr, 426 srv_mr->iu->size, DMA_TO_DEVICE); 427 } else { 428 imm_wr.wr.sg_list = NULL; 429 imm_wr.wr.num_sge = 0; 430 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 431 } 432 imm_wr.wr.send_flags = flags; 433 imm_wr.wr.wr_cqe = &io_comp_cqe; 434 435 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 436 437 err = ib_post_send(id->con->c.qp, wr, NULL); 438 if (err) 439 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 440 err); 441 442 return err; 443 } 444 445 void close_path(struct rtrs_srv_path *srv_path) 446 { 447 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING)) 448 queue_work(rtrs_wq, &srv_path->close_work); 449 WARN_ON(srv_path->state != RTRS_SRV_CLOSING); 450 } 451 452 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 453 { 454 switch (state) { 455 case RTRS_SRV_CONNECTING: 456 return "RTRS_SRV_CONNECTING"; 457 case RTRS_SRV_CONNECTED: 458 return "RTRS_SRV_CONNECTED"; 459 case 
RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response Code sent to the other side for this operation.
 *		0 = success, <0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_path *srv_path;
	struct rtrs_srv_con *con;
	struct rtrs_path *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.path;
	srv_path = to_srv_path(s);

	id->status = status;

	if (srv_path->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, server path %s is disconnected, path state %s\n",
			    kobject_name(&srv_path->kobj),
			    rtrs_srv_state_str(srv_path->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
			 kobject_name(&srv_path->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
			    kobject_name(&srv_path->kobj));
		close_path(srv_path);
	}
out:
	rtrs_srv_put_ops_ids(srv_path);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
{
	int i;

	for (i = 0; i < srv_path->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &srv_path->mrs[i];
		rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(srv_path->mrs);
}

static int map_cont_bufs(struct rtrs_srv_path *srv_path)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *ss = &srv_path->s;
	int i, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;
	struct ib_mr *mr;
	struct sg_table *sgt;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to figure
	 * out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to do invalidation for each chunk of memory, we
		 * need more memory regions.
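		 * With one MR per chunk, the rkey of every buffer can be
		 * invalidated and refreshed independently after each I/O
		 * (see the ib_update_fast_reg_key()/ib_inc_rkey() users below).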
580 */ 581 mrs_num = srv->queue_depth; 582 } else { 583 chunks_per_mr = 584 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 585 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 586 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 587 } 588 589 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL); 590 if (!srv_path->mrs) 591 return -ENOMEM; 592 593 for (srv_path->mrs_num = 0; srv_path->mrs_num < mrs_num; 594 srv_path->mrs_num++) { 595 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[srv_path->mrs_num]; 596 struct scatterlist *s; 597 int nr, nr_sgt, chunks; 598 599 sgt = &srv_mr->sgt; 600 chunks = chunks_per_mr * srv_path->mrs_num; 601 if (!always_invalidate) 602 chunks_per_mr = min_t(int, chunks_per_mr, 603 srv->queue_depth - chunks); 604 605 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 606 if (err) 607 goto err; 608 609 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 610 sg_set_page(s, srv->chunks[chunks + i], 611 max_chunk_size, 0); 612 613 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl, 614 sgt->nents, DMA_BIDIRECTIONAL); 615 if (!nr_sgt) { 616 err = -EINVAL; 617 goto free_sg; 618 } 619 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 620 nr_sgt); 621 if (IS_ERR(mr)) { 622 err = PTR_ERR(mr); 623 goto unmap_sg; 624 } 625 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt, 626 NULL, max_chunk_size); 627 if (nr != nr_sgt) { 628 err = nr < 0 ? nr : -EINVAL; 629 goto dereg_mr; 630 } 631 632 if (always_invalidate) { 633 srv_mr->iu = rtrs_iu_alloc(1, 634 sizeof(struct rtrs_msg_rkey_rsp), 635 GFP_KERNEL, srv_path->s.dev->ib_dev, 636 DMA_TO_DEVICE, rtrs_srv_rdma_done); 637 if (!srv_mr->iu) { 638 err = -ENOMEM; 639 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 640 goto dereg_mr; 641 } 642 } 643 /* Eventually dma addr for each chunk can be cached */ 644 for_each_sg(sgt->sgl, s, nr_sgt, i) 645 srv_path->dma_addr[chunks + i] = sg_dma_address(s); 646 647 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 648 srv_mr->mr = mr; 649 } 650 651 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 652 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 653 654 return 0; 655 656 dereg_mr: 657 ib_dereg_mr(mr); 658 unmap_sg: 659 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl, 660 sgt->nents, DMA_BIDIRECTIONAL); 661 free_sg: 662 sg_free_table(sgt); 663 err: 664 unmap_cont_bufs(srv_path); 665 666 return err; 667 } 668 669 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 670 { 671 close_path(to_srv_path(c->path)); 672 } 673 674 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path) 675 { 676 rtrs_init_hb(&srv_path->s, &io_comp_cqe, 677 RTRS_HB_INTERVAL_MS, 678 RTRS_HB_MISSED_MAX, 679 rtrs_srv_hb_err_handler, 680 rtrs_wq); 681 } 682 683 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path) 684 { 685 rtrs_start_hb(&srv_path->s); 686 } 687 688 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path) 689 { 690 rtrs_stop_hb(&srv_path->s); 691 } 692 693 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 694 { 695 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 696 struct rtrs_path *s = con->c.path; 697 struct rtrs_srv_path *srv_path = to_srv_path(s); 698 struct rtrs_iu *iu; 699 700 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 701 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 702 703 if (wc->status != IB_WC_SUCCESS) { 704 rtrs_err(s, "Sess info response send failed: %s\n", 705 ib_wc_status_msg(wc->status)); 706 close_path(srv_path); 707 return; 708 } 709 WARN_ON(wc->opcode != IB_WC_SEND); 710 } 711 712 
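/*
 * Link event accounting: only the first path that comes up reports
 * RTRS_SRV_LINK_EV_CONNECTED to the upper layer, and only the last path
 * going down reports RTRS_SRV_LINK_EV_DISCONNECTED (see the srv->paths_up
 * handling in rtrs_srv_path_up()/rtrs_srv_path_down() below).
 */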
static void rtrs_srv_path_up(struct rtrs_srv_path *srv_path) 713 { 714 struct rtrs_srv_sess *srv = srv_path->srv; 715 struct rtrs_srv_ctx *ctx = srv->ctx; 716 int up; 717 718 mutex_lock(&srv->paths_ev_mutex); 719 up = ++srv->paths_up; 720 if (up == 1) 721 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 722 mutex_unlock(&srv->paths_ev_mutex); 723 724 /* Mark session as established */ 725 srv_path->established = true; 726 } 727 728 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path) 729 { 730 struct rtrs_srv_sess *srv = srv_path->srv; 731 struct rtrs_srv_ctx *ctx = srv->ctx; 732 733 if (!srv_path->established) 734 return; 735 736 srv_path->established = false; 737 mutex_lock(&srv->paths_ev_mutex); 738 WARN_ON(!srv->paths_up); 739 if (--srv->paths_up == 0) 740 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 741 mutex_unlock(&srv->paths_ev_mutex); 742 } 743 744 static bool exist_pathname(struct rtrs_srv_ctx *ctx, 745 const char *pathname, const uuid_t *path_uuid) 746 { 747 struct rtrs_srv_sess *srv; 748 struct rtrs_srv_path *srv_path; 749 bool found = false; 750 751 mutex_lock(&ctx->srv_mutex); 752 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 753 mutex_lock(&srv->paths_mutex); 754 755 /* when a client with same uuid and same sessname tried to add a path */ 756 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 757 mutex_unlock(&srv->paths_mutex); 758 continue; 759 } 760 761 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 762 if (strlen(srv_path->s.sessname) == strlen(pathname) && 763 !strcmp(srv_path->s.sessname, pathname)) { 764 found = true; 765 break; 766 } 767 } 768 mutex_unlock(&srv->paths_mutex); 769 if (found) 770 break; 771 } 772 mutex_unlock(&ctx->srv_mutex); 773 return found; 774 } 775 776 static int post_recv_path(struct rtrs_srv_path *srv_path); 777 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 778 779 static int process_info_req(struct rtrs_srv_con *con, 780 struct rtrs_msg_info_req *msg) 781 { 782 struct rtrs_path *s = con->c.path; 783 struct rtrs_srv_path *srv_path = to_srv_path(s); 784 struct ib_send_wr *reg_wr = NULL; 785 struct rtrs_msg_info_rsp *rsp; 786 struct rtrs_iu *tx_iu; 787 struct ib_reg_wr *rwr; 788 int mri, err; 789 size_t tx_sz; 790 791 err = post_recv_path(srv_path); 792 if (err) { 793 rtrs_err(s, "post_recv_path(), err: %d\n", err); 794 return err; 795 } 796 797 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) { 798 rtrs_err(s, "pathname cannot contain / and .\n"); 799 return -EINVAL; 800 } 801 802 if (exist_pathname(srv_path->srv->ctx, 803 msg->pathname, &srv_path->srv->paths_uuid)) { 804 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname); 805 return -EPERM; 806 } 807 strscpy(srv_path->s.sessname, msg->pathname, 808 sizeof(srv_path->s.sessname)); 809 810 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL); 811 if (!rwr) 812 return -ENOMEM; 813 814 tx_sz = sizeof(*rsp); 815 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num; 816 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev, 817 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 818 if (!tx_iu) { 819 err = -ENOMEM; 820 goto rwr_free; 821 } 822 823 rsp = tx_iu->buf; 824 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 825 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num); 826 827 for (mri = 0; mri < srv_path->mrs_num; mri++) { 828 struct ib_mr *mr = srv_path->mrs[mri].mr; 829 830 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 831 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 832 rsp->desc[mri].len = 
cpu_to_le32(mr->length); 833 834 /* 835 * Fill in reg MR request and chain them *backwards* 836 */ 837 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 838 rwr[mri].wr.opcode = IB_WR_REG_MR; 839 rwr[mri].wr.wr_cqe = &local_reg_cqe; 840 rwr[mri].wr.num_sge = 0; 841 rwr[mri].wr.send_flags = 0; 842 rwr[mri].mr = mr; 843 rwr[mri].key = mr->rkey; 844 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 845 IB_ACCESS_REMOTE_WRITE); 846 reg_wr = &rwr[mri].wr; 847 } 848 849 err = rtrs_srv_create_path_files(srv_path); 850 if (err) 851 goto iu_free; 852 kobject_get(&srv_path->kobj); 853 get_device(&srv_path->srv->dev); 854 rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED); 855 rtrs_srv_start_hb(srv_path); 856 857 /* 858 * We do not account number of established connections at the current 859 * moment, we rely on the client, which should send info request when 860 * all connections are successfully established. Thus, simply notify 861 * listener with a proper event if we are the first path. 862 */ 863 rtrs_srv_path_up(srv_path); 864 865 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 866 tx_iu->dma_addr, 867 tx_iu->size, DMA_TO_DEVICE); 868 869 /* Send info response */ 870 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 871 if (err) { 872 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 873 iu_free: 874 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1); 875 } 876 rwr_free: 877 kfree(rwr); 878 879 return err; 880 } 881 882 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 883 { 884 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 885 struct rtrs_path *s = con->c.path; 886 struct rtrs_srv_path *srv_path = to_srv_path(s); 887 struct rtrs_msg_info_req *msg; 888 struct rtrs_iu *iu; 889 int err; 890 891 WARN_ON(con->c.cid); 892 893 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 894 if (wc->status != IB_WC_SUCCESS) { 895 rtrs_err(s, "Sess info request receive failed: %s\n", 896 ib_wc_status_msg(wc->status)); 897 goto close; 898 } 899 WARN_ON(wc->opcode != IB_WC_RECV); 900 901 if (wc->byte_len < sizeof(*msg)) { 902 rtrs_err(s, "Sess info request is malformed: size %d\n", 903 wc->byte_len); 904 goto close; 905 } 906 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr, 907 iu->size, DMA_FROM_DEVICE); 908 msg = iu->buf; 909 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 910 rtrs_err(s, "Sess info request is malformed: type %d\n", 911 le16_to_cpu(msg->type)); 912 goto close; 913 } 914 err = process_info_req(con, msg); 915 if (err) 916 goto close; 917 918 out: 919 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 920 return; 921 close: 922 close_path(srv_path); 923 goto out; 924 } 925 926 static int post_recv_info_req(struct rtrs_srv_con *con) 927 { 928 struct rtrs_path *s = con->c.path; 929 struct rtrs_srv_path *srv_path = to_srv_path(s); 930 struct rtrs_iu *rx_iu; 931 int err; 932 933 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 934 GFP_KERNEL, srv_path->s.dev->ib_dev, 935 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 936 if (!rx_iu) 937 return -ENOMEM; 938 /* Prepare for getting info response */ 939 err = rtrs_iu_post_recv(&con->c, rx_iu); 940 if (err) { 941 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 942 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1); 943 return err; 944 } 945 946 return 0; 947 } 948 949 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 950 { 951 int i, err; 952 953 for (i = 0; i < q_size; i++) { 954 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 955 if (err) 956 return err; 957 } 958 959 return 0; 
960 } 961 962 static int post_recv_path(struct rtrs_srv_path *srv_path) 963 { 964 struct rtrs_srv_sess *srv = srv_path->srv; 965 struct rtrs_path *s = &srv_path->s; 966 size_t q_size; 967 int err, cid; 968 969 for (cid = 0; cid < srv_path->s.con_num; cid++) { 970 if (cid == 0) 971 q_size = SERVICE_CON_QUEUE_DEPTH; 972 else 973 q_size = srv->queue_depth; 974 975 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size); 976 if (err) { 977 rtrs_err(s, "post_recv_io(), err: %d\n", err); 978 return err; 979 } 980 } 981 982 return 0; 983 } 984 985 static void process_read(struct rtrs_srv_con *con, 986 struct rtrs_msg_rdma_read *msg, 987 u32 buf_id, u32 off) 988 { 989 struct rtrs_path *s = con->c.path; 990 struct rtrs_srv_path *srv_path = to_srv_path(s); 991 struct rtrs_srv_sess *srv = srv_path->srv; 992 struct rtrs_srv_ctx *ctx = srv->ctx; 993 struct rtrs_srv_op *id; 994 995 size_t usr_len, data_len; 996 void *data; 997 int ret; 998 999 if (srv_path->state != RTRS_SRV_CONNECTED) { 1000 rtrs_err_rl(s, 1001 "Processing read request failed, session is disconnected, sess state %s\n", 1002 rtrs_srv_state_str(srv_path->state)); 1003 return; 1004 } 1005 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1006 rtrs_err_rl(s, 1007 "Processing read request failed, invalid message\n"); 1008 return; 1009 } 1010 rtrs_srv_get_ops_ids(srv_path); 1011 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ); 1012 id = srv_path->ops_ids[buf_id]; 1013 id->con = con; 1014 id->dir = READ; 1015 id->msg_id = buf_id; 1016 id->rd_msg = msg; 1017 usr_len = le16_to_cpu(msg->usr_len); 1018 data_len = off - usr_len; 1019 data = page_address(srv->chunks[buf_id]); 1020 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1021 data + data_len, usr_len); 1022 1023 if (ret) { 1024 rtrs_err_rl(s, 1025 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1026 buf_id, ret); 1027 goto send_err_msg; 1028 } 1029 1030 return; 1031 1032 send_err_msg: 1033 ret = send_io_resp_imm(con, id, ret); 1034 if (ret < 0) { 1035 rtrs_err_rl(s, 1036 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1037 buf_id, ret); 1038 close_path(srv_path); 1039 } 1040 rtrs_srv_put_ops_ids(srv_path); 1041 } 1042 1043 static void process_write(struct rtrs_srv_con *con, 1044 struct rtrs_msg_rdma_write *req, 1045 u32 buf_id, u32 off) 1046 { 1047 struct rtrs_path *s = con->c.path; 1048 struct rtrs_srv_path *srv_path = to_srv_path(s); 1049 struct rtrs_srv_sess *srv = srv_path->srv; 1050 struct rtrs_srv_ctx *ctx = srv->ctx; 1051 struct rtrs_srv_op *id; 1052 1053 size_t data_len, usr_len; 1054 void *data; 1055 int ret; 1056 1057 if (srv_path->state != RTRS_SRV_CONNECTED) { 1058 rtrs_err_rl(s, 1059 "Processing write request failed, session is disconnected, sess state %s\n", 1060 rtrs_srv_state_str(srv_path->state)); 1061 return; 1062 } 1063 rtrs_srv_get_ops_ids(srv_path); 1064 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE); 1065 id = srv_path->ops_ids[buf_id]; 1066 id->con = con; 1067 id->dir = WRITE; 1068 id->msg_id = buf_id; 1069 1070 usr_len = le16_to_cpu(req->usr_len); 1071 data_len = off - usr_len; 1072 data = page_address(srv->chunks[buf_id]); 1073 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1074 data + data_len, usr_len); 1075 if (ret) { 1076 rtrs_err_rl(s, 1077 "Processing write request failed, user module callback reports err: %d\n", 1078 ret); 1079 goto send_err_msg; 1080 } 1081 1082 return; 1083 1084 send_err_msg: 1085 ret = send_io_resp_imm(con, id, ret); 1086 if (ret < 0) { 1087 
rtrs_err_rl(s, 1088 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1089 buf_id, ret); 1090 close_path(srv_path); 1091 } 1092 rtrs_srv_put_ops_ids(srv_path); 1093 } 1094 1095 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1096 u32 id, u32 off) 1097 { 1098 struct rtrs_path *s = con->c.path; 1099 struct rtrs_srv_path *srv_path = to_srv_path(s); 1100 struct rtrs_msg_rdma_hdr *hdr; 1101 unsigned int type; 1102 1103 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, 1104 srv_path->dma_addr[id], 1105 max_chunk_size, DMA_BIDIRECTIONAL); 1106 hdr = msg; 1107 type = le16_to_cpu(hdr->type); 1108 1109 switch (type) { 1110 case RTRS_MSG_WRITE: 1111 process_write(con, msg, id, off); 1112 break; 1113 case RTRS_MSG_READ: 1114 process_read(con, msg, id, off); 1115 break; 1116 default: 1117 rtrs_err(s, 1118 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1119 type); 1120 goto err; 1121 } 1122 1123 return; 1124 1125 err: 1126 close_path(srv_path); 1127 } 1128 1129 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1130 { 1131 struct rtrs_srv_mr *mr = 1132 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1133 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1134 struct rtrs_path *s = con->c.path; 1135 struct rtrs_srv_path *srv_path = to_srv_path(s); 1136 struct rtrs_srv_sess *srv = srv_path->srv; 1137 u32 msg_id, off; 1138 void *data; 1139 1140 if (wc->status != IB_WC_SUCCESS) { 1141 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1142 ib_wc_status_msg(wc->status)); 1143 close_path(srv_path); 1144 } 1145 msg_id = mr->msg_id; 1146 off = mr->msg_off; 1147 data = page_address(srv->chunks[msg_id]) + off; 1148 process_io_req(con, data, msg_id, off); 1149 } 1150 1151 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1152 struct rtrs_srv_mr *mr) 1153 { 1154 struct ib_send_wr wr = { 1155 .opcode = IB_WR_LOCAL_INV, 1156 .wr_cqe = &mr->inv_cqe, 1157 .send_flags = IB_SEND_SIGNALED, 1158 .ex.invalidate_rkey = mr->mr->rkey, 1159 }; 1160 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1161 1162 return ib_post_send(con->c.qp, &wr, NULL); 1163 } 1164 1165 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1166 { 1167 spin_lock(&con->rsp_wr_wait_lock); 1168 while (!list_empty(&con->rsp_wr_wait_list)) { 1169 struct rtrs_srv_op *id; 1170 int ret; 1171 1172 id = list_entry(con->rsp_wr_wait_list.next, 1173 struct rtrs_srv_op, wait_list); 1174 list_del(&id->wait_list); 1175 1176 spin_unlock(&con->rsp_wr_wait_lock); 1177 ret = rtrs_srv_resp_rdma(id, id->status); 1178 spin_lock(&con->rsp_wr_wait_lock); 1179 1180 if (!ret) { 1181 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1182 break; 1183 } 1184 } 1185 spin_unlock(&con->rsp_wr_wait_lock); 1186 } 1187 1188 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1189 { 1190 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1191 struct rtrs_path *s = con->c.path; 1192 struct rtrs_srv_path *srv_path = to_srv_path(s); 1193 struct rtrs_srv_sess *srv = srv_path->srv; 1194 u32 imm_type, imm_payload; 1195 int err; 1196 1197 if (wc->status != IB_WC_SUCCESS) { 1198 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1199 rtrs_err(s, 1200 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1201 ib_wc_status_msg(wc->status), wc->wr_cqe, 1202 wc->opcode, wc->vendor_err, wc->byte_len); 1203 close_path(srv_path); 1204 } 1205 return; 1206 } 1207 1208 switch (wc->opcode) { 1209 case IB_WC_RECV_RDMA_WITH_IMM: 1210 /* 1211 * post_recv() RDMA write completions 
of IO reqs (read/write) 1212 * and hb 1213 */ 1214 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1215 return; 1216 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1217 if (err) { 1218 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1219 close_path(srv_path); 1220 break; 1221 } 1222 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1223 &imm_type, &imm_payload); 1224 if (imm_type == RTRS_IO_REQ_IMM) { 1225 u32 msg_id, off; 1226 void *data; 1227 1228 msg_id = imm_payload >> srv_path->mem_bits; 1229 off = imm_payload & ((1 << srv_path->mem_bits) - 1); 1230 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1231 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1232 msg_id, off); 1233 close_path(srv_path); 1234 return; 1235 } 1236 if (always_invalidate) { 1237 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id]; 1238 1239 mr->msg_off = off; 1240 mr->msg_id = msg_id; 1241 err = rtrs_srv_inv_rkey(con, mr); 1242 if (err) { 1243 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1244 err); 1245 close_path(srv_path); 1246 break; 1247 } 1248 } else { 1249 data = page_address(srv->chunks[msg_id]) + off; 1250 process_io_req(con, data, msg_id, off); 1251 } 1252 } else if (imm_type == RTRS_HB_MSG_IMM) { 1253 WARN_ON(con->c.cid); 1254 rtrs_send_hb_ack(&srv_path->s); 1255 } else if (imm_type == RTRS_HB_ACK_IMM) { 1256 WARN_ON(con->c.cid); 1257 srv_path->s.hb_missed_cnt = 0; 1258 } else { 1259 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1260 } 1261 break; 1262 case IB_WC_RDMA_WRITE: 1263 case IB_WC_SEND: 1264 /* 1265 * post_send() RDMA write completions of IO reqs (read/write) 1266 * and hb. 1267 */ 1268 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1269 1270 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1271 rtrs_rdma_process_wr_wait_list(con); 1272 1273 break; 1274 default: 1275 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1276 return; 1277 } 1278 } 1279 1280 /** 1281 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname. 1282 * @srv: Session 1283 * @pathname: Pathname buffer 1284 * @len: Length of sessname buffer 1285 */ 1286 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname, 1287 size_t len) 1288 { 1289 struct rtrs_srv_path *srv_path; 1290 int err = -ENOTCONN; 1291 1292 mutex_lock(&srv->paths_mutex); 1293 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1294 if (srv_path->state != RTRS_SRV_CONNECTED) 1295 continue; 1296 strscpy(pathname, srv_path->s.sessname, 1297 min_t(size_t, sizeof(srv_path->s.sessname), len)); 1298 err = 0; 1299 break; 1300 } 1301 mutex_unlock(&srv->paths_mutex); 1302 1303 return err; 1304 } 1305 EXPORT_SYMBOL(rtrs_srv_get_path_name); 1306 1307 /** 1308 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1309 * @srv: Session 1310 */ 1311 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv) 1312 { 1313 return srv->queue_depth; 1314 } 1315 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1316 1317 static int find_next_bit_ring(struct rtrs_srv_path *srv_path) 1318 { 1319 struct ib_device *ib_dev = srv_path->s.dev->ib_dev; 1320 int v; 1321 1322 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask); 1323 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1324 v = cpumask_first(&cq_affinity_mask); 1325 return v; 1326 } 1327 1328 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path) 1329 { 1330 srv_path->cur_cq_vector = find_next_bit_ring(srv_path); 1331 1332 return srv_path->cur_cq_vector; 1333 } 1334 1335 static void rtrs_srv_dev_release(struct device *dev) 1336 { 1337 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess, 1338 dev); 1339 1340 kfree(srv); 1341 } 1342 1343 static void free_srv(struct rtrs_srv_sess *srv) 1344 { 1345 int i; 1346 1347 WARN_ON(refcount_read(&srv->refcount)); 1348 for (i = 0; i < srv->queue_depth; i++) 1349 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1350 kfree(srv->chunks); 1351 mutex_destroy(&srv->paths_mutex); 1352 mutex_destroy(&srv->paths_ev_mutex); 1353 /* last put to release the srv structure */ 1354 put_device(&srv->dev); 1355 } 1356 1357 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1358 const uuid_t *paths_uuid, 1359 bool first_conn) 1360 { 1361 struct rtrs_srv_sess *srv; 1362 int i; 1363 1364 mutex_lock(&ctx->srv_mutex); 1365 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1366 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1367 refcount_inc_not_zero(&srv->refcount)) { 1368 mutex_unlock(&ctx->srv_mutex); 1369 return srv; 1370 } 1371 } 1372 mutex_unlock(&ctx->srv_mutex); 1373 /* 1374 * If this request is not the first connection request from the 1375 * client for this session then fail and return error. 
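 * In other words, a new srv object is only ever allocated for a request
 * that carries the first_conn flag; any other request gets -ENXIO.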
1376 */ 1377 if (!first_conn) { 1378 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1379 return ERR_PTR(-ENXIO); 1380 } 1381 1382 /* need to allocate a new srv */ 1383 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1384 if (!srv) 1385 return ERR_PTR(-ENOMEM); 1386 1387 INIT_LIST_HEAD(&srv->paths_list); 1388 mutex_init(&srv->paths_mutex); 1389 mutex_init(&srv->paths_ev_mutex); 1390 uuid_copy(&srv->paths_uuid, paths_uuid); 1391 srv->queue_depth = sess_queue_depth; 1392 srv->ctx = ctx; 1393 device_initialize(&srv->dev); 1394 srv->dev.release = rtrs_srv_dev_release; 1395 1396 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1397 GFP_KERNEL); 1398 if (!srv->chunks) 1399 goto err_free_srv; 1400 1401 for (i = 0; i < srv->queue_depth; i++) { 1402 srv->chunks[i] = alloc_pages(GFP_KERNEL, 1403 get_order(max_chunk_size)); 1404 if (!srv->chunks[i]) 1405 goto err_free_chunks; 1406 } 1407 refcount_set(&srv->refcount, 1); 1408 mutex_lock(&ctx->srv_mutex); 1409 list_add(&srv->ctx_list, &ctx->srv_list); 1410 mutex_unlock(&ctx->srv_mutex); 1411 1412 return srv; 1413 1414 err_free_chunks: 1415 while (i--) 1416 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1417 kfree(srv->chunks); 1418 1419 err_free_srv: 1420 kfree(srv); 1421 return ERR_PTR(-ENOMEM); 1422 } 1423 1424 static void put_srv(struct rtrs_srv_sess *srv) 1425 { 1426 if (refcount_dec_and_test(&srv->refcount)) { 1427 struct rtrs_srv_ctx *ctx = srv->ctx; 1428 1429 WARN_ON(srv->dev.kobj.state_in_sysfs); 1430 1431 mutex_lock(&ctx->srv_mutex); 1432 list_del(&srv->ctx_list); 1433 mutex_unlock(&ctx->srv_mutex); 1434 free_srv(srv); 1435 } 1436 } 1437 1438 static void __add_path_to_srv(struct rtrs_srv_sess *srv, 1439 struct rtrs_srv_path *srv_path) 1440 { 1441 list_add_tail(&srv_path->s.entry, &srv->paths_list); 1442 srv->paths_num++; 1443 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1444 } 1445 1446 static void del_path_from_srv(struct rtrs_srv_path *srv_path) 1447 { 1448 struct rtrs_srv_sess *srv = srv_path->srv; 1449 1450 if (WARN_ON(!srv)) 1451 return; 1452 1453 mutex_lock(&srv->paths_mutex); 1454 list_del(&srv_path->s.entry); 1455 WARN_ON(!srv->paths_num); 1456 srv->paths_num--; 1457 mutex_unlock(&srv->paths_mutex); 1458 } 1459 1460 /* return true if addresses are the same, error other wise */ 1461 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1462 { 1463 switch (a->sa_family) { 1464 case AF_IB: 1465 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1466 &((struct sockaddr_ib *)b)->sib_addr, 1467 sizeof(struct ib_addr)) && 1468 (b->sa_family == AF_IB); 1469 case AF_INET: 1470 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1471 &((struct sockaddr_in *)b)->sin_addr, 1472 sizeof(struct in_addr)) && 1473 (b->sa_family == AF_INET); 1474 case AF_INET6: 1475 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1476 &((struct sockaddr_in6 *)b)->sin6_addr, 1477 sizeof(struct in6_addr)) && 1478 (b->sa_family == AF_INET6); 1479 default: 1480 return -ENOENT; 1481 } 1482 } 1483 1484 static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv, 1485 struct rdma_addr *addr) 1486 { 1487 struct rtrs_srv_path *srv_path; 1488 1489 list_for_each_entry(srv_path, &srv->paths_list, s.entry) 1490 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr, 1491 (struct sockaddr *)&addr->dst_addr) && 1492 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr, 1493 (struct sockaddr *)&addr->src_addr)) 1494 return true; 1495 1496 return false; 1497 } 1498 1499 static void free_path(struct 
rtrs_srv_path *srv_path) 1500 { 1501 if (srv_path->kobj.state_in_sysfs) { 1502 kobject_del(&srv_path->kobj); 1503 kobject_put(&srv_path->kobj); 1504 } else { 1505 free_percpu(srv_path->stats->rdma_stats); 1506 kfree(srv_path->stats); 1507 kfree(srv_path); 1508 } 1509 } 1510 1511 static void rtrs_srv_close_work(struct work_struct *work) 1512 { 1513 struct rtrs_srv_path *srv_path; 1514 struct rtrs_srv_con *con; 1515 int i; 1516 1517 srv_path = container_of(work, typeof(*srv_path), close_work); 1518 1519 rtrs_srv_destroy_path_files(srv_path); 1520 rtrs_srv_stop_hb(srv_path); 1521 1522 for (i = 0; i < srv_path->s.con_num; i++) { 1523 if (!srv_path->s.con[i]) 1524 continue; 1525 con = to_srv_con(srv_path->s.con[i]); 1526 rdma_disconnect(con->c.cm_id); 1527 ib_drain_qp(con->c.qp); 1528 } 1529 1530 /* 1531 * Degrade ref count to the usual model with a single shared 1532 * atomic_t counter 1533 */ 1534 percpu_ref_kill(&srv_path->ids_inflight_ref); 1535 1536 /* Wait for all completion */ 1537 wait_for_completion(&srv_path->complete_done); 1538 1539 /* Notify upper layer if we are the last path */ 1540 rtrs_srv_path_down(srv_path); 1541 1542 unmap_cont_bufs(srv_path); 1543 rtrs_srv_free_ops_ids(srv_path); 1544 1545 for (i = 0; i < srv_path->s.con_num; i++) { 1546 if (!srv_path->s.con[i]) 1547 continue; 1548 con = to_srv_con(srv_path->s.con[i]); 1549 rtrs_cq_qp_destroy(&con->c); 1550 rdma_destroy_id(con->c.cm_id); 1551 kfree(con); 1552 } 1553 rtrs_ib_dev_put(srv_path->s.dev); 1554 1555 del_path_from_srv(srv_path); 1556 put_srv(srv_path->srv); 1557 srv_path->srv = NULL; 1558 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED); 1559 1560 kfree(srv_path->dma_addr); 1561 kfree(srv_path->s.con); 1562 free_path(srv_path); 1563 } 1564 1565 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path, 1566 struct rdma_cm_id *cm_id) 1567 { 1568 struct rtrs_srv_sess *srv = srv_path->srv; 1569 struct rtrs_msg_conn_rsp msg; 1570 struct rdma_conn_param param; 1571 int err; 1572 1573 param = (struct rdma_conn_param) { 1574 .rnr_retry_count = 7, 1575 .private_data = &msg, 1576 .private_data_len = sizeof(msg), 1577 }; 1578 1579 msg = (struct rtrs_msg_conn_rsp) { 1580 .magic = cpu_to_le16(RTRS_MAGIC), 1581 .version = cpu_to_le16(RTRS_PROTO_VER), 1582 .queue_depth = cpu_to_le16(srv->queue_depth), 1583 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1584 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1585 }; 1586 1587 if (always_invalidate) 1588 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1589 1590 err = rdma_accept(cm_id, ¶m); 1591 if (err) 1592 pr_err("rdma_accept(), err: %d\n", err); 1593 1594 return err; 1595 } 1596 1597 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1598 { 1599 struct rtrs_msg_conn_rsp msg; 1600 int err; 1601 1602 msg = (struct rtrs_msg_conn_rsp) { 1603 .magic = cpu_to_le16(RTRS_MAGIC), 1604 .version = cpu_to_le16(RTRS_PROTO_VER), 1605 .errno = cpu_to_le16(errno), 1606 }; 1607 1608 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1609 if (err) 1610 pr_err("rdma_reject(), err: %d\n", err); 1611 1612 /* Bounce errno back */ 1613 return errno; 1614 } 1615 1616 static struct rtrs_srv_path * 1617 __find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid) 1618 { 1619 struct rtrs_srv_path *srv_path; 1620 1621 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1622 if (uuid_equal(&srv_path->s.uuid, sess_uuid)) 1623 return srv_path; 1624 } 1625 1626 return NULL; 1627 } 1628 1629 static int create_con(struct rtrs_srv_path *srv_path, 1630 
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *s = &srv_path->s;
	struct rtrs_srv_con *con;

	u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.path = &srv_path->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(srv_path);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(srv_path->s.con[cid]);
	srv_path->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection. The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
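	 * After this assignment rtrs_srv_rdma_cm_handler() sees the
	 * connection (struct rtrs_con) as cm_id->context instead of the
	 * listening server context.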
1700 */ 1701 cm_id->context = &con->c; 1702 1703 return 0; 1704 1705 free_cqqp: 1706 rtrs_cq_qp_destroy(&con->c); 1707 free_con: 1708 kfree(con); 1709 1710 err: 1711 return err; 1712 } 1713 1714 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv, 1715 struct rdma_cm_id *cm_id, 1716 unsigned int con_num, 1717 unsigned int recon_cnt, 1718 const uuid_t *uuid) 1719 { 1720 struct rtrs_srv_path *srv_path; 1721 int err = -ENOMEM; 1722 char str[NAME_MAX]; 1723 struct rtrs_addr path; 1724 1725 if (srv->paths_num >= MAX_PATHS_NUM) { 1726 err = -ECONNRESET; 1727 goto err; 1728 } 1729 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1730 err = -EEXIST; 1731 pr_err("Path with same addr exists\n"); 1732 goto err; 1733 } 1734 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL); 1735 if (!srv_path) 1736 goto err; 1737 1738 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL); 1739 if (!srv_path->stats) 1740 goto err_free_sess; 1741 1742 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats); 1743 if (!srv_path->stats->rdma_stats) 1744 goto err_free_stats; 1745 1746 srv_path->stats->srv_path = srv_path; 1747 1748 srv_path->dma_addr = kcalloc(srv->queue_depth, 1749 sizeof(*srv_path->dma_addr), 1750 GFP_KERNEL); 1751 if (!srv_path->dma_addr) 1752 goto err_free_percpu; 1753 1754 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con), 1755 GFP_KERNEL); 1756 if (!srv_path->s.con) 1757 goto err_free_dma_addr; 1758 1759 srv_path->state = RTRS_SRV_CONNECTING; 1760 srv_path->srv = srv; 1761 srv_path->cur_cq_vector = -1; 1762 srv_path->s.dst_addr = cm_id->route.addr.dst_addr; 1763 srv_path->s.src_addr = cm_id->route.addr.src_addr; 1764 1765 /* temporary until receiving session-name from client */ 1766 path.src = &srv_path->s.src_addr; 1767 path.dst = &srv_path->s.dst_addr; 1768 rtrs_addr_to_str(&path, str, sizeof(str)); 1769 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname)); 1770 1771 srv_path->s.con_num = con_num; 1772 srv_path->s.irq_con_num = con_num; 1773 srv_path->s.recon_cnt = recon_cnt; 1774 uuid_copy(&srv_path->s.uuid, uuid); 1775 spin_lock_init(&srv_path->state_lock); 1776 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work); 1777 rtrs_srv_init_hb(srv_path); 1778 1779 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1780 if (!srv_path->s.dev) { 1781 err = -ENOMEM; 1782 goto err_free_con; 1783 } 1784 err = map_cont_bufs(srv_path); 1785 if (err) 1786 goto err_put_dev; 1787 1788 err = rtrs_srv_alloc_ops_ids(srv_path); 1789 if (err) 1790 goto err_unmap_bufs; 1791 1792 __add_path_to_srv(srv, srv_path); 1793 1794 return srv_path; 1795 1796 err_unmap_bufs: 1797 unmap_cont_bufs(srv_path); 1798 err_put_dev: 1799 rtrs_ib_dev_put(srv_path->s.dev); 1800 err_free_con: 1801 kfree(srv_path->s.con); 1802 err_free_dma_addr: 1803 kfree(srv_path->dma_addr); 1804 err_free_percpu: 1805 free_percpu(srv_path->stats->rdma_stats); 1806 err_free_stats: 1807 kfree(srv_path->stats); 1808 err_free_sess: 1809 kfree(srv_path); 1810 err: 1811 return ERR_PTR(err); 1812 } 1813 1814 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1815 const struct rtrs_msg_conn_req *msg, 1816 size_t len) 1817 { 1818 struct rtrs_srv_ctx *ctx = cm_id->context; 1819 struct rtrs_srv_path *srv_path; 1820 struct rtrs_srv_sess *srv; 1821 1822 u16 version, con_num, cid; 1823 u16 recon_cnt; 1824 int err = -ECONNRESET; 1825 1826 if (len < sizeof(*msg)) { 1827 pr_err("Invalid RTRS connection request\n"); 1828 goto reject_w_err; 1829 } 1830 if (le16_to_cpu(msg->magic) != 
RTRS_MAGIC) { 1831 pr_err("Invalid RTRS magic\n"); 1832 goto reject_w_err; 1833 } 1834 version = le16_to_cpu(msg->version); 1835 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1836 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1837 version >> 8, RTRS_PROTO_VER_MAJOR); 1838 goto reject_w_err; 1839 } 1840 con_num = le16_to_cpu(msg->cid_num); 1841 if (con_num > 4096) { 1842 /* Sanity check */ 1843 pr_err("Too many connections requested: %d\n", con_num); 1844 goto reject_w_err; 1845 } 1846 cid = le16_to_cpu(msg->cid); 1847 if (cid >= con_num) { 1848 /* Sanity check */ 1849 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1850 goto reject_w_err; 1851 } 1852 recon_cnt = le16_to_cpu(msg->recon_cnt); 1853 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1854 if (IS_ERR(srv)) { 1855 err = PTR_ERR(srv); 1856 pr_err("get_or_create_srv(), error %d\n", err); 1857 goto reject_w_err; 1858 } 1859 mutex_lock(&srv->paths_mutex); 1860 srv_path = __find_path(srv, &msg->sess_uuid); 1861 if (srv_path) { 1862 struct rtrs_path *s = &srv_path->s; 1863 1864 /* Session already holds a reference */ 1865 put_srv(srv); 1866 1867 if (srv_path->state != RTRS_SRV_CONNECTING) { 1868 rtrs_err(s, "Session in wrong state: %s\n", 1869 rtrs_srv_state_str(srv_path->state)); 1870 mutex_unlock(&srv->paths_mutex); 1871 goto reject_w_err; 1872 } 1873 /* 1874 * Sanity checks 1875 */ 1876 if (con_num != s->con_num || cid >= s->con_num) { 1877 rtrs_err(s, "Incorrect request: %d, %d\n", 1878 cid, con_num); 1879 mutex_unlock(&srv->paths_mutex); 1880 goto reject_w_err; 1881 } 1882 if (s->con[cid]) { 1883 rtrs_err(s, "Connection already exists: %d\n", 1884 cid); 1885 mutex_unlock(&srv->paths_mutex); 1886 goto reject_w_err; 1887 } 1888 } else { 1889 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt, 1890 &msg->sess_uuid); 1891 if (IS_ERR(srv_path)) { 1892 mutex_unlock(&srv->paths_mutex); 1893 put_srv(srv); 1894 err = PTR_ERR(srv_path); 1895 pr_err("RTRS server session allocation failed: %d\n", err); 1896 goto reject_w_err; 1897 } 1898 } 1899 err = create_con(srv_path, cm_id, cid); 1900 if (err) { 1901 rtrs_err((&srv_path->s), "create_con(), error %d\n", err); 1902 rtrs_rdma_do_reject(cm_id, err); 1903 /* 1904 * Since session has other connections we follow normal way 1905 * through workqueue, but still return an error to tell cma.c 1906 * to call rdma_destroy_id() for current connection. 1907 */ 1908 goto close_and_return_err; 1909 } 1910 err = rtrs_rdma_do_accept(srv_path, cm_id); 1911 if (err) { 1912 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err); 1913 rtrs_rdma_do_reject(cm_id, err); 1914 /* 1915 * Since current connection was successfully added to the 1916 * session we follow normal way through workqueue to close the 1917 * session, thus return 0 to tell cma.c we call 1918 * rdma_destroy_id() ourselves. 
1919 */ 1920 err = 0; 1921 goto close_and_return_err; 1922 } 1923 mutex_unlock(&srv->paths_mutex); 1924 1925 return 0; 1926 1927 reject_w_err: 1928 return rtrs_rdma_do_reject(cm_id, err); 1929 1930 close_and_return_err: 1931 mutex_unlock(&srv->paths_mutex); 1932 close_path(srv_path); 1933 1934 return err; 1935 } 1936 1937 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1938 struct rdma_cm_event *ev) 1939 { 1940 struct rtrs_srv_path *srv_path = NULL; 1941 struct rtrs_path *s = NULL; 1942 struct rtrs_con *c = NULL; 1943 1944 if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) 1945 /* 1946 * In case of error cma.c will destroy cm_id, 1947 * see cma_process_remove() 1948 */ 1949 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1950 ev->param.conn.private_data_len); 1951 1952 c = cm_id->context; 1953 s = c->path; 1954 srv_path = to_srv_path(s); 1955 1956 switch (ev->event) { 1957 case RDMA_CM_EVENT_ESTABLISHED: 1958 /* Nothing here */ 1959 break; 1960 case RDMA_CM_EVENT_REJECTED: 1961 case RDMA_CM_EVENT_CONNECT_ERROR: 1962 case RDMA_CM_EVENT_UNREACHABLE: 1963 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1964 rdma_event_msg(ev->event), ev->status); 1965 fallthrough; 1966 case RDMA_CM_EVENT_DISCONNECTED: 1967 case RDMA_CM_EVENT_ADDR_CHANGE: 1968 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1969 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1970 close_path(srv_path); 1971 break; 1972 default: 1973 pr_err("Ignoring unexpected CM event %s, err %d\n", 1974 rdma_event_msg(ev->event), ev->status); 1975 break; 1976 } 1977 1978 return 0; 1979 } 1980 1981 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1982 struct sockaddr *addr, 1983 enum rdma_ucm_port_space ps) 1984 { 1985 struct rdma_cm_id *cm_id; 1986 int ret; 1987 1988 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1989 ctx, ps, IB_QPT_RC); 1990 if (IS_ERR(cm_id)) { 1991 ret = PTR_ERR(cm_id); 1992 pr_err("Creating id for RDMA connection failed, err: %d\n", 1993 ret); 1994 goto err_out; 1995 } 1996 ret = rdma_bind_addr(cm_id, addr); 1997 if (ret) { 1998 pr_err("Binding RDMA address failed, err: %d\n", ret); 1999 goto err_cm; 2000 } 2001 ret = rdma_listen(cm_id, 64); 2002 if (ret) { 2003 pr_err("Listening on RDMA connection failed, err: %d\n", 2004 ret); 2005 goto err_cm; 2006 } 2007 2008 return cm_id; 2009 2010 err_cm: 2011 rdma_destroy_id(cm_id); 2012 err_out: 2013 2014 return ERR_PTR(ret); 2015 } 2016 2017 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2018 { 2019 struct sockaddr_in6 sin = { 2020 .sin6_family = AF_INET6, 2021 .sin6_addr = IN6ADDR_ANY_INIT, 2022 .sin6_port = htons(port), 2023 }; 2024 struct sockaddr_ib sib = { 2025 .sib_family = AF_IB, 2026 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2027 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2028 .sib_pkey = cpu_to_be16(0xffff), 2029 }; 2030 struct rdma_cm_id *cm_ip, *cm_ib; 2031 int ret; 2032 2033 /* 2034 * We accept both IPoIB and IB connections, so we need to keep 2035 * two cm id's, one for each socket type and port space. 2036 * If the cm initialization of one of the id's fails, we abort 2037 * everything. 
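	 * (cm_id_ip listens on RDMA_PS_TCP for IP-based addressing, while
	 * cm_id_ib listens on RDMA_PS_IB for native InfiniBand addressing.)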
static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create them
	 * only once
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * If we fail here, the IB core ignores the error code and
		 * makes no further calls to our ops.
		 */
		pr_err("Failed to initialize RDMA connection\n");
		goto err_out;
	}

out:
	/*
	 * Keep track of the number of IB devices added
	 */
	ib_ctx.ib_dev_count++;

err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove them
	 * only once, when the last device is removed
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops:	callback functions
 * @port:	port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return: a valid pointer on success, otherwise a PTR_ERR() value.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	mutex_init(&ib_ctx.ib_dev_mutex);
	ib_ctx.srv_ctx = ctx;
	ib_ctx.port = port;

	err = ib_register_client(&rtrs_srv_client);
	if (err) {
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);
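/*
 * Minimal usage sketch (illustrative only, not part of this driver): an
 * upper-layer protocol such as rnbd-srv provides both rtrs_srv_ops callbacks
 * and opens one server context at module load time; the my_* names below are
 * placeholders.
 *
 *	static struct rtrs_srv_ops my_ops = {
 *		.rdma_ev = my_rdma_ev,	// handle incoming IO requests
 *		.link_ev = my_link_ev,	// session connect/disconnect events
 *	};
 *
 *	ctx = rtrs_srv_open(&my_ops, port);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */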
static void close_paths(struct rtrs_srv_sess *srv)
{
	struct rtrs_srv_path *srv_path;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(srv_path, &srv->paths_list, s.entry)
		close_path(srv_path);
	mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv_sess *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_paths(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	ib_unregister_client(&rtrs_srv_client);
	mutex_destroy(&ib_ctx.ib_dev_mutex);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
		       max_chunk_size, MIN_CHUNK_SIZE);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and the
	 * offset inside the memory chunk
	 */
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}
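/*
 * Worked example for the immediate-data check above (added for clarity,
 * values are the defaults): with sess_queue_depth = 512 the buffer id needs
 * ilog2(511) + 1 = 9 bits, and with max_chunk_size = 128 KiB the offset
 * inside a chunk needs ilog2(131071) + 1 = 17 bits, so 9 + 17 = 26 bits must
 * fit into the MAX_IMM_PAYL_BITS budget of the 32-bit RDMA immediate data
 * (the remaining immediate bits carry the message type).
 */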
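/*
 * Example (added for clarity, assuming the module is built as rtrs-server):
 * loading with non-default parameters, which still have to satisfy
 * check_module_params() above:
 *
 *	modprobe rtrs_server sess_queue_depth=256 max_chunk_size=65536
 */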