xref: /linux/fs/dlm/lockspace.c (revision 96532151ff3567154cac92983b9edc3138fa097c)
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

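/* Module-global state: ls_count is the number of lockspaces and is
   protected by ls_lock; lslist links every dlm_ls and is protected by
   lslist_lock; scand_task is the shared scanning thread. */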
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;

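/* Writing 0 to a lockspace's sysfs "control" file stops the lockspace,
   writing 1 starts it; dlm_controld drives this during recovery. */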
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n = simple_strtol(buf, NULL, 0);

	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset dlm_kset = {
	.kobj   = {.name = "dlm",},
	.ktype  = &dlm_ktype,
};

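/* Name the lockspace kobject after the lockspace and attach it to the
   dlm kset and ktype before it is registered. */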
static int kobject_setup(struct dlm_ls *ls)
{
	char lsname[DLM_LOCKSPACE_LEN];
	int error;

	memset(lsname, 0, DLM_LOCKSPACE_LEN);
	snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

	error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
	if (error)
		return error;

	ls->ls_kobj.kset = &dlm_kset;
	ls->ls_kobj.ktype = &dlm_ktype;
	return 0;
}

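/* Tell userspace (dlm_controld) that we are joining (in=1) or leaving
   (in=0) the lockspace group, then sleep until it writes the result to
   the event_done sysfs file.  Returns 0 or the error from userspace. */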
static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}


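/* Called once at module load: initialise the globals above and register
   the "dlm" kset under the kernel subsystem in sysfs. */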
int dlm_lockspace_init(void)
{
	int error;

	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	kobj_set_kset_s(&dlm_kset, kernel_subsys);
	error = kset_register(&dlm_kset);
	if (error)
		printk("dlm_lockspace_init: cannot register kset %d\n", error);
	return error;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}

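/* Scanning thread: walk every lockspace, freeing unused rsbs and
   checking for timed-out locks, then sleep ci_scan_secs between passes.
   A lockspace that is busy with recovery is skipped. */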
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list) {
			if (dlm_lock_recovery_try(ls)) {
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			}
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

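/* Lockspace lookup helpers.  Each walks lslist under lslist_lock; the
   global, local and device variants take a reference (ls_count++) that
   the caller must drop with dlm_put_lockspace().  The by-name lookup
   takes no reference. */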
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_namelen == namelen &&
		    memcmp(ls->ls_name, name, namelen) == 0)
			goto out;
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}

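/* Wait for all outstanding references to be dropped, then take the
   lockspace off lslist so no new references can be found. */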
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

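/* The daemon threads below are shared by all lockspaces: they are
   started when the first lockspace is created and stopped again when
   the last one is released. */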
static int threads_start(void)
{
	int error;

	/* Thread that processes lock requests for all lockspaces */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}

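/* Create and initialise a lockspace: allocate the dlm_ls and its rsb,
   lkb and directory hash tables, add it to lslist, start dlm_recoverd,
   register the sysfs kobject, then ask dlm_controld (via uevent) to
   join the lockspace group and wait for the first membership event. */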
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;
	int do_unreg = 0;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have TIMEWARN or FS set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* dlm_recoverd needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_delist;
	}

	error = kobject_setup(ls);
	if (error)
		goto out_stop;

	error = kobject_register(&ls->ls_kobj);
	if (error)
		goto out_stop;

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_stop;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");

	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_stop:
	dlm_recoverd_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_unregister(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

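/* Hypothetical usage sketch (not part of this file): an in-kernel user
   such as a cluster filesystem might create and release a lockspace
   roughly as follows.  The name "myfs" and the 32-byte lvblen are
   made-up values; lvblen just has to be a non-zero multiple of 8.

	dlm_lockspace_t *ls;
	int error;

	error = dlm_new_lockspace("myfs", strlen("myfs"),
				  (void **)&ls, DLM_LSFL_FS, 32);
	if (error)
		return error;
	...
	error = dlm_release_lockspace(ls, 0);	(-EBUSY if LKBs remain)
*/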
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
	if (!error)
		ls_count++;
	else if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}

631 
632 /* Return 1 if the lockspace still has active remote locks,
633  *        2 if the lockspace still has active local locks.
634  */
635 static int lockspace_busy(struct dlm_ls *ls)
636 {
637 	int i, lkb_found = 0;
638 	struct dlm_lkb *lkb;
639 
640 	/* NOTE: We check the lockidtbl here rather than the resource table.
641 	   This is because there may be LKBs queued as ASTs that have been
642 	   unlinked from their RSBs and are pending deletion once the AST has
643 	   been delivered */
644 
645 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
646 		read_lock(&ls->ls_lkbtbl[i].lock);
647 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
648 			lkb_found = 1;
649 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
650 					    lkb_idtbl_list) {
651 				if (!lkb->lkb_nodeid) {
652 					read_unlock(&ls->ls_lkbtbl[i].lock);
653 					return 2;
654 				}
655 			}
656 		}
657 		read_unlock(&ls->ls_lkbtbl[i].lock);
658 	}
659 	return lkb_found;
660 }
661 
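/* Tear down a lockspace in roughly the reverse order of new_lockspace():
   notify userspace that we are leaving the group (unless this is a
   forced shutdown), stop recoverd, wait for references to drain, then
   free the lkbs, rsbs, directory entries and member state.  The dlm_ls
   itself is freed by lockspace_kobj_release() once the kobject goes. */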
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	if (busy > force)
		return -EBUSY;

	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	/* The ls structure is freed by lockspace_kobj_release() when the
	   kobject's last reference is dropped */

	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);
	return release_lockspace(ls, force);
}