1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Transport Layer 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 15 #include "rtrs-srv.h" 16 #include "rtrs-log.h" 17 #include <rdma/ib_cm.h> 18 #include <rdma/ib_verbs.h> 19 #include "rtrs-srv-trace.h" 20 21 MODULE_DESCRIPTION("RDMA Transport Server"); 22 MODULE_LICENSE("GPL"); 23 24 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */ 25 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10) 26 #define DEFAULT_SESS_QUEUE_DEPTH 512 27 #define MAX_HDR_SIZE PAGE_SIZE 28 29 static struct rtrs_rdma_dev_pd dev_pd; 30 const struct class rtrs_dev_class = { 31 .name = "rtrs-server", 32 }; 33 static struct rtrs_srv_ib_ctx ib_ctx; 34 35 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE; 36 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH; 37 38 static bool always_invalidate = true; 39 module_param(always_invalidate, bool, 0444); 40 MODULE_PARM_DESC(always_invalidate, 41 "Invalidate memory registration for contiguous memory regions before accessing."); 42 43 module_param_named(max_chunk_size, max_chunk_size, int, 0444); 44 MODULE_PARM_DESC(max_chunk_size, 45 "Max size for each IO request, when change the unit is in byte (default: " 46 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)"); 47 48 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444); 49 MODULE_PARM_DESC(sess_queue_depth, 50 "Number of buffers for pending I/O requests to allocate per session. Maximum: " 51 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 52 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 53 54 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 55 56 static struct workqueue_struct *rtrs_wq; 57 58 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 59 { 60 return container_of(c, struct rtrs_srv_con, c); 61 } 62 63 static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path, 64 enum rtrs_srv_state new_state) 65 { 66 enum rtrs_srv_state old_state; 67 bool changed = false; 68 unsigned long flags; 69 70 spin_lock_irqsave(&srv_path->state_lock, flags); 71 old_state = srv_path->state; 72 switch (new_state) { 73 case RTRS_SRV_CONNECTED: 74 if (old_state == RTRS_SRV_CONNECTING) 75 changed = true; 76 break; 77 case RTRS_SRV_CLOSING: 78 if (old_state == RTRS_SRV_CONNECTING || 79 old_state == RTRS_SRV_CONNECTED) 80 changed = true; 81 break; 82 case RTRS_SRV_CLOSED: 83 if (old_state == RTRS_SRV_CLOSING) 84 changed = true; 85 break; 86 default: 87 break; 88 } 89 if (changed) 90 srv_path->state = new_state; 91 spin_unlock_irqrestore(&srv_path->state_lock, flags); 92 93 return changed; 94 } 95 96 static void free_id(struct rtrs_srv_op *id) 97 { 98 if (!id) 99 return; 100 kfree(id); 101 } 102 103 static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path) 104 { 105 struct rtrs_srv_sess *srv = srv_path->srv; 106 int i; 107 108 if (srv_path->ops_ids) { 109 for (i = 0; i < srv->queue_depth; i++) 110 free_id(srv_path->ops_ids[i]); 111 kfree(srv_path->ops_ids); 112 srv_path->ops_ids = NULL; 113 } 114 } 115 116 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 117 118 static struct ib_cqe io_comp_cqe = { 119 .done = rtrs_srv_rdma_done 120 }; 121 122 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref) 123 { 124 struct rtrs_srv_path *srv_path = container_of(ref, 125 struct rtrs_srv_path, 126 ids_inflight_ref); 127 128 percpu_ref_exit(&srv_path->ids_inflight_ref); 129 complete(&srv_path->complete_done); 130 } 131 132 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path) 133 { 134 struct rtrs_srv_sess *srv = srv_path->srv; 135 struct rtrs_srv_op *id; 136 int i, ret; 137 138 srv_path->ops_ids = kcalloc(srv->queue_depth, 139 sizeof(*srv_path->ops_ids), 140 GFP_KERNEL); 141 if (!srv_path->ops_ids) 142 goto err; 143 144 for (i = 0; i < srv->queue_depth; ++i) { 145 id = kzalloc(sizeof(*id), GFP_KERNEL); 146 if (!id) 147 goto err; 148 149 srv_path->ops_ids[i] = id; 150 } 151 152 ret = percpu_ref_init(&srv_path->ids_inflight_ref, 153 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 154 if (ret) { 155 pr_err("Percpu reference init failed\n"); 156 goto err; 157 } 158 init_completion(&srv_path->complete_done); 159 160 return 0; 161 162 err: 163 rtrs_srv_free_ops_ids(srv_path); 164 return -ENOMEM; 165 } 166 167 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path) 168 { 169 percpu_ref_get(&srv_path->ids_inflight_ref); 170 } 171 172 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path) 173 { 174 percpu_ref_put(&srv_path->ids_inflight_ref); 175 } 176 177 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 178 { 179 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 180 struct rtrs_path *s = con->c.path; 181 struct rtrs_srv_path *srv_path = to_srv_path(s); 182 183 if (wc->status != IB_WC_SUCCESS) { 184 rtrs_err(s, "REG MR failed: %s\n", 185 ib_wc_status_msg(wc->status)); 186 close_path(srv_path); 187 return; 188 } 189 } 190 191 static struct ib_cqe local_reg_cqe = { 192 .done = rtrs_srv_reg_mr_done 193 }; 194 195 static int rdma_write_sg(struct rtrs_srv_op *id) 196 { 197 struct rtrs_path *s = id->con->c.path; 198 struct rtrs_srv_path *srv_path = to_srv_path(s); 199 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id]; 200 struct rtrs_srv_mr *srv_mr; 201 struct ib_send_wr inv_wr; 202 struct ib_rdma_wr imm_wr; 203 struct ib_rdma_wr *wr = NULL; 204 enum ib_send_flags flags; 205 size_t sg_cnt; 206 int err, offset; 207 bool need_inval; 208 u32 rkey = 0; 209 struct ib_reg_wr rwr; 210 struct ib_sge *plist; 211 struct ib_sge list; 212 213 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 214 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 215 if (sg_cnt != 1) 216 return -EINVAL; 217 218 offset = 0; 219 220 wr = &id->tx_wr; 221 plist = &id->tx_sg; 222 plist->addr = dma_addr + offset; 223 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 224 225 /* WR will fail with length error 226 * if this is 0 227 */ 228 if (plist->length == 0) { 229 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 230 return -EINVAL; 231 } 232 233 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 234 offset += plist->length; 235 236 wr->wr.sg_list = plist; 237 wr->wr.num_sge = 1; 238 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 239 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 240 if (rkey == 0) 241 rkey = wr->rkey; 242 else 243 /* Only one key is actually used */ 244 WARN_ON_ONCE(rkey != wr->rkey); 245 246 wr->wr.opcode = IB_WR_RDMA_WRITE; 247 wr->wr.wr_cqe = &io_comp_cqe; 248 wr->wr.ex.imm_data = 0; 249 wr->wr.send_flags = 0; 250 251 if (need_inval && always_invalidate) { 252 wr->wr.next = &rwr.wr; 253 rwr.wr.next = &inv_wr; 254 inv_wr.next = &imm_wr.wr; 255 } else if (always_invalidate) { 256 wr->wr.next = &rwr.wr; 257 rwr.wr.next = &imm_wr.wr; 258 } else if (need_inval) { 259 wr->wr.next = &inv_wr; 260 inv_wr.next = &imm_wr.wr; 261 } else { 262 wr->wr.next = &imm_wr.wr; 263 } 264 /* 265 * From time to time we have to post signaled sends, 266 * or send queue will fill up and only QP reset can help. 267 */ 268 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 269 0 : IB_SEND_SIGNALED; 270 271 if (need_inval) { 272 inv_wr.sg_list = NULL; 273 inv_wr.num_sge = 0; 274 inv_wr.opcode = IB_WR_SEND_WITH_INV; 275 inv_wr.wr_cqe = &io_comp_cqe; 276 inv_wr.send_flags = 0; 277 inv_wr.ex.invalidate_rkey = rkey; 278 } 279 280 imm_wr.wr.next = NULL; 281 if (always_invalidate) { 282 struct rtrs_msg_rkey_rsp *msg; 283 284 srv_mr = &srv_path->mrs[id->msg_id]; 285 rwr.wr.opcode = IB_WR_REG_MR; 286 rwr.wr.wr_cqe = &local_reg_cqe; 287 rwr.wr.num_sge = 0; 288 rwr.mr = srv_mr->mr; 289 rwr.wr.send_flags = 0; 290 rwr.key = srv_mr->mr->rkey; 291 rwr.access = (IB_ACCESS_LOCAL_WRITE | 292 IB_ACCESS_REMOTE_WRITE); 293 msg = srv_mr->iu->buf; 294 msg->buf_id = cpu_to_le16(id->msg_id); 295 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 296 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 297 298 list.addr = srv_mr->iu->dma_addr; 299 list.length = sizeof(*msg); 300 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 301 imm_wr.wr.sg_list = &list; 302 imm_wr.wr.num_sge = 1; 303 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 304 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 305 srv_mr->iu->dma_addr, 306 srv_mr->iu->size, DMA_TO_DEVICE); 307 } else { 308 imm_wr.wr.sg_list = NULL; 309 imm_wr.wr.num_sge = 0; 310 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 311 } 312 imm_wr.wr.send_flags = flags; 313 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 314 0, need_inval)); 315 316 imm_wr.wr.wr_cqe = &io_comp_cqe; 317 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr, 318 offset, DMA_BIDIRECTIONAL); 319 320 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 321 if (err) 322 rtrs_err(s, 323 "Posting RDMA-Write-Request to QP failed, err: %d\n", 324 err); 325 326 return err; 327 } 328 329 /** 330 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 331 * requests or on successful WRITE request. 332 * @con: the connection to send back result 333 * @id: the id associated with the IO 334 * @errno: the error number of the IO. 335 * 336 * Return 0 on success, errno otherwise. 337 */ 338 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 339 int errno) 340 { 341 struct rtrs_path *s = con->c.path; 342 struct rtrs_srv_path *srv_path = to_srv_path(s); 343 struct ib_send_wr inv_wr, *wr = NULL; 344 struct ib_rdma_wr imm_wr; 345 struct ib_reg_wr rwr; 346 struct rtrs_srv_mr *srv_mr; 347 bool need_inval = false; 348 enum ib_send_flags flags; 349 u32 imm; 350 int err; 351 352 if (id->dir == READ) { 353 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 354 size_t sg_cnt; 355 356 need_inval = le16_to_cpu(rd_msg->flags) & 357 RTRS_MSG_NEED_INVAL_F; 358 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 359 360 if (need_inval) { 361 if (sg_cnt) { 362 inv_wr.wr_cqe = &io_comp_cqe; 363 inv_wr.sg_list = NULL; 364 inv_wr.num_sge = 0; 365 inv_wr.opcode = IB_WR_SEND_WITH_INV; 366 inv_wr.send_flags = 0; 367 /* Only one key is actually used */ 368 inv_wr.ex.invalidate_rkey = 369 le32_to_cpu(rd_msg->desc[0].key); 370 } else { 371 WARN_ON_ONCE(1); 372 need_inval = false; 373 } 374 } 375 } 376 377 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno); 378 379 if (need_inval && always_invalidate) { 380 wr = &inv_wr; 381 inv_wr.next = &rwr.wr; 382 rwr.wr.next = &imm_wr.wr; 383 } else if (always_invalidate) { 384 wr = &rwr.wr; 385 rwr.wr.next = &imm_wr.wr; 386 } else if (need_inval) { 387 wr = &inv_wr; 388 inv_wr.next = &imm_wr.wr; 389 } else { 390 wr = &imm_wr.wr; 391 } 392 /* 393 * From time to time we have to post signalled sends, 394 * or send queue will fill up and only QP reset can help. 395 */ 396 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 397 0 : IB_SEND_SIGNALED; 398 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 399 imm_wr.wr.next = NULL; 400 if (always_invalidate) { 401 struct ib_sge list; 402 struct rtrs_msg_rkey_rsp *msg; 403 404 srv_mr = &srv_path->mrs[id->msg_id]; 405 rwr.wr.next = &imm_wr.wr; 406 rwr.wr.opcode = IB_WR_REG_MR; 407 rwr.wr.wr_cqe = &local_reg_cqe; 408 rwr.wr.num_sge = 0; 409 rwr.wr.send_flags = 0; 410 rwr.mr = srv_mr->mr; 411 rwr.key = srv_mr->mr->rkey; 412 rwr.access = (IB_ACCESS_LOCAL_WRITE | 413 IB_ACCESS_REMOTE_WRITE); 414 msg = srv_mr->iu->buf; 415 msg->buf_id = cpu_to_le16(id->msg_id); 416 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 417 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 418 419 list.addr = srv_mr->iu->dma_addr; 420 list.length = sizeof(*msg); 421 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 422 imm_wr.wr.sg_list = &list; 423 imm_wr.wr.num_sge = 1; 424 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 425 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 426 srv_mr->iu->dma_addr, 427 srv_mr->iu->size, DMA_TO_DEVICE); 428 } else { 429 imm_wr.wr.sg_list = NULL; 430 imm_wr.wr.num_sge = 0; 431 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 432 } 433 imm_wr.wr.send_flags = flags; 434 imm_wr.wr.wr_cqe = &io_comp_cqe; 435 436 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 437 438 err = ib_post_send(id->con->c.qp, wr, NULL); 439 if (err) 440 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 441 err); 442 443 return err; 444 } 445 446 void close_path(struct rtrs_srv_path *srv_path) 447 { 448 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING)) 449 queue_work(rtrs_wq, &srv_path->close_work); 450 WARN_ON(srv_path->state != RTRS_SRV_CLOSING); 451 } 452 453 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 454 { 455 switch (state) { 456 case RTRS_SRV_CONNECTING: 457 return "RTRS_SRV_CONNECTING"; 458 case RTRS_SRV_CONNECTED: 459 return "RTRS_SRV_CONNECTED"; 460 case RTRS_SRV_CLOSING: 461 return "RTRS_SRV_CLOSING"; 462 case RTRS_SRV_CLOSED: 463 return "RTRS_SRV_CLOSED"; 464 default: 465 return "UNKNOWN"; 466 } 467 } 468 469 /** 470 * rtrs_srv_resp_rdma() - Finish an RDMA request 471 * 472 * @id: Internal RTRS operation identifier 473 * @status: Response Code sent to the other side for this operation. 474 * 0 = success, <=0 error 475 * Context: any 476 * 477 * Finish a RDMA operation. A message is sent to the client and the 478 * corresponding memory areas will be released. 479 */ 480 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status) 481 { 482 struct rtrs_srv_path *srv_path; 483 struct rtrs_srv_con *con; 484 struct rtrs_path *s; 485 int err; 486 487 if (WARN_ON(!id)) 488 return true; 489 490 con = id->con; 491 s = con->c.path; 492 srv_path = to_srv_path(s); 493 494 id->status = status; 495 496 if (srv_path->state != RTRS_SRV_CONNECTED) { 497 rtrs_err_rl(s, 498 "Sending I/O response failed, server path %s is disconnected, path state %s\n", 499 kobject_name(&srv_path->kobj), 500 rtrs_srv_state_str(srv_path->state)); 501 goto out; 502 } 503 if (always_invalidate) { 504 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id]; 505 506 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey)); 507 } 508 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) { 509 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n", 510 kobject_name(&srv_path->kobj), 511 con->c.cid); 512 atomic_add(1, &con->c.sq_wr_avail); 513 spin_lock(&con->rsp_wr_wait_lock); 514 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list); 515 spin_unlock(&con->rsp_wr_wait_lock); 516 return false; 517 } 518 519 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt) 520 err = send_io_resp_imm(con, id, status); 521 else 522 err = rdma_write_sg(id); 523 524 if (err) { 525 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err, 526 kobject_name(&srv_path->kobj)); 527 close_path(srv_path); 528 } 529 out: 530 rtrs_srv_put_ops_ids(srv_path); 531 return true; 532 } 533 EXPORT_SYMBOL(rtrs_srv_resp_rdma); 534 535 /** 536 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv. 537 * @srv: Session pointer 538 * @priv: The private pointer that is associated with the session. 539 */ 540 void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv) 541 { 542 srv->priv = priv; 543 } 544 EXPORT_SYMBOL(rtrs_srv_set_sess_priv); 545 546 static void unmap_cont_bufs(struct rtrs_srv_path *srv_path) 547 { 548 int i; 549 550 for (i = 0; i < srv_path->mrs_num; i++) { 551 struct rtrs_srv_mr *srv_mr; 552 553 srv_mr = &srv_path->mrs[i]; 554 555 if (always_invalidate) 556 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1); 557 558 ib_dereg_mr(srv_mr->mr); 559 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl, 560 srv_mr->sgt.nents, DMA_BIDIRECTIONAL); 561 sg_free_table(&srv_mr->sgt); 562 } 563 kfree(srv_path->mrs); 564 } 565 566 static int map_cont_bufs(struct rtrs_srv_path *srv_path) 567 { 568 struct rtrs_srv_sess *srv = srv_path->srv; 569 struct rtrs_path *ss = &srv_path->s; 570 int i, err, mrs_num; 571 unsigned int chunk_bits; 572 int chunks_per_mr = 1; 573 struct ib_mr *mr; 574 struct sg_table *sgt; 575 576 /* 577 * Here we map queue_depth chunks to MR. Firstly we have to 578 * figure out how many chunks can we map per MR. 579 */ 580 if (always_invalidate) { 581 /* 582 * in order to do invalidate for each chunks of memory, we needs 583 * more memory regions. 584 */ 585 mrs_num = srv->queue_depth; 586 } else { 587 chunks_per_mr = 588 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 589 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 590 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 591 } 592 593 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL); 594 if (!srv_path->mrs) 595 return -ENOMEM; 596 597 for (srv_path->mrs_num = 0; srv_path->mrs_num < mrs_num; 598 srv_path->mrs_num++) { 599 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[srv_path->mrs_num]; 600 struct scatterlist *s; 601 int nr, nr_sgt, chunks; 602 603 sgt = &srv_mr->sgt; 604 chunks = chunks_per_mr * srv_path->mrs_num; 605 if (!always_invalidate) 606 chunks_per_mr = min_t(int, chunks_per_mr, 607 srv->queue_depth - chunks); 608 609 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 610 if (err) 611 goto err; 612 613 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 614 sg_set_page(s, srv->chunks[chunks + i], 615 max_chunk_size, 0); 616 617 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl, 618 sgt->nents, DMA_BIDIRECTIONAL); 619 if (!nr_sgt) { 620 err = -EINVAL; 621 goto free_sg; 622 } 623 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 624 nr_sgt); 625 if (IS_ERR(mr)) { 626 err = PTR_ERR(mr); 627 goto unmap_sg; 628 } 629 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt, 630 NULL, max_chunk_size); 631 if (nr != nr_sgt) { 632 err = nr < 0 ? nr : -EINVAL; 633 goto dereg_mr; 634 } 635 636 if (always_invalidate) { 637 srv_mr->iu = rtrs_iu_alloc(1, 638 sizeof(struct rtrs_msg_rkey_rsp), 639 GFP_KERNEL, srv_path->s.dev->ib_dev, 640 DMA_TO_DEVICE, rtrs_srv_rdma_done); 641 if (!srv_mr->iu) { 642 err = -ENOMEM; 643 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 644 goto dereg_mr; 645 } 646 } 647 /* Eventually dma addr for each chunk can be cached */ 648 for_each_sg(sgt->sgl, s, nr_sgt, i) 649 srv_path->dma_addr[chunks + i] = sg_dma_address(s); 650 651 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 652 srv_mr->mr = mr; 653 } 654 655 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 656 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 657 658 return 0; 659 660 dereg_mr: 661 ib_dereg_mr(mr); 662 unmap_sg: 663 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl, 664 sgt->nents, DMA_BIDIRECTIONAL); 665 free_sg: 666 sg_free_table(sgt); 667 err: 668 unmap_cont_bufs(srv_path); 669 670 return err; 671 } 672 673 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 674 { 675 struct rtrs_srv_con *con = container_of(c, typeof(*con), c); 676 struct rtrs_srv_path *srv_path = to_srv_path(con->c.path); 677 678 rtrs_err(con->c.path, "HB err handler for path=%s\n", kobject_name(&srv_path->kobj)); 679 close_path(to_srv_path(c->path)); 680 } 681 682 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path) 683 { 684 rtrs_init_hb(&srv_path->s, &io_comp_cqe, 685 RTRS_HB_INTERVAL_MS, 686 RTRS_HB_MISSED_MAX, 687 rtrs_srv_hb_err_handler, 688 rtrs_wq); 689 } 690 691 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path) 692 { 693 rtrs_start_hb(&srv_path->s); 694 } 695 696 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path) 697 { 698 rtrs_stop_hb(&srv_path->s); 699 } 700 701 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 702 { 703 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 704 struct rtrs_path *s = con->c.path; 705 struct rtrs_srv_path *srv_path = to_srv_path(s); 706 struct rtrs_iu *iu; 707 708 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 709 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 710 711 if (wc->status != IB_WC_SUCCESS) { 712 rtrs_err(s, "Sess info response send failed: %s\n", 713 ib_wc_status_msg(wc->status)); 714 close_path(srv_path); 715 return; 716 } 717 WARN_ON(wc->opcode != IB_WC_SEND); 718 } 719 720 static int rtrs_srv_path_up(struct rtrs_srv_path *srv_path) 721 { 722 struct rtrs_srv_sess *srv = srv_path->srv; 723 struct rtrs_srv_ctx *ctx = srv->ctx; 724 int up, ret = 0; 725 726 mutex_lock(&srv->paths_ev_mutex); 727 up = ++srv->paths_up; 728 if (up == 1) 729 ret = ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 730 mutex_unlock(&srv->paths_ev_mutex); 731 732 /* Mark session as established */ 733 if (!ret) 734 srv_path->established = true; 735 736 return ret; 737 } 738 739 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path) 740 { 741 struct rtrs_srv_sess *srv = srv_path->srv; 742 struct rtrs_srv_ctx *ctx = srv->ctx; 743 744 if (!srv_path->established) 745 return; 746 747 srv_path->established = false; 748 mutex_lock(&srv->paths_ev_mutex); 749 WARN_ON(!srv->paths_up); 750 if (--srv->paths_up == 0) 751 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 752 mutex_unlock(&srv->paths_ev_mutex); 753 } 754 755 static bool exist_pathname(struct rtrs_srv_ctx *ctx, 756 const char *pathname, const uuid_t *path_uuid) 757 { 758 struct rtrs_srv_sess *srv; 759 struct rtrs_srv_path *srv_path; 760 bool found = false; 761 762 mutex_lock(&ctx->srv_mutex); 763 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 764 mutex_lock(&srv->paths_mutex); 765 766 /* when a client with same uuid and same sessname tried to add a path */ 767 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 768 mutex_unlock(&srv->paths_mutex); 769 continue; 770 } 771 772 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 773 if (strlen(srv_path->s.sessname) == strlen(pathname) && 774 !strcmp(srv_path->s.sessname, pathname)) { 775 found = true; 776 break; 777 } 778 } 779 mutex_unlock(&srv->paths_mutex); 780 if (found) 781 break; 782 } 783 mutex_unlock(&ctx->srv_mutex); 784 return found; 785 } 786 787 static int post_recv_path(struct rtrs_srv_path *srv_path); 788 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 789 790 static int process_info_req(struct rtrs_srv_con *con, 791 struct rtrs_msg_info_req *msg) 792 { 793 struct rtrs_path *s = con->c.path; 794 struct rtrs_srv_path *srv_path = to_srv_path(s); 795 struct ib_send_wr *reg_wr = NULL; 796 struct rtrs_msg_info_rsp *rsp; 797 struct rtrs_iu *tx_iu; 798 struct ib_reg_wr *rwr; 799 int mri, err; 800 size_t tx_sz; 801 802 err = post_recv_path(srv_path); 803 if (err) { 804 rtrs_err(s, "post_recv_path(), err: %d\n", err); 805 return err; 806 } 807 808 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) { 809 rtrs_err(s, "pathname cannot contain / and .\n"); 810 return -EINVAL; 811 } 812 813 if (exist_pathname(srv_path->srv->ctx, 814 msg->pathname, &srv_path->srv->paths_uuid)) { 815 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname); 816 return -EPERM; 817 } 818 strscpy(srv_path->s.sessname, msg->pathname, 819 sizeof(srv_path->s.sessname)); 820 821 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL); 822 if (!rwr) 823 return -ENOMEM; 824 825 tx_sz = sizeof(*rsp); 826 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num; 827 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev, 828 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 829 if (!tx_iu) { 830 err = -ENOMEM; 831 goto rwr_free; 832 } 833 834 rsp = tx_iu->buf; 835 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 836 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num); 837 838 for (mri = 0; mri < srv_path->mrs_num; mri++) { 839 struct ib_mr *mr = srv_path->mrs[mri].mr; 840 841 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 842 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 843 rsp->desc[mri].len = cpu_to_le32(mr->length); 844 845 /* 846 * Fill in reg MR request and chain them *backwards* 847 */ 848 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 849 rwr[mri].wr.opcode = IB_WR_REG_MR; 850 rwr[mri].wr.wr_cqe = &local_reg_cqe; 851 rwr[mri].wr.num_sge = 0; 852 rwr[mri].wr.send_flags = 0; 853 rwr[mri].mr = mr; 854 rwr[mri].key = mr->rkey; 855 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 856 IB_ACCESS_REMOTE_WRITE); 857 reg_wr = &rwr[mri].wr; 858 } 859 860 err = rtrs_srv_create_path_files(srv_path); 861 if (err) 862 goto iu_free; 863 kobject_get(&srv_path->kobj); 864 get_device(&srv_path->srv->dev); 865 err = rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED); 866 if (!err) { 867 rtrs_err(s, "rtrs_srv_change_state(), err: %d\n", err); 868 goto iu_free; 869 } 870 871 rtrs_srv_start_hb(srv_path); 872 873 /* 874 * We do not account number of established connections at the current 875 * moment, we rely on the client, which should send info request when 876 * all connections are successfully established. Thus, simply notify 877 * listener with a proper event if we are the first path. 878 */ 879 err = rtrs_srv_path_up(srv_path); 880 if (err) { 881 rtrs_err(s, "rtrs_srv_path_up(), err: %d\n", err); 882 goto iu_free; 883 } 884 885 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 886 tx_iu->dma_addr, 887 tx_iu->size, DMA_TO_DEVICE); 888 889 /* Send info response */ 890 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 891 if (err) { 892 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 893 iu_free: 894 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1); 895 } 896 rwr_free: 897 kfree(rwr); 898 899 return err; 900 } 901 902 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 903 { 904 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 905 struct rtrs_path *s = con->c.path; 906 struct rtrs_srv_path *srv_path = to_srv_path(s); 907 struct rtrs_msg_info_req *msg; 908 struct rtrs_iu *iu; 909 int err; 910 911 WARN_ON(con->c.cid); 912 913 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 914 if (wc->status != IB_WC_SUCCESS) { 915 rtrs_err(s, "Sess info request receive failed: %s\n", 916 ib_wc_status_msg(wc->status)); 917 goto close; 918 } 919 WARN_ON(wc->opcode != IB_WC_RECV); 920 921 if (wc->byte_len < sizeof(*msg)) { 922 rtrs_err(s, "Sess info request is malformed: size %d\n", 923 wc->byte_len); 924 goto close; 925 } 926 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr, 927 iu->size, DMA_FROM_DEVICE); 928 msg = iu->buf; 929 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 930 rtrs_err(s, "Sess info request is malformed: type %d\n", 931 le16_to_cpu(msg->type)); 932 goto close; 933 } 934 err = process_info_req(con, msg); 935 if (err) 936 goto close; 937 938 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 939 return; 940 close: 941 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 942 close_path(srv_path); 943 } 944 945 static int post_recv_info_req(struct rtrs_srv_con *con) 946 { 947 struct rtrs_path *s = con->c.path; 948 struct rtrs_srv_path *srv_path = to_srv_path(s); 949 struct rtrs_iu *rx_iu; 950 int err; 951 952 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 953 GFP_KERNEL, srv_path->s.dev->ib_dev, 954 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 955 if (!rx_iu) 956 return -ENOMEM; 957 /* Prepare for getting info response */ 958 err = rtrs_iu_post_recv(&con->c, rx_iu); 959 if (err) { 960 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 961 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1); 962 return err; 963 } 964 965 return 0; 966 } 967 968 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 969 { 970 int i, err; 971 972 for (i = 0; i < q_size; i++) { 973 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 974 if (err) 975 return err; 976 } 977 978 return 0; 979 } 980 981 static int post_recv_path(struct rtrs_srv_path *srv_path) 982 { 983 struct rtrs_srv_sess *srv = srv_path->srv; 984 struct rtrs_path *s = &srv_path->s; 985 size_t q_size; 986 int err, cid; 987 988 for (cid = 0; cid < srv_path->s.con_num; cid++) { 989 if (cid == 0) 990 q_size = SERVICE_CON_QUEUE_DEPTH; 991 else 992 q_size = srv->queue_depth; 993 if (srv_path->state != RTRS_SRV_CONNECTING) { 994 rtrs_err(s, "Path state invalid. state %s\n", 995 rtrs_srv_state_str(srv_path->state)); 996 return -EIO; 997 } 998 999 if (!srv_path->s.con[cid]) { 1000 rtrs_err(s, "Conn not set for %d\n", cid); 1001 return -EIO; 1002 } 1003 1004 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size); 1005 if (err) { 1006 rtrs_err(s, "post_recv_io(), err: %d\n", err); 1007 return err; 1008 } 1009 } 1010 1011 return 0; 1012 } 1013 1014 static void process_read(struct rtrs_srv_con *con, 1015 struct rtrs_msg_rdma_read *msg, 1016 u32 buf_id, u32 off) 1017 { 1018 struct rtrs_path *s = con->c.path; 1019 struct rtrs_srv_path *srv_path = to_srv_path(s); 1020 struct rtrs_srv_sess *srv = srv_path->srv; 1021 struct rtrs_srv_ctx *ctx = srv->ctx; 1022 struct rtrs_srv_op *id; 1023 1024 size_t usr_len, data_len; 1025 void *data; 1026 int ret; 1027 1028 if (srv_path->state != RTRS_SRV_CONNECTED) { 1029 rtrs_err_rl(s, 1030 "Processing read request failed, session is disconnected, sess state %s\n", 1031 rtrs_srv_state_str(srv_path->state)); 1032 return; 1033 } 1034 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1035 rtrs_err_rl(s, 1036 "Processing read request failed, invalid message\n"); 1037 return; 1038 } 1039 rtrs_srv_get_ops_ids(srv_path); 1040 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ); 1041 id = srv_path->ops_ids[buf_id]; 1042 id->con = con; 1043 id->dir = READ; 1044 id->msg_id = buf_id; 1045 id->rd_msg = msg; 1046 usr_len = le16_to_cpu(msg->usr_len); 1047 data_len = off - usr_len; 1048 data = page_address(srv->chunks[buf_id]); 1049 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1050 data + data_len, usr_len); 1051 1052 if (ret) { 1053 rtrs_err_rl(s, 1054 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1055 buf_id, ret); 1056 goto send_err_msg; 1057 } 1058 1059 return; 1060 1061 send_err_msg: 1062 ret = send_io_resp_imm(con, id, ret); 1063 if (ret < 0) { 1064 rtrs_err_rl(s, 1065 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1066 buf_id, ret); 1067 close_path(srv_path); 1068 } 1069 rtrs_srv_put_ops_ids(srv_path); 1070 } 1071 1072 static void process_write(struct rtrs_srv_con *con, 1073 struct rtrs_msg_rdma_write *req, 1074 u32 buf_id, u32 off) 1075 { 1076 struct rtrs_path *s = con->c.path; 1077 struct rtrs_srv_path *srv_path = to_srv_path(s); 1078 struct rtrs_srv_sess *srv = srv_path->srv; 1079 struct rtrs_srv_ctx *ctx = srv->ctx; 1080 struct rtrs_srv_op *id; 1081 1082 size_t data_len, usr_len; 1083 void *data; 1084 int ret; 1085 1086 if (srv_path->state != RTRS_SRV_CONNECTED) { 1087 rtrs_err_rl(s, 1088 "Processing write request failed, session is disconnected, sess state %s\n", 1089 rtrs_srv_state_str(srv_path->state)); 1090 return; 1091 } 1092 rtrs_srv_get_ops_ids(srv_path); 1093 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE); 1094 id = srv_path->ops_ids[buf_id]; 1095 id->con = con; 1096 id->dir = WRITE; 1097 id->msg_id = buf_id; 1098 1099 usr_len = le16_to_cpu(req->usr_len); 1100 data_len = off - usr_len; 1101 data = page_address(srv->chunks[buf_id]); 1102 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1103 data + data_len, usr_len); 1104 if (ret) { 1105 rtrs_err_rl(s, 1106 "Processing write request failed, user module callback reports err: %d\n", 1107 ret); 1108 goto send_err_msg; 1109 } 1110 1111 return; 1112 1113 send_err_msg: 1114 ret = send_io_resp_imm(con, id, ret); 1115 if (ret < 0) { 1116 rtrs_err_rl(s, 1117 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1118 buf_id, ret); 1119 close_path(srv_path); 1120 } 1121 rtrs_srv_put_ops_ids(srv_path); 1122 } 1123 1124 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1125 u32 id, u32 off) 1126 { 1127 struct rtrs_path *s = con->c.path; 1128 struct rtrs_srv_path *srv_path = to_srv_path(s); 1129 struct rtrs_msg_rdma_hdr *hdr; 1130 unsigned int type; 1131 1132 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, 1133 srv_path->dma_addr[id], 1134 max_chunk_size, DMA_BIDIRECTIONAL); 1135 hdr = msg; 1136 type = le16_to_cpu(hdr->type); 1137 1138 switch (type) { 1139 case RTRS_MSG_WRITE: 1140 process_write(con, msg, id, off); 1141 break; 1142 case RTRS_MSG_READ: 1143 process_read(con, msg, id, off); 1144 break; 1145 default: 1146 rtrs_err(s, 1147 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1148 type); 1149 goto err; 1150 } 1151 1152 return; 1153 1154 err: 1155 close_path(srv_path); 1156 } 1157 1158 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1159 { 1160 struct rtrs_srv_mr *mr = 1161 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1162 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1163 struct rtrs_path *s = con->c.path; 1164 struct rtrs_srv_path *srv_path = to_srv_path(s); 1165 struct rtrs_srv_sess *srv = srv_path->srv; 1166 u32 msg_id, off; 1167 void *data; 1168 1169 if (wc->status != IB_WC_SUCCESS) { 1170 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1171 ib_wc_status_msg(wc->status)); 1172 close_path(srv_path); 1173 } 1174 msg_id = mr->msg_id; 1175 off = mr->msg_off; 1176 data = page_address(srv->chunks[msg_id]) + off; 1177 process_io_req(con, data, msg_id, off); 1178 } 1179 1180 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1181 struct rtrs_srv_mr *mr) 1182 { 1183 struct ib_send_wr wr = { 1184 .opcode = IB_WR_LOCAL_INV, 1185 .wr_cqe = &mr->inv_cqe, 1186 .send_flags = IB_SEND_SIGNALED, 1187 .ex.invalidate_rkey = mr->mr->rkey, 1188 }; 1189 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1190 1191 return ib_post_send(con->c.qp, &wr, NULL); 1192 } 1193 1194 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1195 { 1196 spin_lock(&con->rsp_wr_wait_lock); 1197 while (!list_empty(&con->rsp_wr_wait_list)) { 1198 struct rtrs_srv_op *id; 1199 int ret; 1200 1201 id = list_entry(con->rsp_wr_wait_list.next, 1202 struct rtrs_srv_op, wait_list); 1203 list_del(&id->wait_list); 1204 1205 spin_unlock(&con->rsp_wr_wait_lock); 1206 ret = rtrs_srv_resp_rdma(id, id->status); 1207 spin_lock(&con->rsp_wr_wait_lock); 1208 1209 if (!ret) { 1210 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1211 break; 1212 } 1213 } 1214 spin_unlock(&con->rsp_wr_wait_lock); 1215 } 1216 1217 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1218 { 1219 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1220 struct rtrs_path *s = con->c.path; 1221 struct rtrs_srv_path *srv_path = to_srv_path(s); 1222 struct rtrs_srv_sess *srv = srv_path->srv; 1223 u32 imm_type, imm_payload; 1224 int err; 1225 1226 if (wc->status != IB_WC_SUCCESS) { 1227 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1228 rtrs_err(s, 1229 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1230 ib_wc_status_msg(wc->status), wc->wr_cqe, 1231 wc->opcode, wc->vendor_err, wc->byte_len); 1232 close_path(srv_path); 1233 } 1234 return; 1235 } 1236 1237 switch (wc->opcode) { 1238 case IB_WC_RECV_RDMA_WITH_IMM: 1239 /* 1240 * post_recv() RDMA write completions of IO reqs (read/write) 1241 * and hb 1242 */ 1243 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1244 return; 1245 srv_path->s.hb_missed_cnt = 0; 1246 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1247 if (err) { 1248 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1249 close_path(srv_path); 1250 break; 1251 } 1252 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1253 &imm_type, &imm_payload); 1254 if (imm_type == RTRS_IO_REQ_IMM) { 1255 u32 msg_id, off; 1256 void *data; 1257 1258 msg_id = imm_payload >> srv_path->mem_bits; 1259 off = imm_payload & ((1 << srv_path->mem_bits) - 1); 1260 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1261 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1262 msg_id, off); 1263 close_path(srv_path); 1264 return; 1265 } 1266 if (always_invalidate) { 1267 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id]; 1268 1269 mr->msg_off = off; 1270 mr->msg_id = msg_id; 1271 err = rtrs_srv_inv_rkey(con, mr); 1272 if (err) { 1273 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1274 err); 1275 close_path(srv_path); 1276 break; 1277 } 1278 } else { 1279 data = page_address(srv->chunks[msg_id]) + off; 1280 process_io_req(con, data, msg_id, off); 1281 } 1282 } else if (imm_type == RTRS_HB_MSG_IMM) { 1283 WARN_ON(con->c.cid); 1284 rtrs_send_hb_ack(&srv_path->s); 1285 } else if (imm_type == RTRS_HB_ACK_IMM) { 1286 WARN_ON(con->c.cid); 1287 srv_path->s.hb_missed_cnt = 0; 1288 } else { 1289 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1290 } 1291 break; 1292 case IB_WC_RDMA_WRITE: 1293 case IB_WC_SEND: 1294 /* 1295 * post_send() RDMA write completions of IO reqs (read/write) 1296 * and hb. 1297 */ 1298 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1299 1300 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1301 rtrs_rdma_process_wr_wait_list(con); 1302 1303 break; 1304 default: 1305 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1306 return; 1307 } 1308 } 1309 1310 /** 1311 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname. 1312 * @srv: Session 1313 * @pathname: Pathname buffer 1314 * @len: Length of sessname buffer 1315 */ 1316 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname, 1317 size_t len) 1318 { 1319 struct rtrs_srv_path *srv_path; 1320 int err = -ENOTCONN; 1321 1322 mutex_lock(&srv->paths_mutex); 1323 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1324 if (srv_path->state != RTRS_SRV_CONNECTED) 1325 continue; 1326 strscpy(pathname, srv_path->s.sessname, 1327 min_t(size_t, sizeof(srv_path->s.sessname), len)); 1328 err = 0; 1329 break; 1330 } 1331 mutex_unlock(&srv->paths_mutex); 1332 1333 return err; 1334 } 1335 EXPORT_SYMBOL(rtrs_srv_get_path_name); 1336 1337 /** 1338 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 1339 * @srv: Session 1340 */ 1341 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv) 1342 { 1343 return srv->queue_depth; 1344 } 1345 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1346 1347 static int find_next_bit_ring(struct rtrs_srv_path *srv_path) 1348 { 1349 struct ib_device *ib_dev = srv_path->s.dev->ib_dev; 1350 int v; 1351 1352 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask); 1353 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1354 v = cpumask_first(&cq_affinity_mask); 1355 return v; 1356 } 1357 1358 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path) 1359 { 1360 srv_path->cur_cq_vector = find_next_bit_ring(srv_path); 1361 1362 return srv_path->cur_cq_vector; 1363 } 1364 1365 static void rtrs_srv_dev_release(struct device *dev) 1366 { 1367 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess, 1368 dev); 1369 1370 kfree(srv); 1371 } 1372 1373 static void free_srv(struct rtrs_srv_sess *srv) 1374 { 1375 int i; 1376 1377 WARN_ON(refcount_read(&srv->refcount)); 1378 for (i = 0; i < srv->queue_depth; i++) 1379 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1380 kfree(srv->chunks); 1381 mutex_destroy(&srv->paths_mutex); 1382 mutex_destroy(&srv->paths_ev_mutex); 1383 /* last put to release the srv structure */ 1384 put_device(&srv->dev); 1385 } 1386 1387 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1388 const uuid_t *paths_uuid, 1389 bool first_conn) 1390 { 1391 struct rtrs_srv_sess *srv; 1392 int i; 1393 1394 mutex_lock(&ctx->srv_mutex); 1395 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1396 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1397 refcount_inc_not_zero(&srv->refcount)) { 1398 mutex_unlock(&ctx->srv_mutex); 1399 return srv; 1400 } 1401 } 1402 mutex_unlock(&ctx->srv_mutex); 1403 /* 1404 * If this request is not the first connection request from the 1405 * client for this session then fail and return error. 1406 */ 1407 if (!first_conn) { 1408 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1409 return ERR_PTR(-ENXIO); 1410 } 1411 1412 /* need to allocate a new srv */ 1413 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1414 if (!srv) 1415 return ERR_PTR(-ENOMEM); 1416 1417 INIT_LIST_HEAD(&srv->paths_list); 1418 mutex_init(&srv->paths_mutex); 1419 mutex_init(&srv->paths_ev_mutex); 1420 uuid_copy(&srv->paths_uuid, paths_uuid); 1421 srv->queue_depth = sess_queue_depth; 1422 srv->ctx = ctx; 1423 device_initialize(&srv->dev); 1424 srv->dev.release = rtrs_srv_dev_release; 1425 1426 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1427 GFP_KERNEL); 1428 if (!srv->chunks) 1429 goto err_free_srv; 1430 1431 for (i = 0; i < srv->queue_depth; i++) { 1432 srv->chunks[i] = alloc_pages(GFP_KERNEL, 1433 get_order(max_chunk_size)); 1434 if (!srv->chunks[i]) 1435 goto err_free_chunks; 1436 } 1437 refcount_set(&srv->refcount, 1); 1438 mutex_lock(&ctx->srv_mutex); 1439 list_add(&srv->ctx_list, &ctx->srv_list); 1440 mutex_unlock(&ctx->srv_mutex); 1441 1442 return srv; 1443 1444 err_free_chunks: 1445 while (i--) 1446 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1447 kfree(srv->chunks); 1448 1449 err_free_srv: 1450 kfree(srv); 1451 return ERR_PTR(-ENOMEM); 1452 } 1453 1454 static void put_srv(struct rtrs_srv_sess *srv) 1455 { 1456 if (refcount_dec_and_test(&srv->refcount)) { 1457 struct rtrs_srv_ctx *ctx = srv->ctx; 1458 1459 WARN_ON(srv->dev.kobj.state_in_sysfs); 1460 1461 mutex_lock(&ctx->srv_mutex); 1462 list_del(&srv->ctx_list); 1463 mutex_unlock(&ctx->srv_mutex); 1464 free_srv(srv); 1465 } 1466 } 1467 1468 static void __add_path_to_srv(struct rtrs_srv_sess *srv, 1469 struct rtrs_srv_path *srv_path) 1470 { 1471 list_add_tail(&srv_path->s.entry, &srv->paths_list); 1472 srv->paths_num++; 1473 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1474 } 1475 1476 static void del_path_from_srv(struct rtrs_srv_path *srv_path) 1477 { 1478 struct rtrs_srv_sess *srv = srv_path->srv; 1479 1480 if (WARN_ON(!srv)) 1481 return; 1482 1483 mutex_lock(&srv->paths_mutex); 1484 list_del(&srv_path->s.entry); 1485 WARN_ON(!srv->paths_num); 1486 srv->paths_num--; 1487 mutex_unlock(&srv->paths_mutex); 1488 } 1489 1490 /* return true if addresses are the same, error other wise */ 1491 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1492 { 1493 switch (a->sa_family) { 1494 case AF_IB: 1495 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1496 &((struct sockaddr_ib *)b)->sib_addr, 1497 sizeof(struct ib_addr)) && 1498 (b->sa_family == AF_IB); 1499 case AF_INET: 1500 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1501 &((struct sockaddr_in *)b)->sin_addr, 1502 sizeof(struct in_addr)) && 1503 (b->sa_family == AF_INET); 1504 case AF_INET6: 1505 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1506 &((struct sockaddr_in6 *)b)->sin6_addr, 1507 sizeof(struct in6_addr)) && 1508 (b->sa_family == AF_INET6); 1509 default: 1510 return -ENOENT; 1511 } 1512 } 1513 1514 static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv, 1515 struct rdma_addr *addr) 1516 { 1517 struct rtrs_srv_path *srv_path; 1518 1519 list_for_each_entry(srv_path, &srv->paths_list, s.entry) 1520 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr, 1521 (struct sockaddr *)&addr->dst_addr) && 1522 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr, 1523 (struct sockaddr *)&addr->src_addr)) 1524 return true; 1525 1526 return false; 1527 } 1528 1529 static void free_path(struct rtrs_srv_path *srv_path) 1530 { 1531 if (srv_path->kobj.state_in_sysfs) { 1532 kobject_del(&srv_path->kobj); 1533 kobject_put(&srv_path->kobj); 1534 } else { 1535 free_percpu(srv_path->stats->rdma_stats); 1536 kfree(srv_path->stats); 1537 kfree(srv_path); 1538 } 1539 } 1540 1541 static void rtrs_srv_close_work(struct work_struct *work) 1542 { 1543 struct rtrs_srv_path *srv_path; 1544 struct rtrs_srv_con *con; 1545 int i; 1546 1547 srv_path = container_of(work, typeof(*srv_path), close_work); 1548 1549 rtrs_srv_stop_hb(srv_path); 1550 1551 for (i = 0; i < srv_path->s.con_num; i++) { 1552 if (!srv_path->s.con[i]) 1553 continue; 1554 con = to_srv_con(srv_path->s.con[i]); 1555 rdma_disconnect(con->c.cm_id); 1556 ib_drain_qp(con->c.qp); 1557 } 1558 1559 /* 1560 * Degrade ref count to the usual model with a single shared 1561 * atomic_t counter 1562 */ 1563 percpu_ref_kill(&srv_path->ids_inflight_ref); 1564 1565 /* Wait for all completion */ 1566 wait_for_completion(&srv_path->complete_done); 1567 1568 rtrs_srv_destroy_path_files(srv_path); 1569 1570 /* Notify upper layer if we are the last path */ 1571 rtrs_srv_path_down(srv_path); 1572 1573 unmap_cont_bufs(srv_path); 1574 rtrs_srv_free_ops_ids(srv_path); 1575 1576 for (i = 0; i < srv_path->s.con_num; i++) { 1577 if (!srv_path->s.con[i]) 1578 continue; 1579 con = to_srv_con(srv_path->s.con[i]); 1580 rtrs_cq_qp_destroy(&con->c); 1581 rdma_destroy_id(con->c.cm_id); 1582 kfree(con); 1583 } 1584 rtrs_ib_dev_put(srv_path->s.dev); 1585 1586 del_path_from_srv(srv_path); 1587 put_srv(srv_path->srv); 1588 srv_path->srv = NULL; 1589 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED); 1590 1591 kfree(srv_path->dma_addr); 1592 kfree(srv_path->s.con); 1593 free_path(srv_path); 1594 } 1595 1596 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path, 1597 struct rdma_cm_id *cm_id) 1598 { 1599 struct rtrs_srv_sess *srv = srv_path->srv; 1600 struct rtrs_msg_conn_rsp msg; 1601 struct rdma_conn_param param; 1602 int err; 1603 1604 param = (struct rdma_conn_param) { 1605 .rnr_retry_count = 7, 1606 .private_data = &msg, 1607 .private_data_len = sizeof(msg), 1608 }; 1609 1610 msg = (struct rtrs_msg_conn_rsp) { 1611 .magic = cpu_to_le16(RTRS_MAGIC), 1612 .version = cpu_to_le16(RTRS_PROTO_VER), 1613 .queue_depth = cpu_to_le16(srv->queue_depth), 1614 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1615 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1616 }; 1617 1618 if (always_invalidate) 1619 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1620 1621 err = rdma_accept(cm_id, ¶m); 1622 if (err) 1623 pr_err("rdma_accept(), err: %d\n", err); 1624 1625 return err; 1626 } 1627 1628 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1629 { 1630 struct rtrs_msg_conn_rsp msg; 1631 int err; 1632 1633 msg = (struct rtrs_msg_conn_rsp) { 1634 .magic = cpu_to_le16(RTRS_MAGIC), 1635 .version = cpu_to_le16(RTRS_PROTO_VER), 1636 .errno = cpu_to_le16(errno), 1637 }; 1638 1639 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1640 if (err) 1641 pr_err("rdma_reject(), err: %d\n", err); 1642 1643 /* Bounce errno back */ 1644 return errno; 1645 } 1646 1647 static struct rtrs_srv_path * 1648 __find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid) 1649 { 1650 struct rtrs_srv_path *srv_path; 1651 1652 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1653 if (uuid_equal(&srv_path->s.uuid, sess_uuid)) 1654 return srv_path; 1655 } 1656 1657 return NULL; 1658 } 1659 1660 static int create_con(struct rtrs_srv_path *srv_path, 1661 struct rdma_cm_id *cm_id, 1662 unsigned int cid) 1663 { 1664 struct rtrs_srv_sess *srv = srv_path->srv; 1665 struct rtrs_path *s = &srv_path->s; 1666 struct rtrs_srv_con *con; 1667 1668 u32 cq_num, max_send_wr, max_recv_wr, wr_limit; 1669 int err, cq_vector; 1670 1671 con = kzalloc(sizeof(*con), GFP_KERNEL); 1672 if (!con) { 1673 err = -ENOMEM; 1674 goto err; 1675 } 1676 1677 spin_lock_init(&con->rsp_wr_wait_lock); 1678 INIT_LIST_HEAD(&con->rsp_wr_wait_list); 1679 con->c.cm_id = cm_id; 1680 con->c.path = &srv_path->s; 1681 con->c.cid = cid; 1682 atomic_set(&con->c.wr_cnt, 1); 1683 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr; 1684 1685 if (con->c.cid == 0) { 1686 /* 1687 * All receive and all send (each requiring invalidate) 1688 * + 2 for drain and heartbeat 1689 */ 1690 max_send_wr = min_t(int, wr_limit, 1691 SERVICE_CON_QUEUE_DEPTH * 2 + 2); 1692 max_recv_wr = max_send_wr; 1693 s->signal_interval = min_not_zero(srv->queue_depth, 1694 (size_t)SERVICE_CON_QUEUE_DEPTH); 1695 } else { 1696 /* when always_invlaidate enalbed, we need linv+rinv+mr+imm */ 1697 if (always_invalidate) 1698 max_send_wr = 1699 min_t(int, wr_limit, 1700 srv->queue_depth * (1 + 4) + 1); 1701 else 1702 max_send_wr = 1703 min_t(int, wr_limit, 1704 srv->queue_depth * (1 + 2) + 1); 1705 1706 max_recv_wr = srv->queue_depth + 1; 1707 } 1708 cq_num = max_send_wr + max_recv_wr; 1709 atomic_set(&con->c.sq_wr_avail, max_send_wr); 1710 cq_vector = rtrs_srv_get_next_cq_vector(srv_path); 1711 1712 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */ 1713 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num, 1714 max_send_wr, max_recv_wr, 1715 IB_POLL_WORKQUEUE); 1716 if (err) { 1717 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err); 1718 goto free_con; 1719 } 1720 if (con->c.cid == 0) { 1721 err = post_recv_info_req(con); 1722 if (err) 1723 goto free_cqqp; 1724 } 1725 WARN_ON(srv_path->s.con[cid]); 1726 srv_path->s.con[cid] = &con->c; 1727 1728 /* 1729 * Change context from server to current connection. The other 1730 * way is to use cm_id->qp->qp_context, which does not work on OFED. 1731 */ 1732 cm_id->context = &con->c; 1733 1734 return 0; 1735 1736 free_cqqp: 1737 rtrs_cq_qp_destroy(&con->c); 1738 free_con: 1739 kfree(con); 1740 1741 err: 1742 return err; 1743 } 1744 1745 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv, 1746 struct rdma_cm_id *cm_id, 1747 unsigned int con_num, 1748 unsigned int recon_cnt, 1749 const uuid_t *uuid) 1750 { 1751 struct rtrs_srv_path *srv_path; 1752 int err = -ENOMEM; 1753 char str[NAME_MAX]; 1754 struct rtrs_addr path; 1755 1756 if (srv->paths_num >= MAX_PATHS_NUM) { 1757 err = -ECONNRESET; 1758 goto err; 1759 } 1760 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1761 err = -EEXIST; 1762 pr_err("Path with same addr exists\n"); 1763 goto err; 1764 } 1765 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL); 1766 if (!srv_path) 1767 goto err; 1768 1769 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL); 1770 if (!srv_path->stats) 1771 goto err_free_sess; 1772 1773 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats); 1774 if (!srv_path->stats->rdma_stats) 1775 goto err_free_stats; 1776 1777 srv_path->stats->srv_path = srv_path; 1778 1779 srv_path->dma_addr = kcalloc(srv->queue_depth, 1780 sizeof(*srv_path->dma_addr), 1781 GFP_KERNEL); 1782 if (!srv_path->dma_addr) 1783 goto err_free_percpu; 1784 1785 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con), 1786 GFP_KERNEL); 1787 if (!srv_path->s.con) 1788 goto err_free_dma_addr; 1789 1790 srv_path->state = RTRS_SRV_CONNECTING; 1791 srv_path->srv = srv; 1792 srv_path->cur_cq_vector = -1; 1793 srv_path->s.dst_addr = cm_id->route.addr.dst_addr; 1794 srv_path->s.src_addr = cm_id->route.addr.src_addr; 1795 1796 /* temporary until receiving session-name from client */ 1797 path.src = &srv_path->s.src_addr; 1798 path.dst = &srv_path->s.dst_addr; 1799 rtrs_addr_to_str(&path, str, sizeof(str)); 1800 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname)); 1801 1802 srv_path->s.con_num = con_num; 1803 srv_path->s.irq_con_num = con_num; 1804 srv_path->s.recon_cnt = recon_cnt; 1805 uuid_copy(&srv_path->s.uuid, uuid); 1806 spin_lock_init(&srv_path->state_lock); 1807 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work); 1808 rtrs_srv_init_hb(srv_path); 1809 1810 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1811 if (!srv_path->s.dev) { 1812 err = -ENOMEM; 1813 goto err_free_con; 1814 } 1815 err = map_cont_bufs(srv_path); 1816 if (err) 1817 goto err_put_dev; 1818 1819 err = rtrs_srv_alloc_ops_ids(srv_path); 1820 if (err) 1821 goto err_unmap_bufs; 1822 1823 __add_path_to_srv(srv, srv_path); 1824 1825 return srv_path; 1826 1827 err_unmap_bufs: 1828 unmap_cont_bufs(srv_path); 1829 err_put_dev: 1830 rtrs_ib_dev_put(srv_path->s.dev); 1831 err_free_con: 1832 kfree(srv_path->s.con); 1833 err_free_dma_addr: 1834 kfree(srv_path->dma_addr); 1835 err_free_percpu: 1836 free_percpu(srv_path->stats->rdma_stats); 1837 err_free_stats: 1838 kfree(srv_path->stats); 1839 err_free_sess: 1840 kfree(srv_path); 1841 err: 1842 return ERR_PTR(err); 1843 } 1844 1845 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1846 const struct rtrs_msg_conn_req *msg, 1847 size_t len) 1848 { 1849 struct rtrs_srv_ctx *ctx = cm_id->context; 1850 struct rtrs_srv_path *srv_path; 1851 struct rtrs_srv_sess *srv; 1852 1853 u16 version, con_num, cid; 1854 u16 recon_cnt; 1855 int err = -ECONNRESET; 1856 1857 if (len < sizeof(*msg)) { 1858 pr_err("Invalid RTRS connection request\n"); 1859 goto reject_w_err; 1860 } 1861 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1862 pr_err("Invalid RTRS magic\n"); 1863 goto reject_w_err; 1864 } 1865 version = le16_to_cpu(msg->version); 1866 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1867 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1868 version >> 8, RTRS_PROTO_VER_MAJOR); 1869 goto reject_w_err; 1870 } 1871 con_num = le16_to_cpu(msg->cid_num); 1872 if (con_num > 4096) { 1873 /* Sanity check */ 1874 pr_err("Too many connections requested: %d\n", con_num); 1875 goto reject_w_err; 1876 } 1877 cid = le16_to_cpu(msg->cid); 1878 if (cid >= con_num) { 1879 /* Sanity check */ 1880 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1881 goto reject_w_err; 1882 } 1883 recon_cnt = le16_to_cpu(msg->recon_cnt); 1884 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1885 if (IS_ERR(srv)) { 1886 err = PTR_ERR(srv); 1887 pr_err("get_or_create_srv(), error %d\n", err); 1888 goto reject_w_err; 1889 } 1890 mutex_lock(&srv->paths_mutex); 1891 srv_path = __find_path(srv, &msg->sess_uuid); 1892 if (srv_path) { 1893 struct rtrs_path *s = &srv_path->s; 1894 1895 /* Session already holds a reference */ 1896 put_srv(srv); 1897 1898 if (srv_path->state != RTRS_SRV_CONNECTING) { 1899 rtrs_err(s, "Session in wrong state: %s\n", 1900 rtrs_srv_state_str(srv_path->state)); 1901 mutex_unlock(&srv->paths_mutex); 1902 goto reject_w_err; 1903 } 1904 /* 1905 * Sanity checks 1906 */ 1907 if (con_num != s->con_num || cid >= s->con_num) { 1908 rtrs_err(s, "Incorrect request: %d, %d\n", 1909 cid, con_num); 1910 mutex_unlock(&srv->paths_mutex); 1911 goto reject_w_err; 1912 } 1913 if (s->con[cid]) { 1914 rtrs_err(s, "Connection already exists: %d\n", 1915 cid); 1916 mutex_unlock(&srv->paths_mutex); 1917 goto reject_w_err; 1918 } 1919 } else { 1920 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt, 1921 &msg->sess_uuid); 1922 if (IS_ERR(srv_path)) { 1923 mutex_unlock(&srv->paths_mutex); 1924 put_srv(srv); 1925 err = PTR_ERR(srv_path); 1926 pr_err("RTRS server session allocation failed: %d\n", err); 1927 goto reject_w_err; 1928 } 1929 } 1930 err = create_con(srv_path, cm_id, cid); 1931 if (err) { 1932 rtrs_err((&srv_path->s), "create_con(), error %d\n", err); 1933 rtrs_rdma_do_reject(cm_id, err); 1934 /* 1935 * Since session has other connections we follow normal way 1936 * through workqueue, but still return an error to tell cma.c 1937 * to call rdma_destroy_id() for current connection. 1938 */ 1939 goto close_and_return_err; 1940 } 1941 err = rtrs_rdma_do_accept(srv_path, cm_id); 1942 if (err) { 1943 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err); 1944 rtrs_rdma_do_reject(cm_id, err); 1945 /* 1946 * Since current connection was successfully added to the 1947 * session we follow normal way through workqueue to close the 1948 * session, thus return 0 to tell cma.c we call 1949 * rdma_destroy_id() ourselves. 1950 */ 1951 err = 0; 1952 goto close_and_return_err; 1953 } 1954 mutex_unlock(&srv->paths_mutex); 1955 1956 return 0; 1957 1958 reject_w_err: 1959 return rtrs_rdma_do_reject(cm_id, err); 1960 1961 close_and_return_err: 1962 mutex_unlock(&srv->paths_mutex); 1963 close_path(srv_path); 1964 1965 return err; 1966 } 1967 1968 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1969 struct rdma_cm_event *ev) 1970 { 1971 struct rtrs_srv_path *srv_path = NULL; 1972 struct rtrs_path *s = NULL; 1973 struct rtrs_con *c = NULL; 1974 1975 if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) 1976 /* 1977 * In case of error cma.c will destroy cm_id, 1978 * see cma_process_remove() 1979 */ 1980 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1981 ev->param.conn.private_data_len); 1982 1983 c = cm_id->context; 1984 s = c->path; 1985 srv_path = to_srv_path(s); 1986 1987 switch (ev->event) { 1988 case RDMA_CM_EVENT_ESTABLISHED: 1989 /* Nothing here */ 1990 break; 1991 case RDMA_CM_EVENT_REJECTED: 1992 case RDMA_CM_EVENT_CONNECT_ERROR: 1993 case RDMA_CM_EVENT_UNREACHABLE: 1994 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1995 rdma_event_msg(ev->event), ev->status); 1996 fallthrough; 1997 case RDMA_CM_EVENT_DISCONNECTED: 1998 case RDMA_CM_EVENT_ADDR_CHANGE: 1999 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 2000 case RDMA_CM_EVENT_DEVICE_REMOVAL: 2001 close_path(srv_path); 2002 break; 2003 default: 2004 pr_err("Ignoring unexpected CM event %s, err %d\n", 2005 rdma_event_msg(ev->event), ev->status); 2006 break; 2007 } 2008 2009 return 0; 2010 } 2011 2012 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 2013 struct sockaddr *addr, 2014 enum rdma_ucm_port_space ps) 2015 { 2016 struct rdma_cm_id *cm_id; 2017 int ret; 2018 2019 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 2020 ctx, ps, IB_QPT_RC); 2021 if (IS_ERR(cm_id)) { 2022 ret = PTR_ERR(cm_id); 2023 pr_err("Creating id for RDMA connection failed, err: %d\n", 2024 ret); 2025 goto err_out; 2026 } 2027 ret = rdma_bind_addr(cm_id, addr); 2028 if (ret) { 2029 pr_err("Binding RDMA address failed, err: %d\n", ret); 2030 goto err_cm; 2031 } 2032 ret = rdma_listen(cm_id, 64); 2033 if (ret) { 2034 pr_err("Listening on RDMA connection failed, err: %d\n", 2035 ret); 2036 goto err_cm; 2037 } 2038 2039 return cm_id; 2040 2041 err_cm: 2042 rdma_destroy_id(cm_id); 2043 err_out: 2044 2045 return ERR_PTR(ret); 2046 } 2047 2048 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2049 { 2050 struct sockaddr_in6 sin = { 2051 .sin6_family = AF_INET6, 2052 .sin6_addr = IN6ADDR_ANY_INIT, 2053 .sin6_port = htons(port), 2054 }; 2055 struct sockaddr_ib sib = { 2056 .sib_family = AF_IB, 2057 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2058 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2059 .sib_pkey = cpu_to_be16(0xffff), 2060 }; 2061 struct rdma_cm_id *cm_ip, *cm_ib; 2062 int ret; 2063 2064 /* 2065 * We accept both IPoIB and IB connections, so we need to keep 2066 * two cm id's, one for each socket type and port space. 2067 * If the cm initialization of one of the id's fails, we abort 2068 * everything. 2069 */ 2070 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 2071 if (IS_ERR(cm_ip)) 2072 return PTR_ERR(cm_ip); 2073 2074 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 2075 if (IS_ERR(cm_ib)) { 2076 ret = PTR_ERR(cm_ib); 2077 goto free_cm_ip; 2078 } 2079 2080 ctx->cm_id_ip = cm_ip; 2081 ctx->cm_id_ib = cm_ib; 2082 2083 return 0; 2084 2085 free_cm_ip: 2086 rdma_destroy_id(cm_ip); 2087 2088 return ret; 2089 } 2090 2091 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2092 { 2093 struct rtrs_srv_ctx *ctx; 2094 2095 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2096 if (!ctx) 2097 return NULL; 2098 2099 ctx->ops = *ops; 2100 mutex_init(&ctx->srv_mutex); 2101 INIT_LIST_HEAD(&ctx->srv_list); 2102 2103 return ctx; 2104 } 2105 2106 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2107 { 2108 WARN_ON(!list_empty(&ctx->srv_list)); 2109 mutex_destroy(&ctx->srv_mutex); 2110 kfree(ctx); 2111 } 2112 2113 static int rtrs_srv_add_one(struct ib_device *device) 2114 { 2115 struct rtrs_srv_ctx *ctx; 2116 int ret = 0; 2117 2118 mutex_lock(&ib_ctx.ib_dev_mutex); 2119 if (ib_ctx.ib_dev_count) 2120 goto out; 2121 2122 /* 2123 * Since our CM IDs are NOT bound to any ib device we will create them 2124 * only once 2125 */ 2126 ctx = ib_ctx.srv_ctx; 2127 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2128 if (ret) { 2129 /* 2130 * We errored out here. 2131 * According to the ib code, if we encounter an error here then the 2132 * error code is ignored, and no more calls to our ops are made. 2133 */ 2134 pr_err("Failed to initialize RDMA connection"); 2135 goto err_out; 2136 } 2137 2138 out: 2139 /* 2140 * Keep a track on the number of ib devices added 2141 */ 2142 ib_ctx.ib_dev_count++; 2143 2144 err_out: 2145 mutex_unlock(&ib_ctx.ib_dev_mutex); 2146 return ret; 2147 } 2148 2149 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2150 { 2151 struct rtrs_srv_ctx *ctx; 2152 2153 mutex_lock(&ib_ctx.ib_dev_mutex); 2154 ib_ctx.ib_dev_count--; 2155 2156 if (ib_ctx.ib_dev_count) 2157 goto out; 2158 2159 /* 2160 * Since our CM IDs are NOT bound to any ib device we will remove them 2161 * only once, when the last device is removed 2162 */ 2163 ctx = ib_ctx.srv_ctx; 2164 rdma_destroy_id(ctx->cm_id_ip); 2165 rdma_destroy_id(ctx->cm_id_ib); 2166 2167 out: 2168 mutex_unlock(&ib_ctx.ib_dev_mutex); 2169 } 2170 2171 static struct ib_client rtrs_srv_client = { 2172 .name = "rtrs_server", 2173 .add = rtrs_srv_add_one, 2174 .remove = rtrs_srv_remove_one 2175 }; 2176 2177 /** 2178 * rtrs_srv_open() - open RTRS server context 2179 * @ops: callback functions 2180 * @port: port to listen on 2181 * 2182 * Creates server context with specified callbacks. 2183 * 2184 * Return a valid pointer on success otherwise PTR_ERR. 2185 */ 2186 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2187 { 2188 struct rtrs_srv_ctx *ctx; 2189 int err; 2190 2191 ctx = alloc_srv_ctx(ops); 2192 if (!ctx) 2193 return ERR_PTR(-ENOMEM); 2194 2195 mutex_init(&ib_ctx.ib_dev_mutex); 2196 ib_ctx.srv_ctx = ctx; 2197 ib_ctx.port = port; 2198 2199 err = ib_register_client(&rtrs_srv_client); 2200 if (err) { 2201 free_srv_ctx(ctx); 2202 return ERR_PTR(err); 2203 } 2204 2205 return ctx; 2206 } 2207 EXPORT_SYMBOL(rtrs_srv_open); 2208 2209 static void close_paths(struct rtrs_srv_sess *srv) 2210 { 2211 struct rtrs_srv_path *srv_path; 2212 2213 mutex_lock(&srv->paths_mutex); 2214 list_for_each_entry(srv_path, &srv->paths_list, s.entry) 2215 close_path(srv_path); 2216 mutex_unlock(&srv->paths_mutex); 2217 } 2218 2219 static void close_ctx(struct rtrs_srv_ctx *ctx) 2220 { 2221 struct rtrs_srv_sess *srv; 2222 2223 mutex_lock(&ctx->srv_mutex); 2224 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2225 close_paths(srv); 2226 mutex_unlock(&ctx->srv_mutex); 2227 flush_workqueue(rtrs_wq); 2228 } 2229 2230 /** 2231 * rtrs_srv_close() - close RTRS server context 2232 * @ctx: pointer to server context 2233 * 2234 * Closes RTRS server context with all client sessions. 2235 */ 2236 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2237 { 2238 ib_unregister_client(&rtrs_srv_client); 2239 mutex_destroy(&ib_ctx.ib_dev_mutex); 2240 close_ctx(ctx); 2241 free_srv_ctx(ctx); 2242 } 2243 EXPORT_SYMBOL(rtrs_srv_close); 2244 2245 static int check_module_params(void) 2246 { 2247 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2248 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2249 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2250 return -EINVAL; 2251 } 2252 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) { 2253 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2254 max_chunk_size, MIN_CHUNK_SIZE); 2255 return -EINVAL; 2256 } 2257 2258 /* 2259 * Check if IB immediate data size is enough to hold the mem_id and the 2260 * offset inside the memory chunk 2261 */ 2262 if ((ilog2(sess_queue_depth - 1) + 1) + 2263 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2264 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2265 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2266 return -EINVAL; 2267 } 2268 2269 return 0; 2270 } 2271 2272 static int __init rtrs_server_init(void) 2273 { 2274 int err; 2275 2276 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2277 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2278 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2279 sess_queue_depth, always_invalidate); 2280 2281 rtrs_rdma_dev_pd_init(0, &dev_pd); 2282 2283 err = check_module_params(); 2284 if (err) { 2285 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2286 err); 2287 return err; 2288 } 2289 err = class_register(&rtrs_dev_class); 2290 if (err) 2291 goto out_err; 2292 2293 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2294 if (!rtrs_wq) { 2295 err = -ENOMEM; 2296 goto out_dev_class; 2297 } 2298 2299 return 0; 2300 2301 out_dev_class: 2302 class_unregister(&rtrs_dev_class); 2303 out_err: 2304 return err; 2305 } 2306 2307 static void __exit rtrs_server_exit(void) 2308 { 2309 destroy_workqueue(rtrs_wq); 2310 class_unregister(&rtrs_dev_class); 2311 rtrs_rdma_dev_pd_deinit(&dev_pd); 2312 } 2313 2314 module_init(rtrs_server_init); 2315 module_exit(rtrs_server_exit); 2316