1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include <linux/module.h> 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "midcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 34 { 35 ssize_t ret = len; 36 int n; 37 int rc = kstrtoint(buf, 0, &n); 38 39 if (rc) 40 return rc; 41 ls = dlm_find_lockspace_local(ls); 42 if (!ls) 43 return -EINVAL; 44 45 switch (n) { 46 case 0: 47 dlm_ls_stop(ls); 48 break; 49 case 1: 50 dlm_ls_start(ls); 51 break; 52 default: 53 ret = -EINVAL; 54 } 55 dlm_put_lockspace(ls); 56 return ret; 57 } 58 59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 60 { 61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 62 63 if (rc) 64 return rc; 65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 66 wake_up(&ls->ls_uevent_wait); 67 return len; 68 } 69 70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 71 { 72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 73 } 74 75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 76 { 77 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 78 79 if (rc) 80 return rc; 81 return len; 82 } 83 84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 85 { 86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 87 } 88 89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 90 { 91 int val; 92 int rc = kstrtoint(buf, 0, &val); 93 94 if (rc) 95 return rc; 96 if (val == 1) 97 set_bit(LSFL_NODIR, &ls->ls_flags); 98 return len; 99 } 100 101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 102 { 103 uint32_t status = dlm_recover_status(ls); 104 return snprintf(buf, PAGE_SIZE, "%x\n", status); 105 } 106 107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 108 { 109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 110 } 111 112 struct dlm_attr { 113 struct attribute attr; 114 ssize_t (*show)(struct dlm_ls *, char *); 115 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 116 }; 117 118 static struct dlm_attr dlm_attr_control = { 119 .attr = {.name = "control", .mode = S_IWUSR}, 120 .store = dlm_control_store 121 }; 122 123 static struct dlm_attr dlm_attr_event = { 124 .attr = {.name = "event_done", .mode = S_IWUSR}, 125 .store = dlm_event_store 126 }; 127 128 static struct dlm_attr dlm_attr_id = { 129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 130 .show = dlm_id_show, 131 .store = dlm_id_store 132 }; 133 134 static struct dlm_attr dlm_attr_nodir = { 135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 136 .show = dlm_nodir_show, 137 .store = dlm_nodir_store 138 }; 139 140 static struct dlm_attr dlm_attr_recover_status = { 141 .attr = {.name = "recover_status", .mode = S_IRUGO}, 142 .show = dlm_recover_status_show 143 }; 144 145 static struct dlm_attr dlm_attr_recover_nodeid = { 146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 147 .show = dlm_recover_nodeid_show 148 }; 149 150 static struct attribute *dlm_attrs[] = { 151 &dlm_attr_control.attr, 152 &dlm_attr_event.attr, 153 &dlm_attr_id.attr, 154 &dlm_attr_nodir.attr, 155 &dlm_attr_recover_status.attr, 156 &dlm_attr_recover_nodeid.attr, 157 NULL, 158 }; 159 ATTRIBUTE_GROUPS(dlm); 160 161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 162 char *buf) 163 { 164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 166 return a->show ? a->show(ls, buf) : 0; 167 } 168 169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 170 const char *buf, size_t len) 171 { 172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 174 return a->store ? a->store(ls, buf, len) : len; 175 } 176 177 static const struct sysfs_ops dlm_attr_ops = { 178 .show = dlm_attr_show, 179 .store = dlm_attr_store, 180 }; 181 182 static struct kobj_type dlm_ktype = { 183 .default_groups = dlm_groups, 184 .sysfs_ops = &dlm_attr_ops, 185 }; 186 187 static struct kset *dlm_kset; 188 189 static int do_uevent(struct dlm_ls *ls, int in) 190 { 191 if (in) 192 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 193 else 194 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 195 196 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 197 198 /* dlm_controld will see the uevent, do the necessary group management 199 and then write to sysfs to wake us */ 200 201 wait_event(ls->ls_uevent_wait, 202 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 203 204 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 205 206 return ls->ls_uevent_result; 207 } 208 209 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env) 210 { 211 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 212 213 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 214 return 0; 215 } 216 217 static const struct kset_uevent_ops dlm_uevent_ops = { 218 .uevent = dlm_uevent, 219 }; 220 221 int __init dlm_lockspace_init(void) 222 { 223 ls_count = 0; 224 mutex_init(&ls_lock); 225 INIT_LIST_HEAD(&lslist); 226 spin_lock_init(&lslist_lock); 227 228 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 229 if (!dlm_kset) { 230 printk(KERN_WARNING "%s: can not create kset\n", __func__); 231 return -ENOMEM; 232 } 233 return 0; 234 } 235 236 void dlm_lockspace_exit(void) 237 { 238 kset_unregister(dlm_kset); 239 } 240 241 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 242 { 243 struct dlm_ls *ls; 244 245 spin_lock_bh(&lslist_lock); 246 247 list_for_each_entry(ls, &lslist, ls_list) { 248 if (ls->ls_global_id == id) { 249 atomic_inc(&ls->ls_count); 250 goto out; 251 } 252 } 253 ls = NULL; 254 out: 255 spin_unlock_bh(&lslist_lock); 256 return ls; 257 } 258 259 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 260 { 261 struct dlm_ls *ls = lockspace; 262 263 atomic_inc(&ls->ls_count); 264 return ls; 265 } 266 267 struct dlm_ls *dlm_find_lockspace_device(int minor) 268 { 269 struct dlm_ls *ls; 270 271 spin_lock_bh(&lslist_lock); 272 list_for_each_entry(ls, &lslist, ls_list) { 273 if (ls->ls_device.minor == minor) { 274 atomic_inc(&ls->ls_count); 275 goto out; 276 } 277 } 278 ls = NULL; 279 out: 280 spin_unlock_bh(&lslist_lock); 281 return ls; 282 } 283 284 void dlm_put_lockspace(struct dlm_ls *ls) 285 { 286 if (atomic_dec_and_test(&ls->ls_count)) 287 wake_up(&ls->ls_count_wait); 288 } 289 290 static void remove_lockspace(struct dlm_ls *ls) 291 { 292 retry: 293 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); 294 295 spin_lock_bh(&lslist_lock); 296 if (atomic_read(&ls->ls_count) != 0) { 297 spin_unlock_bh(&lslist_lock); 298 goto retry; 299 } 300 301 WARN_ON(ls->ls_create_count != 0); 302 list_del(&ls->ls_list); 303 spin_unlock_bh(&lslist_lock); 304 } 305 306 static int threads_start(void) 307 { 308 int error; 309 310 /* Thread for sending/receiving messages for all lockspace's */ 311 error = dlm_midcomms_start(); 312 if (error) 313 log_print("cannot start dlm midcomms %d", error); 314 315 return error; 316 } 317 318 static int lkb_idr_free(struct dlm_lkb *lkb) 319 { 320 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 321 dlm_free_lvb(lkb->lkb_lvbptr); 322 323 dlm_free_lkb(lkb); 324 return 0; 325 } 326 327 static void rhash_free_rsb(void *ptr, void *arg) 328 { 329 struct dlm_rsb *rsb = ptr; 330 331 dlm_free_rsb(rsb); 332 } 333 334 static void free_lockspace(struct work_struct *work) 335 { 336 struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work); 337 struct dlm_lkb *lkb; 338 unsigned long id; 339 340 /* 341 * Free all lkb's in xa 342 */ 343 xa_for_each(&ls->ls_lkbxa, id, lkb) { 344 lkb_idr_free(lkb); 345 } 346 xa_destroy(&ls->ls_lkbxa); 347 348 /* 349 * Free all rsb's on rsbtbl 350 */ 351 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); 352 353 kfree(ls); 354 } 355 356 static int new_lockspace(const char *name, const char *cluster, 357 uint32_t flags, int lvblen, 358 const struct dlm_lockspace_ops *ops, void *ops_arg, 359 int *ops_result, dlm_lockspace_t **lockspace) 360 { 361 struct dlm_ls *ls; 362 int namelen = strlen(name); 363 int error; 364 365 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 366 return -EINVAL; 367 368 if (lvblen % 8) 369 return -EINVAL; 370 371 if (!try_module_get(THIS_MODULE)) 372 return -EINVAL; 373 374 if (!dlm_user_daemon_available()) { 375 log_print("dlm user daemon not available"); 376 error = -EUNATCH; 377 goto out; 378 } 379 380 if (ops && ops_result) { 381 if (!dlm_config.ci_recover_callbacks) 382 *ops_result = -EOPNOTSUPP; 383 else 384 *ops_result = 0; 385 } 386 387 if (!cluster) 388 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 389 dlm_config.ci_cluster_name); 390 391 if (dlm_config.ci_recover_callbacks && cluster && 392 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 393 log_print("dlm cluster name '%s' does not match " 394 "the application cluster name '%s'", 395 dlm_config.ci_cluster_name, cluster); 396 error = -EBADR; 397 goto out; 398 } 399 400 error = 0; 401 402 spin_lock_bh(&lslist_lock); 403 list_for_each_entry(ls, &lslist, ls_list) { 404 WARN_ON(ls->ls_create_count <= 0); 405 if (ls->ls_namelen != namelen) 406 continue; 407 if (memcmp(ls->ls_name, name, namelen)) 408 continue; 409 if (flags & DLM_LSFL_NEWEXCL) { 410 error = -EEXIST; 411 break; 412 } 413 ls->ls_create_count++; 414 *lockspace = ls; 415 error = 1; 416 break; 417 } 418 spin_unlock_bh(&lslist_lock); 419 420 if (error) 421 goto out; 422 423 error = -ENOMEM; 424 425 ls = kzalloc(sizeof(*ls), GFP_NOFS); 426 if (!ls) 427 goto out; 428 memcpy(ls->ls_name, name, namelen); 429 ls->ls_namelen = namelen; 430 ls->ls_lvblen = lvblen; 431 atomic_set(&ls->ls_count, 0); 432 init_waitqueue_head(&ls->ls_count_wait); 433 ls->ls_flags = 0; 434 435 if (ops && dlm_config.ci_recover_callbacks) { 436 ls->ls_ops = ops; 437 ls->ls_ops_arg = ops_arg; 438 } 439 440 if (flags & DLM_LSFL_SOFTIRQ) 441 set_bit(LSFL_SOFTIRQ, &ls->ls_flags); 442 443 /* ls_exflags are forced to match among nodes, and we don't 444 * need to require all nodes to have some flags set 445 */ 446 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL | 447 DLM_LSFL_SOFTIRQ)); 448 449 INIT_LIST_HEAD(&ls->ls_slow_inactive); 450 INIT_LIST_HEAD(&ls->ls_slow_active); 451 rwlock_init(&ls->ls_rsbtbl_lock); 452 453 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); 454 if (error) 455 goto out_lsfree; 456 457 xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 458 rwlock_init(&ls->ls_lkbxa_lock); 459 460 INIT_LIST_HEAD(&ls->ls_waiters); 461 spin_lock_init(&ls->ls_waiters_lock); 462 INIT_LIST_HEAD(&ls->ls_orphans); 463 spin_lock_init(&ls->ls_orphans_lock); 464 465 INIT_LIST_HEAD(&ls->ls_nodes); 466 INIT_LIST_HEAD(&ls->ls_nodes_gone); 467 ls->ls_num_nodes = 0; 468 ls->ls_low_nodeid = 0; 469 ls->ls_total_weight = 0; 470 ls->ls_node_array = NULL; 471 472 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb)); 473 ls->ls_local_rsb.res_ls = ls; 474 475 ls->ls_debug_rsb_dentry = NULL; 476 ls->ls_debug_waiters_dentry = NULL; 477 478 init_waitqueue_head(&ls->ls_uevent_wait); 479 ls->ls_uevent_result = 0; 480 init_completion(&ls->ls_recovery_done); 481 ls->ls_recovery_result = -1; 482 483 spin_lock_init(&ls->ls_cb_lock); 484 INIT_LIST_HEAD(&ls->ls_cb_delay); 485 486 INIT_WORK(&ls->ls_free_work, free_lockspace); 487 488 ls->ls_recoverd_task = NULL; 489 mutex_init(&ls->ls_recoverd_active); 490 spin_lock_init(&ls->ls_recover_lock); 491 spin_lock_init(&ls->ls_rcom_spin); 492 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 493 ls->ls_recover_status = 0; 494 ls->ls_recover_seq = get_random_u64(); 495 ls->ls_recover_args = NULL; 496 init_rwsem(&ls->ls_in_recovery); 497 rwlock_init(&ls->ls_recv_active); 498 INIT_LIST_HEAD(&ls->ls_requestqueue); 499 rwlock_init(&ls->ls_requestqueue_lock); 500 spin_lock_init(&ls->ls_clear_proc_locks); 501 502 /* Due backwards compatibility with 3.1 we need to use maximum 503 * possible dlm message size to be sure the message will fit and 504 * not having out of bounds issues. However on sending side 3.2 505 * might send less. 506 */ 507 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 508 if (!ls->ls_recover_buf) { 509 error = -ENOMEM; 510 goto out_lkbxa; 511 } 512 513 ls->ls_slot = 0; 514 ls->ls_num_slots = 0; 515 ls->ls_slots_size = 0; 516 ls->ls_slots = NULL; 517 518 INIT_LIST_HEAD(&ls->ls_recover_list); 519 spin_lock_init(&ls->ls_recover_list_lock); 520 xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 521 spin_lock_init(&ls->ls_recover_xa_lock); 522 ls->ls_recover_list_count = 0; 523 init_waitqueue_head(&ls->ls_wait_general); 524 INIT_LIST_HEAD(&ls->ls_masters_list); 525 rwlock_init(&ls->ls_masters_lock); 526 INIT_LIST_HEAD(&ls->ls_dir_dump_list); 527 rwlock_init(&ls->ls_dir_dump_lock); 528 529 INIT_LIST_HEAD(&ls->ls_scan_list); 530 spin_lock_init(&ls->ls_scan_lock); 531 timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); 532 533 spin_lock_bh(&lslist_lock); 534 ls->ls_create_count = 1; 535 list_add(&ls->ls_list, &lslist); 536 spin_unlock_bh(&lslist_lock); 537 538 if (flags & DLM_LSFL_FS) 539 set_bit(LSFL_FS, &ls->ls_flags); 540 541 error = dlm_callback_start(ls); 542 if (error) { 543 log_error(ls, "can't start dlm_callback %d", error); 544 goto out_delist; 545 } 546 547 init_waitqueue_head(&ls->ls_recover_lock_wait); 548 549 /* 550 * Once started, dlm_recoverd first looks for ls in lslist, then 551 * initializes ls_in_recovery as locked in "down" mode. We need 552 * to wait for the wakeup from dlm_recoverd because in_recovery 553 * has to start out in down mode. 554 */ 555 556 error = dlm_recoverd_start(ls); 557 if (error) { 558 log_error(ls, "can't start dlm_recoverd %d", error); 559 goto out_callback; 560 } 561 562 wait_event(ls->ls_recover_lock_wait, 563 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 564 565 ls->ls_kobj.kset = dlm_kset; 566 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 567 "%s", ls->ls_name); 568 if (error) 569 goto out_recoverd; 570 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 571 572 /* This uevent triggers dlm_controld in userspace to add us to the 573 group of nodes that are members of this lockspace (managed by the 574 cluster infrastructure.) Once it's done that, it tells us who the 575 current lockspace members are (via configfs) and then tells the 576 lockspace to start running (via sysfs) in dlm_ls_start(). */ 577 578 error = do_uevent(ls, 1); 579 if (error) 580 goto out_recoverd; 581 582 /* wait until recovery is successful or failed */ 583 wait_for_completion(&ls->ls_recovery_done); 584 error = ls->ls_recovery_result; 585 if (error) 586 goto out_members; 587 588 dlm_create_debug_file(ls); 589 590 log_rinfo(ls, "join complete"); 591 *lockspace = ls; 592 return 0; 593 594 out_members: 595 do_uevent(ls, 0); 596 dlm_clear_members(ls); 597 kfree(ls->ls_node_array); 598 out_recoverd: 599 dlm_recoverd_stop(ls); 600 out_callback: 601 dlm_callback_stop(ls); 602 out_delist: 603 spin_lock_bh(&lslist_lock); 604 list_del(&ls->ls_list); 605 spin_unlock_bh(&lslist_lock); 606 xa_destroy(&ls->ls_recover_xa); 607 kfree(ls->ls_recover_buf); 608 out_lkbxa: 609 xa_destroy(&ls->ls_lkbxa); 610 rhashtable_destroy(&ls->ls_rsbtbl); 611 out_lsfree: 612 kobject_put(&ls->ls_kobj); 613 kfree(ls); 614 out: 615 module_put(THIS_MODULE); 616 return error; 617 } 618 619 static int __dlm_new_lockspace(const char *name, const char *cluster, 620 uint32_t flags, int lvblen, 621 const struct dlm_lockspace_ops *ops, 622 void *ops_arg, int *ops_result, 623 dlm_lockspace_t **lockspace) 624 { 625 int error = 0; 626 627 mutex_lock(&ls_lock); 628 if (!ls_count) 629 error = threads_start(); 630 if (error) 631 goto out; 632 633 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 634 ops_result, lockspace); 635 if (!error) 636 ls_count++; 637 if (error > 0) 638 error = 0; 639 if (!ls_count) { 640 dlm_midcomms_shutdown(); 641 dlm_midcomms_stop(); 642 } 643 out: 644 mutex_unlock(&ls_lock); 645 return error; 646 } 647 648 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags, 649 int lvblen, const struct dlm_lockspace_ops *ops, 650 void *ops_arg, int *ops_result, 651 dlm_lockspace_t **lockspace) 652 { 653 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen, 654 ops, ops_arg, ops_result, lockspace); 655 } 656 657 int dlm_new_user_lockspace(const char *name, const char *cluster, 658 uint32_t flags, int lvblen, 659 const struct dlm_lockspace_ops *ops, 660 void *ops_arg, int *ops_result, 661 dlm_lockspace_t **lockspace) 662 { 663 if (flags & DLM_LSFL_SOFTIRQ) 664 return -EINVAL; 665 666 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, 667 ops_arg, ops_result, lockspace); 668 } 669 670 /* NOTE: We check the lkbxa here rather than the resource table. 671 This is because there may be LKBs queued as ASTs that have been unlinked 672 from their RSBs and are pending deletion once the AST has been delivered */ 673 674 static int lockspace_busy(struct dlm_ls *ls, int force) 675 { 676 struct dlm_lkb *lkb; 677 unsigned long id; 678 int rv = 0; 679 680 read_lock_bh(&ls->ls_lkbxa_lock); 681 if (force == 0) { 682 xa_for_each(&ls->ls_lkbxa, id, lkb) { 683 rv = 1; 684 break; 685 } 686 } else if (force == 1) { 687 xa_for_each(&ls->ls_lkbxa, id, lkb) { 688 if (lkb->lkb_nodeid == 0 && 689 lkb->lkb_grmode != DLM_LOCK_IV) { 690 rv = 1; 691 break; 692 } 693 } 694 } else { 695 rv = 0; 696 } 697 read_unlock_bh(&ls->ls_lkbxa_lock); 698 return rv; 699 } 700 701 static int release_lockspace(struct dlm_ls *ls, int force) 702 { 703 int busy, rv; 704 705 busy = lockspace_busy(ls, force); 706 707 spin_lock_bh(&lslist_lock); 708 if (ls->ls_create_count == 1) { 709 if (busy) { 710 rv = -EBUSY; 711 } else { 712 /* remove_lockspace takes ls off lslist */ 713 ls->ls_create_count = 0; 714 rv = 0; 715 } 716 } else if (ls->ls_create_count > 1) { 717 rv = --ls->ls_create_count; 718 } else { 719 rv = -EINVAL; 720 } 721 spin_unlock_bh(&lslist_lock); 722 723 if (rv) { 724 log_debug(ls, "release_lockspace no remove %d", rv); 725 return rv; 726 } 727 728 if (ls_count == 1) 729 dlm_midcomms_version_wait(); 730 731 dlm_device_deregister(ls); 732 733 if (force < 3 && dlm_user_daemon_available()) 734 do_uevent(ls, 0); 735 736 dlm_recoverd_stop(ls); 737 738 /* clear the LSFL_RUNNING flag to fast up 739 * time_shutdown_sync(), we don't care anymore 740 */ 741 clear_bit(LSFL_RUNNING, &ls->ls_flags); 742 timer_shutdown_sync(&ls->ls_scan_timer); 743 744 if (ls_count == 1) { 745 dlm_clear_members(ls); 746 dlm_midcomms_shutdown(); 747 } 748 749 dlm_callback_stop(ls); 750 751 remove_lockspace(ls); 752 753 dlm_delete_debug_file(ls); 754 755 kobject_put(&ls->ls_kobj); 756 757 xa_destroy(&ls->ls_recover_xa); 758 kfree(ls->ls_recover_buf); 759 760 /* 761 * Free structures on any other lists 762 */ 763 764 dlm_purge_requestqueue(ls); 765 kfree(ls->ls_recover_args); 766 dlm_clear_members(ls); 767 dlm_clear_members_gone(ls); 768 kfree(ls->ls_node_array); 769 770 log_rinfo(ls, "%s final free", __func__); 771 772 /* delayed free of data structures see free_lockspace() */ 773 queue_work(dlm_wq, &ls->ls_free_work); 774 module_put(THIS_MODULE); 775 return 0; 776 } 777 778 /* 779 * Called when a system has released all its locks and is not going to use the 780 * lockspace any longer. We free everything we're managing for this lockspace. 781 * Remaining nodes will go through the recovery process as if we'd died. The 782 * lockspace must continue to function as usual, participating in recoveries, 783 * until this returns. 784 * 785 * Force has 4 possible values: 786 * 0 - don't destroy lockspace if it has any LKBs 787 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 788 * 2 - destroy lockspace regardless of LKBs 789 * 3 - destroy lockspace as part of a forced shutdown 790 */ 791 792 int dlm_release_lockspace(void *lockspace, int force) 793 { 794 struct dlm_ls *ls; 795 int error; 796 797 ls = dlm_find_lockspace_local(lockspace); 798 if (!ls) 799 return -EINVAL; 800 dlm_put_lockspace(ls); 801 802 mutex_lock(&ls_lock); 803 error = release_lockspace(ls, force); 804 if (!error) 805 ls_count--; 806 if (!ls_count) 807 dlm_midcomms_stop(); 808 mutex_unlock(&ls_lock); 809 810 return error; 811 } 812 813 void dlm_stop_lockspaces(void) 814 { 815 struct dlm_ls *ls; 816 int count; 817 818 restart: 819 count = 0; 820 spin_lock_bh(&lslist_lock); 821 list_for_each_entry(ls, &lslist, ls_list) { 822 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 823 count++; 824 continue; 825 } 826 spin_unlock_bh(&lslist_lock); 827 log_error(ls, "no userland control daemon, stopping lockspace"); 828 dlm_ls_stop(ls); 829 goto restart; 830 } 831 spin_unlock_bh(&lslist_lock); 832 833 if (count) 834 log_print("dlm user daemon left %d lockspaces", count); 835 } 836