xref: /linux/drivers/block/rbd.c (revision 90ab5ee94171b3e28de6bb42ee30b527014e0be7)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46 
47 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
48 
49 #define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN	64
51 #define RBD_MAX_SNAP_NAME_LEN	32
52 #define RBD_MAX_OPT_LEN		1024
53 
54 #define RBD_SNAP_HEAD_NAME	"-"
55 
56 #define DEV_NAME_LEN		32
57 
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes (head revision) */
	char block_name[32];		/* prefix for data object names */
	__u8 obj_order;			/* object size is 1 << obj_order bytes */
	__u8 crypt_type;		/* on-disk encryption type */
	__u8 comp_type;			/* on-disk compression type */
	struct rw_semaphore snap_rwsem;	/* protects the snapshot state below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;		/* total size of the snap_names buffer */
	u64 snap_seq;			/* snap seq of the header object */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* NUL-separated snapshot names */
	u64 *snap_sizes;		/* per-snapshot image sizes, same order */

	u64 obj_version;		/* osd version of the header object */
};

/* per-client options parsed from the sysfs "add" string */
struct rbd_options {
	int	notify_timeout;		/* seconds; defaults to
					   RBD_NOTIFY_TIMEOUT_DEFAULT */
};

/*
 * an instance of the client.  multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* shared ceph cluster handle */
	struct rbd_options	*rbd_opts;	/* owned; freed on release */
	struct kref		kref;		/* see rbd_client_release() */
	struct list_head	node;		/* entry in rbd_client_list */
};
94 
struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* request length in bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* NULL for standalone requests */
};

/* completion state of one member request of a collection */
struct rbd_req_status {
	int done;	/* nonzero once the osd reply has arrived */
	int rc;		/* result code for that member */
	u64 bytes;	/* byte count to complete on the blk request */
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;		/* number of status slots */
	int			num_done;	/* count completed in order */
	struct kref		kref;
	struct rbd_req_status	status[0];	/* pre-C99 flexible array */
};

/* in-memory snapshot record, exposed as a sysfs device */
struct rbd_snap {
	struct	device		dev;
	const char		*name;		/* snapshot name */
	size_t			size;		/* image size at snap time */
	struct list_head	node;		/* entry in rbd_device->snaps */
	u64			id;		/* id from the snap context */
};
132 
/*
 * a single device
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;	/* == rbd_client->client */
	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	/* watch on the header object for update notifications */
	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* nonzero when mapped at a snapshot */

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

/* bus under which rbd devices appear in sysfs */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};
177 
178 static spinlock_t node_lock;      /* protects client get/put */
179 
180 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list);    /* devices */
182 static LIST_HEAD(rbd_client_list);      /* clients */
183 
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_add(struct device *dev,
187 			    struct device_attribute *attr,
188 			    const char *buf,
189 			    size_t count);
190 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191 				  struct rbd_snap *snap);
192 
193 
194 static struct rbd_device *dev_to_rbd(struct device *dev)
195 {
196 	return container_of(dev, struct rbd_device, dev);
197 }
198 
199 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200 {
201 	return get_device(&rbd_dev->dev);
202 }
203 
204 static void rbd_put_dev(struct rbd_device *rbd_dev)
205 {
206 	put_device(&rbd_dev->dev);
207 }
208 
209 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210 
211 static int rbd_open(struct block_device *bdev, fmode_t mode)
212 {
213 	struct gendisk *disk = bdev->bd_disk;
214 	struct rbd_device *rbd_dev = disk->private_data;
215 
216 	rbd_get_dev(rbd_dev);
217 
218 	set_device_ro(bdev, rbd_dev->read_only);
219 
220 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
221 		return -EROFS;
222 
223 	return 0;
224 }
225 
226 static int rbd_release(struct gendisk *disk, fmode_t mode)
227 {
228 	struct rbd_device *rbd_dev = disk->private_data;
229 
230 	rbd_put_dev(rbd_dev);
231 
232 	return 0;
233 }
234 
/* block device operations; everything else uses the block layer defaults */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
240 
/*
 * Initialize an rbd client instance.
 * We own *opt.
 *
 * Ownership of @opt transfers to rbdc->client once ceph_create_client()
 * succeeds; until then it must be destroyed here on error.  On success
 * the new client is put on rbd_client_list and returned with one
 * reference; on failure an ERR_PTR is returned and @rbd_opts is left
 * for the caller to free.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	/* node_lock protects rbd_client_list */
	spin_lock(&node_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&node_lock);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	/* only destroy opt if ownership never transferred to the client */
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}
286 
287 /*
288  * Find a ceph client with specific addr and configuration.
289  */
290 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
291 {
292 	struct rbd_client *client_node;
293 
294 	if (opt->flags & CEPH_OPT_NOSHARE)
295 		return NULL;
296 
297 	list_for_each_entry(client_node, &rbd_client_list, node)
298 		if (ceph_compare_options(opt, client_node->client) == 0)
299 			return client_node;
300 	return NULL;
301 }
302 
/*
 * mount options
 *
 * Token ids below Opt_last_int take an integer argument; ids between
 * Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* token table for match_token() in parse_rbd_opts_token() */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
