xref: /linux/fs/dlm/lockspace.c (revision 0969001569e403107c11561d497893a07394d691)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162 
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 			     char *buf)
165 {
166 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 	return a->show ? a->show(ls, buf) : 0;
169 }
170 
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 			      const char *buf, size_t len)
173 {
174 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 	return a->store ? a->store(ls, buf, len) : len;
177 }
178 
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182 	kfree(ls);
183 }
184 
185 static const struct sysfs_ops dlm_attr_ops = {
186 	.show  = dlm_attr_show,
187 	.store = dlm_attr_store,
188 };
189 
190 static struct kobj_type dlm_ktype = {
191 	.default_groups = dlm_groups,
192 	.sysfs_ops     = &dlm_attr_ops,
193 	.release       = lockspace_kobj_release,
194 };
195 
196 static struct kset *dlm_kset;
197 
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 	if (in)
201 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 	else
203 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204 
205 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206 
207 	/* dlm_controld will see the uevent, do the necessary group management
208 	   and then write to sysfs to wake us */
209 
210 	wait_event(ls->ls_uevent_wait,
211 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212 
213 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214 
215 	return ls->ls_uevent_result;
216 }
217 
218 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
219 {
220 	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
221 
222 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223 	return 0;
224 }
225 
226 static const struct kset_uevent_ops dlm_uevent_ops = {
227 	.uevent = dlm_uevent,
228 };
229 
230 int __init dlm_lockspace_init(void)
231 {
232 	ls_count = 0;
233 	mutex_init(&ls_lock);
234 	INIT_LIST_HEAD(&lslist);
235 	spin_lock_init(&lslist_lock);
236 
237 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
238 	if (!dlm_kset) {
239 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
240 		return -ENOMEM;
241 	}
242 	return 0;
243 }
244 
245 void dlm_lockspace_exit(void)
246 {
247 	kset_unregister(dlm_kset);
248 }
249 
250 static struct dlm_ls *find_ls_to_scan(void)
251 {
252 	struct dlm_ls *ls;
253 
254 	spin_lock(&lslist_lock);
255 	list_for_each_entry(ls, &lslist, ls_list) {
256 		if (time_after_eq(jiffies, ls->ls_scan_time +
257 					    dlm_config.ci_scan_secs * HZ)) {
258 			spin_unlock(&lslist_lock);
259 			return ls;
260 		}
261 	}
262 	spin_unlock(&lslist_lock);
263 	return NULL;
264 }
265 
266 static int dlm_scand(void *data)
267 {
268 	struct dlm_ls *ls;
269 
270 	while (!kthread_should_stop()) {
271 		ls = find_ls_to_scan();
272 		if (ls) {
273 			if (dlm_lock_recovery_try(ls)) {
274 				ls->ls_scan_time = jiffies;
275 				dlm_scan_rsbs(ls);
276 				dlm_unlock_recovery(ls);
277 			} else {
278 				ls->ls_scan_time += HZ;
279 			}
280 			continue;
281 		}
282 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
283 	}
284 	return 0;
285 }
286 
287 static int dlm_scand_start(void)
288 {
289 	struct task_struct *p;
290 	int error = 0;
291 
292 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
293 	if (IS_ERR(p))
294 		error = PTR_ERR(p);
295 	else
296 		scand_task = p;
297 	return error;
298 }
299 
300 static void dlm_scand_stop(void)
301 {
302 	kthread_stop(scand_task);
303 }
304 
305 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
306 {
307 	struct dlm_ls *ls;
308 
309 	spin_lock(&lslist_lock);
310 
311 	list_for_each_entry(ls, &lslist, ls_list) {
312 		if (ls->ls_global_id == id) {
313 			atomic_inc(&ls->ls_count);
314 			goto out;
315 		}
316 	}
317 	ls = NULL;
318  out:
319 	spin_unlock(&lslist_lock);
320 	return ls;
321 }
322 
323 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
324 {
325 	struct dlm_ls *ls;
326 
327 	spin_lock(&lslist_lock);
328 	list_for_each_entry(ls, &lslist, ls_list) {
329 		if (ls->ls_local_handle == lockspace) {
330 			atomic_inc(&ls->ls_count);
331 			goto out;
332 		}
333 	}
334 	ls = NULL;
335  out:
336 	spin_unlock(&lslist_lock);
337 	return ls;
338 }
339 
340 struct dlm_ls *dlm_find_lockspace_device(int minor)
341 {
342 	struct dlm_ls *ls;
343 
344 	spin_lock(&lslist_lock);
345 	list_for_each_entry(ls, &lslist, ls_list) {
346 		if (ls->ls_device.minor == minor) {
347 			atomic_inc(&ls->ls_count);
348 			goto out;
349 		}
350 	}
351 	ls = NULL;
352  out:
353 	spin_unlock(&lslist_lock);
354 	return ls;
355 }
356 
357 void dlm_put_lockspace(struct dlm_ls *ls)
358 {
359 	if (atomic_dec_and_test(&ls->ls_count))
360 		wake_up(&ls->ls_count_wait);
361 }
362 
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365 retry:
366 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
367 
368 	spin_lock(&lslist_lock);
369 	if (atomic_read(&ls->ls_count) != 0) {
370 		spin_unlock(&lslist_lock);
371 		goto retry;
372 	}
373 
374 	WARN_ON(ls->ls_create_count != 0);
375 	list_del(&ls->ls_list);
376 	spin_unlock(&lslist_lock);
377 }
378 
379 static int threads_start(void)
380 {
381 	int error;
382 
383 	/* Thread for sending/receiving messages for all lockspace's */
384 	error = dlm_midcomms_start();
385 	if (error) {
386 		log_print("cannot start dlm midcomms %d", error);
387 		goto fail;
388 	}
389 
390 	error = dlm_scand_start();
391 	if (error) {
392 		log_print("cannot start dlm_scand thread %d", error);
393 		goto midcomms_fail;
394 	}
395 
396 	return 0;
397 
398  midcomms_fail:
399 	dlm_midcomms_stop();
400  fail:
401 	return error;
402 }
403 
404 static int new_lockspace(const char *name, const char *cluster,
405 			 uint32_t flags, int lvblen,
406 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
407 			 int *ops_result, dlm_lockspace_t **lockspace)
408 {
409 	struct dlm_ls *ls;
410 	int i, size, error;
411 	int do_unreg = 0;
412 	int namelen = strlen(name);
413 
414 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
415 		return -EINVAL;
416 
417 	if (lvblen % 8)
418 		return -EINVAL;
419 
420 	if (!try_module_get(THIS_MODULE))
421 		return -EINVAL;
422 
423 	if (!dlm_user_daemon_available()) {
424 		log_print("dlm user daemon not available");
425 		error = -EUNATCH;
426 		goto out;
427 	}
428 
429 	if (ops && ops_result) {
430 	       	if (!dlm_config.ci_recover_callbacks)
431 			*ops_result = -EOPNOTSUPP;
432 		else
433 			*ops_result = 0;
434 	}
435 
436 	if (!cluster)
437 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
438 			  dlm_config.ci_cluster_name);
439 
440 	if (dlm_config.ci_recover_callbacks && cluster &&
441 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
442 		log_print("dlm cluster name '%s' does not match "
443 			  "the application cluster name '%s'",
444 			  dlm_config.ci_cluster_name, cluster);
445 		error = -EBADR;
446 		goto out;
447 	}
448 
449 	error = 0;
450 
451 	spin_lock(&lslist_lock);
452 	list_for_each_entry(ls, &lslist, ls_list) {
453 		WARN_ON(ls->ls_create_count <= 0);
454 		if (ls->ls_namelen != namelen)
455 			continue;
456 		if (memcmp(ls->ls_name, name, namelen))
457 			continue;
458 		if (flags & DLM_LSFL_NEWEXCL) {
459 			error = -EEXIST;
460 			break;
461 		}
462 		ls->ls_create_count++;
463 		*lockspace = ls;
464 		error = 1;
465 		break;
466 	}
467 	spin_unlock(&lslist_lock);
468 
469 	if (error)
470 		goto out;
471 
472 	error = -ENOMEM;
473 
474 	ls = kzalloc(sizeof(*ls), GFP_NOFS);
475 	if (!ls)
476 		goto out;
477 	memcpy(ls->ls_name, name, namelen);
478 	ls->ls_namelen = namelen;
479 	ls->ls_lvblen = lvblen;
480 	atomic_set(&ls->ls_count, 0);
481 	init_waitqueue_head(&ls->ls_count_wait);
482 	ls->ls_flags = 0;
483 	ls->ls_scan_time = jiffies;
484 
485 	if (ops && dlm_config.ci_recover_callbacks) {
486 		ls->ls_ops = ops;
487 		ls->ls_ops_arg = ops_arg;
488 	}
489 
490 	/* ls_exflags are forced to match among nodes, and we don't
491 	 * need to require all nodes to have some flags set
492 	 */
493 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
494 
495 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
496 	ls->ls_rsbtbl_size = size;
497 
498 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
499 	if (!ls->ls_rsbtbl)
500 		goto out_lsfree;
501 	for (i = 0; i < size; i++) {
502 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
503 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
504 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
505 	}
506 
507 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
508 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
509 						 GFP_KERNEL);
510 		if (!ls->ls_remove_names[i])
511 			goto out_rsbtbl;
512 	}
513 
514 	idr_init(&ls->ls_lkbidr);
515 	spin_lock_init(&ls->ls_lkbidr_spin);
516 
517 	INIT_LIST_HEAD(&ls->ls_waiters);
518 	mutex_init(&ls->ls_waiters_mutex);
519 	INIT_LIST_HEAD(&ls->ls_orphans);
520 	mutex_init(&ls->ls_orphans_mutex);
521 
522 	INIT_LIST_HEAD(&ls->ls_new_rsb);
523 	spin_lock_init(&ls->ls_new_rsb_spin);
524 
525 	INIT_LIST_HEAD(&ls->ls_nodes);
526 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
527 	ls->ls_num_nodes = 0;
528 	ls->ls_low_nodeid = 0;
529 	ls->ls_total_weight = 0;
530 	ls->ls_node_array = NULL;
531 
532 	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
533 	ls->ls_local_rsb.res_ls = ls;
534 
535 	ls->ls_debug_rsb_dentry = NULL;
536 	ls->ls_debug_waiters_dentry = NULL;
537 
538 	init_waitqueue_head(&ls->ls_uevent_wait);
539 	ls->ls_uevent_result = 0;
540 	init_completion(&ls->ls_recovery_done);
541 	ls->ls_recovery_result = -1;
542 
543 	spin_lock_init(&ls->ls_cb_lock);
544 	INIT_LIST_HEAD(&ls->ls_cb_delay);
545 
546 	ls->ls_recoverd_task = NULL;
547 	mutex_init(&ls->ls_recoverd_active);
548 	spin_lock_init(&ls->ls_recover_lock);
549 	spin_lock_init(&ls->ls_rcom_spin);
550 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
551 	ls->ls_recover_status = 0;
552 	ls->ls_recover_seq = get_random_u64();
553 	ls->ls_recover_args = NULL;
554 	init_rwsem(&ls->ls_in_recovery);
555 	init_rwsem(&ls->ls_recv_active);
556 	INIT_LIST_HEAD(&ls->ls_requestqueue);
557 	atomic_set(&ls->ls_requestqueue_cnt, 0);
558 	init_waitqueue_head(&ls->ls_requestqueue_wait);
559 	mutex_init(&ls->ls_requestqueue_mutex);
560 	spin_lock_init(&ls->ls_clear_proc_locks);
561 
562 	/* Due backwards compatibility with 3.1 we need to use maximum
563 	 * possible dlm message size to be sure the message will fit and
564 	 * not having out of bounds issues. However on sending side 3.2
565 	 * might send less.
566 	 */
567 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
568 	if (!ls->ls_recover_buf)
569 		goto out_lkbidr;
570 
571 	ls->ls_slot = 0;
572 	ls->ls_num_slots = 0;
573 	ls->ls_slots_size = 0;
574 	ls->ls_slots = NULL;
575 
576 	INIT_LIST_HEAD(&ls->ls_recover_list);
577 	spin_lock_init(&ls->ls_recover_list_lock);
578 	idr_init(&ls->ls_recover_idr);
579 	spin_lock_init(&ls->ls_recover_idr_lock);
580 	ls->ls_recover_list_count = 0;
581 	ls->ls_local_handle = ls;
582 	init_waitqueue_head(&ls->ls_wait_general);
583 	INIT_LIST_HEAD(&ls->ls_root_list);
584 	init_rwsem(&ls->ls_root_sem);
585 
586 	spin_lock(&lslist_lock);
587 	ls->ls_create_count = 1;
588 	list_add(&ls->ls_list, &lslist);
589 	spin_unlock(&lslist_lock);
590 
591 	if (flags & DLM_LSFL_FS) {
592 		error = dlm_callback_start(ls);
593 		if (error) {
594 			log_error(ls, "can't start dlm_callback %d", error);
595 			goto out_delist;
596 		}
597 	}
598 
599 	init_waitqueue_head(&ls->ls_recover_lock_wait);
600 
601 	/*
602 	 * Once started, dlm_recoverd first looks for ls in lslist, then
603 	 * initializes ls_in_recovery as locked in "down" mode.  We need
604 	 * to wait for the wakeup from dlm_recoverd because in_recovery
605 	 * has to start out in down mode.
606 	 */
607 
608 	error = dlm_recoverd_start(ls);
609 	if (error) {
610 		log_error(ls, "can't start dlm_recoverd %d", error);
611 		goto out_callback;
612 	}
613 
614 	wait_event(ls->ls_recover_lock_wait,
615 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
616 
617 	/* let kobject handle freeing of ls if there's an error */
618 	do_unreg = 1;
619 
620 	ls->ls_kobj.kset = dlm_kset;
621 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
622 				     "%s", ls->ls_name);
623 	if (error)
624 		goto out_recoverd;
625 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
626 
627 	/* This uevent triggers dlm_controld in userspace to add us to the
628 	   group of nodes that are members of this lockspace (managed by the
629 	   cluster infrastructure.)  Once it's done that, it tells us who the
630 	   current lockspace members are (via configfs) and then tells the
631 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
632 
633 	error = do_uevent(ls, 1);
634 	if (error)
635 		goto out_recoverd;
636 
637 	/* wait until recovery is successful or failed */
638 	wait_for_completion(&ls->ls_recovery_done);
639 	error = ls->ls_recovery_result;
640 	if (error)
641 		goto out_members;
642 
643 	dlm_create_debug_file(ls);
644 
645 	log_rinfo(ls, "join complete");
646 	*lockspace = ls;
647 	return 0;
648 
649  out_members:
650 	do_uevent(ls, 0);
651 	dlm_clear_members(ls);
652 	kfree(ls->ls_node_array);
653  out_recoverd:
654 	dlm_recoverd_stop(ls);
655  out_callback:
656 	dlm_callback_stop(ls);
657  out_delist:
658 	spin_lock(&lslist_lock);
659 	list_del(&ls->ls_list);
660 	spin_unlock(&lslist_lock);
661 	idr_destroy(&ls->ls_recover_idr);
662 	kfree(ls->ls_recover_buf);
663  out_lkbidr:
664 	idr_destroy(&ls->ls_lkbidr);
665  out_rsbtbl:
666 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
667 		kfree(ls->ls_remove_names[i]);
668 	vfree(ls->ls_rsbtbl);
669  out_lsfree:
670 	if (do_unreg)
671 		kobject_put(&ls->ls_kobj);
672 	else
673 		kfree(ls);
674  out:
675 	module_put(THIS_MODULE);
676 	return error;
677 }
678 
679 static int __dlm_new_lockspace(const char *name, const char *cluster,
680 			       uint32_t flags, int lvblen,
681 			       const struct dlm_lockspace_ops *ops,
682 			       void *ops_arg, int *ops_result,
683 			       dlm_lockspace_t **lockspace)
684 {
685 	int error = 0;
686 
687 	mutex_lock(&ls_lock);
688 	if (!ls_count)
689 		error = threads_start();
690 	if (error)
691 		goto out;
692 
693 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
694 			      ops_result, lockspace);
695 	if (!error)
696 		ls_count++;
697 	if (error > 0)
698 		error = 0;
699 	if (!ls_count) {
700 		dlm_scand_stop();
701 		dlm_midcomms_shutdown();
702 		dlm_midcomms_stop();
703 	}
704  out:
705 	mutex_unlock(&ls_lock);
706 	return error;
707 }
708 
709 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
710 		      int lvblen, const struct dlm_lockspace_ops *ops,
711 		      void *ops_arg, int *ops_result,
712 		      dlm_lockspace_t **lockspace)
713 {
714 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
715 				   ops, ops_arg, ops_result, lockspace);
716 }
717 
718 int dlm_new_user_lockspace(const char *name, const char *cluster,
719 			   uint32_t flags, int lvblen,
720 			   const struct dlm_lockspace_ops *ops,
721 			   void *ops_arg, int *ops_result,
722 			   dlm_lockspace_t **lockspace)
723 {
724 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
725 				   ops_arg, ops_result, lockspace);
726 }
727 
728 static int lkb_idr_is_local(int id, void *p, void *data)
729 {
730 	struct dlm_lkb *lkb = p;
731 
732 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
733 }
734 
735 static int lkb_idr_is_any(int id, void *p, void *data)
736 {
737 	return 1;
738 }
739 
740 static int lkb_idr_free(int id, void *p, void *data)
741 {
742 	struct dlm_lkb *lkb = p;
743 
744 	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
745 		dlm_free_lvb(lkb->lkb_lvbptr);
746 
747 	dlm_free_lkb(lkb);
748 	return 0;
749 }
750 
751 /* NOTE: We check the lkbidr here rather than the resource table.
752    This is because there may be LKBs queued as ASTs that have been unlinked
753    from their RSBs and are pending deletion once the AST has been delivered */
754 
755 static int lockspace_busy(struct dlm_ls *ls, int force)
756 {
757 	int rv;
758 
759 	spin_lock(&ls->ls_lkbidr_spin);
760 	if (force == 0) {
761 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
762 	} else if (force == 1) {
763 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
764 	} else {
765 		rv = 0;
766 	}
767 	spin_unlock(&ls->ls_lkbidr_spin);
768 	return rv;
769 }
770 
771 static int release_lockspace(struct dlm_ls *ls, int force)
772 {
773 	struct dlm_rsb *rsb;
774 	struct rb_node *n;
775 	int i, busy, rv;
776 
777 	busy = lockspace_busy(ls, force);
778 
779 	spin_lock(&lslist_lock);
780 	if (ls->ls_create_count == 1) {
781 		if (busy) {
782 			rv = -EBUSY;
783 		} else {
784 			/* remove_lockspace takes ls off lslist */
785 			ls->ls_create_count = 0;
786 			rv = 0;
787 		}
788 	} else if (ls->ls_create_count > 1) {
789 		rv = --ls->ls_create_count;
790 	} else {
791 		rv = -EINVAL;
792 	}
793 	spin_unlock(&lslist_lock);
794 
795 	if (rv) {
796 		log_debug(ls, "release_lockspace no remove %d", rv);
797 		return rv;
798 	}
799 
800 	if (ls_count == 1)
801 		dlm_midcomms_version_wait();
802 
803 	dlm_device_deregister(ls);
804 
805 	if (force < 3 && dlm_user_daemon_available())
806 		do_uevent(ls, 0);
807 
808 	dlm_recoverd_stop(ls);
809 
810 	if (ls_count == 1) {
811 		dlm_scand_stop();
812 		dlm_clear_members(ls);
813 		dlm_midcomms_shutdown();
814 	}
815 
816 	dlm_callback_stop(ls);
817 
818 	remove_lockspace(ls);
819 
820 	dlm_delete_debug_file(ls);
821 
822 	idr_destroy(&ls->ls_recover_idr);
823 	kfree(ls->ls_recover_buf);
824 
825 	/*
826 	 * Free all lkb's in idr
827 	 */
828 
829 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
830 	idr_destroy(&ls->ls_lkbidr);
831 
832 	/*
833 	 * Free all rsb's on rsbtbl[] lists
834 	 */
835 
836 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
837 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
838 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
839 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
840 			dlm_free_rsb(rsb);
841 		}
842 
843 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
844 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
845 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
846 			dlm_free_rsb(rsb);
847 		}
848 	}
849 
850 	vfree(ls->ls_rsbtbl);
851 
852 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
853 		kfree(ls->ls_remove_names[i]);
854 
855 	while (!list_empty(&ls->ls_new_rsb)) {
856 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
857 				       res_hashchain);
858 		list_del(&rsb->res_hashchain);
859 		dlm_free_rsb(rsb);
860 	}
861 
862 	/*
863 	 * Free structures on any other lists
864 	 */
865 
866 	dlm_purge_requestqueue(ls);
867 	kfree(ls->ls_recover_args);
868 	dlm_clear_members(ls);
869 	dlm_clear_members_gone(ls);
870 	kfree(ls->ls_node_array);
871 	log_rinfo(ls, "release_lockspace final free");
872 	kobject_put(&ls->ls_kobj);
873 	/* The ls structure will be freed when the kobject is done with */
874 
875 	module_put(THIS_MODULE);
876 	return 0;
877 }
878 
879 /*
880  * Called when a system has released all its locks and is not going to use the
881  * lockspace any longer.  We free everything we're managing for this lockspace.
882  * Remaining nodes will go through the recovery process as if we'd died.  The
883  * lockspace must continue to function as usual, participating in recoveries,
884  * until this returns.
885  *
886  * Force has 4 possible values:
887  * 0 - don't destroy lockspace if it has any LKBs
888  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
889  * 2 - destroy lockspace regardless of LKBs
890  * 3 - destroy lockspace as part of a forced shutdown
891  */
892 
893 int dlm_release_lockspace(void *lockspace, int force)
894 {
895 	struct dlm_ls *ls;
896 	int error;
897 
898 	ls = dlm_find_lockspace_local(lockspace);
899 	if (!ls)
900 		return -EINVAL;
901 	dlm_put_lockspace(ls);
902 
903 	mutex_lock(&ls_lock);
904 	error = release_lockspace(ls, force);
905 	if (!error)
906 		ls_count--;
907 	if (!ls_count)
908 		dlm_midcomms_stop();
909 	mutex_unlock(&ls_lock);
910 
911 	return error;
912 }
913 
914 void dlm_stop_lockspaces(void)
915 {
916 	struct dlm_ls *ls;
917 	int count;
918 
919  restart:
920 	count = 0;
921 	spin_lock(&lslist_lock);
922 	list_for_each_entry(ls, &lslist, ls_list) {
923 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
924 			count++;
925 			continue;
926 		}
927 		spin_unlock(&lslist_lock);
928 		log_error(ls, "no userland control daemon, stopping lockspace");
929 		dlm_ls_stop(ls);
930 		goto restart;
931 	}
932 	spin_unlock(&lslist_lock);
933 
934 	if (count)
935 		log_print("dlm user daemon left %d lockspaces", count);
936 }
937 
938