1 /* 2 rbd.c -- Export ceph rados objects as a Linux block device 3 4 5 based on drivers/block/osdblk.c: 6 7 Copyright 2009 Red Hat, Inc. 8 9 This program is free software; you can redistribute it and/or modify 10 it under the terms of the GNU General Public License as published by 11 the Free Software Foundation. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program; see the file COPYING. If not, write to 20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 21 22 23 24 For usage instructions, please refer to: 25 26 Documentation/ABI/testing/sysfs-bus-rbd 27 28 */ 29 30 #include <linux/ceph/libceph.h> 31 #include <linux/ceph/osd_client.h> 32 #include <linux/ceph/mon_client.h> 33 #include <linux/ceph/decode.h> 34 #include <linux/parser.h> 35 36 #include <linux/kernel.h> 37 #include <linux/device.h> 38 #include <linux/module.h> 39 #include <linux/fs.h> 40 #include <linux/blkdev.h> 41 42 #include "rbd_types.h" 43 44 #define RBD_DEBUG /* Activate rbd_assert() calls */ 45 46 /* 47 * The basic unit of block I/O is a sector. It is interpreted in a 48 * number of contexts in Linux (blk, bio, genhd), but the default is 49 * universally 512 bytes. These symbols are just slightly more 50 * meaningful than the bare numbers they represent. 
51 */ 52 #define SECTOR_SHIFT 9 53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 54 55 /* It might be useful to have these defined elsewhere */ 56 57 #define U8_MAX ((u8) (~0U)) 58 #define U16_MAX ((u16) (~0U)) 59 #define U32_MAX ((u32) (~0U)) 60 #define U64_MAX ((u64) (~0ULL)) 61 62 #define RBD_DRV_NAME "rbd" 63 #define RBD_DRV_NAME_LONG "rbd (rados block device)" 64 65 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 66 67 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 68 #define RBD_MAX_SNAP_NAME_LEN \ 69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 70 71 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 72 73 #define RBD_SNAP_HEAD_NAME "-" 74 75 /* This allows a single page to hold an image name sent by OSD */ 76 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 77 #define RBD_IMAGE_ID_LEN_MAX 64 78 79 #define RBD_OBJ_PREFIX_LEN_MAX 64 80 81 /* Feature bits */ 82 83 #define RBD_FEATURE_LAYERING 1 84 85 /* Features supported by this (client software) implementation. */ 86 87 #define RBD_FEATURES_ALL (0) 88 89 /* 90 * An RBD device name will be "rbd#", where the "rbd" comes from 91 * RBD_DRV_NAME above, and # is a unique integer identifier. 92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big 93 * enough to hold all possible device names. 94 */ 95 #define DEV_NAME_LEN 32 96 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) 97 98 /* 99 * block device image metadata (in-memory version) 100 */ 101 struct rbd_image_header { 102 /* These four fields never change for a given rbd image */ 103 char *object_prefix; 104 u64 features; 105 __u8 obj_order; 106 __u8 crypt_type; 107 __u8 comp_type; 108 109 /* The remaining fields need to be updated occasionally */ 110 u64 image_size; 111 struct ceph_snap_context *snapc; 112 char *snap_names; 113 u64 *snap_sizes; 114 115 u64 obj_version; 116 }; 117 118 /* 119 * An rbd image specification. 
120 * 121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 122 * identify an image. Each rbd_dev structure includes a pointer to 123 * an rbd_spec structure that encapsulates this identity. 124 * 125 * Each of the id's in an rbd_spec has an associated name. For a 126 * user-mapped image, the names are supplied and the id's associated 127 * with them are looked up. For a layered image, a parent image is 128 * defined by the tuple, and the names are looked up. 129 * 130 * An rbd_dev structure contains a parent_spec pointer which is 131 * non-null if the image it represents is a child in a layered 132 * image. This pointer will refer to the rbd_spec structure used 133 * by the parent rbd_dev for its own identity (i.e., the structure 134 * is shared between the parent and child). 135 * 136 * Since these structures are populated once, during the discovery 137 * phase of image construction, they are effectively immutable so 138 * we make no effort to synchronize access to them. 139 * 140 * Note that code herein does not assume the image name is known (it 141 * could be a null pointer). 142 */ 143 struct rbd_spec { 144 u64 pool_id; 145 char *pool_name; 146 147 char *image_id; 148 char *image_name; 149 150 u64 snap_id; 151 char *snap_name; 152 153 struct kref kref; 154 }; 155 156 /* 157 * an instance of the client. multiple devices may share an rbd client. 158 */ 159 struct rbd_client { 160 struct ceph_client *client; 161 struct kref kref; 162 struct list_head node; 163 }; 164 165 struct rbd_img_request; 166 typedef void (*rbd_img_callback_t)(struct rbd_img_request *); 167 168 #define BAD_WHICH U32_MAX /* Good which or bad which, which? 
*/ 169 170 struct rbd_obj_request; 171 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); 172 173 enum obj_request_type { 174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES 175 }; 176 177 struct rbd_obj_request { 178 const char *object_name; 179 u64 offset; /* object start byte */ 180 u64 length; /* bytes from offset */ 181 182 struct rbd_img_request *img_request; 183 struct list_head links; /* img_request->obj_requests */ 184 u32 which; /* posn image request list */ 185 186 enum obj_request_type type; 187 union { 188 struct bio *bio_list; 189 struct { 190 struct page **pages; 191 u32 page_count; 192 }; 193 }; 194 195 struct ceph_osd_request *osd_req; 196 197 u64 xferred; /* bytes transferred */ 198 u64 version; 199 int result; 200 atomic_t done; 201 202 rbd_obj_callback_t callback; 203 struct completion completion; 204 205 struct kref kref; 206 }; 207 208 struct rbd_img_request { 209 struct request *rq; 210 struct rbd_device *rbd_dev; 211 u64 offset; /* starting image byte offset */ 212 u64 length; /* byte count from offset */ 213 bool write_request; /* false for read */ 214 union { 215 struct ceph_snap_context *snapc; /* for writes */ 216 u64 snap_id; /* for reads */ 217 }; 218 spinlock_t completion_lock;/* protects next_completion */ 219 u32 next_completion; 220 rbd_img_callback_t callback; 221 222 u32 obj_request_count; 223 struct list_head obj_requests; /* rbd_obj_request structs */ 224 225 struct kref kref; 226 }; 227 228 #define for_each_obj_request(ireq, oreq) \ 229 list_for_each_entry(oreq, &(ireq)->obj_requests, links) 230 #define for_each_obj_request_from(ireq, oreq) \ 231 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) 232 #define for_each_obj_request_safe(ireq, oreq, n) \ 233 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) 234 235 struct rbd_snap { 236 struct device dev; 237 const char *name; 238 u64 size; 239 struct list_head node; 240 u64 id; 241 u64 features; 242 }; 243 244 struct rbd_mapping { 
245 u64 size; 246 u64 features; 247 bool read_only; 248 }; 249 250 /* 251 * a single device 252 */ 253 struct rbd_device { 254 int dev_id; /* blkdev unique id */ 255 256 int major; /* blkdev assigned major */ 257 struct gendisk *disk; /* blkdev's gendisk and rq */ 258 259 u32 image_format; /* Either 1 or 2 */ 260 struct rbd_client *rbd_client; 261 262 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 263 264 spinlock_t lock; /* queue, flags, open_count */ 265 266 struct rbd_image_header header; 267 unsigned long flags; /* possibly lock protected */ 268 struct rbd_spec *spec; 269 270 char *header_name; 271 272 struct ceph_file_layout layout; 273 274 struct ceph_osd_event *watch_event; 275 struct rbd_obj_request *watch_request; 276 277 struct rbd_spec *parent_spec; 278 u64 parent_overlap; 279 280 /* protects updating the header */ 281 struct rw_semaphore header_rwsem; 282 283 struct rbd_mapping mapping; 284 285 struct list_head node; 286 287 /* list of snapshots */ 288 struct list_head snaps; 289 290 /* sysfs related */ 291 struct device dev; 292 unsigned long open_count; /* protected by lock */ 293 }; 294 295 /* 296 * Flag bits for rbd_dev->flags. If atomicity is required, 297 * rbd_dev->lock is used to protect access. 298 * 299 * Currently, only the "removing" flag (which is coupled with the 300 * "open_count" field) requires atomic access. 
301 */ 302 enum rbd_dev_flags { 303 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 304 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 305 }; 306 307 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 308 309 static LIST_HEAD(rbd_dev_list); /* devices */ 310 static DEFINE_SPINLOCK(rbd_dev_list_lock); 311 312 static LIST_HEAD(rbd_client_list); /* clients */ 313 static DEFINE_SPINLOCK(rbd_client_list_lock); 314 315 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); 316 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 317 318 static void rbd_dev_release(struct device *dev); 319 static void rbd_remove_snap_dev(struct rbd_snap *snap); 320 321 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 322 size_t count); 323 static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 324 size_t count); 325 326 static struct bus_attribute rbd_bus_attrs[] = { 327 __ATTR(add, S_IWUSR, NULL, rbd_add), 328 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 329 __ATTR_NULL 330 }; 331 332 static struct bus_type rbd_bus_type = { 333 .name = "rbd", 334 .bus_attrs = rbd_bus_attrs, 335 }; 336 337 static void rbd_root_dev_release(struct device *dev) 338 { 339 } 340 341 static struct device rbd_root_dev = { 342 .init_name = "rbd", 343 .release = rbd_root_dev_release, 344 }; 345 346 static __printf(2, 3) 347 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 
348 { 349 struct va_format vaf; 350 va_list args; 351 352 va_start(args, fmt); 353 vaf.fmt = fmt; 354 vaf.va = &args; 355 356 if (!rbd_dev) 357 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 358 else if (rbd_dev->disk) 359 printk(KERN_WARNING "%s: %s: %pV\n", 360 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 361 else if (rbd_dev->spec && rbd_dev->spec->image_name) 362 printk(KERN_WARNING "%s: image %s: %pV\n", 363 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 364 else if (rbd_dev->spec && rbd_dev->spec->image_id) 365 printk(KERN_WARNING "%s: id %s: %pV\n", 366 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 367 else /* punt */ 368 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 369 RBD_DRV_NAME, rbd_dev, &vaf); 370 va_end(args); 371 } 372 373 #ifdef RBD_DEBUG 374 #define rbd_assert(expr) \ 375 if (unlikely(!(expr))) { \ 376 printk(KERN_ERR "\nAssertion failure in %s() " \ 377 "at line %d:\n\n" \ 378 "\trbd_assert(%s);\n\n", \ 379 __func__, __LINE__, #expr); \ 380 BUG(); \ 381 } 382 #else /* !RBD_DEBUG */ 383 # define rbd_assert(expr) ((void) 0) 384 #endif /* !RBD_DEBUG */ 385 386 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 387 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 388 389 static int rbd_open(struct block_device *bdev, fmode_t mode) 390 { 391 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 392 bool removing = false; 393 394 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 395 return -EROFS; 396 397 spin_lock_irq(&rbd_dev->lock); 398 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 399 removing = true; 400 else 401 rbd_dev->open_count++; 402 spin_unlock_irq(&rbd_dev->lock); 403 if (removing) 404 return -ENOENT; 405 406 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 407 (void) get_device(&rbd_dev->dev); 408 set_device_ro(bdev, rbd_dev->mapping.read_only); 409 mutex_unlock(&ctl_mutex); 410 411 return 0; 412 } 413 414 static int rbd_release(struct gendisk *disk, fmode_t mode) 415 { 
416 struct rbd_device *rbd_dev = disk->private_data; 417 unsigned long open_count_before; 418 419 spin_lock_irq(&rbd_dev->lock); 420 open_count_before = rbd_dev->open_count--; 421 spin_unlock_irq(&rbd_dev->lock); 422 rbd_assert(open_count_before > 0); 423 424 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 425 put_device(&rbd_dev->dev); 426 mutex_unlock(&ctl_mutex); 427 428 return 0; 429 } 430 431 static const struct block_device_operations rbd_bd_ops = { 432 .owner = THIS_MODULE, 433 .open = rbd_open, 434 .release = rbd_release, 435 }; 436 437 /* 438 * Initialize an rbd client instance. 439 * We own *ceph_opts. 440 */ 441 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 442 { 443 struct rbd_client *rbdc; 444 int ret = -ENOMEM; 445 446 dout("%s:\n", __func__); 447 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 448 if (!rbdc) 449 goto out_opt; 450 451 kref_init(&rbdc->kref); 452 INIT_LIST_HEAD(&rbdc->node); 453 454 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 455 456 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); 457 if (IS_ERR(rbdc->client)) 458 goto out_mutex; 459 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 460 461 ret = ceph_open_session(rbdc->client); 462 if (ret < 0) 463 goto out_err; 464 465 spin_lock(&rbd_client_list_lock); 466 list_add_tail(&rbdc->node, &rbd_client_list); 467 spin_unlock(&rbd_client_list_lock); 468 469 mutex_unlock(&ctl_mutex); 470 dout("%s: rbdc %p\n", __func__, rbdc); 471 472 return rbdc; 473 474 out_err: 475 ceph_destroy_client(rbdc->client); 476 out_mutex: 477 mutex_unlock(&ctl_mutex); 478 kfree(rbdc); 479 out_opt: 480 if (ceph_opts) 481 ceph_destroy_options(ceph_opts); 482 dout("%s: error %d\n", __func__, ret); 483 484 return ERR_PTR(ret); 485 } 486 487 /* 488 * Find a ceph client with specific addr and configuration. If 489 * found, bump its reference count. 
490 */ 491 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 492 { 493 struct rbd_client *client_node; 494 bool found = false; 495 496 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 497 return NULL; 498 499 spin_lock(&rbd_client_list_lock); 500 list_for_each_entry(client_node, &rbd_client_list, node) { 501 if (!ceph_compare_options(ceph_opts, client_node->client)) { 502 kref_get(&client_node->kref); 503 found = true; 504 break; 505 } 506 } 507 spin_unlock(&rbd_client_list_lock); 508 509 return found ? client_node : NULL; 510 } 511 512 /* 513 * mount options 514 */ 515 enum { 516 Opt_last_int, 517 /* int args above */ 518 Opt_last_string, 519 /* string args above */ 520 Opt_read_only, 521 Opt_read_write, 522 /* Boolean args above */ 523 Opt_last_bool, 524 }; 525 526 static match_table_t rbd_opts_tokens = { 527 /* int args above */ 528 /* string args above */ 529 {Opt_read_only, "read_only"}, 530 {Opt_read_only, "ro"}, /* Alternate spelling */ 531 {Opt_read_write, "read_write"}, 532 {Opt_read_write, "rw"}, /* Alternate spelling */ 533 /* Boolean args above */ 534 {-1, NULL} 535 }; 536 537 struct rbd_options { 538 bool read_only; 539 }; 540 541 #define RBD_READ_ONLY_DEFAULT false 542 543 static int parse_rbd_opts_token(char *c, void *private) 544 { 545 struct rbd_options *rbd_opts = private; 546 substring_t argstr[MAX_OPT_ARGS]; 547 int token, intval, ret; 548 549 token = match_token(c, rbd_opts_tokens, argstr); 550 if (token < 0) 551 return -EINVAL; 552 553 if (token < Opt_last_int) { 554 ret = match_int(&argstr[0], &intval); 555 if (ret < 0) { 556 pr_err("bad mount option arg (not int) " 557 "at '%s'\n", c); 558 return ret; 559 } 560 dout("got int token %d val %d\n", token, intval); 561 } else if (token > Opt_last_int && token < Opt_last_string) { 562 dout("got string token %d val %s\n", token, 563 argstr[0].from); 564 } else if (token > Opt_last_string && token < Opt_last_bool) { 565 dout("got Boolean token %d\n", token); 566 } else { 567 
dout("got token %d\n", token); 568 } 569 570 switch (token) { 571 case Opt_read_only: 572 rbd_opts->read_only = true; 573 break; 574 case Opt_read_write: 575 rbd_opts->read_only = false; 576 break; 577 default: 578 rbd_assert(false); 579 break; 580 } 581 return 0; 582 } 583 584 /* 585 * Get a ceph client with specific addr and configuration, if one does 586 * not exist create it. 587 */ 588 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 589 { 590 struct rbd_client *rbdc; 591 592 rbdc = rbd_client_find(ceph_opts); 593 if (rbdc) /* using an existing client */ 594 ceph_destroy_options(ceph_opts); 595 else 596 rbdc = rbd_client_create(ceph_opts); 597 598 return rbdc; 599 } 600 601 /* 602 * Destroy ceph client 603 * 604 * Caller must hold rbd_client_list_lock. 605 */ 606 static void rbd_client_release(struct kref *kref) 607 { 608 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 609 610 dout("%s: rbdc %p\n", __func__, rbdc); 611 spin_lock(&rbd_client_list_lock); 612 list_del(&rbdc->node); 613 spin_unlock(&rbd_client_list_lock); 614 615 ceph_destroy_client(rbdc->client); 616 kfree(rbdc); 617 } 618 619 /* 620 * Drop reference to ceph client node. If it's not referenced anymore, release 621 * it. 
622 */ 623 static void rbd_put_client(struct rbd_client *rbdc) 624 { 625 if (rbdc) 626 kref_put(&rbdc->kref, rbd_client_release); 627 } 628 629 static bool rbd_image_format_valid(u32 image_format) 630 { 631 return image_format == 1 || image_format == 2; 632 } 633 634 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 635 { 636 size_t size; 637 u32 snap_count; 638 639 /* The header has to start with the magic rbd header text */ 640 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 641 return false; 642 643 /* The bio layer requires at least sector-sized I/O */ 644 645 if (ondisk->options.order < SECTOR_SHIFT) 646 return false; 647 648 /* If we use u64 in a few spots we may be able to loosen this */ 649 650 if (ondisk->options.order > 8 * sizeof (int) - 1) 651 return false; 652 653 /* 654 * The size of a snapshot header has to fit in a size_t, and 655 * that limits the number of snapshots. 656 */ 657 snap_count = le32_to_cpu(ondisk->snap_count); 658 size = SIZE_MAX - sizeof (struct ceph_snap_context); 659 if (snap_count > size / sizeof (__le64)) 660 return false; 661 662 /* 663 * Not only that, but the size of the entire the snapshot 664 * header must also be representable in a size_t. 665 */ 666 size -= snap_count * sizeof (__le64); 667 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 668 return false; 669 670 return true; 671 } 672 673 /* 674 * Create a new header structure, translate header format from the on-disk 675 * header. 
676 */ 677 static int rbd_header_from_disk(struct rbd_image_header *header, 678 struct rbd_image_header_ondisk *ondisk) 679 { 680 u32 snap_count; 681 size_t len; 682 size_t size; 683 u32 i; 684 685 memset(header, 0, sizeof (*header)); 686 687 snap_count = le32_to_cpu(ondisk->snap_count); 688 689 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); 690 header->object_prefix = kmalloc(len + 1, GFP_KERNEL); 691 if (!header->object_prefix) 692 return -ENOMEM; 693 memcpy(header->object_prefix, ondisk->object_prefix, len); 694 header->object_prefix[len] = '\0'; 695 696 if (snap_count) { 697 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 698 699 /* Save a copy of the snapshot names */ 700 701 if (snap_names_len > (u64) SIZE_MAX) 702 return -EIO; 703 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); 704 if (!header->snap_names) 705 goto out_err; 706 /* 707 * Note that rbd_dev_v1_header_read() guarantees 708 * the ondisk buffer we're working with has 709 * snap_names_len bytes beyond the end of the 710 * snapshot id array, this memcpy() is safe. 
711 */ 712 memcpy(header->snap_names, &ondisk->snaps[snap_count], 713 snap_names_len); 714 715 /* Record each snapshot's size */ 716 717 size = snap_count * sizeof (*header->snap_sizes); 718 header->snap_sizes = kmalloc(size, GFP_KERNEL); 719 if (!header->snap_sizes) 720 goto out_err; 721 for (i = 0; i < snap_count; i++) 722 header->snap_sizes[i] = 723 le64_to_cpu(ondisk->snaps[i].image_size); 724 } else { 725 WARN_ON(ondisk->snap_names_len); 726 header->snap_names = NULL; 727 header->snap_sizes = NULL; 728 } 729 730 header->features = 0; /* No features support in v1 images */ 731 header->obj_order = ondisk->options.order; 732 header->crypt_type = ondisk->options.crypt_type; 733 header->comp_type = ondisk->options.comp_type; 734 735 /* Allocate and fill in the snapshot context */ 736 737 header->image_size = le64_to_cpu(ondisk->image_size); 738 size = sizeof (struct ceph_snap_context); 739 size += snap_count * sizeof (header->snapc->snaps[0]); 740 header->snapc = kzalloc(size, GFP_KERNEL); 741 if (!header->snapc) 742 goto out_err; 743 744 atomic_set(&header->snapc->nref, 1); 745 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 746 header->snapc->num_snaps = snap_count; 747 for (i = 0; i < snap_count; i++) 748 header->snapc->snaps[i] = 749 le64_to_cpu(ondisk->snaps[i].id); 750 751 return 0; 752 753 out_err: 754 kfree(header->snap_sizes); 755 header->snap_sizes = NULL; 756 kfree(header->snap_names); 757 header->snap_names = NULL; 758 kfree(header->object_prefix); 759 header->object_prefix = NULL; 760 761 return -ENOMEM; 762 } 763 764 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 765 { 766 struct rbd_snap *snap; 767 768 if (snap_id == CEPH_NOSNAP) 769 return RBD_SNAP_HEAD_NAME; 770 771 list_for_each_entry(snap, &rbd_dev->snaps, node) 772 if (snap_id == snap->id) 773 return snap->name; 774 775 return NULL; 776 } 777 778 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 779 { 780 781 struct rbd_snap *snap; 782 783 
list_for_each_entry(snap, &rbd_dev->snaps, node) { 784 if (!strcmp(snap_name, snap->name)) { 785 rbd_dev->spec->snap_id = snap->id; 786 rbd_dev->mapping.size = snap->size; 787 rbd_dev->mapping.features = snap->features; 788 789 return 0; 790 } 791 } 792 793 return -ENOENT; 794 } 795 796 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 797 { 798 int ret; 799 800 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 801 sizeof (RBD_SNAP_HEAD_NAME))) { 802 rbd_dev->spec->snap_id = CEPH_NOSNAP; 803 rbd_dev->mapping.size = rbd_dev->header.image_size; 804 rbd_dev->mapping.features = rbd_dev->header.features; 805 ret = 0; 806 } else { 807 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 808 if (ret < 0) 809 goto done; 810 rbd_dev->mapping.read_only = true; 811 } 812 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 813 814 done: 815 return ret; 816 } 817 818 static void rbd_header_free(struct rbd_image_header *header) 819 { 820 kfree(header->object_prefix); 821 header->object_prefix = NULL; 822 kfree(header->snap_sizes); 823 header->snap_sizes = NULL; 824 kfree(header->snap_names); 825 header->snap_names = NULL; 826 ceph_put_snap_context(header->snapc); 827 header->snapc = NULL; 828 } 829 830 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 831 { 832 char *name; 833 u64 segment; 834 int ret; 835 836 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 837 if (!name) 838 return NULL; 839 segment = offset >> rbd_dev->header.obj_order; 840 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 841 rbd_dev->header.object_prefix, segment); 842 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 843 pr_err("error formatting segment name for #%llu (%d)\n", 844 segment, ret); 845 kfree(name); 846 name = NULL; 847 } 848 849 return name; 850 } 851 852 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 853 { 854 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 855 856 return offset & (segment_size - 1); 857 } 858 859 static 
u64 rbd_segment_length(struct rbd_device *rbd_dev, 860 u64 offset, u64 length) 861 { 862 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 863 864 offset &= segment_size - 1; 865 866 rbd_assert(length <= U64_MAX - offset); 867 if (offset + length > segment_size) 868 length = segment_size - offset; 869 870 return length; 871 } 872 873 /* 874 * returns the size of an object in the image 875 */ 876 static u64 rbd_obj_bytes(struct rbd_image_header *header) 877 { 878 return 1 << header->obj_order; 879 } 880 881 /* 882 * bio helpers 883 */ 884 885 static void bio_chain_put(struct bio *chain) 886 { 887 struct bio *tmp; 888 889 while (chain) { 890 tmp = chain; 891 chain = chain->bi_next; 892 bio_put(tmp); 893 } 894 } 895 896 /* 897 * zeros a bio chain, starting at specific offset 898 */ 899 static void zero_bio_chain(struct bio *chain, int start_ofs) 900 { 901 struct bio_vec *bv; 902 unsigned long flags; 903 void *buf; 904 int i; 905 int pos = 0; 906 907 while (chain) { 908 bio_for_each_segment(bv, chain, i) { 909 if (pos + bv->bv_len > start_ofs) { 910 int remainder = max(start_ofs - pos, 0); 911 buf = bvec_kmap_irq(bv, &flags); 912 memset(buf + remainder, 0, 913 bv->bv_len - remainder); 914 bvec_kunmap_irq(buf, &flags); 915 } 916 pos += bv->bv_len; 917 } 918 919 chain = chain->bi_next; 920 } 921 } 922 923 /* 924 * Clone a portion of a bio, starting at the given byte offset 925 * and continuing for the number of bytes indicated. 
926 */ 927 static struct bio *bio_clone_range(struct bio *bio_src, 928 unsigned int offset, 929 unsigned int len, 930 gfp_t gfpmask) 931 { 932 struct bio_vec *bv; 933 unsigned int resid; 934 unsigned short idx; 935 unsigned int voff; 936 unsigned short end_idx; 937 unsigned short vcnt; 938 struct bio *bio; 939 940 /* Handle the easy case for the caller */ 941 942 if (!offset && len == bio_src->bi_size) 943 return bio_clone(bio_src, gfpmask); 944 945 if (WARN_ON_ONCE(!len)) 946 return NULL; 947 if (WARN_ON_ONCE(len > bio_src->bi_size)) 948 return NULL; 949 if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) 950 return NULL; 951 952 /* Find first affected segment... */ 953 954 resid = offset; 955 __bio_for_each_segment(bv, bio_src, idx, 0) { 956 if (resid < bv->bv_len) 957 break; 958 resid -= bv->bv_len; 959 } 960 voff = resid; 961 962 /* ...and the last affected segment */ 963 964 resid += len; 965 __bio_for_each_segment(bv, bio_src, end_idx, idx) { 966 if (resid <= bv->bv_len) 967 break; 968 resid -= bv->bv_len; 969 } 970 vcnt = end_idx - idx + 1; 971 972 /* Build the clone */ 973 974 bio = bio_alloc(gfpmask, (unsigned int) vcnt); 975 if (!bio) 976 return NULL; /* ENOMEM */ 977 978 bio->bi_bdev = bio_src->bi_bdev; 979 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); 980 bio->bi_rw = bio_src->bi_rw; 981 bio->bi_flags |= 1 << BIO_CLONED; 982 983 /* 984 * Copy over our part of the bio_vec, then update the first 985 * and last (or only) entries. 
986 */ 987 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], 988 vcnt * sizeof (struct bio_vec)); 989 bio->bi_io_vec[0].bv_offset += voff; 990 if (vcnt > 1) { 991 bio->bi_io_vec[0].bv_len -= voff; 992 bio->bi_io_vec[vcnt - 1].bv_len = resid; 993 } else { 994 bio->bi_io_vec[0].bv_len = len; 995 } 996 997 bio->bi_vcnt = vcnt; 998 bio->bi_size = len; 999 bio->bi_idx = 0; 1000 1001 return bio; 1002 } 1003 1004 /* 1005 * Clone a portion of a bio chain, starting at the given byte offset 1006 * into the first bio in the source chain and continuing for the 1007 * number of bytes indicated. The result is another bio chain of 1008 * exactly the given length, or a null pointer on error. 1009 * 1010 * The bio_src and offset parameters are both in-out. On entry they 1011 * refer to the first source bio and the offset into that bio where 1012 * the start of data to be cloned is located. 1013 * 1014 * On return, bio_src is updated to refer to the bio in the source 1015 * chain that contains first un-cloned byte, and *offset will 1016 * contain the offset of that byte within that bio. 
1017 */ 1018 static struct bio *bio_chain_clone_range(struct bio **bio_src, 1019 unsigned int *offset, 1020 unsigned int len, 1021 gfp_t gfpmask) 1022 { 1023 struct bio *bi = *bio_src; 1024 unsigned int off = *offset; 1025 struct bio *chain = NULL; 1026 struct bio **end; 1027 1028 /* Build up a chain of clone bios up to the limit */ 1029 1030 if (!bi || off >= bi->bi_size || !len) 1031 return NULL; /* Nothing to clone */ 1032 1033 end = &chain; 1034 while (len) { 1035 unsigned int bi_size; 1036 struct bio *bio; 1037 1038 if (!bi) { 1039 rbd_warn(NULL, "bio_chain exhausted with %u left", len); 1040 goto out_err; /* EINVAL; ran out of bio's */ 1041 } 1042 bi_size = min_t(unsigned int, bi->bi_size - off, len); 1043 bio = bio_clone_range(bi, off, bi_size, gfpmask); 1044 if (!bio) 1045 goto out_err; /* ENOMEM */ 1046 1047 *end = bio; 1048 end = &bio->bi_next; 1049 1050 off += bi_size; 1051 if (off == bi->bi_size) { 1052 bi = bi->bi_next; 1053 off = 0; 1054 } 1055 len -= bi_size; 1056 } 1057 *bio_src = bi; 1058 *offset = off; 1059 1060 return chain; 1061 out_err: 1062 bio_chain_put(chain); 1063 1064 return NULL; 1065 } 1066 1067 static void rbd_obj_request_get(struct rbd_obj_request *obj_request) 1068 { 1069 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1070 atomic_read(&obj_request->kref.refcount)); 1071 kref_get(&obj_request->kref); 1072 } 1073 1074 static void rbd_obj_request_destroy(struct kref *kref); 1075 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1076 { 1077 rbd_assert(obj_request != NULL); 1078 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1079 atomic_read(&obj_request->kref.refcount)); 1080 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1081 } 1082 1083 static void rbd_img_request_get(struct rbd_img_request *img_request) 1084 { 1085 dout("%s: img %p (was %d)\n", __func__, img_request, 1086 atomic_read(&img_request->kref.refcount)); 1087 kref_get(&img_request->kref); 1088 } 1089 1090 static void 
rbd_img_request_destroy(struct kref *kref); 1091 static void rbd_img_request_put(struct rbd_img_request *img_request) 1092 { 1093 rbd_assert(img_request != NULL); 1094 dout("%s: img %p (was %d)\n", __func__, img_request, 1095 atomic_read(&img_request->kref.refcount)); 1096 kref_put(&img_request->kref, rbd_img_request_destroy); 1097 } 1098 1099 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1100 struct rbd_obj_request *obj_request) 1101 { 1102 rbd_assert(obj_request->img_request == NULL); 1103 1104 rbd_obj_request_get(obj_request); 1105 obj_request->img_request = img_request; 1106 obj_request->which = img_request->obj_request_count; 1107 rbd_assert(obj_request->which != BAD_WHICH); 1108 img_request->obj_request_count++; 1109 list_add_tail(&obj_request->links, &img_request->obj_requests); 1110 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1111 obj_request->which); 1112 } 1113 1114 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1115 struct rbd_obj_request *obj_request) 1116 { 1117 rbd_assert(obj_request->which != BAD_WHICH); 1118 1119 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1120 obj_request->which); 1121 list_del(&obj_request->links); 1122 rbd_assert(img_request->obj_request_count > 0); 1123 img_request->obj_request_count--; 1124 rbd_assert(obj_request->which == img_request->obj_request_count); 1125 obj_request->which = BAD_WHICH; 1126 rbd_assert(obj_request->img_request == img_request); 1127 obj_request->img_request = NULL; 1128 obj_request->callback = NULL; 1129 rbd_obj_request_put(obj_request); 1130 } 1131 1132 static bool obj_request_type_valid(enum obj_request_type type) 1133 { 1134 switch (type) { 1135 case OBJ_REQUEST_NODATA: 1136 case OBJ_REQUEST_BIO: 1137 case OBJ_REQUEST_PAGES: 1138 return true; 1139 default: 1140 return false; 1141 } 1142 } 1143 1144 static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) 
1145 { 1146 struct ceph_osd_req_op *op; 1147 va_list args; 1148 size_t size; 1149 1150 op = kzalloc(sizeof (*op), GFP_NOIO); 1151 if (!op) 1152 return NULL; 1153 op->op = opcode; 1154 va_start(args, opcode); 1155 switch (opcode) { 1156 case CEPH_OSD_OP_READ: 1157 case CEPH_OSD_OP_WRITE: 1158 /* rbd_osd_req_op_create(READ, offset, length) */ 1159 /* rbd_osd_req_op_create(WRITE, offset, length) */ 1160 op->extent.offset = va_arg(args, u64); 1161 op->extent.length = va_arg(args, u64); 1162 if (opcode == CEPH_OSD_OP_WRITE) 1163 op->payload_len = op->extent.length; 1164 break; 1165 case CEPH_OSD_OP_STAT: 1166 break; 1167 case CEPH_OSD_OP_CALL: 1168 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */ 1169 op->cls.class_name = va_arg(args, char *); 1170 size = strlen(op->cls.class_name); 1171 rbd_assert(size <= (size_t) U8_MAX); 1172 op->cls.class_len = size; 1173 op->payload_len = size; 1174 1175 op->cls.method_name = va_arg(args, char *); 1176 size = strlen(op->cls.method_name); 1177 rbd_assert(size <= (size_t) U8_MAX); 1178 op->cls.method_len = size; 1179 op->payload_len += size; 1180 1181 op->cls.argc = 0; 1182 op->cls.indata = va_arg(args, void *); 1183 size = va_arg(args, size_t); 1184 rbd_assert(size <= (size_t) U32_MAX); 1185 op->cls.indata_len = (u32) size; 1186 op->payload_len += size; 1187 break; 1188 case CEPH_OSD_OP_NOTIFY_ACK: 1189 case CEPH_OSD_OP_WATCH: 1190 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */ 1191 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */ 1192 op->watch.cookie = va_arg(args, u64); 1193 op->watch.ver = va_arg(args, u64); 1194 op->watch.ver = cpu_to_le64(op->watch.ver); 1195 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int)) 1196 op->watch.flag = (u8) 1; 1197 break; 1198 default: 1199 rbd_warn(NULL, "unsupported opcode %hu\n", opcode); 1200 kfree(op); 1201 op = NULL; 1202 break; 1203 } 1204 va_end(args); 1205 1206 return op; 1207 } 1208 1209 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) 
1210 { 1211 kfree(op); 1212 } 1213 1214 static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1215 struct rbd_obj_request *obj_request) 1216 { 1217 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request); 1218 1219 return ceph_osdc_start_request(osdc, obj_request->osd_req, false); 1220 } 1221 1222 static void rbd_img_request_complete(struct rbd_img_request *img_request) 1223 { 1224 dout("%s: img %p\n", __func__, img_request); 1225 if (img_request->callback) 1226 img_request->callback(img_request); 1227 else 1228 rbd_img_request_put(img_request); 1229 } 1230 1231 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */ 1232 1233 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) 1234 { 1235 dout("%s: obj %p\n", __func__, obj_request); 1236 1237 return wait_for_completion_interruptible(&obj_request->completion); 1238 } 1239 1240 static void obj_request_done_init(struct rbd_obj_request *obj_request) 1241 { 1242 atomic_set(&obj_request->done, 0); 1243 smp_wmb(); 1244 } 1245 1246 static void obj_request_done_set(struct rbd_obj_request *obj_request) 1247 { 1248 int done; 1249 1250 done = atomic_inc_return(&obj_request->done); 1251 if (done > 1) { 1252 struct rbd_img_request *img_request = obj_request->img_request; 1253 struct rbd_device *rbd_dev; 1254 1255 rbd_dev = img_request ? img_request->rbd_dev : NULL; 1256 rbd_warn(rbd_dev, "obj_request %p was already done\n", 1257 obj_request); 1258 } 1259 } 1260 1261 static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1262 { 1263 smp_mb(); 1264 return atomic_read(&obj_request->done) != 0; 1265 } 1266 1267 static void 1268 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) 1269 { 1270 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1271 obj_request, obj_request->img_request, obj_request->result, 1272 obj_request->xferred, obj_request->length); 1273 /* 1274 * ENOENT means a hole in the image. 
We zero-fill the 1275 * entire length of the request. A short read also implies 1276 * zero-fill to the end of the request. Either way we 1277 * update the xferred count to indicate the whole request 1278 * was satisfied. 1279 */ 1280 BUG_ON(obj_request->type != OBJ_REQUEST_BIO); 1281 if (obj_request->result == -ENOENT) { 1282 zero_bio_chain(obj_request->bio_list, 0); 1283 obj_request->result = 0; 1284 obj_request->xferred = obj_request->length; 1285 } else if (obj_request->xferred < obj_request->length && 1286 !obj_request->result) { 1287 zero_bio_chain(obj_request->bio_list, obj_request->xferred); 1288 obj_request->xferred = obj_request->length; 1289 } 1290 obj_request_done_set(obj_request); 1291 } 1292 1293 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) 1294 { 1295 dout("%s: obj %p cb %p\n", __func__, obj_request, 1296 obj_request->callback); 1297 if (obj_request->callback) 1298 obj_request->callback(obj_request); 1299 else 1300 complete_all(&obj_request->completion); 1301 } 1302 1303 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) 1304 { 1305 dout("%s: obj %p\n", __func__, obj_request); 1306 obj_request_done_set(obj_request); 1307 } 1308 1309 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1310 { 1311 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1312 obj_request->result, obj_request->xferred, obj_request->length); 1313 if (obj_request->img_request) 1314 rbd_img_obj_request_read_callback(obj_request); 1315 else 1316 obj_request_done_set(obj_request); 1317 } 1318 1319 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1320 { 1321 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1322 obj_request->result, obj_request->length); 1323 /* 1324 * There is no such thing as a successful short write. 1325 * Our xferred value is the number of bytes transferred 1326 * back. Set it to our originally-requested length. 
1327 */ 1328 obj_request->xferred = obj_request->length; 1329 obj_request_done_set(obj_request); 1330 } 1331 1332 /* 1333 * For a simple stat call there's nothing to do. We'll do more if 1334 * this is part of a write sequence for a layered image. 1335 */ 1336 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) 1337 { 1338 dout("%s: obj %p\n", __func__, obj_request); 1339 obj_request_done_set(obj_request); 1340 } 1341 1342 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, 1343 struct ceph_msg *msg) 1344 { 1345 struct rbd_obj_request *obj_request = osd_req->r_priv; 1346 u16 opcode; 1347 1348 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1349 rbd_assert(osd_req == obj_request->osd_req); 1350 rbd_assert(!!obj_request->img_request ^ 1351 (obj_request->which == BAD_WHICH)); 1352 1353 if (osd_req->r_result < 0) 1354 obj_request->result = osd_req->r_result; 1355 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); 1356 1357 WARN_ON(osd_req->r_num_ops != 1); /* For now */ 1358 1359 /* 1360 * We support a 64-bit length, but ultimately it has to be 1361 * passed to blk_end_request(), which takes an unsigned int. 
1362 */ 1363 obj_request->xferred = osd_req->r_reply_op_len[0]; 1364 rbd_assert(obj_request->xferred < (u64) UINT_MAX); 1365 opcode = osd_req->r_request_ops[0].op; 1366 switch (opcode) { 1367 case CEPH_OSD_OP_READ: 1368 rbd_osd_read_callback(obj_request); 1369 break; 1370 case CEPH_OSD_OP_WRITE: 1371 rbd_osd_write_callback(obj_request); 1372 break; 1373 case CEPH_OSD_OP_STAT: 1374 rbd_osd_stat_callback(obj_request); 1375 break; 1376 case CEPH_OSD_OP_CALL: 1377 case CEPH_OSD_OP_NOTIFY_ACK: 1378 case CEPH_OSD_OP_WATCH: 1379 rbd_osd_trivial_callback(obj_request); 1380 break; 1381 default: 1382 rbd_warn(NULL, "%s: unsupported op %hu\n", 1383 obj_request->object_name, (unsigned short) opcode); 1384 break; 1385 } 1386 1387 if (obj_request_done_test(obj_request)) 1388 rbd_obj_request_complete(obj_request); 1389 } 1390 1391 static struct ceph_osd_request *rbd_osd_req_create( 1392 struct rbd_device *rbd_dev, 1393 bool write_request, 1394 struct rbd_obj_request *obj_request, 1395 struct ceph_osd_req_op *op) 1396 { 1397 struct rbd_img_request *img_request = obj_request->img_request; 1398 struct ceph_snap_context *snapc = NULL; 1399 struct ceph_osd_client *osdc; 1400 struct ceph_osd_request *osd_req; 1401 struct timespec now; 1402 struct timespec *mtime; 1403 u64 snap_id = CEPH_NOSNAP; 1404 u64 offset = obj_request->offset; 1405 u64 length = obj_request->length; 1406 1407 if (img_request) { 1408 rbd_assert(img_request->write_request == write_request); 1409 if (img_request->write_request) 1410 snapc = img_request->snapc; 1411 else 1412 snap_id = img_request->snap_id; 1413 } 1414 1415 /* Allocate and initialize the request, for the single op */ 1416 1417 osdc = &rbd_dev->rbd_client->client->osdc; 1418 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); 1419 if (!osd_req) 1420 return NULL; /* ENOMEM */ 1421 1422 rbd_assert(obj_request_type_valid(obj_request->type)); 1423 switch (obj_request->type) { 1424 case OBJ_REQUEST_NODATA: 1425 break; /* Nothing to do */ 
1426 case OBJ_REQUEST_BIO: 1427 rbd_assert(obj_request->bio_list != NULL); 1428 osd_req->r_bio = obj_request->bio_list; 1429 break; 1430 case OBJ_REQUEST_PAGES: 1431 osd_req->r_pages = obj_request->pages; 1432 osd_req->r_num_pages = obj_request->page_count; 1433 osd_req->r_page_alignment = offset & ~PAGE_MASK; 1434 break; 1435 } 1436 1437 if (write_request) { 1438 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1439 now = CURRENT_TIME; 1440 mtime = &now; 1441 } else { 1442 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1443 mtime = NULL; /* not needed for reads */ 1444 offset = 0; /* These are not used... */ 1445 length = 0; /* ...for osd read requests */ 1446 } 1447 1448 osd_req->r_callback = rbd_osd_req_callback; 1449 osd_req->r_priv = obj_request; 1450 1451 osd_req->r_oid_len = strlen(obj_request->object_name); 1452 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1453 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1454 1455 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1456 1457 /* osd_req will get its own reference to snapc (if non-null) */ 1458 1459 ceph_osdc_build_request(osd_req, offset, length, 1, op, 1460 snapc, snap_id, mtime); 1461 1462 return osd_req; 1463 } 1464 1465 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1466 { 1467 ceph_osdc_put_request(osd_req); 1468 } 1469 1470 /* object_name is assumed to be a non-null pointer and NUL-terminated */ 1471 1472 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, 1473 u64 offset, u64 length, 1474 enum obj_request_type type) 1475 { 1476 struct rbd_obj_request *obj_request; 1477 size_t size; 1478 char *name; 1479 1480 rbd_assert(obj_request_type_valid(type)); 1481 1482 size = strlen(object_name) + 1; 1483 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); 1484 if (!obj_request) 1485 return NULL; 1486 1487 name = (char *)(obj_request + 1); 1488 obj_request->object_name = memcpy(name, object_name, size); 
1489 obj_request->offset = offset; 1490 obj_request->length = length; 1491 obj_request->which = BAD_WHICH; 1492 obj_request->type = type; 1493 INIT_LIST_HEAD(&obj_request->links); 1494 obj_request_done_init(obj_request); 1495 init_completion(&obj_request->completion); 1496 kref_init(&obj_request->kref); 1497 1498 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, 1499 offset, length, (int)type, obj_request); 1500 1501 return obj_request; 1502 } 1503 1504 static void rbd_obj_request_destroy(struct kref *kref) 1505 { 1506 struct rbd_obj_request *obj_request; 1507 1508 obj_request = container_of(kref, struct rbd_obj_request, kref); 1509 1510 dout("%s: obj %p\n", __func__, obj_request); 1511 1512 rbd_assert(obj_request->img_request == NULL); 1513 rbd_assert(obj_request->which == BAD_WHICH); 1514 1515 if (obj_request->osd_req) 1516 rbd_osd_req_destroy(obj_request->osd_req); 1517 1518 rbd_assert(obj_request_type_valid(obj_request->type)); 1519 switch (obj_request->type) { 1520 case OBJ_REQUEST_NODATA: 1521 break; /* Nothing to do */ 1522 case OBJ_REQUEST_BIO: 1523 if (obj_request->bio_list) 1524 bio_chain_put(obj_request->bio_list); 1525 break; 1526 case OBJ_REQUEST_PAGES: 1527 if (obj_request->pages) 1528 ceph_release_page_vector(obj_request->pages, 1529 obj_request->page_count); 1530 break; 1531 } 1532 1533 kfree(obj_request); 1534 } 1535 1536 /* 1537 * Caller is responsible for filling in the list of object requests 1538 * that comprises the image request, and the Linux request pointer 1539 * (if there is one). 
1540 */ 1541 static struct rbd_img_request *rbd_img_request_create( 1542 struct rbd_device *rbd_dev, 1543 u64 offset, u64 length, 1544 bool write_request) 1545 { 1546 struct rbd_img_request *img_request; 1547 struct ceph_snap_context *snapc = NULL; 1548 1549 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC); 1550 if (!img_request) 1551 return NULL; 1552 1553 if (write_request) { 1554 down_read(&rbd_dev->header_rwsem); 1555 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1556 up_read(&rbd_dev->header_rwsem); 1557 if (WARN_ON(!snapc)) { 1558 kfree(img_request); 1559 return NULL; /* Shouldn't happen */ 1560 } 1561 } 1562 1563 img_request->rq = NULL; 1564 img_request->rbd_dev = rbd_dev; 1565 img_request->offset = offset; 1566 img_request->length = length; 1567 img_request->write_request = write_request; 1568 if (write_request) 1569 img_request->snapc = snapc; 1570 else 1571 img_request->snap_id = rbd_dev->spec->snap_id; 1572 spin_lock_init(&img_request->completion_lock); 1573 img_request->next_completion = 0; 1574 img_request->callback = NULL; 1575 img_request->obj_request_count = 0; 1576 INIT_LIST_HEAD(&img_request->obj_requests); 1577 kref_init(&img_request->kref); 1578 1579 rbd_img_request_get(img_request); /* Avoid a warning */ 1580 rbd_img_request_put(img_request); /* TEMPORARY */ 1581 1582 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, 1583 write_request ? 
"write" : "read", offset, length, 1584 img_request); 1585 1586 return img_request; 1587 } 1588 1589 static void rbd_img_request_destroy(struct kref *kref) 1590 { 1591 struct rbd_img_request *img_request; 1592 struct rbd_obj_request *obj_request; 1593 struct rbd_obj_request *next_obj_request; 1594 1595 img_request = container_of(kref, struct rbd_img_request, kref); 1596 1597 dout("%s: img %p\n", __func__, img_request); 1598 1599 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1600 rbd_img_obj_request_del(img_request, obj_request); 1601 rbd_assert(img_request->obj_request_count == 0); 1602 1603 if (img_request->write_request) 1604 ceph_put_snap_context(img_request->snapc); 1605 1606 kfree(img_request); 1607 } 1608 1609 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, 1610 struct bio *bio_list) 1611 { 1612 struct rbd_device *rbd_dev = img_request->rbd_dev; 1613 struct rbd_obj_request *obj_request = NULL; 1614 struct rbd_obj_request *next_obj_request; 1615 unsigned int bio_offset; 1616 u64 image_offset; 1617 u64 resid; 1618 u16 opcode; 1619 1620 dout("%s: img %p bio %p\n", __func__, img_request, bio_list); 1621 1622 opcode = img_request->write_request ? 
CEPH_OSD_OP_WRITE 1623 : CEPH_OSD_OP_READ; 1624 bio_offset = 0; 1625 image_offset = img_request->offset; 1626 rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT); 1627 resid = img_request->length; 1628 rbd_assert(resid > 0); 1629 while (resid) { 1630 const char *object_name; 1631 unsigned int clone_size; 1632 struct ceph_osd_req_op *op; 1633 u64 offset; 1634 u64 length; 1635 1636 object_name = rbd_segment_name(rbd_dev, image_offset); 1637 if (!object_name) 1638 goto out_unwind; 1639 offset = rbd_segment_offset(rbd_dev, image_offset); 1640 length = rbd_segment_length(rbd_dev, image_offset, resid); 1641 obj_request = rbd_obj_request_create(object_name, 1642 offset, length, 1643 OBJ_REQUEST_BIO); 1644 kfree(object_name); /* object request has its own copy */ 1645 if (!obj_request) 1646 goto out_unwind; 1647 1648 rbd_assert(length <= (u64) UINT_MAX); 1649 clone_size = (unsigned int) length; 1650 obj_request->bio_list = bio_chain_clone_range(&bio_list, 1651 &bio_offset, clone_size, 1652 GFP_ATOMIC); 1653 if (!obj_request->bio_list) 1654 goto out_partial; 1655 1656 /* 1657 * Build up the op to use in building the osd 1658 * request. Note that the contents of the op are 1659 * copied by rbd_osd_req_create(). 
1660 */ 1661 op = rbd_osd_req_op_create(opcode, offset, length); 1662 if (!op) 1663 goto out_partial; 1664 obj_request->osd_req = rbd_osd_req_create(rbd_dev, 1665 img_request->write_request, 1666 obj_request, op); 1667 rbd_osd_req_op_destroy(op); 1668 if (!obj_request->osd_req) 1669 goto out_partial; 1670 /* status and version are initially zero-filled */ 1671 1672 rbd_img_obj_request_add(img_request, obj_request); 1673 1674 image_offset += length; 1675 resid -= length; 1676 } 1677 1678 return 0; 1679 1680 out_partial: 1681 rbd_obj_request_put(obj_request); 1682 out_unwind: 1683 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1684 rbd_obj_request_put(obj_request); 1685 1686 return -ENOMEM; 1687 } 1688 1689 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 1690 { 1691 struct rbd_img_request *img_request; 1692 u32 which = obj_request->which; 1693 bool more = true; 1694 1695 img_request = obj_request->img_request; 1696 1697 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1698 rbd_assert(img_request != NULL); 1699 rbd_assert(img_request->rq != NULL); 1700 rbd_assert(img_request->obj_request_count > 0); 1701 rbd_assert(which != BAD_WHICH); 1702 rbd_assert(which < img_request->obj_request_count); 1703 rbd_assert(which >= img_request->next_completion); 1704 1705 spin_lock_irq(&img_request->completion_lock); 1706 if (which != img_request->next_completion) 1707 goto out; 1708 1709 for_each_obj_request_from(img_request, obj_request) { 1710 unsigned int xferred; 1711 int result; 1712 1713 rbd_assert(more); 1714 rbd_assert(which < img_request->obj_request_count); 1715 1716 if (!obj_request_done_test(obj_request)) 1717 break; 1718 1719 rbd_assert(obj_request->xferred <= (u64) UINT_MAX); 1720 xferred = (unsigned int) obj_request->xferred; 1721 result = (int) obj_request->result; 1722 if (result) 1723 rbd_warn(NULL, "obj_request %s result %d xferred %u\n", 1724 img_request->write_request ? 
"write" : "read", 1725 result, xferred); 1726 1727 more = blk_end_request(img_request->rq, result, xferred); 1728 which++; 1729 } 1730 1731 rbd_assert(more ^ (which == img_request->obj_request_count)); 1732 img_request->next_completion = which; 1733 out: 1734 spin_unlock_irq(&img_request->completion_lock); 1735 1736 if (!more) 1737 rbd_img_request_complete(img_request); 1738 } 1739 1740 static int rbd_img_request_submit(struct rbd_img_request *img_request) 1741 { 1742 struct rbd_device *rbd_dev = img_request->rbd_dev; 1743 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1744 struct rbd_obj_request *obj_request; 1745 1746 dout("%s: img %p\n", __func__, img_request); 1747 for_each_obj_request(img_request, obj_request) { 1748 int ret; 1749 1750 obj_request->callback = rbd_img_obj_callback; 1751 ret = rbd_obj_request_submit(osdc, obj_request); 1752 if (ret) 1753 return ret; 1754 /* 1755 * The image request has its own reference to each 1756 * of its object requests, so we can safely drop the 1757 * initial one here. 
1758 */ 1759 rbd_obj_request_put(obj_request); 1760 } 1761 1762 return 0; 1763 } 1764 1765 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, 1766 u64 ver, u64 notify_id) 1767 { 1768 struct rbd_obj_request *obj_request; 1769 struct ceph_osd_req_op *op; 1770 struct ceph_osd_client *osdc; 1771 int ret; 1772 1773 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 1774 OBJ_REQUEST_NODATA); 1775 if (!obj_request) 1776 return -ENOMEM; 1777 1778 ret = -ENOMEM; 1779 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); 1780 if (!op) 1781 goto out; 1782 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1783 obj_request, op); 1784 rbd_osd_req_op_destroy(op); 1785 if (!obj_request->osd_req) 1786 goto out; 1787 1788 osdc = &rbd_dev->rbd_client->client->osdc; 1789 obj_request->callback = rbd_obj_request_put; 1790 ret = rbd_obj_request_submit(osdc, obj_request); 1791 out: 1792 if (ret) 1793 rbd_obj_request_put(obj_request); 1794 1795 return ret; 1796 } 1797 1798 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1799 { 1800 struct rbd_device *rbd_dev = (struct rbd_device *)data; 1801 u64 hver; 1802 int rc; 1803 1804 if (!rbd_dev) 1805 return; 1806 1807 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, 1808 rbd_dev->header_name, (unsigned long long) notify_id, 1809 (unsigned int) opcode); 1810 rc = rbd_dev_refresh(rbd_dev, &hver); 1811 if (rc) 1812 rbd_warn(rbd_dev, "got notification but failed to " 1813 " update snaps: %d\n", rc); 1814 1815 rbd_obj_notify_ack(rbd_dev, hver, notify_id); 1816 } 1817 1818 /* 1819 * Request sync osd watch/unwatch. The value of "start" determines 1820 * whether a watch request is being initiated or torn down. 
1821 */ 1822 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) 1823 { 1824 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1825 struct rbd_obj_request *obj_request; 1826 struct ceph_osd_req_op *op; 1827 int ret; 1828 1829 rbd_assert(start ^ !!rbd_dev->watch_event); 1830 rbd_assert(start ^ !!rbd_dev->watch_request); 1831 1832 if (start) { 1833 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, 1834 &rbd_dev->watch_event); 1835 if (ret < 0) 1836 return ret; 1837 rbd_assert(rbd_dev->watch_event != NULL); 1838 } 1839 1840 ret = -ENOMEM; 1841 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 1842 OBJ_REQUEST_NODATA); 1843 if (!obj_request) 1844 goto out_cancel; 1845 1846 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, 1847 rbd_dev->watch_event->cookie, 1848 rbd_dev->header.obj_version, start); 1849 if (!op) 1850 goto out_cancel; 1851 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1852 obj_request, op); 1853 rbd_osd_req_op_destroy(op); 1854 if (!obj_request->osd_req) 1855 goto out_cancel; 1856 1857 if (start) 1858 ceph_osdc_set_request_linger(osdc, obj_request->osd_req); 1859 else 1860 ceph_osdc_unregister_linger_request(osdc, 1861 rbd_dev->watch_request->osd_req); 1862 ret = rbd_obj_request_submit(osdc, obj_request); 1863 if (ret) 1864 goto out_cancel; 1865 ret = rbd_obj_request_wait(obj_request); 1866 if (ret) 1867 goto out_cancel; 1868 ret = obj_request->result; 1869 if (ret) 1870 goto out_cancel; 1871 1872 /* 1873 * A watch request is set to linger, so the underlying osd 1874 * request won't go away until we unregister it. We retain 1875 * a pointer to the object request during that time (in 1876 * rbd_dev->watch_request), so we'll keep a reference to 1877 * it. We'll drop that reference (below) after we've 1878 * unregistered it. 
1879 */ 1880 if (start) { 1881 rbd_dev->watch_request = obj_request; 1882 1883 return 0; 1884 } 1885 1886 /* We have successfully torn down the watch request */ 1887 1888 rbd_obj_request_put(rbd_dev->watch_request); 1889 rbd_dev->watch_request = NULL; 1890 out_cancel: 1891 /* Cancel the event if we're tearing down, or on error */ 1892 ceph_osdc_cancel_event(rbd_dev->watch_event); 1893 rbd_dev->watch_event = NULL; 1894 if (obj_request) 1895 rbd_obj_request_put(obj_request); 1896 1897 return ret; 1898 } 1899 1900 /* 1901 * Synchronous osd object method call 1902 */ 1903 static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 1904 const char *object_name, 1905 const char *class_name, 1906 const char *method_name, 1907 const char *outbound, 1908 size_t outbound_size, 1909 char *inbound, 1910 size_t inbound_size, 1911 u64 *version) 1912 { 1913 struct rbd_obj_request *obj_request; 1914 struct ceph_osd_client *osdc; 1915 struct ceph_osd_req_op *op; 1916 struct page **pages; 1917 u32 page_count; 1918 int ret; 1919 1920 /* 1921 * Method calls are ultimately read operations but they 1922 * don't involve object data (so no offset or length). 1923 * The result should placed into the inbound buffer 1924 * provided. They also supply outbound data--parameters for 1925 * the object method. Currently if this is present it will 1926 * be a snapshot id. 
1927 */ 1928 page_count = (u32) calc_pages_for(0, inbound_size); 1929 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 1930 if (IS_ERR(pages)) 1931 return PTR_ERR(pages); 1932 1933 ret = -ENOMEM; 1934 obj_request = rbd_obj_request_create(object_name, 0, 0, 1935 OBJ_REQUEST_PAGES); 1936 if (!obj_request) 1937 goto out; 1938 1939 obj_request->pages = pages; 1940 obj_request->page_count = page_count; 1941 1942 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, 1943 method_name, outbound, outbound_size); 1944 if (!op) 1945 goto out; 1946 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1947 obj_request, op); 1948 rbd_osd_req_op_destroy(op); 1949 if (!obj_request->osd_req) 1950 goto out; 1951 1952 osdc = &rbd_dev->rbd_client->client->osdc; 1953 ret = rbd_obj_request_submit(osdc, obj_request); 1954 if (ret) 1955 goto out; 1956 ret = rbd_obj_request_wait(obj_request); 1957 if (ret) 1958 goto out; 1959 1960 ret = obj_request->result; 1961 if (ret < 0) 1962 goto out; 1963 ret = 0; 1964 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); 1965 if (version) 1966 *version = obj_request->version; 1967 out: 1968 if (obj_request) 1969 rbd_obj_request_put(obj_request); 1970 else 1971 ceph_release_page_vector(pages, page_count); 1972 1973 return ret; 1974 } 1975 1976 static void rbd_request_fn(struct request_queue *q) 1977 __releases(q->queue_lock) __acquires(q->queue_lock) 1978 { 1979 struct rbd_device *rbd_dev = q->queuedata; 1980 bool read_only = rbd_dev->mapping.read_only; 1981 struct request *rq; 1982 int result; 1983 1984 while ((rq = blk_fetch_request(q))) { 1985 bool write_request = rq_data_dir(rq) == WRITE; 1986 struct rbd_img_request *img_request; 1987 u64 offset; 1988 u64 length; 1989 1990 /* Ignore any non-FS requests that filter through. 
*/ 1991 1992 if (rq->cmd_type != REQ_TYPE_FS) { 1993 dout("%s: non-fs request type %d\n", __func__, 1994 (int) rq->cmd_type); 1995 __blk_end_request_all(rq, 0); 1996 continue; 1997 } 1998 1999 /* Ignore/skip any zero-length requests */ 2000 2001 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; 2002 length = (u64) blk_rq_bytes(rq); 2003 2004 if (!length) { 2005 dout("%s: zero-length request\n", __func__); 2006 __blk_end_request_all(rq, 0); 2007 continue; 2008 } 2009 2010 spin_unlock_irq(q->queue_lock); 2011 2012 /* Disallow writes to a read-only device */ 2013 2014 if (write_request) { 2015 result = -EROFS; 2016 if (read_only) 2017 goto end_request; 2018 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); 2019 } 2020 2021 /* 2022 * Quit early if the mapped snapshot no longer 2023 * exists. It's still possible the snapshot will 2024 * have disappeared by the time our request arrives 2025 * at the osd, but there's no sense in sending it if 2026 * we already know. 2027 */ 2028 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 2029 dout("request for non-existent snapshot"); 2030 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 2031 result = -ENXIO; 2032 goto end_request; 2033 } 2034 2035 result = -EINVAL; 2036 if (WARN_ON(offset && length > U64_MAX - offset + 1)) 2037 goto end_request; /* Shouldn't happen */ 2038 2039 result = -ENOMEM; 2040 img_request = rbd_img_request_create(rbd_dev, offset, length, 2041 write_request); 2042 if (!img_request) 2043 goto end_request; 2044 2045 img_request->rq = rq; 2046 2047 result = rbd_img_request_fill_bio(img_request, rq->bio); 2048 if (!result) 2049 result = rbd_img_request_submit(img_request); 2050 if (result) 2051 rbd_img_request_put(img_request); 2052 end_request: 2053 spin_lock_irq(q->queue_lock); 2054 if (result < 0) { 2055 rbd_warn(rbd_dev, "obj_request %s result %d\n", 2056 write_request ? "write" : "read", result); 2057 __blk_end_request_all(rq, result); 2058 } 2059 } 2060 } 2061 2062 /* 2063 * a queue callback. 
Makes sure that we don't create a bio that spans across 2064 * multiple osd objects. One exception would be with a single page bios, 2065 * which we handle later at bio_chain_clone_range() 2066 */ 2067 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 2068 struct bio_vec *bvec) 2069 { 2070 struct rbd_device *rbd_dev = q->queuedata; 2071 sector_t sector_offset; 2072 sector_t sectors_per_obj; 2073 sector_t obj_sector_offset; 2074 int ret; 2075 2076 /* 2077 * Find how far into its rbd object the partition-relative 2078 * bio start sector is to offset relative to the enclosing 2079 * device. 2080 */ 2081 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 2082 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 2083 obj_sector_offset = sector_offset & (sectors_per_obj - 1); 2084 2085 /* 2086 * Compute the number of bytes from that offset to the end 2087 * of the object. Account for what's already used by the bio. 2088 */ 2089 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 2090 if (ret > bmd->bi_size) 2091 ret -= bmd->bi_size; 2092 else 2093 ret = 0; 2094 2095 /* 2096 * Don't send back more than was asked for. And if the bio 2097 * was empty, let the whole thing through because: "Note 2098 * that a block device *must* allow a single page to be 2099 * added to an empty bio." 
2100 */ 2101 rbd_assert(bvec->bv_len <= PAGE_SIZE); 2102 if (ret > (int) bvec->bv_len || !bmd->bi_size) 2103 ret = (int) bvec->bv_len; 2104 2105 return ret; 2106 } 2107 2108 static void rbd_free_disk(struct rbd_device *rbd_dev) 2109 { 2110 struct gendisk *disk = rbd_dev->disk; 2111 2112 if (!disk) 2113 return; 2114 2115 if (disk->flags & GENHD_FL_UP) 2116 del_gendisk(disk); 2117 if (disk->queue) 2118 blk_cleanup_queue(disk->queue); 2119 put_disk(disk); 2120 } 2121 2122 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 2123 const char *object_name, 2124 u64 offset, u64 length, 2125 char *buf, u64 *version) 2126 2127 { 2128 struct ceph_osd_req_op *op; 2129 struct rbd_obj_request *obj_request; 2130 struct ceph_osd_client *osdc; 2131 struct page **pages = NULL; 2132 u32 page_count; 2133 size_t size; 2134 int ret; 2135 2136 page_count = (u32) calc_pages_for(offset, length); 2137 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2138 if (IS_ERR(pages)) 2139 ret = PTR_ERR(pages); 2140 2141 ret = -ENOMEM; 2142 obj_request = rbd_obj_request_create(object_name, offset, length, 2143 OBJ_REQUEST_PAGES); 2144 if (!obj_request) 2145 goto out; 2146 2147 obj_request->pages = pages; 2148 obj_request->page_count = page_count; 2149 2150 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); 2151 if (!op) 2152 goto out; 2153 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2154 obj_request, op); 2155 rbd_osd_req_op_destroy(op); 2156 if (!obj_request->osd_req) 2157 goto out; 2158 2159 osdc = &rbd_dev->rbd_client->client->osdc; 2160 ret = rbd_obj_request_submit(osdc, obj_request); 2161 if (ret) 2162 goto out; 2163 ret = rbd_obj_request_wait(obj_request); 2164 if (ret) 2165 goto out; 2166 2167 ret = obj_request->result; 2168 if (ret < 0) 2169 goto out; 2170 2171 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); 2172 size = (size_t) obj_request->xferred; 2173 ceph_copy_from_page_vector(pages, buf, 0, size); 2174 rbd_assert(size <= (size_t) INT_MAX); 2175 ret 
= (int) size; 2176 if (version) 2177 *version = obj_request->version; 2178 out: 2179 if (obj_request) 2180 rbd_obj_request_put(obj_request); 2181 else 2182 ceph_release_page_vector(pages, page_count); 2183 2184 return ret; 2185 } 2186 2187 /* 2188 * Read the complete header for the given rbd device. 2189 * 2190 * Returns a pointer to a dynamically-allocated buffer containing 2191 * the complete and validated header. Caller can pass the address 2192 * of a variable that will be filled in with the version of the 2193 * header object at the time it was read. 2194 * 2195 * Returns a pointer-coded errno if a failure occurs. 2196 */ 2197 static struct rbd_image_header_ondisk * 2198 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) 2199 { 2200 struct rbd_image_header_ondisk *ondisk = NULL; 2201 u32 snap_count = 0; 2202 u64 names_size = 0; 2203 u32 want_count; 2204 int ret; 2205 2206 /* 2207 * The complete header will include an array of its 64-bit 2208 * snapshot ids, followed by the names of those snapshots as 2209 * a contiguous block of NUL-terminated strings. Note that 2210 * the number of snapshots could change by the time we read 2211 * it in, in which case we re-read it. 
2212 */ 2213 do { 2214 size_t size; 2215 2216 kfree(ondisk); 2217 2218 size = sizeof (*ondisk); 2219 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 2220 size += names_size; 2221 ondisk = kmalloc(size, GFP_KERNEL); 2222 if (!ondisk) 2223 return ERR_PTR(-ENOMEM); 2224 2225 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 2226 0, size, 2227 (char *) ondisk, version); 2228 if (ret < 0) 2229 goto out_err; 2230 if (WARN_ON((size_t) ret < size)) { 2231 ret = -ENXIO; 2232 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 2233 size, ret); 2234 goto out_err; 2235 } 2236 if (!rbd_dev_ondisk_valid(ondisk)) { 2237 ret = -ENXIO; 2238 rbd_warn(rbd_dev, "invalid header"); 2239 goto out_err; 2240 } 2241 2242 names_size = le64_to_cpu(ondisk->snap_names_len); 2243 want_count = snap_count; 2244 snap_count = le32_to_cpu(ondisk->snap_count); 2245 } while (snap_count != want_count); 2246 2247 return ondisk; 2248 2249 out_err: 2250 kfree(ondisk); 2251 2252 return ERR_PTR(ret); 2253 } 2254 2255 /* 2256 * reload the ondisk the header 2257 */ 2258 static int rbd_read_header(struct rbd_device *rbd_dev, 2259 struct rbd_image_header *header) 2260 { 2261 struct rbd_image_header_ondisk *ondisk; 2262 u64 ver = 0; 2263 int ret; 2264 2265 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); 2266 if (IS_ERR(ondisk)) 2267 return PTR_ERR(ondisk); 2268 ret = rbd_header_from_disk(header, ondisk); 2269 if (ret >= 0) 2270 header->obj_version = ver; 2271 kfree(ondisk); 2272 2273 return ret; 2274 } 2275 2276 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) 2277 { 2278 struct rbd_snap *snap; 2279 struct rbd_snap *next; 2280 2281 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 2282 rbd_remove_snap_dev(snap); 2283 } 2284 2285 static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 2286 { 2287 sector_t size; 2288 2289 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 2290 return; 2291 2292 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 2293 dout("setting 
size to %llu sectors", (unsigned long long) size); 2294 rbd_dev->mapping.size = (u64) size; 2295 set_capacity(rbd_dev->disk, size); 2296 } 2297 2298 /* 2299 * only read the first part of the ondisk header, without the snaps info 2300 */ 2301 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 2302 { 2303 int ret; 2304 struct rbd_image_header h; 2305 2306 ret = rbd_read_header(rbd_dev, &h); 2307 if (ret < 0) 2308 return ret; 2309 2310 down_write(&rbd_dev->header_rwsem); 2311 2312 /* Update image size, and check for resize of mapped image */ 2313 rbd_dev->header.image_size = h.image_size; 2314 rbd_update_mapping_size(rbd_dev); 2315 2316 /* rbd_dev->header.object_prefix shouldn't change */ 2317 kfree(rbd_dev->header.snap_sizes); 2318 kfree(rbd_dev->header.snap_names); 2319 /* osd requests may still refer to snapc */ 2320 ceph_put_snap_context(rbd_dev->header.snapc); 2321 2322 if (hver) 2323 *hver = h.obj_version; 2324 rbd_dev->header.obj_version = h.obj_version; 2325 rbd_dev->header.image_size = h.image_size; 2326 rbd_dev->header.snapc = h.snapc; 2327 rbd_dev->header.snap_names = h.snap_names; 2328 rbd_dev->header.snap_sizes = h.snap_sizes; 2329 /* Free the extra copy of the object prefix */ 2330 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix)); 2331 kfree(h.object_prefix); 2332 2333 ret = rbd_dev_snaps_update(rbd_dev); 2334 if (!ret) 2335 ret = rbd_dev_snaps_register(rbd_dev); 2336 2337 up_write(&rbd_dev->header_rwsem); 2338 2339 return ret; 2340 } 2341 2342 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 2343 { 2344 int ret; 2345 2346 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 2347 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2348 if (rbd_dev->image_format == 1) 2349 ret = rbd_dev_v1_refresh(rbd_dev, hver); 2350 else 2351 ret = rbd_dev_v2_refresh(rbd_dev, hver); 2352 mutex_unlock(&ctl_mutex); 2353 2354 return ret; 2355 } 2356 2357 static int rbd_init_disk(struct rbd_device *rbd_dev) 2358 { 
2359 struct gendisk *disk; 2360 struct request_queue *q; 2361 u64 segment_size; 2362 2363 /* create gendisk info */ 2364 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 2365 if (!disk) 2366 return -ENOMEM; 2367 2368 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 2369 rbd_dev->dev_id); 2370 disk->major = rbd_dev->major; 2371 disk->first_minor = 0; 2372 disk->fops = &rbd_bd_ops; 2373 disk->private_data = rbd_dev; 2374 2375 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); 2376 if (!q) 2377 goto out_disk; 2378 2379 /* We use the default size, but let's be explicit about it. */ 2380 blk_queue_physical_block_size(q, SECTOR_SIZE); 2381 2382 /* set io sizes to object size */ 2383 segment_size = rbd_obj_bytes(&rbd_dev->header); 2384 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 2385 blk_queue_max_segment_size(q, segment_size); 2386 blk_queue_io_min(q, segment_size); 2387 blk_queue_io_opt(q, segment_size); 2388 2389 blk_queue_merge_bvec(q, rbd_merge_bvec); 2390 disk->queue = q; 2391 2392 q->queuedata = rbd_dev; 2393 2394 rbd_dev->disk = disk; 2395 2396 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 2397 2398 return 0; 2399 out_disk: 2400 put_disk(disk); 2401 2402 return -ENOMEM; 2403 } 2404 2405 /* 2406 sysfs 2407 */ 2408 2409 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 2410 { 2411 return container_of(dev, struct rbd_device, dev); 2412 } 2413 2414 static ssize_t rbd_size_show(struct device *dev, 2415 struct device_attribute *attr, char *buf) 2416 { 2417 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2418 sector_t size; 2419 2420 down_read(&rbd_dev->header_rwsem); 2421 size = get_capacity(rbd_dev->disk); 2422 up_read(&rbd_dev->header_rwsem); 2423 2424 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE); 2425 } 2426 2427 /* 2428 * Note this shows the features for whatever's mapped, which is not 2429 * necessarily the base image. 
2430 */ 2431 static ssize_t rbd_features_show(struct device *dev, 2432 struct device_attribute *attr, char *buf) 2433 { 2434 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2435 2436 return sprintf(buf, "0x%016llx\n", 2437 (unsigned long long) rbd_dev->mapping.features); 2438 } 2439 2440 static ssize_t rbd_major_show(struct device *dev, 2441 struct device_attribute *attr, char *buf) 2442 { 2443 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2444 2445 return sprintf(buf, "%d\n", rbd_dev->major); 2446 } 2447 2448 static ssize_t rbd_client_id_show(struct device *dev, 2449 struct device_attribute *attr, char *buf) 2450 { 2451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2452 2453 return sprintf(buf, "client%lld\n", 2454 ceph_client_id(rbd_dev->rbd_client->client)); 2455 } 2456 2457 static ssize_t rbd_pool_show(struct device *dev, 2458 struct device_attribute *attr, char *buf) 2459 { 2460 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2461 2462 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 2463 } 2464 2465 static ssize_t rbd_pool_id_show(struct device *dev, 2466 struct device_attribute *attr, char *buf) 2467 { 2468 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2469 2470 return sprintf(buf, "%llu\n", 2471 (unsigned long long) rbd_dev->spec->pool_id); 2472 } 2473 2474 static ssize_t rbd_name_show(struct device *dev, 2475 struct device_attribute *attr, char *buf) 2476 { 2477 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2478 2479 if (rbd_dev->spec->image_name) 2480 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 2481 2482 return sprintf(buf, "(unknown)\n"); 2483 } 2484 2485 static ssize_t rbd_image_id_show(struct device *dev, 2486 struct device_attribute *attr, char *buf) 2487 { 2488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2489 2490 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 2491 } 2492 2493 /* 2494 * Shows the name of the currently-mapped snapshot (or 2495 * RBD_SNAP_HEAD_NAME for the base image). 
2496 */ 2497 static ssize_t rbd_snap_show(struct device *dev, 2498 struct device_attribute *attr, 2499 char *buf) 2500 { 2501 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2502 2503 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 2504 } 2505 2506 /* 2507 * For an rbd v2 image, shows the pool id, image id, and snapshot id 2508 * for the parent image. If there is no parent, simply shows 2509 * "(no parent image)". 2510 */ 2511 static ssize_t rbd_parent_show(struct device *dev, 2512 struct device_attribute *attr, 2513 char *buf) 2514 { 2515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2516 struct rbd_spec *spec = rbd_dev->parent_spec; 2517 int count; 2518 char *bufp = buf; 2519 2520 if (!spec) 2521 return sprintf(buf, "(no parent image)\n"); 2522 2523 count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 2524 (unsigned long long) spec->pool_id, spec->pool_name); 2525 if (count < 0) 2526 return count; 2527 bufp += count; 2528 2529 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 2530 spec->image_name ? spec->image_name : "(unknown)"); 2531 if (count < 0) 2532 return count; 2533 bufp += count; 2534 2535 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 2536 (unsigned long long) spec->snap_id, spec->snap_name); 2537 if (count < 0) 2538 return count; 2539 bufp += count; 2540 2541 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 2542 if (count < 0) 2543 return count; 2544 bufp += count; 2545 2546 return (ssize_t) (bufp - buf); 2547 } 2548 2549 static ssize_t rbd_image_refresh(struct device *dev, 2550 struct device_attribute *attr, 2551 const char *buf, 2552 size_t size) 2553 { 2554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2555 int ret; 2556 2557 ret = rbd_dev_refresh(rbd_dev, NULL); 2558 2559 return ret < 0 ? 
ret : size; 2560 } 2561 2562 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 2563 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 2564 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 2565 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 2566 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 2567 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 2568 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 2569 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2570 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2571 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 2572 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 2573 2574 static struct attribute *rbd_attrs[] = { 2575 &dev_attr_size.attr, 2576 &dev_attr_features.attr, 2577 &dev_attr_major.attr, 2578 &dev_attr_client_id.attr, 2579 &dev_attr_pool.attr, 2580 &dev_attr_pool_id.attr, 2581 &dev_attr_name.attr, 2582 &dev_attr_image_id.attr, 2583 &dev_attr_current_snap.attr, 2584 &dev_attr_parent.attr, 2585 &dev_attr_refresh.attr, 2586 NULL 2587 }; 2588 2589 static struct attribute_group rbd_attr_group = { 2590 .attrs = rbd_attrs, 2591 }; 2592 2593 static const struct attribute_group *rbd_attr_groups[] = { 2594 &rbd_attr_group, 2595 NULL 2596 }; 2597 2598 static void rbd_sysfs_dev_release(struct device *dev) 2599 { 2600 } 2601 2602 static struct device_type rbd_device_type = { 2603 .name = "rbd", 2604 .groups = rbd_attr_groups, 2605 .release = rbd_sysfs_dev_release, 2606 }; 2607 2608 2609 /* 2610 sysfs - snapshots 2611 */ 2612 2613 static ssize_t rbd_snap_size_show(struct device *dev, 2614 struct device_attribute *attr, 2615 char *buf) 2616 { 2617 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2618 2619 return sprintf(buf, "%llu\n", (unsigned long long)snap->size); 2620 } 2621 2622 static ssize_t rbd_snap_id_show(struct device *dev, 2623 struct device_attribute *attr, 2624 char 
*buf) 2625 { 2626 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2627 2628 return sprintf(buf, "%llu\n", (unsigned long long)snap->id); 2629 } 2630 2631 static ssize_t rbd_snap_features_show(struct device *dev, 2632 struct device_attribute *attr, 2633 char *buf) 2634 { 2635 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2636 2637 return sprintf(buf, "0x%016llx\n", 2638 (unsigned long long) snap->features); 2639 } 2640 2641 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 2642 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 2643 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL); 2644 2645 static struct attribute *rbd_snap_attrs[] = { 2646 &dev_attr_snap_size.attr, 2647 &dev_attr_snap_id.attr, 2648 &dev_attr_snap_features.attr, 2649 NULL, 2650 }; 2651 2652 static struct attribute_group rbd_snap_attr_group = { 2653 .attrs = rbd_snap_attrs, 2654 }; 2655 2656 static void rbd_snap_dev_release(struct device *dev) 2657 { 2658 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2659 kfree(snap->name); 2660 kfree(snap); 2661 } 2662 2663 static const struct attribute_group *rbd_snap_attr_groups[] = { 2664 &rbd_snap_attr_group, 2665 NULL 2666 }; 2667 2668 static struct device_type rbd_snap_device_type = { 2669 .groups = rbd_snap_attr_groups, 2670 .release = rbd_snap_dev_release, 2671 }; 2672 2673 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 2674 { 2675 kref_get(&spec->kref); 2676 2677 return spec; 2678 } 2679 2680 static void rbd_spec_free(struct kref *kref); 2681 static void rbd_spec_put(struct rbd_spec *spec) 2682 { 2683 if (spec) 2684 kref_put(&spec->kref, rbd_spec_free); 2685 } 2686 2687 static struct rbd_spec *rbd_spec_alloc(void) 2688 { 2689 struct rbd_spec *spec; 2690 2691 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 2692 if (!spec) 2693 return NULL; 2694 kref_init(&spec->kref); 2695 2696 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ 2697 2698 return 
spec; 2699 } 2700 2701 static void rbd_spec_free(struct kref *kref) 2702 { 2703 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 2704 2705 kfree(spec->pool_name); 2706 kfree(spec->image_id); 2707 kfree(spec->image_name); 2708 kfree(spec->snap_name); 2709 kfree(spec); 2710 } 2711 2712 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 2713 struct rbd_spec *spec) 2714 { 2715 struct rbd_device *rbd_dev; 2716 2717 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 2718 if (!rbd_dev) 2719 return NULL; 2720 2721 spin_lock_init(&rbd_dev->lock); 2722 rbd_dev->flags = 0; 2723 INIT_LIST_HEAD(&rbd_dev->node); 2724 INIT_LIST_HEAD(&rbd_dev->snaps); 2725 init_rwsem(&rbd_dev->header_rwsem); 2726 2727 rbd_dev->spec = spec; 2728 rbd_dev->rbd_client = rbdc; 2729 2730 /* Initialize the layout used for all rbd requests */ 2731 2732 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 2733 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); 2734 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 2735 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); 2736 2737 return rbd_dev; 2738 } 2739 2740 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 2741 { 2742 rbd_spec_put(rbd_dev->parent_spec); 2743 kfree(rbd_dev->header_name); 2744 rbd_put_client(rbd_dev->rbd_client); 2745 rbd_spec_put(rbd_dev->spec); 2746 kfree(rbd_dev); 2747 } 2748 2749 static bool rbd_snap_registered(struct rbd_snap *snap) 2750 { 2751 bool ret = snap->dev.type == &rbd_snap_device_type; 2752 bool reg = device_is_registered(&snap->dev); 2753 2754 rbd_assert(!ret ^ reg); 2755 2756 return ret; 2757 } 2758 2759 static void rbd_remove_snap_dev(struct rbd_snap *snap) 2760 { 2761 list_del(&snap->node); 2762 if (device_is_registered(&snap->dev)) 2763 device_unregister(&snap->dev); 2764 } 2765 2766 static int rbd_register_snap_dev(struct rbd_snap *snap, 2767 struct device *parent) 2768 { 2769 struct device *dev = &snap->dev; 2770 int ret; 2771 2772 
dev->type = &rbd_snap_device_type; 2773 dev->parent = parent; 2774 dev->release = rbd_snap_dev_release; 2775 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); 2776 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2777 2778 ret = device_register(dev); 2779 2780 return ret; 2781 } 2782 2783 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev, 2784 const char *snap_name, 2785 u64 snap_id, u64 snap_size, 2786 u64 snap_features) 2787 { 2788 struct rbd_snap *snap; 2789 int ret; 2790 2791 snap = kzalloc(sizeof (*snap), GFP_KERNEL); 2792 if (!snap) 2793 return ERR_PTR(-ENOMEM); 2794 2795 ret = -ENOMEM; 2796 snap->name = kstrdup(snap_name, GFP_KERNEL); 2797 if (!snap->name) 2798 goto err; 2799 2800 snap->id = snap_id; 2801 snap->size = snap_size; 2802 snap->features = snap_features; 2803 2804 return snap; 2805 2806 err: 2807 kfree(snap->name); 2808 kfree(snap); 2809 2810 return ERR_PTR(ret); 2811 } 2812 2813 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which, 2814 u64 *snap_size, u64 *snap_features) 2815 { 2816 char *snap_name; 2817 2818 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 2819 2820 *snap_size = rbd_dev->header.snap_sizes[which]; 2821 *snap_features = 0; /* No features for v1 */ 2822 2823 /* Skip over names until we find the one we are looking for */ 2824 2825 snap_name = rbd_dev->header.snap_names; 2826 while (which--) 2827 snap_name += strlen(snap_name) + 1; 2828 2829 return snap_name; 2830 } 2831 2832 /* 2833 * Get the size and object order for an image snapshot, or if 2834 * snap_id is CEPH_NOSNAP, gets this information for the base 2835 * image. 
2836 */ 2837 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 2838 u8 *order, u64 *snap_size) 2839 { 2840 __le64 snapid = cpu_to_le64(snap_id); 2841 int ret; 2842 struct { 2843 u8 order; 2844 __le64 size; 2845 } __attribute__ ((packed)) size_buf = { 0 }; 2846 2847 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2848 "rbd", "get_size", 2849 (char *) &snapid, sizeof (snapid), 2850 (char *) &size_buf, sizeof (size_buf), NULL); 2851 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2852 if (ret < 0) 2853 return ret; 2854 2855 *order = size_buf.order; 2856 *snap_size = le64_to_cpu(size_buf.size); 2857 2858 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 2859 (unsigned long long) snap_id, (unsigned int) *order, 2860 (unsigned long long) *snap_size); 2861 2862 return 0; 2863 } 2864 2865 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 2866 { 2867 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 2868 &rbd_dev->header.obj_order, 2869 &rbd_dev->header.image_size); 2870 } 2871 2872 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 2873 { 2874 void *reply_buf; 2875 int ret; 2876 void *p; 2877 2878 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 2879 if (!reply_buf) 2880 return -ENOMEM; 2881 2882 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2883 "rbd", "get_object_prefix", 2884 NULL, 0, 2885 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL); 2886 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2887 if (ret < 0) 2888 goto out; 2889 2890 p = reply_buf; 2891 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2892 p + RBD_OBJ_PREFIX_LEN_MAX, 2893 NULL, GFP_NOIO); 2894 2895 if (IS_ERR(rbd_dev->header.object_prefix)) { 2896 ret = PTR_ERR(rbd_dev->header.object_prefix); 2897 rbd_dev->header.object_prefix = NULL; 2898 } else { 2899 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 2900 } 2901 2902 out: 2903 kfree(reply_buf); 2904 2905 return ret; 2906 } 
2907 2908 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 2909 u64 *snap_features) 2910 { 2911 __le64 snapid = cpu_to_le64(snap_id); 2912 struct { 2913 __le64 features; 2914 __le64 incompat; 2915 } features_buf = { 0 }; 2916 u64 incompat; 2917 int ret; 2918 2919 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2920 "rbd", "get_features", 2921 (char *) &snapid, sizeof (snapid), 2922 (char *) &features_buf, sizeof (features_buf), 2923 NULL); 2924 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2925 if (ret < 0) 2926 return ret; 2927 2928 incompat = le64_to_cpu(features_buf.incompat); 2929 if (incompat & ~RBD_FEATURES_ALL) 2930 return -ENXIO; 2931 2932 *snap_features = le64_to_cpu(features_buf.features); 2933 2934 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 2935 (unsigned long long) snap_id, 2936 (unsigned long long) *snap_features, 2937 (unsigned long long) le64_to_cpu(features_buf.incompat)); 2938 2939 return 0; 2940 } 2941 2942 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 2943 { 2944 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 2945 &rbd_dev->header.features); 2946 } 2947 2948 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 2949 { 2950 struct rbd_spec *parent_spec; 2951 size_t size; 2952 void *reply_buf = NULL; 2953 __le64 snapid; 2954 void *p; 2955 void *end; 2956 char *image_id; 2957 u64 overlap; 2958 int ret; 2959 2960 parent_spec = rbd_spec_alloc(); 2961 if (!parent_spec) 2962 return -ENOMEM; 2963 2964 size = sizeof (__le64) + /* pool_id */ 2965 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 2966 sizeof (__le64) + /* snap_id */ 2967 sizeof (__le64); /* overlap */ 2968 reply_buf = kmalloc(size, GFP_KERNEL); 2969 if (!reply_buf) { 2970 ret = -ENOMEM; 2971 goto out_err; 2972 } 2973 2974 snapid = cpu_to_le64(CEPH_NOSNAP); 2975 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2976 "rbd", "get_parent", 2977 (char *) &snapid, sizeof 
(snapid), 2978 (char *) reply_buf, size, NULL); 2979 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2980 if (ret < 0) 2981 goto out_err; 2982 2983 ret = -ERANGE; 2984 p = reply_buf; 2985 end = (char *) reply_buf + size; 2986 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 2987 if (parent_spec->pool_id == CEPH_NOPOOL) 2988 goto out; /* No parent? No problem. */ 2989 2990 /* The ceph file layout needs to fit pool id in 32 bits */ 2991 2992 ret = -EIO; 2993 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX)) 2994 goto out; 2995 2996 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 2997 if (IS_ERR(image_id)) { 2998 ret = PTR_ERR(image_id); 2999 goto out_err; 3000 } 3001 parent_spec->image_id = image_id; 3002 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 3003 ceph_decode_64_safe(&p, end, overlap, out_err); 3004 3005 rbd_dev->parent_overlap = overlap; 3006 rbd_dev->parent_spec = parent_spec; 3007 parent_spec = NULL; /* rbd_dev now owns this */ 3008 out: 3009 ret = 0; 3010 out_err: 3011 kfree(reply_buf); 3012 rbd_spec_put(parent_spec); 3013 3014 return ret; 3015 } 3016 3017 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 3018 { 3019 size_t image_id_size; 3020 char *image_id; 3021 void *p; 3022 void *end; 3023 size_t size; 3024 void *reply_buf = NULL; 3025 size_t len = 0; 3026 char *image_name = NULL; 3027 int ret; 3028 3029 rbd_assert(!rbd_dev->spec->image_name); 3030 3031 len = strlen(rbd_dev->spec->image_id); 3032 image_id_size = sizeof (__le32) + len; 3033 image_id = kmalloc(image_id_size, GFP_KERNEL); 3034 if (!image_id) 3035 return NULL; 3036 3037 p = image_id; 3038 end = (char *) image_id + image_id_size; 3039 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len); 3040 3041 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3042 reply_buf = kmalloc(size, GFP_KERNEL); 3043 if (!reply_buf) 3044 goto out; 3045 3046 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, 3047 "rbd", 
"dir_get_name", 3048 image_id, image_id_size, 3049 (char *) reply_buf, size, NULL); 3050 if (ret < 0) 3051 goto out; 3052 p = reply_buf; 3053 end = (char *) reply_buf + size; 3054 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 3055 if (IS_ERR(image_name)) 3056 image_name = NULL; 3057 else 3058 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 3059 out: 3060 kfree(reply_buf); 3061 kfree(image_id); 3062 3063 return image_name; 3064 } 3065 3066 /* 3067 * When a parent image gets probed, we only have the pool, image, 3068 * and snapshot ids but not the names of any of them. This call 3069 * is made later to fill in those names. It has to be done after 3070 * rbd_dev_snaps_update() has completed because some of the 3071 * information (in particular, snapshot name) is not available 3072 * until then. 3073 */ 3074 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 3075 { 3076 struct ceph_osd_client *osdc; 3077 const char *name; 3078 void *reply_buf = NULL; 3079 int ret; 3080 3081 if (rbd_dev->spec->pool_name) 3082 return 0; /* Already have the names */ 3083 3084 /* Look up the pool name */ 3085 3086 osdc = &rbd_dev->rbd_client->client->osdc; 3087 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 3088 if (!name) { 3089 rbd_warn(rbd_dev, "there is no pool with id %llu", 3090 rbd_dev->spec->pool_id); /* Really a BUG() */ 3091 return -EIO; 3092 } 3093 3094 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 3095 if (!rbd_dev->spec->pool_name) 3096 return -ENOMEM; 3097 3098 /* Fetch the image name; tolerate failure here */ 3099 3100 name = rbd_dev_image_name(rbd_dev); 3101 if (name) 3102 rbd_dev->spec->image_name = (char *) name; 3103 else 3104 rbd_warn(rbd_dev, "unable to get image name"); 3105 3106 /* Look up the snapshot name. 
*/ 3107 3108 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 3109 if (!name) { 3110 rbd_warn(rbd_dev, "no snapshot with id %llu", 3111 rbd_dev->spec->snap_id); /* Really a BUG() */ 3112 ret = -EIO; 3113 goto out_err; 3114 } 3115 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 3116 if(!rbd_dev->spec->snap_name) 3117 goto out_err; 3118 3119 return 0; 3120 out_err: 3121 kfree(reply_buf); 3122 kfree(rbd_dev->spec->pool_name); 3123 rbd_dev->spec->pool_name = NULL; 3124 3125 return ret; 3126 } 3127 3128 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 3129 { 3130 size_t size; 3131 int ret; 3132 void *reply_buf; 3133 void *p; 3134 void *end; 3135 u64 seq; 3136 u32 snap_count; 3137 struct ceph_snap_context *snapc; 3138 u32 i; 3139 3140 /* 3141 * We'll need room for the seq value (maximum snapshot id), 3142 * snapshot count, and array of that many snapshot ids. 3143 * For now we have a fixed upper limit on the number we're 3144 * prepared to receive. 3145 */ 3146 size = sizeof (__le64) + sizeof (__le32) + 3147 RBD_MAX_SNAP_COUNT * sizeof (__le64); 3148 reply_buf = kzalloc(size, GFP_KERNEL); 3149 if (!reply_buf) 3150 return -ENOMEM; 3151 3152 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3153 "rbd", "get_snapcontext", 3154 NULL, 0, 3155 reply_buf, size, ver); 3156 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3157 if (ret < 0) 3158 goto out; 3159 3160 ret = -ERANGE; 3161 p = reply_buf; 3162 end = (char *) reply_buf + size; 3163 ceph_decode_64_safe(&p, end, seq, out); 3164 ceph_decode_32_safe(&p, end, snap_count, out); 3165 3166 /* 3167 * Make sure the reported number of snapshot ids wouldn't go 3168 * beyond the end of our buffer. But before checking that, 3169 * make sure the computed size of the snapshot context we 3170 * allocate is representable in a size_t. 
3171 */ 3172 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 3173 / sizeof (u64)) { 3174 ret = -EINVAL; 3175 goto out; 3176 } 3177 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 3178 goto out; 3179 3180 size = sizeof (struct ceph_snap_context) + 3181 snap_count * sizeof (snapc->snaps[0]); 3182 snapc = kmalloc(size, GFP_KERNEL); 3183 if (!snapc) { 3184 ret = -ENOMEM; 3185 goto out; 3186 } 3187 3188 atomic_set(&snapc->nref, 1); 3189 snapc->seq = seq; 3190 snapc->num_snaps = snap_count; 3191 for (i = 0; i < snap_count; i++) 3192 snapc->snaps[i] = ceph_decode_64(&p); 3193 3194 rbd_dev->header.snapc = snapc; 3195 3196 dout(" snap context seq = %llu, snap_count = %u\n", 3197 (unsigned long long) seq, (unsigned int) snap_count); 3198 3199 out: 3200 kfree(reply_buf); 3201 3202 return 0; 3203 } 3204 3205 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) 3206 { 3207 size_t size; 3208 void *reply_buf; 3209 __le64 snap_id; 3210 int ret; 3211 void *p; 3212 void *end; 3213 char *snap_name; 3214 3215 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 3216 reply_buf = kmalloc(size, GFP_KERNEL); 3217 if (!reply_buf) 3218 return ERR_PTR(-ENOMEM); 3219 3220 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 3221 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3222 "rbd", "get_snapshot_name", 3223 (char *) &snap_id, sizeof (snap_id), 3224 reply_buf, size, NULL); 3225 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3226 if (ret < 0) 3227 goto out; 3228 3229 p = reply_buf; 3230 end = (char *) reply_buf + size; 3231 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3232 if (IS_ERR(snap_name)) { 3233 ret = PTR_ERR(snap_name); 3234 goto out; 3235 } else { 3236 dout(" snap_id 0x%016llx snap_name = %s\n", 3237 (unsigned long long) le64_to_cpu(snap_id), snap_name); 3238 } 3239 kfree(reply_buf); 3240 3241 return snap_name; 3242 out: 3243 kfree(reply_buf); 3244 3245 return ERR_PTR(ret); 3246 } 3247 
3248 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 3249 u64 *snap_size, u64 *snap_features) 3250 { 3251 u64 snap_id; 3252 u8 order; 3253 int ret; 3254 3255 snap_id = rbd_dev->header.snapc->snaps[which]; 3256 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size); 3257 if (ret) 3258 return ERR_PTR(ret); 3259 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features); 3260 if (ret) 3261 return ERR_PTR(ret); 3262 3263 return rbd_dev_v2_snap_name(rbd_dev, which); 3264 } 3265 3266 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, 3267 u64 *snap_size, u64 *snap_features) 3268 { 3269 if (rbd_dev->image_format == 1) 3270 return rbd_dev_v1_snap_info(rbd_dev, which, 3271 snap_size, snap_features); 3272 if (rbd_dev->image_format == 2) 3273 return rbd_dev_v2_snap_info(rbd_dev, which, 3274 snap_size, snap_features); 3275 return ERR_PTR(-EINVAL); 3276 } 3277 3278 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 3279 { 3280 int ret; 3281 __u8 obj_order; 3282 3283 down_write(&rbd_dev->header_rwsem); 3284 3285 /* Grab old order first, to see if it changes */ 3286 3287 obj_order = rbd_dev->header.obj_order, 3288 ret = rbd_dev_v2_image_size(rbd_dev); 3289 if (ret) 3290 goto out; 3291 if (rbd_dev->header.obj_order != obj_order) { 3292 ret = -EIO; 3293 goto out; 3294 } 3295 rbd_update_mapping_size(rbd_dev); 3296 3297 ret = rbd_dev_v2_snap_context(rbd_dev, hver); 3298 dout("rbd_dev_v2_snap_context returned %d\n", ret); 3299 if (ret) 3300 goto out; 3301 ret = rbd_dev_snaps_update(rbd_dev); 3302 dout("rbd_dev_snaps_update returned %d\n", ret); 3303 if (ret) 3304 goto out; 3305 ret = rbd_dev_snaps_register(rbd_dev); 3306 dout("rbd_dev_snaps_register returned %d\n", ret); 3307 out: 3308 up_write(&rbd_dev->header_rwsem); 3309 3310 return ret; 3311 } 3312 3313 /* 3314 * Scan the rbd device's current snapshot list and compare it to the 3315 * newly-received snapshot context. 
Remove any existing snapshots 3316 * not present in the new snapshot context. Add a new snapshot for 3317 * any snaphots in the snapshot context not in the current list. 3318 * And verify there are no changes to snapshots we already know 3319 * about. 3320 * 3321 * Assumes the snapshots in the snapshot context are sorted by 3322 * snapshot id, highest id first. (Snapshots in the rbd_dev's list 3323 * are also maintained in that order.) 3324 */ 3325 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) 3326 { 3327 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 3328 const u32 snap_count = snapc->num_snaps; 3329 struct list_head *head = &rbd_dev->snaps; 3330 struct list_head *links = head->next; 3331 u32 index = 0; 3332 3333 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count); 3334 while (index < snap_count || links != head) { 3335 u64 snap_id; 3336 struct rbd_snap *snap; 3337 char *snap_name; 3338 u64 snap_size = 0; 3339 u64 snap_features = 0; 3340 3341 snap_id = index < snap_count ? snapc->snaps[index] 3342 : CEPH_NOSNAP; 3343 snap = links != head ? list_entry(links, struct rbd_snap, node) 3344 : NULL; 3345 rbd_assert(!snap || snap->id != CEPH_NOSNAP); 3346 3347 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { 3348 struct list_head *next = links->next; 3349 3350 /* 3351 * A previously-existing snapshot is not in 3352 * the new snap context. 3353 * 3354 * If the now missing snapshot is the one the 3355 * image is mapped to, clear its exists flag 3356 * so we can avoid sending any more requests 3357 * to it. 3358 */ 3359 if (rbd_dev->spec->snap_id == snap->id) 3360 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 3361 rbd_remove_snap_dev(snap); 3362 dout("%ssnap id %llu has been removed\n", 3363 rbd_dev->spec->snap_id == snap->id ? 
3364 "mapped " : "", 3365 (unsigned long long) snap->id); 3366 3367 /* Done with this list entry; advance */ 3368 3369 links = next; 3370 continue; 3371 } 3372 3373 snap_name = rbd_dev_snap_info(rbd_dev, index, 3374 &snap_size, &snap_features); 3375 if (IS_ERR(snap_name)) 3376 return PTR_ERR(snap_name); 3377 3378 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count, 3379 (unsigned long long) snap_id); 3380 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) { 3381 struct rbd_snap *new_snap; 3382 3383 /* We haven't seen this snapshot before */ 3384 3385 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name, 3386 snap_id, snap_size, snap_features); 3387 if (IS_ERR(new_snap)) { 3388 int err = PTR_ERR(new_snap); 3389 3390 dout(" failed to add dev, error %d\n", err); 3391 3392 return err; 3393 } 3394 3395 /* New goes before existing, or at end of list */ 3396 3397 dout(" added dev%s\n", snap ? "" : " at end\n"); 3398 if (snap) 3399 list_add_tail(&new_snap->node, &snap->node); 3400 else 3401 list_add_tail(&new_snap->node, head); 3402 } else { 3403 /* Already have this one */ 3404 3405 dout(" already present\n"); 3406 3407 rbd_assert(snap->size == snap_size); 3408 rbd_assert(!strcmp(snap->name, snap_name)); 3409 rbd_assert(snap->features == snap_features); 3410 3411 /* Done with this list entry; advance */ 3412 3413 links = links->next; 3414 } 3415 3416 /* Advance to the next entry in the snapshot context */ 3417 3418 index++; 3419 } 3420 dout("%s: done\n", __func__); 3421 3422 return 0; 3423 } 3424 3425 /* 3426 * Scan the list of snapshots and register the devices for any that 3427 * have not already been registered. 
3428 */ 3429 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev) 3430 { 3431 struct rbd_snap *snap; 3432 int ret = 0; 3433 3434 dout("%s:\n", __func__); 3435 if (WARN_ON(!device_is_registered(&rbd_dev->dev))) 3436 return -EIO; 3437 3438 list_for_each_entry(snap, &rbd_dev->snaps, node) { 3439 if (!rbd_snap_registered(snap)) { 3440 ret = rbd_register_snap_dev(snap, &rbd_dev->dev); 3441 if (ret < 0) 3442 break; 3443 } 3444 } 3445 dout("%s: returning %d\n", __func__, ret); 3446 3447 return ret; 3448 } 3449 3450 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 3451 { 3452 struct device *dev; 3453 int ret; 3454 3455 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 3456 3457 dev = &rbd_dev->dev; 3458 dev->bus = &rbd_bus_type; 3459 dev->type = &rbd_device_type; 3460 dev->parent = &rbd_root_dev; 3461 dev->release = rbd_dev_release; 3462 dev_set_name(dev, "%d", rbd_dev->dev_id); 3463 ret = device_register(dev); 3464 3465 mutex_unlock(&ctl_mutex); 3466 3467 return ret; 3468 } 3469 3470 static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 3471 { 3472 device_unregister(&rbd_dev->dev); 3473 } 3474 3475 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 3476 3477 /* 3478 * Get a unique rbd identifier for the given new rbd_dev, and add 3479 * the rbd_dev to the global list. The minimum rbd id is 1. 3480 */ 3481 static void rbd_dev_id_get(struct rbd_device *rbd_dev) 3482 { 3483 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); 3484 3485 spin_lock(&rbd_dev_list_lock); 3486 list_add_tail(&rbd_dev->node, &rbd_dev_list); 3487 spin_unlock(&rbd_dev_list_lock); 3488 dout("rbd_dev %p given dev id %llu\n", rbd_dev, 3489 (unsigned long long) rbd_dev->dev_id); 3490 } 3491 3492 /* 3493 * Remove an rbd_dev from the global list, and record that its 3494 * identifier is no longer in use. 
3495 */ 3496 static void rbd_dev_id_put(struct rbd_device *rbd_dev) 3497 { 3498 struct list_head *tmp; 3499 int rbd_id = rbd_dev->dev_id; 3500 int max_id; 3501 3502 rbd_assert(rbd_id > 0); 3503 3504 dout("rbd_dev %p released dev id %llu\n", rbd_dev, 3505 (unsigned long long) rbd_dev->dev_id); 3506 spin_lock(&rbd_dev_list_lock); 3507 list_del_init(&rbd_dev->node); 3508 3509 /* 3510 * If the id being "put" is not the current maximum, there 3511 * is nothing special we need to do. 3512 */ 3513 if (rbd_id != atomic64_read(&rbd_dev_id_max)) { 3514 spin_unlock(&rbd_dev_list_lock); 3515 return; 3516 } 3517 3518 /* 3519 * We need to update the current maximum id. Search the 3520 * list to find out what it is. We're more likely to find 3521 * the maximum at the end, so search the list backward. 3522 */ 3523 max_id = 0; 3524 list_for_each_prev(tmp, &rbd_dev_list) { 3525 struct rbd_device *rbd_dev; 3526 3527 rbd_dev = list_entry(tmp, struct rbd_device, node); 3528 if (rbd_dev->dev_id > max_id) 3529 max_id = rbd_dev->dev_id; 3530 } 3531 spin_unlock(&rbd_dev_list_lock); 3532 3533 /* 3534 * The max id could have been updated by rbd_dev_id_get(), in 3535 * which case it now accurately reflects the new maximum. 3536 * Be careful not to overwrite the maximum value in that 3537 * case. 3538 */ 3539 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); 3540 dout(" max dev id has been reset\n"); 3541 } 3542 3543 /* 3544 * Skips over white space at *buf, and updates *buf to point to the 3545 * first found non-space character (if any). Returns the length of 3546 * the token (string of non-white space characters) found. Note 3547 * that *buf must be terminated with '\0'. 3548 */ 3549 static inline size_t next_token(const char **buf) 3550 { 3551 /* 3552 * These are the characters that produce nonzero for 3553 * isspace() in the "C" and "POSIX" locales. 
3554 */ 3555 const char *spaces = " \f\n\r\t\v"; 3556 3557 *buf += strspn(*buf, spaces); /* Find start of token */ 3558 3559 return strcspn(*buf, spaces); /* Return token length */ 3560 } 3561 3562 /* 3563 * Finds the next token in *buf, and if the provided token buffer is 3564 * big enough, copies the found token into it. The result, if 3565 * copied, is guaranteed to be terminated with '\0'. Note that *buf 3566 * must be terminated with '\0' on entry. 3567 * 3568 * Returns the length of the token found (not including the '\0'). 3569 * Return value will be 0 if no token is found, and it will be >= 3570 * token_size if the token would not fit. 3571 * 3572 * The *buf pointer will be updated to point beyond the end of the 3573 * found token. Note that this occurs even if the token buffer is 3574 * too small to hold it. 3575 */ 3576 static inline size_t copy_token(const char **buf, 3577 char *token, 3578 size_t token_size) 3579 { 3580 size_t len; 3581 3582 len = next_token(buf); 3583 if (len < token_size) { 3584 memcpy(token, *buf, len); 3585 *(token + len) = '\0'; 3586 } 3587 *buf += len; 3588 3589 return len; 3590 } 3591 3592 /* 3593 * Finds the next token in *buf, dynamically allocates a buffer big 3594 * enough to hold a copy of it, and copies the token into the new 3595 * buffer. The copy is guaranteed to be terminated with '\0'. Note 3596 * that a duplicate buffer is created even for a zero-length token. 3597 * 3598 * Returns a pointer to the newly-allocated duplicate, or a null 3599 * pointer if memory for the duplicate was not available. If 3600 * the lenp argument is a non-null pointer, the length of the token 3601 * (not including the '\0') is returned in *lenp. 3602 * 3603 * If successful, the *buf pointer will be updated to point beyond 3604 * the end of the found token. 3605 * 3606 * Note: uses GFP_KERNEL for allocation. 
3607 */ 3608 static inline char *dup_token(const char **buf, size_t *lenp) 3609 { 3610 char *dup; 3611 size_t len; 3612 3613 len = next_token(buf); 3614 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 3615 if (!dup) 3616 return NULL; 3617 *(dup + len) = '\0'; 3618 *buf += len; 3619 3620 if (lenp) 3621 *lenp = len; 3622 3623 return dup; 3624 } 3625 3626 /* 3627 * Parse the options provided for an "rbd add" (i.e., rbd image 3628 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 3629 * and the data written is passed here via a NUL-terminated buffer. 3630 * Returns 0 if successful or an error code otherwise. 3631 * 3632 * The information extracted from these options is recorded in 3633 * the other parameters which return dynamically-allocated 3634 * structures: 3635 * ceph_opts 3636 * The address of a pointer that will refer to a ceph options 3637 * structure. Caller must release the returned pointer using 3638 * ceph_destroy_options() when it is no longer needed. 3639 * rbd_opts 3640 * Address of an rbd options pointer. Fully initialized by 3641 * this function; caller must release with kfree(). 3642 * spec 3643 * Address of an rbd image specification pointer. Fully 3644 * initialized by this function based on parsed options. 3645 * Caller must release with rbd_spec_put(). 3646 * 3647 * The options passed take this form: 3648 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 3649 * where: 3650 * <mon_addrs> 3651 * A comma-separated list of one or more monitor addresses. 3652 * A monitor address is an ip address, optionally followed 3653 * by a port number (separated by a colon). 3654 * I.e.: ip1[:port1][,ip2[:port2]...] 3655 * <options> 3656 * A comma-separated list of ceph and/or rbd options. 3657 * <pool_name> 3658 * The name of the rados pool containing the rbd image. 3659 * <image_name> 3660 * The name of the image in that pool to map. 3661 * <snap_id> 3662 * An optional snapshot id. 
If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	struct ceph_options *parsed_copts;
	struct rbd_options *parsed_opts = NULL;
	struct rbd_spec *new_spec = NULL;
	const char *mon_start;
	const char *mon_end;
	char *opt_str;
	size_t tok_len;
	int ret = -EINVAL;

	/* The first four tokens are required */

	/* Monitor addresses are not copied, just delimited in place */
	tok_len = next_token(&buf);
	if (!tok_len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_start = buf;
	mon_end = mon_start + tok_len;
	buf = mon_end;

	opt_str = dup_token(&buf, NULL);
	if (!opt_str)
		return -ENOMEM;
	if (!*opt_str) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	new_spec = rbd_spec_alloc();
	if (!new_spec)
		goto out_mem;

	new_spec->pool_name = dup_token(&buf, NULL);
	if (!new_spec->pool_name)
		goto out_mem;
	if (!*new_spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	new_spec->image_name = dup_token(&buf, NULL);
	if (!new_spec->image_name)
		goto out_mem;
	if (!*new_spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	tok_len = next_token(&buf);
	if (tok_len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	if (!tok_len) {
		buf = RBD_SNAP_HEAD_NAME;	/* No snapshot supplied */
		tok_len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	}
	new_spec->snap_name = kmemdup(buf, tok_len + 1, GFP_KERNEL);
	if (!new_spec->snap_name)
		goto out_mem;
	new_spec->snap_name[tok_len] = '\0';

	/* Initialize all rbd options to the defaults */

	parsed_opts = kzalloc(sizeof (*parsed_opts), GFP_KERNEL);
	if (!parsed_opts)
		goto out_mem;

	parsed_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/* rbd-specific options are handed to parse_rbd_opts_token() */
	parsed_copts = ceph_parse_options(opt_str, mon_start, mon_end,
					parse_rbd_opts_token, parsed_opts);
	if (IS_ERR(parsed_copts)) {
		ret = PTR_ERR(parsed_copts);
		goto out_err;
	}
	kfree(opt_str);

	/* Success: hand everything back to the caller */
	*ceph_opts = parsed_copts;
	*opts = parsed_opts;
	*rbd_spec = new_spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(parsed_opts);
	rbd_spec_put(new_spec);
	kfree(opt_str);

	return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
3783 */ 3784 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 3785 { 3786 int ret; 3787 size_t size; 3788 char *object_name; 3789 void *response; 3790 void *p; 3791 3792 /* 3793 * When probing a parent image, the image id is already 3794 * known (and the image name likely is not). There's no 3795 * need to fetch the image id again in this case. 3796 */ 3797 if (rbd_dev->spec->image_id) 3798 return 0; 3799 3800 /* 3801 * First, see if the format 2 image id file exists, and if 3802 * so, get the image's persistent id from it. 3803 */ 3804 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); 3805 object_name = kmalloc(size, GFP_NOIO); 3806 if (!object_name) 3807 return -ENOMEM; 3808 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); 3809 dout("rbd id object name is %s\n", object_name); 3810 3811 /* Response will be an encoded string, which includes a length */ 3812 3813 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 3814 response = kzalloc(size, GFP_NOIO); 3815 if (!response) { 3816 ret = -ENOMEM; 3817 goto out; 3818 } 3819 3820 ret = rbd_obj_method_sync(rbd_dev, object_name, 3821 "rbd", "get_id", 3822 NULL, 0, 3823 response, RBD_IMAGE_ID_LEN_MAX, NULL); 3824 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3825 if (ret < 0) 3826 goto out; 3827 3828 p = response; 3829 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 3830 p + RBD_IMAGE_ID_LEN_MAX, 3831 NULL, GFP_NOIO); 3832 if (IS_ERR(rbd_dev->spec->image_id)) { 3833 ret = PTR_ERR(rbd_dev->spec->image_id); 3834 rbd_dev->spec->image_id = NULL; 3835 } else { 3836 dout("image_id is %s\n", rbd_dev->spec->image_id); 3837 } 3838 out: 3839 kfree(response); 3840 kfree(object_name); 3841 3842 return ret; 3843 } 3844 3845 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 3846 { 3847 int ret; 3848 size_t size; 3849 3850 /* Version 1 images have no id; empty string is used */ 3851 3852 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 3853 if 
(!rbd_dev->spec->image_id) 3854 return -ENOMEM; 3855 3856 /* Record the header object name for this rbd image. */ 3857 3858 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX); 3859 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3860 if (!rbd_dev->header_name) { 3861 ret = -ENOMEM; 3862 goto out_err; 3863 } 3864 sprintf(rbd_dev->header_name, "%s%s", 3865 rbd_dev->spec->image_name, RBD_SUFFIX); 3866 3867 /* Populate rbd image metadata */ 3868 3869 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 3870 if (ret < 0) 3871 goto out_err; 3872 3873 /* Version 1 images have no parent (no layering) */ 3874 3875 rbd_dev->parent_spec = NULL; 3876 rbd_dev->parent_overlap = 0; 3877 3878 rbd_dev->image_format = 1; 3879 3880 dout("discovered version 1 image, header name is %s\n", 3881 rbd_dev->header_name); 3882 3883 return 0; 3884 3885 out_err: 3886 kfree(rbd_dev->header_name); 3887 rbd_dev->header_name = NULL; 3888 kfree(rbd_dev->spec->image_id); 3889 rbd_dev->spec->image_id = NULL; 3890 3891 return ret; 3892 } 3893 3894 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) 3895 { 3896 size_t size; 3897 int ret; 3898 u64 ver = 0; 3899 3900 /* 3901 * Image id was filled in by the caller. Record the header 3902 * object name for this rbd image. 3903 */ 3904 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id); 3905 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3906 if (!rbd_dev->header_name) 3907 return -ENOMEM; 3908 sprintf(rbd_dev->header_name, "%s%s", 3909 RBD_HEADER_PREFIX, rbd_dev->spec->image_id); 3910 3911 /* Get the size and object order for the image */ 3912 3913 ret = rbd_dev_v2_image_size(rbd_dev); 3914 if (ret < 0) 3915 goto out_err; 3916 3917 /* Get the object prefix (a.k.a. 
block_name) for the image */ 3918 3919 ret = rbd_dev_v2_object_prefix(rbd_dev); 3920 if (ret < 0) 3921 goto out_err; 3922 3923 /* Get the and check features for the image */ 3924 3925 ret = rbd_dev_v2_features(rbd_dev); 3926 if (ret < 0) 3927 goto out_err; 3928 3929 /* If the image supports layering, get the parent info */ 3930 3931 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 3932 ret = rbd_dev_v2_parent_info(rbd_dev); 3933 if (ret < 0) 3934 goto out_err; 3935 } 3936 3937 /* crypto and compression type aren't (yet) supported for v2 images */ 3938 3939 rbd_dev->header.crypt_type = 0; 3940 rbd_dev->header.comp_type = 0; 3941 3942 /* Get the snapshot context, plus the header version */ 3943 3944 ret = rbd_dev_v2_snap_context(rbd_dev, &ver); 3945 if (ret) 3946 goto out_err; 3947 rbd_dev->header.obj_version = ver; 3948 3949 rbd_dev->image_format = 2; 3950 3951 dout("discovered version 2 image, header name is %s\n", 3952 rbd_dev->header_name); 3953 3954 return 0; 3955 out_err: 3956 rbd_dev->parent_overlap = 0; 3957 rbd_spec_put(rbd_dev->parent_spec); 3958 rbd_dev->parent_spec = NULL; 3959 kfree(rbd_dev->header_name); 3960 rbd_dev->header_name = NULL; 3961 kfree(rbd_dev->header.object_prefix); 3962 rbd_dev->header.object_prefix = NULL; 3963 3964 return ret; 3965 } 3966 3967 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) 3968 { 3969 int ret; 3970 3971 /* no need to lock here, as rbd_dev is not registered yet */ 3972 ret = rbd_dev_snaps_update(rbd_dev); 3973 if (ret) 3974 return ret; 3975 3976 ret = rbd_dev_probe_update_spec(rbd_dev); 3977 if (ret) 3978 goto err_out_snaps; 3979 3980 ret = rbd_dev_set_mapping(rbd_dev); 3981 if (ret) 3982 goto err_out_snaps; 3983 3984 /* generate unique id: find highest unique id, add one */ 3985 rbd_dev_id_get(rbd_dev); 3986 3987 /* Fill in the device name, now that we have its id. 
*/ 3988 BUILD_BUG_ON(DEV_NAME_LEN 3989 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 3990 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 3991 3992 /* Get our block major device number. */ 3993 3994 ret = register_blkdev(0, rbd_dev->name); 3995 if (ret < 0) 3996 goto err_out_id; 3997 rbd_dev->major = ret; 3998 3999 /* Set up the blkdev mapping. */ 4000 4001 ret = rbd_init_disk(rbd_dev); 4002 if (ret) 4003 goto err_out_blkdev; 4004 4005 ret = rbd_bus_add_dev(rbd_dev); 4006 if (ret) 4007 goto err_out_disk; 4008 4009 /* 4010 * At this point cleanup in the event of an error is the job 4011 * of the sysfs code (initiated by rbd_bus_del_dev()). 4012 */ 4013 down_write(&rbd_dev->header_rwsem); 4014 ret = rbd_dev_snaps_register(rbd_dev); 4015 up_write(&rbd_dev->header_rwsem); 4016 if (ret) 4017 goto err_out_bus; 4018 4019 ret = rbd_dev_header_watch_sync(rbd_dev, 1); 4020 if (ret) 4021 goto err_out_bus; 4022 4023 /* Everything's ready. Announce the disk to the world. */ 4024 4025 add_disk(rbd_dev->disk); 4026 4027 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 4028 (unsigned long long) rbd_dev->mapping.size); 4029 4030 return ret; 4031 err_out_bus: 4032 /* this will also clean up rest of rbd_dev stuff */ 4033 4034 rbd_bus_del_dev(rbd_dev); 4035 4036 return ret; 4037 err_out_disk: 4038 rbd_free_disk(rbd_dev); 4039 err_out_blkdev: 4040 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4041 err_out_id: 4042 rbd_dev_id_put(rbd_dev); 4043 err_out_snaps: 4044 rbd_remove_all_snaps(rbd_dev); 4045 4046 return ret; 4047 } 4048 4049 /* 4050 * Probe for the existence of the header object for the given rbd 4051 * device. For format 2 images this includes determining the image 4052 * id. 4053 */ 4054 static int rbd_dev_probe(struct rbd_device *rbd_dev) 4055 { 4056 int ret; 4057 4058 /* 4059 * Get the id from the image id object. If it's not a 4060 * format 2 image, we'll get ENOENT back, and we'll assume 4061 * it's a format 1 image. 
4062 */ 4063 ret = rbd_dev_image_id(rbd_dev); 4064 if (ret) 4065 ret = rbd_dev_v1_probe(rbd_dev); 4066 else 4067 ret = rbd_dev_v2_probe(rbd_dev); 4068 if (ret) { 4069 dout("probe failed, returning %d\n", ret); 4070 4071 return ret; 4072 } 4073 4074 ret = rbd_dev_probe_finish(rbd_dev); 4075 if (ret) 4076 rbd_header_free(&rbd_dev->header); 4077 4078 return ret; 4079 } 4080 4081 static ssize_t rbd_add(struct bus_type *bus, 4082 const char *buf, 4083 size_t count) 4084 { 4085 struct rbd_device *rbd_dev = NULL; 4086 struct ceph_options *ceph_opts = NULL; 4087 struct rbd_options *rbd_opts = NULL; 4088 struct rbd_spec *spec = NULL; 4089 struct rbd_client *rbdc; 4090 struct ceph_osd_client *osdc; 4091 int rc = -ENOMEM; 4092 4093 if (!try_module_get(THIS_MODULE)) 4094 return -ENODEV; 4095 4096 /* parse add command */ 4097 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 4098 if (rc < 0) 4099 goto err_out_module; 4100 4101 rbdc = rbd_get_client(ceph_opts); 4102 if (IS_ERR(rbdc)) { 4103 rc = PTR_ERR(rbdc); 4104 goto err_out_args; 4105 } 4106 ceph_opts = NULL; /* rbd_dev client now owns this */ 4107 4108 /* pick the pool */ 4109 osdc = &rbdc->client->osdc; 4110 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 4111 if (rc < 0) 4112 goto err_out_client; 4113 spec->pool_id = (u64) rc; 4114 4115 /* The ceph file layout needs to fit pool id in 32 bits */ 4116 4117 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) { 4118 rc = -EIO; 4119 goto err_out_client; 4120 } 4121 4122 rbd_dev = rbd_dev_create(rbdc, spec); 4123 if (!rbd_dev) 4124 goto err_out_client; 4125 rbdc = NULL; /* rbd_dev now owns this */ 4126 spec = NULL; /* rbd_dev now owns this */ 4127 4128 rbd_dev->mapping.read_only = rbd_opts->read_only; 4129 kfree(rbd_opts); 4130 rbd_opts = NULL; /* done with this */ 4131 4132 rc = rbd_dev_probe(rbd_dev); 4133 if (rc < 0) 4134 goto err_out_rbd_dev; 4135 4136 return count; 4137 err_out_rbd_dev: 4138 rbd_dev_destroy(rbd_dev); 4139 err_out_client: 4140 
rbd_put_client(rbdc); 4141 err_out_args: 4142 if (ceph_opts) 4143 ceph_destroy_options(ceph_opts); 4144 kfree(rbd_opts); 4145 rbd_spec_put(spec); 4146 err_out_module: 4147 module_put(THIS_MODULE); 4148 4149 dout("Error adding device %s\n", buf); 4150 4151 return (ssize_t) rc; 4152 } 4153 4154 static struct rbd_device *__rbd_get_dev(unsigned long dev_id) 4155 { 4156 struct list_head *tmp; 4157 struct rbd_device *rbd_dev; 4158 4159 spin_lock(&rbd_dev_list_lock); 4160 list_for_each(tmp, &rbd_dev_list) { 4161 rbd_dev = list_entry(tmp, struct rbd_device, node); 4162 if (rbd_dev->dev_id == dev_id) { 4163 spin_unlock(&rbd_dev_list_lock); 4164 return rbd_dev; 4165 } 4166 } 4167 spin_unlock(&rbd_dev_list_lock); 4168 return NULL; 4169 } 4170 4171 static void rbd_dev_release(struct device *dev) 4172 { 4173 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4174 4175 if (rbd_dev->watch_event) 4176 rbd_dev_header_watch_sync(rbd_dev, 0); 4177 4178 /* clean up and free blkdev */ 4179 rbd_free_disk(rbd_dev); 4180 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4181 4182 /* release allocated disk header fields */ 4183 rbd_header_free(&rbd_dev->header); 4184 4185 /* done with the id, and with the rbd_dev */ 4186 rbd_dev_id_put(rbd_dev); 4187 rbd_assert(rbd_dev->rbd_client != NULL); 4188 rbd_dev_destroy(rbd_dev); 4189 4190 /* release module ref */ 4191 module_put(THIS_MODULE); 4192 } 4193 4194 static ssize_t rbd_remove(struct bus_type *bus, 4195 const char *buf, 4196 size_t count) 4197 { 4198 struct rbd_device *rbd_dev = NULL; 4199 int target_id, rc; 4200 unsigned long ul; 4201 int ret = count; 4202 4203 rc = strict_strtoul(buf, 10, &ul); 4204 if (rc) 4205 return rc; 4206 4207 /* convert to int; abort if we lost anything in the conversion */ 4208 target_id = (int) ul; 4209 if (target_id != ul) 4210 return -EINVAL; 4211 4212 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 4213 4214 rbd_dev = __rbd_get_dev(target_id); 4215 if (!rbd_dev) { 4216 ret = -ENOENT; 4217 goto done; 4218 
} 4219 4220 spin_lock_irq(&rbd_dev->lock); 4221 if (rbd_dev->open_count) 4222 ret = -EBUSY; 4223 else 4224 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); 4225 spin_unlock_irq(&rbd_dev->lock); 4226 if (ret < 0) 4227 goto done; 4228 4229 rbd_remove_all_snaps(rbd_dev); 4230 rbd_bus_del_dev(rbd_dev); 4231 4232 done: 4233 mutex_unlock(&ctl_mutex); 4234 4235 return ret; 4236 } 4237 4238 /* 4239 * create control files in sysfs 4240 * /sys/bus/rbd/... 4241 */ 4242 static int rbd_sysfs_init(void) 4243 { 4244 int ret; 4245 4246 ret = device_register(&rbd_root_dev); 4247 if (ret < 0) 4248 return ret; 4249 4250 ret = bus_register(&rbd_bus_type); 4251 if (ret < 0) 4252 device_unregister(&rbd_root_dev); 4253 4254 return ret; 4255 } 4256 4257 static void rbd_sysfs_cleanup(void) 4258 { 4259 bus_unregister(&rbd_bus_type); 4260 device_unregister(&rbd_root_dev); 4261 } 4262 4263 static int __init rbd_init(void) 4264 { 4265 int rc; 4266 4267 if (!libceph_compatible(NULL)) { 4268 rbd_warn(NULL, "libceph incompatibility (quitting)"); 4269 4270 return -EINVAL; 4271 } 4272 rc = rbd_sysfs_init(); 4273 if (rc) 4274 return rc; 4275 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 4276 return 0; 4277 } 4278 4279 static void __exit rbd_exit(void) 4280 { 4281 rbd_sysfs_cleanup(); 4282 } 4283 4284 module_init(rbd_init); 4285 module_exit(rbd_exit); 4286 4287 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 4288 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 4289 MODULE_DESCRIPTION("rados block device"); 4290 4291 /* following authorship retained from original osdblk.c */ 4292 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 4293 4294 MODULE_LICENSE("GPL"); 4295