xref: /linux/drivers/block/rbd.c (revision 4413e16d9d21673bb5048a2e542f1aaa00015c2e)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* "snapshot" name that maps the live image rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* default value for rbd_options.notify_timeout */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
74 /*
75  * block device image metadata (in-memory version)
76  */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char *object_prefix;		/* NUL-terminated data object name prefix */
	__u8 obj_order;			/* log2 of bytes per data object */
	__u8 crypt_type;		/* encryption type from on-disk options */
	__u8 comp_type;			/* compression type from on-disk options */
	struct ceph_snap_context *snapc;	/* snapshot ids for this image */
	size_t snap_names_len;		/* total bytes of all snapshot names */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* NUL-separated snapshot names */
	u64 *snap_sizes;		/* image size at each snapshot */

	u64 obj_version;		/* version of the header object */
};
92 
struct rbd_options {
	int	notify_timeout;	/* watch-notify timeout; defaults to
				 * RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96 
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph client */
	struct rbd_options	*rbd_opts;	/* rbd-specific options (owned) */
	struct kref		kref;		/* reference count */
	struct list_head	node;		/* entry on rbd_client_list */
};
106 
107 /*
108  * a request completion status
109  */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	int rc;		/* completion status (0 or negative errno) */
	u64 bytes;	/* bytes transferred */
};
115 
116 /*
117  * a collection of requests
118  */
struct rbd_req_coll {
	int			total;		/* requests in the collection */
	int			num_done;	/* completed so far; protected by
						 * the request queue lock */
	struct kref		kref;		/* reference count */
	struct rbd_req_status	status[0];	/* one status slot per request */
};
125 
126 /*
127  * a single io request
128  */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* byte length of this request */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* collection, if any */
};
137 
struct rbd_snap {
	struct	device		dev;	/* sysfs device for this snapshot */
	const char		*name;	/* snapshot name */
	u64			size;	/* image size at this snapshot */
	struct list_head	node;	/* entry on rbd_dev->snaps */
	u64			id;	/* snapshot id */
};
145 
146 /*
147  * a single device
148  */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;		/* request queue for the disk */

	struct rbd_client	*rbd_client;	/* shared, refcounted client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image header */
	char			*image_name;	/* rbd image name */
	size_t			image_name_len;	/* strlen(image_name) */
	char			*header_name;	/* name of the header object */
	char			*pool_name;	/* rados pool name */
	int			pool_id;	/* rados pool id */

	struct ceph_osd_event   *watch_event;	/* header watch event */
	struct ceph_osd_request *watch_request;	/* lingering watch request */

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;
	/* name of the snapshot this device reads from */
	char                    *snap_name;
	/* id of the snapshot this device reads from */
	u64                     snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool                    snap_exists;
	int                     read_only;	/* nonzero for read-only mapping */

	struct list_head	node;		/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
190 
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* protects rbd_client_list */
198 
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 			    struct device_attribute *attr,
203 			    const char *buf,
204 			    size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206 
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 		       size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 			  size_t count);
211 
/* bus attributes: writing "add"/"remove" maps or unmaps an rbd device */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
217 
/* the /sys/bus/rbd bus, carrying the add/remove attributes above */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
222 
/*
 * No-op release for the statically allocated root device; the device
 * core requires a release callback even when there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
226 
/* sysfs parent of all rbd devices */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
231 
232 
/* Take a reference on the device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
237 
/* Drop the reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
242 
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244 
/*
 * Block device open: refuse writes to a read-only mapping, otherwise
 * pin the device and propagate the read-only flag to the bdev.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->read_only);

	return 0;
}
257 
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
266 
/* block device operations for /dev/rbd* nodes */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
272 
273 /*
274  * Initialize an rbd client instance.
275  * We own *ceph_opts.
276  */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 					    struct rbd_options *rbd_opts)
279 {
280 	struct rbd_client *rbdc;
281 	int ret = -ENOMEM;
282 
283 	dout("rbd_client_create\n");
284 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 	if (!rbdc)
286 		goto out_opt;
287 
288 	kref_init(&rbdc->kref);
289 	INIT_LIST_HEAD(&rbdc->node);
290 
291 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292 
293 	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 	if (IS_ERR(rbdc->client))
295 		goto out_mutex;
296 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297 
298 	ret = ceph_open_session(rbdc->client);
299 	if (ret < 0)
300 		goto out_err;
301 
302 	rbdc->rbd_opts = rbd_opts;
303 
304 	spin_lock(&rbd_client_list_lock);
305 	list_add_tail(&rbdc->node, &rbd_client_list);
306 	spin_unlock(&rbd_client_list_lock);
307 
308 	mutex_unlock(&ctl_mutex);
309 
310 	dout("rbd_client_create created %p\n", rbdc);
311 	return rbdc;
312 
313 out_err:
314 	ceph_destroy_client(rbdc->client);
315 out_mutex:
316 	mutex_unlock(&ctl_mutex);
317 	kfree(rbdc);
318 out_opt:
319 	if (ceph_opts)
320 		ceph_destroy_options(ceph_opts);
321 	return ERR_PTR(ret);
322 }
323 
324 /*
325  * Find a ceph client with specific addr and configuration.
326  */
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328 {
329 	struct rbd_client *client_node;
330 
331 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332 		return NULL;
333 
334 	list_for_each_entry(client_node, &rbd_client_list, node)
335 		if (!ceph_compare_options(ceph_opts, client_node->client))
336 			return client_node;
337 	return NULL;
338 }
339 
/*
 * mount options
 */
enum {
	Opt_notify_timeout,	/* integer argument */
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* token table for match_token() in parse_rbd_opts_token() */
static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
357 
/*
 * Parse a single "key=value" mount option into *private (an
 * rbd_options).  Returns 0 on success, -EINVAL for an unknown token,
 * or the match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* every token in rbd_opts_tokens is handled above */
		BUG_ON(token);
	}
	return 0;
}
392 
393 /*
394  * Get a ceph client with specific addr and configuration, if one does
395  * not exist create it.
396  */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* our copies of the options are no longer needed */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() consumes ceph_opts (and rbd_opts on success) */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
440 
/*
 * Destroy ceph client
 *
 * kref release callback; it takes rbd_client_list_lock itself to
 * unlink the client, so the caller must NOT hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* unlink from the global client list before tearing down */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
