xref: /linux/drivers/block/rbd.c (revision 6eb2fb3170549737207974c2c6ad34bcc2f3025e)
1 
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4 
5 
6    based on drivers/block/osdblk.c:
7 
8    Copyright 2009 Red Hat, Inc.
9 
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22 
23 
24 
25    For usage instructions, please refer to:
26 
27                  Documentation/ABI/testing/sysfs-bus-rbd
28 
29  */
30 
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37 
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44 
45 #include "rbd_types.h"
46 
47 #define RBD_DEBUG	/* Activate rbd_assert() calls */
48 
49 /*
50  * The basic unit of block I/O is a sector.  It is interpreted in a
51  * number of contexts in Linux (blk, bio, genhd), but the default is
52  * universally 512 bytes.  These symbols are just slightly more
53  * meaningful than the bare numbers they represent.
54  */
55 #define	SECTOR_SHIFT	9
56 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
57 
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
60 
61 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
62 
63 #define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
64 #define RBD_MAX_SNAP_NAME_LEN	\
65 			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
66 
67 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
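/*
 * A rough sizing check for the limit above (not an exact bound): 510
 * snapshot ids at sizeof (u64) = 8 bytes each come to 4080 bytes,
 * leaving room for the rest of a struct ceph_snap_context within a
 * single 4 KB page.
 */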
68 
69 #define RBD_SNAP_HEAD_NAME	"-"
70 
71 #define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
72 
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX	64
76 
77 #define RBD_OBJ_PREFIX_LEN_MAX	64
78 
79 /* Feature bits */
80 
81 #define RBD_FEATURE_LAYERING	(1<<0)
82 #define RBD_FEATURE_STRIPINGV2	(1<<1)
83 #define RBD_FEATURES_ALL \
84 	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
85 
86 /* Features supported by this (client software) implementation. */
87 
88 #define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
89 
90 /*
91  * An RBD device name will be "rbd#", where the "rbd" comes from
92  * RBD_DRV_NAME above, and # is a unique integer identifier.
93  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94  * enough to hold all possible device names.
95  */
96 #define DEV_NAME_LEN		32
97 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
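/*
 * As a worked example (assuming 4-byte ints): MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, enough for the ten digits of
 * INT_MAX plus a sign, so "rbd" followed by any device id fits easily
 * within DEV_NAME_LEN.
 */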
98 
99 /*
100  * block device image metadata (in-memory version)
101  */
102 struct rbd_image_header {
103 	/* These fields never change for a given rbd image */
104 	char *object_prefix;
105 	u64 features;
106 	__u8 obj_order;
107 	__u8 crypt_type;
108 	__u8 comp_type;
109 
110 	/* The remaining fields need to be updated occasionally */
111 	u64 image_size;
112 	struct ceph_snap_context *snapc;
113 	char *snap_names;
114 	u64 *snap_sizes;
115 
116 	u64 stripe_unit;
117 	u64 stripe_count;
118 };
119 
120 /*
121  * An rbd image specification.
122  *
123  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
124  * identify an image.  Each rbd_dev structure includes a pointer to
125  * an rbd_spec structure that encapsulates this identity.
126  *
127  * Each of the id's in an rbd_spec has an associated name.  For a
128  * user-mapped image, the names are supplied and the id's associated
129  * with them are looked up.  For a layered image, a parent image is
130  * defined by the tuple, and the names are looked up.
131  *
132  * An rbd_dev structure contains a parent_spec pointer which is
133  * non-null if the image it represents is a child in a layered
134  * image.  This pointer will refer to the rbd_spec structure used
135  * by the parent rbd_dev for its own identity (i.e., the structure
136  * is shared between the parent and child).
137  *
138  * Since these structures are populated once, during the discovery
139  * phase of image construction, they are effectively immutable so
140  * we make no effort to synchronize access to them.
141  *
142  * Note that code herein does not assume the image name is known (it
143  * could be a null pointer).
144  */
145 struct rbd_spec {
146 	u64		pool_id;
147 	const char	*pool_name;
148 
149 	const char	*image_id;
150 	const char	*image_name;
151 
152 	u64		snap_id;
153 	const char	*snap_name;
154 
155 	struct kref	kref;
156 };
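/*
 * For example (names purely illustrative): mapping pool "rbd", image
 * "foo" at snapshot "snap1" supplies the three names, and the
 * corresponding pool_id, image_id and snap_id are looked up.  Mapping
 * the base image instead uses snap_name RBD_SNAP_HEAD_NAME ("-") and
 * snap_id CEPH_NOSNAP.
 */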
157 
158 /*
159  * an instance of the client.  multiple devices may share an rbd client.
160  */
161 struct rbd_client {
162 	struct ceph_client	*client;
163 	struct kref		kref;
164 	struct list_head	node;
165 };
166 
167 struct rbd_img_request;
168 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
169 
170 #define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
171 
172 struct rbd_obj_request;
173 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
174 
175 enum obj_request_type {
176 	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
177 };
178 
179 enum obj_req_flags {
180 	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
181 	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
182 	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
183 	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
184 };
185 
186 struct rbd_obj_request {
187 	const char		*object_name;
188 	u64			offset;		/* object start byte */
189 	u64			length;		/* bytes from offset */
190 	unsigned long		flags;
191 
192 	/*
193 	 * An object request associated with an image will have its
194 	 * img_data flag set; a standalone object request will not.
195 	 *
196 	 * A standalone object request will have which == BAD_WHICH
197 	 * and a null obj_request pointer.
198 	 *
199 	 * An object request initiated in support of a layered image
200 	 * object (to check for its existence before a write) will
201 	 * have which == BAD_WHICH and a non-null obj_request pointer.
202 	 *
203 	 * Finally, an object request for rbd image data will have
204 	 * which != BAD_WHICH, and will have a non-null img_request
205 	 * pointer.  The value of which will be in the range
206 	 * 0..(img_request->obj_request_count-1).
207 	 */
208 	union {
209 		struct rbd_obj_request	*obj_request;	/* STAT op */
210 		struct {
211 			struct rbd_img_request	*img_request;
212 			u64			img_offset;
213 			/* links for img_request->obj_requests list */
214 			struct list_head	links;
215 		};
216 	};
217 	u32			which;		/* posn in image request list */
218 
219 	enum obj_request_type	type;
220 	union {
221 		struct bio	*bio_list;
222 		struct {
223 			struct page	**pages;
224 			u32		page_count;
225 		};
226 	};
227 	struct page		**copyup_pages;
228 
229 	struct ceph_osd_request	*osd_req;
230 
231 	u64			xferred;	/* bytes transferred */
232 	int			result;
233 
234 	rbd_obj_callback_t	callback;
235 	struct completion	completion;
236 
237 	struct kref		kref;
238 };
239 
240 enum img_req_flags {
241 	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
242 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
243 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
244 };
245 
246 struct rbd_img_request {
247 	struct rbd_device	*rbd_dev;
248 	u64			offset;	/* starting image byte offset */
249 	u64			length;	/* byte count from offset */
250 	unsigned long		flags;
251 	union {
252 		u64			snap_id;	/* for reads */
253 		struct ceph_snap_context *snapc;	/* for writes */
254 	};
255 	union {
256 		struct request		*rq;		/* block request */
257 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
258 	};
259 	struct page		**copyup_pages;
260 	spinlock_t		completion_lock;/* protects next_completion */
261 	u32			next_completion;
262 	rbd_img_callback_t	callback;
263 	u64			xferred;/* aggregate bytes transferred */
264 	int			result;	/* first nonzero obj_request result */
265 
266 	u32			obj_request_count;
267 	struct list_head	obj_requests;	/* rbd_obj_request structs */
268 
269 	struct kref		kref;
270 };
271 
272 #define for_each_obj_request(ireq, oreq) \
273 	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_from(ireq, oreq) \
275 	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
276 #define for_each_obj_request_safe(ireq, oreq, n) \
277 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
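/*
 * Typical use of the iterators above, as in rbd_img_request_complete()
 * below:
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */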
278 
279 struct rbd_mapping {
280 	u64                     size;
281 	u64                     features;
282 	bool			read_only;
283 };
284 
285 /*
286  * a single device
287  */
288 struct rbd_device {
289 	int			dev_id;		/* blkdev unique id */
290 
291 	int			major;		/* blkdev assigned major */
292 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
293 
294 	u32			image_format;	/* Either 1 or 2 */
295 	struct rbd_client	*rbd_client;
296 
297 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
298 
299 	spinlock_t		lock;		/* queue, flags, open_count */
300 
301 	struct rbd_image_header	header;
302 	unsigned long		flags;		/* possibly lock protected */
303 	struct rbd_spec		*spec;
304 
305 	char			*header_name;
306 
307 	struct ceph_file_layout	layout;
308 
309 	struct ceph_osd_event   *watch_event;
310 	struct rbd_obj_request	*watch_request;
311 
312 	struct rbd_spec		*parent_spec;
313 	u64			parent_overlap;
314 	struct rbd_device	*parent;
315 
316 	/* protects updating the header */
317 	struct rw_semaphore     header_rwsem;
318 
319 	struct rbd_mapping	mapping;
320 
321 	struct list_head	node;
322 
323 	/* sysfs related */
324 	struct device		dev;
325 	unsigned long		open_count;	/* protected by lock */
326 };
327 
328 /*
329  * Flag bits for rbd_dev->flags.  If atomicity is required,
330  * rbd_dev->lock is used to protect access.
331  *
332  * Currently, only the "removing" flag (which is coupled with the
333  * "open_count" field) requires atomic access.
334  */
335 enum rbd_dev_flags {
336 	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
337 	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
338 };
339 
340 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
341 
342 static LIST_HEAD(rbd_dev_list);    /* devices */
343 static DEFINE_SPINLOCK(rbd_dev_list_lock);
344 
345 static LIST_HEAD(rbd_client_list);		/* clients */
346 static DEFINE_SPINLOCK(rbd_client_list_lock);
347 
348 /* Slab caches for frequently-allocated structures */
349 
350 static struct kmem_cache	*rbd_img_request_cache;
351 static struct kmem_cache	*rbd_obj_request_cache;
352 static struct kmem_cache	*rbd_segment_name_cache;
353 
354 static int rbd_img_request_submit(struct rbd_img_request *img_request);
355 
356 static void rbd_dev_device_release(struct device *dev);
357 
358 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
359 		       size_t count);
360 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
361 			  size_t count);
362 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
363 
364 static struct bus_attribute rbd_bus_attrs[] = {
365 	__ATTR(add, S_IWUSR, NULL, rbd_add),
366 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
367 	__ATTR_NULL
368 };
369 
370 static struct bus_type rbd_bus_type = {
371 	.name		= "rbd",
372 	.bus_attrs	= rbd_bus_attrs,
373 };
374 
375 static void rbd_root_dev_release(struct device *dev)
376 {
377 }
378 
379 static struct device rbd_root_dev = {
380 	.init_name =    "rbd",
381 	.release =      rbd_root_dev_release,
382 };
383 
384 static __printf(2, 3)
385 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
386 {
387 	struct va_format vaf;
388 	va_list args;
389 
390 	va_start(args, fmt);
391 	vaf.fmt = fmt;
392 	vaf.va = &args;
393 
394 	if (!rbd_dev)
395 		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
396 	else if (rbd_dev->disk)
397 		printk(KERN_WARNING "%s: %s: %pV\n",
398 			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
399 	else if (rbd_dev->spec && rbd_dev->spec->image_name)
400 		printk(KERN_WARNING "%s: image %s: %pV\n",
401 			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
402 	else if (rbd_dev->spec && rbd_dev->spec->image_id)
403 		printk(KERN_WARNING "%s: id %s: %pV\n",
404 			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
405 	else	/* punt */
406 		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
407 			RBD_DRV_NAME, rbd_dev, &vaf);
408 	va_end(args);
409 }
410 
411 #ifdef RBD_DEBUG
412 #define rbd_assert(expr)						\
413 		if (unlikely(!(expr))) {				\
414 			printk(KERN_ERR "\nAssertion failure in %s() "	\
415 						"at line %d:\n\n"	\
416 					"\trbd_assert(%s);\n\n",	\
417 					__func__, __LINE__, #expr);	\
418 			BUG();						\
419 		}
420 #else /* !RBD_DEBUG */
421 #  define rbd_assert(expr)	((void) 0)
422 #endif /* !RBD_DEBUG */
423 
424 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
425 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
426 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
427 
428 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
429 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
430 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
431 					u64 snap_id);
432 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
433 				u8 *order, u64 *snap_size);
434 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
435 		u64 *snap_features);
436 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
437 
438 static int rbd_open(struct block_device *bdev, fmode_t mode)
439 {
440 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
441 	bool removing = false;
442 
443 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
444 		return -EROFS;
445 
446 	spin_lock_irq(&rbd_dev->lock);
447 	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
448 		removing = true;
449 	else
450 		rbd_dev->open_count++;
451 	spin_unlock_irq(&rbd_dev->lock);
452 	if (removing)
453 		return -ENOENT;
454 
455 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
456 	(void) get_device(&rbd_dev->dev);
457 	set_device_ro(bdev, rbd_dev->mapping.read_only);
458 	mutex_unlock(&ctl_mutex);
459 
460 	return 0;
461 }
462 
463 static void rbd_release(struct gendisk *disk, fmode_t mode)
464 {
465 	struct rbd_device *rbd_dev = disk->private_data;
466 	unsigned long open_count_before;
467 
468 	spin_lock_irq(&rbd_dev->lock);
469 	open_count_before = rbd_dev->open_count--;
470 	spin_unlock_irq(&rbd_dev->lock);
471 	rbd_assert(open_count_before > 0);
472 
473 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
474 	put_device(&rbd_dev->dev);
475 	mutex_unlock(&ctl_mutex);
476 }
477 
478 static const struct block_device_operations rbd_bd_ops = {
479 	.owner			= THIS_MODULE,
480 	.open			= rbd_open,
481 	.release		= rbd_release,
482 };
483 
484 /*
485  * Initialize an rbd client instance.
486  * We own *ceph_opts.
487  */
488 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
489 {
490 	struct rbd_client *rbdc;
491 	int ret = -ENOMEM;
492 
493 	dout("%s:\n", __func__);
494 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
495 	if (!rbdc)
496 		goto out_opt;
497 
498 	kref_init(&rbdc->kref);
499 	INIT_LIST_HEAD(&rbdc->node);
500 
501 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
502 
503 	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
504 	if (IS_ERR(rbdc->client))
505 		goto out_mutex;
506 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
507 
508 	ret = ceph_open_session(rbdc->client);
509 	if (ret < 0)
510 		goto out_err;
511 
512 	spin_lock(&rbd_client_list_lock);
513 	list_add_tail(&rbdc->node, &rbd_client_list);
514 	spin_unlock(&rbd_client_list_lock);
515 
516 	mutex_unlock(&ctl_mutex);
517 	dout("%s: rbdc %p\n", __func__, rbdc);
518 
519 	return rbdc;
520 
521 out_err:
522 	ceph_destroy_client(rbdc->client);
523 out_mutex:
524 	mutex_unlock(&ctl_mutex);
525 	kfree(rbdc);
526 out_opt:
527 	if (ceph_opts)
528 		ceph_destroy_options(ceph_opts);
529 	dout("%s: error %d\n", __func__, ret);
530 
531 	return ERR_PTR(ret);
532 }
533 
534 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
535 {
536 	kref_get(&rbdc->kref);
537 
538 	return rbdc;
539 }
540 
541 /*
542  * Find a ceph client with specific addr and configuration.  If
543  * found, bump its reference count.
544  */
545 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
546 {
547 	struct rbd_client *client_node;
548 	bool found = false;
549 
550 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
551 		return NULL;
552 
553 	spin_lock(&rbd_client_list_lock);
554 	list_for_each_entry(client_node, &rbd_client_list, node) {
555 		if (!ceph_compare_options(ceph_opts, client_node->client)) {
556 			__rbd_get_client(client_node);
557 
558 			found = true;
559 			break;
560 		}
561 	}
562 	spin_unlock(&rbd_client_list_lock);
563 
564 	return found ? client_node : NULL;
565 }
566 
567 /*
568  * mount options
569  */
570 enum {
571 	Opt_last_int,
572 	/* int args above */
573 	Opt_last_string,
574 	/* string args above */
575 	Opt_read_only,
576 	Opt_read_write,
577 	/* Boolean args above */
578 	Opt_last_bool,
579 };
580 
581 static match_table_t rbd_opts_tokens = {
582 	/* int args above */
583 	/* string args above */
584 	{Opt_read_only, "read_only"},
585 	{Opt_read_only, "ro"},		/* Alternate spelling */
586 	{Opt_read_write, "read_write"},
587 	{Opt_read_write, "rw"},		/* Alternate spelling */
588 	/* Boolean args above */
589 	{-1, NULL}
590 };
591 
592 struct rbd_options {
593 	bool	read_only;
594 };
595 
596 #define RBD_READ_ONLY_DEFAULT	false
597 
598 static int parse_rbd_opts_token(char *c, void *private)
599 {
600 	struct rbd_options *rbd_opts = private;
601 	substring_t argstr[MAX_OPT_ARGS];
602 	int token, intval, ret;
603 
604 	token = match_token(c, rbd_opts_tokens, argstr);
605 	if (token < 0)
606 		return -EINVAL;
607 
608 	if (token < Opt_last_int) {
609 		ret = match_int(&argstr[0], &intval);
610 		if (ret < 0) {
611 			pr_err("bad mount option arg (not int) "
612 			       "at '%s'\n", c);
613 			return ret;
614 		}
615 		dout("got int token %d val %d\n", token, intval);
616 	} else if (token > Opt_last_int && token < Opt_last_string) {
617 		dout("got string token %d val %s\n", token,
618 		     argstr[0].from);
619 	} else if (token > Opt_last_string && token < Opt_last_bool) {
620 		dout("got Boolean token %d\n", token);
621 	} else {
622 		dout("got token %d\n", token);
623 	}
624 
625 	switch (token) {
626 	case Opt_read_only:
627 		rbd_opts->read_only = true;
628 		break;
629 	case Opt_read_write:
630 		rbd_opts->read_only = false;
631 		break;
632 	default:
633 		rbd_assert(false);
634 		break;
635 	}
636 	return 0;
637 }
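/*
 * These tokens come from the options field of the data written to
 * /sys/bus/rbd/add (see Documentation/ABI/testing/sysfs-bus-rbd).
 * A sketch, with a hypothetical monitor address and image name:
 *
 *	echo "1.2.3.4:6789 name=admin,ro rbd myimage" > /sys/bus/rbd/add
 *
 * Options libceph does not recognize (such as "ro") are handed to
 * parse_rbd_opts_token() above.
 */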
638 
639 /*
640  * Get a ceph client with a specific addr and configuration; if one
641  * does not exist, create it.
642  */
643 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
644 {
645 	struct rbd_client *rbdc;
646 
647 	rbdc = rbd_client_find(ceph_opts);
648 	if (rbdc)	/* using an existing client */
649 		ceph_destroy_options(ceph_opts);
650 	else
651 		rbdc = rbd_client_create(ceph_opts);
652 
653 	return rbdc;
654 }
655 
656 /*
657  * Destroy ceph client
658  *
659  * Takes rbd_client_list_lock, so the caller must not already hold it.
660  */
661 static void rbd_client_release(struct kref *kref)
662 {
663 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
664 
665 	dout("%s: rbdc %p\n", __func__, rbdc);
666 	spin_lock(&rbd_client_list_lock);
667 	list_del(&rbdc->node);
668 	spin_unlock(&rbd_client_list_lock);
669 
670 	ceph_destroy_client(rbdc->client);
671 	kfree(rbdc);
672 }
673 
674 /*
675  * Drop reference to ceph client node. If it's not referenced anymore, release
676  * it.
677  */
678 static void rbd_put_client(struct rbd_client *rbdc)
679 {
680 	if (rbdc)
681 		kref_put(&rbdc->kref, rbd_client_release);
682 }
683 
684 static bool rbd_image_format_valid(u32 image_format)
685 {
686 	return image_format == 1 || image_format == 2;
687 }
688 
689 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
690 {
691 	size_t size;
692 	u32 snap_count;
693 
694 	/* The header has to start with the magic rbd header text */
695 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
696 		return false;
697 
698 	/* The bio layer requires at least sector-sized I/O */
699 
700 	if (ondisk->options.order < SECTOR_SHIFT)
701 		return false;
702 
703 	/* If we use u64 in a few spots we may be able to loosen this */
704 
705 	if (ondisk->options.order > 8 * sizeof (int) - 1)
706 		return false;
707 
708 	/*
709 	 * The size of a snapshot header has to fit in a size_t, and
710 	 * that limits the number of snapshots.
711 	 */
712 	snap_count = le32_to_cpu(ondisk->snap_count);
713 	size = SIZE_MAX - sizeof (struct ceph_snap_context);
714 	if (snap_count > size / sizeof (__le64))
715 		return false;
716 
717 	/*
718 	 * Not only that, but the size of the entire the snapshot
719 	 * header must also be representable in a size_t.
720 	 */
721 	size -= snap_count * sizeof (__le64);
722 	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
723 		return false;
724 
725 	return true;
726 }
727 
728 /*
729  * Create a new header structure, translating the header format from the
730  * on-disk header.
731  */
732 static int rbd_header_from_disk(struct rbd_image_header *header,
733 				 struct rbd_image_header_ondisk *ondisk)
734 {
735 	u32 snap_count;
736 	size_t len;
737 	size_t size;
738 	u32 i;
739 
740 	memset(header, 0, sizeof (*header));
741 
742 	snap_count = le32_to_cpu(ondisk->snap_count);
743 
744 	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
745 	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
746 	if (!header->object_prefix)
747 		return -ENOMEM;
748 	memcpy(header->object_prefix, ondisk->object_prefix, len);
749 	header->object_prefix[len] = '\0';
750 
751 	if (snap_count) {
752 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
753 
754 		/* Save a copy of the snapshot names */
755 
756 		if (snap_names_len > (u64) SIZE_MAX)
757 			return -EIO;
758 		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
759 		if (!header->snap_names)
760 			goto out_err;
761 		/*
762 		 * Note that rbd_dev_v1_header_read() guarantees
763 		 * the ondisk buffer we're working with has
764 		 * snap_names_len bytes beyond the end of the
765 		 * snapshot id array, so this memcpy() is safe.
766 		 */
767 		memcpy(header->snap_names, &ondisk->snaps[snap_count],
768 			snap_names_len);
769 
770 		/* Record each snapshot's size */
771 
772 		size = snap_count * sizeof (*header->snap_sizes);
773 		header->snap_sizes = kmalloc(size, GFP_KERNEL);
774 		if (!header->snap_sizes)
775 			goto out_err;
776 		for (i = 0; i < snap_count; i++)
777 			header->snap_sizes[i] =
778 				le64_to_cpu(ondisk->snaps[i].image_size);
779 	} else {
780 		header->snap_names = NULL;
781 		header->snap_sizes = NULL;
782 	}
783 
784 	header->features = 0;	/* No feature support in v1 images */
785 	header->obj_order = ondisk->options.order;
786 	header->crypt_type = ondisk->options.crypt_type;
787 	header->comp_type = ondisk->options.comp_type;
788 
789 	/* Allocate and fill in the snapshot context */
790 
791 	header->image_size = le64_to_cpu(ondisk->image_size);
792 
793 	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
794 	if (!header->snapc)
795 		goto out_err;
796 	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
797 	for (i = 0; i < snap_count; i++)
798 		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
799 
800 	return 0;
801 
802 out_err:
803 	kfree(header->snap_sizes);
804 	header->snap_sizes = NULL;
805 	kfree(header->snap_names);
806 	header->snap_names = NULL;
807 	kfree(header->object_prefix);
808 	header->object_prefix = NULL;
809 
810 	return -ENOMEM;
811 }
812 
813 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
814 {
815 	const char *snap_name;
816 
817 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
818 
819 	/* Skip over names until we find the one we are looking for */
820 
821 	snap_name = rbd_dev->header.snap_names;
822 	while (which--)
823 		snap_name += strlen(snap_name) + 1;
824 
825 	return kstrdup(snap_name, GFP_KERNEL);
826 }
827 
828 /*
829  * Snapshot id comparison function for use with qsort()/bsearch().
830  * Note that the result orders snapshots in *descending* order.
831  */
832 static int snapid_compare_reverse(const void *s1, const void *s2)
833 {
834 	u64 snap_id1 = *(u64 *)s1;
835 	u64 snap_id2 = *(u64 *)s2;
836 
837 	if (snap_id1 < snap_id2)
838 		return 1;
839 	return snap_id1 == snap_id2 ? 0 : -1;
840 }
841 
842 /*
843  * Search a snapshot context to see if the given snapshot id is
844  * present.
845  *
846  * Returns the position of the snapshot id in the array if it's found,
847  * or BAD_SNAP_INDEX otherwise.
848  *
849  * Note: The snapshot array is kept sorted (by the osd) in
850  * reverse order, highest snapshot id first.
851  */
852 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
853 {
854 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
855 	u64 *found;
856 
857 	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
858 				sizeof (snap_id), snapid_compare_reverse);
859 
860 	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
861 }
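/*
 * For example, with snapc->snaps holding { 12, 7, 3 } (newest id
 * first), a lookup of snap_id 7 returns index 1, while an id not
 * present in the array (say 5) returns BAD_SNAP_INDEX.
 */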
862 
863 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
864 					u64 snap_id)
865 {
866 	u32 which;
867 
868 	which = rbd_dev_snap_index(rbd_dev, snap_id);
869 	if (which == BAD_SNAP_INDEX)
870 		return NULL;
871 
872 	return _rbd_dev_v1_snap_name(rbd_dev, which);
873 }
874 
875 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
876 {
877 	if (snap_id == CEPH_NOSNAP)
878 		return RBD_SNAP_HEAD_NAME;
879 
880 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
881 	if (rbd_dev->image_format == 1)
882 		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
883 
884 	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
885 }
886 
887 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
888 				u64 *snap_size)
889 {
890 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
891 	if (snap_id == CEPH_NOSNAP) {
892 		*snap_size = rbd_dev->header.image_size;
893 	} else if (rbd_dev->image_format == 1) {
894 		u32 which;
895 
896 		which = rbd_dev_snap_index(rbd_dev, snap_id);
897 		if (which == BAD_SNAP_INDEX)
898 			return -ENOENT;
899 
900 		*snap_size = rbd_dev->header.snap_sizes[which];
901 	} else {
902 		u64 size = 0;
903 		int ret;
904 
905 		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
906 		if (ret)
907 			return ret;
908 
909 		*snap_size = size;
910 	}
911 	return 0;
912 }
913 
914 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
915 			u64 *snap_features)
916 {
917 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
918 	if (snap_id == CEPH_NOSNAP) {
919 		*snap_features = rbd_dev->header.features;
920 	} else if (rbd_dev->image_format == 1) {
921 		*snap_features = 0;	/* No features for format 1 */
922 	} else {
923 		u64 features = 0;
924 		int ret;
925 
926 		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
927 		if (ret)
928 			return ret;
929 
930 		*snap_features = features;
931 	}
932 	return 0;
933 }
934 
935 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
936 {
937 	const char *snap_name = rbd_dev->spec->snap_name;
938 	u64 snap_id;
939 	u64 size = 0;
940 	u64 features = 0;
941 	int ret;
942 
943 	if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
944 		snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
945 		if (snap_id == CEPH_NOSNAP)
946 			return -ENOENT;
947 	} else {
948 		snap_id = CEPH_NOSNAP;
949 	}
950 
951 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
952 	if (ret)
953 		return ret;
954 	ret = rbd_snap_features(rbd_dev, snap_id, &features);
955 	if (ret)
956 		return ret;
957 
958 	rbd_dev->mapping.size = size;
959 	rbd_dev->mapping.features = features;
960 
961 	/* If we are mapping a snapshot it must be marked read-only */
962 
963 	if (snap_id != CEPH_NOSNAP)
964 		rbd_dev->mapping.read_only = true;
965 
966 	return 0;
967 }
968 
969 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
970 {
971 	rbd_dev->mapping.size = 0;
972 	rbd_dev->mapping.features = 0;
973 	rbd_dev->mapping.read_only = true;
974 }
975 
976 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
977 {
978 	rbd_dev->mapping.size = 0;
979 	rbd_dev->mapping.features = 0;
980 	rbd_dev->mapping.read_only = true;
981 }
982 
983 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
984 {
985 	char *name;
986 	u64 segment;
987 	int ret;
988 
989 	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
990 	if (!name)
991 		return NULL;
992 	segment = offset >> rbd_dev->header.obj_order;
993 	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
994 			rbd_dev->header.object_prefix, segment);
995 	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
996 		pr_err("error formatting segment name for #%llu (%d)\n",
997 			segment, ret);
998 		kfree(name);
999 		name = NULL;
1000 	}
1001 
1002 	return name;
1003 }
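/*
 * Example (object prefix purely illustrative): with obj_order 22
 * (4 MB objects) and object_prefix "rb.0.1234.6b8b4567", image offset
 * 0x1200000 falls in segment 0x1200000 >> 22 = 4, so the object name
 * is "rb.0.1234.6b8b4567.000000000004".
 */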
1004 
1005 static void rbd_segment_name_free(const char *name)
1006 {
1007 	/* The explicit cast here is needed to drop the const qualifier */
1008 
1009 	kmem_cache_free(rbd_segment_name_cache, (void *)name);
1010 }
1011 
1012 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1013 {
1014 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1015 
1016 	return offset & (segment_size - 1);
1017 }
1018 
1019 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1020 				u64 offset, u64 length)
1021 {
1022 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1023 
1024 	offset &= segment_size - 1;
1025 
1026 	rbd_assert(length <= U64_MAX - offset);
1027 	if (offset + length > segment_size)
1028 		length = segment_size - offset;
1029 
1030 	return length;
1031 }
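/*
 * Continuing the example above: a request of length 0x400000 starting
 * at image offset 0x1200000 (with 4 MB objects) begins 0x200000 bytes
 * into its segment, so the first object covers only 0x200000 bytes;
 * the remainder is issued against the next object.
 */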
1032 
1033 /*
1034  * returns the size of an object in the image
1035  */
1036 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1037 {
1038 	return 1 << header->obj_order;
1039 }
1040 
1041 /*
1042  * bio helpers
1043  */
1044 
1045 static void bio_chain_put(struct bio *chain)
1046 {
1047 	struct bio *tmp;
1048 
1049 	while (chain) {
1050 		tmp = chain;
1051 		chain = chain->bi_next;
1052 		bio_put(tmp);
1053 	}
1054 }
1055 
1056 /*
1057  * zeros a bio chain, starting at specific offset
1058  */
1059 static void zero_bio_chain(struct bio *chain, int start_ofs)
1060 {
1061 	struct bio_vec *bv;
1062 	unsigned long flags;
1063 	void *buf;
1064 	int i;
1065 	int pos = 0;
1066 
1067 	while (chain) {
1068 		bio_for_each_segment(bv, chain, i) {
1069 			if (pos + bv->bv_len > start_ofs) {
1070 				int remainder = max(start_ofs - pos, 0);
1071 				buf = bvec_kmap_irq(bv, &flags);
1072 				memset(buf + remainder, 0,
1073 				       bv->bv_len - remainder);
1074 				bvec_kunmap_irq(buf, &flags);
1075 			}
1076 			pos += bv->bv_len;
1077 		}
1078 
1079 		chain = chain->bi_next;
1080 	}
1081 }
1082 
1083 /*
1084  * similar to zero_bio_chain(), zeros data defined by a page array,
1085  * starting at the given byte offset from the start of the array and
1086  * continuing up to the given end offset.  The pages array is
1087  * assumed to be big enough to hold all bytes up to the end.
1088  */
1089 static void zero_pages(struct page **pages, u64 offset, u64 end)
1090 {
1091 	struct page **page = &pages[offset >> PAGE_SHIFT];
1092 
1093 	rbd_assert(end > offset);
1094 	rbd_assert(end - offset <= (u64)SIZE_MAX);
1095 	while (offset < end) {
1096 		size_t page_offset;
1097 		size_t length;
1098 		unsigned long flags;
1099 		void *kaddr;
1100 
1101 		page_offset = (size_t)(offset & ~PAGE_MASK);
1102 		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1103 		local_irq_save(flags);
1104 		kaddr = kmap_atomic(*page);
1105 		memset(kaddr + page_offset, 0, length);
1106 		kunmap_atomic(kaddr);
1107 		local_irq_restore(flags);
1108 
1109 		offset += length;
1110 		page++;
1111 	}
1112 }
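/*
 * For example, assuming 4 KB pages, zero_pages(pages, 0x1800, 0x2800)
 * clears the last 2 KB of pages[1] and the first 2 KB of pages[2].
 */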
1113 
1114 /*
1115  * Clone a portion of a bio, starting at the given byte offset
1116  * and continuing for the number of bytes indicated.
1117  */
1118 static struct bio *bio_clone_range(struct bio *bio_src,
1119 					unsigned int offset,
1120 					unsigned int len,
1121 					gfp_t gfpmask)
1122 {
1123 	struct bio_vec *bv;
1124 	unsigned int resid;
1125 	unsigned short idx;
1126 	unsigned int voff;
1127 	unsigned short end_idx;
1128 	unsigned short vcnt;
1129 	struct bio *bio;
1130 
1131 	/* Handle the easy case for the caller */
1132 
1133 	if (!offset && len == bio_src->bi_size)
1134 		return bio_clone(bio_src, gfpmask);
1135 
1136 	if (WARN_ON_ONCE(!len))
1137 		return NULL;
1138 	if (WARN_ON_ONCE(len > bio_src->bi_size))
1139 		return NULL;
1140 	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1141 		return NULL;
1142 
1143 	/* Find first affected segment... */
1144 
1145 	resid = offset;
1146 	bio_for_each_segment(bv, bio_src, idx) {
1147 		if (resid < bv->bv_len)
1148 			break;
1149 		resid -= bv->bv_len;
1150 	}
1151 	voff = resid;
1152 
1153 	/* ...and the last affected segment */
1154 
1155 	resid += len;
1156 	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
1157 		if (resid <= bv->bv_len)
1158 			break;
1159 		resid -= bv->bv_len;
1160 	}
1161 	vcnt = end_idx - idx + 1;
1162 
1163 	/* Build the clone */
1164 
1165 	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1166 	if (!bio)
1167 		return NULL;	/* ENOMEM */
1168 
1169 	bio->bi_bdev = bio_src->bi_bdev;
1170 	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1171 	bio->bi_rw = bio_src->bi_rw;
1172 	bio->bi_flags |= 1 << BIO_CLONED;
1173 
1174 	/*
1175 	 * Copy over our part of the bio_vec, then update the first
1176 	 * and last (or only) entries.
1177 	 */
1178 	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1179 			vcnt * sizeof (struct bio_vec));
1180 	bio->bi_io_vec[0].bv_offset += voff;
1181 	if (vcnt > 1) {
1182 		bio->bi_io_vec[0].bv_len -= voff;
1183 		bio->bi_io_vec[vcnt - 1].bv_len = resid;
1184 	} else {
1185 		bio->bi_io_vec[0].bv_len = len;
1186 	}
1187 
1188 	bio->bi_vcnt = vcnt;
1189 	bio->bi_size = len;
1190 	bio->bi_idx = 0;
1191 
1192 	return bio;
1193 }
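/*
 * For instance (illustrative numbers, assuming 4096-byte segments):
 * cloning bytes [3000, 5000) of a bio touches two bio_vecs; the
 * clone's first vec starts 3000 bytes in (length 1096) and its last
 * vec is trimmed to 904 bytes, for 2000 bytes in total.
 */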
1194 
1195 /*
1196  * Clone a portion of a bio chain, starting at the given byte offset
1197  * into the first bio in the source chain and continuing for the
1198  * number of bytes indicated.  The result is another bio chain of
1199  * exactly the given length, or a null pointer on error.
1200  *
1201  * The bio_src and offset parameters are both in-out.  On entry they
1202  * refer to the first source bio and the offset into that bio where
1203  * the start of data to be cloned is located.
1204  *
1205  * On return, *bio_src is updated to refer to the bio in the source
1206  * chain that contains the first un-cloned byte, and *offset will
1207  * contain the offset of that byte within that bio.
1208  */
1209 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1210 					unsigned int *offset,
1211 					unsigned int len,
1212 					gfp_t gfpmask)
1213 {
1214 	struct bio *bi = *bio_src;
1215 	unsigned int off = *offset;
1216 	struct bio *chain = NULL;
1217 	struct bio **end;
1218 
1219 	/* Build up a chain of clone bios up to the limit */
1220 
1221 	if (!bi || off >= bi->bi_size || !len)
1222 		return NULL;		/* Nothing to clone */
1223 
1224 	end = &chain;
1225 	while (len) {
1226 		unsigned int bi_size;
1227 		struct bio *bio;
1228 
1229 		if (!bi) {
1230 			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1231 			goto out_err;	/* EINVAL; ran out of bio's */
1232 		}
1233 		bi_size = min_t(unsigned int, bi->bi_size - off, len);
1234 		bio = bio_clone_range(bi, off, bi_size, gfpmask);
1235 		if (!bio)
1236 			goto out_err;	/* ENOMEM */
1237 
1238 		*end = bio;
1239 		end = &bio->bi_next;
1240 
1241 		off += bi_size;
1242 		if (off == bi->bi_size) {
1243 			bi = bi->bi_next;
1244 			off = 0;
1245 		}
1246 		len -= bi_size;
1247 	}
1248 	*bio_src = bi;
1249 	*offset = off;
1250 
1251 	return chain;
1252 out_err:
1253 	bio_chain_put(chain);
1254 
1255 	return NULL;
1256 }
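/*
 * For example (illustrative sizes only): cloning 0x300000 bytes
 * starting at offset 0x100000 into a chain of two 0x200000-byte bios
 * clones the tail of the first bio and all of the second; on return
 * *bio_src is the second bio's bi_next (possibly NULL) and *offset
 * is 0.
 */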
1257 
1258 /*
1259  * The default/initial value for all object request flags is 0.  For
1260  * each flag, once its value is set to 1 it is never reset to 0
1261  * again.
1262  */
1263 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1264 {
1265 	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1266 		struct rbd_device *rbd_dev;
1267 
1268 		rbd_dev = obj_request->img_request->rbd_dev;
1269 		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1270 			obj_request);
1271 	}
1272 }
1273 
1274 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1275 {
1276 	smp_mb();
1277 	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1278 }
1279 
1280 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1281 {
1282 	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1283 		struct rbd_device *rbd_dev = NULL;
1284 
1285 		if (obj_request_img_data_test(obj_request))
1286 			rbd_dev = obj_request->img_request->rbd_dev;
1287 		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1288 			obj_request);
1289 	}
1290 }
1291 
1292 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1293 {
1294 	smp_mb();
1295 	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1296 }
1297 
1298 /*
1299  * This sets the KNOWN flag after (possibly) setting the EXISTS
1300  * flag.  The latter is set based on the "exists" value provided.
1301  *
1302  * Note that for our purposes once an object exists it never goes
1303  * away again.  It's possible that the responses from two existence
1304  * checks are separated by the creation of the target object, so the
1305  * first ("doesn't exist") response arrives *after* the second ("does
1306  * exist").  In that case the stale "doesn't exist" response is ignored.
1307  */
1308 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1309 				bool exists)
1310 {
1311 	if (exists)
1312 		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1313 	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1314 	smp_mb();
1315 }
1316 
1317 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1318 {
1319 	smp_mb();
1320 	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1321 }
1322 
1323 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1324 {
1325 	smp_mb();
1326 	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1327 }
1328 
1329 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1330 {
1331 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1332 		atomic_read(&obj_request->kref.refcount));
1333 	kref_get(&obj_request->kref);
1334 }
1335 
1336 static void rbd_obj_request_destroy(struct kref *kref);
1337 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1338 {
1339 	rbd_assert(obj_request != NULL);
1340 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1341 		atomic_read(&obj_request->kref.refcount));
1342 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1343 }
1344 
1345 static void rbd_img_request_get(struct rbd_img_request *img_request)
1346 {
1347 	dout("%s: img %p (was %d)\n", __func__, img_request,
1348 		atomic_read(&img_request->kref.refcount));
1349 	kref_get(&img_request->kref);
1350 }
1351 
1352 static void rbd_img_request_destroy(struct kref *kref);
1353 static void rbd_img_request_put(struct rbd_img_request *img_request)
1354 {
1355 	rbd_assert(img_request != NULL);
1356 	dout("%s: img %p (was %d)\n", __func__, img_request,
1357 		atomic_read(&img_request->kref.refcount));
1358 	kref_put(&img_request->kref, rbd_img_request_destroy);
1359 }
1360 
1361 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1362 					struct rbd_obj_request *obj_request)
1363 {
1364 	rbd_assert(obj_request->img_request == NULL);
1365 
1366 	/* Image request now owns object's original reference */
1367 	obj_request->img_request = img_request;
1368 	obj_request->which = img_request->obj_request_count;
1369 	rbd_assert(!obj_request_img_data_test(obj_request));
1370 	obj_request_img_data_set(obj_request);
1371 	rbd_assert(obj_request->which != BAD_WHICH);
1372 	img_request->obj_request_count++;
1373 	list_add_tail(&obj_request->links, &img_request->obj_requests);
1374 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1375 		obj_request->which);
1376 }
1377 
1378 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1379 					struct rbd_obj_request *obj_request)
1380 {
1381 	rbd_assert(obj_request->which != BAD_WHICH);
1382 
1383 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1384 		obj_request->which);
1385 	list_del(&obj_request->links);
1386 	rbd_assert(img_request->obj_request_count > 0);
1387 	img_request->obj_request_count--;
1388 	rbd_assert(obj_request->which == img_request->obj_request_count);
1389 	obj_request->which = BAD_WHICH;
1390 	rbd_assert(obj_request_img_data_test(obj_request));
1391 	rbd_assert(obj_request->img_request == img_request);
1392 	obj_request->img_request = NULL;
1393 	obj_request->callback = NULL;
1394 	rbd_obj_request_put(obj_request);
1395 }
1396 
1397 static bool obj_request_type_valid(enum obj_request_type type)
1398 {
1399 	switch (type) {
1400 	case OBJ_REQUEST_NODATA:
1401 	case OBJ_REQUEST_BIO:
1402 	case OBJ_REQUEST_PAGES:
1403 		return true;
1404 	default:
1405 		return false;
1406 	}
1407 }
1408 
1409 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1410 				struct rbd_obj_request *obj_request)
1411 {
1412 	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1413 
1414 	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1415 }
1416 
1417 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1418 {
1419 
1420 	dout("%s: img %p\n", __func__, img_request);
1421 
1422 	/*
1423 	 * If no error occurred, compute the aggregate transfer
1424 	 * count for the image request.  We could instead use
1425 	 * atomic64_cmpxchg() to update it as each object request
1426 	 * completes; it's not clear offhand which way is better.
1427 	 */
1428 	if (!img_request->result) {
1429 		struct rbd_obj_request *obj_request;
1430 		u64 xferred = 0;
1431 
1432 		for_each_obj_request(img_request, obj_request)
1433 			xferred += obj_request->xferred;
1434 		img_request->xferred = xferred;
1435 	}
1436 
1437 	if (img_request->callback)
1438 		img_request->callback(img_request);
1439 	else
1440 		rbd_img_request_put(img_request);
1441 }
1442 
1443 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1444 
1445 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1446 {
1447 	dout("%s: obj %p\n", __func__, obj_request);
1448 
1449 	return wait_for_completion_interruptible(&obj_request->completion);
1450 }
1451 
1452 /*
1453  * The default/initial value for all image request flags is 0.  Each
1454  * is conditionally set to 1 at image request initialization time
1455  * and currently never changes thereafter.
1456  */
1457 static void img_request_write_set(struct rbd_img_request *img_request)
1458 {
1459 	set_bit(IMG_REQ_WRITE, &img_request->flags);
1460 	smp_mb();
1461 }
1462 
1463 static bool img_request_write_test(struct rbd_img_request *img_request)
1464 {
1465 	smp_mb();
1466 	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1467 }
1468 
1469 static void img_request_child_set(struct rbd_img_request *img_request)
1470 {
1471 	set_bit(IMG_REQ_CHILD, &img_request->flags);
1472 	smp_mb();
1473 }
1474 
1475 static bool img_request_child_test(struct rbd_img_request *img_request)
1476 {
1477 	smp_mb();
1478 	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1479 }
1480 
1481 static void img_request_layered_set(struct rbd_img_request *img_request)
1482 {
1483 	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1484 	smp_mb();
1485 }
1486 
1487 static bool img_request_layered_test(struct rbd_img_request *img_request)
1488 {
1489 	smp_mb();
1490 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1491 }
1492 
1493 static void
1494 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1495 {
1496 	u64 xferred = obj_request->xferred;
1497 	u64 length = obj_request->length;
1498 
1499 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1500 		obj_request, obj_request->img_request, obj_request->result,
1501 		xferred, length);
1502 	/*
1503 	 * ENOENT means a hole in the image.  We zero-fill the
1504 	 * entire length of the request.  A short read also implies
1505 	 * zero-fill to the end of the request.  Either way we
1506 	 * update the xferred count to indicate the whole request
1507 	 * was satisfied.
1508 	 */
1509 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1510 	if (obj_request->result == -ENOENT) {
1511 		if (obj_request->type == OBJ_REQUEST_BIO)
1512 			zero_bio_chain(obj_request->bio_list, 0);
1513 		else
1514 			zero_pages(obj_request->pages, 0, length);
1515 		obj_request->result = 0;
1516 		obj_request->xferred = length;
1517 	} else if (xferred < length && !obj_request->result) {
1518 		if (obj_request->type == OBJ_REQUEST_BIO)
1519 			zero_bio_chain(obj_request->bio_list, xferred);
1520 		else
1521 			zero_pages(obj_request->pages, xferred, length);
1522 		obj_request->xferred = length;
1523 	}
1524 	obj_request_done_set(obj_request);
1525 }
1526 
1527 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1528 {
1529 	dout("%s: obj %p cb %p\n", __func__, obj_request,
1530 		obj_request->callback);
1531 	if (obj_request->callback)
1532 		obj_request->callback(obj_request);
1533 	else
1534 		complete_all(&obj_request->completion);
1535 }
1536 
1537 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1538 {
1539 	dout("%s: obj %p\n", __func__, obj_request);
1540 	obj_request_done_set(obj_request);
1541 }
1542 
1543 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1544 {
1545 	struct rbd_img_request *img_request = NULL;
1546 	struct rbd_device *rbd_dev = NULL;
1547 	bool layered = false;
1548 
1549 	if (obj_request_img_data_test(obj_request)) {
1550 		img_request = obj_request->img_request;
1551 		layered = img_request && img_request_layered_test(img_request);
1552 		rbd_dev = img_request->rbd_dev;
1553 	}
1554 
1555 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1556 		obj_request, img_request, obj_request->result,
1557 		obj_request->xferred, obj_request->length);
1558 	if (layered && obj_request->result == -ENOENT &&
1559 			obj_request->img_offset < rbd_dev->parent_overlap)
1560 		rbd_img_parent_read(obj_request);
1561 	else if (img_request)
1562 		rbd_img_obj_request_read_callback(obj_request);
1563 	else
1564 		obj_request_done_set(obj_request);
1565 }
1566 
1567 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1568 {
1569 	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1570 		obj_request->result, obj_request->length);
1571 	/*
1572 	 * There is no such thing as a successful short write.  Set
1573 	 * the transfer count to our originally-requested length.
1574 	 */
1575 	obj_request->xferred = obj_request->length;
1576 	obj_request_done_set(obj_request);
1577 }
1578 
1579 /*
1580  * For a simple stat call there's nothing to do.  We'll do more if
1581  * this is part of a write sequence for a layered image.
1582  */
1583 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1584 {
1585 	dout("%s: obj %p\n", __func__, obj_request);
1586 	obj_request_done_set(obj_request);
1587 }
1588 
1589 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1590 				struct ceph_msg *msg)
1591 {
1592 	struct rbd_obj_request *obj_request = osd_req->r_priv;
1593 	u16 opcode;
1594 
1595 	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1596 	rbd_assert(osd_req == obj_request->osd_req);
1597 	if (obj_request_img_data_test(obj_request)) {
1598 		rbd_assert(obj_request->img_request);
1599 		rbd_assert(obj_request->which != BAD_WHICH);
1600 	} else {
1601 		rbd_assert(obj_request->which == BAD_WHICH);
1602 	}
1603 
1604 	if (osd_req->r_result < 0)
1605 		obj_request->result = osd_req->r_result;
1606 
1607 	BUG_ON(osd_req->r_num_ops > 2);
1608 
1609 	/*
1610 	 * We support a 64-bit length, but ultimately it has to be
1611 	 * passed to blk_end_request(), which takes an unsigned int.
1612 	 */
1613 	obj_request->xferred = osd_req->r_reply_op_len[0];
1614 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1615 	opcode = osd_req->r_ops[0].op;
1616 	switch (opcode) {
1617 	case CEPH_OSD_OP_READ:
1618 		rbd_osd_read_callback(obj_request);
1619 		break;
1620 	case CEPH_OSD_OP_WRITE:
1621 		rbd_osd_write_callback(obj_request);
1622 		break;
1623 	case CEPH_OSD_OP_STAT:
1624 		rbd_osd_stat_callback(obj_request);
1625 		break;
1626 	case CEPH_OSD_OP_CALL:
1627 	case CEPH_OSD_OP_NOTIFY_ACK:
1628 	case CEPH_OSD_OP_WATCH:
1629 		rbd_osd_trivial_callback(obj_request);
1630 		break;
1631 	default:
1632 		rbd_warn(NULL, "%s: unsupported op %hu\n",
1633 			obj_request->object_name, (unsigned short) opcode);
1634 		break;
1635 	}
1636 
1637 	if (obj_request_done_test(obj_request))
1638 		rbd_obj_request_complete(obj_request);
1639 }
1640 
1641 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1642 {
1643 	struct rbd_img_request *img_request = obj_request->img_request;
1644 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1645 	u64 snap_id;
1646 
1647 	rbd_assert(osd_req != NULL);
1648 
1649 	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1650 	ceph_osdc_build_request(osd_req, obj_request->offset,
1651 			NULL, snap_id, NULL);
1652 }
1653 
1654 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1655 {
1656 	struct rbd_img_request *img_request = obj_request->img_request;
1657 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1658 	struct ceph_snap_context *snapc;
1659 	struct timespec mtime = CURRENT_TIME;
1660 
1661 	rbd_assert(osd_req != NULL);
1662 
1663 	snapc = img_request ? img_request->snapc : NULL;
1664 	ceph_osdc_build_request(osd_req, obj_request->offset,
1665 			snapc, CEPH_NOSNAP, &mtime);
1666 }
1667 
1668 static struct ceph_osd_request *rbd_osd_req_create(
1669 					struct rbd_device *rbd_dev,
1670 					bool write_request,
1671 					struct rbd_obj_request *obj_request)
1672 {
1673 	struct ceph_snap_context *snapc = NULL;
1674 	struct ceph_osd_client *osdc;
1675 	struct ceph_osd_request *osd_req;
1676 
1677 	if (obj_request_img_data_test(obj_request)) {
1678 		struct rbd_img_request *img_request = obj_request->img_request;
1679 
1680 		rbd_assert(write_request ==
1681 				img_request_write_test(img_request));
1682 		if (write_request)
1683 			snapc = img_request->snapc;
1684 	}
1685 
1686 	/* Allocate and initialize the request, for the single op */
1687 
1688 	osdc = &rbd_dev->rbd_client->client->osdc;
1689 	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1690 	if (!osd_req)
1691 		return NULL;	/* ENOMEM */
1692 
1693 	if (write_request)
1694 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1695 	else
1696 		osd_req->r_flags = CEPH_OSD_FLAG_READ;
1697 
1698 	osd_req->r_callback = rbd_osd_req_callback;
1699 	osd_req->r_priv = obj_request;
1700 
1701 	osd_req->r_oid_len = strlen(obj_request->object_name);
1702 	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1703 	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1704 
1705 	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
1706 
1707 	return osd_req;
1708 }
1709 
1710 /*
1711  * Create a copyup osd request based on the information in the
1712  * object request supplied.  A copyup request has two osd ops:
1713  * a copyup method call and a "normal" write request.
1714  */
1715 static struct ceph_osd_request *
1716 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1717 {
1718 	struct rbd_img_request *img_request;
1719 	struct ceph_snap_context *snapc;
1720 	struct rbd_device *rbd_dev;
1721 	struct ceph_osd_client *osdc;
1722 	struct ceph_osd_request *osd_req;
1723 
1724 	rbd_assert(obj_request_img_data_test(obj_request));
1725 	img_request = obj_request->img_request;
1726 	rbd_assert(img_request);
1727 	rbd_assert(img_request_write_test(img_request));
1728 
1729 	/* Allocate and initialize the request, for the two ops */
1730 
1731 	snapc = img_request->snapc;
1732 	rbd_dev = img_request->rbd_dev;
1733 	osdc = &rbd_dev->rbd_client->client->osdc;
1734 	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1735 	if (!osd_req)
1736 		return NULL;	/* ENOMEM */
1737 
1738 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1739 	osd_req->r_callback = rbd_osd_req_callback;
1740 	osd_req->r_priv = obj_request;
1741 
1742 	osd_req->r_oid_len = strlen(obj_request->object_name);
1743 	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1744 	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1745 
1746 	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
1747 
1748 	return osd_req;
1749 }
1750 
1751 
1752 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1753 {
1754 	ceph_osdc_put_request(osd_req);
1755 }
1756 
1757 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1758 
1759 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1760 						u64 offset, u64 length,
1761 						enum obj_request_type type)
1762 {
1763 	struct rbd_obj_request *obj_request;
1764 	size_t size;
1765 	char *name;
1766 
1767 	rbd_assert(obj_request_type_valid(type));
1768 
1769 	size = strlen(object_name) + 1;
1770 	name = kmalloc(size, GFP_KERNEL);
1771 	if (!name)
1772 		return NULL;
1773 
1774 	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1775 	if (!obj_request) {
1776 		kfree(name);
1777 		return NULL;
1778 	}
1779 
1780 	obj_request->object_name = memcpy(name, object_name, size);
1781 	obj_request->offset = offset;
1782 	obj_request->length = length;
1783 	obj_request->flags = 0;
1784 	obj_request->which = BAD_WHICH;
1785 	obj_request->type = type;
1786 	INIT_LIST_HEAD(&obj_request->links);
1787 	init_completion(&obj_request->completion);
1788 	kref_init(&obj_request->kref);
1789 
1790 	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1791 		offset, length, (int)type, obj_request);
1792 
1793 	return obj_request;
1794 }
1795 
1796 static void rbd_obj_request_destroy(struct kref *kref)
1797 {
1798 	struct rbd_obj_request *obj_request;
1799 
1800 	obj_request = container_of(kref, struct rbd_obj_request, kref);
1801 
1802 	dout("%s: obj %p\n", __func__, obj_request);
1803 
1804 	rbd_assert(obj_request->img_request == NULL);
1805 	rbd_assert(obj_request->which == BAD_WHICH);
1806 
1807 	if (obj_request->osd_req)
1808 		rbd_osd_req_destroy(obj_request->osd_req);
1809 
1810 	rbd_assert(obj_request_type_valid(obj_request->type));
1811 	switch (obj_request->type) {
1812 	case OBJ_REQUEST_NODATA:
1813 		break;		/* Nothing to do */
1814 	case OBJ_REQUEST_BIO:
1815 		if (obj_request->bio_list)
1816 			bio_chain_put(obj_request->bio_list);
1817 		break;
1818 	case OBJ_REQUEST_PAGES:
1819 		if (obj_request->pages)
1820 			ceph_release_page_vector(obj_request->pages,
1821 						obj_request->page_count);
1822 		break;
1823 	}
1824 
1825 	kfree(obj_request->object_name);
1826 	obj_request->object_name = NULL;
1827 	kmem_cache_free(rbd_obj_request_cache, obj_request);
1828 }
1829 
1830 /*
1831  * Caller is responsible for filling in the list of object requests
1832  * that comprises the image request, and the Linux request pointer
1833  * (if there is one).
1834  */
1835 static struct rbd_img_request *rbd_img_request_create(
1836 					struct rbd_device *rbd_dev,
1837 					u64 offset, u64 length,
1838 					bool write_request,
1839 					bool child_request)
1840 {
1841 	struct rbd_img_request *img_request;
1842 
1843 	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1844 	if (!img_request)
1845 		return NULL;
1846 
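	/*
	 * A write request needs a reference to the snapshot context
	 * in effect when the request was created.  Take it under the
	 * header semaphore so a concurrent header refresh can't swap
	 * the context out from under us.
	 */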
1847 	if (write_request) {
1848 		down_read(&rbd_dev->header_rwsem);
1849 		img_request->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1850 		up_read(&rbd_dev->header_rwsem);
1851 	}
1852 
1853 	img_request->rq = NULL;
1854 	img_request->rbd_dev = rbd_dev;
1855 	img_request->offset = offset;
1856 	img_request->length = length;
1857 	img_request->flags = 0;
1858 	if (write_request)
1859 		img_request_write_set(img_request);
1860 	else
1861 		img_request->snap_id = rbd_dev->spec->snap_id;
1864 	if (child_request)
1865 		img_request_child_set(img_request);
1866 	if (rbd_dev->parent_spec)
1867 		img_request_layered_set(img_request);
1868 	spin_lock_init(&img_request->completion_lock);
1869 	img_request->next_completion = 0;
1870 	img_request->callback = NULL;
1871 	img_request->result = 0;
1872 	img_request->obj_request_count = 0;
1873 	INIT_LIST_HEAD(&img_request->obj_requests);
1874 	kref_init(&img_request->kref);
1875 
1876 	rbd_img_request_get(img_request);	/* Avoid a warning */
1877 	rbd_img_request_put(img_request);	/* TEMPORARY */
1878 
1879 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1880 		write_request ? "write" : "read", offset, length,
1881 		img_request);
1882 
1883 	return img_request;
1884 }
1885 
1886 static void rbd_img_request_destroy(struct kref *kref)
1887 {
1888 	struct rbd_img_request *img_request;
1889 	struct rbd_obj_request *obj_request;
1890 	struct rbd_obj_request *next_obj_request;
1891 
1892 	img_request = container_of(kref, struct rbd_img_request, kref);
1893 
1894 	dout("%s: img %p\n", __func__, img_request);
1895 
1896 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1897 		rbd_img_obj_request_del(img_request, obj_request);
1898 	rbd_assert(img_request->obj_request_count == 0);
1899 
1900 	if (img_request_write_test(img_request))
1901 		ceph_put_snap_context(img_request->snapc);
1902 
1903 	if (img_request_child_test(img_request))
1904 		rbd_obj_request_put(img_request->obj_request);
1905 
1906 	kmem_cache_free(rbd_img_request_cache, img_request);
1907 }
1908 
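/*
 * Account for the completion of one object request within an image
 * request.  For a request submitted by the block layer this ends the
 * corresponding portion of the struct request; the return value says
 * whether more object request completions are still expected.
 */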
1909 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1910 {
1911 	struct rbd_img_request *img_request;
1912 	unsigned int xferred;
1913 	int result;
1914 	bool more;
1915 
1916 	rbd_assert(obj_request_img_data_test(obj_request));
1917 	img_request = obj_request->img_request;
1918 
1919 	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1920 	xferred = (unsigned int)obj_request->xferred;
1921 	result = obj_request->result;
1922 	if (result) {
1923 		struct rbd_device *rbd_dev = img_request->rbd_dev;
1924 
1925 		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1926 			img_request_write_test(img_request) ? "write" : "read",
1927 			obj_request->length, obj_request->img_offset,
1928 			obj_request->offset);
1929 		rbd_warn(rbd_dev, "  result %d xferred %x\n",
1930 			result, xferred);
1931 		if (!img_request->result)
1932 			img_request->result = result;
1933 	}
1934 
1935 	/* Image object requests don't own their page array */
1936 
1937 	if (obj_request->type == OBJ_REQUEST_PAGES) {
1938 		obj_request->pages = NULL;
1939 		obj_request->page_count = 0;
1940 	}
1941 
1942 	if (img_request_child_test(img_request)) {
1943 		rbd_assert(img_request->obj_request != NULL);
1944 		more = obj_request->which < img_request->obj_request_count - 1;
1945 	} else {
1946 		rbd_assert(img_request->rq != NULL);
1947 		more = blk_end_request(img_request->rq, result, xferred);
1948 	}
1949 
1950 	return more;
1951 }
1952 
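/*
 * Completion callback for object requests that belong to an image
 * request.  Object requests can complete in any order, but the image
 * request is finished strictly in order:  starting at next_completion
 * we advance over every consecutive object request that is already
 * done, and complete the image request once the last one is reached.
 */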
1953 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1954 {
1955 	struct rbd_img_request *img_request;
1956 	u32 which = obj_request->which;
1957 	bool more = true;
1958 
1959 	rbd_assert(obj_request_img_data_test(obj_request));
1960 	img_request = obj_request->img_request;
1961 
1962 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1963 	rbd_assert(img_request != NULL);
1964 	rbd_assert(img_request->obj_request_count > 0);
1965 	rbd_assert(which != BAD_WHICH);
1966 	rbd_assert(which < img_request->obj_request_count);
1967 	rbd_assert(which >= img_request->next_completion);
1968 
1969 	spin_lock_irq(&img_request->completion_lock);
1970 	if (which != img_request->next_completion)
1971 		goto out;
1972 
1973 	for_each_obj_request_from(img_request, obj_request) {
1974 		rbd_assert(more);
1975 		rbd_assert(which < img_request->obj_request_count);
1976 
1977 		if (!obj_request_done_test(obj_request))
1978 			break;
1979 		more = rbd_img_obj_end_request(obj_request);
1980 		which++;
1981 	}
1982 
1983 	rbd_assert(more ^ (which == img_request->obj_request_count));
1984 	img_request->next_completion = which;
1985 out:
1986 	spin_unlock_irq(&img_request->completion_lock);
1987 
1988 	if (!more)
1989 		rbd_img_request_complete(img_request);
1990 }
1991 
1992 /*
1993  * Split up an image request into one or more object requests, each
1994  * to a different object.  The "type" parameter indicates whether
1995  * "data_desc" is the pointer to the head of a list of bio
1996  * structures, or the base of a page array.  In either case this
1997  * function assumes data_desc describes memory sufficient to hold
1998  * all data described by the image request.
1999  */
2000 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2001 					enum obj_request_type type,
2002 					void *data_desc)
2003 {
2004 	struct rbd_device *rbd_dev = img_request->rbd_dev;
2005 	struct rbd_obj_request *obj_request = NULL;
2006 	struct rbd_obj_request *next_obj_request;
2007 	bool write_request = img_request_write_test(img_request);
2008 	struct bio *bio_list;
2009 	unsigned int bio_offset = 0;
2010 	struct page **pages;
2011 	u64 img_offset;
2012 	u64 resid;
2013 	u16 opcode;
2014 
2015 	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2016 		(int)type, data_desc);
2017 
2018 	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2019 	img_offset = img_request->offset;
2020 	resid = img_request->length;
2021 	rbd_assert(resid > 0);
2022 
2023 	if (type == OBJ_REQUEST_BIO) {
2024 		bio_list = data_desc;
2025 		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2026 	} else {
2027 		rbd_assert(type == OBJ_REQUEST_PAGES);
2028 		pages = data_desc;
2029 	}
2030 
2031 	while (resid) {
2032 		struct ceph_osd_request *osd_req;
2033 		const char *object_name;
2034 		u64 offset;
2035 		u64 length;
2036 
2037 		object_name = rbd_segment_name(rbd_dev, img_offset);
2038 		if (!object_name)
2039 			goto out_unwind;
2040 		offset = rbd_segment_offset(rbd_dev, img_offset);
2041 		length = rbd_segment_length(rbd_dev, img_offset, resid);
2042 		obj_request = rbd_obj_request_create(object_name,
2043 						offset, length, type);
2044 		/* object request has its own copy of the object name */
2045 		rbd_segment_name_free(object_name);
2046 		if (!obj_request)
2047 			goto out_unwind;
2048 
2049 		if (type == OBJ_REQUEST_BIO) {
2050 			unsigned int clone_size;
2051 
2052 			rbd_assert(length <= (u64)UINT_MAX);
2053 			clone_size = (unsigned int)length;
2054 			obj_request->bio_list =
2055 					bio_chain_clone_range(&bio_list,
2056 								&bio_offset,
2057 								clone_size,
2058 								GFP_ATOMIC);
2059 			if (!obj_request->bio_list)
2060 				goto out_partial;
2061 		} else {
2062 			unsigned int page_count;
2063 
2064 			obj_request->pages = pages;
2065 			page_count = (u32)calc_pages_for(offset, length);
2066 			obj_request->page_count = page_count;
2067 			if ((offset + length) & ~PAGE_MASK)
2068 				page_count--;	/* more on last page */
2069 			pages += page_count;
2070 		}
2071 
2072 		osd_req = rbd_osd_req_create(rbd_dev, write_request,
2073 						obj_request);
2074 		if (!osd_req)
2075 			goto out_partial;
2076 		obj_request->osd_req = osd_req;
2077 		obj_request->callback = rbd_img_obj_callback;
2078 
2079 		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2080 						0, 0);
2081 		if (type == OBJ_REQUEST_BIO)
2082 			osd_req_op_extent_osd_data_bio(osd_req, 0,
2083 					obj_request->bio_list, length);
2084 		else
2085 			osd_req_op_extent_osd_data_pages(osd_req, 0,
2086 					obj_request->pages, length,
2087 					offset & ~PAGE_MASK, false, false);
2088 
2089 		if (write_request)
2090 			rbd_osd_req_format_write(obj_request);
2091 		else
2092 			rbd_osd_req_format_read(obj_request);
2093 
2094 		obj_request->img_offset = img_offset;
2095 		rbd_img_obj_request_add(img_request, obj_request);
2096 
2097 		img_offset += length;
2098 		resid -= length;
2099 	}
2100 
2101 	return 0;
2102 
2103 out_partial:
2104 	rbd_obj_request_put(obj_request);
2105 out_unwind:
2106 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2107 		rbd_obj_request_put(obj_request);
2108 
2109 	return -ENOMEM;
2110 }
2111 
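/*
 * Completion callback for the copyup + write request sent to the
 * target object.  The page vector holding the parent data is no
 * longer needed, and on success the transfer count is reported as
 * the full length of the original write.
 */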
2112 static void
2113 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2114 {
2115 	struct rbd_img_request *img_request;
2116 	struct rbd_device *rbd_dev;
2117 	u64 length;
2118 	u32 page_count;
2119 
2120 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2121 	rbd_assert(obj_request_img_data_test(obj_request));
2122 	img_request = obj_request->img_request;
2123 	rbd_assert(img_request);
2124 
2125 	rbd_dev = img_request->rbd_dev;
2126 	rbd_assert(rbd_dev);
2127 	length = (u64)1 << rbd_dev->header.obj_order;
2128 	page_count = (u32)calc_pages_for(0, length);
2129 
2130 	rbd_assert(obj_request->copyup_pages);
2131 	ceph_release_page_vector(obj_request->copyup_pages, page_count);
2132 	obj_request->copyup_pages = NULL;
2133 
2134 	/*
2135 	 * We want the transfer count to reflect the size of the
2136 	 * original write request.  There is no such thing as a
2137 	 * successful short write, so if the request was successful
2138 	 * we can just set it to the originally-requested length.
2139 	 */
2140 	if (!obj_request->result)
2141 		obj_request->xferred = obj_request->length;
2142 
2143 	/* Finish up with the normal image object callback */
2144 
2145 	rbd_img_obj_callback(obj_request);
2146 }
2147 
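/*
 * Called when the read of the full parent object range completes.
 * Build a new osd request for the original object request, holding
 * a "copyup" call that carries the parent data followed by the
 * original write op, and submit it.  Any error is recorded in the
 * original object request, which is then completed.
 */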
2148 static void
2149 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2150 {
2151 	struct rbd_obj_request *orig_request;
2152 	struct ceph_osd_request *osd_req;
2153 	struct ceph_osd_client *osdc;
2154 	struct rbd_device *rbd_dev;
2155 	struct page **pages;
2156 	int result;
2157 	u64 obj_size;
2158 	u64 xferred;
2159 
2160 	rbd_assert(img_request_child_test(img_request));
2161 
2162 	/* First get what we need from the image request */
2163 
2164 	pages = img_request->copyup_pages;
2165 	rbd_assert(pages != NULL);
2166 	img_request->copyup_pages = NULL;
2167 
2168 	orig_request = img_request->obj_request;
2169 	rbd_assert(orig_request != NULL);
2170 	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2171 	result = img_request->result;
2172 	obj_size = img_request->length;
2173 	xferred = img_request->xferred;
2174 
2175 	rbd_dev = img_request->rbd_dev;
2176 	rbd_assert(rbd_dev);
2177 	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2178 
2179 	rbd_img_request_put(img_request);
2180 
2181 	if (result)
2182 		goto out_err;
2183 
2184 	/* Allocate the new copyup osd request for the original request */
2185 
2186 	result = -ENOMEM;
2187 	rbd_assert(!orig_request->osd_req);
2188 	osd_req = rbd_osd_req_create_copyup(orig_request);
2189 	if (!osd_req)
2190 		goto out_err;
2191 	orig_request->osd_req = osd_req;
2192 	orig_request->copyup_pages = pages;
2193 
2194 	/* Initialize the copyup op */
2195 
2196 	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2197 	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2198 						false, false);
2199 
2200 	/* Then the original write request op */
2201 
2202 	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2203 					orig_request->offset,
2204 					orig_request->length, 0, 0);
2205 	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2206 					orig_request->length);
2207 
2208 	rbd_osd_req_format_write(orig_request);
2209 
2210 	/* All set, send it off. */
2211 
2212 	orig_request->callback = rbd_img_obj_copyup_callback;
2213 	osdc = &rbd_dev->rbd_client->client->osdc;
2214 	result = rbd_obj_request_submit(osdc, orig_request);
2215 	if (!result)
2216 		return;
2217 out_err:
2218 	/* Record the error code and complete the request */
2219 
2220 	orig_request->result = result;
2221 	orig_request->xferred = 0;
2222 	obj_request_done_set(orig_request);
2223 	rbd_obj_request_complete(orig_request);
2224 }
2225 
2226 /*
2227  * Read from the parent image the range of data that covers the
2228  * entire target of the given object request.  This is used for
2229  * satisfying a layered image write request when the target of an
2230  * object request from the image request does not exist.
2231  *
2232  * A page array big enough to hold the returned data is allocated
2233  * and supplied to rbd_img_request_fill() as the "data descriptor."
2234  * When the read completes, this page array will be transferred to
2235  * the original object request for the copyup operation.
2236  *
2237  * If an error occurs, record it as the result of the original
2238  * object request and mark it done so it gets completed.
2239  */
2240 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2241 {
2242 	struct rbd_img_request *img_request = NULL;
2243 	struct rbd_img_request *parent_request = NULL;
2244 	struct rbd_device *rbd_dev;
2245 	u64 img_offset;
2246 	u64 length;
2247 	struct page **pages = NULL;
2248 	u32 page_count;
2249 	int result;
2250 
2251 	rbd_assert(obj_request_img_data_test(obj_request));
2252 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2253 
2254 	img_request = obj_request->img_request;
2255 	rbd_assert(img_request != NULL);
2256 	rbd_dev = img_request->rbd_dev;
2257 	rbd_assert(rbd_dev->parent != NULL);
2258 
2259 	/*
2260 	 * First things first.  The original osd request is of no
2261 	 * use to us any more, we'll need a new one that can hold
2262 	 * the two ops in a copyup request.  We'll get that later,
2263 	 * but for now we can release the old one.
2264 	 */
2265 	rbd_osd_req_destroy(obj_request->osd_req);
2266 	obj_request->osd_req = NULL;
2267 
2268 	/*
2269 	 * Determine the byte range covered by the object in the
2270 	 * child image to which the original request was to be sent.
2271 	 */
2272 	img_offset = obj_request->img_offset - obj_request->offset;
2273 	length = (u64)1 << rbd_dev->header.obj_order;
2274 
2275 	/*
2276 	 * There is no defined parent data beyond the parent
2277 	 * overlap, so limit what we read at that boundary if
2278 	 * necessary.
2279 	 */
2280 	if (img_offset + length > rbd_dev->parent_overlap) {
2281 		rbd_assert(img_offset < rbd_dev->parent_overlap);
2282 		length = rbd_dev->parent_overlap - img_offset;
2283 	}
2284 
2285 	/*
2286 	 * Allocate a page array big enough to receive the data read
2287 	 * from the parent.
2288 	 */
2289 	page_count = (u32)calc_pages_for(0, length);
2290 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2291 	if (IS_ERR(pages)) {
2292 		result = PTR_ERR(pages);
2293 		pages = NULL;
2294 		goto out_err;
2295 	}
2296 
2297 	result = -ENOMEM;
2298 	parent_request = rbd_img_request_create(rbd_dev->parent,
2299 						img_offset, length,
2300 						false, true);
2301 	if (!parent_request)
2302 		goto out_err;
2303 	rbd_obj_request_get(obj_request);
2304 	parent_request->obj_request = obj_request;
2305 
2306 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2307 	if (result)
2308 		goto out_err;
2309 	parent_request->copyup_pages = pages;
2310 
2311 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2312 	result = rbd_img_request_submit(parent_request);
2313 	if (!result)
2314 		return 0;
2315 
2316 	parent_request->copyup_pages = NULL;
2317 	parent_request->obj_request = NULL;
2318 	rbd_obj_request_put(obj_request);
2319 out_err:
2320 	if (pages)
2321 		ceph_release_page_vector(pages, page_count);
2322 	if (parent_request)
2323 		rbd_img_request_put(parent_request);
2324 	obj_request->result = result;
2325 	obj_request->xferred = 0;
2326 	obj_request_done_set(obj_request);
2327 
2328 	return result;
2329 }
2330 
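/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Record whether the target object
 * exists in the original object request, then resubmit it so the
 * write can proceed (directly, or via a copyup from the parent).
 */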
2331 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2332 {
2333 	struct rbd_obj_request *orig_request;
2334 	int result;
2335 
2336 	rbd_assert(!obj_request_img_data_test(obj_request));
2337 
2338 	/*
2339 	 * All we need from the object request is the original
2340 	 * request and the result of the STAT op.  Grab those, then
2341 	 * we're done with the request.
2342 	 */
2343 	orig_request = obj_request->obj_request;
2344 	obj_request->obj_request = NULL;
2345 	rbd_assert(orig_request);
2346 	rbd_assert(orig_request->img_request);
2347 
2348 	result = obj_request->result;
2349 	obj_request->result = 0;
2350 
2351 	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2352 		obj_request, orig_request, result,
2353 		obj_request->xferred, obj_request->length);
2354 	rbd_obj_request_put(obj_request);
2355 
2356 	rbd_assert(orig_request);
2357 	rbd_assert(orig_request->img_request);
2358 
2359 	/*
2360 	 * Our only purpose here is to determine whether the object
2361 	 * exists, and we don't want to treat the non-existence as
2362 	 * an error.  If something else comes back, transfer the
2363 	 * error to the original request and complete it now.
2364 	 */
2365 	if (!result) {
2366 		obj_request_existence_set(orig_request, true);
2367 	} else if (result == -ENOENT) {
2368 		obj_request_existence_set(orig_request, false);
2369 	} else if (result) {
2370 		orig_request->result = result;
2371 		goto out;
2372 	}
2373 
2374 	/*
2375 	 * Resubmit the original request now that we have recorded
2376 	 * whether the target object exists.
2377 	 */
2378 	orig_request->result = rbd_img_obj_request_submit(orig_request);
2379 out:
2380 	if (orig_request->result)
2381 		rbd_obj_request_complete(orig_request);
2382 	rbd_obj_request_put(orig_request);
2383 }
2384 
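/*
 * Issue a STAT request for the target object of a layered write so
 * we can find out whether it already exists.  The original object
 * request is stashed in the STAT request and is resubmitted by the
 * callback once the existence result is known.
 */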
2385 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2386 {
2387 	struct rbd_obj_request *stat_request;
2388 	struct rbd_device *rbd_dev;
2389 	struct ceph_osd_client *osdc;
2390 	struct page **pages = NULL;
2391 	u32 page_count;
2392 	size_t size;
2393 	int ret;
2394 
2395 	/*
2396 	 * The response data for a STAT call consists of:
2397 	 *     le64 length;
2398 	 *     struct {
2399 	 *         le32 tv_sec;
2400 	 *         le32 tv_nsec;
2401 	 *     } mtime;
2402 	 */
2403 	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2404 	page_count = (u32)calc_pages_for(0, size);
2405 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2406 	if (IS_ERR(pages))
2407 		return PTR_ERR(pages);
2408 
2409 	ret = -ENOMEM;
2410 	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2411 							OBJ_REQUEST_PAGES);
2412 	if (!stat_request)
2413 		goto out;
2414 
2415 	rbd_obj_request_get(obj_request);
2416 	stat_request->obj_request = obj_request;
2417 	stat_request->pages = pages;
2418 	stat_request->page_count = page_count;
2419 
2420 	rbd_assert(obj_request->img_request);
2421 	rbd_dev = obj_request->img_request->rbd_dev;
2422 	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2423 						stat_request);
2424 	if (!stat_request->osd_req)
2425 		goto out;
2426 	stat_request->callback = rbd_img_obj_exists_callback;
2427 
2428 	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2429 	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2430 					false, false);
2431 	rbd_osd_req_format_read(stat_request);
2432 
2433 	osdc = &rbd_dev->rbd_client->client->osdc;
2434 	ret = rbd_obj_request_submit(osdc, stat_request);
2435 out:
2436 	if (ret)
2437 		rbd_obj_request_put(obj_request);
2438 
2439 	return ret;
2440 }
2441 
2442 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2443 {
2444 	struct rbd_img_request *img_request;
2445 	struct rbd_device *rbd_dev;
2446 	bool known;
2447 
2448 	rbd_assert(obj_request_img_data_test(obj_request));
2449 
2450 	img_request = obj_request->img_request;
2451 	rbd_assert(img_request);
2452 	rbd_dev = img_request->rbd_dev;
2453 
2454 	/*
2455 	 * Only writes to layered images need special handling.
2456 	 * Reads and non-layered writes are simple object requests.
2457 	 * Layered writes that start beyond the end of the overlap
2458 	 * with the parent have no parent data, so they too are
2459 	 * simple object requests.  Finally, if the target object is
2460 	 * known to already exist, its parent data has already been
2461 	 * copied, so a write to the object can also be handled as a
2462 	 * simple object request.
2463 	 */
2464 	if (!img_request_write_test(img_request) ||
2465 		!img_request_layered_test(img_request) ||
2466 		rbd_dev->parent_overlap <= obj_request->img_offset ||
2467 		((known = obj_request_known_test(obj_request)) &&
2468 			obj_request_exists_test(obj_request))) {
2469 
2470 		struct ceph_osd_client *osdc;
2471 
2472 		osdc = &rbd_dev->rbd_client->client->osdc;
2475 
2476 		return rbd_obj_request_submit(osdc, obj_request);
2477 	}
2478 
2479 	/*
2480 	 * It's a layered write.  The target object might exist but
2481 	 * we may not know that yet.  If we know it doesn't exist,
2482 	 * start by reading the data for the full target object from
2483 	 * the parent so we can use it for a copyup to the target.
2484 	 */
2485 	if (known)
2486 		return rbd_img_obj_parent_read_full(obj_request);
2487 
2488 	/* We don't know whether the target exists.  Go find out. */
2489 
2490 	return rbd_img_obj_exists_submit(obj_request);
2491 }
2492 
2493 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2494 {
2495 	struct rbd_obj_request *obj_request;
2496 	struct rbd_obj_request *next_obj_request;
2497 
2498 	dout("%s: img %p\n", __func__, img_request);
2499 	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2500 		int ret;
2501 
2502 		ret = rbd_img_obj_request_submit(obj_request);
2503 		if (ret)
2504 			return ret;
2505 	}
2506 
2507 	return 0;
2508 }
2509 
2510 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2511 {
2512 	struct rbd_obj_request *obj_request;
2513 	struct rbd_device *rbd_dev;
2514 	u64 obj_end;
2515 
2516 	rbd_assert(img_request_child_test(img_request));
2517 
2518 	obj_request = img_request->obj_request;
2519 	rbd_assert(obj_request);
2520 	rbd_assert(obj_request->img_request);
2521 
2522 	obj_request->result = img_request->result;
2523 	if (obj_request->result)
2524 		goto out;
2525 
2526 	/*
2527 	 * We need to zero anything beyond the parent overlap
2528 	 * boundary.  Since rbd_img_obj_request_read_callback()
2529 	 * will zero anything beyond the end of a short read, an
2530 	 * easy way to do this is to pretend the data from the
2531 	 * parent came up short--ending at the overlap boundary.
2532 	 */
2533 	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2534 	obj_end = obj_request->img_offset + obj_request->length;
2535 	rbd_dev = obj_request->img_request->rbd_dev;
2536 	if (obj_end > rbd_dev->parent_overlap) {
2537 		u64 xferred = 0;
2538 
2539 		if (obj_request->img_offset < rbd_dev->parent_overlap)
2540 			xferred = rbd_dev->parent_overlap -
2541 					obj_request->img_offset;
2542 
2543 		obj_request->xferred = min(img_request->xferred, xferred);
2544 	} else {
2545 		obj_request->xferred = img_request->xferred;
2546 	}
2547 out:
2548 	rbd_img_request_put(img_request);
2549 	rbd_img_obj_request_read_callback(obj_request);
2550 	rbd_obj_request_complete(obj_request);
2551 }
2552 
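/*
 * A read of a layered image object returned -ENOENT, meaning the
 * object doesn't exist in the child image.  Issue an image request
 * against the parent image for the same range, and complete the
 * original object request with whatever that read returns.
 */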
2553 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2554 {
2555 	struct rbd_device *rbd_dev;
2556 	struct rbd_img_request *img_request;
2557 	int result;
2558 
2559 	rbd_assert(obj_request_img_data_test(obj_request));
2560 	rbd_assert(obj_request->img_request != NULL);
2561 	rbd_assert(obj_request->result == (s32) -ENOENT);
2562 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2563 
2564 	rbd_dev = obj_request->img_request->rbd_dev;
2565 	rbd_assert(rbd_dev->parent != NULL);
2566 	/* rbd_read_finish(obj_request, obj_request->length); */
2567 	img_request = rbd_img_request_create(rbd_dev->parent,
2568 						obj_request->img_offset,
2569 						obj_request->length,
2570 						false, true);
2571 	result = -ENOMEM;
2572 	if (!img_request)
2573 		goto out_err;
2574 
2575 	rbd_obj_request_get(obj_request);
2576 	img_request->obj_request = obj_request;
2577 
2578 	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2579 					obj_request->bio_list);
2580 	if (result)
2581 		goto out_err;
2582 
2583 	img_request->callback = rbd_img_parent_read_callback;
2584 	result = rbd_img_request_submit(img_request);
2585 	if (result)
2586 		goto out_err;
2587 
2588 	return;
2589 out_err:
2590 	if (img_request)
2591 		rbd_img_request_put(img_request);
2592 	obj_request->result = result;
2593 	obj_request->xferred = 0;
2594 	obj_request_done_set(obj_request);
2595 }
2596 
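/*
 * Acknowledge a notification received on the header object.  The
 * request completes asynchronously; its callback simply drops the
 * object request reference.
 */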
2597 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2598 {
2599 	struct rbd_obj_request *obj_request;
2600 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2601 	int ret;
2602 
2603 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2604 							OBJ_REQUEST_NODATA);
2605 	if (!obj_request)
2606 		return -ENOMEM;
2607 
2608 	ret = -ENOMEM;
2609 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2610 	if (!obj_request->osd_req)
2611 		goto out;
2612 	obj_request->callback = rbd_obj_request_put;
2613 
2614 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2615 					notify_id, 0, 0);
2616 	rbd_osd_req_format_read(obj_request);
2617 
2618 	ret = rbd_obj_request_submit(osdc, obj_request);
2619 out:
2620 	if (ret)
2621 		rbd_obj_request_put(obj_request);
2622 
2623 	return ret;
2624 }
2625 
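/*
 * Callback invoked when a watch notification arrives for the header
 * object:  refresh the in-core image header and acknowledge the
 * notification.
 */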
2626 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2627 {
2628 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
2629 
2630 	if (!rbd_dev)
2631 		return;
2632 
2633 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2634 		rbd_dev->header_name, (unsigned long long)notify_id,
2635 		(unsigned int)opcode);
2636 	(void)rbd_dev_refresh(rbd_dev);
2637 
2638 	rbd_obj_notify_ack(rbd_dev, notify_id);
2639 }
2640 
2641 /*
2642  * Request sync osd watch/unwatch.  The value of "start" determines
2643  * whether a watch request is being initiated or torn down.
2644  */
2645 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2646 {
2647 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2648 	struct rbd_obj_request *obj_request;
2649 	int ret;
2650 
2651 	rbd_assert(start ^ !!rbd_dev->watch_event);
2652 	rbd_assert(start ^ !!rbd_dev->watch_request);
2653 
2654 	if (start) {
2655 		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2656 						&rbd_dev->watch_event);
2657 		if (ret < 0)
2658 			return ret;
2659 		rbd_assert(rbd_dev->watch_event != NULL);
2660 	}
2661 
2662 	ret = -ENOMEM;
2663 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2664 							OBJ_REQUEST_NODATA);
2665 	if (!obj_request)
2666 		goto out_cancel;
2667 
2668 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2669 	if (!obj_request->osd_req)
2670 		goto out_cancel;
2671 
2672 	if (start)
2673 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2674 	else
2675 		ceph_osdc_unregister_linger_request(osdc,
2676 					rbd_dev->watch_request->osd_req);
2677 
2678 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2679 				rbd_dev->watch_event->cookie, 0, start);
2680 	rbd_osd_req_format_write(obj_request);
2681 
2682 	ret = rbd_obj_request_submit(osdc, obj_request);
2683 	if (ret)
2684 		goto out_cancel;
2685 	ret = rbd_obj_request_wait(obj_request);
2686 	if (ret)
2687 		goto out_cancel;
2688 	ret = obj_request->result;
2689 	if (ret)
2690 		goto out_cancel;
2691 
2692 	/*
2693 	 * A watch request is set to linger, so the underlying osd
2694 	 * request won't go away until we unregister it.  We retain
2695 	 * a pointer to the object request during that time (in
2696 	 * rbd_dev->watch_request), so we'll keep a reference to
2697 	 * it.  We'll drop that reference (below) after we've
2698 	 * unregistered it.
2699 	 */
2700 	if (start) {
2701 		rbd_dev->watch_request = obj_request;
2702 
2703 		return 0;
2704 	}
2705 
2706 	/* We have successfully torn down the watch request */
2707 
2708 	rbd_obj_request_put(rbd_dev->watch_request);
2709 	rbd_dev->watch_request = NULL;
2710 out_cancel:
2711 	/* Cancel the event if we're tearing down, or on error */
2712 	ceph_osdc_cancel_event(rbd_dev->watch_event);
2713 	rbd_dev->watch_event = NULL;
2714 	if (obj_request)
2715 		rbd_obj_request_put(obj_request);
2716 
2717 	return ret;
2718 }
2719 
2720 /*
2721  * Synchronous osd object method call.  Returns the number of bytes
2722  * returned in the inbound buffer, or a negative error code.
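 *
 * A typical caller looks like _rbd_dev_v2_snap_size() below, which
 * invokes the "get_size" method of the "rbd" class on the header
 * object, passing an encoded snapshot id and getting back a packed
 * order/size result.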
2723  */
2724 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2725 			     const char *object_name,
2726 			     const char *class_name,
2727 			     const char *method_name,
2728 			     const void *outbound,
2729 			     size_t outbound_size,
2730 			     void *inbound,
2731 			     size_t inbound_size)
2732 {
2733 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2734 	struct rbd_obj_request *obj_request;
2735 	struct page **pages;
2736 	u32 page_count;
2737 	int ret;
2738 
2739 	/*
2740 	 * Method calls are ultimately read operations.  The result
2741 	 * should placed into the inbound buffer provided.  They
2742 	 * also supply outbound data--parameters for the object
2743 	 * method.  Currently if this is present it will be a
2744 	 * snapshot id.
2745 	 */
2746 	page_count = (u32)calc_pages_for(0, inbound_size);
2747 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2748 	if (IS_ERR(pages))
2749 		return PTR_ERR(pages);
2750 
2751 	ret = -ENOMEM;
2752 	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2753 							OBJ_REQUEST_PAGES);
2754 	if (!obj_request)
2755 		goto out;
2756 
2757 	obj_request->pages = pages;
2758 	obj_request->page_count = page_count;
2759 
2760 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2761 	if (!obj_request->osd_req)
2762 		goto out;
2763 
2764 	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2765 					class_name, method_name);
2766 	if (outbound_size) {
2767 		struct ceph_pagelist *pagelist;
2768 
2769 		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2770 		if (!pagelist)
2771 			goto out;
2772 
2773 		ceph_pagelist_init(pagelist);
2774 		ceph_pagelist_append(pagelist, outbound, outbound_size);
2775 		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2776 						pagelist);
2777 	}
2778 	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2779 					obj_request->pages, inbound_size,
2780 					0, false, false);
2781 	rbd_osd_req_format_read(obj_request);
2782 
2783 	ret = rbd_obj_request_submit(osdc, obj_request);
2784 	if (ret)
2785 		goto out;
2786 	ret = rbd_obj_request_wait(obj_request);
2787 	if (ret)
2788 		goto out;
2789 
2790 	ret = obj_request->result;
2791 	if (ret < 0)
2792 		goto out;
2793 
2794 	rbd_assert(obj_request->xferred < (u64)INT_MAX);
2795 	ret = (int)obj_request->xferred;
2796 	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2797 out:
2798 	if (obj_request)
2799 		rbd_obj_request_put(obj_request);
2800 	else
2801 		ceph_release_page_vector(pages, page_count);
2802 
2803 	return ret;
2804 }
2805 
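/*
 * The block layer request function.  Pulls requests off the queue
 * and turns each into an image request, dropping the queue lock
 * while the image request is built and submitted.  Requests that
 * cannot be started are ended immediately with an error.
 */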
2806 static void rbd_request_fn(struct request_queue *q)
2807 		__releases(q->queue_lock) __acquires(q->queue_lock)
2808 {
2809 	struct rbd_device *rbd_dev = q->queuedata;
2810 	bool read_only = rbd_dev->mapping.read_only;
2811 	struct request *rq;
2812 	int result;
2813 
2814 	while ((rq = blk_fetch_request(q))) {
2815 		bool write_request = rq_data_dir(rq) == WRITE;
2816 		struct rbd_img_request *img_request;
2817 		u64 offset;
2818 		u64 length;
2819 
2820 		/* Ignore any non-FS requests that filter through. */
2821 
2822 		if (rq->cmd_type != REQ_TYPE_FS) {
2823 			dout("%s: non-fs request type %d\n", __func__,
2824 				(int) rq->cmd_type);
2825 			__blk_end_request_all(rq, 0);
2826 			continue;
2827 		}
2828 
2829 		/* Ignore/skip any zero-length requests */
2830 
2831 		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2832 		length = (u64) blk_rq_bytes(rq);
2833 
2834 		if (!length) {
2835 			dout("%s: zero-length request\n", __func__);
2836 			__blk_end_request_all(rq, 0);
2837 			continue;
2838 		}
2839 
2840 		spin_unlock_irq(q->queue_lock);
2841 
2842 		/* Disallow writes to a read-only device */
2843 
2844 		if (write_request) {
2845 			result = -EROFS;
2846 			if (read_only)
2847 				goto end_request;
2848 			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2849 		}
2850 
2851 		/*
2852 		 * Quit early if the mapped snapshot no longer
2853 		 * exists.  It's still possible the snapshot will
2854 		 * have disappeared by the time our request arrives
2855 		 * at the osd, but there's no sense in sending it if
2856 		 * we already know.
2857 		 */
2858 		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2859 			dout("request for non-existent snapshot");
2860 			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2861 			result = -ENXIO;
2862 			goto end_request;
2863 		}
2864 
2865 		result = -EINVAL;
2866 		if (offset && length > U64_MAX - offset + 1) {
2867 			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2868 				offset, length);
2869 			goto end_request;	/* Shouldn't happen */
2870 		}
2871 
2872 		result = -ENOMEM;
2873 		img_request = rbd_img_request_create(rbd_dev, offset, length,
2874 							write_request, false);
2875 		if (!img_request)
2876 			goto end_request;
2877 
2878 		img_request->rq = rq;
2879 
2880 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2881 						rq->bio);
2882 		if (!result)
2883 			result = rbd_img_request_submit(img_request);
2884 		if (result)
2885 			rbd_img_request_put(img_request);
2886 end_request:
2887 		spin_lock_irq(q->queue_lock);
2888 		if (result < 0) {
2889 			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2890 				write_request ? "write" : "read",
2891 				length, offset, result);
2892 
2893 			__blk_end_request_all(rq, result);
2894 		}
2895 	}
2896 }
2897 
2898 /*
2899  * A queue callback.  Makes sure that we don't create a bio that spans
2900  * multiple osd objects.  One exception is a single-page bio, which we
2901  * handle later in bio_chain_clone_range().
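 *
 * For example, with the usual 4 MB rbd objects (obj_order 22) an
 * object holds 8192 sectors, so a bio starting 8000 sectors into its
 * object has only 192 sectors (96 KB) of room left before it would
 * cross into the next object.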
2902  */
2903 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2904 			  struct bio_vec *bvec)
2905 {
2906 	struct rbd_device *rbd_dev = q->queuedata;
2907 	sector_t sector_offset;
2908 	sector_t sectors_per_obj;
2909 	sector_t obj_sector_offset;
2910 	int ret;
2911 
2912 	/*
2913 	 * Find how far into its rbd object the partition-relative
2914 	 * bio start sector is to offset relative to the enclosing
2915 	 * device.
2916 	 */
2917 	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2918 	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2919 	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2920 
2921 	/*
2922 	 * Compute the number of bytes from that offset to the end
2923 	 * of the object.  Account for what's already used by the bio.
2924 	 */
2925 	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2926 	if (ret > bmd->bi_size)
2927 		ret -= bmd->bi_size;
2928 	else
2929 		ret = 0;
2930 
2931 	/*
2932 	 * Don't send back more than was asked for.  And if the bio
2933 	 * was empty, let the whole thing through because:  "Note
2934 	 * that a block device *must* allow a single page to be
2935 	 * added to an empty bio."
2936 	 */
2937 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2938 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2939 		ret = (int) bvec->bv_len;
2940 
2941 	return ret;
2942 }
2943 
2944 static void rbd_free_disk(struct rbd_device *rbd_dev)
2945 {
2946 	struct gendisk *disk = rbd_dev->disk;
2947 
2948 	if (!disk)
2949 		return;
2950 
2951 	rbd_dev->disk = NULL;
2952 	if (disk->flags & GENHD_FL_UP) {
2953 		del_gendisk(disk);
2954 		if (disk->queue)
2955 			blk_cleanup_queue(disk->queue);
2956 	}
2957 	put_disk(disk);
2958 }
2959 
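/*
 * Synchronously read a byte range from the named object into the
 * buffer provided.  Returns the number of bytes read, or a negative
 * errno on failure.
 */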
2960 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2961 				const char *object_name,
2962 				u64 offset, u64 length, void *buf)
2963 
2964 {
2965 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2966 	struct rbd_obj_request *obj_request;
2967 	struct page **pages = NULL;
2968 	u32 page_count;
2969 	size_t size;
2970 	int ret;
2971 
2972 	page_count = (u32) calc_pages_for(offset, length);
2973 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2974 	if (IS_ERR(pages))
2975 		return PTR_ERR(pages);
2976 
2977 	ret = -ENOMEM;
2978 	obj_request = rbd_obj_request_create(object_name, offset, length,
2979 							OBJ_REQUEST_PAGES);
2980 	if (!obj_request)
2981 		goto out;
2982 
2983 	obj_request->pages = pages;
2984 	obj_request->page_count = page_count;
2985 
2986 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2987 	if (!obj_request->osd_req)
2988 		goto out;
2989 
2990 	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2991 					offset, length, 0, 0);
2992 	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2993 					obj_request->pages,
2994 					obj_request->length,
2995 					obj_request->offset & ~PAGE_MASK,
2996 					false, false);
2997 	rbd_osd_req_format_read(obj_request);
2998 
2999 	ret = rbd_obj_request_submit(osdc, obj_request);
3000 	if (ret)
3001 		goto out;
3002 	ret = rbd_obj_request_wait(obj_request);
3003 	if (ret)
3004 		goto out;
3005 
3006 	ret = obj_request->result;
3007 	if (ret < 0)
3008 		goto out;
3009 
3010 	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3011 	size = (size_t) obj_request->xferred;
3012 	ceph_copy_from_page_vector(pages, buf, 0, size);
3013 	rbd_assert(size <= (size_t)INT_MAX);
3014 	ret = (int)size;
3015 out:
3016 	if (obj_request)
3017 		rbd_obj_request_put(obj_request);
3018 	else
3019 		ceph_release_page_vector(pages, page_count);
3020 
3021 	return ret;
3022 }
3023 
3024 /*
3025  * Read the complete header for the given rbd device.
3026  *
3027  * Returns a pointer to a dynamically-allocated buffer containing
3028  * the complete and validated header.
3031  *
3032  * Returns a pointer-coded errno if a failure occurs.
3033  */
3034 static struct rbd_image_header_ondisk *
3035 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3036 {
3037 	struct rbd_image_header_ondisk *ondisk = NULL;
3038 	u32 snap_count = 0;
3039 	u64 names_size = 0;
3040 	u32 want_count;
3041 	int ret;
3042 
3043 	/*
3044 	 * The complete header will include an array of its 64-bit
3045 	 * snapshot ids, followed by the names of those snapshots as
3046 	 * a contiguous block of NUL-terminated strings.  Note that
3047 	 * the number of snapshots could change by the time we read
3048 	 * it in, in which case we re-read it.
3049 	 */
3050 	do {
3051 		size_t size;
3052 
3053 		kfree(ondisk);
3054 
3055 		size = sizeof (*ondisk);
3056 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3057 		size += names_size;
3058 		ondisk = kmalloc(size, GFP_KERNEL);
3059 		if (!ondisk)
3060 			return ERR_PTR(-ENOMEM);
3061 
3062 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3063 				       0, size, ondisk);
3064 		if (ret < 0)
3065 			goto out_err;
3066 		if ((size_t)ret < size) {
3067 			ret = -ENXIO;
3068 			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3069 				size, ret);
3070 			goto out_err;
3071 		}
3072 		if (!rbd_dev_ondisk_valid(ondisk)) {
3073 			ret = -ENXIO;
3074 			rbd_warn(rbd_dev, "invalid header");
3075 			goto out_err;
3076 		}
3077 
3078 		names_size = le64_to_cpu(ondisk->snap_names_len);
3079 		want_count = snap_count;
3080 		snap_count = le32_to_cpu(ondisk->snap_count);
3081 	} while (snap_count != want_count);
3082 
3083 	return ondisk;
3084 
3085 out_err:
3086 	kfree(ondisk);
3087 
3088 	return ERR_PTR(ret);
3089 }
3090 
3091 /*
3092  * Re-read the on-disk image header.
3093  */
3094 static int rbd_read_header(struct rbd_device *rbd_dev,
3095 			   struct rbd_image_header *header)
3096 {
3097 	struct rbd_image_header_ondisk *ondisk;
3098 	int ret;
3099 
3100 	ondisk = rbd_dev_v1_header_read(rbd_dev);
3101 	if (IS_ERR(ondisk))
3102 		return PTR_ERR(ondisk);
3103 	ret = rbd_header_from_disk(header, ondisk);
3104 	kfree(ondisk);
3105 
3106 	return ret;
3107 }
3108 
3109 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3110 {
3111 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3112 		return;
3113 
3114 	if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3115 		sector_t size;
3116 
3117 		rbd_dev->mapping.size = rbd_dev->header.image_size;
3118 		size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3119 		dout("setting size to %llu sectors", (unsigned long long)size);
3120 		set_capacity(rbd_dev->disk, size);
3121 	}
3122 }
3123 
3124 /*
3125  * Re-read the image header and update the in-core copy of it.
3126  */
3127 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3128 {
3129 	int ret;
3130 	struct rbd_image_header h;
3131 
3132 	ret = rbd_read_header(rbd_dev, &h);
3133 	if (ret < 0)
3134 		return ret;
3135 
3136 	down_write(&rbd_dev->header_rwsem);
3137 
3138 	/* Update image size, and check for resize of mapped image */
3139 	rbd_dev->header.image_size = h.image_size;
3140 	rbd_update_mapping_size(rbd_dev);
3141 
3142 	/* rbd_dev->header.object_prefix shouldn't change */
3143 	kfree(rbd_dev->header.snap_sizes);
3144 	kfree(rbd_dev->header.snap_names);
3145 	/* osd requests may still refer to snapc */
3146 	ceph_put_snap_context(rbd_dev->header.snapc);
3147 
3148 	rbd_dev->header.image_size = h.image_size;
3149 	rbd_dev->header.snapc = h.snapc;
3150 	rbd_dev->header.snap_names = h.snap_names;
3151 	rbd_dev->header.snap_sizes = h.snap_sizes;
3152 	/* Free the extra copy of the object prefix */
3153 	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3154 		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3155 	kfree(h.object_prefix);
3156 
3157 	up_write(&rbd_dev->header_rwsem);
3158 
3159 	return ret;
3160 }
3161 
3162 /*
3163  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3164  * has disappeared from the (just updated) snapshot context.
3165  */
3166 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3167 {
3168 	u64 snap_id;
3169 
3170 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3171 		return;
3172 
3173 	snap_id = rbd_dev->spec->snap_id;
3174 	if (snap_id == CEPH_NOSNAP)
3175 		return;
3176 
3177 	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3178 		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3179 }
3180 
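/*
 * Re-read the image header (format 1 or 2), typically in response to
 * a header object notification or an explicit refresh via sysfs, and
 * revalidate the disk if the image size changed.
 */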
3181 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3182 {
3183 	u64 image_size;
3184 	int ret;
3185 
3186 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3187 	image_size = rbd_dev->header.image_size;
3188 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3189 	if (rbd_dev->image_format == 1)
3190 		ret = rbd_dev_v1_refresh(rbd_dev);
3191 	else
3192 		ret = rbd_dev_v2_refresh(rbd_dev);
3193 
3194 	/* If it's a mapped snapshot, validate its EXISTS flag */
3195 
3196 	rbd_exists_validate(rbd_dev);
3197 	mutex_unlock(&ctl_mutex);
3198 	if (ret)
3199 		rbd_warn(rbd_dev, "got notification but failed to "
3200 			   " update snaps: %d\n", ret);
3201 	if (image_size != rbd_dev->header.image_size)
3202 		revalidate_disk(rbd_dev->disk);
3203 
3204 	return ret;
3205 }
3206 
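/*
 * Set up the gendisk and request queue for a mapped image.  The
 * queue I/O limits are set to the rbd object size, and the
 * rbd_merge_bvec() callback keeps bios from crossing object
 * boundaries.
 */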
3207 static int rbd_init_disk(struct rbd_device *rbd_dev)
3208 {
3209 	struct gendisk *disk;
3210 	struct request_queue *q;
3211 	u64 segment_size;
3212 
3213 	/* create gendisk info */
3214 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3215 	if (!disk)
3216 		return -ENOMEM;
3217 
3218 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3219 		 rbd_dev->dev_id);
3220 	disk->major = rbd_dev->major;
3221 	disk->first_minor = 0;
3222 	disk->fops = &rbd_bd_ops;
3223 	disk->private_data = rbd_dev;
3224 
3225 	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3226 	if (!q)
3227 		goto out_disk;
3228 
3229 	/* We use the default size, but let's be explicit about it. */
3230 	blk_queue_physical_block_size(q, SECTOR_SIZE);
3231 
3232 	/* set io sizes to object size */
3233 	segment_size = rbd_obj_bytes(&rbd_dev->header);
3234 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3235 	blk_queue_max_segment_size(q, segment_size);
3236 	blk_queue_io_min(q, segment_size);
3237 	blk_queue_io_opt(q, segment_size);
3238 
3239 	blk_queue_merge_bvec(q, rbd_merge_bvec);
3240 	disk->queue = q;
3241 
3242 	q->queuedata = rbd_dev;
3243 
3244 	rbd_dev->disk = disk;
3245 
3246 	return 0;
3247 out_disk:
3248 	put_disk(disk);
3249 
3250 	return -ENOMEM;
3251 }
3252 
3253 /*
3254   sysfs
3255 */
3256 
3257 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3258 {
3259 	return container_of(dev, struct rbd_device, dev);
3260 }
3261 
3262 static ssize_t rbd_size_show(struct device *dev,
3263 			     struct device_attribute *attr, char *buf)
3264 {
3265 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3266 
3267 	return sprintf(buf, "%llu\n",
3268 		(unsigned long long)rbd_dev->mapping.size);
3269 }
3270 
3271 /*
3272  * Note this shows the features for whatever's mapped, which is not
3273  * necessarily the base image.
3274  */
3275 static ssize_t rbd_features_show(struct device *dev,
3276 			     struct device_attribute *attr, char *buf)
3277 {
3278 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3279 
3280 	return sprintf(buf, "0x%016llx\n",
3281 			(unsigned long long)rbd_dev->mapping.features);
3282 }
3283 
3284 static ssize_t rbd_major_show(struct device *dev,
3285 			      struct device_attribute *attr, char *buf)
3286 {
3287 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288 
3289 	if (rbd_dev->major)
3290 		return sprintf(buf, "%d\n", rbd_dev->major);
3291 
3292 	return sprintf(buf, "(none)\n");
3293 
3294 }
3295 
3296 static ssize_t rbd_client_id_show(struct device *dev,
3297 				  struct device_attribute *attr, char *buf)
3298 {
3299 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3300 
3301 	return sprintf(buf, "client%lld\n",
3302 			ceph_client_id(rbd_dev->rbd_client->client));
3303 }
3304 
3305 static ssize_t rbd_pool_show(struct device *dev,
3306 			     struct device_attribute *attr, char *buf)
3307 {
3308 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3309 
3310 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3311 }
3312 
3313 static ssize_t rbd_pool_id_show(struct device *dev,
3314 			     struct device_attribute *attr, char *buf)
3315 {
3316 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3317 
3318 	return sprintf(buf, "%llu\n",
3319 			(unsigned long long) rbd_dev->spec->pool_id);
3320 }
3321 
3322 static ssize_t rbd_name_show(struct device *dev,
3323 			     struct device_attribute *attr, char *buf)
3324 {
3325 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3326 
3327 	if (rbd_dev->spec->image_name)
3328 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3329 
3330 	return sprintf(buf, "(unknown)\n");
3331 }
3332 
3333 static ssize_t rbd_image_id_show(struct device *dev,
3334 			     struct device_attribute *attr, char *buf)
3335 {
3336 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3337 
3338 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3339 }
3340 
3341 /*
3342  * Shows the name of the currently-mapped snapshot (or
3343  * RBD_SNAP_HEAD_NAME for the base image).
3344  */
3345 static ssize_t rbd_snap_show(struct device *dev,
3346 			     struct device_attribute *attr,
3347 			     char *buf)
3348 {
3349 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3350 
3351 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3352 }
3353 
3354 /*
3355  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3356  * for the parent image.  If there is no parent, simply shows
3357  * "(no parent image)".
3358  */
3359 static ssize_t rbd_parent_show(struct device *dev,
3360 			     struct device_attribute *attr,
3361 			     char *buf)
3362 {
3363 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3364 	struct rbd_spec *spec = rbd_dev->parent_spec;
3365 	int count;
3366 	char *bufp = buf;
3367 
3368 	if (!spec)
3369 		return sprintf(buf, "(no parent image)\n");
3370 
3371 	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3372 			(unsigned long long) spec->pool_id, spec->pool_name);
3373 	if (count < 0)
3374 		return count;
3375 	bufp += count;
3376 
3377 	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3378 			spec->image_name ? spec->image_name : "(unknown)");
3379 	if (count < 0)
3380 		return count;
3381 	bufp += count;
3382 
3383 	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3384 			(unsigned long long) spec->snap_id, spec->snap_name);
3385 	if (count < 0)
3386 		return count;
3387 	bufp += count;
3388 
3389 	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3390 	if (count < 0)
3391 		return count;
3392 	bufp += count;
3393 
3394 	return (ssize_t) (bufp - buf);
3395 }
3396 
3397 static ssize_t rbd_image_refresh(struct device *dev,
3398 				 struct device_attribute *attr,
3399 				 const char *buf,
3400 				 size_t size)
3401 {
3402 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3403 	int ret;
3404 
3405 	ret = rbd_dev_refresh(rbd_dev);
3406 
3407 	return ret < 0 ? ret : size;
3408 }
3409 
3410 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3411 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3412 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3413 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3414 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3415 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3416 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3417 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3418 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3419 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3420 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3421 
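/*
 * These attributes appear under each mapped device's sysfs directory
 * (normally /sys/bus/rbd/devices/<id>/); for example, the mapped
 * image size in bytes can be read from the "size" attribute there.
 */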
3422 static struct attribute *rbd_attrs[] = {
3423 	&dev_attr_size.attr,
3424 	&dev_attr_features.attr,
3425 	&dev_attr_major.attr,
3426 	&dev_attr_client_id.attr,
3427 	&dev_attr_pool.attr,
3428 	&dev_attr_pool_id.attr,
3429 	&dev_attr_name.attr,
3430 	&dev_attr_image_id.attr,
3431 	&dev_attr_current_snap.attr,
3432 	&dev_attr_parent.attr,
3433 	&dev_attr_refresh.attr,
3434 	NULL
3435 };
3436 
3437 static struct attribute_group rbd_attr_group = {
3438 	.attrs = rbd_attrs,
3439 };
3440 
3441 static const struct attribute_group *rbd_attr_groups[] = {
3442 	&rbd_attr_group,
3443 	NULL
3444 };
3445 
3446 static void rbd_sysfs_dev_release(struct device *dev)
3447 {
3448 }
3449 
3450 static struct device_type rbd_device_type = {
3451 	.name		= "rbd",
3452 	.groups		= rbd_attr_groups,
3453 	.release	= rbd_sysfs_dev_release,
3454 };
3455 
3456 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3457 {
3458 	kref_get(&spec->kref);
3459 
3460 	return spec;
3461 }
3462 
3463 static void rbd_spec_free(struct kref *kref);
3464 static void rbd_spec_put(struct rbd_spec *spec)
3465 {
3466 	if (spec)
3467 		kref_put(&spec->kref, rbd_spec_free);
3468 }
3469 
3470 static struct rbd_spec *rbd_spec_alloc(void)
3471 {
3472 	struct rbd_spec *spec;
3473 
3474 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3475 	if (!spec)
3476 		return NULL;
3477 	kref_init(&spec->kref);
3478 
3479 	return spec;
3480 }
3481 
3482 static void rbd_spec_free(struct kref *kref)
3483 {
3484 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3485 
3486 	kfree(spec->pool_name);
3487 	kfree(spec->image_id);
3488 	kfree(spec->image_name);
3489 	kfree(spec->snap_name);
3490 	kfree(spec);
3491 }
3492 
3493 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3494 				struct rbd_spec *spec)
3495 {
3496 	struct rbd_device *rbd_dev;
3497 
3498 	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3499 	if (!rbd_dev)
3500 		return NULL;
3501 
3502 	spin_lock_init(&rbd_dev->lock);
3503 	rbd_dev->flags = 0;
3504 	INIT_LIST_HEAD(&rbd_dev->node);
3505 	init_rwsem(&rbd_dev->header_rwsem);
3506 
3507 	rbd_dev->spec = spec;
3508 	rbd_dev->rbd_client = rbdc;
3509 
3510 	/* Initialize the layout used for all rbd requests */
3511 
3512 	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3513 	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3514 	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3515 	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3516 
3517 	return rbd_dev;
3518 }
3519 
3520 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3521 {
3522 	rbd_put_client(rbd_dev->rbd_client);
3523 	rbd_spec_put(rbd_dev->spec);
3524 	kfree(rbd_dev);
3525 }
3526 
3527 /*
3528  * Get the size and object order for an image snapshot, or if
3529  * snap_id is CEPH_NOSNAP, gets this information for the base
3530  * image.
3531  */
3532 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3533 				u8 *order, u64 *snap_size)
3534 {
3535 	__le64 snapid = cpu_to_le64(snap_id);
3536 	int ret;
3537 	struct {
3538 		u8 order;
3539 		__le64 size;
3540 	} __attribute__ ((packed)) size_buf = { 0 };
3541 
3542 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3543 				"rbd", "get_size",
3544 				&snapid, sizeof (snapid),
3545 				&size_buf, sizeof (size_buf));
3546 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3547 	if (ret < 0)
3548 		return ret;
3549 	if (ret < sizeof (size_buf))
3550 		return -ERANGE;
3551 
3552 	if (order)
3553 		*order = size_buf.order;
3554 	*snap_size = le64_to_cpu(size_buf.size);
3555 
3556 	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3557 		(unsigned long long)snap_id, (unsigned int)*order,
3558 		(unsigned long long)*snap_size);
3559 
3560 	return 0;
3561 }
3562 
3563 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3564 {
3565 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3566 					&rbd_dev->header.obj_order,
3567 					&rbd_dev->header.image_size);
3568 }
3569 
3570 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3571 {
3572 	void *reply_buf;
3573 	int ret;
3574 	void *p;
3575 
3576 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3577 	if (!reply_buf)
3578 		return -ENOMEM;
3579 
3580 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3581 				"rbd", "get_object_prefix", NULL, 0,
3582 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3583 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3584 	if (ret < 0)
3585 		goto out;
3586 
3587 	p = reply_buf;
3588 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3589 						p + ret, NULL, GFP_NOIO);
3590 	ret = 0;
3591 
3592 	if (IS_ERR(rbd_dev->header.object_prefix)) {
3593 		ret = PTR_ERR(rbd_dev->header.object_prefix);
3594 		rbd_dev->header.object_prefix = NULL;
3595 	} else {
3596 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3597 	}
3598 out:
3599 	kfree(reply_buf);
3600 
3601 	return ret;
3602 }
3603 
3604 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3605 		u64 *snap_features)
3606 {
3607 	__le64 snapid = cpu_to_le64(snap_id);
3608 	struct {
3609 		__le64 features;
3610 		__le64 incompat;
3611 	} __attribute__ ((packed)) features_buf = { 0 };
3612 	u64 incompat;
3613 	int ret;
3614 
3615 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3616 				"rbd", "get_features",
3617 				&snapid, sizeof (snapid),
3618 				&features_buf, sizeof (features_buf));
3619 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3620 	if (ret < 0)
3621 		return ret;
3622 	if (ret < sizeof (features_buf))
3623 		return -ERANGE;
3624 
3625 	incompat = le64_to_cpu(features_buf.incompat);
3626 	if (incompat & ~RBD_FEATURES_SUPPORTED)
3627 		return -ENXIO;
3628 
3629 	*snap_features = le64_to_cpu(features_buf.features);
3630 
3631 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3632 		(unsigned long long)snap_id,
3633 		(unsigned long long)*snap_features,
3634 		(unsigned long long)le64_to_cpu(features_buf.incompat));
3635 
3636 	return 0;
3637 }
3638 
3639 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3640 {
3641 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3642 						&rbd_dev->header.features);
3643 }
3644 
3645 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3646 {
3647 	struct rbd_spec *parent_spec;
3648 	size_t size;
3649 	void *reply_buf = NULL;
3650 	__le64 snapid;
3651 	void *p;
3652 	void *end;
3653 	char *image_id;
3654 	u64 overlap;
3655 	int ret;
3656 
3657 	parent_spec = rbd_spec_alloc();
3658 	if (!parent_spec)
3659 		return -ENOMEM;
3660 
3661 	size = sizeof (__le64) +				/* pool_id */
3662 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
3663 		sizeof (__le64) +				/* snap_id */
3664 		sizeof (__le64);				/* overlap */
3665 	reply_buf = kmalloc(size, GFP_KERNEL);
3666 	if (!reply_buf) {
3667 		ret = -ENOMEM;
3668 		goto out_err;
3669 	}
3670 
3671 	snapid = cpu_to_le64(CEPH_NOSNAP);
3672 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3673 				"rbd", "get_parent",
3674 				&snapid, sizeof (snapid),
3675 				reply_buf, size);
3676 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3677 	if (ret < 0)
3678 		goto out_err;
3679 
3680 	p = reply_buf;
3681 	end = reply_buf + ret;
3682 	ret = -ERANGE;
3683 	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3684 	if (parent_spec->pool_id == CEPH_NOPOOL)
3685 		goto out;	/* No parent?  No problem. */
3686 
3687 	/* The ceph file layout needs to fit pool id in 32 bits */
3688 
3689 	ret = -EIO;
3690 	if (parent_spec->pool_id > (u64)U32_MAX) {
3691 		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
3692 			(unsigned long long)parent_spec->pool_id, U32_MAX);
3693 		goto out_err;
3694 	}
3695 
3696 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3697 	if (IS_ERR(image_id)) {
3698 		ret = PTR_ERR(image_id);
3699 		goto out_err;
3700 	}
3701 	parent_spec->image_id = image_id;
3702 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3703 	ceph_decode_64_safe(&p, end, overlap, out_err);
3704 
3705 	rbd_dev->parent_overlap = overlap;
3706 	rbd_dev->parent_spec = parent_spec;
3707 	parent_spec = NULL;	/* rbd_dev now owns this */
3708 out:
3709 	ret = 0;
3710 out_err:
3711 	kfree(reply_buf);
3712 	rbd_spec_put(parent_spec);
3713 
3714 	return ret;
3715 }
3716 
3717 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3718 {
3719 	struct {
3720 		__le64 stripe_unit;
3721 		__le64 stripe_count;
3722 	} __attribute__ ((packed)) striping_info_buf = { 0 };
3723 	size_t size = sizeof (striping_info_buf);
3724 	void *p;
3725 	u64 obj_size;
3726 	u64 stripe_unit;
3727 	u64 stripe_count;
3728 	int ret;
3729 
3730 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3731 				"rbd", "get_stripe_unit_count", NULL, 0,
3732 				(char *)&striping_info_buf, size);
3733 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3734 	if (ret < 0)
3735 		return ret;
3736 	if (ret < size)
3737 		return -ERANGE;
3738 
3739 	/*
3740 	 * We don't actually support the "fancy striping" feature
3741 	 * (STRIPINGV2) yet, but if the striping sizes are the
3742 	 * defaults the behavior is the same as before.  So find
3743 	 * out, and only fail if the image has non-default values.
3744 	 */
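	/*
	 * As a concrete (hypothetical) illustration of the checks below:
	 * for an image with obj_order 22 (4 MiB objects), the only
	 * accepted values are stripe_unit == 4194304 and
	 * stripe_count == 1.
	 */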
3745 	ret = -EINVAL;
3746 	obj_size = (u64)1 << rbd_dev->header.obj_order;
3747 	p = &striping_info_buf;
3748 	stripe_unit = ceph_decode_64(&p);
3749 	if (stripe_unit != obj_size) {
3750 		rbd_warn(rbd_dev, "unsupported stripe unit "
3751 				"(got %llu want %llu)",
3752 				stripe_unit, obj_size);
3753 		return -EINVAL;
3754 	}
3755 	stripe_count = ceph_decode_64(&p);
3756 	if (stripe_count != 1) {
3757 		rbd_warn(rbd_dev, "unsupported stripe count "
3758 				"(got %llu want 1)", stripe_count);
3759 		return -EINVAL;
3760 	}
3761 	rbd_dev->header.stripe_unit = stripe_unit;
3762 	rbd_dev->header.stripe_count = stripe_count;
3763 
3764 	return 0;
3765 }
3766 
3767 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3768 {
3769 	size_t image_id_size;
3770 	char *image_id;
3771 	void *p;
3772 	void *end;
3773 	size_t size;
3774 	void *reply_buf = NULL;
3775 	size_t len = 0;
3776 	char *image_name = NULL;
3777 	int ret;
3778 
3779 	rbd_assert(!rbd_dev->spec->image_name);
3780 
3781 	len = strlen(rbd_dev->spec->image_id);
3782 	image_id_size = sizeof (__le32) + len;
3783 	image_id = kmalloc(image_id_size, GFP_KERNEL);
3784 	if (!image_id)
3785 		return NULL;
3786 
3787 	p = image_id;
3788 	end = image_id + image_id_size;
3789 	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3790 
3791 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3792 	reply_buf = kmalloc(size, GFP_KERNEL);
3793 	if (!reply_buf)
3794 		goto out;
3795 
3796 	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3797 				"rbd", "dir_get_name",
3798 				image_id, image_id_size,
3799 				reply_buf, size);
3800 	if (ret < 0)
3801 		goto out;
3802 	p = reply_buf;
3803 	end = reply_buf + ret;
3804 
3805 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3806 	if (IS_ERR(image_name))
3807 		image_name = NULL;
3808 	else
3809 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3810 out:
3811 	kfree(reply_buf);
3812 	kfree(image_id);
3813 
3814 	return image_name;
3815 }
3816 
3817 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3818 {
3819 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3820 	const char *snap_name;
3821 	u32 which = 0;
3822 
3823 	/* Skip over names until we find the one we are looking for */
3824 
3825 	snap_name = rbd_dev->header.snap_names;
3826 	while (which < snapc->num_snaps) {
3827 		if (!strcmp(name, snap_name))
3828 			return snapc->snaps[which];
3829 		snap_name += strlen(snap_name) + 1;
3830 		which++;
3831 	}
3832 	return CEPH_NOSNAP;
3833 }
3834 
3835 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3836 {
3837 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3838 	u32 which;
3839 	bool found = false;
3840 	u64 snap_id;
3841 
3842 	for (which = 0; !found && which < snapc->num_snaps; which++) {
3843 		const char *snap_name;
3844 
3845 		snap_id = snapc->snaps[which];
3846 		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3847 		if (IS_ERR(snap_name))
3848 			break;
3849 		found = !strcmp(name, snap_name);
3850 		kfree(snap_name);
3851 	}
3852 	return found ? snap_id : CEPH_NOSNAP;
3853 }
3854 
3855 /*
3856  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3857  * no snapshot by that name is found, or if an error occurs.
3858  */
3859 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3860 {
3861 	if (rbd_dev->image_format == 1)
3862 		return rbd_v1_snap_id_by_name(rbd_dev, name);
3863 
3864 	return rbd_v2_snap_id_by_name(rbd_dev, name);
3865 }
3866 
3867 /*
3868  * When an rbd image has a parent image, it is identified by the
3869  * pool, image, and snapshot ids (not names).  This function fills
3870  * in the names for those ids.  (It's OK if we can't figure out the
3871  * name for an image id, but the pool and snapshot ids should always
3872  * exist and have names.)  All names in an rbd spec are dynamically
3873  * allocated.
3874  *
3875  * When an image being mapped (not a parent) is probed, we have the
3876  * pool name and pool id, image name and image id, and the snapshot
3877  * name.  The only thing we're missing is the snapshot id.
3878  */
3879 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3880 {
3881 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3882 	struct rbd_spec *spec = rbd_dev->spec;
3883 	const char *pool_name;
3884 	const char *image_name;
3885 	const char *snap_name;
3886 	int ret;
3887 
3888 	/*
3889 	 * An image being mapped will have the pool name (etc.), but
3890 	 * we need to look up the snapshot id.
3891 	 */
3892 	if (spec->pool_name) {
3893 		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3894 			u64 snap_id;
3895 
3896 			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3897 			if (snap_id == CEPH_NOSNAP)
3898 				return -ENOENT;
3899 			spec->snap_id = snap_id;
3900 		} else {
3901 			spec->snap_id = CEPH_NOSNAP;
3902 		}
3903 
3904 		return 0;
3905 	}
3906 
3907 	/* Get the pool name; we have to make our own copy of this */
3908 
3909 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3910 	if (!pool_name) {
3911 		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3912 		return -EIO;
3913 	}
3914 	pool_name = kstrdup(pool_name, GFP_KERNEL);
3915 	if (!pool_name)
3916 		return -ENOMEM;
3917 
3918 	/* Fetch the image name; tolerate failure here */
3919 
3920 	image_name = rbd_dev_image_name(rbd_dev);
3921 	if (!image_name)
3922 		rbd_warn(rbd_dev, "unable to get image name");
3923 
3924 	/* Look up the snapshot name, and make a copy */
3925 
3926 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3927 	if (!snap_name) {
3928 		ret = -ENOMEM;
3929 		goto out_err;
3930 	}
3931 
3932 	spec->pool_name = pool_name;
3933 	spec->image_name = image_name;
3934 	spec->snap_name = snap_name;
3935 
3936 	return 0;
3937 out_err:
3938 	kfree(image_name);
3939 	kfree(pool_name);
3940 
3941 	return ret;
3942 }
3943 
3944 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3945 {
3946 	size_t size;
3947 	int ret;
3948 	void *reply_buf;
3949 	void *p;
3950 	void *end;
3951 	u64 seq;
3952 	u32 snap_count;
3953 	struct ceph_snap_context *snapc;
3954 	u32 i;
3955 
3956 	/*
3957 	 * We'll need room for the seq value (maximum snapshot id),
3958 	 * snapshot count, and array of that many snapshot ids.
3959 	 * For now we have a fixed upper limit on the number we're
3960 	 * prepared to receive.
3961 	 */
3962 	size = sizeof (__le64) + sizeof (__le32) +
3963 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
3964 	reply_buf = kzalloc(size, GFP_KERNEL);
3965 	if (!reply_buf)
3966 		return -ENOMEM;
3967 
3968 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3969 				"rbd", "get_snapcontext", NULL, 0,
3970 				reply_buf, size);
3971 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3972 	if (ret < 0)
3973 		goto out;
3974 
3975 	p = reply_buf;
3976 	end = reply_buf + ret;
3977 	ret = -ERANGE;
3978 	ceph_decode_64_safe(&p, end, seq, out);
3979 	ceph_decode_32_safe(&p, end, snap_count, out);
3980 
3981 	/*
3982 	 * Make sure the reported number of snapshot ids wouldn't go
3983 	 * beyond the end of our buffer.  But before checking that,
3984 	 * make sure the computed size of the snapshot context we
3985 	 * allocate is representable in a size_t.
3986 	 */
3987 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3988 				 / sizeof (u64)) {
3989 		ret = -EINVAL;
3990 		goto out;
3991 	}
3992 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3993 		goto out;
3994 	ret = 0;
3995 
3996 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3997 	if (!snapc) {
3998 		ret = -ENOMEM;
3999 		goto out;
4000 	}
4001 	snapc->seq = seq;
4002 	for (i = 0; i < snap_count; i++)
4003 		snapc->snaps[i] = ceph_decode_64(&p);
4004 
4005 	rbd_dev->header.snapc = snapc;
4006 
4007 	dout("  snap context seq = %llu, snap_count = %u\n",
4008 		(unsigned long long)seq, (unsigned int)snap_count);
4009 out:
4010 	kfree(reply_buf);
4011 
4012 	return ret;
4013 }
4014 
4015 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4016 					u64 snap_id)
4017 {
4018 	size_t size;
4019 	void *reply_buf;
4020 	__le64 snapid;
4021 	int ret;
4022 	void *p;
4023 	void *end;
4024 	char *snap_name;
4025 
4026 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4027 	reply_buf = kmalloc(size, GFP_KERNEL);
4028 	if (!reply_buf)
4029 		return ERR_PTR(-ENOMEM);
4030 
4031 	snapid = cpu_to_le64(snap_id);
4032 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4033 				"rbd", "get_snapshot_name",
4034 				&snapid, sizeof (snapid),
4035 				reply_buf, size);
4036 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4037 	if (ret < 0) {
4038 		snap_name = ERR_PTR(ret);
4039 		goto out;
4040 	}
4041 
4042 	p = reply_buf;
4043 	end = reply_buf + ret;
4044 	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4045 	if (IS_ERR(snap_name))
4046 		goto out;
4047 
4048 	dout("  snap_id 0x%016llx snap_name = %s\n",
4049 		(unsigned long long)snap_id, snap_name);
4050 out:
4051 	kfree(reply_buf);
4052 
4053 	return snap_name;
4054 }
4055 
4056 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4057 {
4058 	int ret;
4059 
4060 	down_write(&rbd_dev->header_rwsem);
4061 
4062 	ret = rbd_dev_v2_image_size(rbd_dev);
4063 	if (ret)
4064 		goto out;
4065 	rbd_update_mapping_size(rbd_dev);
4066 
4067 	ret = rbd_dev_v2_snap_context(rbd_dev);
4068 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
4069 	if (ret)
4070 		goto out;
4071 out:
4072 	up_write(&rbd_dev->header_rwsem);
4073 
4074 	return ret;
4075 }
4076 
4077 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4078 {
4079 	struct device *dev;
4080 	int ret;
4081 
4082 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4083 
4084 	dev = &rbd_dev->dev;
4085 	dev->bus = &rbd_bus_type;
4086 	dev->type = &rbd_device_type;
4087 	dev->parent = &rbd_root_dev;
4088 	dev->release = rbd_dev_device_release;
4089 	dev_set_name(dev, "%d", rbd_dev->dev_id);
4090 	ret = device_register(dev);
4091 
4092 	mutex_unlock(&ctl_mutex);
4093 
4094 	return ret;
4095 }
4096 
4097 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4098 {
4099 	device_unregister(&rbd_dev->dev);
4100 }
4101 
4102 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4103 
4104 /*
4105  * Get a unique rbd identifier for the given new rbd_dev, and add
4106  * the rbd_dev to the global list.  The minimum rbd id is 1.
4107  */
4108 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4109 {
4110 	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4111 
4112 	spin_lock(&rbd_dev_list_lock);
4113 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4114 	spin_unlock(&rbd_dev_list_lock);
4115 	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4116 		(unsigned long long) rbd_dev->dev_id);
4117 }
4118 
4119 /*
4120  * Remove an rbd_dev from the global list, and record that its
4121  * identifier is no longer in use.
4122  */
4123 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4124 {
4125 	struct list_head *tmp;
4126 	int rbd_id = rbd_dev->dev_id;
4127 	int max_id;
4128 
4129 	rbd_assert(rbd_id > 0);
4130 
4131 	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4132 		(unsigned long long) rbd_dev->dev_id);
4133 	spin_lock(&rbd_dev_list_lock);
4134 	list_del_init(&rbd_dev->node);
4135 
4136 	/*
4137 	 * If the id being "put" is not the current maximum, there
4138 	 * is nothing special we need to do.
4139 	 */
4140 	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4141 		spin_unlock(&rbd_dev_list_lock);
4142 		return;
4143 	}
4144 
4145 	/*
4146 	 * We need to update the current maximum id.  Search the
4147 	 * list to find out what it is.  We're more likely to find
4148 	 * the maximum at the end, so search the list backward.
4149 	 */
4150 	max_id = 0;
4151 	list_for_each_prev(tmp, &rbd_dev_list) {
4152 		struct rbd_device *rbd_dev;
4153 
4154 		rbd_dev = list_entry(tmp, struct rbd_device, node);
4155 		if (rbd_dev->dev_id > max_id)
4156 			max_id = rbd_dev->dev_id;
4157 	}
4158 	spin_unlock(&rbd_dev_list_lock);
4159 
4160 	/*
4161 	 * The max id could have been updated by rbd_dev_id_get(), in
4162 	 * which case it now accurately reflects the new maximum.
4163 	 * Be careful not to overwrite the maximum value in that
4164 	 * case.
4165 	 */
4166 	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4167 	dout("  max dev id has been reset\n");
4168 }
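
/*
 * A small worked example of the id accounting above (hypothetical ids):
 * with devices 1, 2 and 3 mapped, rbd_dev_id_max is 3.  Putting id 2
 * changes nothing, since it is not the maximum; putting id 3 rescans
 * the list, resets the maximum to 2, and the next rbd_dev_id_get()
 * hands out 3 again.
 */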
4169 
4170 /*
4171  * Skips over white space at *buf, and updates *buf to point to the
4172  * first found non-space character (if any). Returns the length of
4173  * the token (string of non-white space characters) found.  Note
4174  * that *buf must be terminated with '\0'.
4175  */
4176 static inline size_t next_token(const char **buf)
4177 {
4178 	/*
4179 	 * These are the characters that produce nonzero for
4180 	 * isspace() in the "C" and "POSIX" locales.
4181 	 */
4182 	const char *spaces = " \f\n\r\t\v";
4183 
4184 	*buf += strspn(*buf, spaces);	/* Find start of token */
4185 
4186 	return strcspn(*buf, spaces);	/* Return token length */
4187 }
4188 
4189 /*
4190  * Finds the next token in *buf, and if the provided token buffer is
4191  * big enough, copies the found token into it.  The result, if
4192  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4193  * must be terminated with '\0' on entry.
4194  *
4195  * Returns the length of the token found (not including the '\0').
4196  * Return value will be 0 if no token is found, and it will be >=
4197  * token_size if the token would not fit.
4198  *
4199  * The *buf pointer will be updated to point beyond the end of the
4200  * found token.  Note that this occurs even if the token buffer is
4201  * too small to hold it.
4202  */
4203 static inline size_t copy_token(const char **buf,
4204 				char *token,
4205 				size_t token_size)
4206 {
4207 	size_t len;
4208 
4209 	len = next_token(buf);
4210 	if (len < token_size) {
4211 		memcpy(token, *buf, len);
4212 		*(token + len) = '\0';
4213 	}
4214 	*buf += len;
4215 
4216 	return len;
4217 }
4218 
4219 /*
4220  * Finds the next token in *buf, dynamically allocates a buffer big
4221  * enough to hold a copy of it, and copies the token into the new
4222  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4223  * that a duplicate buffer is created even for a zero-length token.
4224  *
4225  * Returns a pointer to the newly-allocated duplicate, or a null
4226  * pointer if memory for the duplicate was not available.  If
4227  * the lenp argument is a non-null pointer, the length of the token
4228  * (not including the '\0') is returned in *lenp.
4229  *
4230  * If successful, the *buf pointer will be updated to point beyond
4231  * the end of the found token.
4232  *
4233  * Note: uses GFP_KERNEL for allocation.
4234  */
4235 static inline char *dup_token(const char **buf, size_t *lenp)
4236 {
4237 	char *dup;
4238 	size_t len;
4239 
4240 	len = next_token(buf);
4241 	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4242 	if (!dup)
4243 		return NULL;
4244 	*(dup + len) = '\0';
4245 	*buf += len;
4246 
4247 	if (lenp)
4248 		*lenp = len;
4249 
4250 	return dup;
4251 }
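
/*
 * A hypothetical example of how the token helpers above are used: with
 * *buf pointing at "  pool image", next_token() skips the two leading
 * spaces and returns 4 (the length of "pool") without consuming it; a
 * following dup_token() call then returns a freshly allocated copy of
 * "pool" and advances *buf so that it points at " image".
 */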
4252 
4253 /*
4254  * Parse the options provided for an "rbd add" (i.e., rbd image
4255  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4256  * and the data written is passed here via a NUL-terminated buffer.
4257  * Returns 0 if successful or an error code otherwise.
4258  *
4259  * The information extracted from these options is recorded in
4260  * the other parameters which return dynamically-allocated
4261  * structures:
4262  *  ceph_opts
4263  *      The address of a pointer that will refer to a ceph options
4264  *      structure.  Caller must release the returned pointer using
4265  *      ceph_destroy_options() when it is no longer needed.
4266  *  rbd_opts
4267  *	Address of an rbd options pointer.  Fully initialized by
4268  *	this function; caller must release with kfree().
4269  *  spec
4270  *	Address of an rbd image specification pointer.  Fully
4271  *	initialized by this function based on parsed options.
4272  *	Caller must release with rbd_spec_put().
4273  *
4274  * The options passed take this form:
4275  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4276  * where:
4277  *  <mon_addrs>
4278  *      A comma-separated list of one or more monitor addresses.
4279  *      A monitor address is an ip address, optionally followed
4280  *      by a port number (separated by a colon).
4281  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4282  *  <options>
4283  *      A comma-separated list of ceph and/or rbd options.
4284  *  <pool_name>
4285  *      The name of the rados pool containing the rbd image.
4286  *  <image_name>
4287  *      The name of the image in that pool to map.
4288  *  <snap_name>
4289  *      An optional snapshot name.  If provided, the mapping will
4290  *      present data from the image at the time that snapshot was
4291  *      created.  The image head is used if no snapshot name is
4292  *      provided.  Snapshot mappings are always read-only.
4293  */
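/*
 * For illustration only (all values below are hypothetical), a write
 * such as:
 *
 *   1.2.3.4:6789 name=admin mypool myimage mysnap
 *
 * is parsed as mon_addrs "1.2.3.4:6789", options "name=admin", pool
 * "mypool", image "myimage" and snapshot name "mysnap"; omitting the
 * final token maps the image head ("-") instead.
 */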
4294 static int rbd_add_parse_args(const char *buf,
4295 				struct ceph_options **ceph_opts,
4296 				struct rbd_options **opts,
4297 				struct rbd_spec **rbd_spec)
4298 {
4299 	size_t len;
4300 	char *options;
4301 	const char *mon_addrs;
4302 	char *snap_name;
4303 	size_t mon_addrs_size;
4304 	struct rbd_spec *spec = NULL;
4305 	struct rbd_options *rbd_opts = NULL;
4306 	struct ceph_options *copts;
4307 	int ret;
4308 
4309 	/* The first four tokens are required */
4310 
4311 	len = next_token(&buf);
4312 	if (!len) {
4313 		rbd_warn(NULL, "no monitor address(es) provided");
4314 		return -EINVAL;
4315 	}
4316 	mon_addrs = buf;
4317 	mon_addrs_size = len + 1;
4318 	buf += len;
4319 
4320 	ret = -EINVAL;
4321 	options = dup_token(&buf, NULL);
4322 	if (!options)
4323 		return -ENOMEM;
4324 	if (!*options) {
4325 		rbd_warn(NULL, "no options provided");
4326 		goto out_err;
4327 	}
4328 
4329 	spec = rbd_spec_alloc();
4330 	if (!spec)
4331 		goto out_mem;
4332 
4333 	spec->pool_name = dup_token(&buf, NULL);
4334 	if (!spec->pool_name)
4335 		goto out_mem;
4336 	if (!*spec->pool_name) {
4337 		rbd_warn(NULL, "no pool name provided");
4338 		goto out_err;
4339 	}
4340 
4341 	spec->image_name = dup_token(&buf, NULL);
4342 	if (!spec->image_name)
4343 		goto out_mem;
4344 	if (!*spec->image_name) {
4345 		rbd_warn(NULL, "no image name provided");
4346 		goto out_err;
4347 	}
4348 
4349 	/*
4350 	 * Snapshot name is optional; default is to use "-"
4351 	 * (indicating the head/no snapshot).
4352 	 */
4353 	len = next_token(&buf);
4354 	if (!len) {
4355 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4356 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4357 	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
4358 		ret = -ENAMETOOLONG;
4359 		goto out_err;
4360 	}
4361 	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4362 	if (!snap_name)
4363 		goto out_mem;
4364 	*(snap_name + len) = '\0';
4365 	spec->snap_name = snap_name;
4366 
4367 	/* Initialize all rbd options to the defaults */
4368 
4369 	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4370 	if (!rbd_opts)
4371 		goto out_mem;
4372 
4373 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4374 
4375 	copts = ceph_parse_options(options, mon_addrs,
4376 					mon_addrs + mon_addrs_size - 1,
4377 					parse_rbd_opts_token, rbd_opts);
4378 	if (IS_ERR(copts)) {
4379 		ret = PTR_ERR(copts);
4380 		goto out_err;
4381 	}
4382 	kfree(options);
4383 
4384 	*ceph_opts = copts;
4385 	*opts = rbd_opts;
4386 	*rbd_spec = spec;
4387 
4388 	return 0;
4389 out_mem:
4390 	ret = -ENOMEM;
4391 out_err:
4392 	kfree(rbd_opts);
4393 	rbd_spec_put(spec);
4394 	kfree(options);
4395 
4396 	return ret;
4397 }
4398 
4399 /*
4400  * An rbd format 2 image has a unique identifier, distinct from the
4401  * name given to it by the user.  Internally, that identifier is
4402  * what's used to specify the names of objects related to the image.
4403  *
4404  * A special "rbd id" object is used to map an rbd image name to its
4405  * id.  If that object doesn't exist, then there is no v2 rbd image
4406  * with the supplied name.
4407  *
4408  * This function will record the given rbd_dev's image_id field if
4409  * it can be determined, and in that case will return 0.  If any
4410  * errors occur a negative errno will be returned and the rbd_dev's
4411  * image_id field will be unchanged (and should be NULL).
4412  */
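/*
 * As a sketch (assuming RBD_ID_PREFIX from rbd_types.h expands to
 * "rbd_id."), probing an image named "myimage" looks for its id in an
 * object named "rbd_id.myimage"; if that object is absent the image is
 * treated as format 1.
 */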
4413 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4414 {
4415 	int ret;
4416 	size_t size;
4417 	char *object_name;
4418 	void *response;
4419 	char *image_id;
4420 
4421 	/*
4422 	 * When probing a parent image, the image id is already
4423 	 * known (and the image name likely is not).  There's no
4424 	 * need to fetch the image id again in this case.  We
4425 	 * do still need to set the image format though.
4426 	 */
4427 	if (rbd_dev->spec->image_id) {
4428 		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4429 
4430 		return 0;
4431 	}
4432 
4433 	/*
4434 	 * First, see if the format 2 image id object exists, and if
4435 	 * so, get the image's persistent id from it.
4436 	 */
4437 	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4438 	object_name = kmalloc(size, GFP_NOIO);
4439 	if (!object_name)
4440 		return -ENOMEM;
4441 	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4442 	dout("rbd id object name is %s\n", object_name);
4443 
4444 	/* Response will be an encoded string, which includes a length */
4445 
4446 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4447 	response = kzalloc(size, GFP_NOIO);
4448 	if (!response) {
4449 		ret = -ENOMEM;
4450 		goto out;
4451 	}
4452 
4453 	/* If it doesn't exist we'll assume it's a format 1 image */
4454 
4455 	ret = rbd_obj_method_sync(rbd_dev, object_name,
4456 				"rbd", "get_id", NULL, 0,
4457 				response, size);
4458 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4459 	if (ret == -ENOENT) {
4460 		image_id = kstrdup("", GFP_KERNEL);
4461 		ret = image_id ? 0 : -ENOMEM;
4462 		if (!ret)
4463 			rbd_dev->image_format = 1;
4464 	} else if (ret > (int) sizeof (__le32)) {
4465 		void *p = response;
4466 
4467 		image_id = ceph_extract_encoded_string(&p, p + ret,
4468 						NULL, GFP_NOIO);
4469 		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4470 		if (!ret)
4471 			rbd_dev->image_format = 2;
4472 	} else {
4473 		ret = -EINVAL;
4474 	}
4475 
4476 	if (!ret) {
4477 		rbd_dev->spec->image_id = image_id;
4478 		dout("image_id is %s\n", image_id);
4479 	}
4480 out:
4481 	kfree(response);
4482 	kfree(object_name);
4483 
4484 	return ret;
4485 }
4486 
4487 /* Undo whatever state changes are made by v1 or v2 image probe */
4488 
4489 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4490 {
4491 	struct rbd_image_header	*header;
4492 
4493 	rbd_dev_remove_parent(rbd_dev);
4494 	rbd_spec_put(rbd_dev->parent_spec);
4495 	rbd_dev->parent_spec = NULL;
4496 	rbd_dev->parent_overlap = 0;
4497 
4498 	/* Free dynamic fields from the header, then zero it out */
4499 
4500 	header = &rbd_dev->header;
4501 	ceph_put_snap_context(header->snapc);
4502 	kfree(header->snap_sizes);
4503 	kfree(header->snap_names);
4504 	kfree(header->object_prefix);
4505 	memset(header, 0, sizeof (*header));
4506 }
4507 
4508 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4509 {
4510 	int ret;
4511 
4512 	/* Populate rbd image metadata */
4513 
4514 	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4515 	if (ret < 0)
4516 		goto out_err;
4517 
4518 	/* Version 1 images have no parent (no layering) */
4519 
4520 	rbd_dev->parent_spec = NULL;
4521 	rbd_dev->parent_overlap = 0;
4522 
4523 	dout("discovered version 1 image, header name is %s\n",
4524 		rbd_dev->header_name);
4525 
4526 	return 0;
4527 
4528 out_err:
4529 	kfree(rbd_dev->header_name);
4530 	rbd_dev->header_name = NULL;
4531 	kfree(rbd_dev->spec->image_id);
4532 	rbd_dev->spec->image_id = NULL;
4533 
4534 	return ret;
4535 }
4536 
4537 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4538 {
4539 	int ret;
4540 
4541 	ret = rbd_dev_v2_image_size(rbd_dev);
4542 	if (ret)
4543 		goto out_err;
4544 
4545 	/* Get the object prefix (a.k.a. block_name) for the image */
4546 
4547 	ret = rbd_dev_v2_object_prefix(rbd_dev);
4548 	if (ret)
4549 		goto out_err;
4550 
4551 	/* Get and check the features for the image */
4552 
4553 	ret = rbd_dev_v2_features(rbd_dev);
4554 	if (ret)
4555 		goto out_err;
4556 
4557 	/* If the image supports layering, get the parent info */
4558 
4559 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4560 		ret = rbd_dev_v2_parent_info(rbd_dev);
4561 		if (ret)
4562 			goto out_err;
4563 
4564 		/*
4565 		 * Don't print a warning for parent images.  We can
4566 		 * tell we have a parent at this point because we
4567 		 * won't know its pool name yet (just its pool id).
4568 		 */
4569 		if (rbd_dev->spec->pool_name)
4570 			rbd_warn(rbd_dev, "WARNING: kernel layering "
4571 					"is EXPERIMENTAL!");
4572 	}
4573 
4574 	/* If the image supports fancy striping, get its parameters */
4575 
4576 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4577 		ret = rbd_dev_v2_striping_info(rbd_dev);
4578 		if (ret < 0)
4579 			goto out_err;
4580 	}
4581 
4582 	/* crypto and compression type aren't (yet) supported for v2 images */
4583 
4584 	rbd_dev->header.crypt_type = 0;
4585 	rbd_dev->header.comp_type = 0;
4586 
4587 	/* Get the snapshot context */
4588 
4589 	ret = rbd_dev_v2_snap_context(rbd_dev);
4590 	if (ret)
4591 		goto out_err;
4592 
4593 	dout("discovered version 2 image, header name is %s\n",
4594 		rbd_dev->header_name);
4595 
4596 	return 0;
4597 out_err:
4598 	rbd_dev->parent_overlap = 0;
4599 	rbd_spec_put(rbd_dev->parent_spec);
4600 	rbd_dev->parent_spec = NULL;
4601 	kfree(rbd_dev->header_name);
4602 	rbd_dev->header_name = NULL;
4603 	kfree(rbd_dev->header.object_prefix);
4604 	rbd_dev->header.object_prefix = NULL;
4605 
4606 	return ret;
4607 }
4608 
4609 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4610 {
4611 	struct rbd_device *parent = NULL;
4612 	struct rbd_spec *parent_spec;
4613 	struct rbd_client *rbdc;
4614 	int ret;
4615 
4616 	if (!rbd_dev->parent_spec)
4617 		return 0;
4618 	/*
4619 	 * We need to pass a reference to the client and the parent
4620 	 * spec when creating the parent rbd_dev.  Images related by
4621 	 * parent/child relationships always share both.
4622 	 */
4623 	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4624 	rbdc = __rbd_get_client(rbd_dev->rbd_client);
4625 
4626 	ret = -ENOMEM;
4627 	parent = rbd_dev_create(rbdc, parent_spec);
4628 	if (!parent)
4629 		goto out_err;
4630 
4631 	ret = rbd_dev_image_probe(parent);
4632 	if (ret < 0)
4633 		goto out_err;
4634 	rbd_dev->parent = parent;
4635 
4636 	return 0;
4637 out_err:
4638 	if (parent) {
4639 		rbd_spec_put(rbd_dev->parent_spec);
4640 		kfree(rbd_dev->header_name);
4641 		rbd_dev_destroy(parent);
4642 	} else {
4643 		rbd_put_client(rbdc);
4644 		rbd_spec_put(parent_spec);
4645 	}
4646 
4647 	return ret;
4648 }
4649 
4650 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4651 {
4652 	int ret;
4653 
4654 	ret = rbd_dev_mapping_set(rbd_dev);
4655 	if (ret)
4656 		return ret;
4657 
4658 	/* generate unique id: find highest unique id, add one */
4659 	rbd_dev_id_get(rbd_dev);
4660 
4661 	/* Fill in the device name, now that we have its id. */
4662 	BUILD_BUG_ON(DEV_NAME_LEN
4663 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4664 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4665 
4666 	/* Get our block major device number. */
4667 
4668 	ret = register_blkdev(0, rbd_dev->name);
4669 	if (ret < 0)
4670 		goto err_out_id;
4671 	rbd_dev->major = ret;
4672 
4673 	/* Set up the blkdev mapping. */
4674 
4675 	ret = rbd_init_disk(rbd_dev);
4676 	if (ret)
4677 		goto err_out_blkdev;
4678 
4679 	ret = rbd_bus_add_dev(rbd_dev);
4680 	if (ret)
4681 		goto err_out_disk;
4682 
4683 	/* Everything's ready.  Announce the disk to the world. */
4684 
4685 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4686 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4687 	add_disk(rbd_dev->disk);
4688 
4689 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4690 		(unsigned long long) rbd_dev->mapping.size);
4691 
4692 	return ret;
4693 
4694 err_out_disk:
4695 	rbd_free_disk(rbd_dev);
4696 err_out_blkdev:
4697 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4698 err_out_id:
4699 	rbd_dev_id_put(rbd_dev);
4700 	rbd_dev_mapping_clear(rbd_dev);
4701 
4702 	return ret;
4703 }
4704 
4705 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4706 {
4707 	struct rbd_spec *spec = rbd_dev->spec;
4708 	size_t size;
4709 
4710 	/* Record the header object name for this rbd image. */
4711 
4712 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4713 
4714 	if (rbd_dev->image_format == 1)
4715 		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4716 	else
4717 		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4718 
4719 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4720 	if (!rbd_dev->header_name)
4721 		return -ENOMEM;
4722 
4723 	if (rbd_dev->image_format == 1)
4724 		sprintf(rbd_dev->header_name, "%s%s",
4725 			spec->image_name, RBD_SUFFIX);
4726 	else
4727 		sprintf(rbd_dev->header_name, "%s%s",
4728 			RBD_HEADER_PREFIX, spec->image_id);
4729 	return 0;
4730 }
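
/*
 * For illustration (assuming RBD_SUFFIX is ".rbd" and RBD_HEADER_PREFIX
 * is "rbd_header." as defined in rbd_types.h): a format 1 image named
 * "myimage" uses the header object "myimage.rbd", while a format 2
 * image whose id is "10052ae8944a" uses "rbd_header.10052ae8944a".
 */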
4731 
4732 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4733 {
4734 	int ret;
4735 
4736 	rbd_dev_unprobe(rbd_dev);
4737 	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4738 	if (ret)
4739 		rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4740 	kfree(rbd_dev->header_name);
4741 	rbd_dev->header_name = NULL;
4742 	rbd_dev->image_format = 0;
4743 	kfree(rbd_dev->spec->image_id);
4744 	rbd_dev->spec->image_id = NULL;
4745 
4746 	rbd_dev_destroy(rbd_dev);
4747 }
4748 
4749 /*
4750  * Probe for the existence of the header object for the given rbd
4751  * device.  For format 2 images this includes determining the image
4752  * id.
4753  */
4754 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4755 {
4756 	int ret;
4757 	int tmp;
4758 
4759 	/*
4760 	 * Get the id from the image id object.  If it's not a
4761 	 * format 2 image, we'll get ENOENT back, and we'll assume
4762 	 * it's a format 1 image.
4763 	 */
4764 	ret = rbd_dev_image_id(rbd_dev);
4765 	if (ret)
4766 		return ret;
4767 	rbd_assert(rbd_dev->spec->image_id);
4768 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4769 
4770 	ret = rbd_dev_header_name(rbd_dev);
4771 	if (ret)
4772 		goto err_out_format;
4773 
4774 	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4775 	if (ret)
4776 		goto out_header_name;
4777 
4778 	if (rbd_dev->image_format == 1)
4779 		ret = rbd_dev_v1_probe(rbd_dev);
4780 	else
4781 		ret = rbd_dev_v2_probe(rbd_dev);
4782 	if (ret)
4783 		goto err_out_watch;
4784 
4785 	ret = rbd_dev_spec_update(rbd_dev);
4786 	if (ret)
4787 		goto err_out_probe;
4788 
4789 	ret = rbd_dev_probe_parent(rbd_dev);
4790 	if (!ret)
4791 		return 0;
4792 
4793 err_out_probe:
4794 	rbd_dev_unprobe(rbd_dev);
4795 err_out_watch:
4796 	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4797 	if (tmp)
4798 		rbd_warn(rbd_dev, "unable to tear down watch request");
4799 out_header_name:
4800 	kfree(rbd_dev->header_name);
4801 	rbd_dev->header_name = NULL;
4802 err_out_format:
4803 	rbd_dev->image_format = 0;
4804 	kfree(rbd_dev->spec->image_id);
4805 	rbd_dev->spec->image_id = NULL;
4806 
4807 	dout("probe failed, returning %d\n", ret);
4808 
4809 	return ret;
4810 }
4811 
4812 static ssize_t rbd_add(struct bus_type *bus,
4813 		       const char *buf,
4814 		       size_t count)
4815 {
4816 	struct rbd_device *rbd_dev = NULL;
4817 	struct ceph_options *ceph_opts = NULL;
4818 	struct rbd_options *rbd_opts = NULL;
4819 	struct rbd_spec *spec = NULL;
4820 	struct rbd_client *rbdc;
4821 	struct ceph_osd_client *osdc;
4822 	int rc = -ENOMEM;
4823 
4824 	if (!try_module_get(THIS_MODULE))
4825 		return -ENODEV;
4826 
4827 	/* parse add command */
4828 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4829 	if (rc < 0)
4830 		goto err_out_module;
4831 
4832 	rbdc = rbd_get_client(ceph_opts);
4833 	if (IS_ERR(rbdc)) {
4834 		rc = PTR_ERR(rbdc);
4835 		goto err_out_args;
4836 	}
4837 	ceph_opts = NULL;	/* rbd_dev client now owns this */
4838 
4839 	/* pick the pool */
4840 	osdc = &rbdc->client->osdc;
4841 	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4842 	if (rc < 0)
4843 		goto err_out_client;
4844 	spec->pool_id = (u64)rc;
4845 
4846 	/* The ceph file layout needs to fit pool id in 32 bits */
4847 
4848 	if (spec->pool_id > (u64)U32_MAX) {
4849 		rbd_warn(NULL, "pool id too large (%llu > %u)",
4850 				(unsigned long long)spec->pool_id, U32_MAX);
4851 		rc = -EIO;
4852 		goto err_out_client;
4853 	}
4854 
4855 	rbd_dev = rbd_dev_create(rbdc, spec);
4856 	if (!rbd_dev)
4857 		goto err_out_client;
4858 	rbdc = NULL;		/* rbd_dev now owns this */
4859 	spec = NULL;		/* rbd_dev now owns this */
4860 
4861 	rbd_dev->mapping.read_only = rbd_opts->read_only;
4862 	kfree(rbd_opts);
4863 	rbd_opts = NULL;	/* done with this */
4864 
4865 	rc = rbd_dev_image_probe(rbd_dev);
4866 	if (rc < 0)
4867 		goto err_out_rbd_dev;
4868 
4869 	rc = rbd_dev_device_setup(rbd_dev);
4870 	if (!rc)
4871 		return count;
4872 
4873 	rbd_dev_image_release(rbd_dev);
4874 err_out_rbd_dev:
4875 	rbd_dev_destroy(rbd_dev);
4876 err_out_client:
4877 	rbd_put_client(rbdc);
4878 err_out_args:
4879 	if (ceph_opts)
4880 		ceph_destroy_options(ceph_opts);
4881 	kfree(rbd_opts);
4882 	rbd_spec_put(spec);
4883 err_out_module:
4884 	module_put(THIS_MODULE);
4885 
4886 	dout("Error adding device %s\n", buf);
4887 
4888 	return (ssize_t)rc;
4889 }
4890 
4891 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4892 {
4893 	struct list_head *tmp;
4894 	struct rbd_device *rbd_dev;
4895 
4896 	spin_lock(&rbd_dev_list_lock);
4897 	list_for_each(tmp, &rbd_dev_list) {
4898 		rbd_dev = list_entry(tmp, struct rbd_device, node);
4899 		if (rbd_dev->dev_id == dev_id) {
4900 			spin_unlock(&rbd_dev_list_lock);
4901 			return rbd_dev;
4902 		}
4903 	}
4904 	spin_unlock(&rbd_dev_list_lock);
4905 	return NULL;
4906 }
4907 
4908 static void rbd_dev_device_release(struct device *dev)
4909 {
4910 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4911 
4912 	rbd_free_disk(rbd_dev);
4913 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4914 	rbd_dev_clear_mapping(rbd_dev);
4915 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4916 	rbd_dev->major = 0;
4917 	rbd_dev_id_put(rbd_dev);
4918 	rbd_dev_mapping_clear(rbd_dev);
4919 }
4920 
4921 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4922 {
4923 	while (rbd_dev->parent) {
4924 		struct rbd_device *first = rbd_dev;
4925 		struct rbd_device *second = first->parent;
4926 		struct rbd_device *third;
4927 
4928 		/*
4929 		 * Walk down the chain to the ancestor that has no
4930 		 * parent of its own, and remove it.
4931 		 */
4932 		while (second && (third = second->parent)) {
4933 			first = second;
4934 			second = third;
4935 		}
4936 		rbd_assert(second);
4937 		rbd_dev_image_release(second);
4938 		first->parent = NULL;
4939 		first->parent_overlap = 0;
4940 
4941 		rbd_assert(first->parent_spec);
4942 		rbd_spec_put(first->parent_spec);
4943 		first->parent_spec = NULL;
4944 	}
4945 }
4946 
4947 static ssize_t rbd_remove(struct bus_type *bus,
4948 			  const char *buf,
4949 			  size_t count)
4950 {
4951 	struct rbd_device *rbd_dev = NULL;
4952 	int target_id;
4953 	unsigned long ul;
4954 	int ret;
4955 
4956 	ret = strict_strtoul(buf, 10, &ul);
4957 	if (ret)
4958 		return ret;
4959 
4960 	/* convert to int; abort if we lost anything in the conversion */
4961 	target_id = (int) ul;
4962 	if (target_id != ul)
4963 		return -EINVAL;
4964 
4965 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4966 
4967 	rbd_dev = __rbd_get_dev(target_id);
4968 	if (!rbd_dev) {
4969 		ret = -ENOENT;
4970 		goto done;
4971 	}
4972 
4973 	spin_lock_irq(&rbd_dev->lock);
4974 	if (rbd_dev->open_count)
4975 		ret = -EBUSY;
4976 	else
4977 		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4978 	spin_unlock_irq(&rbd_dev->lock);
4979 	if (ret < 0)
4980 		goto done;
4981 	ret = count;
4982 	rbd_bus_del_dev(rbd_dev);
4983 	rbd_dev_image_release(rbd_dev);
4984 	module_put(THIS_MODULE);
4985 done:
4986 	mutex_unlock(&ctl_mutex);
4987 
4988 	return ret;
4989 }
4990 
4991 /*
4992  * create control files in sysfs
4993  * /sys/bus/rbd/...
4994  */
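/*
 * Assuming the usual bus attributes (defined elsewhere in this file)
 * are wired to rbd_add() and rbd_remove(), the resulting interface is
 * used roughly like this (hypothetical values):
 *
 *   echo "1.2.3.4:6789 name=admin mypool myimage -" > /sys/bus/rbd/add
 *   echo 1 > /sys/bus/rbd/remove
 */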
4995 static int rbd_sysfs_init(void)
4996 {
4997 	int ret;
4998 
4999 	ret = device_register(&rbd_root_dev);
5000 	if (ret < 0)
5001 		return ret;
5002 
5003 	ret = bus_register(&rbd_bus_type);
5004 	if (ret < 0)
5005 		device_unregister(&rbd_root_dev);
5006 
5007 	return ret;
5008 }
5009 
5010 static void rbd_sysfs_cleanup(void)
5011 {
5012 	bus_unregister(&rbd_bus_type);
5013 	device_unregister(&rbd_root_dev);
5014 }
5015 
5016 static int rbd_slab_init(void)
5017 {
5018 	rbd_assert(!rbd_img_request_cache);
5019 	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5020 					sizeof (struct rbd_img_request),
5021 					__alignof__(struct rbd_img_request),
5022 					0, NULL);
5023 	if (!rbd_img_request_cache)
5024 		return -ENOMEM;
5025 
5026 	rbd_assert(!rbd_obj_request_cache);
5027 	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5028 					sizeof (struct rbd_obj_request),
5029 					__alignof__(struct rbd_obj_request),
5030 					0, NULL);
5031 	if (!rbd_obj_request_cache)
5032 		goto out_err;
5033 
5034 	rbd_assert(!rbd_segment_name_cache);
5035 	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5036 					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5037 	if (rbd_segment_name_cache)
5038 		return 0;
5039 out_err:
5040 	if (rbd_obj_request_cache) {
5041 		kmem_cache_destroy(rbd_obj_request_cache);
5042 		rbd_obj_request_cache = NULL;
5043 	}
5044 
5045 	kmem_cache_destroy(rbd_img_request_cache);
5046 	rbd_img_request_cache = NULL;
5047 
5048 	return -ENOMEM;
5049 }
5050 
5051 static void rbd_slab_exit(void)
5052 {
5053 	rbd_assert(rbd_segment_name_cache);
5054 	kmem_cache_destroy(rbd_segment_name_cache);
5055 	rbd_segment_name_cache = NULL;
5056 
5057 	rbd_assert(rbd_obj_request_cache);
5058 	kmem_cache_destroy(rbd_obj_request_cache);
5059 	rbd_obj_request_cache = NULL;
5060 
5061 	rbd_assert(rbd_img_request_cache);
5062 	kmem_cache_destroy(rbd_img_request_cache);
5063 	rbd_img_request_cache = NULL;
5064 }
5065 
5066 static int __init rbd_init(void)
5067 {
5068 	int rc;
5069 
5070 	if (!libceph_compatible(NULL)) {
5071 		rbd_warn(NULL, "libceph incompatibility (quitting)");
5072 
5073 		return -EINVAL;
5074 	}
5075 	rc = rbd_slab_init();
5076 	if (rc)
5077 		return rc;
5078 	rc = rbd_sysfs_init();
5079 	if (rc)
5080 		rbd_slab_exit();
5081 	else
5082 		pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5083 
5084 	return rc;
5085 }
5086 
5087 static void __exit rbd_exit(void)
5088 {
5089 	rbd_sysfs_cleanup();
5090 	rbd_slab_exit();
5091 }
5092 
5093 module_init(rbd_init);
5094 module_exit(rbd_exit);
5095 
5096 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5097 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5098 MODULE_DESCRIPTION("rados block device");
5099 
5100 /* following authorship retained from original osdblk.c */
5101 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5102 
5103 MODULE_LICENSE("GPL");
5104