xref: /linux/fs/dlm/lockspace.c (revision 148f9bb87745ed45f7a11b2cbd3bc0f017d5d257)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78 {
79 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80 }
81 
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83 {
84 	int val = simple_strtoul(buf, NULL, 0);
85 	if (val == 1)
86 		set_bit(LSFL_NODIR, &ls->ls_flags);
87 	return len;
88 }
89 
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91 {
92 	uint32_t status = dlm_recover_status(ls);
93 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
94 }
95 
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97 {
98 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99 }
100 
101 struct dlm_attr {
102 	struct attribute attr;
103 	ssize_t (*show)(struct dlm_ls *, char *);
104 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105 };
106 
107 static struct dlm_attr dlm_attr_control = {
108 	.attr  = {.name = "control", .mode = S_IWUSR},
109 	.store = dlm_control_store
110 };
111 
112 static struct dlm_attr dlm_attr_event = {
113 	.attr  = {.name = "event_done", .mode = S_IWUSR},
114 	.store = dlm_event_store
115 };
116 
117 static struct dlm_attr dlm_attr_id = {
118 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119 	.show  = dlm_id_show,
120 	.store = dlm_id_store
121 };
122 
123 static struct dlm_attr dlm_attr_nodir = {
124 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 	.show  = dlm_nodir_show,
126 	.store = dlm_nodir_store
127 };
128 
129 static struct dlm_attr dlm_attr_recover_status = {
130 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
131 	.show  = dlm_recover_status_show
132 };
133 
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136 	.show  = dlm_recover_nodeid_show
137 };
138 
139 static struct attribute *dlm_attrs[] = {
140 	&dlm_attr_control.attr,
141 	&dlm_attr_event.attr,
142 	&dlm_attr_id.attr,
143 	&dlm_attr_nodir.attr,
144 	&dlm_attr_recover_status.attr,
145 	&dlm_attr_recover_nodeid.attr,
146 	NULL,
147 };
148 
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150 			     char *buf)
151 {
152 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154 	return a->show ? a->show(ls, buf) : 0;
155 }
156 
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158 			      const char *buf, size_t len)
159 {
160 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162 	return a->store ? a->store(ls, buf, len) : len;
163 }
164 
165 static void lockspace_kobj_release(struct kobject *k)
166 {
167 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168 	kfree(ls);
169 }
170 
171 static const struct sysfs_ops dlm_attr_ops = {
172 	.show  = dlm_attr_show,
173 	.store = dlm_attr_store,
174 };
175 
176 static struct kobj_type dlm_ktype = {
177 	.default_attrs = dlm_attrs,
178 	.sysfs_ops     = &dlm_attr_ops,
179 	.release       = lockspace_kobj_release,
180 };
181 
182 static struct kset *dlm_kset;
183 
184 static int do_uevent(struct dlm_ls *ls, int in)
185 {
186 	int error;
187 
188 	if (in)
189 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190 	else
191 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192 
193 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194 
195 	/* dlm_controld will see the uevent, do the necessary group management
196 	   and then write to sysfs to wake us */
197 
198 	error = wait_event_interruptible(ls->ls_uevent_wait,
199 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200 
201 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
202 
203 	if (error)
204 		goto out;
205 
206 	error = ls->ls_uevent_result;
207  out:
208 	if (error)
209 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210 			  error, ls->ls_uevent_result);
211 	return error;
212 }
213 
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215 		      struct kobj_uevent_env *env)
216 {
217 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218 
219 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220 	return 0;
221 }
222 
223 static struct kset_uevent_ops dlm_uevent_ops = {
224 	.uevent = dlm_uevent,
225 };
226 
227 int __init dlm_lockspace_init(void)
228 {
229 	ls_count = 0;
230 	mutex_init(&ls_lock);
231 	INIT_LIST_HEAD(&lslist);
232 	spin_lock_init(&lslist_lock);
233 
234 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235 	if (!dlm_kset) {
236 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
237 		return -ENOMEM;
238 	}
239 	return 0;
240 }
241 
242 void dlm_lockspace_exit(void)
243 {
244 	kset_unregister(dlm_kset);
245 }
246 
247 static struct dlm_ls *find_ls_to_scan(void)
248 {
249 	struct dlm_ls *ls;
250 
251 	spin_lock(&lslist_lock);
252 	list_for_each_entry(ls, &lslist, ls_list) {
253 		if (time_after_eq(jiffies, ls->ls_scan_time +
254 					    dlm_config.ci_scan_secs * HZ)) {
255 			spin_unlock(&lslist_lock);
256 			return ls;
257 		}
258 	}
259 	spin_unlock(&lslist_lock);
260 	return NULL;
261 }
262 
263 static int dlm_scand(void *data)
264 {
265 	struct dlm_ls *ls;
266 
267 	while (!kthread_should_stop()) {
268 		ls = find_ls_to_scan();
269 		if (ls) {
270 			if (dlm_lock_recovery_try(ls)) {
271 				ls->ls_scan_time = jiffies;
272 				dlm_scan_rsbs(ls);
273 				dlm_scan_timeout(ls);
274 				dlm_scan_waiters(ls);
275 				dlm_unlock_recovery(ls);
276 			} else {
277 				ls->ls_scan_time += HZ;
278 			}
279 			continue;
280 		}
281 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282 	}
283 	return 0;
284 }
285 
286 static int dlm_scand_start(void)
287 {
288 	struct task_struct *p;
289 	int error = 0;
290 
291 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
292 	if (IS_ERR(p))
293 		error = PTR_ERR(p);
294 	else
295 		scand_task = p;
296 	return error;
297 }
298 
299 static void dlm_scand_stop(void)
300 {
301 	kthread_stop(scand_task);
302 }
303 
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305 {
306 	struct dlm_ls *ls;
307 
308 	spin_lock(&lslist_lock);
309 
310 	list_for_each_entry(ls, &lslist, ls_list) {
311 		if (ls->ls_global_id == id) {
312 			ls->ls_count++;
313 			goto out;
314 		}
315 	}
316 	ls = NULL;
317  out:
318 	spin_unlock(&lslist_lock);
319 	return ls;
320 }
321 
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323 {
324 	struct dlm_ls *ls;
325 
326 	spin_lock(&lslist_lock);
327 	list_for_each_entry(ls, &lslist, ls_list) {
328 		if (ls->ls_local_handle == lockspace) {
329 			ls->ls_count++;
330 			goto out;
331 		}
332 	}
333 	ls = NULL;
334  out:
335 	spin_unlock(&lslist_lock);
336 	return ls;
337 }
338 
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
340 {
341 	struct dlm_ls *ls;
342 
343 	spin_lock(&lslist_lock);
344 	list_for_each_entry(ls, &lslist, ls_list) {
345 		if (ls->ls_device.minor == minor) {
346 			ls->ls_count++;
347 			goto out;
348 		}
349 	}
350 	ls = NULL;
351  out:
352 	spin_unlock(&lslist_lock);
353 	return ls;
354 }
355 
356 void dlm_put_lockspace(struct dlm_ls *ls)
357 {
358 	spin_lock(&lslist_lock);
359 	ls->ls_count--;
360 	spin_unlock(&lslist_lock);
361 }
362 
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365 	for (;;) {
366 		spin_lock(&lslist_lock);
367 		if (ls->ls_count == 0) {
368 			WARN_ON(ls->ls_create_count != 0);
369 			list_del(&ls->ls_list);
370 			spin_unlock(&lslist_lock);
371 			return;
372 		}
373 		spin_unlock(&lslist_lock);
374 		ssleep(1);
375 	}
376 }
377 
/*
 * Start dlm_scand and then lowcomms.  If lowcomms fails to start, dlm_scand
 * is stopped again.  Returns 0 on success or the error of the step that
 * failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
402 
/* Counterpart of threads_start(): stop dlm_scand, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
408 
409 static int new_lockspace(const char *name, const char *cluster,
410 			 uint32_t flags, int lvblen,
411 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
412 			 int *ops_result, dlm_lockspace_t **lockspace)
413 {
414 	struct dlm_ls *ls;
415 	int i, size, error;
416 	int do_unreg = 0;
417 	int namelen = strlen(name);
418 
419 	if (namelen > DLM_LOCKSPACE_LEN)
420 		return -EINVAL;
421 
422 	if (!lvblen || (lvblen % 8))
423 		return -EINVAL;
424 
425 	if (!try_module_get(THIS_MODULE))
426 		return -EINVAL;
427 
428 	if (!dlm_user_daemon_available()) {
429 		log_print("dlm user daemon not available");
430 		error = -EUNATCH;
431 		goto out;
432 	}
433 
434 	if (ops && ops_result) {
435 	       	if (!dlm_config.ci_recover_callbacks)
436 			*ops_result = -EOPNOTSUPP;
437 		else
438 			*ops_result = 0;
439 	}
440 
441 	if (dlm_config.ci_recover_callbacks && cluster &&
442 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443 		log_print("dlm cluster name %s mismatch %s",
444 			  dlm_config.ci_cluster_name, cluster);
445 		error = -EBADR;
446 		goto out;
447 	}
448 
449 	error = 0;
450 
451 	spin_lock(&lslist_lock);
452 	list_for_each_entry(ls, &lslist, ls_list) {
453 		WARN_ON(ls->ls_create_count <= 0);
454 		if (ls->ls_namelen != namelen)
455 			continue;
456 		if (memcmp(ls->ls_name, name, namelen))
457 			continue;
458 		if (flags & DLM_LSFL_NEWEXCL) {
459 			error = -EEXIST;
460 			break;
461 		}
462 		ls->ls_create_count++;
463 		*lockspace = ls;
464 		error = 1;
465 		break;
466 	}
467 	spin_unlock(&lslist_lock);
468 
469 	if (error)
470 		goto out;
471 
472 	error = -ENOMEM;
473 
474 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475 	if (!ls)
476 		goto out;
477 	memcpy(ls->ls_name, name, namelen);
478 	ls->ls_namelen = namelen;
479 	ls->ls_lvblen = lvblen;
480 	ls->ls_count = 0;
481 	ls->ls_flags = 0;
482 	ls->ls_scan_time = jiffies;
483 
484 	if (ops && dlm_config.ci_recover_callbacks) {
485 		ls->ls_ops = ops;
486 		ls->ls_ops_arg = ops_arg;
487 	}
488 
489 	if (flags & DLM_LSFL_TIMEWARN)
490 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491 
492 	/* ls_exflags are forced to match among nodes, and we don't
493 	   need to require all nodes to have some flags set */
494 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495 				    DLM_LSFL_NEWEXCL));
496 
497 	size = dlm_config.ci_rsbtbl_size;
498 	ls->ls_rsbtbl_size = size;
499 
500 	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501 	if (!ls->ls_rsbtbl)
502 		goto out_lsfree;
503 	for (i = 0; i < size; i++) {
504 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
505 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
506 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
507 	}
508 
509 	spin_lock_init(&ls->ls_remove_spin);
510 
511 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
513 						 GFP_KERNEL);
514 		if (!ls->ls_remove_names[i])
515 			goto out_rsbtbl;
516 	}
517 
518 	idr_init(&ls->ls_lkbidr);
519 	spin_lock_init(&ls->ls_lkbidr_spin);
520 
521 	INIT_LIST_HEAD(&ls->ls_waiters);
522 	mutex_init(&ls->ls_waiters_mutex);
523 	INIT_LIST_HEAD(&ls->ls_orphans);
524 	mutex_init(&ls->ls_orphans_mutex);
525 	INIT_LIST_HEAD(&ls->ls_timeout);
526 	mutex_init(&ls->ls_timeout_mutex);
527 
528 	INIT_LIST_HEAD(&ls->ls_new_rsb);
529 	spin_lock_init(&ls->ls_new_rsb_spin);
530 
531 	INIT_LIST_HEAD(&ls->ls_nodes);
532 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
533 	ls->ls_num_nodes = 0;
534 	ls->ls_low_nodeid = 0;
535 	ls->ls_total_weight = 0;
536 	ls->ls_node_array = NULL;
537 
538 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
539 	ls->ls_stub_rsb.res_ls = ls;
540 
541 	ls->ls_debug_rsb_dentry = NULL;
542 	ls->ls_debug_waiters_dentry = NULL;
543 
544 	init_waitqueue_head(&ls->ls_uevent_wait);
545 	ls->ls_uevent_result = 0;
546 	init_completion(&ls->ls_members_done);
547 	ls->ls_members_result = -1;
548 
549 	mutex_init(&ls->ls_cb_mutex);
550 	INIT_LIST_HEAD(&ls->ls_cb_delay);
551 
552 	ls->ls_recoverd_task = NULL;
553 	mutex_init(&ls->ls_recoverd_active);
554 	spin_lock_init(&ls->ls_recover_lock);
555 	spin_lock_init(&ls->ls_rcom_spin);
556 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
557 	ls->ls_recover_status = 0;
558 	ls->ls_recover_seq = 0;
559 	ls->ls_recover_args = NULL;
560 	init_rwsem(&ls->ls_in_recovery);
561 	init_rwsem(&ls->ls_recv_active);
562 	INIT_LIST_HEAD(&ls->ls_requestqueue);
563 	mutex_init(&ls->ls_requestqueue_mutex);
564 	mutex_init(&ls->ls_clear_proc_locks);
565 
566 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
567 	if (!ls->ls_recover_buf)
568 		goto out_lkbidr;
569 
570 	ls->ls_slot = 0;
571 	ls->ls_num_slots = 0;
572 	ls->ls_slots_size = 0;
573 	ls->ls_slots = NULL;
574 
575 	INIT_LIST_HEAD(&ls->ls_recover_list);
576 	spin_lock_init(&ls->ls_recover_list_lock);
577 	idr_init(&ls->ls_recover_idr);
578 	spin_lock_init(&ls->ls_recover_idr_lock);
579 	ls->ls_recover_list_count = 0;
580 	ls->ls_local_handle = ls;
581 	init_waitqueue_head(&ls->ls_wait_general);
582 	INIT_LIST_HEAD(&ls->ls_root_list);
583 	init_rwsem(&ls->ls_root_sem);
584 
585 	spin_lock(&lslist_lock);
586 	ls->ls_create_count = 1;
587 	list_add(&ls->ls_list, &lslist);
588 	spin_unlock(&lslist_lock);
589 
590 	if (flags & DLM_LSFL_FS) {
591 		error = dlm_callback_start(ls);
592 		if (error) {
593 			log_error(ls, "can't start dlm_callback %d", error);
594 			goto out_delist;
595 		}
596 	}
597 
598 	init_waitqueue_head(&ls->ls_recover_lock_wait);
599 
600 	/*
601 	 * Once started, dlm_recoverd first looks for ls in lslist, then
602 	 * initializes ls_in_recovery as locked in "down" mode.  We need
603 	 * to wait for the wakeup from dlm_recoverd because in_recovery
604 	 * has to start out in down mode.
605 	 */
606 
607 	error = dlm_recoverd_start(ls);
608 	if (error) {
609 		log_error(ls, "can't start dlm_recoverd %d", error);
610 		goto out_callback;
611 	}
612 
613 	wait_event(ls->ls_recover_lock_wait,
614 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
615 
616 	ls->ls_kobj.kset = dlm_kset;
617 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
618 				     "%s", ls->ls_name);
619 	if (error)
620 		goto out_recoverd;
621 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
622 
623 	/* let kobject handle freeing of ls if there's an error */
624 	do_unreg = 1;
625 
626 	/* This uevent triggers dlm_controld in userspace to add us to the
627 	   group of nodes that are members of this lockspace (managed by the
628 	   cluster infrastructure.)  Once it's done that, it tells us who the
629 	   current lockspace members are (via configfs) and then tells the
630 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
631 
632 	error = do_uevent(ls, 1);
633 	if (error)
634 		goto out_recoverd;
635 
636 	wait_for_completion(&ls->ls_members_done);
637 	error = ls->ls_members_result;
638 	if (error)
639 		goto out_members;
640 
641 	dlm_create_debug_file(ls);
642 
643 	log_debug(ls, "join complete");
644 	*lockspace = ls;
645 	return 0;
646 
647  out_members:
648 	do_uevent(ls, 0);
649 	dlm_clear_members(ls);
650 	kfree(ls->ls_node_array);
651  out_recoverd:
652 	dlm_recoverd_stop(ls);
653  out_callback:
654 	dlm_callback_stop(ls);
655  out_delist:
656 	spin_lock(&lslist_lock);
657 	list_del(&ls->ls_list);
658 	spin_unlock(&lslist_lock);
659 	idr_destroy(&ls->ls_recover_idr);
660 	kfree(ls->ls_recover_buf);
661  out_lkbidr:
662 	idr_destroy(&ls->ls_lkbidr);
663 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664 		if (ls->ls_remove_names[i])
665 			kfree(ls->ls_remove_names[i]);
666 	}
667  out_rsbtbl:
668 	vfree(ls->ls_rsbtbl);
669  out_lsfree:
670 	if (do_unreg)
671 		kobject_put(&ls->ls_kobj);
672 	else
673 		kfree(ls);
674  out:
675 	module_put(THIS_MODULE);
676 	return error;
677 }
678 
679 int dlm_new_lockspace(const char *name, const char *cluster,
680 		      uint32_t flags, int lvblen,
681 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
682 		      int *ops_result, dlm_lockspace_t **lockspace)
683 {
684 	int error = 0;
685 
686 	mutex_lock(&ls_lock);
687 	if (!ls_count)
688 		error = threads_start();
689 	if (error)
690 		goto out;
691 
692 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
693 			      ops_result, lockspace);
694 	if (!error)
695 		ls_count++;
696 	if (error > 0)
697 		error = 0;
698 	if (!ls_count)
699 		threads_stop();
700  out:
701 	mutex_unlock(&ls_lock);
702 	return error;
703 }
704 
705 static int lkb_idr_is_local(int id, void *p, void *data)
706 {
707 	struct dlm_lkb *lkb = p;
708 
709 	if (!lkb->lkb_nodeid)
710 		return 1;
711 	return 0;
712 }
713 
/* idr_for_each() callback: returns 1 for every entry it is called on. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
718 
719 static int lkb_idr_free(int id, void *p, void *data)
720 {
721 	struct dlm_lkb *lkb = p;
722 
723 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
724 		dlm_free_lvb(lkb->lkb_lvbptr);
725 
726 	dlm_free_lkb(lkb);
727 	return 0;
728 }
729 
730 /* NOTE: We check the lkbidr here rather than the resource table.
731    This is because there may be LKBs queued as ASTs that have been unlinked
732    from their RSBs and are pending deletion once the AST has been delivered */
733 
734 static int lockspace_busy(struct dlm_ls *ls, int force)
735 {
736 	int rv;
737 
738 	spin_lock(&ls->ls_lkbidr_spin);
739 	if (force == 0) {
740 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
741 	} else if (force == 1) {
742 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
743 	} else {
744 		rv = 0;
745 	}
746 	spin_unlock(&ls->ls_lkbidr_spin);
747 	return rv;
748 }
749 
750 static int release_lockspace(struct dlm_ls *ls, int force)
751 {
752 	struct dlm_rsb *rsb;
753 	struct rb_node *n;
754 	int i, busy, rv;
755 
756 	busy = lockspace_busy(ls, force);
757 
758 	spin_lock(&lslist_lock);
759 	if (ls->ls_create_count == 1) {
760 		if (busy) {
761 			rv = -EBUSY;
762 		} else {
763 			/* remove_lockspace takes ls off lslist */
764 			ls->ls_create_count = 0;
765 			rv = 0;
766 		}
767 	} else if (ls->ls_create_count > 1) {
768 		rv = --ls->ls_create_count;
769 	} else {
770 		rv = -EINVAL;
771 	}
772 	spin_unlock(&lslist_lock);
773 
774 	if (rv) {
775 		log_debug(ls, "release_lockspace no remove %d", rv);
776 		return rv;
777 	}
778 
779 	dlm_device_deregister(ls);
780 
781 	if (force < 3 && dlm_user_daemon_available())
782 		do_uevent(ls, 0);
783 
784 	dlm_recoverd_stop(ls);
785 
786 	dlm_callback_stop(ls);
787 
788 	remove_lockspace(ls);
789 
790 	dlm_delete_debug_file(ls);
791 
792 	kfree(ls->ls_recover_buf);
793 
794 	/*
795 	 * Free all lkb's in idr
796 	 */
797 
798 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
799 	idr_destroy(&ls->ls_lkbidr);
800 
801 	/*
802 	 * Free all rsb's on rsbtbl[] lists
803 	 */
804 
805 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
806 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
807 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
808 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
809 			dlm_free_rsb(rsb);
810 		}
811 
812 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
813 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
814 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
815 			dlm_free_rsb(rsb);
816 		}
817 	}
818 
819 	vfree(ls->ls_rsbtbl);
820 
821 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
822 		kfree(ls->ls_remove_names[i]);
823 
824 	while (!list_empty(&ls->ls_new_rsb)) {
825 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
826 				       res_hashchain);
827 		list_del(&rsb->res_hashchain);
828 		dlm_free_rsb(rsb);
829 	}
830 
831 	/*
832 	 * Free structures on any other lists
833 	 */
834 
835 	dlm_purge_requestqueue(ls);
836 	kfree(ls->ls_recover_args);
837 	dlm_clear_members(ls);
838 	dlm_clear_members_gone(ls);
839 	kfree(ls->ls_node_array);
840 	log_debug(ls, "release_lockspace final free");
841 	kobject_put(&ls->ls_kobj);
842 	/* The ls structure will be freed when the kobject is done with */
843 
844 	module_put(THIS_MODULE);
845 	return 0;
846 }
847 
848 /*
849  * Called when a system has released all its locks and is not going to use the
850  * lockspace any longer.  We free everything we're managing for this lockspace.
851  * Remaining nodes will go through the recovery process as if we'd died.  The
852  * lockspace must continue to function as usual, participating in recoveries,
853  * until this returns.
854  *
855  * Force has 4 possible values:
856  * 0 - don't destroy locksapce if it has any LKBs
857  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
858  * 2 - destroy lockspace regardless of LKBs
859  * 3 - destroy lockspace as part of a forced shutdown
860  */
861 
862 int dlm_release_lockspace(void *lockspace, int force)
863 {
864 	struct dlm_ls *ls;
865 	int error;
866 
867 	ls = dlm_find_lockspace_local(lockspace);
868 	if (!ls)
869 		return -EINVAL;
870 	dlm_put_lockspace(ls);
871 
872 	mutex_lock(&ls_lock);
873 	error = release_lockspace(ls, force);
874 	if (!error)
875 		ls_count--;
876 	if (!ls_count)
877 		threads_stop();
878 	mutex_unlock(&ls_lock);
879 
880 	return error;
881 }
882 
883 void dlm_stop_lockspaces(void)
884 {
885 	struct dlm_ls *ls;
886 	int count;
887 
888  restart:
889 	count = 0;
890 	spin_lock(&lslist_lock);
891 	list_for_each_entry(ls, &lslist, ls_list) {
892 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
893 			count++;
894 			continue;
895 		}
896 		spin_unlock(&lslist_lock);
897 		log_error(ls, "no userland control daemon, stopping lockspace");
898 		dlm_ls_stop(ls);
899 		goto restart;
900 	}
901 	spin_unlock(&lslist_lock);
902 
903 	if (count)
904 		log_print("dlm user daemon left %d lockspaces", count);
905 }
906 
907