1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include <linux/module.h> 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "midcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 34 { 35 ssize_t ret = len; 36 int n; 37 int rc = kstrtoint(buf, 0, &n); 38 39 if (rc) 40 return rc; 41 ls = dlm_find_lockspace_local(ls); 42 if (!ls) 43 return -EINVAL; 44 45 switch (n) { 46 case 0: 47 dlm_ls_stop(ls); 48 break; 49 case 1: 50 dlm_ls_start(ls); 51 break; 52 default: 53 ret = -EINVAL; 54 } 55 dlm_put_lockspace(ls); 56 return ret; 57 } 58 59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 60 { 61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 62 63 if (rc) 64 return rc; 65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 66 wake_up(&ls->ls_uevent_wait); 67 return len; 68 } 69 70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 71 { 72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 73 } 74 75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 76 { 77 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 78 79 if (rc) 80 return rc; 81 return len; 82 } 83 84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 85 { 86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 87 } 88 89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 90 { 91 int val; 92 int rc = kstrtoint(buf, 0, &val); 93 94 if (rc) 95 return rc; 96 if (val == 1) 97 set_bit(LSFL_NODIR, &ls->ls_flags); 98 return len; 99 } 100 101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 102 { 103 uint32_t status = dlm_recover_status(ls); 104 return snprintf(buf, PAGE_SIZE, "%x\n", status); 105 } 106 107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 108 { 109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 110 } 111 112 struct dlm_attr { 113 struct attribute attr; 114 ssize_t (*show)(struct dlm_ls *, char *); 115 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 116 }; 117 118 static struct dlm_attr dlm_attr_control = { 119 .attr = {.name = "control", .mode = S_IWUSR}, 120 .store = dlm_control_store 121 }; 122 123 static struct dlm_attr dlm_attr_event = { 124 .attr = {.name = "event_done", .mode = S_IWUSR}, 125 .store = dlm_event_store 126 }; 127 128 static struct dlm_attr dlm_attr_id = { 129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 130 .show = dlm_id_show, 131 .store = dlm_id_store 132 }; 133 134 static struct dlm_attr dlm_attr_nodir = { 135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 136 .show = dlm_nodir_show, 137 .store = dlm_nodir_store 138 }; 139 140 static struct dlm_attr dlm_attr_recover_status = { 141 .attr = {.name = "recover_status", .mode = S_IRUGO}, 142 .show = dlm_recover_status_show 143 }; 144 145 static struct dlm_attr dlm_attr_recover_nodeid = { 146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 147 .show = dlm_recover_nodeid_show 148 }; 149 150 static struct attribute *dlm_attrs[] = { 151 &dlm_attr_control.attr, 152 &dlm_attr_event.attr, 153 &dlm_attr_id.attr, 154 &dlm_attr_nodir.attr, 155 &dlm_attr_recover_status.attr, 156 &dlm_attr_recover_nodeid.attr, 157 NULL, 158 }; 159 ATTRIBUTE_GROUPS(dlm); 160 161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 162 char *buf) 163 { 164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 166 return a->show ? a->show(ls, buf) : 0; 167 } 168 169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 170 const char *buf, size_t len) 171 { 172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 174 return a->store ? a->store(ls, buf, len) : len; 175 } 176 177 static void lockspace_kobj_release(struct kobject *k) 178 { 179 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 180 kfree(ls); 181 } 182 183 static const struct sysfs_ops dlm_attr_ops = { 184 .show = dlm_attr_show, 185 .store = dlm_attr_store, 186 }; 187 188 static struct kobj_type dlm_ktype = { 189 .default_groups = dlm_groups, 190 .sysfs_ops = &dlm_attr_ops, 191 .release = lockspace_kobj_release, 192 }; 193 194 static struct kset *dlm_kset; 195 196 static int do_uevent(struct dlm_ls *ls, int in) 197 { 198 if (in) 199 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 200 else 201 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 202 203 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 204 205 /* dlm_controld will see the uevent, do the necessary group management 206 and then write to sysfs to wake us */ 207 208 wait_event(ls->ls_uevent_wait, 209 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 210 211 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 212 213 return ls->ls_uevent_result; 214 } 215 216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env) 217 { 218 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 219 220 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 221 return 0; 222 } 223 224 static const struct kset_uevent_ops dlm_uevent_ops = { 225 .uevent = dlm_uevent, 226 }; 227 228 int __init dlm_lockspace_init(void) 229 { 230 ls_count = 0; 231 mutex_init(&ls_lock); 232 INIT_LIST_HEAD(&lslist); 233 spin_lock_init(&lslist_lock); 234 235 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 236 if (!dlm_kset) { 237 printk(KERN_WARNING "%s: can not create kset\n", __func__); 238 return -ENOMEM; 239 } 240 return 0; 241 } 242 243 void dlm_lockspace_exit(void) 244 { 245 kset_unregister(dlm_kset); 246 } 247 248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 249 { 250 struct dlm_ls *ls; 251 252 spin_lock_bh(&lslist_lock); 253 254 list_for_each_entry(ls, &lslist, ls_list) { 255 if (ls->ls_global_id == id) { 256 atomic_inc(&ls->ls_count); 257 goto out; 258 } 259 } 260 ls = NULL; 261 out: 262 spin_unlock_bh(&lslist_lock); 263 return ls; 264 } 265 266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 267 { 268 struct dlm_ls *ls = lockspace; 269 270 atomic_inc(&ls->ls_count); 271 return ls; 272 } 273 274 struct dlm_ls *dlm_find_lockspace_device(int minor) 275 { 276 struct dlm_ls *ls; 277 278 spin_lock_bh(&lslist_lock); 279 list_for_each_entry(ls, &lslist, ls_list) { 280 if (ls->ls_device.minor == minor) { 281 atomic_inc(&ls->ls_count); 282 goto out; 283 } 284 } 285 ls = NULL; 286 out: 287 spin_unlock_bh(&lslist_lock); 288 return ls; 289 } 290 291 void dlm_put_lockspace(struct dlm_ls *ls) 292 { 293 if (atomic_dec_and_test(&ls->ls_count)) 294 wake_up(&ls->ls_count_wait); 295 } 296 297 static void remove_lockspace(struct dlm_ls *ls) 298 { 299 retry: 300 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); 301 302 spin_lock_bh(&lslist_lock); 303 if (atomic_read(&ls->ls_count) != 0) { 304 spin_unlock_bh(&lslist_lock); 305 goto retry; 306 } 307 308 WARN_ON(ls->ls_create_count != 0); 309 list_del(&ls->ls_list); 310 spin_unlock_bh(&lslist_lock); 311 } 312 313 static int threads_start(void) 314 { 315 int error; 316 317 /* Thread for sending/receiving messages for all lockspace's */ 318 error = dlm_midcomms_start(); 319 if (error) 320 log_print("cannot start dlm midcomms %d", error); 321 322 return error; 323 } 324 325 static int new_lockspace(const char *name, const char *cluster, 326 uint32_t flags, int lvblen, 327 const struct dlm_lockspace_ops *ops, void *ops_arg, 328 int *ops_result, dlm_lockspace_t **lockspace) 329 { 330 struct dlm_ls *ls; 331 int do_unreg = 0; 332 int namelen = strlen(name); 333 int error; 334 335 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 336 return -EINVAL; 337 338 if (lvblen % 8) 339 return -EINVAL; 340 341 if (!try_module_get(THIS_MODULE)) 342 return -EINVAL; 343 344 if (!dlm_user_daemon_available()) { 345 log_print("dlm user daemon not available"); 346 error = -EUNATCH; 347 goto out; 348 } 349 350 if (ops && ops_result) { 351 if (!dlm_config.ci_recover_callbacks) 352 *ops_result = -EOPNOTSUPP; 353 else 354 *ops_result = 0; 355 } 356 357 if (!cluster) 358 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 359 dlm_config.ci_cluster_name); 360 361 if (dlm_config.ci_recover_callbacks && cluster && 362 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 363 log_print("dlm cluster name '%s' does not match " 364 "the application cluster name '%s'", 365 dlm_config.ci_cluster_name, cluster); 366 error = -EBADR; 367 goto out; 368 } 369 370 error = 0; 371 372 spin_lock_bh(&lslist_lock); 373 list_for_each_entry(ls, &lslist, ls_list) { 374 WARN_ON(ls->ls_create_count <= 0); 375 if (ls->ls_namelen != namelen) 376 continue; 377 if (memcmp(ls->ls_name, name, namelen)) 378 continue; 379 if (flags & DLM_LSFL_NEWEXCL) { 380 error = -EEXIST; 381 break; 382 } 383 ls->ls_create_count++; 384 *lockspace = ls; 385 error = 1; 386 break; 387 } 388 spin_unlock_bh(&lslist_lock); 389 390 if (error) 391 goto out; 392 393 error = -ENOMEM; 394 395 ls = kzalloc(sizeof(*ls), GFP_NOFS); 396 if (!ls) 397 goto out; 398 memcpy(ls->ls_name, name, namelen); 399 ls->ls_namelen = namelen; 400 ls->ls_lvblen = lvblen; 401 atomic_set(&ls->ls_count, 0); 402 init_waitqueue_head(&ls->ls_count_wait); 403 ls->ls_flags = 0; 404 405 if (ops && dlm_config.ci_recover_callbacks) { 406 ls->ls_ops = ops; 407 ls->ls_ops_arg = ops_arg; 408 } 409 410 if (flags & DLM_LSFL_SOFTIRQ) 411 set_bit(LSFL_SOFTIRQ, &ls->ls_flags); 412 413 /* ls_exflags are forced to match among nodes, and we don't 414 * need to require all nodes to have some flags set 415 */ 416 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL | 417 DLM_LSFL_SOFTIRQ)); 418 419 INIT_LIST_HEAD(&ls->ls_slow_inactive); 420 INIT_LIST_HEAD(&ls->ls_slow_active); 421 rwlock_init(&ls->ls_rsbtbl_lock); 422 423 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); 424 if (error) 425 goto out_lsfree; 426 427 xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 428 rwlock_init(&ls->ls_lkbxa_lock); 429 430 INIT_LIST_HEAD(&ls->ls_waiters); 431 spin_lock_init(&ls->ls_waiters_lock); 432 INIT_LIST_HEAD(&ls->ls_orphans); 433 spin_lock_init(&ls->ls_orphans_lock); 434 435 INIT_LIST_HEAD(&ls->ls_nodes); 436 INIT_LIST_HEAD(&ls->ls_nodes_gone); 437 ls->ls_num_nodes = 0; 438 ls->ls_low_nodeid = 0; 439 ls->ls_total_weight = 0; 440 ls->ls_node_array = NULL; 441 442 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb)); 443 ls->ls_local_rsb.res_ls = ls; 444 445 ls->ls_debug_rsb_dentry = NULL; 446 ls->ls_debug_waiters_dentry = NULL; 447 448 init_waitqueue_head(&ls->ls_uevent_wait); 449 ls->ls_uevent_result = 0; 450 init_completion(&ls->ls_recovery_done); 451 ls->ls_recovery_result = -1; 452 453 spin_lock_init(&ls->ls_cb_lock); 454 INIT_LIST_HEAD(&ls->ls_cb_delay); 455 456 ls->ls_recoverd_task = NULL; 457 mutex_init(&ls->ls_recoverd_active); 458 spin_lock_init(&ls->ls_recover_lock); 459 spin_lock_init(&ls->ls_rcom_spin); 460 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 461 ls->ls_recover_status = 0; 462 ls->ls_recover_seq = get_random_u64(); 463 ls->ls_recover_args = NULL; 464 init_rwsem(&ls->ls_in_recovery); 465 rwlock_init(&ls->ls_recv_active); 466 INIT_LIST_HEAD(&ls->ls_requestqueue); 467 rwlock_init(&ls->ls_requestqueue_lock); 468 spin_lock_init(&ls->ls_clear_proc_locks); 469 470 /* Due backwards compatibility with 3.1 we need to use maximum 471 * possible dlm message size to be sure the message will fit and 472 * not having out of bounds issues. However on sending side 3.2 473 * might send less. 474 */ 475 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 476 if (!ls->ls_recover_buf) { 477 error = -ENOMEM; 478 goto out_lkbxa; 479 } 480 481 ls->ls_slot = 0; 482 ls->ls_num_slots = 0; 483 ls->ls_slots_size = 0; 484 ls->ls_slots = NULL; 485 486 INIT_LIST_HEAD(&ls->ls_recover_list); 487 spin_lock_init(&ls->ls_recover_list_lock); 488 xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 489 spin_lock_init(&ls->ls_recover_xa_lock); 490 ls->ls_recover_list_count = 0; 491 init_waitqueue_head(&ls->ls_wait_general); 492 INIT_LIST_HEAD(&ls->ls_masters_list); 493 rwlock_init(&ls->ls_masters_lock); 494 INIT_LIST_HEAD(&ls->ls_dir_dump_list); 495 rwlock_init(&ls->ls_dir_dump_lock); 496 497 INIT_LIST_HEAD(&ls->ls_scan_list); 498 spin_lock_init(&ls->ls_scan_lock); 499 timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); 500 501 spin_lock_bh(&lslist_lock); 502 ls->ls_create_count = 1; 503 list_add(&ls->ls_list, &lslist); 504 spin_unlock_bh(&lslist_lock); 505 506 if (flags & DLM_LSFL_FS) 507 set_bit(LSFL_FS, &ls->ls_flags); 508 509 error = dlm_callback_start(ls); 510 if (error) { 511 log_error(ls, "can't start dlm_callback %d", error); 512 goto out_delist; 513 } 514 515 init_waitqueue_head(&ls->ls_recover_lock_wait); 516 517 /* 518 * Once started, dlm_recoverd first looks for ls in lslist, then 519 * initializes ls_in_recovery as locked in "down" mode. We need 520 * to wait for the wakeup from dlm_recoverd because in_recovery 521 * has to start out in down mode. 522 */ 523 524 error = dlm_recoverd_start(ls); 525 if (error) { 526 log_error(ls, "can't start dlm_recoverd %d", error); 527 goto out_callback; 528 } 529 530 wait_event(ls->ls_recover_lock_wait, 531 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 532 533 /* let kobject handle freeing of ls if there's an error */ 534 do_unreg = 1; 535 536 ls->ls_kobj.kset = dlm_kset; 537 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 538 "%s", ls->ls_name); 539 if (error) 540 goto out_recoverd; 541 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 542 543 /* This uevent triggers dlm_controld in userspace to add us to the 544 group of nodes that are members of this lockspace (managed by the 545 cluster infrastructure.) Once it's done that, it tells us who the 546 current lockspace members are (via configfs) and then tells the 547 lockspace to start running (via sysfs) in dlm_ls_start(). */ 548 549 error = do_uevent(ls, 1); 550 if (error) 551 goto out_recoverd; 552 553 /* wait until recovery is successful or failed */ 554 wait_for_completion(&ls->ls_recovery_done); 555 error = ls->ls_recovery_result; 556 if (error) 557 goto out_members; 558 559 dlm_create_debug_file(ls); 560 561 log_rinfo(ls, "join complete"); 562 *lockspace = ls; 563 return 0; 564 565 out_members: 566 do_uevent(ls, 0); 567 dlm_clear_members(ls); 568 kfree(ls->ls_node_array); 569 out_recoverd: 570 dlm_recoverd_stop(ls); 571 out_callback: 572 dlm_callback_stop(ls); 573 out_delist: 574 spin_lock_bh(&lslist_lock); 575 list_del(&ls->ls_list); 576 spin_unlock_bh(&lslist_lock); 577 xa_destroy(&ls->ls_recover_xa); 578 kfree(ls->ls_recover_buf); 579 out_lkbxa: 580 xa_destroy(&ls->ls_lkbxa); 581 rhashtable_destroy(&ls->ls_rsbtbl); 582 out_lsfree: 583 if (do_unreg) 584 kobject_put(&ls->ls_kobj); 585 else 586 kfree(ls); 587 out: 588 module_put(THIS_MODULE); 589 return error; 590 } 591 592 static int __dlm_new_lockspace(const char *name, const char *cluster, 593 uint32_t flags, int lvblen, 594 const struct dlm_lockspace_ops *ops, 595 void *ops_arg, int *ops_result, 596 dlm_lockspace_t **lockspace) 597 { 598 int error = 0; 599 600 mutex_lock(&ls_lock); 601 if (!ls_count) 602 error = threads_start(); 603 if (error) 604 goto out; 605 606 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 607 ops_result, lockspace); 608 if (!error) 609 ls_count++; 610 if (error > 0) 611 error = 0; 612 if (!ls_count) { 613 dlm_midcomms_shutdown(); 614 dlm_midcomms_stop(); 615 } 616 out: 617 mutex_unlock(&ls_lock); 618 return error; 619 } 620 621 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags, 622 int lvblen, const struct dlm_lockspace_ops *ops, 623 void *ops_arg, int *ops_result, 624 dlm_lockspace_t **lockspace) 625 { 626 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen, 627 ops, ops_arg, ops_result, lockspace); 628 } 629 630 int dlm_new_user_lockspace(const char *name, const char *cluster, 631 uint32_t flags, int lvblen, 632 const struct dlm_lockspace_ops *ops, 633 void *ops_arg, int *ops_result, 634 dlm_lockspace_t **lockspace) 635 { 636 if (flags & DLM_LSFL_SOFTIRQ) 637 return -EINVAL; 638 639 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, 640 ops_arg, ops_result, lockspace); 641 } 642 643 static int lkb_idr_free(struct dlm_lkb *lkb) 644 { 645 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 646 dlm_free_lvb(lkb->lkb_lvbptr); 647 648 dlm_free_lkb(lkb); 649 return 0; 650 } 651 652 /* NOTE: We check the lkbxa here rather than the resource table. 653 This is because there may be LKBs queued as ASTs that have been unlinked 654 from their RSBs and are pending deletion once the AST has been delivered */ 655 656 static int lockspace_busy(struct dlm_ls *ls, int force) 657 { 658 struct dlm_lkb *lkb; 659 unsigned long id; 660 int rv = 0; 661 662 read_lock_bh(&ls->ls_lkbxa_lock); 663 if (force == 0) { 664 xa_for_each(&ls->ls_lkbxa, id, lkb) { 665 rv = 1; 666 break; 667 } 668 } else if (force == 1) { 669 xa_for_each(&ls->ls_lkbxa, id, lkb) { 670 if (lkb->lkb_nodeid == 0 && 671 lkb->lkb_grmode != DLM_LOCK_IV) { 672 rv = 1; 673 break; 674 } 675 } 676 } else { 677 rv = 0; 678 } 679 read_unlock_bh(&ls->ls_lkbxa_lock); 680 return rv; 681 } 682 683 static void rhash_free_rsb(void *ptr, void *arg) 684 { 685 struct dlm_rsb *rsb = ptr; 686 687 dlm_free_rsb(rsb); 688 } 689 690 static int release_lockspace(struct dlm_ls *ls, int force) 691 { 692 struct dlm_lkb *lkb; 693 unsigned long id; 694 int busy, rv; 695 696 busy = lockspace_busy(ls, force); 697 698 spin_lock_bh(&lslist_lock); 699 if (ls->ls_create_count == 1) { 700 if (busy) { 701 rv = -EBUSY; 702 } else { 703 /* remove_lockspace takes ls off lslist */ 704 ls->ls_create_count = 0; 705 rv = 0; 706 } 707 } else if (ls->ls_create_count > 1) { 708 rv = --ls->ls_create_count; 709 } else { 710 rv = -EINVAL; 711 } 712 spin_unlock_bh(&lslist_lock); 713 714 if (rv) { 715 log_debug(ls, "release_lockspace no remove %d", rv); 716 return rv; 717 } 718 719 if (ls_count == 1) 720 dlm_midcomms_version_wait(); 721 722 dlm_device_deregister(ls); 723 724 if (force < 3 && dlm_user_daemon_available()) 725 do_uevent(ls, 0); 726 727 dlm_recoverd_stop(ls); 728 729 /* clear the LSFL_RUNNING flag to fast up 730 * time_shutdown_sync(), we don't care anymore 731 */ 732 clear_bit(LSFL_RUNNING, &ls->ls_flags); 733 timer_shutdown_sync(&ls->ls_scan_timer); 734 735 if (ls_count == 1) { 736 dlm_clear_members(ls); 737 dlm_midcomms_shutdown(); 738 } 739 740 dlm_callback_stop(ls); 741 742 remove_lockspace(ls); 743 744 dlm_delete_debug_file(ls); 745 746 xa_destroy(&ls->ls_recover_xa); 747 kfree(ls->ls_recover_buf); 748 749 /* 750 * Free all lkb's in xa 751 */ 752 xa_for_each(&ls->ls_lkbxa, id, lkb) { 753 lkb_idr_free(lkb); 754 } 755 xa_destroy(&ls->ls_lkbxa); 756 757 /* 758 * Free all rsb's on rsbtbl 759 */ 760 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); 761 762 /* 763 * Free structures on any other lists 764 */ 765 766 dlm_purge_requestqueue(ls); 767 kfree(ls->ls_recover_args); 768 dlm_clear_members(ls); 769 dlm_clear_members_gone(ls); 770 kfree(ls->ls_node_array); 771 log_rinfo(ls, "release_lockspace final free"); 772 kobject_put(&ls->ls_kobj); 773 /* The ls structure will be freed when the kobject is done with */ 774 775 module_put(THIS_MODULE); 776 return 0; 777 } 778 779 /* 780 * Called when a system has released all its locks and is not going to use the 781 * lockspace any longer. We free everything we're managing for this lockspace. 782 * Remaining nodes will go through the recovery process as if we'd died. The 783 * lockspace must continue to function as usual, participating in recoveries, 784 * until this returns. 785 * 786 * Force has 4 possible values: 787 * 0 - don't destroy lockspace if it has any LKBs 788 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 789 * 2 - destroy lockspace regardless of LKBs 790 * 3 - destroy lockspace as part of a forced shutdown 791 */ 792 793 int dlm_release_lockspace(void *lockspace, int force) 794 { 795 struct dlm_ls *ls; 796 int error; 797 798 ls = dlm_find_lockspace_local(lockspace); 799 if (!ls) 800 return -EINVAL; 801 dlm_put_lockspace(ls); 802 803 mutex_lock(&ls_lock); 804 error = release_lockspace(ls, force); 805 if (!error) 806 ls_count--; 807 if (!ls_count) 808 dlm_midcomms_stop(); 809 mutex_unlock(&ls_lock); 810 811 return error; 812 } 813 814 void dlm_stop_lockspaces(void) 815 { 816 struct dlm_ls *ls; 817 int count; 818 819 restart: 820 count = 0; 821 spin_lock_bh(&lslist_lock); 822 list_for_each_entry(ls, &lslist, ls_list) { 823 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 824 count++; 825 continue; 826 } 827 spin_unlock_bh(&lslist_lock); 828 log_error(ls, "no userland control daemon, stopping lockspace"); 829 dlm_ls_stop(ls); 830 goto restart; 831 } 832 spin_unlock_bh(&lslist_lock); 833 834 if (count) 835 log_print("dlm user daemon left %d lockspaces", count); 836 } 837