xref: /linux/fs/dlm/lockspace.c (revision 16018c0d27eda6a7f69dafa750d23770fb46b00f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 #include "ast.h"
28 
/* Number of lockspaces counted by dlm_new_lockspace() and
   dlm_release_lockspace(); both update it while holding ls_lock. */
static int			ls_count;
/* Held across lockspace creation and release. */
static struct mutex		ls_lock;
/* All lockspaces known to this file, linked through dlm_ls.ls_list. */
static struct list_head		lslist;
/* Taken around every walk or change of lslist and around updates of
   ls_create_count. */
static spinlock_t		lslist_lock;
/* Task pointer returned by kthread_run() for the dlm_scand thread. */
static struct task_struct *	scand_task;
34 
35 
36 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 {
38 	ssize_t ret = len;
39 	int n;
40 	int rc = kstrtoint(buf, 0, &n);
41 
42 	if (rc)
43 		return rc;
44 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
45 	if (!ls)
46 		return -EINVAL;
47 
48 	switch (n) {
49 	case 0:
50 		dlm_ls_stop(ls);
51 		break;
52 	case 1:
53 		dlm_ls_start(ls);
54 		break;
55 	default:
56 		ret = -EINVAL;
57 	}
58 	dlm_put_lockspace(ls);
59 	return ret;
60 }
61 
62 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
63 {
64 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
65 
66 	if (rc)
67 		return rc;
68 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69 	wake_up(&ls->ls_uevent_wait);
70 	return len;
71 }
72 
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77 
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
81 
82 	if (rc)
83 		return rc;
84 	return len;
85 }
86 
87 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
88 {
89 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
90 }
91 
92 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
93 {
94 	int val;
95 	int rc = kstrtoint(buf, 0, &val);
96 
97 	if (rc)
98 		return rc;
99 	if (val == 1)
100 		set_bit(LSFL_NODIR, &ls->ls_flags);
101 	return len;
102 }
103 
104 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
105 {
106 	uint32_t status = dlm_recover_status(ls);
107 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
108 }
109 
110 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
111 {
112 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
113 }
114 
115 struct dlm_attr {
116 	struct attribute attr;
117 	ssize_t (*show)(struct dlm_ls *, char *);
118 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
119 };
120 
121 static struct dlm_attr dlm_attr_control = {
122 	.attr  = {.name = "control", .mode = S_IWUSR},
123 	.store = dlm_control_store
124 };
125 
126 static struct dlm_attr dlm_attr_event = {
127 	.attr  = {.name = "event_done", .mode = S_IWUSR},
128 	.store = dlm_event_store
129 };
130 
131 static struct dlm_attr dlm_attr_id = {
132 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
133 	.show  = dlm_id_show,
134 	.store = dlm_id_store
135 };
136 
137 static struct dlm_attr dlm_attr_nodir = {
138 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
139 	.show  = dlm_nodir_show,
140 	.store = dlm_nodir_store
141 };
142 
143 static struct dlm_attr dlm_attr_recover_status = {
144 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
145 	.show  = dlm_recover_status_show
146 };
147 
148 static struct dlm_attr dlm_attr_recover_nodeid = {
149 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
150 	.show  = dlm_recover_nodeid_show
151 };
152 
/* NULL-terminated list of the lockspace sysfs attributes.  ATTRIBUTE_GROUPS()
   is applied to it; the dlm_groups name referenced by dlm_ktype presumably
   comes from that macro expansion. */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
/* Routes sysfs reads and writes on a lockspace kobject to the per-attribute
   handlers via dlm_attr_show() / dlm_attr_store(). */
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type used for every lockspace kobject; its release callback
   frees the containing dlm_ls. */
static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

/* The "dlm" kset created in dlm_lockspace_init(); new_lockspace() assigns
   it to each lockspace kobject. */