320 
/*
 * Parse one "key=value" token of the rbd options string and record it
 * in the rbd_options structure passed as @private.
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token((char *)c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	/* note: intval is only initialized for tokens below Opt_last_int */
	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);	/* table and switch must stay in sync */
	}
	return 0;
}
355 
356 /*
357  * Get a ceph client with specific addr and configuration, if one does
358  * not exist create it.
359  */
360 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
361 			  char *options)
362 {
363 	struct rbd_client *rbdc;
364 	struct ceph_options *opt;
365 	int ret;
366 	struct rbd_options *rbd_opts;
367 
368 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
369 	if (!rbd_opts)
370 		return -ENOMEM;
371 
372 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
373 
374 	ret = ceph_parse_options(&opt, options, mon_addr,
375 				 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
376 	if (ret < 0)
377 		goto done_err;
378 
379 	spin_lock(&node_lock);
380 	rbdc = __rbd_client_find(opt);
381 	if (rbdc) {
382 		ceph_destroy_options(opt);
383 
384 		/* using an existing client */
385 		kref_get(&rbdc->kref);
386 		rbd_dev->rbd_client = rbdc;
387 		rbd_dev->client = rbdc->client;
388 		spin_unlock(&node_lock);
389 		return 0;
390 	}
391 	spin_unlock(&node_lock);
392 
393 	rbdc = rbd_client_create(opt, rbd_opts);
394 	if (IS_ERR(rbdc)) {
395 		ret = PTR_ERR(rbdc);
396 		goto done_err;
397 	}
398 
399 	rbd_dev->rbd_client = rbdc;
400 	rbd_dev->client = rbdc->client;
401 	return 0;
402 done_err:
403 	kfree(rbd_opts);
404 	return ret;
405 }
406 
/*
 * Destroy ceph client
 *
 * kref release callback: unlink the client from rbd_client_list under
 * node_lock, then tear down the ceph client and free the options and
 * the wrapper itself.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&node_lock);
	list_del(&rbdc->node);
	spin_unlock(&node_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
423 
424 /*
425  * Drop reference to ceph client node. If it's not referenced anymore, release
426  * it.
427  */
428 static void rbd_put_client(struct rbd_device *rbd_dev)
429 {
430 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
431 	rbd_dev->rbd_client = NULL;
432 	rbd_dev->client = NULL;
433 }
434 
435 /*
436  * Destroy requests collection
437  */
438 static void rbd_coll_release(struct kref *kref)
439 {
440 	struct rbd_req_coll *coll =
441 		container_of(kref, struct rbd_req_coll, kref);
442 
443 	dout("rbd_coll_release %p\n", coll);
444 	kfree(coll);
445 }
446 
447 /*
448  * Create a new header structure, translate header format from the on-disk
449  * header.
450  */
451 static int rbd_header_from_disk(struct rbd_image_header *header,
452 				 struct rbd_image_header_ondisk *ondisk,
453 				 int allocated_snaps,
454 				 gfp_t gfp_flags)
455 {
456 	int i;
457 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
458 	int ret = -ENOMEM;
459 
460 	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
461 		return -ENXIO;
462 	}
463 
464 	init_rwsem(&header->snap_rwsem);
465 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467 				snap_count *
468 				 sizeof(struct rbd_image_snap_ondisk),
469 				gfp_flags);
470 	if (!header->snapc)
471 		return -ENOMEM;
472 	if (snap_count) {
473 		header->snap_names = kmalloc(header->snap_names_len,
474 					     GFP_KERNEL);
475 		if (!header->snap_names)
476 			goto err_snapc;
477 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 					     GFP_KERNEL);
479 		if (!header->snap_sizes)
480 			goto err_names;
481 	} else {
482 		header->snap_names = NULL;
483 		header->snap_sizes = NULL;
484 	}
485 	memcpy(header->block_name, ondisk->block_name,
486 	       sizeof(ondisk->block_name));
487 
488 	header->image_size = le64_to_cpu(ondisk->image_size);
489 	header->obj_order = ondisk->options.order;
490 	header->crypt_type = ondisk->options.crypt_type;
491 	header->comp_type = ondisk->options.comp_type;
492 
493 	atomic_set(&header->snapc->nref, 1);
494 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 	header->snapc->num_snaps = snap_count;
496 	header->total_snaps = snap_count;
497 
498 	if (snap_count &&
499 	    allocated_snaps == snap_count) {
500 		for (i = 0; i < snap_count; i++) {
501 			header->snapc->snaps[i] =
502 				le64_to_cpu(ondisk->snaps[i].id);
503 			header->snap_sizes[i] =
504 				le64_to_cpu(ondisk->snaps[i].image_size);
505 		}
506 
507 		/* copy snapshot names */
508 		memcpy(header->snap_names, &ondisk->snaps[i],
509 			header->snap_names_len);
510 	}
511 
512 	return 0;
513 
514 err_names:
515 	kfree(header->snap_names);
516 err_snapc:
517 	kfree(header->snapc);
518 	return ret;
519 }
520 
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523 	return header->total_snaps - snap_num;
524 }
525 
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528 	struct rbd_image_header *header = &rbd_dev->header;
529 
530 	if (!rbd_dev->cur_snap)
531 		return 0;
532 
533 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
535 
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537 			u64 *seq, u64 *size)
538 {
539 	int i;
540 	char *p = header->snap_names;
541 
542 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 		if (strcmp(snap_name, p) == 0)
544 			break;
545 	}
546 	if (i == header->total_snaps)
547 		return -ENOENT;
548 	if (seq)
549 		*seq = header->snapc->snaps[i];
550 
551 	if (size)
552 		*size = header->snap_sizes[i];
553 
554 	return i;
555 }
556 
/*
 * Point @dev at the snapshot named @snap_name, or at the head revision
 * when @snap_name is NULL, empty, or the reserved head name.  Updates
 * the snap context seq, cur_snap and read_only under the header's
 * rwsem, and optionally returns the image size via @size.
 * Returns 0 on success, -ENOENT if the named snapshot doesn't exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev,
			       const char *snap_name,
			       u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	down_write(&header->snap_rwsem);

	/* "-" and RBD_SNAP_HEAD_NAME both select the writable head */
	if (!snap_name ||
	    !*snap_name ||
	    strcmp(snap_name, "-") == 0 ||
	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap is 1-based, counted from the end of the list */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}
