xref: /linux/drivers/md/md-cluster.c (revision 9b960d8cd6f712cb2c03e2bdd4d5ca058238037f)
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2015, SUSE
 */


#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "md-bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT 5000
#define WAIT_DLM_LOCK_TIMEOUT (30 * HZ)

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};
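
/*
 * A minimal sketch (not part of the driver flow) of how a lock resource
 * is typically used in this file, assuming the lockspace has already
 * been joined:
 *
 *	struct dlm_lock_resource *res;
 *
 *	res = lockres_init(mddev, "token", NULL, 0);
 *	if (!res)
 *		return -ENOMEM;
 *	ret = dlm_lock_sync(res, DLM_LOCK_EX);	// take the lock
 *	...					// critical section
 *	dlm_unlock_sync(res);			// down-convert to NL
 *	lockres_free(res);			// force-unlock and free
 */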

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define		MD_CLUSTER_WAITING_FOR_NEWDISK		1
#define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
#define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define		MD_CLUSTER_SEND_LOCK			4
/* Cluster operations (such as adding a disk) must lock the
 * communication channel so that extra operations (e.g. updating
 * metadata) can be performed while no other operation is allowed
 * on the MD. The token needs to be locked and held until the
 * operation completes with a md_update_sb(), which would
 * eventually release the lock.
 */
#define		MD_CLUSTER_SEND_LOCKED_ALREADY		5
/* Messages should only be received after the node has joined the
 * cluster and set up all related state such as the bitmap and
 * personality */
#define		MD_CLUSTER_ALREADY_IN_CLUSTER		6
#define		MD_CLUSTER_PENDING_RECV_EVENT		7
#define		MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD	8
#define		MD_CLUSTER_WAITING_FOR_SYNC		9

struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;

	spinlock_t suspend_lock;
	/* record the region in which writes should be suspended */
	sector_t suspend_lo;
	sector_t suspend_hi;
	int suspend_from; /* the slot which broadcast suspend_lo/hi */

	struct md_thread __rcu *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread __rcu *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};

/* For compatibility, add the new msg_type at the end. */
enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
	BITMAP_RESIZE,
	RESYNCING_START,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};
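
/*
 * Example (a sketch only): a node announcing a resync of the region
 * [lo, hi] fills the message roughly as follows; cmsg.slot is filled
 * in later by __sendmsg():
 *
 *	struct cluster_msg cmsg = {0};
 *
 *	cmsg.type = cpu_to_le32(RESYNCING);
 *	cmsg.low  = cpu_to_le64(lo);
 *	cmsg.high = cpu_to_le64(hi);
 */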

static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}

static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	ret = wait_event_timeout(res->sync_locking, res->sync_locking_done,
				WAIT_DLM_LOCK_TIMEOUT);
	res->sync_locking_done = false;
	if (!ret) {
		pr_err("locking DLM '%s' timeout!\n", res->name);
		return -EBUSY;
	}
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}
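
/*
 * Note that "unlocking" above is implemented as a down-convert to NL
 * rather than a full dlm_unlock(), so the lkid and the LVB stay valid
 * and the resource can be re-locked cheaply later.
 */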

/*
 * A variation of dlm_lock_sync() that allows the lock request to be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * the convert queue still contains the lock request when the
		 * wait is interrupted, and sync_ast could still run, so we
		 * need to cancel the request and reset the completion
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strscpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}

static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * Use the FORCEUNLOCK flag so we can unlock even if the lock is on
	 * the waiting or convert queue
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}

static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static int read_resync_info(struct mddev *mddev,
			    struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	if (le64_to_cpu(ri.hi) > 0) {
		cinfo->suspend_hi = le64_to_cpu(ri.hi);
		cinfo->suspend_lo = le64_to_cpu(ri.lo);
		ret = 1;
	}
	dlm_unlock_sync(lockres);
	return ret;
}

static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = mddev->bitmap_ops->copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		cinfo->suspend_hi = 0;
		cinfo->suspend_lo = 0;
		cinfo->suspend_from = -1;
		spin_unlock_irq(&cinfo->suspend_lock);

		/* Kick off a reshape if needed */
		if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
		    test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
		    mddev->reshape_position != MaxSector)
			md_wakeup_thread(mddev->sync_thread);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up the thread to continue resync in case
			 * resync is not finished */
			if (mddev->recovery_cp != MaxSector) {
				/*
				 * clear the REMOTE flag since we will launch
				 * the resync thread on the current node.
				 */
				clear_bit(MD_RESYNCING_REMOTE,
					  &mddev->recovery);
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		rcu_assign_pointer(cinfo->recovery_thread,
			md_register_thread(recover_bitmaps, mddev, "recover"));
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* subtract one since DLM slot numbers start at one while
	 * cluster-md slot numbers begin with 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run on another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* these ops are called when a node joins the cluster, and perform lock
 * recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
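
/*
 * Rough sequence on a peer node failure (a sketch, not exhaustive):
 *
 *	recover_prep()	-> suspend read balancing
 *	recover_slot()	-> mark the failed slot in recovery_map and wake
 *			   the "recover" thread (recover_bitmaps)
 *	recover_done()	-> resume read balancing, and complete the join
 *			   when this node is the one joining
 */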

/*
 * The BAST function for the ack lock resource.
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_hi = 0;
	cinfo->suspend_lo = 0;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct mdp_superblock_1 *sb = NULL;
	struct md_rdev *rdev;

	if (!hi) {
		/*
		 * clear the REMOTE flag since resync or recovery is finished
		 * in the remote node.
		 */
		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		clear_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
		md_wakeup_thread(mddev->thread);
		return;
	}

	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			sb = page_address(rdev->sb_page);
			break;
		}

	/*
	 * The bitmaps are not the same on different nodes. If RESYNCING is
	 * happening on one node, the node which received the RESYNCING
	 * message will probably perform a resync of the region [lo, hi]
	 * again, so we can reduce the resync time a lot if we can ensure
	 * that the bitmaps of the different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which arrived in the
	 * previous RESYNCING message.
	 *
	 * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK and set
	 * RESYNC_MASK since the resync thread is running on another node,
	 * so we don't need to do the resync again for the same section.
	 *
	 * Skip md_bitmap_sync_with_cluster in case a reshape is happening,
	 * because the reshaping region is small and we don't want to
	 * trigger lots of WARNs.
	 */
	if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
		mddev->bitmap_ops->sync_with_cluster(mddev, cinfo->sync_low,
						     cinfo->sync_hi, lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_from = slot;
	cinfo->suspend_lo = lo;
	cinfo->suspend_hi = hi;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static int process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;
	int res = 0;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	if (!wait_for_completion_timeout(&cinfo->newdisk_completion,
					NEW_DEV_TIMEOUT)) {
		pr_err("md-cluster(%s:%d): timeout on adding a new disk\n",
			__func__, __LINE__);
		res = -1;
	}
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	set_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
	return res;
}


static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_thread *thread;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);

	/* daemon thread must exist */
	thread = rcu_dereference_protected(mddev->thread, true);
	wait_event(thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	} else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
		break;
	case RESYNCING_START:
		clear_bit(MD_CLUSTER_WAITING_FOR_SYNC, &mddev->cluster_info->state);
		break;
	case RESYNCING:
		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		if (process_add_new_disk(mddev, msg))
			ret = -1;
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	case BITMAP_RESIZE:
		if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
			ret = mddev->bitmap_ops->resize(mddev,
							le64_to_cpu(msg->high),
							0, false);
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
	return ret;
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1: failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	} else {
		/* Lock the receive sequence */
		mutex_lock(&cinfo->recv_mutex);
	}
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int rv, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue while raid1d holds
	 * reconfig_mutex (and raid1d is blocked since another node already
	 * got EX on Token and is waiting for EX on Ack), so let resync wake
	 * up the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					      &cinfo->state);
		WARN_ON_ONCE(rv);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
	rv = lock_token(cinfo);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return rv;
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. It is
 * usually called while performing the encompassing operation.
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconverts ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error, unlock_error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		return error;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	while ((unlock_error = dlm_unlock_sync(cinfo->message_lockres)))
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			unlock_error);

	return error;
}
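
/*
 * Note (informal): the EX convert on Ack in step 4 can only be granted
 * once every other node has dropped its CR on Ack, which each receiver
 * does from recv_daemon() after copying the message out of the LVB.
 * That is what makes __sendmsg() synchronous with message delivery.
 */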

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	ret = lock_comm(cinfo, mddev_locked);
	if (!ret) {
		ret = __sendmsg(cinfo, cmsg);
		unlock_comm(cinfo);
	}
	return ret;
}

static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	sector_t lo, hi;

	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			if (read_resync_info(mddev, bm_lockres)) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
					(unsigned long long) cinfo->suspend_lo,
					(unsigned long long) cinfo->suspend_hi,
					i);
				cinfo->suspend_from = i;
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = mddev->bitmap_ops->copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}

static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev,
				&ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).\n",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	rcu_assign_pointer(cinfo->recv_thread,
			md_register_thread(recv_daemon, mddev, "cluster_recv"));
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(mddev, &cinfo->recovery_thread);
	md_unregister_thread(mddev, &cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}

static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the nodes' bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync infos\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/*
	 * A BITMAP_NEEDS_SYNC message should be sent when a node is
	 * leaving the cluster with a dirty bitmap, and we can only
	 * deliver it when the dlm connection is available.
	 *
	 * Also, we should send a BITMAP_NEEDS_SYNC message in case
	 * reshaping is interrupted.
	 */
	if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
	    (mddev->reshape_position != MaxSector &&
	     test_bit(MD_CLOSING, &mddev->flags)))
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(mddev, &cinfo->recovery_thread);
	md_unregister_thread(mddev, &cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, the token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set WAITING_FOR_TOKEN here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send. */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}
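
/*
 * A sketch of the typical metadata update sequence on the initiating
 * node (the callers live in md.c):
 *
 *	metadata_update_start();	// take (or reuse) EX on Token
 *	md_update_sb(mddev, 1);		// write out the superblocks
 *	metadata_update_finish();	// send METADATA_UPDATED, drop Token
 */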

static int update_bitmap_size(struct mddev *mddev, sector_t size)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int ret;

	cmsg.type = cpu_to_le32(BITMAP_RESIZE);
	cmsg.high = cpu_to_le64(size);
	ret = sendmsg(cinfo, &cmsg, 0);
	if (ret)
		pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
			__func__, __LINE__, ret);
	return ret;
}

static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
{
	void *bitmap = mddev->bitmap;
	struct md_bitmap_stats stats;
	unsigned long my_pages;
	int i, rv;

	rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
	if (rv)
		return rv;

	my_pages = stats.pages;
	/*
	 * We need to ensure that all the nodes can grow to a larger
	 * bitmap size before doing the reshape.
	 */
	rv = update_bitmap_size(mddev, newsize);
	if (rv)
		return rv;

	for (i = 0; i < mddev->bitmap_info.nodes; i++) {
		struct dlm_lock_resource *bm_lockres;
		char str[64];

		if (i == slot_number(mddev))
			continue;

		bitmap = mddev->bitmap_ops->get_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			bitmap = NULL;
			goto out;
		}

		rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
		if (rv)
			goto out;
		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the pages.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("Cannot initialize %s lock\n", str);
			goto out;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			mddev->bitmap_ops->set_pages(bitmap, my_pages);
		lockres_free(bm_lockres);

		if (my_pages != stats.pages)
			/*
			 * Let's revert the bitmap size if one node
			 * can't resize its bitmap
			 */
			goto out;
		mddev->bitmap_ops->free(bitmap);
	}

	return 0;
out:
	mddev->bitmap_ops->free(bitmap);
	update_bitmap_size(mddev, oldsize);
	return -1;
}

/*
 * return 0 if all the bitmaps have the same sync_size
 */
static int cluster_check_sync_size(struct mddev *mddev)
{
	int current_slot = slot_number(mddev);
	int node_num = mddev->bitmap_info.nodes;
	struct dlm_lock_resource *bm_lockres;
	struct md_bitmap_stats stats;
	void *bitmap = mddev->bitmap;
	unsigned long sync_size = 0;
	unsigned long my_sync_size;
	char str[64];
	int i, rv;

	rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
	if (rv)
		return rv;

	my_sync_size = stats.sync_size;

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = mddev->bitmap_ops->get_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			mddev->bitmap_ops->free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			mddev->bitmap_ops->update_sb(bitmap);
		lockres_free(bm_lockres);

		rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
		if (rv) {
			mddev->bitmap_ops->free(bitmap);
			return rv;
		}

		if (sync_size == 0) {
			sync_size = stats.sync_size;
		} else if (sync_size != stats.sync_size) {
			mddev->bitmap_ops->free(bitmap);
			return -1;
		}
		mddev->bitmap_ops->free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}

/*
 * Updating the size of a cluster raid is a little more complex; we perform
 * it in these steps:
 * 1. hold the token lock and update the superblock in the initiator node.
 * 2. send the METADATA_UPDATED msg to other nodes.
 * 3. The initiator node continues to check each bitmap's sync_size; if all
 *    bitmaps have the same value of sync_size, then we can set the capacity
 *    and let the other nodes perform it too. If one node can't update
 *    sync_size accordingly, we need to revert to the previous value.
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	if (lock_comm(cinfo, 1)) {
		pr_err("%s: lock_comm failed\n", __func__);
		return;
	}

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change the capacity once all the nodes can do
		 * it, so we need to wait until the other nodes have received
		 * the msg and handled the change
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from the other nodes' bitmaps; if sync_size
	 * has already been updated in the other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}

static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	spin_lock_irq(&cinfo->suspend_lock);
	*lo = cinfo->suspend_lo;
	*hi = cinfo->suspend_hi;
	spin_unlock_irq(&cinfo->suspend_lock);
}

static int resync_status_get(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return test_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
}

static int resync_start_notify(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};

	cmsg.type = cpu_to_le32(RESYNCING_START);

	return sendmsg(cinfo, &cmsg, 0);
}

static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again if we have sent it before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);

	/*
	 * If the resync thread is interrupted we can't say resync is
	 * finished; another node will launch a resync thread to continue.
	 */
	if (!test_bit(MD_CLOSING, &mddev->flags))
		ret = resync_info_update(mddev, 0, 0);
	dlm_unlock_sync(cinfo->resync_lockres);
	return ret;
}

static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
		ret = 1;
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release the token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	if (lock_comm(cinfo, 1))
		return -EAGAIN;
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *   md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear the below bit as well.
		 */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}
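
/*
 * Informal summary of the NEWDISK handshake sketched above: the
 * initiator sends NEWDISK while holding Token, then tries to take EX
 * on no-new-dev with NOQUEUE. Every other node holds CR on no-new-dev
 * and only drops it from new_disk_ack() once userspace has confirmed
 * the disk, so a successful EX request means all nodes acknowledged
 * the new device.
 */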

static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}

static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres =
		kcalloc(mddev->bitmap_info.nodes - 1,
			sizeof(struct dlm_lock_resource *), GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
		if (slot == my_slot)
			continue;

		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", slot);
		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
		if (!cinfo->other_bitmap_lockres[i])
			return -ENOMEM;

		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
		if (ret)
			held = -1;
		i++;
	}

	return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i;

	/* release the other nodes' bitmap locks if they exist */
	if (cinfo->other_bitmap_lockres) {
		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
			if (cinfo->other_bitmap_lockres[i])
				lockres_free(cinfo->other_bitmap_lockres[i]);
		}
		kfree(cinfo->other_bitmap_lockres);
		cinfo->other_bitmap_lockres = NULL;
	}
}

static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = mddev->bitmap_ops->copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}

static struct md_cluster_operations cluster_ops = {
	.head = {
		.type	= MD_CLUSTER,
		.id	= ID_CLUSTER,
		.name	= "cluster",
		.owner	= THIS_MODULE,
	},

	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.resync_start_notify = resync_start_notify,
	.resync_status_get = resync_status_get,
	.resync_info_get = resync_info_get,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.resize_bitmaps = resize_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
	pr_info("Registering Cluster MD functions\n");
	return register_md_submodule(&cluster_ops.head);
}

static void cluster_exit(void)
{
	unregister_md_submodule(&cluster_ops.head);
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");