xref: /linux/drivers/block/rbd.c (revision 26b0d14106954ae46d2f4f7eec3481828a210f7d)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define	SECTOR_SHIFT	9	/* log2 of the sector size */
51 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)	/* 512 bytes, as an unsigned long long */
52 
53 #define RBD_DRV_NAME "rbd"	/* also the prefix of every device name */
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55 
56 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
57 
58 #define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))	/* sizes rbd_device.obj_md_name; sizeof() counts the suffix's NUL */
59 #define RBD_MAX_POOL_NAME_LEN	64	/* sizes rbd_device.pool_name */
60 #define RBD_MAX_SNAP_NAME_LEN	32	/* sizes rbd_device.snap_name */
61 #define RBD_MAX_OPT_LEN		1024	/* NOTE(review): not used in the code visible here */
62 
63 #define RBD_SNAP_HEAD_NAME	"-"	/* snap_name value that selects the writable head (CEPH_NOSNAP) */
64 
65 /*
66  * An RBD device name will be "rbd#", where the "rbd" comes from
67  * RBD_DRV_NAME above, and # is a unique integer identifier.
68  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69  * enough to hold all possible device names.
70  */
71 #define DEV_NAME_LEN		32	/* sizes rbd_device.name */
72 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)	/* 11 when int is 4 bytes */
73 
74 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10	/* initial rbd_options.notify_timeout; units not visible here */
75 
76 /*
77  * block device image metadata (in-memory version)
78  */
79 struct rbd_image_header {
80 	u64 image_size;		/* image size, CPU byte order (from ondisk->image_size) */
81 	char block_name[32];	/* prefix of every segment object name */
82 	__u8 obj_order;		/* log2 of the size of one segment object */
83 	__u8 crypt_type;	/* copied verbatim from ondisk->options */
84 	__u8 comp_type;		/* copied verbatim from ondisk->options */
85 	struct ceph_snap_context *snapc;	/* kmalloc'ed; snaps[] holds the snapshot ids */
86 	size_t snap_names_len;	/* size in bytes of the snap_names buffer */
87 	u64 snap_seq;		/* from ondisk->snap_seq; becomes snapc->seq for the head */
88 	u32 total_snaps;	/* number of snapshots described below */
89 
90 	char *snap_names;	/* NUL-terminated names stored back to back; NULL if no snaps */
91 	u64 *snap_sizes;	/* image size per snapshot, same order; NULL if no snaps */
92 
93 	u64 obj_version;	/* sent as watch.ver when acknowledging a notify */
94 };
95 
96 struct rbd_options {
97 	int	notify_timeout;	/* "notify_timeout=%d" option; starts at RBD_NOTIFY_TIMEOUT_DEFAULT */
98 };
99 
100 /*
101  * an instance of the client.  multiple devices may share an rbd client.
102  */
103 struct rbd_client {
104 	struct ceph_client	*client;	/* from ceph_create_client() */
105 	struct rbd_options	*rbd_opts;	/* kfree'd in rbd_client_release() */
106 	struct kref		kref;		/* last put runs rbd_client_release() */
107 	struct list_head	node;		/* link in rbd_client_list */
108 };
109 
110 /*
111  * a request completion status
112  */
113 struct rbd_req_status {
114 	int done;	/* set to 1 once this slot's result has been recorded */
115 	int rc;		/* result later handed to __blk_end_request() */
116 	u64 bytes;	/* byte count later handed to __blk_end_request() */
117 };
118 
119 /*
120  * a collection of requests
121  */
122 struct rbd_req_coll {
123 	int			total;		/* number of slots in status[] */
124 	int			num_done;	/* slots already reported to the block layer, in order */
125 	struct kref		kref;		/* last put runs rbd_coll_release() */
126 	struct rbd_req_status	status[0];	/* trailing array, one entry per coll_index */
127 };
128 
129 /*
130  * a single io request
131  */
132 struct rbd_request {
133 	struct request		*rq;		/* blk layer request */
134 	struct bio		*bio;		/* cloned bio */
135 	struct page		**pages;	/* list of used pages */
136 	u64			len;		/* requested length; short reads are zero-filled up to it */
137 	int			coll_index;	/* this request's slot in coll->status[] */
138 	struct rbd_req_coll	*coll;		/* collection it belongs to, or NULL */
139 };
140 
141 struct rbd_snap {
142 	struct	device		dev;	/* embedded device; presumably the snapshot's sysfs node -- TODO confirm */
143 	const char		*name;	/* snapshot name */
144 	u64			size;	/* presumably the image size at this snapshot -- TODO confirm */
145 	struct list_head	node;	/* presumably a link in rbd_device.snaps -- TODO confirm */
146 	u64			id;	/* snapshot id */
147 };
148 
149 /*
150  * a single device
151  */
152 struct rbd_device {
153 	int			id;		/* blkdev unique id */
154 
155 	int			major;		/* blkdev assigned major */
156 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
157 	struct request_queue	*q;
158 
159 	struct rbd_client	*rbd_client;	/* shared client; dropped by rbd_put_client() */
160 
161 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
162 
163 	spinlock_t		lock;		/* queue lock */
164 
165 	struct rbd_image_header	header;		/* in-memory copy of the image header */
166 	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
167 	int			obj_len;
168 	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
169 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
170 	int			poolid;		/* stored into layout->fl_pg_pool of each request */
171 
172 	struct ceph_osd_event   *watch_event;	/* created by rbd_req_sync_watch() */
173 	struct ceph_osd_request *watch_request;	/* lingering watch request */
174 
175 	/* protects updating the header */
176 	struct rw_semaphore     header_rwsem;
177 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];	/* mapped snapshot, or RBD_SNAP_HEAD_NAME */
178 	u64                     snap_id;	/* current snapshot id */
179 	int read_only;			/* 1 when a snapshot is mapped, 0 for the head */
180 
181 	struct list_head	node;		/* presumably a link in rbd_dev_list -- TODO confirm */
182 
183 	/* list of snapshots */
184 	struct list_head	snaps;
185 
186 	/* sysfs related */
187 	struct device		dev;		/* refcounted through rbd_get_dev()/rbd_put_dev() */
188 };
189 
190 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
191 
192 static LIST_HEAD(rbd_dev_list);    /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* presumably guards rbd_dev_list -- TODO confirm */
194 
195 static LIST_HEAD(rbd_client_list);		/* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock);	/* held around every rbd_client_list lookup, add and delete */
197 
198 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199 static void rbd_dev_release(struct device *dev);
200 static ssize_t rbd_snap_add(struct device *dev,
201 			    struct device_attribute *attr,
202 			    const char *buf,
203 			    size_t count);
204 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
205 				  struct rbd_snap *snap);
206 
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 		       size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 			  size_t count);
211 
212 static struct bus_attribute rbd_bus_attrs[] = {
213 	__ATTR(add, S_IWUSR, NULL, rbd_add),	/* owner-writable only, no show handler */
214 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),	/* owner-writable only, no show handler */
215 	__ATTR_NULL	/* terminator */
216 };
217 
218 static struct bus_type rbd_bus_type = {
219 	.name		= "rbd",
220 	.bus_attrs	= rbd_bus_attrs,	/* the "add" and "remove" control files above */
221 };
222 
223 static void rbd_root_dev_release(struct device *dev)
224 {	/* empty body: its only user in view, rbd_root_dev, is a static object */
225 }
226 
227 static struct device rbd_root_dev = {
228 	.init_name =    "rbd",
229 	.release =      rbd_root_dev_release,	/* no-op release defined above */
230 };
231 
232 
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235 	return get_device(&rbd_dev->dev);
236 }
237 
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240 	put_device(&rbd_dev->dev);
241 }
242 
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244 
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248 
249 	rbd_get_dev(rbd_dev);
250 
251 	set_device_ro(bdev, rbd_dev->read_only);
252 
253 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254 		return -EROFS;
255 
256 	return 0;
257 }
258 
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261 	struct rbd_device *rbd_dev = disk->private_data;
262 
263 	rbd_put_dev(rbd_dev);
264 
265 	return 0;
266 }
267 
268 static const struct block_device_operations rbd_bd_ops = {
269 	.owner			= THIS_MODULE,
270 	.open			= rbd_open,	/* takes a device reference */
271 	.release		= rbd_release,	/* drops that reference */
272 };
273 
274 /*
275  * Initialize an rbd client instance.
276  * We own *opt.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
279 					    struct rbd_options *rbd_opts)
280 {
281 	struct rbd_client *rbdc;
282 	int ret = -ENOMEM;
283 
284 	dout("rbd_client_create\n");
285 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286 	if (!rbdc)
287 		goto out_opt;
288 
289 	kref_init(&rbdc->kref);
290 	INIT_LIST_HEAD(&rbdc->node);
291 
292 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 
294 	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
295 	if (IS_ERR(rbdc->client))
296 		goto out_mutex;
297 	opt = NULL; /* Now rbdc->client is responsible for opt */
298 
299 	ret = ceph_open_session(rbdc->client);
300 	if (ret < 0)
301 		goto out_err;
302 
303 	rbdc->rbd_opts = rbd_opts;
304 
305 	spin_lock(&rbd_client_list_lock);
306 	list_add_tail(&rbdc->node, &rbd_client_list);
307 	spin_unlock(&rbd_client_list_lock);
308 
309 	mutex_unlock(&ctl_mutex);
310 
311 	dout("rbd_client_create created %p\n", rbdc);
312 	return rbdc;
313 
314 out_err:
315 	ceph_destroy_client(rbdc->client);
316 out_mutex:
317 	mutex_unlock(&ctl_mutex);
318 	kfree(rbdc);
319 out_opt:
320 	if (opt)
321 		ceph_destroy_options(opt);
322 	return ERR_PTR(ret);
323 }
324 
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
329 {
330 	struct rbd_client *client_node;
331 
332 	if (opt->flags & CEPH_OPT_NOSHARE)
333 		return NULL;
334 
335 	list_for_each_entry(client_node, &rbd_client_list, node)
336 		if (ceph_compare_options(opt, client_node->client) == 0)
337 			return client_node;
338 	return NULL;
339 }
340 
341 /*
342  * mount options
343  */
344 enum {
345 	Opt_notify_timeout,	/* stored into rbd_options.notify_timeout */
346 	Opt_last_int,		/* marker: tokens below this value take an int argument */
347 	/* int args above */
348 	Opt_last_string,	/* marker: tokens between the two markers take a string */
349 	/* string args above */
350 };
351 
352 static match_table_t rbdopt_tokens = {
353 	{Opt_notify_timeout, "notify_timeout=%d"},
354 	/* int args above */
355 	/* string args above */
356 	{-1, NULL}	/* terminator; parse_rbd_opts_token() rejects a negative token with -EINVAL */
357 };
358 
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361 	struct rbd_options *rbdopt = private;
362 	substring_t argstr[MAX_OPT_ARGS];
363 	int token, intval, ret;
364 
365 	token = match_token(c, rbdopt_tokens, argstr);
366 	if (token < 0)
367 		return -EINVAL;
368 
369 	if (token < Opt_last_int) {
370 		ret = match_int(&argstr[0], &intval);
371 		if (ret < 0) {
372 			pr_err("bad mount option arg (not int) "
373 			       "at '%s'\n", c);
374 			return ret;
375 		}
376 		dout("got int token %d val %d\n", token, intval);
377 	} else if (token > Opt_last_int && token < Opt_last_string) {
378 		dout("got string token %d val %s\n", token,
379 		     argstr[0].from);
380 	} else {
381 		dout("got token %d\n", token);
382 	}
383 
384 	switch (token) {
385 	case Opt_notify_timeout:
386 		rbdopt->notify_timeout = intval;
387 		break;
388 	default:
389 		BUG_ON(token);
390 	}
391 	return 0;
392 }
393 
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399 					 size_t mon_addr_len,
400 					 char *options)
401 {
402 	struct rbd_client *rbdc;
403 	struct ceph_options *opt;
404 	struct rbd_options *rbd_opts;
405 
406 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407 	if (!rbd_opts)
408 		return ERR_PTR(-ENOMEM);
409 
410 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 
412 	opt = ceph_parse_options(options, mon_addr,
413 				mon_addr + mon_addr_len,
414 				parse_rbd_opts_token, rbd_opts);
415 	if (IS_ERR(opt)) {
416 		kfree(rbd_opts);
417 		return ERR_CAST(opt);
418 	}
419 
420 	spin_lock(&rbd_client_list_lock);
421 	rbdc = __rbd_client_find(opt);
422 	if (rbdc) {
423 		/* using an existing client */
424 		kref_get(&rbdc->kref);
425 		spin_unlock(&rbd_client_list_lock);
426 
427 		ceph_destroy_options(opt);
428 		kfree(rbd_opts);
429 
430 		return rbdc;
431 	}
432 	spin_unlock(&rbd_client_list_lock);
433 
434 	rbdc = rbd_client_create(opt, rbd_opts);
435 
436 	if (IS_ERR(rbdc))
437 		kfree(rbd_opts);
438 
439 	return rbdc;
440 }
441 
442 /*
443  * Destroy ceph client
444  *
445  * Caller must hold rbd_client_list_lock.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450 
451 	dout("rbd_release_client %p\n", rbdc);
452 	spin_lock(&rbd_client_list_lock);
453 	list_del(&rbdc->node);
454 	spin_unlock(&rbd_client_list_lock);
455 
456 	ceph_destroy_client(rbdc->client);
457 	kfree(rbdc->rbd_opts);
458 	kfree(rbdc);
459 }
460 
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 	rbd_dev->rbd_client = NULL;
469 }
470 
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476 	struct rbd_req_coll *coll =
477 		container_of(kref, struct rbd_req_coll, kref);
478 
479 	dout("rbd_coll_release %p\n", coll);
480 	kfree(coll);
481 }
482 
483 /*
484  * Create a new header structure, translate header format from the on-disk
485  * header.
486  */
487 static int rbd_header_from_disk(struct rbd_image_header *header,
488 				 struct rbd_image_header_ondisk *ondisk,
489 				 u32 allocated_snaps,
490 				 gfp_t gfp_flags)
491 {
492 	u32 i, snap_count;
493 
494 	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
495 		return -ENXIO;
496 
497 	snap_count = le32_to_cpu(ondisk->snap_count);
498 	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
499 			 / sizeof (*ondisk))
500 		return -EINVAL;
501 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
502 				snap_count * sizeof (*ondisk),
503 				gfp_flags);
504 	if (!header->snapc)
505 		return -ENOMEM;
506 
507 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
508 	if (snap_count) {
509 		header->snap_names = kmalloc(header->snap_names_len,
510 					     gfp_flags);
511 		if (!header->snap_names)
512 			goto err_snapc;
513 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
514 					     gfp_flags);
515 		if (!header->snap_sizes)
516 			goto err_names;
517 	} else {
518 		header->snap_names = NULL;
519 		header->snap_sizes = NULL;
520 	}
521 	memcpy(header->block_name, ondisk->block_name,
522 	       sizeof(ondisk->block_name));
523 
524 	header->image_size = le64_to_cpu(ondisk->image_size);
525 	header->obj_order = ondisk->options.order;
526 	header->crypt_type = ondisk->options.crypt_type;
527 	header->comp_type = ondisk->options.comp_type;
528 
529 	atomic_set(&header->snapc->nref, 1);
530 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
531 	header->snapc->num_snaps = snap_count;
532 	header->total_snaps = snap_count;
533 
534 	if (snap_count && allocated_snaps == snap_count) {
535 		for (i = 0; i < snap_count; i++) {
536 			header->snapc->snaps[i] =
537 				le64_to_cpu(ondisk->snaps[i].id);
538 			header->snap_sizes[i] =
539 				le64_to_cpu(ondisk->snaps[i].image_size);
540 		}
541 
542 		/* copy snapshot names */
543 		memcpy(header->snap_names, &ondisk->snaps[i],
544 			header->snap_names_len);
545 	}
546 
547 	return 0;
548 
549 err_names:
550 	kfree(header->snap_names);
551 err_snapc:
552 	kfree(header->snapc);
553 	return -ENOMEM;
554 }
555 
556 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
557 			u64 *seq, u64 *size)
558 {
559 	int i;
560 	char *p = header->snap_names;
561 
562 	for (i = 0; i < header->total_snaps; i++) {
563 		if (!strcmp(snap_name, p)) {
564 
565 			/* Found it.  Pass back its id and/or size */
566 
567 			if (seq)
568 				*seq = header->snapc->snaps[i];
569 			if (size)
570 				*size = header->snap_sizes[i];
571 			return i;
572 		}
573 		p += strlen(p) + 1;	/* Skip ahead to the next name */
574 	}
575 	return -ENOENT;
576 }
577 
578 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
579 {
580 	struct rbd_image_header *header = &dev->header;
581 	struct ceph_snap_context *snapc = header->snapc;
582 	int ret = -ENOENT;
583 
584 	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
585 
586 	down_write(&dev->header_rwsem);
587 
588 	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
589 		    sizeof (RBD_SNAP_HEAD_NAME))) {
590 		if (header->total_snaps)
591 			snapc->seq = header->snap_seq;
592 		else
593 			snapc->seq = 0;
594 		dev->snap_id = CEPH_NOSNAP;
595 		dev->read_only = 0;
596 		if (size)
597 			*size = header->image_size;
598 	} else {
599 		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
600 		if (ret < 0)
601 			goto done;
602 		dev->snap_id = snapc->seq;
603 		dev->read_only = 1;
604 	}
605 
606 	ret = 0;
607 done:
608 	up_write(&dev->header_rwsem);
609 	return ret;
610 }
611 
612 static void rbd_header_free(struct rbd_image_header *header)
613 {
614 	kfree(header->snapc);
615 	kfree(header->snap_names);
616 	kfree(header->snap_sizes);
617 }
618 
619 /*
620  * get the actual striped segment name, offset and length
621  */
622 static u64 rbd_get_segment(struct rbd_image_header *header,
623 			   const char *block_name,
624 			   u64 ofs, u64 len,
625 			   char *seg_name, u64 *segofs)
626 {
627 	u64 seg = ofs >> header->obj_order;
628 
629 	if (seg_name)
630 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
631 			 "%s.%012llx", block_name, seg);
632 
633 	ofs = ofs & ((1 << header->obj_order) - 1);
634 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
635 
636 	if (segofs)
637 		*segofs = ofs;
638 
639 	return len;
640 }
641 
642 static int rbd_get_num_segments(struct rbd_image_header *header,
643 				u64 ofs, u64 len)
644 {
645 	u64 start_seg = ofs >> header->obj_order;
646 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
647 	return end_seg - start_seg + 1;
648 }
649 
650 /*
651  * returns the size of an object in the image
652  */
653 static u64 rbd_obj_bytes(struct rbd_image_header *header)
654 {
655 	return 1 << header->obj_order;
656 }
657 
658 /*
659  * bio helpers
660  */
661 
662 static void bio_chain_put(struct bio *chain)
663 {
664 	struct bio *tmp;
665 
666 	while (chain) {
667 		tmp = chain;
668 		chain = chain->bi_next;
669 		bio_put(tmp);
670 	}
671 }
672 
673 /*
674  * zeros a bio chain, starting at specific offset
675  */
676 static void zero_bio_chain(struct bio *chain, int start_ofs)
677 {
678 	struct bio_vec *bv;
679 	unsigned long flags;
680 	void *buf;
681 	int i;
682 	int pos = 0;
683 
684 	while (chain) {
685 		bio_for_each_segment(bv, chain, i) {
686 			if (pos + bv->bv_len > start_ofs) {
687 				int remainder = max(start_ofs - pos, 0);
688 				buf = bvec_kmap_irq(bv, &flags);
689 				memset(buf + remainder, 0,
690 				       bv->bv_len - remainder);
691 				bvec_kunmap_irq(buf, &flags);
692 			}
693 			pos += bv->bv_len;
694 		}
695 
696 		chain = chain->bi_next;
697 	}
698 }
699 
700 /*
701  * bio_chain_clone - clone a chain of bios up to a certain length.
702  * might return a bio_pair that will need to be released.
703  */
704 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
705 				   struct bio_pair **bp,
706 				   int len, gfp_t gfpmask)
707 {
708 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
709 	int total = 0;
710 
711 	if (*bp) {
712 		bio_pair_release(*bp);
713 		*bp = NULL;
714 	}
715 
716 	while (old_chain && (total < len)) {
717 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
718 		if (!tmp)
719 			goto err_out;
720 
721 		if (total + old_chain->bi_size > len) {
722 			struct bio_pair *bp;
723 
724 			/*
725 			 * this split can only happen with a single paged bio,
726 			 * split_bio will BUG_ON if this is not the case
727 			 */
728 			dout("bio_chain_clone split! total=%d remaining=%d"
729 			     "bi_size=%d\n",
730 			     (int)total, (int)len-total,
731 			     (int)old_chain->bi_size);
732 
733 			/* split the bio. We'll release it either in the next
734 			   call, or it will have to be released outside */
735 			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
736 			if (!bp)
737 				goto err_out;
738 
739 			__bio_clone(tmp, &bp->bio1);
740 
741 			*next = &bp->bio2;
742 		} else {
743 			__bio_clone(tmp, old_chain);
744 			*next = old_chain->bi_next;
745 		}
746 
747 		tmp->bi_bdev = NULL;
748 		gfpmask &= ~__GFP_WAIT;
749 		tmp->bi_next = NULL;
750 
751 		if (!new_chain) {
752 			new_chain = tail = tmp;
753 		} else {
754 			tail->bi_next = tmp;
755 			tail = tmp;
756 		}
757 		old_chain = old_chain->bi_next;
758 
759 		total += tmp->bi_size;
760 	}
761 
762 	BUG_ON(total < len);
763 
764 	if (tail)
765 		tail->bi_next = NULL;
766 
767 	*old = old_chain;
768 
769 	return new_chain;
770 
771 err_out:
772 	dout("bio_chain_clone with err\n");
773 	bio_chain_put(new_chain);
774 	return NULL;
775 }
776 
777 /*
778  * helpers for osd request op vectors.
779  */
780 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
781 			    int num_ops,
782 			    int opcode,
783 			    u32 payload_len)
784 {
785 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
786 		       GFP_NOIO);
787 	if (!*ops)
788 		return -ENOMEM;
789 	(*ops)[0].op = opcode;
790 	/*
791 	 * op extent offset and length will be set later on
792 	 * in calc_raw_layout()
793 	 */
794 	(*ops)[0].payload_len = payload_len;
795 	return 0;
796 }
797 
/* Free an op vector obtained from rbd_create_rw_ops(); NULL is fine. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
802 
803 static void rbd_coll_end_req_index(struct request *rq,
804 				   struct rbd_req_coll *coll,
805 				   int index,
806 				   int ret, u64 len)
807 {
808 	struct request_queue *q;
809 	int min, max, i;
810 
811 	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
812 	     coll, index, ret, len);
813 
814 	if (!rq)
815 		return;
816 
817 	if (!coll) {
818 		blk_end_request(rq, ret, len);
819 		return;
820 	}
821 
822 	q = rq->q;
823 
824 	spin_lock_irq(q->queue_lock);
825 	coll->status[index].done = 1;
826 	coll->status[index].rc = ret;
827 	coll->status[index].bytes = len;
828 	max = min = coll->num_done;
829 	while (max < coll->total && coll->status[max].done)
830 		max++;
831 
832 	for (i = min; i<max; i++) {
833 		__blk_end_request(rq, coll->status[i].rc,
834 				  coll->status[i].bytes);
835 		coll->num_done++;
836 		kref_put(&coll->kref, rbd_coll_release);
837 	}
838 	spin_unlock_irq(q->queue_lock);
839 }
840 
841 static void rbd_coll_end_req(struct rbd_request *req,
842 			     int ret, u64 len)
843 {
844 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
845 }
846 
847 /*
848  * Send ceph osd request
849  */
850 static int rbd_do_request(struct request *rq,
851 			  struct rbd_device *dev,
852 			  struct ceph_snap_context *snapc,
853 			  u64 snapid,
854 			  const char *obj, u64 ofs, u64 len,
855 			  struct bio *bio,
856 			  struct page **pages,
857 			  int num_pages,
858 			  int flags,
859 			  struct ceph_osd_req_op *ops,
860 			  int num_reply,
861 			  struct rbd_req_coll *coll,
862 			  int coll_index,
863 			  void (*rbd_cb)(struct ceph_osd_request *req,
864 					 struct ceph_msg *msg),
865 			  struct ceph_osd_request **linger_req,
866 			  u64 *ver)
867 {
868 	struct ceph_osd_request *req;
869 	struct ceph_file_layout *layout;
870 	int ret;
871 	u64 bno;
872 	struct timespec mtime = CURRENT_TIME;
873 	struct rbd_request *req_data;
874 	struct ceph_osd_request_head *reqhead;
875 	struct ceph_osd_client *osdc;
876 
877 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
878 	if (!req_data) {
879 		if (coll)
880 			rbd_coll_end_req_index(rq, coll, coll_index,
881 					       -ENOMEM, len);
882 		return -ENOMEM;
883 	}
884 
885 	if (coll) {
886 		req_data->coll = coll;
887 		req_data->coll_index = coll_index;
888 	}
889 
890 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
891 
892 	down_read(&dev->header_rwsem);
893 
894 	osdc = &dev->rbd_client->client->osdc;
895 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
896 					false, GFP_NOIO, pages, bio);
897 	if (!req) {
898 		up_read(&dev->header_rwsem);
899 		ret = -ENOMEM;
900 		goto done_pages;
901 	}
902 
903 	req->r_callback = rbd_cb;
904 
905 	req_data->rq = rq;
906 	req_data->bio = bio;
907 	req_data->pages = pages;
908 	req_data->len = len;
909 
910 	req->r_priv = req_data;
911 
912 	reqhead = req->r_request->front.iov_base;
913 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
914 
915 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
916 	req->r_oid_len = strlen(req->r_oid);
917 
918 	layout = &req->r_file_layout;
919 	memset(layout, 0, sizeof(*layout));
920 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
921 	layout->fl_stripe_count = cpu_to_le32(1);
922 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
923 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
924 	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
925 				req, ops);
926 
927 	ceph_osdc_build_request(req, ofs, &len,
928 				ops,
929 				snapc,
930 				&mtime,
931 				req->r_oid, req->r_oid_len);
932 	up_read(&dev->header_rwsem);
933 
934 	if (linger_req) {
935 		ceph_osdc_set_request_linger(osdc, req);
936 		*linger_req = req;
937 	}
938 
939 	ret = ceph_osdc_start_request(osdc, req, false);
940 	if (ret < 0)
941 		goto done_err;
942 
943 	if (!rbd_cb) {
944 		ret = ceph_osdc_wait_request(osdc, req);
945 		if (ver)
946 			*ver = le64_to_cpu(req->r_reassert_version.version);
947 		dout("reassert_ver=%lld\n",
948 		     le64_to_cpu(req->r_reassert_version.version));
949 		ceph_osdc_put_request(req);
950 	}
951 	return ret;
952 
953 done_err:
954 	bio_chain_put(req_data->bio);
955 	ceph_osdc_put_request(req);
956 done_pages:
957 	rbd_coll_end_req(req_data, ret, len);
958 	kfree(req_data);
959 	return ret;
960 }
961 
962 /*
963  * Ceph osd op callback
964  */
965 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
966 {
967 	struct rbd_request *req_data = req->r_priv;
968 	struct ceph_osd_reply_head *replyhead;
969 	struct ceph_osd_op *op;
970 	__s32 rc;
971 	u64 bytes;
972 	int read_op;
973 
974 	/* parse reply */
975 	replyhead = msg->front.iov_base;
976 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
977 	op = (void *)(replyhead + 1);
978 	rc = le32_to_cpu(replyhead->result);
979 	bytes = le64_to_cpu(op->extent.length);
980 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
981 
982 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
983 
984 	if (rc == -ENOENT && read_op) {
985 		zero_bio_chain(req_data->bio, 0);
986 		rc = 0;
987 	} else if (rc == 0 && read_op && bytes < req_data->len) {
988 		zero_bio_chain(req_data->bio, bytes);
989 		bytes = req_data->len;
990 	}
991 
992 	rbd_coll_end_req(req_data, rc, bytes);
993 
994 	if (req_data->bio)
995 		bio_chain_put(req_data->bio);
996 
997 	ceph_osdc_put_request(req);
998 	kfree(req_data);
999 }
1000 
/*
 * Minimal osd request callback: ignores the reply and just drops the
 * request reference.
 *
 * NOTE(review): req->r_priv is not released here, while rbd_do_request()
 * attaches a kzalloc'ed struct rbd_request to every request it starts.
 * Confirm who is meant to free it for requests completed through this
 * callback.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1005 
1006 /*
1007  * Do a synchronous ceph osd operation
1008  */
1009 static int rbd_req_sync_op(struct rbd_device *dev,
1010 			   struct ceph_snap_context *snapc,
1011 			   u64 snapid,
1012 			   int opcode,
1013 			   int flags,
1014 			   struct ceph_osd_req_op *orig_ops,
1015 			   int num_reply,
1016 			   const char *obj,
1017 			   u64 ofs, u64 len,
1018 			   char *buf,
1019 			   struct ceph_osd_request **linger_req,
1020 			   u64 *ver)
1021 {
1022 	int ret;
1023 	struct page **pages;
1024 	int num_pages;
1025 	struct ceph_osd_req_op *ops = orig_ops;
1026 	u32 payload_len;
1027 
1028 	num_pages = calc_pages_for(ofs , len);
1029 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1030 	if (IS_ERR(pages))
1031 		return PTR_ERR(pages);
1032 
1033 	if (!orig_ops) {
1034 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1035 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1036 		if (ret < 0)
1037 			goto done;
1038 
1039 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1040 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1041 			if (ret < 0)
1042 				goto done_ops;
1043 		}
1044 	}
1045 
1046 	ret = rbd_do_request(NULL, dev, snapc, snapid,
1047 			  obj, ofs, len, NULL,
1048 			  pages, num_pages,
1049 			  flags,
1050 			  ops,
1051 			  2,
1052 			  NULL, 0,
1053 			  NULL,
1054 			  linger_req, ver);
1055 	if (ret < 0)
1056 		goto done_ops;
1057 
1058 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1059 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1060 
1061 done_ops:
1062 	if (!orig_ops)
1063 		rbd_destroy_ops(ops);
1064 done:
1065 	ceph_release_page_vector(pages, num_pages);
1066 	return ret;
1067 }
1068 
1069 /*
1070  * Do an asynchronous ceph osd operation
1071  */
1072 static int rbd_do_op(struct request *rq,
1073 		     struct rbd_device *rbd_dev ,
1074 		     struct ceph_snap_context *snapc,
1075 		     u64 snapid,
1076 		     int opcode, int flags, int num_reply,
1077 		     u64 ofs, u64 len,
1078 		     struct bio *bio,
1079 		     struct rbd_req_coll *coll,
1080 		     int coll_index)
1081 {
1082 	char *seg_name;
1083 	u64 seg_ofs;
1084 	u64 seg_len;
1085 	int ret;
1086 	struct ceph_osd_req_op *ops;
1087 	u32 payload_len;
1088 
1089 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090 	if (!seg_name)
1091 		return -ENOMEM;
1092 
1093 	seg_len = rbd_get_segment(&rbd_dev->header,
1094 				  rbd_dev->header.block_name,
1095 				  ofs, len,
1096 				  seg_name, &seg_ofs);
1097 
1098 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099 
1100 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1101 	if (ret < 0)
1102 		goto done;
1103 
1104 	/* we've taken care of segment sizes earlier when we
1105 	   cloned the bios. We should never have a segment
1106 	   truncated at this point */
1107 	BUG_ON(seg_len < len);
1108 
1109 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110 			     seg_name, seg_ofs, seg_len,
1111 			     bio,
1112 			     NULL, 0,
1113 			     flags,
1114 			     ops,
1115 			     num_reply,
1116 			     coll, coll_index,
1117 			     rbd_req_cb, 0, NULL);
1118 
1119 	rbd_destroy_ops(ops);
1120 done:
1121 	kfree(seg_name);
1122 	return ret;
1123 }
1124 
1125 /*
1126  * Request async osd write
1127  */
1128 static int rbd_req_write(struct request *rq,
1129 			 struct rbd_device *rbd_dev,
1130 			 struct ceph_snap_context *snapc,
1131 			 u64 ofs, u64 len,
1132 			 struct bio *bio,
1133 			 struct rbd_req_coll *coll,
1134 			 int coll_index)
1135 {
1136 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137 			 CEPH_OSD_OP_WRITE,
1138 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139 			 2,
1140 			 ofs, len, bio, coll, coll_index);
1141 }
1142 
1143 /*
1144  * Request async osd read
1145  */
1146 static int rbd_req_read(struct request *rq,
1147 			 struct rbd_device *rbd_dev,
1148 			 u64 snapid,
1149 			 u64 ofs, u64 len,
1150 			 struct bio *bio,
1151 			 struct rbd_req_coll *coll,
1152 			 int coll_index)
1153 {
1154 	return rbd_do_op(rq, rbd_dev, NULL,
1155 			 snapid,
1156 			 CEPH_OSD_OP_READ,
1157 			 CEPH_OSD_FLAG_READ,
1158 			 2,
1159 			 ofs, len, bio, coll, coll_index);
1160 }
1161 
1162 /*
1163  * Request sync osd read
1164  */
1165 static int rbd_req_sync_read(struct rbd_device *dev,
1166 			  struct ceph_snap_context *snapc,
1167 			  u64 snapid,
1168 			  const char *obj,
1169 			  u64 ofs, u64 len,
1170 			  char *buf,
1171 			  u64 *ver)
1172 {
1173 	return rbd_req_sync_op(dev, NULL,
1174 			       snapid,
1175 			       CEPH_OSD_OP_READ,
1176 			       CEPH_OSD_FLAG_READ,
1177 			       NULL,
1178 			       1, obj, ofs, len, buf, NULL, ver);
1179 }
1180 
1181 /*
1182  * Request sync osd watch
1183  */
1184 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1185 				   u64 ver,
1186 				   u64 notify_id,
1187 				   const char *obj)
1188 {
1189 	struct ceph_osd_req_op *ops;
1190 	struct page **pages = NULL;
1191 	int ret;
1192 
1193 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1194 	if (ret < 0)
1195 		return ret;
1196 
1197 	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1198 	ops[0].watch.cookie = notify_id;
1199 	ops[0].watch.flag = 0;
1200 
1201 	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1202 			  obj, 0, 0, NULL,
1203 			  pages, 0,
1204 			  CEPH_OSD_FLAG_READ,
1205 			  ops,
1206 			  1,
1207 			  NULL, 0,
1208 			  rbd_simple_req_cb, 0, NULL);
1209 
1210 	rbd_destroy_ops(ops);
1211 	return ret;
1212 }
1213 
1214 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215 {
1216 	struct rbd_device *dev = (struct rbd_device *)data;
1217 	int rc;
1218 
1219 	if (!dev)
1220 		return;
1221 
1222 	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1223 		notify_id, (int)opcode);
1224 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1225 	rc = __rbd_refresh_header(dev);
1226 	mutex_unlock(&ctl_mutex);
1227 	if (rc)
1228 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229 			   " update snaps: %d\n", dev->major, rc);
1230 
1231 	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1232 }
1233 
1234 /*
1235  * Request sync osd watch
1236  */
1237 static int rbd_req_sync_watch(struct rbd_device *dev,
1238 			      const char *obj,
1239 			      u64 ver)
1240 {
1241 	struct ceph_osd_req_op *ops;
1242 	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1243 
1244 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1245 	if (ret < 0)
1246 		return ret;
1247 
1248 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249 				     (void *)dev, &dev->watch_event);
1250 	if (ret < 0)
1251 		goto fail;
1252 
1253 	ops[0].watch.ver = cpu_to_le64(ver);
1254 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1255 	ops[0].watch.flag = 1;
1256 
1257 	ret = rbd_req_sync_op(dev, NULL,
1258 			      CEPH_NOSNAP,
1259 			      0,
1260 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261 			      ops,
1262 			      1, obj, 0, 0, NULL,
1263 			      &dev->watch_request, NULL);
1264 
1265 	if (ret < 0)
1266 		goto fail_event;
1267 
1268 	rbd_destroy_ops(ops);
1269 	return 0;
1270 
1271 fail_event:
1272 	ceph_osdc_cancel_event(dev->watch_event);
1273 	dev->watch_event = NULL;
1274 fail:
1275 	rbd_destroy_ops(ops);
1276 	return ret;
1277 }
1278 
1279 /*
1280  * Request sync osd unwatch
1281  */
1282 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1283 				const char *obj)
1284 {
1285 	struct ceph_osd_req_op *ops;
1286 
1287 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1288 	if (ret < 0)
1289 		return ret;
1290 
1291 	ops[0].watch.ver = 0;
1292 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1293 	ops[0].watch.flag = 0;
1294 
1295 	ret = rbd_req_sync_op(dev, NULL,
1296 			      CEPH_NOSNAP,
1297 			      0,
1298 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 			      ops,
1300 			      1, obj, 0, 0, NULL, NULL, NULL);
1301 
1302 	rbd_destroy_ops(ops);
1303 	ceph_osdc_cancel_event(dev->watch_event);
1304 	dev->watch_event = NULL;
1305 	return ret;
1306 }
1307 
/*
 * Small context wrapper holding the rbd device associated with a
 * notify request.
 */
