xref: /linux/fs/dlm/lockspace.c (revision 68c402fe5c5e5aa9a04c8bba9d99feb08a68afa7)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
/* Number of lockspaces created on this node; modified with ls_lock held. */
static int			ls_count;
/* Serializes lockspace creation and release (and thus ls_count). */
static struct mutex		ls_lock;
/* Every lockspace on this node, linked through dlm_ls.ls_list. */
static struct list_head		lslist;
/* Guards lslist; this file always takes it with the _bh variants. */
static spinlock_t		lslist_lock;
32 
33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35 	ssize_t ret = len;
36 	int n;
37 	int rc = kstrtoint(buf, 0, &n);
38 
39 	if (rc)
40 		return rc;
41 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
42 	if (!ls)
43 		return -EINVAL;
44 
45 	switch (n) {
46 	case 0:
47 		dlm_ls_stop(ls);
48 		break;
49 	case 1:
50 		dlm_ls_start(ls);
51 		break;
52 	default:
53 		ret = -EINVAL;
54 	}
55 	dlm_put_lockspace(ls);
56 	return ret;
57 }
58 
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62 
63 	if (rc)
64 		return rc;
65 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 	wake_up(&ls->ls_uevent_wait);
67 	return len;
68 }
69 
70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74 
75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78 
79 	if (rc)
80 		return rc;
81 	return len;
82 }
83 
84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88 
89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91 	int val;
92 	int rc = kstrtoint(buf, 0, &val);
93 
94 	if (rc)
95 		return rc;
96 	if (val == 1)
97 		set_bit(LSFL_NODIR, &ls->ls_flags);
98 	return len;
99 }
100 
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103 	uint32_t status = dlm_recover_status(ls);
104 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106 
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111 
/*
 * A lockspace sysfs attribute: the generic attribute plus optional show and
 * store handlers that receive the owning struct dlm_ls rather than a kobject.
 * Either handler may be NULL; dlm_attr_show() and dlm_attr_store() check for
 * that before calling.
 */
struct dlm_attr {
	struct attribute attr;
	/* formats the value into the supplied buffer, returns bytes written */
	ssize_t (*show)(struct dlm_ls *, char *);
	/* consumes the supplied buffer/length, returns length or -errno */
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
117 
118 static struct dlm_attr dlm_attr_control = {
119 	.attr  = {.name = "control", .mode = S_IWUSR},
120 	.store = dlm_control_store
121 };
122 
123 static struct dlm_attr dlm_attr_event = {
124 	.attr  = {.name = "event_done", .mode = S_IWUSR},
125 	.store = dlm_event_store
126 };
127 
128 static struct dlm_attr dlm_attr_id = {
129 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 	.show  = dlm_id_show,
131 	.store = dlm_id_store
132 };
133 
134 static struct dlm_attr dlm_attr_nodir = {
135 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 	.show  = dlm_nodir_show,
137 	.store = dlm_nodir_store
138 };
139 
140 static struct dlm_attr dlm_attr_recover_status = {
141 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
142 	.show  = dlm_recover_status_show
143 };
144 
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147 	.show  = dlm_recover_nodeid_show
148 };
149 
/*
 * NULL-terminated list of the attributes every lockspace kobject exposes.
 * ATTRIBUTE_GROUPS(dlm) wraps this array as dlm_groups, which dlm_ktype
 * installs as its default groups.
 */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
160 
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 			     char *buf)
163 {
164 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 	return a->show ? a->show(ls, buf) : 0;
167 }
168 
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 			      const char *buf, size_t len)
171 {
172 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 	return a->store ? a->store(ls, buf, len) : len;
175 }
176 
177 static void lockspace_kobj_release(struct kobject *k)
178 {
179 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
180 	kfree(ls);
181 }
182 
/*
 * sysfs operations shared by all lockspace attributes; they dispatch to the
 * per-attribute handlers stored in struct dlm_attr.
 */
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};
187 
188 static struct kobj_type dlm_ktype = {
189 	.default_groups = dlm_groups,
190 	.sysfs_ops     = &dlm_attr_ops,
191 	.release       = lockspace_kobj_release,
192 };
193 
/*
 * The "dlm" kset: created in dlm_lockspace_init(), unregistered in
 * dlm_lockspace_exit(), and assigned to each lockspace kobject in
 * new_lockspace().
 */
