xref: /linux/drivers/block/rbd.c (revision 092e0e7e520a1fca03e13c9f2d157432a8657ff2)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    Instructions for use
25    --------------------
26 
27    1) Map a Linux block device to an existing rbd image.
28 
29       Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30 
31       $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32 
33       The snapshot name can be "-" or omitted to map the image read/write.
34 
35    2) List all active blkdev<->object mappings.
36 
37       In this example, we have performed step #1 twice, creating two blkdevs,
38       mapped to two separate rados objects in the rados rbd pool
39 
40       $ cat /sys/class/rbd/list
41       #id     major   client_name     pool    name    snap    KB
42       0       254     client4143      rbd     foo     -      1024000
43 
44       The columns, in order, are:
45       - blkdev unique id
46       - blkdev assigned major
47       - rados client id
48       - rados pool name
49       - rados block device name
50       - mapped snapshot ("-" if none)
51       - device size in KB
52 
53 
54    3) Create a snapshot.
55 
56       Usage: <blkdev id> <snapname>
57 
58       $ echo "0 mysnap" > /sys/class/rbd/snap_create
59 
60 
61    4) Listing a snapshot.
62 
63       $ cat /sys/class/rbd/snaps_list
64       #id     snap    KB
65       0       -       1024000 (*)
66       0       foo     1024000
67 
68       The columns, in order, are:
69       - blkdev unique id
70       - snapshot name, '-' means none (active read/write version)
71       - size of device at time of snapshot
72       - the (*) indicates this is the active version
73 
74    5) Rollback to snapshot.
75 
76       Usage: <blkdev id> <snapname>
77 
78       $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79 
80 
81    6) Mapping an image using snapshot.
82 
      A snapshot mapping is read-only. This is done by passing
84       snap=<snapname> to the options when adding a device.
85 
86       $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87 
88 
89    7) Remove an active blkdev<->rbd image mapping.
90 
91       In this example, we remove the mapping with blkdev unique id 1.
92 
93       $ echo 1 > /sys/class/rbd/remove
94 
95 
96    NOTE:  The actual creation and deletion of rados objects is outside the scope
97    of this driver.
98 
99  */
100 
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
105 
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
111 
112 #include "rbd_types.h"
113 
/* Short and long driver names. */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Size of rbd_device.obj_md_name: room for a name plus RBD_SUFFIX. */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
/* Size of rbd_device.pool_name. */
#define RBD_MAX_POOL_NAME_LEN	64
/* Size of rbd_device.snap_name. */
#define RBD_MAX_SNAP_NAME_LEN	32
/* NOTE(review): presumably bounds the option string parsed on "add" — confirm. */
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the head (read/write) version of an image. */
#define RBD_SNAP_HEAD_NAME	"-"

/* Size of rbd_device.name. */
#define DEV_NAME_LEN		32
127 
/*
 * block device image metadata (in-memory version)
 *
 * Filled in by rbd_header_from_disk() from the little-endian on-disk
 * header and released with rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;		/* copied from ondisk->image_size */
	char block_name[32];	/* prefix used to build segment object names */
	__u8 obj_order;		/* an object (segment) covers 1 << obj_order bytes */
	__u8 crypt_type;	/* copied from ondisk->options.crypt_type */
	__u8 comp_type;		/* copied from ondisk->options.comp_type */
	struct rw_semaphore snap_rwsem;	/* read: building a request;
					   write: switching snapshot */
	struct ceph_snap_context *snapc; /* snapshot ids, count and seq */
	size_t snap_names_len;	/* byte length of the snap_names buffer */
	u64 snap_seq;		/* copied from ondisk->snap_seq */
	u32 total_snaps;	/* number of snapshots in the header */

	char *snap_names;	/* packed NUL-terminated names; NULL if no snaps */
	u64 *snap_sizes;	/* image size per snapshot; NULL if no snaps */
};
146 
/*
 * an instance of the client.  multiple devices may share a client.
 *
 * Reference counted: rbd_get_client() takes a reference, rbd_put_client()
 * drops it and rbd_client_release() runs when the last one goes away.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph client */
	struct kref		kref;		/* one reference per user */
	struct list_head	node;		/* link in rbd_client_list */
};
155 
/*
 * a single io request
 *
 * Allocated in rbd_do_request() and attached to the osd request as
 * r_priv; rbd_req_cb() reads it back and frees it on completion.
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* bytes requested for this op */
};
165 
/*
 * a single device
 *
 * One instance per mapped image.
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;	/* == rbd_client->client */
	struct rbd_client	*rbd_client;	/* shared, refcounted client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image header */
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;		/* placed in each request's layout */

	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* set when a snapshot is mapped; writes get -EROFS */

	struct list_head	node;	/* NOTE(review): presumably the link in
					   rbd_dev_list — confirm */
};
197 
/*
 * NOTE(review): node_lock has no static initializer here; confirm that
 * spin_lock_init() runs on it before the first client lookup.
 */
static spinlock_t node_lock;      /* protects client get/put */

