xref: /linux/fs/dlm/lockspace.c (revision 3c4fc7bf4c9e66fe71abcbf93f62f4ddb89b7f15)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 #include "ast.h"
28 
/*
 * ls_count is incremented in dlm_new_lockspace() and decremented in
 * dlm_release_lockspace(), both with ls_lock held.
 */
29 static int			ls_count;
/* Held across lockspace creation and release in this file. */
30 static struct mutex		ls_lock;
/* List of lockspaces, linked through dlm_ls.ls_list. */
31 static struct list_head		lslist;
/* Taken around every walk or modification of lslist in this file. */
32 static spinlock_t		lslist_lock;
/* Task returned by kthread_run() for dlm_scand; set in dlm_scand_start(). */
33 static struct task_struct *	scand_task;
34 
35 
36 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 {
38 	ssize_t ret = len;
39 	int n;
40 	int rc = kstrtoint(buf, 0, &n);
41 
42 	if (rc)
43 		return rc;
44 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
45 	if (!ls)
46 		return -EINVAL;
47 
48 	switch (n) {
49 	case 0:
50 		dlm_ls_stop(ls);
51 		break;
52 	case 1:
53 		dlm_ls_start(ls);
54 		break;
55 	default:
56 		ret = -EINVAL;
57 	}
58 	dlm_put_lockspace(ls);
59 	return ret;
60 }
61 
62 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
63 {
64 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
65 
66 	if (rc)
67 		return rc;
68 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69 	wake_up(&ls->ls_uevent_wait);
70 	return len;
71 }
72 
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77 
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
81 
82 	if (rc)
83 		return rc;
84 	return len;
85 }
86 
87 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
88 {
89 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
90 }
91 
92 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
93 {
94 	int val;
95 	int rc = kstrtoint(buf, 0, &val);
96 
97 	if (rc)
98 		return rc;
99 	if (val == 1)
100 		set_bit(LSFL_NODIR, &ls->ls_flags);
101 	return len;
102 }
103 
104 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
105 {
106 	uint32_t status = dlm_recover_status(ls);
107 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
108 }
109 
110 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
111 {
112 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
113 }
114 
115 struct dlm_attr {
116 	struct attribute attr;
117 	ssize_t (*show)(struct dlm_ls *, char *);
118 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
119 };
120 
121 static struct dlm_attr dlm_attr_control = {
122 	.attr  = {.name = "control", .mode = S_IWUSR},
123 	.store = dlm_control_store
124 };
125 
126 static struct dlm_attr dlm_attr_event = {
127 	.attr  = {.name = "event_done", .mode = S_IWUSR},
128 	.store = dlm_event_store
129 };
130 
131 static struct dlm_attr dlm_attr_id = {
132 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
133 	.show  = dlm_id_show,
134 	.store = dlm_id_store
135 };
136 
137 static struct dlm_attr dlm_attr_nodir = {
138 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
139 	.show  = dlm_nodir_show,
140 	.store = dlm_nodir_store
141 };
142 
143 static struct dlm_attr dlm_attr_recover_status = {
144 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
145 	.show  = dlm_recover_status_show
146 };
147 
148 static struct dlm_attr dlm_attr_recover_nodeid = {
149 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
150 	.show  = dlm_recover_nodeid_show
151 };
152 
/*
 * NULL-terminated list of the lockspace attributes defined above.
 * ATTRIBUTE_GROUPS() is invoked with the "dlm" prefix; dlm_ktype refers
 * to the result as dlm_groups.
 */
153 static struct attribute *dlm_attrs[] = {
154 	&dlm_attr_control.attr,
155 	&dlm_attr_event.attr,
156 	&dlm_attr_id.attr,
157 	&dlm_attr_nodir.attr,
158 	&dlm_attr_recover_status.attr,
159 	&dlm_attr_recover_nodeid.attr,
160 	NULL,
161 };
162 ATTRIBUTE_GROUPS(dlm);
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
/* Routes sysfs reads and writes to the dlm_attr dispatchers above. */
186 static const struct sysfs_ops dlm_attr_ops = {
187 	.show  = dlm_attr_show,
188 	.store = dlm_attr_store,
189 };
190 
/*
 * kobj_type passed to kobject_init_and_add() for each lockspace's ls_kobj;
 * its release callback frees the dlm_ls.
 */
