xref: /linux/fs/dlm/lockspace.c (revision 0dd9ac63ce26ec87b080ca9c3e6efed33c23ace6)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 	uint32_t status = dlm_recover_status(ls);
80 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82 
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87 
88 struct dlm_attr {
89 	struct attribute attr;
90 	ssize_t (*show)(struct dlm_ls *, char *);
91 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93 
94 static struct dlm_attr dlm_attr_control = {
95 	.attr  = {.name = "control", .mode = S_IWUSR},
96 	.store = dlm_control_store
97 };
98 
99 static struct dlm_attr dlm_attr_event = {
100 	.attr  = {.name = "event_done", .mode = S_IWUSR},
101 	.store = dlm_event_store
102 };
103 
104 static struct dlm_attr dlm_attr_id = {
105 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 	.show  = dlm_id_show,
107 	.store = dlm_id_store
108 };
109 
110 static struct dlm_attr dlm_attr_recover_status = {
111 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112 	.show  = dlm_recover_status_show
113 };
114 
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117 	.show  = dlm_recover_nodeid_show
118 };
119 
120 static struct attribute *dlm_attrs[] = {
121 	&dlm_attr_control.attr,
122 	&dlm_attr_event.attr,
123 	&dlm_attr_id.attr,
124 	&dlm_attr_recover_status.attr,
125 	&dlm_attr_recover_nodeid.attr,
126 	NULL,
127 };
128 
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 			     char *buf)
131 {
132 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 	return a->show ? a->show(ls, buf) : 0;
135 }
136 
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 			      const char *buf, size_t len)
139 {
140 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 	return a->store ? a->store(ls, buf, len) : len;
143 }
144 
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148 	kfree(ls);
149 }
150 
151 static const struct sysfs_ops dlm_attr_ops = {
152 	.show  = dlm_attr_show,
153 	.store = dlm_attr_store,
154 };
155 
156 static struct kobj_type dlm_ktype = {
157 	.default_attrs = dlm_attrs,
158 	.sysfs_ops     = &dlm_attr_ops,
159 	.release       = lockspace_kobj_release,
160 };
161 
162 static struct kset *dlm_kset;
163 
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 	int error;
167 
168 	if (in)
169 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 	else
171 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 
173 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 
175 	/* dlm_controld will see the uevent, do the necessary group management
176 	   and then write to sysfs to wake us */
177 
178 	error = wait_event_interruptible(ls->ls_uevent_wait,
179 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 
181 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 
183 	if (error)
184 		goto out;
185 
186 	error = ls->ls_uevent_result;
187  out:
188 	if (error)
189 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 			  error, ls->ls_uevent_result);
191 	return error;
192 }
193 
194 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195 		      struct kobj_uevent_env *env)
196 {
197 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198 
199 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200 	return 0;
201 }
202 
203 static struct kset_uevent_ops dlm_uevent_ops = {
204 	.uevent = dlm_uevent,
205 };
206 
207 int __init dlm_lockspace_init(void)
208 {
209 	ls_count = 0;
210 	mutex_init(&ls_lock);
211 	INIT_LIST_HEAD(&lslist);
212 	spin_lock_init(&lslist_lock);
213 
214 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215 	if (!dlm_kset) {
216 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
217 		return -ENOMEM;
218 	}
219 	return 0;
220 }
221 
222 void dlm_lockspace_exit(void)
223 {
224 	kset_unregister(dlm_kset);
225 }
226 
227 static struct dlm_ls *find_ls_to_scan(void)
228 {
229 	struct dlm_ls *ls;
230 
231 	spin_lock(&lslist_lock);
232 	list_for_each_entry(ls, &lslist, ls_list) {
233 		if (time_after_eq(jiffies, ls->ls_scan_time +
234 					    dlm_config.ci_scan_secs * HZ)) {
235 			spin_unlock(&lslist_lock);
236 			return ls;
237 		}
238 	}
239 	spin_unlock(&lslist_lock);
240 	return NULL;
241 }
242 
243 static int dlm_scand(void *data)
244 {
245 	struct dlm_ls *ls;
246 	int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
247 
248 	while (!kthread_should_stop()) {
249 		ls = find_ls_to_scan();
250 		if (ls) {
251 			if (dlm_lock_recovery_try(ls)) {
252 				ls->ls_scan_time = jiffies;
253 				dlm_scan_rsbs(ls);
254 				dlm_scan_timeout(ls);
255 				dlm_unlock_recovery(ls);
256 			} else {
257 				ls->ls_scan_time += HZ;
258 			}
259 		} else {
260 			schedule_timeout_interruptible(timeout_jiffies);
261 		}
262 	}
263 	return 0;
264 }
265 
266 static int dlm_scand_start(void)
267 {
268 	struct task_struct *p;
269 	int error = 0;
270 
271 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
272 	if (IS_ERR(p))
273 		error = PTR_ERR(p);
274 	else
275 		scand_task = p;
276 	return error;
277 }
278 
279 static void dlm_scand_stop(void)
280 {
281 	kthread_stop(scand_task);
282 }
283 
284 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
285 {
286 	struct dlm_ls *ls;
287 
288 	spin_lock(&lslist_lock);
289 
290 	list_for_each_entry(ls, &lslist, ls_list) {
291 		if (ls->ls_global_id == id) {
292 			ls->ls_count++;
293 			goto out;
294 		}
295 	}
296 	ls = NULL;
297  out:
298 	spin_unlock(&lslist_lock);
299 	return ls;
300 }
301 
302 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
303 {
304 	struct dlm_ls *ls;
305 
306 	spin_lock(&lslist_lock);
307 	list_for_each_entry(ls, &lslist, ls_list) {
308 		if (ls->ls_local_handle == lockspace) {
309 			ls->ls_count++;
310 			goto out;
311 		}
312 	}
313 	ls = NULL;
314  out:
315 	spin_unlock(&lslist_lock);
316 	return ls;
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_device(int minor)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 	list_for_each_entry(ls, &lslist, ls_list) {
325 		if (ls->ls_device.minor == minor) {
326 			ls->ls_count++;
327 			goto out;
328 		}
329 	}
330 	ls = NULL;
331  out:
332 	spin_unlock(&lslist_lock);
333 	return ls;
334 }
335 
336 void dlm_put_lockspace(struct dlm_ls *ls)
337 {
338 	spin_lock(&lslist_lock);
339 	ls->ls_count--;
340 	spin_unlock(&lslist_lock);
341 }
342 
343 static void remove_lockspace(struct dlm_ls *ls)
344 {
345 	for (;;) {
346 		spin_lock(&lslist_lock);
347 		if (ls->ls_count == 0) {
348 			WARN_ON(ls->ls_create_count != 0);
349 			list_del(&ls->ls_list);
350 			spin_unlock(&lslist_lock);
351 			return;
352 		}
353 		spin_unlock(&lslist_lock);
354 		ssleep(1);
355 	}
356 }
357 
358 static int threads_start(void)
359 {
360 	int error;
361 
362 	/* Thread which process lock requests for all lockspace's */
363 	error = dlm_astd_start();
364 	if (error) {
365 		log_print("cannot start dlm_astd thread %d", error);
366 		goto fail;
367 	}
368 
369 	error = dlm_scand_start();
370 	if (error) {
371 		log_print("cannot start dlm_scand thread %d", error);
372 		goto astd_fail;
373 	}
374 
375 	/* Thread for sending/receiving messages for all lockspace's */
376 	error = dlm_lowcomms_start();
377 	if (error) {
378 		log_print("cannot start dlm lowcomms %d", error);
379 		goto scand_fail;
380 	}
381 
382 	return 0;
383 
384  scand_fail:
385 	dlm_scand_stop();
386  astd_fail:
387 	dlm_astd_stop();
388  fail:
389 	return error;
390 }
391 
392 static void threads_stop(void)
393 {
394 	dlm_scand_stop();
395 	dlm_lowcomms_stop();
396 	dlm_astd_stop();
397 }
398 
399 static int new_lockspace(const char *name, int namelen, void **lockspace,
400 			 uint32_t flags, int lvblen)
401 {
402 	struct dlm_ls *ls;
403 	int i, size, error;
404 	int do_unreg = 0;
405 
406 	if (namelen > DLM_LOCKSPACE_LEN)
407 		return -EINVAL;
408 
409 	if (!lvblen || (lvblen % 8))
410 		return -EINVAL;
411 
412 	if (!try_module_get(THIS_MODULE))
413 		return -EINVAL;
414 
415 	if (!dlm_user_daemon_available()) {
416 		module_put(THIS_MODULE);
417 		return -EUNATCH;
418 	}
419 
420 	error = 0;
421 
422 	spin_lock(&lslist_lock);
423 	list_for_each_entry(ls, &lslist, ls_list) {
424 		WARN_ON(ls->ls_create_count <= 0);
425 		if (ls->ls_namelen != namelen)
426 			continue;
427 		if (memcmp(ls->ls_name, name, namelen))
428 			continue;
429 		if (flags & DLM_LSFL_NEWEXCL) {
430 			error = -EEXIST;
431 			break;
432 		}
433 		ls->ls_create_count++;
434 		*lockspace = ls;
435 		error = 1;
436 		break;
437 	}
438 	spin_unlock(&lslist_lock);
439 
440 	if (error)
441 		goto out;
442 
443 	error = -ENOMEM;
444 
445 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
446 	if (!ls)
447 		goto out;
448 	memcpy(ls->ls_name, name, namelen);
449 	ls->ls_namelen = namelen;
450 	ls->ls_lvblen = lvblen;
451 	ls->ls_count = 0;
452 	ls->ls_flags = 0;
453 	ls->ls_scan_time = jiffies;
454 
455 	if (flags & DLM_LSFL_TIMEWARN)
456 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
457 
458 	/* ls_exflags are forced to match among nodes, and we don't
459 	   need to require all nodes to have some flags set */
460 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
461 				    DLM_LSFL_NEWEXCL));
462 
463 	size = dlm_config.ci_rsbtbl_size;
464 	ls->ls_rsbtbl_size = size;
465 
466 	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
467 	if (!ls->ls_rsbtbl)
468 		goto out_lsfree;
469 	for (i = 0; i < size; i++) {
470 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
471 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
472 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
473 	}
474 
475 	size = dlm_config.ci_lkbtbl_size;
476 	ls->ls_lkbtbl_size = size;
477 
478 	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
479 	if (!ls->ls_lkbtbl)
480 		goto out_rsbfree;
481 	for (i = 0; i < size; i++) {
482 		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
483 		rwlock_init(&ls->ls_lkbtbl[i].lock);
484 		ls->ls_lkbtbl[i].counter = 1;
485 	}
486 
487 	size = dlm_config.ci_dirtbl_size;
488 	ls->ls_dirtbl_size = size;
489 
490 	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
491 	if (!ls->ls_dirtbl)
492 		goto out_lkbfree;
493 	for (i = 0; i < size; i++) {
494 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
495 		spin_lock_init(&ls->ls_dirtbl[i].lock);
496 	}
497 
498 	INIT_LIST_HEAD(&ls->ls_waiters);
499 	mutex_init(&ls->ls_waiters_mutex);
500 	INIT_LIST_HEAD(&ls->ls_orphans);
501 	mutex_init(&ls->ls_orphans_mutex);
502 	INIT_LIST_HEAD(&ls->ls_timeout);
503 	mutex_init(&ls->ls_timeout_mutex);
504 
505 	INIT_LIST_HEAD(&ls->ls_nodes);
506 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
507 	ls->ls_num_nodes = 0;
508 	ls->ls_low_nodeid = 0;
509 	ls->ls_total_weight = 0;
510 	ls->ls_node_array = NULL;
511 
512 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
513 	ls->ls_stub_rsb.res_ls = ls;
514 
515 	ls->ls_debug_rsb_dentry = NULL;
516 	ls->ls_debug_waiters_dentry = NULL;
517 
518 	init_waitqueue_head(&ls->ls_uevent_wait);
519 	ls->ls_uevent_result = 0;
520 	init_completion(&ls->ls_members_done);
521 	ls->ls_members_result = -1;
522 
523 	ls->ls_recoverd_task = NULL;
524 	mutex_init(&ls->ls_recoverd_active);
525 	spin_lock_init(&ls->ls_recover_lock);
526 	spin_lock_init(&ls->ls_rcom_spin);
527 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
528 	ls->ls_recover_status = 0;
529 	ls->ls_recover_seq = 0;
530 	ls->ls_recover_args = NULL;
531 	init_rwsem(&ls->ls_in_recovery);
532 	init_rwsem(&ls->ls_recv_active);
533 	INIT_LIST_HEAD(&ls->ls_requestqueue);
534 	mutex_init(&ls->ls_requestqueue_mutex);
535 	mutex_init(&ls->ls_clear_proc_locks);
536 
537 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
538 	if (!ls->ls_recover_buf)
539 		goto out_dirfree;
540 
541 	INIT_LIST_HEAD(&ls->ls_recover_list);
542 	spin_lock_init(&ls->ls_recover_list_lock);
543 	ls->ls_recover_list_count = 0;
544 	ls->ls_local_handle = ls;
545 	init_waitqueue_head(&ls->ls_wait_general);
546 	INIT_LIST_HEAD(&ls->ls_root_list);
547 	init_rwsem(&ls->ls_root_sem);
548 
549 	down_write(&ls->ls_in_recovery);
550 
551 	spin_lock(&lslist_lock);
552 	ls->ls_create_count = 1;
553 	list_add(&ls->ls_list, &lslist);
554 	spin_unlock(&lslist_lock);
555 
556 	/* needs to find ls in lslist */
557 	error = dlm_recoverd_start(ls);
558 	if (error) {
559 		log_error(ls, "can't start dlm_recoverd %d", error);
560 		goto out_delist;
561 	}
562 
563 	ls->ls_kobj.kset = dlm_kset;
564 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
565 				     "%s", ls->ls_name);
566 	if (error)
567 		goto out_stop;
568 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
569 
570 	/* let kobject handle freeing of ls if there's an error */
571 	do_unreg = 1;
572 
573 	/* This uevent triggers dlm_controld in userspace to add us to the
574 	   group of nodes that are members of this lockspace (managed by the
575 	   cluster infrastructure.)  Once it's done that, it tells us who the
576 	   current lockspace members are (via configfs) and then tells the
577 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
578 
579 	error = do_uevent(ls, 1);
580 	if (error)
581 		goto out_stop;
582 
583 	wait_for_completion(&ls->ls_members_done);
584 	error = ls->ls_members_result;
585 	if (error)
586 		goto out_members;
587 
588 	dlm_create_debug_file(ls);
589 
590 	log_debug(ls, "join complete");
591 	*lockspace = ls;
592 	return 0;
593 
594  out_members:
595 	do_uevent(ls, 0);
596 	dlm_clear_members(ls);
597 	kfree(ls->ls_node_array);
598  out_stop:
599 	dlm_recoverd_stop(ls);
600  out_delist:
601 	spin_lock(&lslist_lock);
602 	list_del(&ls->ls_list);
603 	spin_unlock(&lslist_lock);
604 	kfree(ls->ls_recover_buf);
605  out_dirfree:
606 	kfree(ls->ls_dirtbl);
607  out_lkbfree:
608 	kfree(ls->ls_lkbtbl);
609  out_rsbfree:
610 	kfree(ls->ls_rsbtbl);
611  out_lsfree:
612 	if (do_unreg)
613 		kobject_put(&ls->ls_kobj);
614 	else
615 		kfree(ls);
616  out:
617 	module_put(THIS_MODULE);
618 	return error;
619 }
620 
621 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
622 		      uint32_t flags, int lvblen)
623 {
624 	int error = 0;
625 
626 	mutex_lock(&ls_lock);
627 	if (!ls_count)
628 		error = threads_start();
629 	if (error)
630 		goto out;
631 
632 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
633 	if (!error)
634 		ls_count++;
635 	if (error > 0)
636 		error = 0;
637 	if (!ls_count)
638 		threads_stop();
639  out:
640 	mutex_unlock(&ls_lock);
641 	return error;
642 }
643 
644 /* Return 1 if the lockspace still has active remote locks,
645  *        2 if the lockspace still has active local locks.
646  */
647 static int lockspace_busy(struct dlm_ls *ls)
648 {
649 	int i, lkb_found = 0;
650 	struct dlm_lkb *lkb;
651 
652 	/* NOTE: We check the lockidtbl here rather than the resource table.
653 	   This is because there may be LKBs queued as ASTs that have been
654 	   unlinked from their RSBs and are pending deletion once the AST has
655 	   been delivered */
656 
657 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
658 		read_lock(&ls->ls_lkbtbl[i].lock);
659 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
660 			lkb_found = 1;
661 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
662 					    lkb_idtbl_list) {
663 				if (!lkb->lkb_nodeid) {
664 					read_unlock(&ls->ls_lkbtbl[i].lock);
665 					return 2;
666 				}
667 			}
668 		}
669 		read_unlock(&ls->ls_lkbtbl[i].lock);
670 	}
671 	return lkb_found;
672 }
673 
674 static int release_lockspace(struct dlm_ls *ls, int force)
675 {
676 	struct dlm_lkb *lkb;
677 	struct dlm_rsb *rsb;
678 	struct list_head *head;
679 	int i, busy, rv;
680 
681 	busy = lockspace_busy(ls);
682 
683 	spin_lock(&lslist_lock);
684 	if (ls->ls_create_count == 1) {
685 		if (busy > force)
686 			rv = -EBUSY;
687 		else {
688 			/* remove_lockspace takes ls off lslist */
689 			ls->ls_create_count = 0;
690 			rv = 0;
691 		}
692 	} else if (ls->ls_create_count > 1) {
693 		rv = --ls->ls_create_count;
694 	} else {
695 		rv = -EINVAL;
696 	}
697 	spin_unlock(&lslist_lock);
698 
699 	if (rv) {
700 		log_debug(ls, "release_lockspace no remove %d", rv);
701 		return rv;
702 	}
703 
704 	dlm_device_deregister(ls);
705 
706 	if (force < 3 && dlm_user_daemon_available())
707 		do_uevent(ls, 0);
708 
709 	dlm_recoverd_stop(ls);
710 
711 	remove_lockspace(ls);
712 
713 	dlm_delete_debug_file(ls);
714 
715 	dlm_astd_suspend();
716 
717 	kfree(ls->ls_recover_buf);
718 
719 	/*
720 	 * Free direntry structs.
721 	 */
722 
723 	dlm_dir_clear(ls);
724 	kfree(ls->ls_dirtbl);
725 
726 	/*
727 	 * Free all lkb's on lkbtbl[] lists.
728 	 */
729 
730 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
731 		head = &ls->ls_lkbtbl[i].list;
732 		while (!list_empty(head)) {
733 			lkb = list_entry(head->next, struct dlm_lkb,
734 					 lkb_idtbl_list);
735 
736 			list_del(&lkb->lkb_idtbl_list);
737 
738 			dlm_del_ast(lkb);
739 
740 			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
741 				dlm_free_lvb(lkb->lkb_lvbptr);
742 
743 			dlm_free_lkb(lkb);
744 		}
745 	}
746 	dlm_astd_resume();
747 
748 	kfree(ls->ls_lkbtbl);
749 
750 	/*
751 	 * Free all rsb's on rsbtbl[] lists
752 	 */
753 
754 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
755 		head = &ls->ls_rsbtbl[i].list;
756 		while (!list_empty(head)) {
757 			rsb = list_entry(head->next, struct dlm_rsb,
758 					 res_hashchain);
759 
760 			list_del(&rsb->res_hashchain);
761 			dlm_free_rsb(rsb);
762 		}
763 
764 		head = &ls->ls_rsbtbl[i].toss;
765 		while (!list_empty(head)) {
766 			rsb = list_entry(head->next, struct dlm_rsb,
767 					 res_hashchain);
768 			list_del(&rsb->res_hashchain);
769 			dlm_free_rsb(rsb);
770 		}
771 	}
772 
773 	kfree(ls->ls_rsbtbl);
774 
775 	/*
776 	 * Free structures on any other lists
777 	 */
778 
779 	dlm_purge_requestqueue(ls);
780 	kfree(ls->ls_recover_args);
781 	dlm_clear_free_entries(ls);
782 	dlm_clear_members(ls);
783 	dlm_clear_members_gone(ls);
784 	kfree(ls->ls_node_array);
785 	log_debug(ls, "release_lockspace final free");
786 	kobject_put(&ls->ls_kobj);
787 	/* The ls structure will be freed when the kobject is done with */
788 
789 	module_put(THIS_MODULE);
790 	return 0;
791 }
792 
793 /*
794  * Called when a system has released all its locks and is not going to use the
795  * lockspace any longer.  We free everything we're managing for this lockspace.
796  * Remaining nodes will go through the recovery process as if we'd died.  The
797  * lockspace must continue to function as usual, participating in recoveries,
798  * until this returns.
799  *
800  * Force has 4 possible values:
801  * 0 - don't destroy locksapce if it has any LKBs
802  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
803  * 2 - destroy lockspace regardless of LKBs
804  * 3 - destroy lockspace as part of a forced shutdown
805  */
806 
807 int dlm_release_lockspace(void *lockspace, int force)
808 {
809 	struct dlm_ls *ls;
810 	int error;
811 
812 	ls = dlm_find_lockspace_local(lockspace);
813 	if (!ls)
814 		return -EINVAL;
815 	dlm_put_lockspace(ls);
816 
817 	mutex_lock(&ls_lock);
818 	error = release_lockspace(ls, force);
819 	if (!error)
820 		ls_count--;
821 	if (!ls_count)
822 		threads_stop();
823 	mutex_unlock(&ls_lock);
824 
825 	return error;
826 }
827 
828 void dlm_stop_lockspaces(void)
829 {
830 	struct dlm_ls *ls;
831 
832  restart:
833 	spin_lock(&lslist_lock);
834 	list_for_each_entry(ls, &lslist, ls_list) {
835 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
836 			continue;
837 		spin_unlock(&lslist_lock);
838 		log_error(ls, "no userland control daemon, stopping lockspace");
839 		dlm_ls_stop(ls);
840 		goto restart;
841 	}
842 	spin_unlock(&lslist_lock);
843 }
844 
845