593 
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596 	kfree(header->snapc);
597 	kfree(header->snap_names);
598 	kfree(header->snap_sizes);
599 }
600 
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605 			   const char *block_name,
606 			   u64 ofs, u64 len,
607 			   char *seg_name, u64 *segofs)
608 {
609 	u64 seg = ofs >> header->obj_order;
610 
611 	if (seg_name)
612 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 			 "%s.%012llx", block_name, seg);
614 
615 	ofs = ofs & ((1 << header->obj_order) - 1);
616 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
617 
618 	if (segofs)
619 		*segofs = ofs;
620 
621 	return len;
622 }
623 
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625 				u64 ofs, u64 len)
626 {
627 	u64 start_seg = ofs >> header->obj_order;
628 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 	return end_seg - start_seg + 1;
630 }
631 
632 /*
633  * returns the size of an object in the image
634  */
635 static u64 rbd_obj_bytes(struct rbd_image_header *header)
636 {
637 	return 1 << header->obj_order;
638 }
639 
640 /*
641  * bio helpers
642  */
643 
644 static void bio_chain_put(struct bio *chain)
645 {
646 	struct bio *tmp;
647 
648 	while (chain) {
649 		tmp = chain;
650 		chain = chain->bi_next;
651 		bio_put(tmp);
652 	}
653 }
654 
/*
 * zeros a bio chain, starting at specific offset
 *
 * @start_ofs is a byte offset from the start of the whole chain; every
 * byte at or beyond it is cleared.  Used to zero-fill short reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs inside this segment,
				   or from its beginning once past it */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
681 
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old into a new chain until @len bytes are covered,
 * splitting the final bio when it straddles the boundary.  *old is
 * advanced past the consumed bios and *next receives the first
 * unconsumed bio (or the second half of a split).  Returns the new
 * chain, or NULL on allocation/split failure.
 *
 * NOTE(review): the inner "struct bio_pair *bp" shadows the parameter,
 * so a newly created split pair is never handed back through *bp for a
 * later bio_pair_release() — verify against how callers track the
 * pair before relying on this.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any split pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation of this call may block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append the clone to the tail of the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* callers guarantee the source chain covers at least len bytes */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
758 
/*
 * helpers for osd request op vectors.
 */

/*
 * Allocate a zeroed vector of @num_ops + 1 osd ops (the extra zeroed
 * entry terminates the vector) and fill in the first op's opcode and
 * payload length.  Returns 0 or -ENOMEM.
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
			    int num_ops,
			    int opcode,
			    u32 payload_len)
{
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
		       GFP_NOIO);
	if (!*ops)
		return -ENOMEM;
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;
	return 0;
}

/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
784 
/*
 * Record the completion of member @index of collection @coll and then
 * complete, in submission order, as many consecutive finished members
 * of block request @rq as possible.  Completions must reach the block
 * layer in order, so an out-of-order osd reply only marks its status
 * slot and waits for the earlier members.  Without a collection the
 * block request is completed directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue_lock serializes updates to the collection's status[] */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed member drops one collection reference */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

/* Complete the collection slot associated with a single rbd_request. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
828 
829 /*
830  * Send ceph osd request
831  */
832 static int rbd_do_request(struct request *rq,
833 			  struct rbd_device *dev,
834 			  struct ceph_snap_context *snapc,
835 			  u64 snapid,
836 			  const char *obj, u64 ofs, u64 len,
837 			  struct bio *bio,
838 			  struct page **pages,
839 			  int num_pages,
840 			  int flags,
841 			  struct ceph_osd_req_op *ops,
842 			  int num_reply,
843 			  struct rbd_req_coll *coll,
844 			  int coll_index,
845 			  void (*rbd_cb)(struct ceph_osd_request *req,
846 					 struct ceph_msg *msg),
847 			  struct ceph_osd_request **linger_req,
848 			  u64 *ver)
849 {
850 	struct ceph_osd_request *req;
851 	struct ceph_file_layout *layout;
852 	int ret;
853 	u64 bno;
854 	struct timespec mtime = CURRENT_TIME;
855 	struct rbd_request *req_data;
856 	struct ceph_osd_request_head *reqhead;
857 	struct rbd_image_header *header = &dev->header;
858 
859 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
860 	if (!req_data) {
861 		if (coll)
862 			rbd_coll_end_req_index(rq, coll, coll_index,
863 					       -ENOMEM, len);
864 		return -ENOMEM;
865 	}
866 
867 	if (coll) {
868 		req_data->coll = coll;
869 		req_data->coll_index = coll_index;
870 	}
871 
872 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
873 
874 	down_read(&header->snap_rwsem);
875 
876 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
877 				      snapc,
878 				      ops,
879 				      false,
880 				      GFP_NOIO, pages, bio);
881 	if (!req) {
882 		up_read(&header->snap_rwsem);
883 		ret = -ENOMEM;
884 		goto done_pages;
885 	}
886 
887 	req->r_callback = rbd_cb;
888 
889 	req_data->rq = rq;
890 	req_data->bio = bio;
891 	req_data->pages = pages;
892 	req_data->len = len;
893 
894 	req->r_priv = req_data;
895 
896 	reqhead = req->r_request->front.iov_base;
897 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
898 
899 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
900 	req->r_oid_len = strlen(req->r_oid);
901 
902 	layout = &req->r_file_layout;
903 	memset(layout, 0, sizeof(*layout));
904 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905 	layout->fl_stripe_count = cpu_to_le32(1);
906 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
907 	layout->fl_pg_preferred = cpu_to_le32(-1);
908 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
909 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
910 			     ofs, &len, &bno, req, ops);
911 
912 	ceph_osdc_build_request(req, ofs, &len,
913 				ops,
914 				snapc,
915 				&mtime,
916 				req->r_oid, req->r_oid_len);
917 	up_read(&header->snap_rwsem);
918 
919 	if (linger_req) {
920 		ceph_osdc_set_request_linger(&dev->client->osdc, req);
921 		*linger_req = req;
922 	}
923 
924 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
925 	if (ret < 0)
926 		goto done_err;
927 
928 	if (!rbd_cb) {
929 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
930 		if (ver)
931 			*ver = le64_to_cpu(req->r_reassert_version.version);
932 		dout("reassert_ver=%lld\n",
933 		     le64_to_cpu(req->r_reassert_version.version));
934 		ceph_osdc_put_request(req);
935 	}
936 	return ret;
937 
938 done_err:
939 	bio_chain_put(req_data->bio);
940 	ceph_osdc_put_request(req);
941 done_pages:
942 	rbd_coll_end_req(req_data, ret, len);
943 	kfree(req_data);
944 	return ret;
945 }
946 
/*
 * Ceph osd op callback
 *
 * Completion callback for asynchronous data requests: decodes the osd
 * reply, zero-fills reads that hit a hole (-ENOENT) or came back
 * short, completes the collection slot, then releases the bio chain,
 * the osd request and the per-request bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object reads as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

/* Minimal callback for fire-and-forget requests (e.g. notify acks). */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
990 
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector covering [ofs, ofs+len), builds a one-op
 * vector when @orig_ops is NULL (copying @buf into the pages for
 * writes), submits via rbd_do_request() with no callback so the call
 * blocks, and copies read data back into @buf.  Returns the osd
 * result (bytes read for reads) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* build a default single-op vector unless the caller supplied one */
	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			  obj, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  2,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* for reads, ret is the number of bytes the osd returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	/* only free ops we built ourselves; caller owns orig_ops */
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1053 
/*
 * Do an asynchronous ceph osd operation
 *
 * Translates an image-relative range into a single segment object
 * (callers have already split requests on segment boundaries, enforced
 * by the BUG_ON below), builds the op vector and submits the request
 * with rbd_req_cb as completion callback.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1109 
/*
 * Request async osd write
 *
 * Thin wrapper around rbd_do_op() for a single-segment write; writes
 * always target the head revision (CEPH_NOSNAP).
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 *
 * Thin wrapper around rbd_do_op() for a single-segment read; a zero
 * @snapid means the head revision.
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1146 
/*
 * Request sync osd read
 *
 * Synchronously read [ofs, ofs+len) of @obj into @buf.  A zero
 * @snapid means the head revision.  Note the @snapc parameter is
 * unused; NULL is always passed down.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1165 
/*
 * Request sync osd watch
 *
 * Acknowledge a notification received on @obj so the notifying osd
 * can complete the notify.  Fire-and-forget: completion only drops
 * the request via rbd_simple_req_cb().
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/* NOTE(review): notify_id is stored without cpu_to_le64 while the
	   sibling fields are converted — confirm the expected byte order */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			  obj, 0, 0, NULL,
			  pages, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1198 