191 static struct kobj_type dlm_ktype = {
192 	.default_groups = dlm_groups,
193 	.sysfs_ops     = &dlm_attr_ops,
194 	.release       = lockspace_kobj_release,
195 };
196 
/*
 * Created in dlm_lockspace_init() and unregistered in dlm_lockspace_exit();
 * new_lockspace() assigns it as the kset of every lockspace kobject.
 */
197 static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	if (in)
202 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203 	else
204 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205 
206 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207 
208 	/* dlm_controld will see the uevent, do the necessary group management
209 	   and then write to sysfs to wake us */
210 
211 	wait_event(ls->ls_uevent_wait,
212 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213 
214 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
215 
216 	return ls->ls_uevent_result;
217 }
218 
219 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
220 {
221 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222 
223 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 	return 0;
225 }
226 
/* uevent operations handed to kset_create_and_add() in dlm_lockspace_init(). */
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228 	.uevent = dlm_uevent,
229 };
230 
231 int __init dlm_lockspace_init(void)
232 {
233 	ls_count = 0;
234 	mutex_init(&ls_lock);
235 	INIT_LIST_HEAD(&lslist);
236 	spin_lock_init(&lslist_lock);
237 
238 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 	if (!dlm_kset) {
240 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 		return -ENOMEM;
242 	}
243 	return 0;
244 }
245 
/* Unregister the kset created by dlm_lockspace_init(). */
246 void dlm_lockspace_exit(void)
247 {
248 	kset_unregister(dlm_kset);
249 }
250 
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 	struct dlm_ls *ls;
254 
255 	spin_lock(&lslist_lock);
256 	list_for_each_entry(ls, &lslist, ls_list) {
257 		if (time_after_eq(jiffies, ls->ls_scan_time +
258 					    dlm_config.ci_scan_secs * HZ)) {
259 			spin_unlock(&lslist_lock);
260 			return ls;
261 		}
262 	}
263 	spin_unlock(&lslist_lock);
264 	return NULL;
265 }
266 
267 static int dlm_scand(void *data)
268 {
269 	struct dlm_ls *ls;
270 
271 	while (!kthread_should_stop()) {
272 		ls = find_ls_to_scan();
273 		if (ls) {
274 			if (dlm_lock_recovery_try(ls)) {
275 				ls->ls_scan_time = jiffies;
276 				dlm_scan_rsbs(ls);
277 				dlm_scan_timeout(ls);
278 				dlm_unlock_recovery(ls);
279 			} else {
280 				ls->ls_scan_time += HZ;
281 			}
282 			continue;
283 		}
284 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
285 	}
286 	return 0;
287 }
288 
289 static int dlm_scand_start(void)
290 {
291 	struct task_struct *p;
292 	int error = 0;
293 
294 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
295 	if (IS_ERR(p))
296 		error = PTR_ERR(p);
297 	else
298 		scand_task = p;
299 	return error;
300 }
301 
/* Stop the kthread recorded in scand_task by dlm_scand_start(). */
302 static void dlm_scand_stop(void)
303 {
304 	kthread_stop(scand_task);
305 }
306 
307 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
308 {
309 	struct dlm_ls *ls;
310 
311 	spin_lock(&lslist_lock);
312 
313 	list_for_each_entry(ls, &lslist, ls_list) {
314 		if (ls->ls_global_id == id) {
315 			atomic_inc(&ls->ls_count);
316 			goto out;
317 		}
318 	}
319 	ls = NULL;
320  out:
321 	spin_unlock(&lslist_lock);
322 	return ls;
323 }
324 
325 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
326 {
327 	struct dlm_ls *ls;
328 
329 	spin_lock(&lslist_lock);
330 	list_for_each_entry(ls, &lslist, ls_list) {
331 		if (ls->ls_local_handle == lockspace) {
332 			atomic_inc(&ls->ls_count);
333 			goto out;
334 		}
335 	}
336 	ls = NULL;
337  out:
338 	spin_unlock(&lslist_lock);
339 	return ls;
340 }
341 
342 struct dlm_ls *dlm_find_lockspace_device(int minor)
343 {
344 	struct dlm_ls *ls;
345 
346 	spin_lock(&lslist_lock);
347 	list_for_each_entry(ls, &lslist, ls_list) {
348 		if (ls->ls_device.minor == minor) {
349 			atomic_inc(&ls->ls_count);
350 			goto out;
351 		}
352 	}
353 	ls = NULL;
354  out:
355 	spin_unlock(&lslist_lock);
356 	return ls;
357 }
358 
359 void dlm_put_lockspace(struct dlm_ls *ls)
360 {
361 	if (atomic_dec_and_test(&ls->ls_count))
362 		wake_up(&ls->ls_count_wait);
363 }
364 
365 static void remove_lockspace(struct dlm_ls *ls)
366 {
367 retry:
368 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
369 
370 	spin_lock(&lslist_lock);
371 	if (atomic_read(&ls->ls_count) != 0) {
372 		spin_unlock(&lslist_lock);
373 		goto retry;
374 	}
375 
376 	WARN_ON(ls->ls_create_count != 0);
377 	list_del(&ls->ls_list);
378 	spin_unlock(&lslist_lock);
379 }
380 
/*
 * Start the helpers shared by all lockspaces: the dlm_scand kthread and
 * midcomms.
 *
 * Returns 0 on success.  On failure the error from the step that failed is
 * returned, and dlm_scand is stopped again if it had already been started.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		/* name the component that actually failed to start */
		log_print("cannot start dlm midcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}
405 
/*
 * Create the lockspace called @name, or take another creation reference on
 * an existing lockspace with the same name.
 *
 * @name:       lockspace name; its length must be 1..DLM_LOCKSPACE_LEN
 * @cluster:    cluster name supplied by the caller, may be NULL; it is only
 *              compared against dlm_config.ci_cluster_name when
 *              ci_recover_callbacks is set
 * @flags:      DLM_LSFL_* flags
 * @lvblen:     must be a non-zero multiple of 8
 * @ops:        recovery callbacks; stored only when ci_recover_callbacks is set
 * @ops_arg:    argument stored alongside @ops
 * @ops_result: when both @ops and @ops_result are non-NULL, receives 0 or
 *              -EOPNOTSUPP depending on ci_recover_callbacks
 * @lockspace:  receives the lockspace when 0 or 1 is returned
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an existing
 * lockspace was found (its ls_create_count is incremented), or a negative
 * errno.  The module reference taken here is kept only when 0 is returned.
 */
406 static int new_lockspace(const char *name, const char *cluster,
407 			 uint32_t flags, int lvblen,
408 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
409 			 int *ops_result, dlm_lockspace_t **lockspace)
410 {
411 	struct dlm_ls *ls;
412 	int i, size, error;
413 	int do_unreg = 0;
414 	int namelen = strlen(name);
415 
416 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
417 		return -EINVAL;
418 
419 	if (!lvblen || (lvblen % 8))
420 		return -EINVAL;
421 
	/* dropped again at "out" on every path that does not return 0 */
422 	if (!try_module_get(THIS_MODULE))
423 		return -EINVAL;
424 
425 	if (!dlm_user_daemon_available()) {
426 		log_print("dlm user daemon not available");
427 		error = -EUNATCH;
428 		goto out;
429 	}
430 
431 	if (ops && ops_result) {
432 	       	if (!dlm_config.ci_recover_callbacks)
433 			*ops_result = -EOPNOTSUPP;
434 		else
435 			*ops_result = 0;
436 	}
437 
438 	if (!cluster)
439 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
440 			  dlm_config.ci_cluster_name);
441 
442 	if (dlm_config.ci_recover_callbacks && cluster &&
443 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
444 		log_print("dlm cluster name '%s' does not match "
445 			  "the application cluster name '%s'",
446 			  dlm_config.ci_cluster_name, cluster);
447 		error = -EBADR;
448 		goto out;
449 	}
450 
451 	error = 0;
452 
	/*
	 * Look for an existing lockspace with the same name.  A match ends
	 * the function early: -EEXIST if exclusive creation was requested,
	 * otherwise 1 after bumping the creation count.
	 */
453 	spin_lock(&lslist_lock);
454 	list_for_each_entry(ls, &lslist, ls_list) {
455 		WARN_ON(ls->ls_create_count <= 0);
456 		if (ls->ls_namelen != namelen)
457 			continue;
458 		if (memcmp(ls->ls_name, name, namelen))
459 			continue;
460 		if (flags & DLM_LSFL_NEWEXCL) {
461 			error = -EEXIST;
462 			break;
463 		}
464 		ls->ls_create_count++;
465 		*lockspace = ls;
466 		error = 1;
467 		break;
468 	}
469 	spin_unlock(&lslist_lock);
470 
471 	if (error)
472 		goto out;
473 
	/* error value for the allocation failures below */
474 	error = -ENOMEM;
475 
	/* namelen extra bytes are allocated for the name copied into ls_name */
476 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
477 	if (!ls)
478 		goto out;
479 	memcpy(ls->ls_name, name, namelen);
480 	ls->ls_namelen = namelen;
481 	ls->ls_lvblen = lvblen;
482 	atomic_set(&ls->ls_count, 0);
483 	init_waitqueue_head(&ls->ls_count_wait);
484 	ls->ls_flags = 0;
485 	ls->ls_scan_time = jiffies;
486 
487 	if (ops && dlm_config.ci_recover_callbacks) {
488 		ls->ls_ops = ops;
489 		ls->ls_ops_arg = ops_arg;
490 	}
491 
492 #ifdef CONFIG_DLM_DEPRECATED_API
493 	if (flags & DLM_LSFL_TIMEWARN) {
494 		pr_warn_once("===============================================================\n"
495 			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
496 			     "         will be removed in v6.2!\n"
497 			     "         Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
498 			     "===============================================================\n");
499 
500 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
501 	}
502 
503 	/* ls_exflags are forced to match among nodes, and we don't
504 	 * need to require all nodes to have some flags set
505 	 */
506 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
507 				    DLM_LSFL_NEWEXCL));
508 #else
509 	/* ls_exflags are forced to match among nodes, and we don't
510 	 * need to require all nodes to have some flags set
511 	 */
512 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
513 #endif
514 
	/* the configured table size is read once and used for both the
	 * stored size and the allocation
	 */