459 
460 /*
461  * Drop reference to ceph client node. If it's not referenced anymore, release
462  * it.
463  */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after put */
}
469 
470 /*
471  * Destroy requests collection
472  */
473 static void rbd_coll_release(struct kref *kref)
474 {
475 	struct rbd_req_coll *coll =
476 		container_of(kref, struct rbd_req_coll, kref);
477 
478 	dout("rbd_coll_release %p\n", coll);
479 	kfree(coll);
480 }
481 
/*
 * Check that the on-disk header begins with the expected signature
 * text (compared including its terminating NUL, hence sizeof).
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	return !memcmp(&ondisk->text,
			RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
}
487 
488 /*
489  * Create a new header structure, translate header format from the on-disk
490  * header.
491  */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps)
{
	u32 snap_count;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* refuse a snap count whose snap context would exceed SIZE_MAX */
	if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
				 / sizeof (u64))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				GFP_KERNEL);
	if (!header->snapc)
		return -ENOMEM;

	if (snap_count) {
		/*
		 * NOTE(review): snap_names_len comes straight from the
		 * (untrusted) on-disk header and is not bounds-checked
		 * before being used as an allocation size -- verify the
		 * callers limit it.
		 */
		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names_len = 0;
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* copy the object name prefix, adding a terminating NUL */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					GFP_KERNEL);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/*
	 * Snapshot ids, sizes and names are filled in only when the
	 * caller provided room for exactly this many snapshots;
	 * presumably the caller retries otherwise -- the retry loop is
	 * outside this view.
	 */
	if (snap_count && allocated_snaps == snap_count) {
		int i;

		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
err_names:
	kfree(header->snap_names);
	header->snap_names = NULL;
err_snapc:
	kfree(header->snapc);
	header->snapc = NULL;

	return -ENOMEM;
}
576 
577 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
578 			u64 *seq, u64 *size)
579 {
580 	int i;
581 	char *p = header->snap_names;
582 
583 	for (i = 0; i < header->total_snaps; i++) {
584 		if (!strcmp(snap_name, p)) {
585 
586 			/* Found it.  Pass back its id and/or size */
587 
588 			if (seq)
589 				*seq = header->snapc->snaps[i];
590 			if (size)
591 				*size = header->snap_sizes[i];
592 			return i;
593 		}
594 		p += strlen(p) + 1;	/* Skip ahead to the next name */
595 	}
596 	return -ENOENT;
597 }
598 
/*
 * Point rbd_dev at the snapshot named in rbd_dev->snap_name, or at the
 * live image for RBD_SNAP_HEAD_NAME, optionally reporting the mapped
 * size via *size.  Mapping a snapshot forces the device read-only.
 * Returns 0 or -ENOENT if the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the live image, not a snapshot */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
629 
630 static void rbd_header_free(struct rbd_image_header *header)
631 {
632 	kfree(header->object_prefix);
633 	kfree(header->snap_sizes);
634 	kfree(header->snap_names);
635 	ceph_put_snap_context(header->snapc);
636 }
637 
638 /*
639  * get the actual striped segment name, offset and length
640  */
641 static u64 rbd_get_segment(struct rbd_image_header *header,
642 			   const char *object_prefix,
643 			   u64 ofs, u64 len,
644 			   char *seg_name, u64 *segofs)
645 {
646 	u64 seg = ofs >> header->obj_order;
647 
648 	if (seg_name)
649 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650 			 "%s.%012llx", object_prefix, seg);
651 
652 	ofs = ofs & ((1 << header->obj_order) - 1);
653 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
654 
655 	if (segofs)
656 		*segofs = ofs;
657 
658 	return len;
659 }
660 
/*
 * Number of backing objects spanned by the byte range [ofs, ofs+len).
 * NOTE(review): assumes len > 0 (a zero length makes "ofs + len - 1"
 * wrap), and the result is truncated to int -- verify callers keep
 * request sizes small enough for both.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}
668 
669 /*
670  * returns the size of an object in the image
671  */
672 static u64 rbd_obj_bytes(struct rbd_image_header *header)
673 {
674 	return 1 << header->obj_order;
675 }
676 
677 /*
678  * bio helpers
679  */
680 
681 static void bio_chain_put(struct bio *chain)
682 {
683 	struct bio *tmp;
684 
685 	while (chain) {
686 		tmp = chain;
687 		chain = chain->bi_next;
688 		bio_put(tmp);
689 	}
690 }
691 
692 /*
693  * zeros a bio chain, starting at specific offset
694  */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero this segment from start_ofs onward */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
718 
719 /*
720  * bio_chain_clone - clone a chain of bios up to a certain length.
721  * might return a bio_pair that will need to be released.
722  */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any bio_pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* subsequent allocations in this chain must not sleep */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append the clone to the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	/* tell the caller where to resume cloning on the next call */
	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
794 
795 /*
796  * helpers for osd request op vectors.
797  */
798 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
799 					int opcode, u32 payload_len)
800 {
801 	struct ceph_osd_req_op *ops;
802 
803 	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
804 	if (!ops)
805 		return NULL;
806 
807 	ops[0].op = opcode;
808 
809 	/*
810 	 * op extent offset and length will be set later on
811 	 * in calc_raw_layout()
812 	 */
813 	ops[0].payload_len = payload_len;
814 
815 	return ops;
816 }
817 
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
822 
/*
 * Record the completion status of one request in a collection and
 * complete, in order, any leading run of finished requests against the
 * block layer.  Safe to call with a NULL rq (no-op) or NULL coll
 * (completes rq directly).
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: complete the request directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* collection state is protected by the queue lock */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	/* complete the contiguous finished prefix [min, max) in order */
	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
860 
/* Complete one rbd_request via its (possibly NULL) collection. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
866 
/*
 * Build and send a single ceph osd request for [ofs, ofs+len) of the
 * named object.  Data is carried by either "bio" or "pages".  If
 * rbd_cb is non-NULL the request completes asynchronously through it;
 * otherwise this waits for the reply and drops the request reference
 * itself.  *ver, when non-NULL, receives the reasserted object
 * version of a synchronous request.  On failure the completion path
 * (rbd_coll_end_req) is still run so a collection makes progress.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* fail this slot so the rest of the collection can finish */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/*
	 * NOTE(review): the request head snapid is always CEPH_NOSNAP
	 * here even when a real snapid was passed in; the snapid
	 * parameter is only fed to ceph_calc_raw_layout() below --
	 * confirm this is intended.
	 */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* trivial layout: the whole range maps onto this one object */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		/* keep the request registered for resend on osd changes */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* no callback: synchronous -- wait for the reply here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
