1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int ls_count;
29 static struct mutex ls_lock;
30 static struct list_head lslist;
31 static spinlock_t lslist_lock;
32
dlm_control_store(struct dlm_ls * ls,const char * buf,size_t len)33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35 ssize_t ret = len;
36 int n;
37 int rc = kstrtoint(buf, 0, &n);
38
39 if (rc)
40 return rc;
41 ls = dlm_find_lockspace_local(ls);
42 if (!ls)
43 return -EINVAL;
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 dlm_put_lockspace(ls);
56 return ret;
57 }
58
dlm_event_store(struct dlm_ls * ls,const char * buf,size_t len)59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63 if (rc)
64 return rc;
65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 wake_up(&ls->ls_uevent_wait);
67 return len;
68 }
69
dlm_id_show(struct dlm_ls * ls,char * buf)70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74
dlm_id_store(struct dlm_ls * ls,const char * buf,size_t len)75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79 if (rc)
80 return rc;
81 return len;
82 }
83
dlm_nodir_show(struct dlm_ls * ls,char * buf)84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88
dlm_nodir_store(struct dlm_ls * ls,const char * buf,size_t len)89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91 int val;
92 int rc = kstrtoint(buf, 0, &val);
93
94 if (rc)
95 return rc;
96 if (val == 1)
97 set_bit(LSFL_NODIR, &ls->ls_flags);
98 return len;
99 }
100
dlm_recover_status_show(struct dlm_ls * ls,char * buf)101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103 uint32_t status = dlm_recover_status(ls);
104 return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106
dlm_recover_nodeid_show(struct dlm_ls * ls,char * buf)107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111
112 struct dlm_attr {
113 struct attribute attr;
114 ssize_t (*show)(struct dlm_ls *, char *);
115 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116 };
117
118 static struct dlm_attr dlm_attr_control = {
119 .attr = {.name = "control", .mode = S_IWUSR},
120 .store = dlm_control_store
121 };
122
123 static struct dlm_attr dlm_attr_event = {
124 .attr = {.name = "event_done", .mode = S_IWUSR},
125 .store = dlm_event_store
126 };
127
128 static struct dlm_attr dlm_attr_id = {
129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 .show = dlm_id_show,
131 .store = dlm_id_store
132 };
133
134 static struct dlm_attr dlm_attr_nodir = {
135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 .show = dlm_nodir_show,
137 .store = dlm_nodir_store
138 };
139
140 static struct dlm_attr dlm_attr_recover_status = {
141 .attr = {.name = "recover_status", .mode = S_IRUGO},
142 .show = dlm_recover_status_show
143 };
144
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
147 .show = dlm_recover_nodeid_show
148 };
149
150 static struct attribute *dlm_attrs[] = {
151 &dlm_attr_control.attr,
152 &dlm_attr_event.attr,
153 &dlm_attr_id.attr,
154 &dlm_attr_nodir.attr,
155 &dlm_attr_recover_status.attr,
156 &dlm_attr_recover_nodeid.attr,
157 NULL,
158 };
159 ATTRIBUTE_GROUPS(dlm);
160
dlm_attr_show(struct kobject * kobj,struct attribute * attr,char * buf)161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 char *buf)
163 {
164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 return a->show ? a->show(ls, buf) : 0;
167 }
168
dlm_attr_store(struct kobject * kobj,struct attribute * attr,const char * buf,size_t len)169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 const char *buf, size_t len)
171 {
172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 return a->store ? a->store(ls, buf, len) : len;
175 }
176
177 static const struct sysfs_ops dlm_attr_ops = {
178 .show = dlm_attr_show,
179 .store = dlm_attr_store,
180 };
181
182 static struct kobj_type dlm_ktype = {
183 .default_groups = dlm_groups,
184 .sysfs_ops = &dlm_attr_ops,
185 };
186
187 static struct kset *dlm_kset;
188
do_uevent(struct dlm_ls * ls,int in)189 static int do_uevent(struct dlm_ls *ls, int in)
190 {
191 if (in)
192 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193 else
194 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198 /* dlm_controld will see the uevent, do the necessary group management
199 and then write to sysfs to wake us */
200
201 wait_event(ls->ls_uevent_wait,
202 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206 return ls->ls_uevent_result;
207 }
208
dlm_uevent(const struct kobject * kobj,struct kobj_uevent_env * env)209 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210 {
211 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214 return 0;
215 }
216
217 static const struct kset_uevent_ops dlm_uevent_ops = {
218 .uevent = dlm_uevent,
219 };
220
dlm_lockspace_init(void)221 int __init dlm_lockspace_init(void)
222 {
223 ls_count = 0;
224 mutex_init(&ls_lock);
225 INIT_LIST_HEAD(&lslist);
226 spin_lock_init(&lslist_lock);
227
228 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229 if (!dlm_kset) {
230 printk(KERN_WARNING "%s: can not create kset\n", __func__);
231 return -ENOMEM;
232 }
233 return 0;
234 }
235
dlm_lockspace_exit(void)236 void dlm_lockspace_exit(void)
237 {
238 kset_unregister(dlm_kset);
239 }
240
dlm_find_lockspace_global(uint32_t id)241 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242 {
243 struct dlm_ls *ls;
244
245 spin_lock_bh(&lslist_lock);
246
247 list_for_each_entry(ls, &lslist, ls_list) {
248 if (ls->ls_global_id == id) {
249 atomic_inc(&ls->ls_count);
250 goto out;
251 }
252 }
253 ls = NULL;
254 out:
255 spin_unlock_bh(&lslist_lock);
256 return ls;
257 }
258
dlm_find_lockspace_local(dlm_lockspace_t * lockspace)259 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260 {
261 struct dlm_ls *ls = lockspace;
262
263 atomic_inc(&ls->ls_count);
264 return ls;
265 }
266
dlm_find_lockspace_device(int minor)267 struct dlm_ls *dlm_find_lockspace_device(int minor)
268 {
269 struct dlm_ls *ls;
270
271 spin_lock_bh(&lslist_lock);
272 list_for_each_entry(ls, &lslist, ls_list) {
273 if (ls->ls_device.minor == minor) {
274 atomic_inc(&ls->ls_count);
275 goto out;
276 }
277 }
278 ls = NULL;
279 out:
280 spin_unlock_bh(&lslist_lock);
281 return ls;
282 }
283
dlm_put_lockspace(struct dlm_ls * ls)284 void dlm_put_lockspace(struct dlm_ls *ls)
285 {
286 if (atomic_dec_and_test(&ls->ls_count))
287 wake_up(&ls->ls_count_wait);
288 }
289
remove_lockspace(struct dlm_ls * ls)290 static void remove_lockspace(struct dlm_ls *ls)
291 {
292 retry:
293 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295 spin_lock_bh(&lslist_lock);
296 if (atomic_read(&ls->ls_count) != 0) {
297 spin_unlock_bh(&lslist_lock);
298 goto retry;
299 }
300
301 WARN_ON(ls->ls_create_count != 0);
302 list_del(&ls->ls_list);
303 spin_unlock_bh(&lslist_lock);
304 }
305
threads_start(void)306 static int threads_start(void)
307 {
308 int error;
309
310 /* Thread for sending/receiving messages for all lockspace's */
311 error = dlm_midcomms_start();
312 if (error)
313 log_print("cannot start dlm midcomms %d", error);
314
315 return error;
316 }
317
lkb_idr_free(struct dlm_lkb * lkb)318 static int lkb_idr_free(struct dlm_lkb *lkb)
319 {
320 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321 dlm_free_lvb(lkb->lkb_lvbptr);
322
323 dlm_free_lkb(lkb);
324 return 0;
325 }
326
rhash_free_rsb(void * ptr,void * arg)327 static void rhash_free_rsb(void *ptr, void *arg)
328 {
329 struct dlm_rsb *rsb = ptr;
330
331 dlm_free_rsb(rsb);
332 }
333
free_lockspace(struct work_struct * work)334 static void free_lockspace(struct work_struct *work)
335 {
336 struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
337 struct dlm_lkb *lkb;
338 unsigned long id;
339
340 /*
341 * Free all lkb's in xa
342 */
343 xa_for_each(&ls->ls_lkbxa, id, lkb) {
344 lkb_idr_free(lkb);
345 }
346 xa_destroy(&ls->ls_lkbxa);
347
348 /*
349 * Free all rsb's on rsbtbl
350 */
351 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353 kfree(ls);
354 }
355
new_lockspace(const char * name,const char * cluster,uint32_t flags,int lvblen,const struct dlm_lockspace_ops * ops,void * ops_arg,int * ops_result,dlm_lockspace_t ** lockspace)356 static int new_lockspace(const char *name, const char *cluster,
357 uint32_t flags, int lvblen,
358 const struct dlm_lockspace_ops *ops, void *ops_arg,
359 int *ops_result, dlm_lockspace_t **lockspace)
360 {
361 struct dlm_ls *ls;
362 int namelen = strlen(name);
363 int error;
364
365 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366 return -EINVAL;
367
368 if (lvblen % 8)
369 return -EINVAL;
370
371 if (!try_module_get(THIS_MODULE))
372 return -EINVAL;
373
374 if (!dlm_user_daemon_available()) {
375 log_print("dlm user daemon not available");
376 error = -EUNATCH;
377 goto out;
378 }
379
380 if (ops && ops_result) {
381 if (!dlm_config.ci_recover_callbacks)
382 *ops_result = -EOPNOTSUPP;
383 else
384 *ops_result = 0;
385 }
386
387 if (!cluster)
388 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389 dlm_config.ci_cluster_name);
390
391 if (dlm_config.ci_recover_callbacks && cluster &&
392 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393 log_print("dlm cluster name '%s' does not match "
394 "the application cluster name '%s'",
395 dlm_config.ci_cluster_name, cluster);
396 error = -EBADR;
397 goto out;
398 }
399
400 error = 0;
401
402 spin_lock_bh(&lslist_lock);
403 list_for_each_entry(ls, &lslist, ls_list) {
404 WARN_ON(ls->ls_create_count <= 0);
405 if (ls->ls_namelen != namelen)
406 continue;
407 if (memcmp(ls->ls_name, name, namelen))
408 continue;
409 if (flags & DLM_LSFL_NEWEXCL) {
410 error = -EEXIST;
411 break;
412 }
413 ls->ls_create_count++;
414 *lockspace = ls;
415 error = 1;
416 break;
417 }
418 spin_unlock_bh(&lslist_lock);
419
420 if (error)
421 goto out;
422
423 error = -ENOMEM;
424
425 ls = kzalloc(sizeof(*ls), GFP_NOFS);
426 if (!ls)
427 goto out;
428 memcpy(ls->ls_name, name, namelen);
429 ls->ls_namelen = namelen;
430 ls->ls_lvblen = lvblen;
431 atomic_set(&ls->ls_count, 0);
432 init_waitqueue_head(&ls->ls_count_wait);
433 ls->ls_flags = 0;
434
435 if (ops && dlm_config.ci_recover_callbacks) {
436 ls->ls_ops = ops;
437 ls->ls_ops_arg = ops_arg;
438 }
439
440 if (flags & DLM_LSFL_SOFTIRQ)
441 set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443 /* ls_exflags are forced to match among nodes, and we don't
444 * need to require all nodes to have some flags set
445 */
446 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447 DLM_LSFL_SOFTIRQ));
448
449 INIT_LIST_HEAD(&ls->ls_slow_inactive);
450 INIT_LIST_HEAD(&ls->ls_slow_active);
451 rwlock_init(&ls->ls_rsbtbl_lock);
452
453 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454 if (error)
455 goto out_lsfree;
456
457 xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458 rwlock_init(&ls->ls_lkbxa_lock);
459
460 INIT_LIST_HEAD(&ls->ls_waiters);
461 spin_lock_init(&ls->ls_waiters_lock);
462 INIT_LIST_HEAD(&ls->ls_orphans);
463 spin_lock_init(&ls->ls_orphans_lock);
464
465 INIT_LIST_HEAD(&ls->ls_nodes);
466 INIT_LIST_HEAD(&ls->ls_nodes_gone);
467 ls->ls_num_nodes = 0;
468 ls->ls_low_nodeid = 0;
469 ls->ls_total_weight = 0;
470 ls->ls_node_array = NULL;
471
472 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473 ls->ls_local_rsb.res_ls = ls;
474
475 ls->ls_debug_rsb_dentry = NULL;
476 ls->ls_debug_waiters_dentry = NULL;
477
478 init_waitqueue_head(&ls->ls_uevent_wait);
479 ls->ls_uevent_result = 0;
480 init_completion(&ls->ls_recovery_done);
481 ls->ls_recovery_result = -1;
482
483 spin_lock_init(&ls->ls_cb_lock);
484 INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486 INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488 ls->ls_recoverd_task = NULL;
489 mutex_init(&ls->ls_recoverd_active);
490 spin_lock_init(&ls->ls_recover_lock);
491 spin_lock_init(&ls->ls_rcom_spin);
492 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493 ls->ls_recover_status = 0;
494 ls->ls_recover_seq = get_random_u64();
495 ls->ls_recover_args = NULL;
496 init_rwsem(&ls->ls_in_recovery);
497 rwlock_init(&ls->ls_recv_active);
498 INIT_LIST_HEAD(&ls->ls_requestqueue);
499 rwlock_init(&ls->ls_requestqueue_lock);
500 spin_lock_init(&ls->ls_clear_proc_locks);
501
502 /* Due backwards compatibility with 3.1 we need to use maximum
503 * possible dlm message size to be sure the message will fit and
504 * not having out of bounds issues. However on sending side 3.2
505 * might send less.
506 */
507 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508 if (!ls->ls_recover_buf) {
509 error = -ENOMEM;
510 goto out_lkbxa;
511 }
512
513 ls->ls_slot = 0;
514 ls->ls_num_slots = 0;
515 ls->ls_slots_size = 0;
516 ls->ls_slots = NULL;
517
518 INIT_LIST_HEAD(&ls->ls_recover_list);
519 spin_lock_init(&ls->ls_recover_list_lock);
520 xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521 spin_lock_init(&ls->ls_recover_xa_lock);
522 ls->ls_recover_list_count = 0;
523 init_waitqueue_head(&ls->ls_wait_general);
524 INIT_LIST_HEAD(&ls->ls_masters_list);
525 rwlock_init(&ls->ls_masters_lock);
526 INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527 rwlock_init(&ls->ls_dir_dump_lock);
528
529 INIT_LIST_HEAD(&ls->ls_scan_list);
530 spin_lock_init(&ls->ls_scan_lock);
531 timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533 spin_lock_bh(&lslist_lock);
534 ls->ls_create_count = 1;
535 list_add(&ls->ls_list, &lslist);
536 spin_unlock_bh(&lslist_lock);
537
538 if (flags & DLM_LSFL_FS)
539 set_bit(LSFL_FS, &ls->ls_flags);
540
541 error = dlm_callback_start(ls);
542 if (error) {
543 log_error(ls, "can't start dlm_callback %d", error);
544 goto out_delist;
545 }
546
547 init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549 /*
550 * Once started, dlm_recoverd first looks for ls in lslist, then
551 * initializes ls_in_recovery as locked in "down" mode. We need
552 * to wait for the wakeup from dlm_recoverd because in_recovery
553 * has to start out in down mode.
554 */
555
556 error = dlm_recoverd_start(ls);
557 if (error) {
558 log_error(ls, "can't start dlm_recoverd %d", error);
559 goto out_callback;
560 }
561
562 wait_event(ls->ls_recover_lock_wait,
563 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565 ls->ls_kobj.kset = dlm_kset;
566 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567 "%s", ls->ls_name);
568 if (error)
569 goto out_recoverd;
570 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572 /* This uevent triggers dlm_controld in userspace to add us to the
573 group of nodes that are members of this lockspace (managed by the
574 cluster infrastructure.) Once it's done that, it tells us who the
575 current lockspace members are (via configfs) and then tells the
576 lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578 error = do_uevent(ls, 1);
579 if (error)
580 goto out_recoverd;
581
582 /* wait until recovery is successful or failed */
583 wait_for_completion(&ls->ls_recovery_done);
584 error = ls->ls_recovery_result;
585 if (error)
586 goto out_members;
587
588 dlm_create_debug_file(ls);
589
590 log_rinfo(ls, "join complete");
591 *lockspace = ls;
592 return 0;
593
594 out_members:
595 do_uevent(ls, 0);
596 dlm_clear_members(ls);
597 kfree(ls->ls_node_array);
598 out_recoverd:
599 dlm_recoverd_stop(ls);
600 out_callback:
601 dlm_callback_stop(ls);
602 out_delist:
603 spin_lock_bh(&lslist_lock);
604 list_del(&ls->ls_list);
605 spin_unlock_bh(&lslist_lock);
606 xa_destroy(&ls->ls_recover_xa);
607 kfree(ls->ls_recover_buf);
608 out_lkbxa:
609 xa_destroy(&ls->ls_lkbxa);
610 rhashtable_destroy(&ls->ls_rsbtbl);
611 out_lsfree:
612 kobject_put(&ls->ls_kobj);
613 kfree(ls);
614 out:
615 module_put(THIS_MODULE);
616 return error;
617 }
618
__dlm_new_lockspace(const char * name,const char * cluster,uint32_t flags,int lvblen,const struct dlm_lockspace_ops * ops,void * ops_arg,int * ops_result,dlm_lockspace_t ** lockspace)619 static int __dlm_new_lockspace(const char *name, const char *cluster,
620 uint32_t flags, int lvblen,
621 const struct dlm_lockspace_ops *ops,
622 void *ops_arg, int *ops_result,
623 dlm_lockspace_t **lockspace)
624 {
625 int error = 0;
626
627 mutex_lock(&ls_lock);
628 if (!ls_count)
629 error = threads_start();
630 if (error)
631 goto out;
632
633 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634 ops_result, lockspace);
635 if (!error)
636 ls_count++;
637 if (error > 0)
638 error = 0;
639 if (!ls_count) {
640 dlm_midcomms_shutdown();
641 dlm_midcomms_stop();
642 }
643 out:
644 mutex_unlock(&ls_lock);
645 return error;
646 }
647
dlm_new_lockspace(const char * name,const char * cluster,uint32_t flags,int lvblen,const struct dlm_lockspace_ops * ops,void * ops_arg,int * ops_result,dlm_lockspace_t ** lockspace)648 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649 int lvblen, const struct dlm_lockspace_ops *ops,
650 void *ops_arg, int *ops_result,
651 dlm_lockspace_t **lockspace)
652 {
653 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654 ops, ops_arg, ops_result, lockspace);
655 }
656
dlm_new_user_lockspace(const char * name,const char * cluster,uint32_t flags,int lvblen,const struct dlm_lockspace_ops * ops,void * ops_arg,int * ops_result,dlm_lockspace_t ** lockspace)657 int dlm_new_user_lockspace(const char *name, const char *cluster,
658 uint32_t flags, int lvblen,
659 const struct dlm_lockspace_ops *ops,
660 void *ops_arg, int *ops_result,
661 dlm_lockspace_t **lockspace)
662 {
663 if (flags & DLM_LSFL_SOFTIRQ)
664 return -EINVAL;
665
666 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667 ops_arg, ops_result, lockspace);
668 }
669
670 /* NOTE: We check the lkbxa here rather than the resource table.
671 This is because there may be LKBs queued as ASTs that have been unlinked
672 from their RSBs and are pending deletion once the AST has been delivered */
673
lockspace_busy(struct dlm_ls * ls,int force)674 static int lockspace_busy(struct dlm_ls *ls, int force)
675 {
676 struct dlm_lkb *lkb;
677 unsigned long id;
678 int rv = 0;
679
680 read_lock_bh(&ls->ls_lkbxa_lock);
681 if (force == 0) {
682 xa_for_each(&ls->ls_lkbxa, id, lkb) {
683 rv = 1;
684 break;
685 }
686 } else if (force == 1) {
687 xa_for_each(&ls->ls_lkbxa, id, lkb) {
688 if (lkb->lkb_nodeid == 0 &&
689 lkb->lkb_grmode != DLM_LOCK_IV) {
690 rv = 1;
691 break;
692 }
693 }
694 } else {
695 rv = 0;
696 }
697 read_unlock_bh(&ls->ls_lkbxa_lock);
698 return rv;
699 }
700
release_lockspace(struct dlm_ls * ls,int force)701 static int release_lockspace(struct dlm_ls *ls, int force)
702 {
703 int busy, rv;
704
705 busy = lockspace_busy(ls, force);
706
707 spin_lock_bh(&lslist_lock);
708 if (ls->ls_create_count == 1) {
709 if (busy) {
710 rv = -EBUSY;
711 } else {
712 /* remove_lockspace takes ls off lslist */
713 ls->ls_create_count = 0;
714 rv = 0;
715 }
716 } else if (ls->ls_create_count > 1) {
717 rv = --ls->ls_create_count;
718 } else {
719 rv = -EINVAL;
720 }
721 spin_unlock_bh(&lslist_lock);
722
723 if (rv) {
724 log_debug(ls, "release_lockspace no remove %d", rv);
725 return rv;
726 }
727
728 if (ls_count == 1)
729 dlm_midcomms_version_wait();
730
731 dlm_device_deregister(ls);
732
733 if (force < 3 && dlm_user_daemon_available())
734 do_uevent(ls, 0);
735
736 dlm_recoverd_stop(ls);
737
738 /* clear the LSFL_RUNNING flag to fast up
739 * time_shutdown_sync(), we don't care anymore
740 */
741 clear_bit(LSFL_RUNNING, &ls->ls_flags);
742 timer_shutdown_sync(&ls->ls_scan_timer);
743
744 if (ls_count == 1) {
745 dlm_clear_members(ls);
746 dlm_midcomms_shutdown();
747 }
748
749 dlm_callback_stop(ls);
750
751 remove_lockspace(ls);
752
753 dlm_delete_debug_file(ls);
754
755 kobject_put(&ls->ls_kobj);
756
757 xa_destroy(&ls->ls_recover_xa);
758 kfree(ls->ls_recover_buf);
759
760 /*
761 * Free structures on any other lists
762 */
763
764 dlm_purge_requestqueue(ls);
765 kfree(ls->ls_recover_args);
766 dlm_clear_members(ls);
767 dlm_clear_members_gone(ls);
768 kfree(ls->ls_node_array);
769
770 log_rinfo(ls, "%s final free", __func__);
771
772 /* delayed free of data structures see free_lockspace() */
773 queue_work(dlm_wq, &ls->ls_free_work);
774 module_put(THIS_MODULE);
775 return 0;
776 }
777
778 /*
779 * Called when a system has released all its locks and is not going to use the
780 * lockspace any longer. We free everything we're managing for this lockspace.
781 * Remaining nodes will go through the recovery process as if we'd died. The
782 * lockspace must continue to function as usual, participating in recoveries,
783 * until this returns.
784 *
785 * Force has 4 possible values:
786 * 0 - don't destroy lockspace if it has any LKBs
787 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788 * 2 - destroy lockspace regardless of LKBs
789 * 3 - destroy lockspace as part of a forced shutdown
790 */
791
dlm_release_lockspace(void * lockspace,int force)792 int dlm_release_lockspace(void *lockspace, int force)
793 {
794 struct dlm_ls *ls;
795 int error;
796
797 ls = dlm_find_lockspace_local(lockspace);
798 if (!ls)
799 return -EINVAL;
800 dlm_put_lockspace(ls);
801
802 mutex_lock(&ls_lock);
803 error = release_lockspace(ls, force);
804 if (!error)
805 ls_count--;
806 if (!ls_count)
807 dlm_midcomms_stop();
808 mutex_unlock(&ls_lock);
809
810 return error;
811 }
812
dlm_stop_lockspaces(void)813 void dlm_stop_lockspaces(void)
814 {
815 struct dlm_ls *ls;
816 int count;
817
818 restart:
819 count = 0;
820 spin_lock_bh(&lslist_lock);
821 list_for_each_entry(ls, &lslist, ls_list) {
822 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823 count++;
824 continue;
825 }
826 spin_unlock_bh(&lslist_lock);
827 log_error(ls, "no userland control daemon, stopping lockspace");
828 dlm_ls_stop(ls);
829 goto restart;
830 }
831 spin_unlock_bh(&lslist_lock);
832
833 if (count)
834 log_print("dlm user daemon left %d lockspaces", count);
835 }
836