static struct class *class_rbd;	  /* /sys/class/rbd */
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
204 
205 
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
207 {
208 	struct gendisk *disk = bdev->bd_disk;
209 	struct rbd_device *rbd_dev = disk->private_data;
210 
211 	set_device_ro(bdev, rbd_dev->read_only);
212 
213 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214 		return -EROFS;
215 
216 	return 0;
217 }
218 
/* Block device operations for rbd disks; only open needs a hook. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
};
223 
224 /*
225  * Initialize an rbd client instance.
226  * We own *opt.
227  */
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229 {
230 	struct rbd_client *rbdc;
231 	int ret = -ENOMEM;
232 
233 	dout("rbd_client_create\n");
234 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235 	if (!rbdc)
236 		goto out_opt;
237 
238 	kref_init(&rbdc->kref);
239 	INIT_LIST_HEAD(&rbdc->node);
240 
241 	rbdc->client = ceph_create_client(opt, rbdc);
242 	if (IS_ERR(rbdc->client))
243 		goto out_rbdc;
244 	opt = NULL; /* Now rbdc->client is responsible for opt */
245 
246 	ret = ceph_open_session(rbdc->client);
247 	if (ret < 0)
248 		goto out_err;
249 
250 	spin_lock(&node_lock);
251 	list_add_tail(&rbdc->node, &rbd_client_list);
252 	spin_unlock(&node_lock);
253 
254 	dout("rbd_client_create created %p\n", rbdc);
255 	return rbdc;
256 
257 out_err:
258 	ceph_destroy_client(rbdc->client);
259 out_rbdc:
260 	kfree(rbdc);
261 out_opt:
262 	if (opt)
263 		ceph_destroy_options(opt);
264 	return ERR_PTR(ret);
265 }
266 
267 /*
268  * Find a ceph client with specific addr and configuration.
269  */
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271 {
272 	struct rbd_client *client_node;
273 
274 	if (opt->flags & CEPH_OPT_NOSHARE)
275 		return NULL;
276 
277 	list_for_each_entry(client_node, &rbd_client_list, node)
278 		if (ceph_compare_options(opt, client_node->client) == 0)
279 			return client_node;
280 	return NULL;
281 }
282 
283 /*
284  * Get a ceph client with specific addr and configuration, if one does
285  * not exist create it.
286  */
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288 			  char *options)
289 {
290 	struct rbd_client *rbdc;
291 	struct ceph_options *opt;
292 	int ret;
293 
294 	ret = ceph_parse_options(&opt, options, mon_addr,
295 				 mon_addr + strlen(mon_addr), NULL, NULL);
296 	if (ret < 0)
297 		return ret;
298 
299 	spin_lock(&node_lock);
300 	rbdc = __rbd_client_find(opt);
301 	if (rbdc) {
302 		ceph_destroy_options(opt);
303 
304 		/* using an existing client */
305 		kref_get(&rbdc->kref);
306 		rbd_dev->rbd_client = rbdc;
307 		rbd_dev->client = rbdc->client;
308 		spin_unlock(&node_lock);
309 		return 0;
310 	}
311 	spin_unlock(&node_lock);
312 
313 	rbdc = rbd_client_create(opt);
314 	if (IS_ERR(rbdc))
315 		return PTR_ERR(rbdc);
316 
317 	rbd_dev->rbd_client = rbdc;
318 	rbd_dev->client = rbdc->client;
319 	return 0;
320 }
321 
322 /*
323  * Destroy ceph client
324  */
325 static void rbd_client_release(struct kref *kref)
326 {
327 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328 
329 	dout("rbd_release_client %p\n", rbdc);
330 	spin_lock(&node_lock);
331 	list_del(&rbdc->node);
332 	spin_unlock(&node_lock);
333 
334 	ceph_destroy_client(rbdc->client);
335 	kfree(rbdc);
336 }
337 
338 /*
339  * Drop reference to ceph client node. If it's not referenced anymore, release
340  * it.
341  */
342 static void rbd_put_client(struct rbd_device *rbd_dev)
343 {
344 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345 	rbd_dev->rbd_client = NULL;
346 	rbd_dev->client = NULL;
347 }
348 
349 
350 /*
351  * Create a new header structure, translate header format from the on-disk
352  * header.
353  */
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355 				 struct rbd_image_header_ondisk *ondisk,
356 				 int allocated_snaps,
357 				 gfp_t gfp_flags)
358 {
359 	int i;
360 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
361 	int ret = -ENOMEM;
362 
363 	init_rwsem(&header->snap_rwsem);
364 
365 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367 				snap_count *
368 				 sizeof(struct rbd_image_snap_ondisk),
369 				gfp_flags);
370 	if (!header->snapc)
371 		return -ENOMEM;
372 	if (snap_count) {
373 		header->snap_names = kmalloc(header->snap_names_len,
374 					     GFP_KERNEL);
375 		if (!header->snap_names)
376 			goto err_snapc;
377 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378 					     GFP_KERNEL);
379 		if (!header->snap_sizes)
380 			goto err_names;
381 	} else {
382 		header->snap_names = NULL;
383 		header->snap_sizes = NULL;
384 	}
385 	memcpy(header->block_name, ondisk->block_name,
386 	       sizeof(ondisk->block_name));
387 
388 	header->image_size = le64_to_cpu(ondisk->image_size);
389 	header->obj_order = ondisk->options.order;
390 	header->crypt_type = ondisk->options.crypt_type;
391 	header->comp_type = ondisk->options.comp_type;
392 
393 	atomic_set(&header->snapc->nref, 1);
394 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395 	header->snapc->num_snaps = snap_count;
396 	header->total_snaps = snap_count;
397 
398 	if (snap_count &&
399 	    allocated_snaps == snap_count) {
400 		for (i = 0; i < snap_count; i++) {
401 			header->snapc->snaps[i] =
402 				le64_to_cpu(ondisk->snaps[i].id);
403 			header->snap_sizes[i] =
404 				le64_to_cpu(ondisk->snaps[i].image_size);
405 		}
406 
407 		/* copy snapshot names */
408 		memcpy(header->snap_names, &ondisk->snaps[i],
409 			header->snap_names_len);
410 	}
411 
412 	return 0;
413 
414 err_names:
415 	kfree(header->snap_names);
416 err_snapc:
417 	kfree(header->snapc);
418 	return ret;
419 }
420 
421 static int snap_index(struct rbd_image_header *header, int snap_num)
422 {
423 	return header->total_snaps - snap_num;
424 }
425 
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
427 {
428 	struct rbd_image_header *header = &rbd_dev->header;
429 
430 	if (!rbd_dev->cur_snap)
431 		return 0;
432 
433 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434 }
435 
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437 			u64 *seq, u64 *size)
438 {
439 	int i;
440 	char *p = header->snap_names;
441 
442 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443 		if (strcmp(snap_name, p) == 0)
444 			break;
445 	}
446 	if (i == header->total_snaps)
447 		return -ENOENT;
448 	if (seq)
449 		*seq = header->snapc->snaps[i];
450 
451 	if (size)
452 		*size = header->snap_sizes[i];
453 
454 	return i;
455 }
456 
457 static int rbd_header_set_snap(struct rbd_device *dev,
458 			       const char *snap_name,
459 			       u64 *size)
460 {
461 	struct rbd_image_header *header = &dev->header;
462 	struct ceph_snap_context *snapc = header->snapc;
463 	int ret = -ENOENT;
464 
465 	down_write(&header->snap_rwsem);
466 
467 	if (!snap_name ||
468 	    !*snap_name ||
469 	    strcmp(snap_name, "-") == 0 ||
470 	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471 		if (header->total_snaps)
472 			snapc->seq = header->snap_seq;
473 		else
474 			snapc->seq = 0;
475 		dev->cur_snap = 0;
476 		dev->read_only = 0;
477 		if (size)
478 			*size = header->image_size;
479 	} else {
480 		ret = snap_by_name(header, snap_name, &snapc->seq, size);
481 		if (ret < 0)
482 			goto done;
483 
484 		dev->cur_snap = header->total_snaps - ret;
485 		dev->read_only = 1;
486 	}
487 
488 	ret = 0;
489 done:
490 	up_write(&header->snap_rwsem);
491 	return ret;
492 }
493 
494 static void rbd_header_free(struct rbd_image_header *header)
495 {
496 	kfree(header->snapc);
497 	kfree(header->snap_names);
498 	kfree(header->snap_sizes);
499 }
500 
501 /*
502  * get the actual striped segment name, offset and length
503  */
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505 			   const char *block_name,
506 			   u64 ofs, u64 len,
507 			   char *seg_name, u64 *segofs)
508 {
509 	u64 seg = ofs >> header->obj_order;
510 
511 	if (seg_name)
512 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513 			 "%s.%012llx", block_name, seg);
514 
515 	ofs = ofs & ((1 << header->obj_order) - 1);
516 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
517 
518 	if (segofs)
519 		*segofs = ofs;
520 
521 	return len;
522 }
523 
524 /*
525  * bio helpers
526  */
527 
528 static void bio_chain_put(struct bio *chain)
529 {
530 	struct bio *tmp;
531 
532 	while (chain) {
533 		tmp = chain;
534 		chain = chain->bi_next;
535 		bio_put(tmp);
536 	}
537 }
538 
539 /*
540  * zeros a bio chain, starting at specific offset
541  */
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
543 {
544 	struct bio_vec *bv;
545 	unsigned long flags;
546 	void *buf;
547 	int i;
548 	int pos = 0;
549 
550 	while (chain) {
551 		bio_for_each_segment(bv, chain, i) {
552 			if (pos + bv->bv_len > start_ofs) {
553 				int remainder = max(start_ofs - pos, 0);
554 				buf = bvec_kmap_irq(bv, &flags);
555 				memset(buf + remainder, 0,
556 				       bv->bv_len - remainder);
557 				bvec_kunmap_irq(buf, &flags);
558 			}
559 			pos += bv->bv_len;
560 		}
561 
562 		chain = chain->bi_next;
563 	}
564 }
565 
566 /*
567  * bio_chain_clone - clone a chain of bios up to a certain length.
568  * might return a bio_pair that will need to be released.
569  */
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571 				   struct bio_pair **bp,
572 				   int len, gfp_t gfpmask)
573 {
574 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575 	int total = 0;
576 
577 	if (*bp) {
578 		bio_pair_release(*bp);
579 		*bp = NULL;
580 	}
581 
582 	while (old_chain && (total < len)) {
583 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584 		if (!tmp)
585 			goto err_out;
586 
587 		if (total + old_chain->bi_size > len) {
588 			struct bio_pair *bp;
589 
590 			/*
591 			 * this split can only happen with a single paged bio,
592 			 * split_bio will BUG_ON if this is not the case
593 			 */
594 			dout("bio_chain_clone split! total=%d remaining=%d"
595 			     "bi_size=%d\n",
596 			     (int)total, (int)len-total,
597 			     (int)old_chain->bi_size);
598 
599 			/* split the bio. We'll release it either in the next
600 			   call, or it will have to be released outside */
601 			bp = bio_split(old_chain, (len - total) / 512ULL);
602 			if (!bp)
603 				goto err_out;
604 
605 			__bio_clone(tmp, &bp->bio1);
606 
607 			*next = &bp->bio2;
608 		} else {
609 			__bio_clone(tmp, old_chain);
610 			*next = old_chain->bi_next;
611 		}
612 
613 		tmp->bi_bdev = NULL;
614 		gfpmask &= ~__GFP_WAIT;
615 		tmp->bi_next = NULL;
616 
617 		if (!new_chain) {
618 			new_chain = tail = tmp;
619 		} else {
620 			tail->bi_next = tmp;
621 			tail = tmp;
622 		}
623 		old_chain = old_chain->bi_next;
624 
625 		total += tmp->bi_size;
626 	}
627 
628 	BUG_ON(total < len);
629 
630 	if (tail)
631 		tail->bi_next = NULL;
632 
633 	*old = old_chain;
634 
635 	return new_chain;
636 
637 err_out:
638 	dout("bio_chain_clone with err\n");
639 	bio_chain_put(new_chain);
640 	return NULL;
641 }
642 
643 /*
644  * helpers for osd request op vectors.
645  */
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647 			    int num_ops,
648 			    int opcode,
649 			    u32 payload_len)
650 {
651 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652 		       GFP_NOIO);
653 	if (!*ops)
654 		return -ENOMEM;
655 	(*ops)[0].op = opcode;
656 	/*
657 	 * op extent offset and length will be set later on
658 	 * in calc_raw_layout()
659 	 */
660 	(*ops)[0].payload_len = payload_len;
661 	return 0;
662 }
663 
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
668 
669 /*
670  * Send ceph osd request
671  */
672 static int rbd_do_request(struct request *rq,
673 			  struct rbd_device *dev,
674 			  struct ceph_snap_context *snapc,
675 			  u64 snapid,
676 			  const char *obj, u64 ofs, u64 len,
677 			  struct bio *bio,
678 			  struct page **pages,
679 			  int num_pages,
680 			  int flags,
681 			  struct ceph_osd_req_op *ops,
682 			  int num_reply,
683 			  void (*rbd_cb)(struct ceph_osd_request *req,
684 					 struct ceph_msg *msg))
685 {
686 	struct ceph_osd_request *req;
687 	struct ceph_file_layout *layout;
688 	int ret;
689 	u64 bno;
690 	struct timespec mtime = CURRENT_TIME;
691 	struct rbd_request *req_data;
692 	struct ceph_osd_request_head *reqhead;
693 	struct rbd_image_header *header = &dev->header;
694 
695 	ret = -ENOMEM;
696 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697 	if (!req_data)
698 		goto done;
699 
700 	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701 
702 	down_read(&header->snap_rwsem);
703 
704 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705 				      snapc,
706 				      ops,
707 				      false,
708 				      GFP_NOIO, pages, bio);
709 	if (IS_ERR(req)) {
710 		up_read(&header->snap_rwsem);
711 		ret = PTR_ERR(req);
712 		goto done_pages;
713 	}
714 
715 	req->r_callback = rbd_cb;
716 
717 	req_data->rq = rq;
718 	req_data->bio = bio;
719 	req_data->pages = pages;
720 	req_data->len = len;
721 
722 	req->r_priv = req_data;
723 
724 	reqhead = req->r_request->front.iov_base;
725 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726 
727 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
728 	req->r_oid_len = strlen(req->r_oid);
729 
730 	layout = &req->r_file_layout;
731 	memset(layout, 0, sizeof(*layout));
732 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733 	layout->fl_stripe_count = cpu_to_le32(1);
734 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735 	layout->fl_pg_preferred = cpu_to_le32(-1);
736 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738 			     ofs, &len, &bno, req, ops);
739 
740 	ceph_osdc_build_request(req, ofs, &len,
741 				ops,
742 				snapc,
743 				&mtime,
744 				req->r_oid, req->r_oid_len);
745 	up_read(&header->snap_rwsem);
746 
747 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748 	if (ret < 0)
749 		goto done_err;
750 
751 	if (!rbd_cb) {
752 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753 		ceph_osdc_put_request(req);
754 	}
755 	return ret;
756 
757 done_err:
758 	bio_chain_put(req_data->bio);
759 	ceph_osdc_put_request(req);
760 done_pages:
761 	kfree(req_data);
762 done:
763 	if (rq)
764 		blk_end_request(rq, ret, len);
765 	return ret;
766 }
767 
768 /*
769  * Ceph osd op callback
770  */
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772 {
773 	struct rbd_request *req_data = req->r_priv;
774 	struct ceph_osd_reply_head *replyhead;
775 	struct ceph_osd_op *op;
776 	__s32 rc;
777 	u64 bytes;
778 	int read_op;
779 
780 	/* parse reply */
781 	replyhead = msg->front.iov_base;
782 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783 	op = (void *)(replyhead + 1);
784 	rc = le32_to_cpu(replyhead->result);
785 	bytes = le64_to_cpu(op->extent.length);
786 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787 
788 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789 
790 	if (rc == -ENOENT && read_op) {
791 		zero_bio_chain(req_data->bio, 0);
792 		rc = 0;
793 	} else if (rc == 0 && read_op && bytes < req_data->len) {
794 		zero_bio_chain(req_data->bio, bytes);
795 		bytes = req_data->len;
796 	}
797 
798 	blk_end_request(req_data->rq, rc, bytes);
799 
800 	if (req_data->bio)
801 		bio_chain_put(req_data->bio);
802 
803 	ceph_osdc_put_request(req);
804 	kfree(req_data);
805 }
806 
807 /*
808  * Do a synchronous ceph osd operation
809  */
810 static int rbd_req_sync_op(struct rbd_device *dev,
811 			   struct ceph_snap_context *snapc,
812 			   u64 snapid,
813 			   int opcode,
814 			   int flags,
815 			   struct ceph_osd_req_op *orig_ops,
816 			   int num_reply,
817 			   const char *obj,
818 			   u64 ofs, u64 len,
819 			   char *buf)
820 {
821 	int ret;
822 	struct page **pages;
823 	int num_pages;
824 	struct ceph_osd_req_op *ops = orig_ops;
825 	u32 payload_len;
826 
827 	num_pages = calc_pages_for(ofs , len);
828 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829 	if (IS_ERR(pages))
830 		return PTR_ERR(pages);
831 
832 	if (!orig_ops) {
833 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835 		if (ret < 0)
836 			goto done;
837 
838 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840 			if (ret < 0)
841 				goto done_ops;
842 		}
843 	}
844 
845 	ret = rbd_do_request(NULL, dev, snapc, snapid,
846 			  obj, ofs, len, NULL,
847 			  pages, num_pages,
848 			  flags,
849 			  ops,
850 			  2,
851 			  NULL);
852 	if (ret < 0)
853 		goto done_ops;
854 
855 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
856 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857 
858 done_ops:
859 	if (!orig_ops)
860 		rbd_destroy_ops(ops);
861 done:
862 	ceph_release_page_vector(pages, num_pages);
863 	return ret;
864 }
865 
866 /*
867  * Do an asynchronous ceph osd operation
868  */
869 static int rbd_do_op(struct request *rq,
870 		     struct rbd_device *rbd_dev ,
871 		     struct ceph_snap_context *snapc,
872 		     u64 snapid,
873 		     int opcode, int flags, int num_reply,
874 		     u64 ofs, u64 len,
875 		     struct bio *bio)
876 {
877 	char *seg_name;
878 	u64 seg_ofs;
879 	u64 seg_len;
880 	int ret;
881 	struct ceph_osd_req_op *ops;
882 	u32 payload_len;
883 
884 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885 	if (!seg_name)
886 		return -ENOMEM;
887 
888 	seg_len = rbd_get_segment(&rbd_dev->header,
889 				  rbd_dev->header.block_name,
890 				  ofs, len,
891 				  seg_name, &seg_ofs);
892 
893 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
894 
895 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
896 	if (ret < 0)
897 		goto done;
898 
899 	/* we've taken care of segment sizes earlier when we
900 	   cloned the bios. We should never have a segment
901 	   truncated at this point */
902 	BUG_ON(seg_len < len);
903 
904 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
905 			     seg_name, seg_ofs, seg_len,
906 			     bio,
907 			     NULL, 0,
908 			     flags,
909 			     ops,
910 			     num_reply,
911 			     rbd_req_cb);
912 done:
913 	kfree(seg_name);
914 	return ret;
915 }
916 
917 /*
918  * Request async osd write
919  */
920 static int rbd_req_write(struct request *rq,
921 			 struct rbd_device *rbd_dev,
922 			 struct ceph_snap_context *snapc,
923 			 u64 ofs, u64 len,
924 			 struct bio *bio)
925 {
926 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
927 			 CEPH_OSD_OP_WRITE,
928 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
929 			 2,
930 			 ofs, len, bio);
931 }
932 
933 /*
934  * Request async osd read
935  */
936 static int rbd_req_read(struct request *rq,
937 			 struct rbd_device *rbd_dev,
938 			 u64 snapid,
939 			 u64 ofs, u64 len,
940 			 struct bio *bio)
941 {
942 	return rbd_do_op(rq, rbd_dev, NULL,
943 			 (snapid ? snapid : CEPH_NOSNAP),
944 			 CEPH_OSD_OP_READ,
945 			 CEPH_OSD_FLAG_READ,
946 			 2,
947 			 ofs, len, bio);
948 }
949 
950 /*
951  * Request sync osd read
952  */
953 static int rbd_req_sync_read(struct rbd_device *dev,
954 			  struct ceph_snap_context *snapc,
955 			  u64 snapid,
956 			  const char *obj,
957 			  u64 ofs, u64 len,
958 			  char *buf)
959 {
960 	return rbd_req_sync_op(dev, NULL,
961 			       (snapid ? snapid : CEPH_NOSNAP),
962 			       CEPH_OSD_OP_READ,
963 			       CEPH_OSD_FLAG_READ,
964 			       NULL,
965 			       1, obj, ofs, len, buf);
966 }
967 
968 /*
969  * Request sync osd read
970  */
971 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
972 				     u64 snapid,
973 				     const char *obj)
974 {
975 	struct ceph_osd_req_op *ops;
976 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
977 	if (ret < 0)
978 		return ret;
979 
980 	ops[0].snap.snapid = snapid;
981 
982 	ret = rbd_req_sync_op(dev, NULL,
983 			       CEPH_NOSNAP,
984 			       0,
985 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
986 			       ops,
987 			       1, obj, 0, 0, NULL);
988 
989 	rbd_destroy_ops(ops);
990 
991 	if (ret < 0)
992 		return ret;
993 
994 	return ret;
995 }
996 
997 /*
998  * Request sync osd read
999  */
1000 static int rbd_req_sync_exec(struct rbd_device *dev,
1001 			     const char *obj,
1002 			     const char *cls,
1003 			     const char *method,
1004 			     const char *data,
1005 			     int len)
1006 {
1007 	struct ceph_osd_req_op *ops;
1008 	int cls_len = strlen(cls);
1009 	int method_len = strlen(method);
1010 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1011 				    cls_len + method_len + len);
1012 	if (ret < 0)
1013 		return ret;
1014 
1015 	ops[0].cls.class_name = cls;
1016 	ops[0].cls.class_len = (__u8)cls_len;
1017 	ops[0].cls.method_name = method;
1018 	ops[0].cls.method_len = (__u8)method_len;
1019 	ops[0].cls.argc = 0;
1020 	ops[0].cls.indata = data;
1021 	ops[0].cls.indata_len = len;
1022 
1023 	ret = rbd_req_sync_op(dev, NULL,
1024 			       CEPH_NOSNAP,
1025 			       0,
1026 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1027 			       ops,
1028 			       1, obj, 0, 0, NULL);
1029 
1030 	rbd_destroy_ops(ops);
1031 
1032 	dout("cls_exec returned %d\n", ret);
1033 	return ret;
1034 }
1035 
/*
 * block device queue callback
 *
 * Drains the request queue.  Each filesystem request is cut into pieces
 * that stay within one segment object (see rbd_get_segment()); for each
 * piece the matching part of the request's bio chain is cloned and an
 * asynchronous osd read or write is issued.  The results of those calls
 * are not checked here; rbd_do_request() and rbd_req_cb() complete the
 * corresponding bytes of the request themselves.
 *
 * The queue lock is released while ops are being issued and re-taken
 * before the next request is fetched.
 * NOTE(review): this presumes the function is entered with
 * q->queue_lock held — confirm against the block layer contract.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;	/* sectors to bytes */
		rq_bio = rq->bio;
		/* writes to a read-only (snapshot) mapping fail right away */
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* issue the ops without holding the queue lock */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			/* clip this piece at the end of its segment */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/*
				 * NOTE(review): earlier pieces of this
				 * request may already be in flight when
				 * the whole request is ended here —
				 * confirm this is safe.
				 */
				spin_lock_irq(q->queue_lock);
				__blk_end_request_all(rq, -ENOMEM);
				goto next;
			}

			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio);

			size -= op_size;
			ofs += op_size;

			/* continue cloning where bio_chain_clone() stopped */
			rq_bio = next_bio;
		} while (size > 0);

		if (bp)
			bio_pair_release(bp);

		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1124 