1308 struct rbd_notify_info {
1309 	struct rbd_device *dev;	/* device the notify is issued for */
1310 };
1311 
1312 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313 {
1314 	struct rbd_device *dev = (struct rbd_device *)data;
1315 	if (!dev)
1316 		return;
1317 
1318 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1319 		notify_id, (int)opcode);
1320 }
1321 
1322 /*
1323  * Request sync osd notify
1324  */
1325 static int rbd_req_sync_notify(struct rbd_device *dev,
1326 		          const char *obj)
1327 {
1328 	struct ceph_osd_req_op *ops;
1329 	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1330 	struct ceph_osd_event *event;
1331 	struct rbd_notify_info info;
1332 	int payload_len = sizeof(u32) + sizeof(u32);
1333 	int ret;
1334 
1335 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1336 	if (ret < 0)
1337 		return ret;
1338 
1339 	info.dev = dev;
1340 
1341 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342 				     (void *)&info, &event);
1343 	if (ret < 0)
1344 		goto fail;
1345 
1346 	ops[0].watch.ver = 1;
1347 	ops[0].watch.flag = 1;
1348 	ops[0].watch.cookie = event->cookie;
1349 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350 	ops[0].watch.timeout = 12;
1351 
1352 	ret = rbd_req_sync_op(dev, NULL,
1353 			       CEPH_NOSNAP,
1354 			       0,
1355 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356 			       ops,
1357 			       1, obj, 0, 0, NULL, NULL, NULL);
1358 	if (ret < 0)
1359 		goto fail_event;
1360 
1361 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1362 	dout("ceph_osdc_wait_event returned %d\n", ret);
1363 	rbd_destroy_ops(ops);
1364 	return 0;
1365 
1366 fail_event:
1367 	ceph_osdc_cancel_event(event);
1368 fail:
1369 	rbd_destroy_ops(ops);
1370 	return ret;
1371 }
1372 
1373 /*
1374  * Request sync osd read
1375  */
1376 static int rbd_req_sync_exec(struct rbd_device *dev,
1377 			     const char *obj,
1378 			     const char *cls,
1379 			     const char *method,
1380 			     const char *data,
1381 			     int len,
1382 			     u64 *ver)
1383 {
1384 	struct ceph_osd_req_op *ops;
1385 	int cls_len = strlen(cls);
1386 	int method_len = strlen(method);
1387 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1388 				    cls_len + method_len + len);
1389 	if (ret < 0)
1390 		return ret;
1391 
1392 	ops[0].cls.class_name = cls;
1393 	ops[0].cls.class_len = (__u8)cls_len;
1394 	ops[0].cls.method_name = method;
1395 	ops[0].cls.method_len = (__u8)method_len;
1396 	ops[0].cls.argc = 0;
1397 	ops[0].cls.indata = data;
1398 	ops[0].cls.indata_len = len;
1399 
1400 	ret = rbd_req_sync_op(dev, NULL,
1401 			       CEPH_NOSNAP,
1402 			       0,
1403 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404 			       ops,
1405 			       1, obj, 0, 0, NULL, NULL, ver);
1406 
1407 	rbd_destroy_ops(ops);
1408 
1409 	dout("cls_exec returned %d\n", ret);
1410 	return ret;
1411 }
1412 
1413 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1414 {
1415 	struct rbd_req_coll *coll =
1416 			kzalloc(sizeof(struct rbd_req_coll) +
1417 			        sizeof(struct rbd_req_status) * num_reqs,
1418 				GFP_ATOMIC);
1419 
1420 	if (!coll)
1421 		return NULL;
1422 	coll->total = num_reqs;
1423 	kref_init(&coll->kref);
1424 	return coll;
1425 }
1426 
1427 /*
1428  * block device queue callback
1429  */
1430 static void rbd_rq_fn(struct request_queue *q)
1431 {
1432 	struct rbd_device *rbd_dev = q->queuedata;
1433 	struct request *rq;
1434 	struct bio_pair *bp = NULL;
1435 
1436 	while ((rq = blk_fetch_request(q))) {
1437 		struct bio *bio;
1438 		struct bio *rq_bio, *next_bio = NULL;
1439 		bool do_write;
1440 		int size, op_size = 0;
1441 		u64 ofs;
1442 		int num_segs, cur_seg = 0;
1443 		struct rbd_req_coll *coll;
1444 
1445 		/* peek at request from block layer */
1446 		if (!rq)
1447 			break;
1448 
1449 		dout("fetched request\n");
1450 
1451 		/* filter out block requests we don't understand */
1452 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1453 			__blk_end_request_all(rq, 0);
1454 			continue;
1455 		}
1456 
1457 		/* deduce our operation (read, write) */
1458 		do_write = (rq_data_dir(rq) == WRITE);
1459 
1460 		size = blk_rq_bytes(rq);
1461 		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1462 		rq_bio = rq->bio;
1463 		if (do_write && rbd_dev->read_only) {
1464 			__blk_end_request_all(rq, -EROFS);
1465 			continue;
1466 		}
1467 
1468 		spin_unlock_irq(q->queue_lock);
1469 
1470 		dout("%s 0x%x bytes at 0x%llx\n",
1471 		     do_write ? "write" : "read",
1472 		     size, blk_rq_pos(rq) * SECTOR_SIZE);
1473 
1474 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1475 		coll = rbd_alloc_coll(num_segs);
1476 		if (!coll) {
1477 			spin_lock_irq(q->queue_lock);
1478 			__blk_end_request_all(rq, -ENOMEM);
1479 			continue;
1480 		}
1481 
1482 		do {
1483 			/* a bio clone to be passed down to OSD req */
1484 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1485 			op_size = rbd_get_segment(&rbd_dev->header,
1486 						  rbd_dev->header.block_name,
1487 						  ofs, size,
1488 						  NULL, NULL);
1489 			kref_get(&coll->kref);
1490 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1491 					      op_size, GFP_ATOMIC);
1492 			if (!bio) {
1493 				rbd_coll_end_req_index(rq, coll, cur_seg,
1494 						       -ENOMEM, op_size);
1495 				goto next_seg;
1496 			}
1497 
1498 
1499 			/* init OSD command: write or read */
1500 			if (do_write)
1501 				rbd_req_write(rq, rbd_dev,
1502 					      rbd_dev->header.snapc,
1503 					      ofs,
1504 					      op_size, bio,
1505 					      coll, cur_seg);
1506 			else
1507 				rbd_req_read(rq, rbd_dev,
1508 					     rbd_dev->snap_id,
1509 					     ofs,
1510 					     op_size, bio,
1511 					     coll, cur_seg);
1512 
1513 next_seg:
1514 			size -= op_size;
1515 			ofs += op_size;
1516 
1517 			cur_seg++;
1518 			rq_bio = next_bio;
1519 		} while (size > 0);
1520 		kref_put(&coll->kref, rbd_coll_release);
1521 
1522 		if (bp)
1523 			bio_pair_release(bp);
1524 		spin_lock_irq(q->queue_lock);
1525 	}
1526 }
1527 
1528 /*
1529  * a queue callback. Makes sure that we don't create a bio that spans across
1530  * multiple osd objects. One exception would be with a single page bios,
1531  * which we handle later at bio_chain_clone
1532  */
1533 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1534 			  struct bio_vec *bvec)
1535 {
1536 	struct rbd_device *rbd_dev = q->queuedata;
1537 	unsigned int chunk_sectors;
1538 	sector_t sector;
1539 	unsigned int bio_sectors;
1540 	int max;
1541 
1542 	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1543 	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1544 	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1545 
1546 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1547 				 + bio_sectors)) << SECTOR_SHIFT;
1548 	if (max < 0)
1549 		max = 0; /* bio_add cannot handle a negative return */
1550 	if (max <= bvec->bv_len && bio_sectors == 0)
1551 		return bvec->bv_len;
1552 	return max;
1553 }
1554 
1555 static void rbd_free_disk(struct rbd_device *rbd_dev)
1556 {
1557 	struct gendisk *disk = rbd_dev->disk;
1558 
1559 	if (!disk)
1560 		return;
1561 
1562 	rbd_header_free(&rbd_dev->header);
1563 
1564 	if (disk->flags & GENHD_FL_UP)
1565 		del_gendisk(disk);
1566 	if (disk->queue)
1567 		blk_cleanup_queue(disk->queue);
1568 	put_disk(disk);
1569 }
1570 
1571 /*
1572  * reload the ondisk the header
1573  */
1574 static int rbd_read_header(struct rbd_device *rbd_dev,
1575 			   struct rbd_image_header *header)
1576 {
1577 	ssize_t rc;
1578 	struct rbd_image_header_ondisk *dh;
1579 	u32 snap_count = 0;
1580 	u64 ver;
1581 	size_t len;
1582 
1583 	/*
1584 	 * First reads the fixed-size header to determine the number
1585 	 * of snapshots, then re-reads it, along with all snapshot
1586 	 * records as well as their stored names.
1587 	 */
1588 	len = sizeof (*dh);
1589 	while (1) {
1590 		dh = kmalloc(len, GFP_KERNEL);
1591 		if (!dh)
1592 			return -ENOMEM;
1593 
1594 		rc = rbd_req_sync_read(rbd_dev,
1595 				       NULL, CEPH_NOSNAP,
1596 				       rbd_dev->obj_md_name,
1597 				       0, len,
1598 				       (char *)dh, &ver);
1599 		if (rc < 0)
1600 			goto out_dh;
1601 
1602 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1603 		if (rc < 0) {
1604 			if (rc == -ENXIO)
1605 				pr_warning("unrecognized header format"
1606 					   " for image %s", rbd_dev->obj);
1607 			goto out_dh;
1608 		}
1609 
1610 		if (snap_count == header->total_snaps)
1611 			break;
1612 
1613 		snap_count = header->total_snaps;
1614 		len = sizeof (*dh) +
1615 			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1616 			header->snap_names_len;
1617 
1618 		rbd_header_free(header);
1619 		kfree(dh);
1620 	}
1621 	header->obj_version = ver;
1622 
1623 out_dh:
1624 	kfree(dh);
1625 	return rc;
1626 }
1627 
1628 /*
1629  * create a snapshot
1630  */
1631 static int rbd_header_add_snap(struct rbd_device *dev,
1632 			       const char *snap_name,
1633 			       gfp_t gfp_flags)
1634 {
1635 	int name_len = strlen(snap_name);
1636 	u64 new_snapid;
1637 	int ret;
1638 	void *data, *p, *e;
1639 	u64 ver;
1640 	struct ceph_mon_client *monc;
1641 
1642 	/* we should create a snapshot only if we're pointing at the head */
1643 	if (dev->snap_id != CEPH_NOSNAP)
1644 		return -EINVAL;
1645 
1646 	monc = &dev->rbd_client->client->monc;
1647 	ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1648 	dout("created snapid=%lld\n", new_snapid);
1649 	if (ret < 0)
1650 		return ret;
1651 
1652 	data = kmalloc(name_len + 16, gfp_flags);
1653 	if (!data)
1654 		return -ENOMEM;
1655 
1656 	p = data;
1657 	e = data + name_len + 16;
1658 
1659 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1661 
1662 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1663 				data, p - data, &ver);
1664 
1665 	kfree(data);
1666 
1667 	if (ret < 0)
1668 		return ret;
1669 
1670 	down_write(&dev->header_rwsem);
1671 	dev->header.snapc->seq = new_snapid;
1672 	up_write(&dev->header_rwsem);
1673 
1674 	return 0;
1675 bad:
1676 	return -ERANGE;
1677 }
1678 
1679 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680 {
1681 	struct rbd_snap *snap;
1682 
1683 	while (!list_empty(&rbd_dev->snaps)) {
1684 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1685 		__rbd_remove_snap_dev(rbd_dev, snap);
1686 	}
1687 }
1688 
1689 /*
1690  * only read the first part of the ondisk header, without the snaps info
1691  */
1692 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1693 {
1694 	int ret;
1695 	struct rbd_image_header h;
1696 	u64 snap_seq;
1697 	int follow_seq = 0;
1698 
1699 	ret = rbd_read_header(rbd_dev, &h);
1700 	if (ret < 0)
1701 		return ret;
1702 
1703 	/* resized? */
1704 	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1705 
1706 	down_write(&rbd_dev->header_rwsem);
1707 
1708 	snap_seq = rbd_dev->header.snapc->seq;
1709 	if (rbd_dev->header.total_snaps &&
1710 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1711 		/* pointing at the head, will need to follow that
1712 		   if head moves */
1713 		follow_seq = 1;
1714 
1715 	kfree(rbd_dev->header.snapc);
1716 	kfree(rbd_dev->header.snap_names);
1717 	kfree(rbd_dev->header.snap_sizes);
1718 
1719 	rbd_dev->header.total_snaps = h.total_snaps;
1720 	rbd_dev->header.snapc = h.snapc;
1721 	rbd_dev->header.snap_names = h.snap_names;
1722 	rbd_dev->header.snap_names_len = h.snap_names_len;
1723 	rbd_dev->header.snap_sizes = h.snap_sizes;
1724 	if (follow_seq)
1725 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1726 	else
1727 		rbd_dev->header.snapc->seq = snap_seq;
1728 
1729 	ret = __rbd_init_snaps_header(rbd_dev);
1730 
1731 	up_write(&rbd_dev->header_rwsem);
1732 
1733 	return ret;
1734 }
1735 
1736 static int rbd_init_disk(struct rbd_device *rbd_dev)
1737 {
1738 	struct gendisk *disk;
1739 	struct request_queue *q;
1740 	int rc;
1741 	u64 segment_size;
1742 	u64 total_size = 0;
1743 
1744 	/* contact OSD, request size info about the object being mapped */
1745 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1746 	if (rc)
1747 		return rc;
1748 
1749 	/* no need to lock here, as rbd_dev is not registered yet */
1750 	rc = __rbd_init_snaps_header(rbd_dev);
1751 	if (rc)
1752 		return rc;
1753 
1754 	rc = rbd_header_set_snap(rbd_dev, &total_size);
1755 	if (rc)
1756 		return rc;
1757 
1758 	/* create gendisk info */
1759 	rc = -ENOMEM;
1760 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1761 	if (!disk)
1762 		goto out;
1763 
1764 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1765 		 rbd_dev->id);
1766 	disk->major = rbd_dev->major;
1767 	disk->first_minor = 0;
1768 	disk->fops = &rbd_bd_ops;
1769 	disk->private_data = rbd_dev;
1770 
1771 	/* init rq */
1772 	rc = -ENOMEM;
1773 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1774 	if (!q)
1775 		goto out_disk;
1776 
1777 	/* We use the default size, but let's be explicit about it. */
1778 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1779 
1780 	/* set io sizes to object size */
1781 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1782 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1783 	blk_queue_max_segment_size(q, segment_size);
1784 	blk_queue_io_min(q, segment_size);
1785 	blk_queue_io_opt(q, segment_size);
1786 
1787 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1788 	disk->queue = q;
1789 
1790 	q->queuedata = rbd_dev;
1791 
1792 	rbd_dev->disk = disk;
1793 	rbd_dev->q = q;
1794 
1795 	/* finally, announce the disk to the world */
1796 	set_capacity(disk, total_size / SECTOR_SIZE);
1797 	add_disk(disk);
1798 
1799 	pr_info("%s: added with size 0x%llx\n",
1800 		disk->disk_name, (unsigned long long)total_size);
1801 	return 0;
1802 
1803 out_disk:
1804 	put_disk(disk);
1805 out:
1806 	return rc;
1807 }
1808 
1809 /*
1810   sysfs
1811 */
1812 
1813 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1814 {
1815 	return container_of(dev, struct rbd_device, dev);
1816 }
1817 
1818 static ssize_t rbd_size_show(struct device *dev,
1819 			     struct device_attribute *attr, char *buf)
1820 {
1821 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1822 
1823 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1824 }
1825 
1826 static ssize_t rbd_major_show(struct device *dev,
1827 			      struct device_attribute *attr, char *buf)
1828 {
1829 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1830 
1831 	return sprintf(buf, "%d\n", rbd_dev->major);
1832 }
1833 
1834 static ssize_t rbd_client_id_show(struct device *dev,
1835 				  struct device_attribute *attr, char *buf)
1836 {
1837 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1838 
1839 	return sprintf(buf, "client%lld\n",
1840 			ceph_client_id(rbd_dev->rbd_client->client));
1841 }
1842 
1843 static ssize_t rbd_pool_show(struct device *dev,
1844 			     struct device_attribute *attr, char *buf)
1845 {
1846 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847 
1848 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849 }
1850 
1851 static ssize_t rbd_name_show(struct device *dev,
1852 			     struct device_attribute *attr, char *buf)
1853 {
1854 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855 
1856 	return sprintf(buf, "%s\n", rbd_dev->obj);
1857 }
1858 
1859 static ssize_t rbd_snap_show(struct device *dev,
1860 			     struct device_attribute *attr,
1861 			     char *buf)
1862 {
1863 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864 
1865 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1866 }
1867 
1868 static ssize_t rbd_image_refresh(struct device *dev,
1869 				 struct device_attribute *attr,
1870 				 const char *buf,
1871 				 size_t size)
1872 {
1873 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874 	int rc;
1875 	int ret = size;
1876 
1877 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878 
1879 	rc = __rbd_refresh_header(rbd_dev);
1880 	if (rc < 0)
1881 		ret = rc;
1882 
1883 	mutex_unlock(&ctl_mutex);
1884 	return ret;
1885 }
1886 
/*
 * Per-device sysfs attributes.  The S_IRUGO entries only have a show
 * handler; "refresh" and "create_snap" are S_IWUSR and only have a
 * store handler.
 */