static struct kset *dlm_kset;
195 
196 static int do_uevent(struct dlm_ls *ls, int in)
197 {
198 	if (in)
199 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200 	else
201 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202 
203 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204 
205 	/* dlm_controld will see the uevent, do the necessary group management
206 	   and then write to sysfs to wake us */
207 
208 	wait_event(ls->ls_uevent_wait,
209 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210 
211 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212 
213 	return ls->ls_uevent_result;
214 }
215 
216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217 {
218 	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219 
220 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221 	return 0;
222 }
223 
/* uevent operations handed to kset_create_and_add() for the "dlm" kset. */
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
227 
228 int __init dlm_lockspace_init(void)
229 {
230 	ls_count = 0;
231 	mutex_init(&ls_lock);
232 	INIT_LIST_HEAD(&lslist);
233 	spin_lock_init(&lslist_lock);
234 
235 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236 	if (!dlm_kset) {
237 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
238 		return -ENOMEM;
239 	}
240 	return 0;
241 }
242 
/* Undo dlm_lockspace_init(): unregister the "dlm" kset. */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
247 
248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
249 {
250 	struct dlm_ls *ls;
251 
252 	spin_lock_bh(&lslist_lock);
253 
254 	list_for_each_entry(ls, &lslist, ls_list) {
255 		if (ls->ls_global_id == id) {
256 			atomic_inc(&ls->ls_count);
257 			goto out;
258 		}
259 	}
260 	ls = NULL;
261  out:
262 	spin_unlock_bh(&lslist_lock);
263 	return ls;
264 }
265 
266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
267 {
268 	struct dlm_ls *ls;
269 
270 	spin_lock_bh(&lslist_lock);
271 	list_for_each_entry(ls, &lslist, ls_list) {
272 		if (ls->ls_local_handle == lockspace) {
273 			atomic_inc(&ls->ls_count);
274 			goto out;
275 		}
276 	}
277 	ls = NULL;
278  out:
279 	spin_unlock_bh(&lslist_lock);
280 	return ls;
281 }
282 
283 struct dlm_ls *dlm_find_lockspace_device(int minor)
284 {
285 	struct dlm_ls *ls;
286 
287 	spin_lock_bh(&lslist_lock);
288 	list_for_each_entry(ls, &lslist, ls_list) {
289 		if (ls->ls_device.minor == minor) {
290 			atomic_inc(&ls->ls_count);
291 			goto out;
292 		}
293 	}
294 	ls = NULL;
295  out:
296 	spin_unlock_bh(&lslist_lock);
297 	return ls;
298 }
299 
300 void dlm_put_lockspace(struct dlm_ls *ls)
301 {
302 	if (atomic_dec_and_test(&ls->ls_count))
303 		wake_up(&ls->ls_count_wait);
304 }
305 
306 static void remove_lockspace(struct dlm_ls *ls)
307 {
308 retry:
309 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
310 
311 	spin_lock_bh(&lslist_lock);
312 	if (atomic_read(&ls->ls_count) != 0) {
313 		spin_unlock_bh(&lslist_lock);
314 		goto retry;
315 	}
316 
317 	WARN_ON(ls->ls_create_count != 0);
318 	list_del(&ls->ls_list);
319 	spin_unlock_bh(&lslist_lock);
320 }
321 
/*
 * Start the midcomms layer, which handles sending/receiving messages for all
 * lockspaces.  Logs and returns the error from dlm_midcomms_start(), or
 * returns 0 on success.
 */
static int threads_start(void)
{
	int rv = dlm_midcomms_start();

	if (rv)
		log_print("cannot start dlm midcomms %d", rv);

	return rv;
}
333 
/*
 * Create the lockspace called @name, or take another creation reference on an
 * existing lockspace with that name.
 *
 * @name:	lockspace name; length must be 1..DLM_LOCKSPACE_LEN
 * @cluster:	cluster name supplied by the caller, may be NULL
 * @flags:	DLM_LSFL_* flags; DLM_LSFL_FS and DLM_LSFL_NEWEXCL are acted
 *		on here and masked out of ls_exflags
 * @lvblen:	stored in ls_lvblen; must be a multiple of 8
 * @ops:	optional callbacks, only kept when
 *		dlm_config.ci_recover_callbacks is set
 * @ops_arg:	stored alongside @ops
 * @ops_result:	when both @ops and @ops_result are given, set to 0 or
 *		-EOPNOTSUPP depending on dlm_config.ci_recover_callbacks
 * @lockspace:	receives the lockspace pointer on the 0 and 1 returns
 *
 * Returns 0 when a new lockspace was created and the join completed, 1 when
 * a lockspace with this name already existed (its ls_create_count is bumped),
 * or a negative errno.  The module reference taken near the top is kept only
 * on the 0 return; every other path that took it drops it at "out".
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int do_unreg = 0;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	/* from here on, failures leave through "out" to drop this reference */
	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	/* tell the caller whether its callbacks will actually be used */
	if (ops && ops_result) {
	       	if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/*
	 * Look for an existing lockspace with the same name.  A hit either
	 * fails with -EEXIST (DLM_LSFL_NEWEXCL) or reuses the lockspace and
	 * reports that with error == 1.
	 */
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	/* nonzero here means -EEXIST or "reused existing" (1) */
	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));

	INIT_LIST_HEAD(&ls->ls_toss);
	INIT_LIST_HEAD(&ls->ls_keep);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	idr_init(&ls->ls_lkbidr);
	rwlock_init(&ls->ls_lkbidr_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * to avoid out of bounds issues. However, the 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbidr;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	/* the local handle is the lockspace's own address */
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_toss_q);
	spin_lock_init(&ls->ls_toss_q_lock);
	timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,
		    TIMER_DEFERRABLE);

	/* publish the lockspace; lookups can find it from here on */
	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	/*
	 * Error unwind: each label undoes the setup step just before the
	 * jump that targets it and falls through to the earlier steps.
	 */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	/*
	 * Once do_unreg is set, ls is released through kobject_put()
	 * (lockspace_kobj_release() does the kfree); before that it is
	 * freed directly.
	 */
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
601 
602 static int __dlm_new_lockspace(const char *name, const char *cluster,
603 			       uint32_t flags, int lvblen,
604 			       const struct dlm_lockspace_ops *ops,
605 			       void *ops_arg, int *ops_result,
606 			       dlm_lockspace_t **lockspace)
607 {
608 	int error = 0;
609 
610 	mutex_lock(&ls_lock);
611 	if (!ls_count)
612 		error = threads_start();
613 	if (error)
614 		goto out;
615 
616 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
617 			      ops_result, lockspace);
618 	if (!error)
619 		ls_count++;
620 	if (error > 0)
621 		error = 0;
622 	if (!ls_count) {
623 		dlm_midcomms_shutdown();
624 		dlm_midcomms_stop();
625 	}
626  out:
627 	mutex_unlock(&ls_lock);
628 	return error;
629 }
630 
631 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
632 		      int lvblen, const struct dlm_lockspace_ops *ops,
633 		      void *ops_arg, int *ops_result,
634 		      dlm_lockspace_t **lockspace)
635 {
636 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
637 				   ops, ops_arg, ops_result, lockspace);
638 }
639 
/*
 * Create or join a lockspace with @flags passed through as given; unlike
 * dlm_new_lockspace(), DLM_LSFL_FS is not added here.
 *
 * Returns 0 on success or a negative errno.
 */