static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	if (in)
202 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203 	else
204 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205 
206 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207 
208 	/* dlm_controld will see the uevent, do the necessary group management
209 	   and then write to sysfs to wake us */
210 
211 	wait_event(ls->ls_uevent_wait,
212 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213 
214 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
215 
216 	return ls->ls_uevent_result;
217 }
218 
219 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
220 {
221 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222 
223 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 	return 0;
225 }
226 
/* uevent hooks for the "dlm" kset; passed to kset_create_and_add() in
   dlm_lockspace_init(). */
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
230 
231 int __init dlm_lockspace_init(void)
232 {
233 	ls_count = 0;
234 	mutex_init(&ls_lock);
235 	INIT_LIST_HEAD(&lslist);
236 	spin_lock_init(&lslist_lock);
237 
238 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 	if (!dlm_kset) {
240 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 		return -ENOMEM;
242 	}
243 	return 0;
244 }
245 
/* Unregister the "dlm" kset created by dlm_lockspace_init(). */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
250 
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 	struct dlm_ls *ls;
254 
255 	spin_lock(&lslist_lock);
256 	list_for_each_entry(ls, &lslist, ls_list) {
257 		if (time_after_eq(jiffies, ls->ls_scan_time +
258 					    dlm_config.ci_scan_secs * HZ)) {
259 			spin_unlock(&lslist_lock);
260 			return ls;
261 		}
262 	}
263 	spin_unlock(&lslist_lock);
264 	return NULL;
265 }
266 
267 static int dlm_scand(void *data)
268 {
269 	struct dlm_ls *ls;
270 
271 	while (!kthread_should_stop()) {
272 		ls = find_ls_to_scan();
273 		if (ls) {
274 			if (dlm_lock_recovery_try(ls)) {
275 				ls->ls_scan_time = jiffies;
276 				dlm_scan_rsbs(ls);
277 				dlm_scan_timeout(ls);
278 				dlm_scan_waiters(ls);
279 				dlm_unlock_recovery(ls);
280 			} else {
281 				ls->ls_scan_time += HZ;
282 			}
283 			continue;
284 		}
285 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286 	}
287 	return 0;
288 }
289 
290 static int dlm_scand_start(void)
291 {
292 	struct task_struct *p;
293 	int error = 0;
294 
295 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
296 	if (IS_ERR(p))
297 		error = PTR_ERR(p);
298 	else
299 		scand_task = p;
300 	return error;
301 }
302 
/* Stop the dlm_scand kthread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
307 
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310 	struct dlm_ls *ls;
311 
312 	spin_lock(&lslist_lock);
313 
314 	list_for_each_entry(ls, &lslist, ls_list) {
315 		if (ls->ls_global_id == id) {
316 			atomic_inc(&ls->ls_count);
317 			goto out;
318 		}
319 	}
320 	ls = NULL;
321  out:
322 	spin_unlock(&lslist_lock);
323 	return ls;
324 }
325 
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328 	struct dlm_ls *ls;
329 
330 	spin_lock(&lslist_lock);
331 	list_for_each_entry(ls, &lslist, ls_list) {
332 		if (ls->ls_local_handle == lockspace) {
333 			atomic_inc(&ls->ls_count);
334 			goto out;
335 		}
336 	}
337 	ls = NULL;
338  out:
339 	spin_unlock(&lslist_lock);
340 	return ls;
341 }
342 
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345 	struct dlm_ls *ls;
346 
347 	spin_lock(&lslist_lock);
348 	list_for_each_entry(ls, &lslist, ls_list) {
349 		if (ls->ls_device.minor == minor) {
350 			atomic_inc(&ls->ls_count);
351 			goto out;
352 		}
353 	}
354 	ls = NULL;
355  out:
356 	spin_unlock(&lslist_lock);
357 	return ls;
358 }
359 
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362 	if (atomic_dec_and_test(&ls->ls_count))
363 		wake_up(&ls->ls_count_wait);
364 }
365 
366 static void remove_lockspace(struct dlm_ls *ls)
367 {
368 retry:
369 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
370 
371 	spin_lock(&lslist_lock);
372 	if (atomic_read(&ls->ls_count) != 0) {
373 		spin_unlock(&lslist_lock);
374 		goto retry;
375 	}
376 
377 	WARN_ON(ls->ls_create_count != 0);
378 	list_del(&ls->ls_list);
379 	spin_unlock(&lslist_lock);
380 }
381 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then the
 * midcomms layer.  If midcomms fails to start, dlm_scand is stopped again.
 * Returns 0 on success or the error from whichever start call failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_midcomms_start();
	if (error) {
		/* name the layer that actually failed (midcomms, not lowcomms) */
		log_print("cannot start dlm midcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}
406 
/*
 * Create the lockspace called name, or take another create reference on an
 * existing lockspace with the same name.
 *
 * name:       lockspace name, 1..DLM_LOCKSPACE_LEN bytes
 * cluster:    cluster name supplied by the caller, may be NULL; compared
 *             with dlm_config.ci_cluster_name only when
 *             ci_recover_callbacks is set
 * flags:      DLM_LSFL_* flags
 * lvblen:     must be a non-zero multiple of 8
 * ops:        optional recovery callbacks, stored only when
 *             ci_recover_callbacks is set
 * ops_arg:    stored alongside ops
 * ops_result: if ops and ops_result are both non-NULL, set to 0 or
 *             -EOPNOTSUPP depending on ci_recover_callbacks
 * lockspace:  set to the new or reused lockspace
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an existing
 * lockspace was reused (ls_create_count incremented), or a negative errno.
 * The module reference taken here is kept only on the 0 return; every other
 * return drops it at "out".
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
	       	if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/* Look for an existing lockspace with this name.  error stays 0 if
	   none is found, becomes 1 if one is reused, or -EEXIST if the
	   caller asked for exclusive creation. */
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	/* the name bytes are stored in the extra namelen bytes allocated
	   past the structure */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	/* read the configured table size once so the allocation and the
	   stored size agree */
	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);
	init_waitqueue_head(&ls->ls_remove_wait);

	/* on failure the unwind frees every slot; unfilled slots are still
	   NULL from kzalloc() of ls */
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size, to be sure the message will fit and
	 * to avoid out-of-bounds issues. However, the 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* from here on the lockspace is visible to the dlm_find_lockspace_*()
	   lookups and to find_ls_to_scan() */
	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	/* Error unwinding: each label undoes the step above it and falls
	   through to the labels for the earlier steps. */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	/* once kobject_init_and_add() has been attempted, ls is freed through
	   the kobject release callback rather than directly */
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
688 
689 int dlm_new_lockspace(const char *name, const char *cluster,
690 		      uint32_t flags, int lvblen,
691 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
692 		      int *ops_result, dlm_lockspace_t **lockspace)
693 {
694 	int error = 0;
695 
696 	mutex_lock(&ls_lock);
697 	if (!ls_count)
698 		error = threads_start();
699 	if (error)
700 		goto out;
701 
702 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
703 			      ops_result, lockspace);
704 	if (!error)
705 		ls_count++;
706 	if (error > 0)
707 		error = 0;
708 	if (!ls_count) {
709 		dlm_scand_stop();
710 		dlm_midcomms_shutdown();
711 		dlm_lowcomms_stop();
712 	}
713  out:
714 	mutex_unlock(&ls_lock);
715 	return error;
716 }
717 
718 static int lkb_idr_is_local(int id, void *p, void *data)
719 {
720 	struct dlm_lkb *lkb = p;
721 
722 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
723 }
724 
/* idr_for_each() callback that returns 1 for every entry, so the walk in
   lockspace_busy() yields non-zero as soon as the idr holds any lkb. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
729 
730 static int lkb_idr_free(int id, void *p, void *data)
731 {
732 	struct dlm_lkb *lkb = p;
733 
734 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735 		dlm_free_lvb(lkb->lkb_lvbptr);
736 
737 	dlm_free_lkb(lkb);
738 	return 0;
739 }
740 
741 /* NOTE: We check the lkbidr here rather than the resource table.
742    This is because there may be LKBs queued as ASTs that have been unlinked
743    from their RSBs and are pending deletion once the AST has been delivered */
744 
745 static int lockspace_busy(struct dlm_ls *ls, int force)
746 {
747 	int rv;
748 
749 	spin_lock(&ls->ls_lkbidr_spin);
750 	if (force == 0) {
751 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752 	} else if (force == 1) {
753 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754 	} else {
755 		rv = 0;
756 	}
757 	spin_unlock(&ls->ls_lkbidr_spin);
758 	return rv;
759 }
760 
/*
 * Drop one create reference on the lockspace and, if it was the last one and
 * the lockspace is not busy at this force level, tear the lockspace down.
 *
 * Returns 0 when the lockspace was torn down, the remaining create count
 * (positive) when other creators still hold it, -EBUSY when this was the
 * last reference but lockspace_busy() reported lkbs, or -EINVAL when the
 * create count was not positive.  Reads ls_count, so the caller is expected
 * to hold ls_lock (dlm_release_lockspace() does).
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	/* anything non-zero means the lockspace stays */
	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* the leave uevent is skipped for force 3 and when no user daemon
	   is available to answer it */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* releasing the last counted lockspace: stop the shared threads */
	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	/* waits for outstanding ls_count references before unlinking */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* pairs with the try_module_get() in new_lockspace() */
	module_put(THIS_MODULE);
	return 0;
}
865 
866 /*
867  * Called when a system has released all its locks and is not going to use the
868  * lockspace any longer.  We free everything we're managing for this lockspace.
869  * Remaining nodes will go through the recovery process as if we'd died.  The
870  * lockspace must continue to function as usual, participating in recoveries,
871  * until this returns.
872  *
873  * Force has 4 possible values:
874  * 0 - don't destroy lockspace if it has any LKBs
875  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
876  * 2 - destroy lockspace regardless of LKBs
877  * 3 - destroy lockspace as part of a forced shutdown
878  */
879 
880 int dlm_release_lockspace(void *lockspace, int force)
881 {
882 	struct dlm_ls *ls;
883 	int error;
884 
885 	ls = dlm_find_lockspace_local(lockspace);
886 	if (!ls)
887 		return -EINVAL;
888 	dlm_put_lockspace(ls);
889 
890 	mutex_lock(&ls_lock);
891 	error = release_lockspace(ls, force);
892 	if (!error)
893 		ls_count--;
894 	if (!ls_count)
895 		dlm_lowcomms_stop();
896 	mutex_unlock(&ls_lock);
897 
898 	return error;
899 }
900 
901 void dlm_stop_lockspaces(void)
902 {
903 	struct dlm_ls *ls;
904 	int count;
905 
906  restart:
907 	count = 0;
908 	spin_lock(&lslist_lock);
909 	list_for_each_entry(ls, &lslist, ls_list) {
910 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
911 			count++;
912 			continue;
913 		}
914 		spin_unlock(&lslist_lock);
915 		log_error(ls, "no userland control daemon, stopping lockspace");
916 		dlm_ls_stop(ls);
917 		goto restart;
918 	}
919 	spin_unlock(&lslist_lock);
920 
921 	if (count)
922 		log_print("dlm user daemon left %d lockspaces", count);
923 }
924 
925