xref: /linux/fs/dlm/lockspace.c (revision 7f3edee81fbd49114c28057512906f169caa0bed)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 
27 #ifdef CONFIG_DLM_DEBUG
28 int dlm_create_debug_file(struct dlm_ls *ls);
29 void dlm_delete_debug_file(struct dlm_ls *ls);
30 #else
31 static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
32 static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
33 #endif
34 
35 static int			ls_count;
36 static struct mutex		ls_lock;
37 static struct list_head		lslist;
38 static spinlock_t		lslist_lock;
39 static struct task_struct *	scand_task;
40 
41 
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44 	ssize_t ret = len;
45 	int n = simple_strtol(buf, NULL, 0);
46 
47 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
48 	if (!ls)
49 		return -EINVAL;
50 
51 	switch (n) {
52 	case 0:
53 		dlm_ls_stop(ls);
54 		break;
55 	case 1:
56 		dlm_ls_start(ls);
57 		break;
58 	default:
59 		ret = -EINVAL;
60 	}
61 	dlm_put_lockspace(ls);
62 	return ret;
63 }
64 
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69 	wake_up(&ls->ls_uevent_wait);
70 	return len;
71 }
72 
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77 
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81 	return len;
82 }
83 
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86 	uint32_t status = dlm_recover_status(ls);
87 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89 
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94 
95 struct dlm_attr {
96 	struct attribute attr;
97 	ssize_t (*show)(struct dlm_ls *, char *);
98 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100 
101 static struct dlm_attr dlm_attr_control = {
102 	.attr  = {.name = "control", .mode = S_IWUSR},
103 	.store = dlm_control_store
104 };
105 
106 static struct dlm_attr dlm_attr_event = {
107 	.attr  = {.name = "event_done", .mode = S_IWUSR},
108 	.store = dlm_event_store
109 };
110 
111 static struct dlm_attr dlm_attr_id = {
112 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113 	.show  = dlm_id_show,
114 	.store = dlm_id_store
115 };
116 
117 static struct dlm_attr dlm_attr_recover_status = {
118 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
119 	.show  = dlm_recover_status_show
120 };
121 
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124 	.show  = dlm_recover_nodeid_show
125 };
126 
127 static struct attribute *dlm_attrs[] = {
128 	&dlm_attr_control.attr,
129 	&dlm_attr_event.attr,
130 	&dlm_attr_id.attr,
131 	&dlm_attr_recover_status.attr,
132 	&dlm_attr_recover_nodeid.attr,
133 	NULL,
134 };
135 
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137 			     char *buf)
138 {
139 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141 	return a->show ? a->show(ls, buf) : 0;
142 }
143 
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145 			      const char *buf, size_t len)
146 {
147 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149 	return a->store ? a->store(ls, buf, len) : len;
150 }
151 
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155 	kfree(ls);
156 }
157 
158 static struct sysfs_ops dlm_attr_ops = {
159 	.show  = dlm_attr_show,
160 	.store = dlm_attr_store,
161 };
162 
163 static struct kobj_type dlm_ktype = {
164 	.default_attrs = dlm_attrs,
165 	.sysfs_ops     = &dlm_attr_ops,
166 	.release       = lockspace_kobj_release,
167 };
168 
169 static struct kset *dlm_kset;
170 
171 static int do_uevent(struct dlm_ls *ls, int in)
172 {
173 	int error;
174 
175 	if (in)
176 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
177 	else
178 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
179 
180 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
181 
182 	/* dlm_controld will see the uevent, do the necessary group management
183 	   and then write to sysfs to wake us */
184 
185 	error = wait_event_interruptible(ls->ls_uevent_wait,
186 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
187 
188 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
189 
190 	if (error)
191 		goto out;
192 
193 	error = ls->ls_uevent_result;
194  out:
195 	if (error)
196 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
197 			  error, ls->ls_uevent_result);
198 	return error;
199 }
200 
201 
202 int dlm_lockspace_init(void)
203 {
204 	ls_count = 0;
205 	mutex_init(&ls_lock);
206 	INIT_LIST_HEAD(&lslist);
207 	spin_lock_init(&lslist_lock);
208 
209 	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
210 	if (!dlm_kset) {
211 		printk(KERN_WARNING "%s: can not create kset\n", __FUNCTION__);
212 		return -ENOMEM;
213 	}
214 	return 0;
215 }
216 
217 void dlm_lockspace_exit(void)
218 {
219 	kset_unregister(dlm_kset);
220 }
221 
222 static int dlm_scand(void *data)
223 {
224 	struct dlm_ls *ls;
225 
226 	while (!kthread_should_stop()) {
227 		list_for_each_entry(ls, &lslist, ls_list) {
228 			if (dlm_lock_recovery_try(ls)) {
229 				dlm_scan_rsbs(ls);
230 				dlm_scan_timeout(ls);
231 				dlm_unlock_recovery(ls);
232 			}
233 		}
234 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
235 	}
236 	return 0;
237 }
238 
239 static int dlm_scand_start(void)
240 {
241 	struct task_struct *p;
242 	int error = 0;
243 
244 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
245 	if (IS_ERR(p))
246 		error = PTR_ERR(p);
247 	else
248 		scand_task = p;
249 	return error;
250 }
251 
252 static void dlm_scand_stop(void)
253 {
254 	kthread_stop(scand_task);
255 }
256 
257 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
258 {
259 	struct dlm_ls *ls;
260 
261 	spin_lock(&lslist_lock);
262 
263 	list_for_each_entry(ls, &lslist, ls_list) {
264 		if (ls->ls_namelen == namelen &&
265 		    memcmp(ls->ls_name, name, namelen) == 0)
266 			goto out;
267 	}
268 	ls = NULL;
269  out:
270 	spin_unlock(&lslist_lock);
271 	return ls;
272 }
273 
274 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
275 {
276 	struct dlm_ls *ls;
277 
278 	spin_lock(&lslist_lock);
279 
280 	list_for_each_entry(ls, &lslist, ls_list) {
281 		if (ls->ls_global_id == id) {
282 			ls->ls_count++;
283 			goto out;
284 		}
285 	}
286 	ls = NULL;
287  out:
288 	spin_unlock(&lslist_lock);
289 	return ls;
290 }
291 
292 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
293 {
294 	struct dlm_ls *ls;
295 
296 	spin_lock(&lslist_lock);
297 	list_for_each_entry(ls, &lslist, ls_list) {
298 		if (ls->ls_local_handle == lockspace) {
299 			ls->ls_count++;
300 			goto out;
301 		}
302 	}
303 	ls = NULL;
304  out:
305 	spin_unlock(&lslist_lock);
306 	return ls;
307 }
308 
309 struct dlm_ls *dlm_find_lockspace_device(int minor)
310 {
311 	struct dlm_ls *ls;
312 
313 	spin_lock(&lslist_lock);
314 	list_for_each_entry(ls, &lslist, ls_list) {
315 		if (ls->ls_device.minor == minor) {
316 			ls->ls_count++;
317 			goto out;
318 		}
319 	}
320 	ls = NULL;
321  out:
322 	spin_unlock(&lslist_lock);
323 	return ls;
324 }
325 
326 void dlm_put_lockspace(struct dlm_ls *ls)
327 {
328 	spin_lock(&lslist_lock);
329 	ls->ls_count--;
330 	spin_unlock(&lslist_lock);
331 }
332 
333 static void remove_lockspace(struct dlm_ls *ls)
334 {
335 	for (;;) {
336 		spin_lock(&lslist_lock);
337 		if (ls->ls_count == 0) {
338 			list_del(&ls->ls_list);
339 			spin_unlock(&lslist_lock);
340 			return;
341 		}
342 		spin_unlock(&lslist_lock);
343 		ssleep(1);
344 	}
345 }
346 
347 static int threads_start(void)
348 {
349 	int error;
350 
351 	/* Thread which process lock requests for all lockspace's */
352 	error = dlm_astd_start();
353 	if (error) {
354 		log_print("cannot start dlm_astd thread %d", error);
355 		goto fail;
356 	}
357 
358 	error = dlm_scand_start();
359 	if (error) {
360 		log_print("cannot start dlm_scand thread %d", error);
361 		goto astd_fail;
362 	}
363 
364 	/* Thread for sending/receiving messages for all lockspace's */
365 	error = dlm_lowcomms_start();
366 	if (error) {
367 		log_print("cannot start dlm lowcomms %d", error);
368 		goto scand_fail;
369 	}
370 
371 	return 0;
372 
373  scand_fail:
374 	dlm_scand_stop();
375  astd_fail:
376 	dlm_astd_stop();
377  fail:
378 	return error;
379 }
380 
381 static void threads_stop(void)
382 {
383 	dlm_scand_stop();
384 	dlm_lowcomms_stop();
385 	dlm_astd_stop();
386 }
387 
388 static int new_lockspace(char *name, int namelen, void **lockspace,
389 			 uint32_t flags, int lvblen)
390 {
391 	struct dlm_ls *ls;
392 	int i, size, error = -ENOMEM;
393 	int do_unreg = 0;
394 
395 	if (namelen > DLM_LOCKSPACE_LEN)
396 		return -EINVAL;
397 
398 	if (!lvblen || (lvblen % 8))
399 		return -EINVAL;
400 
401 	if (!try_module_get(THIS_MODULE))
402 		return -EINVAL;
403 
404 	ls = dlm_find_lockspace_name(name, namelen);
405 	if (ls) {
406 		*lockspace = ls;
407 		module_put(THIS_MODULE);
408 		return -EEXIST;
409 	}
410 
411 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
412 	if (!ls)
413 		goto out;
414 	memcpy(ls->ls_name, name, namelen);
415 	ls->ls_namelen = namelen;
416 	ls->ls_lvblen = lvblen;
417 	ls->ls_count = 0;
418 	ls->ls_flags = 0;
419 
420 	if (flags & DLM_LSFL_TIMEWARN)
421 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
422 
423 	if (flags & DLM_LSFL_FS)
424 		ls->ls_allocation = GFP_NOFS;
425 	else
426 		ls->ls_allocation = GFP_KERNEL;
427 
428 	/* ls_exflags are forced to match among nodes, and we don't
429 	   need to require all nodes to have TIMEWARN or FS set */
430 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
431 
432 	size = dlm_config.ci_rsbtbl_size;
433 	ls->ls_rsbtbl_size = size;
434 
435 	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
436 	if (!ls->ls_rsbtbl)
437 		goto out_lsfree;
438 	for (i = 0; i < size; i++) {
439 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
440 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
441 		rwlock_init(&ls->ls_rsbtbl[i].lock);
442 	}
443 
444 	size = dlm_config.ci_lkbtbl_size;
445 	ls->ls_lkbtbl_size = size;
446 
447 	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
448 	if (!ls->ls_lkbtbl)
449 		goto out_rsbfree;
450 	for (i = 0; i < size; i++) {
451 		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
452 		rwlock_init(&ls->ls_lkbtbl[i].lock);
453 		ls->ls_lkbtbl[i].counter = 1;
454 	}
455 
456 	size = dlm_config.ci_dirtbl_size;
457 	ls->ls_dirtbl_size = size;
458 
459 	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
460 	if (!ls->ls_dirtbl)
461 		goto out_lkbfree;
462 	for (i = 0; i < size; i++) {
463 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
464 		rwlock_init(&ls->ls_dirtbl[i].lock);
465 	}
466 
467 	INIT_LIST_HEAD(&ls->ls_waiters);
468 	mutex_init(&ls->ls_waiters_mutex);
469 	INIT_LIST_HEAD(&ls->ls_orphans);
470 	mutex_init(&ls->ls_orphans_mutex);
471 	INIT_LIST_HEAD(&ls->ls_timeout);
472 	mutex_init(&ls->ls_timeout_mutex);
473 
474 	INIT_LIST_HEAD(&ls->ls_nodes);
475 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
476 	ls->ls_num_nodes = 0;
477 	ls->ls_low_nodeid = 0;
478 	ls->ls_total_weight = 0;
479 	ls->ls_node_array = NULL;
480 
481 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
482 	ls->ls_stub_rsb.res_ls = ls;
483 
484 	ls->ls_debug_rsb_dentry = NULL;
485 	ls->ls_debug_waiters_dentry = NULL;
486 
487 	init_waitqueue_head(&ls->ls_uevent_wait);
488 	ls->ls_uevent_result = 0;
489 	init_completion(&ls->ls_members_done);
490 	ls->ls_members_result = -1;
491 
492 	ls->ls_recoverd_task = NULL;
493 	mutex_init(&ls->ls_recoverd_active);
494 	spin_lock_init(&ls->ls_recover_lock);
495 	spin_lock_init(&ls->ls_rcom_spin);
496 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
497 	ls->ls_recover_status = 0;
498 	ls->ls_recover_seq = 0;
499 	ls->ls_recover_args = NULL;
500 	init_rwsem(&ls->ls_in_recovery);
501 	init_rwsem(&ls->ls_recv_active);
502 	INIT_LIST_HEAD(&ls->ls_requestqueue);
503 	mutex_init(&ls->ls_requestqueue_mutex);
504 	mutex_init(&ls->ls_clear_proc_locks);
505 
506 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
507 	if (!ls->ls_recover_buf)
508 		goto out_dirfree;
509 
510 	INIT_LIST_HEAD(&ls->ls_recover_list);
511 	spin_lock_init(&ls->ls_recover_list_lock);
512 	ls->ls_recover_list_count = 0;
513 	ls->ls_local_handle = ls;
514 	init_waitqueue_head(&ls->ls_wait_general);
515 	INIT_LIST_HEAD(&ls->ls_root_list);
516 	init_rwsem(&ls->ls_root_sem);
517 
518 	down_write(&ls->ls_in_recovery);
519 
520 	spin_lock(&lslist_lock);
521 	list_add(&ls->ls_list, &lslist);
522 	spin_unlock(&lslist_lock);
523 
524 	/* needs to find ls in lslist */
525 	error = dlm_recoverd_start(ls);
526 	if (error) {
527 		log_error(ls, "can't start dlm_recoverd %d", error);
528 		goto out_delist;
529 	}
530 
531 	ls->ls_kobj.kset = dlm_kset;
532 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
533 				     "%s", ls->ls_name);
534 	if (error)
535 		goto out_stop;
536 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
537 
538 	/* let kobject handle freeing of ls if there's an error */
539 	do_unreg = 1;
540 
541 	/* This uevent triggers dlm_controld in userspace to add us to the
542 	   group of nodes that are members of this lockspace (managed by the
543 	   cluster infrastructure.)  Once it's done that, it tells us who the
544 	   current lockspace members are (via configfs) and then tells the
545 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
546 
547 	error = do_uevent(ls, 1);
548 	if (error)
549 		goto out_stop;
550 
551 	wait_for_completion(&ls->ls_members_done);
552 	error = ls->ls_members_result;
553 	if (error)
554 		goto out_members;
555 
556 	dlm_create_debug_file(ls);
557 
558 	log_debug(ls, "join complete");
559 
560 	*lockspace = ls;
561 	return 0;
562 
563  out_members:
564 	do_uevent(ls, 0);
565 	dlm_clear_members(ls);
566 	kfree(ls->ls_node_array);
567  out_stop:
568 	dlm_recoverd_stop(ls);
569  out_delist:
570 	spin_lock(&lslist_lock);
571 	list_del(&ls->ls_list);
572 	spin_unlock(&lslist_lock);
573 	kfree(ls->ls_recover_buf);
574  out_dirfree:
575 	kfree(ls->ls_dirtbl);
576  out_lkbfree:
577 	kfree(ls->ls_lkbtbl);
578  out_rsbfree:
579 	kfree(ls->ls_rsbtbl);
580  out_lsfree:
581 	if (do_unreg)
582 		kobject_put(&ls->ls_kobj);
583 	else
584 		kfree(ls);
585  out:
586 	module_put(THIS_MODULE);
587 	return error;
588 }
589 
590 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
591 		      uint32_t flags, int lvblen)
592 {
593 	int error = 0;
594 
595 	mutex_lock(&ls_lock);
596 	if (!ls_count)
597 		error = threads_start();
598 	if (error)
599 		goto out;
600 
601 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
602 	if (!error)
603 		ls_count++;
604 	else if (!ls_count)
605 		threads_stop();
606  out:
607 	mutex_unlock(&ls_lock);
608 	return error;
609 }
610 
611 /* Return 1 if the lockspace still has active remote locks,
612  *        2 if the lockspace still has active local locks.
613  */
614 static int lockspace_busy(struct dlm_ls *ls)
615 {
616 	int i, lkb_found = 0;
617 	struct dlm_lkb *lkb;
618 
619 	/* NOTE: We check the lockidtbl here rather than the resource table.
620 	   This is because there may be LKBs queued as ASTs that have been
621 	   unlinked from their RSBs and are pending deletion once the AST has
622 	   been delivered */
623 
624 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
625 		read_lock(&ls->ls_lkbtbl[i].lock);
626 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
627 			lkb_found = 1;
628 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
629 					    lkb_idtbl_list) {
630 				if (!lkb->lkb_nodeid) {
631 					read_unlock(&ls->ls_lkbtbl[i].lock);
632 					return 2;
633 				}
634 			}
635 		}
636 		read_unlock(&ls->ls_lkbtbl[i].lock);
637 	}
638 	return lkb_found;
639 }
640 
641 static int release_lockspace(struct dlm_ls *ls, int force)
642 {
643 	struct dlm_lkb *lkb;
644 	struct dlm_rsb *rsb;
645 	struct list_head *head;
646 	int i;
647 	int busy = lockspace_busy(ls);
648 
649 	if (busy > force)
650 		return -EBUSY;
651 
652 	if (force < 3)
653 		do_uevent(ls, 0);
654 
655 	dlm_recoverd_stop(ls);
656 
657 	remove_lockspace(ls);
658 
659 	dlm_delete_debug_file(ls);
660 
661 	dlm_astd_suspend();
662 
663 	kfree(ls->ls_recover_buf);
664 
665 	/*
666 	 * Free direntry structs.
667 	 */
668 
669 	dlm_dir_clear(ls);
670 	kfree(ls->ls_dirtbl);
671 
672 	/*
673 	 * Free all lkb's on lkbtbl[] lists.
674 	 */
675 
676 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
677 		head = &ls->ls_lkbtbl[i].list;
678 		while (!list_empty(head)) {
679 			lkb = list_entry(head->next, struct dlm_lkb,
680 					 lkb_idtbl_list);
681 
682 			list_del(&lkb->lkb_idtbl_list);
683 
684 			dlm_del_ast(lkb);
685 
686 			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
687 				free_lvb(lkb->lkb_lvbptr);
688 
689 			free_lkb(lkb);
690 		}
691 	}
692 	dlm_astd_resume();
693 
694 	kfree(ls->ls_lkbtbl);
695 
696 	/*
697 	 * Free all rsb's on rsbtbl[] lists
698 	 */
699 
700 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
701 		head = &ls->ls_rsbtbl[i].list;
702 		while (!list_empty(head)) {
703 			rsb = list_entry(head->next, struct dlm_rsb,
704 					 res_hashchain);
705 
706 			list_del(&rsb->res_hashchain);
707 			free_rsb(rsb);
708 		}
709 
710 		head = &ls->ls_rsbtbl[i].toss;
711 		while (!list_empty(head)) {
712 			rsb = list_entry(head->next, struct dlm_rsb,
713 					 res_hashchain);
714 			list_del(&rsb->res_hashchain);
715 			free_rsb(rsb);
716 		}
717 	}
718 
719 	kfree(ls->ls_rsbtbl);
720 
721 	/*
722 	 * Free structures on any other lists
723 	 */
724 
725 	dlm_purge_requestqueue(ls);
726 	kfree(ls->ls_recover_args);
727 	dlm_clear_free_entries(ls);
728 	dlm_clear_members(ls);
729 	dlm_clear_members_gone(ls);
730 	kfree(ls->ls_node_array);
731 	kobject_put(&ls->ls_kobj);
732 	/* The ls structure will be freed when the kobject is done with */
733 
734 	mutex_lock(&ls_lock);
735 	ls_count--;
736 	if (!ls_count)
737 		threads_stop();
738 	mutex_unlock(&ls_lock);
739 
740 	module_put(THIS_MODULE);
741 	return 0;
742 }
743 
744 /*
745  * Called when a system has released all its locks and is not going to use the
746  * lockspace any longer.  We free everything we're managing for this lockspace.
747  * Remaining nodes will go through the recovery process as if we'd died.  The
748  * lockspace must continue to function as usual, participating in recoveries,
749  * until this returns.
750  *
751  * Force has 4 possible values:
752  * 0 - don't destroy locksapce if it has any LKBs
753  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
754  * 2 - destroy lockspace regardless of LKBs
755  * 3 - destroy lockspace as part of a forced shutdown
756  */
757 
758 int dlm_release_lockspace(void *lockspace, int force)
759 {
760 	struct dlm_ls *ls;
761 
762 	ls = dlm_find_lockspace_local(lockspace);
763 	if (!ls)
764 		return -EINVAL;
765 	dlm_put_lockspace(ls);
766 	return release_lockspace(ls, force);
767 }
768 
769