xref: /linux/fs/dlm/lockspace.c (revision 46e6acfe3501fa938af9c5bd730f0020235b08a2)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
/* Number of lockspaces successfully created; changed with ls_lock held. */
static int			ls_count;
/* Serializes lockspace creation and release (and ls_count updates). */
static struct mutex		ls_lock;
/* All lockspaces, linked through dlm_ls.ls_list. */
static struct list_head		lslist;
/* Protects lslist; always taken with the _bh spinlock variants. */
static spinlock_t		lslist_lock;
32 
33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35 	ssize_t ret = len;
36 	int n;
37 	int rc = kstrtoint(buf, 0, &n);
38 
39 	if (rc)
40 		return rc;
41 	ls = dlm_find_lockspace_local(ls);
42 	if (!ls)
43 		return -EINVAL;
44 
45 	switch (n) {
46 	case 0:
47 		dlm_ls_stop(ls);
48 		break;
49 	case 1:
50 		dlm_ls_start(ls);
51 		break;
52 	default:
53 		ret = -EINVAL;
54 	}
55 	dlm_put_lockspace(ls);
56 	return ret;
57 }
58 
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62 
63 	if (rc)
64 		return rc;
65 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 	wake_up(&ls->ls_uevent_wait);
67 	return len;
68 }
69 
70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74 
75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78 
79 	if (rc)
80 		return rc;
81 	return len;
82 }
83 
84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88 
89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91 	int val;
92 	int rc = kstrtoint(buf, 0, &val);
93 
94 	if (rc)
95 		return rc;
96 	if (val == 1)
97 		set_bit(LSFL_NODIR, &ls->ls_flags);
98 	return len;
99 }
100 
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103 	uint32_t status = dlm_recover_status(ls);
104 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106 
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111 
112 struct dlm_attr {
113 	struct attribute attr;
114 	ssize_t (*show)(struct dlm_ls *, char *);
115 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116 };
117 
118 static struct dlm_attr dlm_attr_control = {
119 	.attr  = {.name = "control", .mode = S_IWUSR},
120 	.store = dlm_control_store
121 };
122 
123 static struct dlm_attr dlm_attr_event = {
124 	.attr  = {.name = "event_done", .mode = S_IWUSR},
125 	.store = dlm_event_store
126 };
127 
128 static struct dlm_attr dlm_attr_id = {
129 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 	.show  = dlm_id_show,
131 	.store = dlm_id_store
132 };
133 
134 static struct dlm_attr dlm_attr_nodir = {
135 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 	.show  = dlm_nodir_show,
137 	.store = dlm_nodir_store
138 };
139 
140 static struct dlm_attr dlm_attr_recover_status = {
141 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
142 	.show  = dlm_recover_status_show
143 };
144 
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147 	.show  = dlm_recover_nodeid_show
148 };
149 
/*
 * NULL-terminated list of the attributes of each lockspace kobject.
 * ATTRIBUTE_GROUPS(dlm) generates dlm_groups from it, which dlm_ktype
 * installs as its default groups.
 */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
160 
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 			     char *buf)
163 {
164 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 	return a->show ? a->show(ls, buf) : 0;
167 }
168 
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 			      const char *buf, size_t len)
171 {
172 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 	return a->store ? a->store(ls, buf, len) : len;
175 }
176 
177 static void lockspace_kobj_release(struct kobject *k)
178 {
179 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
180 	kfree(ls);
181 }
182 
/* sysfs read/write entry points shared by all lockspace attributes. */
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};
187 
/*
 * kobject type of a lockspace: its default attribute groups, the sysfs
 * dispatch ops, and the release callback that frees the struct dlm_ls.
 */