978 
979 /*
980  * Ceph osd op callback
981  */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* ops array follows the reply head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* a nonexistent object reads back as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1018 
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1023 
1024 /*
1025  * Do a synchronous ceph osd operation
1026  */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	BUG_ON(ops == NULL);

	/* bounce the data through a temporary page vector */
	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,		/* no callback: wait synchronously */
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* for reads, copy the (ret bytes of) result out to the caller */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1067 
1068 /*
1069  * Do an asynchronous ceph osd operation
1070  */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* map the image byte range onto one backing object */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* only writes carry a data payload */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1123 
1124 /*
1125  * Request async osd write
1126  */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	/* writes always target the head (CEPH_NOSNAP) and commit to disk */
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}
1140 
1141 /*
1142  * Request async osd read
1143  */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	/* reads need no snap context, only the snapshot id */
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1158 
/*
 * Synchronously read [ofs, ofs+len) of the named object, at the given
 * snapshot, into buf.  Returns the result of the read (>= 0) or a
 * negative errno.  *ver, when non-NULL, receives the object version.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
1184 
/*
 * Acknowledge a watch notification on the header object
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): cookie is assigned without cpu_to_le64(), unlike
	 * ver above -- verify the wire format; this would matter on
	 * big-endian hosts.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1214 
1215 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1216 {
1217 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1218 	u64 hver;
1219 	int rc;
1220 
1221 	if (!rbd_dev)
1222 		return;
1223 
1224 	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1225 		rbd_dev->header_name, (unsigned long long) notify_id,
1226 		(unsigned int) opcode);
1227 	rc = rbd_refresh_header(rbd_dev, &hver);
1228 	if (rc)
1229 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230 			   " update snaps: %d\n", rbd_dev->major, rc);
1231 
1232 	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1233 }
1234 
1235 /*
1236  * Request sync osd watch
1237  */
1238 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1239 {
1240 	struct ceph_osd_req_op *ops;
1241 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1242 	int ret;
1243 
1244 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1245 	if (!ops)
1246 		return -ENOMEM;
1247 
1248 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249 				     (void *)rbd_dev, &rbd_dev->watch_event);
1250 	if (ret < 0)
1251 		goto fail;
1252 
1253 	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255 	ops[0].watch.flag = 1;
1256 
1257 	ret = rbd_req_sync_op(rbd_dev, NULL,
1258 			      CEPH_NOSNAP,
1259 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1260 			      ops,
1261 			      rbd_dev->header_name,
1262 			      0, 0, NULL,
1263 			      &rbd_dev->watch_request, NULL);
1264 
1265 	if (ret < 0)
1266 		goto fail_event;
1267 
1268 	rbd_destroy_ops(ops);
1269 	return 0;
1270 
1271 fail_event:
1272 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1273 	rbd_dev->watch_event = NULL;
1274 fail:
1275 	rbd_destroy_ops(ops);
1276 	return ret;
1277 }
1278 
1279 /*
1280  * Request sync osd unwatch
1281  */
1282 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1283 {
1284 	struct ceph_osd_req_op *ops;
1285 	int ret;
1286 
1287 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1288 	if (!ops)
1289 		return -ENOMEM;
1290 
1291 	ops[0].watch.ver = 0;
1292 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293 	ops[0].watch.flag = 0;
1294 
1295 	ret = rbd_req_sync_op(rbd_dev, NULL,
1296 			      CEPH_NOSNAP,
1297 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1298 			      ops,
1299 			      rbd_dev->header_name,
1300 			      0, 0, NULL, NULL, NULL);
1301 
1302 
1303 	rbd_destroy_ops(ops);
1304 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1305 	rbd_dev->watch_event = NULL;
1306 	return ret;
1307 }
1308 
/* Context handed to rbd_notify_cb() via ceph_osdc_create_event() */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1312 
1313 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314 {
1315 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316 	if (!rbd_dev)
1317 		return;
1318 
1319 	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320 			rbd_dev->header_name, (unsigned long long) notify_id,
1321 			(unsigned int) opcode);
1322 }
1323 
1324 /*
1325  * Request sync osd notify
1326  */
1327 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1328 {
1329 	struct ceph_osd_req_op *ops;
1330 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331 	struct ceph_osd_event *event;
1332 	struct rbd_notify_info info;
1333 	int payload_len = sizeof(u32) + sizeof(u32);
1334 	int ret;
1335 
1336 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1337 	if (!ops)
1338 		return -ENOMEM;
1339 
1340 	info.rbd_dev = rbd_dev;
1341 
1342 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343 				     (void *)&info, &event);
1344 	if (ret < 0)
1345 		goto fail;
1346 
1347 	ops[0].watch.ver = 1;
1348 	ops[0].watch.flag = 1;
1349 	ops[0].watch.cookie = event->cookie;
1350 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351 	ops[0].watch.timeout = 12;
1352 
1353 	ret = rbd_req_sync_op(rbd_dev, NULL,
1354 			       CEPH_NOSNAP,
1355 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356 			       ops,
1357 			       rbd_dev->header_name,
1358 			       0, 0, NULL, NULL, NULL);
1359 	if (ret < 0)
1360 		goto fail_event;
1361 
1362 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363 	dout("ceph_osdc_wait_event returned %d\n", ret);
1364 	rbd_destroy_ops(ops);
1365 	return 0;
1366 
1367 fail_event:
1368 	ceph_osdc_cancel_event(event);
1369 fail:
1370 	rbd_destroy_ops(ops);
1371 	return ret;
1372 }
1373 
1374 /*
1375  * Request sync osd read
1376  */
1377 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1378 			     const char *object_name,
1379 			     const char *class_name,
1380 			     const char *method_name,
1381 			     const char *data,
1382 			     int len,
1383 			     u64 *ver)
1384 {
1385 	struct ceph_osd_req_op *ops;
1386 	int class_name_len = strlen(class_name);
1387 	int method_name_len = strlen(method_name);
1388 	int ret;
1389 
1390 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1391 				    class_name_len + method_name_len + len);
1392 	if (!ops)
1393 		return -ENOMEM;
1394 
1395 	ops[0].cls.class_name = class_name;
1396 	ops[0].cls.class_len = (__u8) class_name_len;
1397 	ops[0].cls.method_name = method_name;
1398 	ops[0].cls.method_len = (__u8) method_name_len;
1399 	ops[0].cls.argc = 0;
1400 	ops[0].cls.indata = data;
1401 	ops[0].cls.indata_len = len;
1402 
1403 	ret = rbd_req_sync_op(rbd_dev, NULL,
1404 			       CEPH_NOSNAP,
1405 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1406 			       ops,
1407 			       object_name, 0, 0, NULL, NULL, ver);
1408 
1409 	rbd_destroy_ops(ops);
1410 
1411 	dout("cls_exec returned %d\n", ret);
1412 	return ret;
1413 }
1414 
1415 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416 {
1417 	struct rbd_req_coll *coll =
1418 			kzalloc(sizeof(struct rbd_req_coll) +
1419 			        sizeof(struct rbd_req_status) * num_reqs,
1420 				GFP_ATOMIC);
1421 
1422 	if (!coll)
1423 		return NULL;
1424 	coll->total = num_reqs;
1425 	kref_init(&coll->kref);
1426 	return coll;
1427 }
1428 
1429 /*
1430  * block device queue callback
1431  */
1432 static void rbd_rq_fn(struct request_queue *q)
1433 {
1434 	struct rbd_device *rbd_dev = q->queuedata;
1435 	struct request *rq;
1436 	struct bio_pair *bp = NULL;
1437 
1438 	while ((rq = blk_fetch_request(q))) {
1439 		struct bio *bio;
1440 		struct bio *rq_bio, *next_bio = NULL;
1441 		bool do_write;
1442 		unsigned int size;
1443 		u64 op_size = 0;
1444 		u64 ofs;
1445 		int num_segs, cur_seg = 0;
1446 		struct rbd_req_coll *coll;
1447 		struct ceph_snap_context *snapc;
1448 
1449 		/* peek at request from block layer */
1450 		if (!rq)
1451 			break;
1452 
1453 		dout("fetched request\n");
1454 
1455 		/* filter out block requests we don't understand */
1456 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1457 			__blk_end_request_all(rq, 0);
1458 			continue;
1459 		}
1460 
1461 		/* deduce our operation (read, write) */
1462 		do_write = (rq_data_dir(rq) == WRITE);
1463 
1464 		size = blk_rq_bytes(rq);
1465 		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1466 		rq_bio = rq->bio;
1467 		if (do_write && rbd_dev->read_only) {
1468 			__blk_end_request_all(rq, -EROFS);
1469 			continue;
1470 		}
1471 
1472 		spin_unlock_irq(q->queue_lock);
1473 
1474 		down_read(&rbd_dev->header_rwsem);
1475 
1476 		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477 			up_read(&rbd_dev->header_rwsem);
1478 			dout("request for non-existent snapshot");
1479 			spin_lock_irq(q->queue_lock);
1480 			__blk_end_request_all(rq, -ENXIO);
1481 			continue;
1482 		}
1483 
1484 		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1485 
1486 		up_read(&rbd_dev->header_rwsem);
1487 
1488 		dout("%s 0x%x bytes at 0x%llx\n",
1489 		     do_write ? "write" : "read",
1490 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1491 
1492 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493 		coll = rbd_alloc_coll(num_segs);
1494 		if (!coll) {
1495 			spin_lock_irq(q->queue_lock);
1496 			__blk_end_request_all(rq, -ENOMEM);
1497 			ceph_put_snap_context(snapc);
1498 			continue;
1499 		}
1500 
1501 		do {
1502 			/* a bio clone to be passed down to OSD req */
1503 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504 			op_size = rbd_get_segment(&rbd_dev->header,
1505 						  rbd_dev->header.object_prefix,
1506 						  ofs, size,
1507 						  NULL, NULL);
1508 			kref_get(&coll->kref);
1509 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510 					      op_size, GFP_ATOMIC);
1511 			if (!bio) {
1512 				rbd_coll_end_req_index(rq, coll, cur_seg,
1513 						       -ENOMEM, op_size);
1514 				goto next_seg;
1515 			}
1516 
1517 
1518 			/* init OSD command: write or read */
1519 			if (do_write)
1520 				rbd_req_write(rq, rbd_dev,
1521 					      snapc,
1522 					      ofs,
1523 					      op_size, bio,
1524 					      coll, cur_seg);
1525 			else
1526 				rbd_req_read(rq, rbd_dev,
1527 					     rbd_dev->snap_id,
1528 					     ofs,
1529 					     op_size, bio,
1530 					     coll, cur_seg);
1531 
1532 next_seg:
1533 			size -= op_size;
1534 			ofs += op_size;
1535 
1536 			cur_seg++;
1537 			rq_bio = next_bio;
1538 		} while (size > 0);
1539 		kref_put(&coll->kref, rbd_coll_release);
1540 
1541 		if (bp)
1542 			bio_pair_release(bp);
1543 		spin_lock_irq(q->queue_lock);
1544 
1545 		ceph_put_snap_context(snapc);
1546 	}
1547 }
1548 
1549 /*
1550  * a queue callback. Makes sure that we don't create a bio that spans across
1551  * multiple osd objects. One exception would be with a single page bios,
1552  * which we handle later at bio_chain_clone
1553  */
1554 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1555 			  struct bio_vec *bvec)
1556 {
1557 	struct rbd_device *rbd_dev = q->queuedata;
1558 	unsigned int chunk_sectors;
1559 	sector_t sector;
1560 	unsigned int bio_sectors;
1561 	int max;
1562 
1563 	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1564 	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1565 	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1566 
1567 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1568 				 + bio_sectors)) << SECTOR_SHIFT;
1569 	if (max < 0)
1570 		max = 0; /* bio_add cannot handle a negative return */
1571 	if (max <= bvec->bv_len && bio_sectors == 0)
1572 		return bvec->bv_len;
1573 	return max;
1574 }
1575 
/* Tear down the gendisk, request queue and in-core header of rbd_dev. */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	/* Nothing to do if the disk was never set up */
	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* Only unregister if add_disk() already ran */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1591 