int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}
649 
650 static int lkb_idr_is_local(int id, void *p, void *data)
651 {
652 	struct dlm_lkb *lkb = p;
653 
654 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
655 }
656 
/*
 * idr_for_each() callback that matches every entry: returning 1 stops the
 * walk at the first lkb, so the walk's result is nonzero exactly when the
 * idr is non-empty.  All arguments are unused.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
661 
662 static int lkb_idr_free(int id, void *p, void *data)
663 {
664 	struct dlm_lkb *lkb = p;
665 
666 	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
667 		dlm_free_lvb(lkb->lkb_lvbptr);
668 
669 	dlm_free_lkb(lkb);
670 	return 0;
671 }
672 
673 /* NOTE: We check the lkbidr here rather than the resource table.
674    This is because there may be LKBs queued as ASTs that have been unlinked
675    from their RSBs and are pending deletion once the AST has been delivered */
676 
677 static int lockspace_busy(struct dlm_ls *ls, int force)
678 {
679 	int rv;
680 
681 	read_lock_bh(&ls->ls_lkbidr_lock);
682 	if (force == 0) {
683 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
684 	} else if (force == 1) {
685 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
686 	} else {
687 		rv = 0;
688 	}
689 	read_unlock_bh(&ls->ls_lkbidr_lock);
690 	return rv;
691 }
692 
/*
 * Per-element callback for rhashtable_free_and_destroy(): frees the rsb that
 * @ptr refers to.  @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
699 
/*
 * Drop one creation reference on @ls and tear the lockspace down when that
 * was the last one.
 *
 * @force is passed to lockspace_busy(); values below 3 additionally send the
 * "leaving" uevent when the user daemon is available.
 *
 * Returns 0 after a full teardown, the remaining ls_create_count (> 0) when
 * other creators still hold the lockspace, -EBUSY when this was the last
 * creator but lockspace_busy() reported locks, or -EINVAL when
 * ls_create_count was not positive.  Nothing is torn down on a nonzero
 * return.  ls_count is read without extra locking here; the visible caller,
 * dlm_release_lockspace(), holds ls_lock around this call.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	/* this is the last lockspace */
	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(), we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	/* waits for outstanding lookup references, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* drop the module reference kept by the successful new_lockspace() */
	module_put(THIS_MODULE);
	return 0;
}
793 
794 /*
795  * Called when a system has released all its locks and is not going to use the
796  * lockspace any longer.  We free everything we're managing for this lockspace.
797  * Remaining nodes will go through the recovery process as if we'd died.  The
798  * lockspace must continue to function as usual, participating in recoveries,
799  * until this returns.
800  *
801  * Force has 4 possible values:
802  * 0 - don't destroy lockspace if it has any LKBs
803  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804  * 2 - destroy lockspace regardless of LKBs
805  * 3 - destroy lockspace as part of a forced shutdown
806  */
807 
808 int dlm_release_lockspace(void *lockspace, int force)
809 {
810 	struct dlm_ls *ls;
811 	int error;
812 
813 	ls = dlm_find_lockspace_local(lockspace);
814 	if (!ls)
815 		return -EINVAL;
816 	dlm_put_lockspace(ls);
817 
818 	mutex_lock(&ls_lock);
819 	error = release_lockspace(ls, force);
820 	if (!error)
821 		ls_count--;
822 	if (!ls_count)
823 		dlm_midcomms_stop();
824 	mutex_unlock(&ls_lock);
825 
826 	return error;
827 }
828 
829 void dlm_stop_lockspaces(void)
830 {
831 	struct dlm_ls *ls;
832 	int count;
833 
834  restart:
835 	count = 0;
836 	spin_lock_bh(&lslist_lock);
837 	list_for_each_entry(ls, &lslist, ls_list) {
838 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
839 			count++;
840 			continue;
841 		}
842 		spin_unlock_bh(&lslist_lock);
843 		log_error(ls, "no userland control daemon, stopping lockspace");
844 		dlm_ls_stop(ls);
845 		goto restart;
846 	}
847 	spin_unlock_bh(&lslist_lock);
848 
849 	if (count)
850 		log_print("dlm user daemon left %d lockspaces", count);
851 }
852