/*
 * Watch event callback: the header object changed (e.g. a snapshot was
 * added), so refresh the in-memory snapshot list and acknowledge the
 * notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
	/* nested: we may already hold ctl_mutex via the sysfs paths */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even on failure so the notifier isn't left waiting */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1218 
/*
 * Request sync osd watch
 *
 * Register a watch on @obj so header changes trigger rbd_watch_cb().
 * Creates the osd event first (its cookie identifies the watch), then
 * issues a lingering WATCH op kept in dev->watch_request.  On failure
 * the event is cancelled and dev->watch_event reset.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1263 
1264 /*
1265  * Request sync osd unwatch
1266  */
1267 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1268 				const char *obj)
1269 {
1270 	struct ceph_osd_req_op *ops;
1271 
1272 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1273 	if (ret < 0)
1274 		return ret;
1275 
1276 	ops[0].watch.ver = 0;
1277 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1278 	ops[0].watch.flag = 0;
1279 
1280 	ret = rbd_req_sync_op(dev, NULL,
1281 			      CEPH_NOSNAP,
1282 			      0,
1283 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1284 			      ops,
1285 			      1, obj, 0, 0, NULL, NULL, NULL);
1286 
1287 	rbd_destroy_ops(ops);
1288 	ceph_osdc_cancel_event(dev->watch_event);
1289 	dev->watch_event = NULL;
1290 	return ret;
1291 }
1292 
/* context passed to rbd_notify_cb via ceph_osdc_create_event() */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1296 
1297 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1298 {
1299 	struct rbd_device *dev = (struct rbd_device *)data;
1300 	if (!dev)
1301 		return;
1302 
1303 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1304 		notify_id, (int)opcode);
1305 }
1306 
1307 /*
1308  * Request sync osd notify
1309  */
1310 static int rbd_req_sync_notify(struct rbd_device *dev,
1311 		          const char *obj)
1312 {
1313 	struct ceph_osd_req_op *ops;
1314 	struct ceph_osd_client *osdc = &dev->client->osdc;
1315 	struct ceph_osd_event *event;
1316 	struct rbd_notify_info info;
1317 	int payload_len = sizeof(u32) + sizeof(u32);
1318 	int ret;
1319 
1320 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1321 	if (ret < 0)
1322 		return ret;
1323 
1324 	info.dev = dev;
1325 
1326 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1327 				     (void *)&info, &event);
1328 	if (ret < 0)
1329 		goto fail;
1330 
1331 	ops[0].watch.ver = 1;
1332 	ops[0].watch.flag = 1;
1333 	ops[0].watch.cookie = event->cookie;
1334 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1335 	ops[0].watch.timeout = 12;
1336 
1337 	ret = rbd_req_sync_op(dev, NULL,
1338 			       CEPH_NOSNAP,
1339 			       0,
1340 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341 			       ops,
1342 			       1, obj, 0, 0, NULL, NULL, NULL);
1343 	if (ret < 0)
1344 		goto fail_event;
1345 
1346 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1347 	dout("ceph_osdc_wait_event returned %d\n", ret);
1348 	rbd_destroy_ops(ops);
1349 	return 0;
1350 
1351 fail_event:
1352 	ceph_osdc_cancel_event(event);
1353 fail:
1354 	rbd_destroy_ops(ops);
1355 	return ret;
1356 }
1357 
1358 /*
1359  * Request sync osd read
1360  */
1361 static int rbd_req_sync_exec(struct rbd_device *dev,
1362 			     const char *obj,
1363 			     const char *cls,
1364 			     const char *method,
1365 			     const char *data,
1366 			     int len,
1367 			     u64 *ver)
1368 {
1369 	struct ceph_osd_req_op *ops;
1370 	int cls_len = strlen(cls);
1371 	int method_len = strlen(method);
1372 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1373 				    cls_len + method_len + len);
1374 	if (ret < 0)
1375 		return ret;
1376 
1377 	ops[0].cls.class_name = cls;
1378 	ops[0].cls.class_len = (__u8)cls_len;
1379 	ops[0].cls.method_name = method;
1380 	ops[0].cls.method_len = (__u8)method_len;
1381 	ops[0].cls.argc = 0;
1382 	ops[0].cls.indata = data;
1383 	ops[0].cls.indata_len = len;
1384 
1385 	ret = rbd_req_sync_op(dev, NULL,
1386 			       CEPH_NOSNAP,
1387 			       0,
1388 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1389 			       ops,
1390 			       1, obj, 0, 0, NULL, NULL, ver);
1391 
1392 	rbd_destroy_ops(ops);
1393 
1394 	dout("cls_exec returned %d\n", ret);
1395 	return ret;
1396 }
1397 
1398 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1399 {
1400 	struct rbd_req_coll *coll =
1401 			kzalloc(sizeof(struct rbd_req_coll) +
1402 			        sizeof(struct rbd_req_status) * num_reqs,
1403 				GFP_ATOMIC);
1404 
1405 	if (!coll)
1406 		return NULL;
1407 	coll->total = num_reqs;
1408 	kref_init(&coll->kref);
1409 	return coll;
1410 }
1411 
1412 /*
1413  * block device queue callback
1414  */
1415 static void rbd_rq_fn(struct request_queue *q)
1416 {
1417 	struct rbd_device *rbd_dev = q->queuedata;
1418 	struct request *rq;
1419 	struct bio_pair *bp = NULL;
1420 
1421 	rq = blk_fetch_request(q);
1422 
1423 	while (1) {
1424 		struct bio *bio;
1425 		struct bio *rq_bio, *next_bio = NULL;
1426 		bool do_write;
1427 		int size, op_size = 0;
1428 		u64 ofs;
1429 		int num_segs, cur_seg = 0;
1430 		struct rbd_req_coll *coll;
1431 
1432 		/* peek at request from block layer */
1433 		if (!rq)
1434 			break;
1435 
1436 		dout("fetched request\n");
1437 
1438 		/* filter out block requests we don't understand */
1439 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1440 			__blk_end_request_all(rq, 0);
1441 			goto next;
1442 		}
1443 
1444 		/* deduce our operation (read, write) */
1445 		do_write = (rq_data_dir(rq) == WRITE);
1446 
1447 		size = blk_rq_bytes(rq);
1448 		ofs = blk_rq_pos(rq) * 512ULL;
1449 		rq_bio = rq->bio;
1450 		if (do_write && rbd_dev->read_only) {
1451 			__blk_end_request_all(rq, -EROFS);
1452 			goto next;
1453 		}
1454 
1455 		spin_unlock_irq(q->queue_lock);
1456 
1457 		dout("%s 0x%x bytes at 0x%llx\n",
1458 		     do_write ? "write" : "read",
1459 		     size, blk_rq_pos(rq) * 512ULL);
1460 
1461 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1462 		coll = rbd_alloc_coll(num_segs);
1463 		if (!coll) {
1464 			spin_lock_irq(q->queue_lock);
1465 			__blk_end_request_all(rq, -ENOMEM);
1466 			goto next;
1467 		}
1468 
1469 		do {
1470 			/* a bio clone to be passed down to OSD req */
1471 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1472 			op_size = rbd_get_segment(&rbd_dev->header,
1473 						  rbd_dev->header.block_name,
1474 						  ofs, size,
1475 						  NULL, NULL);
1476 			kref_get(&coll->kref);
1477 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1478 					      op_size, GFP_ATOMIC);
1479 			if (!bio) {
1480 				rbd_coll_end_req_index(rq, coll, cur_seg,
1481 						       -ENOMEM, op_size);
1482 				goto next_seg;
1483 			}
1484 
1485 
1486 			/* init OSD command: write or read */
1487 			if (do_write)
1488 				rbd_req_write(rq, rbd_dev,
1489 					      rbd_dev->header.snapc,
1490 					      ofs,
1491 					      op_size, bio,
1492 					      coll, cur_seg);
1493 			else
1494 				rbd_req_read(rq, rbd_dev,
1495 					     cur_snap_id(rbd_dev),
1496 					     ofs,
1497 					     op_size, bio,
1498 					     coll, cur_seg);
1499 
1500 next_seg:
1501 			size -= op_size;
1502 			ofs += op_size;
1503 
1504 			cur_seg++;
1505 			rq_bio = next_bio;
1506 		} while (size > 0);
1507 		kref_put(&coll->kref, rbd_coll_release);
1508 
1509 		if (bp)
1510 			bio_pair_release(bp);
1511 		spin_lock_irq(q->queue_lock);
1512 next:
1513 		rq = blk_fetch_request(q);
1514 	}
1515 }
1516 
1517 /*
1518  * a queue callback. Makes sure that we don't create a bio that spans across
1519  * multiple osd objects. One exception would be with a single page bios,
1520  * which we handle later at bio_chain_clone
1521  */
1522 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1523 			  struct bio_vec *bvec)
1524 {
1525 	struct rbd_device *rbd_dev = q->queuedata;
1526 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1527 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1528 	unsigned int bio_sectors = bmd->bi_size >> 9;
1529 	int max;
1530 
1531 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1532 				 + bio_sectors)) << 9;
1533 	if (max < 0)
1534 		max = 0; /* bio_add cannot handle a negative return */
1535 	if (max <= bvec->bv_len && bio_sectors == 0)
1536 		return bvec->bv_len;
1537 	return max;
1538 }
1539 
/*
 * Tear down the gendisk and request queue for @rbd_dev, and free the
 * in-memory image header.  Safe to call when no disk was allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* unregister the disk before destroying its queue */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1555 
