/* fs/dlm/lockspace.c */
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

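/* Module-wide state: ls_count is the number of created lockspaces and is
 * serialized by ls_lock (which also orders midcomms start/stop against
 * lockspace creation and release); lslist is the list of all lockspaces,
 * protected by lslist_lock. */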
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;

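/* Writing 0 to the per-lockspace "control" sysfs file stops the lockspace
 * (dlm_ls_stop) and writing 1 (re)starts it (dlm_ls_start); dlm_controld
 * drives these transitions during recovery. */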
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

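/* Each file under /sys/kernel/dlm/<lockspace>/ is described by a dlm_attr
 * that pairs the sysfs attribute with its show/store handlers above. */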
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
};

static struct kset *dlm_kset;

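/* Notify dlm_controld of a join (KOBJ_ONLINE) or leave (KOBJ_OFFLINE) and
 * sleep until userspace acks the event by writing the result to the
 * "event_done" sysfs file (dlm_event_store above). */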
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

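/* The lookups below take a reference on ls_count; every successful
 * dlm_find_lockspace_*() call must be paired with dlm_put_lockspace(). */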
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	atomic_inc(&ls->ls_count);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

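/* Wait for all references to drain, then unlink the lockspace from lslist.
 * The count is re-checked under lslist_lock because a lookup may have taken
 * a new reference between the wakeup and acquiring the lock. */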
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}

static int lkb_idr_free(struct dlm_lkb *lkb)
{
	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

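/* Final, workqueue-deferred teardown: free every lkb still in ls_lkbxa and
 * every rsb in the hash table, then free the lockspace itself. */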
static void free_lockspace(struct work_struct *work)
{
	struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
	struct dlm_lkb *lkb;
	unsigned long id;

	/*
	 * Free all lkbs in the lkb xarray
	 */
	xa_for_each(&ls->ls_lkbxa, id, lkb) {
		lkb_idr_free(lkb);
	}
	xa_destroy(&ls->ls_lkbxa);

	/*
	 * Free all rsbs in the rsb hash table
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	kfree(ls);
}

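/* Create and start a new lockspace: allocate and initialize the dlm_ls,
 * start the callback and recovery threads, register with sysfs, then wait
 * for dlm_controld (via uevent) and for initial recovery to complete.
 * Returns 0 on success, 1 if an existing lockspace with the same name was
 * reused, or a negative errno. */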
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

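	/* Reuse an existing lockspace with the same name unless the caller
	 * asked for exclusive creation (DLM_LSFL_NEWEXCL); error = 1 tells
	 * the caller an existing lockspace was returned. */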
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_SOFTIRQ)
		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
				    DLM_LSFL_SOFTIRQ));

	INIT_LIST_HEAD(&ls->ls_slow_inactive);
	INIT_LIST_HEAD(&ls->ls_slow_active);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	rwlock_init(&ls->ls_lkbxa_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	INIT_WORK(&ls->ls_free_work, free_lockspace);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);
	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * to avoid out-of-bounds issues. However, a 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbxa;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	spin_lock_init(&ls->ls_recover_xa_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_scan_list);
	spin_lock_init(&ls->ls_scan_lock);
	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);

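	/* Make the new lockspace visible on lslist before starting the
	 * helper threads; dlm_recoverd expects to find it there. */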
	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	error = dlm_callback_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_callback %d", error);
		goto out_delist;
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error < 0)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

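	/* The error unwinding below reverses the setup order above. */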
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);
 out_lkbxa:
	xa_destroy(&ls->ls_lkbxa);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	kobject_put(&ls->ls_kobj);
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

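/* Common entry point for kernel and user lockspaces: starts midcomms for
 * the first lockspace and tears it down again if that first creation
 * fails; ls_count tracks how many lockspaces exist, under ls_lock. */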
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	if (flags & DLM_LSFL_SOFTIRQ)
		return -EINVAL;

	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

/* NOTE: We check the lkbxa here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	unsigned long id;
	int rv = 0;

	read_lock_bh(&ls->ls_lkbxa_lock);
	if (force == 0) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			rv = 1;
			break;
		}
	} else if (force == 1) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			if (lkb->lkb_nodeid == 0 &&
			    lkb->lkb_grmode != DLM_LOCK_IV) {
				rv = 1;
				break;
			}
		}
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbxa_lock);
	return rv;
}

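/* Tear down a lockspace unless it is still busy or still has other users
 * (ls_create_count > 1); the heavier freeing of lkbs and rsbs is deferred
 * to the dlm workqueue via ls_free_work. */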
static int release_lockspace(struct dlm_ls *ls, int force)
{
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(); we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_scan_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kobject_put(&ls->ls_kobj);

	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);

	log_rinfo(ls, "%s final free", __func__);

	/* delayed free of data structures see free_lockspace() */
	queue_work(dlm_wq, &ls->ls_free_work);
	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

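/* Stop every lockspace that is still running, for the case where the
 * userland control daemon has gone away; the scan restarts after each
 * stop because lslist_lock is dropped while stopping a lockspace. */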
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
836