static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};
193 
/* The "dlm" kset that every lockspace kobject is added to. */
static struct kset *dlm_kset;
195 
196 static int do_uevent(struct dlm_ls *ls, int in)
197 {
198 	if (in)
199 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200 	else
201 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202 
203 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204 
205 	/* dlm_controld will see the uevent, do the necessary group management
206 	   and then write to sysfs to wake us */
207 
208 	wait_event(ls->ls_uevent_wait,
209 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210 
211 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212 
213 	return ls->ls_uevent_result;
214 }
215 
216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217 {
218 	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219 
220 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221 	return 0;
222 }
223 
/* uevent hooks for the "dlm" kset; only the environment callback is set. */
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
227 
228 int __init dlm_lockspace_init(void)
229 {
230 	ls_count = 0;
231 	mutex_init(&ls_lock);
232 	INIT_LIST_HEAD(&lslist);
233 	spin_lock_init(&lslist_lock);
234 
235 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236 	if (!dlm_kset) {
237 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
238 		return -ENOMEM;
239 	}
240 	return 0;
241 }
242 
243 void dlm_lockspace_exit(void)
244 {
245 	kset_unregister(dlm_kset);
246 }
247 
248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
249 {
250 	struct dlm_ls *ls;
251 
252 	spin_lock_bh(&lslist_lock);
253 
254 	list_for_each_entry(ls, &lslist, ls_list) {
255 		if (ls->ls_global_id == id) {
256 			atomic_inc(&ls->ls_count);
257 			goto out;
258 		}
259 	}
260 	ls = NULL;
261  out:
262 	spin_unlock_bh(&lslist_lock);
263 	return ls;
264 }
265 
266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
267 {
268 	struct dlm_ls *ls = lockspace;
269 
270 	atomic_inc(&ls->ls_count);
271 	return ls;
272 }
273 
274 struct dlm_ls *dlm_find_lockspace_device(int minor)
275 {
276 	struct dlm_ls *ls;
277 
278 	spin_lock_bh(&lslist_lock);
279 	list_for_each_entry(ls, &lslist, ls_list) {
280 		if (ls->ls_device.minor == minor) {
281 			atomic_inc(&ls->ls_count);
282 			goto out;
283 		}
284 	}
285 	ls = NULL;
286  out:
287 	spin_unlock_bh(&lslist_lock);
288 	return ls;
289 }
290 
291 void dlm_put_lockspace(struct dlm_ls *ls)
292 {
293 	if (atomic_dec_and_test(&ls->ls_count))
294 		wake_up(&ls->ls_count_wait);
295 }
296 
297 static void remove_lockspace(struct dlm_ls *ls)
298 {
299 retry:
300 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
301 
302 	spin_lock_bh(&lslist_lock);
303 	if (atomic_read(&ls->ls_count) != 0) {
304 		spin_unlock_bh(&lslist_lock);
305 		goto retry;
306 	}
307 
308 	WARN_ON(ls->ls_create_count != 0);
309 	list_del(&ls->ls_list);
310 	spin_unlock_bh(&lslist_lock);
311 }
312 
/*
 * Start midcomms, which sends and receives the messages of all
 * lockspaces.  Returns the result of dlm_midcomms_start() and logs it on
 * failure.
 */
static int threads_start(void)
{
	int error = dlm_midcomms_start();

	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}
324 
325 static int new_lockspace(const char *name, const char *cluster,
326 			 uint32_t flags, int lvblen,
327 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
328 			 int *ops_result, dlm_lockspace_t **lockspace)
329 {
330 	struct dlm_ls *ls;
331 	int do_unreg = 0;
332 	int namelen = strlen(name);
333 	int error;
334 
335 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
336 		return -EINVAL;
337 
338 	if (lvblen % 8)
339 		return -EINVAL;
340 
341 	if (!try_module_get(THIS_MODULE))
342 		return -EINVAL;
343 
344 	if (!dlm_user_daemon_available()) {
345 		log_print("dlm user daemon not available");
346 		error = -EUNATCH;
347 		goto out;
348 	}
349 
350 	if (ops && ops_result) {
351 	       	if (!dlm_config.ci_recover_callbacks)
352 			*ops_result = -EOPNOTSUPP;
353 		else
354 			*ops_result = 0;
355 	}
356 
357 	if (!cluster)
358 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
359 			  dlm_config.ci_cluster_name);
360 
361 	if (dlm_config.ci_recover_callbacks && cluster &&
362 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
363 		log_print("dlm cluster name '%s' does not match "
364 			  "the application cluster name '%s'",
365 			  dlm_config.ci_cluster_name, cluster);
366 		error = -EBADR;
367 		goto out;
368 	}
369 
370 	error = 0;
371 
372 	spin_lock_bh(&lslist_lock);
373 	list_for_each_entry(ls, &lslist, ls_list) {
374 		WARN_ON(ls->ls_create_count <= 0);
375 		if (ls->ls_namelen != namelen)
376 			continue;
377 		if (memcmp(ls->ls_name, name, namelen))
378 			continue;
379 		if (flags & DLM_LSFL_NEWEXCL) {
380 			error = -EEXIST;
381 			break;
382 		}
383 		ls->ls_create_count++;
384 		*lockspace = ls;
385 		error = 1;
386 		break;
387 	}
388 	spin_unlock_bh(&lslist_lock);
389 
390 	if (error)
391 		goto out;
392 
393 	error = -ENOMEM;
394 
395 	ls = kzalloc(sizeof(*ls), GFP_NOFS);
396 	if (!ls)
397 		goto out;
398 	memcpy(ls->ls_name, name, namelen);
399 	ls->ls_namelen = namelen;
400 	ls->ls_lvblen = lvblen;
401 	atomic_set(&ls->ls_count, 0);
402 	init_waitqueue_head(&ls->ls_count_wait);
403 	ls->ls_flags = 0;
404 
405 	if (ops && dlm_config.ci_recover_callbacks) {
406 		ls->ls_ops = ops;
407 		ls->ls_ops_arg = ops_arg;
408 	}
409 
410 	if (flags & DLM_LSFL_SOFTIRQ)
411 		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
412 
413 	/* ls_exflags are forced to match among nodes, and we don't
414 	 * need to require all nodes to have some flags set
415 	 */
416 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
417 				    DLM_LSFL_SOFTIRQ));
418 
419 	INIT_LIST_HEAD(&ls->ls_slow_inactive);
420 	INIT_LIST_HEAD(&ls->ls_slow_active);
421 	rwlock_init(&ls->ls_rsbtbl_lock);
422 
423 	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
424 	if (error)
425 		goto out_lsfree;
426 
427 	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
428 	rwlock_init(&ls->ls_lkbxa_lock);
429 
430 	INIT_LIST_HEAD(&ls->ls_waiters);
431 	spin_lock_init(&ls->ls_waiters_lock);
432 	INIT_LIST_HEAD(&ls->ls_orphans);
433 	spin_lock_init(&ls->ls_orphans_lock);
434 
435 	INIT_LIST_HEAD(&ls->ls_nodes);
436 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
437 	ls->ls_num_nodes = 0;
438 	ls->ls_low_nodeid = 0;
439 	ls->ls_total_weight = 0;
440 	ls->ls_node_array = NULL;
441 
442 	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
443 	ls->ls_local_rsb.res_ls = ls;
444 
445 	ls->ls_debug_rsb_dentry = NULL;
446 	ls->ls_debug_waiters_dentry = NULL;
447 
448 	init_waitqueue_head(&ls->ls_uevent_wait);
449 	ls->ls_uevent_result = 0;
450 	init_completion(&ls->ls_recovery_done);
451 	ls->ls_recovery_result = -1;
452 
453 	spin_lock_init(&ls->ls_cb_lock);
454 	INIT_LIST_HEAD(&ls->ls_cb_delay);
455 
456 	ls->ls_recoverd_task = NULL;
457 	mutex_init(&ls->ls_recoverd_active);
458 	spin_lock_init(&ls->ls_recover_lock);
459 	spin_lock_init(&ls->ls_rcom_spin);
460 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
461 	ls->ls_recover_status = 0;
462 	ls->ls_recover_seq = get_random_u64();
463 	ls->ls_recover_args = NULL;
464 	init_rwsem(&ls->ls_in_recovery);
465 	rwlock_init(&ls->ls_recv_active);
466 	INIT_LIST_HEAD(&ls->ls_requestqueue);
467 	rwlock_init(&ls->ls_requestqueue_lock);
468 	spin_lock_init(&ls->ls_clear_proc_locks);
469 
470 	/* Due backwards compatibility with 3.1 we need to use maximum
471 	 * possible dlm message size to be sure the message will fit and
472 	 * not having out of bounds issues. However on sending side 3.2
473 	 * might send less.
474 	 */
475 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
476 	if (!ls->ls_recover_buf) {
477 		error = -ENOMEM;
478 		goto out_lkbxa;
479 	}
480 
481 	ls->ls_slot = 0;
482 	ls->ls_num_slots = 0;
483 	ls->ls_slots_size = 0;
484 	ls->ls_slots = NULL;
485 
486 	INIT_LIST_HEAD(&ls->ls_recover_list);
487 	spin_lock_init(&ls->ls_recover_list_lock);
488 	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
489 	spin_lock_init(&ls->ls_recover_xa_lock);
490 	ls->ls_recover_list_count = 0;
491 	init_waitqueue_head(&ls->ls_wait_general);
492 	INIT_LIST_HEAD(&ls->ls_masters_list);
493 	rwlock_init(&ls->ls_masters_lock);
494 	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
495 	rwlock_init(&ls->ls_dir_dump_lock);
496 
497 	INIT_LIST_HEAD(&ls->ls_scan_list);
498 	spin_lock_init(&ls->ls_scan_lock);
499 	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
500 
501 	spin_lock_bh(&lslist_lock);
502 	ls->ls_create_count = 1;
503 	list_add(&ls->ls_list, &lslist);
504 	spin_unlock_bh(&lslist_lock);
505 
506 	if (flags & DLM_LSFL_FS)
507 		set_bit(LSFL_FS, &ls->ls_flags);
508 
509 	error = dlm_callback_start(ls);
510 	if (error) {
511 		log_error(ls, "can't start dlm_callback %d", error);
512 		goto out_delist;
513 	}
514 
515 	init_waitqueue_head(&ls->ls_recover_lock_wait);
516 
517 	/*
518 	 * Once started, dlm_recoverd first looks for ls in lslist, then
519 	 * initializes ls_in_recovery as locked in "down" mode.  We need
520 	 * to wait for the wakeup from dlm_recoverd because in_recovery
521 	 * has to start out in down mode.
522 	 */
523 
524 	error = dlm_recoverd_start(ls);
525 	if (error) {
526 		log_error(ls, "can't start dlm_recoverd %d", error);
527 		goto out_callback;
528 	}
529 
530 	wait_event(ls->ls_recover_lock_wait,
531 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
532 
533 	/* let kobject handle freeing of ls if there's an error */
534 	do_unreg = 1;
535 
536 	ls->ls_kobj.kset = dlm_kset;
537 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
538 				     "%s", ls->ls_name);
539 	if (error)
540 		goto out_recoverd;
541 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
542 
543 	/* This uevent triggers dlm_controld in userspace to add us to the
544 	   group of nodes that are members of this lockspace (managed by the
545 	   cluster infrastructure.)  Once it's done that, it tells us who the
546 	   current lockspace members are (via configfs) and then tells the
547 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
548 
549 	error = do_uevent(ls, 1);
550 	if (error)
551 		goto out_recoverd;
552 
553 	/* wait until recovery is successful or failed */
554 	wait_for_completion(&ls->ls_recovery_done);
555 	error = ls->ls_recovery_result;
556 	if (error)
557 		goto out_members;
558 
559 	dlm_create_debug_file(ls);
560 
561 	log_rinfo(ls, "join complete");
562 	*lockspace = ls;
563 	return 0;
564 
565  out_members:
566 	do_uevent(ls, 0);
567 	dlm_clear_members(ls);
568 	kfree(ls->ls_node_array);
569  out_recoverd:
570 	dlm_recoverd_stop(ls);
571  out_callback:
572 	dlm_callback_stop(ls);
573  out_delist:
574 	spin_lock_bh(&lslist_lock);
575 	list_del(&ls->ls_list);
576 	spin_unlock_bh(&lslist_lock);
577 	xa_destroy(&ls->ls_recover_xa);
578 	kfree(ls->ls_recover_buf);
579  out_lkbxa:
580 	xa_destroy(&ls->ls_lkbxa);
581 	rhashtable_destroy(&ls->ls_rsbtbl);
582  out_lsfree:
583 	if (do_unreg)
584 		kobject_put(&ls->ls_kobj);
585 	else
586 		kfree(ls);
587  out:
588 	module_put(THIS_MODULE);
589 	return error;
590 }
591 
592 static int __dlm_new_lockspace(const char *name, const char *cluster,
593 			       uint32_t flags, int lvblen,
594 			       const struct dlm_lockspace_ops *ops,
595 			       void *ops_arg, int *ops_result,
596 			       dlm_lockspace_t **lockspace)
597 {
598 	int error = 0;
599 
600 	mutex_lock(&ls_lock);
601 	if (!ls_count)
602 		error = threads_start();
603 	if (error)
604 		goto out;
605 
606 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
607 			      ops_result, lockspace);
608 	if (!error)
609 		ls_count++;
610 	if (error > 0)
611 		error = 0;
612 	if (!ls_count) {
613 		dlm_midcomms_shutdown();
614 		dlm_midcomms_stop();
615 	}
616  out:
617 	mutex_unlock(&ls_lock);
618 	return error;
619 }
620 
621 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
622 		      int lvblen, const struct dlm_lockspace_ops *ops,
623 		      void *ops_arg, int *ops_result,
624 		      dlm_lockspace_t **lockspace)
625 {
626 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
627 				   ops, ops_arg, ops_result, lockspace);
628 }
629 
630 int dlm_new_user_lockspace(const char *name, const char *cluster,
631 			   uint32_t flags, int lvblen,
632 			   const struct dlm_lockspace_ops *ops,
633 			   void *ops_arg, int *ops_result,
634 			   dlm_lockspace_t **lockspace)
635 {
636 	if (flags & DLM_LSFL_SOFTIRQ)
637 		return -EINVAL;
638 
639 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
640 				   ops_arg, ops_result, lockspace);
641 }
642 
643 static int lkb_idr_free(struct dlm_lkb *lkb)
644 {
645 	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
646 		dlm_free_lvb(lkb->lkb_lvbptr);
647 
648 	dlm_free_lkb(lkb);
649 	return 0;
650 }
651 
652 /* NOTE: We check the lkbxa here rather than the resource table.
653    This is because there may be LKBs queued as ASTs that have been unlinked
654    from their RSBs and are pending deletion once the AST has been delivered */
655 
656 static int lockspace_busy(struct dlm_ls *ls, int force)
657 {
658 	struct dlm_lkb *lkb;
659 	unsigned long id;
660 	int rv = 0;
661 
662 	read_lock_bh(&ls->ls_lkbxa_lock);
663 	if (force == 0) {
664 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
665 			rv = 1;
666 			break;
667 		}
668 	} else if (force == 1) {
669 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
670 			if (lkb->lkb_nodeid == 0 &&
671 			    lkb->lkb_grmode != DLM_LOCK_IV) {
672 				rv = 1;
673 				break;
674 			}
675 		}
676 	} else {
677 		rv = 0;
678 	}
679 	read_unlock_bh(&ls->ls_lkbxa_lock);
680 	return rv;
681 }
682 
/*
 * Per-element callback for rhashtable_free_and_destroy(): frees the rsb
 * stored in the table.  @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb((struct dlm_rsb *)ptr);
}
689 
690 static int release_lockspace(struct dlm_ls *ls, int force)
691 {
692 	struct dlm_lkb *lkb;
693 	unsigned long id;
694 	int busy, rv;
695 
696 	busy = lockspace_busy(ls, force);
697 
698 	spin_lock_bh(&lslist_lock);
699 	if (ls->ls_create_count == 1) {
700 		if (busy) {
701 			rv = -EBUSY;
702 		} else {
703 			/* remove_lockspace takes ls off lslist */
704 			ls->ls_create_count = 0;
705 			rv = 0;
706 		}
707 	} else if (ls->ls_create_count > 1) {
708 		rv = --ls->ls_create_count;
709 	} else {
710 		rv = -EINVAL;
711 	}
712 	spin_unlock_bh(&lslist_lock);
713 
714 	if (rv) {
715 		log_debug(ls, "release_lockspace no remove %d", rv);
716 		return rv;
717 	}
718 
719 	if (ls_count == 1)
720 		dlm_midcomms_version_wait();
721 
722 	dlm_device_deregister(ls);
723 
724 	if (force < 3 && dlm_user_daemon_available())
725 		do_uevent(ls, 0);
726 
727 	dlm_recoverd_stop(ls);
728 
729 	/* clear the LSFL_RUNNING flag to fast up
730 	 * time_shutdown_sync(), we don't care anymore
731 	 */
732 	clear_bit(LSFL_RUNNING, &ls->ls_flags);
733 	timer_shutdown_sync(&ls->ls_scan_timer);
734 
735 	if (ls_count == 1) {
736 		dlm_clear_members(ls);
737 		dlm_midcomms_shutdown();
738 	}
739 
740 	dlm_callback_stop(ls);
741 
742 	remove_lockspace(ls);
743 
744 	dlm_delete_debug_file(ls);
745 
746 	xa_destroy(&ls->ls_recover_xa);
747 	kfree(ls->ls_recover_buf);
748 
749 	/*
750 	 * Free all lkb's in xa
751 	 */
752 	xa_for_each(&ls->ls_lkbxa, id, lkb) {
753 		lkb_idr_free(lkb);
754 	}
755 	xa_destroy(&ls->ls_lkbxa);
756 
757 	/*
758 	 * Free all rsb's on rsbtbl
759 	 */
760 	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
761 
762 	/*
763 	 * Free structures on any other lists
764 	 */
765 
766 	dlm_purge_requestqueue(ls);
767 	kfree(ls->ls_recover_args);
768 	dlm_clear_members(ls);
769 	dlm_clear_members_gone(ls);
770 	kfree(ls->ls_node_array);
771 	log_rinfo(ls, "release_lockspace final free");
772 	kobject_put(&ls->ls_kobj);
773 	/* The ls structure will be freed when the kobject is done with */
774 
775 	module_put(THIS_MODULE);
776 	return 0;
777 }
778 
779 /*
780  * Called when a system has released all its locks and is not going to use the
781  * lockspace any longer.  We free everything we're managing for this lockspace.
782  * Remaining nodes will go through the recovery process as if we'd died.  The
783  * lockspace must continue to function as usual, participating in recoveries,
784  * until this returns.
785  *
786  * Force has 4 possible values:
787  * 0 - don't destroy lockspace if it has any LKBs
788  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
789  * 2 - destroy lockspace regardless of LKBs
790  * 3 - destroy lockspace as part of a forced shutdown
791  */
792 
793 int dlm_release_lockspace(void *lockspace, int force)
794 {
795 	struct dlm_ls *ls;
796 	int error;
797 
798 	ls = dlm_find_lockspace_local(lockspace);
799 	if (!ls)
800 		return -EINVAL;
801 	dlm_put_lockspace(ls);
802 
803 	mutex_lock(&ls_lock);
804 	error = release_lockspace(ls, force);
805 	if (!error)
806 		ls_count--;
807 	if (!ls_count)
808 		dlm_midcomms_stop();
809 	mutex_unlock(&ls_lock);
810 
811 	return error;
812 }
813 
814 void dlm_stop_lockspaces(void)
815 {
816 	struct dlm_ls *ls;
817 	int count;
818 
819  restart:
820 	count = 0;
821 	spin_lock_bh(&lslist_lock);
822 	list_for_each_entry(ls, &lslist, ls_list) {
823 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
824 			count++;
825 			continue;
826 		}
827 		spin_unlock_bh(&lslist_lock);
828 		log_error(ls, "no userland control daemon, stopping lockspace");
829 		dlm_ls_stop(ls);
830 		goto restart;
831 	}
832 	spin_unlock_bh(&lslist_lock);
833 
834 	if (count)
835 		log_print("dlm user daemon left %d lockspaces", count);
836 }
837