// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/rculist.h>

#include "rtrs-clt.h"
#include "rtrs-log.h"

#define RTRS_CONNECT_TIMEOUT_MS 30000
/*
 * Wait a bit before trying to reconnect after a failure
 * in order to give the server time to finish its cleanup,
 * which otherwise leads to "false positive" failed reconnect attempts.
 */
#define RTRS_RECONNECT_BACKOFF 1000

MODULE_DESCRIPTION("RDMA Transport Client");
MODULE_LICENSE("GPL");

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};

static struct workqueue_struct *rtrs_wq;
static struct class *rtrs_clt_dev_class;

static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess;
	bool connected = false;

	rcu_read_lock();
	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
		connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
	rcu_read_unlock();

	return connected;
}

static struct rtrs_permit *
__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
{
	size_t max_depth = clt->queue_depth;
	struct rtrs_permit *permit;
	int bit;

	/*
	 * Adapted from null_blk get_tag(). Callers from different cpus may
	 * grab the same bit, since find_first_zero_bit is not atomic.
	 * But then the test_and_set_bit_lock will fail for all the
	 * callers but one, so that they will loop again.
	 * This way an explicit spinlock is not required.
	 */
	do {
		bit = find_first_zero_bit(clt->permits_map, max_depth);
		if (unlikely(bit >= max_depth))
			return NULL;
	} while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));

	permit = get_permit(clt, bit);
	WARN_ON(permit->mem_id != bit);
	permit->cpu_id = raw_smp_processor_id();
	permit->con_type = con_type;

	return permit;
}

static inline void __rtrs_put_permit(struct rtrs_clt *clt,
				     struct rtrs_permit *permit)
{
	clear_bit_unlock(permit->mem_id, clt->permits_map);
}

/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt:	Current session
 * @con_type:	Type of connection to use with the permit
 * @can_wait:	Wait type
 *
 * Description:
 *    Allocates permit for the following RDMA operation. Permit is used
 *    to preallocate all resources and to propagate memory pressure
 *    up earlier.
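 *
 * Return:
 *    Allocated permit on success, NULL if there are no free permits and
 *    @can_wait was not set.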
 *
 * Context:
 *    Can sleep if @can_wait == RTRS_TAG_WAIT
 */
struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
					enum rtrs_clt_con_type con_type,
					int can_wait)
{
	struct rtrs_permit *permit;
	DEFINE_WAIT(wait);

	permit = __rtrs_get_permit(clt, con_type);
	if (likely(permit) || !can_wait)
		return permit;

	do {
		prepare_to_wait(&clt->permits_wait, &wait,
				TASK_UNINTERRUPTIBLE);
		permit = __rtrs_get_permit(clt, con_type);
		if (likely(permit))
			break;

		io_schedule();
	} while (1);

	finish_wait(&clt->permits_wait, &wait);

	return permit;
}
EXPORT_SYMBOL(rtrs_clt_get_permit);

/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt:	Current session
 * @permit:	Permit to be freed
 *
 * Context:
 *    Does not matter
 */
void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
{
	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
		return;

	__rtrs_put_permit(clt, permit);

	/*
	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
	 * it must have added itself to &clt->permits_wait before
	 * __rtrs_put_permit() finished.
	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
	 */
	if (waitqueue_active(&clt->permits_wait))
		wake_up(&clt->permits_wait);
}
EXPORT_SYMBOL(rtrs_clt_put_permit);

void *rtrs_permit_to_pdu(struct rtrs_permit *permit)
{
	return permit + 1;
}
EXPORT_SYMBOL(rtrs_permit_to_pdu);

/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @sess: client session pointer
 * @permit: permit for the allocation of the RDMA buffer
 * Note:
 *     IO connections start from 1.
 *     Connection 0 is reserved for user messages.
 */
static
struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
					    struct rtrs_permit *permit)
{
	int id = 0;

	if (likely(permit->con_type == RTRS_IO_CON))
		id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;

	return to_clt_con(sess->s.con[id]);
}

/**
 * __rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * returns true if successful, false if the requested state can not be set.
 *
 * Locks:
 * state_wq lock must be held.
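 *
 * Allowed transitions, mirroring the switch below (new state <- old states):
 *    CONNECTING     <- RECONNECTING
 *    RECONNECTING   <- CONNECTED, CONNECTING_ERR, CLOSED
 *    CONNECTED      <- CONNECTING
 *    CONNECTING_ERR <- CONNECTING
 *    CLOSING        <- CONNECTING, CONNECTING_ERR, RECONNECTING, CONNECTED
 *    CLOSED         <- CLOSING
 *    DEAD           <- CLOSED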
 */
static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				    enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_wq.lock);

	old_state = sess->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		switch (old_state) {
		case RTRS_CLT_RECONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_RECONNECTING:
		switch (old_state) {
		case RTRS_CLT_CONNECTED:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTED:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTING_ERR:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSING:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_RECONNECTING:
		case RTRS_CLT_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSED:
		switch (old_state) {
		case RTRS_CLT_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_DEAD:
		switch (old_state) {
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed) {
		sess->state = new_state;
		wake_up_locked(&sess->state_wq);
	}

	return changed;
}

static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state old_state,
					  enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&sess->state_wq.lock);
	if (sess->state == old_state)
		changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	if (rtrs_clt_change_state_from_to(sess,
					  RTRS_CLT_CONNECTED,
					  RTRS_CLT_RECONNECTING)) {
		struct rtrs_clt *clt = sess->clt;
		unsigned int delay_ms;

		/*
		 * Normal scenario, reconnect if we were successfully connected
		 */
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms));
	} else {
		/*
		 * Error can happen just on establishing new connection,
		 * so notify waiter with error state, waiter is responsible
		 * for cleaning the rest and reconnect if needed.
		 */
		rtrs_clt_change_state_from_to(sess,
					      RTRS_CLT_CONNECTING,
					      RTRS_CLT_CONNECTING_ERR);
	}
}

static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);

static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (likely(req->need_inv_comp))
		complete(&req->inv_comp);
	else
		/* Complete request from INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}

static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &req->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_sess *sess;
	int err;

	if (WARN_ON(!req->in_use))
		return;
	if (WARN_ON(!req->con))
		return;
	sess = to_clt_sess(con->c.sess);

	if (req->sg_cnt) {
		if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
			/*
			 * We are here to invalidate read requests
			 * ourselves. In normal scenario server should
			 * send INV for all read requests, but
			 * we are here, thus two things could happen:
			 *
			 *	1. this is failover, when errno != 0
			 *	   and can_wait == 1,
			 *
			 *	2. something totally bad happened and
			 *	   server forgot to send INV, so we
			 *	   should do that ourselves.
			 */

			if (likely(can_wait)) {
				req->need_inv_comp = true;
			} else {
				/* This should be IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for INV callback */
				req->inv_errno = errno;
			}

			err = rtrs_inv_rkey(req);
			if (unlikely(err)) {
				rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
					 req->mr->rkey, err);
			} else if (likely(can_wait)) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so request will be
				 * completed from INV callback.
				 */
				WARN_ON_ONCE(1);

				return;
			}
		}
		ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&sess->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (notify)
		req->conf(req->priv, errno);
}

static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
			       struct rtrs_clt_io_req *req,
			       struct rtrs_rbuf *rbuf, u32 off,
			       u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (unlikely(!req->sg_size)) {
		rtrs_wrn(con->c.sess,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey = sess->s.dev->ib_pd->local_dma_lkey;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					   rbuf->rkey, rbuf->addr + off,
					   imm, flags, wr);
}

static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= sess->queue_depth))
		return;

	req = &sess->reqs[msg_id];
	/* Drop need_inv if server responded with send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}

static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
	iu = container_of(wc->wr_cqe, struct rtrs_iu,
			  cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (unlikely(err)) {
		rtrs_err(con->c.sess, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}

static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_rkey_rsp *msg;
	u32 imm_type, imm_payload;
	bool w_inval = false;
	struct rtrs_iu *iu;
	u32 buf_id;
	int err;

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
		rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	buf_id = le16_to_cpu(msg->buf_id);
	if (WARN_ON(buf_id >= sess->queue_depth))
		goto out;

	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
	if (likely(imm_type == RTRS_IO_RSP_IMM ||
		   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
		u32 msg_id;

		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

		if (WARN_ON(buf_id != msg_id))
			goto out;
		sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
		process_io_rsp(sess, msg_id, err, w_inval);
	}
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
				      iu->size, DMA_FROM_DEVICE);
	return rtrs_clt_recv_done(con, wc);
out:
	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_clt_rdma_done
};

/*
 * Post x2 empty WRs: first is for this RDMA with IMM,
 * second is for RECV with INV, which happened earlier.
 */
static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
{
	struct ib_recv_wr wr_arr[2], *wr;
	int i;

	memset(wr_arr, 0, sizeof(wr_arr));
	for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
		wr = &wr_arr[i];
		wr->wr_cqe = cqe;
		if (i)
			/* Chain backwards */
			wr->next = &wr_arr[i - 1];
	}

	return ib_post_recv(con->qp, wr, NULL);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u32 imm_type, imm_payload;
	bool w_inval = false;
	int err;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(sess->clt, "RDMA failed: %s\n",
				 ib_wc_status_msg(wc->status));
			rtrs_rdma_error_recovery(con);
		}
		return;
	}
	rtrs_clt_update_wc_stats(con);

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
			return;
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (likely(imm_type == RTRS_IO_RSP_IMM ||
			   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
			u32 msg_id;

			w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
			rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

			process_io_rsp(sess, msg_id, err, w_inval);
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else {
			rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
				 imm_type);
		}
		if (w_inval)
			/*
			 * Post x2 empty WRs: first is for this RDMA with IMM,
			 * second is for RECV with INV, which happened earlier.
			 */
			err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
		else
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err)) {
			rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
				 err);
			rtrs_rdma_error_recovery(con);
			break;
		}
		break;
	case IB_WC_RECV:
		/*
		 * Key invalidations from server side
		 */
		WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
			  wc->wc_flags & IB_WC_WITH_IMM));
		WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
				return rtrs_clt_recv_done(con, wc);

			return rtrs_clt_rkey_rsp_done(con, wc);
		}
		break;
	case IB_WC_RDMA_WRITE:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		break;

	default:
		rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
{
	int err, i;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	for (i = 0; i < q_size; i++) {
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			struct rtrs_iu *iu = &con->rsp_ius[i];

			err = rtrs_iu_post_recv(&con->c, iu);
		} else {
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		}
		if (unlikely(err))
			return err;
	}

	return 0;
}

static int post_recv_sess(struct rtrs_clt_sess *sess)
{
	size_t q_size = 0;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = sess->queue_depth;

		/*
		 * x2 for RDMA read responses + FR key invalidations,
		 * RDMA writes do not require any FR registrations.
		 */
		q_size *= 2;

		err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
		if (unlikely(err)) {
			rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}

struct path_it {
	int i;
	struct list_head skip_list;
	struct rtrs_clt *clt;
	struct rtrs_clt_sess *(*next_path)(struct path_it *it);
};

/**
 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 * @head:	the head for the list.
 * @ptr:	the list head to take the next element from.
 * @type:	the type of the struct this is embedded in.
 * @memb:	the name of the list_head within the struct.
 *
 * Next element returned in round-robin fashion, i.e. head will be skipped,
 * but if list is observed as empty, NULL will be returned.
 *
 * This primitive may safely run concurrently with the _rcu list-mutation
 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
 */
#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
({ \
	list_next_or_null_rcu(head, ptr, type, memb) ?: \
		list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
				      type, memb); \
})

/**
 * get_next_path_rr() - Returns path in round-robin fashion.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_RR
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
{
	struct rtrs_clt_sess __rcu **ppcpu_path;
	struct rtrs_clt_sess *path;
	struct rtrs_clt *clt;

	clt = it->clt;

	/*
	 * Here we use two RCU objects: @paths_list and @pcpu_path
	 * pointer. See rtrs_clt_remove_path_from_arr() for details
	 * how that is handled.
	 */

	ppcpu_path = this_cpu_ptr(clt->pcpu_path);
	path = rcu_dereference(*ppcpu_path);
	if (unlikely(!path))
		path = list_first_or_null_rcu(&clt->paths_list,
					      typeof(*path), s.entry);
	else
		path = list_next_or_null_rr_rcu(&clt->paths_list,
						&path->s.entry,
						typeof(*path),
						s.entry);
	rcu_assign_pointer(*ppcpu_path, path);

	return path;
}

/**
 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_MIN_INFLIGHT
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
{
	struct rtrs_clt_sess *min_path = NULL;
	struct rtrs_clt *clt = it->clt;
	struct rtrs_clt_sess *sess;
	int min_inflight = INT_MAX;
	int inflight;

	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
		if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
			continue;

		inflight = atomic_read(&sess->stats->inflight);

		if (inflight < min_inflight) {
			min_inflight = inflight;
			min_path = sess;
		}
	}

	/*
	 * add the path to the skip list, so that next time we can get
	 * a different one
	 */
	if (min_path)
		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);

	return min_path;
}

static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
{
	INIT_LIST_HEAD(&it->skip_list);
	it->clt = clt;
	it->i = 0;

	if (clt->mp_policy == MP_POLICY_RR)
		it->next_path = get_next_path_rr;
	else
		it->next_path = get_next_path_min_inflight;
}

static inline void path_it_deinit(struct path_it *it)
{
	struct list_head *skip, *tmp;
	/*
	 * The skip_list is used only for the MIN_INFLIGHT policy.
	 * We need to remove paths from it, so that next IO can insert
	 * paths (->mp_skip_entry) into a skip_list again.
	 */
	list_for_each_safe(skip, tmp, &it->skip_list)
		list_del_init(skip);
}

/**
 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
 * about an inflight IO.
 * The user buffer holding user control message (not data) is copied into
 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
 * also hold the control message of rtrs.
 * @req: an io request holding information about IO.
 * @sess: client session
 * @conf: confirmation callback function to notify upper layer.
 * @permit: permit for allocation of RDMA remote buffer
 * @priv: private pointer
 * @vec: kernel vector containing control message
 * @usr_len: length of the user message
 * @sg: scatter list for IO data
 * @sg_cnt: number of scatter list entries
 * @data_len: length of the IO data
 * @dir: direction of the IO.
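 *
 * Note: @vec is consumed as a single kvec entry, i.e. only vec[0] (up to
 * @usr_len bytes) is copied into req->iu->buf.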
 */
static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
			      struct rtrs_clt_sess *sess,
			      void (*conf)(void *priv, int errno),
			      struct rtrs_permit *permit, void *priv,
			      const struct kvec *vec, size_t usr_len,
			      struct scatterlist *sg, size_t sg_cnt,
			      size_t data_len, int dir)
{
	struct iov_iter iter;
	size_t len;

	req->permit = permit;
	req->in_use = true;
	req->usr_len = usr_len;
	req->data_len = data_len;
	req->sglist = sg;
	req->sg_cnt = sg_cnt;
	req->priv = priv;
	req->dir = dir;
	req->con = rtrs_permit_to_clt_con(sess, permit);
	req->conf = conf;
	req->need_inv = false;
	req->need_inv_comp = false;
	req->inv_errno = 0;

	iov_iter_kvec(&iter, READ, vec, 1, usr_len);
	len = _copy_from_iter(req->iu->buf, usr_len, &iter);
	WARN_ON(len != usr_len);

	reinit_completion(&req->inv_comp);
}

static struct rtrs_clt_io_req *
rtrs_clt_get_req(struct rtrs_clt_sess *sess,
		 void (*conf)(void *priv, int errno),
		 struct rtrs_permit *permit, void *priv,
		 const struct kvec *vec, size_t usr_len,
		 struct scatterlist *sg, size_t sg_cnt,
		 size_t data_len, int dir)
{
	struct rtrs_clt_io_req *req;

	req = &sess->reqs[permit->mem_id];
	rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
			  sg, sg_cnt, data_len, dir);
	return req;
}

static struct rtrs_clt_io_req *
rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
		      struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_io_req *req;
	struct kvec vec = {
		.iov_base = fail_req->iu->buf,
		.iov_len  = fail_req->usr_len
	};

	req = &alive_sess->reqs[fail_req->permit->mem_id];
	rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
			  fail_req->priv, &vec, fail_req->usr_len,
			  fail_req->sglist, fail_req->sg_cnt,
			  fail_req->data_len, fail_req->dir);
	return req;
}

static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
				   struct rtrs_clt_io_req *req,
				   struct rtrs_rbuf *rbuf,
				   u32 size, u32 imm)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct ib_sge *sge = req->sge;
	enum ib_send_flags flags;
	struct scatterlist *sg;
	size_t num_sge;
	int i;

	for_each_sg(req->sglist, sg, req->sg_cnt, i) {
		sge[i].addr = sg_dma_address(sg);
		sge[i].length = sg_dma_len(sg);
		sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
	}
	sge[i].addr = req->iu->dma_addr;
	sge[i].length = size;
	sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;

	num_sge = 1 + req->sg_cnt;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
					   rbuf->rkey, rbuf->addr, imm,
					   flags, NULL);
}

static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_write *msg;

	struct rtrs_rbuf *rbuf;
	int ret, count = 0;
	u32 imm, buf_id;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	if (unlikely(tsize > sess->chunk_size)) {
		rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}
	if (req->sg_cnt) {
		count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
				      req->sg_cnt, req->dir);
		if (unlikely(!count)) {
			rtrs_wrn(s, "Write request failed, map failed\n");
			return -EINVAL;
		}
	}
	/* put rtrs msg after sg and user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_WRITE);
	msg->usr_len = cpu_to_le16(req->usr_len);

	/* rtrs message on server side will be after user data and message */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;
	req->sg_size = tsize;
	rbuf = &sess->rbufs[buf_id];

	/*
	 * Update stats now, after request is successfully sent it is not
	 * safe anymore to touch it.
	 */
	rtrs_clt_update_all_stats(req, WRITE);

	ret = rtrs_post_rdma_write_sg(req->con, req, rbuf,
				      req->usr_len + sizeof(*msg),
				      imm);
	if (unlikely(ret)) {
		rtrs_err(s, "Write request failed: %d\n", ret);
		if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		if (req->sg_cnt)
			ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}

static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
{
	int nr;

	/* Align the MR to a 4K page size to match the block virt boundary */
	nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
	if (nr < 0)
		return nr;
	if (unlikely(nr < req->sg_cnt))
		return -EINVAL;
	ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));

	return nr;
}

static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_read *msg;
	struct rtrs_ib_dev *dev;

	struct ib_reg_wr rwr;
	struct ib_send_wr *wr = NULL;

	int ret, count = 0;
	u32 imm, buf_id;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	s = &sess->s;
	dev = sess->s.dev;

	if (unlikely(tsize > sess->chunk_size)) {
		rtrs_wrn(s,
			 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}

	if (req->sg_cnt) {
		count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
				      req->dir);
		if (unlikely(!count)) {
			rtrs_wrn(s,
				 "Read request failed, dma map failed\n");
			return -EINVAL;
		}
	}
	/* put our message into req->buf after user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_READ);
	msg->usr_len = cpu_to_le16(req->usr_len);

	if (count) {
		ret = rtrs_map_sg_fr(req, count);
		if (ret < 0) {
			rtrs_err_rl(s,
				    "Read request failed, failed to map fast reg. data, err: %d\n",
				    ret);
			ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
					req->dir);
			return ret;
		}
		rwr = (struct ib_reg_wr) {
			.wr.opcode = IB_WR_REG_MR,
			.wr.wr_cqe = &fast_reg_cqe,
			.mr = req->mr,
			.key = req->mr->rkey,
			.access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE),
		};
		wr = &rwr.wr;

		msg->sg_cnt = cpu_to_le16(1);
		msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);

		msg->desc[0].addr = cpu_to_le64(req->mr->iova);
		msg->desc[0].key = cpu_to_le32(req->mr->rkey);
		msg->desc[0].len = cpu_to_le32(req->mr->length);

		/* Further invalidation is required */
		req->need_inv = !!RTRS_MSG_NEED_INVAL_F;

	} else {
		msg->sg_cnt = 0;
		msg->flags = 0;
	}
	/*
	 * rtrs message will be after the space reserved for disk data and
	 * user message
	 */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;

	req->sg_size = sizeof(*msg);
	req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
	req->sg_size += req->usr_len;

	/*
	 * Update stats now, after request is successfully sent it is not
	 * safe anymore to touch it.
	 */
	rtrs_clt_update_all_stats(req, READ);

	ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
				  req->data_len, imm, wr);
	if (unlikely(ret)) {
		rtrs_err(s, "Read request failed: %d\n", ret);
		if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		req->need_inv = false;
		if (req->sg_cnt)
			ib_dma_unmap_sg(dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}

/**
 * rtrs_clt_failover_req() - Try to find an active path for a failed request
 * @clt: clt context
 * @fail_req: a failed io request.
 */
static int rtrs_clt_failover_req(struct rtrs_clt *clt,
				 struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_sess *alive_sess;
	struct rtrs_clt_io_req *req;
	int err = -ECONNABORTED;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
	     it.i++) {
		if (unlikely(READ_ONCE(alive_sess->state) !=
			     RTRS_CLT_CONNECTED))
			continue;
		req = rtrs_clt_get_copy_req(alive_sess, fail_req);
		if (req->dir == DMA_TO_DEVICE)
			err = rtrs_clt_write_req(req);
		else
			err = rtrs_clt_read_req(req);
		if (unlikely(err)) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		rtrs_clt_inc_failover_cnt(alive_sess->stats);
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}

static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_io_req *req;
	int i, err;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (!req->in_use)
			continue;

		/*
		 * Safely (without notification) complete failed request.
		 * After completion this request is still usable and can
		 * be failovered to another path.
		 */
		complete_rdma_req(req, -ECONNABORTED, false, true);

		err = rtrs_clt_failover_req(clt, req);
		if (unlikely(err))
			/* Failover failed, notify anyway */
			req->conf(req->priv, err);
	}
}

static void free_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	int i;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (req->mr)
			ib_dereg_mr(req->mr);
		kfree(req->sge);
		rtrs_iu_free(req->iu, DMA_TO_DEVICE,
			     sess->s.dev->ib_dev, 1);
	}
	kfree(sess->reqs);
	sess->reqs = NULL;
}

static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt *clt = sess->clt;
	int i, err = -ENOMEM;

	sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
			     GFP_KERNEL);
	if (!sess->reqs)
		return -ENOMEM;

	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
					sess->s.dev->ib_dev,
					DMA_TO_DEVICE,
					rtrs_clt_rdma_done);
		if (!req->iu)
			goto out;

		req->sge = kmalloc_array(clt->max_segments + 1,
					 sizeof(*req->sge), GFP_KERNEL);
		if (!req->sge)
			goto out;

		req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				      sess->max_pages_per_mr);
		if (IS_ERR(req->mr)) {
			err = PTR_ERR(req->mr);
			req->mr = NULL;
			pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
			       sess->max_pages_per_mr);
			goto out;
		}

		init_completion(&req->inv_comp);
	}

	return 0;

out:
	free_sess_reqs(sess);

	return err;
}

static int alloc_permits(struct rtrs_clt *clt)
{
	unsigned int chunk_bits;
	int err, i;

	clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
				   sizeof(long), GFP_KERNEL);
	if (!clt->permits_map) {
		err = -ENOMEM;
		goto out_err;
	}
	clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
	if (!clt->permits) {
		err = -ENOMEM;
		goto err_map;
	}
	chunk_bits = ilog2(clt->queue_depth - 1) + 1;
	for (i = 0; i < clt->queue_depth; i++) {
		struct rtrs_permit *permit;

		permit = get_permit(clt, i);
		permit->mem_id = i;
		permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
	}

	return 0;

err_map:
	kfree(clt->permits_map);
	clt->permits_map = NULL;
out_err:
	return err;
}

static void free_permits(struct rtrs_clt *clt)
{
	kfree(clt->permits_map);
	clt->permits_map = NULL;
	kfree(clt->permits);
	clt->permits = NULL;
}

static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
{
	struct ib_device *ib_dev;
	u64 max_pages_per_mr;
	int mr_page_shift;

	ib_dev = sess->s.dev->ib_dev;

	/*
	 * Use the smallest page size supported by the HCA, down to a
	 * minimum of 4096 bytes. We're unlikely to build large sglists
	 * out of smaller entries.
	 */
	mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
	max_pages_per_mr = ib_dev->attrs.max_mr_size;
	do_div(max_pages_per_mr, (1ull << mr_page_shift));
	sess->max_pages_per_mr =
		min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
		     ib_dev->attrs.max_fast_reg_page_list_len);
	sess->max_send_sge = ib_dev->attrs.max_send_sge;
}

static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state new_state,
					  enum rtrs_clt_state *old_state)
{
	bool changed;

	spin_lock_irq(&sess->state_wq.lock);
	*old_state = sess->state;
	changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				  enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;

	return rtrs_clt_change_state_get_old(sess, new_state, &old_state);
}

static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
{
	struct rtrs_clt_con *con = container_of(c, typeof(*con), c);

	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_clt_hb_err_handler,
		     rtrs_wq);
}

static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess)
{
	rtrs_start_hb(&sess->s);
}

static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess)
{
	rtrs_stop_hb(&sess->s);
}

static void rtrs_clt_reconnect_work(struct work_struct *work);
static void rtrs_clt_close_work(struct work_struct *work);

static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
					const struct rtrs_addr *path,
					size_t con_num, u16 max_segments,
					size_t max_segment_size)
{
	struct rtrs_clt_sess *sess;
	int err = -ENOMEM;
	int cpu;

	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	/* Extra connection for user messages */
	con_num += 1;

	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_sess;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_con;

	mutex_init(&sess->init_mutex);
	uuid_gen(&sess->s.uuid);
	memcpy(&sess->s.dst_addr, path->dst,
	       rdma_addr_size((struct sockaddr *)path->dst));

	/*
	 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
	 * checks the sa_family to be non-zero. If user passed src_addr=NULL
	 * the sess->src_addr will contain only zeros, which is then fine.
	 */
	if (path->src)
		memcpy(&sess->s.src_addr, path->src,
		       rdma_addr_size((struct sockaddr *)path->src));
	strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
	sess->s.con_num = con_num;
	sess->clt = clt;
	sess->max_pages_per_mr = max_segments * max_segment_size >> 12;
	init_waitqueue_head(&sess->state_wq);
	sess->state = RTRS_CLT_CONNECTING;
	atomic_set(&sess->connected_cnt, 0);
	INIT_WORK(&sess->close_work, rtrs_clt_close_work);
	INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
	rtrs_clt_init_hb(sess);

	sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
	if (!sess->mp_skip_entry)
		goto err_free_stats;

	for_each_possible_cpu(cpu)
		INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));

	err = rtrs_clt_init_stats(sess->stats);
	if (err)
		goto err_free_percpu;

	return sess;

err_free_percpu:
	free_percpu(sess->mp_skip_entry);
err_free_stats:
	kfree(sess->stats);
err_free_con:
	kfree(sess->s.con);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}

void free_sess(struct rtrs_clt_sess *sess)
{
	free_percpu(sess->mp_skip_entry);
	mutex_destroy(&sess->init_mutex);
	kfree(sess->s.con);
	kfree(sess->rbufs);
	kfree(sess);
}

static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
{
	struct rtrs_clt_con *con;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con)
		return -ENOMEM;

	/* Map first two connections to the first CPU */
	con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids;
	con->c.cid = cid;
	con->c.sess = &sess->s;
	atomic_set(&con->io_cnt, 0);

	sess->s.con[cid] = &con->c;

	return 0;
}

static void destroy_con(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	sess->s.con[con->c.cid] = NULL;
	kfree(con);
}

static int create_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u16 wr_queue_size;
	int err, cq_vector;
	struct rtrs_msg_rkey_rsp *rsp;

	/*
	 * This function can fail, but still destroy_con_cq_qp() should
	 * be called, this is because create_con_cq_qp() is called on cm
	 * event path, thus caller/waiter never knows: have we failed before
	 * create_con_cq_qp() or after. To solve this dilemma without
	 * creating any additional flags just allow destroy_con_cq_qp() to
	 * be called many times.
	 */

	if (con->c.cid == 0) {
		/*
		 * One completion for each receive and two for each send
		 * (send request + registration)
		 * + 2 for drain and heartbeat
		 * in case qp gets into error state
		 */
		wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
		/* We must be the first here */
		if (WARN_ON(sess->s.dev))
			return -EINVAL;

		/*
		 * The whole session uses device from user connection.
		 * Be careful not to close user connection before ib dev
		 * is gracefully put.
		 */
		sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
						      &dev_pd);
		if (!sess->s.dev) {
			rtrs_wrn(sess->clt,
				 "rtrs_ib_dev_find_get_or_add(): no memory\n");
			return -ENOMEM;
		}
		sess->s.dev_ref = 1;
		query_fast_reg_mode(sess);
	} else {
		/*
		 * Here we assume that session members are correctly set.
		 * This is always true if user connection (cid == 0) is
		 * established first.
		 */
		if (WARN_ON(!sess->s.dev))
			return -EINVAL;
		if (WARN_ON(!sess->queue_depth))
			return -EINVAL;

		/* Shared between connections */
		sess->s.dev_ref++;
		wr_queue_size =
			min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr,
			      /* QD * (REQ + RSP + FR REGS or INVS) + drain */
			      sess->queue_depth * 3 + 1);
	}
	/* alloc iu to recv new rkey reply when server reports flags set */
	if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
		con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp),
					     GFP_KERNEL, sess->s.dev->ib_dev,
					     DMA_FROM_DEVICE,
					     rtrs_clt_rdma_done);
		if (!con->rsp_ius)
			return -ENOMEM;
		con->queue_size = wr_queue_size;
	}
	cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
	err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge,
				cq_vector, wr_queue_size, wr_queue_size,
				IB_POLL_SOFTIRQ);
	/*
	 * In case of error we do not bother to clean previous allocations,
	 * since destroy_con_cq_qp() must be called.
	 */
	return err;
}

static void destroy_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	/*
	 * Be careful here: destroy_con_cq_qp() can be called even if
	 * create_con_cq_qp() failed, see comments there.
	 */

	rtrs_cq_qp_destroy(&con->c);
	if (con->rsp_ius) {
		rtrs_iu_free(con->rsp_ius, DMA_FROM_DEVICE,
			     sess->s.dev->ib_dev, con->queue_size);
		con->rsp_ius = NULL;
		con->queue_size = 0;
	}
	if (sess->s.dev_ref && !--sess->s.dev_ref) {
		rtrs_ib_dev_put(sess->s.dev);
		sess->s.dev = NULL;
	}
}

static void stop_cm(struct rtrs_clt_con *con)
{
	rdma_disconnect(con->c.cm_id);
	if (con->c.qp)
		ib_drain_qp(con->c.qp);
}

static void destroy_cm(struct rtrs_clt_con *con)
{
	rdma_destroy_id(con->c.cm_id);
	con->c.cm_id = NULL;
}

static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	int err;

	err = create_con_cq_qp(con);
	if (err) {
		rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
		return err;
	}
	err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
	if (err) {
		rtrs_err(s, "Resolving route failed, err: %d\n", err);
		destroy_con_cq_qp(con);
	}

	return err;
}

static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_msg_conn_req msg;
	struct rdma_conn_param param;

	int err;

	param = (struct rdma_conn_param) {
		.retry_count = 7,
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_req) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.cid = cpu_to_le16(con->c.cid),
		.cid_num = cpu_to_le16(sess->s.con_num),
		.recon_cnt = cpu_to_le16(sess->s.recon_cnt),
	};
	uuid_copy(&msg.sess_uuid, &sess->s.uuid);
	uuid_copy(&msg.paths_uuid, &clt->paths_uuid);

	err = rdma_connect(con->c.cm_id, &param);
	if (err)
		rtrs_err(clt, "rdma_connect(): %d\n", err);

	return err;
}

static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
				      struct rdma_cm_event *ev)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	const struct rtrs_msg_conn_rsp *msg;
	u16 version, queue_depth;
	int errno;
	u8 len;

	msg = ev->param.conn.private_data;
	len = ev->param.conn.private_data_len;
	if (len < sizeof(*msg)) {
		rtrs_err(clt, "Invalid RTRS connection response\n");
		return -ECONNRESET;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		rtrs_err(clt, "Invalid RTRS magic\n");
		return -ECONNRESET;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
			 version >> 8, RTRS_PROTO_VER_MAJOR);
		return -ECONNRESET;
	}
	errno = le16_to_cpu(msg->errno);
	if (errno) {
		rtrs_err(clt, "Invalid RTRS message: errno %d\n",
			 errno);
		return -ECONNRESET;
	}
	if (con->c.cid == 0) {
		queue_depth = le16_to_cpu(msg->queue_depth);

		if (queue_depth > MAX_SESS_QUEUE_DEPTH) {
			rtrs_err(clt, "Invalid RTRS message: queue=%d\n",
				 queue_depth);
			return -ECONNRESET;
		}
		if (!sess->rbufs || sess->queue_depth < queue_depth) {
			kfree(sess->rbufs);
			sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
					      GFP_KERNEL);
			if (!sess->rbufs)
				return -ENOMEM;
		}
		sess->queue_depth = queue_depth;
		sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
		sess->max_io_size = le32_to_cpu(msg->max_io_size);
		sess->flags = le32_to_cpu(msg->flags);
		sess->chunk_size = sess->max_io_size + sess->max_hdr_size;

		/*
		 * Global queue depth and IO size are always a minimum.
		 * If during a reconnection the server sends us a slightly
		 * higher value, the client does not care and keeps using
		 * the cached minimum.
		 *
		 * Since we can have several sessions (paths) re-establishing
		 * connections in parallel, use lock.
		 */
		mutex_lock(&clt->paths_mutex);
		clt->queue_depth = min_not_zero(sess->queue_depth,
						clt->queue_depth);
		clt->max_io_size = min_not_zero(sess->max_io_size,
						clt->max_io_size);
		mutex_unlock(&clt->paths_mutex);

		/*
		 * Cache the hca_port and hca_name for sysfs
		 */
		sess->hca_port = con->c.cm_id->port_num;
		scnprintf(sess->hca_name, sizeof(sess->hca_name),
			  sess->s.dev->ib_dev->name);
		sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
	}

	return 0;
}

static inline void flag_success_on_conn(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	atomic_inc(&sess->connected_cnt);
	con->cm_err = 1;
}

static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
				   struct rdma_cm_event *ev)
{
	struct rtrs_sess *s = con->c.sess;
	const struct rtrs_msg_conn_rsp *msg;
	const char *rej_msg;
	int status, errno;
	u8 data_len;

	status = ev->status;
	rej_msg = rdma_reject_msg(con->c.cm_id, status);
	msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);

	if (msg && data_len >= sizeof(*msg)) {
		errno = (int16_t)le16_to_cpu(msg->errno);
		if (errno == -EBUSY)
			rtrs_err(s,
				 "Previous session still exists on the server, please reconnect later\n");
		else
			rtrs_err(s,
				 "Connect rejected: status %d (%s), rtrs errno %d\n",
				 status, rej_msg, errno);
	} else {
		rtrs_err(s,
			 "Connect rejected but with malformed message: status %d (%s)\n",
			 status, rej_msg);
	}

	return -ECONNRESET;
}

static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
{
	if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	if (wait)
		flush_work(&sess->close_work);
}

static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
{
	if (con->cm_err == 1) {
		struct rtrs_clt_sess *sess;

		sess = to_clt_sess(con->c.sess);
		if (atomic_dec_and_test(&sess->connected_cnt))
			wake_up(&sess->state_wq);
	}
	con->cm_err = cm_err;
}

static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_clt_con *con = cm_id->context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	int cm_err = 0;

	switch (ev->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
		cm_err = rtrs_rdma_addr_resolved(con);
		break;
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		cm_err = rtrs_rdma_route_resolved(con);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		con->cm_err = rtrs_rdma_conn_established(con, ev);
		if (likely(!con->cm_err)) {
			/*
			 * Report success and wake up. Here we abuse state_wq,
			 * i.e. wake up without state change, but we set cm_err.
			 */
			flag_success_on_conn(con);
			wake_up(&sess->state_wq);
			return 0;
		}
		break;
	case RDMA_CM_EVENT_REJECTED:
		cm_err = rtrs_rdma_conn_rejected(con, ev);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		rtrs_wrn(s, "CM error event %d\n", ev->event);
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
	case RDMA_CM_EVENT_ROUTE_ERROR:
		cm_err = -EHOSTUNREACH;
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		/*
		 * Device removal is a special case. Queue close and return 0.
		 */
		rtrs_clt_close_conns(sess, false);
		return 0;
	default:
		rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event);
		cm_err = -ECONNRESET;
		break;
	}

	if (cm_err) {
		/*
		 * cm error makes sense only on connection establishing,
		 * in other cases we rely on normal procedure of reconnecting.
		 */
		flag_error_on_conn(con, cm_err);
		rtrs_rdma_error_recovery(con);
	}

	return 0;
}

static int create_cm(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rdma_cm_id *cm_id;
	int err;

	cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
			       sess->s.dst_addr.ss_family == AF_IB ?
			       RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		err = PTR_ERR(cm_id);
		rtrs_err(s, "Failed to create CM ID, err: %d\n", err);

		return err;
	}
	con->c.cm_id = cm_id;
	con->cm_err = 0;
	/* allow the port to be reused */
	err = rdma_set_reuseaddr(cm_id, 1);
	if (err != 0) {
		rtrs_err(s, "Set address reuse failed, err: %d\n", err);
		goto destroy_cm;
	}
	err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
				(struct sockaddr *)&sess->s.dst_addr,
				RTRS_CONNECT_TIMEOUT_MS);
	if (err) {
		rtrs_err(s, "Failed to resolve address, err: %d\n", err);
		goto destroy_cm;
	}
	/*
	 * Combine connection status and session events. This is needed
	 * for waiting two possible cases: cm_err has something meaningful
	 * or session state was really changed to error by device removal.
	 */
	err = wait_event_interruptible_timeout(
			sess->state_wq,
			con->cm_err || sess->state != RTRS_CLT_CONNECTING,
			msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
	if (err == 0 || err == -ERESTARTSYS) {
		if (err == 0)
			err = -ETIMEDOUT;
		/* Timed out or interrupted */
		goto errr;
	}
	if (con->cm_err < 0) {
		err = con->cm_err;
		goto errr;
	}
	if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
		/* Device removal */
		err = -ECONNABORTED;
		goto errr;
	}

	return 0;

errr:
	stop_cm(con);
	/* It is safe to call destroy even if cq_qp was not initialized */
	destroy_con_cq_qp(con);
destroy_cm:
	destroy_cm(con);

	return err;
}

static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	int up;

	/*
	 * We can fire RECONNECTED event only when all paths were
	 * connected on rtrs_clt_open(), then each was disconnected
	 * and the first one connected again. That's why this nasty
	 * game with counter value.
	 */

	mutex_lock(&clt->paths_ev_mutex);
	up = ++clt->paths_up;
	/*
	 * Here it is safe to access paths num directly since up counter
	 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
	 * in progress, thus paths removals are impossible.
	 */
	if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
		clt->paths_up = clt->paths_num;
	else if (up == 1)
		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
	mutex_unlock(&clt->paths_ev_mutex);

	/* Mark session as established */
	sess->established = true;
	sess->reconnect_attempts = 0;
	sess->stats->reconnects.successful_cnt++;
}

static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;

	if (!sess->established)
		return;

	sess->established = false;
	mutex_lock(&clt->paths_ev_mutex);
	WARN_ON(!clt->paths_up);
	if (--clt->paths_up == 0)
		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
	mutex_unlock(&clt->paths_ev_mutex);
}

static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *con;
	unsigned int cid;

	WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);

	/*
	 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
	 * exactly in between. Start destroying after it finishes.
	 */
	mutex_lock(&sess->init_mutex);
	mutex_unlock(&sess->init_mutex);

	/*
	 * All IO paths must observe !CONNECTED state before we
	 * free everything.
	 */
	synchronize_rcu();

	rtrs_clt_stop_hb(sess);

	/*
	 * The order is utterly crucial: firstly disconnect and complete all
	 * rdma requests with error (thus set in_use=false for requests),
	 * then fail outstanding requests checking in_use for each, and
	 * eventually notify upper layer about session disconnection.
	 */

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		stop_cm(con);
	}
	fail_all_outstanding_reqs(sess);
	free_sess_reqs(sess);
	rtrs_clt_sess_down(sess);

	/*
	 * Wait for graceful shutdown, namely when peer side invokes
	 * rdma_disconnect(). 'connected_cnt' is decremented only on
	 * CM events, thus if other side had crashed and hb has detected
	 * something is wrong, here we will be stuck for exactly timeout ms,
	 * since CM does not fire anything. That is fine, we are not in
	 * a hurry.
static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *con;
	unsigned int cid;

	WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);

	/*
	 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
	 * exactly in between.  Start destroying after it finishes.
	 */
	mutex_lock(&sess->init_mutex);
	mutex_unlock(&sess->init_mutex);

	/*
	 * All IO paths must observe !CONNECTED state before we
	 * free everything.
	 */
	synchronize_rcu();

	rtrs_clt_stop_hb(sess);

	/*
	 * The order is utterly crucial: first disconnect and complete all
	 * rdma requests with error (thus set in_use=false for requests),
	 * then fail outstanding requests checking in_use for each, and
	 * eventually notify the upper layer about session disconnection.
	 */

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		stop_cm(con);
	}
	fail_all_outstanding_reqs(sess);
	free_sess_reqs(sess);
	rtrs_clt_sess_down(sess);

	/*
	 * Wait for graceful shutdown, namely when peer side invokes
	 * rdma_disconnect(). 'connected_cnt' is decremented only on
	 * CM events, thus if the other side has crashed and hb has detected
	 * something is wrong, here we will be stuck for exactly timeout ms,
	 * since CM does not fire anything.  That is fine, we are not in
	 * a hurry.
	 */
	wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
			   msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		destroy_con_cq_qp(con);
		destroy_cm(con);
		destroy_con(con);
	}
}

static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
				 struct rtrs_clt_sess *sess,
				 struct rtrs_clt_sess *next)
{
	struct rtrs_clt_sess **ppcpu_path;

	/* Call cmpxchg() without sparse warnings */
	ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
	return sess == cmpxchg(ppcpu_path, sess, next);
}
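
/*
 * rtrs_clt_remove_path_from_arr() - remove a path from the multipath set
 *
 * Removal is done in three steps under clt->paths_mutex:
 *   1. list_del_rcu() + synchronize_rcu(), so nobody can pick the path up
 *      from clt->paths_list any more,
 *   2. clt->paths_num is decremented (only after the grace period, see the
 *      race diagram below),
 *   3. every per-CPU clt->pcpu_path pointer that still points to the path
 *      is switched to the next path with cmpxchg(), racing with the IO path.
 */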
static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_sess *next;
	bool wait_for_grace = false;
	int cpu;

	mutex_lock(&clt->paths_mutex);
	list_del_rcu(&sess->s.entry);

	/* Make sure everybody observes path removal. */
	synchronize_rcu();

	/*
	 * At this point nobody sees @sess in the list, but still we have
	 * dangling pointer @pcpu_path which _can_ point to @sess.  Since
	 * nobody can observe @sess in the list, we guarantee that IO path
	 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
	 * to @sess, but can never again become @sess.
	 */

	/*
	 * Decrement paths number only after grace period, because
	 * the caller of do_each_path() must first observe the list without
	 * the path and only then the decremented paths number.
	 *
	 * Otherwise there can be the following situation:
	 *    o Two paths exist and IO is coming.
	 *    o One path is removed:
	 *      CPU#0                          CPU#1
	 *      do_each_path():                rtrs_clt_remove_path_from_arr():
	 *          path = get_next_path()
	 *          ^^^                            list_del_rcu(path)
	 *          [!CONNECTED path]              clt->paths_num--
	 *                                              ^^^^^^^^^
	 *      load clt->paths_num from 2 to 1
	 *                    ^^^^^^^^^
	 *                    sees 1
	 *
	 *      path is observed as !CONNECTED, but do_each_path() loop
	 *      ends, because expression i < clt->paths_num is false.
	 */
	clt->paths_num--;

	/*
	 * Get @next connection from current @sess which is going to be
	 * removed.  If @sess is the last element, then @next is NULL.
	 */
	rcu_read_lock();
	next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
					typeof(*next), s.entry);
	rcu_read_unlock();

	/*
	 * @pcpu paths can still point to the path which is going to be
	 * removed, so change the pointer manually.
	 */
	for_each_possible_cpu(cpu) {
		struct rtrs_clt_sess __rcu **ppcpu_path;

		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
		if (rcu_dereference_protected(*ppcpu_path,
			lockdep_is_held(&clt->paths_mutex)) != sess)
			/*
			 * synchronize_rcu() was called just after deleting
			 * entry from the list, thus IO code path cannot
			 * change pointer back to the pointer which is going
			 * to be removed, we are safe here.
			 */
			continue;

		/*
		 * We race with IO code path, which also changes pointer,
		 * thus we have to be careful not to overwrite it.
		 */
		if (xchg_sessions(ppcpu_path, sess, next))
			/*
			 * @ppcpu_path was successfully replaced with @next,
			 * that means that someone could also pick up the
			 * @sess and be dereferencing it right now, so waiting
			 * for a grace period is required.
			 */
			wait_for_grace = true;
	}
	if (wait_for_grace)
		synchronize_rcu();

	mutex_unlock(&clt->paths_mutex);
}

static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess,
				     struct rtrs_addr *addr)
{
	struct rtrs_clt *clt = sess->clt;

	mutex_lock(&clt->paths_mutex);
	clt->paths_num++;

	list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
	mutex_unlock(&clt->paths_mutex);
}

static void rtrs_clt_close_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;

	sess = container_of(work, struct rtrs_clt_sess, close_work);

	cancel_delayed_work_sync(&sess->reconnect_dwork);
	rtrs_clt_stop_and_destroy_conns(sess);
	rtrs_clt_change_state(sess, RTRS_CLT_CLOSED);
}

static int init_conns(struct rtrs_clt_sess *sess)
{
	unsigned int cid;
	int err;

	/*
	 * On every new session connection increase the reconnect counter
	 * to avoid clashes with previous sessions that are not yet closed
	 * on the server side.
	 */
	sess->s.recon_cnt++;

	/* Establish all RDMA connections */
	for (cid = 0; cid < sess->s.con_num; cid++) {
		err = create_con(sess, cid);
		if (err)
			goto destroy;

		err = create_cm(to_clt_con(sess->s.con[cid]));
		if (err) {
			destroy_con(to_clt_con(sess->s.con[cid]));
			goto destroy;
		}
	}
	err = alloc_sess_reqs(sess);
	if (err)
		goto destroy;

	rtrs_clt_start_hb(sess);

	return 0;

destroy:
	while (cid--) {
		struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);

		stop_cm(con);
		destroy_con_cq_qp(con);
		destroy_cm(con);
		destroy_con(con);
	}
	/*
	 * If we've never taken async path and got an error, say,
	 * doing rdma_resolve_addr(), switch to CONNECTING_ERR state
	 * manually to keep reconnecting.
	 */
	rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);

	return err;
}

static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(sess->clt, "Sess info request send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
		return;
	}

	rtrs_clt_update_wc_stats(con);
}
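
/*
 * process_info_rsp() - parse the server's RTRS_MSG_INFO_RSP message
 *
 * Validates that sg_cnt and chunk_size fit into the immediate payload and
 * that the advertised buffers exactly cover queue_depth chunks, then fills
 * sess->rbufs[] with the RDMA address/rkey of every server-side chunk.
 */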
static int process_info_rsp(struct rtrs_clt_sess *sess,
			    const struct rtrs_msg_info_rsp *msg)
{
	unsigned int sg_cnt, total_len;
	int i, sgi;

	sg_cnt = le16_to_cpu(msg->sg_cnt);
	if (unlikely(!sg_cnt))
		return -EINVAL;
	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
	if (unlikely((ilog2(sg_cnt - 1) + 1) +
		     (ilog2(sess->chunk_size - 1) + 1) >
		     MAX_IMM_PAYL_BITS)) {
		rtrs_err(sess->clt,
			 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
			 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
		return -EINVAL;
	}
	if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
		rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n",
			 sg_cnt);
		return -EINVAL;
	}
	total_len = 0;
	for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
		u32 len, rkey;
		u64 addr;

		addr = le64_to_cpu(desc->addr);
		rkey = le32_to_cpu(desc->key);
		len = le32_to_cpu(desc->len);

		total_len += len;

		if (unlikely(!len || (len % sess->chunk_size))) {
			rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
				 len);
			return -EINVAL;
		}
		for ( ; len && i < sess->queue_depth; i++) {
			sess->rbufs[i].addr = addr;
			sess->rbufs[i].rkey = rkey;

			len -= sess->chunk_size;
			addr += sess->chunk_size;
		}
	}
	/* Sanity check */
	if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
		rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
		return -EINVAL;
	}
	if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
		rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
		return -EINVAL;
	}

	return 0;
}

static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
		rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (unlikely(wc->byte_len < rx_sz)) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(sess, msg);
	if (unlikely(err))
		goto out;

	err = post_recv_sess(sess);
	if (unlikely(err))
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
	rtrs_clt_change_state(sess, state);
}
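
/*
 * rtrs_send_sess_info() - do the info handshake on the user connection
 *
 * Posts a receive for RTRS_MSG_INFO_RSP, sends RTRS_MSG_INFO_REQ with our
 * session name on connection 0 and then waits until the completion handlers
 * above move the session out of RTRS_CLT_CONNECTING (or the timeout hits).
 */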
static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      sess->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (unlikely(!tx_iu || !rx_iu)) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(sess->state_wq,
					 sess->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
		if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
		goto out;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
	if (unlikely(err))
		/* If we've never taken async path because of malloc problems */
		rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);

	return err;
}

/**
 * init_sess() - establishes all session connections and does handshake
 * @sess: client session.
 *
 * In case of error a full close or reconnect procedure should be taken,
 * because async reconnect or close works can already be started.
 */
static int init_sess(struct rtrs_clt_sess *sess)
{
	int err;

	mutex_lock(&sess->init_mutex);
	err = init_conns(sess);
	if (err) {
		rtrs_err(sess->clt, "init_conns(), err: %d\n", err);
		goto out;
	}
	err = rtrs_send_sess_info(sess);
	if (err) {
		rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err);
		goto out;
	}
	rtrs_clt_sess_up(sess);
out:
	mutex_unlock(&sess->init_mutex);

	return err;
}
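
/*
 * rtrs_clt_reconnect_work() - delayed work driving path reconnects
 *
 * Gives up and closes the path once max_reconnect_attempts is exceeded,
 * otherwise tears all connections down, waits RTRS_RECONNECT_BACKOFF and
 * re-runs init_sess(); on failure the work re-arms itself after
 * reconnect_delay_sec seconds.
 */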
static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;
	struct rtrs_clt *clt;
	unsigned int delay_ms;
	int err;

	sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
			    reconnect_dwork);
	clt = sess->clt;

	if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
		return;

	if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close a session completely if max attempts is reached */
		rtrs_clt_close_conns(sess, false);
		return;
	}
	sess->reconnect_attempts++;

	/* Stop everything */
	rtrs_clt_stop_and_destroy_conns(sess);
	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) {
		err = init_sess(sess);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) {
		sess->stats->reconnects.fail_cnt++;
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms));
	}
}

static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);

	kfree(clt);
}
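
/*
 * alloc_clt() - allocate and register the rtrs_clt multipath object
 *
 * Sets up the per-CPU multipath pointer, the sysfs device and the "paths"
 * kobject; uevents are suppressed until all sysfs files are in place.
 */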
static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
				  u16 port, size_t pdu_sz, void *priv,
				  void (*link_ev)(void *priv,
						  enum rtrs_clt_link_ev ev),
				  unsigned int max_segments,
				  size_t max_segment_size,
				  unsigned int reconnect_delay_sec,
				  unsigned int max_reconnect_attempts)
{
	struct rtrs_clt *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = max_segments;
	clt->max_segment_size = max_segment_size;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strlcpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);

	clt->dev.class = rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err) {
		free_percpu(clt->pcpu_path);
		kfree(clt);
		return ERR_PTR(err);
	}
	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_register(&clt->dev);
	if (err) {
		free_percpu(clt->pcpu_path);
		put_device(&clt->dev);
		return ERR_PTR(err);
	}

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		free_percpu(clt->pcpu_path);
		device_unregister(&clt->dev);
		/* Callers check IS_ERR(), so return ERR_PTR(), not NULL */
		return ERR_PTR(-ENOMEM);
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		free_percpu(clt->pcpu_path);
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		device_unregister(&clt->dev);
		return ERR_PTR(err);
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;
}
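
/*
 * wait_for_inflight_permits() - block until no IO references this clt
 *
 * Waits until every bit in clt->permits_map is cleared again, i.e. all
 * permits handed out by rtrs_clt_get_permit() have been returned, before
 * the permit array is freed.
 */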
static void wait_for_inflight_permits(struct rtrs_clt *clt)
{
	if (clt->permits_map) {
		size_t sz = clt->queue_depth;

		wait_event(clt->permits_wait,
			   find_first_bit(clt->permits_map, sz) >= sz);
	}
}

static void free_clt(struct rtrs_clt *clt)
{
	wait_for_inflight_permits(clt);
	free_permits(clt);
	free_percpu(clt->pcpu_path);
	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	/* release callback will free clt in last put */
	device_unregister(&clt->dev);
}

/**
 * rtrs_clt_open() - Open a session to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @sessname: name of the session
 * @paths: Paths to be established defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_segments: Max. number of segments per IO request
 * @max_segment_size: Max. size of one segment
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *			    up, 0 for disabled, -1 for forever
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000ms before it returns.
 *
 * Return a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
			       const char *sessname,
			       const struct rtrs_addr *paths,
			       size_t paths_num, u16 port,
			       size_t pdu_sz, u8 reconnect_delay_sec,
			       u16 max_segments,
			       size_t max_segment_size,
			       s16 max_reconnect_attempts)
{
	struct rtrs_clt_sess *sess, *tmp;
	struct rtrs_clt *clt;
	int err, i;

	clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			max_segments, max_segment_size, reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_sess *sess;

		sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
				  max_segments, max_segment_size);
		if (IS_ERR(sess)) {
			err = PTR_ERR(sess);
			goto close_all_sess;
		}
		list_add_tail_rcu(&sess->s.entry, &clt->paths_list);

		err = init_sess(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}

		err = rtrs_clt_create_sess_files(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_sess;

	return clt;

close_all_sess:
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
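
/*
 * Minimal, hypothetical usage sketch for a ULP caller (the names ulp_*,
 * ULP_PORT, MAX_RECONNECTS etc. below are illustrative, not part of this
 * API):
 *
 *	static void ulp_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		...handle RTRS_CLT_LINK_EV_RECONNECTED/DISCONNECTED...
 *	}
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv    = ulp,
 *		.link_ev = ulp_link_ev,
 *	};
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "ulp_sess", paths, paths_num, ULP_PORT,
 *			    sizeof(struct ulp_pdu), RECONNECT_DELAY_SEC,
 *			    MAX_SEGMENTS, MAX_SEGMENT_SIZE, MAX_RECONNECTS);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */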

/**
 * rtrs_clt_close() - Close a session
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess, *tmp;

	/* First forbid sysfs access */
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);

int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		sess->reconnect_attempts = 0;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued something
		 * right now or work is pending.
		 */
		flush_delayed_work(&sess->reconnect_dwork);
		err = (READ_ONCE(sess->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}

int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	rtrs_clt_close_conns(sess, true);

	return 0;
}

int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping the path till its state is changed to DEAD or
	 * the state is observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody
	 *    invoked rtrs_clt_reconnect(), which can again start
	 *    reconnecting.
	 * 2. State was observed as DEAD - someone else is removing the
	 *    path in parallel.
	 */
	do {
		rtrs_clt_close_conns(sess, true);
		changed = rtrs_clt_change_state_get_old(sess,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (likely(changed)) {
		rtrs_clt_destroy_sess_files(sess, sysfs_self);
		rtrs_clt_remove_path_from_arr(sess);
		kobject_put(&sess->kobj);
	}

	return 0;
}

void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
{
	return (int)clt->max_reconnect_attempts;
}

/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir: READ/WRITE
 * @ops: callback function to be called as confirmation, and the private pointer.
 * @clt: Session
 * @permit: Preallocated permit
 * @vec: Message that is sent to the server together with the request.
 *	 Sum of len of all @vec elements is limited to <= IO_MSG_SIZE.
 *	 Since the msg is copied internally it can be allocated on the stack.
 * @nr: Number of elements in @vec.
 * @data_len: length of data sent to/from server
 * @sg: Pages to be sent/received to/from server.
 * @sg_cnt: Number of elements in the @sg
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ the rtrs client will request a data transfer from the server
 * to the client.  The data that the server responds with will be stored in
 * @sg when the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL
 * event.  On dir=WRITE the rtrs client will RDMA-write the data in @sg to
 * the server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_sess *sess;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
			continue;

		if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
			rtrs_wrn_rl(sess->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, sess->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (unlikely(err)) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);
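
/*
 * Hypothetical IO-path sketch for a caller of the API above (error handling
 * trimmed; ulp_req, ulp_io_done and the buffers are illustrative only):
 *
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv    = ulp_req,
 *		.conf_fn = ulp_io_done,
 *	};
 *	struct rtrs_permit *permit;
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, true);
 *	err = rtrs_clt_request(WRITE, &req_ops, clt, permit,
 *			       vec, nr, data_len, sg, sg_cnt);
 *	...
 *	(the permit is put back with rtrs_clt_put_permit() once ulp_io_done()
 *	 has been called)
 */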

/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 * Returns:
 *    0 on success
 *    -ECOMM, no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth = clt->queue_depth;
	attr->max_io_size = clt->max_io_size;
	attr->sess_kobj = &clt->dev.kobj;
	strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname));

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);

int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_sess *sess;
	int err;

	sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments,
			  clt->max_segment_size);
	if (IS_ERR(sess))
		return PTR_ERR(sess);

	/*
	 * It is totally safe to add the path in CONNECTING state: incoming
	 * IO will never grab it.  Also it is very important to add the path
	 * before init, since init fires the LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(sess, addr);

	err = init_sess(sess);
	if (err)
		goto close_sess;

	err = rtrs_clt_create_sess_files(sess);
	if (err)
		goto close_sess;

	return 0;

close_sess:
	rtrs_clt_remove_path_from_arr(sess);
	rtrs_clt_close_conns(sess, true);
	free_sess(sess);

	return err;
}

static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init
};

static int __init rtrs_client_init(void)
{
	rtrs_rdma_dev_pd_init(0, &dev_pd);

	rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
	if (IS_ERR(rtrs_clt_dev_class)) {
		pr_err("Failed to create rtrs-client dev class\n");
		return PTR_ERR(rtrs_clt_dev_class);
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", WQ_MEM_RECLAIM, 0);
	if (!rtrs_wq) {
		class_destroy(rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);