1125 /*
1126  * a queue callback. Makes sure that we don't create a bio that spans across
1127  * multiple osd objects. One exception would be with a single page bios,
1128  * which we handle later at bio_chain_clone
1129  */
1130 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1131 			  struct bio_vec *bvec)
1132 {
1133 	struct rbd_device *rbd_dev = q->queuedata;
1134 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1135 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1136 	unsigned int bio_sectors = bmd->bi_size >> 9;
1137 	int max;
1138 
1139 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1140 				 + bio_sectors)) << 9;
1141 	if (max < 0)
1142 		max = 0; /* bio_add cannot handle a negative return */
1143 	if (max <= bvec->bv_len && bio_sectors == 0)
1144 		return bvec->bv_len;
1145 	return max;
1146 }
1147 
1148 static void rbd_free_disk(struct rbd_device *rbd_dev)
1149 {
1150 	struct gendisk *disk = rbd_dev->disk;
1151 
1152 	if (!disk)
1153 		return;
1154 
1155 	rbd_header_free(&rbd_dev->header);
1156 
1157 	if (disk->flags & GENHD_FL_UP)
1158 		del_gendisk(disk);
1159 	if (disk->queue)
1160 		blk_cleanup_queue(disk->queue);
1161 	put_disk(disk);
1162 }
1163 
1164 /*
1165  * reload the ondisk the header
1166  */
1167 static int rbd_read_header(struct rbd_device *rbd_dev,
1168 			   struct rbd_image_header *header)
1169 {
1170 	ssize_t rc;
1171 	struct rbd_image_header_ondisk *dh;
1172 	int snap_count = 0;
1173 	u64 snap_names_len = 0;
1174 
1175 	while (1) {
1176 		int len = sizeof(*dh) +
1177 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1178 			  snap_names_len;
1179 
1180 		rc = -ENOMEM;
1181 		dh = kmalloc(len, GFP_KERNEL);
1182 		if (!dh)
1183 			return -ENOMEM;
1184 
1185 		rc = rbd_req_sync_read(rbd_dev,
1186 				       NULL, CEPH_NOSNAP,
1187 				       rbd_dev->obj_md_name,
1188 				       0, len,
1189 				       (char *)dh);
1190 		if (rc < 0)
1191 			goto out_dh;
1192 
1193 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1194 		if (rc < 0)
1195 			goto out_dh;
1196 
1197 		if (snap_count != header->total_snaps) {
1198 			snap_count = header->total_snaps;
1199 			snap_names_len = header->snap_names_len;
1200 			rbd_header_free(header);
1201 			kfree(dh);
1202 			continue;
1203 		}
1204 		break;
1205 	}
1206 
1207 out_dh:
1208 	kfree(dh);
1209 	return rc;
1210 }
1211 
1212 /*
1213  * create a snapshot
1214  */
1215 static int rbd_header_add_snap(struct rbd_device *dev,
1216 			       const char *snap_name,
1217 			       gfp_t gfp_flags)
1218 {
1219 	int name_len = strlen(snap_name);
1220 	u64 new_snapid;
1221 	int ret;
1222 	void *data, *data_start, *data_end;
1223 
1224 	/* we should create a snapshot only if we're pointing at the head */
1225 	if (dev->cur_snap)
1226 		return -EINVAL;
1227 
1228 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1229 				      &new_snapid);
1230 	dout("created snapid=%lld\n", new_snapid);
1231 	if (ret < 0)
1232 		return ret;
1233 
1234 	data = kmalloc(name_len + 16, gfp_flags);
1235 	if (!data)
1236 		return -ENOMEM;
1237 
1238 	data_start = data;
1239 	data_end = data + name_len + 16;
1240 
1241 	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1242 	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1243 
1244 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1245 				data_start, data - data_start);
1246 
1247 	kfree(data_start);
1248 
1249 	if (ret < 0)
1250 		return ret;
1251 
1252 	dev->header.snapc->seq =  new_snapid;
1253 
1254 	return 0;
1255 bad:
1256 	return -ERANGE;
1257 }
1258 
1259 /*
1260  * only read the first part of the ondisk header, without the snaps info
1261  */
1262 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1263 {
1264 	int ret;
1265 	struct rbd_image_header h;
1266 	u64 snap_seq;
1267 
1268 	ret = rbd_read_header(rbd_dev, &h);
1269 	if (ret < 0)
1270 		return ret;
1271 
1272 	down_write(&rbd_dev->header.snap_rwsem);
1273 
1274 	snap_seq = rbd_dev->header.snapc->seq;
1275 
1276 	kfree(rbd_dev->header.snapc);
1277 	kfree(rbd_dev->header.snap_names);
1278 	kfree(rbd_dev->header.snap_sizes);
1279 
1280 	rbd_dev->header.total_snaps = h.total_snaps;
1281 	rbd_dev->header.snapc = h.snapc;
1282 	rbd_dev->header.snap_names = h.snap_names;
1283 	rbd_dev->header.snap_sizes = h.snap_sizes;
1284 	rbd_dev->header.snapc->seq = snap_seq;
1285 
1286 	up_write(&rbd_dev->header.snap_rwsem);
1287 
1288 	return 0;
1289 }
1290 
1291 static int rbd_init_disk(struct rbd_device *rbd_dev)
1292 {
1293 	struct gendisk *disk;
1294 	struct request_queue *q;
1295 	int rc;
1296 	u64 total_size = 0;
1297 
1298 	/* contact OSD, request size info about the object being mapped */
1299 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1300 	if (rc)
1301 		return rc;
1302 
1303 	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1304 	if (rc)
1305 		return rc;
1306 
1307 	/* create gendisk info */
1308 	rc = -ENOMEM;
1309 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1310 	if (!disk)
1311 		goto out;
1312 
1313 	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1314 	disk->major = rbd_dev->major;
1315 	disk->first_minor = 0;
1316 	disk->fops = &rbd_bd_ops;
1317 	disk->private_data = rbd_dev;
1318 
1319 	/* init rq */
1320 	rc = -ENOMEM;
1321 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1322 	if (!q)
1323 		goto out_disk;
1324 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1325 	disk->queue = q;
1326 
1327 	q->queuedata = rbd_dev;
1328 
1329 	rbd_dev->disk = disk;
1330 	rbd_dev->q = q;
1331 
1332 	/* finally, announce the disk to the world */
1333 	set_capacity(disk, total_size / 512ULL);
1334 	add_disk(disk);
1335 
1336 	pr_info("%s: added with size 0x%llx\n",
1337 		disk->disk_name, (unsigned long long)total_size);
1338 	return 0;
1339 
1340 out_disk:
1341 	put_disk(disk);
1342 out:
1343 	return rc;
1344 }
1345 
1346 /********************************************************************
1347  * /sys/class/rbd/
1348  *                   add	map rados objects to blkdev
1349  *                   remove	unmap rados objects
1350  *                   list	show mappings
1351  *******************************************************************/
1352 
/*
 * Release callback installed as class_release on the rbd class: frees
 * the class structure that rbd_sysfs_init() allocated.
 */
