xref: /linux/drivers/md/md-cluster.c (revision e5c86679d5e864947a52fb31e45a425dea3e7fa9)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/kthread.h>
14 #include <linux/dlm.h>
15 #include <linux/sched.h>
16 #include <linux/raid/md_p.h>
17 #include "md.h"
18 #include "bitmap.h"
19 #include "md-cluster.h"
20 
/* Byte size of each lock value block (LVB) buffer allocated in this file. */
#define LVB_SIZE 64
/* Timeout value handed unchanged to wait_for_completion_timeout(). */
#define NEW_DEV_TIMEOUT 5000
23 
24 struct dlm_lock_resource {
25 	dlm_lockspace_t *ls;
26 	struct dlm_lksb lksb;
27 	char *name; /* lock name. */
28 	uint32_t flags; /* flags to pass to dlm_lock() */
29 	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
30 	bool sync_locking_done;
31 	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
32 	struct mddev *mddev; /* pointing back to mddev. */
33 	int mode;
34 };
35 
36 struct suspend_info {
37 	int slot;
38 	sector_t lo;
39 	sector_t hi;
40 	struct list_head list;
41 };
42 
43 struct resync_info {
44 	__le64 lo;
45 	__le64 hi;
46 };
47 
/* md_cluster_info flags: bit numbers used on md_cluster_info.state */
#define MD_CLUSTER_WAITING_FOR_NEWDISK		1
#define MD_CLUSTER_SUSPEND_READ_BALANCING	2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER		3

/*
 * Lock the send communication. This is done through bit manipulation
 * as opposed to a mutex in order to accommodate lock and hold. See the
 * next comment.
 */
#define MD_CLUSTER_SEND_LOCK			4
/*
 * Some cluster operations (such as adding a disk) must lock the
 * communication channel so as to perform extra operations (update
 * metadata) while no other operation is allowed on the MD. The token
 * needs to be locked and held until the operation completes with a
 * md_update_sb(), which would eventually release the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY		5
/*
 * We should receive messages only after the node has joined the cluster
 * and set up all the related infos such as bitmap and personality.
 */
