xref: /linux/fs/dlm/lockspace.c (revision a769648f464c9f453b3dc5c2bb8559b28c5d78a1)
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);

	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

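/* Generic sysfs dispatch: the kobject is embedded in struct dlm_ls, so
 * container_of() recovers the lockspace, and the per-attribute show/store
 * callbacks above do the real work.
 */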
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
};

static struct kset *dlm_kset;

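/* Notify userspace (dlm_controld) that this node is joining (in=1) or
 * leaving (in=0) the lockspace group.  The wait below is ended by
 * dlm_event_store() when the daemon writes to the "event_done" sysfs
 * file, and the daemon's status comes back through ls_uevent_result.
 */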
static int do_uevent(struct dlm_ls *ls, int in, unsigned int release_recover)
{
	char message[512] = {};
	char *envp[] = { message, NULL };

	if (in) {
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	} else {
		snprintf(message, 511, "RELEASE_RECOVER=%u", release_recover);
		kobject_uevent_env(&ls->ls_kobj, KOBJ_OFFLINE, envp);
	}

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

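/* Module init: set up the global lockspace bookkeeping and create the
 * "dlm" kset under /sys/kernel, which parents the per-lockspace kobjects
 * added in new_lockspace().
 */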
int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

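/* Lockspace lookup helpers.  Each dlm_find_lockspace_*() variant takes a
 * reference on ls_count; callers must drop it with dlm_put_lockspace(),
 * whose wakeup lets remove_lockspace() proceed once the count hits zero.
 */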
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	atomic_inc(&ls->ls_count);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

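/* Wait for all references to be dropped, then unlink the lockspace from
 * lslist.  The count is rechecked under lslist_lock since a new reference
 * may have been taken between the wakeup and taking the lock.
 */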
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}

static int lkb_idr_free(struct dlm_lkb *lkb)
{
	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

static void free_lockspace(struct work_struct *work)
{
	struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
	struct dlm_lkb *lkb;
	unsigned long id;

	/*
	 * Free all lkb's in xa
	 */
	xa_for_each(&ls->ls_lkbxa, id, lkb) {
		lkb_idr_free(lkb);
	}
	xa_destroy(&ls->ls_lkbxa);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	kfree(ls);
}

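/* Create and join a lockspace: reuse an existing lockspace of the same
 * name (unless DLM_LSFL_NEWEXCL, which then fails with -EEXIST), or
 * allocate and initialize a new dlm_ls, start the callback and recoverd
 * threads, register with sysfs, signal userspace via uevent and wait for
 * the initial recovery to finish.  Returns 0 on a fresh join, 1 when an
 * existing lockspace was reused, or a negative errno.
 */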
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_SOFTIRQ)
		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
				    DLM_LSFL_SOFTIRQ));

	INIT_LIST_HEAD(&ls->ls_slow_inactive);
	INIT_LIST_HEAD(&ls->ls_slow_active);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	rwlock_init(&ls->ls_lkbxa_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	INIT_WORK(&ls->ls_free_work, free_lockspace);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due to backwards compatibility with 3.1, we need to use the
	 * maximum possible dlm message size to be sure the message will
	 * fit and we will not have out-of-bounds issues.  However, a 3.2
	 * sending side might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbxa;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	spin_lock_init(&ls->ls_recover_xa_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_scan_list);
	spin_lock_init(&ls->ls_scan_lock);
	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);

	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	error = dlm_callback_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_callback %d", error);
		goto out_delist;
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1, 0);
	if (error < 0)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);
 out_lkbxa:
	xa_destroy(&ls->ls_lkbxa);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	kobject_put(&ls->ls_kobj);
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

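/* Common creation path for kernel and userspace lockspaces.  ls_count
 * tracks lockspaces created through here: midcomms is started for the
 * first one and torn back down again if that first creation fails.
 */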
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

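/* Illustrative sketch of a kernel caller (hypothetical names, error
 * handling trimmed; not taken from an in-tree user):
 *
 *	dlm_lockspace_t *ls;
 *	int ops_result;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_NEWEXCL,
 *				  32, &example_ops, example_arg,
 *				  &ops_result, &ls);
 *	if (error)
 *		return error;
 *	...
 *	dlm_release_lockspace(ls, DLM_RELEASE_NO_LOCKS);
 *
 * lvblen (32 here) must be a multiple of 8, and ops/ops_arg may be NULL
 * when no recovery callbacks are wanted.
 */
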
int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	if (flags & DLM_LSFL_SOFTIRQ)
		return -EINVAL;

	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

/* NOTE: We check the lkbxa here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, unsigned int release_option)
{
	struct dlm_lkb *lkb;
	unsigned long id;
	int rv = 0;

	read_lock_bh(&ls->ls_lkbxa_lock);
	if (release_option == DLM_RELEASE_NO_LOCKS) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			rv = 1;
			break;
		}
	} else if (release_option == DLM_RELEASE_UNUSED) {
		/* TODO: handle this UNUSED option as NO_LOCKS in later patch */
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			if (lkb->lkb_nodeid == 0 &&
			    lkb->lkb_grmode != DLM_LOCK_IV) {
				rv = 1;
				break;
			}
		}
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbxa_lock);
	return rv;
}

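/* Drop one create reference.  Actual teardown happens only when the last
 * reference goes away and lockspace_busy() allows it for the given
 * release_option: recoverd and the scan timer are stopped, userspace is
 * told via an offline uevent, the lockspace is unlinked, and freeing of
 * the rsb/lkb structures is deferred to free_lockspace() on dlm_wq.
 */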
static int release_lockspace(struct dlm_ls *ls, unsigned int release_option)
{
	int busy, rv;

	busy = lockspace_busy(ls, release_option);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (release_option != DLM_RELEASE_NO_EVENT &&
	    dlm_user_daemon_available())
		do_uevent(ls, 0, (release_option == DLM_RELEASE_RECOVER));

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(); we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_scan_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kobject_put(&ls->ls_kobj);

	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);

	log_rinfo(ls, "%s final free", __func__);

	/* delayed free of data structures see free_lockspace() */
	queue_work(dlm_wq, &ls->ls_free_work);
	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * See DLM_RELEASE defines for release_option values and their meaning.
 */

int dlm_release_lockspace(void *lockspace, unsigned int release_option)
{
	struct dlm_ls *ls;
	int error;

	if (release_option > __DLM_RELEASE_MAX)
		return -EINVAL;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, release_option);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

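/* Called when the userland control daemon is gone: force every running
 * lockspace to stop.  lslist_lock is dropped around dlm_ls_stop(), so the
 * scan restarts from the head after each stop.
 */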
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}