1556 /*
1557  * reload the ondisk the header
1558  */
1559 static int rbd_read_header(struct rbd_device *rbd_dev,
1560 			   struct rbd_image_header *header)
1561 {
1562 	ssize_t rc;
1563 	struct rbd_image_header_ondisk *dh;
1564 	int snap_count = 0;
1565 	u64 snap_names_len = 0;
1566 	u64 ver;
1567 
1568 	while (1) {
1569 		int len = sizeof(*dh) +
1570 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1571 			  snap_names_len;
1572 
1573 		rc = -ENOMEM;
1574 		dh = kmalloc(len, GFP_KERNEL);
1575 		if (!dh)
1576 			return -ENOMEM;
1577 
1578 		rc = rbd_req_sync_read(rbd_dev,
1579 				       NULL, CEPH_NOSNAP,
1580 				       rbd_dev->obj_md_name,
1581 				       0, len,
1582 				       (char *)dh, &ver);
1583 		if (rc < 0)
1584 			goto out_dh;
1585 
1586 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1587 		if (rc < 0) {
1588 			if (rc == -ENXIO) {
1589 				pr_warning("unrecognized header format"
1590 					   " for image %s", rbd_dev->obj);
1591 			}
1592 			goto out_dh;
1593 		}
1594 
1595 		if (snap_count != header->total_snaps) {
1596 			snap_count = header->total_snaps;
1597 			snap_names_len = header->snap_names_len;
1598 			rbd_header_free(header);
1599 			kfree(dh);
1600 			continue;
1601 		}
1602 		break;
1603 	}
1604 	header->obj_version = ver;
1605 
1606 out_dh:
1607 	kfree(dh);
1608 	return rc;
1609 }
1610 
1611 /*
1612  * create a snapshot
1613  */
1614 static int rbd_header_add_snap(struct rbd_device *dev,
1615 			       const char *snap_name,
1616 			       gfp_t gfp_flags)
1617 {
1618 	int name_len = strlen(snap_name);
1619 	u64 new_snapid;
1620 	int ret;
1621 	void *data, *p, *e;
1622 	u64 ver;
1623 
1624 	/* we should create a snapshot only if we're pointing at the head */
1625 	if (dev->cur_snap)
1626 		return -EINVAL;
1627 
1628 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1629 				      &new_snapid);
1630 	dout("created snapid=%lld\n", new_snapid);
1631 	if (ret < 0)
1632 		return ret;
1633 
1634 	data = kmalloc(name_len + 16, gfp_flags);
1635 	if (!data)
1636 		return -ENOMEM;
1637 
1638 	p = data;
1639 	e = data + name_len + 16;
1640 
1641 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1642 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1643 
1644 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1645 				data, p - data, &ver);
1646 
1647 	kfree(data);
1648 
1649 	if (ret < 0)
1650 		return ret;
1651 
1652 	dev->header.snapc->seq =  new_snapid;
1653 
1654 	return 0;
1655 bad:
1656 	return -ERANGE;
1657 }
1658 
1659 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1660 {
1661 	struct rbd_snap *snap;
1662 
1663 	while (!list_empty(&rbd_dev->snaps)) {
1664 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1665 		__rbd_remove_snap_dev(rbd_dev, snap);
1666 	}
1667 }
1668 
1669 /*
1670  * only read the first part of the ondisk header, without the snaps info
1671  */
1672 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1673 {
1674 	int ret;
1675 	struct rbd_image_header h;
1676 	u64 snap_seq;
1677 	int follow_seq = 0;
1678 
1679 	ret = rbd_read_header(rbd_dev, &h);
1680 	if (ret < 0)
1681 		return ret;
1682 
1683 	/* resized? */
1684 	set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1685 
1686 	down_write(&rbd_dev->header.snap_rwsem);
1687 
1688 	snap_seq = rbd_dev->header.snapc->seq;
1689 	if (rbd_dev->header.total_snaps &&
1690 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1691 		/* pointing at the head, will need to follow that
1692 		   if head moves */
1693 		follow_seq = 1;
1694 
1695 	kfree(rbd_dev->header.snapc);
1696 	kfree(rbd_dev->header.snap_names);
1697 	kfree(rbd_dev->header.snap_sizes);
1698 
1699 	rbd_dev->header.total_snaps = h.total_snaps;
1700 	rbd_dev->header.snapc = h.snapc;
1701 	rbd_dev->header.snap_names = h.snap_names;
1702 	rbd_dev->header.snap_names_len = h.snap_names_len;
1703 	rbd_dev->header.snap_sizes = h.snap_sizes;
1704 	if (follow_seq)
1705 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1706 	else
1707 		rbd_dev->header.snapc->seq = snap_seq;
1708 
1709 	ret = __rbd_init_snaps_header(rbd_dev);
1710 
1711 	up_write(&rbd_dev->header.snap_rwsem);
1712 
1713 	return ret;
1714 }
1715 
/*
 * Read the image header, select the requested snapshot, then create
 * and announce the gendisk and request queue for this mapping.
 * Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	/* keep bios within a single osd object; see rbd_merge_bvec */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1783 
1784 /*
1785   sysfs
1786 */
1787 
1788 static ssize_t rbd_size_show(struct device *dev,
1789 			     struct device_attribute *attr, char *buf)
1790 {
1791 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1792 
1793 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1794 }
1795 
1796 static ssize_t rbd_major_show(struct device *dev,
1797 			      struct device_attribute *attr, char *buf)
1798 {
1799 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1800 
1801 	return sprintf(buf, "%d\n", rbd_dev->major);
1802 }
1803 
1804 static ssize_t rbd_client_id_show(struct device *dev,
1805 				  struct device_attribute *attr, char *buf)
1806 {
1807 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1808 
1809 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1810 }
1811 
1812 static ssize_t rbd_pool_show(struct device *dev,
1813 			     struct device_attribute *attr, char *buf)
1814 {
1815 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1816 
1817 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1818 }
1819 
1820 static ssize_t rbd_name_show(struct device *dev,
1821 			     struct device_attribute *attr, char *buf)
1822 {
1823 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1824 
1825 	return sprintf(buf, "%s\n", rbd_dev->obj);
1826 }
1827 
1828 static ssize_t rbd_snap_show(struct device *dev,
1829 			     struct device_attribute *attr,
1830 			     char *buf)
1831 {
1832 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1833 
1834 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1835 }
1836 
1837 static ssize_t rbd_image_refresh(struct device *dev,
1838 				 struct device_attribute *attr,
1839 				 const char *buf,
1840 				 size_t size)
1841 {
1842 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1843 	int rc;
1844 	int ret = size;
1845 
1846 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1847 
1848 	rc = __rbd_update_snaps(rbd_dev);
1849 	if (rc < 0)
1850 		ret = rc;
1851 
1852 	mutex_unlock(&ctl_mutex);
1853 	return ret;
1854 }
1855 
/* per-device attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

/* attribute group attached to each device via rbd_device_type */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1885 
/* no-op: freeing happens in rbd_dev_release(), which rbd_bus_add_dev()
 * installs as the device's own release callback */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1895 
1896 
1897 /*
1898   sysfs - snapshots
1899 */
1900 
1901 static ssize_t rbd_snap_size_show(struct device *dev,
1902 				  struct device_attribute *attr,
1903 				  char *buf)
1904 {
1905 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1906 
1907 	return sprintf(buf, "%lld\n", (long long)snap->size);
1908 }
1909 
1910 static ssize_t rbd_snap_id_show(struct device *dev,
1911 				struct device_attribute *attr,
1912 				char *buf)
1913 {
1914 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915 
1916 	return sprintf(buf, "%lld\n", (long long)snap->id);
1917 }
1918 
/* attributes of each snap_<name> child device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1931 
/* release hook for snapshot devices: frees the rbd_snap itself */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
1938 
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* device type shared by all snapshot child devices */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1948 
/* unlink @snap from the device's list and drop its sysfs device;
 * rbd_snap_dev_release() frees the snap once the last ref is gone */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1955 
1956 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957 				  struct rbd_snap *snap,
1958 				  struct device *parent)
1959 {
1960 	struct device *dev = &snap->dev;
1961 	int ret;
1962 
1963 	dev->type = &rbd_snap_device_type;
1964 	dev->parent = parent;
1965 	dev->release = rbd_snap_dev_release;
1966 	dev_set_name(dev, "snap_%s", snap->name);
1967 	ret = device_register(dev);
1968 
1969 	return ret;
1970 }
1971 
1972 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973 			      int i, const char *name,
1974 			      struct rbd_snap **snapp)
1975 {
1976 	int ret;
1977 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1978 	if (!snap)
1979 		return -ENOMEM;
1980 	snap->name = kstrdup(name, GFP_KERNEL);
1981 	snap->size = rbd_dev->header.snap_sizes[i];
1982 	snap->id = rbd_dev->header.snapc->snaps[i];
1983 	if (device_is_registered(&rbd_dev->dev)) {
1984 		ret = rbd_register_snap_dev(rbd_dev, snap,
1985 					     &rbd_dev->dev);
1986 		if (ret < 0)
1987 			goto err;
1988 	}
1989 	*snapp = snap;
1990 	return 0;
1991 err:
1992 	kfree(snap->name);
1993 	kfree(snap);
1994 	return ret;
1995 }
1996 
1997 /*
1998  * search for the previous snap in a null delimited string list
1999  */
2000 const char *rbd_prev_snap_name(const char *name, const char *start)
2001 {
2002 	if (name < start + 2)
2003 		return NULL;
2004 
2005 	name -= 2;
2006 	while (*name) {
2007 		if (name == start)
2008 			return start;
2009 		name--;
2010 	}
2011 	return name + 1;
2012 }
2013 
2014 /*
2015  * compare the old list of snapshots that we have to what's in the header
2016  * and update it accordingly. Note that the header holds the snapshots
2017  * in a reverse order (from newest to oldest) and we need to go from
2018  * older to new so that we don't get a duplicate snap name when
2019  * doing the process (e.g., removed snapshot and recreated a new
2020  * one with the same name.
2021  */
2022 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2023 {
2024 	const char *name, *first_name;
2025 	int i = rbd_dev->header.total_snaps;
2026 	struct rbd_snap *snap, *old_snap = NULL;
2027 	int ret;
2028 	struct list_head *p, *n;
2029 
2030 	first_name = rbd_dev->header.snap_names;
2031 	name = first_name + rbd_dev->header.snap_names_len;
2032 
2033 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2034 		u64 cur_id;
2035 
2036 		old_snap = list_entry(p, struct rbd_snap, node);
2037 
2038 		if (i)
2039 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2040 
2041 		if (!i || old_snap->id < cur_id) {
2042 			/* old_snap->id was skipped, thus was removed */
2043 			__rbd_remove_snap_dev(rbd_dev, old_snap);
2044 			continue;
2045 		}
2046 		if (old_snap->id == cur_id) {
2047 			/* we have this snapshot already */
2048 			i--;
2049 			name = rbd_prev_snap_name(name, first_name);
2050 			continue;
2051 		}
2052 		for (; i > 0;
2053 		     i--, name = rbd_prev_snap_name(name, first_name)) {
2054 			if (!name) {
2055 				WARN_ON(1);
2056 				return -EINVAL;
2057 			}
2058 			cur_id = rbd_dev->header.snapc->snaps[i];
2059 			/* snapshot removal? handle it above */
2060 			if (cur_id >= old_snap->id)
2061 				break;
2062 			/* a new snapshot */
2063 			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2064 			if (ret < 0)
2065 				return ret;
2066 
2067 			/* note that we add it backward so using n and not p */
2068 			list_add(&snap->node, n);
2069 			p = &snap->node;
2070 		}
2071 	}
2072 	/* we're done going over the old snap list, just add what's left */
2073 	for (; i > 0; i--) {
2074 		name = rbd_prev_snap_name(name, first_name);
2075 		if (!name) {
2076 			WARN_ON(1);
2077 			return -EINVAL;
2078 		}
2079 		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2080 		if (ret < 0)
2081 			return ret;
2082 		list_add(&snap->node, &rbd_dev->snaps);
2083 	}
2084 
2085 	return 0;
2086 }
2087 
2088 
/* no-op release: rbd_root_dev below is a static object, never freed */
static void rbd_root_dev_release(struct device *dev)
{
}