515 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
516 	ls->ls_rsbtbl_size = size;
517 
518 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
519 	if (!ls->ls_rsbtbl)
520 		goto out_lsfree;
521 	for (i = 0; i < size; i++) {
522 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
523 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
524 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
525 	}
526 
527 	spin_lock_init(&ls->ls_remove_spin);
528 	init_waitqueue_head(&ls->ls_remove_wait);
529 
	/*
	 * ls came from kzalloc(), so entries not yet allocated are NULL and
	 * the kfree() loop at out_rsbtbl is safe after a partial failure.
	 */
530 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
531 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
532 						 GFP_KERNEL);
533 		if (!ls->ls_remove_names[i])
534 			goto out_rsbtbl;
535 	}
536 
537 	idr_init(&ls->ls_lkbidr);
538 	spin_lock_init(&ls->ls_lkbidr_spin);
539 
540 	INIT_LIST_HEAD(&ls->ls_waiters);
541 	mutex_init(&ls->ls_waiters_mutex);
542 	INIT_LIST_HEAD(&ls->ls_orphans);
543 	mutex_init(&ls->ls_orphans_mutex);
544 #ifdef CONFIG_DLM_DEPRECATED_API
545 	INIT_LIST_HEAD(&ls->ls_timeout);
546 	mutex_init(&ls->ls_timeout_mutex);
547 #endif
548 
549 	INIT_LIST_HEAD(&ls->ls_new_rsb);
550 	spin_lock_init(&ls->ls_new_rsb_spin);
551 
552 	INIT_LIST_HEAD(&ls->ls_nodes);
553 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
554 	ls->ls_num_nodes = 0;
555 	ls->ls_low_nodeid = 0;
556 	ls->ls_total_weight = 0;
557 	ls->ls_node_array = NULL;
558 
559 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
560 	ls->ls_stub_rsb.res_ls = ls;
561 
562 	ls->ls_debug_rsb_dentry = NULL;
563 	ls->ls_debug_waiters_dentry = NULL;
564 
565 	init_waitqueue_head(&ls->ls_uevent_wait);
566 	ls->ls_uevent_result = 0;
567 	init_completion(&ls->ls_recovery_done);
568 	ls->ls_recovery_result = -1;
569 
570 	mutex_init(&ls->ls_cb_mutex);
571 	INIT_LIST_HEAD(&ls->ls_cb_delay);
572 
573 	ls->ls_recoverd_task = NULL;
574 	mutex_init(&ls->ls_recoverd_active);
575 	spin_lock_init(&ls->ls_recover_lock);
576 	spin_lock_init(&ls->ls_rcom_spin);
577 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
578 	ls->ls_recover_status = 0;
579 	ls->ls_recover_seq = 0;
580 	ls->ls_recover_args = NULL;
581 	init_rwsem(&ls->ls_in_recovery);
582 	init_rwsem(&ls->ls_recv_active);
583 	INIT_LIST_HEAD(&ls->ls_requestqueue);
584 	atomic_set(&ls->ls_requestqueue_cnt, 0);
585 	init_waitqueue_head(&ls->ls_requestqueue_wait);
586 	mutex_init(&ls->ls_requestqueue_mutex);
587 	mutex_init(&ls->ls_clear_proc_locks);
588 
589 	/* Due to backwards compatibility with 3.1 we need to use the maximum
590 	 * possible dlm message size to be sure the message will fit and
591 	 * to avoid out of bounds issues. However, the sending side of 3.2
592 	 * might send less.
593 	 */
594 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
595 	if (!ls->ls_recover_buf)
596 		goto out_lkbidr;
597 
598 	ls->ls_slot = 0;
599 	ls->ls_num_slots = 0;
600 	ls->ls_slots_size = 0;
601 	ls->ls_slots = NULL;
602 
603 	INIT_LIST_HEAD(&ls->ls_recover_list);
604 	spin_lock_init(&ls->ls_recover_list_lock);
605 	idr_init(&ls->ls_recover_idr);
606 	spin_lock_init(&ls->ls_recover_idr_lock);
607 	ls->ls_recover_list_count = 0;
	/* the local handle is the address of the lockspace itself */