#define MD_CLUSTER_ALREADY_IN_CLUSTER		6
#define MD_CLUSTER_PENDING_RECV_EVENT		7
70 
71 
72 struct md_cluster_info {
73 	/* dlm lock space and resources for clustered raid. */
74 	dlm_lockspace_t *lockspace;
75 	int slot_number;
76 	struct completion completion;
77 	struct mutex recv_mutex;
78 	struct dlm_lock_resource *bitmap_lockres;
79 	struct dlm_lock_resource **other_bitmap_lockres;
80 	struct dlm_lock_resource *resync_lockres;
81 	struct list_head suspend_list;
82 	spinlock_t suspend_lock;
83 	struct md_thread *recovery_thread;
84 	unsigned long recovery_map;
85 	/* communication loc resources */
86 	struct dlm_lock_resource *ack_lockres;
87 	struct dlm_lock_resource *message_lockres;
88 	struct dlm_lock_resource *token_lockres;
89 	struct dlm_lock_resource *no_new_dev_lockres;
90 	struct md_thread *recv_thread;
91 	struct completion newdisk_completion;
92 	wait_queue_head_t wait;
93 	unsigned long state;
94 	/* record the region in RESYNCING message */
95 	sector_t sync_low;
96 	sector_t sync_hi;
97 };
98 
/* Values carried in cluster_msg.type; dispatched by process_recvd_msg(). */
enum msg_type {
	METADATA_UPDATED	= 0,
	RESYNCING		= 1,
	NEWDISK			= 2,
	REMOVE			= 3,
	RE_ADD			= 4,
	BITMAP_NEEDS_SYNC	= 5,
};
107 
108 struct cluster_msg {
109 	__le32 type;
110 	__le32 slot;
111 	/* TODO: Unionize this for smaller footprint */
112 	__le64 low;
113 	__le64 high;
114 	char uuid[16];
115 	__le32 raid_slot;
116 };
117 
118 static void sync_ast(void *arg)
119 {
120 	struct dlm_lock_resource *res;
121 
122 	res = arg;
123 	res->sync_locking_done = true;
124 	wake_up(&res->sync_locking);
125 }
126 
127 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
128 {
129 	int ret = 0;
130 
131 	ret = dlm_lock(res->ls, mode, &res->lksb,
132 			res->flags, res->name, strlen(res->name),
133 			0, sync_ast, res, res->bast);
134 	if (ret)
135 		return ret;
136 	wait_event(res->sync_locking, res->sync_locking_done);
137 	res->sync_locking_done = false;
138 	if (res->lksb.sb_status == 0)
139 		res->mode = mode;
140 	return res->lksb.sb_status;
141 }
142 
143 static int dlm_unlock_sync(struct dlm_lock_resource *res)
144 {
145 	return dlm_lock_sync(res, DLM_LOCK_NL);
146 }
147 
148 /*
149  * An variation of dlm_lock_sync, which make lock request could
150  * be interrupted
151  */
152 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
153 				       struct mddev *mddev)
154 {
155 	int ret = 0;
156 
157 	ret = dlm_lock(res->ls, mode, &res->lksb,
158 			res->flags, res->name, strlen(res->name),
159 			0, sync_ast, res, res->bast);
160 	if (ret)
161 		return ret;
162 
163 	wait_event(res->sync_locking, res->sync_locking_done
164 				      || kthread_should_stop()
165 				      || test_bit(MD_CLOSING, &mddev->flags));
166 	if (!res->sync_locking_done) {
167 		/*
168 		 * the convert queue contains the lock request when request is
169 		 * interrupted, and sync_ast could still be run, so need to
170 		 * cancel the request and reset completion
171 		 */
172 		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
173 			&res->lksb, res);
174 		res->sync_locking_done = false;
175 		if (unlikely(ret != 0))
176 			pr_info("failed to cancel previous lock request "
177 				 "%s return %d\n", res->name, ret);
178 		return -EPERM;
179 	} else
180 		res->sync_locking_done = false;
181 	if (res->lksb.sb_status == 0)
182 		res->mode = mode;
183 	return res->lksb.sb_status;
184 }
185 
186 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
187 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
188 {
189 	struct dlm_lock_resource *res = NULL;
190 	int ret, namelen;
191 	struct md_cluster_info *cinfo = mddev->cluster_info;
192 
193 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
194 	if (!res)
195 		return NULL;
196 	init_waitqueue_head(&res->sync_locking);
197 	res->sync_locking_done = false;
198 	res->ls = cinfo->lockspace;
199 	res->mddev = mddev;
200 	res->mode = DLM_LOCK_IV;
201 	namelen = strlen(name);
202 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
203 	if (!res->name) {
204 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
205 		goto out_err;
206 	}
207 	strlcpy(res->name, name, namelen + 1);
208 	if (with_lvb) {
209 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
210 		if (!res->lksb.sb_lvbptr) {
211 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
212 			goto out_err;
213 		}
214 		res->flags = DLM_LKF_VALBLK;
215 	}
216 
217 	if (bastfn)
218 		res->bast = bastfn;
219 
220 	res->flags |= DLM_LKF_EXPEDITE;
221 
222 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
223 	if (ret) {
224 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
225 		goto out_err;
226 	}
227 	res->flags &= ~DLM_LKF_EXPEDITE;
228 	res->flags |= DLM_LKF_CONVERT;
229 
230 	return res;
231 out_err:
232 	kfree(res->lksb.sb_lvbptr);
233 	kfree(res->name);
234 	kfree(res);
235 	return NULL;
236 }
237 
238 static void lockres_free(struct dlm_lock_resource *res)
239 {
240 	int ret = 0;
241 
242 	if (!res)
243 		return;
244 
245 	/*
246 	 * use FORCEUNLOCK flag, so we can unlock even the lock is on the
247 	 * waiting or convert queue
248 	 */
249 	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
250 		&res->lksb, res);
251 	if (unlikely(ret != 0))
252 		pr_err("failed to unlock %s return %d\n", res->name, ret);
253 	else
254 		wait_event(res->sync_locking, res->sync_locking_done);
255 
256 	kfree(res->name);
257 	kfree(res->lksb.sb_lvbptr);
258 	kfree(res);
259 }
260 
261 static void add_resync_info(struct dlm_lock_resource *lockres,
262 			    sector_t lo, sector_t hi)
263 {
264 	struct resync_info *ri;
265 
266 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
267 	ri->lo = cpu_to_le64(lo);
268 	ri->hi = cpu_to_le64(hi);
269 }
270 
271 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
272 {
273 	struct resync_info ri;
274 	struct suspend_info *s = NULL;
275 	sector_t hi = 0;
276 
277 	dlm_lock_sync(lockres, DLM_LOCK_CR);
278 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
279 	hi = le64_to_cpu(ri.hi);
280 	if (hi > 0) {
281 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
282 		if (!s)
283 			goto out;
284 		s->hi = hi;
285 		s->lo = le64_to_cpu(ri.lo);
286 	}
287 	dlm_unlock_sync(lockres);
288 out:
289 	return s;
290 }
291 
292 static void recover_bitmaps(struct md_thread *thread)
293 {
294 	struct mddev *mddev = thread->mddev;
295 	struct md_cluster_info *cinfo = mddev->cluster_info;
296 	struct dlm_lock_resource *bm_lockres;
297 	char str[64];
298 	int slot, ret;
299 	struct suspend_info *s, *tmp;
300 	sector_t lo, hi;
301 
302 	while (cinfo->recovery_map) {
303 		slot = fls64((u64)cinfo->recovery_map) - 1;
304 
305 		/* Clear suspend_area associated with the bitmap */
306 		spin_lock_irq(&cinfo->suspend_lock);
307 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
308 			if (slot == s->slot) {
309 				list_del(&s->list);
310 				kfree(s);
311 			}
312 		spin_unlock_irq(&cinfo->suspend_lock);
313 
314 		snprintf(str, 64, "bitmap%04d", slot);
315 		bm_lockres = lockres_init(mddev, str, NULL, 1);
316 		if (!bm_lockres) {
317 			pr_err("md-cluster: Cannot initialize bitmaps\n");
318 			goto clear_bit;
319 		}
320 
321 		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
322 		if (ret) {
323 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
324 					str, ret);
325 			goto clear_bit;
326 		}
327 		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
328 		if (ret) {
329 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
330 			goto clear_bit;
331 		}
332 		if (hi > 0) {
333 			if (lo < mddev->recovery_cp)
334 				mddev->recovery_cp = lo;
335 			/* wake up thread to continue resync in case resync
336 			 * is not finished */
337 			if (mddev->recovery_cp != MaxSector) {
338 			    set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
339 			    md_wakeup_thread(mddev->thread);
340 			}
341 		}
342 clear_bit:
343 		lockres_free(bm_lockres);
344 		clear_bit(slot, &cinfo->recovery_map);
345 	}
346 }
347 
348 static void recover_prep(void *arg)
349 {
350 	struct mddev *mddev = arg;
351 	struct md_cluster_info *cinfo = mddev->cluster_info;
352 	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
353 }
354 
355 static void __recover_slot(struct mddev *mddev, int slot)
356 {
357 	struct md_cluster_info *cinfo = mddev->cluster_info;
358 
359 	set_bit(slot, &cinfo->recovery_map);
360 	if (!cinfo->recovery_thread) {
361 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
362 				mddev, "recover");
363 		if (!cinfo->recovery_thread) {
364 			pr_warn("md-cluster: Could not create recovery thread\n");
365 			return;
366 		}
367 	}
368 	md_wakeup_thread(cinfo->recovery_thread);
369 }
370 
371 static void recover_slot(void *arg, struct dlm_slot *slot)
372 {
373 	struct mddev *mddev = arg;
374 	struct md_cluster_info *cinfo = mddev->cluster_info;
375 
376 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
377 			mddev->bitmap_info.cluster_name,
378 			slot->nodeid, slot->slot,
379 			cinfo->slot_number);
380 	/* deduct one since dlm slot starts from one while the num of
381 	 * cluster-md begins with 0 */
382 	__recover_slot(mddev, slot->slot - 1);
383 }
384 
385 static void recover_done(void *arg, struct dlm_slot *slots,
386 		int num_slots, int our_slot,
387 		uint32_t generation)
388 {
389 	struct mddev *mddev = arg;
390 	struct md_cluster_info *cinfo = mddev->cluster_info;
391 
392 	cinfo->slot_number = our_slot;
393 	/* completion is only need to be complete when node join cluster,
394 	 * it doesn't need to run during another node's failure */
395 	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
396 		complete(&cinfo->completion);
397 		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
398 	}
399 	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
400 }
401 
402 /* the ops is called when node join the cluster, and do lock recovery
403  * if node failure occurs */
404 static const struct dlm_lockspace_ops md_ls_ops = {
405 	.recover_prep = recover_prep,
406 	.recover_slot = recover_slot,
407 	.recover_done = recover_done,
408 };
409 
410 /*
411  * The BAST function for the ack lock resource
412  * This function wakes up the receive thread in
413  * order to receive and process the message.
414  */
415 static void ack_bast(void *arg, int mode)
416 {
417 	struct dlm_lock_resource *res = arg;
418 	struct md_cluster_info *cinfo = res->mddev->cluster_info;
419 
420 	if (mode == DLM_LOCK_EX) {
421 		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
422 			md_wakeup_thread(cinfo->recv_thread);
423 		else
424 			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
425 	}
426 }
427 
428 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
429 {
430 	struct suspend_info *s, *tmp;
431 
432 	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
433 		if (slot == s->slot) {
434 			list_del(&s->list);
435 			kfree(s);
436 			break;
437 		}
438 }
439 
440 static void remove_suspend_info(struct mddev *mddev, int slot)
441 {
442 	struct md_cluster_info *cinfo = mddev->cluster_info;
443 	spin_lock_irq(&cinfo->suspend_lock);
444 	__remove_suspend_info(cinfo, slot);
445 	spin_unlock_irq(&cinfo->suspend_lock);
446 	mddev->pers->quiesce(mddev, 2);
447 }
448 
449 
450 static void process_suspend_info(struct mddev *mddev,
451 		int slot, sector_t lo, sector_t hi)
452 {
453 	struct md_cluster_info *cinfo = mddev->cluster_info;
454 	struct suspend_info *s;
455 
456 	if (!hi) {
457 		remove_suspend_info(mddev, slot);
458 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
459 		md_wakeup_thread(mddev->thread);
460 		return;
461 	}
462 
463 	/*
464 	 * The bitmaps are not same for different nodes
465 	 * if RESYNCING is happening in one node, then
466 	 * the node which received the RESYNCING message
467 	 * probably will perform resync with the region
468 	 * [lo, hi] again, so we could reduce resync time
469 	 * a lot if we can ensure that the bitmaps among
470 	 * different nodes are match up well.
471 	 *
472 	 * sync_low/hi is used to record the region which
473 	 * arrived in the previous RESYNCING message,
474 	 *
475 	 * Call bitmap_sync_with_cluster to clear
476 	 * NEEDED_MASK and set RESYNC_MASK since
477 	 * resync thread is running in another node,
478 	 * so we don't need to do the resync again
479 	 * with the same section */
480 	bitmap_sync_with_cluster(mddev, cinfo->sync_low,
481 					cinfo->sync_hi,
482 					lo, hi);
483 	cinfo->sync_low = lo;
484 	cinfo->sync_hi = hi;
485 
486 	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
487 	if (!s)
488 		return;
489 	s->slot = slot;
490 	s->lo = lo;
491 	s->hi = hi;
492 	mddev->pers->quiesce(mddev, 1);
493 	mddev->pers->quiesce(mddev, 0);
494 	spin_lock_irq(&cinfo->suspend_lock);
495 	/* Remove existing entry (if exists) before adding */
496 	__remove_suspend_info(cinfo, slot);
497 	list_add(&s->list, &cinfo->suspend_list);
498 	spin_unlock_irq(&cinfo->suspend_lock);
499 	mddev->pers->quiesce(mddev, 2);
500 }
501 
502 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
503 {
504 	char disk_uuid[64];
505 	struct md_cluster_info *cinfo = mddev->cluster_info;
506 	char event_name[] = "EVENT=ADD_DEVICE";
507 	char raid_slot[16];
508 	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
509 	int len;
510 
511 	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
512 	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
513 	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
514 	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
515 	init_completion(&cinfo->newdisk_completion);
516 	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
517 	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
518 	wait_for_completion_timeout(&cinfo->newdisk_completion,
519 			NEW_DEV_TIMEOUT);
520 	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
521 }
522 
523 
524 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
525 {
526 	struct md_cluster_info *cinfo = mddev->cluster_info;
527 	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
528 	set_bit(MD_RELOAD_SB, &mddev->flags);
529 	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
530 	md_wakeup_thread(mddev->thread);
531 }
532 
533 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
534 {
535 	struct md_rdev *rdev;
536 
537 	rcu_read_lock();
538 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
539 	if (rdev) {
540 		set_bit(ClusterRemove, &rdev->flags);
541 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
542 		md_wakeup_thread(mddev->thread);
543 	}
544 	else
545 		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
546 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
547 	rcu_read_unlock();
548 }
549 
550 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
551 {
552 	struct md_rdev *rdev;
553 
554 	rcu_read_lock();
555 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
556 	if (rdev && test_bit(Faulty, &rdev->flags))
557 		clear_bit(Faulty, &rdev->flags);
558 	else
559 		pr_warn("%s: %d Could not find disk(%d) which is faulty",
560 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
561 	rcu_read_unlock();
562 }
563 
564 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
565 {
566 	int ret = 0;
567 
568 	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
569 		"node %d received it's own msg\n", le32_to_cpu(msg->slot)))
570 		return -1;
571 	switch (le32_to_cpu(msg->type)) {
572 	case METADATA_UPDATED:
573 		process_metadata_update(mddev, msg);
574 		break;
575 	case RESYNCING:
576 		process_suspend_info(mddev, le32_to_cpu(msg->slot),
577 				     le64_to_cpu(msg->low),
578 				     le64_to_cpu(msg->high));
579 		break;
580 	case NEWDISK:
581 		process_add_new_disk(mddev, msg);
582 		break;
583 	case REMOVE:
584 		process_remove_disk(mddev, msg);
585 		break;
586 	case RE_ADD:
587 		process_readd_disk(mddev, msg);
588 		break;
589 	case BITMAP_NEEDS_SYNC:
590 		__recover_slot(mddev, le32_to_cpu(msg->slot));
591 		break;
592 	default:
593 		ret = -1;
594 		pr_warn("%s:%d Received unknown message from %d\n",
595 			__func__, __LINE__, msg->slot);
596 	}
597 	return ret;
598 }
599 
600 /*
601  * thread for receiving message
602  */
603 static void recv_daemon(struct md_thread *thread)
604 {
605 	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
606 	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
607 	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
608 	struct cluster_msg msg;
609 	int ret;
610 
611 	mutex_lock(&cinfo->recv_mutex);
612 	/*get CR on Message*/
613 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
614 		pr_err("md/raid1:failed to get CR on MESSAGE\n");
615 		mutex_unlock(&cinfo->recv_mutex);
616 		return;
617 	}
618 
619 	/* read lvb and wake up thread to process this message_lockres */
620 	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
621 	ret = process_recvd_msg(thread->mddev, &msg);
622 	if (ret)
623 		goto out;
624 
625 	/*release CR on ack_lockres*/
626 	ret = dlm_unlock_sync(ack_lockres);
627 	if (unlikely(ret != 0))
628 		pr_info("unlock ack failed return %d\n", ret);
629 	/*up-convert to PR on message_lockres*/
630 	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
631 	if (unlikely(ret != 0))
632 		pr_info("lock PR on msg failed return %d\n", ret);
633 	/*get CR on ack_lockres again*/
634 	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
635 	if (unlikely(ret != 0))
636 		pr_info("lock CR on ack failed return %d\n", ret);
637 out:
638 	/*release CR on message_lockres*/
639 	ret = dlm_unlock_sync(message_lockres);
640 	if (unlikely(ret != 0))
641 		pr_info("unlock msg failed return %d\n", ret);
642 	mutex_unlock(&cinfo->recv_mutex);
643 }
644 
645 /* lock_token()
646  * Takes the lock on the TOKEN lock resource so no other
647  * node can communicate while the operation is underway.
648  */
649 static int lock_token(struct md_cluster_info *cinfo)
650 {
651 	int error;
652 
653 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
654 	if (error)
655 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
656 				__func__, __LINE__, error);
657 
658 	/* Lock the receive sequence */
659 	mutex_lock(&cinfo->recv_mutex);
660 	return error;
661 }
662 
663 /* lock_comm()
664  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
665  */
666 static int lock_comm(struct md_cluster_info *cinfo)
667 {
668 	wait_event(cinfo->wait,
669 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
670 
671 	return lock_token(cinfo);
672 }
673 
674 static void unlock_comm(struct md_cluster_info *cinfo)
675 {
676 	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
677 	mutex_unlock(&cinfo->recv_mutex);
678 	dlm_unlock_sync(cinfo->token_lockres);
679 	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
680 	wake_up(&cinfo->wait);
681 }
682 
683 /* __sendmsg()
684  * This function performs the actual sending of the message. This function is
685  * usually called after performing the encompassing operation
686  * The function:
687  * 1. Grabs the message lockresource in EX mode
688  * 2. Copies the message to the message LVB
689  * 3. Downconverts message lockresource to CW
690  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
691  *    and the other nodes read the message. The thread will wait here until all other
692  *    nodes have released ack lock resource.
693  * 5. Downconvert ack lockresource to CR
694  */
695 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
696 {
697 	int error;
698 	int slot = cinfo->slot_number - 1;
699 
700 	cmsg->slot = cpu_to_le32(slot);
701 	/*get EX on Message*/
702 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
703 	if (error) {
704 		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
705 		goto failed_message;
706 	}
707 
708 	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
709 			sizeof(struct cluster_msg));
710 	/*down-convert EX to CW on Message*/
711 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
712 	if (error) {
713 		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
714 				error);
715 		goto failed_ack;
716 	}
717 
718 	/*up-convert CR to EX on Ack*/
719 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
720 	if (error) {
721 		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
722 				error);
723 		goto failed_ack;
724 	}
725 
726 	/*down-convert EX to CR on Ack*/
727 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
728 	if (error) {
729 		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
730 				error);
731 		goto failed_ack;
732 	}
733 
734 failed_ack:
735 	error = dlm_unlock_sync(cinfo->message_lockres);
736 	if (unlikely(error != 0)) {
737 		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
738 			error);
739 		/* in case the message can't be released due to some reason */
740 		goto failed_ack;
741 	}
742 failed_message:
743 	return error;
744 }
745 
/*
 * Lock the communication channel, send @cmsg and unlock the channel.
 *
 * The message is only sent when lock_comm() succeeded, i.e. when the
 * token is held. Returns the lock_comm() error in that case, otherwise
 * the result of __sendmsg().
 */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	ret = lock_comm(cinfo);
	if (!ret)
		ret = __sendmsg(cinfo, cmsg);
	/*
	 * lock_comm() sets MD_CLUSTER_SEND_LOCK and (through lock_token())
	 * takes recv_mutex even when it reports failure, so the channel
	 * has to be unlocked on both paths.
	 */
	unlock_comm(cinfo);
	return ret;
}
755 
756 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
757 {
758 	struct md_cluster_info *cinfo = mddev->cluster_info;
759 	int i, ret = 0;
760 	struct dlm_lock_resource *bm_lockres;
761 	struct suspend_info *s;
762 	char str[64];
763 	sector_t lo, hi;
764 
765 
766 	for (i = 0; i < total_slots; i++) {
767 		memset(str, '\0', 64);
768 		snprintf(str, 64, "bitmap%04d", i);
769 		bm_lockres = lockres_init(mddev, str, NULL, 1);
770 		if (!bm_lockres)
771 			return -ENOMEM;
772 		if (i == (cinfo->slot_number - 1)) {
773 			lockres_free(bm_lockres);
774 			continue;
775 		}
776 
777 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
778 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
779 		if (ret == -EAGAIN) {
780 			s = read_resync_info(mddev, bm_lockres);
781 			if (s) {
782 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
783 						__func__, __LINE__,
784 						(unsigned long long) s->lo,
785 						(unsigned long long) s->hi, i);
786 				spin_lock_irq(&cinfo->suspend_lock);
787 				s->slot = i;
788 				list_add(&s->list, &cinfo->suspend_list);
789 				spin_unlock_irq(&cinfo->suspend_lock);
790 			}
791 			ret = 0;
792 			lockres_free(bm_lockres);
793 			continue;
794 		}
795 		if (ret) {
796 			lockres_free(bm_lockres);
797 			goto out;
798 		}
799 
800 		/* Read the disk bitmap sb and check if it needs recovery */
801 		ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
802 		if (ret) {
803 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
804 			lockres_free(bm_lockres);
805 			continue;
806 		}
807 		if ((hi > 0) && (lo < mddev->recovery_cp)) {
808 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
809 			mddev->recovery_cp = lo;
810 			md_check_recovery(mddev);
811 		}
812 
813 		lockres_free(bm_lockres);
814 	}
815 out:
816 	return ret;
817 }
818 
819 static int join(struct mddev *mddev, int nodes)
820 {
821 	struct md_cluster_info *cinfo;
822 	int ret, ops_rv;
823 	char str[64];
824 
825 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
826 	if (!cinfo)
827 		return -ENOMEM;
828 
829 	INIT_LIST_HEAD(&cinfo->suspend_list);
830 	spin_lock_init(&cinfo->suspend_lock);
831 	init_completion(&cinfo->completion);
832 	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
833 	init_waitqueue_head(&cinfo->wait);
834 	mutex_init(&cinfo->recv_mutex);
835 
836 	mddev->cluster_info = cinfo;
837 
838 	memset(str, 0, 64);
839 	sprintf(str, "%pU", mddev->uuid);
840 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
841 				DLM_LSFL_FS, LVB_SIZE,
842 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
843 	if (ret)
844 		goto err;
845 	wait_for_completion(&cinfo->completion);
846 	if (nodes < cinfo->slot_number) {
847 		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
848 			cinfo->slot_number, nodes);
849 		ret = -ERANGE;
850 		goto err;
851 	}
852 	/* Initiate the communication resources */
853 	ret = -ENOMEM;
854 	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
855 	if (!cinfo->recv_thread) {
856 		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
857 		goto err;
858 	}
859 	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
860 	if (!cinfo->message_lockres)
861 		goto err;
862 	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
863 	if (!cinfo->token_lockres)
864 		goto err;
865 	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
866 	if (!cinfo->no_new_dev_lockres)
867 		goto err;
868 
869 	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
870 	if (ret) {
871 		ret = -EAGAIN;
872 		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
873 		goto err;
874 	}
875 	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
876 	if (!cinfo->ack_lockres) {
877 		ret = -ENOMEM;
878 		goto err;
879 	}
880 	/* get sync CR lock on ACK. */
881 	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
882 		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
883 				ret);
884 	dlm_unlock_sync(cinfo->token_lockres);
885 	/* get sync CR lock on no-new-dev. */
886 	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
887 		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
888 
889 
890 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
891 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
892 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
893 	if (!cinfo->bitmap_lockres) {
894 		ret = -ENOMEM;
895 		goto err;
896 	}
897 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
898 		pr_err("Failed to get bitmap lock\n");
899 		ret = -EINVAL;
900 		goto err;
901 	}
902 
903 	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
904 	if (!cinfo->resync_lockres) {
905 		ret = -ENOMEM;
906 		goto err;
907 	}
908 
909 	return 0;
910 err:
911 	md_unregister_thread(&cinfo->recovery_thread);
912 	md_unregister_thread(&cinfo->recv_thread);
913 	lockres_free(cinfo->message_lockres);
914 	lockres_free(cinfo->token_lockres);
915 	lockres_free(cinfo->ack_lockres);
916 	lockres_free(cinfo->no_new_dev_lockres);
917 	lockres_free(cinfo->resync_lockres);
918 	lockres_free(cinfo->bitmap_lockres);
919 	if (cinfo->lockspace)
920 		dlm_release_lockspace(cinfo->lockspace, 2);
921 	mddev->cluster_info = NULL;
922 	kfree(cinfo);
923 	return ret;
924 }
925 
926 static void load_bitmaps(struct mddev *mddev, int total_slots)
927 {
928 	struct md_cluster_info *cinfo = mddev->cluster_info;
929 
930 	/* load all the node's bitmap info for resync */
931 	if (gather_all_resync_info(mddev, total_slots))
932 		pr_err("md-cluster: failed to gather all resyn infos\n");
933 	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
934 	/* wake up recv thread in case something need to be handled */
935 	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
936 		md_wakeup_thread(cinfo->recv_thread);
937 }
938 
939 static void resync_bitmap(struct mddev *mddev)
940 {
941 	struct md_cluster_info *cinfo = mddev->cluster_info;
942 	struct cluster_msg cmsg = {0};
943 	int err;
944 
945 	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
946 	err = sendmsg(cinfo, &cmsg);
947 	if (err)
948 		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
949 			__func__, __LINE__, err);
950 }
951 
952 static void unlock_all_bitmaps(struct mddev *mddev);
953 static int leave(struct mddev *mddev)
954 {
955 	struct md_cluster_info *cinfo = mddev->cluster_info;
956 
957 	if (!cinfo)
958 		return 0;
959 
960 	/* BITMAP_NEEDS_SYNC message should be sent when node
961 	 * is leaving the cluster with dirty bitmap, also we
962 	 * can only deliver it when dlm connection is available */
963 	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
964 		resync_bitmap(mddev);
965 
966 	md_unregister_thread(&cinfo->recovery_thread);
967 	md_unregister_thread(&cinfo->recv_thread);
968 	lockres_free(cinfo->message_lockres);
969 	lockres_free(cinfo->token_lockres);
970 	lockres_free(cinfo->ack_lockres);
971 	lockres_free(cinfo->no_new_dev_lockres);
972 	lockres_free(cinfo->resync_lockres);
973 	lockres_free(cinfo->bitmap_lockres);
974 	unlock_all_bitmaps(mddev);
975 	dlm_release_lockspace(cinfo->lockspace, 2);
976 	kfree(cinfo);
977 	return 0;
978 }
979 
980 /* slot_number(): Returns the MD slot number to use
981  * DLM starts the slot numbers from 1, wheras cluster-md
982  * wants the number to be from zero, so we deduct one
983  */
984 static int slot_number(struct mddev *mddev)
985 {
986 	struct md_cluster_info *cinfo = mddev->cluster_info;
987 
988 	return cinfo->slot_number - 1;
989 }
990 
991 /*
992  * Check if the communication is already locked, else lock the communication
993  * channel.
994  * If it is already locked, token is in EX mode, and hence lock_token()
995  * should not be called.
996  */
997 static int metadata_update_start(struct mddev *mddev)
998 {
999 	struct md_cluster_info *cinfo = mddev->cluster_info;
1000 
1001 	wait_event(cinfo->wait,
1002 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1003 		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1004 
1005 	/* If token is already locked, return 0 */
1006 	if (cinfo->token_lockres->mode == DLM_LOCK_EX)
1007 		return 0;
1008 
1009 	return lock_token(cinfo);
1010 }
1011 
1012 static int metadata_update_finish(struct mddev *mddev)
1013 {
1014 	struct md_cluster_info *cinfo = mddev->cluster_info;
1015 	struct cluster_msg cmsg;
1016 	struct md_rdev *rdev;
1017 	int ret = 0;
1018 	int raid_slot = -1;
1019 
1020 	memset(&cmsg, 0, sizeof(cmsg));
1021 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
1022 	/* Pick up a good active device number to send.
1023 	 */
1024 	rdev_for_each(rdev, mddev)
1025 		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1026 			raid_slot = rdev->desc_nr;
1027 			break;
1028 		}
1029 	if (raid_slot >= 0) {
1030 		cmsg.raid_slot = cpu_to_le32(raid_slot);
1031 		ret = __sendmsg(cinfo, &cmsg);
1032 	} else
1033 		pr_warn("md-cluster: No good device id found to send\n");
1034 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1035 	unlock_comm(cinfo);
1036 	return ret;
1037 }
1038 
1039 static void metadata_update_cancel(struct mddev *mddev)
1040 {
1041 	struct md_cluster_info *cinfo = mddev->cluster_info;
1042 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1043 	unlock_comm(cinfo);
1044 }
1045 
1046 static int resync_start(struct mddev *mddev)
1047 {
1048 	struct md_cluster_info *cinfo = mddev->cluster_info;
1049 	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1050 }
1051 
1052 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1053 {
1054 	struct md_cluster_info *cinfo = mddev->cluster_info;
1055 	struct resync_info ri;
1056 	struct cluster_msg cmsg = {0};
1057 
1058 	/* do not send zero again, if we have sent before */
1059 	if (hi == 0) {
1060 		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1061 		if (le64_to_cpu(ri.hi) == 0)
1062 			return 0;
1063 	}
1064 
1065 	add_resync_info(cinfo->bitmap_lockres, lo, hi);
1066 	/* Re-acquire the lock to refresh LVB */
1067 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1068 	cmsg.type = cpu_to_le32(RESYNCING);
1069 	cmsg.low = cpu_to_le64(lo);
1070 	cmsg.high = cpu_to_le64(hi);
1071 
1072 	return sendmsg(cinfo, &cmsg);
1073 }
1074 
1075 static int resync_finish(struct mddev *mddev)
1076 {
1077 	struct md_cluster_info *cinfo = mddev->cluster_info;
1078 	dlm_unlock_sync(cinfo->resync_lockres);
1079 	return resync_info_update(mddev, 0, 0);
1080 }
1081 
1082 static int area_resyncing(struct mddev *mddev, int direction,
1083 		sector_t lo, sector_t hi)
1084 {
1085 	struct md_cluster_info *cinfo = mddev->cluster_info;
1086 	int ret = 0;
1087 	struct suspend_info *s;
1088 
1089 	if ((direction == READ) &&
1090 		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1091 		return 1;
1092 
1093 	spin_lock_irq(&cinfo->suspend_lock);
1094 	if (list_empty(&cinfo->suspend_list))
1095 		goto out;
1096 	list_for_each_entry(s, &cinfo->suspend_list, list)
1097 		if (hi > s->lo && lo < s->hi) {
1098 			ret = 1;
1099 			break;
1100 		}
1101 out:
1102 	spin_unlock_irq(&cinfo->suspend_lock);
1103 	return ret;
1104 }
1105 
1106 /* add_new_disk() - initiates a disk add
1107  * However, if this fails before writing md_update_sb(),
1108  * add_new_disk_cancel() must be called to release token lock
1109  */
1110 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1111 {
1112 	struct md_cluster_info *cinfo = mddev->cluster_info;
1113 	struct cluster_msg cmsg;
1114 	int ret = 0;
1115 	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1116 	char *uuid = sb->device_uuid;
1117 
1118 	memset(&cmsg, 0, sizeof(cmsg));
1119 	cmsg.type = cpu_to_le32(NEWDISK);
1120 	memcpy(cmsg.uuid, uuid, 16);
1121 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1122 	lock_comm(cinfo);
1123 	ret = __sendmsg(cinfo, &cmsg);
1124 	if (ret)
1125 		return ret;
1126 	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1127 	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1128 	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1129 	/* Some node does not "see" the device */
1130 	if (ret == -EAGAIN)
1131 		ret = -ENOENT;
1132 	if (ret)
1133 		unlock_comm(cinfo);
1134 	else {
1135 		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1136 		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1137 		 * will run soon after add_new_disk, the below path will be
1138 		 * invoked:
1139 		 *   md_wakeup_thread(mddev->thread)
1140 		 *	-> conf->thread (raid1d)
1141 		 *	-> md_check_recovery -> md_update_sb
1142 		 *	-> metadata_update_start/finish
1143 		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1144 		 *
1145 		 * For other failure cases, metadata_update_cancel and
1146 		 * add_new_disk_cancel also clear below bit as well.
1147 		 * */
1148 		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1149 		wake_up(&cinfo->wait);
1150 	}
1151 	return ret;
1152 }
1153 
1154 static void add_new_disk_cancel(struct mddev *mddev)
1155 {
1156 	struct md_cluster_info *cinfo = mddev->cluster_info;
1157 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1158 	unlock_comm(cinfo);
1159 }
1160 
1161 static int new_disk_ack(struct mddev *mddev, bool ack)
1162 {
1163 	struct md_cluster_info *cinfo = mddev->cluster_info;
1164 
1165 	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1166 		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1167 		return -EINVAL;
1168 	}
1169 
1170 	if (ack)
1171 		dlm_unlock_sync(cinfo->no_new_dev_lockres);
1172 	complete(&cinfo->newdisk_completion);
1173 	return 0;
1174 }
1175 
1176 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1177 {
1178 	struct cluster_msg cmsg = {0};
1179 	struct md_cluster_info *cinfo = mddev->cluster_info;
1180 	cmsg.type = cpu_to_le32(REMOVE);
1181 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1182 	return sendmsg(cinfo, &cmsg);
1183 }
1184 
1185 static int lock_all_bitmaps(struct mddev *mddev)
1186 {
1187 	int slot, my_slot, ret, held = 1, i = 0;
1188 	char str[64];
1189 	struct md_cluster_info *cinfo = mddev->cluster_info;
1190 
1191 	cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
1192 					     sizeof(struct dlm_lock_resource *),
1193 					     GFP_KERNEL);
1194 	if (!cinfo->other_bitmap_lockres) {
1195 		pr_err("md: can't alloc mem for other bitmap locks\n");
1196 		return 0;
1197 	}
1198 
1199 	my_slot = slot_number(mddev);
1200 	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1201 		if (slot == my_slot)
1202 			continue;
1203 
1204 		memset(str, '\0', 64);
1205 		snprintf(str, 64, "bitmap%04d", slot);
1206 		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1207 		if (!cinfo->other_bitmap_lockres[i])
1208 			return -ENOMEM;
1209 
1210 		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1211 		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1212 		if (ret)
1213 			held = -1;
1214 		i++;
1215 	}
1216 
1217 	return held;
1218 }
1219 
1220 static void unlock_all_bitmaps(struct mddev *mddev)
1221 {
1222 	struct md_cluster_info *cinfo = mddev->cluster_info;
1223 	int i;
1224 
1225 	/* release other node's bitmap lock if they are existed */
1226 	if (cinfo->other_bitmap_lockres) {
1227 		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1228 			if (cinfo->other_bitmap_lockres[i]) {
1229 				lockres_free(cinfo->other_bitmap_lockres[i]);
1230 			}
1231 		}
1232 		kfree(cinfo->other_bitmap_lockres);
1233 	}
1234 }
1235 
1236 static int gather_bitmaps(struct md_rdev *rdev)
1237 {
1238 	int sn, err;
1239 	sector_t lo, hi;
1240 	struct cluster_msg cmsg = {0};
1241 	struct mddev *mddev = rdev->mddev;
1242 	struct md_cluster_info *cinfo = mddev->cluster_info;
1243 
1244 	cmsg.type = cpu_to_le32(RE_ADD);
1245 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1246 	err = sendmsg(cinfo, &cmsg);
1247 	if (err)
1248 		goto out;
1249 
1250 	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1251 		if (sn == (cinfo->slot_number - 1))
1252 			continue;
1253 		err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1254 		if (err) {
1255 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1256 			goto out;
1257 		}
1258 		if ((hi > 0) && (lo < mddev->recovery_cp))
1259 			mddev->recovery_cp = lo;
1260 	}
1261 out:
1262 	return err;
1263 }
1264 
1265 static struct md_cluster_operations cluster_ops = {
1266 	.join   = join,
1267 	.leave  = leave,
1268 	.slot_number = slot_number,
1269 	.resync_start = resync_start,
1270 	.resync_finish = resync_finish,
1271 	.resync_info_update = resync_info_update,
1272 	.metadata_update_start = metadata_update_start,
1273 	.metadata_update_finish = metadata_update_finish,
1274 	.metadata_update_cancel = metadata_update_cancel,
1275 	.area_resyncing = area_resyncing,
1276 	.add_new_disk = add_new_disk,
1277 	.add_new_disk_cancel = add_new_disk_cancel,
1278 	.new_disk_ack = new_disk_ack,
1279 	.remove_disk = remove_disk,
1280 	.load_bitmaps = load_bitmaps,
1281 	.gather_bitmaps = gather_bitmaps,
1282 	.lock_all_bitmaps = lock_all_bitmaps,
1283 	.unlock_all_bitmaps = unlock_all_bitmaps,
1284 };
1285 
1286 static int __init cluster_init(void)
1287 {
1288 	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1289 	pr_info("Registering Cluster MD functions\n");
1290 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1291 	return 0;
1292 }
1293 
1294 static void cluster_exit(void)
1295 {
1296 	unregister_md_cluster_operations();
1297 }
1298 
1299 module_init(cluster_init);
1300 module_exit(cluster_exit);
1301 MODULE_AUTHOR("SUSE");
1302 MODULE_LICENSE("GPL");
1303 MODULE_DESCRIPTION("Clustering support for MD");
1304