1592 /*
1593  * reload the ondisk the header
1594  */
1595 static int rbd_read_header(struct rbd_device *rbd_dev,
1596 			   struct rbd_image_header *header)
1597 {
1598 	ssize_t rc;
1599 	struct rbd_image_header_ondisk *dh;
1600 	u32 snap_count = 0;
1601 	u64 ver;
1602 	size_t len;
1603 
1604 	/*
1605 	 * First reads the fixed-size header to determine the number
1606 	 * of snapshots, then re-reads it, along with all snapshot
1607 	 * records as well as their stored names.
1608 	 */
1609 	len = sizeof (*dh);
1610 	while (1) {
1611 		dh = kmalloc(len, GFP_KERNEL);
1612 		if (!dh)
1613 			return -ENOMEM;
1614 
1615 		rc = rbd_req_sync_read(rbd_dev,
1616 				       CEPH_NOSNAP,
1617 				       rbd_dev->header_name,
1618 				       0, len,
1619 				       (char *)dh, &ver);
1620 		if (rc < 0)
1621 			goto out_dh;
1622 
1623 		rc = rbd_header_from_disk(header, dh, snap_count);
1624 		if (rc < 0) {
1625 			if (rc == -ENXIO)
1626 				pr_warning("unrecognized header format"
1627 					   " for image %s\n",
1628 					   rbd_dev->image_name);
1629 			goto out_dh;
1630 		}
1631 
1632 		if (snap_count == header->total_snaps)
1633 			break;
1634 
1635 		snap_count = header->total_snaps;
1636 		len = sizeof (*dh) +
1637 			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1638 			header->snap_names_len;
1639 
1640 		rbd_header_free(header);
1641 		kfree(dh);
1642 	}
1643 	header->obj_version = ver;
1644 
1645 out_dh:
1646 	kfree(dh);
1647 	return rc;
1648 }
1649 
1650 /*
1651  * create a snapshot
1652  */
1653 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654 			       const char *snap_name,
1655 			       gfp_t gfp_flags)
1656 {
1657 	int name_len = strlen(snap_name);
1658 	u64 new_snapid;
1659 	int ret;
1660 	void *data, *p, *e;
1661 	struct ceph_mon_client *monc;
1662 
1663 	/* we should create a snapshot only if we're pointing at the head */
1664 	if (rbd_dev->snap_id != CEPH_NOSNAP)
1665 		return -EINVAL;
1666 
1667 	monc = &rbd_dev->rbd_client->client->monc;
1668 	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1669 	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1670 	if (ret < 0)
1671 		return ret;
1672 
1673 	data = kmalloc(name_len + 16, gfp_flags);
1674 	if (!data)
1675 		return -ENOMEM;
1676 
1677 	p = data;
1678 	e = data + name_len + 16;
1679 
1680 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1681 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1682 
1683 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1684 				"rbd", "snap_add",
1685 				data, p - data, NULL);
1686 
1687 	kfree(data);
1688 
1689 	return ret < 0 ? ret : 0;
1690 bad:
1691 	return -ERANGE;
1692 }
1693 
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1695 {
1696 	struct rbd_snap *snap;
1697 	struct rbd_snap *next;
1698 
1699 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700 		__rbd_remove_snap_dev(snap);
1701 }
1702 
1703 /*
1704  * only read the first part of the ondisk header, without the snaps info
1705  */
1706 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1707 {
1708 	int ret;
1709 	struct rbd_image_header h;
1710 
1711 	ret = rbd_read_header(rbd_dev, &h);
1712 	if (ret < 0)
1713 		return ret;
1714 
1715 	down_write(&rbd_dev->header_rwsem);
1716 
1717 	/* resized? */
1718 	if (rbd_dev->snap_id == CEPH_NOSNAP) {
1719 		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1720 
1721 		dout("setting size to %llu sectors", (unsigned long long) size);
1722 		set_capacity(rbd_dev->disk, size);
1723 	}
1724 
1725 	/* rbd_dev->header.object_prefix shouldn't change */
1726 	kfree(rbd_dev->header.snap_sizes);
1727 	kfree(rbd_dev->header.snap_names);
1728 	/* osd requests may still refer to snapc */
1729 	ceph_put_snap_context(rbd_dev->header.snapc);
1730 
1731 	if (hver)
1732 		*hver = h.obj_version;
1733 	rbd_dev->header.obj_version = h.obj_version;
1734 	rbd_dev->header.image_size = h.image_size;
1735 	rbd_dev->header.total_snaps = h.total_snaps;
1736 	rbd_dev->header.snapc = h.snapc;
1737 	rbd_dev->header.snap_names = h.snap_names;
1738 	rbd_dev->header.snap_names_len = h.snap_names_len;
1739 	rbd_dev->header.snap_sizes = h.snap_sizes;
1740 	/* Free the extra copy of the object prefix */
1741 	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1742 	kfree(h.object_prefix);
1743 
1744 	ret = __rbd_init_snaps_header(rbd_dev);
1745 
1746 	up_write(&rbd_dev->header_rwsem);
1747 
1748 	return ret;
1749 }
1750 
1751 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1752 {
1753 	int ret;
1754 
1755 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1756 	ret = __rbd_refresh_header(rbd_dev, hver);
1757 	mutex_unlock(&ctl_mutex);
1758 
1759 	return ret;
1760 }
1761 
/*
 * Read the image header, pick the mapped snapshot, and set up the
 * gendisk and request queue for this device, announcing the disk
 * once everything is in place.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* total_size becomes the capacity of the mapped snapshot/head */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning osd objects (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1834 
1835 /*
1836   sysfs
1837 */
1838 
/* Map a struct device embedded in an rbd_device back to its container. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1843 
1844 static ssize_t rbd_size_show(struct device *dev,
1845 			     struct device_attribute *attr, char *buf)
1846 {
1847 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1848 	sector_t size;
1849 
1850 	down_read(&rbd_dev->header_rwsem);
1851 	size = get_capacity(rbd_dev->disk);
1852 	up_read(&rbd_dev->header_rwsem);
1853 
1854 	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1855 }
1856 
1857 static ssize_t rbd_major_show(struct device *dev,
1858 			      struct device_attribute *attr, char *buf)
1859 {
1860 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861 
1862 	return sprintf(buf, "%d\n", rbd_dev->major);
1863 }
1864 
1865 static ssize_t rbd_client_id_show(struct device *dev,
1866 				  struct device_attribute *attr, char *buf)
1867 {
1868 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869 
1870 	return sprintf(buf, "client%lld\n",
1871 			ceph_client_id(rbd_dev->rbd_client->client));
1872 }
1873 
1874 static ssize_t rbd_pool_show(struct device *dev,
1875 			     struct device_attribute *attr, char *buf)
1876 {
1877 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878 
1879 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1880 }
1881 
1882 static ssize_t rbd_pool_id_show(struct device *dev,
1883 			     struct device_attribute *attr, char *buf)
1884 {
1885 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886 
1887 	return sprintf(buf, "%d\n", rbd_dev->pool_id);
1888 }
1889 
1890 static ssize_t rbd_name_show(struct device *dev,
1891 			     struct device_attribute *attr, char *buf)
1892 {
1893 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894 
1895 	return sprintf(buf, "%s\n", rbd_dev->image_name);
1896 }
1897 
1898 static ssize_t rbd_snap_show(struct device *dev,
1899 			     struct device_attribute *attr,
1900 			     char *buf)
1901 {
1902 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903 
1904 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1905 }
1906 
1907 static ssize_t rbd_image_refresh(struct device *dev,
1908 				 struct device_attribute *attr,
1909 				 const char *buf,
1910 				 size_t size)
1911 {
1912 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913 	int ret;
1914 
1915 	ret = rbd_refresh_header(rbd_dev, NULL);
1916 
1917 	return ret < 0 ? ret : size;
1918 }
1919 
/* Per-device attributes exposed under the rbd bus (see
 * Documentation/ABI/testing/sysfs-bus-rbd) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1929 
/* All per-device attributes, registered through the device type below */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1951 
/*
 * Intentionally empty: cleanup runs via dev->release, which
 * rbd_bus_add_dev() sets to rbd_dev_release().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1955 
/* Device type tying the attribute groups above to each rbd device */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1961 
1962 
1963 /*
1964   sysfs - snapshots
1965 */
1966 
1967 static ssize_t rbd_snap_size_show(struct device *dev,
1968 				  struct device_attribute *attr,
1969 				  char *buf)
1970 {
1971 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972 
1973 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1974 }
1975 
1976 static ssize_t rbd_snap_id_show(struct device *dev,
1977 				struct device_attribute *attr,
1978 				char *buf)
1979 {
1980 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981 
1982 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1983 }
1984 
/* Attributes published for each snapshot device (snap_<name>/) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1997 
/* Device-model release hook: frees the rbd_snap and its name. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2004 
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type for snapshot devices; supplies attributes and release */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2014 
/*
 * Unlink a snapshot from the device list and unregister its sysfs
 * device; the rbd_snap itself is freed by rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2020 
2021 static int rbd_register_snap_dev(struct rbd_snap *snap,
2022 				  struct device *parent)
2023 {
2024 	struct device *dev = &snap->dev;
2025 	int ret;
2026 
2027 	dev->type = &rbd_snap_device_type;
2028 	dev->parent = parent;
2029 	dev->release = rbd_snap_dev_release;
2030 	dev_set_name(dev, "snap_%s", snap->name);
2031 	ret = device_register(dev);
2032 
2033 	return ret;
2034 }
2035 
/*
 * Allocate and initialize an rbd_snap for snapshot slot i of the
 * header, registering its sysfs device if the parent device is
 * already registered.  Returns the new snap or an ERR_PTR.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					      int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;	/* kfree(NULL) below is harmless */

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;	/* NOTE(review): a failed
					 * device_register normally wants
					 * put_device, not kfree -- confirm */
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2067 
2068 /*
2069  * search for the previous snap in a null delimited string list
2070  */
2071 const char *rbd_prev_snap_name(const char *name, const char *start)
2072 {
2073 	if (name < start + 2)
2074 		return NULL;
2075 
2076 	name -= 2;
2077 	while (*name) {
2078 		if (name == start)
2079 			return start;
2080 		name--;
2081 	}
2082 	return name + 1;
2083 }
2084 
2085 /*
2086  * compare the old list of snapshots that we have to what's in the header
2087  * and update it accordingly. Note that the header holds the snapshots
2088  * in a reverse order (from newest to oldest) and we need to go from
2089  * older to new so that we don't get a duplicate snap name when
2090  * doing the process (e.g., removed snapshot and recreated a new
2091  * one with the same name.
2092  */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	struct list_head *p, *n;

	/* Names are packed back to back; start one past the last name
	 * and step backward with rbd_prev_snap_name() as i counts down */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	/* Walk our list newest-to-oldest in step with the header's
	 * snap context (newest first, per the comment above) */
	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed.  If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		/* Header snaps newer than old_snap: create devices for
		 * them until we catch up with the existing entry */
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
			if (IS_ERR(snap))
				return PTR_ERR(snap);

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
		if (IS_ERR(snap))
			return PTR_ERR(snap);
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2164 
/*
 * Register rbd_dev (and any snapshots already on its list) with the
 * driver core under the rbd bus.  Serialized by ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;	/* NOTE(review): a failed device_register
				 * normally calls for put_device() --
				 * confirm the caller's unwind covers it */

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;	/* NOTE(review): snaps registered before
				 * the failure are not unwound here */
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2192 
/* Unregister the device; cleanup continues in the release callback. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2197 
2198 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2199 {
2200 	int ret, rc;
2201 
2202 	do {
2203 		ret = rbd_req_sync_watch(rbd_dev);
2204 		if (ret == -ERANGE) {
2205 			rc = rbd_refresh_header(rbd_dev, NULL);
2206 			if (rc < 0)
2207 				return rc;
2208 		}
2209 	} while (ret == -ERANGE);
2210 
2211 	return ret;
2212 }
2213 
/* Largest device id assigned so far; ids start at 1 (see rbd_id_get) */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215 
2216 /*
2217  * Get a unique rbd identifier for the given new rbd_dev, and add
2218  * the rbd_dev to the global list.  The minimum rbd id is 1.
2219  */
2220 static void rbd_id_get(struct rbd_device *rbd_dev)
2221 {
2222 	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2223 
2224 	spin_lock(&rbd_dev_list_lock);
2225 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226 	spin_unlock(&rbd_dev_list_lock);
2227 }
2228 
2229 /*
2230  * Remove an rbd_dev from the global list, and record that its
2231  * identifier is no longer in use.
2232  */
2233 static void rbd_id_put(struct rbd_device *rbd_dev)
2234 {
2235 	struct list_head *tmp;
2236 	int rbd_id = rbd_dev->dev_id;
2237 	int max_id;
2238 
2239 	BUG_ON(rbd_id < 1);
2240 
2241 	spin_lock(&rbd_dev_list_lock);
2242 	list_del_init(&rbd_dev->node);
2243 
2244 	/*
2245 	 * If the id being "put" is not the current maximum, there
2246 	 * is nothing special we need to do.
2247 	 */
2248 	if (rbd_id != atomic64_read(&rbd_id_max)) {
2249 		spin_unlock(&rbd_dev_list_lock);
2250 		return;
2251 	}
2252 
2253 	/*
2254 	 * We need to update the current maximum id.  Search the
2255 	 * list to find out what it is.  We're more likely to find
2256 	 * the maximum at the end, so search the list backward.
2257 	 */
2258 	max_id = 0;
2259 	list_for_each_prev(tmp, &rbd_dev_list) {
2260 		struct rbd_device *rbd_dev;
2261 
2262 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2263 		if (rbd_id > max_id)
2264 			max_id = rbd_id;
2265 	}
2266 	spin_unlock(&rbd_dev_list_lock);
2267 
2268 	/*
2269 	 * The max id could have been updated by rbd_id_get(), in
2270 	 * which case it now accurately reflects the new maximum.
2271 	 * Be careful not to overwrite the maximum value in that
2272 	 * case.
2273 	 */
2274 	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275 }
2276 
2277 /*
2278  * Skips over white space at *buf, and updates *buf to point to the
2279  * first found non-space character (if any). Returns the length of
2280  * the token (string of non-white space characters) found.  Note
2281  * that *buf must be terminated with '\0'.
2282  */
2283 static inline size_t next_token(const char **buf)
2284 {
2285         /*
2286         * These are the characters that produce nonzero for
2287         * isspace() in the "C" and "POSIX" locales.
2288         */
2289         const char *spaces = " \f\n\r\t\v";
2290 
2291         *buf += strspn(*buf, spaces);	/* Find start of token */
2292 
2293 	return strcspn(*buf, spaces);   /* Return token length */
2294 }
2295 
2296 /*
2297  * Finds the next token in *buf, and if the provided token buffer is
2298  * big enough, copies the found token into it.  The result, if
2299  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2300  * must be terminated with '\0' on entry.
2301  *
2302  * Returns the length of the token found (not including the '\0').
2303  * Return value will be 0 if no token is found, and it will be >=
2304  * token_size if the token would not fit.
2305  *
2306  * The *buf pointer will be updated to point beyond the end of the
2307  * found token.  Note that this occurs even if the token buffer is
2308  * too small to hold it.
2309  */
2310 static inline size_t copy_token(const char **buf,
2311 				char *token,
2312 				size_t token_size)
2313 {
2314         size_t len;
2315 
2316 	len = next_token(buf);
2317 	if (len < token_size) {
2318 		memcpy(token, *buf, len);
2319 		*(token + len) = '\0';
2320 	}
2321 	*buf += len;
2322 
2323         return len;
2324 }
2325 
2326 /*
2327  * Finds the next token in *buf, dynamically allocates a buffer big
2328  * enough to hold a copy of it, and copies the token into the new
2329  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2330  * that a duplicate buffer is created even for a zero-length token.
2331  *
2332  * Returns a pointer to the newly-allocated duplicate, or a null
2333  * pointer if memory for the duplicate was not available.  If
2334  * the lenp argument is a non-null pointer, the length of the token
2335  * (not including the '\0') is returned in *lenp.
2336  *
2337  * If successful, the *buf pointer will be updated to point beyond
2338  * the end of the found token.
2339  *
2340  * Note: uses GFP_KERNEL for allocation.
2341  */
2342 static inline char *dup_token(const char **buf, size_t *lenp)
2343 {
2344 	char *dup;
2345 	size_t len;
2346 
2347 	len = next_token(buf);
2348 	dup = kmalloc(len + 1, GFP_KERNEL);
2349 	if (!dup)
2350 		return NULL;
2351 
2352 	memcpy(dup, *buf, len);
2353 	*(dup + len) = '\0';
2354 	*buf += len;
2355 
2356 	if (lenp)
2357 		*lenp = len;
2358 
2359 	return dup;
2360 }
2361 
2362 /*
2363  * This fills in the pool_name, image_name, image_name_len, snap_name,
2364  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2365  * on the list of monitor addresses and other options provided via
2366  * /sys/bus/rbd/add.
2367  *
2368  * Note: rbd_dev is assumed to have been initially zero-filled.
2369  */
2370 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371 			      const char *buf,
2372 			      const char **mon_addrs,
2373 			      size_t *mon_addrs_size,
2374 			      char *options,
2375 			     size_t options_size)
2376 {
2377 	size_t len;
2378 	int ret;
2379 
2380 	/* The first four tokens are required */
2381 
2382 	len = next_token(&buf);
2383 	if (!len)
2384 		return -EINVAL;
2385 	*mon_addrs_size = len + 1;
2386 	*mon_addrs = buf;
2387 
2388 	buf += len;
2389 
2390 	len = copy_token(&buf, options, options_size);
2391 	if (!len || len >= options_size)
2392 		return -EINVAL;
2393 
2394 	ret = -ENOMEM;
2395 	rbd_dev->pool_name = dup_token(&buf, NULL);
2396 	if (!rbd_dev->pool_name)
2397 		goto out_err;
2398 
2399 	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400 	if (!rbd_dev->image_name)
2401 		goto out_err;
2402 
2403 	/* Create the name of the header object */
2404 
2405 	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406 						+ sizeof (RBD_SUFFIX),
2407 					GFP_KERNEL);
2408 	if (!rbd_dev->header_name)
2409 		goto out_err;
2410 	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411 
2412 	/*
2413 	 * The snapshot name is optional.  If none is is supplied,
2414 	 * we use the default value.
2415 	 */
2416 	rbd_dev->snap_name = dup_token(&buf, &len);
2417 	if (!rbd_dev->snap_name)
2418 		goto out_err;
2419 	if (!len) {
2420 		/* Replace the empty name with the default */
2421 		kfree(rbd_dev->snap_name);
2422 		rbd_dev->snap_name
2423 			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424 		if (!rbd_dev->snap_name)
2425 			goto out_err;
2426 
2427 		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428 			sizeof (RBD_SNAP_HEAD_NAME));
2429 	}
2430 
2431 	return 0;
2432 
2433 out_err:
2434 	kfree(rbd_dev->header_name);
2435 	kfree(rbd_dev->image_name);
2436 	kfree(rbd_dev->pool_name);
2437 	rbd_dev->pool_name = NULL;
2438 
2439 	return ret;
2440 }
2441 
2442 static ssize_t rbd_add(struct bus_type *bus,
2443 		       const char *buf,
2444 		       size_t count)
2445 {
2446 	char *options;
2447 	struct rbd_device *rbd_dev = NULL;
2448 	const char *mon_addrs = NULL;
2449 	size_t mon_addrs_size = 0;
2450 	struct ceph_osd_client *osdc;
2451 	int rc = -ENOMEM;
2452 
2453 	if (!try_module_get(THIS_MODULE))
2454 		return -ENODEV;
2455 
2456 	options = kmalloc(count, GFP_KERNEL);
2457 	if (!options)
2458 		goto err_nomem;
2459 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460 	if (!rbd_dev)
2461 		goto err_nomem;
2462 
2463 	/* static rbd_device initialization */
2464 	spin_lock_init(&rbd_dev->lock);
2465 	INIT_LIST_HEAD(&rbd_dev->node);
2466 	INIT_LIST_HEAD(&rbd_dev->snaps);
2467 	init_rwsem(&rbd_dev->header_rwsem);
2468 
2469 	/* generate unique id: find highest unique id, add one */
2470 	rbd_id_get(rbd_dev);
2471 
2472 	/* Fill in the device name, now that we have its id. */
2473 	BUILD_BUG_ON(DEV_NAME_LEN
2474 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2475 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2476 
2477 	/* parse add command */
2478 	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2479 				options, count);
2480 	if (rc)
2481 		goto err_put_id;
2482 
2483 	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2484 						options);
2485 	if (IS_ERR(rbd_dev->rbd_client)) {
2486 		rc = PTR_ERR(rbd_dev->rbd_client);
2487 		goto err_put_id;
2488 	}
2489 
2490 	/* pick the pool */
2491 	osdc = &rbd_dev->rbd_client->client->osdc;
2492 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2493 	if (rc < 0)
2494 		goto err_out_client;
2495 	rbd_dev->pool_id = rc;
2496 
2497 	/* register our block device */
2498 	rc = register_blkdev(0, rbd_dev->name);
2499 	if (rc < 0)
2500 		goto err_out_client;
2501 	rbd_dev->major = rc;
2502 
2503 	rc = rbd_bus_add_dev(rbd_dev);
2504 	if (rc)
2505 		goto err_out_blkdev;
2506 
2507 	/*
2508 	 * At this point cleanup in the event of an error is the job
2509 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2510 	 *
2511 	 * Set up and announce blkdev mapping.
2512 	 */
2513 	rc = rbd_init_disk(rbd_dev);
2514 	if (rc)
2515 		goto err_out_bus;
2516 
2517 	rc = rbd_init_watch_dev(rbd_dev);
2518 	if (rc)
2519 		goto err_out_bus;
2520 
2521 	return count;
2522 
2523 err_out_bus:
2524 	/* this will also clean up rest of rbd_dev stuff */
2525 
2526 	rbd_bus_del_dev(rbd_dev);
2527 	kfree(options);
2528 	return rc;
2529 
2530 err_out_blkdev:
2531 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2532 err_out_client:
2533 	rbd_put_client(rbd_dev);
2534 err_put_id:
2535 	if (rbd_dev->pool_name) {
2536 		kfree(rbd_dev->snap_name);
2537 		kfree(rbd_dev->header_name);
2538 		kfree(rbd_dev->image_name);
2539 		kfree(rbd_dev->pool_name);
2540 	}
2541 	rbd_id_put(rbd_dev);
2542 err_nomem:
2543 	kfree(rbd_dev);
2544 	kfree(options);
2545 
2546 	dout("Error adding device %s\n", buf);
2547 	module_put(THIS_MODULE);
2548 
2549 	return (ssize_t) rc;
2550 }
2551 
2552 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2553 {
2554 	struct list_head *tmp;
2555 	struct rbd_device *rbd_dev;
2556 
2557 	spin_lock(&rbd_dev_list_lock);
2558 	list_for_each(tmp, &rbd_dev_list) {
2559 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2560 		if (rbd_dev->dev_id == dev_id) {
2561 			spin_unlock(&rbd_dev_list_lock);
2562 			return rbd_dev;
2563 		}
2564 	}
2565 	spin_unlock(&rbd_dev_list_lock);
2566 	return NULL;
2567 }
2568 
/*
 * Device release callback, run by the driver core when the last
 * reference to the rbd device's embedded struct device goes away
 * (e.g. after rbd_bus_del_dev()).  Undoes everything rbd_add()
 * set up, in reverse order, and finally drops the module reference
 * rbd_add() took.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request on the OSD, if any */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	/* ... and tell the OSD we're no longer watching the header */
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2599 
2600 static ssize_t rbd_remove(struct bus_type *bus,
2601 			  const char *buf,
2602 			  size_t count)
2603 {
2604 	struct rbd_device *rbd_dev = NULL;
2605 	int target_id, rc;
2606 	unsigned long ul;
2607 	int ret = count;
2608 
2609 	rc = strict_strtoul(buf, 10, &ul);
2610 	if (rc)
2611 		return rc;
2612 
2613 	/* convert to int; abort if we lost anything in the conversion */
2614 	target_id = (int) ul;
2615 	if (target_id != ul)
2616 		return -EINVAL;
2617 
2618 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2619 
2620 	rbd_dev = __rbd_get_dev(target_id);
2621 	if (!rbd_dev) {
2622 		ret = -ENOENT;
2623 		goto done;
2624 	}
2625 
2626 	__rbd_remove_all_snaps(rbd_dev);
2627 	rbd_bus_del_dev(rbd_dev);
2628 
2629 done:
2630 	mutex_unlock(&ctl_mutex);
2631 	return ret;
2632 }
2633 
/*
 * Handle a write to a device's snapshot-creation attribute: ask the
 * OSDs to create a snapshot with the given name, refresh the in-core
 * header, and notify other watchers of the image.
 *
 * Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * snprintf() with size count copies at most count - 1 bytes,
	 * so the final input byte is dropped -- presumably to strip
	 * the trailing newline of a sysfs write.  NOTE(review): a
	 * write without a trailing newline loses its last character;
	 * confirm this is intended.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2674 
2675 /*
2676  * create control files in sysfs
2677  * /sys/bus/rbd/...
2678  */
2679 static int rbd_sysfs_init(void)
2680 {
2681 	int ret;
2682 
2683 	ret = device_register(&rbd_root_dev);
2684 	if (ret < 0)
2685 		return ret;
2686 
2687 	ret = bus_register(&rbd_bus_type);
2688 	if (ret < 0)
2689 		device_unregister(&rbd_root_dev);
2690 
2691 	return ret;
2692 }
2693 
/* Tear down sysfs state in the reverse order rbd_sysfs_init() created it. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2699 
2700 int __init rbd_init(void)
2701 {
2702 	int rc;
2703 
2704 	rc = rbd_sysfs_init();
2705 	if (rc)
2706 		return rc;
2707 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2708 	return 0;
2709 }
2710 
/* Module teardown: remove the /sys/bus/rbd control files. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2715 
/* Module entry and exit points */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2727