static void class_rbd_release(struct class *cls)
{
	kfree(cls);
}
1357 
1358 static ssize_t class_rbd_list(struct class *c,
1359 			      struct class_attribute *attr,
1360 			      char *data)
1361 {
1362 	int n = 0;
1363 	struct list_head *tmp;
1364 	int max = PAGE_SIZE;
1365 
1366 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1367 
1368 	n += snprintf(data, max,
1369 		      "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1370 
1371 	list_for_each(tmp, &rbd_dev_list) {
1372 		struct rbd_device *rbd_dev;
1373 
1374 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1375 		n += snprintf(data+n, max-n,
1376 			      "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1377 			      rbd_dev->id,
1378 			      rbd_dev->major,
1379 			      ceph_client_id(rbd_dev->client),
1380 			      rbd_dev->pool_name,
1381 			      rbd_dev->obj, rbd_dev->snap_name,
1382 			      rbd_dev->header.image_size >> 10);
1383 		if (n == max)
1384 			break;
1385 	}
1386 
1387 	mutex_unlock(&ctl_mutex);
1388 	return n;
1389 }
1390 
1391 static ssize_t class_rbd_add(struct class *c,
1392 			     struct class_attribute *attr,
1393 			     const char *buf, size_t count)
1394 {
1395 	struct ceph_osd_client *osdc;
1396 	struct rbd_device *rbd_dev;
1397 	ssize_t rc = -ENOMEM;
1398 	int irc, new_id = 0;
1399 	struct list_head *tmp;
1400 	char *mon_dev_name;
1401 	char *options;
1402 
1403 	if (!try_module_get(THIS_MODULE))
1404 		return -ENODEV;
1405 
1406 	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1407 	if (!mon_dev_name)
1408 		goto err_out_mod;
1409 
1410 	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1411 	if (!options)
1412 		goto err_mon_dev;
1413 
1414 	/* new rbd_device object */
1415 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1416 	if (!rbd_dev)
1417 		goto err_out_opt;
1418 
1419 	/* static rbd_device initialization */
1420 	spin_lock_init(&rbd_dev->lock);
1421 	INIT_LIST_HEAD(&rbd_dev->node);
1422 
1423 	/* generate unique id: find highest unique id, add one */
1424 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1425 
1426 	list_for_each(tmp, &rbd_dev_list) {
1427 		struct rbd_device *rbd_dev;
1428 
1429 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1430 		if (rbd_dev->id >= new_id)
1431 			new_id = rbd_dev->id + 1;
1432 	}
1433 
1434 	rbd_dev->id = new_id;
1435 
1436 	/* add to global list */
1437 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
1438 
1439 	/* parse add command */
1440 	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1441 		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
1442 		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1443 		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1444 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1445 		   mon_dev_name, options, rbd_dev->pool_name,
1446 		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
1447 		rc = -EINVAL;
1448 		goto err_out_slot;
1449 	}
1450 
1451 	if (rbd_dev->snap_name[0] == 0)
1452 		rbd_dev->snap_name[0] = '-';
1453 
1454 	rbd_dev->obj_len = strlen(rbd_dev->obj);
1455 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1456 		 rbd_dev->obj, RBD_SUFFIX);
1457 
1458 	/* initialize rest of new object */
1459 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1460 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1461 	if (rc < 0)
1462 		goto err_out_slot;
1463 
1464 	mutex_unlock(&ctl_mutex);
1465 
1466 	/* pick the pool */
1467 	osdc = &rbd_dev->client->osdc;
1468 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1469 	if (rc < 0)
1470 		goto err_out_client;
1471 	rbd_dev->poolid = rc;
1472 
1473 	/* register our block device */
1474 	irc = register_blkdev(0, rbd_dev->name);
1475 	if (irc < 0) {
1476 		rc = irc;
1477 		goto err_out_client;
1478 	}
1479 	rbd_dev->major = irc;
1480 
1481 	/* set up and announce blkdev mapping */
1482 	rc = rbd_init_disk(rbd_dev);
1483 	if (rc)
1484 		goto err_out_blkdev;
1485 
1486 	return count;
1487 
1488 err_out_blkdev:
1489 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
1490 err_out_client:
1491 	rbd_put_client(rbd_dev);
1492 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1493 err_out_slot:
1494 	list_del_init(&rbd_dev->node);
1495 	mutex_unlock(&ctl_mutex);
1496 
1497 	kfree(rbd_dev);
1498 err_out_opt:
1499 	kfree(options);
1500 err_mon_dev:
1501 	kfree(mon_dev_name);
1502 err_out_mod:
1503 	dout("Error adding device %s\n", buf);
1504 	module_put(THIS_MODULE);
1505 	return rc;
1506 }
1507 
1508 static struct rbd_device *__rbd_get_dev(unsigned long id)
1509 {
1510 	struct list_head *tmp;
1511 	struct rbd_device *rbd_dev;
1512 
1513 	list_for_each(tmp, &rbd_dev_list) {
1514 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1515 		if (rbd_dev->id == id)
1516 			return rbd_dev;
1517 	}
1518 	return NULL;
1519 }
1520 
1521 static ssize_t class_rbd_remove(struct class *c,
1522 				struct class_attribute *attr,
1523 				const char *buf,
1524 				size_t count)
1525 {
1526 	struct rbd_device *rbd_dev = NULL;
1527 	int target_id, rc;
1528 	unsigned long ul;
1529 
1530 	rc = strict_strtoul(buf, 10, &ul);
1531 	if (rc)
1532 		return rc;
1533 
1534 	/* convert to int; abort if we lost anything in the conversion */
1535 	target_id = (int) ul;
1536 	if (target_id != ul)
1537 		return -EINVAL;
1538 
1539 	/* remove object from list immediately */
1540 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1541 
1542 	rbd_dev = __rbd_get_dev(target_id);
1543 	if (rbd_dev)
1544 		list_del_init(&rbd_dev->node);
1545 
1546 	mutex_unlock(&ctl_mutex);
1547 
1548 	if (!rbd_dev)
1549 		return -ENOENT;
1550 
1551 	rbd_put_client(rbd_dev);
1552 
1553 	/* clean up and free blkdev */
1554 	rbd_free_disk(rbd_dev);
1555 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
1556 	kfree(rbd_dev);
1557 
1558 	/* release module ref */
1559 	module_put(THIS_MODULE);
1560 
1561 	return count;
1562 }
1563 
1564 static ssize_t class_rbd_snaps_list(struct class *c,
1565 			      struct class_attribute *attr,
1566 			      char *data)
1567 {
1568 	struct rbd_device *rbd_dev = NULL;
1569 	struct list_head *tmp;
1570 	struct rbd_image_header *header;
1571 	int i, n = 0, max = PAGE_SIZE;
1572 	int ret;
1573 
1574 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1575 
1576 	n += snprintf(data, max, "#id\tsnap\tKB\n");
1577 
1578 	list_for_each(tmp, &rbd_dev_list) {
1579 		char *names, *p;
1580 		struct ceph_snap_context *snapc;
1581 
1582 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1583 		header = &rbd_dev->header;
1584 
1585 		down_read(&header->snap_rwsem);
1586 
1587 		names = header->snap_names;
1588 		snapc = header->snapc;
1589 
1590 		n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1591 			      rbd_dev->id, RBD_SNAP_HEAD_NAME,
1592 			      header->image_size >> 10,
1593 			      (!rbd_dev->cur_snap ? " (*)" : ""));
1594 		if (n == max)
1595 			break;
1596 
1597 		p = names;
1598 		for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1599 			n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1600 			      rbd_dev->id, p, header->snap_sizes[i] >> 10,
1601 			      (rbd_dev->cur_snap &&
1602 			       (snap_index(header, i) == rbd_dev->cur_snap) ?
1603 			       " (*)" : ""));
1604 			if (n == max)
1605 				break;
1606 		}
1607 
1608 		up_read(&header->snap_rwsem);
1609 	}
1610 
1611 
1612 	ret = n;
1613 	mutex_unlock(&ctl_mutex);
1614 	return ret;
1615 }
1616 
1617 static ssize_t class_rbd_snaps_refresh(struct class *c,
1618 				struct class_attribute *attr,
1619 				const char *buf,
1620 				size_t count)
1621 {
1622 	struct rbd_device *rbd_dev = NULL;
1623 	int target_id, rc;
1624 	unsigned long ul;
1625 	int ret = count;
1626 
1627 	rc = strict_strtoul(buf, 10, &ul);
1628 	if (rc)
1629 		return rc;
1630 
1631 	/* convert to int; abort if we lost anything in the conversion */
1632 	target_id = (int) ul;
1633 	if (target_id != ul)
1634 		return -EINVAL;
1635 
1636 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1637 
1638 	rbd_dev = __rbd_get_dev(target_id);
1639 	if (!rbd_dev) {
1640 		ret = -ENOENT;
1641 		goto done;
1642 	}
1643 
1644 	rc = rbd_update_snaps(rbd_dev);
1645 	if (rc < 0)
1646 		ret = rc;
1647 
1648 done:
1649 	mutex_unlock(&ctl_mutex);
1650 	return ret;
1651 }
1652 
1653 static ssize_t class_rbd_snap_create(struct class *c,
1654 				struct class_attribute *attr,
1655 				const char *buf,
1656 				size_t count)
1657 {
1658 	struct rbd_device *rbd_dev = NULL;
1659 	int target_id, ret;
1660 	char *name;
1661 
1662 	name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1663 	if (!name)
1664 		return -ENOMEM;
1665 
1666 	/* parse snaps add command */
1667 	if (sscanf(buf, "%d "
1668 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1669 		   &target_id,
1670 		   name) != 2) {
1671 		ret = -EINVAL;
1672 		goto done;
1673 	}
1674 
1675 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1676 
1677 	rbd_dev = __rbd_get_dev(target_id);
1678 	if (!rbd_dev) {
1679 		ret = -ENOENT;
1680 		goto done_unlock;
1681 	}
1682 
1683 	ret = rbd_header_add_snap(rbd_dev,
1684 				  name, GFP_KERNEL);
1685 	if (ret < 0)
1686 		goto done_unlock;
1687 
1688 	ret = rbd_update_snaps(rbd_dev);
1689 	if (ret < 0)
1690 		goto done_unlock;
1691 
1692 	ret = count;
1693 done_unlock:
1694 	mutex_unlock(&ctl_mutex);
1695 done:
1696 	kfree(name);
1697 	return ret;
1698 }
1699 
1700 static ssize_t class_rbd_rollback(struct class *c,
1701 				struct class_attribute *attr,
1702 				const char *buf,
1703 				size_t count)
1704 {
1705 	struct rbd_device *rbd_dev = NULL;
1706 	int target_id, ret;
1707 	u64 snapid;
1708 	char snap_name[RBD_MAX_SNAP_NAME_LEN];
1709 	u64 cur_ofs;
1710 	char *seg_name;
1711 
1712 	/* parse snaps add command */
1713 	if (sscanf(buf, "%d "
1714 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1715 		   &target_id,
1716 		   snap_name) != 2) {
1717 		return -EINVAL;
1718 	}
1719 
1720 	ret = -ENOMEM;
1721 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1722 	if (!seg_name)
1723 		return ret;
1724 
1725 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1726 
1727 	rbd_dev = __rbd_get_dev(target_id);
1728 	if (!rbd_dev) {
1729 		ret = -ENOENT;
1730 		goto done_unlock;
1731 	}
1732 
1733 	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1734 	if (ret < 0)
1735 		goto done_unlock;
1736 
1737 	dout("snapid=%lld\n", snapid);
1738 
1739 	cur_ofs = 0;
1740 	while (cur_ofs < rbd_dev->header.image_size) {
1741 		cur_ofs += rbd_get_segment(&rbd_dev->header,
1742 					   rbd_dev->obj,
1743 					   cur_ofs, (u64)-1,
1744 					   seg_name, NULL);
1745 		dout("seg_name=%s\n", seg_name);
1746 
1747 		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1748 		if (ret < 0)
1749 			pr_warning("could not roll back obj %s err=%d\n",
1750 				   seg_name, ret);
1751 	}
1752 
1753 	ret = rbd_update_snaps(rbd_dev);
1754 	if (ret < 0)
1755 		goto done_unlock;
1756 
1757 	ret = count;
1758 
1759 done_unlock:
1760 	mutex_unlock(&ctl_mutex);
1761 	kfree(seg_name);
1762 
1763 	return ret;
1764 }
1765 
1766 static struct class_attribute class_rbd_attrs[] = {
1767 	__ATTR(add,		0200, NULL, class_rbd_add),
1768 	__ATTR(remove,		0200, NULL, class_rbd_remove),
1769 	__ATTR(list,		0444, class_rbd_list, NULL),
1770 	__ATTR(snaps_refresh,	0200, NULL, class_rbd_snaps_refresh),
1771 	__ATTR(snap_create,	0200, NULL, class_rbd_snap_create),
1772 	__ATTR(snaps_list,	0444, class_rbd_snaps_list, NULL),
1773 	__ATTR(snap_rollback,	0200, NULL, class_rbd_rollback),
1774 	__ATTR_NULL
1775 };
1776 
1777 /*
1778  * create control files in sysfs
1779  * /sys/class/rbd/...
1780  */
1781 static int rbd_sysfs_init(void)
1782 {
1783 	int ret = -ENOMEM;
1784 
1785 	class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1786 	if (!class_rbd)
1787 		goto out;
1788 
1789 	class_rbd->name = DRV_NAME;
1790 	class_rbd->owner = THIS_MODULE;
1791 	class_rbd->class_release = class_rbd_release;
1792 	class_rbd->class_attrs = class_rbd_attrs;
1793 
1794 	ret = class_register(class_rbd);
1795 	if (ret)
1796 		goto out_class;
1797 	return 0;
1798 
1799 out_class:
1800 	kfree(class_rbd);
1801 	class_rbd = NULL;
1802 	pr_err(DRV_NAME ": failed to create class rbd\n");
1803 out:
1804 	return ret;
1805 }
1806 
1807 static void rbd_sysfs_cleanup(void)
1808 {
1809 	if (class_rbd)
1810 		class_destroy(class_rbd);
1811 	class_rbd = NULL;
1812 }
1813 
1814 int __init rbd_init(void)
1815 {
1816 	int rc;
1817 
1818 	rc = rbd_sysfs_init();
1819 	if (rc)
1820 		return rc;
1821 	spin_lock_init(&node_lock);
1822 	pr_info("loaded " DRV_NAME_LONG "\n");
1823 	return 0;
1824 }
1825 
1826 void __exit rbd_exit(void)
1827 {
1828 	rbd_sysfs_cleanup();
1829 }
1830 
1831 module_init(rbd_init);
1832 module_exit(rbd_exit);
1833 
1834 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1835 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1836 MODULE_DESCRIPTION("rados block device");
1837 
1838 /* following authorship retained from original osdblk.c */
1839 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1840 
1841 MODULE_LICENSE("GPL");
1842