// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int ls_count;
static struct mutex ls_lock;
static struct list_head lslist;
static spinlock_t lslist_lock;
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

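/* Writing "1" marks the lockspace as having no resource directory
 * (LSFL_NODIR).  Note that the flag can only be set through sysfs,
 * never cleared again.
 */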
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

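/*
 * Per-lockspace sysfs attributes.  Each lockspace kobject is added to the
 * "dlm" kset created in dlm_lockspace_init(), so the attributes below are
 * normally visible as /sys/kernel/dlm/<lockspace name>/control, event_done,
 * id, nodir, recover_status and recover_nodeid.  dlm_controld writes "0"/"1"
 * to control to stop/start the lockspace and writes the join/leave result
 * to event_done (see do_uevent() below).
 */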
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr = {.name = "recover_status", .mode = S_IRUGO},
	.show = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static const struct sysfs_ops dlm_attr_ops = {
	.show = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops = &dlm_attr_ops,
};

static struct kset *dlm_kset;

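/*
 * Join/leave handshake with dlm_controld: do_uevent() emits an ONLINE (join)
 * or OFFLINE (leave) uevent for the lockspace kobject and then sleeps on
 * ls_uevent_wait.  The daemon performs the group membership change and
 * reports the outcome by writing to the event_done sysfs file
 * (dlm_event_store() above), which stores ls_uevent_result, sets
 * LSFL_UEVENT_WAIT and wakes us up.
 */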
static int do_uevent(struct dlm_ls *ls, int in, unsigned int release_recover)
{
	char message[512] = {};
	char *envp[] = { message, NULL };

	if (in) {
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	} else {
		snprintf(message, 511, "RELEASE_RECOVER=%u", release_recover);
		kobject_uevent_env(&ls->ls_kobj, KOBJ_OFFLINE, envp);
	}

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

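/*
 * The dlm_find_lockspace_*() helpers return the lockspace with ls_count
 * elevated; every successful lookup must be balanced by dlm_put_lockspace().
 * A minimal sketch of the pattern:
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (!ls)
 *		return -EINVAL;
 *	... use ls ...
 *	dlm_put_lockspace(ls);
 */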
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	atomic_inc(&ls->ls_count);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

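/*
 * Wait for the last reference to the lockspace to be dropped, then take it
 * off lslist.  The count is re-checked under lslist_lock because a
 * dlm_find_lockspace_*() lookup can grab a new reference between the wake-up
 * and us taking the lock; in that case go back to waiting.
 */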
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}

static int lkb_idr_free(struct dlm_lkb *lkb)
{
	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

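/*
 * Final teardown of a lockspace's lock structures.  This runs as the
 * ls_free_work handler, queued on dlm_wq from release_lockspace(), so the
 * potentially expensive walk over all lkbs and rsbs happens outside the
 * caller's context.
 */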
static void free_lockspace(struct work_struct *work)
{
	struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
	struct dlm_lkb *lkb;
	unsigned long id;

	/*
	 * Free all lkb's in xa
	 */
	xa_for_each(&ls->ls_lkbxa, id, lkb) {
		lkb_idr_free(lkb);
	}
	xa_destroy(&ls->ls_lkbxa);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	kfree(ls);
}

static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_SOFTIRQ)
		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
				    DLM_LSFL_SOFTIRQ));

	INIT_LIST_HEAD(&ls->ls_slow_inactive);
	INIT_LIST_HEAD(&ls->ls_slow_active);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	rwlock_init(&ls->ls_lkbxa_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	INIT_WORK(&ls->ls_free_work, free_lockspace);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and we
	 * don't run into out of bounds issues.  However, the 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbxa;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	spin_lock_init(&ls->ls_recover_xa_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_scan_list);
	spin_lock_init(&ls->ls_scan_lock);
	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);

	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	error = dlm_callback_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_callback %d", error);
		goto out_delist;
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1, 0);
	if (error < 0)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);
 out_lkbxa:
	xa_destroy(&ls->ls_lkbxa);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	kobject_put(&ls->ls_kobj);
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	if (flags & DLM_LSFL_SOFTIRQ)
		return -EINVAL;

	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}
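
/*
 * Minimal lifecycle sketch for the exported API (illustrative only; the
 * lockspace name, cluster name and lvblen value below are made up, and
 * error handling is reduced to the bare minimum):
 *
 *	dlm_lockspace_t *lockspace;
 *	int ops_result;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", 0, 32,
 *				  NULL, NULL, &ops_result, &lockspace);
 *	if (error)
 *		return error;
 *	...
 *	dlm_release_lockspace(lockspace, DLM_RELEASE_NO_LOCKS);
 *
 * Note that lvblen must be a multiple of 8, and a cluster name that does not
 * match dlm_controld's configured name makes dlm_new_lockspace() fail with
 * -EBADR when recover callbacks are configured.
 */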

/* NOTE: We check the lkbxa here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, unsigned int release_option)
{
	struct dlm_lkb *lkb;
	unsigned long id;
	int rv = 0;

	read_lock_bh(&ls->ls_lkbxa_lock);
	if (release_option == DLM_RELEASE_NO_LOCKS) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			rv = 1;
			break;
		}
	} else if (release_option == DLM_RELEASE_UNUSED) {
		/* TODO: handle this UNUSED option as NO_LOCKS in later patch */
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			if (lkb->lkb_nodeid == 0 &&
			    lkb->lkb_grmode != DLM_LOCK_IV) {
				rv = 1;
				break;
			}
		}
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbxa_lock);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, unsigned int release_option)
{
	int busy, rv;

	busy = lockspace_busy(ls, release_option);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (release_option != DLM_RELEASE_NO_EVENT &&
	    dlm_user_daemon_available())
		do_uevent(ls, 0, (release_option == DLM_RELEASE_RECOVER));

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up timer_shutdown_sync();
	 * we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_scan_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kobject_put(&ls->ls_kobj);

	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);

	log_rinfo(ls, "%s final free", __func__);

	/* delayed free of data structures see free_lockspace() */
	queue_work(dlm_wq, &ls->ls_free_work);
	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * See DLM_RELEASE defines for release_option values and their meaning.
 */

int dlm_release_lockspace(void *lockspace, unsigned int release_option)
{
	struct dlm_ls *ls;
	int error;

	if (release_option > __DLM_RELEASE_MAX)
		return -EINVAL;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, release_option);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}