/* root device all rbd devices hang off of */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
2097 
/*
 * Register @rbd_dev (and its already-known snapshots) on the rbd bus.
 * Takes ctl_mutex for the duration.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* rbd_dev_release does the final teardown when the dev ref drops */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
	/* NOTE(review): a failure in the snap loop is dropped and the
	   function still returns 0 — confirm whether that is intended */

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2129 
/* drop the sysfs device; rbd_dev_release() runs on the last ref put */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2134 
2135 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2136 {
2137 	int ret, rc;
2138 
2139 	do {
2140 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141 					 rbd_dev->header.obj_version);
2142 		if (ret == -ERANGE) {
2143 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144 			rc = __rbd_update_snaps(rbd_dev);
2145 			mutex_unlock(&ctl_mutex);
2146 			if (rc < 0)
2147 				return rc;
2148 		}
2149 	} while (ret == -ERANGE);
2150 
2151 	return ret;
2152 }
2153 
/*
 * Handle a write to /sys/bus/rbd/add:
 *   "<mon_addrs> <options> <pool> <image> [<snap>]"
 * Creates and registers a new rbd_device, maps the image, and starts
 * watching its header object.
 *
 * NOTE(review): the sscanf field widths equal the destination buffer
 * sizes (e.g. %1024s into a 1024-byte mon_dev_name), so a maximal
 * token can overflow by the terminating NUL — verify the widths
 * against the buffer sizes.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	ssize_t rc = -ENOMEM;
	int irc, new_id = 0;
	struct list_head *tmp;
	char *mon_dev_name;
	char *options;

	/* hold a module ref per mapped device; dropped in rbd_dev_release */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!mon_dev_name)
		goto err_out_mod;

	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!options)
		goto err_mon_dev;

	/* new rbd_device object */
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_opt;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);

	/* generate unique id: find highest unique id, add one */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_for_each(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id >= new_id)
			new_id = rbd_dev->id + 1;
	}

	rbd_dev->id = new_id;

	/* add to global list */
	list_add_tail(&rbd_dev->node, &rbd_dev_list);

	/* parse add command */
	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
		   mon_dev_name, options, rbd_dev->pool_name,
		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
		rc = -EINVAL;
		goto err_out_slot;
	}

	/* snap name is optional; default to the head marker */
	if (rbd_dev->snap_name[0] == 0)
		rbd_dev->snap_name[0] = '-';

	rbd_dev->obj_len = strlen(rbd_dev->obj);
	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
		 rbd_dev->obj, RBD_SUFFIX);

	/* initialize rest of new object */
	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
	if (rc < 0)
		goto err_out_slot;

	mutex_unlock(&ctl_mutex);

	/* pick the pool */
	osdc = &rbd_dev->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->poolid = rc;

	/* register our block device */
	irc = register_blkdev(0, rbd_dev->name);
	if (irc < 0) {
		rc = irc;
		goto err_out_client;
	}
	rbd_dev->major = irc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/* set up and announce blkdev mapping */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	kfree(mon_dev_name);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
