xref: /linux/fs/dlm/lockspace.c (revision bab2c80e5a6c855657482eac9e97f5f3eedb509a)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include <linux/module.h>
15 
16 #include "dlm_internal.h"
17 #include "lockspace.h"
18 #include "member.h"
19 #include "recoverd.h"
20 #include "dir.h"
21 #include "lowcomms.h"
22 #include "config.h"
23 #include "memory.h"
24 #include "lock.h"
25 #include "recover.h"
26 #include "requestqueue.h"
27 #include "user.h"
28 #include "ast.h"
29 
30 static int			ls_count;
31 static struct mutex		ls_lock;
32 static struct list_head		lslist;
33 static spinlock_t		lslist_lock;
34 static struct task_struct *	scand_task;
35 
36 
37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 {
39 	ssize_t ret = len;
40 	int n;
41 	int rc = kstrtoint(buf, 0, &n);
42 
43 	if (rc)
44 		return rc;
45 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
46 	if (!ls)
47 		return -EINVAL;
48 
49 	switch (n) {
50 	case 0:
51 		dlm_ls_stop(ls);
52 		break;
53 	case 1:
54 		dlm_ls_start(ls);
55 		break;
56 	default:
57 		ret = -EINVAL;
58 	}
59 	dlm_put_lockspace(ls);
60 	return ret;
61 }
62 
63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
64 {
65 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
66 
67 	if (rc)
68 		return rc;
69 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
70 	wake_up(&ls->ls_uevent_wait);
71 	return len;
72 }
73 
74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
75 {
76 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
77 }
78 
79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
80 {
81 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
82 
83 	if (rc)
84 		return rc;
85 	return len;
86 }
87 
88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
89 {
90 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
91 }
92 
93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
94 {
95 	int val;
96 	int rc = kstrtoint(buf, 0, &val);
97 
98 	if (rc)
99 		return rc;
100 	if (val == 1)
101 		set_bit(LSFL_NODIR, &ls->ls_flags);
102 	return len;
103 }
104 
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107 	uint32_t status = dlm_recover_status(ls);
108 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110 
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115 
116 struct dlm_attr {
117 	struct attribute attr;
118 	ssize_t (*show)(struct dlm_ls *, char *);
119 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121 
122 static struct dlm_attr dlm_attr_control = {
123 	.attr  = {.name = "control", .mode = S_IWUSR},
124 	.store = dlm_control_store
125 };
126 
127 static struct dlm_attr dlm_attr_event = {
128 	.attr  = {.name = "event_done", .mode = S_IWUSR},
129 	.store = dlm_event_store
130 };
131 
132 static struct dlm_attr dlm_attr_id = {
133 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134 	.show  = dlm_id_show,
135 	.store = dlm_id_store
136 };
137 
138 static struct dlm_attr dlm_attr_nodir = {
139 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140 	.show  = dlm_nodir_show,
141 	.store = dlm_nodir_store
142 };
143 
144 static struct dlm_attr dlm_attr_recover_status = {
145 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
146 	.show  = dlm_recover_status_show
147 };
148 
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151 	.show  = dlm_recover_nodeid_show
152 };
153 
154 static struct attribute *dlm_attrs[] = {
155 	&dlm_attr_control.attr,
156 	&dlm_attr_event.attr,
157 	&dlm_attr_id.attr,
158 	&dlm_attr_nodir.attr,
159 	&dlm_attr_recover_status.attr,
160 	&dlm_attr_recover_nodeid.attr,
161 	NULL,
162 };
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
186 static const struct sysfs_ops dlm_attr_ops = {
187 	.show  = dlm_attr_show,
188 	.store = dlm_attr_store,
189 };
190 
191 static struct kobj_type dlm_ktype = {
192 	.default_attrs = dlm_attrs,
193 	.sysfs_ops     = &dlm_attr_ops,
194 	.release       = lockspace_kobj_release,
195 };
196 
197 static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	int error;
202 
203 	if (in)
204 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205 	else
206 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207 
208 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209 
210 	/* dlm_controld will see the uevent, do the necessary group management
211 	   and then write to sysfs to wake us */
212 
213 	error = wait_event_interruptible(ls->ls_uevent_wait,
214 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215 
216 	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217 
218 	if (error)
219 		goto out;
220 
221 	error = ls->ls_uevent_result;
222  out:
223 	if (error)
224 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225 			  error, ls->ls_uevent_result);
226 	return error;
227 }
228 
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230 		      struct kobj_uevent_env *env)
231 {
232 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233 
234 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235 	return 0;
236 }
237 
238 static const struct kset_uevent_ops dlm_uevent_ops = {
239 	.uevent = dlm_uevent,
240 };
241 
242 int __init dlm_lockspace_init(void)
243 {
244 	ls_count = 0;
245 	mutex_init(&ls_lock);
246 	INIT_LIST_HEAD(&lslist);
247 	spin_lock_init(&lslist_lock);
248 
249 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250 	if (!dlm_kset) {
251 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
252 		return -ENOMEM;
253 	}
254 	return 0;
255 }
256 
257 void dlm_lockspace_exit(void)
258 {
259 	kset_unregister(dlm_kset);
260 }
261 
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264 	struct dlm_ls *ls;
265 
266 	spin_lock(&lslist_lock);
267 	list_for_each_entry(ls, &lslist, ls_list) {
268 		if (time_after_eq(jiffies, ls->ls_scan_time +
269 					    dlm_config.ci_scan_secs * HZ)) {
270 			spin_unlock(&lslist_lock);
271 			return ls;
272 		}
273 	}
274 	spin_unlock(&lslist_lock);
275 	return NULL;
276 }
277 
278 static int dlm_scand(void *data)
279 {
280 	struct dlm_ls *ls;
281 
282 	while (!kthread_should_stop()) {
283 		ls = find_ls_to_scan();
284 		if (ls) {
285 			if (dlm_lock_recovery_try(ls)) {
286 				ls->ls_scan_time = jiffies;
287 				dlm_scan_rsbs(ls);
288 				dlm_scan_timeout(ls);
289 				dlm_scan_waiters(ls);
290 				dlm_unlock_recovery(ls);
291 			} else {
292 				ls->ls_scan_time += HZ;
293 			}
294 			continue;
295 		}
296 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297 	}
298 	return 0;
299 }
300 
301 static int dlm_scand_start(void)
302 {
303 	struct task_struct *p;
304 	int error = 0;
305 
306 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
307 	if (IS_ERR(p))
308 		error = PTR_ERR(p);
309 	else
310 		scand_task = p;
311 	return error;
312 }
313 
314 static void dlm_scand_stop(void)
315 {
316 	kthread_stop(scand_task);
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 
325 	list_for_each_entry(ls, &lslist, ls_list) {
326 		if (ls->ls_global_id == id) {
327 			ls->ls_count++;
328 			goto out;
329 		}
330 	}
331 	ls = NULL;
332  out:
333 	spin_unlock(&lslist_lock);
334 	return ls;
335 }
336 
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339 	struct dlm_ls *ls;
340 
341 	spin_lock(&lslist_lock);
342 	list_for_each_entry(ls, &lslist, ls_list) {
343 		if (ls->ls_local_handle == lockspace) {
344 			ls->ls_count++;
345 			goto out;
346 		}
347 	}
348 	ls = NULL;
349  out:
350 	spin_unlock(&lslist_lock);
351 	return ls;
352 }
353 
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356 	struct dlm_ls *ls;
357 
358 	spin_lock(&lslist_lock);
359 	list_for_each_entry(ls, &lslist, ls_list) {
360 		if (ls->ls_device.minor == minor) {
361 			ls->ls_count++;
362 			goto out;
363 		}
364 	}
365 	ls = NULL;
366  out:
367 	spin_unlock(&lslist_lock);
368 	return ls;
369 }
370 
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373 	spin_lock(&lslist_lock);
374 	ls->ls_count--;
375 	spin_unlock(&lslist_lock);
376 }
377 
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380 	for (;;) {
381 		spin_lock(&lslist_lock);
382 		if (ls->ls_count == 0) {
383 			WARN_ON(ls->ls_create_count != 0);
384 			list_del(&ls->ls_list);
385 			spin_unlock(&lslist_lock);
386 			return;
387 		}
388 		spin_unlock(&lslist_lock);
389 		ssleep(1);
390 	}
391 }
392 
393 static int threads_start(void)
394 {
395 	int error;
396 
397 	error = dlm_scand_start();
398 	if (error) {
399 		log_print("cannot start dlm_scand thread %d", error);
400 		goto fail;
401 	}
402 
403 	/* Thread for sending/receiving messages for all lockspace's */
404 	error = dlm_lowcomms_start();
405 	if (error) {
406 		log_print("cannot start dlm lowcomms %d", error);
407 		goto scand_fail;
408 	}
409 
410 	return 0;
411 
412  scand_fail:
413 	dlm_scand_stop();
414  fail:
415 	return error;
416 }
417 
418 static void threads_stop(void)
419 {
420 	dlm_scand_stop();
421 	dlm_lowcomms_stop();
422 }
423 
424 static int new_lockspace(const char *name, const char *cluster,
425 			 uint32_t flags, int lvblen,
426 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
427 			 int *ops_result, dlm_lockspace_t **lockspace)
428 {
429 	struct dlm_ls *ls;
430 	int i, size, error;
431 	int do_unreg = 0;
432 	int namelen = strlen(name);
433 
434 	if (namelen > DLM_LOCKSPACE_LEN)
435 		return -EINVAL;
436 
437 	if (!lvblen || (lvblen % 8))
438 		return -EINVAL;
439 
440 	if (!try_module_get(THIS_MODULE))
441 		return -EINVAL;
442 
443 	if (!dlm_user_daemon_available()) {
444 		log_print("dlm user daemon not available");
445 		error = -EUNATCH;
446 		goto out;
447 	}
448 
449 	if (ops && ops_result) {
450 	       	if (!dlm_config.ci_recover_callbacks)
451 			*ops_result = -EOPNOTSUPP;
452 		else
453 			*ops_result = 0;
454 	}
455 
456 	if (!cluster)
457 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
458 			  dlm_config.ci_cluster_name);
459 
460 	if (dlm_config.ci_recover_callbacks && cluster &&
461 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
462 		log_print("dlm cluster name '%s' does not match "
463 			  "the application cluster name '%s'",
464 			  dlm_config.ci_cluster_name, cluster);
465 		error = -EBADR;
466 		goto out;
467 	}
468 
469 	error = 0;
470 
471 	spin_lock(&lslist_lock);
472 	list_for_each_entry(ls, &lslist, ls_list) {
473 		WARN_ON(ls->ls_create_count <= 0);
474 		if (ls->ls_namelen != namelen)
475 			continue;
476 		if (memcmp(ls->ls_name, name, namelen))
477 			continue;
478 		if (flags & DLM_LSFL_NEWEXCL) {
479 			error = -EEXIST;
480 			break;
481 		}
482 		ls->ls_create_count++;
483 		*lockspace = ls;
484 		error = 1;
485 		break;
486 	}
487 	spin_unlock(&lslist_lock);
488 
489 	if (error)
490 		goto out;
491 
492 	error = -ENOMEM;
493 
494 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
495 	if (!ls)
496 		goto out;
497 	memcpy(ls->ls_name, name, namelen);
498 	ls->ls_namelen = namelen;
499 	ls->ls_lvblen = lvblen;
500 	ls->ls_count = 0;
501 	ls->ls_flags = 0;
502 	ls->ls_scan_time = jiffies;
503 
504 	if (ops && dlm_config.ci_recover_callbacks) {
505 		ls->ls_ops = ops;
506 		ls->ls_ops_arg = ops_arg;
507 	}
508 
509 	if (flags & DLM_LSFL_TIMEWARN)
510 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
511 
512 	/* ls_exflags are forced to match among nodes, and we don't
513 	   need to require all nodes to have some flags set */
514 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
515 				    DLM_LSFL_NEWEXCL));
516 
517 	size = dlm_config.ci_rsbtbl_size;
518 	ls->ls_rsbtbl_size = size;
519 
520 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
521 	if (!ls->ls_rsbtbl)
522 		goto out_lsfree;
523 	for (i = 0; i < size; i++) {
524 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
525 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
526 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
527 	}
528 
529 	spin_lock_init(&ls->ls_remove_spin);
530 
531 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
532 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
533 						 GFP_KERNEL);
534 		if (!ls->ls_remove_names[i])
535 			goto out_rsbtbl;
536 	}
537 
538 	idr_init(&ls->ls_lkbidr);
539 	spin_lock_init(&ls->ls_lkbidr_spin);
540 
541 	INIT_LIST_HEAD(&ls->ls_waiters);
542 	mutex_init(&ls->ls_waiters_mutex);
543 	INIT_LIST_HEAD(&ls->ls_orphans);
544 	mutex_init(&ls->ls_orphans_mutex);
545 	INIT_LIST_HEAD(&ls->ls_timeout);
546 	mutex_init(&ls->ls_timeout_mutex);
547 
548 	INIT_LIST_HEAD(&ls->ls_new_rsb);
549 	spin_lock_init(&ls->ls_new_rsb_spin);
550 
551 	INIT_LIST_HEAD(&ls->ls_nodes);
552 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
553 	ls->ls_num_nodes = 0;
554 	ls->ls_low_nodeid = 0;
555 	ls->ls_total_weight = 0;
556 	ls->ls_node_array = NULL;
557 
558 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
559 	ls->ls_stub_rsb.res_ls = ls;
560 
561 	ls->ls_debug_rsb_dentry = NULL;
562 	ls->ls_debug_waiters_dentry = NULL;
563 
564 	init_waitqueue_head(&ls->ls_uevent_wait);
565 	ls->ls_uevent_result = 0;
566 	init_completion(&ls->ls_members_done);
567 	ls->ls_members_result = -1;
568 
569 	mutex_init(&ls->ls_cb_mutex);
570 	INIT_LIST_HEAD(&ls->ls_cb_delay);
571 
572 	ls->ls_recoverd_task = NULL;
573 	mutex_init(&ls->ls_recoverd_active);
574 	spin_lock_init(&ls->ls_recover_lock);
575 	spin_lock_init(&ls->ls_rcom_spin);
576 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
577 	ls->ls_recover_status = 0;
578 	ls->ls_recover_seq = 0;
579 	ls->ls_recover_args = NULL;
580 	init_rwsem(&ls->ls_in_recovery);
581 	init_rwsem(&ls->ls_recv_active);
582 	INIT_LIST_HEAD(&ls->ls_requestqueue);
583 	mutex_init(&ls->ls_requestqueue_mutex);
584 	mutex_init(&ls->ls_clear_proc_locks);
585 
586 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
587 	if (!ls->ls_recover_buf)
588 		goto out_lkbidr;
589 
590 	ls->ls_slot = 0;
591 	ls->ls_num_slots = 0;
592 	ls->ls_slots_size = 0;
593 	ls->ls_slots = NULL;
594 
595 	INIT_LIST_HEAD(&ls->ls_recover_list);
596 	spin_lock_init(&ls->ls_recover_list_lock);
597 	idr_init(&ls->ls_recover_idr);
598 	spin_lock_init(&ls->ls_recover_idr_lock);
599 	ls->ls_recover_list_count = 0;
600 	ls->ls_local_handle = ls;
601 	init_waitqueue_head(&ls->ls_wait_general);
602 	INIT_LIST_HEAD(&ls->ls_root_list);
603 	init_rwsem(&ls->ls_root_sem);
604 
605 	spin_lock(&lslist_lock);
606 	ls->ls_create_count = 1;
607 	list_add(&ls->ls_list, &lslist);
608 	spin_unlock(&lslist_lock);
609 
610 	if (flags & DLM_LSFL_FS) {
611 		error = dlm_callback_start(ls);
612 		if (error) {
613 			log_error(ls, "can't start dlm_callback %d", error);
614 			goto out_delist;
615 		}
616 	}
617 
618 	init_waitqueue_head(&ls->ls_recover_lock_wait);
619 
620 	/*
621 	 * Once started, dlm_recoverd first looks for ls in lslist, then
622 	 * initializes ls_in_recovery as locked in "down" mode.  We need
623 	 * to wait for the wakeup from dlm_recoverd because in_recovery
624 	 * has to start out in down mode.
625 	 */
626 
627 	error = dlm_recoverd_start(ls);
628 	if (error) {
629 		log_error(ls, "can't start dlm_recoverd %d", error);
630 		goto out_callback;
631 	}
632 
633 	wait_event(ls->ls_recover_lock_wait,
634 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
635 
636 	ls->ls_kobj.kset = dlm_kset;
637 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
638 				     "%s", ls->ls_name);
639 	if (error)
640 		goto out_recoverd;
641 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
642 
643 	/* let kobject handle freeing of ls if there's an error */
644 	do_unreg = 1;
645 
646 	/* This uevent triggers dlm_controld in userspace to add us to the
647 	   group of nodes that are members of this lockspace (managed by the
648 	   cluster infrastructure.)  Once it's done that, it tells us who the
649 	   current lockspace members are (via configfs) and then tells the
650 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
651 
652 	error = do_uevent(ls, 1);
653 	if (error)
654 		goto out_recoverd;
655 
656 	wait_for_completion(&ls->ls_members_done);
657 	error = ls->ls_members_result;
658 	if (error)
659 		goto out_members;
660 
661 	dlm_create_debug_file(ls);
662 
663 	log_rinfo(ls, "join complete");
664 	*lockspace = ls;
665 	return 0;
666 
667  out_members:
668 	do_uevent(ls, 0);
669 	dlm_clear_members(ls);
670 	kfree(ls->ls_node_array);
671  out_recoverd:
672 	dlm_recoverd_stop(ls);
673  out_callback:
674 	dlm_callback_stop(ls);
675  out_delist:
676 	spin_lock(&lslist_lock);
677 	list_del(&ls->ls_list);
678 	spin_unlock(&lslist_lock);
679 	idr_destroy(&ls->ls_recover_idr);
680 	kfree(ls->ls_recover_buf);
681  out_lkbidr:
682 	idr_destroy(&ls->ls_lkbidr);
683 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
684 		if (ls->ls_remove_names[i])
685 			kfree(ls->ls_remove_names[i]);
686 	}
687  out_rsbtbl:
688 	vfree(ls->ls_rsbtbl);
689  out_lsfree:
690 	if (do_unreg)
691 		kobject_put(&ls->ls_kobj);
692 	else
693 		kfree(ls);
694  out:
695 	module_put(THIS_MODULE);
696 	return error;
697 }
698 
699 int dlm_new_lockspace(const char *name, const char *cluster,
700 		      uint32_t flags, int lvblen,
701 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
702 		      int *ops_result, dlm_lockspace_t **lockspace)
703 {
704 	int error = 0;
705 
706 	mutex_lock(&ls_lock);
707 	if (!ls_count)
708 		error = threads_start();
709 	if (error)
710 		goto out;
711 
712 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
713 			      ops_result, lockspace);
714 	if (!error)
715 		ls_count++;
716 	if (error > 0)
717 		error = 0;
718 	if (!ls_count)
719 		threads_stop();
720  out:
721 	mutex_unlock(&ls_lock);
722 	return error;
723 }
724 
725 static int lkb_idr_is_local(int id, void *p, void *data)
726 {
727 	struct dlm_lkb *lkb = p;
728 
729 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
730 }
731 
732 static int lkb_idr_is_any(int id, void *p, void *data)
733 {
734 	return 1;
735 }
736 
737 static int lkb_idr_free(int id, void *p, void *data)
738 {
739 	struct dlm_lkb *lkb = p;
740 
741 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
742 		dlm_free_lvb(lkb->lkb_lvbptr);
743 
744 	dlm_free_lkb(lkb);
745 	return 0;
746 }
747 
748 /* NOTE: We check the lkbidr here rather than the resource table.
749    This is because there may be LKBs queued as ASTs that have been unlinked
750    from their RSBs and are pending deletion once the AST has been delivered */
751 
752 static int lockspace_busy(struct dlm_ls *ls, int force)
753 {
754 	int rv;
755 
756 	spin_lock(&ls->ls_lkbidr_spin);
757 	if (force == 0) {
758 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
759 	} else if (force == 1) {
760 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
761 	} else {
762 		rv = 0;
763 	}
764 	spin_unlock(&ls->ls_lkbidr_spin);
765 	return rv;
766 }
767 
768 static int release_lockspace(struct dlm_ls *ls, int force)
769 {
770 	struct dlm_rsb *rsb;
771 	struct rb_node *n;
772 	int i, busy, rv;
773 
774 	busy = lockspace_busy(ls, force);
775 
776 	spin_lock(&lslist_lock);
777 	if (ls->ls_create_count == 1) {
778 		if (busy) {
779 			rv = -EBUSY;
780 		} else {
781 			/* remove_lockspace takes ls off lslist */
782 			ls->ls_create_count = 0;
783 			rv = 0;
784 		}
785 	} else if (ls->ls_create_count > 1) {
786 		rv = --ls->ls_create_count;
787 	} else {
788 		rv = -EINVAL;
789 	}
790 	spin_unlock(&lslist_lock);
791 
792 	if (rv) {
793 		log_debug(ls, "release_lockspace no remove %d", rv);
794 		return rv;
795 	}
796 
797 	dlm_device_deregister(ls);
798 
799 	if (force < 3 && dlm_user_daemon_available())
800 		do_uevent(ls, 0);
801 
802 	dlm_recoverd_stop(ls);
803 
804 	dlm_callback_stop(ls);
805 
806 	remove_lockspace(ls);
807 
808 	dlm_delete_debug_file(ls);
809 
810 	kfree(ls->ls_recover_buf);
811 
812 	/*
813 	 * Free all lkb's in idr
814 	 */
815 
816 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
817 	idr_destroy(&ls->ls_lkbidr);
818 
819 	/*
820 	 * Free all rsb's on rsbtbl[] lists
821 	 */
822 
823 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
824 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
825 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
826 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
827 			dlm_free_rsb(rsb);
828 		}
829 
830 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
831 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
832 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
833 			dlm_free_rsb(rsb);
834 		}
835 	}
836 
837 	vfree(ls->ls_rsbtbl);
838 
839 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
840 		kfree(ls->ls_remove_names[i]);
841 
842 	while (!list_empty(&ls->ls_new_rsb)) {
843 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
844 				       res_hashchain);
845 		list_del(&rsb->res_hashchain);
846 		dlm_free_rsb(rsb);
847 	}
848 
849 	/*
850 	 * Free structures on any other lists
851 	 */
852 
853 	dlm_purge_requestqueue(ls);
854 	kfree(ls->ls_recover_args);
855 	dlm_clear_members(ls);
856 	dlm_clear_members_gone(ls);
857 	kfree(ls->ls_node_array);
858 	log_rinfo(ls, "release_lockspace final free");
859 	kobject_put(&ls->ls_kobj);
860 	/* The ls structure will be freed when the kobject is done with */
861 
862 	module_put(THIS_MODULE);
863 	return 0;
864 }
865 
866 /*
867  * Called when a system has released all its locks and is not going to use the
868  * lockspace any longer.  We free everything we're managing for this lockspace.
869  * Remaining nodes will go through the recovery process as if we'd died.  The
870  * lockspace must continue to function as usual, participating in recoveries,
871  * until this returns.
872  *
873  * Force has 4 possible values:
874  * 0 - don't destroy locksapce if it has any LKBs
875  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
876  * 2 - destroy lockspace regardless of LKBs
877  * 3 - destroy lockspace as part of a forced shutdown
878  */
879 
880 int dlm_release_lockspace(void *lockspace, int force)
881 {
882 	struct dlm_ls *ls;
883 	int error;
884 
885 	ls = dlm_find_lockspace_local(lockspace);
886 	if (!ls)
887 		return -EINVAL;
888 	dlm_put_lockspace(ls);
889 
890 	mutex_lock(&ls_lock);
891 	error = release_lockspace(ls, force);
892 	if (!error)
893 		ls_count--;
894 	if (!ls_count)
895 		threads_stop();
896 	mutex_unlock(&ls_lock);
897 
898 	return error;
899 }
900 
901 void dlm_stop_lockspaces(void)
902 {
903 	struct dlm_ls *ls;
904 	int count;
905 
906  restart:
907 	count = 0;
908 	spin_lock(&lslist_lock);
909 	list_for_each_entry(ls, &lslist, ls_list) {
910 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
911 			count++;
912 			continue;
913 		}
914 		spin_unlock(&lslist_lock);
915 		log_error(ls, "no userland control daemon, stopping lockspace");
916 		dlm_ls_stop(ls);
917 		goto restart;
918 	}
919 	spin_unlock(&lslist_lock);
920 
921 	if (count)
922 		log_print("dlm user daemon left %d lockspaces", count);
923 }
924 
925