/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};
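
/*
 * What is currently mapped on the device: either the base image or
 * one of its snapshots, identified by name and id, along with the
 * mapped size and feature bits.  Snapshot mappings are always
 * read-only.
 */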
struct rbd_mapping {
	char			*snap_name;
	u64			snap_id;
	u64			size;
	u64			features;
	bool			snap_exists;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_options	rbd_opts;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_id;
	size_t			image_id_len;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
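
/*
 * open/release just take and drop a reference on the rbd device, so
 * it can't be torn down while the block device is in use.  Writable
 * opens of a read-only mapping are refused.
 */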
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
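
/*
 * Parse a single mapping option.  The token enum above is ordered so
 * that a token's position relative to the Opt_last_* markers tells
 * whether it takes an integer, string, or Boolean argument.
 */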
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock to remove the
 * client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
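
/*
 * Sanity-check an on-disk (format 1) image header: it must begin
 * with the expected magic text, and its snapshot count and snapshot
 * name length must not overflow the size computations performed
 * when the header is copied in.
 */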
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
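
/*
 * Look up a snapshot by name in the device's snapshot list; on
 * success record its id, size, and features in the mapping.
 */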
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->mapping.snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		rbd_dev->mapping.snap_exists = false;
		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.snap_exists = true;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->mapping.snap_name = snap_name;
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
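
/*
 * Compute the name of the object backing the segment that contains
 * the given image offset.  The name is "<object_prefix>.<segment>",
 * with the segment number printed as 12 hex digits: e.g. with an
 * object prefix of "foo" and 4MB (order 22) objects, offset
 * 0x1400000 falls in segment 5, object "foo.000000000005".
 */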
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
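
/*
 * Number of segments (objects) touched by a request of len bytes at
 * image offset ofs.  With 4MB objects, for example, a two-byte
 * request at offset 0x3fffff straddles a segment boundary and so
 * touches two segments.
 */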
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d "
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio.  We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
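
/*
 * Record completion of one request in a collection, then complete
 * the longest finished prefix of the collection's requests.  Partial
 * completions of a block request must be delivered front to back,
 * which is why completion happens in index order.
 */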
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
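
/*
 * Completion callback for requests needing no processing beyond
 * dropping the osd request reference (e.g. notify acks).
 */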
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, inbound_size, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      snapid,
			      CEPH_OSD_FLAG_READ,
			      ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Acknowledge a notification received on the header object
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     rbd_dev->header_name, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
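
/*
 * Callback for notifications on the header object: something changed
 * the header, so refresh the in-memory copy, then acknowledge the
 * notification.
 */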
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to"
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      flags, ops,
			      object_name, 0, inbound_size, inbound,
			      NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
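
/*
 * Allocate a completion collection with one status slot for each of
 * num_reqs requests.
 */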
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}

			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
					rbd_dev->header_name,
					0, size,
					(char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * reload the ondisk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}

/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		if (size != (sector_t) rbd_dev->mapping.size) {
			dout("setting size to %llu sectors",
				(unsigned long long) size);
			rbd_dev->mapping.size = (u64) size;
			set_capacity(rbd_dev->disk, size);
		}
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
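
/*
 * Set up the gendisk and request queue for the mapped image.  The
 * I/O limits are sized to the object size so requests line up well
 * with the backing objects; rbd_merge_bvec() additionally avoids
 * building bios that span objects.
 */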
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static ssize_t rbd_snap_features_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
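
/*
 * A snapshot's device is considered registered iff its type has been
 * set; assert that this agrees with what the driver core reports.
 */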
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
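
/*
 * Drop a snapshot from the device's snapshot list, unregistering its
 * sysfs device if it had been registered.
 */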
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);

	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
						const char *snap_name,
						u64 snap_id, u64 snap_size,
						u64 snap_features)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->id = snap_id;
	snap->size = snap_size;
	snap->features = snap_features;

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}

static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
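
/*
 * Fetch the object name prefix for a format 2 image using the "rbd"
 * class method "get_object_prefix", and record the decoded string in
 * the in-memory header.
 */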
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext",
				NULL, 0,
				reply_buf, size,
				CEPH_OSD_FLAG_READ, ver);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;

	size = sizeof (struct ceph_snap_context) +
		snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}

	atomic_set(&snapc->nref, 1);
	snapc->seq = seq;
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;
	ret = 0;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);

out:
	kfree(reply_buf);

	return ret;
}
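
/*
 * Return the name of the snapshot at the given position in the
 * snapshot context, fetched with the "get_snapshot_name" class
 * method.  Returns a dynamically-allocated string, or a
 * pointer-coded errno.
 */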
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
        size_t size;
        int ret;
        void *reply_buf;
        void *p;
        void *end;
        u64 seq;
        u32 snap_count;
        struct ceph_snap_context *snapc;
        u32 i;

        /*
         * We'll need room for the seq value (maximum snapshot id),
         * snapshot count, and array of that many snapshot ids.
         * For now we have a fixed upper limit on the number we're
         * prepared to receive.
         */
        size = sizeof (__le64) + sizeof (__le32) +
                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
        reply_buf = kzalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapcontext",
                                NULL, 0,
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, ver);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        ret = -ERANGE;
        p = reply_buf;
        end = (char *) reply_buf + size;
        ceph_decode_64_safe(&p, end, seq, out);
        ceph_decode_32_safe(&p, end, snap_count, out);

        /*
         * Make sure the reported number of snapshot ids wouldn't go
         * beyond the end of our buffer.  But before checking that,
         * make sure the computed size of the snapshot context we
         * allocate is representable in a size_t.
         */
        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
                                / sizeof (u64)) {
                ret = -EINVAL;
                goto out;
        }
        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
                goto out;

        size = sizeof (struct ceph_snap_context) +
                        snap_count * sizeof (snapc->snaps[0]);
        snapc = kmalloc(size, GFP_KERNEL);
        if (!snapc) {
                ret = -ENOMEM;
                goto out;
        }

        atomic_set(&snapc->nref, 1);
        snapc->seq = seq;
        snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);

        rbd_dev->header.snapc = snapc;

        dout("  snap context seq = %llu, snap_count = %u\n",
                (unsigned long long) seq, (unsigned int) snap_count);

        ret = 0;        /* clear the -ERANGE decode guard on success */
out:
        kfree(reply_buf);

        /* Was "return 0", which silently masked decode errors */
        return ret;
}

static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        size_t snap_name_len;
        char *snap_name;

        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name_len = 0;
        snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
                                                GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
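/*
 * A note on the string decoding used above (an assumption drawn
 * from how ceph_extract_encoded_string() is called here): an
 * encoded string is a __le32 byte count followed by that many
 * bytes, so a snapshot named "snap1" arrives as
 *
 *      05 00 00 00  's' 'n' 'a' 'p' '1'
 *
 * and is handed back as a freshly allocated, NUL-terminated copy.
 */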
static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
{
        u64 snap_id;    /* snaps[] is cpu-order; this was mistyped __le64 */
        u8 order;
        int ret;

        snap_id = rbd_dev->header.snapc->snaps[which];
        ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
        if (ret)
                return ERR_PTR(ret);
        ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
        if (ret)
                return ERR_PTR(ret);

        return rbd_dev_v2_snap_name(rbd_dev, which);
}

static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
{
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_info(rbd_dev, which,
                                        snap_size, snap_features);
        if (rbd_dev->image_format == 2)
                return rbd_dev_v2_snap_info(rbd_dev, which,
                                        snap_size, snap_features);
        return ERR_PTR(-EINVAL);
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        const u32 snap_count = snapc->num_snaps;
        struct list_head *head = &rbd_dev->snaps;
        struct list_head *links = head->next;
        u32 index = 0;

        dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
        while (index < snap_count || links != head) {
                u64 snap_id;
                struct rbd_snap *snap;
                char *snap_name;
                u64 snap_size = 0;
                u64 snap_features = 0;

                snap_id = index < snap_count ? snapc->snaps[index]
                                             : CEPH_NOSNAP;
                snap = links != head ? list_entry(links, struct rbd_snap, node)
                                     : NULL;
                rbd_assert(!snap || snap->id != CEPH_NOSNAP);

                if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
                        struct list_head *next = links->next;

                        /* Existing snapshot not in the new snap context */

                        if (rbd_dev->mapping.snap_id == snap->id)
                                rbd_dev->mapping.snap_exists = false;
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->mapping.snap_id == snap->id ?
                                                        "mapped " : "",
                                (unsigned long long) snap->id);
                        /* Report first: unregistering may free snap */
                        __rbd_remove_snap_dev(snap);

                        /* Done with this list entry; advance */

                        links = next;
                        continue;
                }

                snap_name = rbd_dev_snap_info(rbd_dev, index,
                                        &snap_size, &snap_features);
                if (IS_ERR(snap_name))
                        return PTR_ERR(snap_name);

                dout("entry %u: snap_id = %llu\n", (unsigned int) index,
                        (unsigned long long) snap_id);
                if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
                        struct rbd_snap *new_snap;

                        /* We haven't seen this snapshot before */

                        new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
                                        snap_id, snap_size, snap_features);
                        if (IS_ERR(new_snap)) {
                                int err = PTR_ERR(new_snap);

                                dout("  failed to add dev, error %d\n", err);

                                return err;
                        }

                        /* New goes before existing, or at end of list */

                        dout("  added dev%s\n", snap ? "" : " at end");
                        if (snap)
                                list_add_tail(&new_snap->node, &snap->node);
                        else
                                list_add_tail(&new_snap->node, head);
                } else {
                        /* Already have this one */

                        dout("  already present\n");

                        rbd_assert(snap->size == snap_size);
                        rbd_assert(!strcmp(snap->name, snap_name));
                        rbd_assert(snap->features == snap_features);

                        /* Done with this list entry; advance */

                        links = links->next;
                }

                /* Advance to the next entry in the snapshot context */

                index++;
        }
        dout("%s: done\n", __func__);

        return 0;
}
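/*
 * A worked example of the merge above, with hypothetical ids.
 * Both sequences are ordered highest id first:
 *
 *      current list:  12  10  5
 *      new context:   12   8  5
 *
 * 12 matches and is kept; 10 > 8, so snapshot 10 no longer exists
 * and its device is removed; 8 is new and is inserted before 5;
 * 5 matches and is kept.
 */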
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        int ret = 0;

        dout("%s called\n", __func__);
        if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
                return -EIO;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!rbd_snap_registered(snap)) {
                        ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                        if (ret < 0)
                                break;
                }
        }
        dout("%s: returning %d\n", __func__, ret);

        return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}

static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
                        rc = rbd_refresh_header(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        max_id = 0;
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                /* Compare each entry's id, not the id being released */
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        dout("  max dev id has been reset\n");
}
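/*
 * An example of the id bookkeeping above, with a hypothetical
 * sequence of events: devices are added and receive ids 1, 2, 3,
 * so rbd_dev_id_max is 3.  Putting id 2 changes nothing (2 != 3).
 * Putting id 3 triggers the list search, which finds 1 as the new
 * maximum, so the next device added gets id 2.  An id is only
 * reused once the current maximum has been released.
 */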
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
        const char *spaces = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);   /* Find start of token */

        return strcspn(*buf, spaces);   /* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len;

        len = next_token(buf);
        if (len < token_size) {
                memcpy(token, *buf, len);
                *(token + len) = '\0';
        }
        *buf += len;

        return len;
}
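/*
 * For example (hypothetical input), given the buffer
 *
 *      "  1.2.3.4:6789 name=admin rbd"
 *
 * the first call to next_token() skips the two leading spaces and
 * returns 12, the length of "1.2.3.4:6789"; copy_token() would
 * additionally copy those 12 bytes and advance *buf past them,
 * leaving it pointing at " name=admin rbd" for the next call.
 */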
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
        char *dup;
        size_t len;

        len = next_token(buf);
        dup = kmalloc(len + 1, GFP_KERNEL);
        if (!dup)
                return NULL;

        memcpy(dup, *buf, len);
        *(dup + len) = '\0';
        *buf += len;

        if (lenp)
                *lenp = len;

        return dup;
}

/*
 * This fills in the pool_name, image_name, and image_name_len
 * fields of the given rbd_dev, based on the list of monitor
 * addresses and other options provided via /sys/bus/rbd/add.
 * Returns a pointer to a dynamically-allocated copy of the
 * snapshot name to map if successful, or a pointer-coded error
 * otherwise.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
                                const char *buf,
                                const char **mon_addrs,
                                size_t *mon_addrs_size,
                                char *options,
                                size_t options_size)
{
        size_t len;
        char *err_ptr = ERR_PTR(-EINVAL);
        char *snap_name;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len)
                return err_ptr;
        *mon_addrs_size = len + 1;
        *mon_addrs = buf;

        buf += len;

        len = copy_token(&buf, options, options_size);
        if (!len || len >= options_size)
                return err_ptr;

        err_ptr = ERR_PTR(-ENOMEM);
        rbd_dev->pool_name = dup_token(&buf, NULL);
        if (!rbd_dev->pool_name)
                goto out_err;

        rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
        if (!rbd_dev->image_name)
                goto out_err;

        /* Snapshot name is optional */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        }
        snap_name = kmalloc(len + 1, GFP_KERNEL);
        if (!snap_name)
                goto out_err;
        memcpy(snap_name, buf, len);
        *(snap_name + len) = '\0';

        dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);

        return snap_name;

out_err:
        kfree(rbd_dev->image_name);
        rbd_dev->image_name = NULL;
        rbd_dev->image_name_len = 0;
        kfree(rbd_dev->pool_name);
        rbd_dev->pool_name = NULL;

        return err_ptr;
}
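/*
 * An example of the line parsed above, as it might be written to
 * /sys/bus/rbd/add (a sketch; the exact option syntax is
 * interpreted later, when the client is set up):
 *
 *      echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap" \
 *              > /sys/bus/rbd/add
 *
 * i.e. monitor address(es), a single options token, the pool name,
 * the image name, and an optional snapshot name.  When the snapshot
 * is omitted, RBD_SNAP_HEAD_NAME is substituted and the base image
 * is mapped.
 */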
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = response;
        rbd_dev->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                &rbd_dev->image_id_len,
                                                GFP_NOIO);
        if (IS_ERR(rbd_dev->image_id)) {
                ret = PTR_ERR(rbd_dev->image_id);
                rbd_dev->image_id = NULL;
        } else {
                dout("image_id is %s\n", rbd_dev->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;

        /* Version 1 images have no id; empty string is used */

        rbd_dev->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->image_id)
                return -ENOMEM;
        rbd_dev->image_id_len = 0;

        /* Record the header object name for this rbd image. */

        size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
        sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;
        rbd_dev->image_format = 1;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;

out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->image_id);
        rbd_dev->image_id = NULL;

        return ret;
}
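/*
 * Summary of the object naming used by rbd_dev_image_id() and the
 * two probe functions (the actual prefix/suffix strings are defined
 * in rbd_types.h, not shown here):
 *
 *      format 1 header:  "<image_name>" RBD_SUFFIX
 *      format 2 header:  RBD_HEADER_PREFIX "<image_id>"
 *      format 2 id map:  RBD_ID_PREFIX "<image_name>"
 */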
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                RBD_HEADER_PREFIX, rbd_dev->image_id);

        /* Get the size and object order for the image */

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        /* Probing succeeded, but format 2 images aren't usable yet */

        return -ENOTSUPP;
out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
        int ret;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret)
                dout("probe failed, returning %d\n", ret);

        return ret;
}
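/*
 * Note the probe order above: a format 2 image always has an id
 * object, so a failed id lookup (typically -ENOENT) is taken to
 * mean the image, if it exists at all, must be format 1; an image
 * that is missing entirely is presumably only detected when the
 * v1 header read fails in turn.
 */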
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        char *options;
        struct rbd_device *rbd_dev = NULL;
        const char *mon_addrs = NULL;
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
        char *snap_name;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        options = kmalloc(count, GFP_KERNEL);
        if (!options)
                goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                goto err_out_mem;

        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        /* parse add command */
        snap_name = rbd_add_parse_args(rbd_dev, buf,
                                &mon_addrs, &mon_addrs_size, options, count);
        if (IS_ERR(snap_name)) {
                rc = PTR_ERR(snap_name);
                goto err_out_mem;
        }

        rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
        if (rc < 0)
                goto err_out_args;

        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->pool_id = rc;

        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_client;
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = rbd_dev_snaps_update(rbd_dev);
        if (rc)
                goto err_out_header;

        rc = rbd_dev_set_mapping(rbd_dev, snap_name);
        if (rc)
                goto err_out_header;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        rc = register_blkdev(0, rbd_dev->name);
        if (rc < 0)
                goto err_out_id;
        rbd_dev->major = rc;

        /* Set up the blkdev mapping. */

        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_blkdev;

        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (rc)
                goto err_out_bus;

        rc = rbd_init_watch_dev(rbd_dev);
        if (rc)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return count;

err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        rbd_bus_del_dev(rbd_dev);
        kfree(options);
        return rc;

err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_header:
        rbd_header_free(&rbd_dev->header);
err_out_client:
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
        kfree(rbd_dev->image_id);
err_out_args:
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_name);
        kfree(rbd_dev->pool_name);
err_out_mem:
        kfree(rbd_dev);
        kfree(options);

        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);

        return (ssize_t) rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev);

        rbd_put_client(rbd_dev);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_id);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
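/*
 * Removal mirrors rbd_add(): the id assigned at add time is written
 * to the bus control file, e.g. (hypothetical id)
 *
 *      echo 3 > /sys/bus/rbd/remove
 *
 * which unregisters the corresponding device ("rbd3") and lets the
 * release callback above tear everything down.
 */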
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        __rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

int __init rbd_init(void)
{
        int rc;

        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}

void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");