608 	ls->ls_local_handle = ls;
609 	init_waitqueue_head(&ls->ls_wait_general);
610 	INIT_LIST_HEAD(&ls->ls_root_list);
611 	init_rwsem(&ls->ls_root_sem);
612 
	/* put the lockspace on lslist with an initial creation count of 1 */
613 	spin_lock(&lslist_lock);
614 	ls->ls_create_count = 1;
615 	list_add(&ls->ls_list, &lslist);
616 	spin_unlock(&lslist_lock);
617 
618 	if (flags & DLM_LSFL_FS) {
619 		error = dlm_callback_start(ls);
620 		if (error) {
621 			log_error(ls, "can't start dlm_callback %d", error);
622 			goto out_delist;
623 		}
624 	}
625 
626 	init_waitqueue_head(&ls->ls_recover_lock_wait);
627 
628 	/*
629 	 * Once started, dlm_recoverd first looks for ls in lslist, then
630 	 * initializes ls_in_recovery as locked in "down" mode.  We need
631 	 * to wait for the wakeup from dlm_recoverd because in_recovery
632 	 * has to start out in down mode.
633 	 */
634 
635 	error = dlm_recoverd_start(ls);
636 	if (error) {
637 		log_error(ls, "can't start dlm_recoverd %d", error);
638 		goto out_callback;
639 	}
640 
641 	wait_event(ls->ls_recover_lock_wait,
642 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
643 
644 	/* let kobject handle freeing of ls if there's an error */
645 	do_unreg = 1;
646 
647 	ls->ls_kobj.kset = dlm_kset;
648 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
649 				     "%s", ls->ls_name);
650 	if (error)
651 		goto out_recoverd;
652 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
653 
654 	/* This uevent triggers dlm_controld in userspace to add us to the
655 	   group of nodes that are members of this lockspace (managed by the
656 	   cluster infrastructure.)  Once it's done that, it tells us who the
657 	   current lockspace members are (via configfs) and then tells the
658 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
659 
660 	error = do_uevent(ls, 1);
661 	if (error)
662 		goto out_recoverd;
663 
664 	/* wait until recovery is successful or failed */
665 	wait_for_completion(&ls->ls_recovery_done);
666 	error = ls->ls_recovery_result;
667 	if (error)
668 		goto out_members;
669 
670 	dlm_create_debug_file(ls);
671 
672 	log_rinfo(ls, "join complete");
673 	*lockspace = ls;
674 	return 0;
675 
	/*
	 * Error unwinding.  The labels run in reverse order of the setup
	 * above, so each failure point jumps to the cleanup for everything
	 * that had been set up before it and falls through the rest.
	 */