err_out_slot:
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	kfree(rbd_dev);
err_out_opt:
	kfree(options);
err_mon_dev:
	kfree(mon_dev_name);
err_out_mod:
	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);
	return rc;
}
2291 
2292 static struct rbd_device *__rbd_get_dev(unsigned long id)
2293 {
2294 	struct list_head *tmp;
2295 	struct rbd_device *rbd_dev;
2296 
2297 	list_for_each(tmp, &rbd_dev_list) {
2298 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2299 		if (rbd_dev->id == id)
2300 			return rbd_dev;
2301 	}
2302 	return NULL;
2303 }
2304 
/*
 * Final teardown for an rbd device; installed as dev->release in
 * rbd_bus_add_dev(), so it runs when the last device reference drops.
 * Order matters: stop the watch, drop the client, then remove the disk.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	/* stop listening for header-change notifications first */
	if (rbd_dev->watch_request)
		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2326 
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device id,
 * unhook the device from the global list, and unregister it (which
 * triggers rbd_dev_release() for the final teardown).
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	list_del_init(&rbd_dev->node);

	/* snapshots first, then the device itself */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2362 
/*
 * Handle a write to the create_snap attribute: create a snapshot named
 * after the written string, refresh our snapshot list, then notify
 * other watchers of the header change (best effort).
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* size 'count' drops the final input byte — presumably to strip
	   the trailing newline from sysfs input; TODO confirm behavior
	   for writes that lack one */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2403 
/*
 * Bus-level, write-only control files: /sys/bus/rbd/add and
 * /sys/bus/rbd/remove.  Terminated by __ATTR_NULL as the driver
 * core requires for bus_type.bus_attrs.
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2409 
2410 /*
2411  * create control files in sysfs
2412  * /sys/bus/rbd/...
2413  */
2414 static int rbd_sysfs_init(void)
2415 {
2416 	int ret;
2417 
2418 	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2419 
2420 	ret = bus_register(&rbd_bus_type);
2421 	 if (ret < 0)
2422 		return ret;
2423 
2424 	ret = device_register(&rbd_root_dev);
2425 
2426 	return ret;
2427 }
2428 
/*
 * Remove the sysfs control files; exact inverse of rbd_sysfs_init(),
 * unwinding in the opposite order of registration.
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2434 
2435 int __init rbd_init(void)
2436 {
2437 	int rc;
2438 
2439 	rc = rbd_sysfs_init();
2440 	if (rc)
2441 		return rc;
2442 	spin_lock_init(&node_lock);
2443 	pr_info("loaded " DRV_NAME_LONG "\n");
2444 	return 0;
2445 }
2446 
/* Module unload: remove the sysfs control files. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2451 
/* Module entry/exit points and module metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2463