xref: /linux/fs/dlm/lockspace.c (revision 39c089a01a7e431383710a566864644cbbc0f8fe)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 
33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35 	ssize_t ret = len;
36 	int n;
37 	int rc = kstrtoint(buf, 0, &n);
38 
39 	if (rc)
40 		return rc;
41 	ls = dlm_find_lockspace_local(ls);
42 	if (!ls)
43 		return -EINVAL;
44 
45 	switch (n) {
46 	case 0:
47 		dlm_ls_stop(ls);
48 		break;
49 	case 1:
50 		dlm_ls_start(ls);
51 		break;
52 	default:
53 		ret = -EINVAL;
54 	}
55 	dlm_put_lockspace(ls);
56 	return ret;
57 }
58 
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62 
63 	if (rc)
64 		return rc;
65 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 	wake_up(&ls->ls_uevent_wait);
67 	return len;
68 }
69 
70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74 
75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78 
79 	if (rc)
80 		return rc;
81 	return len;
82 }
83 
84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88 
89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91 	int val;
92 	int rc = kstrtoint(buf, 0, &val);
93 
94 	if (rc)
95 		return rc;
96 	if (val == 1)
97 		set_bit(LSFL_NODIR, &ls->ls_flags);
98 	return len;
99 }
100 
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103 	uint32_t status = dlm_recover_status(ls);
104 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106 
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111 
112 struct dlm_attr {
113 	struct attribute attr;
114 	ssize_t (*show)(struct dlm_ls *, char *);
115 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116 };
117 
118 static struct dlm_attr dlm_attr_control = {
119 	.attr  = {.name = "control", .mode = S_IWUSR},
120 	.store = dlm_control_store
121 };
122 
123 static struct dlm_attr dlm_attr_event = {
124 	.attr  = {.name = "event_done", .mode = S_IWUSR},
125 	.store = dlm_event_store
126 };
127 
128 static struct dlm_attr dlm_attr_id = {
129 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 	.show  = dlm_id_show,
131 	.store = dlm_id_store
132 };
133 
134 static struct dlm_attr dlm_attr_nodir = {
135 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 	.show  = dlm_nodir_show,
137 	.store = dlm_nodir_store
138 };
139 
140 static struct dlm_attr dlm_attr_recover_status = {
141 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
142 	.show  = dlm_recover_status_show
143 };
144 
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147 	.show  = dlm_recover_nodeid_show
148 };
149 
150 static struct attribute *dlm_attrs[] = {
151 	&dlm_attr_control.attr,
152 	&dlm_attr_event.attr,
153 	&dlm_attr_id.attr,
154 	&dlm_attr_nodir.attr,
155 	&dlm_attr_recover_status.attr,
156 	&dlm_attr_recover_nodeid.attr,
157 	NULL,
158 };
159 ATTRIBUTE_GROUPS(dlm);
160 
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 			     char *buf)
163 {
164 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 	return a->show ? a->show(ls, buf) : 0;
167 }
168 
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 			      const char *buf, size_t len)
171 {
172 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 	return a->store ? a->store(ls, buf, len) : len;
175 }
176 
177 static const struct sysfs_ops dlm_attr_ops = {
178 	.show  = dlm_attr_show,
179 	.store = dlm_attr_store,
180 };
181 
182 static struct kobj_type dlm_ktype = {
183 	.default_groups = dlm_groups,
184 	.sysfs_ops     = &dlm_attr_ops,
185 };
186 
187 static struct kset *dlm_kset;
188 
189 static int do_uevent(struct dlm_ls *ls, int in)
190 {
191 	if (in)
192 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193 	else
194 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195 
196 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197 
198 	/* dlm_controld will see the uevent, do the necessary group management
199 	   and then write to sysfs to wake us */
200 
201 	wait_event(ls->ls_uevent_wait,
202 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203 
204 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205 
206 	return ls->ls_uevent_result;
207 }
208 
209 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210 {
211 	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212 
213 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214 	return 0;
215 }
216 
217 static const struct kset_uevent_ops dlm_uevent_ops = {
218 	.uevent = dlm_uevent,
219 };
220 
221 int __init dlm_lockspace_init(void)
222 {
223 	ls_count = 0;
224 	mutex_init(&ls_lock);
225 	INIT_LIST_HEAD(&lslist);
226 	spin_lock_init(&lslist_lock);
227 
228 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229 	if (!dlm_kset) {
230 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
231 		return -ENOMEM;
232 	}
233 	return 0;
234 }
235 
236 void dlm_lockspace_exit(void)
237 {
238 	kset_unregister(dlm_kset);
239 }
240 
241 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242 {
243 	struct dlm_ls *ls;
244 
245 	spin_lock_bh(&lslist_lock);
246 
247 	list_for_each_entry(ls, &lslist, ls_list) {
248 		if (ls->ls_global_id == id) {
249 			atomic_inc(&ls->ls_count);
250 			goto out;
251 		}
252 	}
253 	ls = NULL;
254  out:
255 	spin_unlock_bh(&lslist_lock);
256 	return ls;
257 }
258 
259 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260 {
261 	struct dlm_ls *ls = lockspace;
262 
263 	atomic_inc(&ls->ls_count);
264 	return ls;
265 }
266 
267 struct dlm_ls *dlm_find_lockspace_device(int minor)
268 {
269 	struct dlm_ls *ls;
270 
271 	spin_lock_bh(&lslist_lock);
272 	list_for_each_entry(ls, &lslist, ls_list) {
273 		if (ls->ls_device.minor == minor) {
274 			atomic_inc(&ls->ls_count);
275 			goto out;
276 		}
277 	}
278 	ls = NULL;
279  out:
280 	spin_unlock_bh(&lslist_lock);
281 	return ls;
282 }
283 
284 void dlm_put_lockspace(struct dlm_ls *ls)
285 {
286 	if (atomic_dec_and_test(&ls->ls_count))
287 		wake_up(&ls->ls_count_wait);
288 }
289 
290 static void remove_lockspace(struct dlm_ls *ls)
291 {
292 retry:
293 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294 
295 	spin_lock_bh(&lslist_lock);
296 	if (atomic_read(&ls->ls_count) != 0) {
297 		spin_unlock_bh(&lslist_lock);
298 		goto retry;
299 	}
300 
301 	WARN_ON(ls->ls_create_count != 0);
302 	list_del(&ls->ls_list);
303 	spin_unlock_bh(&lslist_lock);
304 }
305 
/*
 * Start midcomms, which sends and receives messages for all lockspaces.
 * Returns 0 on success or the dlm_midcomms_start() error (also logged).
 */
static int threads_start(void)
{
	int err = dlm_midcomms_start();

	if (!err)
		return 0;

	log_print("cannot start dlm midcomms %d", err);
	return err;
}
317 
318 static int lkb_idr_free(struct dlm_lkb *lkb)
319 {
320 	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321 		dlm_free_lvb(lkb->lkb_lvbptr);
322 
323 	dlm_free_lkb(lkb);
324 	return 0;
325 }
326 
/*
 * Per-element callback for rhashtable_free_and_destroy(): @ptr is the rsb
 * to free, @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
333 
334 static void free_lockspace(struct work_struct *work)
335 {
336 	struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
337 	struct dlm_lkb *lkb;
338 	unsigned long id;
339 
340 	/*
341 	 * Free all lkb's in xa
342 	 */
343 	xa_for_each(&ls->ls_lkbxa, id, lkb) {
344 		lkb_idr_free(lkb);
345 	}
346 	xa_destroy(&ls->ls_lkbxa);
347 
348 	/*
349 	 * Free all rsb's on rsbtbl
350 	 */
351 	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352 
353 	kfree(ls);
354 }
355 
356 static int new_lockspace(const char *name, const char *cluster,
357 			 uint32_t flags, int lvblen,
358 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
359 			 int *ops_result, dlm_lockspace_t **lockspace)
360 {
361 	struct dlm_ls *ls;
362 	int namelen = strlen(name);
363 	int error;
364 
365 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366 		return -EINVAL;
367 
368 	if (lvblen % 8)
369 		return -EINVAL;
370 
371 	if (!try_module_get(THIS_MODULE))
372 		return -EINVAL;
373 
374 	if (!dlm_user_daemon_available()) {
375 		log_print("dlm user daemon not available");
376 		error = -EUNATCH;
377 		goto out;
378 	}
379 
380 	if (ops && ops_result) {
381 	       	if (!dlm_config.ci_recover_callbacks)
382 			*ops_result = -EOPNOTSUPP;
383 		else
384 			*ops_result = 0;
385 	}
386 
387 	if (!cluster)
388 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389 			  dlm_config.ci_cluster_name);
390 
391 	if (dlm_config.ci_recover_callbacks && cluster &&
392 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393 		log_print("dlm cluster name '%s' does not match "
394 			  "the application cluster name '%s'",
395 			  dlm_config.ci_cluster_name, cluster);
396 		error = -EBADR;
397 		goto out;
398 	}
399 
400 	error = 0;
401 
402 	spin_lock_bh(&lslist_lock);
403 	list_for_each_entry(ls, &lslist, ls_list) {
404 		WARN_ON(ls->ls_create_count <= 0);
405 		if (ls->ls_namelen != namelen)
406 			continue;
407 		if (memcmp(ls->ls_name, name, namelen))
408 			continue;
409 		if (flags & DLM_LSFL_NEWEXCL) {
410 			error = -EEXIST;
411 			break;
412 		}
413 		ls->ls_create_count++;
414 		*lockspace = ls;
415 		error = 1;
416 		break;
417 	}
418 	spin_unlock_bh(&lslist_lock);
419 
420 	if (error)
421 		goto out;
422 
423 	error = -ENOMEM;
424 
425 	ls = kzalloc(sizeof(*ls), GFP_NOFS);
426 	if (!ls)
427 		goto out;
428 	memcpy(ls->ls_name, name, namelen);
429 	ls->ls_namelen = namelen;
430 	ls->ls_lvblen = lvblen;
431 	atomic_set(&ls->ls_count, 0);
432 	init_waitqueue_head(&ls->ls_count_wait);
433 	ls->ls_flags = 0;
434 
435 	if (ops && dlm_config.ci_recover_callbacks) {
436 		ls->ls_ops = ops;
437 		ls->ls_ops_arg = ops_arg;
438 	}
439 
440 	if (flags & DLM_LSFL_SOFTIRQ)
441 		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442 
443 	/* ls_exflags are forced to match among nodes, and we don't
444 	 * need to require all nodes to have some flags set
445 	 */
446 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447 				    DLM_LSFL_SOFTIRQ));
448 
449 	INIT_LIST_HEAD(&ls->ls_slow_inactive);
450 	INIT_LIST_HEAD(&ls->ls_slow_active);
451 	rwlock_init(&ls->ls_rsbtbl_lock);
452 
453 	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454 	if (error)
455 		goto out_lsfree;
456 
457 	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458 	rwlock_init(&ls->ls_lkbxa_lock);
459 
460 	INIT_LIST_HEAD(&ls->ls_waiters);
461 	spin_lock_init(&ls->ls_waiters_lock);
462 	INIT_LIST_HEAD(&ls->ls_orphans);
463 	spin_lock_init(&ls->ls_orphans_lock);
464 
465 	INIT_LIST_HEAD(&ls->ls_nodes);
466 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
467 	ls->ls_num_nodes = 0;
468 	ls->ls_low_nodeid = 0;
469 	ls->ls_total_weight = 0;
470 	ls->ls_node_array = NULL;
471 
472 	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473 	ls->ls_local_rsb.res_ls = ls;
474 
475 	ls->ls_debug_rsb_dentry = NULL;
476 	ls->ls_debug_waiters_dentry = NULL;
477 
478 	init_waitqueue_head(&ls->ls_uevent_wait);
479 	ls->ls_uevent_result = 0;
480 	init_completion(&ls->ls_recovery_done);
481 	ls->ls_recovery_result = -1;
482 
483 	spin_lock_init(&ls->ls_cb_lock);
484 	INIT_LIST_HEAD(&ls->ls_cb_delay);
485 
486 	INIT_WORK(&ls->ls_free_work, free_lockspace);
487 
488 	ls->ls_recoverd_task = NULL;
489 	mutex_init(&ls->ls_recoverd_active);
490 	spin_lock_init(&ls->ls_recover_lock);
491 	spin_lock_init(&ls->ls_rcom_spin);
492 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493 	ls->ls_recover_status = 0;
494 	ls->ls_recover_seq = get_random_u64();
495 	ls->ls_recover_args = NULL;
496 	init_rwsem(&ls->ls_in_recovery);
497 	rwlock_init(&ls->ls_recv_active);
498 	INIT_LIST_HEAD(&ls->ls_requestqueue);
499 	rwlock_init(&ls->ls_requestqueue_lock);
500 	spin_lock_init(&ls->ls_clear_proc_locks);
501 
502 	/* Due backwards compatibility with 3.1 we need to use maximum
503 	 * possible dlm message size to be sure the message will fit and
504 	 * not having out of bounds issues. However on sending side 3.2
505 	 * might send less.
506 	 */
507 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508 	if (!ls->ls_recover_buf) {
509 		error = -ENOMEM;
510 		goto out_lkbxa;
511 	}
512 
513 	ls->ls_slot = 0;
514 	ls->ls_num_slots = 0;
515 	ls->ls_slots_size = 0;
516 	ls->ls_slots = NULL;
517 
518 	INIT_LIST_HEAD(&ls->ls_recover_list);
519 	spin_lock_init(&ls->ls_recover_list_lock);
520 	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521 	spin_lock_init(&ls->ls_recover_xa_lock);
522 	ls->ls_recover_list_count = 0;
523 	init_waitqueue_head(&ls->ls_wait_general);
524 	INIT_LIST_HEAD(&ls->ls_masters_list);
525 	rwlock_init(&ls->ls_masters_lock);
526 	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527 	rwlock_init(&ls->ls_dir_dump_lock);
528 
529 	INIT_LIST_HEAD(&ls->ls_scan_list);
530 	spin_lock_init(&ls->ls_scan_lock);
531 	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532 
533 	spin_lock_bh(&lslist_lock);
534 	ls->ls_create_count = 1;
535 	list_add(&ls->ls_list, &lslist);
536 	spin_unlock_bh(&lslist_lock);
537 
538 	if (flags & DLM_LSFL_FS)
539 		set_bit(LSFL_FS, &ls->ls_flags);
540 
541 	error = dlm_callback_start(ls);
542 	if (error) {
543 		log_error(ls, "can't start dlm_callback %d", error);
544 		goto out_delist;
545 	}
546 
547 	init_waitqueue_head(&ls->ls_recover_lock_wait);
548 
549 	/*
550 	 * Once started, dlm_recoverd first looks for ls in lslist, then
551 	 * initializes ls_in_recovery as locked in "down" mode.  We need
552 	 * to wait for the wakeup from dlm_recoverd because in_recovery
553 	 * has to start out in down mode.
554 	 */
555 
556 	error = dlm_recoverd_start(ls);
557 	if (error) {
558 		log_error(ls, "can't start dlm_recoverd %d", error);
559 		goto out_callback;
560 	}
561 
562 	wait_event(ls->ls_recover_lock_wait,
563 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564 
565 	ls->ls_kobj.kset = dlm_kset;
566 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567 				     "%s", ls->ls_name);
568 	if (error)
569 		goto out_recoverd;
570 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571 
572 	/* This uevent triggers dlm_controld in userspace to add us to the
573 	   group of nodes that are members of this lockspace (managed by the
574 	   cluster infrastructure.)  Once it's done that, it tells us who the
575 	   current lockspace members are (via configfs) and then tells the
576 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
577 
578 	error = do_uevent(ls, 1);
579 	if (error)
580 		goto out_recoverd;
581 
582 	/* wait until recovery is successful or failed */
583 	wait_for_completion(&ls->ls_recovery_done);
584 	error = ls->ls_recovery_result;
585 	if (error)
586 		goto out_members;
587 
588 	dlm_create_debug_file(ls);
589 
590 	log_rinfo(ls, "join complete");
591 	*lockspace = ls;
592 	return 0;
593 
594  out_members:
595 	do_uevent(ls, 0);
596 	dlm_clear_members(ls);
597 	kfree(ls->ls_node_array);
598  out_recoverd:
599 	dlm_recoverd_stop(ls);
600  out_callback:
601 	dlm_callback_stop(ls);
602  out_delist:
603 	spin_lock_bh(&lslist_lock);
604 	list_del(&ls->ls_list);
605 	spin_unlock_bh(&lslist_lock);
606 	xa_destroy(&ls->ls_recover_xa);
607 	kfree(ls->ls_recover_buf);
608  out_lkbxa:
609 	xa_destroy(&ls->ls_lkbxa);
610 	rhashtable_destroy(&ls->ls_rsbtbl);
611  out_lsfree:
612 	kobject_put(&ls->ls_kobj);
613 	kfree(ls);
614  out:
615 	module_put(THIS_MODULE);
616 	return error;
617 }
618 
619 static int __dlm_new_lockspace(const char *name, const char *cluster,
620 			       uint32_t flags, int lvblen,
621 			       const struct dlm_lockspace_ops *ops,
622 			       void *ops_arg, int *ops_result,
623 			       dlm_lockspace_t **lockspace)
624 {
625 	int error = 0;
626 
627 	mutex_lock(&ls_lock);
628 	if (!ls_count)
629 		error = threads_start();
630 	if (error)
631 		goto out;
632 
633 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634 			      ops_result, lockspace);
635 	if (!error)
636 		ls_count++;
637 	if (error > 0)
638 		error = 0;
639 	if (!ls_count) {
640 		dlm_midcomms_shutdown();
641 		dlm_midcomms_stop();
642 	}
643  out:
644 	mutex_unlock(&ls_lock);
645 	return error;
646 }
647 
648 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649 		      int lvblen, const struct dlm_lockspace_ops *ops,
650 		      void *ops_arg, int *ops_result,
651 		      dlm_lockspace_t **lockspace)
652 {
653 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654 				   ops, ops_arg, ops_result, lockspace);
655 }
656 
657 int dlm_new_user_lockspace(const char *name, const char *cluster,
658 			   uint32_t flags, int lvblen,
659 			   const struct dlm_lockspace_ops *ops,
660 			   void *ops_arg, int *ops_result,
661 			   dlm_lockspace_t **lockspace)
662 {
663 	if (flags & DLM_LSFL_SOFTIRQ)
664 		return -EINVAL;
665 
666 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667 				   ops_arg, ops_result, lockspace);
668 }
669 
670 /* NOTE: We check the lkbxa here rather than the resource table.
671    This is because there may be LKBs queued as ASTs that have been unlinked
672    from their RSBs and are pending deletion once the AST has been delivered */
673 
674 static int lockspace_busy(struct dlm_ls *ls, int force)
675 {
676 	struct dlm_lkb *lkb;
677 	unsigned long id;
678 	int rv = 0;
679 
680 	read_lock_bh(&ls->ls_lkbxa_lock);
681 	if (force == 0) {
682 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
683 			rv = 1;
684 			break;
685 		}
686 	} else if (force == 1) {
687 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
688 			if (lkb->lkb_nodeid == 0 &&
689 			    lkb->lkb_grmode != DLM_LOCK_IV) {
690 				rv = 1;
691 				break;
692 			}
693 		}
694 	} else {
695 		rv = 0;
696 	}
697 	read_unlock_bh(&ls->ls_lkbxa_lock);
698 	return rv;
699 }
700 
701 static int release_lockspace(struct dlm_ls *ls, int force)
702 {
703 	int busy, rv;
704 
705 	busy = lockspace_busy(ls, force);
706 
707 	spin_lock_bh(&lslist_lock);
708 	if (ls->ls_create_count == 1) {
709 		if (busy) {
710 			rv = -EBUSY;
711 		} else {
712 			/* remove_lockspace takes ls off lslist */
713 			ls->ls_create_count = 0;
714 			rv = 0;
715 		}
716 	} else if (ls->ls_create_count > 1) {
717 		rv = --ls->ls_create_count;
718 	} else {
719 		rv = -EINVAL;
720 	}
721 	spin_unlock_bh(&lslist_lock);
722 
723 	if (rv) {
724 		log_debug(ls, "release_lockspace no remove %d", rv);
725 		return rv;
726 	}
727 
728 	if (ls_count == 1)
729 		dlm_midcomms_version_wait();
730 
731 	dlm_device_deregister(ls);
732 
733 	if (force < 3 && dlm_user_daemon_available())
734 		do_uevent(ls, 0);
735 
736 	dlm_recoverd_stop(ls);
737 
738 	/* clear the LSFL_RUNNING flag to fast up
739 	 * time_shutdown_sync(), we don't care anymore
740 	 */
741 	clear_bit(LSFL_RUNNING, &ls->ls_flags);
742 	timer_shutdown_sync(&ls->ls_scan_timer);
743 
744 	if (ls_count == 1) {
745 		dlm_clear_members(ls);
746 		dlm_midcomms_shutdown();
747 	}
748 
749 	dlm_callback_stop(ls);
750 
751 	remove_lockspace(ls);
752 
753 	dlm_delete_debug_file(ls);
754 
755 	kobject_put(&ls->ls_kobj);
756 
757 	xa_destroy(&ls->ls_recover_xa);
758 	kfree(ls->ls_recover_buf);
759 
760 	/*
761 	 * Free structures on any other lists
762 	 */
763 
764 	dlm_purge_requestqueue(ls);
765 	kfree(ls->ls_recover_args);
766 	dlm_clear_members(ls);
767 	dlm_clear_members_gone(ls);
768 	kfree(ls->ls_node_array);
769 
770 	log_rinfo(ls, "%s final free", __func__);
771 
772 	/* delayed free of data structures see free_lockspace() */
773 	queue_work(dlm_wq, &ls->ls_free_work);
774 	module_put(THIS_MODULE);
775 	return 0;
776 }
777 
778 /*
779  * Called when a system has released all its locks and is not going to use the
780  * lockspace any longer.  We free everything we're managing for this lockspace.
781  * Remaining nodes will go through the recovery process as if we'd died.  The
782  * lockspace must continue to function as usual, participating in recoveries,
783  * until this returns.
784  *
785  * Force has 4 possible values:
786  * 0 - don't destroy lockspace if it has any LKBs
787  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788  * 2 - destroy lockspace regardless of LKBs
789  * 3 - destroy lockspace as part of a forced shutdown
790  */
791 
792 int dlm_release_lockspace(void *lockspace, int force)
793 {
794 	struct dlm_ls *ls;
795 	int error;
796 
797 	ls = dlm_find_lockspace_local(lockspace);
798 	if (!ls)
799 		return -EINVAL;
800 	dlm_put_lockspace(ls);
801 
802 	mutex_lock(&ls_lock);
803 	error = release_lockspace(ls, force);
804 	if (!error)
805 		ls_count--;
806 	if (!ls_count)
807 		dlm_midcomms_stop();
808 	mutex_unlock(&ls_lock);
809 
810 	return error;
811 }
812 
813 void dlm_stop_lockspaces(void)
814 {
815 	struct dlm_ls *ls;
816 	int count;
817 
818  restart:
819 	count = 0;
820 	spin_lock_bh(&lslist_lock);
821 	list_for_each_entry(ls, &lslist, ls_list) {
822 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823 			count++;
824 			continue;
825 		}
826 		spin_unlock_bh(&lslist_lock);
827 		log_error(ls, "no userland control daemon, stopping lockspace");
828 		dlm_ls_stop(ls);
829 		goto restart;
830 	}
831 	spin_unlock_bh(&lslist_lock);
832 
833 	if (count)
834 		log_print("dlm user daemon left %d lockspaces", count);
835 }
836