1 2 /* 3 rbd.c -- Export ceph rados objects as a Linux block device 4 5 6 based on drivers/block/osdblk.c: 7 8 Copyright 2009 Red Hat, Inc. 9 10 This program is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; see the file COPYING. If not, write to 21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 22 23 24 25 For usage instructions, please refer to: 26 27 Documentation/ABI/testing/sysfs-bus-rbd 28 29 */ 30 31 #include <linux/ceph/libceph.h> 32 #include <linux/ceph/osd_client.h> 33 #include <linux/ceph/mon_client.h> 34 #include <linux/ceph/decode.h> 35 #include <linux/parser.h> 36 #include <linux/bsearch.h> 37 38 #include <linux/kernel.h> 39 #include <linux/device.h> 40 #include <linux/module.h> 41 #include <linux/fs.h> 42 #include <linux/blkdev.h> 43 #include <linux/slab.h> 44 45 #include "rbd_types.h" 46 47 #define RBD_DEBUG /* Activate rbd_assert() calls */ 48 49 /* 50 * The basic unit of block I/O is a sector. It is interpreted in a 51 * number of contexts in Linux (blk, bio, genhd), but the default is 52 * universally 512 bytes. These symbols are just slightly more 53 * meaningful than the bare numbers they represent. 54 */ 55 #define SECTOR_SHIFT 9 56 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 57 58 #define RBD_DRV_NAME "rbd" 59 #define RBD_DRV_NAME_LONG "rbd (rados block device)" 60 61 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 62 63 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 64 #define RBD_MAX_SNAP_NAME_LEN \ 65 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 66 67 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 68 69 #define RBD_SNAP_HEAD_NAME "-" 70 71 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ 72 73 /* This allows a single page to hold an image name sent by OSD */ 74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 75 #define RBD_IMAGE_ID_LEN_MAX 64 76 77 #define RBD_OBJ_PREFIX_LEN_MAX 64 78 79 /* Feature bits */ 80 81 #define RBD_FEATURE_LAYERING (1<<0) 82 #define RBD_FEATURE_STRIPINGV2 (1<<1) 83 #define RBD_FEATURES_ALL \ 84 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) 85 86 /* Features supported by this (client software) implementation. */ 87 88 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) 89 90 /* 91 * An RBD device name will be "rbd#", where the "rbd" comes from 92 * RBD_DRV_NAME above, and # is a unique integer identifier. 93 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big 94 * enough to hold all possible device names. 
95 */ 96 #define DEV_NAME_LEN 32 97 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) 98 99 /* 100 * block device image metadata (in-memory version) 101 */ 102 struct rbd_image_header { 103 /* These four fields never change for a given rbd image */ 104 char *object_prefix; 105 u64 features; 106 __u8 obj_order; 107 __u8 crypt_type; 108 __u8 comp_type; 109 110 /* The remaining fields need to be updated occasionally */ 111 u64 image_size; 112 struct ceph_snap_context *snapc; 113 char *snap_names; 114 u64 *snap_sizes; 115 116 u64 stripe_unit; 117 u64 stripe_count; 118 }; 119 120 /* 121 * An rbd image specification. 122 * 123 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 124 * identify an image. Each rbd_dev structure includes a pointer to 125 * an rbd_spec structure that encapsulates this identity. 126 * 127 * Each of the id's in an rbd_spec has an associated name. For a 128 * user-mapped image, the names are supplied and the id's associated 129 * with them are looked up. For a layered image, a parent image is 130 * defined by the tuple, and the names are looked up. 131 * 132 * An rbd_dev structure contains a parent_spec pointer which is 133 * non-null if the image it represents is a child in a layered 134 * image. This pointer will refer to the rbd_spec structure used 135 * by the parent rbd_dev for its own identity (i.e., the structure 136 * is shared between the parent and child). 137 * 138 * Since these structures are populated once, during the discovery 139 * phase of image construction, they are effectively immutable so 140 * we make no effort to synchronize access to them. 141 * 142 * Note that code herein does not assume the image name is known (it 143 * could be a null pointer). 144 */ 145 struct rbd_spec { 146 u64 pool_id; 147 const char *pool_name; 148 149 const char *image_id; 150 const char *image_name; 151 152 u64 snap_id; 153 const char *snap_name; 154 155 struct kref kref; 156 }; 157 158 /* 159 * an instance of the client. multiple devices may share an rbd client. 160 */ 161 struct rbd_client { 162 struct ceph_client *client; 163 struct kref kref; 164 struct list_head node; 165 }; 166 167 struct rbd_img_request; 168 typedef void (*rbd_img_callback_t)(struct rbd_img_request *); 169 170 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */ 171 172 struct rbd_obj_request; 173 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); 174 175 enum obj_request_type { 176 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES 177 }; 178 179 enum obj_req_flags { 180 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ 181 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ 182 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ 183 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ 184 }; 185 186 struct rbd_obj_request { 187 const char *object_name; 188 u64 offset; /* object start byte */ 189 u64 length; /* bytes from offset */ 190 unsigned long flags; 191 192 /* 193 * An object request associated with an image will have its 194 * img_data flag set; a standalone object request will not. 195 * 196 * A standalone object request will have which == BAD_WHICH 197 * and a null obj_request pointer. 198 * 199 * An object request initiated in support of a layered image 200 * object (to check for its existence before a write) will 201 * have which == BAD_WHICH and a non-null obj_request pointer. 
202 * 203 * Finally, an object request for rbd image data will have 204 * which != BAD_WHICH, and will have a non-null img_request 205 * pointer. The value of which will be in the range 206 * 0..(img_request->obj_request_count-1). 207 */ 208 union { 209 struct rbd_obj_request *obj_request; /* STAT op */ 210 struct { 211 struct rbd_img_request *img_request; 212 u64 img_offset; 213 /* links for img_request->obj_requests list */ 214 struct list_head links; 215 }; 216 }; 217 u32 which; /* posn image request list */ 218 219 enum obj_request_type type; 220 union { 221 struct bio *bio_list; 222 struct { 223 struct page **pages; 224 u32 page_count; 225 }; 226 }; 227 struct page **copyup_pages; 228 229 struct ceph_osd_request *osd_req; 230 231 u64 xferred; /* bytes transferred */ 232 int result; 233 234 rbd_obj_callback_t callback; 235 struct completion completion; 236 237 struct kref kref; 238 }; 239 240 enum img_req_flags { 241 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ 242 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ 243 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ 244 }; 245 246 struct rbd_img_request { 247 struct rbd_device *rbd_dev; 248 u64 offset; /* starting image byte offset */ 249 u64 length; /* byte count from offset */ 250 unsigned long flags; 251 union { 252 u64 snap_id; /* for reads */ 253 struct ceph_snap_context *snapc; /* for writes */ 254 }; 255 union { 256 struct request *rq; /* block request */ 257 struct rbd_obj_request *obj_request; /* obj req initiator */ 258 }; 259 struct page **copyup_pages; 260 spinlock_t completion_lock;/* protects next_completion */ 261 u32 next_completion; 262 rbd_img_callback_t callback; 263 u64 xferred;/* aggregate bytes transferred */ 264 int result; /* first nonzero obj_request result */ 265 266 u32 obj_request_count; 267 struct list_head obj_requests; /* rbd_obj_request structs */ 268 269 struct kref kref; 270 }; 271 272 #define for_each_obj_request(ireq, oreq) \ 273 list_for_each_entry(oreq, &(ireq)->obj_requests, links) 274 #define for_each_obj_request_from(ireq, oreq) \ 275 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) 276 #define for_each_obj_request_safe(ireq, oreq, n) \ 277 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) 278 279 struct rbd_mapping { 280 u64 size; 281 u64 features; 282 bool read_only; 283 }; 284 285 /* 286 * a single device 287 */ 288 struct rbd_device { 289 int dev_id; /* blkdev unique id */ 290 291 int major; /* blkdev assigned major */ 292 struct gendisk *disk; /* blkdev's gendisk and rq */ 293 294 u32 image_format; /* Either 1 or 2 */ 295 struct rbd_client *rbd_client; 296 297 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 298 299 spinlock_t lock; /* queue, flags, open_count */ 300 301 struct rbd_image_header header; 302 unsigned long flags; /* possibly lock protected */ 303 struct rbd_spec *spec; 304 305 char *header_name; 306 307 struct ceph_file_layout layout; 308 309 struct ceph_osd_event *watch_event; 310 struct rbd_obj_request *watch_request; 311 312 struct rbd_spec *parent_spec; 313 u64 parent_overlap; 314 struct rbd_device *parent; 315 316 /* protects updating the header */ 317 struct rw_semaphore header_rwsem; 318 319 struct rbd_mapping mapping; 320 321 struct list_head node; 322 323 /* sysfs related */ 324 struct device dev; 325 unsigned long open_count; /* protected by lock */ 326 }; 327 328 /* 329 * Flag bits for rbd_dev->flags. If atomicity is required, 330 * rbd_dev->lock is used to protect access. 
331 * 332 * Currently, only the "removing" flag (which is coupled with the 333 * "open_count" field) requires atomic access. 334 */ 335 enum rbd_dev_flags { 336 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 337 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 338 }; 339 340 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 341 342 static LIST_HEAD(rbd_dev_list); /* devices */ 343 static DEFINE_SPINLOCK(rbd_dev_list_lock); 344 345 static LIST_HEAD(rbd_client_list); /* clients */ 346 static DEFINE_SPINLOCK(rbd_client_list_lock); 347 348 /* Slab caches for frequently-allocated structures */ 349 350 static struct kmem_cache *rbd_img_request_cache; 351 static struct kmem_cache *rbd_obj_request_cache; 352 static struct kmem_cache *rbd_segment_name_cache; 353 354 static int rbd_img_request_submit(struct rbd_img_request *img_request); 355 356 static void rbd_dev_device_release(struct device *dev); 357 358 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 359 size_t count); 360 static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 361 size_t count); 362 static int rbd_dev_image_probe(struct rbd_device *rbd_dev); 363 364 static struct bus_attribute rbd_bus_attrs[] = { 365 __ATTR(add, S_IWUSR, NULL, rbd_add), 366 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 367 __ATTR_NULL 368 }; 369 370 static struct bus_type rbd_bus_type = { 371 .name = "rbd", 372 .bus_attrs = rbd_bus_attrs, 373 }; 374 375 static void rbd_root_dev_release(struct device *dev) 376 { 377 } 378 379 static struct device rbd_root_dev = { 380 .init_name = "rbd", 381 .release = rbd_root_dev_release, 382 }; 383 384 static __printf(2, 3) 385 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 386 { 387 struct va_format vaf; 388 va_list args; 389 390 va_start(args, fmt); 391 vaf.fmt = fmt; 392 vaf.va = &args; 393 394 if (!rbd_dev) 395 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 396 else if (rbd_dev->disk) 397 printk(KERN_WARNING "%s: %s: %pV\n", 398 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 399 else if (rbd_dev->spec && rbd_dev->spec->image_name) 400 printk(KERN_WARNING "%s: image %s: %pV\n", 401 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 402 else if (rbd_dev->spec && rbd_dev->spec->image_id) 403 printk(KERN_WARNING "%s: id %s: %pV\n", 404 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 405 else /* punt */ 406 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 407 RBD_DRV_NAME, rbd_dev, &vaf); 408 va_end(args); 409 } 410 411 #ifdef RBD_DEBUG 412 #define rbd_assert(expr) \ 413 if (unlikely(!(expr))) { \ 414 printk(KERN_ERR "\nAssertion failure in %s() " \ 415 "at line %d:\n\n" \ 416 "\trbd_assert(%s);\n\n", \ 417 __func__, __LINE__, #expr); \ 418 BUG(); \ 419 } 420 #else /* !RBD_DEBUG */ 421 # define rbd_assert(expr) ((void) 0) 422 #endif /* !RBD_DEBUG */ 423 424 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); 425 static void rbd_img_parent_read(struct rbd_obj_request *obj_request); 426 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); 427 428 static int rbd_dev_refresh(struct rbd_device *rbd_dev); 429 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev); 430 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 431 u64 snap_id); 432 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 433 u8 *order, u64 *snap_size); 434 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 435 u64 *snap_features); 436 static u64 rbd_snap_id_by_name(struct 
rbd_device *rbd_dev, const char *name); 437 438 static int rbd_open(struct block_device *bdev, fmode_t mode) 439 { 440 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 441 bool removing = false; 442 443 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 444 return -EROFS; 445 446 spin_lock_irq(&rbd_dev->lock); 447 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 448 removing = true; 449 else 450 rbd_dev->open_count++; 451 spin_unlock_irq(&rbd_dev->lock); 452 if (removing) 453 return -ENOENT; 454 455 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 456 (void) get_device(&rbd_dev->dev); 457 set_device_ro(bdev, rbd_dev->mapping.read_only); 458 mutex_unlock(&ctl_mutex); 459 460 return 0; 461 } 462 463 static void rbd_release(struct gendisk *disk, fmode_t mode) 464 { 465 struct rbd_device *rbd_dev = disk->private_data; 466 unsigned long open_count_before; 467 468 spin_lock_irq(&rbd_dev->lock); 469 open_count_before = rbd_dev->open_count--; 470 spin_unlock_irq(&rbd_dev->lock); 471 rbd_assert(open_count_before > 0); 472 473 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 474 put_device(&rbd_dev->dev); 475 mutex_unlock(&ctl_mutex); 476 } 477 478 static const struct block_device_operations rbd_bd_ops = { 479 .owner = THIS_MODULE, 480 .open = rbd_open, 481 .release = rbd_release, 482 }; 483 484 /* 485 * Initialize an rbd client instance. 486 * We own *ceph_opts. 487 */ 488 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 489 { 490 struct rbd_client *rbdc; 491 int ret = -ENOMEM; 492 493 dout("%s:\n", __func__); 494 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 495 if (!rbdc) 496 goto out_opt; 497 498 kref_init(&rbdc->kref); 499 INIT_LIST_HEAD(&rbdc->node); 500 501 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 502 503 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); 504 if (IS_ERR(rbdc->client)) 505 goto out_mutex; 506 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 507 508 ret = ceph_open_session(rbdc->client); 509 if (ret < 0) 510 goto out_err; 511 512 spin_lock(&rbd_client_list_lock); 513 list_add_tail(&rbdc->node, &rbd_client_list); 514 spin_unlock(&rbd_client_list_lock); 515 516 mutex_unlock(&ctl_mutex); 517 dout("%s: rbdc %p\n", __func__, rbdc); 518 519 return rbdc; 520 521 out_err: 522 ceph_destroy_client(rbdc->client); 523 out_mutex: 524 mutex_unlock(&ctl_mutex); 525 kfree(rbdc); 526 out_opt: 527 if (ceph_opts) 528 ceph_destroy_options(ceph_opts); 529 dout("%s: error %d\n", __func__, ret); 530 531 return ERR_PTR(ret); 532 } 533 534 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) 535 { 536 kref_get(&rbdc->kref); 537 538 return rbdc; 539 } 540 541 /* 542 * Find a ceph client with specific addr and configuration. If 543 * found, bump its reference count. 544 */ 545 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 546 { 547 struct rbd_client *client_node; 548 bool found = false; 549 550 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 551 return NULL; 552 553 spin_lock(&rbd_client_list_lock); 554 list_for_each_entry(client_node, &rbd_client_list, node) { 555 if (!ceph_compare_options(ceph_opts, client_node->client)) { 556 __rbd_get_client(client_node); 557 558 found = true; 559 break; 560 } 561 } 562 spin_unlock(&rbd_client_list_lock); 563 564 return found ? 
client_node : NULL; 565 } 566 567 /* 568 * mount options 569 */ 570 enum { 571 Opt_last_int, 572 /* int args above */ 573 Opt_last_string, 574 /* string args above */ 575 Opt_read_only, 576 Opt_read_write, 577 /* Boolean args above */ 578 Opt_last_bool, 579 }; 580 581 static match_table_t rbd_opts_tokens = { 582 /* int args above */ 583 /* string args above */ 584 {Opt_read_only, "read_only"}, 585 {Opt_read_only, "ro"}, /* Alternate spelling */ 586 {Opt_read_write, "read_write"}, 587 {Opt_read_write, "rw"}, /* Alternate spelling */ 588 /* Boolean args above */ 589 {-1, NULL} 590 }; 591 592 struct rbd_options { 593 bool read_only; 594 }; 595 596 #define RBD_READ_ONLY_DEFAULT false 597 598 static int parse_rbd_opts_token(char *c, void *private) 599 { 600 struct rbd_options *rbd_opts = private; 601 substring_t argstr[MAX_OPT_ARGS]; 602 int token, intval, ret; 603 604 token = match_token(c, rbd_opts_tokens, argstr); 605 if (token < 0) 606 return -EINVAL; 607 608 if (token < Opt_last_int) { 609 ret = match_int(&argstr[0], &intval); 610 if (ret < 0) { 611 pr_err("bad mount option arg (not int) " 612 "at '%s'\n", c); 613 return ret; 614 } 615 dout("got int token %d val %d\n", token, intval); 616 } else if (token > Opt_last_int && token < Opt_last_string) { 617 dout("got string token %d val %s\n", token, 618 argstr[0].from); 619 } else if (token > Opt_last_string && token < Opt_last_bool) { 620 dout("got Boolean token %d\n", token); 621 } else { 622 dout("got token %d\n", token); 623 } 624 625 switch (token) { 626 case Opt_read_only: 627 rbd_opts->read_only = true; 628 break; 629 case Opt_read_write: 630 rbd_opts->read_only = false; 631 break; 632 default: 633 rbd_assert(false); 634 break; 635 } 636 return 0; 637 } 638 639 /* 640 * Get a ceph client with specific addr and configuration, if one does 641 * not exist create it. 642 */ 643 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 644 { 645 struct rbd_client *rbdc; 646 647 rbdc = rbd_client_find(ceph_opts); 648 if (rbdc) /* using an existing client */ 649 ceph_destroy_options(ceph_opts); 650 else 651 rbdc = rbd_client_create(ceph_opts); 652 653 return rbdc; 654 } 655 656 /* 657 * Destroy ceph client 658 * 659 * Caller must hold rbd_client_list_lock. 660 */ 661 static void rbd_client_release(struct kref *kref) 662 { 663 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 664 665 dout("%s: rbdc %p\n", __func__, rbdc); 666 spin_lock(&rbd_client_list_lock); 667 list_del(&rbdc->node); 668 spin_unlock(&rbd_client_list_lock); 669 670 ceph_destroy_client(rbdc->client); 671 kfree(rbdc); 672 } 673 674 /* 675 * Drop reference to ceph client node. If it's not referenced anymore, release 676 * it. 
677 */ 678 static void rbd_put_client(struct rbd_client *rbdc) 679 { 680 if (rbdc) 681 kref_put(&rbdc->kref, rbd_client_release); 682 } 683 684 static bool rbd_image_format_valid(u32 image_format) 685 { 686 return image_format == 1 || image_format == 2; 687 } 688 689 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 690 { 691 size_t size; 692 u32 snap_count; 693 694 /* The header has to start with the magic rbd header text */ 695 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 696 return false; 697 698 /* The bio layer requires at least sector-sized I/O */ 699 700 if (ondisk->options.order < SECTOR_SHIFT) 701 return false; 702 703 /* If we use u64 in a few spots we may be able to loosen this */ 704 705 if (ondisk->options.order > 8 * sizeof (int) - 1) 706 return false; 707 708 /* 709 * The size of a snapshot header has to fit in a size_t, and 710 * that limits the number of snapshots. 711 */ 712 snap_count = le32_to_cpu(ondisk->snap_count); 713 size = SIZE_MAX - sizeof (struct ceph_snap_context); 714 if (snap_count > size / sizeof (__le64)) 715 return false; 716 717 /* 718 * Not only that, but the size of the entire the snapshot 719 * header must also be representable in a size_t. 720 */ 721 size -= snap_count * sizeof (__le64); 722 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 723 return false; 724 725 return true; 726 } 727 728 /* 729 * Create a new header structure, translate header format from the on-disk 730 * header. 731 */ 732 static int rbd_header_from_disk(struct rbd_image_header *header, 733 struct rbd_image_header_ondisk *ondisk) 734 { 735 u32 snap_count; 736 size_t len; 737 size_t size; 738 u32 i; 739 740 memset(header, 0, sizeof (*header)); 741 742 snap_count = le32_to_cpu(ondisk->snap_count); 743 744 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); 745 header->object_prefix = kmalloc(len + 1, GFP_KERNEL); 746 if (!header->object_prefix) 747 return -ENOMEM; 748 memcpy(header->object_prefix, ondisk->object_prefix, len); 749 header->object_prefix[len] = '\0'; 750 751 if (snap_count) { 752 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 753 754 /* Save a copy of the snapshot names */ 755 756 if (snap_names_len > (u64) SIZE_MAX) 757 return -EIO; 758 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); 759 if (!header->snap_names) 760 goto out_err; 761 /* 762 * Note that rbd_dev_v1_header_read() guarantees 763 * the ondisk buffer we're working with has 764 * snap_names_len bytes beyond the end of the 765 * snapshot id array, this memcpy() is safe. 
766 */ 767 memcpy(header->snap_names, &ondisk->snaps[snap_count], 768 snap_names_len); 769 770 /* Record each snapshot's size */ 771 772 size = snap_count * sizeof (*header->snap_sizes); 773 header->snap_sizes = kmalloc(size, GFP_KERNEL); 774 if (!header->snap_sizes) 775 goto out_err; 776 for (i = 0; i < snap_count; i++) 777 header->snap_sizes[i] = 778 le64_to_cpu(ondisk->snaps[i].image_size); 779 } else { 780 header->snap_names = NULL; 781 header->snap_sizes = NULL; 782 } 783 784 header->features = 0; /* No features support in v1 images */ 785 header->obj_order = ondisk->options.order; 786 header->crypt_type = ondisk->options.crypt_type; 787 header->comp_type = ondisk->options.comp_type; 788 789 /* Allocate and fill in the snapshot context */ 790 791 header->image_size = le64_to_cpu(ondisk->image_size); 792 793 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 794 if (!header->snapc) 795 goto out_err; 796 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 797 for (i = 0; i < snap_count; i++) 798 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id); 799 800 return 0; 801 802 out_err: 803 kfree(header->snap_sizes); 804 header->snap_sizes = NULL; 805 kfree(header->snap_names); 806 header->snap_names = NULL; 807 kfree(header->object_prefix); 808 header->object_prefix = NULL; 809 810 return -ENOMEM; 811 } 812 813 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) 814 { 815 const char *snap_name; 816 817 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 818 819 /* Skip over names until we find the one we are looking for */ 820 821 snap_name = rbd_dev->header.snap_names; 822 while (which--) 823 snap_name += strlen(snap_name) + 1; 824 825 return kstrdup(snap_name, GFP_KERNEL); 826 } 827 828 /* 829 * Snapshot id comparison function for use with qsort()/bsearch(). 830 * Note that result is for snapshots in *descending* order. 831 */ 832 static int snapid_compare_reverse(const void *s1, const void *s2) 833 { 834 u64 snap_id1 = *(u64 *)s1; 835 u64 snap_id2 = *(u64 *)s2; 836 837 if (snap_id1 < snap_id2) 838 return 1; 839 return snap_id1 == snap_id2 ? 0 : -1; 840 } 841 842 /* 843 * Search a snapshot context to see if the given snapshot id is 844 * present. 845 * 846 * Returns the position of the snapshot id in the array if it's found, 847 * or BAD_SNAP_INDEX otherwise. 848 * 849 * Note: The snapshot array is in kept sorted (by the osd) in 850 * reverse order, highest snapshot id first. 851 */ 852 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) 853 { 854 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 855 u64 *found; 856 857 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, 858 sizeof (snap_id), snapid_compare_reverse); 859 860 return found ? 
(u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; 861 } 862 863 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, 864 u64 snap_id) 865 { 866 u32 which; 867 868 which = rbd_dev_snap_index(rbd_dev, snap_id); 869 if (which == BAD_SNAP_INDEX) 870 return NULL; 871 872 return _rbd_dev_v1_snap_name(rbd_dev, which); 873 } 874 875 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 876 { 877 if (snap_id == CEPH_NOSNAP) 878 return RBD_SNAP_HEAD_NAME; 879 880 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 881 if (rbd_dev->image_format == 1) 882 return rbd_dev_v1_snap_name(rbd_dev, snap_id); 883 884 return rbd_dev_v2_snap_name(rbd_dev, snap_id); 885 } 886 887 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 888 u64 *snap_size) 889 { 890 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 891 if (snap_id == CEPH_NOSNAP) { 892 *snap_size = rbd_dev->header.image_size; 893 } else if (rbd_dev->image_format == 1) { 894 u32 which; 895 896 which = rbd_dev_snap_index(rbd_dev, snap_id); 897 if (which == BAD_SNAP_INDEX) 898 return -ENOENT; 899 900 *snap_size = rbd_dev->header.snap_sizes[which]; 901 } else { 902 u64 size = 0; 903 int ret; 904 905 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 906 if (ret) 907 return ret; 908 909 *snap_size = size; 910 } 911 return 0; 912 } 913 914 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 915 u64 *snap_features) 916 { 917 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 918 if (snap_id == CEPH_NOSNAP) { 919 *snap_features = rbd_dev->header.features; 920 } else if (rbd_dev->image_format == 1) { 921 *snap_features = 0; /* No features for format 1 */ 922 } else { 923 u64 features = 0; 924 int ret; 925 926 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); 927 if (ret) 928 return ret; 929 930 *snap_features = features; 931 } 932 return 0; 933 } 934 935 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) 936 { 937 const char *snap_name = rbd_dev->spec->snap_name; 938 u64 snap_id; 939 u64 size = 0; 940 u64 features = 0; 941 int ret; 942 943 if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) { 944 snap_id = rbd_snap_id_by_name(rbd_dev, snap_name); 945 if (snap_id == CEPH_NOSNAP) 946 return -ENOENT; 947 } else { 948 snap_id = CEPH_NOSNAP; 949 } 950 951 ret = rbd_snap_size(rbd_dev, snap_id, &size); 952 if (ret) 953 return ret; 954 ret = rbd_snap_features(rbd_dev, snap_id, &features); 955 if (ret) 956 return ret; 957 958 rbd_dev->mapping.size = size; 959 rbd_dev->mapping.features = features; 960 961 /* If we are mapping a snapshot it must be marked read-only */ 962 963 if (snap_id != CEPH_NOSNAP) 964 rbd_dev->mapping.read_only = true; 965 966 return 0; 967 } 968 969 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) 970 { 971 rbd_dev->mapping.size = 0; 972 rbd_dev->mapping.features = 0; 973 rbd_dev->mapping.read_only = true; 974 } 975 976 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev) 977 { 978 rbd_dev->mapping.size = 0; 979 rbd_dev->mapping.features = 0; 980 rbd_dev->mapping.read_only = true; 981 } 982 983 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 984 { 985 char *name; 986 u64 segment; 987 int ret; 988 989 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); 990 if (!name) 991 return NULL; 992 segment = offset >> rbd_dev->header.obj_order; 993 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 994 rbd_dev->header.object_prefix, segment); 995 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 
996 pr_err("error formatting segment name for #%llu (%d)\n", 997 segment, ret); 998 kfree(name); 999 name = NULL; 1000 } 1001 1002 return name; 1003 } 1004 1005 static void rbd_segment_name_free(const char *name) 1006 { 1007 /* The explicit cast here is needed to drop the const qualifier */ 1008 1009 kmem_cache_free(rbd_segment_name_cache, (void *)name); 1010 } 1011 1012 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 1013 { 1014 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 1015 1016 return offset & (segment_size - 1); 1017 } 1018 1019 static u64 rbd_segment_length(struct rbd_device *rbd_dev, 1020 u64 offset, u64 length) 1021 { 1022 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 1023 1024 offset &= segment_size - 1; 1025 1026 rbd_assert(length <= U64_MAX - offset); 1027 if (offset + length > segment_size) 1028 length = segment_size - offset; 1029 1030 return length; 1031 } 1032 1033 /* 1034 * returns the size of an object in the image 1035 */ 1036 static u64 rbd_obj_bytes(struct rbd_image_header *header) 1037 { 1038 return 1 << header->obj_order; 1039 } 1040 1041 /* 1042 * bio helpers 1043 */ 1044 1045 static void bio_chain_put(struct bio *chain) 1046 { 1047 struct bio *tmp; 1048 1049 while (chain) { 1050 tmp = chain; 1051 chain = chain->bi_next; 1052 bio_put(tmp); 1053 } 1054 } 1055 1056 /* 1057 * zeros a bio chain, starting at specific offset 1058 */ 1059 static void zero_bio_chain(struct bio *chain, int start_ofs) 1060 { 1061 struct bio_vec *bv; 1062 unsigned long flags; 1063 void *buf; 1064 int i; 1065 int pos = 0; 1066 1067 while (chain) { 1068 bio_for_each_segment(bv, chain, i) { 1069 if (pos + bv->bv_len > start_ofs) { 1070 int remainder = max(start_ofs - pos, 0); 1071 buf = bvec_kmap_irq(bv, &flags); 1072 memset(buf + remainder, 0, 1073 bv->bv_len - remainder); 1074 bvec_kunmap_irq(buf, &flags); 1075 } 1076 pos += bv->bv_len; 1077 } 1078 1079 chain = chain->bi_next; 1080 } 1081 } 1082 1083 /* 1084 * similar to zero_bio_chain(), zeros data defined by a page array, 1085 * starting at the given byte offset from the start of the array and 1086 * continuing up to the given end offset. The pages array is 1087 * assumed to be big enough to hold all bytes up to the end. 1088 */ 1089 static void zero_pages(struct page **pages, u64 offset, u64 end) 1090 { 1091 struct page **page = &pages[offset >> PAGE_SHIFT]; 1092 1093 rbd_assert(end > offset); 1094 rbd_assert(end - offset <= (u64)SIZE_MAX); 1095 while (offset < end) { 1096 size_t page_offset; 1097 size_t length; 1098 unsigned long flags; 1099 void *kaddr; 1100 1101 page_offset = (size_t)(offset & ~PAGE_MASK); 1102 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset)); 1103 local_irq_save(flags); 1104 kaddr = kmap_atomic(*page); 1105 memset(kaddr + page_offset, 0, length); 1106 kunmap_atomic(kaddr); 1107 local_irq_restore(flags); 1108 1109 offset += length; 1110 page++; 1111 } 1112 } 1113 1114 /* 1115 * Clone a portion of a bio, starting at the given byte offset 1116 * and continuing for the number of bytes indicated. 
1117 */ 1118 static struct bio *bio_clone_range(struct bio *bio_src, 1119 unsigned int offset, 1120 unsigned int len, 1121 gfp_t gfpmask) 1122 { 1123 struct bio_vec *bv; 1124 unsigned int resid; 1125 unsigned short idx; 1126 unsigned int voff; 1127 unsigned short end_idx; 1128 unsigned short vcnt; 1129 struct bio *bio; 1130 1131 /* Handle the easy case for the caller */ 1132 1133 if (!offset && len == bio_src->bi_size) 1134 return bio_clone(bio_src, gfpmask); 1135 1136 if (WARN_ON_ONCE(!len)) 1137 return NULL; 1138 if (WARN_ON_ONCE(len > bio_src->bi_size)) 1139 return NULL; 1140 if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) 1141 return NULL; 1142 1143 /* Find first affected segment... */ 1144 1145 resid = offset; 1146 bio_for_each_segment(bv, bio_src, idx) { 1147 if (resid < bv->bv_len) 1148 break; 1149 resid -= bv->bv_len; 1150 } 1151 voff = resid; 1152 1153 /* ...and the last affected segment */ 1154 1155 resid += len; 1156 __bio_for_each_segment(bv, bio_src, end_idx, idx) { 1157 if (resid <= bv->bv_len) 1158 break; 1159 resid -= bv->bv_len; 1160 } 1161 vcnt = end_idx - idx + 1; 1162 1163 /* Build the clone */ 1164 1165 bio = bio_alloc(gfpmask, (unsigned int) vcnt); 1166 if (!bio) 1167 return NULL; /* ENOMEM */ 1168 1169 bio->bi_bdev = bio_src->bi_bdev; 1170 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); 1171 bio->bi_rw = bio_src->bi_rw; 1172 bio->bi_flags |= 1 << BIO_CLONED; 1173 1174 /* 1175 * Copy over our part of the bio_vec, then update the first 1176 * and last (or only) entries. 1177 */ 1178 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], 1179 vcnt * sizeof (struct bio_vec)); 1180 bio->bi_io_vec[0].bv_offset += voff; 1181 if (vcnt > 1) { 1182 bio->bi_io_vec[0].bv_len -= voff; 1183 bio->bi_io_vec[vcnt - 1].bv_len = resid; 1184 } else { 1185 bio->bi_io_vec[0].bv_len = len; 1186 } 1187 1188 bio->bi_vcnt = vcnt; 1189 bio->bi_size = len; 1190 bio->bi_idx = 0; 1191 1192 return bio; 1193 } 1194 1195 /* 1196 * Clone a portion of a bio chain, starting at the given byte offset 1197 * into the first bio in the source chain and continuing for the 1198 * number of bytes indicated. The result is another bio chain of 1199 * exactly the given length, or a null pointer on error. 1200 * 1201 * The bio_src and offset parameters are both in-out. On entry they 1202 * refer to the first source bio and the offset into that bio where 1203 * the start of data to be cloned is located. 1204 * 1205 * On return, bio_src is updated to refer to the bio in the source 1206 * chain that contains first un-cloned byte, and *offset will 1207 * contain the offset of that byte within that bio. 
1208 */ 1209 static struct bio *bio_chain_clone_range(struct bio **bio_src, 1210 unsigned int *offset, 1211 unsigned int len, 1212 gfp_t gfpmask) 1213 { 1214 struct bio *bi = *bio_src; 1215 unsigned int off = *offset; 1216 struct bio *chain = NULL; 1217 struct bio **end; 1218 1219 /* Build up a chain of clone bios up to the limit */ 1220 1221 if (!bi || off >= bi->bi_size || !len) 1222 return NULL; /* Nothing to clone */ 1223 1224 end = &chain; 1225 while (len) { 1226 unsigned int bi_size; 1227 struct bio *bio; 1228 1229 if (!bi) { 1230 rbd_warn(NULL, "bio_chain exhausted with %u left", len); 1231 goto out_err; /* EINVAL; ran out of bio's */ 1232 } 1233 bi_size = min_t(unsigned int, bi->bi_size - off, len); 1234 bio = bio_clone_range(bi, off, bi_size, gfpmask); 1235 if (!bio) 1236 goto out_err; /* ENOMEM */ 1237 1238 *end = bio; 1239 end = &bio->bi_next; 1240 1241 off += bi_size; 1242 if (off == bi->bi_size) { 1243 bi = bi->bi_next; 1244 off = 0; 1245 } 1246 len -= bi_size; 1247 } 1248 *bio_src = bi; 1249 *offset = off; 1250 1251 return chain; 1252 out_err: 1253 bio_chain_put(chain); 1254 1255 return NULL; 1256 } 1257 1258 /* 1259 * The default/initial value for all object request flags is 0. For 1260 * each flag, once its value is set to 1 it is never reset to 0 1261 * again. 1262 */ 1263 static void obj_request_img_data_set(struct rbd_obj_request *obj_request) 1264 { 1265 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { 1266 struct rbd_device *rbd_dev; 1267 1268 rbd_dev = obj_request->img_request->rbd_dev; 1269 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", 1270 obj_request); 1271 } 1272 } 1273 1274 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) 1275 { 1276 smp_mb(); 1277 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; 1278 } 1279 1280 static void obj_request_done_set(struct rbd_obj_request *obj_request) 1281 { 1282 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { 1283 struct rbd_device *rbd_dev = NULL; 1284 1285 if (obj_request_img_data_test(obj_request)) 1286 rbd_dev = obj_request->img_request->rbd_dev; 1287 rbd_warn(rbd_dev, "obj_request %p already marked done\n", 1288 obj_request); 1289 } 1290 } 1291 1292 static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1293 { 1294 smp_mb(); 1295 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; 1296 } 1297 1298 /* 1299 * This sets the KNOWN flag after (possibly) setting the EXISTS 1300 * flag. The latter is set based on the "exists" value provided. 1301 * 1302 * Note that for our purposes once an object exists it never goes 1303 * away again. It's possible that the response from two existence 1304 * checks are separated by the creation of the target object, and 1305 * the first ("doesn't exist") response arrives *after* the second 1306 * ("does exist"). In that case we ignore the second one. 
1307 */ 1308 static void obj_request_existence_set(struct rbd_obj_request *obj_request, 1309 bool exists) 1310 { 1311 if (exists) 1312 set_bit(OBJ_REQ_EXISTS, &obj_request->flags); 1313 set_bit(OBJ_REQ_KNOWN, &obj_request->flags); 1314 smp_mb(); 1315 } 1316 1317 static bool obj_request_known_test(struct rbd_obj_request *obj_request) 1318 { 1319 smp_mb(); 1320 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; 1321 } 1322 1323 static bool obj_request_exists_test(struct rbd_obj_request *obj_request) 1324 { 1325 smp_mb(); 1326 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; 1327 } 1328 1329 static void rbd_obj_request_get(struct rbd_obj_request *obj_request) 1330 { 1331 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1332 atomic_read(&obj_request->kref.refcount)); 1333 kref_get(&obj_request->kref); 1334 } 1335 1336 static void rbd_obj_request_destroy(struct kref *kref); 1337 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1338 { 1339 rbd_assert(obj_request != NULL); 1340 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1341 atomic_read(&obj_request->kref.refcount)); 1342 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1343 } 1344 1345 static void rbd_img_request_get(struct rbd_img_request *img_request) 1346 { 1347 dout("%s: img %p (was %d)\n", __func__, img_request, 1348 atomic_read(&img_request->kref.refcount)); 1349 kref_get(&img_request->kref); 1350 } 1351 1352 static void rbd_img_request_destroy(struct kref *kref); 1353 static void rbd_img_request_put(struct rbd_img_request *img_request) 1354 { 1355 rbd_assert(img_request != NULL); 1356 dout("%s: img %p (was %d)\n", __func__, img_request, 1357 atomic_read(&img_request->kref.refcount)); 1358 kref_put(&img_request->kref, rbd_img_request_destroy); 1359 } 1360 1361 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1362 struct rbd_obj_request *obj_request) 1363 { 1364 rbd_assert(obj_request->img_request == NULL); 1365 1366 /* Image request now owns object's original reference */ 1367 obj_request->img_request = img_request; 1368 obj_request->which = img_request->obj_request_count; 1369 rbd_assert(!obj_request_img_data_test(obj_request)); 1370 obj_request_img_data_set(obj_request); 1371 rbd_assert(obj_request->which != BAD_WHICH); 1372 img_request->obj_request_count++; 1373 list_add_tail(&obj_request->links, &img_request->obj_requests); 1374 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1375 obj_request->which); 1376 } 1377 1378 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1379 struct rbd_obj_request *obj_request) 1380 { 1381 rbd_assert(obj_request->which != BAD_WHICH); 1382 1383 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1384 obj_request->which); 1385 list_del(&obj_request->links); 1386 rbd_assert(img_request->obj_request_count > 0); 1387 img_request->obj_request_count--; 1388 rbd_assert(obj_request->which == img_request->obj_request_count); 1389 obj_request->which = BAD_WHICH; 1390 rbd_assert(obj_request_img_data_test(obj_request)); 1391 rbd_assert(obj_request->img_request == img_request); 1392 obj_request->img_request = NULL; 1393 obj_request->callback = NULL; 1394 rbd_obj_request_put(obj_request); 1395 } 1396 1397 static bool obj_request_type_valid(enum obj_request_type type) 1398 { 1399 switch (type) { 1400 case OBJ_REQUEST_NODATA: 1401 case OBJ_REQUEST_BIO: 1402 case OBJ_REQUEST_PAGES: 1403 return true; 1404 default: 1405 return false; 1406 } 1407 } 1408 1409 static int 
rbd_obj_request_submit(struct ceph_osd_client *osdc, 1410 struct rbd_obj_request *obj_request) 1411 { 1412 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request); 1413 1414 return ceph_osdc_start_request(osdc, obj_request->osd_req, false); 1415 } 1416 1417 static void rbd_img_request_complete(struct rbd_img_request *img_request) 1418 { 1419 1420 dout("%s: img %p\n", __func__, img_request); 1421 1422 /* 1423 * If no error occurred, compute the aggregate transfer 1424 * count for the image request. We could instead use 1425 * atomic64_cmpxchg() to update it as each object request 1426 * completes; not clear which way is better off hand. 1427 */ 1428 if (!img_request->result) { 1429 struct rbd_obj_request *obj_request; 1430 u64 xferred = 0; 1431 1432 for_each_obj_request(img_request, obj_request) 1433 xferred += obj_request->xferred; 1434 img_request->xferred = xferred; 1435 } 1436 1437 if (img_request->callback) 1438 img_request->callback(img_request); 1439 else 1440 rbd_img_request_put(img_request); 1441 } 1442 1443 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */ 1444 1445 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) 1446 { 1447 dout("%s: obj %p\n", __func__, obj_request); 1448 1449 return wait_for_completion_interruptible(&obj_request->completion); 1450 } 1451 1452 /* 1453 * The default/initial value for all image request flags is 0. Each 1454 * is conditionally set to 1 at image request initialization time 1455 * and currently never change thereafter. 1456 */ 1457 static void img_request_write_set(struct rbd_img_request *img_request) 1458 { 1459 set_bit(IMG_REQ_WRITE, &img_request->flags); 1460 smp_mb(); 1461 } 1462 1463 static bool img_request_write_test(struct rbd_img_request *img_request) 1464 { 1465 smp_mb(); 1466 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; 1467 } 1468 1469 static void img_request_child_set(struct rbd_img_request *img_request) 1470 { 1471 set_bit(IMG_REQ_CHILD, &img_request->flags); 1472 smp_mb(); 1473 } 1474 1475 static bool img_request_child_test(struct rbd_img_request *img_request) 1476 { 1477 smp_mb(); 1478 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; 1479 } 1480 1481 static void img_request_layered_set(struct rbd_img_request *img_request) 1482 { 1483 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1484 smp_mb(); 1485 } 1486 1487 static bool img_request_layered_test(struct rbd_img_request *img_request) 1488 { 1489 smp_mb(); 1490 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1491 } 1492 1493 static void 1494 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) 1495 { 1496 u64 xferred = obj_request->xferred; 1497 u64 length = obj_request->length; 1498 1499 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1500 obj_request, obj_request->img_request, obj_request->result, 1501 xferred, length); 1502 /* 1503 * ENOENT means a hole in the image. We zero-fill the 1504 * entire length of the request. A short read also implies 1505 * zero-fill to the end of the request. Either way we 1506 * update the xferred count to indicate the whole request 1507 * was satisfied. 
1508 */ 1509 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); 1510 if (obj_request->result == -ENOENT) { 1511 if (obj_request->type == OBJ_REQUEST_BIO) 1512 zero_bio_chain(obj_request->bio_list, 0); 1513 else 1514 zero_pages(obj_request->pages, 0, length); 1515 obj_request->result = 0; 1516 obj_request->xferred = length; 1517 } else if (xferred < length && !obj_request->result) { 1518 if (obj_request->type == OBJ_REQUEST_BIO) 1519 zero_bio_chain(obj_request->bio_list, xferred); 1520 else 1521 zero_pages(obj_request->pages, xferred, length); 1522 obj_request->xferred = length; 1523 } 1524 obj_request_done_set(obj_request); 1525 } 1526 1527 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) 1528 { 1529 dout("%s: obj %p cb %p\n", __func__, obj_request, 1530 obj_request->callback); 1531 if (obj_request->callback) 1532 obj_request->callback(obj_request); 1533 else 1534 complete_all(&obj_request->completion); 1535 } 1536 1537 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) 1538 { 1539 dout("%s: obj %p\n", __func__, obj_request); 1540 obj_request_done_set(obj_request); 1541 } 1542 1543 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1544 { 1545 struct rbd_img_request *img_request = NULL; 1546 struct rbd_device *rbd_dev = NULL; 1547 bool layered = false; 1548 1549 if (obj_request_img_data_test(obj_request)) { 1550 img_request = obj_request->img_request; 1551 layered = img_request && img_request_layered_test(img_request); 1552 rbd_dev = img_request->rbd_dev; 1553 } 1554 1555 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1556 obj_request, img_request, obj_request->result, 1557 obj_request->xferred, obj_request->length); 1558 if (layered && obj_request->result == -ENOENT && 1559 obj_request->img_offset < rbd_dev->parent_overlap) 1560 rbd_img_parent_read(obj_request); 1561 else if (img_request) 1562 rbd_img_obj_request_read_callback(obj_request); 1563 else 1564 obj_request_done_set(obj_request); 1565 } 1566 1567 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1568 { 1569 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1570 obj_request->result, obj_request->length); 1571 /* 1572 * There is no such thing as a successful short write. Set 1573 * it to our originally-requested length. 1574 */ 1575 obj_request->xferred = obj_request->length; 1576 obj_request_done_set(obj_request); 1577 } 1578 1579 /* 1580 * For a simple stat call there's nothing to do. We'll do more if 1581 * this is part of a write sequence for a layered image. 1582 */ 1583 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) 1584 { 1585 dout("%s: obj %p\n", __func__, obj_request); 1586 obj_request_done_set(obj_request); 1587 } 1588 1589 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, 1590 struct ceph_msg *msg) 1591 { 1592 struct rbd_obj_request *obj_request = osd_req->r_priv; 1593 u16 opcode; 1594 1595 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1596 rbd_assert(osd_req == obj_request->osd_req); 1597 if (obj_request_img_data_test(obj_request)) { 1598 rbd_assert(obj_request->img_request); 1599 rbd_assert(obj_request->which != BAD_WHICH); 1600 } else { 1601 rbd_assert(obj_request->which == BAD_WHICH); 1602 } 1603 1604 if (osd_req->r_result < 0) 1605 obj_request->result = osd_req->r_result; 1606 1607 BUG_ON(osd_req->r_num_ops > 2); 1608 1609 /* 1610 * We support a 64-bit length, but ultimately it has to be 1611 * passed to blk_end_request(), which takes an unsigned int. 
1612 */ 1613 obj_request->xferred = osd_req->r_reply_op_len[0]; 1614 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1615 opcode = osd_req->r_ops[0].op; 1616 switch (opcode) { 1617 case CEPH_OSD_OP_READ: 1618 rbd_osd_read_callback(obj_request); 1619 break; 1620 case CEPH_OSD_OP_WRITE: 1621 rbd_osd_write_callback(obj_request); 1622 break; 1623 case CEPH_OSD_OP_STAT: 1624 rbd_osd_stat_callback(obj_request); 1625 break; 1626 case CEPH_OSD_OP_CALL: 1627 case CEPH_OSD_OP_NOTIFY_ACK: 1628 case CEPH_OSD_OP_WATCH: 1629 rbd_osd_trivial_callback(obj_request); 1630 break; 1631 default: 1632 rbd_warn(NULL, "%s: unsupported op %hu\n", 1633 obj_request->object_name, (unsigned short) opcode); 1634 break; 1635 } 1636 1637 if (obj_request_done_test(obj_request)) 1638 rbd_obj_request_complete(obj_request); 1639 } 1640 1641 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1642 { 1643 struct rbd_img_request *img_request = obj_request->img_request; 1644 struct ceph_osd_request *osd_req = obj_request->osd_req; 1645 u64 snap_id; 1646 1647 rbd_assert(osd_req != NULL); 1648 1649 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; 1650 ceph_osdc_build_request(osd_req, obj_request->offset, 1651 NULL, snap_id, NULL); 1652 } 1653 1654 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1655 { 1656 struct rbd_img_request *img_request = obj_request->img_request; 1657 struct ceph_osd_request *osd_req = obj_request->osd_req; 1658 struct ceph_snap_context *snapc; 1659 struct timespec mtime = CURRENT_TIME; 1660 1661 rbd_assert(osd_req != NULL); 1662 1663 snapc = img_request ? img_request->snapc : NULL; 1664 ceph_osdc_build_request(osd_req, obj_request->offset, 1665 snapc, CEPH_NOSNAP, &mtime); 1666 } 1667 1668 static struct ceph_osd_request *rbd_osd_req_create( 1669 struct rbd_device *rbd_dev, 1670 bool write_request, 1671 struct rbd_obj_request *obj_request) 1672 { 1673 struct ceph_snap_context *snapc = NULL; 1674 struct ceph_osd_client *osdc; 1675 struct ceph_osd_request *osd_req; 1676 1677 if (obj_request_img_data_test(obj_request)) { 1678 struct rbd_img_request *img_request = obj_request->img_request; 1679 1680 rbd_assert(write_request == 1681 img_request_write_test(img_request)); 1682 if (write_request) 1683 snapc = img_request->snapc; 1684 } 1685 1686 /* Allocate and initialize the request, for the single op */ 1687 1688 osdc = &rbd_dev->rbd_client->client->osdc; 1689 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); 1690 if (!osd_req) 1691 return NULL; /* ENOMEM */ 1692 1693 if (write_request) 1694 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1695 else 1696 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1697 1698 osd_req->r_callback = rbd_osd_req_callback; 1699 osd_req->r_priv = obj_request; 1700 1701 osd_req->r_oid_len = strlen(obj_request->object_name); 1702 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1703 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1704 1705 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1706 1707 return osd_req; 1708 } 1709 1710 /* 1711 * Create a copyup osd request based on the information in the 1712 * object request supplied. A copyup request has two osd ops, 1713 * a copyup method call, and a "normal" write request. 
1714 */ 1715 static struct ceph_osd_request * 1716 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) 1717 { 1718 struct rbd_img_request *img_request; 1719 struct ceph_snap_context *snapc; 1720 struct rbd_device *rbd_dev; 1721 struct ceph_osd_client *osdc; 1722 struct ceph_osd_request *osd_req; 1723 1724 rbd_assert(obj_request_img_data_test(obj_request)); 1725 img_request = obj_request->img_request; 1726 rbd_assert(img_request); 1727 rbd_assert(img_request_write_test(img_request)); 1728 1729 /* Allocate and initialize the request, for the two ops */ 1730 1731 snapc = img_request->snapc; 1732 rbd_dev = img_request->rbd_dev; 1733 osdc = &rbd_dev->rbd_client->client->osdc; 1734 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); 1735 if (!osd_req) 1736 return NULL; /* ENOMEM */ 1737 1738 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1739 osd_req->r_callback = rbd_osd_req_callback; 1740 osd_req->r_priv = obj_request; 1741 1742 osd_req->r_oid_len = strlen(obj_request->object_name); 1743 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1744 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1745 1746 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1747 1748 return osd_req; 1749 } 1750 1751 1752 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1753 { 1754 ceph_osdc_put_request(osd_req); 1755 } 1756 1757 /* object_name is assumed to be a non-null pointer and NUL-terminated */ 1758 1759 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, 1760 u64 offset, u64 length, 1761 enum obj_request_type type) 1762 { 1763 struct rbd_obj_request *obj_request; 1764 size_t size; 1765 char *name; 1766 1767 rbd_assert(obj_request_type_valid(type)); 1768 1769 size = strlen(object_name) + 1; 1770 name = kmalloc(size, GFP_KERNEL); 1771 if (!name) 1772 return NULL; 1773 1774 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL); 1775 if (!obj_request) { 1776 kfree(name); 1777 return NULL; 1778 } 1779 1780 obj_request->object_name = memcpy(name, object_name, size); 1781 obj_request->offset = offset; 1782 obj_request->length = length; 1783 obj_request->flags = 0; 1784 obj_request->which = BAD_WHICH; 1785 obj_request->type = type; 1786 INIT_LIST_HEAD(&obj_request->links); 1787 init_completion(&obj_request->completion); 1788 kref_init(&obj_request->kref); 1789 1790 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, 1791 offset, length, (int)type, obj_request); 1792 1793 return obj_request; 1794 } 1795 1796 static void rbd_obj_request_destroy(struct kref *kref) 1797 { 1798 struct rbd_obj_request *obj_request; 1799 1800 obj_request = container_of(kref, struct rbd_obj_request, kref); 1801 1802 dout("%s: obj %p\n", __func__, obj_request); 1803 1804 rbd_assert(obj_request->img_request == NULL); 1805 rbd_assert(obj_request->which == BAD_WHICH); 1806 1807 if (obj_request->osd_req) 1808 rbd_osd_req_destroy(obj_request->osd_req); 1809 1810 rbd_assert(obj_request_type_valid(obj_request->type)); 1811 switch (obj_request->type) { 1812 case OBJ_REQUEST_NODATA: 1813 break; /* Nothing to do */ 1814 case OBJ_REQUEST_BIO: 1815 if (obj_request->bio_list) 1816 bio_chain_put(obj_request->bio_list); 1817 break; 1818 case OBJ_REQUEST_PAGES: 1819 if (obj_request->pages) 1820 ceph_release_page_vector(obj_request->pages, 1821 obj_request->page_count); 1822 break; 1823 } 1824 1825 kfree(obj_request->object_name); 1826 obj_request->object_name = NULL; 1827 kmem_cache_free(rbd_obj_request_cache, 
obj_request); 1828 } 1829 1830 /* 1831 * Caller is responsible for filling in the list of object requests 1832 * that comprises the image request, and the Linux request pointer 1833 * (if there is one). 1834 */ 1835 static struct rbd_img_request *rbd_img_request_create( 1836 struct rbd_device *rbd_dev, 1837 u64 offset, u64 length, 1838 bool write_request, 1839 bool child_request) 1840 { 1841 struct rbd_img_request *img_request; 1842 1843 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC); 1844 if (!img_request) 1845 return NULL; 1846 1847 if (write_request) { 1848 down_read(&rbd_dev->header_rwsem); 1849 ceph_get_snap_context(rbd_dev->header.snapc); 1850 up_read(&rbd_dev->header_rwsem); 1851 } 1852 1853 img_request->rq = NULL; 1854 img_request->rbd_dev = rbd_dev; 1855 img_request->offset = offset; 1856 img_request->length = length; 1857 img_request->flags = 0; 1858 if (write_request) { 1859 img_request_write_set(img_request); 1860 img_request->snapc = rbd_dev->header.snapc; 1861 } else { 1862 img_request->snap_id = rbd_dev->spec->snap_id; 1863 } 1864 if (child_request) 1865 img_request_child_set(img_request); 1866 if (rbd_dev->parent_spec) 1867 img_request_layered_set(img_request); 1868 spin_lock_init(&img_request->completion_lock); 1869 img_request->next_completion = 0; 1870 img_request->callback = NULL; 1871 img_request->result = 0; 1872 img_request->obj_request_count = 0; 1873 INIT_LIST_HEAD(&img_request->obj_requests); 1874 kref_init(&img_request->kref); 1875 1876 rbd_img_request_get(img_request); /* Avoid a warning */ 1877 rbd_img_request_put(img_request); /* TEMPORARY */ 1878 1879 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, 1880 write_request ? "write" : "read", offset, length, 1881 img_request); 1882 1883 return img_request; 1884 } 1885 1886 static void rbd_img_request_destroy(struct kref *kref) 1887 { 1888 struct rbd_img_request *img_request; 1889 struct rbd_obj_request *obj_request; 1890 struct rbd_obj_request *next_obj_request; 1891 1892 img_request = container_of(kref, struct rbd_img_request, kref); 1893 1894 dout("%s: img %p\n", __func__, img_request); 1895 1896 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1897 rbd_img_obj_request_del(img_request, obj_request); 1898 rbd_assert(img_request->obj_request_count == 0); 1899 1900 if (img_request_write_test(img_request)) 1901 ceph_put_snap_context(img_request->snapc); 1902 1903 if (img_request_child_test(img_request)) 1904 rbd_obj_request_put(img_request->obj_request); 1905 1906 kmem_cache_free(rbd_img_request_cache, img_request); 1907 } 1908 1909 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) 1910 { 1911 struct rbd_img_request *img_request; 1912 unsigned int xferred; 1913 int result; 1914 bool more; 1915 1916 rbd_assert(obj_request_img_data_test(obj_request)); 1917 img_request = obj_request->img_request; 1918 1919 rbd_assert(obj_request->xferred <= (u64)UINT_MAX); 1920 xferred = (unsigned int)obj_request->xferred; 1921 result = obj_request->result; 1922 if (result) { 1923 struct rbd_device *rbd_dev = img_request->rbd_dev; 1924 1925 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", 1926 img_request_write_test(img_request) ? 
"write" : "read", 1927 obj_request->length, obj_request->img_offset, 1928 obj_request->offset); 1929 rbd_warn(rbd_dev, " result %d xferred %x\n", 1930 result, xferred); 1931 if (!img_request->result) 1932 img_request->result = result; 1933 } 1934 1935 /* Image object requests don't own their page array */ 1936 1937 if (obj_request->type == OBJ_REQUEST_PAGES) { 1938 obj_request->pages = NULL; 1939 obj_request->page_count = 0; 1940 } 1941 1942 if (img_request_child_test(img_request)) { 1943 rbd_assert(img_request->obj_request != NULL); 1944 more = obj_request->which < img_request->obj_request_count - 1; 1945 } else { 1946 rbd_assert(img_request->rq != NULL); 1947 more = blk_end_request(img_request->rq, result, xferred); 1948 } 1949 1950 return more; 1951 } 1952 1953 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 1954 { 1955 struct rbd_img_request *img_request; 1956 u32 which = obj_request->which; 1957 bool more = true; 1958 1959 rbd_assert(obj_request_img_data_test(obj_request)); 1960 img_request = obj_request->img_request; 1961 1962 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1963 rbd_assert(img_request != NULL); 1964 rbd_assert(img_request->obj_request_count > 0); 1965 rbd_assert(which != BAD_WHICH); 1966 rbd_assert(which < img_request->obj_request_count); 1967 rbd_assert(which >= img_request->next_completion); 1968 1969 spin_lock_irq(&img_request->completion_lock); 1970 if (which != img_request->next_completion) 1971 goto out; 1972 1973 for_each_obj_request_from(img_request, obj_request) { 1974 rbd_assert(more); 1975 rbd_assert(which < img_request->obj_request_count); 1976 1977 if (!obj_request_done_test(obj_request)) 1978 break; 1979 more = rbd_img_obj_end_request(obj_request); 1980 which++; 1981 } 1982 1983 rbd_assert(more ^ (which == img_request->obj_request_count)); 1984 img_request->next_completion = which; 1985 out: 1986 spin_unlock_irq(&img_request->completion_lock); 1987 1988 if (!more) 1989 rbd_img_request_complete(img_request); 1990 } 1991 1992 /* 1993 * Split up an image request into one or more object requests, each 1994 * to a different object. The "type" parameter indicates whether 1995 * "data_desc" is the pointer to the head of a list of bio 1996 * structures, or the base of a page array. In either case this 1997 * function assumes data_desc describes memory sufficient to hold 1998 * all data described by the image request. 1999 */ 2000 static int rbd_img_request_fill(struct rbd_img_request *img_request, 2001 enum obj_request_type type, 2002 void *data_desc) 2003 { 2004 struct rbd_device *rbd_dev = img_request->rbd_dev; 2005 struct rbd_obj_request *obj_request = NULL; 2006 struct rbd_obj_request *next_obj_request; 2007 bool write_request = img_request_write_test(img_request); 2008 struct bio *bio_list; 2009 unsigned int bio_offset = 0; 2010 struct page **pages; 2011 u64 img_offset; 2012 u64 resid; 2013 u16 opcode; 2014 2015 dout("%s: img %p type %d data_desc %p\n", __func__, img_request, 2016 (int)type, data_desc); 2017 2018 opcode = write_request ? 
CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ; 2019 img_offset = img_request->offset; 2020 resid = img_request->length; 2021 rbd_assert(resid > 0); 2022 2023 if (type == OBJ_REQUEST_BIO) { 2024 bio_list = data_desc; 2025 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT); 2026 } else { 2027 rbd_assert(type == OBJ_REQUEST_PAGES); 2028 pages = data_desc; 2029 } 2030 2031 while (resid) { 2032 struct ceph_osd_request *osd_req; 2033 const char *object_name; 2034 u64 offset; 2035 u64 length; 2036 2037 object_name = rbd_segment_name(rbd_dev, img_offset); 2038 if (!object_name) 2039 goto out_unwind; 2040 offset = rbd_segment_offset(rbd_dev, img_offset); 2041 length = rbd_segment_length(rbd_dev, img_offset, resid); 2042 obj_request = rbd_obj_request_create(object_name, 2043 offset, length, type); 2044 /* object request has its own copy of the object name */ 2045 rbd_segment_name_free(object_name); 2046 if (!obj_request) 2047 goto out_unwind; 2048 2049 if (type == OBJ_REQUEST_BIO) { 2050 unsigned int clone_size; 2051 2052 rbd_assert(length <= (u64)UINT_MAX); 2053 clone_size = (unsigned int)length; 2054 obj_request->bio_list = 2055 bio_chain_clone_range(&bio_list, 2056 &bio_offset, 2057 clone_size, 2058 GFP_ATOMIC); 2059 if (!obj_request->bio_list) 2060 goto out_partial; 2061 } else { 2062 unsigned int page_count; 2063 2064 obj_request->pages = pages; 2065 page_count = (u32)calc_pages_for(offset, length); 2066 obj_request->page_count = page_count; 2067 if ((offset + length) & ~PAGE_MASK) 2068 page_count--; /* more on last page */ 2069 pages += page_count; 2070 } 2071 2072 osd_req = rbd_osd_req_create(rbd_dev, write_request, 2073 obj_request); 2074 if (!osd_req) 2075 goto out_partial; 2076 obj_request->osd_req = osd_req; 2077 obj_request->callback = rbd_img_obj_callback; 2078 2079 osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 2080 0, 0); 2081 if (type == OBJ_REQUEST_BIO) 2082 osd_req_op_extent_osd_data_bio(osd_req, 0, 2083 obj_request->bio_list, length); 2084 else 2085 osd_req_op_extent_osd_data_pages(osd_req, 0, 2086 obj_request->pages, length, 2087 offset & ~PAGE_MASK, false, false); 2088 2089 if (write_request) 2090 rbd_osd_req_format_write(obj_request); 2091 else 2092 rbd_osd_req_format_read(obj_request); 2093 2094 obj_request->img_offset = img_offset; 2095 rbd_img_obj_request_add(img_request, obj_request); 2096 2097 img_offset += length; 2098 resid -= length; 2099 } 2100 2101 return 0; 2102 2103 out_partial: 2104 rbd_obj_request_put(obj_request); 2105 out_unwind: 2106 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 2107 rbd_obj_request_put(obj_request); 2108 2109 return -ENOMEM; 2110 } 2111 2112 static void 2113 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) 2114 { 2115 struct rbd_img_request *img_request; 2116 struct rbd_device *rbd_dev; 2117 u64 length; 2118 u32 page_count; 2119 2120 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2121 rbd_assert(obj_request_img_data_test(obj_request)); 2122 img_request = obj_request->img_request; 2123 rbd_assert(img_request); 2124 2125 rbd_dev = img_request->rbd_dev; 2126 rbd_assert(rbd_dev); 2127 length = (u64)1 << rbd_dev->header.obj_order; 2128 page_count = (u32)calc_pages_for(0, length); 2129 2130 rbd_assert(obj_request->copyup_pages); 2131 ceph_release_page_vector(obj_request->copyup_pages, page_count); 2132 obj_request->copyup_pages = NULL; 2133 2134 /* 2135 * We want the transfer count to reflect the size of the 2136 * original write request. 
There is no such thing as a 2137 * successful short write, so if the request was successful 2138 * we can just set it to the originally-requested length. 2139 */ 2140 if (!obj_request->result) 2141 obj_request->xferred = obj_request->length; 2142 2143 /* Finish up with the normal image object callback */ 2144 2145 rbd_img_obj_callback(obj_request); 2146 } 2147 2148 static void 2149 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) 2150 { 2151 struct rbd_obj_request *orig_request; 2152 struct ceph_osd_request *osd_req; 2153 struct ceph_osd_client *osdc; 2154 struct rbd_device *rbd_dev; 2155 struct page **pages; 2156 int result; 2157 u64 obj_size; 2158 u64 xferred; 2159 2160 rbd_assert(img_request_child_test(img_request)); 2161 2162 /* First get what we need from the image request */ 2163 2164 pages = img_request->copyup_pages; 2165 rbd_assert(pages != NULL); 2166 img_request->copyup_pages = NULL; 2167 2168 orig_request = img_request->obj_request; 2169 rbd_assert(orig_request != NULL); 2170 rbd_assert(orig_request->type == OBJ_REQUEST_BIO); 2171 result = img_request->result; 2172 obj_size = img_request->length; 2173 xferred = img_request->xferred; 2174 2175 rbd_dev = img_request->rbd_dev; 2176 rbd_assert(rbd_dev); 2177 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order); 2178 2179 rbd_img_request_put(img_request); 2180 2181 if (result) 2182 goto out_err; 2183 2184 /* Allocate the new copyup osd request for the original request */ 2185 2186 result = -ENOMEM; 2187 rbd_assert(!orig_request->osd_req); 2188 osd_req = rbd_osd_req_create_copyup(orig_request); 2189 if (!osd_req) 2190 goto out_err; 2191 orig_request->osd_req = osd_req; 2192 orig_request->copyup_pages = pages; 2193 2194 /* Initialize the copyup op */ 2195 2196 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); 2197 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0, 2198 false, false); 2199 2200 /* Then the original write request op */ 2201 2202 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, 2203 orig_request->offset, 2204 orig_request->length, 0, 0); 2205 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list, 2206 orig_request->length); 2207 2208 rbd_osd_req_format_write(orig_request); 2209 2210 /* All set, send it off. */ 2211 2212 orig_request->callback = rbd_img_obj_copyup_callback; 2213 osdc = &rbd_dev->rbd_client->client->osdc; 2214 result = rbd_obj_request_submit(osdc, orig_request); 2215 if (!result) 2216 return; 2217 out_err: 2218 /* Record the error code and complete the request */ 2219 2220 orig_request->result = result; 2221 orig_request->xferred = 0; 2222 obj_request_done_set(orig_request); 2223 rbd_obj_request_complete(orig_request); 2224 } 2225 2226 /* 2227 * Read from the parent image the range of data that covers the 2228 * entire target of the given object request. This is used for 2229 * satisfying a layered image write request when the target of an 2230 * object request from the image request does not exist. 2231 * 2232 * A page array big enough to hold the returned data is allocated 2233 * and supplied to rbd_img_request_fill() as the "data descriptor." 2234 * When the read completes, this page array will be transferred to 2235 * the original object request for the copyup operation. 2236 * 2237 * If an error occurs, record it as the result of the original 2238 * object request and mark it done so it gets completed. 
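 *
 * Roughly, the sequence is: a child image request reads a full
 * object's worth of parent data into a freshly allocated page
 * vector; its completion callback then builds a two-op osd request
 * against the target object -- op 0 an "rbd" class "copyup" method
 * call carrying that parent data, op 1 the original write -- and
 * submits it.  (For example, assuming 4 KiB pages and the default
 * 4 MiB objects, that page vector holds 1024 pages.)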
2239 */ 2240 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) 2241 { 2242 struct rbd_img_request *img_request = NULL; 2243 struct rbd_img_request *parent_request = NULL; 2244 struct rbd_device *rbd_dev; 2245 u64 img_offset; 2246 u64 length; 2247 struct page **pages = NULL; 2248 u32 page_count; 2249 int result; 2250 2251 rbd_assert(obj_request_img_data_test(obj_request)); 2252 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2253 2254 img_request = obj_request->img_request; 2255 rbd_assert(img_request != NULL); 2256 rbd_dev = img_request->rbd_dev; 2257 rbd_assert(rbd_dev->parent != NULL); 2258 2259 /* 2260 * First things first. The original osd request is of no 2261 * use to use any more, we'll need a new one that can hold 2262 * the two ops in a copyup request. We'll get that later, 2263 * but for now we can release the old one. 2264 */ 2265 rbd_osd_req_destroy(obj_request->osd_req); 2266 obj_request->osd_req = NULL; 2267 2268 /* 2269 * Determine the byte range covered by the object in the 2270 * child image to which the original request was to be sent. 2271 */ 2272 img_offset = obj_request->img_offset - obj_request->offset; 2273 length = (u64)1 << rbd_dev->header.obj_order; 2274 2275 /* 2276 * There is no defined parent data beyond the parent 2277 * overlap, so limit what we read at that boundary if 2278 * necessary. 2279 */ 2280 if (img_offset + length > rbd_dev->parent_overlap) { 2281 rbd_assert(img_offset < rbd_dev->parent_overlap); 2282 length = rbd_dev->parent_overlap - img_offset; 2283 } 2284 2285 /* 2286 * Allocate a page array big enough to receive the data read 2287 * from the parent. 2288 */ 2289 page_count = (u32)calc_pages_for(0, length); 2290 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2291 if (IS_ERR(pages)) { 2292 result = PTR_ERR(pages); 2293 pages = NULL; 2294 goto out_err; 2295 } 2296 2297 result = -ENOMEM; 2298 parent_request = rbd_img_request_create(rbd_dev->parent, 2299 img_offset, length, 2300 false, true); 2301 if (!parent_request) 2302 goto out_err; 2303 rbd_obj_request_get(obj_request); 2304 parent_request->obj_request = obj_request; 2305 2306 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); 2307 if (result) 2308 goto out_err; 2309 parent_request->copyup_pages = pages; 2310 2311 parent_request->callback = rbd_img_obj_parent_read_full_callback; 2312 result = rbd_img_request_submit(parent_request); 2313 if (!result) 2314 return 0; 2315 2316 parent_request->copyup_pages = NULL; 2317 parent_request->obj_request = NULL; 2318 rbd_obj_request_put(obj_request); 2319 out_err: 2320 if (pages) 2321 ceph_release_page_vector(pages, page_count); 2322 if (parent_request) 2323 rbd_img_request_put(parent_request); 2324 obj_request->result = result; 2325 obj_request->xferred = 0; 2326 obj_request_done_set(obj_request); 2327 2328 return result; 2329 } 2330 2331 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) 2332 { 2333 struct rbd_obj_request *orig_request; 2334 int result; 2335 2336 rbd_assert(!obj_request_img_data_test(obj_request)); 2337 2338 /* 2339 * All we need from the object request is the original 2340 * request and the result of the STAT op. Grab those, then 2341 * we're done with the request. 
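 *
 * Below, a result of 0 means the target object exists, -ENOENT
 * means it does not, and anything else is treated as an error and
 * transferred to the original request.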
2342 */ 2343 orig_request = obj_request->obj_request; 2344 obj_request->obj_request = NULL; 2345 rbd_assert(orig_request); 2346 rbd_assert(orig_request->img_request); 2347 2348 result = obj_request->result; 2349 obj_request->result = 0; 2350 2351 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, 2352 obj_request, orig_request, result, 2353 obj_request->xferred, obj_request->length); 2354 rbd_obj_request_put(obj_request); 2355 2356 rbd_assert(orig_request); 2357 rbd_assert(orig_request->img_request); 2358 2359 /* 2360 * Our only purpose here is to determine whether the object 2361 * exists, and we don't want to treat the non-existence as 2362 * an error. If something else comes back, transfer the 2363 * error to the original request and complete it now. 2364 */ 2365 if (!result) { 2366 obj_request_existence_set(orig_request, true); 2367 } else if (result == -ENOENT) { 2368 obj_request_existence_set(orig_request, false); 2369 } else if (result) { 2370 orig_request->result = result; 2371 goto out; 2372 } 2373 2374 /* 2375 * Resubmit the original request now that we have recorded 2376 * whether the target object exists. 2377 */ 2378 orig_request->result = rbd_img_obj_request_submit(orig_request); 2379 out: 2380 if (orig_request->result) 2381 rbd_obj_request_complete(orig_request); 2382 rbd_obj_request_put(orig_request); 2383 } 2384 2385 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) 2386 { 2387 struct rbd_obj_request *stat_request; 2388 struct rbd_device *rbd_dev; 2389 struct ceph_osd_client *osdc; 2390 struct page **pages = NULL; 2391 u32 page_count; 2392 size_t size; 2393 int ret; 2394 2395 /* 2396 * The response data for a STAT call consists of: 2397 * le64 length; 2398 * struct { 2399 * le32 tv_sec; 2400 * le32 tv_nsec; 2401 * } mtime; 2402 */ 2403 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); 2404 page_count = (u32)calc_pages_for(0, size); 2405 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2406 if (IS_ERR(pages)) 2407 return PTR_ERR(pages); 2408 2409 ret = -ENOMEM; 2410 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, 2411 OBJ_REQUEST_PAGES); 2412 if (!stat_request) 2413 goto out; 2414 2415 rbd_obj_request_get(obj_request); 2416 stat_request->obj_request = obj_request; 2417 stat_request->pages = pages; 2418 stat_request->page_count = page_count; 2419 2420 rbd_assert(obj_request->img_request); 2421 rbd_dev = obj_request->img_request->rbd_dev; 2422 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2423 stat_request); 2424 if (!stat_request->osd_req) 2425 goto out; 2426 stat_request->callback = rbd_img_obj_exists_callback; 2427 2428 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); 2429 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, 2430 false, false); 2431 rbd_osd_req_format_read(stat_request); 2432 2433 osdc = &rbd_dev->rbd_client->client->osdc; 2434 ret = rbd_obj_request_submit(osdc, stat_request); 2435 out: 2436 if (ret) 2437 rbd_obj_request_put(obj_request); 2438 2439 return ret; 2440 } 2441 2442 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) 2443 { 2444 struct rbd_img_request *img_request; 2445 struct rbd_device *rbd_dev; 2446 bool known; 2447 2448 rbd_assert(obj_request_img_data_test(obj_request)); 2449 2450 img_request = obj_request->img_request; 2451 rbd_assert(img_request); 2452 rbd_dev = img_request->rbd_dev; 2453 2454 /* 2455 * Only writes to layered images need special handling. 
2456 * Reads and non-layered writes are simple object requests. 2457 * Layered writes that start beyond the end of the overlap 2458 * with the parent have no parent data, so they too are 2459 * simple object requests. Finally, if the target object is 2460 * known to already exist, its parent data has already been 2461 * copied, so a write to the object can also be handled as a 2462 * simple object request. 2463 */ 2464 if (!img_request_write_test(img_request) || 2465 !img_request_layered_test(img_request) || 2466 rbd_dev->parent_overlap <= obj_request->img_offset || 2467 ((known = obj_request_known_test(obj_request)) && 2468 obj_request_exists_test(obj_request))) { 2469 2470 struct rbd_device *rbd_dev; 2471 struct ceph_osd_client *osdc; 2472 2473 rbd_dev = obj_request->img_request->rbd_dev; 2474 osdc = &rbd_dev->rbd_client->client->osdc; 2475 2476 return rbd_obj_request_submit(osdc, obj_request); 2477 } 2478 2479 /* 2480 * It's a layered write. The target object might exist but 2481 * we may not know that yet. If we know it doesn't exist, 2482 * start by reading the data for the full target object from 2483 * the parent so we can use it for a copyup to the target. 2484 */ 2485 if (known) 2486 return rbd_img_obj_parent_read_full(obj_request); 2487 2488 /* We don't know whether the target exists. Go find out. */ 2489 2490 return rbd_img_obj_exists_submit(obj_request); 2491 } 2492 2493 static int rbd_img_request_submit(struct rbd_img_request *img_request) 2494 { 2495 struct rbd_obj_request *obj_request; 2496 struct rbd_obj_request *next_obj_request; 2497 2498 dout("%s: img %p\n", __func__, img_request); 2499 for_each_obj_request_safe(img_request, obj_request, next_obj_request) { 2500 int ret; 2501 2502 ret = rbd_img_obj_request_submit(obj_request); 2503 if (ret) 2504 return ret; 2505 } 2506 2507 return 0; 2508 } 2509 2510 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) 2511 { 2512 struct rbd_obj_request *obj_request; 2513 struct rbd_device *rbd_dev; 2514 u64 obj_end; 2515 2516 rbd_assert(img_request_child_test(img_request)); 2517 2518 obj_request = img_request->obj_request; 2519 rbd_assert(obj_request); 2520 rbd_assert(obj_request->img_request); 2521 2522 obj_request->result = img_request->result; 2523 if (obj_request->result) 2524 goto out; 2525 2526 /* 2527 * We need to zero anything beyond the parent overlap 2528 * boundary. Since rbd_img_obj_request_read_callback() 2529 * will zero anything beyond the end of a short read, an 2530 * easy way to do this is to pretend the data from the 2531 * parent came up short--ending at the overlap boundary. 
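 *
 * A hypothetical example: if this object covers image bytes
 * 8 MiB..12 MiB but the parent overlap is only 10 MiB, xferred is
 * clamped to 2 MiB below and the read callback zero-fills the
 * remaining 2 MiB of the object.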
2532 */ 2533 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); 2534 obj_end = obj_request->img_offset + obj_request->length; 2535 rbd_dev = obj_request->img_request->rbd_dev; 2536 if (obj_end > rbd_dev->parent_overlap) { 2537 u64 xferred = 0; 2538 2539 if (obj_request->img_offset < rbd_dev->parent_overlap) 2540 xferred = rbd_dev->parent_overlap - 2541 obj_request->img_offset; 2542 2543 obj_request->xferred = min(img_request->xferred, xferred); 2544 } else { 2545 obj_request->xferred = img_request->xferred; 2546 } 2547 out: 2548 rbd_img_request_put(img_request); 2549 rbd_img_obj_request_read_callback(obj_request); 2550 rbd_obj_request_complete(obj_request); 2551 } 2552 2553 static void rbd_img_parent_read(struct rbd_obj_request *obj_request) 2554 { 2555 struct rbd_device *rbd_dev; 2556 struct rbd_img_request *img_request; 2557 int result; 2558 2559 rbd_assert(obj_request_img_data_test(obj_request)); 2560 rbd_assert(obj_request->img_request != NULL); 2561 rbd_assert(obj_request->result == (s32) -ENOENT); 2562 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2563 2564 rbd_dev = obj_request->img_request->rbd_dev; 2565 rbd_assert(rbd_dev->parent != NULL); 2566 /* rbd_read_finish(obj_request, obj_request->length); */ 2567 img_request = rbd_img_request_create(rbd_dev->parent, 2568 obj_request->img_offset, 2569 obj_request->length, 2570 false, true); 2571 result = -ENOMEM; 2572 if (!img_request) 2573 goto out_err; 2574 2575 rbd_obj_request_get(obj_request); 2576 img_request->obj_request = obj_request; 2577 2578 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 2579 obj_request->bio_list); 2580 if (result) 2581 goto out_err; 2582 2583 img_request->callback = rbd_img_parent_read_callback; 2584 result = rbd_img_request_submit(img_request); 2585 if (result) 2586 goto out_err; 2587 2588 return; 2589 out_err: 2590 if (img_request) 2591 rbd_img_request_put(img_request); 2592 obj_request->result = result; 2593 obj_request->xferred = 0; 2594 obj_request_done_set(obj_request); 2595 } 2596 2597 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id) 2598 { 2599 struct rbd_obj_request *obj_request; 2600 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2601 int ret; 2602 2603 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 2604 OBJ_REQUEST_NODATA); 2605 if (!obj_request) 2606 return -ENOMEM; 2607 2608 ret = -ENOMEM; 2609 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2610 if (!obj_request->osd_req) 2611 goto out; 2612 obj_request->callback = rbd_obj_request_put; 2613 2614 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, 2615 notify_id, 0, 0); 2616 rbd_osd_req_format_read(obj_request); 2617 2618 ret = rbd_obj_request_submit(osdc, obj_request); 2619 out: 2620 if (ret) 2621 rbd_obj_request_put(obj_request); 2622 2623 return ret; 2624 } 2625 2626 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 2627 { 2628 struct rbd_device *rbd_dev = (struct rbd_device *)data; 2629 2630 if (!rbd_dev) 2631 return; 2632 2633 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, 2634 rbd_dev->header_name, (unsigned long long)notify_id, 2635 (unsigned int)opcode); 2636 (void)rbd_dev_refresh(rbd_dev); 2637 2638 rbd_obj_notify_ack(rbd_dev, notify_id); 2639 } 2640 2641 /* 2642 * Request sync osd watch/unwatch. The value of "start" determines 2643 * whether a watch request is being initiated or torn down. 
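 *
 * In outline: when starting, an osd event is created and a
 * lingering CEPH_OSD_OP_WATCH request is registered on the header
 * object and remembered in rbd_dev->watch_request; when tearing
 * down, that lingering request is unregistered, the watch op is
 * sent with start == 0, and the event is cancelled.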
2644 */ 2645 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) 2646 { 2647 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2648 struct rbd_obj_request *obj_request; 2649 int ret; 2650 2651 rbd_assert(start ^ !!rbd_dev->watch_event); 2652 rbd_assert(start ^ !!rbd_dev->watch_request); 2653 2654 if (start) { 2655 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, 2656 &rbd_dev->watch_event); 2657 if (ret < 0) 2658 return ret; 2659 rbd_assert(rbd_dev->watch_event != NULL); 2660 } 2661 2662 ret = -ENOMEM; 2663 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 2664 OBJ_REQUEST_NODATA); 2665 if (!obj_request) 2666 goto out_cancel; 2667 2668 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); 2669 if (!obj_request->osd_req) 2670 goto out_cancel; 2671 2672 if (start) 2673 ceph_osdc_set_request_linger(osdc, obj_request->osd_req); 2674 else 2675 ceph_osdc_unregister_linger_request(osdc, 2676 rbd_dev->watch_request->osd_req); 2677 2678 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, 2679 rbd_dev->watch_event->cookie, 0, start); 2680 rbd_osd_req_format_write(obj_request); 2681 2682 ret = rbd_obj_request_submit(osdc, obj_request); 2683 if (ret) 2684 goto out_cancel; 2685 ret = rbd_obj_request_wait(obj_request); 2686 if (ret) 2687 goto out_cancel; 2688 ret = obj_request->result; 2689 if (ret) 2690 goto out_cancel; 2691 2692 /* 2693 * A watch request is set to linger, so the underlying osd 2694 * request won't go away until we unregister it. We retain 2695 * a pointer to the object request during that time (in 2696 * rbd_dev->watch_request), so we'll keep a reference to 2697 * it. We'll drop that reference (below) after we've 2698 * unregistered it. 2699 */ 2700 if (start) { 2701 rbd_dev->watch_request = obj_request; 2702 2703 return 0; 2704 } 2705 2706 /* We have successfully torn down the watch request */ 2707 2708 rbd_obj_request_put(rbd_dev->watch_request); 2709 rbd_dev->watch_request = NULL; 2710 out_cancel: 2711 /* Cancel the event if we're tearing down, or on error */ 2712 ceph_osdc_cancel_event(rbd_dev->watch_event); 2713 rbd_dev->watch_event = NULL; 2714 if (obj_request) 2715 rbd_obj_request_put(obj_request); 2716 2717 return ret; 2718 } 2719 2720 /* 2721 * Synchronous osd object method call. Returns the number of bytes 2722 * returned in the outbound buffer, or a negative error code. 2723 */ 2724 static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 2725 const char *object_name, 2726 const char *class_name, 2727 const char *method_name, 2728 const void *outbound, 2729 size_t outbound_size, 2730 void *inbound, 2731 size_t inbound_size) 2732 { 2733 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2734 struct rbd_obj_request *obj_request; 2735 struct page **pages; 2736 u32 page_count; 2737 int ret; 2738 2739 /* 2740 * Method calls are ultimately read operations. The result 2741 * should placed into the inbound buffer provided. They 2742 * also supply outbound data--parameters for the object 2743 * method. Currently if this is present it will be a 2744 * snapshot id. 
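 *
 * For example, the format 2 helpers later in this file fetch the
 * image size with roughly:
 *
 *	rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *			    "rbd", "get_size",
 *			    &snapid, sizeof (snapid),
 *			    &size_buf, sizeof (size_buf));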
2745 */ 2746 page_count = (u32)calc_pages_for(0, inbound_size); 2747 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2748 if (IS_ERR(pages)) 2749 return PTR_ERR(pages); 2750 2751 ret = -ENOMEM; 2752 obj_request = rbd_obj_request_create(object_name, 0, inbound_size, 2753 OBJ_REQUEST_PAGES); 2754 if (!obj_request) 2755 goto out; 2756 2757 obj_request->pages = pages; 2758 obj_request->page_count = page_count; 2759 2760 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2761 if (!obj_request->osd_req) 2762 goto out; 2763 2764 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, 2765 class_name, method_name); 2766 if (outbound_size) { 2767 struct ceph_pagelist *pagelist; 2768 2769 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 2770 if (!pagelist) 2771 goto out; 2772 2773 ceph_pagelist_init(pagelist); 2774 ceph_pagelist_append(pagelist, outbound, outbound_size); 2775 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, 2776 pagelist); 2777 } 2778 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, 2779 obj_request->pages, inbound_size, 2780 0, false, false); 2781 rbd_osd_req_format_read(obj_request); 2782 2783 ret = rbd_obj_request_submit(osdc, obj_request); 2784 if (ret) 2785 goto out; 2786 ret = rbd_obj_request_wait(obj_request); 2787 if (ret) 2788 goto out; 2789 2790 ret = obj_request->result; 2791 if (ret < 0) 2792 goto out; 2793 2794 rbd_assert(obj_request->xferred < (u64)INT_MAX); 2795 ret = (int)obj_request->xferred; 2796 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); 2797 out: 2798 if (obj_request) 2799 rbd_obj_request_put(obj_request); 2800 else 2801 ceph_release_page_vector(pages, page_count); 2802 2803 return ret; 2804 } 2805 2806 static void rbd_request_fn(struct request_queue *q) 2807 __releases(q->queue_lock) __acquires(q->queue_lock) 2808 { 2809 struct rbd_device *rbd_dev = q->queuedata; 2810 bool read_only = rbd_dev->mapping.read_only; 2811 struct request *rq; 2812 int result; 2813 2814 while ((rq = blk_fetch_request(q))) { 2815 bool write_request = rq_data_dir(rq) == WRITE; 2816 struct rbd_img_request *img_request; 2817 u64 offset; 2818 u64 length; 2819 2820 /* Ignore any non-FS requests that filter through. */ 2821 2822 if (rq->cmd_type != REQ_TYPE_FS) { 2823 dout("%s: non-fs request type %d\n", __func__, 2824 (int) rq->cmd_type); 2825 __blk_end_request_all(rq, 0); 2826 continue; 2827 } 2828 2829 /* Ignore/skip any zero-length requests */ 2830 2831 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; 2832 length = (u64) blk_rq_bytes(rq); 2833 2834 if (!length) { 2835 dout("%s: zero-length request\n", __func__); 2836 __blk_end_request_all(rq, 0); 2837 continue; 2838 } 2839 2840 spin_unlock_irq(q->queue_lock); 2841 2842 /* Disallow writes to a read-only device */ 2843 2844 if (write_request) { 2845 result = -EROFS; 2846 if (read_only) 2847 goto end_request; 2848 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); 2849 } 2850 2851 /* 2852 * Quit early if the mapped snapshot no longer 2853 * exists. It's still possible the snapshot will 2854 * have disappeared by the time our request arrives 2855 * at the osd, but there's no sense in sending it if 2856 * we already know. 
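 *
 * (The EXISTS flag tested below is cleared by rbd_exists_validate()
 * when a refresh notices the mapped snapshot has gone away; such
 * requests are failed here with -ENXIO.)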
2857 */ 2858 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 2859 dout("request for non-existent snapshot"); 2860 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 2861 result = -ENXIO; 2862 goto end_request; 2863 } 2864 2865 result = -EINVAL; 2866 if (offset && length > U64_MAX - offset + 1) { 2867 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", 2868 offset, length); 2869 goto end_request; /* Shouldn't happen */ 2870 } 2871 2872 result = -ENOMEM; 2873 img_request = rbd_img_request_create(rbd_dev, offset, length, 2874 write_request, false); 2875 if (!img_request) 2876 goto end_request; 2877 2878 img_request->rq = rq; 2879 2880 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 2881 rq->bio); 2882 if (!result) 2883 result = rbd_img_request_submit(img_request); 2884 if (result) 2885 rbd_img_request_put(img_request); 2886 end_request: 2887 spin_lock_irq(q->queue_lock); 2888 if (result < 0) { 2889 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", 2890 write_request ? "write" : "read", 2891 length, offset, result); 2892 2893 __blk_end_request_all(rq, result); 2894 } 2895 } 2896 } 2897 2898 /* 2899 * a queue callback. Makes sure that we don't create a bio that spans across 2900 * multiple osd objects. One exception would be with a single page bios, 2901 * which we handle later at bio_chain_clone_range() 2902 */ 2903 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 2904 struct bio_vec *bvec) 2905 { 2906 struct rbd_device *rbd_dev = q->queuedata; 2907 sector_t sector_offset; 2908 sector_t sectors_per_obj; 2909 sector_t obj_sector_offset; 2910 int ret; 2911 2912 /* 2913 * Find how far into its rbd object the partition-relative 2914 * bio start sector is to offset relative to the enclosing 2915 * device. 2916 */ 2917 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 2918 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 2919 obj_sector_offset = sector_offset & (sectors_per_obj - 1); 2920 2921 /* 2922 * Compute the number of bytes from that offset to the end 2923 * of the object. Account for what's already used by the bio. 2924 */ 2925 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 2926 if (ret > bmd->bi_size) 2927 ret -= bmd->bi_size; 2928 else 2929 ret = 0; 2930 2931 /* 2932 * Don't send back more than was asked for. And if the bio 2933 * was empty, let the whole thing through because: "Note 2934 * that a block device *must* allow a single page to be 2935 * added to an empty bio." 
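 *
 * A hypothetical example with 4 MiB objects (8192 sectors each):
 * a bio already holding 512 bytes that begins 8190 sectors into an
 * object may take at most 512 more bytes from the next bvec, while
 * an empty bio starting at the same spot is still allowed the full
 * bvec, as required above.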
2936 */
2937 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2938 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2939 		ret = (int) bvec->bv_len;
2940 
2941 	return ret;
2942 }
2943 
2944 static void rbd_free_disk(struct rbd_device *rbd_dev)
2945 {
2946 	struct gendisk *disk = rbd_dev->disk;
2947 
2948 	if (!disk)
2949 		return;
2950 
2951 	rbd_dev->disk = NULL;
2952 	if (disk->flags & GENHD_FL_UP) {
2953 		del_gendisk(disk);
2954 		if (disk->queue)
2955 			blk_cleanup_queue(disk->queue);
2956 	}
2957 	put_disk(disk);
2958 }
2959 
2960 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2961 				const char *object_name,
2962 				u64 offset, u64 length, void *buf)
2963 
2964 {
2965 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2966 	struct rbd_obj_request *obj_request;
2967 	struct page **pages = NULL;
2968 	u32 page_count;
2969 	size_t size;
2970 	int ret;
2971 
2972 	page_count = (u32) calc_pages_for(offset, length);
2973 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2974 	if (IS_ERR(pages))
2975 		return PTR_ERR(pages);
2976 
2977 	ret = -ENOMEM;
2978 	obj_request = rbd_obj_request_create(object_name, offset, length,
2979 						OBJ_REQUEST_PAGES);
2980 	if (!obj_request)
2981 		goto out;
2982 
2983 	obj_request->pages = pages;
2984 	obj_request->page_count = page_count;
2985 
2986 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2987 	if (!obj_request->osd_req)
2988 		goto out;
2989 
2990 	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2991 					offset, length, 0, 0);
2992 	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2993 					obj_request->pages,
2994 					obj_request->length,
2995 					obj_request->offset & ~PAGE_MASK,
2996 					false, false);
2997 	rbd_osd_req_format_read(obj_request);
2998 
2999 	ret = rbd_obj_request_submit(osdc, obj_request);
3000 	if (ret)
3001 		goto out;
3002 	ret = rbd_obj_request_wait(obj_request);
3003 	if (ret)
3004 		goto out;
3005 
3006 	ret = obj_request->result;
3007 	if (ret < 0)
3008 		goto out;
3009 
3010 	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3011 	size = (size_t) obj_request->xferred;
3012 	ceph_copy_from_page_vector(pages, buf, 0, size);
3013 	rbd_assert(size <= (size_t)INT_MAX);
3014 	ret = (int)size;
3015 out:
3016 	if (obj_request)
3017 		rbd_obj_request_put(obj_request);
3018 	else
3019 		ceph_release_page_vector(pages, page_count);
3020 
3021 	return ret;
3022 }
3023 
3024 /*
3025  * Read the complete header for the given rbd device.
3026  *
3027  * Returns a pointer to a dynamically-allocated buffer containing
3028  * the complete and validated header.  The caller is responsible
3029  * for freeing the returned buffer (with kfree()) once it is no
3030  * longer needed.
3031  *
3032  * Returns a pointer-coded errno if a failure occurs.
3033  */
3034 static struct rbd_image_header_ondisk *
3035 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3036 {
3037 	struct rbd_image_header_ondisk *ondisk = NULL;
3038 	u32 snap_count = 0;
3039 	u64 names_size = 0;
3040 	u32 want_count;
3041 	int ret;
3042 
3043 	/*
3044 	 * The complete header will include an array of its 64-bit
3045 	 * snapshot ids, followed by the names of those snapshots as
3046 	 * a contiguous block of NUL-terminated strings. Note that
3047 	 * the number of snapshots could change by the time we read
3048 	 * it in, in which case we re-read it.
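	 *
	 * Schematically the header object starts with the fixed
	 * rbd_image_header_ondisk structure, followed by snap_count
	 * rbd_image_snap_ondisk entries and then snap_names_len bytes
	 * of NUL-terminated names, so the first pass of the loop below
	 * (snap_count and names_size both zero) reads just the fixed
	 * part to learn how much more needs to be read.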
3049 */ 3050 do { 3051 size_t size; 3052 3053 kfree(ondisk); 3054 3055 size = sizeof (*ondisk); 3056 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 3057 size += names_size; 3058 ondisk = kmalloc(size, GFP_KERNEL); 3059 if (!ondisk) 3060 return ERR_PTR(-ENOMEM); 3061 3062 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 3063 0, size, ondisk); 3064 if (ret < 0) 3065 goto out_err; 3066 if ((size_t)ret < size) { 3067 ret = -ENXIO; 3068 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 3069 size, ret); 3070 goto out_err; 3071 } 3072 if (!rbd_dev_ondisk_valid(ondisk)) { 3073 ret = -ENXIO; 3074 rbd_warn(rbd_dev, "invalid header"); 3075 goto out_err; 3076 } 3077 3078 names_size = le64_to_cpu(ondisk->snap_names_len); 3079 want_count = snap_count; 3080 snap_count = le32_to_cpu(ondisk->snap_count); 3081 } while (snap_count != want_count); 3082 3083 return ondisk; 3084 3085 out_err: 3086 kfree(ondisk); 3087 3088 return ERR_PTR(ret); 3089 } 3090 3091 /* 3092 * reload the ondisk the header 3093 */ 3094 static int rbd_read_header(struct rbd_device *rbd_dev, 3095 struct rbd_image_header *header) 3096 { 3097 struct rbd_image_header_ondisk *ondisk; 3098 int ret; 3099 3100 ondisk = rbd_dev_v1_header_read(rbd_dev); 3101 if (IS_ERR(ondisk)) 3102 return PTR_ERR(ondisk); 3103 ret = rbd_header_from_disk(header, ondisk); 3104 kfree(ondisk); 3105 3106 return ret; 3107 } 3108 3109 static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 3110 { 3111 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 3112 return; 3113 3114 if (rbd_dev->mapping.size != rbd_dev->header.image_size) { 3115 sector_t size; 3116 3117 rbd_dev->mapping.size = rbd_dev->header.image_size; 3118 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 3119 dout("setting size to %llu sectors", (unsigned long long)size); 3120 set_capacity(rbd_dev->disk, size); 3121 } 3122 } 3123 3124 /* 3125 * only read the first part of the ondisk header, without the snaps info 3126 */ 3127 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev) 3128 { 3129 int ret; 3130 struct rbd_image_header h; 3131 3132 ret = rbd_read_header(rbd_dev, &h); 3133 if (ret < 0) 3134 return ret; 3135 3136 down_write(&rbd_dev->header_rwsem); 3137 3138 /* Update image size, and check for resize of mapped image */ 3139 rbd_dev->header.image_size = h.image_size; 3140 rbd_update_mapping_size(rbd_dev); 3141 3142 /* rbd_dev->header.object_prefix shouldn't change */ 3143 kfree(rbd_dev->header.snap_sizes); 3144 kfree(rbd_dev->header.snap_names); 3145 /* osd requests may still refer to snapc */ 3146 ceph_put_snap_context(rbd_dev->header.snapc); 3147 3148 rbd_dev->header.image_size = h.image_size; 3149 rbd_dev->header.snapc = h.snapc; 3150 rbd_dev->header.snap_names = h.snap_names; 3151 rbd_dev->header.snap_sizes = h.snap_sizes; 3152 /* Free the extra copy of the object prefix */ 3153 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix)) 3154 rbd_warn(rbd_dev, "object prefix changed (ignoring)"); 3155 kfree(h.object_prefix); 3156 3157 up_write(&rbd_dev->header_rwsem); 3158 3159 return ret; 3160 } 3161 3162 /* 3163 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to 3164 * has disappeared from the (just updated) snapshot context. 
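 *
 * This covers, for example, a mapping of a snapshot that has since
 * been deleted: once the flag is cleared, later block requests are
 * rejected instead of being sent to the osds.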
3165 */ 3166 static void rbd_exists_validate(struct rbd_device *rbd_dev) 3167 { 3168 u64 snap_id; 3169 3170 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) 3171 return; 3172 3173 snap_id = rbd_dev->spec->snap_id; 3174 if (snap_id == CEPH_NOSNAP) 3175 return; 3176 3177 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) 3178 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 3179 } 3180 3181 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 3182 { 3183 u64 image_size; 3184 int ret; 3185 3186 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 3187 image_size = rbd_dev->header.image_size; 3188 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 3189 if (rbd_dev->image_format == 1) 3190 ret = rbd_dev_v1_refresh(rbd_dev); 3191 else 3192 ret = rbd_dev_v2_refresh(rbd_dev); 3193 3194 /* If it's a mapped snapshot, validate its EXISTS flag */ 3195 3196 rbd_exists_validate(rbd_dev); 3197 mutex_unlock(&ctl_mutex); 3198 if (ret) 3199 rbd_warn(rbd_dev, "got notification but failed to " 3200 " update snaps: %d\n", ret); 3201 if (image_size != rbd_dev->header.image_size) 3202 revalidate_disk(rbd_dev->disk); 3203 3204 return ret; 3205 } 3206 3207 static int rbd_init_disk(struct rbd_device *rbd_dev) 3208 { 3209 struct gendisk *disk; 3210 struct request_queue *q; 3211 u64 segment_size; 3212 3213 /* create gendisk info */ 3214 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 3215 if (!disk) 3216 return -ENOMEM; 3217 3218 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 3219 rbd_dev->dev_id); 3220 disk->major = rbd_dev->major; 3221 disk->first_minor = 0; 3222 disk->fops = &rbd_bd_ops; 3223 disk->private_data = rbd_dev; 3224 3225 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); 3226 if (!q) 3227 goto out_disk; 3228 3229 /* We use the default size, but let's be explicit about it. */ 3230 blk_queue_physical_block_size(q, SECTOR_SIZE); 3231 3232 /* set io sizes to object size */ 3233 segment_size = rbd_obj_bytes(&rbd_dev->header); 3234 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 3235 blk_queue_max_segment_size(q, segment_size); 3236 blk_queue_io_min(q, segment_size); 3237 blk_queue_io_opt(q, segment_size); 3238 3239 blk_queue_merge_bvec(q, rbd_merge_bvec); 3240 disk->queue = q; 3241 3242 q->queuedata = rbd_dev; 3243 3244 rbd_dev->disk = disk; 3245 3246 return 0; 3247 out_disk: 3248 put_disk(disk); 3249 3250 return -ENOMEM; 3251 } 3252 3253 /* 3254 sysfs 3255 */ 3256 3257 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 3258 { 3259 return container_of(dev, struct rbd_device, dev); 3260 } 3261 3262 static ssize_t rbd_size_show(struct device *dev, 3263 struct device_attribute *attr, char *buf) 3264 { 3265 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3266 3267 return sprintf(buf, "%llu\n", 3268 (unsigned long long)rbd_dev->mapping.size); 3269 } 3270 3271 /* 3272 * Note this shows the features for whatever's mapped, which is not 3273 * necessarily the base image. 
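 *
 * The value reads back as a 64-bit hex bitmask of RBD_FEATURE_*
 * bits; for instance, a format 2 image with only layering enabled
 * would be expected to show 0x0000000000000001.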
3274 */ 3275 static ssize_t rbd_features_show(struct device *dev, 3276 struct device_attribute *attr, char *buf) 3277 { 3278 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3279 3280 return sprintf(buf, "0x%016llx\n", 3281 (unsigned long long)rbd_dev->mapping.features); 3282 } 3283 3284 static ssize_t rbd_major_show(struct device *dev, 3285 struct device_attribute *attr, char *buf) 3286 { 3287 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3288 3289 if (rbd_dev->major) 3290 return sprintf(buf, "%d\n", rbd_dev->major); 3291 3292 return sprintf(buf, "(none)\n"); 3293 3294 } 3295 3296 static ssize_t rbd_client_id_show(struct device *dev, 3297 struct device_attribute *attr, char *buf) 3298 { 3299 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3300 3301 return sprintf(buf, "client%lld\n", 3302 ceph_client_id(rbd_dev->rbd_client->client)); 3303 } 3304 3305 static ssize_t rbd_pool_show(struct device *dev, 3306 struct device_attribute *attr, char *buf) 3307 { 3308 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3309 3310 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 3311 } 3312 3313 static ssize_t rbd_pool_id_show(struct device *dev, 3314 struct device_attribute *attr, char *buf) 3315 { 3316 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3317 3318 return sprintf(buf, "%llu\n", 3319 (unsigned long long) rbd_dev->spec->pool_id); 3320 } 3321 3322 static ssize_t rbd_name_show(struct device *dev, 3323 struct device_attribute *attr, char *buf) 3324 { 3325 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3326 3327 if (rbd_dev->spec->image_name) 3328 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 3329 3330 return sprintf(buf, "(unknown)\n"); 3331 } 3332 3333 static ssize_t rbd_image_id_show(struct device *dev, 3334 struct device_attribute *attr, char *buf) 3335 { 3336 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3337 3338 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 3339 } 3340 3341 /* 3342 * Shows the name of the currently-mapped snapshot (or 3343 * RBD_SNAP_HEAD_NAME for the base image). 3344 */ 3345 static ssize_t rbd_snap_show(struct device *dev, 3346 struct device_attribute *attr, 3347 char *buf) 3348 { 3349 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3350 3351 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 3352 } 3353 3354 /* 3355 * For an rbd v2 image, shows the pool id, image id, and snapshot id 3356 * for the parent image. If there is no parent, simply shows 3357 * "(no parent image)". 3358 */ 3359 static ssize_t rbd_parent_show(struct device *dev, 3360 struct device_attribute *attr, 3361 char *buf) 3362 { 3363 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3364 struct rbd_spec *spec = rbd_dev->parent_spec; 3365 int count; 3366 char *bufp = buf; 3367 3368 if (!spec) 3369 return sprintf(buf, "(no parent image)\n"); 3370 3371 count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 3372 (unsigned long long) spec->pool_id, spec->pool_name); 3373 if (count < 0) 3374 return count; 3375 bufp += count; 3376 3377 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 3378 spec->image_name ? 
spec->image_name : "(unknown)"); 3379 if (count < 0) 3380 return count; 3381 bufp += count; 3382 3383 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 3384 (unsigned long long) spec->snap_id, spec->snap_name); 3385 if (count < 0) 3386 return count; 3387 bufp += count; 3388 3389 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 3390 if (count < 0) 3391 return count; 3392 bufp += count; 3393 3394 return (ssize_t) (bufp - buf); 3395 } 3396 3397 static ssize_t rbd_image_refresh(struct device *dev, 3398 struct device_attribute *attr, 3399 const char *buf, 3400 size_t size) 3401 { 3402 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3403 int ret; 3404 3405 ret = rbd_dev_refresh(rbd_dev); 3406 3407 return ret < 0 ? ret : size; 3408 } 3409 3410 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 3411 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 3412 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 3413 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 3414 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 3415 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 3416 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 3417 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 3418 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 3419 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 3420 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 3421 3422 static struct attribute *rbd_attrs[] = { 3423 &dev_attr_size.attr, 3424 &dev_attr_features.attr, 3425 &dev_attr_major.attr, 3426 &dev_attr_client_id.attr, 3427 &dev_attr_pool.attr, 3428 &dev_attr_pool_id.attr, 3429 &dev_attr_name.attr, 3430 &dev_attr_image_id.attr, 3431 &dev_attr_current_snap.attr, 3432 &dev_attr_parent.attr, 3433 &dev_attr_refresh.attr, 3434 NULL 3435 }; 3436 3437 static struct attribute_group rbd_attr_group = { 3438 .attrs = rbd_attrs, 3439 }; 3440 3441 static const struct attribute_group *rbd_attr_groups[] = { 3442 &rbd_attr_group, 3443 NULL 3444 }; 3445 3446 static void rbd_sysfs_dev_release(struct device *dev) 3447 { 3448 } 3449 3450 static struct device_type rbd_device_type = { 3451 .name = "rbd", 3452 .groups = rbd_attr_groups, 3453 .release = rbd_sysfs_dev_release, 3454 }; 3455 3456 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 3457 { 3458 kref_get(&spec->kref); 3459 3460 return spec; 3461 } 3462 3463 static void rbd_spec_free(struct kref *kref); 3464 static void rbd_spec_put(struct rbd_spec *spec) 3465 { 3466 if (spec) 3467 kref_put(&spec->kref, rbd_spec_free); 3468 } 3469 3470 static struct rbd_spec *rbd_spec_alloc(void) 3471 { 3472 struct rbd_spec *spec; 3473 3474 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 3475 if (!spec) 3476 return NULL; 3477 kref_init(&spec->kref); 3478 3479 return spec; 3480 } 3481 3482 static void rbd_spec_free(struct kref *kref) 3483 { 3484 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 3485 3486 kfree(spec->pool_name); 3487 kfree(spec->image_id); 3488 kfree(spec->image_name); 3489 kfree(spec->snap_name); 3490 kfree(spec); 3491 } 3492 3493 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 3494 struct rbd_spec *spec) 3495 { 3496 struct rbd_device *rbd_dev; 3497 3498 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 3499 if (!rbd_dev) 3500 return NULL; 3501 3502 spin_lock_init(&rbd_dev->lock); 3503 rbd_dev->flags = 0; 3504 INIT_LIST_HEAD(&rbd_dev->node); 3505 init_rwsem(&rbd_dev->header_rwsem); 3506 3507 rbd_dev->spec = 
spec; 3508 rbd_dev->rbd_client = rbdc; 3509 3510 /* Initialize the layout used for all rbd requests */ 3511 3512 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 3513 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); 3514 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 3515 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); 3516 3517 return rbd_dev; 3518 } 3519 3520 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 3521 { 3522 rbd_put_client(rbd_dev->rbd_client); 3523 rbd_spec_put(rbd_dev->spec); 3524 kfree(rbd_dev); 3525 } 3526 3527 /* 3528 * Get the size and object order for an image snapshot, or if 3529 * snap_id is CEPH_NOSNAP, gets this information for the base 3530 * image. 3531 */ 3532 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 3533 u8 *order, u64 *snap_size) 3534 { 3535 __le64 snapid = cpu_to_le64(snap_id); 3536 int ret; 3537 struct { 3538 u8 order; 3539 __le64 size; 3540 } __attribute__ ((packed)) size_buf = { 0 }; 3541 3542 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3543 "rbd", "get_size", 3544 &snapid, sizeof (snapid), 3545 &size_buf, sizeof (size_buf)); 3546 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3547 if (ret < 0) 3548 return ret; 3549 if (ret < sizeof (size_buf)) 3550 return -ERANGE; 3551 3552 if (order) 3553 *order = size_buf.order; 3554 *snap_size = le64_to_cpu(size_buf.size); 3555 3556 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 3557 (unsigned long long)snap_id, (unsigned int)*order, 3558 (unsigned long long)*snap_size); 3559 3560 return 0; 3561 } 3562 3563 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 3564 { 3565 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 3566 &rbd_dev->header.obj_order, 3567 &rbd_dev->header.image_size); 3568 } 3569 3570 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 3571 { 3572 void *reply_buf; 3573 int ret; 3574 void *p; 3575 3576 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 3577 if (!reply_buf) 3578 return -ENOMEM; 3579 3580 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3581 "rbd", "get_object_prefix", NULL, 0, 3582 reply_buf, RBD_OBJ_PREFIX_LEN_MAX); 3583 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3584 if (ret < 0) 3585 goto out; 3586 3587 p = reply_buf; 3588 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 3589 p + ret, NULL, GFP_NOIO); 3590 ret = 0; 3591 3592 if (IS_ERR(rbd_dev->header.object_prefix)) { 3593 ret = PTR_ERR(rbd_dev->header.object_prefix); 3594 rbd_dev->header.object_prefix = NULL; 3595 } else { 3596 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 3597 } 3598 out: 3599 kfree(reply_buf); 3600 3601 return ret; 3602 } 3603 3604 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 3605 u64 *snap_features) 3606 { 3607 __le64 snapid = cpu_to_le64(snap_id); 3608 struct { 3609 __le64 features; 3610 __le64 incompat; 3611 } __attribute__ ((packed)) features_buf = { 0 }; 3612 u64 incompat; 3613 int ret; 3614 3615 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3616 "rbd", "get_features", 3617 &snapid, sizeof (snapid), 3618 &features_buf, sizeof (features_buf)); 3619 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3620 if (ret < 0) 3621 return ret; 3622 if (ret < sizeof (features_buf)) 3623 return -ERANGE; 3624 3625 incompat = le64_to_cpu(features_buf.incompat); 3626 if (incompat & ~RBD_FEATURES_SUPPORTED) 3627 return -ENXIO; 3628 3629 *snap_features 
= le64_to_cpu(features_buf.features); 3630 3631 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 3632 (unsigned long long)snap_id, 3633 (unsigned long long)*snap_features, 3634 (unsigned long long)le64_to_cpu(features_buf.incompat)); 3635 3636 return 0; 3637 } 3638 3639 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 3640 { 3641 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 3642 &rbd_dev->header.features); 3643 } 3644 3645 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 3646 { 3647 struct rbd_spec *parent_spec; 3648 size_t size; 3649 void *reply_buf = NULL; 3650 __le64 snapid; 3651 void *p; 3652 void *end; 3653 char *image_id; 3654 u64 overlap; 3655 int ret; 3656 3657 parent_spec = rbd_spec_alloc(); 3658 if (!parent_spec) 3659 return -ENOMEM; 3660 3661 size = sizeof (__le64) + /* pool_id */ 3662 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 3663 sizeof (__le64) + /* snap_id */ 3664 sizeof (__le64); /* overlap */ 3665 reply_buf = kmalloc(size, GFP_KERNEL); 3666 if (!reply_buf) { 3667 ret = -ENOMEM; 3668 goto out_err; 3669 } 3670 3671 snapid = cpu_to_le64(CEPH_NOSNAP); 3672 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3673 "rbd", "get_parent", 3674 &snapid, sizeof (snapid), 3675 reply_buf, size); 3676 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3677 if (ret < 0) 3678 goto out_err; 3679 3680 p = reply_buf; 3681 end = reply_buf + ret; 3682 ret = -ERANGE; 3683 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 3684 if (parent_spec->pool_id == CEPH_NOPOOL) 3685 goto out; /* No parent? No problem. */ 3686 3687 /* The ceph file layout needs to fit pool id in 32 bits */ 3688 3689 ret = -EIO; 3690 if (parent_spec->pool_id > (u64)U32_MAX) { 3691 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n", 3692 (unsigned long long)parent_spec->pool_id, U32_MAX); 3693 goto out_err; 3694 } 3695 3696 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3697 if (IS_ERR(image_id)) { 3698 ret = PTR_ERR(image_id); 3699 goto out_err; 3700 } 3701 parent_spec->image_id = image_id; 3702 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 3703 ceph_decode_64_safe(&p, end, overlap, out_err); 3704 3705 rbd_dev->parent_overlap = overlap; 3706 rbd_dev->parent_spec = parent_spec; 3707 parent_spec = NULL; /* rbd_dev now owns this */ 3708 out: 3709 ret = 0; 3710 out_err: 3711 kfree(reply_buf); 3712 rbd_spec_put(parent_spec); 3713 3714 return ret; 3715 } 3716 3717 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 3718 { 3719 struct { 3720 __le64 stripe_unit; 3721 __le64 stripe_count; 3722 } __attribute__ ((packed)) striping_info_buf = { 0 }; 3723 size_t size = sizeof (striping_info_buf); 3724 void *p; 3725 u64 obj_size; 3726 u64 stripe_unit; 3727 u64 stripe_count; 3728 int ret; 3729 3730 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3731 "rbd", "get_stripe_unit_count", NULL, 0, 3732 (char *)&striping_info_buf, size); 3733 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3734 if (ret < 0) 3735 return ret; 3736 if (ret < size) 3737 return -ERANGE; 3738 3739 /* 3740 * We don't actually support the "fancy striping" feature 3741 * (STRIPINGV2) yet, but if the striping sizes are the 3742 * defaults the behavior is the same as before. So find 3743 * out, and only fail if the image has non-default values. 
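 *
 * The defaults accepted here are a stripe_unit equal to the object
 * size (1 << obj_order, e.g. 4 MiB for the common order of 22) and
 * a stripe_count of 1, which is exactly what the checks below
 * enforce.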
3744 */ 3745 ret = -EINVAL; 3746 obj_size = (u64)1 << rbd_dev->header.obj_order; 3747 p = &striping_info_buf; 3748 stripe_unit = ceph_decode_64(&p); 3749 if (stripe_unit != obj_size) { 3750 rbd_warn(rbd_dev, "unsupported stripe unit " 3751 "(got %llu want %llu)", 3752 stripe_unit, obj_size); 3753 return -EINVAL; 3754 } 3755 stripe_count = ceph_decode_64(&p); 3756 if (stripe_count != 1) { 3757 rbd_warn(rbd_dev, "unsupported stripe count " 3758 "(got %llu want 1)", stripe_count); 3759 return -EINVAL; 3760 } 3761 rbd_dev->header.stripe_unit = stripe_unit; 3762 rbd_dev->header.stripe_count = stripe_count; 3763 3764 return 0; 3765 } 3766 3767 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 3768 { 3769 size_t image_id_size; 3770 char *image_id; 3771 void *p; 3772 void *end; 3773 size_t size; 3774 void *reply_buf = NULL; 3775 size_t len = 0; 3776 char *image_name = NULL; 3777 int ret; 3778 3779 rbd_assert(!rbd_dev->spec->image_name); 3780 3781 len = strlen(rbd_dev->spec->image_id); 3782 image_id_size = sizeof (__le32) + len; 3783 image_id = kmalloc(image_id_size, GFP_KERNEL); 3784 if (!image_id) 3785 return NULL; 3786 3787 p = image_id; 3788 end = image_id + image_id_size; 3789 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 3790 3791 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3792 reply_buf = kmalloc(size, GFP_KERNEL); 3793 if (!reply_buf) 3794 goto out; 3795 3796 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, 3797 "rbd", "dir_get_name", 3798 image_id, image_id_size, 3799 reply_buf, size); 3800 if (ret < 0) 3801 goto out; 3802 p = reply_buf; 3803 end = reply_buf + ret; 3804 3805 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 3806 if (IS_ERR(image_name)) 3807 image_name = NULL; 3808 else 3809 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 3810 out: 3811 kfree(reply_buf); 3812 kfree(image_id); 3813 3814 return image_name; 3815 } 3816 3817 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 3818 { 3819 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 3820 const char *snap_name; 3821 u32 which = 0; 3822 3823 /* Skip over names until we find the one we are looking for */ 3824 3825 snap_name = rbd_dev->header.snap_names; 3826 while (which < snapc->num_snaps) { 3827 if (!strcmp(name, snap_name)) 3828 return snapc->snaps[which]; 3829 snap_name += strlen(snap_name) + 1; 3830 which++; 3831 } 3832 return CEPH_NOSNAP; 3833 } 3834 3835 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 3836 { 3837 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 3838 u32 which; 3839 bool found = false; 3840 u64 snap_id; 3841 3842 for (which = 0; !found && which < snapc->num_snaps; which++) { 3843 const char *snap_name; 3844 3845 snap_id = snapc->snaps[which]; 3846 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 3847 if (IS_ERR(snap_name)) 3848 break; 3849 found = !strcmp(name, snap_name); 3850 kfree(snap_name); 3851 } 3852 return found ? snap_id : CEPH_NOSNAP; 3853 } 3854 3855 /* 3856 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 3857 * no snapshot by that name is found, or if an error occurs. 
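 *
 * Format 1 images are resolved by walking the snapshot name blob
 * kept in the in-memory header; format 2 images issue a
 * "get_snapshot_name" method call per snapshot id until the name
 * matches.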
3858 */ 3859 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 3860 { 3861 if (rbd_dev->image_format == 1) 3862 return rbd_v1_snap_id_by_name(rbd_dev, name); 3863 3864 return rbd_v2_snap_id_by_name(rbd_dev, name); 3865 } 3866 3867 /* 3868 * When an rbd image has a parent image, it is identified by the 3869 * pool, image, and snapshot ids (not names). This function fills 3870 * in the names for those ids. (It's OK if we can't figure out the 3871 * name for an image id, but the pool and snapshot ids should always 3872 * exist and have names.) All names in an rbd spec are dynamically 3873 * allocated. 3874 * 3875 * When an image being mapped (not a parent) is probed, we have the 3876 * pool name and pool id, image name and image id, and the snapshot 3877 * name. The only thing we're missing is the snapshot id. 3878 */ 3879 static int rbd_dev_spec_update(struct rbd_device *rbd_dev) 3880 { 3881 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3882 struct rbd_spec *spec = rbd_dev->spec; 3883 const char *pool_name; 3884 const char *image_name; 3885 const char *snap_name; 3886 int ret; 3887 3888 /* 3889 * An image being mapped will have the pool name (etc.), but 3890 * we need to look up the snapshot id. 3891 */ 3892 if (spec->pool_name) { 3893 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 3894 u64 snap_id; 3895 3896 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 3897 if (snap_id == CEPH_NOSNAP) 3898 return -ENOENT; 3899 spec->snap_id = snap_id; 3900 } else { 3901 spec->snap_id = CEPH_NOSNAP; 3902 } 3903 3904 return 0; 3905 } 3906 3907 /* Get the pool name; we have to make our own copy of this */ 3908 3909 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 3910 if (!pool_name) { 3911 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 3912 return -EIO; 3913 } 3914 pool_name = kstrdup(pool_name, GFP_KERNEL); 3915 if (!pool_name) 3916 return -ENOMEM; 3917 3918 /* Fetch the image name; tolerate failure here */ 3919 3920 image_name = rbd_dev_image_name(rbd_dev); 3921 if (!image_name) 3922 rbd_warn(rbd_dev, "unable to get image name"); 3923 3924 /* Look up the snapshot name, and make a copy */ 3925 3926 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 3927 if (!snap_name) { 3928 ret = -ENOMEM; 3929 goto out_err; 3930 } 3931 3932 spec->pool_name = pool_name; 3933 spec->image_name = image_name; 3934 spec->snap_name = snap_name; 3935 3936 return 0; 3937 out_err: 3938 kfree(image_name); 3939 kfree(pool_name); 3940 3941 return ret; 3942 } 3943 3944 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 3945 { 3946 size_t size; 3947 int ret; 3948 void *reply_buf; 3949 void *p; 3950 void *end; 3951 u64 seq; 3952 u32 snap_count; 3953 struct ceph_snap_context *snapc; 3954 u32 i; 3955 3956 /* 3957 * We'll need room for the seq value (maximum snapshot id), 3958 * snapshot count, and array of that many snapshot ids. 3959 * For now we have a fixed upper limit on the number we're 3960 * prepared to receive. 
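 *
 * The reply decoded below is expected to be laid out as:
 *
 *	__le64 seq;			maximum snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids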
3961 */ 3962 size = sizeof (__le64) + sizeof (__le32) + 3963 RBD_MAX_SNAP_COUNT * sizeof (__le64); 3964 reply_buf = kzalloc(size, GFP_KERNEL); 3965 if (!reply_buf) 3966 return -ENOMEM; 3967 3968 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3969 "rbd", "get_snapcontext", NULL, 0, 3970 reply_buf, size); 3971 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3972 if (ret < 0) 3973 goto out; 3974 3975 p = reply_buf; 3976 end = reply_buf + ret; 3977 ret = -ERANGE; 3978 ceph_decode_64_safe(&p, end, seq, out); 3979 ceph_decode_32_safe(&p, end, snap_count, out); 3980 3981 /* 3982 * Make sure the reported number of snapshot ids wouldn't go 3983 * beyond the end of our buffer. But before checking that, 3984 * make sure the computed size of the snapshot context we 3985 * allocate is representable in a size_t. 3986 */ 3987 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 3988 / sizeof (u64)) { 3989 ret = -EINVAL; 3990 goto out; 3991 } 3992 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 3993 goto out; 3994 ret = 0; 3995 3996 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 3997 if (!snapc) { 3998 ret = -ENOMEM; 3999 goto out; 4000 } 4001 snapc->seq = seq; 4002 for (i = 0; i < snap_count; i++) 4003 snapc->snaps[i] = ceph_decode_64(&p); 4004 4005 rbd_dev->header.snapc = snapc; 4006 4007 dout(" snap context seq = %llu, snap_count = %u\n", 4008 (unsigned long long)seq, (unsigned int)snap_count); 4009 out: 4010 kfree(reply_buf); 4011 4012 return ret; 4013 } 4014 4015 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 4016 u64 snap_id) 4017 { 4018 size_t size; 4019 void *reply_buf; 4020 __le64 snapid; 4021 int ret; 4022 void *p; 4023 void *end; 4024 char *snap_name; 4025 4026 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 4027 reply_buf = kmalloc(size, GFP_KERNEL); 4028 if (!reply_buf) 4029 return ERR_PTR(-ENOMEM); 4030 4031 snapid = cpu_to_le64(snap_id); 4032 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 4033 "rbd", "get_snapshot_name", 4034 &snapid, sizeof (snapid), 4035 reply_buf, size); 4036 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4037 if (ret < 0) { 4038 snap_name = ERR_PTR(ret); 4039 goto out; 4040 } 4041 4042 p = reply_buf; 4043 end = reply_buf + ret; 4044 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4045 if (IS_ERR(snap_name)) 4046 goto out; 4047 4048 dout(" snap_id 0x%016llx snap_name = %s\n", 4049 (unsigned long long)snap_id, snap_name); 4050 out: 4051 kfree(reply_buf); 4052 4053 return snap_name; 4054 } 4055 4056 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev) 4057 { 4058 int ret; 4059 4060 down_write(&rbd_dev->header_rwsem); 4061 4062 ret = rbd_dev_v2_image_size(rbd_dev); 4063 if (ret) 4064 goto out; 4065 rbd_update_mapping_size(rbd_dev); 4066 4067 ret = rbd_dev_v2_snap_context(rbd_dev); 4068 dout("rbd_dev_v2_snap_context returned %d\n", ret); 4069 if (ret) 4070 goto out; 4071 out: 4072 up_write(&rbd_dev->header_rwsem); 4073 4074 return ret; 4075 } 4076 4077 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 4078 { 4079 struct device *dev; 4080 int ret; 4081 4082 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 4083 4084 dev = &rbd_dev->dev; 4085 dev->bus = &rbd_bus_type; 4086 dev->type = &rbd_device_type; 4087 dev->parent = &rbd_root_dev; 4088 dev->release = rbd_dev_device_release; 4089 dev_set_name(dev, "%d", rbd_dev->dev_id); 4090 ret = device_register(dev); 4091 4092 mutex_unlock(&ctl_mutex); 4093 4094 return ret; 4095 } 4096 4097 static void 
rbd_bus_del_dev(struct rbd_device *rbd_dev) 4098 { 4099 device_unregister(&rbd_dev->dev); 4100 } 4101 4102 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 4103 4104 /* 4105 * Get a unique rbd identifier for the given new rbd_dev, and add 4106 * the rbd_dev to the global list. The minimum rbd id is 1. 4107 */ 4108 static void rbd_dev_id_get(struct rbd_device *rbd_dev) 4109 { 4110 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); 4111 4112 spin_lock(&rbd_dev_list_lock); 4113 list_add_tail(&rbd_dev->node, &rbd_dev_list); 4114 spin_unlock(&rbd_dev_list_lock); 4115 dout("rbd_dev %p given dev id %llu\n", rbd_dev, 4116 (unsigned long long) rbd_dev->dev_id); 4117 } 4118 4119 /* 4120 * Remove an rbd_dev from the global list, and record that its 4121 * identifier is no longer in use. 4122 */ 4123 static void rbd_dev_id_put(struct rbd_device *rbd_dev) 4124 { 4125 struct list_head *tmp; 4126 int rbd_id = rbd_dev->dev_id; 4127 int max_id; 4128 4129 rbd_assert(rbd_id > 0); 4130 4131 dout("rbd_dev %p released dev id %llu\n", rbd_dev, 4132 (unsigned long long) rbd_dev->dev_id); 4133 spin_lock(&rbd_dev_list_lock); 4134 list_del_init(&rbd_dev->node); 4135 4136 /* 4137 * If the id being "put" is not the current maximum, there 4138 * is nothing special we need to do. 4139 */ 4140 if (rbd_id != atomic64_read(&rbd_dev_id_max)) { 4141 spin_unlock(&rbd_dev_list_lock); 4142 return; 4143 } 4144 4145 /* 4146 * We need to update the current maximum id. Search the 4147 * list to find out what it is. We're more likely to find 4148 * the maximum at the end, so search the list backward. 4149 */ 4150 max_id = 0; 4151 list_for_each_prev(tmp, &rbd_dev_list) { 4152 struct rbd_device *rbd_dev; 4153 4154 rbd_dev = list_entry(tmp, struct rbd_device, node); 4155 if (rbd_dev->dev_id > max_id) 4156 max_id = rbd_dev->dev_id; 4157 } 4158 spin_unlock(&rbd_dev_list_lock); 4159 4160 /* 4161 * The max id could have been updated by rbd_dev_id_get(), in 4162 * which case it now accurately reflects the new maximum. 4163 * Be careful not to overwrite the maximum value in that 4164 * case. 4165 */ 4166 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); 4167 dout(" max dev id has been reset\n"); 4168 } 4169 4170 /* 4171 * Skips over white space at *buf, and updates *buf to point to the 4172 * first found non-space character (if any). Returns the length of 4173 * the token (string of non-white space characters) found. Note 4174 * that *buf must be terminated with '\0'. 4175 */ 4176 static inline size_t next_token(const char **buf) 4177 { 4178 /* 4179 * These are the characters that produce nonzero for 4180 * isspace() in the "C" and "POSIX" locales. 4181 */ 4182 const char *spaces = " \f\n\r\t\v"; 4183 4184 *buf += strspn(*buf, spaces); /* Find start of token */ 4185 4186 return strcspn(*buf, spaces); /* Return token length */ 4187 } 4188 4189 /* 4190 * Finds the next token in *buf, and if the provided token buffer is 4191 * big enough, copies the found token into it. The result, if 4192 * copied, is guaranteed to be terminated with '\0'. Note that *buf 4193 * must be terminated with '\0' on entry. 4194 * 4195 * Returns the length of the token found (not including the '\0'). 4196 * Return value will be 0 if no token is found, and it will be >= 4197 * token_size if the token would not fit. 4198 * 4199 * The *buf pointer will be updated to point beyond the end of the 4200 * found token. Note that this occurs even if the token buffer is 4201 * too small to hold it. 
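 *
 * For example (illustration only): with *buf pointing at "  foo bar"
 * and token_size 8, "foo" is copied into the token buffer, 3 is
 * returned, and *buf ends up pointing at " bar".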
4202  */
4203 static inline size_t copy_token(const char **buf,
4204 				char *token,
4205 				size_t token_size)
4206 {
4207 	size_t len;
4208
4209 	len = next_token(buf);
4210 	if (len < token_size) {
4211 		memcpy(token, *buf, len);
4212 		*(token + len) = '\0';
4213 	}
4214 	*buf += len;
4215
4216 	return len;
4217 }
4218
4219 /*
4220  * Finds the next token in *buf, dynamically allocates a buffer big
4221  * enough to hold a copy of it, and copies the token into the new
4222  * buffer. The copy is guaranteed to be terminated with '\0'. Note
4223  * that a duplicate buffer is created even for a zero-length token.
4224  *
4225  * Returns a pointer to the newly-allocated duplicate, or a null
4226  * pointer if memory for the duplicate was not available. If
4227  * the lenp argument is a non-null pointer, the length of the token
4228  * (not including the '\0') is returned in *lenp.
4229  *
4230  * If successful, the *buf pointer will be updated to point beyond
4231  * the end of the found token.
4232  *
4233  * Note: uses GFP_KERNEL for allocation.
4234  */
4235 static inline char *dup_token(const char **buf, size_t *lenp)
4236 {
4237 	char *dup;
4238 	size_t len;
4239
4240 	len = next_token(buf);
4241 	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4242 	if (!dup)
4243 		return NULL;
4244 	*(dup + len) = '\0';
4245 	*buf += len;
4246
4247 	if (lenp)
4248 		*lenp = len;
4249
4250 	return dup;
4251 }
4252
4253 /*
4254  * Parse the options provided for an "rbd add" (i.e., rbd image
4255  * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4256  * and the data written is passed here via a NUL-terminated buffer.
4257  * Returns 0 if successful or an error code otherwise.
4258  *
4259  * The information extracted from these options is recorded in
4260  * the other parameters which return dynamically-allocated
4261  * structures:
4262  *  ceph_opts
4263  *	The address of a pointer that will refer to a ceph options
4264  *	structure. Caller must release the returned pointer using
4265  *	ceph_destroy_options() when it is no longer needed.
4266  *  rbd_opts
4267  *	Address of an rbd options pointer. Fully initialized by
4268  *	this function; caller must release with kfree().
4269  *  spec
4270  *	Address of an rbd image specification pointer. Fully
4271  *	initialized by this function based on parsed options.
4272  *	Caller must release with rbd_spec_put().
4273  *
4274  * The options passed take this form:
4275  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4276  * where:
4277  *  <mon_addrs>
4278  *	A comma-separated list of one or more monitor addresses.
4279  *	A monitor address is an ip address, optionally followed
4280  *	by a port number (separated by a colon).
4281  *	  I.e.: ip1[:port1][,ip2[:port2]...]
4282  *  <options>
4283  *	A comma-separated list of ceph and/or rbd options.
4284  *  <pool_name>
4285  *	The name of the rados pool containing the rbd image.
4286  *  <image_name>
4287  *	The name of the image in that pool to map.
4288  *  <snap_name>
4289  *	An optional snapshot name. If provided, the mapping will
4290  *	present data from the image at the time that snapshot was
4291  *	created. The image head is used if no snapshot name is
4292  *	provided. Snapshot mappings are always read-only.
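 *
 * For example (the monitor address, credentials, and names below are
 * purely illustrative):
 *
 *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo snap1" \
 *		> /sys/bus/rbd/add
 *
 * would map snapshot "snap1" of image "foo" in pool "rbd".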
4293 */ 4294 static int rbd_add_parse_args(const char *buf, 4295 struct ceph_options **ceph_opts, 4296 struct rbd_options **opts, 4297 struct rbd_spec **rbd_spec) 4298 { 4299 size_t len; 4300 char *options; 4301 const char *mon_addrs; 4302 char *snap_name; 4303 size_t mon_addrs_size; 4304 struct rbd_spec *spec = NULL; 4305 struct rbd_options *rbd_opts = NULL; 4306 struct ceph_options *copts; 4307 int ret; 4308 4309 /* The first four tokens are required */ 4310 4311 len = next_token(&buf); 4312 if (!len) { 4313 rbd_warn(NULL, "no monitor address(es) provided"); 4314 return -EINVAL; 4315 } 4316 mon_addrs = buf; 4317 mon_addrs_size = len + 1; 4318 buf += len; 4319 4320 ret = -EINVAL; 4321 options = dup_token(&buf, NULL); 4322 if (!options) 4323 return -ENOMEM; 4324 if (!*options) { 4325 rbd_warn(NULL, "no options provided"); 4326 goto out_err; 4327 } 4328 4329 spec = rbd_spec_alloc(); 4330 if (!spec) 4331 goto out_mem; 4332 4333 spec->pool_name = dup_token(&buf, NULL); 4334 if (!spec->pool_name) 4335 goto out_mem; 4336 if (!*spec->pool_name) { 4337 rbd_warn(NULL, "no pool name provided"); 4338 goto out_err; 4339 } 4340 4341 spec->image_name = dup_token(&buf, NULL); 4342 if (!spec->image_name) 4343 goto out_mem; 4344 if (!*spec->image_name) { 4345 rbd_warn(NULL, "no image name provided"); 4346 goto out_err; 4347 } 4348 4349 /* 4350 * Snapshot name is optional; default is to use "-" 4351 * (indicating the head/no snapshot). 4352 */ 4353 len = next_token(&buf); 4354 if (!len) { 4355 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 4356 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 4357 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 4358 ret = -ENAMETOOLONG; 4359 goto out_err; 4360 } 4361 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 4362 if (!snap_name) 4363 goto out_mem; 4364 *(snap_name + len) = '\0'; 4365 spec->snap_name = snap_name; 4366 4367 /* Initialize all rbd options to the defaults */ 4368 4369 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 4370 if (!rbd_opts) 4371 goto out_mem; 4372 4373 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 4374 4375 copts = ceph_parse_options(options, mon_addrs, 4376 mon_addrs + mon_addrs_size - 1, 4377 parse_rbd_opts_token, rbd_opts); 4378 if (IS_ERR(copts)) { 4379 ret = PTR_ERR(copts); 4380 goto out_err; 4381 } 4382 kfree(options); 4383 4384 *ceph_opts = copts; 4385 *opts = rbd_opts; 4386 *rbd_spec = spec; 4387 4388 return 0; 4389 out_mem: 4390 ret = -ENOMEM; 4391 out_err: 4392 kfree(rbd_opts); 4393 rbd_spec_put(spec); 4394 kfree(options); 4395 4396 return ret; 4397 } 4398 4399 /* 4400 * An rbd format 2 image has a unique identifier, distinct from the 4401 * name given to it by the user. Internally, that identifier is 4402 * what's used to specify the names of objects related to the image. 4403 * 4404 * A special "rbd id" object is used to map an rbd image name to its 4405 * id. If that object doesn't exist, then there is no v2 rbd image 4406 * with the supplied name. 4407 * 4408 * This function will record the given rbd_dev's image_id field if 4409 * it can be determined, and in that case will return 0. If any 4410 * errors occur a negative errno will be returned and the rbd_dev's 4411 * image_id field will be unchanged (and should be NULL). 4412 */ 4413 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 4414 { 4415 int ret; 4416 size_t size; 4417 char *object_name; 4418 void *response; 4419 char *image_id; 4420 4421 /* 4422 * When probing a parent image, the image id is already 4423 * known (and the image name likely is not). 
There's no 4424 * need to fetch the image id again in this case. We 4425 * do still need to set the image format though. 4426 */ 4427 if (rbd_dev->spec->image_id) { 4428 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 4429 4430 return 0; 4431 } 4432 4433 /* 4434 * First, see if the format 2 image id file exists, and if 4435 * so, get the image's persistent id from it. 4436 */ 4437 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); 4438 object_name = kmalloc(size, GFP_NOIO); 4439 if (!object_name) 4440 return -ENOMEM; 4441 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); 4442 dout("rbd id object name is %s\n", object_name); 4443 4444 /* Response will be an encoded string, which includes a length */ 4445 4446 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 4447 response = kzalloc(size, GFP_NOIO); 4448 if (!response) { 4449 ret = -ENOMEM; 4450 goto out; 4451 } 4452 4453 /* If it doesn't exist we'll assume it's a format 1 image */ 4454 4455 ret = rbd_obj_method_sync(rbd_dev, object_name, 4456 "rbd", "get_id", NULL, 0, 4457 response, RBD_IMAGE_ID_LEN_MAX); 4458 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4459 if (ret == -ENOENT) { 4460 image_id = kstrdup("", GFP_KERNEL); 4461 ret = image_id ? 0 : -ENOMEM; 4462 if (!ret) 4463 rbd_dev->image_format = 1; 4464 } else if (ret > sizeof (__le32)) { 4465 void *p = response; 4466 4467 image_id = ceph_extract_encoded_string(&p, p + ret, 4468 NULL, GFP_NOIO); 4469 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0; 4470 if (!ret) 4471 rbd_dev->image_format = 2; 4472 } else { 4473 ret = -EINVAL; 4474 } 4475 4476 if (!ret) { 4477 rbd_dev->spec->image_id = image_id; 4478 dout("image_id is %s\n", image_id); 4479 } 4480 out: 4481 kfree(response); 4482 kfree(object_name); 4483 4484 return ret; 4485 } 4486 4487 /* Undo whatever state changes are made by v1 or v2 image probe */ 4488 4489 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 4490 { 4491 struct rbd_image_header *header; 4492 4493 rbd_dev_remove_parent(rbd_dev); 4494 rbd_spec_put(rbd_dev->parent_spec); 4495 rbd_dev->parent_spec = NULL; 4496 rbd_dev->parent_overlap = 0; 4497 4498 /* Free dynamic fields from the header, then zero it out */ 4499 4500 header = &rbd_dev->header; 4501 ceph_put_snap_context(header->snapc); 4502 kfree(header->snap_sizes); 4503 kfree(header->snap_names); 4504 kfree(header->object_prefix); 4505 memset(header, 0, sizeof (*header)); 4506 } 4507 4508 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 4509 { 4510 int ret; 4511 4512 /* Populate rbd image metadata */ 4513 4514 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 4515 if (ret < 0) 4516 goto out_err; 4517 4518 /* Version 1 images have no parent (no layering) */ 4519 4520 rbd_dev->parent_spec = NULL; 4521 rbd_dev->parent_overlap = 0; 4522 4523 dout("discovered version 1 image, header name is %s\n", 4524 rbd_dev->header_name); 4525 4526 return 0; 4527 4528 out_err: 4529 kfree(rbd_dev->header_name); 4530 rbd_dev->header_name = NULL; 4531 kfree(rbd_dev->spec->image_id); 4532 rbd_dev->spec->image_id = NULL; 4533 4534 return ret; 4535 } 4536 4537 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) 4538 { 4539 int ret; 4540 4541 ret = rbd_dev_v2_image_size(rbd_dev); 4542 if (ret) 4543 goto out_err; 4544 4545 /* Get the object prefix (a.k.a. 
block_name) for the image */
4546
4547 	ret = rbd_dev_v2_object_prefix(rbd_dev);
4548 	if (ret)
4549 		goto out_err;
4550
4551 	/* Get and check features for the image */
4552
4553 	ret = rbd_dev_v2_features(rbd_dev);
4554 	if (ret)
4555 		goto out_err;
4556
4557 	/* If the image supports layering, get the parent info */
4558
4559 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4560 		ret = rbd_dev_v2_parent_info(rbd_dev);
4561 		if (ret)
4562 			goto out_err;
4563
4564 		/*
4565 		 * Don't print a warning for parent images. We can
4566 		 * tell we have a parent at this point because we
4567 		 * won't know its pool name yet (just its pool id).
4568 		 */
4569 		if (rbd_dev->spec->pool_name)
4570 			rbd_warn(rbd_dev, "WARNING: kernel layering "
4571 					"is EXPERIMENTAL!");
4572 	}
4573
4574 	/* If the image supports fancy striping, get its parameters */
4575
4576 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4577 		ret = rbd_dev_v2_striping_info(rbd_dev);
4578 		if (ret < 0)
4579 			goto out_err;
4580 	}
4581
4582 	/* crypto and compression type aren't (yet) supported for v2 images */
4583
4584 	rbd_dev->header.crypt_type = 0;
4585 	rbd_dev->header.comp_type = 0;
4586
4587 	/* Get the snapshot context */
4588
4589 	ret = rbd_dev_v2_snap_context(rbd_dev);
4590 	if (ret)
4591 		goto out_err;
4592
4593 	dout("discovered version 2 image, header name is %s\n",
4594 		rbd_dev->header_name);
4595
4596 	return 0;
4597 out_err:
4598 	rbd_dev->parent_overlap = 0;
4599 	rbd_spec_put(rbd_dev->parent_spec);
4600 	rbd_dev->parent_spec = NULL;
4601 	kfree(rbd_dev->header_name);
4602 	rbd_dev->header_name = NULL;
4603 	kfree(rbd_dev->header.object_prefix);
4604 	rbd_dev->header.object_prefix = NULL;
4605
4606 	return ret;
4607 }
4608
4609 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4610 {
4611 	struct rbd_device *parent = NULL;
4612 	struct rbd_spec *parent_spec;
4613 	struct rbd_client *rbdc;
4614 	int ret;
4615
4616 	if (!rbd_dev->parent_spec)
4617 		return 0;
4618 	/*
4619 	 * We need to pass a reference to the client and the parent
4620 	 * spec when creating the parent rbd_dev. Images related by
4621 	 * parent/child relationships always share both.
4622 	 */
4623 	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4624 	rbdc = __rbd_get_client(rbd_dev->rbd_client);
4625
4626 	ret = -ENOMEM;
4627 	parent = rbd_dev_create(rbdc, parent_spec);
4628 	if (!parent)
4629 		goto out_err;
4630
4631 	ret = rbd_dev_image_probe(parent);
4632 	if (ret < 0)
4633 		goto out_err;
4634 	rbd_dev->parent = parent;
4635
4636 	return 0;
4637 out_err:
4638 	if (parent) {
4639 		rbd_spec_put(rbd_dev->parent_spec);
4640 		kfree(rbd_dev->header_name);
4641 		rbd_dev_destroy(parent);
4642 	} else {
4643 		rbd_put_client(rbdc);
4644 		rbd_spec_put(parent_spec);
4645 	}
4646
4647 	return ret;
4648 }
4649
4650 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4651 {
4652 	int ret;
4653
4654 	ret = rbd_dev_mapping_set(rbd_dev);
4655 	if (ret)
4656 		return ret;
4657
4658 	/* generate unique id: find highest unique id, add one */
4659 	rbd_dev_id_get(rbd_dev);
4660
4661 	/* Fill in the device name, now that we have its id. */
4662 	BUILD_BUG_ON(DEV_NAME_LEN
4663 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4664 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4665
4666 	/* Get our block major device number. */
4667
4668 	ret = register_blkdev(0, rbd_dev->name);
4669 	if (ret < 0)
4670 		goto err_out_id;
4671 	rbd_dev->major = ret;
4672
4673 	/* Set up the blkdev mapping.
*/ 4674 4675 ret = rbd_init_disk(rbd_dev); 4676 if (ret) 4677 goto err_out_blkdev; 4678 4679 ret = rbd_bus_add_dev(rbd_dev); 4680 if (ret) 4681 goto err_out_disk; 4682 4683 /* Everything's ready. Announce the disk to the world. */ 4684 4685 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 4686 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4687 add_disk(rbd_dev->disk); 4688 4689 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 4690 (unsigned long long) rbd_dev->mapping.size); 4691 4692 return ret; 4693 4694 err_out_disk: 4695 rbd_free_disk(rbd_dev); 4696 err_out_blkdev: 4697 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4698 err_out_id: 4699 rbd_dev_id_put(rbd_dev); 4700 rbd_dev_mapping_clear(rbd_dev); 4701 4702 return ret; 4703 } 4704 4705 static int rbd_dev_header_name(struct rbd_device *rbd_dev) 4706 { 4707 struct rbd_spec *spec = rbd_dev->spec; 4708 size_t size; 4709 4710 /* Record the header object name for this rbd image. */ 4711 4712 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 4713 4714 if (rbd_dev->image_format == 1) 4715 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX); 4716 else 4717 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id); 4718 4719 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 4720 if (!rbd_dev->header_name) 4721 return -ENOMEM; 4722 4723 if (rbd_dev->image_format == 1) 4724 sprintf(rbd_dev->header_name, "%s%s", 4725 spec->image_name, RBD_SUFFIX); 4726 else 4727 sprintf(rbd_dev->header_name, "%s%s", 4728 RBD_HEADER_PREFIX, spec->image_id); 4729 return 0; 4730 } 4731 4732 static void rbd_dev_image_release(struct rbd_device *rbd_dev) 4733 { 4734 int ret; 4735 4736 rbd_dev_unprobe(rbd_dev); 4737 ret = rbd_dev_header_watch_sync(rbd_dev, 0); 4738 if (ret) 4739 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret); 4740 kfree(rbd_dev->header_name); 4741 rbd_dev->header_name = NULL; 4742 rbd_dev->image_format = 0; 4743 kfree(rbd_dev->spec->image_id); 4744 rbd_dev->spec->image_id = NULL; 4745 4746 rbd_dev_destroy(rbd_dev); 4747 } 4748 4749 /* 4750 * Probe for the existence of the header object for the given rbd 4751 * device. For format 2 images this includes determining the image 4752 * id. 4753 */ 4754 static int rbd_dev_image_probe(struct rbd_device *rbd_dev) 4755 { 4756 int ret; 4757 int tmp; 4758 4759 /* 4760 * Get the id from the image id object. If it's not a 4761 * format 2 image, we'll get ENOENT back, and we'll assume 4762 * it's a format 1 image. 
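 *
 * After that, the probe builds the header object name, registers a
 * watch on the header, reads the format-specific image metadata,
 * fills in the names in the image spec and, for layered images,
 * probes the parent image as well.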
4763 */ 4764 ret = rbd_dev_image_id(rbd_dev); 4765 if (ret) 4766 return ret; 4767 rbd_assert(rbd_dev->spec->image_id); 4768 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 4769 4770 ret = rbd_dev_header_name(rbd_dev); 4771 if (ret) 4772 goto err_out_format; 4773 4774 ret = rbd_dev_header_watch_sync(rbd_dev, 1); 4775 if (ret) 4776 goto out_header_name; 4777 4778 if (rbd_dev->image_format == 1) 4779 ret = rbd_dev_v1_probe(rbd_dev); 4780 else 4781 ret = rbd_dev_v2_probe(rbd_dev); 4782 if (ret) 4783 goto err_out_watch; 4784 4785 ret = rbd_dev_spec_update(rbd_dev); 4786 if (ret) 4787 goto err_out_probe; 4788 4789 ret = rbd_dev_probe_parent(rbd_dev); 4790 if (!ret) 4791 return 0; 4792 4793 err_out_probe: 4794 rbd_dev_unprobe(rbd_dev); 4795 err_out_watch: 4796 tmp = rbd_dev_header_watch_sync(rbd_dev, 0); 4797 if (tmp) 4798 rbd_warn(rbd_dev, "unable to tear down watch request\n"); 4799 out_header_name: 4800 kfree(rbd_dev->header_name); 4801 rbd_dev->header_name = NULL; 4802 err_out_format: 4803 rbd_dev->image_format = 0; 4804 kfree(rbd_dev->spec->image_id); 4805 rbd_dev->spec->image_id = NULL; 4806 4807 dout("probe failed, returning %d\n", ret); 4808 4809 return ret; 4810 } 4811 4812 static ssize_t rbd_add(struct bus_type *bus, 4813 const char *buf, 4814 size_t count) 4815 { 4816 struct rbd_device *rbd_dev = NULL; 4817 struct ceph_options *ceph_opts = NULL; 4818 struct rbd_options *rbd_opts = NULL; 4819 struct rbd_spec *spec = NULL; 4820 struct rbd_client *rbdc; 4821 struct ceph_osd_client *osdc; 4822 int rc = -ENOMEM; 4823 4824 if (!try_module_get(THIS_MODULE)) 4825 return -ENODEV; 4826 4827 /* parse add command */ 4828 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 4829 if (rc < 0) 4830 goto err_out_module; 4831 4832 rbdc = rbd_get_client(ceph_opts); 4833 if (IS_ERR(rbdc)) { 4834 rc = PTR_ERR(rbdc); 4835 goto err_out_args; 4836 } 4837 ceph_opts = NULL; /* rbd_dev client now owns this */ 4838 4839 /* pick the pool */ 4840 osdc = &rbdc->client->osdc; 4841 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 4842 if (rc < 0) 4843 goto err_out_client; 4844 spec->pool_id = (u64)rc; 4845 4846 /* The ceph file layout needs to fit pool id in 32 bits */ 4847 4848 if (spec->pool_id > (u64)U32_MAX) { 4849 rbd_warn(NULL, "pool id too large (%llu > %u)\n", 4850 (unsigned long long)spec->pool_id, U32_MAX); 4851 rc = -EIO; 4852 goto err_out_client; 4853 } 4854 4855 rbd_dev = rbd_dev_create(rbdc, spec); 4856 if (!rbd_dev) 4857 goto err_out_client; 4858 rbdc = NULL; /* rbd_dev now owns this */ 4859 spec = NULL; /* rbd_dev now owns this */ 4860 4861 rbd_dev->mapping.read_only = rbd_opts->read_only; 4862 kfree(rbd_opts); 4863 rbd_opts = NULL; /* done with this */ 4864 4865 rc = rbd_dev_image_probe(rbd_dev); 4866 if (rc < 0) 4867 goto err_out_rbd_dev; 4868 4869 rc = rbd_dev_device_setup(rbd_dev); 4870 if (!rc) 4871 return count; 4872 4873 rbd_dev_image_release(rbd_dev); 4874 err_out_rbd_dev: 4875 rbd_dev_destroy(rbd_dev); 4876 err_out_client: 4877 rbd_put_client(rbdc); 4878 err_out_args: 4879 if (ceph_opts) 4880 ceph_destroy_options(ceph_opts); 4881 kfree(rbd_opts); 4882 rbd_spec_put(spec); 4883 err_out_module: 4884 module_put(THIS_MODULE); 4885 4886 dout("Error adding device %s\n", buf); 4887 4888 return (ssize_t)rc; 4889 } 4890 4891 static struct rbd_device *__rbd_get_dev(unsigned long dev_id) 4892 { 4893 struct list_head *tmp; 4894 struct rbd_device *rbd_dev; 4895 4896 spin_lock(&rbd_dev_list_lock); 4897 list_for_each(tmp, &rbd_dev_list) { 4898 rbd_dev = list_entry(tmp, struct 
rbd_device, node); 4899 if (rbd_dev->dev_id == dev_id) { 4900 spin_unlock(&rbd_dev_list_lock); 4901 return rbd_dev; 4902 } 4903 } 4904 spin_unlock(&rbd_dev_list_lock); 4905 return NULL; 4906 } 4907 4908 static void rbd_dev_device_release(struct device *dev) 4909 { 4910 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4911 4912 rbd_free_disk(rbd_dev); 4913 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4914 rbd_dev_clear_mapping(rbd_dev); 4915 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4916 rbd_dev->major = 0; 4917 rbd_dev_id_put(rbd_dev); 4918 rbd_dev_mapping_clear(rbd_dev); 4919 } 4920 4921 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 4922 { 4923 while (rbd_dev->parent) { 4924 struct rbd_device *first = rbd_dev; 4925 struct rbd_device *second = first->parent; 4926 struct rbd_device *third; 4927 4928 /* 4929 * Follow to the parent with no grandparent and 4930 * remove it. 4931 */ 4932 while (second && (third = second->parent)) { 4933 first = second; 4934 second = third; 4935 } 4936 rbd_assert(second); 4937 rbd_dev_image_release(second); 4938 first->parent = NULL; 4939 first->parent_overlap = 0; 4940 4941 rbd_assert(first->parent_spec); 4942 rbd_spec_put(first->parent_spec); 4943 first->parent_spec = NULL; 4944 } 4945 } 4946 4947 static ssize_t rbd_remove(struct bus_type *bus, 4948 const char *buf, 4949 size_t count) 4950 { 4951 struct rbd_device *rbd_dev = NULL; 4952 int target_id; 4953 unsigned long ul; 4954 int ret; 4955 4956 ret = strict_strtoul(buf, 10, &ul); 4957 if (ret) 4958 return ret; 4959 4960 /* convert to int; abort if we lost anything in the conversion */ 4961 target_id = (int) ul; 4962 if (target_id != ul) 4963 return -EINVAL; 4964 4965 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 4966 4967 rbd_dev = __rbd_get_dev(target_id); 4968 if (!rbd_dev) { 4969 ret = -ENOENT; 4970 goto done; 4971 } 4972 4973 spin_lock_irq(&rbd_dev->lock); 4974 if (rbd_dev->open_count) 4975 ret = -EBUSY; 4976 else 4977 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); 4978 spin_unlock_irq(&rbd_dev->lock); 4979 if (ret < 0) 4980 goto done; 4981 ret = count; 4982 rbd_bus_del_dev(rbd_dev); 4983 rbd_dev_image_release(rbd_dev); 4984 module_put(THIS_MODULE); 4985 done: 4986 mutex_unlock(&ctl_mutex); 4987 4988 return ret; 4989 } 4990 4991 /* 4992 * create control files in sysfs 4993 * /sys/bus/rbd/... 
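 *
 * (Most notably the bus-level "add" and "remove" files, backed by
 * rbd_add() and rbd_remove() above.)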
4994 */ 4995 static int rbd_sysfs_init(void) 4996 { 4997 int ret; 4998 4999 ret = device_register(&rbd_root_dev); 5000 if (ret < 0) 5001 return ret; 5002 5003 ret = bus_register(&rbd_bus_type); 5004 if (ret < 0) 5005 device_unregister(&rbd_root_dev); 5006 5007 return ret; 5008 } 5009 5010 static void rbd_sysfs_cleanup(void) 5011 { 5012 bus_unregister(&rbd_bus_type); 5013 device_unregister(&rbd_root_dev); 5014 } 5015 5016 static int rbd_slab_init(void) 5017 { 5018 rbd_assert(!rbd_img_request_cache); 5019 rbd_img_request_cache = kmem_cache_create("rbd_img_request", 5020 sizeof (struct rbd_img_request), 5021 __alignof__(struct rbd_img_request), 5022 0, NULL); 5023 if (!rbd_img_request_cache) 5024 return -ENOMEM; 5025 5026 rbd_assert(!rbd_obj_request_cache); 5027 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request", 5028 sizeof (struct rbd_obj_request), 5029 __alignof__(struct rbd_obj_request), 5030 0, NULL); 5031 if (!rbd_obj_request_cache) 5032 goto out_err; 5033 5034 rbd_assert(!rbd_segment_name_cache); 5035 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", 5036 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL); 5037 if (rbd_segment_name_cache) 5038 return 0; 5039 out_err: 5040 if (rbd_obj_request_cache) { 5041 kmem_cache_destroy(rbd_obj_request_cache); 5042 rbd_obj_request_cache = NULL; 5043 } 5044 5045 kmem_cache_destroy(rbd_img_request_cache); 5046 rbd_img_request_cache = NULL; 5047 5048 return -ENOMEM; 5049 } 5050 5051 static void rbd_slab_exit(void) 5052 { 5053 rbd_assert(rbd_segment_name_cache); 5054 kmem_cache_destroy(rbd_segment_name_cache); 5055 rbd_segment_name_cache = NULL; 5056 5057 rbd_assert(rbd_obj_request_cache); 5058 kmem_cache_destroy(rbd_obj_request_cache); 5059 rbd_obj_request_cache = NULL; 5060 5061 rbd_assert(rbd_img_request_cache); 5062 kmem_cache_destroy(rbd_img_request_cache); 5063 rbd_img_request_cache = NULL; 5064 } 5065 5066 static int __init rbd_init(void) 5067 { 5068 int rc; 5069 5070 if (!libceph_compatible(NULL)) { 5071 rbd_warn(NULL, "libceph incompatibility (quitting)"); 5072 5073 return -EINVAL; 5074 } 5075 rc = rbd_slab_init(); 5076 if (rc) 5077 return rc; 5078 rc = rbd_sysfs_init(); 5079 if (rc) 5080 rbd_slab_exit(); 5081 else 5082 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 5083 5084 return rc; 5085 } 5086 5087 static void __exit rbd_exit(void) 5088 { 5089 rbd_sysfs_cleanup(); 5090 rbd_slab_exit(); 5091 } 5092 5093 module_init(rbd_init); 5094 module_exit(rbd_exit); 5095 5096 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 5097 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 5098 MODULE_DESCRIPTION("rados block device"); 5099 5100 /* following authorship retained from original osdblk.c */ 5101 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 5102 5103 MODULE_LICENSE("GPL"); 5104