1887 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1888 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1889 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1890 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1891 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1892 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1893 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1894 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1895 
/* NULL-terminated list of the attributes declared above. */
1896 static struct attribute *rbd_attrs[] = {
1897 	&dev_attr_size.attr,
1898 	&dev_attr_major.attr,
1899 	&dev_attr_client_id.attr,
1900 	&dev_attr_pool.attr,
1901 	&dev_attr_name.attr,
1902 	&dev_attr_current_snap.attr,
1903 	&dev_attr_refresh.attr,
1904 	&dev_attr_create_snap.attr,
1905 	NULL
1906 };
1907 
/* Single attribute group wrapping rbd_attrs. */
1908 static struct attribute_group rbd_attr_group = {
1909 	.attrs = rbd_attrs,
1910 };
1911 
/* NULL-terminated group list referenced by rbd_device_type. */
1912 static const struct attribute_group *rbd_attr_groups[] = {
1913 	&rbd_attr_group,
1914 	NULL
1915 };
1916 
/* Release callback installed in rbd_device_type; it performs no work. */
1917 static void rbd_sysfs_dev_release(struct device *dev)
1918 {
1919 }
1920 
/* Device type for rbd devices: name, attribute groups and release hook. */
1921 static struct device_type rbd_device_type = {
1922 	.name		= "rbd",
1923 	.groups		= rbd_attr_groups,
1924 	.release	= rbd_sysfs_dev_release,
1925 };
1926 
1927 
1928 /*
1929   sysfs - snapshots
1930 */
1931 
1932 static ssize_t rbd_snap_size_show(struct device *dev,
1933 				  struct device_attribute *attr,
1934 				  char *buf)
1935 {
1936 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1937 
1938 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1939 }
1940 
1941 static ssize_t rbd_snap_id_show(struct device *dev,
1942 				struct device_attribute *attr,
1943 				char *buf)
1944 {
1945 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1946 
1947 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1948 }
1949 
/* Per-snapshot sysfs attributes; both only have a show handler. */
1950 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1951 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1952 
/* NULL-terminated list of the snapshot attributes declared above. */
1953 static struct attribute *rbd_snap_attrs[] = {
1954 	&dev_attr_snap_size.attr,
1955 	&dev_attr_snap_id.attr,
1956 	NULL,
1957 };
1958 
/* Single attribute group wrapping rbd_snap_attrs. */
1959 static struct attribute_group rbd_snap_attr_group = {
1960 	.attrs = rbd_snap_attrs,
1961 };
1962 
1963 static void rbd_snap_dev_release(struct device *dev)
1964 {
1965 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966 	kfree(snap->name);
1967 	kfree(snap);
1968 }
1969 
/* NULL-terminated group list referenced by rbd_snap_device_type. */
1970 static const struct attribute_group *rbd_snap_attr_groups[] = {
1971 	&rbd_snap_attr_group,
1972 	NULL
1973 };
1974 
/* Device type for snapshot devices: attribute groups and release hook. */
1975 static struct device_type rbd_snap_device_type = {
1976 	.groups		= rbd_snap_attr_groups,
1977 	.release	= rbd_snap_dev_release,
1978 };
1979 
1980 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1981 				  struct rbd_snap *snap)
1982 {
1983 	list_del(&snap->node);
1984 	device_unregister(&snap->dev);
1985 }
1986 
1987 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1988 				  struct rbd_snap *snap,
1989 				  struct device *parent)
1990 {
1991 	struct device *dev = &snap->dev;
1992 	int ret;
1993 
1994 	dev->type = &rbd_snap_device_type;
1995 	dev->parent = parent;
1996 	dev->release = rbd_snap_dev_release;
1997 	dev_set_name(dev, "snap_%s", snap->name);
1998 	ret = device_register(dev);
1999 
2000 	return ret;
2001 }
2002 
2003 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004 			      int i, const char *name,
2005 			      struct rbd_snap **snapp)
2006 {
2007 	int ret;
2008 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2009 	if (!snap)
2010 		return -ENOMEM;
2011 	snap->name = kstrdup(name, GFP_KERNEL);
2012 	snap->size = rbd_dev->header.snap_sizes[i];
2013 	snap->id = rbd_dev->header.snapc->snaps[i];
2014 	if (device_is_registered(&rbd_dev->dev)) {
2015 		ret = rbd_register_snap_dev(rbd_dev, snap,
2016 					     &rbd_dev->dev);
2017 		if (ret < 0)
2018 			goto err;
2019 	}
2020 	*snapp = snap;
2021 	return 0;
2022 err:
2023 	kfree(snap->name);
2024 	kfree(snap);
2025 	return ret;
2026 }
2027 
/*
 * Step backwards through a list of NUL-terminated strings stored back
 * to back, beginning at @start.  @name points just past the
 * terminator of the entry to be found (i.e. at the following entry,
 * or at the end of the list).  Returns a pointer to the beginning of
 * that previous entry, or NULL when @name is less than two bytes
 * beyond @start and so no previous entry exists.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* skip the previous terminator, then scan back to the one before */
	for (cur = name - 2; *cur != '\0'; cur--)
		if (cur == start)
			return start;

	return cur + 1;
}
2044 
2045 /*
2046  * compare the old list of snapshots that we have to what's in the header
2047  * and update it accordingly. Note that the header holds the snapshots
2048  * in a reverse order (from newest to oldest) and we need to go from
2049  * older to new so that we don't get a duplicate snap name when
2050  * doing the process (e.g., removed snapshot and recreated a new
2051  * one with the same name.
2052  */
/*
 * Synchronise rbd_dev->snaps with the snapshot ids and names held in
 * rbd_dev->header.  Entries no longer present in the header are
 * removed through __rbd_remove_snap_dev(); entries missing from the
 * list are created through __rbd_add_snap_dev().  Returns 0 on
 * success, -EINVAL if the name list runs out before the snapshot
 * count does, or the error from __rbd_add_snap_dev().
 */