676  out_members:
677 	do_uevent(ls, 0);
678 	dlm_clear_members(ls);
679 	kfree(ls->ls_node_array);
680  out_recoverd:
681 	dlm_recoverd_stop(ls);
682  out_callback:
683 	dlm_callback_stop(ls);
684  out_delist:
685 	spin_lock(&lslist_lock);
686 	list_del(&ls->ls_list);
687 	spin_unlock(&lslist_lock);
688 	idr_destroy(&ls->ls_recover_idr);
689 	kfree(ls->ls_recover_buf);
690  out_lkbidr:
691 	idr_destroy(&ls->ls_lkbidr);
692  out_rsbtbl:
693 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
694 		kfree(ls->ls_remove_names[i]);
695 	vfree(ls->ls_rsbtbl);
696  out_lsfree:
	/*
	 * do_unreg is set just before kobject_init_and_add(); from then on
	 * ls is freed by lockspace_kobj_release() via kobject_put().
	 */
697 	if (do_unreg)
698 		kobject_put(&ls->ls_kobj);
699 	else
700 		kfree(ls);
701  out:
702 	module_put(THIS_MODULE);
703 	return error;
704 }
705 
706 int dlm_new_lockspace(const char *name, const char *cluster,
707 		      uint32_t flags, int lvblen,
708 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
709 		      int *ops_result, dlm_lockspace_t **lockspace)
710 {
711 	int error = 0;
712 
713 	mutex_lock(&ls_lock);
714 	if (!ls_count)
715 		error = threads_start();
716 	if (error)
717 		goto out;
718 
719 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
720 			      ops_result, lockspace);
721 	if (!error)
722 		ls_count++;
723 	if (error > 0)
724 		error = 0;
725 	if (!ls_count) {
726 		dlm_scand_stop();
727 		dlm_midcomms_shutdown();
728 		dlm_lowcomms_stop();
729 	}
730  out:
731 	mutex_unlock(&ls_lock);
732 	return error;
733 }
734 
735 static int lkb_idr_is_local(int id, void *p, void *data)
736 {
737 	struct dlm_lkb *lkb = p;
738 
739 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
740 }
741 
/*
 * idr_for_each() callback that returns 1 for every entry, so
 * lockspace_busy() gets a non-zero result whenever the idr is not empty.
 * All arguments are unused.
 */
