1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Network Block Driver 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/blkdev.h> 15 #include <linux/hdreg.h> 16 #include <linux/scatterlist.h> 17 #include <linux/idr.h> 18 19 #include "rnbd-clt.h" 20 21 MODULE_DESCRIPTION("RDMA Network Block Device Client"); 22 MODULE_LICENSE("GPL"); 23 24 static int rnbd_client_major; 25 static DEFINE_IDA(index_ida); 26 static DEFINE_MUTEX(sess_lock); 27 static LIST_HEAD(sess_list); 28 static struct workqueue_struct *rnbd_clt_wq; 29 30 /* 31 * Maximum number of partitions an instance can have. 32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) 33 */ 34 #define RNBD_PART_BITS 6 35 36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) 37 { 38 return refcount_inc_not_zero(&sess->refcount); 39 } 40 41 static void free_sess(struct rnbd_clt_session *sess); 42 43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) 44 { 45 might_sleep(); 46 47 if (refcount_dec_and_test(&sess->refcount)) 48 free_sess(sess); 49 } 50 51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) 52 { 53 might_sleep(); 54 55 if (!refcount_dec_and_test(&dev->refcount)) 56 return; 57 58 ida_free(&index_ida, dev->clt_device_id); 59 kfree(dev->hw_queues); 60 kfree(dev->pathname); 61 rnbd_clt_put_sess(dev->sess); 62 mutex_destroy(&dev->lock); 63 64 if (dev->kobj.state_initialized) 65 kobject_put(&dev->kobj); 66 } 67 68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) 69 { 70 return refcount_inc_not_zero(&dev->refcount); 71 } 72 73 static void rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, 74 sector_t new_nsectors) 75 { 76 if (get_capacity(dev->gd) == new_nsectors) 77 return; 78 79 /* 80 * If the size changed, we need to revalidate it 81 */ 82 rnbd_clt_info(dev, "Device size changed from %llu to %llu sectors\n", 83 get_capacity(dev->gd), new_nsectors); 84 set_capacity_and_notify(dev->gd, new_nsectors); 85 } 86 87 static int process_msg_open_rsp(struct rnbd_clt_dev *dev, 88 struct rnbd_msg_open_rsp *rsp) 89 { 90 struct kobject *gd_kobj; 91 int err = 0; 92 93 mutex_lock(&dev->lock); 94 if (dev->dev_state == DEV_STATE_UNMAPPED) { 95 rnbd_clt_info(dev, 96 "Ignoring Open-Response message from server for unmapped device\n"); 97 err = -ENOENT; 98 goto out; 99 } 100 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { 101 u64 nsectors = le64_to_cpu(rsp->nsectors); 102 103 rnbd_clt_change_capacity(dev, nsectors); 104 gd_kobj = &disk_to_dev(dev->gd)->kobj; 105 kobject_uevent(gd_kobj, KOBJ_ONLINE); 106 rnbd_clt_info(dev, "Device online, device remapped successfully\n"); 107 } 108 if (!rsp->logical_block_size) { 109 err = -EINVAL; 110 goto out; 111 } 112 dev->device_id = le32_to_cpu(rsp->device_id); 113 dev->dev_state = DEV_STATE_MAPPED; 114 115 out: 116 mutex_unlock(&dev->lock); 117 118 return err; 119 } 120 121 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, sector_t newsize) 122 { 123 int ret = 0; 124 125 mutex_lock(&dev->lock); 126 if (dev->dev_state != DEV_STATE_MAPPED) { 127 pr_err("Failed to set new size of the device, device is not opened\n"); 128 ret = -ENOENT; 129 goto out; 130 } 131 rnbd_clt_change_capacity(dev, newsize); 132 133 out: 134 mutex_unlock(&dev->lock); 135 136 return ret; 137 } 138 139 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) 140 { 141 if (WARN_ON(!q->hctx)) 142 return; 143 144 /* We can come here from interrupt, thus async=true */ 145 blk_mq_run_hw_queue(q->hctx, true); 146 } 147 148 enum { 149 RNBD_DELAY_IFBUSY = -1, 150 }; 151 152 /** 153 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun 154 * @sess: Session to find a queue for 155 * @cpu: Cpu to start the search from 156 * 157 * Description: 158 * Each CPU has a list of HW queues, which needs to be rerun. If a list 159 * is not empty - it is marked with a bit. This function finds first 160 * set bit in a bitmap and returns corresponding CPU list. 161 */ 162 static struct rnbd_cpu_qlist * 163 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) 164 { 165 int bit; 166 167 /* Search from cpu to nr_cpu_ids */ 168 bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); 169 if (bit < nr_cpu_ids) { 170 return per_cpu_ptr(sess->cpu_queues, bit); 171 } else if (cpu != 0) { 172 /* Search from 0 to cpu */ 173 bit = find_first_bit(sess->cpu_queues_bm, cpu); 174 if (bit < cpu) 175 return per_cpu_ptr(sess->cpu_queues, bit); 176 } 177 178 return NULL; 179 } 180 181 static inline int nxt_cpu(int cpu) 182 { 183 return (cpu + 1) % nr_cpu_ids; 184 } 185 186 /** 187 * rnbd_rerun_if_needed() - rerun next queue marked as stopped 188 * @sess: Session to rerun a queue on 189 * 190 * Description: 191 * Each CPU has it's own list of HW queues, which should be rerun. 192 * Function finds such list with HW queues, takes a list lock, picks up 193 * the first HW queue out of the list and requeues it. 194 * 195 * Return: 196 * True if the queue was requeued, false otherwise. 197 * 198 * Context: 199 * Does not matter. 200 */ 201 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) 202 { 203 struct rnbd_queue *q = NULL; 204 struct rnbd_cpu_qlist *cpu_q; 205 unsigned long flags; 206 int *cpup; 207 208 /* 209 * To keep fairness and not to let other queues starve we always 210 * try to wake up someone else in round-robin manner. That of course 211 * increases latency but queues always have a chance to be executed. 212 */ 213 cpup = get_cpu_ptr(sess->cpu_rr); 214 for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; 215 cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { 216 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) 217 continue; 218 if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm)) 219 goto unlock; 220 q = list_first_entry_or_null(&cpu_q->requeue_list, 221 typeof(*q), requeue_list); 222 if (WARN_ON(!q)) 223 goto clear_bit; 224 list_del_init(&q->requeue_list); 225 clear_bit_unlock(0, &q->in_list); 226 227 if (list_empty(&cpu_q->requeue_list)) { 228 /* Clear bit if nothing is left */ 229 clear_bit: 230 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 231 } 232 unlock: 233 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 234 235 if (q) 236 break; 237 } 238 239 /** 240 * Saves the CPU that is going to be requeued on the per-cpu var. Just 241 * incrementing it doesn't work because rnbd_get_cpu_qlist() will 242 * always return the first CPU with something on the queue list when the 243 * value stored on the var is greater than the last CPU with something 244 * on the list. 245 */ 246 if (cpu_q) 247 *cpup = cpu_q->cpu; 248 put_cpu_ptr(sess->cpu_rr); 249 250 if (q) 251 rnbd_clt_dev_requeue(q); 252 253 return q; 254 } 255 256 /** 257 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if 258 * session is idling (there are no requests 259 * in-flight). 260 * @sess: Session to rerun the queues on 261 * 262 * Description: 263 * This function tries to rerun all stopped queues if there are no 264 * requests in-flight anymore. This function tries to solve an obvious 265 * problem, when number of tags < than number of queues (hctx), which 266 * are stopped and put to sleep. If last permit, which has been just put, 267 * does not wake up all left queues (hctxs), IO requests hang forever. 268 * 269 * That can happen when all number of permits, say N, have been exhausted 270 * from one CPU, and we have many block devices per session, say M. 271 * Each block device has it's own queue (hctx) for each CPU, so eventually 272 * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. 273 * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. 274 * 275 * To avoid this hang last caller of rnbd_put_permit() (last caller is the 276 * one who observes sess->busy == 0) must wake up all remaining queues. 277 * 278 * Context: 279 * Does not matter. 280 */ 281 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) 282 { 283 bool requeued; 284 285 do { 286 requeued = rnbd_rerun_if_needed(sess); 287 } while (atomic_read(&sess->busy) == 0 && requeued); 288 } 289 290 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, 291 enum rtrs_clt_con_type con_type, 292 enum wait_type wait) 293 { 294 struct rtrs_permit *permit; 295 296 permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait); 297 if (permit) 298 /* We have a subtle rare case here, when all permits can be 299 * consumed before busy counter increased. This is safe, 300 * because loser will get NULL as a permit, observe 0 busy 301 * counter and immediately restart the queue himself. 302 */ 303 atomic_inc(&sess->busy); 304 305 return permit; 306 } 307 308 static void rnbd_put_permit(struct rnbd_clt_session *sess, 309 struct rtrs_permit *permit) 310 { 311 rtrs_clt_put_permit(sess->rtrs, permit); 312 atomic_dec(&sess->busy); 313 /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first 314 * and then check queue bits. 315 */ 316 smp_mb__after_atomic(); 317 rnbd_rerun_all_if_idle(sess); 318 } 319 320 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, 321 enum rtrs_clt_con_type con_type, 322 enum wait_type wait) 323 { 324 struct rnbd_iu *iu; 325 struct rtrs_permit *permit; 326 327 iu = kzalloc(sizeof(*iu), GFP_KERNEL); 328 if (!iu) 329 return NULL; 330 331 permit = rnbd_get_permit(sess, con_type, wait); 332 if (!permit) { 333 kfree(iu); 334 return NULL; 335 } 336 337 iu->permit = permit; 338 /* 339 * 1st reference is dropped after finishing sending a "user" message, 340 * 2nd reference is dropped after confirmation with the response is 341 * returned. 342 * 1st and 2nd can happen in any order, so the rnbd_iu should be 343 * released (rtrs_permit returned to rtrs) only after both 344 * are finished. 345 */ 346 atomic_set(&iu->refcount, 2); 347 init_waitqueue_head(&iu->comp.wait); 348 iu->comp.errno = INT_MAX; 349 350 if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) { 351 rnbd_put_permit(sess, permit); 352 kfree(iu); 353 return NULL; 354 } 355 356 return iu; 357 } 358 359 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) 360 { 361 if (atomic_dec_and_test(&iu->refcount)) { 362 sg_free_table(&iu->sgt); 363 rnbd_put_permit(sess, iu->permit); 364 kfree(iu); 365 } 366 } 367 368 static void rnbd_softirq_done_fn(struct request *rq) 369 { 370 struct rnbd_clt_dev *dev = rq->q->disk->private_data; 371 struct rnbd_clt_session *sess = dev->sess; 372 struct rnbd_iu *iu; 373 374 iu = blk_mq_rq_to_pdu(rq); 375 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 376 rnbd_put_permit(sess, iu->permit); 377 blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); 378 } 379 380 static void msg_io_conf(void *priv, int errno) 381 { 382 struct rnbd_iu *iu = priv; 383 struct rnbd_clt_dev *dev = iu->dev; 384 struct request *rq = iu->rq; 385 int rw = rq_data_dir(rq); 386 387 iu->errno = errno; 388 389 blk_mq_complete_request(rq); 390 391 if (errno) 392 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", 393 rw == READ ? "read" : "write", errno); 394 } 395 396 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) 397 { 398 iu->comp.errno = errno; 399 wake_up(&iu->comp.wait); 400 } 401 402 static void msg_conf(void *priv, int errno) 403 { 404 struct rnbd_iu *iu = priv; 405 406 iu->errno = errno; 407 schedule_work(&iu->work); 408 } 409 410 static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir, 411 struct rnbd_iu *iu, struct kvec *vec, 412 size_t len, struct scatterlist *sg, unsigned int sg_len, 413 void (*conf)(struct work_struct *work), 414 int *errno, int wait) 415 { 416 int err; 417 struct rtrs_clt_req_ops req_ops; 418 419 INIT_WORK(&iu->work, conf); 420 req_ops = (struct rtrs_clt_req_ops) { 421 .priv = iu, 422 .conf_fn = msg_conf, 423 }; 424 err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, 425 vec, 1, len, sg, sg_len); 426 if (!err && wait) { 427 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); 428 *errno = iu->comp.errno; 429 } else { 430 *errno = 0; 431 } 432 433 return err; 434 } 435 436 static void msg_close_conf(struct work_struct *work) 437 { 438 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 439 struct rnbd_clt_dev *dev = iu->dev; 440 441 wake_up_iu_comp(iu, iu->errno); 442 rnbd_put_iu(dev->sess, iu); 443 rnbd_clt_put_dev(dev); 444 } 445 446 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, 447 enum wait_type wait) 448 { 449 struct rnbd_clt_session *sess = dev->sess; 450 struct rnbd_msg_close msg; 451 struct rnbd_iu *iu; 452 struct kvec vec = { 453 .iov_base = &msg, 454 .iov_len = sizeof(msg) 455 }; 456 int err, errno; 457 458 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 459 if (!iu) 460 return -ENOMEM; 461 462 iu->buf = NULL; 463 iu->dev = dev; 464 465 msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); 466 msg.device_id = cpu_to_le32(device_id); 467 468 WARN_ON(!rnbd_clt_get_dev(dev)); 469 err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0, 470 msg_close_conf, &errno, wait); 471 if (err) { 472 rnbd_clt_put_dev(dev); 473 rnbd_put_iu(sess, iu); 474 } else { 475 err = errno; 476 } 477 478 rnbd_put_iu(sess, iu); 479 return err; 480 } 481 482 static void msg_open_conf(struct work_struct *work) 483 { 484 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 485 struct rnbd_msg_open_rsp *rsp = iu->buf; 486 struct rnbd_clt_dev *dev = iu->dev; 487 int errno = iu->errno; 488 bool from_map = false; 489 490 /* INIT state is only triggered from rnbd_clt_map_device */ 491 if (dev->dev_state == DEV_STATE_INIT) 492 from_map = true; 493 494 if (errno) { 495 rnbd_clt_err(dev, 496 "Opening failed, server responded: %d\n", 497 errno); 498 } else { 499 errno = process_msg_open_rsp(dev, rsp); 500 if (errno) { 501 u32 device_id = le32_to_cpu(rsp->device_id); 502 /* 503 * If server thinks its fine, but we fail to process 504 * then be nice and send a close to server. 505 */ 506 send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT); 507 } 508 } 509 /* We free rsp in rnbd_clt_map_device for map scenario */ 510 if (!from_map) 511 kfree(rsp); 512 wake_up_iu_comp(iu, errno); 513 rnbd_put_iu(dev->sess, iu); 514 rnbd_clt_put_dev(dev); 515 } 516 517 static void msg_sess_info_conf(struct work_struct *work) 518 { 519 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 520 struct rnbd_msg_sess_info_rsp *rsp = iu->buf; 521 struct rnbd_clt_session *sess = iu->sess; 522 523 if (!iu->errno) 524 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); 525 526 kfree(rsp); 527 wake_up_iu_comp(iu, iu->errno); 528 rnbd_put_iu(sess, iu); 529 rnbd_clt_put_sess(sess); 530 } 531 532 static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait) 533 { 534 struct rnbd_clt_session *sess = dev->sess; 535 struct rnbd_msg_open_rsp *rsp; 536 struct rnbd_msg_open msg; 537 struct rnbd_iu *iu; 538 struct kvec vec = { 539 .iov_base = &msg, 540 .iov_len = sizeof(msg) 541 }; 542 int err, errno; 543 544 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 545 if (!rsp) 546 return -ENOMEM; 547 548 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 549 if (!iu) { 550 kfree(rsp); 551 return -ENOMEM; 552 } 553 554 iu->buf = rsp; 555 iu->dev = dev; 556 557 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 558 559 msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); 560 msg.access_mode = dev->access_mode; 561 strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); 562 563 WARN_ON(!rnbd_clt_get_dev(dev)); 564 err = send_usr_msg(sess->rtrs, READ, iu, 565 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 566 msg_open_conf, &errno, wait); 567 if (err) { 568 rnbd_clt_put_dev(dev); 569 rnbd_put_iu(sess, iu); 570 kfree(rsp); 571 } else { 572 err = errno; 573 } 574 575 rnbd_put_iu(sess, iu); 576 return err; 577 } 578 579 static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait) 580 { 581 struct rnbd_msg_sess_info_rsp *rsp; 582 struct rnbd_msg_sess_info msg; 583 struct rnbd_iu *iu; 584 struct kvec vec = { 585 .iov_base = &msg, 586 .iov_len = sizeof(msg) 587 }; 588 int err, errno; 589 590 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 591 if (!rsp) 592 return -ENOMEM; 593 594 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 595 if (!iu) { 596 kfree(rsp); 597 return -ENOMEM; 598 } 599 600 iu->buf = rsp; 601 iu->sess = sess; 602 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 603 604 msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); 605 msg.ver = RNBD_PROTO_VER_MAJOR; 606 607 if (!rnbd_clt_get_sess(sess)) { 608 /* 609 * That can happen only in one case, when RTRS has restablished 610 * the connection and link_ev() is called, but session is almost 611 * dead, last reference on session is put and caller is waiting 612 * for RTRS to close everything. 613 */ 614 err = -ENODEV; 615 goto put_iu; 616 } 617 err = send_usr_msg(sess->rtrs, READ, iu, 618 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 619 msg_sess_info_conf, &errno, wait); 620 if (err) { 621 rnbd_clt_put_sess(sess); 622 put_iu: 623 rnbd_put_iu(sess, iu); 624 kfree(rsp); 625 } else { 626 err = errno; 627 } 628 rnbd_put_iu(sess, iu); 629 return err; 630 } 631 632 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) 633 { 634 struct rnbd_clt_dev *dev; 635 struct kobject *gd_kobj; 636 637 mutex_lock(&sess->lock); 638 list_for_each_entry(dev, &sess->devs_list, list) { 639 rnbd_clt_err(dev, "Device disconnected.\n"); 640 641 mutex_lock(&dev->lock); 642 if (dev->dev_state == DEV_STATE_MAPPED) { 643 dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; 644 gd_kobj = &disk_to_dev(dev->gd)->kobj; 645 kobject_uevent(gd_kobj, KOBJ_OFFLINE); 646 } 647 mutex_unlock(&dev->lock); 648 } 649 mutex_unlock(&sess->lock); 650 } 651 652 static void remap_devs(struct rnbd_clt_session *sess) 653 { 654 struct rnbd_clt_dev *dev; 655 struct rtrs_attrs attrs; 656 int err; 657 658 /* 659 * Careful here: we are called from RTRS link event directly, 660 * thus we can't send any RTRS request and wait for response 661 * or RTRS will not be able to complete request with failure 662 * if something goes wrong (failing of outstanding requests 663 * happens exactly from the context where we are blocking now). 664 * 665 * So to avoid deadlocks each usr message sent from here must 666 * be asynchronous. 667 */ 668 669 err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT); 670 if (err) { 671 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); 672 return; 673 } 674 675 err = rtrs_clt_query(sess->rtrs, &attrs); 676 if (err) { 677 pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err); 678 return; 679 } 680 mutex_lock(&sess->lock); 681 sess->max_io_size = attrs.max_io_size; 682 683 list_for_each_entry(dev, &sess->devs_list, list) { 684 bool skip; 685 686 mutex_lock(&dev->lock); 687 skip = (dev->dev_state == DEV_STATE_INIT); 688 mutex_unlock(&dev->lock); 689 if (skip) 690 /* 691 * When device is establishing connection for the first 692 * time - do not remap, it will be closed soon. 693 */ 694 continue; 695 696 rnbd_clt_info(dev, "session reconnected, remapping device\n"); 697 err = send_msg_open(dev, RTRS_PERMIT_NOWAIT); 698 if (err) { 699 rnbd_clt_err(dev, "send_msg_open(): %d\n", err); 700 break; 701 } 702 } 703 mutex_unlock(&sess->lock); 704 } 705 706 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) 707 { 708 struct rnbd_clt_session *sess = priv; 709 710 switch (ev) { 711 case RTRS_CLT_LINK_EV_DISCONNECTED: 712 set_dev_states_to_disconnected(sess); 713 break; 714 case RTRS_CLT_LINK_EV_RECONNECTED: 715 remap_devs(sess); 716 break; 717 default: 718 pr_err("Unknown session event received (%d), session: %s\n", 719 ev, sess->sessname); 720 } 721 } 722 723 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) 724 { 725 unsigned int cpu; 726 struct rnbd_cpu_qlist *cpu_q; 727 728 for_each_possible_cpu(cpu) { 729 cpu_q = per_cpu_ptr(cpu_queues, cpu); 730 731 cpu_q->cpu = cpu; 732 INIT_LIST_HEAD(&cpu_q->requeue_list); 733 spin_lock_init(&cpu_q->requeue_lock); 734 } 735 } 736 737 static void destroy_mq_tags(struct rnbd_clt_session *sess) 738 { 739 if (sess->tag_set.tags) 740 blk_mq_free_tag_set(&sess->tag_set); 741 } 742 743 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) 744 { 745 sess->rtrs_ready = true; 746 wake_up_all(&sess->rtrs_waitq); 747 } 748 749 static void close_rtrs(struct rnbd_clt_session *sess) 750 { 751 might_sleep(); 752 753 if (!IS_ERR_OR_NULL(sess->rtrs)) { 754 rtrs_clt_close(sess->rtrs); 755 sess->rtrs = NULL; 756 wake_up_rtrs_waiters(sess); 757 } 758 } 759 760 static void free_sess(struct rnbd_clt_session *sess) 761 { 762 WARN_ON(!list_empty(&sess->devs_list)); 763 764 might_sleep(); 765 766 close_rtrs(sess); 767 destroy_mq_tags(sess); 768 if (!list_empty(&sess->list)) { 769 mutex_lock(&sess_lock); 770 list_del(&sess->list); 771 mutex_unlock(&sess_lock); 772 } 773 free_percpu(sess->cpu_queues); 774 free_percpu(sess->cpu_rr); 775 mutex_destroy(&sess->lock); 776 kfree(sess); 777 } 778 779 static struct rnbd_clt_session *alloc_sess(const char *sessname) 780 { 781 struct rnbd_clt_session *sess; 782 int err, cpu; 783 784 sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); 785 if (!sess) 786 return ERR_PTR(-ENOMEM); 787 strscpy(sess->sessname, sessname, sizeof(sess->sessname)); 788 atomic_set(&sess->busy, 0); 789 mutex_init(&sess->lock); 790 INIT_LIST_HEAD(&sess->devs_list); 791 INIT_LIST_HEAD(&sess->list); 792 bitmap_zero(sess->cpu_queues_bm, num_possible_cpus()); 793 init_waitqueue_head(&sess->rtrs_waitq); 794 refcount_set(&sess->refcount, 1); 795 796 sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); 797 if (!sess->cpu_queues) { 798 err = -ENOMEM; 799 goto err; 800 } 801 rnbd_init_cpu_qlists(sess->cpu_queues); 802 803 /* 804 * That is simple percpu variable which stores cpu indices, which are 805 * incremented on each access. We need that for the sake of fairness 806 * to wake up queues in a round-robin manner. 807 */ 808 sess->cpu_rr = alloc_percpu(int); 809 if (!sess->cpu_rr) { 810 err = -ENOMEM; 811 goto err; 812 } 813 for_each_possible_cpu(cpu) 814 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; 815 816 return sess; 817 818 err: 819 free_sess(sess); 820 821 return ERR_PTR(err); 822 } 823 824 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) 825 { 826 wait_event(sess->rtrs_waitq, sess->rtrs_ready); 827 if (IS_ERR_OR_NULL(sess->rtrs)) 828 return -ECONNRESET; 829 830 return 0; 831 } 832 833 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) 834 __releases(&sess_lock) 835 __acquires(&sess_lock) 836 { 837 DEFINE_WAIT(wait); 838 839 prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); 840 if (IS_ERR_OR_NULL(sess->rtrs)) { 841 finish_wait(&sess->rtrs_waitq, &wait); 842 return; 843 } 844 mutex_unlock(&sess_lock); 845 /* loop in caller, see __find_and_get_sess(). 846 * You can't leave mutex locked and call schedule(), you will catch a 847 * deadlock with a caller of free_sess(), which has just put the last 848 * reference and is about to take the sess_lock in order to delete 849 * the session from the list. 850 */ 851 schedule(); 852 mutex_lock(&sess_lock); 853 } 854 855 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) 856 __releases(&sess_lock) 857 __acquires(&sess_lock) 858 { 859 struct rnbd_clt_session *sess, *sn; 860 int err; 861 862 again: 863 list_for_each_entry_safe(sess, sn, &sess_list, list) { 864 if (strcmp(sessname, sess->sessname)) 865 continue; 866 867 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) 868 /* 869 * No RTRS connection, session is dying. 870 */ 871 continue; 872 873 if (rnbd_clt_get_sess(sess)) { 874 /* 875 * Alive session is found, wait for RTRS connection. 876 */ 877 mutex_unlock(&sess_lock); 878 err = wait_for_rtrs_connection(sess); 879 if (err) 880 rnbd_clt_put_sess(sess); 881 mutex_lock(&sess_lock); 882 883 if (err) 884 /* Session is dying, repeat the loop */ 885 goto again; 886 887 return sess; 888 } 889 /* 890 * Ref is 0, session is dying, wait for RTRS disconnect 891 * in order to avoid session names clashes. 892 */ 893 wait_for_rtrs_disconnection(sess); 894 /* 895 * RTRS is disconnected and soon session will be freed, 896 * so repeat a loop. 897 */ 898 goto again; 899 } 900 901 return NULL; 902 } 903 904 /* caller is responsible for initializing 'first' to false */ 905 static struct 906 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) 907 { 908 struct rnbd_clt_session *sess = NULL; 909 910 mutex_lock(&sess_lock); 911 sess = __find_and_get_sess(sessname); 912 if (!sess) { 913 sess = alloc_sess(sessname); 914 if (IS_ERR(sess)) { 915 mutex_unlock(&sess_lock); 916 return sess; 917 } 918 list_add(&sess->list, &sess_list); 919 *first = true; 920 } 921 mutex_unlock(&sess_lock); 922 923 return sess; 924 } 925 926 static int rnbd_client_open(struct gendisk *disk, blk_mode_t mode) 927 { 928 struct rnbd_clt_dev *dev = disk->private_data; 929 930 if (get_disk_ro(dev->gd) && (mode & BLK_OPEN_WRITE)) 931 return -EPERM; 932 933 if (dev->dev_state == DEV_STATE_UNMAPPED || 934 !rnbd_clt_get_dev(dev)) 935 return -EIO; 936 937 return 0; 938 } 939 940 static void rnbd_client_release(struct gendisk *gen) 941 { 942 struct rnbd_clt_dev *dev = gen->private_data; 943 944 rnbd_clt_put_dev(dev); 945 } 946 947 static int rnbd_client_getgeo(struct gendisk *disk, 948 struct hd_geometry *geo) 949 { 950 u64 size; 951 struct rnbd_clt_dev *dev = disk->private_data; 952 struct queue_limits *limit = &dev->queue->limits; 953 954 size = dev->size * (limit->logical_block_size / SECTOR_SIZE); 955 geo->cylinders = size >> 6; /* size/64 */ 956 geo->heads = 4; 957 geo->sectors = 16; 958 geo->start = 0; 959 960 return 0; 961 } 962 963 static const struct block_device_operations rnbd_client_ops = { 964 .owner = THIS_MODULE, 965 .open = rnbd_client_open, 966 .release = rnbd_client_release, 967 .getgeo = rnbd_client_getgeo 968 }; 969 970 /* The amount of data that belongs to an I/O and the amount of data that 971 * should be read or written to the disk (bi_size) can differ. 972 * 973 * E.g. When WRITE_SAME is used, only a small amount of data is 974 * transferred that is then written repeatedly over a lot of sectors. 975 * 976 * Get the size of data to be transferred via RTRS by summing up the size 977 * of the scather-gather list entries. 978 */ 979 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) 980 { 981 struct scatterlist *sg; 982 size_t tsize = 0; 983 int i; 984 985 for_each_sg(sglist, sg, len, i) 986 tsize += sg->length; 987 return tsize; 988 } 989 990 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, 991 struct request *rq, 992 struct rnbd_iu *iu) 993 { 994 struct rtrs_clt_sess *rtrs = dev->sess->rtrs; 995 struct rtrs_permit *permit = iu->permit; 996 struct rnbd_msg_io msg; 997 struct rtrs_clt_req_ops req_ops; 998 unsigned int sg_cnt = 0; 999 struct kvec vec; 1000 size_t size; 1001 int err; 1002 1003 iu->rq = rq; 1004 iu->dev = dev; 1005 msg.sector = cpu_to_le64(blk_rq_pos(rq)); 1006 msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); 1007 msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); 1008 msg.prio = cpu_to_le16(req_get_ioprio(rq)); 1009 1010 /* 1011 * We only support discards/WRITE_ZEROES with single segment for now. 1012 * See queue limits. 1013 */ 1014 if ((req_op(rq) != REQ_OP_DISCARD) && (req_op(rq) != REQ_OP_WRITE_ZEROES)) 1015 sg_cnt = blk_rq_map_sg(rq, iu->sgt.sgl); 1016 1017 if (sg_cnt == 0) 1018 sg_mark_end(&iu->sgt.sgl[0]); 1019 1020 msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); 1021 msg.device_id = cpu_to_le32(dev->device_id); 1022 1023 vec = (struct kvec) { 1024 .iov_base = &msg, 1025 .iov_len = sizeof(msg) 1026 }; 1027 size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt); 1028 req_ops = (struct rtrs_clt_req_ops) { 1029 .priv = iu, 1030 .conf_fn = msg_io_conf, 1031 }; 1032 err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, 1033 &vec, 1, size, iu->sgt.sgl, sg_cnt); 1034 if (err) { 1035 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", 1036 err); 1037 return err; 1038 } 1039 1040 return 0; 1041 } 1042 1043 /** 1044 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy 1045 * @dev: Device to be checked 1046 * @q: Queue to be added to the requeue list if required 1047 * 1048 * Description: 1049 * If session is busy, that means someone will requeue us when resources 1050 * are freed. If session is not doing anything - device is not added to 1051 * the list and @false is returned. 1052 */ 1053 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, 1054 struct rnbd_queue *q) 1055 { 1056 struct rnbd_clt_session *sess = dev->sess; 1057 struct rnbd_cpu_qlist *cpu_q; 1058 unsigned long flags; 1059 bool added = true; 1060 bool need_set; 1061 1062 cpu_q = get_cpu_ptr(sess->cpu_queues); 1063 spin_lock_irqsave(&cpu_q->requeue_lock, flags); 1064 1065 if (!test_and_set_bit_lock(0, &q->in_list)) { 1066 if (WARN_ON(!list_empty(&q->requeue_list))) 1067 goto unlock; 1068 1069 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); 1070 if (need_set) { 1071 set_bit(cpu_q->cpu, sess->cpu_queues_bm); 1072 /* Paired with rnbd_put_permit(). Set a bit first 1073 * and then observe the busy counter. 1074 */ 1075 smp_mb__before_atomic(); 1076 } 1077 if (atomic_read(&sess->busy)) { 1078 list_add_tail(&q->requeue_list, &cpu_q->requeue_list); 1079 } else { 1080 /* Very unlikely, but possible: busy counter was 1081 * observed as zero. Drop all bits and return 1082 * false to restart the queue by ourselves. 1083 */ 1084 if (need_set) 1085 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 1086 clear_bit_unlock(0, &q->in_list); 1087 added = false; 1088 } 1089 } 1090 unlock: 1091 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 1092 put_cpu_ptr(sess->cpu_queues); 1093 1094 return added; 1095 } 1096 1097 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, 1098 struct blk_mq_hw_ctx *hctx, 1099 int delay) 1100 { 1101 struct rnbd_queue *q = hctx->driver_data; 1102 1103 if (delay != RNBD_DELAY_IFBUSY) 1104 blk_mq_delay_run_hw_queue(hctx, delay); 1105 else if (!rnbd_clt_dev_add_to_requeue(dev, q)) 1106 /* 1107 * If session is not busy we have to restart 1108 * the queue ourselves. 1109 */ 1110 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); 1111 } 1112 1113 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, 1114 const struct blk_mq_queue_data *bd) 1115 { 1116 struct request *rq = bd->rq; 1117 struct rnbd_clt_dev *dev = rq->q->disk->private_data; 1118 struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); 1119 int err; 1120 blk_status_t ret = BLK_STS_IOERR; 1121 1122 if (dev->dev_state != DEV_STATE_MAPPED) 1123 return BLK_STS_IOERR; 1124 1125 iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, 1126 RTRS_PERMIT_NOWAIT); 1127 if (!iu->permit) { 1128 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); 1129 return BLK_STS_RESOURCE; 1130 } 1131 1132 iu->sgt.sgl = iu->first_sgl; 1133 err = sg_alloc_table_chained(&iu->sgt, 1134 /* Even-if the request has no segment, 1135 * sglist must have one entry at least. 1136 */ 1137 blk_rq_nr_phys_segments(rq) ? : 1, 1138 iu->sgt.sgl, 1139 RNBD_INLINE_SG_CNT); 1140 if (err) { 1141 rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err); 1142 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1143 rnbd_put_permit(dev->sess, iu->permit); 1144 return BLK_STS_RESOURCE; 1145 } 1146 1147 blk_mq_start_request(rq); 1148 err = rnbd_client_xfer_request(dev, rq, iu); 1149 if (err == 0) 1150 return BLK_STS_OK; 1151 if (err == -EAGAIN || err == -ENOMEM) { 1152 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1153 ret = BLK_STS_RESOURCE; 1154 } 1155 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 1156 rnbd_put_permit(dev->sess, iu->permit); 1157 return ret; 1158 } 1159 1160 static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob) 1161 { 1162 struct rnbd_queue *q = hctx->driver_data; 1163 struct rnbd_clt_dev *dev = q->dev; 1164 1165 return rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num); 1166 } 1167 1168 static void rnbd_rdma_map_queues(struct blk_mq_tag_set *set) 1169 { 1170 struct rnbd_clt_session *sess = set->driver_data; 1171 1172 /* shared read/write queues */ 1173 set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus(); 1174 set->map[HCTX_TYPE_DEFAULT].queue_offset = 0; 1175 set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus(); 1176 set->map[HCTX_TYPE_READ].queue_offset = 0; 1177 blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]); 1178 blk_mq_map_queues(&set->map[HCTX_TYPE_READ]); 1179 1180 if (sess->nr_poll_queues) { 1181 /* dedicated queue for poll */ 1182 set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues; 1183 set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset + 1184 set->map[HCTX_TYPE_READ].nr_queues; 1185 blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]); 1186 pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n", 1187 sess->sessname, 1188 set->map[HCTX_TYPE_DEFAULT].nr_queues, 1189 set->map[HCTX_TYPE_READ].nr_queues, 1190 set->map[HCTX_TYPE_POLL].nr_queues); 1191 } else { 1192 pr_info("[session=%s] mapped %d/%d default/read queues.\n", 1193 sess->sessname, 1194 set->map[HCTX_TYPE_DEFAULT].nr_queues, 1195 set->map[HCTX_TYPE_READ].nr_queues); 1196 } 1197 } 1198 1199 static struct blk_mq_ops rnbd_mq_ops = { 1200 .queue_rq = rnbd_queue_rq, 1201 .complete = rnbd_softirq_done_fn, 1202 .map_queues = rnbd_rdma_map_queues, 1203 .poll = rnbd_rdma_poll, 1204 }; 1205 1206 static int setup_mq_tags(struct rnbd_clt_session *sess) 1207 { 1208 struct blk_mq_tag_set *tag_set = &sess->tag_set; 1209 1210 memset(tag_set, 0, sizeof(*tag_set)); 1211 tag_set->ops = &rnbd_mq_ops; 1212 tag_set->queue_depth = sess->queue_depth; 1213 tag_set->numa_node = NUMA_NO_NODE; 1214 tag_set->flags = BLK_MQ_F_TAG_QUEUE_SHARED; 1215 tag_set->cmd_size = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE; 1216 1217 /* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */ 1218 tag_set->nr_maps = sess->nr_poll_queues ? HCTX_MAX_TYPES : 2; 1219 /* 1220 * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues 1221 * others are for HCTX_TYPE_POLL 1222 */ 1223 tag_set->nr_hw_queues = num_online_cpus() + sess->nr_poll_queues; 1224 tag_set->driver_data = sess; 1225 1226 return blk_mq_alloc_tag_set(tag_set); 1227 } 1228 1229 static struct rnbd_clt_session * 1230 find_and_get_or_create_sess(const char *sessname, 1231 const struct rtrs_addr *paths, 1232 size_t path_cnt, u16 port_nr, u32 nr_poll_queues) 1233 { 1234 struct rnbd_clt_session *sess; 1235 struct rtrs_attrs attrs; 1236 int err; 1237 bool first = false; 1238 struct rtrs_clt_ops rtrs_ops; 1239 1240 sess = find_or_create_sess(sessname, &first); 1241 if (sess == ERR_PTR(-ENOMEM)) { 1242 return ERR_PTR(-ENOMEM); 1243 } else if ((nr_poll_queues && !first) || (!nr_poll_queues && sess->nr_poll_queues)) { 1244 /* 1245 * A device MUST have its own session to use the polling-mode. 1246 * It must fail to map new device with the same session. 1247 */ 1248 err = -EINVAL; 1249 goto put_sess; 1250 } 1251 1252 if (!first) 1253 return sess; 1254 1255 if (!path_cnt) { 1256 pr_err("Session %s not found, and path parameter not given", sessname); 1257 err = -ENXIO; 1258 goto put_sess; 1259 } 1260 1261 rtrs_ops = (struct rtrs_clt_ops) { 1262 .priv = sess, 1263 .link_ev = rnbd_clt_link_ev, 1264 }; 1265 /* 1266 * Nothing was found, establish rtrs connection and proceed further. 1267 */ 1268 sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, 1269 paths, path_cnt, port_nr, 1270 0, /* Do not use pdu of rtrs */ 1271 RECONNECT_DELAY, 1272 MAX_RECONNECTS, nr_poll_queues); 1273 if (IS_ERR(sess->rtrs)) { 1274 err = PTR_ERR(sess->rtrs); 1275 goto wake_up_and_put; 1276 } 1277 1278 err = rtrs_clt_query(sess->rtrs, &attrs); 1279 if (err) 1280 goto close_rtrs; 1281 1282 sess->max_io_size = attrs.max_io_size; 1283 sess->queue_depth = attrs.queue_depth; 1284 sess->nr_poll_queues = nr_poll_queues; 1285 sess->max_segments = attrs.max_segments; 1286 1287 err = setup_mq_tags(sess); 1288 if (err) 1289 goto close_rtrs; 1290 1291 err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT); 1292 if (err) 1293 goto close_rtrs; 1294 1295 wake_up_rtrs_waiters(sess); 1296 1297 return sess; 1298 1299 close_rtrs: 1300 close_rtrs(sess); 1301 put_sess: 1302 rnbd_clt_put_sess(sess); 1303 1304 return ERR_PTR(err); 1305 1306 wake_up_and_put: 1307 wake_up_rtrs_waiters(sess); 1308 goto put_sess; 1309 } 1310 1311 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, 1312 struct rnbd_queue *q, 1313 struct blk_mq_hw_ctx *hctx) 1314 { 1315 INIT_LIST_HEAD(&q->requeue_list); 1316 q->dev = dev; 1317 q->hctx = hctx; 1318 } 1319 1320 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) 1321 { 1322 unsigned long i; 1323 struct blk_mq_hw_ctx *hctx; 1324 struct rnbd_queue *q; 1325 1326 queue_for_each_hw_ctx(dev->queue, hctx, i) { 1327 q = &dev->hw_queues[i]; 1328 rnbd_init_hw_queue(dev, q, hctx); 1329 hctx->driver_data = q; 1330 } 1331 } 1332 1333 static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, 1334 struct rnbd_msg_open_rsp *rsp, int idx) 1335 { 1336 int err; 1337 1338 dev->gd->major = rnbd_client_major; 1339 dev->gd->first_minor = idx << RNBD_PART_BITS; 1340 dev->gd->minors = 1 << RNBD_PART_BITS; 1341 dev->gd->fops = &rnbd_client_ops; 1342 dev->gd->queue = dev->queue; 1343 dev->gd->private_data = dev; 1344 snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", 1345 idx); 1346 pr_debug("disk_name=%s, capacity=%llu\n", 1347 dev->gd->disk_name, 1348 le64_to_cpu(rsp->nsectors) * 1349 (le16_to_cpu(rsp->logical_block_size) / SECTOR_SIZE)); 1350 1351 set_capacity(dev->gd, le64_to_cpu(rsp->nsectors)); 1352 1353 if (dev->access_mode == RNBD_ACCESS_RO) 1354 set_disk_ro(dev->gd, true); 1355 1356 err = add_disk(dev->gd); 1357 if (err) 1358 put_disk(dev->gd); 1359 1360 return err; 1361 } 1362 1363 static int rnbd_client_setup_device(struct rnbd_clt_dev *dev, 1364 struct rnbd_msg_open_rsp *rsp) 1365 { 1366 struct queue_limits lim = { 1367 .logical_block_size = le16_to_cpu(rsp->logical_block_size), 1368 .physical_block_size = le16_to_cpu(rsp->physical_block_size), 1369 .io_opt = dev->sess->max_io_size, 1370 .max_hw_sectors = dev->sess->max_io_size / SECTOR_SIZE, 1371 .max_hw_discard_sectors = le32_to_cpu(rsp->max_discard_sectors), 1372 .discard_granularity = le32_to_cpu(rsp->discard_granularity), 1373 .discard_alignment = le32_to_cpu(rsp->discard_alignment), 1374 .max_segments = dev->sess->max_segments, 1375 .virt_boundary_mask = SZ_4K - 1, 1376 .max_write_zeroes_sectors = 1377 le32_to_cpu(rsp->max_write_zeroes_sectors), 1378 }; 1379 int idx = dev->clt_device_id; 1380 1381 dev->size = le64_to_cpu(rsp->nsectors) * 1382 le16_to_cpu(rsp->logical_block_size); 1383 1384 if (rsp->secure_discard) { 1385 lim.max_secure_erase_sectors = 1386 le32_to_cpu(rsp->max_discard_sectors); 1387 } 1388 1389 if (rsp->cache_policy & RNBD_WRITEBACK) { 1390 lim.features |= BLK_FEAT_WRITE_CACHE; 1391 if (rsp->cache_policy & RNBD_FUA) 1392 lim.features |= BLK_FEAT_FUA; 1393 } 1394 1395 dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, &lim, dev); 1396 if (IS_ERR(dev->gd)) 1397 return PTR_ERR(dev->gd); 1398 dev->queue = dev->gd->queue; 1399 rnbd_init_mq_hw_queues(dev); 1400 1401 return rnbd_clt_setup_gen_disk(dev, rsp, idx); 1402 } 1403 1404 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, 1405 enum rnbd_access_mode access_mode, 1406 const char *pathname, 1407 u32 nr_poll_queues) 1408 { 1409 struct rnbd_clt_dev *dev; 1410 int ret; 1411 1412 dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); 1413 if (!dev) 1414 return ERR_PTR(-ENOMEM); 1415 1416 /* 1417 * nr_cpu_ids: the number of softirq queues 1418 * nr_poll_queues: the number of polling queues 1419 */ 1420 dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues, 1421 sizeof(*dev->hw_queues), 1422 GFP_KERNEL); 1423 if (!dev->hw_queues) { 1424 ret = -ENOMEM; 1425 goto out_alloc; 1426 } 1427 1428 dev->clt_device_id = ida_alloc_max(&index_ida, 1429 (1 << (MINORBITS - RNBD_PART_BITS)) - 1, 1430 GFP_KERNEL); 1431 if (dev->clt_device_id < 0) { 1432 ret = dev->clt_device_id; 1433 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", 1434 pathname, sess->sessname, ret); 1435 goto out_queues; 1436 } 1437 1438 dev->pathname = kstrdup(pathname, GFP_KERNEL); 1439 if (!dev->pathname) { 1440 ret = -ENOMEM; 1441 goto out_ida; 1442 } 1443 1444 dev->sess = sess; 1445 dev->access_mode = access_mode; 1446 dev->nr_poll_queues = nr_poll_queues; 1447 mutex_init(&dev->lock); 1448 refcount_set(&dev->refcount, 1); 1449 dev->dev_state = DEV_STATE_INIT; 1450 1451 /* 1452 * Here we called from sysfs entry, thus clt-sysfs is 1453 * responsible that session will not disappear. 1454 */ 1455 WARN_ON(!rnbd_clt_get_sess(sess)); 1456 1457 return dev; 1458 1459 out_ida: 1460 ida_free(&index_ida, dev->clt_device_id); 1461 out_queues: 1462 kfree(dev->hw_queues); 1463 out_alloc: 1464 kfree(dev); 1465 return ERR_PTR(ret); 1466 } 1467 1468 static bool __exists_dev(const char *pathname, const char *sessname) 1469 { 1470 struct rnbd_clt_session *sess; 1471 struct rnbd_clt_dev *dev; 1472 bool found = false; 1473 1474 list_for_each_entry(sess, &sess_list, list) { 1475 if (sessname && strncmp(sess->sessname, sessname, 1476 sizeof(sess->sessname))) 1477 continue; 1478 mutex_lock(&sess->lock); 1479 list_for_each_entry(dev, &sess->devs_list, list) { 1480 if (strlen(dev->pathname) == strlen(pathname) && 1481 !strcmp(dev->pathname, pathname)) { 1482 found = true; 1483 break; 1484 } 1485 } 1486 mutex_unlock(&sess->lock); 1487 if (found) 1488 break; 1489 } 1490 1491 return found; 1492 } 1493 1494 static bool exists_devpath(const char *pathname, const char *sessname) 1495 { 1496 bool found; 1497 1498 mutex_lock(&sess_lock); 1499 found = __exists_dev(pathname, sessname); 1500 mutex_unlock(&sess_lock); 1501 1502 return found; 1503 } 1504 1505 static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev) 1506 { 1507 bool found; 1508 struct rnbd_clt_session *sess = dev->sess; 1509 1510 mutex_lock(&sess_lock); 1511 found = __exists_dev(dev->pathname, sess->sessname); 1512 if (!found) { 1513 mutex_lock(&sess->lock); 1514 list_add_tail(&dev->list, &sess->devs_list); 1515 mutex_unlock(&sess->lock); 1516 } 1517 mutex_unlock(&sess_lock); 1518 1519 return found; 1520 } 1521 1522 static void rnbd_delete_dev(struct rnbd_clt_dev *dev) 1523 { 1524 struct rnbd_clt_session *sess = dev->sess; 1525 1526 mutex_lock(&sess->lock); 1527 list_del(&dev->list); 1528 mutex_unlock(&sess->lock); 1529 } 1530 1531 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, 1532 struct rtrs_addr *paths, 1533 size_t path_cnt, u16 port_nr, 1534 const char *pathname, 1535 enum rnbd_access_mode access_mode, 1536 u32 nr_poll_queues) 1537 { 1538 struct rnbd_clt_session *sess; 1539 struct rnbd_clt_dev *dev; 1540 int ret, errno; 1541 struct rnbd_msg_open_rsp *rsp; 1542 struct rnbd_msg_open msg; 1543 struct rnbd_iu *iu; 1544 struct kvec vec = { 1545 .iov_base = &msg, 1546 .iov_len = sizeof(msg) 1547 }; 1548 1549 if (exists_devpath(pathname, sessname)) 1550 return ERR_PTR(-EEXIST); 1551 1552 sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues); 1553 if (IS_ERR(sess)) 1554 return ERR_CAST(sess); 1555 1556 dev = init_dev(sess, access_mode, pathname, nr_poll_queues); 1557 if (IS_ERR(dev)) { 1558 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %pe\n", 1559 pathname, sess->sessname, dev); 1560 ret = PTR_ERR(dev); 1561 goto put_sess; 1562 } 1563 if (insert_dev_if_not_exists_devpath(dev)) { 1564 ret = -EEXIST; 1565 goto put_dev; 1566 } 1567 1568 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 1569 if (!rsp) { 1570 ret = -ENOMEM; 1571 goto del_dev; 1572 } 1573 1574 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 1575 if (!iu) { 1576 ret = -ENOMEM; 1577 kfree(rsp); 1578 goto del_dev; 1579 } 1580 iu->buf = rsp; 1581 iu->dev = dev; 1582 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 1583 1584 msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); 1585 msg.access_mode = dev->access_mode; 1586 strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); 1587 1588 WARN_ON(!rnbd_clt_get_dev(dev)); 1589 ret = send_usr_msg(sess->rtrs, READ, iu, 1590 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 1591 msg_open_conf, &errno, RTRS_PERMIT_WAIT); 1592 if (ret) { 1593 rnbd_clt_put_dev(dev); 1594 rnbd_put_iu(sess, iu); 1595 } else { 1596 ret = errno; 1597 } 1598 if (ret) { 1599 rnbd_clt_err(dev, 1600 "map_device: failed, can't open remote device, err: %d\n", 1601 ret); 1602 goto put_iu; 1603 } 1604 mutex_lock(&dev->lock); 1605 pr_debug("Opened remote device: session=%s, path='%s'\n", 1606 sess->sessname, pathname); 1607 ret = rnbd_client_setup_device(dev, rsp); 1608 if (ret) { 1609 rnbd_clt_err(dev, 1610 "map_device: Failed to configure device, err: %d\n", 1611 ret); 1612 mutex_unlock(&dev->lock); 1613 goto send_close; 1614 } 1615 1616 rnbd_clt_info(dev, 1617 "map_device: Device mapped as %s (nsectors: %llu, logical_block_size: %d, physical_block_size: %d, max_write_zeroes_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, wc: %d, fua: %d)\n", 1618 dev->gd->disk_name, le64_to_cpu(rsp->nsectors), 1619 le16_to_cpu(rsp->logical_block_size), 1620 le16_to_cpu(rsp->physical_block_size), 1621 le32_to_cpu(rsp->max_write_zeroes_sectors), 1622 le32_to_cpu(rsp->max_discard_sectors), 1623 le32_to_cpu(rsp->discard_granularity), 1624 le32_to_cpu(rsp->discard_alignment), 1625 le16_to_cpu(rsp->secure_discard), 1626 sess->max_segments, sess->max_io_size / SECTOR_SIZE, 1627 !!(rsp->cache_policy & RNBD_WRITEBACK), 1628 !!(rsp->cache_policy & RNBD_FUA)); 1629 1630 mutex_unlock(&dev->lock); 1631 kfree(rsp); 1632 rnbd_put_iu(sess, iu); 1633 rnbd_clt_put_sess(sess); 1634 1635 return dev; 1636 1637 send_close: 1638 send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT); 1639 put_iu: 1640 kfree(rsp); 1641 rnbd_put_iu(sess, iu); 1642 del_dev: 1643 rnbd_delete_dev(dev); 1644 put_dev: 1645 rnbd_clt_put_dev(dev); 1646 put_sess: 1647 rnbd_clt_put_sess(sess); 1648 1649 return ERR_PTR(ret); 1650 } 1651 1652 static void rnbd_destroy_gen_disk(struct rnbd_clt_dev *dev) 1653 { 1654 del_gendisk(dev->gd); 1655 put_disk(dev->gd); 1656 } 1657 1658 static void rnbd_destroy_sysfs(struct rnbd_clt_dev *dev, 1659 const struct attribute *sysfs_self) 1660 { 1661 rnbd_clt_remove_dev_symlink(dev); 1662 if (dev->kobj.state_initialized) { 1663 if (sysfs_self) 1664 /* To avoid deadlock firstly remove itself */ 1665 sysfs_remove_file_self(&dev->kobj, sysfs_self); 1666 kobject_del(&dev->kobj); 1667 } 1668 } 1669 1670 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, 1671 const struct attribute *sysfs_self) 1672 { 1673 struct rnbd_clt_session *sess = dev->sess; 1674 int refcount, ret = 0; 1675 bool was_mapped; 1676 1677 mutex_lock(&dev->lock); 1678 if (dev->dev_state == DEV_STATE_UNMAPPED) { 1679 rnbd_clt_info(dev, "Device is already being unmapped\n"); 1680 ret = -EALREADY; 1681 goto err; 1682 } 1683 refcount = refcount_read(&dev->refcount); 1684 if (!force && refcount > 1) { 1685 rnbd_clt_err(dev, 1686 "Closing device failed, device is in use, (%d device users)\n", 1687 refcount - 1); 1688 ret = -EBUSY; 1689 goto err; 1690 } 1691 was_mapped = (dev->dev_state == DEV_STATE_MAPPED); 1692 dev->dev_state = DEV_STATE_UNMAPPED; 1693 mutex_unlock(&dev->lock); 1694 1695 rnbd_delete_dev(dev); 1696 rnbd_destroy_sysfs(dev, sysfs_self); 1697 rnbd_destroy_gen_disk(dev); 1698 if (was_mapped && sess->rtrs) 1699 send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT); 1700 1701 rnbd_clt_info(dev, "Device is unmapped\n"); 1702 1703 /* Likely last reference put */ 1704 rnbd_clt_put_dev(dev); 1705 1706 /* 1707 * Here device and session can be vanished! 1708 */ 1709 1710 return 0; 1711 err: 1712 mutex_unlock(&dev->lock); 1713 1714 return ret; 1715 } 1716 1717 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) 1718 { 1719 int err; 1720 1721 mutex_lock(&dev->lock); 1722 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) 1723 err = 0; 1724 else if (dev->dev_state == DEV_STATE_UNMAPPED) 1725 err = -ENODEV; 1726 else if (dev->dev_state == DEV_STATE_MAPPED) 1727 err = -EALREADY; 1728 else 1729 err = -EBUSY; 1730 mutex_unlock(&dev->lock); 1731 if (!err) { 1732 rnbd_clt_info(dev, "Remapping device.\n"); 1733 err = send_msg_open(dev, RTRS_PERMIT_WAIT); 1734 if (err) 1735 rnbd_clt_err(dev, "remap_device: %d\n", err); 1736 } 1737 1738 return err; 1739 } 1740 1741 static void unmap_device_work(struct work_struct *work) 1742 { 1743 struct rnbd_clt_dev *dev; 1744 1745 dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); 1746 rnbd_clt_unmap_device(dev, true, NULL); 1747 } 1748 1749 static void rnbd_destroy_sessions(void) 1750 { 1751 struct rnbd_clt_session *sess, *sn; 1752 struct rnbd_clt_dev *dev, *tn; 1753 1754 /* Firstly forbid access through sysfs interface */ 1755 rnbd_clt_destroy_sysfs_files(); 1756 1757 /* 1758 * Here at this point there is no any concurrent access to sessions 1759 * list and devices list: 1760 * 1. New session or device can't be created - session sysfs files 1761 * are removed. 1762 * 2. Device or session can't be removed - module reference is taken 1763 * into account in unmap device sysfs callback. 1764 * 3. No IO requests inflight - each file open of block_dev increases 1765 * module reference in get_disk(). 1766 * 1767 * But still there can be user requests inflights, which are sent by 1768 * asynchronous send_msg_*() functions, thus before unmapping devices 1769 * RTRS session must be explicitly closed. 1770 */ 1771 1772 list_for_each_entry_safe(sess, sn, &sess_list, list) { 1773 if (!rnbd_clt_get_sess(sess)) 1774 continue; 1775 close_rtrs(sess); 1776 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { 1777 /* 1778 * Here unmap happens in parallel for only one reason: 1779 * del_gendisk() takes around half a second, so 1780 * on huge amount of devices the whole module unload 1781 * procedure takes minutes. 1782 */ 1783 INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); 1784 queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work); 1785 } 1786 rnbd_clt_put_sess(sess); 1787 } 1788 /* Wait for all scheduled unmap works */ 1789 flush_workqueue(rnbd_clt_wq); 1790 WARN_ON(!list_empty(&sess_list)); 1791 } 1792 1793 static int __init rnbd_client_init(void) 1794 { 1795 int err = 0; 1796 1797 BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); 1798 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); 1799 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); 1800 BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); 1801 BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); 1802 BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); 1803 rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); 1804 if (rnbd_client_major <= 0) { 1805 pr_err("Failed to load module, block device registration failed\n"); 1806 return -EBUSY; 1807 } 1808 1809 err = rnbd_clt_create_sysfs_files(); 1810 if (err) { 1811 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", 1812 err); 1813 unregister_blkdev(rnbd_client_major, "rnbd"); 1814 return err; 1815 } 1816 rnbd_clt_wq = alloc_workqueue("rnbd_clt_wq", WQ_PERCPU, 0); 1817 if (!rnbd_clt_wq) { 1818 pr_err("Failed to load module, alloc_workqueue failed.\n"); 1819 rnbd_clt_destroy_sysfs_files(); 1820 unregister_blkdev(rnbd_client_major, "rnbd"); 1821 err = -ENOMEM; 1822 } 1823 1824 return err; 1825 } 1826 1827 static void __exit rnbd_client_exit(void) 1828 { 1829 rnbd_destroy_sessions(); 1830 unregister_blkdev(rnbd_client_major, "rnbd"); 1831 ida_destroy(&index_ida); 1832 destroy_workqueue(rnbd_clt_wq); 1833 } 1834 1835 module_init(rnbd_client_init); 1836 module_exit(rnbd_client_exit); 1837