2053 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2054 {
2055 	const char *name, *first_name;
	/* i counts the header snapshots that remain to be processed */
2056 	int i = rbd_dev->header.total_snaps;
2057 	struct rbd_snap *snap, *old_snap = NULL;
2058 	int ret;
2059 	struct list_head *p, *n;
2060 
	/* start just past the end of the name buffer and walk backwards */
2061 	first_name = rbd_dev->header.snap_names;
2062 	name = first_name + rbd_dev->header.snap_names_len;
2063 
	/* walk the existing list from its tail towards its head */
2064 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2065 		u64 cur_id;
2066 
2067 		old_snap = list_entry(p, struct rbd_snap, node);
2068 
		/* cur_id is only assigned (and only read below) when i != 0 */
2069 		if (i)
2070 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2071 
2072 		if (!i || old_snap->id < cur_id) {
2073 			/* old_snap->id was skipped, thus was removed */
2074 			__rbd_remove_snap_dev(rbd_dev, old_snap);
2075 			continue;
2076 		}
2077 		if (old_snap->id == cur_id) {
2078 			/* we have this snapshot already */
2079 			i--;
2080 			name = rbd_prev_snap_name(name, first_name);
2081 			continue;
2082 		}
		/*
		 * NOTE(review): this loop reads snaps[i] while the rest of
		 * the function uses snaps[i - 1]; with i == total_snaps the
		 * index is one past the entries counted by total_snaps.
		 * Also, name is not stepped back before its first use here,
		 * unlike in the trailing loop below.  Confirm both are
		 * intended.
		 */
2083 		for (; i > 0;
2084 		     i--, name = rbd_prev_snap_name(name, first_name)) {
2085 			if (!name) {
2086 				WARN_ON(1);
2087 				return -EINVAL;
2088 			}
2089 			cur_id = rbd_dev->header.snapc->snaps[i];
2090 			/* snapshot removal? handle it above */
2091 			if (cur_id >= old_snap->id)
2092 				break;
2093 			/* a new snapshot */
2094 			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2095 			if (ret < 0)
2096 				return ret;
2097 
2098 			/* note that we add it backward so using n and not p */
2099 			list_add(&snap->node, n);
2100 			p = &snap->node;
2101 		}
2102 	}
2103 	/* we're done going over the old snap list, just add what's left */
2104 	for (; i > 0; i--) {
2105 		name = rbd_prev_snap_name(name, first_name);
2106 		if (!name) {
2107 			WARN_ON(1);
2108 			return -EINVAL;
2109 		}
2110 		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2111 		if (ret < 0)
2112 			return ret;
		/* each remaining snapshot is inserted at the head of the list */
2113 		list_add(&snap->node, &rbd_dev->snaps);
2114 	}
2115 
2116 	return 0;
2117 }
2118 
2119 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2120 {
2121 	int ret;
2122 	struct device *dev;
2123 	struct rbd_snap *snap;
2124 
2125 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2126 	dev = &rbd_dev->dev;
2127 
2128 	dev->bus = &rbd_bus_type;
2129 	dev->type = &rbd_device_type;
2130 	dev->parent = &rbd_root_dev;
2131 	dev->release = rbd_dev_release;
2132 	dev_set_name(dev, "%d", rbd_dev->id);
2133 	ret = device_register(dev);
2134 	if (ret < 0)
2135 		goto out;
2136 
2137 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2138 		ret = rbd_register_snap_dev(rbd_dev, snap,
2139 					     &rbd_dev->dev);
2140 		if (ret < 0)
2141 			break;
2142 	}
2143 out:
2144 	mutex_unlock(&ctl_mutex);
2145 	return ret;
2146 }
2147 
2148 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2149 {
2150 	device_unregister(&rbd_dev->dev);
2151 }
2152 
2153 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154 {
2155 	int ret, rc;
2156 
2157 	do {
2158 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2159 					 rbd_dev->header.obj_version);
2160 		if (ret == -ERANGE) {
2161 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2162 			rc = __rbd_refresh_header(rbd_dev);
2163 			mutex_unlock(&ctl_mutex);
2164 			if (rc < 0)
2165 				return rc;
2166 		}
2167 	} while (ret == -ERANGE);
2168 
2169 	return ret;
2170 }
2171 
2172 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2173 
2174 /*
2175  * Get a unique rbd identifier for the given new rbd_dev, and add
2176  * the rbd_dev to the global list.  The minimum rbd id is 1.
2177  */
2178 static void rbd_id_get(struct rbd_device *rbd_dev)
2179 {
2180 	rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2181 
2182 	spin_lock(&rbd_dev_list_lock);
2183 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2184 	spin_unlock(&rbd_dev_list_lock);
2185 }
2186 
2187 /*
2188  * Remove an rbd_dev from the global list, and record that its
2189  * identifier is no longer in use.
2190  */
2191 static void rbd_id_put(struct rbd_device *rbd_dev)
2192 {
2193 	struct list_head *tmp;
2194 	int rbd_id = rbd_dev->id;
2195 	int max_id;
2196 
2197 	BUG_ON(rbd_id < 1);
2198 
2199 	spin_lock(&rbd_dev_list_lock);
2200 	list_del_init(&rbd_dev->node);
2201 
2202 	/*
2203 	 * If the id being "put" is not the current maximum, there
2204 	 * is nothing special we need to do.
2205 	 */
2206 	if (rbd_id != atomic64_read(&rbd_id_max)) {
2207 		spin_unlock(&rbd_dev_list_lock);
2208 		return;
2209 	}
2210 
2211 	/*
2212 	 * We need to update the current maximum id.  Search the
2213 	 * list to find out what it is.  We're more likely to find
2214 	 * the maximum at the end, so search the list backward.
2215 	 */
2216 	max_id = 0;
2217 	list_for_each_prev(tmp, &rbd_dev_list) {
2218 		struct rbd_device *rbd_dev;
2219 
2220 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2221 		if (rbd_id > max_id)
2222 			max_id = rbd_id;
2223 	}
2224 	spin_unlock(&rbd_dev_list_lock);
2225 
2226 	/*
2227 	 * The max id could have been updated by rbd_id_get(), in
2228 	 * which case it now accurately reflects the new maximum.
2229 	 * Be careful not to overwrite the maximum value in that
2230 	 * case.
2231 	 */
2232 	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233 }
2234 
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token that begins there, i.e.
 * the run of characters up to the next white space or the end of the
 * string.  *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *token_start;

	token_start = *buf + strspn(*buf, whitespace);
	*buf = token_start;

	return strcspn(token_start, whitespace);
}
2253 
/*
 * Locate the next token in *buf and, when it fits, copy it into
 * @token as a '\0'-terminated string.  @token_size is the capacity of
 * @token, so a token fits only if it is shorter than @token_size.
 * *buf must be terminated with '\0' on entry.
 *
 * Returns the token length, excluding the '\0': 0 means no token was
 * found, and a value >= @token_size means the token did not fit and
 * nothing was copied.
 *
 * In every case *buf is advanced to just past the token.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	*buf = start + len;

	if (len < token_size) {
		memcpy(token, start, len);
		token[len] = '\0';
	}

	return len;
}
2283 
2284 /*
2285  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2286  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2287  * on the list of monitor addresses and other options provided via
2288  * /sys/bus/rbd/add.
2289  */
2290 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2291 			      const char *buf,
2292 			      const char **mon_addrs,
2293 			      size_t *mon_addrs_size,
2294 			      char *options,
2295 			      size_t options_size)
2296 {
2297 	size_t	len;
2298 
2299 	/* The first four tokens are required */
2300 
2301 	len = next_token(&buf);
2302 	if (!len)
2303 		return -EINVAL;
2304 	*mon_addrs_size = len + 1;
2305 	*mon_addrs = buf;
2306 
2307 	buf += len;
2308 
2309 	len = copy_token(&buf, options, options_size);
2310 	if (!len || len >= options_size)
2311 		return -EINVAL;
2312 
2313 	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2314 	if (!len || len >= sizeof (rbd_dev->pool_name))
2315 		return -EINVAL;
2316 
2317 	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2318 	if (!len || len >= sizeof (rbd_dev->obj))
2319 		return -EINVAL;
2320 
2321 	/* We have the object length in hand, save it. */
2322 
2323 	rbd_dev->obj_len = len;
2324 
2325 	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2326 				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2327 	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2328 
2329 	/*
2330 	 * The snapshot name is optional, but it's an error if it's
2331 	 * too long.  If no snapshot is supplied, fill in the default.
2332 	 */
2333 	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2334 	if (!len)
2335 		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2336 			sizeof (RBD_SNAP_HEAD_NAME));
2337 	else if (len >= sizeof (rbd_dev->snap_name))
2338 		return -EINVAL;
2339 
2340 	return 0;
2341 }
2342 
2343 static ssize_t rbd_add(struct bus_type *bus,
2344 		       const char *buf,
2345 		       size_t count)
2346 {
2347 	struct rbd_device *rbd_dev;
2348 	const char *mon_addrs = NULL;
2349 	size_t mon_addrs_size = 0;
2350 	char *options = NULL;
2351 	struct ceph_osd_client *osdc;
2352 	int rc = -ENOMEM;
2353 
2354 	if (!try_module_get(THIS_MODULE))
2355 		return -ENODEV;
2356 
2357 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358 	if (!rbd_dev)
2359 		goto err_nomem;
2360 	options = kmalloc(count, GFP_KERNEL);
2361 	if (!options)
2362 		goto err_nomem;
2363 
2364 	/* static rbd_device initialization */
2365 	spin_lock_init(&rbd_dev->lock);
2366 	INIT_LIST_HEAD(&rbd_dev->node);
2367 	INIT_LIST_HEAD(&rbd_dev->snaps);
2368 	init_rwsem(&rbd_dev->header_rwsem);
2369 
2370 	init_rwsem(&rbd_dev->header_rwsem);
2371 
2372 	/* generate unique id: find highest unique id, add one */
2373 	rbd_id_get(rbd_dev);
2374 
2375 	/* Fill in the device name, now that we have its id. */
2376 	BUILD_BUG_ON(DEV_NAME_LEN
2377 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2379 
2380 	/* parse add command */
2381 	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2382 				options, count);
2383 	if (rc)
2384 		goto err_put_id;
2385 
2386 	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2387 						options);
2388 	if (IS_ERR(rbd_dev->rbd_client)) {
2389 		rc = PTR_ERR(rbd_dev->rbd_client);
2390 		goto err_put_id;
2391 	}
2392 
2393 	/* pick the pool */
2394 	osdc = &rbd_dev->rbd_client->client->osdc;
2395 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396 	if (rc < 0)
2397 		goto err_out_client;
2398 	rbd_dev->poolid = rc;
2399 
2400 	/* register our block device */
2401 	rc = register_blkdev(0, rbd_dev->name);
2402 	if (rc < 0)
2403 		goto err_out_client;
2404 	rbd_dev->major = rc;
2405 
2406 	rc = rbd_bus_add_dev(rbd_dev);
2407 	if (rc)
2408 		goto err_out_blkdev;
2409 
2410 	/*
2411 	 * At this point cleanup in the event of an error is the job
2412 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2413 	 *
2414 	 * Set up and announce blkdev mapping.
2415 	 */
2416 	rc = rbd_init_disk(rbd_dev);
2417 	if (rc)
2418 		goto err_out_bus;
2419 
2420 	rc = rbd_init_watch_dev(rbd_dev);
2421 	if (rc)
2422 		goto err_out_bus;
2423 
2424 	return count;
2425 
2426 err_out_bus:
2427 	/* this will also clean up rest of rbd_dev stuff */
2428 
2429 	rbd_bus_del_dev(rbd_dev);
2430 	kfree(options);
2431 	return rc;
2432 
2433 err_out_blkdev:
2434 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2435 err_out_client:
2436 	rbd_put_client(rbd_dev);
2437 err_put_id:
2438 	rbd_id_put(rbd_dev);
2439 err_nomem:
2440 	kfree(options);
2441 	kfree(rbd_dev);
2442 
2443 	dout("Error adding device %s\n", buf);
2444 	module_put(THIS_MODULE);
2445 
2446 	return (ssize_t) rc;
2447 }
2448 
2449 static struct rbd_device *__rbd_get_dev(unsigned long id)
2450 {
2451 	struct list_head *tmp;
2452 	struct rbd_device *rbd_dev;
2453 
2454 	spin_lock(&rbd_dev_list_lock);
2455 	list_for_each(tmp, &rbd_dev_list) {
2456 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2457 		if (rbd_dev->id == id) {
2458 			spin_unlock(&rbd_dev_list_lock);
2459 			return rbd_dev;
2460 		}
2461 	}
2462 	spin_unlock(&rbd_dev_list_lock);
2463 	return NULL;
2464 }
2465 
2466 static void rbd_dev_release(struct device *dev)
2467 {
2468 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469 
2470 	if (rbd_dev->watch_request) {
2471 		struct ceph_client *client = rbd_dev->rbd_client->client;
2472 
2473 		ceph_osdc_unregister_linger_request(&client->osdc,
2474 						    rbd_dev->watch_request);
2475 	}
2476 	if (rbd_dev->watch_event)
2477 		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2478 
2479 	rbd_put_client(rbd_dev);
2480 
2481 	/* clean up and free blkdev */
2482 	rbd_free_disk(rbd_dev);
2483 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2484 
2485 	/* done with the id, and with the rbd_dev */
2486 	rbd_id_put(rbd_dev);
2487 	kfree(rbd_dev);
2488 
2489 	/* release module ref */
2490 	module_put(THIS_MODULE);
2491 }
2492 
2493 static ssize_t rbd_remove(struct bus_type *bus,
2494 			  const char *buf,
2495 			  size_t count)
2496 {
2497 	struct rbd_device *rbd_dev = NULL;
2498 	int target_id, rc;
2499 	unsigned long ul;
2500 	int ret = count;
2501 
2502 	rc = strict_strtoul(buf, 10, &ul);
2503 	if (rc)
2504 		return rc;
2505 
2506 	/* convert to int; abort if we lost anything in the conversion */
2507 	target_id = (int) ul;
2508 	if (target_id != ul)
2509 		return -EINVAL;
2510 
2511 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2512 
2513 	rbd_dev = __rbd_get_dev(target_id);
2514 	if (!rbd_dev) {
2515 		ret = -ENOENT;
2516 		goto done;
2517 	}
2518 
2519 	__rbd_remove_all_snaps(rbd_dev);
2520 	rbd_bus_del_dev(rbd_dev);
2521 
2522 done:
2523 	mutex_unlock(&ctl_mutex);
2524 	return ret;
2525 }
2526 
2527 static ssize_t rbd_snap_add(struct device *dev,
2528 			    struct device_attribute *attr,
2529 			    const char *buf,
2530 			    size_t count)
2531 {
2532 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2533 	int ret;
2534 	char *name = kmalloc(count + 1, GFP_KERNEL);
2535 	if (!name)
2536 		return -ENOMEM;
2537 
2538 	snprintf(name, count, "%s", buf);
2539 
2540 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2541 
2542 	ret = rbd_header_add_snap(rbd_dev,
2543 				  name, GFP_KERNEL);
2544 	if (ret < 0)
2545 		goto err_unlock;
2546 
2547 	ret = __rbd_refresh_header(rbd_dev);
2548 	if (ret < 0)
2549 		goto err_unlock;
2550 
2551 	/* shouldn't hold ctl_mutex when notifying.. notify might
2552 	   trigger a watch callback that would need to get that mutex */
2553 	mutex_unlock(&ctl_mutex);
2554 
2555 	/* make a best effort, don't error if failed */
2556 	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2557 
2558 	ret = count;
2559 	kfree(name);
2560 	return ret;
2561 
2562 err_unlock:
2563 	mutex_unlock(&ctl_mutex);
2564 	kfree(name);
2565 	return ret;
2566 }
2567 
2568 /*
2569  * create control files in sysfs
2570  * /sys/bus/rbd/...
2571  */
2572 static int rbd_sysfs_init(void)
2573 {
2574 	int ret;
2575 
2576 	ret = device_register(&rbd_root_dev);
2577 	if (ret < 0)
2578 		return ret;
2579 
2580 	ret = bus_register(&rbd_bus_type);
2581 	if (ret < 0)
2582 		device_unregister(&rbd_root_dev);
2583 
2584 	return ret;
2585 }
2586 
2587 static void rbd_sysfs_cleanup(void)
2588 {
2589 	bus_unregister(&rbd_bus_type);
2590 	device_unregister(&rbd_root_dev);
2591 }
2592 
2593 int __init rbd_init(void)
2594 {
2595 	int rc;
2596 
2597 	rc = rbd_sysfs_init();
2598 	if (rc)
2599 		return rc;
2600 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2601 	return 0;
2602 }
2603 
2604 void __exit rbd_exit(void)
2605 {
2606 	rbd_sysfs_cleanup();
2607 }
2608 
2609 module_init(rbd_init);
2610 module_exit(rbd_exit);
2611 
2612 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2613 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2614 MODULE_DESCRIPTION("rados block device");
2615 
2616 /* following authorship retained from original osdblk.c */
2617 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2618 
2619 MODULE_LICENSE("GPL");
2620