742 static int lkb_idr_is_any(int id, void *p, void *data)
743 {
744 	return 1;
745 }
746 
747 static int lkb_idr_free(int id, void *p, void *data)
748 {
749 	struct dlm_lkb *lkb = p;
750 
751 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
752 		dlm_free_lvb(lkb->lkb_lvbptr);
753 
754 	dlm_free_lkb(lkb);
755 	return 0;
756 }
757 
758 /* NOTE: We check the lkbidr here rather than the resource table.
759    This is because there may be LKBs queued as ASTs that have been unlinked
760    from their RSBs and are pending deletion once the AST has been delivered */
761 
762 static int lockspace_busy(struct dlm_ls *ls, int force)
763 {
764 	int rv;
765 
766 	spin_lock(&ls->ls_lkbidr_spin);
767 	if (force == 0) {
768 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
769 	} else if (force == 1) {
770 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
771 	} else {
772 		rv = 0;
773 	}
774 	spin_unlock(&ls->ls_lkbidr_spin);
775 	return rv;
776 }
777 
/*
 * Drop one creation reference on @ls and destroy the lockspace when it was
 * the last one.
 *
 * @force: 0..3, with the meaning documented at dlm_release_lockspace()
 *
 * Returns 0 when the lockspace was torn down, the remaining (positive)
 * ls_create_count when other creators still hold it, -EBUSY when this was
 * the last reference but lockspace_busy() reported LKBs, and -EINVAL when
 * ls_create_count was already below 1.
 *
 * The only caller in this file, dlm_release_lockspace(), holds ls_lock.
 */
