1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Transport Layer 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 15 #include "rtrs-srv.h" 16 #include "rtrs-log.h" 17 #include <rdma/ib_cm.h> 18 #include <rdma/ib_verbs.h> 19 20 MODULE_DESCRIPTION("RDMA Transport Server"); 21 MODULE_LICENSE("GPL"); 22 23 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */ 24 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10) 25 #define DEFAULT_SESS_QUEUE_DEPTH 512 26 #define MAX_HDR_SIZE PAGE_SIZE 27 28 static struct rtrs_rdma_dev_pd dev_pd; 29 struct class *rtrs_dev_class; 30 static struct rtrs_srv_ib_ctx ib_ctx; 31 32 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE; 33 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH; 34 35 static bool always_invalidate = true; 36 module_param(always_invalidate, bool, 0444); 37 MODULE_PARM_DESC(always_invalidate, 38 "Invalidate memory registration for contiguous memory regions before accessing."); 39 40 module_param_named(max_chunk_size, max_chunk_size, int, 0444); 41 MODULE_PARM_DESC(max_chunk_size, 42 "Max size for each IO request, when change the unit is in byte (default: " 43 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)"); 44 45 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444); 46 MODULE_PARM_DESC(sess_queue_depth, 47 "Number of buffers for pending I/O requests to allocate per session. Maximum: " 48 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 49 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 50 51 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 52 53 static struct workqueue_struct *rtrs_wq; 54 55 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 56 { 57 return container_of(c, struct rtrs_srv_con, c); 58 } 59 60 static inline struct rtrs_srv_path *to_srv_path(struct rtrs_path *s) 61 { 62 return container_of(s, struct rtrs_srv_path, s); 63 } 64 65 static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path, 66 enum rtrs_srv_state new_state) 67 { 68 enum rtrs_srv_state old_state; 69 bool changed = false; 70 71 spin_lock_irq(&srv_path->state_lock); 72 old_state = srv_path->state; 73 switch (new_state) { 74 case RTRS_SRV_CONNECTED: 75 if (old_state == RTRS_SRV_CONNECTING) 76 changed = true; 77 break; 78 case RTRS_SRV_CLOSING: 79 if (old_state == RTRS_SRV_CONNECTING || 80 old_state == RTRS_SRV_CONNECTED) 81 changed = true; 82 break; 83 case RTRS_SRV_CLOSED: 84 if (old_state == RTRS_SRV_CLOSING) 85 changed = true; 86 break; 87 default: 88 break; 89 } 90 if (changed) 91 srv_path->state = new_state; 92 spin_unlock_irq(&srv_path->state_lock); 93 94 return changed; 95 } 96 97 static void free_id(struct rtrs_srv_op *id) 98 { 99 if (!id) 100 return; 101 kfree(id); 102 } 103 104 static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path) 105 { 106 struct rtrs_srv_sess *srv = srv_path->srv; 107 int i; 108 109 if (srv_path->ops_ids) { 110 for (i = 0; i < srv->queue_depth; i++) 111 free_id(srv_path->ops_ids[i]); 112 kfree(srv_path->ops_ids); 113 srv_path->ops_ids = NULL; 114 } 115 } 116 117 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 118 119 static struct ib_cqe io_comp_cqe = { 120 .done = rtrs_srv_rdma_done 121 }; 122 123 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref) 124 { 125 struct rtrs_srv_path *srv_path = container_of(ref, 126 struct rtrs_srv_path, 127 ids_inflight_ref); 128 129 percpu_ref_exit(&srv_path->ids_inflight_ref); 130 complete(&srv_path->complete_done); 131 } 132 133 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path) 134 { 135 struct rtrs_srv_sess *srv = srv_path->srv; 136 struct rtrs_srv_op *id; 137 int i, ret; 138 139 srv_path->ops_ids = kcalloc(srv->queue_depth, 140 sizeof(*srv_path->ops_ids), 141 GFP_KERNEL); 142 if (!srv_path->ops_ids) 143 goto err; 144 145 for (i = 0; i < srv->queue_depth; ++i) { 146 id = kzalloc(sizeof(*id), GFP_KERNEL); 147 if (!id) 148 goto err; 149 150 srv_path->ops_ids[i] = id; 151 } 152 153 ret = percpu_ref_init(&srv_path->ids_inflight_ref, 154 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 155 if (ret) { 156 pr_err("Percpu reference init failed\n"); 157 goto err; 158 } 159 init_completion(&srv_path->complete_done); 160 161 return 0; 162 163 err: 164 rtrs_srv_free_ops_ids(srv_path); 165 return -ENOMEM; 166 } 167 168 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path) 169 { 170 percpu_ref_get(&srv_path->ids_inflight_ref); 171 } 172 173 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path) 174 { 175 percpu_ref_put(&srv_path->ids_inflight_ref); 176 } 177 178 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 179 { 180 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 181 struct rtrs_path *s = con->c.path; 182 struct rtrs_srv_path *srv_path = to_srv_path(s); 183 184 if (wc->status != IB_WC_SUCCESS) { 185 rtrs_err(s, "REG MR failed: %s\n", 186 ib_wc_status_msg(wc->status)); 187 close_path(srv_path); 188 return; 189 } 190 } 191 192 static struct ib_cqe local_reg_cqe = { 193 .done = rtrs_srv_reg_mr_done 194 }; 195 196 static int rdma_write_sg(struct rtrs_srv_op *id) 197 { 198 struct rtrs_path *s = id->con->c.path; 199 struct rtrs_srv_path *srv_path = to_srv_path(s); 200 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id]; 201 struct rtrs_srv_mr *srv_mr; 202 struct ib_send_wr inv_wr; 203 struct ib_rdma_wr imm_wr; 204 struct ib_rdma_wr *wr = NULL; 205 enum ib_send_flags flags; 206 size_t sg_cnt; 207 int err, offset; 208 bool need_inval; 209 u32 rkey = 0; 210 struct ib_reg_wr rwr; 211 struct ib_sge *plist; 212 struct ib_sge list; 213 214 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 215 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 216 if (sg_cnt != 1) 217 return -EINVAL; 218 219 offset = 0; 220 221 wr = &id->tx_wr; 222 plist = &id->tx_sg; 223 plist->addr = dma_addr + offset; 224 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 225 226 /* WR will fail with length error 227 * if this is 0 228 */ 229 if (plist->length == 0) { 230 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 231 return -EINVAL; 232 } 233 234 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 235 offset += plist->length; 236 237 wr->wr.sg_list = plist; 238 wr->wr.num_sge = 1; 239 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 240 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 241 if (rkey == 0) 242 rkey = wr->rkey; 243 else 244 /* Only one key is actually used */ 245 WARN_ON_ONCE(rkey != wr->rkey); 246 247 wr->wr.opcode = IB_WR_RDMA_WRITE; 248 wr->wr.wr_cqe = &io_comp_cqe; 249 wr->wr.ex.imm_data = 0; 250 wr->wr.send_flags = 0; 251 252 if (need_inval && always_invalidate) { 253 wr->wr.next = &rwr.wr; 254 rwr.wr.next = &inv_wr; 255 inv_wr.next = &imm_wr.wr; 256 } else if (always_invalidate) { 257 wr->wr.next = &rwr.wr; 258 rwr.wr.next = &imm_wr.wr; 259 } else if (need_inval) { 260 wr->wr.next = &inv_wr; 261 inv_wr.next = &imm_wr.wr; 262 } else { 263 wr->wr.next = &imm_wr.wr; 264 } 265 /* 266 * From time to time we have to post signaled sends, 267 * or send queue will fill up and only QP reset can help. 268 */ 269 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 270 0 : IB_SEND_SIGNALED; 271 272 if (need_inval) { 273 inv_wr.sg_list = NULL; 274 inv_wr.num_sge = 0; 275 inv_wr.opcode = IB_WR_SEND_WITH_INV; 276 inv_wr.wr_cqe = &io_comp_cqe; 277 inv_wr.send_flags = 0; 278 inv_wr.ex.invalidate_rkey = rkey; 279 } 280 281 imm_wr.wr.next = NULL; 282 if (always_invalidate) { 283 struct rtrs_msg_rkey_rsp *msg; 284 285 srv_mr = &srv_path->mrs[id->msg_id]; 286 rwr.wr.opcode = IB_WR_REG_MR; 287 rwr.wr.wr_cqe = &local_reg_cqe; 288 rwr.wr.num_sge = 0; 289 rwr.mr = srv_mr->mr; 290 rwr.wr.send_flags = 0; 291 rwr.key = srv_mr->mr->rkey; 292 rwr.access = (IB_ACCESS_LOCAL_WRITE | 293 IB_ACCESS_REMOTE_WRITE); 294 msg = srv_mr->iu->buf; 295 msg->buf_id = cpu_to_le16(id->msg_id); 296 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 297 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 298 299 list.addr = srv_mr->iu->dma_addr; 300 list.length = sizeof(*msg); 301 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 302 imm_wr.wr.sg_list = &list; 303 imm_wr.wr.num_sge = 1; 304 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 305 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 306 srv_mr->iu->dma_addr, 307 srv_mr->iu->size, DMA_TO_DEVICE); 308 } else { 309 imm_wr.wr.sg_list = NULL; 310 imm_wr.wr.num_sge = 0; 311 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 312 } 313 imm_wr.wr.send_flags = flags; 314 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 315 0, need_inval)); 316 317 imm_wr.wr.wr_cqe = &io_comp_cqe; 318 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr, 319 offset, DMA_BIDIRECTIONAL); 320 321 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 322 if (err) 323 rtrs_err(s, 324 "Posting RDMA-Write-Request to QP failed, err: %d\n", 325 err); 326 327 return err; 328 } 329 330 /** 331 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 332 * requests or on successful WRITE request. 333 * @con: the connection to send back result 334 * @id: the id associated with the IO 335 * @errno: the error number of the IO. 336 * 337 * Return 0 on success, errno otherwise. 338 */ 339 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 340 int errno) 341 { 342 struct rtrs_path *s = con->c.path; 343 struct rtrs_srv_path *srv_path = to_srv_path(s); 344 struct ib_send_wr inv_wr, *wr = NULL; 345 struct ib_rdma_wr imm_wr; 346 struct ib_reg_wr rwr; 347 struct rtrs_srv_mr *srv_mr; 348 bool need_inval = false; 349 enum ib_send_flags flags; 350 u32 imm; 351 int err; 352 353 if (id->dir == READ) { 354 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 355 size_t sg_cnt; 356 357 need_inval = le16_to_cpu(rd_msg->flags) & 358 RTRS_MSG_NEED_INVAL_F; 359 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 360 361 if (need_inval) { 362 if (sg_cnt) { 363 inv_wr.wr_cqe = &io_comp_cqe; 364 inv_wr.sg_list = NULL; 365 inv_wr.num_sge = 0; 366 inv_wr.opcode = IB_WR_SEND_WITH_INV; 367 inv_wr.send_flags = 0; 368 /* Only one key is actually used */ 369 inv_wr.ex.invalidate_rkey = 370 le32_to_cpu(rd_msg->desc[0].key); 371 } else { 372 WARN_ON_ONCE(1); 373 need_inval = false; 374 } 375 } 376 } 377 378 if (need_inval && always_invalidate) { 379 wr = &inv_wr; 380 inv_wr.next = &rwr.wr; 381 rwr.wr.next = &imm_wr.wr; 382 } else if (always_invalidate) { 383 wr = &rwr.wr; 384 rwr.wr.next = &imm_wr.wr; 385 } else if (need_inval) { 386 wr = &inv_wr; 387 inv_wr.next = &imm_wr.wr; 388 } else { 389 wr = &imm_wr.wr; 390 } 391 /* 392 * From time to time we have to post signalled sends, 393 * or send queue will fill up and only QP reset can help. 394 */ 395 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 396 0 : IB_SEND_SIGNALED; 397 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 398 imm_wr.wr.next = NULL; 399 if (always_invalidate) { 400 struct ib_sge list; 401 struct rtrs_msg_rkey_rsp *msg; 402 403 srv_mr = &srv_path->mrs[id->msg_id]; 404 rwr.wr.next = &imm_wr.wr; 405 rwr.wr.opcode = IB_WR_REG_MR; 406 rwr.wr.wr_cqe = &local_reg_cqe; 407 rwr.wr.num_sge = 0; 408 rwr.wr.send_flags = 0; 409 rwr.mr = srv_mr->mr; 410 rwr.key = srv_mr->mr->rkey; 411 rwr.access = (IB_ACCESS_LOCAL_WRITE | 412 IB_ACCESS_REMOTE_WRITE); 413 msg = srv_mr->iu->buf; 414 msg->buf_id = cpu_to_le16(id->msg_id); 415 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 416 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 417 418 list.addr = srv_mr->iu->dma_addr; 419 list.length = sizeof(*msg); 420 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 421 imm_wr.wr.sg_list = &list; 422 imm_wr.wr.num_sge = 1; 423 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 424 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 425 srv_mr->iu->dma_addr, 426 srv_mr->iu->size, DMA_TO_DEVICE); 427 } else { 428 imm_wr.wr.sg_list = NULL; 429 imm_wr.wr.num_sge = 0; 430 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 431 } 432 imm_wr.wr.send_flags = flags; 433 imm_wr.wr.wr_cqe = &io_comp_cqe; 434 435 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 436 437 err = ib_post_send(id->con->c.qp, wr, NULL); 438 if (err) 439 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 440 err); 441 442 return err; 443 } 444 445 void close_path(struct rtrs_srv_path *srv_path) 446 { 447 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING)) 448 queue_work(rtrs_wq, &srv_path->close_work); 449 WARN_ON(srv_path->state != RTRS_SRV_CLOSING); 450 } 451 452 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 453 { 454 switch (state) { 455 case RTRS_SRV_CONNECTING: 456 return "RTRS_SRV_CONNECTING"; 457 case RTRS_SRV_CONNECTED: 458 return "RTRS_SRV_CONNECTED"; 459 case RTRS_SRV_CLOSING: 460 return "RTRS_SRV_CLOSING"; 461 case RTRS_SRV_CLOSED: 462 return "RTRS_SRV_CLOSED"; 463 default: 464 return "UNKNOWN"; 465 } 466 } 467 468 /** 469 * rtrs_srv_resp_rdma() - Finish an RDMA request 470 * 471 * @id: Internal RTRS operation identifier 472 * @status: Response Code sent to the other side for this operation. 473 * 0 = success, <=0 error 474 * Context: any 475 * 476 * Finish a RDMA operation. A message is sent to the client and the 477 * corresponding memory areas will be released. 478 */ 479 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status) 480 { 481 struct rtrs_srv_path *srv_path; 482 struct rtrs_srv_con *con; 483 struct rtrs_path *s; 484 int err; 485 486 if (WARN_ON(!id)) 487 return true; 488 489 con = id->con; 490 s = con->c.path; 491 srv_path = to_srv_path(s); 492 493 id->status = status; 494 495 if (srv_path->state != RTRS_SRV_CONNECTED) { 496 rtrs_err_rl(s, 497 "Sending I/O response failed, server path %s is disconnected, path state %s\n", 498 kobject_name(&srv_path->kobj), 499 rtrs_srv_state_str(srv_path->state)); 500 goto out; 501 } 502 if (always_invalidate) { 503 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id]; 504 505 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey)); 506 } 507 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) { 508 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n", 509 kobject_name(&srv_path->kobj), 510 con->c.cid); 511 atomic_add(1, &con->c.sq_wr_avail); 512 spin_lock(&con->rsp_wr_wait_lock); 513 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list); 514 spin_unlock(&con->rsp_wr_wait_lock); 515 return false; 516 } 517 518 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt) 519 err = send_io_resp_imm(con, id, status); 520 else 521 err = rdma_write_sg(id); 522 523 if (err) { 524 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err, 525 kobject_name(&srv_path->kobj)); 526 close_path(srv_path); 527 } 528 out: 529 rtrs_srv_put_ops_ids(srv_path); 530 return true; 531 } 532 EXPORT_SYMBOL(rtrs_srv_resp_rdma); 533 534 /** 535 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv. 536 * @srv: Session pointer 537 * @priv: The private pointer that is associated with the session. 538 */ 539 void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv) 540 { 541 srv->priv = priv; 542 } 543 EXPORT_SYMBOL(rtrs_srv_set_sess_priv); 544 545 static void unmap_cont_bufs(struct rtrs_srv_path *srv_path) 546 { 547 int i; 548 549 for (i = 0; i < srv_path->mrs_num; i++) { 550 struct rtrs_srv_mr *srv_mr; 551 552 srv_mr = &srv_path->mrs[i]; 553 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1); 554 ib_dereg_mr(srv_mr->mr); 555 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl, 556 srv_mr->sgt.nents, DMA_BIDIRECTIONAL); 557 sg_free_table(&srv_mr->sgt); 558 } 559 kfree(srv_path->mrs); 560 } 561 562 static int map_cont_bufs(struct rtrs_srv_path *srv_path) 563 { 564 struct rtrs_srv_sess *srv = srv_path->srv; 565 struct rtrs_path *ss = &srv_path->s; 566 int i, mri, err, mrs_num; 567 unsigned int chunk_bits; 568 int chunks_per_mr = 1; 569 570 /* 571 * Here we map queue_depth chunks to MR. Firstly we have to 572 * figure out how many chunks can we map per MR. 573 */ 574 if (always_invalidate) { 575 /* 576 * in order to do invalidate for each chunks of memory, we needs 577 * more memory regions. 578 */ 579 mrs_num = srv->queue_depth; 580 } else { 581 chunks_per_mr = 582 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 583 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 584 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 585 } 586 587 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL); 588 if (!srv_path->mrs) 589 return -ENOMEM; 590 591 srv_path->mrs_num = mrs_num; 592 593 for (mri = 0; mri < mrs_num; mri++) { 594 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri]; 595 struct sg_table *sgt = &srv_mr->sgt; 596 struct scatterlist *s; 597 struct ib_mr *mr; 598 int nr, chunks; 599 600 chunks = chunks_per_mr * mri; 601 if (!always_invalidate) 602 chunks_per_mr = min_t(int, chunks_per_mr, 603 srv->queue_depth - chunks); 604 605 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 606 if (err) 607 goto err; 608 609 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 610 sg_set_page(s, srv->chunks[chunks + i], 611 max_chunk_size, 0); 612 613 nr = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl, 614 sgt->nents, DMA_BIDIRECTIONAL); 615 if (nr < sgt->nents) { 616 err = nr < 0 ? nr : -EINVAL; 617 goto free_sg; 618 } 619 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 620 sgt->nents); 621 if (IS_ERR(mr)) { 622 err = PTR_ERR(mr); 623 goto unmap_sg; 624 } 625 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 626 NULL, max_chunk_size); 627 if (nr < 0 || nr < sgt->nents) { 628 err = nr < 0 ? nr : -EINVAL; 629 goto dereg_mr; 630 } 631 632 if (always_invalidate) { 633 srv_mr->iu = rtrs_iu_alloc(1, 634 sizeof(struct rtrs_msg_rkey_rsp), 635 GFP_KERNEL, srv_path->s.dev->ib_dev, 636 DMA_TO_DEVICE, rtrs_srv_rdma_done); 637 if (!srv_mr->iu) { 638 err = -ENOMEM; 639 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 640 goto dereg_mr; 641 } 642 } 643 /* Eventually dma addr for each chunk can be cached */ 644 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 645 srv_path->dma_addr[chunks + i] = sg_dma_address(s); 646 647 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 648 srv_mr->mr = mr; 649 650 continue; 651 err: 652 while (mri--) { 653 srv_mr = &srv_path->mrs[mri]; 654 sgt = &srv_mr->sgt; 655 mr = srv_mr->mr; 656 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1); 657 dereg_mr: 658 ib_dereg_mr(mr); 659 unmap_sg: 660 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl, 661 sgt->nents, DMA_BIDIRECTIONAL); 662 free_sg: 663 sg_free_table(sgt); 664 } 665 kfree(srv_path->mrs); 666 667 return err; 668 } 669 670 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 671 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 672 673 return 0; 674 } 675 676 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 677 { 678 close_path(to_srv_path(c->path)); 679 } 680 681 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path) 682 { 683 rtrs_init_hb(&srv_path->s, &io_comp_cqe, 684 RTRS_HB_INTERVAL_MS, 685 RTRS_HB_MISSED_MAX, 686 rtrs_srv_hb_err_handler, 687 rtrs_wq); 688 } 689 690 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path) 691 { 692 rtrs_start_hb(&srv_path->s); 693 } 694 695 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path) 696 { 697 rtrs_stop_hb(&srv_path->s); 698 } 699 700 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 701 { 702 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 703 struct rtrs_path *s = con->c.path; 704 struct rtrs_srv_path *srv_path = to_srv_path(s); 705 struct rtrs_iu *iu; 706 707 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 708 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 709 710 if (wc->status != IB_WC_SUCCESS) { 711 rtrs_err(s, "Sess info response send failed: %s\n", 712 ib_wc_status_msg(wc->status)); 713 close_path(srv_path); 714 return; 715 } 716 WARN_ON(wc->opcode != IB_WC_SEND); 717 } 718 719 static void rtrs_srv_path_up(struct rtrs_srv_path *srv_path) 720 { 721 struct rtrs_srv_sess *srv = srv_path->srv; 722 struct rtrs_srv_ctx *ctx = srv->ctx; 723 int up; 724 725 mutex_lock(&srv->paths_ev_mutex); 726 up = ++srv->paths_up; 727 if (up == 1) 728 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 729 mutex_unlock(&srv->paths_ev_mutex); 730 731 /* Mark session as established */ 732 srv_path->established = true; 733 } 734 735 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path) 736 { 737 struct rtrs_srv_sess *srv = srv_path->srv; 738 struct rtrs_srv_ctx *ctx = srv->ctx; 739 740 if (!srv_path->established) 741 return; 742 743 srv_path->established = false; 744 mutex_lock(&srv->paths_ev_mutex); 745 WARN_ON(!srv->paths_up); 746 if (--srv->paths_up == 0) 747 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 748 mutex_unlock(&srv->paths_ev_mutex); 749 } 750 751 static bool exist_pathname(struct rtrs_srv_ctx *ctx, 752 const char *pathname, const uuid_t *path_uuid) 753 { 754 struct rtrs_srv_sess *srv; 755 struct rtrs_srv_path *srv_path; 756 bool found = false; 757 758 mutex_lock(&ctx->srv_mutex); 759 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 760 mutex_lock(&srv->paths_mutex); 761 762 /* when a client with same uuid and same sessname tried to add a path */ 763 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 764 mutex_unlock(&srv->paths_mutex); 765 continue; 766 } 767 768 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 769 if (strlen(srv_path->s.sessname) == strlen(pathname) && 770 !strcmp(srv_path->s.sessname, pathname)) { 771 found = true; 772 break; 773 } 774 } 775 mutex_unlock(&srv->paths_mutex); 776 if (found) 777 break; 778 } 779 mutex_unlock(&ctx->srv_mutex); 780 return found; 781 } 782 783 static int post_recv_path(struct rtrs_srv_path *srv_path); 784 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 785 786 static int process_info_req(struct rtrs_srv_con *con, 787 struct rtrs_msg_info_req *msg) 788 { 789 struct rtrs_path *s = con->c.path; 790 struct rtrs_srv_path *srv_path = to_srv_path(s); 791 struct ib_send_wr *reg_wr = NULL; 792 struct rtrs_msg_info_rsp *rsp; 793 struct rtrs_iu *tx_iu; 794 struct ib_reg_wr *rwr; 795 int mri, err; 796 size_t tx_sz; 797 798 err = post_recv_path(srv_path); 799 if (err) { 800 rtrs_err(s, "post_recv_path(), err: %d\n", err); 801 return err; 802 } 803 804 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) { 805 rtrs_err(s, "pathname cannot contain / and .\n"); 806 return -EINVAL; 807 } 808 809 if (exist_pathname(srv_path->srv->ctx, 810 msg->pathname, &srv_path->srv->paths_uuid)) { 811 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname); 812 return -EPERM; 813 } 814 strscpy(srv_path->s.sessname, msg->pathname, 815 sizeof(srv_path->s.sessname)); 816 817 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL); 818 if (!rwr) 819 return -ENOMEM; 820 821 tx_sz = sizeof(*rsp); 822 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num; 823 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev, 824 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 825 if (!tx_iu) { 826 err = -ENOMEM; 827 goto rwr_free; 828 } 829 830 rsp = tx_iu->buf; 831 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 832 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num); 833 834 for (mri = 0; mri < srv_path->mrs_num; mri++) { 835 struct ib_mr *mr = srv_path->mrs[mri].mr; 836 837 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 838 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 839 rsp->desc[mri].len = cpu_to_le32(mr->length); 840 841 /* 842 * Fill in reg MR request and chain them *backwards* 843 */ 844 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 845 rwr[mri].wr.opcode = IB_WR_REG_MR; 846 rwr[mri].wr.wr_cqe = &local_reg_cqe; 847 rwr[mri].wr.num_sge = 0; 848 rwr[mri].wr.send_flags = 0; 849 rwr[mri].mr = mr; 850 rwr[mri].key = mr->rkey; 851 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 852 IB_ACCESS_REMOTE_WRITE); 853 reg_wr = &rwr[mri].wr; 854 } 855 856 err = rtrs_srv_create_path_files(srv_path); 857 if (err) 858 goto iu_free; 859 kobject_get(&srv_path->kobj); 860 get_device(&srv_path->srv->dev); 861 rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED); 862 rtrs_srv_start_hb(srv_path); 863 864 /* 865 * We do not account number of established connections at the current 866 * moment, we rely on the client, which should send info request when 867 * all connections are successfully established. Thus, simply notify 868 * listener with a proper event if we are the first path. 869 */ 870 rtrs_srv_path_up(srv_path); 871 872 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 873 tx_iu->dma_addr, 874 tx_iu->size, DMA_TO_DEVICE); 875 876 /* Send info response */ 877 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 878 if (err) { 879 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 880 iu_free: 881 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1); 882 } 883 rwr_free: 884 kfree(rwr); 885 886 return err; 887 } 888 889 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 890 { 891 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 892 struct rtrs_path *s = con->c.path; 893 struct rtrs_srv_path *srv_path = to_srv_path(s); 894 struct rtrs_msg_info_req *msg; 895 struct rtrs_iu *iu; 896 int err; 897 898 WARN_ON(con->c.cid); 899 900 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 901 if (wc->status != IB_WC_SUCCESS) { 902 rtrs_err(s, "Sess info request receive failed: %s\n", 903 ib_wc_status_msg(wc->status)); 904 goto close; 905 } 906 WARN_ON(wc->opcode != IB_WC_RECV); 907 908 if (wc->byte_len < sizeof(*msg)) { 909 rtrs_err(s, "Sess info request is malformed: size %d\n", 910 wc->byte_len); 911 goto close; 912 } 913 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr, 914 iu->size, DMA_FROM_DEVICE); 915 msg = iu->buf; 916 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 917 rtrs_err(s, "Sess info request is malformed: type %d\n", 918 le16_to_cpu(msg->type)); 919 goto close; 920 } 921 err = process_info_req(con, msg); 922 if (err) 923 goto close; 924 925 out: 926 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 927 return; 928 close: 929 close_path(srv_path); 930 goto out; 931 } 932 933 static int post_recv_info_req(struct rtrs_srv_con *con) 934 { 935 struct rtrs_path *s = con->c.path; 936 struct rtrs_srv_path *srv_path = to_srv_path(s); 937 struct rtrs_iu *rx_iu; 938 int err; 939 940 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 941 GFP_KERNEL, srv_path->s.dev->ib_dev, 942 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 943 if (!rx_iu) 944 return -ENOMEM; 945 /* Prepare for getting info response */ 946 err = rtrs_iu_post_recv(&con->c, rx_iu); 947 if (err) { 948 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 949 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1); 950 return err; 951 } 952 953 return 0; 954 } 955 956 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 957 { 958 int i, err; 959 960 for (i = 0; i < q_size; i++) { 961 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 962 if (err) 963 return err; 964 } 965 966 return 0; 967 } 968 969 static int post_recv_path(struct rtrs_srv_path *srv_path) 970 { 971 struct rtrs_srv_sess *srv = srv_path->srv; 972 struct rtrs_path *s = &srv_path->s; 973 size_t q_size; 974 int err, cid; 975 976 for (cid = 0; cid < srv_path->s.con_num; cid++) { 977 if (cid == 0) 978 q_size = SERVICE_CON_QUEUE_DEPTH; 979 else 980 q_size = srv->queue_depth; 981 982 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size); 983 if (err) { 984 rtrs_err(s, "post_recv_io(), err: %d\n", err); 985 return err; 986 } 987 } 988 989 return 0; 990 } 991 992 static void process_read(struct rtrs_srv_con *con, 993 struct rtrs_msg_rdma_read *msg, 994 u32 buf_id, u32 off) 995 { 996 struct rtrs_path *s = con->c.path; 997 struct rtrs_srv_path *srv_path = to_srv_path(s); 998 struct rtrs_srv_sess *srv = srv_path->srv; 999 struct rtrs_srv_ctx *ctx = srv->ctx; 1000 struct rtrs_srv_op *id; 1001 1002 size_t usr_len, data_len; 1003 void *data; 1004 int ret; 1005 1006 if (srv_path->state != RTRS_SRV_CONNECTED) { 1007 rtrs_err_rl(s, 1008 "Processing read request failed, session is disconnected, sess state %s\n", 1009 rtrs_srv_state_str(srv_path->state)); 1010 return; 1011 } 1012 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1013 rtrs_err_rl(s, 1014 "Processing read request failed, invalid message\n"); 1015 return; 1016 } 1017 rtrs_srv_get_ops_ids(srv_path); 1018 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ); 1019 id = srv_path->ops_ids[buf_id]; 1020 id->con = con; 1021 id->dir = READ; 1022 id->msg_id = buf_id; 1023 id->rd_msg = msg; 1024 usr_len = le16_to_cpu(msg->usr_len); 1025 data_len = off - usr_len; 1026 data = page_address(srv->chunks[buf_id]); 1027 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len, 1028 data + data_len, usr_len); 1029 1030 if (ret) { 1031 rtrs_err_rl(s, 1032 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1033 buf_id, ret); 1034 goto send_err_msg; 1035 } 1036 1037 return; 1038 1039 send_err_msg: 1040 ret = send_io_resp_imm(con, id, ret); 1041 if (ret < 0) { 1042 rtrs_err_rl(s, 1043 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1044 buf_id, ret); 1045 close_path(srv_path); 1046 } 1047 rtrs_srv_put_ops_ids(srv_path); 1048 } 1049 1050 static void process_write(struct rtrs_srv_con *con, 1051 struct rtrs_msg_rdma_write *req, 1052 u32 buf_id, u32 off) 1053 { 1054 struct rtrs_path *s = con->c.path; 1055 struct rtrs_srv_path *srv_path = to_srv_path(s); 1056 struct rtrs_srv_sess *srv = srv_path->srv; 1057 struct rtrs_srv_ctx *ctx = srv->ctx; 1058 struct rtrs_srv_op *id; 1059 1060 size_t data_len, usr_len; 1061 void *data; 1062 int ret; 1063 1064 if (srv_path->state != RTRS_SRV_CONNECTED) { 1065 rtrs_err_rl(s, 1066 "Processing write request failed, session is disconnected, sess state %s\n", 1067 rtrs_srv_state_str(srv_path->state)); 1068 return; 1069 } 1070 rtrs_srv_get_ops_ids(srv_path); 1071 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE); 1072 id = srv_path->ops_ids[buf_id]; 1073 id->con = con; 1074 id->dir = WRITE; 1075 id->msg_id = buf_id; 1076 1077 usr_len = le16_to_cpu(req->usr_len); 1078 data_len = off - usr_len; 1079 data = page_address(srv->chunks[buf_id]); 1080 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len, 1081 data + data_len, usr_len); 1082 if (ret) { 1083 rtrs_err_rl(s, 1084 "Processing write request failed, user module callback reports err: %d\n", 1085 ret); 1086 goto send_err_msg; 1087 } 1088 1089 return; 1090 1091 send_err_msg: 1092 ret = send_io_resp_imm(con, id, ret); 1093 if (ret < 0) { 1094 rtrs_err_rl(s, 1095 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1096 buf_id, ret); 1097 close_path(srv_path); 1098 } 1099 rtrs_srv_put_ops_ids(srv_path); 1100 } 1101 1102 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1103 u32 id, u32 off) 1104 { 1105 struct rtrs_path *s = con->c.path; 1106 struct rtrs_srv_path *srv_path = to_srv_path(s); 1107 struct rtrs_msg_rdma_hdr *hdr; 1108 unsigned int type; 1109 1110 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, 1111 srv_path->dma_addr[id], 1112 max_chunk_size, DMA_BIDIRECTIONAL); 1113 hdr = msg; 1114 type = le16_to_cpu(hdr->type); 1115 1116 switch (type) { 1117 case RTRS_MSG_WRITE: 1118 process_write(con, msg, id, off); 1119 break; 1120 case RTRS_MSG_READ: 1121 process_read(con, msg, id, off); 1122 break; 1123 default: 1124 rtrs_err(s, 1125 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1126 type); 1127 goto err; 1128 } 1129 1130 return; 1131 1132 err: 1133 close_path(srv_path); 1134 } 1135 1136 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1137 { 1138 struct rtrs_srv_mr *mr = 1139 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1140 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1141 struct rtrs_path *s = con->c.path; 1142 struct rtrs_srv_path *srv_path = to_srv_path(s); 1143 struct rtrs_srv_sess *srv = srv_path->srv; 1144 u32 msg_id, off; 1145 void *data; 1146 1147 if (wc->status != IB_WC_SUCCESS) { 1148 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1149 ib_wc_status_msg(wc->status)); 1150 close_path(srv_path); 1151 } 1152 msg_id = mr->msg_id; 1153 off = mr->msg_off; 1154 data = page_address(srv->chunks[msg_id]) + off; 1155 process_io_req(con, data, msg_id, off); 1156 } 1157 1158 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1159 struct rtrs_srv_mr *mr) 1160 { 1161 struct ib_send_wr wr = { 1162 .opcode = IB_WR_LOCAL_INV, 1163 .wr_cqe = &mr->inv_cqe, 1164 .send_flags = IB_SEND_SIGNALED, 1165 .ex.invalidate_rkey = mr->mr->rkey, 1166 }; 1167 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1168 1169 return ib_post_send(con->c.qp, &wr, NULL); 1170 } 1171 1172 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1173 { 1174 spin_lock(&con->rsp_wr_wait_lock); 1175 while (!list_empty(&con->rsp_wr_wait_list)) { 1176 struct rtrs_srv_op *id; 1177 int ret; 1178 1179 id = list_entry(con->rsp_wr_wait_list.next, 1180 struct rtrs_srv_op, wait_list); 1181 list_del(&id->wait_list); 1182 1183 spin_unlock(&con->rsp_wr_wait_lock); 1184 ret = rtrs_srv_resp_rdma(id, id->status); 1185 spin_lock(&con->rsp_wr_wait_lock); 1186 1187 if (!ret) { 1188 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1189 break; 1190 } 1191 } 1192 spin_unlock(&con->rsp_wr_wait_lock); 1193 } 1194 1195 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1196 { 1197 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1198 struct rtrs_path *s = con->c.path; 1199 struct rtrs_srv_path *srv_path = to_srv_path(s); 1200 struct rtrs_srv_sess *srv = srv_path->srv; 1201 u32 imm_type, imm_payload; 1202 int err; 1203 1204 if (wc->status != IB_WC_SUCCESS) { 1205 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1206 rtrs_err(s, 1207 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1208 ib_wc_status_msg(wc->status), wc->wr_cqe, 1209 wc->opcode, wc->vendor_err, wc->byte_len); 1210 close_path(srv_path); 1211 } 1212 return; 1213 } 1214 1215 switch (wc->opcode) { 1216 case IB_WC_RECV_RDMA_WITH_IMM: 1217 /* 1218 * post_recv() RDMA write completions of IO reqs (read/write) 1219 * and hb 1220 */ 1221 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1222 return; 1223 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1224 if (err) { 1225 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1226 close_path(srv_path); 1227 break; 1228 } 1229 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1230 &imm_type, &imm_payload); 1231 if (imm_type == RTRS_IO_REQ_IMM) { 1232 u32 msg_id, off; 1233 void *data; 1234 1235 msg_id = imm_payload >> srv_path->mem_bits; 1236 off = imm_payload & ((1 << srv_path->mem_bits) - 1); 1237 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1238 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1239 msg_id, off); 1240 close_path(srv_path); 1241 return; 1242 } 1243 if (always_invalidate) { 1244 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id]; 1245 1246 mr->msg_off = off; 1247 mr->msg_id = msg_id; 1248 err = rtrs_srv_inv_rkey(con, mr); 1249 if (err) { 1250 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1251 err); 1252 close_path(srv_path); 1253 break; 1254 } 1255 } else { 1256 data = page_address(srv->chunks[msg_id]) + off; 1257 process_io_req(con, data, msg_id, off); 1258 } 1259 } else if (imm_type == RTRS_HB_MSG_IMM) { 1260 WARN_ON(con->c.cid); 1261 rtrs_send_hb_ack(&srv_path->s); 1262 } else if (imm_type == RTRS_HB_ACK_IMM) { 1263 WARN_ON(con->c.cid); 1264 srv_path->s.hb_missed_cnt = 0; 1265 } else { 1266 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1267 } 1268 break; 1269 case IB_WC_RDMA_WRITE: 1270 case IB_WC_SEND: 1271 /* 1272 * post_send() RDMA write completions of IO reqs (read/write) 1273 * and hb. 1274 */ 1275 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1276 1277 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1278 rtrs_rdma_process_wr_wait_list(con); 1279 1280 break; 1281 default: 1282 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1283 return; 1284 } 1285 } 1286 1287 /** 1288 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname. 1289 * @srv: Session 1290 * @pathname: Pathname buffer 1291 * @len: Length of sessname buffer 1292 */ 1293 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname, 1294 size_t len) 1295 { 1296 struct rtrs_srv_path *srv_path; 1297 int err = -ENOTCONN; 1298 1299 mutex_lock(&srv->paths_mutex); 1300 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1301 if (srv_path->state != RTRS_SRV_CONNECTED) 1302 continue; 1303 strscpy(pathname, srv_path->s.sessname, 1304 min_t(size_t, sizeof(srv_path->s.sessname), len)); 1305 err = 0; 1306 break; 1307 } 1308 mutex_unlock(&srv->paths_mutex); 1309 1310 return err; 1311 } 1312 EXPORT_SYMBOL(rtrs_srv_get_path_name); 1313 1314 /** 1315 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 1316 * @srv: Session 1317 */ 1318 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv) 1319 { 1320 return srv->queue_depth; 1321 } 1322 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1323 1324 static int find_next_bit_ring(struct rtrs_srv_path *srv_path) 1325 { 1326 struct ib_device *ib_dev = srv_path->s.dev->ib_dev; 1327 int v; 1328 1329 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask); 1330 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1331 v = cpumask_first(&cq_affinity_mask); 1332 return v; 1333 } 1334 1335 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path) 1336 { 1337 srv_path->cur_cq_vector = find_next_bit_ring(srv_path); 1338 1339 return srv_path->cur_cq_vector; 1340 } 1341 1342 static void rtrs_srv_dev_release(struct device *dev) 1343 { 1344 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess, 1345 dev); 1346 1347 kfree(srv); 1348 } 1349 1350 static void free_srv(struct rtrs_srv_sess *srv) 1351 { 1352 int i; 1353 1354 WARN_ON(refcount_read(&srv->refcount)); 1355 for (i = 0; i < srv->queue_depth; i++) 1356 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1357 kfree(srv->chunks); 1358 mutex_destroy(&srv->paths_mutex); 1359 mutex_destroy(&srv->paths_ev_mutex); 1360 /* last put to release the srv structure */ 1361 put_device(&srv->dev); 1362 } 1363 1364 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1365 const uuid_t *paths_uuid, 1366 bool first_conn) 1367 { 1368 struct rtrs_srv_sess *srv; 1369 int i; 1370 1371 mutex_lock(&ctx->srv_mutex); 1372 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1373 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1374 refcount_inc_not_zero(&srv->refcount)) { 1375 mutex_unlock(&ctx->srv_mutex); 1376 return srv; 1377 } 1378 } 1379 mutex_unlock(&ctx->srv_mutex); 1380 /* 1381 * If this request is not the first connection request from the 1382 * client for this session then fail and return error. 1383 */ 1384 if (!first_conn) { 1385 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1386 return ERR_PTR(-ENXIO); 1387 } 1388 1389 /* need to allocate a new srv */ 1390 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1391 if (!srv) 1392 return ERR_PTR(-ENOMEM); 1393 1394 INIT_LIST_HEAD(&srv->paths_list); 1395 mutex_init(&srv->paths_mutex); 1396 mutex_init(&srv->paths_ev_mutex); 1397 uuid_copy(&srv->paths_uuid, paths_uuid); 1398 srv->queue_depth = sess_queue_depth; 1399 srv->ctx = ctx; 1400 device_initialize(&srv->dev); 1401 srv->dev.release = rtrs_srv_dev_release; 1402 1403 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1404 GFP_KERNEL); 1405 if (!srv->chunks) 1406 goto err_free_srv; 1407 1408 for (i = 0; i < srv->queue_depth; i++) { 1409 srv->chunks[i] = alloc_pages(GFP_KERNEL, 1410 get_order(max_chunk_size)); 1411 if (!srv->chunks[i]) 1412 goto err_free_chunks; 1413 } 1414 refcount_set(&srv->refcount, 1); 1415 mutex_lock(&ctx->srv_mutex); 1416 list_add(&srv->ctx_list, &ctx->srv_list); 1417 mutex_unlock(&ctx->srv_mutex); 1418 1419 return srv; 1420 1421 err_free_chunks: 1422 while (i--) 1423 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1424 kfree(srv->chunks); 1425 1426 err_free_srv: 1427 kfree(srv); 1428 return ERR_PTR(-ENOMEM); 1429 } 1430 1431 static void put_srv(struct rtrs_srv_sess *srv) 1432 { 1433 if (refcount_dec_and_test(&srv->refcount)) { 1434 struct rtrs_srv_ctx *ctx = srv->ctx; 1435 1436 WARN_ON(srv->dev.kobj.state_in_sysfs); 1437 1438 mutex_lock(&ctx->srv_mutex); 1439 list_del(&srv->ctx_list); 1440 mutex_unlock(&ctx->srv_mutex); 1441 free_srv(srv); 1442 } 1443 } 1444 1445 static void __add_path_to_srv(struct rtrs_srv_sess *srv, 1446 struct rtrs_srv_path *srv_path) 1447 { 1448 list_add_tail(&srv_path->s.entry, &srv->paths_list); 1449 srv->paths_num++; 1450 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1451 } 1452 1453 static void del_path_from_srv(struct rtrs_srv_path *srv_path) 1454 { 1455 struct rtrs_srv_sess *srv = srv_path->srv; 1456 1457 if (WARN_ON(!srv)) 1458 return; 1459 1460 mutex_lock(&srv->paths_mutex); 1461 list_del(&srv_path->s.entry); 1462 WARN_ON(!srv->paths_num); 1463 srv->paths_num--; 1464 mutex_unlock(&srv->paths_mutex); 1465 } 1466 1467 /* return true if addresses are the same, error other wise */ 1468 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1469 { 1470 switch (a->sa_family) { 1471 case AF_IB: 1472 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1473 &((struct sockaddr_ib *)b)->sib_addr, 1474 sizeof(struct ib_addr)) && 1475 (b->sa_family == AF_IB); 1476 case AF_INET: 1477 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1478 &((struct sockaddr_in *)b)->sin_addr, 1479 sizeof(struct in_addr)) && 1480 (b->sa_family == AF_INET); 1481 case AF_INET6: 1482 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1483 &((struct sockaddr_in6 *)b)->sin6_addr, 1484 sizeof(struct in6_addr)) && 1485 (b->sa_family == AF_INET6); 1486 default: 1487 return -ENOENT; 1488 } 1489 } 1490 1491 static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv, 1492 struct rdma_addr *addr) 1493 { 1494 struct rtrs_srv_path *srv_path; 1495 1496 list_for_each_entry(srv_path, &srv->paths_list, s.entry) 1497 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr, 1498 (struct sockaddr *)&addr->dst_addr) && 1499 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr, 1500 (struct sockaddr *)&addr->src_addr)) 1501 return true; 1502 1503 return false; 1504 } 1505 1506 static void free_path(struct rtrs_srv_path *srv_path) 1507 { 1508 if (srv_path->kobj.state_in_sysfs) { 1509 kobject_del(&srv_path->kobj); 1510 kobject_put(&srv_path->kobj); 1511 } else { 1512 free_percpu(srv_path->stats->rdma_stats); 1513 kfree(srv_path->stats); 1514 kfree(srv_path); 1515 } 1516 } 1517 1518 static void rtrs_srv_close_work(struct work_struct *work) 1519 { 1520 struct rtrs_srv_path *srv_path; 1521 struct rtrs_srv_con *con; 1522 int i; 1523 1524 srv_path = container_of(work, typeof(*srv_path), close_work); 1525 1526 rtrs_srv_destroy_path_files(srv_path); 1527 rtrs_srv_stop_hb(srv_path); 1528 1529 for (i = 0; i < srv_path->s.con_num; i++) { 1530 if (!srv_path->s.con[i]) 1531 continue; 1532 con = to_srv_con(srv_path->s.con[i]); 1533 rdma_disconnect(con->c.cm_id); 1534 ib_drain_qp(con->c.qp); 1535 } 1536 1537 /* 1538 * Degrade ref count to the usual model with a single shared 1539 * atomic_t counter 1540 */ 1541 percpu_ref_kill(&srv_path->ids_inflight_ref); 1542 1543 /* Wait for all completion */ 1544 wait_for_completion(&srv_path->complete_done); 1545 1546 /* Notify upper layer if we are the last path */ 1547 rtrs_srv_path_down(srv_path); 1548 1549 unmap_cont_bufs(srv_path); 1550 rtrs_srv_free_ops_ids(srv_path); 1551 1552 for (i = 0; i < srv_path->s.con_num; i++) { 1553 if (!srv_path->s.con[i]) 1554 continue; 1555 con = to_srv_con(srv_path->s.con[i]); 1556 rtrs_cq_qp_destroy(&con->c); 1557 rdma_destroy_id(con->c.cm_id); 1558 kfree(con); 1559 } 1560 rtrs_ib_dev_put(srv_path->s.dev); 1561 1562 del_path_from_srv(srv_path); 1563 put_srv(srv_path->srv); 1564 srv_path->srv = NULL; 1565 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED); 1566 1567 kfree(srv_path->dma_addr); 1568 kfree(srv_path->s.con); 1569 free_path(srv_path); 1570 } 1571 1572 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path, 1573 struct rdma_cm_id *cm_id) 1574 { 1575 struct rtrs_srv_sess *srv = srv_path->srv; 1576 struct rtrs_msg_conn_rsp msg; 1577 struct rdma_conn_param param; 1578 int err; 1579 1580 param = (struct rdma_conn_param) { 1581 .rnr_retry_count = 7, 1582 .private_data = &msg, 1583 .private_data_len = sizeof(msg), 1584 }; 1585 1586 msg = (struct rtrs_msg_conn_rsp) { 1587 .magic = cpu_to_le16(RTRS_MAGIC), 1588 .version = cpu_to_le16(RTRS_PROTO_VER), 1589 .queue_depth = cpu_to_le16(srv->queue_depth), 1590 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1591 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1592 }; 1593 1594 if (always_invalidate) 1595 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1596 1597 err = rdma_accept(cm_id, ¶m); 1598 if (err) 1599 pr_err("rdma_accept(), err: %d\n", err); 1600 1601 return err; 1602 } 1603 1604 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1605 { 1606 struct rtrs_msg_conn_rsp msg; 1607 int err; 1608 1609 msg = (struct rtrs_msg_conn_rsp) { 1610 .magic = cpu_to_le16(RTRS_MAGIC), 1611 .version = cpu_to_le16(RTRS_PROTO_VER), 1612 .errno = cpu_to_le16(errno), 1613 }; 1614 1615 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1616 if (err) 1617 pr_err("rdma_reject(), err: %d\n", err); 1618 1619 /* Bounce errno back */ 1620 return errno; 1621 } 1622 1623 static struct rtrs_srv_path * 1624 __find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid) 1625 { 1626 struct rtrs_srv_path *srv_path; 1627 1628 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1629 if (uuid_equal(&srv_path->s.uuid, sess_uuid)) 1630 return srv_path; 1631 } 1632 1633 return NULL; 1634 } 1635 1636 static int create_con(struct rtrs_srv_path *srv_path, 1637 struct rdma_cm_id *cm_id, 1638 unsigned int cid) 1639 { 1640 struct rtrs_srv_sess *srv = srv_path->srv; 1641 struct rtrs_path *s = &srv_path->s; 1642 struct rtrs_srv_con *con; 1643 1644 u32 cq_num, max_send_wr, max_recv_wr, wr_limit; 1645 int err, cq_vector; 1646 1647 con = kzalloc(sizeof(*con), GFP_KERNEL); 1648 if (!con) { 1649 err = -ENOMEM; 1650 goto err; 1651 } 1652 1653 spin_lock_init(&con->rsp_wr_wait_lock); 1654 INIT_LIST_HEAD(&con->rsp_wr_wait_list); 1655 con->c.cm_id = cm_id; 1656 con->c.path = &srv_path->s; 1657 con->c.cid = cid; 1658 atomic_set(&con->c.wr_cnt, 1); 1659 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr; 1660 1661 if (con->c.cid == 0) { 1662 /* 1663 * All receive and all send (each requiring invalidate) 1664 * + 2 for drain and heartbeat 1665 */ 1666 max_send_wr = min_t(int, wr_limit, 1667 SERVICE_CON_QUEUE_DEPTH * 2 + 2); 1668 max_recv_wr = max_send_wr; 1669 s->signal_interval = min_not_zero(srv->queue_depth, 1670 (size_t)SERVICE_CON_QUEUE_DEPTH); 1671 } else { 1672 /* when always_invlaidate enalbed, we need linv+rinv+mr+imm */ 1673 if (always_invalidate) 1674 max_send_wr = 1675 min_t(int, wr_limit, 1676 srv->queue_depth * (1 + 4) + 1); 1677 else 1678 max_send_wr = 1679 min_t(int, wr_limit, 1680 srv->queue_depth * (1 + 2) + 1); 1681 1682 max_recv_wr = srv->queue_depth + 1; 1683 /* 1684 * If we have all receive requests posted and 1685 * all write requests posted and each read request 1686 * requires an invalidate request + drain 1687 * and qp gets into error state. 1688 */ 1689 } 1690 cq_num = max_send_wr + max_recv_wr; 1691 atomic_set(&con->c.sq_wr_avail, max_send_wr); 1692 cq_vector = rtrs_srv_get_next_cq_vector(srv_path); 1693 1694 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */ 1695 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num, 1696 max_send_wr, max_recv_wr, 1697 IB_POLL_WORKQUEUE); 1698 if (err) { 1699 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err); 1700 goto free_con; 1701 } 1702 if (con->c.cid == 0) { 1703 err = post_recv_info_req(con); 1704 if (err) 1705 goto free_cqqp; 1706 } 1707 WARN_ON(srv_path->s.con[cid]); 1708 srv_path->s.con[cid] = &con->c; 1709 1710 /* 1711 * Change context from server to current connection. The other 1712 * way is to use cm_id->qp->qp_context, which does not work on OFED. 1713 */ 1714 cm_id->context = &con->c; 1715 1716 return 0; 1717 1718 free_cqqp: 1719 rtrs_cq_qp_destroy(&con->c); 1720 free_con: 1721 kfree(con); 1722 1723 err: 1724 return err; 1725 } 1726 1727 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv, 1728 struct rdma_cm_id *cm_id, 1729 unsigned int con_num, 1730 unsigned int recon_cnt, 1731 const uuid_t *uuid) 1732 { 1733 struct rtrs_srv_path *srv_path; 1734 int err = -ENOMEM; 1735 char str[NAME_MAX]; 1736 struct rtrs_addr path; 1737 1738 if (srv->paths_num >= MAX_PATHS_NUM) { 1739 err = -ECONNRESET; 1740 goto err; 1741 } 1742 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1743 err = -EEXIST; 1744 pr_err("Path with same addr exists\n"); 1745 goto err; 1746 } 1747 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL); 1748 if (!srv_path) 1749 goto err; 1750 1751 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL); 1752 if (!srv_path->stats) 1753 goto err_free_sess; 1754 1755 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats); 1756 if (!srv_path->stats->rdma_stats) 1757 goto err_free_stats; 1758 1759 srv_path->stats->srv_path = srv_path; 1760 1761 srv_path->dma_addr = kcalloc(srv->queue_depth, 1762 sizeof(*srv_path->dma_addr), 1763 GFP_KERNEL); 1764 if (!srv_path->dma_addr) 1765 goto err_free_percpu; 1766 1767 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con), 1768 GFP_KERNEL); 1769 if (!srv_path->s.con) 1770 goto err_free_dma_addr; 1771 1772 srv_path->state = RTRS_SRV_CONNECTING; 1773 srv_path->srv = srv; 1774 srv_path->cur_cq_vector = -1; 1775 srv_path->s.dst_addr = cm_id->route.addr.dst_addr; 1776 srv_path->s.src_addr = cm_id->route.addr.src_addr; 1777 1778 /* temporary until receiving session-name from client */ 1779 path.src = &srv_path->s.src_addr; 1780 path.dst = &srv_path->s.dst_addr; 1781 rtrs_addr_to_str(&path, str, sizeof(str)); 1782 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname)); 1783 1784 srv_path->s.con_num = con_num; 1785 srv_path->s.irq_con_num = con_num; 1786 srv_path->s.recon_cnt = recon_cnt; 1787 uuid_copy(&srv_path->s.uuid, uuid); 1788 spin_lock_init(&srv_path->state_lock); 1789 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work); 1790 rtrs_srv_init_hb(srv_path); 1791 1792 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1793 if (!srv_path->s.dev) { 1794 err = -ENOMEM; 1795 goto err_free_con; 1796 } 1797 err = map_cont_bufs(srv_path); 1798 if (err) 1799 goto err_put_dev; 1800 1801 err = rtrs_srv_alloc_ops_ids(srv_path); 1802 if (err) 1803 goto err_unmap_bufs; 1804 1805 __add_path_to_srv(srv, srv_path); 1806 1807 return srv_path; 1808 1809 err_unmap_bufs: 1810 unmap_cont_bufs(srv_path); 1811 err_put_dev: 1812 rtrs_ib_dev_put(srv_path->s.dev); 1813 err_free_con: 1814 kfree(srv_path->s.con); 1815 err_free_dma_addr: 1816 kfree(srv_path->dma_addr); 1817 err_free_percpu: 1818 free_percpu(srv_path->stats->rdma_stats); 1819 err_free_stats: 1820 kfree(srv_path->stats); 1821 err_free_sess: 1822 kfree(srv_path); 1823 err: 1824 return ERR_PTR(err); 1825 } 1826 1827 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1828 const struct rtrs_msg_conn_req *msg, 1829 size_t len) 1830 { 1831 struct rtrs_srv_ctx *ctx = cm_id->context; 1832 struct rtrs_srv_path *srv_path; 1833 struct rtrs_srv_sess *srv; 1834 1835 u16 version, con_num, cid; 1836 u16 recon_cnt; 1837 int err = -ECONNRESET; 1838 1839 if (len < sizeof(*msg)) { 1840 pr_err("Invalid RTRS connection request\n"); 1841 goto reject_w_err; 1842 } 1843 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1844 pr_err("Invalid RTRS magic\n"); 1845 goto reject_w_err; 1846 } 1847 version = le16_to_cpu(msg->version); 1848 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1849 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1850 version >> 8, RTRS_PROTO_VER_MAJOR); 1851 goto reject_w_err; 1852 } 1853 con_num = le16_to_cpu(msg->cid_num); 1854 if (con_num > 4096) { 1855 /* Sanity check */ 1856 pr_err("Too many connections requested: %d\n", con_num); 1857 goto reject_w_err; 1858 } 1859 cid = le16_to_cpu(msg->cid); 1860 if (cid >= con_num) { 1861 /* Sanity check */ 1862 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1863 goto reject_w_err; 1864 } 1865 recon_cnt = le16_to_cpu(msg->recon_cnt); 1866 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1867 if (IS_ERR(srv)) { 1868 err = PTR_ERR(srv); 1869 pr_err("get_or_create_srv(), error %d\n", err); 1870 goto reject_w_err; 1871 } 1872 mutex_lock(&srv->paths_mutex); 1873 srv_path = __find_path(srv, &msg->sess_uuid); 1874 if (srv_path) { 1875 struct rtrs_path *s = &srv_path->s; 1876 1877 /* Session already holds a reference */ 1878 put_srv(srv); 1879 1880 if (srv_path->state != RTRS_SRV_CONNECTING) { 1881 rtrs_err(s, "Session in wrong state: %s\n", 1882 rtrs_srv_state_str(srv_path->state)); 1883 mutex_unlock(&srv->paths_mutex); 1884 goto reject_w_err; 1885 } 1886 /* 1887 * Sanity checks 1888 */ 1889 if (con_num != s->con_num || cid >= s->con_num) { 1890 rtrs_err(s, "Incorrect request: %d, %d\n", 1891 cid, con_num); 1892 mutex_unlock(&srv->paths_mutex); 1893 goto reject_w_err; 1894 } 1895 if (s->con[cid]) { 1896 rtrs_err(s, "Connection already exists: %d\n", 1897 cid); 1898 mutex_unlock(&srv->paths_mutex); 1899 goto reject_w_err; 1900 } 1901 } else { 1902 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt, 1903 &msg->sess_uuid); 1904 if (IS_ERR(srv_path)) { 1905 mutex_unlock(&srv->paths_mutex); 1906 put_srv(srv); 1907 err = PTR_ERR(srv_path); 1908 pr_err("RTRS server session allocation failed: %d\n", err); 1909 goto reject_w_err; 1910 } 1911 } 1912 err = create_con(srv_path, cm_id, cid); 1913 if (err) { 1914 rtrs_err((&srv_path->s), "create_con(), error %d\n", err); 1915 rtrs_rdma_do_reject(cm_id, err); 1916 /* 1917 * Since session has other connections we follow normal way 1918 * through workqueue, but still return an error to tell cma.c 1919 * to call rdma_destroy_id() for current connection. 1920 */ 1921 goto close_and_return_err; 1922 } 1923 err = rtrs_rdma_do_accept(srv_path, cm_id); 1924 if (err) { 1925 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err); 1926 rtrs_rdma_do_reject(cm_id, err); 1927 /* 1928 * Since current connection was successfully added to the 1929 * session we follow normal way through workqueue to close the 1930 * session, thus return 0 to tell cma.c we call 1931 * rdma_destroy_id() ourselves. 1932 */ 1933 err = 0; 1934 goto close_and_return_err; 1935 } 1936 mutex_unlock(&srv->paths_mutex); 1937 1938 return 0; 1939 1940 reject_w_err: 1941 return rtrs_rdma_do_reject(cm_id, err); 1942 1943 close_and_return_err: 1944 mutex_unlock(&srv->paths_mutex); 1945 close_path(srv_path); 1946 1947 return err; 1948 } 1949 1950 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1951 struct rdma_cm_event *ev) 1952 { 1953 struct rtrs_srv_path *srv_path = NULL; 1954 struct rtrs_path *s = NULL; 1955 1956 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1957 struct rtrs_con *c = cm_id->context; 1958 1959 s = c->path; 1960 srv_path = to_srv_path(s); 1961 } 1962 1963 switch (ev->event) { 1964 case RDMA_CM_EVENT_CONNECT_REQUEST: 1965 /* 1966 * In case of error cma.c will destroy cm_id, 1967 * see cma_process_remove() 1968 */ 1969 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1970 ev->param.conn.private_data_len); 1971 case RDMA_CM_EVENT_ESTABLISHED: 1972 /* Nothing here */ 1973 break; 1974 case RDMA_CM_EVENT_REJECTED: 1975 case RDMA_CM_EVENT_CONNECT_ERROR: 1976 case RDMA_CM_EVENT_UNREACHABLE: 1977 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1978 rdma_event_msg(ev->event), ev->status); 1979 fallthrough; 1980 case RDMA_CM_EVENT_DISCONNECTED: 1981 case RDMA_CM_EVENT_ADDR_CHANGE: 1982 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1983 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1984 close_path(srv_path); 1985 break; 1986 default: 1987 pr_err("Ignoring unexpected CM event %s, err %d\n", 1988 rdma_event_msg(ev->event), ev->status); 1989 break; 1990 } 1991 1992 return 0; 1993 } 1994 1995 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1996 struct sockaddr *addr, 1997 enum rdma_ucm_port_space ps) 1998 { 1999 struct rdma_cm_id *cm_id; 2000 int ret; 2001 2002 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 2003 ctx, ps, IB_QPT_RC); 2004 if (IS_ERR(cm_id)) { 2005 ret = PTR_ERR(cm_id); 2006 pr_err("Creating id for RDMA connection failed, err: %d\n", 2007 ret); 2008 goto err_out; 2009 } 2010 ret = rdma_bind_addr(cm_id, addr); 2011 if (ret) { 2012 pr_err("Binding RDMA address failed, err: %d\n", ret); 2013 goto err_cm; 2014 } 2015 ret = rdma_listen(cm_id, 64); 2016 if (ret) { 2017 pr_err("Listening on RDMA connection failed, err: %d\n", 2018 ret); 2019 goto err_cm; 2020 } 2021 2022 return cm_id; 2023 2024 err_cm: 2025 rdma_destroy_id(cm_id); 2026 err_out: 2027 2028 return ERR_PTR(ret); 2029 } 2030 2031 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2032 { 2033 struct sockaddr_in6 sin = { 2034 .sin6_family = AF_INET6, 2035 .sin6_addr = IN6ADDR_ANY_INIT, 2036 .sin6_port = htons(port), 2037 }; 2038 struct sockaddr_ib sib = { 2039 .sib_family = AF_IB, 2040 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2041 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2042 .sib_pkey = cpu_to_be16(0xffff), 2043 }; 2044 struct rdma_cm_id *cm_ip, *cm_ib; 2045 int ret; 2046 2047 /* 2048 * We accept both IPoIB and IB connections, so we need to keep 2049 * two cm id's, one for each socket type and port space. 2050 * If the cm initialization of one of the id's fails, we abort 2051 * everything. 2052 */ 2053 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 2054 if (IS_ERR(cm_ip)) 2055 return PTR_ERR(cm_ip); 2056 2057 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 2058 if (IS_ERR(cm_ib)) { 2059 ret = PTR_ERR(cm_ib); 2060 goto free_cm_ip; 2061 } 2062 2063 ctx->cm_id_ip = cm_ip; 2064 ctx->cm_id_ib = cm_ib; 2065 2066 return 0; 2067 2068 free_cm_ip: 2069 rdma_destroy_id(cm_ip); 2070 2071 return ret; 2072 } 2073 2074 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2075 { 2076 struct rtrs_srv_ctx *ctx; 2077 2078 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2079 if (!ctx) 2080 return NULL; 2081 2082 ctx->ops = *ops; 2083 mutex_init(&ctx->srv_mutex); 2084 INIT_LIST_HEAD(&ctx->srv_list); 2085 2086 return ctx; 2087 } 2088 2089 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2090 { 2091 WARN_ON(!list_empty(&ctx->srv_list)); 2092 mutex_destroy(&ctx->srv_mutex); 2093 kfree(ctx); 2094 } 2095 2096 static int rtrs_srv_add_one(struct ib_device *device) 2097 { 2098 struct rtrs_srv_ctx *ctx; 2099 int ret = 0; 2100 2101 mutex_lock(&ib_ctx.ib_dev_mutex); 2102 if (ib_ctx.ib_dev_count) 2103 goto out; 2104 2105 /* 2106 * Since our CM IDs are NOT bound to any ib device we will create them 2107 * only once 2108 */ 2109 ctx = ib_ctx.srv_ctx; 2110 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2111 if (ret) { 2112 /* 2113 * We errored out here. 2114 * According to the ib code, if we encounter an error here then the 2115 * error code is ignored, and no more calls to our ops are made. 2116 */ 2117 pr_err("Failed to initialize RDMA connection"); 2118 goto err_out; 2119 } 2120 2121 out: 2122 /* 2123 * Keep a track on the number of ib devices added 2124 */ 2125 ib_ctx.ib_dev_count++; 2126 2127 err_out: 2128 mutex_unlock(&ib_ctx.ib_dev_mutex); 2129 return ret; 2130 } 2131 2132 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2133 { 2134 struct rtrs_srv_ctx *ctx; 2135 2136 mutex_lock(&ib_ctx.ib_dev_mutex); 2137 ib_ctx.ib_dev_count--; 2138 2139 if (ib_ctx.ib_dev_count) 2140 goto out; 2141 2142 /* 2143 * Since our CM IDs are NOT bound to any ib device we will remove them 2144 * only once, when the last device is removed 2145 */ 2146 ctx = ib_ctx.srv_ctx; 2147 rdma_destroy_id(ctx->cm_id_ip); 2148 rdma_destroy_id(ctx->cm_id_ib); 2149 2150 out: 2151 mutex_unlock(&ib_ctx.ib_dev_mutex); 2152 } 2153 2154 static struct ib_client rtrs_srv_client = { 2155 .name = "rtrs_server", 2156 .add = rtrs_srv_add_one, 2157 .remove = rtrs_srv_remove_one 2158 }; 2159 2160 /** 2161 * rtrs_srv_open() - open RTRS server context 2162 * @ops: callback functions 2163 * @port: port to listen on 2164 * 2165 * Creates server context with specified callbacks. 2166 * 2167 * Return a valid pointer on success otherwise PTR_ERR. 2168 */ 2169 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2170 { 2171 struct rtrs_srv_ctx *ctx; 2172 int err; 2173 2174 ctx = alloc_srv_ctx(ops); 2175 if (!ctx) 2176 return ERR_PTR(-ENOMEM); 2177 2178 mutex_init(&ib_ctx.ib_dev_mutex); 2179 ib_ctx.srv_ctx = ctx; 2180 ib_ctx.port = port; 2181 2182 err = ib_register_client(&rtrs_srv_client); 2183 if (err) { 2184 free_srv_ctx(ctx); 2185 return ERR_PTR(err); 2186 } 2187 2188 return ctx; 2189 } 2190 EXPORT_SYMBOL(rtrs_srv_open); 2191 2192 static void close_paths(struct rtrs_srv_sess *srv) 2193 { 2194 struct rtrs_srv_path *srv_path; 2195 2196 mutex_lock(&srv->paths_mutex); 2197 list_for_each_entry(srv_path, &srv->paths_list, s.entry) 2198 close_path(srv_path); 2199 mutex_unlock(&srv->paths_mutex); 2200 } 2201 2202 static void close_ctx(struct rtrs_srv_ctx *ctx) 2203 { 2204 struct rtrs_srv_sess *srv; 2205 2206 mutex_lock(&ctx->srv_mutex); 2207 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2208 close_paths(srv); 2209 mutex_unlock(&ctx->srv_mutex); 2210 flush_workqueue(rtrs_wq); 2211 } 2212 2213 /** 2214 * rtrs_srv_close() - close RTRS server context 2215 * @ctx: pointer to server context 2216 * 2217 * Closes RTRS server context with all client sessions. 2218 */ 2219 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2220 { 2221 ib_unregister_client(&rtrs_srv_client); 2222 mutex_destroy(&ib_ctx.ib_dev_mutex); 2223 close_ctx(ctx); 2224 free_srv_ctx(ctx); 2225 } 2226 EXPORT_SYMBOL(rtrs_srv_close); 2227 2228 static int check_module_params(void) 2229 { 2230 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2231 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2232 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2233 return -EINVAL; 2234 } 2235 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) { 2236 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2237 max_chunk_size, MIN_CHUNK_SIZE); 2238 return -EINVAL; 2239 } 2240 2241 /* 2242 * Check if IB immediate data size is enough to hold the mem_id and the 2243 * offset inside the memory chunk 2244 */ 2245 if ((ilog2(sess_queue_depth - 1) + 1) + 2246 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2247 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2248 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2249 return -EINVAL; 2250 } 2251 2252 return 0; 2253 } 2254 2255 static int __init rtrs_server_init(void) 2256 { 2257 int err; 2258 2259 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2260 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2261 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2262 sess_queue_depth, always_invalidate); 2263 2264 rtrs_rdma_dev_pd_init(0, &dev_pd); 2265 2266 err = check_module_params(); 2267 if (err) { 2268 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2269 err); 2270 return err; 2271 } 2272 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server"); 2273 if (IS_ERR(rtrs_dev_class)) { 2274 err = PTR_ERR(rtrs_dev_class); 2275 goto out_err; 2276 } 2277 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2278 if (!rtrs_wq) { 2279 err = -ENOMEM; 2280 goto out_dev_class; 2281 } 2282 2283 return 0; 2284 2285 out_dev_class: 2286 class_destroy(rtrs_dev_class); 2287 out_err: 2288 return err; 2289 } 2290 2291 static void __exit rtrs_server_exit(void) 2292 { 2293 destroy_workqueue(rtrs_wq); 2294 class_destroy(rtrs_dev_class); 2295 rtrs_rdma_dev_pd_deinit(&dev_pd); 2296 } 2297 2298 module_init(rtrs_server_init); 2299 module_exit(rtrs_server_exit); 2300