
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}
/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *       . . . . . RBD_OBJ_WRITE_GUARD . . . . . . . . . . . . . .
 *       .                 |                                      .
 *       .                 v                                      .
 *       .    RBD_OBJ_WRITE_READ_FROM_PARENT . . .                .
 *       .                 |                     .                .
 *       .                 v                     v (deep-copyup   .
 * (image .  RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC .    not needed)    .
 * flattened)              |                   .                  .
 *       .                 v                   .                  .
 *       . . . . RBD_OBJ_WRITE_COPYUP_OPS . . . .    (copyup      .
 *                         |                          not needed) v
 *                         v                                      .
 *                        done . . . . . . . . . . . . . . . . . .
 *                         ^
 *                         |
 *               RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * assert_exists guard is needed or not (in some cases it's not needed
 * even if there is a parent).
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_READ_FROM_PARENT,
	RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
	RBD_OBJ_WRITE_COPYUP_OPS,
};

struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64	size;
	u64	features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,		/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,		/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED,	/* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

static struct ceph_snap_context rbd_empty_snapc = {
	.nref = REFCOUNT_INIT(1),
};

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(struct bus_type *bus, const char *buf,
			    size_t count);
static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
				      size_t count);
static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
					 size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
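
/*
 * For example, with RBD_SINGLE_MAJOR_PART_SHIFT == 4 above, dev_id 3 maps
 * to minor 3 << 4 = 48, reserving minors 48..63 under the shared major for
 * the whole device plus up to 15 partitions.
 */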

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_alloc_size, "alloc_size=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true
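
/*
 * For example, a map request might carry options such as
 * "queue_depth=128,alloc_size=65536,lock_on_read,ro" (illustrative values
 * only).  Integer and string options are matched against the table above;
 * boolean options simply set the corresponding rbd_options field.
 */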

struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct parse_rbd_opts_ctx *pctx = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		pctx->opts->queue_depth = intval;
		break;
	case Opt_alloc_size:
		if (intval < SECTOR_SIZE) {
			pr_err("alloc_size out of range\n");
			return -EINVAL;
		}
		if (!is_power_of_2(intval)) {
			pr_err("alloc_size must be a power of 2\n");
			return -EINVAL;
		}
		pctx->opts->alloc_size = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_pool_ns:
		kfree(pctx->spec->pool_ns);
		pctx->spec->pool_ns = match_strdup(argstr);
		if (!pctx->spec->pool_ns)
			return -ENOMEM;
		break;
	case Opt_read_only:
		pctx->opts->read_only = true;
		break;
	case Opt_read_write:
		pctx->opts->read_only = false;
		break;
	case Opt_lock_on_read:
		pctx->opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		pctx->opts->exclusive = true;
		break;
	case Opt_notrim:
		pctx->opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	case OBJ_OP_ZEROOUT:
		return "zeroout";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock(&client_mutex);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = ceph_wait_for_latest_osdmap(rbdc->client,
					rbdc->client->options->mount_timeout);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
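
/*
 * For example, with snapc->snaps = { 12, 7, 3 } (descending, as kept by
 * the OSDs), looking up snap_id 7 yields index 1, while looking up
 * snap_id 5 yields BAD_SNAP_INDEX.
 */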

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

/*
 * Must be called after rbd_obj_calc_img_extents().
 */
static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
{
	if (!obj_req->num_img_extents ||
	    (rbd_obj_is_entire(obj_req) &&
	     !obj_req->img_request->snapc->num_snaps))
		return false;

	return true;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
	case OBJ_OP_ZEROOUT:
		return true;
	default:
		BUG();
	}
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
__rbd_osd_req_create(struct rbd_obj_request *obj_req,
		     struct ceph_snap_context *snapc, unsigned int num_ops)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	/*
	 * Data objects may be stored in a separate pool, but always in
	 * the same namespace in that pool as the header in its pool.
	 */
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_base_oloc.pool = rbd_dev->layout.pool_id;

	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
				    num_ops);
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		BUG();
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
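
/*
 * For example, with a 4M parent overlap, the extent list { 3M~2M, 6M~1M }
 * is pruned to { 3M~1M }: the trailing extent starts beyond the overlap
 * and is dropped, and the remaining one is trimmed to end at the overlap.
 */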

/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		BUG();
	}
}

static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
	obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
	if (!obj_req->osd_req)
		return -ENOMEM;

	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_req_setup_data(obj_req, 0);

	rbd_osd_req_format_read(obj_req);
	return 0;
}

static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
				unsigned int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}

static int count_write_ops(struct rbd_obj_request *obj_req)
{
	return 2; /* setallochint + write/writefull */
}

static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
				  unsigned int which)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u16 opcode;

	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
				   rbd_dev->layout.object_size,
				   rbd_dev->layout.object_size);

	if (rbd_obj_is_entire(obj_req))
		opcode = CEPH_OSD_OP_WRITEFULL;
	else
		opcode = CEPH_OSD_OP_WRITE;

	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_req_setup_data(obj_req, which++);

	rbd_assert(which == obj_req->osd_req->r_num_ops);
	rbd_osd_req_format_write(obj_req);
}

static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
{
	unsigned int num_osd_ops, which = 0;
	bool need_guard;
	int ret;

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	need_guard = rbd_obj_copyup_enabled(obj_req);
	num_osd_ops = need_guard + count_write_ops(obj_req);

	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (need_guard) {
		ret = __rbd_obj_setup_stat(obj_req, which++);
		if (ret)
			return ret;

		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
	} else {
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
	}

	__rbd_obj_setup_write(obj_req, which);
	return 0;
}

static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
{
	return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
					  CEPH_OSD_OP_ZERO;
}

static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u64 off = obj_req->ex.oe_off;
	u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
	int ret;

	/*
	 * Align the range to alloc_size boundary and punt on discards
	 * that are too small to free up any space.
	 *
	 * alloc_size == object_size && is_tail() is a special case for
	 * filestore with filestore_punch_hole = false, needed to allow
	 * truncate (in addition to delete).
	 */
	if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
	    !rbd_obj_is_tail(obj_req)) {
		off = round_up(off, rbd_dev->opts->alloc_size);
		next_off = round_down(next_off, rbd_dev->opts->alloc_size);
		if (off >= next_off)
			return 1;
	}

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
		osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
	} else {
		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
		     off, next_off - off);
		osd_req_op_extent_init(obj_req->osd_req, 0,
				       truncate_or_zero_opcode(obj_req),
				       off, next_off - off, 0, 0);
	}

	obj_req->write_state = RBD_OBJ_WRITE_FLAT;
	rbd_osd_req_format_write(obj_req);
	return 0;
}
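
/*
 * For example, with the default 64K alloc_size, discarding 10K..110K of an
 * object rounds to the empty range [64K, 64K) and is punted (the "return 1"
 * above), while 10K..200K rounds to [64K, 192K) and results in a 128K zero
 * or truncate op.
 */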
1997 if (!obj_req->osd_req) 1998 return -ENOMEM; 1999 2000 if (need_guard) { 2001 ret = __rbd_obj_setup_stat(obj_req, which++); 2002 if (ret) 2003 return ret; 2004 2005 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 2006 } else { 2007 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 2008 } 2009 2010 __rbd_obj_setup_zeroout(obj_req, which); 2011 return 0; 2012 } 2013 2014 /* 2015 * For each object request in @img_req, allocate an OSD request, add 2016 * individual OSD ops and prepare them for submission. The number of 2017 * OSD ops depends on op_type and the overlap point (if any). 2018 */ 2019 static int __rbd_img_fill_request(struct rbd_img_request *img_req) 2020 { 2021 struct rbd_obj_request *obj_req, *next_obj_req; 2022 int ret; 2023 2024 for_each_obj_request_safe(img_req, obj_req, next_obj_req) { 2025 switch (img_req->op_type) { 2026 case OBJ_OP_READ: 2027 ret = rbd_obj_setup_read(obj_req); 2028 break; 2029 case OBJ_OP_WRITE: 2030 ret = rbd_obj_setup_write(obj_req); 2031 break; 2032 case OBJ_OP_DISCARD: 2033 ret = rbd_obj_setup_discard(obj_req); 2034 break; 2035 case OBJ_OP_ZEROOUT: 2036 ret = rbd_obj_setup_zeroout(obj_req); 2037 break; 2038 default: 2039 BUG(); 2040 } 2041 if (ret < 0) 2042 return ret; 2043 if (ret > 0) { 2044 img_req->xferred += obj_req->ex.oe_len; 2045 img_req->pending_count--; 2046 rbd_img_obj_request_del(img_req, obj_req); 2047 continue; 2048 } 2049 2050 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2051 if (ret) 2052 return ret; 2053 } 2054 2055 return 0; 2056 } 2057 2058 union rbd_img_fill_iter { 2059 struct ceph_bio_iter bio_iter; 2060 struct ceph_bvec_iter bvec_iter; 2061 }; 2062 2063 struct rbd_img_fill_ctx { 2064 enum obj_request_type pos_type; 2065 union rbd_img_fill_iter *pos; 2066 union rbd_img_fill_iter iter; 2067 ceph_object_extent_fn_t set_pos_fn; 2068 ceph_object_extent_fn_t count_fn; 2069 ceph_object_extent_fn_t copy_fn; 2070 }; 2071 2072 static struct ceph_object_extent *alloc_object_extent(void *arg) 2073 { 2074 struct rbd_img_request *img_req = arg; 2075 struct rbd_obj_request *obj_req; 2076 2077 obj_req = rbd_obj_request_create(); 2078 if (!obj_req) 2079 return NULL; 2080 2081 rbd_img_obj_request_add(img_req, obj_req); 2082 return &obj_req->ex; 2083 } 2084 2085 /* 2086 * While su != os && sc == 1 is technically not fancy (it's the same 2087 * layout as su == os && sc == 1), we can't use the nocopy path for it 2088 * because ->set_pos_fn() should be called only once per object. 2089 * ceph_file_to_extents() invokes action_fn once per stripe unit, so 2090 * treat su != os && sc == 1 as fancy. 2091 */ 2092 static bool rbd_layout_is_fancy(struct ceph_file_layout *l) 2093 { 2094 return l->stripe_unit != l->object_size; 2095 } 2096 2097 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, 2098 struct ceph_file_extent *img_extents, 2099 u32 num_img_extents, 2100 struct rbd_img_fill_ctx *fctx) 2101 { 2102 u32 i; 2103 int ret; 2104 2105 img_req->data_type = fctx->pos_type; 2106 2107 /* 2108 * Create object requests and set each object request's starting 2109 * position in the provided bio (list) or bio_vec array. 
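* Nothing is copied here: with a non-fancy layout each object extent
* maps to one contiguous chunk of that buffer, so recording an
* iterator position per object request is enough.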
2110 */ 2111 fctx->iter = *fctx->pos; 2112 for (i = 0; i < num_img_extents; i++) { 2113 ret = ceph_file_to_extents(&img_req->rbd_dev->layout, 2114 img_extents[i].fe_off, 2115 img_extents[i].fe_len, 2116 &img_req->object_extents, 2117 alloc_object_extent, img_req, 2118 fctx->set_pos_fn, &fctx->iter); 2119 if (ret) 2120 return ret; 2121 } 2122 2123 return __rbd_img_fill_request(img_req); 2124 } 2125 2126 /* 2127 * Map a list of image extents to a list of object extents, create the 2128 * corresponding object requests (normally each to a different object, 2129 * but not always) and add them to @img_req. For each object request, 2130 * set up its data descriptor to point to the corresponding chunk(s) of 2131 * @fctx->pos data buffer. 2132 * 2133 * Because ceph_file_to_extents() will merge adjacent object extents 2134 * together, each object request's data descriptor may point to multiple 2135 * different chunks of @fctx->pos data buffer. 2136 * 2137 * @fctx->pos data buffer is assumed to be large enough. 2138 */ 2139 static int rbd_img_fill_request(struct rbd_img_request *img_req, 2140 struct ceph_file_extent *img_extents, 2141 u32 num_img_extents, 2142 struct rbd_img_fill_ctx *fctx) 2143 { 2144 struct rbd_device *rbd_dev = img_req->rbd_dev; 2145 struct rbd_obj_request *obj_req; 2146 u32 i; 2147 int ret; 2148 2149 if (fctx->pos_type == OBJ_REQUEST_NODATA || 2150 !rbd_layout_is_fancy(&rbd_dev->layout)) 2151 return rbd_img_fill_request_nocopy(img_req, img_extents, 2152 num_img_extents, fctx); 2153 2154 img_req->data_type = OBJ_REQUEST_OWN_BVECS; 2155 2156 /* 2157 * Create object requests and determine ->bvec_count for each object 2158 * request. Note that ->bvec_count sum over all object requests may 2159 * be greater than the number of bio_vecs in the provided bio (list) 2160 * or bio_vec array because when mapped, those bio_vecs can straddle 2161 * stripe unit boundaries. 2162 */ 2163 fctx->iter = *fctx->pos; 2164 for (i = 0; i < num_img_extents; i++) { 2165 ret = ceph_file_to_extents(&rbd_dev->layout, 2166 img_extents[i].fe_off, 2167 img_extents[i].fe_len, 2168 &img_req->object_extents, 2169 alloc_object_extent, img_req, 2170 fctx->count_fn, &fctx->iter); 2171 if (ret) 2172 return ret; 2173 } 2174 2175 for_each_obj_request(img_req, obj_req) { 2176 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, 2177 sizeof(*obj_req->bvec_pos.bvecs), 2178 GFP_NOIO); 2179 if (!obj_req->bvec_pos.bvecs) 2180 return -ENOMEM; 2181 } 2182 2183 /* 2184 * Fill in each object request's private bio_vec array, splitting and 2185 * rearranging the provided bio_vecs in stripe unit chunks as needed. 
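* A bio_vec that straddles a stripe unit boundary is split, and the
* resulting pieces may end up in different object requests.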
2186 */ 2187 fctx->iter = *fctx->pos; 2188 for (i = 0; i < num_img_extents; i++) { 2189 ret = ceph_iterate_extents(&rbd_dev->layout, 2190 img_extents[i].fe_off, 2191 img_extents[i].fe_len, 2192 &img_req->object_extents, 2193 fctx->copy_fn, &fctx->iter); 2194 if (ret) 2195 return ret; 2196 } 2197 2198 return __rbd_img_fill_request(img_req); 2199 } 2200 2201 static int rbd_img_fill_nodata(struct rbd_img_request *img_req, 2202 u64 off, u64 len) 2203 { 2204 struct ceph_file_extent ex = { off, len }; 2205 union rbd_img_fill_iter dummy; 2206 struct rbd_img_fill_ctx fctx = { 2207 .pos_type = OBJ_REQUEST_NODATA, 2208 .pos = &dummy, 2209 }; 2210 2211 return rbd_img_fill_request(img_req, &ex, 1, &fctx); 2212 } 2213 2214 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2215 { 2216 struct rbd_obj_request *obj_req = 2217 container_of(ex, struct rbd_obj_request, ex); 2218 struct ceph_bio_iter *it = arg; 2219 2220 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2221 obj_req->bio_pos = *it; 2222 ceph_bio_iter_advance(it, bytes); 2223 } 2224 2225 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2226 { 2227 struct rbd_obj_request *obj_req = 2228 container_of(ex, struct rbd_obj_request, ex); 2229 struct ceph_bio_iter *it = arg; 2230 2231 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2232 ceph_bio_iter_advance_step(it, bytes, ({ 2233 obj_req->bvec_count++; 2234 })); 2235 2236 } 2237 2238 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2239 { 2240 struct rbd_obj_request *obj_req = 2241 container_of(ex, struct rbd_obj_request, ex); 2242 struct ceph_bio_iter *it = arg; 2243 2244 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2245 ceph_bio_iter_advance_step(it, bytes, ({ 2246 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2247 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2248 })); 2249 } 2250 2251 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2252 struct ceph_file_extent *img_extents, 2253 u32 num_img_extents, 2254 struct ceph_bio_iter *bio_pos) 2255 { 2256 struct rbd_img_fill_ctx fctx = { 2257 .pos_type = OBJ_REQUEST_BIO, 2258 .pos = (union rbd_img_fill_iter *)bio_pos, 2259 .set_pos_fn = set_bio_pos, 2260 .count_fn = count_bio_bvecs, 2261 .copy_fn = copy_bio_bvecs, 2262 }; 2263 2264 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2265 &fctx); 2266 } 2267 2268 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2269 u64 off, u64 len, struct bio *bio) 2270 { 2271 struct ceph_file_extent ex = { off, len }; 2272 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; 2273 2274 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); 2275 } 2276 2277 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2278 { 2279 struct rbd_obj_request *obj_req = 2280 container_of(ex, struct rbd_obj_request, ex); 2281 struct ceph_bvec_iter *it = arg; 2282 2283 obj_req->bvec_pos = *it; 2284 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); 2285 ceph_bvec_iter_advance(it, bytes); 2286 } 2287 2288 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2289 { 2290 struct rbd_obj_request *obj_req = 2291 container_of(ex, struct rbd_obj_request, ex); 2292 struct ceph_bvec_iter *it = arg; 2293 2294 ceph_bvec_iter_advance_step(it, bytes, ({ 2295 obj_req->bvec_count++; 2296 })); 2297 } 2298 2299 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2300 { 2301 struct 
rbd_obj_request *obj_req = 2302 container_of(ex, struct rbd_obj_request, ex); 2303 struct ceph_bvec_iter *it = arg; 2304 2305 ceph_bvec_iter_advance_step(it, bytes, ({ 2306 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2307 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2308 })); 2309 } 2310 2311 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2312 struct ceph_file_extent *img_extents, 2313 u32 num_img_extents, 2314 struct ceph_bvec_iter *bvec_pos) 2315 { 2316 struct rbd_img_fill_ctx fctx = { 2317 .pos_type = OBJ_REQUEST_BVECS, 2318 .pos = (union rbd_img_fill_iter *)bvec_pos, 2319 .set_pos_fn = set_bvec_pos, 2320 .count_fn = count_bvecs, 2321 .copy_fn = copy_bvecs, 2322 }; 2323 2324 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2325 &fctx); 2326 } 2327 2328 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2329 struct ceph_file_extent *img_extents, 2330 u32 num_img_extents, 2331 struct bio_vec *bvecs) 2332 { 2333 struct ceph_bvec_iter it = { 2334 .bvecs = bvecs, 2335 .iter = { .bi_size = ceph_file_extents_bytes(img_extents, 2336 num_img_extents) }, 2337 }; 2338 2339 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, 2340 &it); 2341 } 2342 2343 static void rbd_img_request_submit(struct rbd_img_request *img_request) 2344 { 2345 struct rbd_obj_request *obj_request; 2346 2347 dout("%s: img %p\n", __func__, img_request); 2348 2349 rbd_img_request_get(img_request); 2350 for_each_obj_request(img_request, obj_request) 2351 rbd_obj_request_submit(obj_request); 2352 2353 rbd_img_request_put(img_request); 2354 } 2355 2356 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) 2357 { 2358 struct rbd_img_request *img_req = obj_req->img_request; 2359 struct rbd_img_request *child_img_req; 2360 int ret; 2361 2362 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, 2363 OBJ_OP_READ, NULL); 2364 if (!child_img_req) 2365 return -ENOMEM; 2366 2367 __set_bit(IMG_REQ_CHILD, &child_img_req->flags); 2368 child_img_req->obj_request = obj_req; 2369 2370 if (!rbd_img_is_write(img_req)) { 2371 switch (img_req->data_type) { 2372 case OBJ_REQUEST_BIO: 2373 ret = __rbd_img_fill_from_bio(child_img_req, 2374 obj_req->img_extents, 2375 obj_req->num_img_extents, 2376 &obj_req->bio_pos); 2377 break; 2378 case OBJ_REQUEST_BVECS: 2379 case OBJ_REQUEST_OWN_BVECS: 2380 ret = __rbd_img_fill_from_bvecs(child_img_req, 2381 obj_req->img_extents, 2382 obj_req->num_img_extents, 2383 &obj_req->bvec_pos); 2384 break; 2385 default: 2386 BUG(); 2387 } 2388 } else { 2389 ret = rbd_img_fill_from_bvecs(child_img_req, 2390 obj_req->img_extents, 2391 obj_req->num_img_extents, 2392 obj_req->copyup_bvecs); 2393 } 2394 if (ret) { 2395 rbd_img_request_put(child_img_req); 2396 return ret; 2397 } 2398 2399 rbd_img_request_submit(child_img_req); 2400 return 0; 2401 } 2402 2403 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) 2404 { 2405 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2406 int ret; 2407 2408 if (obj_req->result == -ENOENT && 2409 rbd_dev->parent_overlap && !obj_req->tried_parent) { 2410 /* reverse map this object extent onto the parent */ 2411 ret = rbd_obj_calc_img_extents(obj_req, false); 2412 if (ret) { 2413 obj_req->result = ret; 2414 return true; 2415 } 2416 2417 if (obj_req->num_img_extents) { 2418 obj_req->tried_parent = true; 2419 ret = rbd_obj_read_from_parent(obj_req); 2420 if (ret) { 2421 obj_req->result = ret; 2422 return true; 2423 } 2424 return false; 2425 } 2426 } 2427 2428 /* 2429 
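* Any applicable read from the parent has either been tried above or
* ruled out.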
* -ENOENT means a hole in the image -- zero-fill the entire 2430 * length of the request. A short read also implies zero-fill 2431 * to the end of the request. In both cases we update xferred 2432 * count to indicate the whole request was satisfied. 2433 */ 2434 if (obj_req->result == -ENOENT || 2435 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { 2436 rbd_assert(!obj_req->xferred || !obj_req->result); 2437 rbd_obj_zero_range(obj_req, obj_req->xferred, 2438 obj_req->ex.oe_len - obj_req->xferred); 2439 obj_req->result = 0; 2440 obj_req->xferred = obj_req->ex.oe_len; 2441 } 2442 2443 return true; 2444 } 2445 2446 /* 2447 * copyup_bvecs pages are never highmem pages 2448 */ 2449 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) 2450 { 2451 struct ceph_bvec_iter it = { 2452 .bvecs = bvecs, 2453 .iter = { .bi_size = bytes }, 2454 }; 2455 2456 ceph_bvec_iter_advance_step(&it, bytes, ({ 2457 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, 2458 bv.bv_len)) 2459 return false; 2460 })); 2461 return true; 2462 } 2463 2464 #define MODS_ONLY U32_MAX 2465 2466 static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req, 2467 u32 bytes) 2468 { 2469 int ret; 2470 2471 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2472 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); 2473 rbd_assert(bytes > 0 && bytes != MODS_ONLY); 2474 rbd_osd_req_destroy(obj_req->osd_req); 2475 2476 obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1); 2477 if (!obj_req->osd_req) 2478 return -ENOMEM; 2479 2480 ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); 2481 if (ret) 2482 return ret; 2483 2484 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, 2485 obj_req->copyup_bvecs, 2486 obj_req->copyup_bvec_count, 2487 bytes); 2488 rbd_osd_req_format_write(obj_req); 2489 2490 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2491 if (ret) 2492 return ret; 2493 2494 rbd_obj_request_submit(obj_req); 2495 return 0; 2496 } 2497 2498 static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes) 2499 { 2500 struct rbd_img_request *img_req = obj_req->img_request; 2501 unsigned int num_osd_ops = (bytes != MODS_ONLY); 2502 unsigned int which = 0; 2503 int ret; 2504 2505 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2506 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT || 2507 obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL); 2508 rbd_osd_req_destroy(obj_req->osd_req); 2509 2510 switch (img_req->op_type) { 2511 case OBJ_OP_WRITE: 2512 num_osd_ops += count_write_ops(obj_req); 2513 break; 2514 case OBJ_OP_ZEROOUT: 2515 num_osd_ops += count_zeroout_ops(obj_req); 2516 break; 2517 default: 2518 BUG(); 2519 } 2520 2521 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 2522 if (!obj_req->osd_req) 2523 return -ENOMEM; 2524 2525 if (bytes != MODS_ONLY) { 2526 ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd", 2527 "copyup"); 2528 if (ret) 2529 return ret; 2530 2531 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++, 2532 obj_req->copyup_bvecs, 2533 obj_req->copyup_bvec_count, 2534 bytes); 2535 } 2536 2537 switch (img_req->op_type) { 2538 case OBJ_OP_WRITE: 2539 __rbd_obj_setup_write(obj_req, which); 2540 break; 2541 case OBJ_OP_ZEROOUT: 2542 __rbd_obj_setup_zeroout(obj_req, which); 2543 break; 2544 default: 2545 BUG(); 2546 } 2547 2548 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2549 if (ret) 2550 return ret; 2551 2552 
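/*
* The copyup op (if any) and the actual write/zeroout ops go out
* as a single OSD request.
*/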
rbd_obj_request_submit(obj_req); 2553 return 0; 2554 } 2555 2556 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) 2557 { 2558 /* 2559 * Only send non-zero copyup data to save some I/O and network 2560 * bandwidth -- zero copyup data is equivalent to the object not 2561 * existing. 2562 */ 2563 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { 2564 dout("%s obj_req %p detected zeroes\n", __func__, obj_req); 2565 bytes = 0; 2566 } 2567 2568 if (obj_req->img_request->snapc->num_snaps && bytes > 0) { 2569 /* 2570 * Send a copyup request with an empty snapshot context to 2571 * deep-copyup the object through all existing snapshots. 2572 * A second request with the current snapshot context will be 2573 * sent for the actual modification. 2574 */ 2575 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC; 2576 return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes); 2577 } 2578 2579 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2580 return rbd_obj_issue_copyup_ops(obj_req, bytes); 2581 } 2582 2583 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) 2584 { 2585 u32 i; 2586 2587 rbd_assert(!obj_req->copyup_bvecs); 2588 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); 2589 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, 2590 sizeof(*obj_req->copyup_bvecs), 2591 GFP_NOIO); 2592 if (!obj_req->copyup_bvecs) 2593 return -ENOMEM; 2594 2595 for (i = 0; i < obj_req->copyup_bvec_count; i++) { 2596 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); 2597 2598 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); 2599 if (!obj_req->copyup_bvecs[i].bv_page) 2600 return -ENOMEM; 2601 2602 obj_req->copyup_bvecs[i].bv_offset = 0; 2603 obj_req->copyup_bvecs[i].bv_len = len; 2604 obj_overlap -= len; 2605 } 2606 2607 rbd_assert(!obj_overlap); 2608 return 0; 2609 } 2610 2611 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) 2612 { 2613 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2614 int ret; 2615 2616 rbd_assert(obj_req->num_img_extents); 2617 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 2618 rbd_dev->parent_overlap); 2619 if (!obj_req->num_img_extents) { 2620 /* 2621 * The overlap has become 0 (most likely because the 2622 * image has been flattened). Re-submit the original write 2623 * request -- pass MODS_ONLY since the copyup isn't needed 2624 * anymore. 2625 */ 2626 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2627 return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); 2628 } 2629 2630 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); 2631 if (ret) 2632 return ret; 2633 2634 obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT; 2635 return rbd_obj_read_from_parent(obj_req); 2636 } 2637 2638 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) 2639 { 2640 int ret; 2641 2642 switch (obj_req->write_state) { 2643 case RBD_OBJ_WRITE_GUARD: 2644 rbd_assert(!obj_req->xferred); 2645 if (obj_req->result == -ENOENT) { 2646 /* 2647 * The target object doesn't exist. Read the data for 2648 * the entire target object up to the overlap point (if 2649 * any) from the parent, so we can use it for a copyup. 
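* The parent read completes through rbd_img_end_child_request(),
* which copies the result into this object request and re-enters
* the state machine in RBD_OBJ_WRITE_READ_FROM_PARENT state.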
2650 */ 2651 ret = rbd_obj_handle_write_guard(obj_req); 2652 if (ret) { 2653 obj_req->result = ret; 2654 return true; 2655 } 2656 return false; 2657 } 2658 /* fall through */ 2659 case RBD_OBJ_WRITE_FLAT: 2660 case RBD_OBJ_WRITE_COPYUP_OPS: 2661 if (!obj_req->result) 2662 /* 2663 * There is no such thing as a successful short 2664 * write -- indicate the whole request was satisfied. 2665 */ 2666 obj_req->xferred = obj_req->ex.oe_len; 2667 return true; 2668 case RBD_OBJ_WRITE_READ_FROM_PARENT: 2669 if (obj_req->result) 2670 return true; 2671 2672 rbd_assert(obj_req->xferred); 2673 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); 2674 if (ret) { 2675 obj_req->result = ret; 2676 obj_req->xferred = 0; 2677 return true; 2678 } 2679 return false; 2680 case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC: 2681 if (obj_req->result) 2682 return true; 2683 2684 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2685 ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); 2686 if (ret) { 2687 obj_req->result = ret; 2688 return true; 2689 } 2690 return false; 2691 default: 2692 BUG(); 2693 } 2694 } 2695 2696 /* 2697 * Returns true if @obj_req is completed, or false otherwise. 2698 */ 2699 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) 2700 { 2701 switch (obj_req->img_request->op_type) { 2702 case OBJ_OP_READ: 2703 return rbd_obj_handle_read(obj_req); 2704 case OBJ_OP_WRITE: 2705 return rbd_obj_handle_write(obj_req); 2706 case OBJ_OP_DISCARD: 2707 case OBJ_OP_ZEROOUT: 2708 if (rbd_obj_handle_write(obj_req)) { 2709 /* 2710 * Hide -ENOENT from delete/truncate/zero -- discarding 2711 * a non-existent object is not a problem. 2712 */ 2713 if (obj_req->result == -ENOENT) { 2714 obj_req->result = 0; 2715 obj_req->xferred = obj_req->ex.oe_len; 2716 } 2717 return true; 2718 } 2719 return false; 2720 default: 2721 BUG(); 2722 } 2723 } 2724 2725 static void rbd_obj_end_request(struct rbd_obj_request *obj_req) 2726 { 2727 struct rbd_img_request *img_req = obj_req->img_request; 2728 2729 rbd_assert((!obj_req->result && 2730 obj_req->xferred == obj_req->ex.oe_len) || 2731 (obj_req->result < 0 && !obj_req->xferred)); 2732 if (!obj_req->result) { 2733 img_req->xferred += obj_req->xferred; 2734 return; 2735 } 2736 2737 rbd_warn(img_req->rbd_dev, 2738 "%s at objno %llu %llu~%llu result %d xferred %llu", 2739 obj_op_name(img_req->op_type), obj_req->ex.oe_objno, 2740 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, 2741 obj_req->xferred); 2742 if (!img_req->result) { 2743 img_req->result = obj_req->result; 2744 img_req->xferred = 0; 2745 } 2746 } 2747 2748 static void rbd_img_end_child_request(struct rbd_img_request *img_req) 2749 { 2750 struct rbd_obj_request *obj_req = img_req->obj_request; 2751 2752 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); 2753 rbd_assert((!img_req->result && 2754 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || 2755 (img_req->result < 0 && !img_req->xferred)); 2756 2757 obj_req->result = img_req->result; 2758 obj_req->xferred = img_req->xferred; 2759 rbd_img_request_put(img_req); 2760 } 2761 2762 static void rbd_img_end_request(struct rbd_img_request *img_req) 2763 { 2764 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); 2765 rbd_assert((!img_req->result && 2766 img_req->xferred == blk_rq_bytes(img_req->rq)) || 2767 (img_req->result < 0 && !img_req->xferred)); 2768 2769 blk_mq_end_request(img_req->rq, 2770 errno_to_blk_status(img_req->result)); 2771 rbd_img_request_put(img_req); 2772 } 2773 2774 static void rbd_obj_handle_request(struct rbd_obj_request 
*obj_req) 2775 { 2776 struct rbd_img_request *img_req; 2777 2778 again: 2779 if (!__rbd_obj_handle_request(obj_req)) 2780 return; 2781 2782 img_req = obj_req->img_request; 2783 spin_lock(&img_req->completion_lock); 2784 rbd_obj_end_request(obj_req); 2785 rbd_assert(img_req->pending_count); 2786 if (--img_req->pending_count) { 2787 spin_unlock(&img_req->completion_lock); 2788 return; 2789 } 2790 2791 spin_unlock(&img_req->completion_lock); 2792 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { 2793 obj_req = img_req->obj_request; 2794 rbd_img_end_child_request(img_req); 2795 goto again; 2796 } 2797 rbd_img_end_request(img_req); 2798 } 2799 2800 static const struct rbd_client_id rbd_empty_cid; 2801 2802 static bool rbd_cid_equal(const struct rbd_client_id *lhs, 2803 const struct rbd_client_id *rhs) 2804 { 2805 return lhs->gid == rhs->gid && lhs->handle == rhs->handle; 2806 } 2807 2808 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) 2809 { 2810 struct rbd_client_id cid; 2811 2812 mutex_lock(&rbd_dev->watch_mutex); 2813 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); 2814 cid.handle = rbd_dev->watch_cookie; 2815 mutex_unlock(&rbd_dev->watch_mutex); 2816 return cid; 2817 } 2818 2819 /* 2820 * lock_rwsem must be held for write 2821 */ 2822 static void rbd_set_owner_cid(struct rbd_device *rbd_dev, 2823 const struct rbd_client_id *cid) 2824 { 2825 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, 2826 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, 2827 cid->gid, cid->handle); 2828 rbd_dev->owner_cid = *cid; /* struct */ 2829 } 2830 2831 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) 2832 { 2833 mutex_lock(&rbd_dev->watch_mutex); 2834 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); 2835 mutex_unlock(&rbd_dev->watch_mutex); 2836 } 2837 2838 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) 2839 { 2840 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2841 2842 strcpy(rbd_dev->lock_cookie, cookie); 2843 rbd_set_owner_cid(rbd_dev, &cid); 2844 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); 2845 } 2846 2847 /* 2848 * lock_rwsem must be held for write 2849 */ 2850 static int rbd_lock(struct rbd_device *rbd_dev) 2851 { 2852 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2853 char cookie[32]; 2854 int ret; 2855 2856 WARN_ON(__rbd_is_lock_owner(rbd_dev) || 2857 rbd_dev->lock_cookie[0] != '\0'); 2858 2859 format_lock_cookie(rbd_dev, cookie); 2860 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2861 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, 2862 RBD_LOCK_TAG, "", 0); 2863 if (ret) 2864 return ret; 2865 2866 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; 2867 __rbd_lock(rbd_dev, cookie); 2868 return 0; 2869 } 2870 2871 /* 2872 * lock_rwsem must be held for write 2873 */ 2874 static void rbd_unlock(struct rbd_device *rbd_dev) 2875 { 2876 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2877 int ret; 2878 2879 WARN_ON(!__rbd_is_lock_owner(rbd_dev) || 2880 rbd_dev->lock_cookie[0] == '\0'); 2881 2882 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2883 RBD_LOCK_NAME, rbd_dev->lock_cookie); 2884 if (ret && ret != -ENOENT) 2885 rbd_warn(rbd_dev, "failed to unlock: %d", ret); 2886 2887 /* treat errors as the image is unlocked */ 2888 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 2889 rbd_dev->lock_cookie[0] = '\0'; 2890 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 2891 queue_work(rbd_dev->task_wq, 
&rbd_dev->released_lock_work); 2892 } 2893 2894 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, 2895 enum rbd_notify_op notify_op, 2896 struct page ***preply_pages, 2897 size_t *preply_len) 2898 { 2899 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2900 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2901 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; 2902 int buf_size = sizeof(buf); 2903 void *p = buf; 2904 2905 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 2906 2907 /* encode *LockPayload NotifyMessage (op + ClientId) */ 2908 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); 2909 ceph_encode_32(&p, notify_op); 2910 ceph_encode_64(&p, cid.gid); 2911 ceph_encode_64(&p, cid.handle); 2912 2913 return ceph_osdc_notify(osdc, &rbd_dev->header_oid, 2914 &rbd_dev->header_oloc, buf, buf_size, 2915 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); 2916 } 2917 2918 static void rbd_notify_op_lock(struct rbd_device *rbd_dev, 2919 enum rbd_notify_op notify_op) 2920 { 2921 struct page **reply_pages; 2922 size_t reply_len; 2923 2924 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); 2925 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 2926 } 2927 2928 static void rbd_notify_acquired_lock(struct work_struct *work) 2929 { 2930 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2931 acquired_lock_work); 2932 2933 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); 2934 } 2935 2936 static void rbd_notify_released_lock(struct work_struct *work) 2937 { 2938 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2939 released_lock_work); 2940 2941 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); 2942 } 2943 2944 static int rbd_request_lock(struct rbd_device *rbd_dev) 2945 { 2946 struct page **reply_pages; 2947 size_t reply_len; 2948 bool lock_owner_responded = false; 2949 int ret; 2950 2951 dout("%s rbd_dev %p\n", __func__, rbd_dev); 2952 2953 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, 2954 &reply_pages, &reply_len); 2955 if (ret && ret != -ETIMEDOUT) { 2956 rbd_warn(rbd_dev, "failed to request lock: %d", ret); 2957 goto out; 2958 } 2959 2960 if (reply_len > 0 && reply_len <= PAGE_SIZE) { 2961 void *p = page_address(reply_pages[0]); 2962 void *const end = p + reply_len; 2963 u32 n; 2964 2965 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ 2966 while (n--) { 2967 u8 struct_v; 2968 u32 len; 2969 2970 ceph_decode_need(&p, end, 8 + 8, e_inval); 2971 p += 8 + 8; /* skip gid and cookie */ 2972 2973 ceph_decode_32_safe(&p, end, len, e_inval); 2974 if (!len) 2975 continue; 2976 2977 if (lock_owner_responded) { 2978 rbd_warn(rbd_dev, 2979 "duplicate lock owners detected"); 2980 ret = -EIO; 2981 goto out; 2982 } 2983 2984 lock_owner_responded = true; 2985 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", 2986 &struct_v, &len); 2987 if (ret) { 2988 rbd_warn(rbd_dev, 2989 "failed to decode ResponseMessage: %d", 2990 ret); 2991 goto e_inval; 2992 } 2993 2994 ret = ceph_decode_32(&p); 2995 } 2996 } 2997 2998 if (!lock_owner_responded) { 2999 rbd_warn(rbd_dev, "no lock owners detected"); 3000 ret = -ETIMEDOUT; 3001 } 3002 3003 out: 3004 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 3005 return ret; 3006 3007 e_inval: 3008 ret = -EINVAL; 3009 goto out; 3010 } 3011 3012 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) 3013 { 3014 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); 
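/*
* Cancel any queued acquire attempt; woken waiters re-check
* lock_state in rbd_wait_state_locked().
*/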
3015 3016 cancel_delayed_work(&rbd_dev->lock_dwork); 3017 if (wake_all) 3018 wake_up_all(&rbd_dev->lock_waitq); 3019 else 3020 wake_up(&rbd_dev->lock_waitq); 3021 } 3022 3023 static int get_lock_owner_info(struct rbd_device *rbd_dev, 3024 struct ceph_locker **lockers, u32 *num_lockers) 3025 { 3026 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3027 u8 lock_type; 3028 char *lock_tag; 3029 int ret; 3030 3031 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3032 3033 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, 3034 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3035 &lock_type, &lock_tag, lockers, num_lockers); 3036 if (ret) 3037 return ret; 3038 3039 if (*num_lockers == 0) { 3040 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); 3041 goto out; 3042 } 3043 3044 if (strcmp(lock_tag, RBD_LOCK_TAG)) { 3045 rbd_warn(rbd_dev, "locked by external mechanism, tag %s", 3046 lock_tag); 3047 ret = -EBUSY; 3048 goto out; 3049 } 3050 3051 if (lock_type == CEPH_CLS_LOCK_SHARED) { 3052 rbd_warn(rbd_dev, "shared lock type detected"); 3053 ret = -EBUSY; 3054 goto out; 3055 } 3056 3057 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, 3058 strlen(RBD_LOCK_COOKIE_PREFIX))) { 3059 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", 3060 (*lockers)[0].id.cookie); 3061 ret = -EBUSY; 3062 goto out; 3063 } 3064 3065 out: 3066 kfree(lock_tag); 3067 return ret; 3068 } 3069 3070 static int find_watcher(struct rbd_device *rbd_dev, 3071 const struct ceph_locker *locker) 3072 { 3073 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3074 struct ceph_watch_item *watchers; 3075 u32 num_watchers; 3076 u64 cookie; 3077 int i; 3078 int ret; 3079 3080 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, 3081 &rbd_dev->header_oloc, &watchers, 3082 &num_watchers); 3083 if (ret) 3084 return ret; 3085 3086 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); 3087 for (i = 0; i < num_watchers; i++) { 3088 if (!memcmp(&watchers[i].addr, &locker->info.addr, 3089 sizeof(locker->info.addr)) && 3090 watchers[i].cookie == cookie) { 3091 struct rbd_client_id cid = { 3092 .gid = le64_to_cpu(watchers[i].name.num), 3093 .handle = cookie, 3094 }; 3095 3096 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, 3097 rbd_dev, cid.gid, cid.handle); 3098 rbd_set_owner_cid(rbd_dev, &cid); 3099 ret = 1; 3100 goto out; 3101 } 3102 } 3103 3104 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); 3105 ret = 0; 3106 out: 3107 kfree(watchers); 3108 return ret; 3109 } 3110 3111 /* 3112 * lock_rwsem must be held for write 3113 */ 3114 static int rbd_try_lock(struct rbd_device *rbd_dev) 3115 { 3116 struct ceph_client *client = rbd_dev->rbd_client->client; 3117 struct ceph_locker *lockers; 3118 u32 num_lockers; 3119 int ret; 3120 3121 for (;;) { 3122 ret = rbd_lock(rbd_dev); 3123 if (ret != -EBUSY) 3124 return ret; 3125 3126 /* determine if the current lock holder is still alive */ 3127 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); 3128 if (ret) 3129 return ret; 3130 3131 if (num_lockers == 0) 3132 goto again; 3133 3134 ret = find_watcher(rbd_dev, lockers); 3135 if (ret) { 3136 if (ret > 0) 3137 ret = 0; /* have to request lock */ 3138 goto out; 3139 } 3140 3141 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", 3142 ENTITY_NAME(lockers[0].id.name)); 3143 3144 ret = ceph_monc_blacklist_add(&client->monc, 3145 &lockers[0].info.addr); 3146 if (ret) { 3147 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", 3148 ENTITY_NAME(lockers[0].id.name), ret); 3149 
goto out; 3150 } 3151 3152 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, 3153 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3154 lockers[0].id.cookie, 3155 &lockers[0].id.name); 3156 if (ret && ret != -ENOENT) 3157 goto out; 3158 3159 again: 3160 ceph_free_lockers(lockers, num_lockers); 3161 } 3162 3163 out: 3164 ceph_free_lockers(lockers, num_lockers); 3165 return ret; 3166 } 3167 3168 /* 3169 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED 3170 */ 3171 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, 3172 int *pret) 3173 { 3174 enum rbd_lock_state lock_state; 3175 3176 down_read(&rbd_dev->lock_rwsem); 3177 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3178 rbd_dev->lock_state); 3179 if (__rbd_is_lock_owner(rbd_dev)) { 3180 lock_state = rbd_dev->lock_state; 3181 up_read(&rbd_dev->lock_rwsem); 3182 return lock_state; 3183 } 3184 3185 up_read(&rbd_dev->lock_rwsem); 3186 down_write(&rbd_dev->lock_rwsem); 3187 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3188 rbd_dev->lock_state); 3189 if (!__rbd_is_lock_owner(rbd_dev)) { 3190 *pret = rbd_try_lock(rbd_dev); 3191 if (*pret) 3192 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); 3193 } 3194 3195 lock_state = rbd_dev->lock_state; 3196 up_write(&rbd_dev->lock_rwsem); 3197 return lock_state; 3198 } 3199 3200 static void rbd_acquire_lock(struct work_struct *work) 3201 { 3202 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3203 struct rbd_device, lock_dwork); 3204 enum rbd_lock_state lock_state; 3205 int ret = 0; 3206 3207 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3208 again: 3209 lock_state = rbd_try_acquire_lock(rbd_dev, &ret); 3210 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { 3211 if (lock_state == RBD_LOCK_STATE_LOCKED) 3212 wake_requests(rbd_dev, true); 3213 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, 3214 rbd_dev, lock_state, ret); 3215 return; 3216 } 3217 3218 ret = rbd_request_lock(rbd_dev); 3219 if (ret == -ETIMEDOUT) { 3220 goto again; /* treat this as a dead client */ 3221 } else if (ret == -EROFS) { 3222 rbd_warn(rbd_dev, "peer will not release lock"); 3223 /* 3224 * If this is rbd_add_acquire_lock(), we want to fail 3225 * immediately -- reuse BLACKLISTED flag. Otherwise we 3226 * want to block. 3227 */ 3228 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { 3229 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3230 /* wake "rbd map --exclusive" process */ 3231 wake_requests(rbd_dev, false); 3232 } 3233 } else if (ret < 0) { 3234 rbd_warn(rbd_dev, "error requesting lock: %d", ret); 3235 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3236 RBD_RETRY_DELAY); 3237 } else { 3238 /* 3239 * lock owner acked, but resend if we don't see them 3240 * release the lock 3241 */ 3242 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, 3243 rbd_dev); 3244 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3245 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); 3246 } 3247 } 3248 3249 /* 3250 * lock_rwsem must be held for write 3251 */ 3252 static bool rbd_release_lock(struct rbd_device *rbd_dev) 3253 { 3254 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3255 rbd_dev->lock_state); 3256 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) 3257 return false; 3258 3259 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; 3260 downgrade_write(&rbd_dev->lock_rwsem); 3261 /* 3262 * Ensure that all in-flight IO is flushed. 
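* The lock must not be released while writes issued under it are
* still in flight.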
3263 * 3264 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which 3265 * may be shared with other devices. 3266 */ 3267 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); 3268 up_read(&rbd_dev->lock_rwsem); 3269 3270 down_write(&rbd_dev->lock_rwsem); 3271 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3272 rbd_dev->lock_state); 3273 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) 3274 return false; 3275 3276 rbd_unlock(rbd_dev); 3277 /* 3278 * Give others a chance to grab the lock - we would re-acquire 3279 * almost immediately if we got new IO during ceph_osdc_sync() 3280 * otherwise. We need to ack our own notifications, so this 3281 * lock_dwork will be requeued from rbd_wait_state_locked() 3282 * after wake_requests() in rbd_handle_released_lock(). 3283 */ 3284 cancel_delayed_work(&rbd_dev->lock_dwork); 3285 return true; 3286 } 3287 3288 static void rbd_release_lock_work(struct work_struct *work) 3289 { 3290 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3291 unlock_work); 3292 3293 down_write(&rbd_dev->lock_rwsem); 3294 rbd_release_lock(rbd_dev); 3295 up_write(&rbd_dev->lock_rwsem); 3296 } 3297 3298 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, 3299 void **p) 3300 { 3301 struct rbd_client_id cid = { 0 }; 3302 3303 if (struct_v >= 2) { 3304 cid.gid = ceph_decode_64(p); 3305 cid.handle = ceph_decode_64(p); 3306 } 3307 3308 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3309 cid.handle); 3310 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3311 down_write(&rbd_dev->lock_rwsem); 3312 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3313 /* 3314 * we already know that the remote client is 3315 * the owner 3316 */ 3317 up_write(&rbd_dev->lock_rwsem); 3318 return; 3319 } 3320 3321 rbd_set_owner_cid(rbd_dev, &cid); 3322 downgrade_write(&rbd_dev->lock_rwsem); 3323 } else { 3324 down_read(&rbd_dev->lock_rwsem); 3325 } 3326 3327 if (!__rbd_is_lock_owner(rbd_dev)) 3328 wake_requests(rbd_dev, false); 3329 up_read(&rbd_dev->lock_rwsem); 3330 } 3331 3332 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, 3333 void **p) 3334 { 3335 struct rbd_client_id cid = { 0 }; 3336 3337 if (struct_v >= 2) { 3338 cid.gid = ceph_decode_64(p); 3339 cid.handle = ceph_decode_64(p); 3340 } 3341 3342 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3343 cid.handle); 3344 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3345 down_write(&rbd_dev->lock_rwsem); 3346 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3347 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", 3348 __func__, rbd_dev, cid.gid, cid.handle, 3349 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); 3350 up_write(&rbd_dev->lock_rwsem); 3351 return; 3352 } 3353 3354 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3355 downgrade_write(&rbd_dev->lock_rwsem); 3356 } else { 3357 down_read(&rbd_dev->lock_rwsem); 3358 } 3359 3360 if (!__rbd_is_lock_owner(rbd_dev)) 3361 wake_requests(rbd_dev, false); 3362 up_read(&rbd_dev->lock_rwsem); 3363 } 3364 3365 /* 3366 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no 3367 * ResponseMessage is needed. 
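* A result of 0 acknowledges the request (the release has been queued
* or is already in progress); -EROFS refuses it because the image is
* mapped with the exclusive option.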
3368 */ 3369 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, 3370 void **p) 3371 { 3372 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); 3373 struct rbd_client_id cid = { 0 }; 3374 int result = 1; 3375 3376 if (struct_v >= 2) { 3377 cid.gid = ceph_decode_64(p); 3378 cid.handle = ceph_decode_64(p); 3379 } 3380 3381 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3382 cid.handle); 3383 if (rbd_cid_equal(&cid, &my_cid)) 3384 return result; 3385 3386 down_read(&rbd_dev->lock_rwsem); 3387 if (__rbd_is_lock_owner(rbd_dev)) { 3388 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && 3389 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) 3390 goto out_unlock; 3391 3392 /* 3393 * encode ResponseMessage(0) so the peer can detect 3394 * a missing owner 3395 */ 3396 result = 0; 3397 3398 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { 3399 if (!rbd_dev->opts->exclusive) { 3400 dout("%s rbd_dev %p queueing unlock_work\n", 3401 __func__, rbd_dev); 3402 queue_work(rbd_dev->task_wq, 3403 &rbd_dev->unlock_work); 3404 } else { 3405 /* refuse to release the lock */ 3406 result = -EROFS; 3407 } 3408 } 3409 } 3410 3411 out_unlock: 3412 up_read(&rbd_dev->lock_rwsem); 3413 return result; 3414 } 3415 3416 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, 3417 u64 notify_id, u64 cookie, s32 *result) 3418 { 3419 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3420 char buf[4 + CEPH_ENCODING_START_BLK_LEN]; 3421 int buf_size = sizeof(buf); 3422 int ret; 3423 3424 if (result) { 3425 void *p = buf; 3426 3427 /* encode ResponseMessage */ 3428 ceph_start_encoding(&p, 1, 1, 3429 buf_size - CEPH_ENCODING_START_BLK_LEN); 3430 ceph_encode_32(&p, *result); 3431 } else { 3432 buf_size = 0; 3433 } 3434 3435 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 3436 &rbd_dev->header_oloc, notify_id, cookie, 3437 buf, buf_size); 3438 if (ret) 3439 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); 3440 } 3441 3442 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, 3443 u64 cookie) 3444 { 3445 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3446 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); 3447 } 3448 3449 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, 3450 u64 notify_id, u64 cookie, s32 result) 3451 { 3452 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 3453 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); 3454 } 3455 3456 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 3457 u64 notifier_id, void *data, size_t data_len) 3458 { 3459 struct rbd_device *rbd_dev = arg; 3460 void *p = data; 3461 void *const end = p + data_len; 3462 u8 struct_v = 0; 3463 u32 len; 3464 u32 notify_op; 3465 int ret; 3466 3467 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", 3468 __func__, rbd_dev, cookie, notify_id, data_len); 3469 if (data_len) { 3470 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", 3471 &struct_v, &len); 3472 if (ret) { 3473 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", 3474 ret); 3475 return; 3476 } 3477 3478 notify_op = ceph_decode_32(&p); 3479 } else { 3480 /* legacy notification for header updates */ 3481 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; 3482 len = 0; 3483 } 3484 3485 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); 3486 switch (notify_op) { 3487 case RBD_NOTIFY_OP_ACQUIRED_LOCK: 3488 rbd_handle_acquired_lock(rbd_dev, struct_v, &p); 3489 rbd_acknowledge_notify(rbd_dev, notify_id, 
cookie); 3490 break; 3491 case RBD_NOTIFY_OP_RELEASED_LOCK: 3492 rbd_handle_released_lock(rbd_dev, struct_v, &p); 3493 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3494 break; 3495 case RBD_NOTIFY_OP_REQUEST_LOCK: 3496 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); 3497 if (ret <= 0) 3498 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3499 cookie, ret); 3500 else 3501 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3502 break; 3503 case RBD_NOTIFY_OP_HEADER_UPDATE: 3504 ret = rbd_dev_refresh(rbd_dev); 3505 if (ret) 3506 rbd_warn(rbd_dev, "refresh failed: %d", ret); 3507 3508 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3509 break; 3510 default: 3511 if (rbd_is_lock_owner(rbd_dev)) 3512 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3513 cookie, -EOPNOTSUPP); 3514 else 3515 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3516 break; 3517 } 3518 } 3519 3520 static void __rbd_unregister_watch(struct rbd_device *rbd_dev); 3521 3522 static void rbd_watch_errcb(void *arg, u64 cookie, int err) 3523 { 3524 struct rbd_device *rbd_dev = arg; 3525 3526 rbd_warn(rbd_dev, "encountered watch error: %d", err); 3527 3528 down_write(&rbd_dev->lock_rwsem); 3529 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3530 up_write(&rbd_dev->lock_rwsem); 3531 3532 mutex_lock(&rbd_dev->watch_mutex); 3533 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { 3534 __rbd_unregister_watch(rbd_dev); 3535 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; 3536 3537 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); 3538 } 3539 mutex_unlock(&rbd_dev->watch_mutex); 3540 } 3541 3542 /* 3543 * watch_mutex must be locked 3544 */ 3545 static int __rbd_register_watch(struct rbd_device *rbd_dev) 3546 { 3547 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3548 struct ceph_osd_linger_request *handle; 3549 3550 rbd_assert(!rbd_dev->watch_handle); 3551 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3552 3553 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 3554 &rbd_dev->header_oloc, rbd_watch_cb, 3555 rbd_watch_errcb, rbd_dev); 3556 if (IS_ERR(handle)) 3557 return PTR_ERR(handle); 3558 3559 rbd_dev->watch_handle = handle; 3560 return 0; 3561 } 3562 3563 /* 3564 * watch_mutex must be locked 3565 */ 3566 static void __rbd_unregister_watch(struct rbd_device *rbd_dev) 3567 { 3568 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3569 int ret; 3570 3571 rbd_assert(rbd_dev->watch_handle); 3572 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3573 3574 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 3575 if (ret) 3576 rbd_warn(rbd_dev, "failed to unwatch: %d", ret); 3577 3578 rbd_dev->watch_handle = NULL; 3579 } 3580 3581 static int rbd_register_watch(struct rbd_device *rbd_dev) 3582 { 3583 int ret; 3584 3585 mutex_lock(&rbd_dev->watch_mutex); 3586 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); 3587 ret = __rbd_register_watch(rbd_dev); 3588 if (ret) 3589 goto out; 3590 3591 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3592 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3593 3594 out: 3595 mutex_unlock(&rbd_dev->watch_mutex); 3596 return ret; 3597 } 3598 3599 static void cancel_tasks_sync(struct rbd_device *rbd_dev) 3600 { 3601 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3602 3603 cancel_work_sync(&rbd_dev->acquired_lock_work); 3604 cancel_work_sync(&rbd_dev->released_lock_work); 3605 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 3606 cancel_work_sync(&rbd_dev->unlock_work); 3607 } 3608 3609 static void 
rbd_unregister_watch(struct rbd_device *rbd_dev)
3610 {
3611 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3612 cancel_tasks_sync(rbd_dev);
3613
3614 mutex_lock(&rbd_dev->watch_mutex);
3615 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3616 __rbd_unregister_watch(rbd_dev);
3617 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3618 mutex_unlock(&rbd_dev->watch_mutex);
3619
3620 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3621 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3622 }
3623
3624 /*
3625 * lock_rwsem must be held for write
3626 */
3627 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3628 {
3629 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3630 char cookie[32];
3631 int ret;
3632
3633 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3634
3635 format_lock_cookie(rbd_dev, cookie);
3636 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3637 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3638 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3639 RBD_LOCK_TAG, cookie);
3640 if (ret) {
3641 if (ret != -EOPNOTSUPP)
3642 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3643 ret);
3644
3645 /*
3646 * Lock cookie cannot be updated on older OSDs, so do
3647 * a manual release and queue an acquire.
3648 */
3649 if (rbd_release_lock(rbd_dev))
3650 queue_delayed_work(rbd_dev->task_wq,
3651 &rbd_dev->lock_dwork, 0);
3652 } else {
3653 __rbd_lock(rbd_dev, cookie);
3654 }
3655 }
3656
3657 static void rbd_reregister_watch(struct work_struct *work)
3658 {
3659 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3660 struct rbd_device, watch_dwork);
3661 int ret;
3662
3663 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3664
3665 mutex_lock(&rbd_dev->watch_mutex);
3666 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3667 mutex_unlock(&rbd_dev->watch_mutex);
3668 return;
3669 }
3670
3671 ret = __rbd_register_watch(rbd_dev);
3672 if (ret) {
3673 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3674 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3675 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3676 wake_requests(rbd_dev, true);
3677 } else {
3678 queue_delayed_work(rbd_dev->task_wq,
3679 &rbd_dev->watch_dwork,
3680 RBD_RETRY_DELAY);
3681 }
3682 mutex_unlock(&rbd_dev->watch_mutex);
3683 return;
3684 }
3685
3686 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3687 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3688 mutex_unlock(&rbd_dev->watch_mutex);
3689
3690 down_write(&rbd_dev->lock_rwsem);
3691 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3692 rbd_reacquire_lock(rbd_dev);
3693 up_write(&rbd_dev->lock_rwsem);
3694
3695 ret = rbd_dev_refresh(rbd_dev);
3696 if (ret)
3697 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3698 }
3699
3700 /*
3701 * Synchronous osd object method call. Returns the number of bytes
3702 * returned in the inbound buffer, or a negative error code.
3703 */
3704 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3705 struct ceph_object_id *oid,
3706 struct ceph_object_locator *oloc,
3707 const char *method_name,
3708 const void *outbound,
3709 size_t outbound_size,
3710 void *inbound,
3711 size_t inbound_size)
3712 {
3713 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3714 struct page *req_page = NULL;
3715 struct page *reply_page;
3716 int ret;
3717
3718 /*
3719 * Method calls are ultimately read operations. The result
3720 * should be placed into the inbound buffer provided.
They 3721 * also supply outbound data--parameters for the object 3722 * method. Currently if this is present it will be a 3723 * snapshot id. 3724 */ 3725 if (outbound) { 3726 if (outbound_size > PAGE_SIZE) 3727 return -E2BIG; 3728 3729 req_page = alloc_page(GFP_KERNEL); 3730 if (!req_page) 3731 return -ENOMEM; 3732 3733 memcpy(page_address(req_page), outbound, outbound_size); 3734 } 3735 3736 reply_page = alloc_page(GFP_KERNEL); 3737 if (!reply_page) { 3738 if (req_page) 3739 __free_page(req_page); 3740 return -ENOMEM; 3741 } 3742 3743 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, 3744 CEPH_OSD_FLAG_READ, req_page, outbound_size, 3745 reply_page, &inbound_size); 3746 if (!ret) { 3747 memcpy(inbound, page_address(reply_page), inbound_size); 3748 ret = inbound_size; 3749 } 3750 3751 if (req_page) 3752 __free_page(req_page); 3753 __free_page(reply_page); 3754 return ret; 3755 } 3756 3757 /* 3758 * lock_rwsem must be held for read 3759 */ 3760 static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) 3761 { 3762 DEFINE_WAIT(wait); 3763 unsigned long timeout; 3764 int ret = 0; 3765 3766 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) 3767 return -EBLACKLISTED; 3768 3769 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) 3770 return 0; 3771 3772 if (!may_acquire) { 3773 rbd_warn(rbd_dev, "exclusive lock required"); 3774 return -EROFS; 3775 } 3776 3777 do { 3778 /* 3779 * Note the use of mod_delayed_work() in rbd_acquire_lock() 3780 * and cancel_delayed_work() in wake_requests(). 3781 */ 3782 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); 3783 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 3784 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, 3785 TASK_UNINTERRUPTIBLE); 3786 up_read(&rbd_dev->lock_rwsem); 3787 timeout = schedule_timeout(ceph_timeout_jiffies( 3788 rbd_dev->opts->lock_timeout)); 3789 down_read(&rbd_dev->lock_rwsem); 3790 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 3791 ret = -EBLACKLISTED; 3792 break; 3793 } 3794 if (!timeout) { 3795 rbd_warn(rbd_dev, "timed out waiting for lock"); 3796 ret = -ETIMEDOUT; 3797 break; 3798 } 3799 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); 3800 3801 finish_wait(&rbd_dev->lock_waitq, &wait); 3802 return ret; 3803 } 3804 3805 static void rbd_queue_workfn(struct work_struct *work) 3806 { 3807 struct request *rq = blk_mq_rq_from_pdu(work); 3808 struct rbd_device *rbd_dev = rq->q->queuedata; 3809 struct rbd_img_request *img_request; 3810 struct ceph_snap_context *snapc = NULL; 3811 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; 3812 u64 length = blk_rq_bytes(rq); 3813 enum obj_operation_type op_type; 3814 u64 mapping_size; 3815 bool must_be_locked; 3816 int result; 3817 3818 switch (req_op(rq)) { 3819 case REQ_OP_DISCARD: 3820 op_type = OBJ_OP_DISCARD; 3821 break; 3822 case REQ_OP_WRITE_ZEROES: 3823 op_type = OBJ_OP_ZEROOUT; 3824 break; 3825 case REQ_OP_WRITE: 3826 op_type = OBJ_OP_WRITE; 3827 break; 3828 case REQ_OP_READ: 3829 op_type = OBJ_OP_READ; 3830 break; 3831 default: 3832 dout("%s: non-fs request type %d\n", __func__, req_op(rq)); 3833 result = -EIO; 3834 goto err; 3835 } 3836 3837 /* Ignore/skip any zero-length requests */ 3838 3839 if (!length) { 3840 dout("%s: zero-length request\n", __func__); 3841 result = 0; 3842 goto err_rq; 3843 } 3844 3845 if (op_type != OBJ_OP_READ && rbd_dev->spec->snap_id != CEPH_NOSNAP) { 3846 rbd_warn(rbd_dev, "%s on read-only snapshot", 3847 obj_op_name(op_type)); 3848 result = -EIO; 3849 goto err; 
3850 } 3851 3852 /* 3853 * Quit early if the mapped snapshot no longer exists. It's 3854 * still possible the snapshot will have disappeared by the 3855 * time our request arrives at the osd, but there's no sense in 3856 * sending it if we already know. 3857 */ 3858 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 3859 dout("request for non-existent snapshot"); 3860 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 3861 result = -ENXIO; 3862 goto err_rq; 3863 } 3864 3865 if (offset && length > U64_MAX - offset + 1) { 3866 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, 3867 length); 3868 result = -EINVAL; 3869 goto err_rq; /* Shouldn't happen */ 3870 } 3871 3872 blk_mq_start_request(rq); 3873 3874 down_read(&rbd_dev->header_rwsem); 3875 mapping_size = rbd_dev->mapping.size; 3876 if (op_type != OBJ_OP_READ) { 3877 snapc = rbd_dev->header.snapc; 3878 ceph_get_snap_context(snapc); 3879 } 3880 up_read(&rbd_dev->header_rwsem); 3881 3882 if (offset + length > mapping_size) { 3883 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, 3884 length, mapping_size); 3885 result = -EIO; 3886 goto err_rq; 3887 } 3888 3889 must_be_locked = 3890 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && 3891 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); 3892 if (must_be_locked) { 3893 down_read(&rbd_dev->lock_rwsem); 3894 result = rbd_wait_state_locked(rbd_dev, 3895 !rbd_dev->opts->exclusive); 3896 if (result) 3897 goto err_unlock; 3898 } 3899 3900 img_request = rbd_img_request_create(rbd_dev, op_type, snapc); 3901 if (!img_request) { 3902 result = -ENOMEM; 3903 goto err_unlock; 3904 } 3905 img_request->rq = rq; 3906 snapc = NULL; /* img_request consumes a ref */ 3907 3908 if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT) 3909 result = rbd_img_fill_nodata(img_request, offset, length); 3910 else 3911 result = rbd_img_fill_from_bio(img_request, offset, length, 3912 rq->bio); 3913 if (result || !img_request->pending_count) 3914 goto err_img_request; 3915 3916 rbd_img_request_submit(img_request); 3917 if (must_be_locked) 3918 up_read(&rbd_dev->lock_rwsem); 3919 return; 3920 3921 err_img_request: 3922 rbd_img_request_put(img_request); 3923 err_unlock: 3924 if (must_be_locked) 3925 up_read(&rbd_dev->lock_rwsem); 3926 err_rq: 3927 if (result) 3928 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 3929 obj_op_name(op_type), length, offset, result); 3930 ceph_put_snap_context(snapc); 3931 err: 3932 blk_mq_end_request(rq, errno_to_blk_status(result)); 3933 } 3934 3935 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, 3936 const struct blk_mq_queue_data *bd) 3937 { 3938 struct request *rq = bd->rq; 3939 struct work_struct *work = blk_mq_rq_to_pdu(rq); 3940 3941 queue_work(rbd_wq, work); 3942 return BLK_STS_OK; 3943 } 3944 3945 static void rbd_free_disk(struct rbd_device *rbd_dev) 3946 { 3947 blk_cleanup_queue(rbd_dev->disk->queue); 3948 blk_mq_free_tag_set(&rbd_dev->tag_set); 3949 put_disk(rbd_dev->disk); 3950 rbd_dev->disk = NULL; 3951 } 3952 3953 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 3954 struct ceph_object_id *oid, 3955 struct ceph_object_locator *oloc, 3956 void *buf, int buf_len) 3957 3958 { 3959 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3960 struct ceph_osd_request *req; 3961 struct page **pages; 3962 int num_pages = calc_pages_for(0, buf_len); 3963 int ret; 3964 3965 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 3966 if (!req) 3967 return -ENOMEM; 3968 3969 ceph_oid_copy(&req->r_base_oid, oid); 
3970 ceph_oloc_copy(&req->r_base_oloc, oloc); 3971 req->r_flags = CEPH_OSD_FLAG_READ; 3972 3973 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 3974 if (IS_ERR(pages)) { 3975 ret = PTR_ERR(pages); 3976 goto out_req; 3977 } 3978 3979 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); 3980 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, 3981 true); 3982 3983 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 3984 if (ret) 3985 goto out_req; 3986 3987 ceph_osdc_start_request(osdc, req, false); 3988 ret = ceph_osdc_wait_request(osdc, req); 3989 if (ret >= 0) 3990 ceph_copy_from_page_vector(pages, buf, 0, ret); 3991 3992 out_req: 3993 ceph_osdc_put_request(req); 3994 return ret; 3995 } 3996 3997 /* 3998 * Read the complete header for the given rbd device. On successful 3999 * return, the rbd_dev->header field will contain up-to-date 4000 * information about the image. 4001 */ 4002 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) 4003 { 4004 struct rbd_image_header_ondisk *ondisk = NULL; 4005 u32 snap_count = 0; 4006 u64 names_size = 0; 4007 u32 want_count; 4008 int ret; 4009 4010 /* 4011 * The complete header will include an array of its 64-bit 4012 * snapshot ids, followed by the names of those snapshots as 4013 * a contiguous block of NUL-terminated strings. Note that 4014 * the number of snapshots could change by the time we read 4015 * it in, in which case we re-read it. 4016 */ 4017 do { 4018 size_t size; 4019 4020 kfree(ondisk); 4021 4022 size = sizeof (*ondisk); 4023 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 4024 size += names_size; 4025 ondisk = kmalloc(size, GFP_KERNEL); 4026 if (!ondisk) 4027 return -ENOMEM; 4028 4029 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, 4030 &rbd_dev->header_oloc, ondisk, size); 4031 if (ret < 0) 4032 goto out; 4033 if ((size_t)ret < size) { 4034 ret = -ENXIO; 4035 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 4036 size, ret); 4037 goto out; 4038 } 4039 if (!rbd_dev_ondisk_valid(ondisk)) { 4040 ret = -ENXIO; 4041 rbd_warn(rbd_dev, "invalid header"); 4042 goto out; 4043 } 4044 4045 names_size = le64_to_cpu(ondisk->snap_names_len); 4046 want_count = snap_count; 4047 snap_count = le32_to_cpu(ondisk->snap_count); 4048 } while (snap_count != want_count); 4049 4050 ret = rbd_header_from_disk(rbd_dev, ondisk); 4051 out: 4052 kfree(ondisk); 4053 4054 return ret; 4055 } 4056 4057 /* 4058 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to 4059 * has disappeared from the (just updated) snapshot context. 4060 */ 4061 static void rbd_exists_validate(struct rbd_device *rbd_dev) 4062 { 4063 u64 snap_id; 4064 4065 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) 4066 return; 4067 4068 snap_id = rbd_dev->spec->snap_id; 4069 if (snap_id == CEPH_NOSNAP) 4070 return; 4071 4072 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) 4073 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4074 } 4075 4076 static void rbd_dev_update_size(struct rbd_device *rbd_dev) 4077 { 4078 sector_t size; 4079 4080 /* 4081 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't 4082 * try to update its size. If REMOVING is set, updating size 4083 * is just useless work since the device can't be opened. 
4084 */ 4085 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) && 4086 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { 4087 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 4088 dout("setting size to %llu sectors", (unsigned long long)size); 4089 set_capacity(rbd_dev->disk, size); 4090 revalidate_disk(rbd_dev->disk); 4091 } 4092 } 4093 4094 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 4095 { 4096 u64 mapping_size; 4097 int ret; 4098 4099 down_write(&rbd_dev->header_rwsem); 4100 mapping_size = rbd_dev->mapping.size; 4101 4102 ret = rbd_dev_header_info(rbd_dev); 4103 if (ret) 4104 goto out; 4105 4106 /* 4107 * If there is a parent, see if it has disappeared due to the 4108 * mapped image getting flattened. 4109 */ 4110 if (rbd_dev->parent) { 4111 ret = rbd_dev_v2_parent_info(rbd_dev); 4112 if (ret) 4113 goto out; 4114 } 4115 4116 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { 4117 rbd_dev->mapping.size = rbd_dev->header.image_size; 4118 } else { 4119 /* validate mapped snapshot's EXISTS flag */ 4120 rbd_exists_validate(rbd_dev); 4121 } 4122 4123 out: 4124 up_write(&rbd_dev->header_rwsem); 4125 if (!ret && mapping_size != rbd_dev->mapping.size) 4126 rbd_dev_update_size(rbd_dev); 4127 4128 return ret; 4129 } 4130 4131 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, 4132 unsigned int hctx_idx, unsigned int numa_node) 4133 { 4134 struct work_struct *work = blk_mq_rq_to_pdu(rq); 4135 4136 INIT_WORK(work, rbd_queue_workfn); 4137 return 0; 4138 } 4139 4140 static const struct blk_mq_ops rbd_mq_ops = { 4141 .queue_rq = rbd_queue_rq, 4142 .init_request = rbd_init_request, 4143 }; 4144 4145 static int rbd_init_disk(struct rbd_device *rbd_dev) 4146 { 4147 struct gendisk *disk; 4148 struct request_queue *q; 4149 unsigned int objset_bytes = 4150 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count; 4151 int err; 4152 4153 /* create gendisk info */ 4154 disk = alloc_disk(single_major ? 
4155 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : 4156 RBD_MINORS_PER_MAJOR); 4157 if (!disk) 4158 return -ENOMEM; 4159 4160 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 4161 rbd_dev->dev_id); 4162 disk->major = rbd_dev->major; 4163 disk->first_minor = rbd_dev->minor; 4164 if (single_major) 4165 disk->flags |= GENHD_FL_EXT_DEVT; 4166 disk->fops = &rbd_bd_ops; 4167 disk->private_data = rbd_dev; 4168 4169 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); 4170 rbd_dev->tag_set.ops = &rbd_mq_ops; 4171 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; 4172 rbd_dev->tag_set.numa_node = NUMA_NO_NODE; 4173 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE; 4174 rbd_dev->tag_set.nr_hw_queues = 1; 4175 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); 4176 4177 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); 4178 if (err) 4179 goto out_disk; 4180 4181 q = blk_mq_init_queue(&rbd_dev->tag_set); 4182 if (IS_ERR(q)) { 4183 err = PTR_ERR(q); 4184 goto out_tag_set; 4185 } 4186 4187 blk_queue_flag_set(QUEUE_FLAG_NONROT, q); 4188 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ 4189 4190 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT); 4191 q->limits.max_sectors = queue_max_hw_sectors(q); 4192 blk_queue_max_segments(q, USHRT_MAX); 4193 blk_queue_max_segment_size(q, UINT_MAX); 4194 blk_queue_io_min(q, rbd_dev->opts->alloc_size); 4195 blk_queue_io_opt(q, rbd_dev->opts->alloc_size); 4196 4197 if (rbd_dev->opts->trim) { 4198 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q); 4199 q->limits.discard_granularity = rbd_dev->opts->alloc_size; 4200 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT); 4201 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT); 4202 } 4203 4204 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) 4205 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; 4206 4207 /* 4208 * disk_release() expects a queue ref from add_disk() and will 4209 * put it. Hold an extra ref until add_disk() is called. 4210 */ 4211 WARN_ON(!blk_get_queue(q)); 4212 disk->queue = q; 4213 q->queuedata = rbd_dev; 4214 4215 rbd_dev->disk = disk; 4216 4217 return 0; 4218 out_tag_set: 4219 blk_mq_free_tag_set(&rbd_dev->tag_set); 4220 out_disk: 4221 put_disk(disk); 4222 return err; 4223 } 4224 4225 /* 4226 sysfs 4227 */ 4228 4229 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 4230 { 4231 return container_of(dev, struct rbd_device, dev); 4232 } 4233 4234 static ssize_t rbd_size_show(struct device *dev, 4235 struct device_attribute *attr, char *buf) 4236 { 4237 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4238 4239 return sprintf(buf, "%llu\n", 4240 (unsigned long long)rbd_dev->mapping.size); 4241 } 4242 4243 /* 4244 * Note this shows the features for whatever's mapped, which is not 4245 * necessarily the base image. 
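 * The value is reported as a 64-bit feature bitmask in hex (the
 * RBD_FEATURE_* bits), matching the 0x%016llx format used below.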
4246 */ 4247 static ssize_t rbd_features_show(struct device *dev, 4248 struct device_attribute *attr, char *buf) 4249 { 4250 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4251 4252 return sprintf(buf, "0x%016llx\n", 4253 (unsigned long long)rbd_dev->mapping.features); 4254 } 4255 4256 static ssize_t rbd_major_show(struct device *dev, 4257 struct device_attribute *attr, char *buf) 4258 { 4259 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4260 4261 if (rbd_dev->major) 4262 return sprintf(buf, "%d\n", rbd_dev->major); 4263 4264 return sprintf(buf, "(none)\n"); 4265 } 4266 4267 static ssize_t rbd_minor_show(struct device *dev, 4268 struct device_attribute *attr, char *buf) 4269 { 4270 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4271 4272 return sprintf(buf, "%d\n", rbd_dev->minor); 4273 } 4274 4275 static ssize_t rbd_client_addr_show(struct device *dev, 4276 struct device_attribute *attr, char *buf) 4277 { 4278 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4279 struct ceph_entity_addr *client_addr = 4280 ceph_client_addr(rbd_dev->rbd_client->client); 4281 4282 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, 4283 le32_to_cpu(client_addr->nonce)); 4284 } 4285 4286 static ssize_t rbd_client_id_show(struct device *dev, 4287 struct device_attribute *attr, char *buf) 4288 { 4289 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4290 4291 return sprintf(buf, "client%lld\n", 4292 ceph_client_gid(rbd_dev->rbd_client->client)); 4293 } 4294 4295 static ssize_t rbd_cluster_fsid_show(struct device *dev, 4296 struct device_attribute *attr, char *buf) 4297 { 4298 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4299 4300 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); 4301 } 4302 4303 static ssize_t rbd_config_info_show(struct device *dev, 4304 struct device_attribute *attr, char *buf) 4305 { 4306 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4307 4308 return sprintf(buf, "%s\n", rbd_dev->config_info); 4309 } 4310 4311 static ssize_t rbd_pool_show(struct device *dev, 4312 struct device_attribute *attr, char *buf) 4313 { 4314 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4315 4316 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 4317 } 4318 4319 static ssize_t rbd_pool_id_show(struct device *dev, 4320 struct device_attribute *attr, char *buf) 4321 { 4322 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4323 4324 return sprintf(buf, "%llu\n", 4325 (unsigned long long) rbd_dev->spec->pool_id); 4326 } 4327 4328 static ssize_t rbd_pool_ns_show(struct device *dev, 4329 struct device_attribute *attr, char *buf) 4330 { 4331 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4332 4333 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: ""); 4334 } 4335 4336 static ssize_t rbd_name_show(struct device *dev, 4337 struct device_attribute *attr, char *buf) 4338 { 4339 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4340 4341 if (rbd_dev->spec->image_name) 4342 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 4343 4344 return sprintf(buf, "(unknown)\n"); 4345 } 4346 4347 static ssize_t rbd_image_id_show(struct device *dev, 4348 struct device_attribute *attr, char *buf) 4349 { 4350 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4351 4352 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 4353 } 4354 4355 /* 4356 * Shows the name of the currently-mapped snapshot (or 4357 * RBD_SNAP_HEAD_NAME for the base image). 
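 * Only the snapshot name is shown here; the snapshot id is exposed
 * separately via the snap_id attribute.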
4358 */ 4359 static ssize_t rbd_snap_show(struct device *dev, 4360 struct device_attribute *attr, 4361 char *buf) 4362 { 4363 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4364 4365 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 4366 } 4367 4368 static ssize_t rbd_snap_id_show(struct device *dev, 4369 struct device_attribute *attr, char *buf) 4370 { 4371 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4372 4373 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); 4374 } 4375 4376 /* 4377 * For a v2 image, shows the chain of parent images, separated by empty 4378 * lines. For v1 images or if there is no parent, shows "(no parent 4379 * image)". 4380 */ 4381 static ssize_t rbd_parent_show(struct device *dev, 4382 struct device_attribute *attr, 4383 char *buf) 4384 { 4385 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4386 ssize_t count = 0; 4387 4388 if (!rbd_dev->parent) 4389 return sprintf(buf, "(no parent image)\n"); 4390 4391 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { 4392 struct rbd_spec *spec = rbd_dev->parent_spec; 4393 4394 count += sprintf(&buf[count], "%s" 4395 "pool_id %llu\npool_name %s\n" 4396 "pool_ns %s\n" 4397 "image_id %s\nimage_name %s\n" 4398 "snap_id %llu\nsnap_name %s\n" 4399 "overlap %llu\n", 4400 !count ? "" : "\n", /* first? */ 4401 spec->pool_id, spec->pool_name, 4402 spec->pool_ns ?: "", 4403 spec->image_id, spec->image_name ?: "(unknown)", 4404 spec->snap_id, spec->snap_name, 4405 rbd_dev->parent_overlap); 4406 } 4407 4408 return count; 4409 } 4410 4411 static ssize_t rbd_image_refresh(struct device *dev, 4412 struct device_attribute *attr, 4413 const char *buf, 4414 size_t size) 4415 { 4416 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4417 int ret; 4418 4419 ret = rbd_dev_refresh(rbd_dev); 4420 if (ret) 4421 return ret; 4422 4423 return size; 4424 } 4425 4426 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL); 4427 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL); 4428 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL); 4429 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL); 4430 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL); 4431 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL); 4432 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL); 4433 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL); 4434 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL); 4435 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL); 4436 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL); 4437 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL); 4438 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL); 4439 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh); 4440 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL); 4441 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL); 4442 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL); 4443 4444 static struct attribute *rbd_attrs[] = { 4445 &dev_attr_size.attr, 4446 &dev_attr_features.attr, 4447 &dev_attr_major.attr, 4448 &dev_attr_minor.attr, 4449 &dev_attr_client_addr.attr, 4450 &dev_attr_client_id.attr, 4451 &dev_attr_cluster_fsid.attr, 4452 &dev_attr_config_info.attr, 4453 &dev_attr_pool.attr, 4454 &dev_attr_pool_id.attr, 4455 &dev_attr_pool_ns.attr, 4456 &dev_attr_name.attr, 4457 &dev_attr_image_id.attr, 4458 &dev_attr_current_snap.attr, 4459 &dev_attr_snap_id.attr, 4460 &dev_attr_parent.attr, 4461 &dev_attr_refresh.attr, 4462 NULL 4463 }; 4464 4465 static struct attribute_group 
rbd_attr_group = { 4466 .attrs = rbd_attrs, 4467 }; 4468 4469 static const struct attribute_group *rbd_attr_groups[] = { 4470 &rbd_attr_group, 4471 NULL 4472 }; 4473 4474 static void rbd_dev_release(struct device *dev); 4475 4476 static const struct device_type rbd_device_type = { 4477 .name = "rbd", 4478 .groups = rbd_attr_groups, 4479 .release = rbd_dev_release, 4480 }; 4481 4482 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 4483 { 4484 kref_get(&spec->kref); 4485 4486 return spec; 4487 } 4488 4489 static void rbd_spec_free(struct kref *kref); 4490 static void rbd_spec_put(struct rbd_spec *spec) 4491 { 4492 if (spec) 4493 kref_put(&spec->kref, rbd_spec_free); 4494 } 4495 4496 static struct rbd_spec *rbd_spec_alloc(void) 4497 { 4498 struct rbd_spec *spec; 4499 4500 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 4501 if (!spec) 4502 return NULL; 4503 4504 spec->pool_id = CEPH_NOPOOL; 4505 spec->snap_id = CEPH_NOSNAP; 4506 kref_init(&spec->kref); 4507 4508 return spec; 4509 } 4510 4511 static void rbd_spec_free(struct kref *kref) 4512 { 4513 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 4514 4515 kfree(spec->pool_name); 4516 kfree(spec->pool_ns); 4517 kfree(spec->image_id); 4518 kfree(spec->image_name); 4519 kfree(spec->snap_name); 4520 kfree(spec); 4521 } 4522 4523 static void rbd_dev_free(struct rbd_device *rbd_dev) 4524 { 4525 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); 4526 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); 4527 4528 ceph_oid_destroy(&rbd_dev->header_oid); 4529 ceph_oloc_destroy(&rbd_dev->header_oloc); 4530 kfree(rbd_dev->config_info); 4531 4532 rbd_put_client(rbd_dev->rbd_client); 4533 rbd_spec_put(rbd_dev->spec); 4534 kfree(rbd_dev->opts); 4535 kfree(rbd_dev); 4536 } 4537 4538 static void rbd_dev_release(struct device *dev) 4539 { 4540 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4541 bool need_put = !!rbd_dev->opts; 4542 4543 if (need_put) { 4544 destroy_workqueue(rbd_dev->task_wq); 4545 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4546 } 4547 4548 rbd_dev_free(rbd_dev); 4549 4550 /* 4551 * This is racy, but way better than putting module outside of 4552 * the release callback. The race window is pretty small, so 4553 * doing something similar to dm (dm-builtin.c) is overkill. 
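 * (The window is between the final module_put() below executing module
 * text and a concurrent module unload.)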
4554 */ 4555 if (need_put) 4556 module_put(THIS_MODULE); 4557 } 4558 4559 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, 4560 struct rbd_spec *spec) 4561 { 4562 struct rbd_device *rbd_dev; 4563 4564 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 4565 if (!rbd_dev) 4566 return NULL; 4567 4568 spin_lock_init(&rbd_dev->lock); 4569 INIT_LIST_HEAD(&rbd_dev->node); 4570 init_rwsem(&rbd_dev->header_rwsem); 4571 4572 rbd_dev->header.data_pool_id = CEPH_NOPOOL; 4573 ceph_oid_init(&rbd_dev->header_oid); 4574 rbd_dev->header_oloc.pool = spec->pool_id; 4575 if (spec->pool_ns) { 4576 WARN_ON(!*spec->pool_ns); 4577 rbd_dev->header_oloc.pool_ns = 4578 ceph_find_or_create_string(spec->pool_ns, 4579 strlen(spec->pool_ns)); 4580 } 4581 4582 mutex_init(&rbd_dev->watch_mutex); 4583 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 4584 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); 4585 4586 init_rwsem(&rbd_dev->lock_rwsem); 4587 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 4588 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); 4589 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); 4590 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); 4591 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); 4592 init_waitqueue_head(&rbd_dev->lock_waitq); 4593 4594 rbd_dev->dev.bus = &rbd_bus_type; 4595 rbd_dev->dev.type = &rbd_device_type; 4596 rbd_dev->dev.parent = &rbd_root_dev; 4597 device_initialize(&rbd_dev->dev); 4598 4599 rbd_dev->rbd_client = rbdc; 4600 rbd_dev->spec = spec; 4601 4602 return rbd_dev; 4603 } 4604 4605 /* 4606 * Create a mapping rbd_dev. 4607 */ 4608 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4609 struct rbd_spec *spec, 4610 struct rbd_options *opts) 4611 { 4612 struct rbd_device *rbd_dev; 4613 4614 rbd_dev = __rbd_dev_create(rbdc, spec); 4615 if (!rbd_dev) 4616 return NULL; 4617 4618 rbd_dev->opts = opts; 4619 4620 /* get an id and fill in device name */ 4621 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 4622 minor_to_rbd_dev_id(1 << MINORBITS), 4623 GFP_KERNEL); 4624 if (rbd_dev->dev_id < 0) 4625 goto fail_rbd_dev; 4626 4627 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); 4628 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, 4629 rbd_dev->name); 4630 if (!rbd_dev->task_wq) 4631 goto fail_dev_id; 4632 4633 /* we have a ref from do_rbd_add() */ 4634 __module_get(THIS_MODULE); 4635 4636 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); 4637 return rbd_dev; 4638 4639 fail_dev_id: 4640 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4641 fail_rbd_dev: 4642 rbd_dev_free(rbd_dev); 4643 return NULL; 4644 } 4645 4646 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4647 { 4648 if (rbd_dev) 4649 put_device(&rbd_dev->dev); 4650 } 4651 4652 /* 4653 * Get the size and object order for an image snapshot, or if 4654 * snap_id is CEPH_NOSNAP, gets this information for the base 4655 * image. 
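 *
 * The "get_size" class method replies with a packed (u8 order, __le64
 * size) pair; both fields are decoded from size_buf below.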
4656 */ 4657 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 4658 u8 *order, u64 *snap_size) 4659 { 4660 __le64 snapid = cpu_to_le64(snap_id); 4661 int ret; 4662 struct { 4663 u8 order; 4664 __le64 size; 4665 } __attribute__ ((packed)) size_buf = { 0 }; 4666 4667 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4668 &rbd_dev->header_oloc, "get_size", 4669 &snapid, sizeof(snapid), 4670 &size_buf, sizeof(size_buf)); 4671 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4672 if (ret < 0) 4673 return ret; 4674 if (ret < sizeof (size_buf)) 4675 return -ERANGE; 4676 4677 if (order) { 4678 *order = size_buf.order; 4679 dout(" order %u", (unsigned int)*order); 4680 } 4681 *snap_size = le64_to_cpu(size_buf.size); 4682 4683 dout(" snap_id 0x%016llx snap_size = %llu\n", 4684 (unsigned long long)snap_id, 4685 (unsigned long long)*snap_size); 4686 4687 return 0; 4688 } 4689 4690 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 4691 { 4692 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 4693 &rbd_dev->header.obj_order, 4694 &rbd_dev->header.image_size); 4695 } 4696 4697 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 4698 { 4699 void *reply_buf; 4700 int ret; 4701 void *p; 4702 4703 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 4704 if (!reply_buf) 4705 return -ENOMEM; 4706 4707 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4708 &rbd_dev->header_oloc, "get_object_prefix", 4709 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); 4710 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4711 if (ret < 0) 4712 goto out; 4713 4714 p = reply_buf; 4715 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 4716 p + ret, NULL, GFP_NOIO); 4717 ret = 0; 4718 4719 if (IS_ERR(rbd_dev->header.object_prefix)) { 4720 ret = PTR_ERR(rbd_dev->header.object_prefix); 4721 rbd_dev->header.object_prefix = NULL; 4722 } else { 4723 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 4724 } 4725 out: 4726 kfree(reply_buf); 4727 4728 return ret; 4729 } 4730 4731 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 4732 u64 *snap_features) 4733 { 4734 __le64 snapid = cpu_to_le64(snap_id); 4735 struct { 4736 __le64 features; 4737 __le64 incompat; 4738 } __attribute__ ((packed)) features_buf = { 0 }; 4739 u64 unsup; 4740 int ret; 4741 4742 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4743 &rbd_dev->header_oloc, "get_features", 4744 &snapid, sizeof(snapid), 4745 &features_buf, sizeof(features_buf)); 4746 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4747 if (ret < 0) 4748 return ret; 4749 if (ret < sizeof (features_buf)) 4750 return -ERANGE; 4751 4752 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED; 4753 if (unsup) { 4754 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx", 4755 unsup); 4756 return -ENXIO; 4757 } 4758 4759 *snap_features = le64_to_cpu(features_buf.features); 4760 4761 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 4762 (unsigned long long)snap_id, 4763 (unsigned long long)*snap_features, 4764 (unsigned long long)le64_to_cpu(features_buf.incompat)); 4765 4766 return 0; 4767 } 4768 4769 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 4770 { 4771 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 4772 &rbd_dev->header.features); 4773 } 4774 4775 struct parent_image_info { 4776 u64 pool_id; 4777 const char *pool_ns; 4778 const char *image_id; 4779 u64 snap_id; 4780 4781 bool 
has_overlap; 4782 u64 overlap; 4783 }; 4784 4785 /* 4786 * The caller is responsible for @pii. 4787 */ 4788 static int decode_parent_image_spec(void **p, void *end, 4789 struct parent_image_info *pii) 4790 { 4791 u8 struct_v; 4792 u32 struct_len; 4793 int ret; 4794 4795 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec", 4796 &struct_v, &struct_len); 4797 if (ret) 4798 return ret; 4799 4800 ceph_decode_64_safe(p, end, pii->pool_id, e_inval); 4801 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 4802 if (IS_ERR(pii->pool_ns)) { 4803 ret = PTR_ERR(pii->pool_ns); 4804 pii->pool_ns = NULL; 4805 return ret; 4806 } 4807 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 4808 if (IS_ERR(pii->image_id)) { 4809 ret = PTR_ERR(pii->image_id); 4810 pii->image_id = NULL; 4811 return ret; 4812 } 4813 ceph_decode_64_safe(p, end, pii->snap_id, e_inval); 4814 return 0; 4815 4816 e_inval: 4817 return -EINVAL; 4818 } 4819 4820 static int __get_parent_info(struct rbd_device *rbd_dev, 4821 struct page *req_page, 4822 struct page *reply_page, 4823 struct parent_image_info *pii) 4824 { 4825 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4826 size_t reply_len = PAGE_SIZE; 4827 void *p, *end; 4828 int ret; 4829 4830 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4831 "rbd", "parent_get", CEPH_OSD_FLAG_READ, 4832 req_page, sizeof(u64), reply_page, &reply_len); 4833 if (ret) 4834 return ret == -EOPNOTSUPP ? 1 : ret; 4835 4836 p = page_address(reply_page); 4837 end = p + reply_len; 4838 ret = decode_parent_image_spec(&p, end, pii); 4839 if (ret) 4840 return ret; 4841 4842 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4843 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, 4844 req_page, sizeof(u64), reply_page, &reply_len); 4845 if (ret) 4846 return ret; 4847 4848 p = page_address(reply_page); 4849 end = p + reply_len; 4850 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); 4851 if (pii->has_overlap) 4852 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 4853 4854 return 0; 4855 4856 e_inval: 4857 return -EINVAL; 4858 } 4859 4860 /* 4861 * The caller is responsible for @pii. 
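 *
 * Legacy variant used when the "parent_get" method is unsupported (see
 * get_parent_info() below); the older "get_parent" reply carries no
 * pool namespace and always implies an overlap.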
4862 */ 4863 static int __get_parent_info_legacy(struct rbd_device *rbd_dev, 4864 struct page *req_page, 4865 struct page *reply_page, 4866 struct parent_image_info *pii) 4867 { 4868 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4869 size_t reply_len = PAGE_SIZE; 4870 void *p, *end; 4871 int ret; 4872 4873 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4874 "rbd", "get_parent", CEPH_OSD_FLAG_READ, 4875 req_page, sizeof(u64), reply_page, &reply_len); 4876 if (ret) 4877 return ret; 4878 4879 p = page_address(reply_page); 4880 end = p + reply_len; 4881 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval); 4882 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4883 if (IS_ERR(pii->image_id)) { 4884 ret = PTR_ERR(pii->image_id); 4885 pii->image_id = NULL; 4886 return ret; 4887 } 4888 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval); 4889 pii->has_overlap = true; 4890 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 4891 4892 return 0; 4893 4894 e_inval: 4895 return -EINVAL; 4896 } 4897 4898 static int get_parent_info(struct rbd_device *rbd_dev, 4899 struct parent_image_info *pii) 4900 { 4901 struct page *req_page, *reply_page; 4902 void *p; 4903 int ret; 4904 4905 req_page = alloc_page(GFP_KERNEL); 4906 if (!req_page) 4907 return -ENOMEM; 4908 4909 reply_page = alloc_page(GFP_KERNEL); 4910 if (!reply_page) { 4911 __free_page(req_page); 4912 return -ENOMEM; 4913 } 4914 4915 p = page_address(req_page); 4916 ceph_encode_64(&p, rbd_dev->spec->snap_id); 4917 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii); 4918 if (ret > 0) 4919 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page, 4920 pii); 4921 4922 __free_page(req_page); 4923 __free_page(reply_page); 4924 return ret; 4925 } 4926 4927 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 4928 { 4929 struct rbd_spec *parent_spec; 4930 struct parent_image_info pii = { 0 }; 4931 int ret; 4932 4933 parent_spec = rbd_spec_alloc(); 4934 if (!parent_spec) 4935 return -ENOMEM; 4936 4937 ret = get_parent_info(rbd_dev, &pii); 4938 if (ret) 4939 goto out_err; 4940 4941 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", 4942 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id, 4943 pii.has_overlap, pii.overlap); 4944 4945 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) { 4946 /* 4947 * Either the parent never existed, or we have 4948 * record of it but the image got flattened so it no 4949 * longer has a parent. When the parent of a 4950 * layered image disappears we immediately set the 4951 * overlap to 0. The effect of this is that all new 4952 * requests will be treated as if the image had no 4953 * parent. 4954 * 4955 * If !pii.has_overlap, the parent image spec is not 4956 * applicable. It's there to avoid duplication in each 4957 * snapshot record. 4958 */ 4959 if (rbd_dev->parent_overlap) { 4960 rbd_dev->parent_overlap = 0; 4961 rbd_dev_parent_put(rbd_dev); 4962 pr_info("%s: clone image has been flattened\n", 4963 rbd_dev->disk->disk_name); 4964 } 4965 4966 goto out; /* No parent? No problem. */ 4967 } 4968 4969 /* The ceph file layout needs to fit pool id in 32 bits */ 4970 4971 ret = -EIO; 4972 if (pii.pool_id > (u64)U32_MAX) { 4973 rbd_warn(NULL, "parent pool id too large (%llu > %u)", 4974 (unsigned long long)pii.pool_id, U32_MAX); 4975 goto out_err; 4976 } 4977 4978 /* 4979 * The parent won't change (except when the clone is 4980 * flattened, already handled that). 
So we only need to 4981 * record the parent spec we have not already done so. 4982 */ 4983 if (!rbd_dev->parent_spec) { 4984 parent_spec->pool_id = pii.pool_id; 4985 if (pii.pool_ns && *pii.pool_ns) { 4986 parent_spec->pool_ns = pii.pool_ns; 4987 pii.pool_ns = NULL; 4988 } 4989 parent_spec->image_id = pii.image_id; 4990 pii.image_id = NULL; 4991 parent_spec->snap_id = pii.snap_id; 4992 4993 rbd_dev->parent_spec = parent_spec; 4994 parent_spec = NULL; /* rbd_dev now owns this */ 4995 } 4996 4997 /* 4998 * We always update the parent overlap. If it's zero we issue 4999 * a warning, as we will proceed as if there was no parent. 5000 */ 5001 if (!pii.overlap) { 5002 if (parent_spec) { 5003 /* refresh, careful to warn just once */ 5004 if (rbd_dev->parent_overlap) 5005 rbd_warn(rbd_dev, 5006 "clone now standalone (overlap became 0)"); 5007 } else { 5008 /* initial probe */ 5009 rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); 5010 } 5011 } 5012 rbd_dev->parent_overlap = pii.overlap; 5013 5014 out: 5015 ret = 0; 5016 out_err: 5017 kfree(pii.pool_ns); 5018 kfree(pii.image_id); 5019 rbd_spec_put(parent_spec); 5020 return ret; 5021 } 5022 5023 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 5024 { 5025 struct { 5026 __le64 stripe_unit; 5027 __le64 stripe_count; 5028 } __attribute__ ((packed)) striping_info_buf = { 0 }; 5029 size_t size = sizeof (striping_info_buf); 5030 void *p; 5031 int ret; 5032 5033 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5034 &rbd_dev->header_oloc, "get_stripe_unit_count", 5035 NULL, 0, &striping_info_buf, size); 5036 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5037 if (ret < 0) 5038 return ret; 5039 if (ret < size) 5040 return -ERANGE; 5041 5042 p = &striping_info_buf; 5043 rbd_dev->header.stripe_unit = ceph_decode_64(&p); 5044 rbd_dev->header.stripe_count = ceph_decode_64(&p); 5045 return 0; 5046 } 5047 5048 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) 5049 { 5050 __le64 data_pool_id; 5051 int ret; 5052 5053 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5054 &rbd_dev->header_oloc, "get_data_pool", 5055 NULL, 0, &data_pool_id, sizeof(data_pool_id)); 5056 if (ret < 0) 5057 return ret; 5058 if (ret < sizeof(data_pool_id)) 5059 return -EBADMSG; 5060 5061 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); 5062 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); 5063 return 0; 5064 } 5065 5066 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 5067 { 5068 CEPH_DEFINE_OID_ONSTACK(oid); 5069 size_t image_id_size; 5070 char *image_id; 5071 void *p; 5072 void *end; 5073 size_t size; 5074 void *reply_buf = NULL; 5075 size_t len = 0; 5076 char *image_name = NULL; 5077 int ret; 5078 5079 rbd_assert(!rbd_dev->spec->image_name); 5080 5081 len = strlen(rbd_dev->spec->image_id); 5082 image_id_size = sizeof (__le32) + len; 5083 image_id = kmalloc(image_id_size, GFP_KERNEL); 5084 if (!image_id) 5085 return NULL; 5086 5087 p = image_id; 5088 end = image_id + image_id_size; 5089 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 5090 5091 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 5092 reply_buf = kmalloc(size, GFP_KERNEL); 5093 if (!reply_buf) 5094 goto out; 5095 5096 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); 5097 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5098 "dir_get_name", image_id, image_id_size, 5099 reply_buf, size); 5100 if (ret < 0) 5101 goto out; 5102 p = reply_buf; 5103 end = reply_buf + ret; 5104 5105 image_name = 
ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 5106 if (IS_ERR(image_name)) 5107 image_name = NULL; 5108 else 5109 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 5110 out: 5111 kfree(reply_buf); 5112 kfree(image_id); 5113 5114 return image_name; 5115 } 5116 5117 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5118 { 5119 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5120 const char *snap_name; 5121 u32 which = 0; 5122 5123 /* Skip over names until we find the one we are looking for */ 5124 5125 snap_name = rbd_dev->header.snap_names; 5126 while (which < snapc->num_snaps) { 5127 if (!strcmp(name, snap_name)) 5128 return snapc->snaps[which]; 5129 snap_name += strlen(snap_name) + 1; 5130 which++; 5131 } 5132 return CEPH_NOSNAP; 5133 } 5134 5135 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5136 { 5137 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5138 u32 which; 5139 bool found = false; 5140 u64 snap_id; 5141 5142 for (which = 0; !found && which < snapc->num_snaps; which++) { 5143 const char *snap_name; 5144 5145 snap_id = snapc->snaps[which]; 5146 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 5147 if (IS_ERR(snap_name)) { 5148 /* ignore no-longer existing snapshots */ 5149 if (PTR_ERR(snap_name) == -ENOENT) 5150 continue; 5151 else 5152 break; 5153 } 5154 found = !strcmp(name, snap_name); 5155 kfree(snap_name); 5156 } 5157 return found ? snap_id : CEPH_NOSNAP; 5158 } 5159 5160 /* 5161 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 5162 * no snapshot by that name is found, or if an error occurs. 5163 */ 5164 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5165 { 5166 if (rbd_dev->image_format == 1) 5167 return rbd_v1_snap_id_by_name(rbd_dev, name); 5168 5169 return rbd_v2_snap_id_by_name(rbd_dev, name); 5170 } 5171 5172 /* 5173 * An image being mapped will have everything but the snap id. 5174 */ 5175 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) 5176 { 5177 struct rbd_spec *spec = rbd_dev->spec; 5178 5179 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); 5180 rbd_assert(spec->image_id && spec->image_name); 5181 rbd_assert(spec->snap_name); 5182 5183 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 5184 u64 snap_id; 5185 5186 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 5187 if (snap_id == CEPH_NOSNAP) 5188 return -ENOENT; 5189 5190 spec->snap_id = snap_id; 5191 } else { 5192 spec->snap_id = CEPH_NOSNAP; 5193 } 5194 5195 return 0; 5196 } 5197 5198 /* 5199 * A parent image will have all ids but none of the names. 5200 * 5201 * All names in an rbd spec are dynamically allocated. It's OK if we 5202 * can't figure out the name for an image id. 
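 *
 * Only used for parent images; a user-mapped image fills in its snap id
 * via rbd_spec_fill_snap_id() instead.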
5203 */ 5204 static int rbd_spec_fill_names(struct rbd_device *rbd_dev) 5205 { 5206 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 5207 struct rbd_spec *spec = rbd_dev->spec; 5208 const char *pool_name; 5209 const char *image_name; 5210 const char *snap_name; 5211 int ret; 5212 5213 rbd_assert(spec->pool_id != CEPH_NOPOOL); 5214 rbd_assert(spec->image_id); 5215 rbd_assert(spec->snap_id != CEPH_NOSNAP); 5216 5217 /* Get the pool name; we have to make our own copy of this */ 5218 5219 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 5220 if (!pool_name) { 5221 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 5222 return -EIO; 5223 } 5224 pool_name = kstrdup(pool_name, GFP_KERNEL); 5225 if (!pool_name) 5226 return -ENOMEM; 5227 5228 /* Fetch the image name; tolerate failure here */ 5229 5230 image_name = rbd_dev_image_name(rbd_dev); 5231 if (!image_name) 5232 rbd_warn(rbd_dev, "unable to get image name"); 5233 5234 /* Fetch the snapshot name */ 5235 5236 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 5237 if (IS_ERR(snap_name)) { 5238 ret = PTR_ERR(snap_name); 5239 goto out_err; 5240 } 5241 5242 spec->pool_name = pool_name; 5243 spec->image_name = image_name; 5244 spec->snap_name = snap_name; 5245 5246 return 0; 5247 5248 out_err: 5249 kfree(image_name); 5250 kfree(pool_name); 5251 return ret; 5252 } 5253 5254 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 5255 { 5256 size_t size; 5257 int ret; 5258 void *reply_buf; 5259 void *p; 5260 void *end; 5261 u64 seq; 5262 u32 snap_count; 5263 struct ceph_snap_context *snapc; 5264 u32 i; 5265 5266 /* 5267 * We'll need room for the seq value (maximum snapshot id), 5268 * snapshot count, and array of that many snapshot ids. 5269 * For now we have a fixed upper limit on the number we're 5270 * prepared to receive. 5271 */ 5272 size = sizeof (__le64) + sizeof (__le32) + 5273 RBD_MAX_SNAP_COUNT * sizeof (__le64); 5274 reply_buf = kzalloc(size, GFP_KERNEL); 5275 if (!reply_buf) 5276 return -ENOMEM; 5277 5278 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5279 &rbd_dev->header_oloc, "get_snapcontext", 5280 NULL, 0, reply_buf, size); 5281 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5282 if (ret < 0) 5283 goto out; 5284 5285 p = reply_buf; 5286 end = reply_buf + ret; 5287 ret = -ERANGE; 5288 ceph_decode_64_safe(&p, end, seq, out); 5289 ceph_decode_32_safe(&p, end, snap_count, out); 5290 5291 /* 5292 * Make sure the reported number of snapshot ids wouldn't go 5293 * beyond the end of our buffer. But before checking that, 5294 * make sure the computed size of the snapshot context we 5295 * allocate is representable in a size_t. 
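 *
 * (For reference, the reply buffer allocated above is bounded: with
 * RBD_MAX_SNAP_COUNT (510) snapshots, seq + count + id array comes to
 * 8 + 4 + 510 * 8 = 4092 bytes, i.e. it fits in a single 4 KiB page.)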
5296 */ 5297 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 5298 / sizeof (u64)) { 5299 ret = -EINVAL; 5300 goto out; 5301 } 5302 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 5303 goto out; 5304 ret = 0; 5305 5306 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 5307 if (!snapc) { 5308 ret = -ENOMEM; 5309 goto out; 5310 } 5311 snapc->seq = seq; 5312 for (i = 0; i < snap_count; i++) 5313 snapc->snaps[i] = ceph_decode_64(&p); 5314 5315 ceph_put_snap_context(rbd_dev->header.snapc); 5316 rbd_dev->header.snapc = snapc; 5317 5318 dout(" snap context seq = %llu, snap_count = %u\n", 5319 (unsigned long long)seq, (unsigned int)snap_count); 5320 out: 5321 kfree(reply_buf); 5322 5323 return ret; 5324 } 5325 5326 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 5327 u64 snap_id) 5328 { 5329 size_t size; 5330 void *reply_buf; 5331 __le64 snapid; 5332 int ret; 5333 void *p; 5334 void *end; 5335 char *snap_name; 5336 5337 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 5338 reply_buf = kmalloc(size, GFP_KERNEL); 5339 if (!reply_buf) 5340 return ERR_PTR(-ENOMEM); 5341 5342 snapid = cpu_to_le64(snap_id); 5343 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5344 &rbd_dev->header_oloc, "get_snapshot_name", 5345 &snapid, sizeof(snapid), reply_buf, size); 5346 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5347 if (ret < 0) { 5348 snap_name = ERR_PTR(ret); 5349 goto out; 5350 } 5351 5352 p = reply_buf; 5353 end = reply_buf + ret; 5354 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 5355 if (IS_ERR(snap_name)) 5356 goto out; 5357 5358 dout(" snap_id 0x%016llx snap_name = %s\n", 5359 (unsigned long long)snap_id, snap_name); 5360 out: 5361 kfree(reply_buf); 5362 5363 return snap_name; 5364 } 5365 5366 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) 5367 { 5368 bool first_time = rbd_dev->header.object_prefix == NULL; 5369 int ret; 5370 5371 ret = rbd_dev_v2_image_size(rbd_dev); 5372 if (ret) 5373 return ret; 5374 5375 if (first_time) { 5376 ret = rbd_dev_v2_header_onetime(rbd_dev); 5377 if (ret) 5378 return ret; 5379 } 5380 5381 ret = rbd_dev_v2_snap_context(rbd_dev); 5382 if (ret && first_time) { 5383 kfree(rbd_dev->header.object_prefix); 5384 rbd_dev->header.object_prefix = NULL; 5385 } 5386 5387 return ret; 5388 } 5389 5390 static int rbd_dev_header_info(struct rbd_device *rbd_dev) 5391 { 5392 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 5393 5394 if (rbd_dev->image_format == 1) 5395 return rbd_dev_v1_header_info(rbd_dev); 5396 5397 return rbd_dev_v2_header_info(rbd_dev); 5398 } 5399 5400 /* 5401 * Skips over white space at *buf, and updates *buf to point to the 5402 * first found non-space character (if any). Returns the length of 5403 * the token (string of non-white space characters) found. Note 5404 * that *buf must be terminated with '\0'. 5405 */ 5406 static inline size_t next_token(const char **buf) 5407 { 5408 /* 5409 * These are the characters that produce nonzero for 5410 * isspace() in the "C" and "POSIX" locales. 5411 */ 5412 const char *spaces = " \f\n\r\t\v"; 5413 5414 *buf += strspn(*buf, spaces); /* Find start of token */ 5415 5416 return strcspn(*buf, spaces); /* Return token length */ 5417 } 5418 5419 /* 5420 * Finds the next token in *buf, dynamically allocates a buffer big 5421 * enough to hold a copy of it, and copies the token into the new 5422 * buffer. The copy is guaranteed to be terminated with '\0'. 
Note 5423 * that a duplicate buffer is created even for a zero-length token. 5424 * 5425 * Returns a pointer to the newly-allocated duplicate, or a null 5426 * pointer if memory for the duplicate was not available. If 5427 * the lenp argument is a non-null pointer, the length of the token 5428 * (not including the '\0') is returned in *lenp. 5429 * 5430 * If successful, the *buf pointer will be updated to point beyond 5431 * the end of the found token. 5432 * 5433 * Note: uses GFP_KERNEL for allocation. 5434 */ 5435 static inline char *dup_token(const char **buf, size_t *lenp) 5436 { 5437 char *dup; 5438 size_t len; 5439 5440 len = next_token(buf); 5441 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 5442 if (!dup) 5443 return NULL; 5444 *(dup + len) = '\0'; 5445 *buf += len; 5446 5447 if (lenp) 5448 *lenp = len; 5449 5450 return dup; 5451 } 5452 5453 /* 5454 * Parse the options provided for an "rbd add" (i.e., rbd image 5455 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 5456 * and the data written is passed here via a NUL-terminated buffer. 5457 * Returns 0 if successful or an error code otherwise. 5458 * 5459 * The information extracted from these options is recorded in 5460 * the other parameters which return dynamically-allocated 5461 * structures: 5462 * ceph_opts 5463 * The address of a pointer that will refer to a ceph options 5464 * structure. Caller must release the returned pointer using 5465 * ceph_destroy_options() when it is no longer needed. 5466 * rbd_opts 5467 * Address of an rbd options pointer. Fully initialized by 5468 * this function; caller must release with kfree(). 5469 * spec 5470 * Address of an rbd image specification pointer. Fully 5471 * initialized by this function based on parsed options. 5472 * Caller must release with rbd_spec_put(). 5473 * 5474 * The options passed take this form: 5475 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 5476 * where: 5477 * <mon_addrs> 5478 * A comma-separated list of one or more monitor addresses. 5479 * A monitor address is an ip address, optionally followed 5480 * by a port number (separated by a colon). 5481 * I.e.: ip1[:port1][,ip2[:port2]...] 5482 * <options> 5483 * A comma-separated list of ceph and/or rbd options. 5484 * <pool_name> 5485 * The name of the rados pool containing the rbd image. 5486 * <image_name> 5487 * The name of the image in that pool to map. 5488 * <snap_id> 5489 * An optional snapshot id. If provided, the mapping will 5490 * present data from the image at the time that snapshot was 5491 * created. The image head is used if no snapshot id is 5492 * provided. Snapshot mappings are always read-only. 
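 *
 * For example, a mapping request written to /sys/bus/rbd/add might look
 * like (hypothetical monitor address, credentials and names):
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap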
5493 */ 5494 static int rbd_add_parse_args(const char *buf, 5495 struct ceph_options **ceph_opts, 5496 struct rbd_options **opts, 5497 struct rbd_spec **rbd_spec) 5498 { 5499 size_t len; 5500 char *options; 5501 const char *mon_addrs; 5502 char *snap_name; 5503 size_t mon_addrs_size; 5504 struct parse_rbd_opts_ctx pctx = { 0 }; 5505 struct ceph_options *copts; 5506 int ret; 5507 5508 /* The first four tokens are required */ 5509 5510 len = next_token(&buf); 5511 if (!len) { 5512 rbd_warn(NULL, "no monitor address(es) provided"); 5513 return -EINVAL; 5514 } 5515 mon_addrs = buf; 5516 mon_addrs_size = len + 1; 5517 buf += len; 5518 5519 ret = -EINVAL; 5520 options = dup_token(&buf, NULL); 5521 if (!options) 5522 return -ENOMEM; 5523 if (!*options) { 5524 rbd_warn(NULL, "no options provided"); 5525 goto out_err; 5526 } 5527 5528 pctx.spec = rbd_spec_alloc(); 5529 if (!pctx.spec) 5530 goto out_mem; 5531 5532 pctx.spec->pool_name = dup_token(&buf, NULL); 5533 if (!pctx.spec->pool_name) 5534 goto out_mem; 5535 if (!*pctx.spec->pool_name) { 5536 rbd_warn(NULL, "no pool name provided"); 5537 goto out_err; 5538 } 5539 5540 pctx.spec->image_name = dup_token(&buf, NULL); 5541 if (!pctx.spec->image_name) 5542 goto out_mem; 5543 if (!*pctx.spec->image_name) { 5544 rbd_warn(NULL, "no image name provided"); 5545 goto out_err; 5546 } 5547 5548 /* 5549 * Snapshot name is optional; default is to use "-" 5550 * (indicating the head/no snapshot). 5551 */ 5552 len = next_token(&buf); 5553 if (!len) { 5554 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 5555 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 5556 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 5557 ret = -ENAMETOOLONG; 5558 goto out_err; 5559 } 5560 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 5561 if (!snap_name) 5562 goto out_mem; 5563 *(snap_name + len) = '\0'; 5564 pctx.spec->snap_name = snap_name; 5565 5566 /* Initialize all rbd options to the defaults */ 5567 5568 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); 5569 if (!pctx.opts) 5570 goto out_mem; 5571 5572 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT; 5573 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 5574 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT; 5575 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT; 5576 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; 5577 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT; 5578 pctx.opts->trim = RBD_TRIM_DEFAULT; 5579 5580 copts = ceph_parse_options(options, mon_addrs, 5581 mon_addrs + mon_addrs_size - 1, 5582 parse_rbd_opts_token, &pctx); 5583 if (IS_ERR(copts)) { 5584 ret = PTR_ERR(copts); 5585 goto out_err; 5586 } 5587 kfree(options); 5588 5589 *ceph_opts = copts; 5590 *opts = pctx.opts; 5591 *rbd_spec = pctx.spec; 5592 5593 return 0; 5594 out_mem: 5595 ret = -ENOMEM; 5596 out_err: 5597 kfree(pctx.opts); 5598 rbd_spec_put(pctx.spec); 5599 kfree(options); 5600 5601 return ret; 5602 } 5603 5604 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 5605 { 5606 down_write(&rbd_dev->lock_rwsem); 5607 if (__rbd_is_lock_owner(rbd_dev)) 5608 rbd_unlock(rbd_dev); 5609 up_write(&rbd_dev->lock_rwsem); 5610 } 5611 5612 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) 5613 { 5614 int ret; 5615 5616 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { 5617 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); 5618 return -EINVAL; 5619 } 5620 5621 /* FIXME: "rbd map --exclusive" should be in interruptible */ 5622 down_read(&rbd_dev->lock_rwsem); 5623 ret = rbd_wait_state_locked(rbd_dev, true); 5624 
up_read(&rbd_dev->lock_rwsem); 5625 if (ret) { 5626 rbd_warn(rbd_dev, "failed to acquire exclusive lock"); 5627 return -EROFS; 5628 } 5629 5630 return 0; 5631 } 5632 5633 /* 5634 * An rbd format 2 image has a unique identifier, distinct from the 5635 * name given to it by the user. Internally, that identifier is 5636 * what's used to specify the names of objects related to the image. 5637 * 5638 * A special "rbd id" object is used to map an rbd image name to its 5639 * id. If that object doesn't exist, then there is no v2 rbd image 5640 * with the supplied name. 5641 * 5642 * This function will record the given rbd_dev's image_id field if 5643 * it can be determined, and in that case will return 0. If any 5644 * errors occur a negative errno will be returned and the rbd_dev's 5645 * image_id field will be unchanged (and should be NULL). 5646 */ 5647 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 5648 { 5649 int ret; 5650 size_t size; 5651 CEPH_DEFINE_OID_ONSTACK(oid); 5652 void *response; 5653 char *image_id; 5654 5655 /* 5656 * When probing a parent image, the image id is already 5657 * known (and the image name likely is not). There's no 5658 * need to fetch the image id again in this case. We 5659 * do still need to set the image format though. 5660 */ 5661 if (rbd_dev->spec->image_id) { 5662 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 5663 5664 return 0; 5665 } 5666 5667 /* 5668 * First, see if the format 2 image id file exists, and if 5669 * so, get the image's persistent id from it. 5670 */ 5671 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, 5672 rbd_dev->spec->image_name); 5673 if (ret) 5674 return ret; 5675 5676 dout("rbd id object name is %s\n", oid.name); 5677 5678 /* Response will be an encoded string, which includes a length */ 5679 5680 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 5681 response = kzalloc(size, GFP_NOIO); 5682 if (!response) { 5683 ret = -ENOMEM; 5684 goto out; 5685 } 5686 5687 /* If it doesn't exist we'll assume it's a format 1 image */ 5688 5689 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5690 "get_id", NULL, 0, 5691 response, RBD_IMAGE_ID_LEN_MAX); 5692 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5693 if (ret == -ENOENT) { 5694 image_id = kstrdup("", GFP_KERNEL); 5695 ret = image_id ? 0 : -ENOMEM; 5696 if (!ret) 5697 rbd_dev->image_format = 1; 5698 } else if (ret >= 0) { 5699 void *p = response; 5700 5701 image_id = ceph_extract_encoded_string(&p, p + ret, 5702 NULL, GFP_NOIO); 5703 ret = PTR_ERR_OR_ZERO(image_id); 5704 if (!ret) 5705 rbd_dev->image_format = 2; 5706 } 5707 5708 if (!ret) { 5709 rbd_dev->spec->image_id = image_id; 5710 dout("image_id is %s\n", image_id); 5711 } 5712 out: 5713 kfree(response); 5714 ceph_oid_destroy(&oid); 5715 return ret; 5716 } 5717 5718 /* 5719 * Undo whatever state changes are made by v1 or v2 header info 5720 * call. 
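 * That means dropping the parent reference and freeing the dynamically
 * allocated header fields (snap context, snapshot names/sizes and the
 * object prefix) before zeroing the header.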
5721 */ 5722 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 5723 { 5724 struct rbd_image_header *header; 5725 5726 rbd_dev_parent_put(rbd_dev); 5727 5728 /* Free dynamic fields from the header, then zero it out */ 5729 5730 header = &rbd_dev->header; 5731 ceph_put_snap_context(header->snapc); 5732 kfree(header->snap_sizes); 5733 kfree(header->snap_names); 5734 kfree(header->object_prefix); 5735 memset(header, 0, sizeof (*header)); 5736 } 5737 5738 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) 5739 { 5740 int ret; 5741 5742 ret = rbd_dev_v2_object_prefix(rbd_dev); 5743 if (ret) 5744 goto out_err; 5745 5746 /* 5747 * Get the and check features for the image. Currently the 5748 * features are assumed to never change. 5749 */ 5750 ret = rbd_dev_v2_features(rbd_dev); 5751 if (ret) 5752 goto out_err; 5753 5754 /* If the image supports fancy striping, get its parameters */ 5755 5756 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 5757 ret = rbd_dev_v2_striping_info(rbd_dev); 5758 if (ret < 0) 5759 goto out_err; 5760 } 5761 5762 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { 5763 ret = rbd_dev_v2_data_pool(rbd_dev); 5764 if (ret) 5765 goto out_err; 5766 } 5767 5768 rbd_init_layout(rbd_dev); 5769 return 0; 5770 5771 out_err: 5772 rbd_dev->header.features = 0; 5773 kfree(rbd_dev->header.object_prefix); 5774 rbd_dev->header.object_prefix = NULL; 5775 return ret; 5776 } 5777 5778 /* 5779 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> 5780 * rbd_dev_image_probe() recursion depth, which means it's also the 5781 * length of the already discovered part of the parent chain. 5782 */ 5783 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) 5784 { 5785 struct rbd_device *parent = NULL; 5786 int ret; 5787 5788 if (!rbd_dev->parent_spec) 5789 return 0; 5790 5791 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) { 5792 pr_info("parent chain is too long (%d)\n", depth); 5793 ret = -EINVAL; 5794 goto out_err; 5795 } 5796 5797 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); 5798 if (!parent) { 5799 ret = -ENOMEM; 5800 goto out_err; 5801 } 5802 5803 /* 5804 * Images related by parent/child relationships always share 5805 * rbd_client and spec/parent_spec, so bump their refcounts. 5806 */ 5807 __rbd_get_client(rbd_dev->rbd_client); 5808 rbd_spec_get(rbd_dev->parent_spec); 5809 5810 ret = rbd_dev_image_probe(parent, depth); 5811 if (ret < 0) 5812 goto out_err; 5813 5814 rbd_dev->parent = parent; 5815 atomic_set(&rbd_dev->parent_ref, 1); 5816 return 0; 5817 5818 out_err: 5819 rbd_dev_unparent(rbd_dev); 5820 rbd_dev_destroy(parent); 5821 return ret; 5822 } 5823 5824 static void rbd_dev_device_release(struct rbd_device *rbd_dev) 5825 { 5826 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5827 rbd_dev_mapping_clear(rbd_dev); 5828 rbd_free_disk(rbd_dev); 5829 if (!single_major) 5830 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5831 } 5832 5833 /* 5834 * rbd_dev->header_rwsem must be locked for write and will be unlocked 5835 * upon return. 5836 */ 5837 static int rbd_dev_device_setup(struct rbd_device *rbd_dev) 5838 { 5839 int ret; 5840 5841 /* Record our major and minor device numbers. */ 5842 5843 if (!single_major) { 5844 ret = register_blkdev(0, rbd_dev->name); 5845 if (ret < 0) 5846 goto err_out_unlock; 5847 5848 rbd_dev->major = ret; 5849 rbd_dev->minor = 0; 5850 } else { 5851 rbd_dev->major = rbd_major; 5852 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); 5853 } 5854 5855 /* Set up the blkdev mapping. 
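 * (gendisk, request queue, mapping size and read-only flag).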
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	if (rbd_dev->opts)
		rbd_unregister_watch(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s%s%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->pool_ns ?: "",
					rbd_dev->spec->pool_ns ? "/" : "",
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s%s%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->pool_ns ?: "",
				rbd_dev->spec->pool_ns ? "/" : "",
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
	     rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}
static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	/* If we are mapping a snapshot it must be marked read-only */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		rbd_dev->opts->read_only = true;

	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
			 rbd_dev->layout.object_size);
		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
	}

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
		goto err_out_image_probe;

	if (rbd_dev->opts->exclusive) {
		rc = rbd_add_acquire_lock(rbd_dev);
		if (rc)
			goto err_out_device_setup;
	}

	/* Everything's ready.  Announce the disk to the world. */

	rc = device_add(&rbd_dev->dev);
	if (rc)
		goto err_out_image_lock;

	add_disk(rbd_dev->disk);
	/* see rbd_init_disk() */
	blk_put_queue(rbd_dev->disk->queue);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);
	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_image_lock:
	rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
	rbd_dev_device_release(rbd_dev);
err_out_image_probe:
	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
				      size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		rbd_dev_destroy(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
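
/*
 * Illustrative only: an unmap request names the device by its id (the
 * N in /dev/rbdN) plus an optional "force" token, as parsed by the
 * sscanf() below:
 *
 *   $ echo 0 > /sys/bus/rbd/remove            # -EBUSY while still open
 *   $ echo "0 force" > /sys/bus/rbd/remove    # unmap even while open
 */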
6316 */ 6317 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); 6318 if (!rbd_wq) { 6319 rc = -ENOMEM; 6320 goto err_out_slab; 6321 } 6322 6323 if (single_major) { 6324 rbd_major = register_blkdev(0, RBD_DRV_NAME); 6325 if (rbd_major < 0) { 6326 rc = rbd_major; 6327 goto err_out_wq; 6328 } 6329 } 6330 6331 rc = rbd_sysfs_init(); 6332 if (rc) 6333 goto err_out_blkdev; 6334 6335 if (single_major) 6336 pr_info("loaded (major %d)\n", rbd_major); 6337 else 6338 pr_info("loaded\n"); 6339 6340 return 0; 6341 6342 err_out_blkdev: 6343 if (single_major) 6344 unregister_blkdev(rbd_major, RBD_DRV_NAME); 6345 err_out_wq: 6346 destroy_workqueue(rbd_wq); 6347 err_out_slab: 6348 rbd_slab_exit(); 6349 return rc; 6350 } 6351 6352 static void __exit rbd_exit(void) 6353 { 6354 ida_destroy(&rbd_dev_id_ida); 6355 rbd_sysfs_cleanup(); 6356 if (single_major) 6357 unregister_blkdev(rbd_major, RBD_DRV_NAME); 6358 destroy_workqueue(rbd_wq); 6359 rbd_slab_exit(); 6360 } 6361 6362 module_init(rbd_init); 6363 module_exit(rbd_exit); 6364 6365 MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); 6366 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 6367 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 6368 /* following authorship retained from original osdblk.c */ 6369 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 6370 6371 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); 6372 MODULE_LICENSE("GPL"); 6373