778 static int release_lockspace(struct dlm_ls *ls, int force)
779 {
780 	struct dlm_rsb *rsb;
781 	struct rb_node *n;
782 	int i, busy, rv;
783 
	/* sampled before lslist_lock is taken */
784 	busy = lockspace_busy(ls, force);
785 
786 	spin_lock(&lslist_lock);
787 	if (ls->ls_create_count == 1) {
788 		if (busy) {
789 			rv = -EBUSY;
790 		} else {
791 			/* remove_lockspace takes ls off lslist */
792 			ls->ls_create_count = 0;
793 			rv = 0;
794 		}
795 	} else if (ls->ls_create_count > 1) {
796 		rv = --ls->ls_create_count;
797 	} else {
798 		rv = -EINVAL;
799 	}
800 	spin_unlock(&lslist_lock);
801 
	/* any non-zero rv means the lockspace is not torn down by this call */
802 	if (rv) {
803 		log_debug(ls, "release_lockspace no remove %d", rv);
804 		return rv;
805 	}
806 
807 	dlm_device_deregister(ls);
808 
	/*
	 * Send the offline uevent and wait for the reply, except for
	 * force level 3 or when the user daemon is not available.
	 */
809 	if (force < 3 && dlm_user_daemon_available())
810 		do_uevent(ls, 0);
811 
812 	dlm_recoverd_stop(ls);
813 
	/* last lockspace: also stop the helpers shared by all lockspaces */
814 	if (ls_count == 1) {
815 		dlm_scand_stop();
816 		dlm_clear_members(ls);
817 		dlm_midcomms_shutdown();
818 	}
819 
820 	dlm_callback_stop(ls);
821 
	/* waits for ls_count to reach zero, then unlinks ls from lslist */
822 	remove_lockspace(ls);
823 
824 	dlm_delete_debug_file(ls);
825 
826 	idr_destroy(&ls->ls_recover_idr);
827 	kfree(ls->ls_recover_buf);
828 
829 	/*
830 	 * Free all lkb's in idr
831 	 */
832 
833 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
834 	idr_destroy(&ls->ls_lkbidr);
835 
836 	/*
837 	 * Free all rsb's on rsbtbl[] lists
838 	 */
839 
840 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		/* drain the "keep" tree of this bucket */
841 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
842 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
843 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
844 			dlm_free_rsb(rsb);
845 		}
846 
		/* drain the "toss" tree of this bucket */
847 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
848 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
849 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
850 			dlm_free_rsb(rsb);
851 		}
852 	}
853 
854 	vfree(ls->ls_rsbtbl);
855 
856 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
857 		kfree(ls->ls_remove_names[i]);
858 
	/* free the rsbs still sitting on the ls_new_rsb list */
859 	while (!list_empty(&ls->ls_new_rsb)) {
860 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
861 				       res_hashchain);
862 		list_del(&rsb->res_hashchain);
863 		dlm_free_rsb(rsb);
864 	}
865 
866 	/*
867 	 * Free structures on any other lists
868 	 */
869 
870 	dlm_purge_requestqueue(ls);
871 	kfree(ls->ls_recover_args);
872 	dlm_clear_members(ls);
873 	dlm_clear_members_gone(ls);
874 	kfree(ls->ls_node_array);
875 	log_rinfo(ls, "release_lockspace final free");
876 	kobject_put(&ls->ls_kobj);
877 	/* The ls structure will be freed when the kobject is done with */
878 
	/* pairs with the try_module_get() in new_lockspace() */
879 	module_put(THIS_MODULE);
880 	return 0;
881 }
882 
883 /*
884  * Called when a system has released all its locks and is not going to use the
885  * lockspace any longer.  We free everything we're managing for this lockspace.
886  * Remaining nodes will go through the recovery process as if we'd died.  The
887  * lockspace must continue to function as usual, participating in recoveries,
888  * until this returns.
889  *
890  * Force has 4 possible values:
891  * 0 - don't destroy lockspace if it has any LKBs
892  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
893  * 2 - destroy lockspace regardless of LKBs
894  * 3 - destroy lockspace as part of a forced shutdown
895  */
896 
897 int dlm_release_lockspace(void *lockspace, int force)
898 {
899 	struct dlm_ls *ls;
900 	int error;
901 
902 	ls = dlm_find_lockspace_local(lockspace);
903 	if (!ls)
904 		return -EINVAL;
905 	dlm_put_lockspace(ls);
906 
907 	mutex_lock(&ls_lock);
908 	error = release_lockspace(ls, force);
909 	if (!error)
910 		ls_count--;
911 	if (!ls_count)
912 		dlm_lowcomms_stop();
913 	mutex_unlock(&ls_lock);
914 
915 	return error;
916 }
917 
918 void dlm_stop_lockspaces(void)
919 {
920 	struct dlm_ls *ls;
921 	int count;
922 
923  restart:
924 	count = 0;
925 	spin_lock(&lslist_lock);
926 	list_for_each_entry(ls, &lslist, ls_list) {
927 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
928 			count++;
929 			continue;
930 		}
931 		spin_unlock(&lslist_lock);
932 		log_error(ls, "no userland control daemon, stopping lockspace");
933 		dlm_ls_stop(ls);
934 		goto restart;
935 	}
936 	spin_unlock(&lslist_lock);
937 
938 	if (count)
939 		log_print("dlm user daemon left %d lockspaces", count);
940 }
941 
942 void dlm_stop_lockspaces_check(void)
943 {
944 	struct dlm_ls *ls;
945 
946 	spin_lock(&lslist_lock);
947 	list_for_each_entry(ls, &lslist, ls_list) {
948 		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
949 			    !dlm_locking_stopped(ls)))
950 			break;
951 	}
952 	spin_unlock(&lslist_lock);
953 }
954