1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include <linux/module.h> 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "midcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 34 { 35 ssize_t ret = len; 36 int n; 37 int rc = kstrtoint(buf, 0, &n); 38 39 if (rc) 40 return rc; 41 ls = dlm_find_lockspace_local(ls->ls_local_handle); 42 if (!ls) 43 return -EINVAL; 44 45 switch (n) { 46 case 0: 47 dlm_ls_stop(ls); 48 break; 49 case 1: 50 dlm_ls_start(ls); 51 break; 52 default: 53 ret = -EINVAL; 54 } 55 dlm_put_lockspace(ls); 56 return ret; 57 } 58 59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 60 { 61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 62 63 if (rc) 64 return rc; 65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 66 wake_up(&ls->ls_uevent_wait); 67 return len; 68 } 69 70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 71 { 72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 73 } 74 75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 76 { 77 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 78 79 if (rc) 80 return rc; 81 return len; 82 } 83 84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 85 { 86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 87 } 88 89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 90 { 91 int val; 92 int rc = kstrtoint(buf, 0, &val); 93 94 if (rc) 95 return rc; 96 if (val == 1) 97 set_bit(LSFL_NODIR, &ls->ls_flags); 98 return len; 99 } 100 101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 102 { 103 uint32_t status = dlm_recover_status(ls); 104 return snprintf(buf, PAGE_SIZE, "%x\n", status); 105 } 106 107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 108 { 109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 110 } 111 112 struct dlm_attr { 113 struct attribute attr; 114 ssize_t (*show)(struct dlm_ls *, char *); 115 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 116 }; 117 118 static struct dlm_attr dlm_attr_control = { 119 .attr = {.name = "control", .mode = S_IWUSR}, 120 .store = dlm_control_store 121 }; 122 123 static struct dlm_attr dlm_attr_event = { 124 .attr = {.name = "event_done", .mode = S_IWUSR}, 125 .store = dlm_event_store 126 }; 127 128 static struct dlm_attr dlm_attr_id = { 129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 130 .show = dlm_id_show, 131 .store = dlm_id_store 132 }; 133 134 static struct dlm_attr dlm_attr_nodir = { 135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 136 .show = dlm_nodir_show, 137 .store = dlm_nodir_store 138 }; 139 140 static struct dlm_attr dlm_attr_recover_status = { 141 .attr = {.name = "recover_status", .mode = S_IRUGO}, 142 .show = dlm_recover_status_show 143 }; 144 145 static struct dlm_attr dlm_attr_recover_nodeid = { 146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 147 .show = dlm_recover_nodeid_show 148 }; 149 150 static struct attribute *dlm_attrs[] = { 151 &dlm_attr_control.attr, 152 &dlm_attr_event.attr, 153 &dlm_attr_id.attr, 154 &dlm_attr_nodir.attr, 155 &dlm_attr_recover_status.attr, 156 &dlm_attr_recover_nodeid.attr, 157 NULL, 158 }; 159 ATTRIBUTE_GROUPS(dlm); 160 161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 162 char *buf) 163 { 164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 166 return a->show ? a->show(ls, buf) : 0; 167 } 168 169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 170 const char *buf, size_t len) 171 { 172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 174 return a->store ? a->store(ls, buf, len) : len; 175 } 176 177 static void lockspace_kobj_release(struct kobject *k) 178 { 179 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 180 kfree(ls); 181 } 182 183 static const struct sysfs_ops dlm_attr_ops = { 184 .show = dlm_attr_show, 185 .store = dlm_attr_store, 186 }; 187 188 static struct kobj_type dlm_ktype = { 189 .default_groups = dlm_groups, 190 .sysfs_ops = &dlm_attr_ops, 191 .release = lockspace_kobj_release, 192 }; 193 194 static struct kset *dlm_kset; 195 196 static int do_uevent(struct dlm_ls *ls, int in) 197 { 198 if (in) 199 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 200 else 201 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 202 203 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 204 205 /* dlm_controld will see the uevent, do the necessary group management 206 and then write to sysfs to wake us */ 207 208 wait_event(ls->ls_uevent_wait, 209 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 210 211 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 212 213 return ls->ls_uevent_result; 214 } 215 216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env) 217 { 218 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 219 220 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 221 return 0; 222 } 223 224 static const struct kset_uevent_ops dlm_uevent_ops = { 225 .uevent = dlm_uevent, 226 }; 227 228 int __init dlm_lockspace_init(void) 229 { 230 ls_count = 0; 231 mutex_init(&ls_lock); 232 INIT_LIST_HEAD(&lslist); 233 spin_lock_init(&lslist_lock); 234 235 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 236 if (!dlm_kset) { 237 printk(KERN_WARNING "%s: can not create kset\n", __func__); 238 return -ENOMEM; 239 } 240 return 0; 241 } 242 243 void dlm_lockspace_exit(void) 244 { 245 kset_unregister(dlm_kset); 246 } 247 248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 249 { 250 struct dlm_ls *ls; 251 252 spin_lock_bh(&lslist_lock); 253 254 list_for_each_entry(ls, &lslist, ls_list) { 255 if (ls->ls_global_id == id) { 256 atomic_inc(&ls->ls_count); 257 goto out; 258 } 259 } 260 ls = NULL; 261 out: 262 spin_unlock_bh(&lslist_lock); 263 return ls; 264 } 265 266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 267 { 268 struct dlm_ls *ls; 269 270 spin_lock_bh(&lslist_lock); 271 list_for_each_entry(ls, &lslist, ls_list) { 272 if (ls->ls_local_handle == lockspace) { 273 atomic_inc(&ls->ls_count); 274 goto out; 275 } 276 } 277 ls = NULL; 278 out: 279 spin_unlock_bh(&lslist_lock); 280 return ls; 281 } 282 283 struct dlm_ls *dlm_find_lockspace_device(int minor) 284 { 285 struct dlm_ls *ls; 286 287 spin_lock_bh(&lslist_lock); 288 list_for_each_entry(ls, &lslist, ls_list) { 289 if (ls->ls_device.minor == minor) { 290 atomic_inc(&ls->ls_count); 291 goto out; 292 } 293 } 294 ls = NULL; 295 out: 296 spin_unlock_bh(&lslist_lock); 297 return ls; 298 } 299 300 void dlm_put_lockspace(struct dlm_ls *ls) 301 { 302 if (atomic_dec_and_test(&ls->ls_count)) 303 wake_up(&ls->ls_count_wait); 304 } 305 306 static void remove_lockspace(struct dlm_ls *ls) 307 { 308 retry: 309 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); 310 311 spin_lock_bh(&lslist_lock); 312 if (atomic_read(&ls->ls_count) != 0) { 313 spin_unlock_bh(&lslist_lock); 314 goto retry; 315 } 316 317 WARN_ON(ls->ls_create_count != 0); 318 list_del(&ls->ls_list); 319 spin_unlock_bh(&lslist_lock); 320 } 321 322 static int threads_start(void) 323 { 324 int error; 325 326 /* Thread for sending/receiving messages for all lockspace's */ 327 error = dlm_midcomms_start(); 328 if (error) 329 log_print("cannot start dlm midcomms %d", error); 330 331 return error; 332 } 333 334 static int new_lockspace(const char *name, const char *cluster, 335 uint32_t flags, int lvblen, 336 const struct dlm_lockspace_ops *ops, void *ops_arg, 337 int *ops_result, dlm_lockspace_t **lockspace) 338 { 339 struct dlm_ls *ls; 340 int do_unreg = 0; 341 int namelen = strlen(name); 342 int error; 343 344 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 345 return -EINVAL; 346 347 if (lvblen % 8) 348 return -EINVAL; 349 350 if (!try_module_get(THIS_MODULE)) 351 return -EINVAL; 352 353 if (!dlm_user_daemon_available()) { 354 log_print("dlm user daemon not available"); 355 error = -EUNATCH; 356 goto out; 357 } 358 359 if (ops && ops_result) { 360 if (!dlm_config.ci_recover_callbacks) 361 *ops_result = -EOPNOTSUPP; 362 else 363 *ops_result = 0; 364 } 365 366 if (!cluster) 367 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 368 dlm_config.ci_cluster_name); 369 370 if (dlm_config.ci_recover_callbacks && cluster && 371 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 372 log_print("dlm cluster name '%s' does not match " 373 "the application cluster name '%s'", 374 dlm_config.ci_cluster_name, cluster); 375 error = -EBADR; 376 goto out; 377 } 378 379 error = 0; 380 381 spin_lock_bh(&lslist_lock); 382 list_for_each_entry(ls, &lslist, ls_list) { 383 WARN_ON(ls->ls_create_count <= 0); 384 if (ls->ls_namelen != namelen) 385 continue; 386 if (memcmp(ls->ls_name, name, namelen)) 387 continue; 388 if (flags & DLM_LSFL_NEWEXCL) { 389 error = -EEXIST; 390 break; 391 } 392 ls->ls_create_count++; 393 *lockspace = ls; 394 error = 1; 395 break; 396 } 397 spin_unlock_bh(&lslist_lock); 398 399 if (error) 400 goto out; 401 402 error = -ENOMEM; 403 404 ls = kzalloc(sizeof(*ls), GFP_NOFS); 405 if (!ls) 406 goto out; 407 memcpy(ls->ls_name, name, namelen); 408 ls->ls_namelen = namelen; 409 ls->ls_lvblen = lvblen; 410 atomic_set(&ls->ls_count, 0); 411 init_waitqueue_head(&ls->ls_count_wait); 412 ls->ls_flags = 0; 413 ls->ls_scan_time = jiffies; 414 415 if (ops && dlm_config.ci_recover_callbacks) { 416 ls->ls_ops = ops; 417 ls->ls_ops_arg = ops_arg; 418 } 419 420 /* ls_exflags are forced to match among nodes, and we don't 421 * need to require all nodes to have some flags set 422 */ 423 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); 424 425 INIT_LIST_HEAD(&ls->ls_toss); 426 INIT_LIST_HEAD(&ls->ls_keep); 427 rwlock_init(&ls->ls_rsbtbl_lock); 428 429 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); 430 if (error) 431 goto out_lsfree; 432 433 idr_init(&ls->ls_lkbidr); 434 rwlock_init(&ls->ls_lkbidr_lock); 435 436 INIT_LIST_HEAD(&ls->ls_waiters); 437 spin_lock_init(&ls->ls_waiters_lock); 438 INIT_LIST_HEAD(&ls->ls_orphans); 439 spin_lock_init(&ls->ls_orphans_lock); 440 441 INIT_LIST_HEAD(&ls->ls_new_rsb); 442 spin_lock_init(&ls->ls_new_rsb_spin); 443 444 INIT_LIST_HEAD(&ls->ls_nodes); 445 INIT_LIST_HEAD(&ls->ls_nodes_gone); 446 ls->ls_num_nodes = 0; 447 ls->ls_low_nodeid = 0; 448 ls->ls_total_weight = 0; 449 ls->ls_node_array = NULL; 450 451 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb)); 452 ls->ls_local_rsb.res_ls = ls; 453 454 ls->ls_debug_rsb_dentry = NULL; 455 ls->ls_debug_waiters_dentry = NULL; 456 457 init_waitqueue_head(&ls->ls_uevent_wait); 458 ls->ls_uevent_result = 0; 459 init_completion(&ls->ls_recovery_done); 460 ls->ls_recovery_result = -1; 461 462 spin_lock_init(&ls->ls_cb_lock); 463 INIT_LIST_HEAD(&ls->ls_cb_delay); 464 465 ls->ls_recoverd_task = NULL; 466 mutex_init(&ls->ls_recoverd_active); 467 spin_lock_init(&ls->ls_recover_lock); 468 spin_lock_init(&ls->ls_rcom_spin); 469 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 470 ls->ls_recover_status = 0; 471 ls->ls_recover_seq = get_random_u64(); 472 ls->ls_recover_args = NULL; 473 init_rwsem(&ls->ls_in_recovery); 474 rwlock_init(&ls->ls_recv_active); 475 INIT_LIST_HEAD(&ls->ls_requestqueue); 476 rwlock_init(&ls->ls_requestqueue_lock); 477 spin_lock_init(&ls->ls_clear_proc_locks); 478 479 /* Due backwards compatibility with 3.1 we need to use maximum 480 * possible dlm message size to be sure the message will fit and 481 * not having out of bounds issues. However on sending side 3.2 482 * might send less. 483 */ 484 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 485 if (!ls->ls_recover_buf) { 486 error = -ENOMEM; 487 goto out_lkbidr; 488 } 489 490 ls->ls_slot = 0; 491 ls->ls_num_slots = 0; 492 ls->ls_slots_size = 0; 493 ls->ls_slots = NULL; 494 495 INIT_LIST_HEAD(&ls->ls_recover_list); 496 spin_lock_init(&ls->ls_recover_list_lock); 497 idr_init(&ls->ls_recover_idr); 498 spin_lock_init(&ls->ls_recover_idr_lock); 499 ls->ls_recover_list_count = 0; 500 ls->ls_local_handle = ls; 501 init_waitqueue_head(&ls->ls_wait_general); 502 INIT_LIST_HEAD(&ls->ls_masters_list); 503 rwlock_init(&ls->ls_masters_lock); 504 INIT_LIST_HEAD(&ls->ls_dir_dump_list); 505 rwlock_init(&ls->ls_dir_dump_lock); 506 507 INIT_LIST_HEAD(&ls->ls_toss_q); 508 spin_lock_init(&ls->ls_toss_q_lock); 509 timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, 510 TIMER_DEFERRABLE); 511 512 spin_lock_bh(&lslist_lock); 513 ls->ls_create_count = 1; 514 list_add(&ls->ls_list, &lslist); 515 spin_unlock_bh(&lslist_lock); 516 517 if (flags & DLM_LSFL_FS) { 518 error = dlm_callback_start(ls); 519 if (error) { 520 log_error(ls, "can't start dlm_callback %d", error); 521 goto out_delist; 522 } 523 } 524 525 init_waitqueue_head(&ls->ls_recover_lock_wait); 526 527 /* 528 * Once started, dlm_recoverd first looks for ls in lslist, then 529 * initializes ls_in_recovery as locked in "down" mode. We need 530 * to wait for the wakeup from dlm_recoverd because in_recovery 531 * has to start out in down mode. 532 */ 533 534 error = dlm_recoverd_start(ls); 535 if (error) { 536 log_error(ls, "can't start dlm_recoverd %d", error); 537 goto out_callback; 538 } 539 540 wait_event(ls->ls_recover_lock_wait, 541 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 542 543 /* let kobject handle freeing of ls if there's an error */ 544 do_unreg = 1; 545 546 ls->ls_kobj.kset = dlm_kset; 547 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 548 "%s", ls->ls_name); 549 if (error) 550 goto out_recoverd; 551 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 552 553 /* This uevent triggers dlm_controld in userspace to add us to the 554 group of nodes that are members of this lockspace (managed by the 555 cluster infrastructure.) Once it's done that, it tells us who the 556 current lockspace members are (via configfs) and then tells the 557 lockspace to start running (via sysfs) in dlm_ls_start(). */ 558 559 error = do_uevent(ls, 1); 560 if (error) 561 goto out_recoverd; 562 563 /* wait until recovery is successful or failed */ 564 wait_for_completion(&ls->ls_recovery_done); 565 error = ls->ls_recovery_result; 566 if (error) 567 goto out_members; 568 569 dlm_create_debug_file(ls); 570 571 log_rinfo(ls, "join complete"); 572 *lockspace = ls; 573 return 0; 574 575 out_members: 576 do_uevent(ls, 0); 577 dlm_clear_members(ls); 578 kfree(ls->ls_node_array); 579 out_recoverd: 580 dlm_recoverd_stop(ls); 581 out_callback: 582 dlm_callback_stop(ls); 583 out_delist: 584 spin_lock_bh(&lslist_lock); 585 list_del(&ls->ls_list); 586 spin_unlock_bh(&lslist_lock); 587 idr_destroy(&ls->ls_recover_idr); 588 kfree(ls->ls_recover_buf); 589 out_lkbidr: 590 idr_destroy(&ls->ls_lkbidr); 591 rhashtable_destroy(&ls->ls_rsbtbl); 592 out_lsfree: 593 if (do_unreg) 594 kobject_put(&ls->ls_kobj); 595 else 596 kfree(ls); 597 out: 598 module_put(THIS_MODULE); 599 return error; 600 } 601 602 static int __dlm_new_lockspace(const char *name, const char *cluster, 603 uint32_t flags, int lvblen, 604 const struct dlm_lockspace_ops *ops, 605 void *ops_arg, int *ops_result, 606 dlm_lockspace_t **lockspace) 607 { 608 int error = 0; 609 610 mutex_lock(&ls_lock); 611 if (!ls_count) 612 error = threads_start(); 613 if (error) 614 goto out; 615 616 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 617 ops_result, lockspace); 618 if (!error) 619 ls_count++; 620 if (error > 0) 621 error = 0; 622 if (!ls_count) { 623 dlm_midcomms_shutdown(); 624 dlm_midcomms_stop(); 625 } 626 out: 627 mutex_unlock(&ls_lock); 628 return error; 629 } 630 631 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags, 632 int lvblen, const struct dlm_lockspace_ops *ops, 633 void *ops_arg, int *ops_result, 634 dlm_lockspace_t **lockspace) 635 { 636 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen, 637 ops, ops_arg, ops_result, lockspace); 638 } 639 640 int dlm_new_user_lockspace(const char *name, const char *cluster, 641 uint32_t flags, int lvblen, 642 const struct dlm_lockspace_ops *ops, 643 void *ops_arg, int *ops_result, 644 dlm_lockspace_t **lockspace) 645 { 646 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, 647 ops_arg, ops_result, lockspace); 648 } 649 650 static int lkb_idr_is_local(int id, void *p, void *data) 651 { 652 struct dlm_lkb *lkb = p; 653 654 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; 655 } 656 657 static int lkb_idr_is_any(int id, void *p, void *data) 658 { 659 return 1; 660 } 661 662 static int lkb_idr_free(int id, void *p, void *data) 663 { 664 struct dlm_lkb *lkb = p; 665 666 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 667 dlm_free_lvb(lkb->lkb_lvbptr); 668 669 dlm_free_lkb(lkb); 670 return 0; 671 } 672 673 /* NOTE: We check the lkbidr here rather than the resource table. 674 This is because there may be LKBs queued as ASTs that have been unlinked 675 from their RSBs and are pending deletion once the AST has been delivered */ 676 677 static int lockspace_busy(struct dlm_ls *ls, int force) 678 { 679 int rv; 680 681 read_lock_bh(&ls->ls_lkbidr_lock); 682 if (force == 0) { 683 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 684 } else if (force == 1) { 685 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 686 } else { 687 rv = 0; 688 } 689 read_unlock_bh(&ls->ls_lkbidr_lock); 690 return rv; 691 } 692 693 static void rhash_free_rsb(void *ptr, void *arg) 694 { 695 struct dlm_rsb *rsb = ptr; 696 697 dlm_free_rsb(rsb); 698 } 699 700 static int release_lockspace(struct dlm_ls *ls, int force) 701 { 702 struct dlm_rsb *rsb; 703 int busy, rv; 704 705 busy = lockspace_busy(ls, force); 706 707 spin_lock_bh(&lslist_lock); 708 if (ls->ls_create_count == 1) { 709 if (busy) { 710 rv = -EBUSY; 711 } else { 712 /* remove_lockspace takes ls off lslist */ 713 ls->ls_create_count = 0; 714 rv = 0; 715 } 716 } else if (ls->ls_create_count > 1) { 717 rv = --ls->ls_create_count; 718 } else { 719 rv = -EINVAL; 720 } 721 spin_unlock_bh(&lslist_lock); 722 723 if (rv) { 724 log_debug(ls, "release_lockspace no remove %d", rv); 725 return rv; 726 } 727 728 if (ls_count == 1) 729 dlm_midcomms_version_wait(); 730 731 dlm_device_deregister(ls); 732 733 if (force < 3 && dlm_user_daemon_available()) 734 do_uevent(ls, 0); 735 736 dlm_recoverd_stop(ls); 737 738 /* clear the LSFL_RUNNING flag to fast up 739 * time_shutdown_sync(), we don't care anymore 740 */ 741 clear_bit(LSFL_RUNNING, &ls->ls_flags); 742 timer_shutdown_sync(&ls->ls_timer); 743 744 if (ls_count == 1) { 745 dlm_clear_members(ls); 746 dlm_midcomms_shutdown(); 747 } 748 749 dlm_callback_stop(ls); 750 751 remove_lockspace(ls); 752 753 dlm_delete_debug_file(ls); 754 755 idr_destroy(&ls->ls_recover_idr); 756 kfree(ls->ls_recover_buf); 757 758 /* 759 * Free all lkb's in idr 760 */ 761 762 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 763 idr_destroy(&ls->ls_lkbidr); 764 765 /* 766 * Free all rsb's on rsbtbl 767 */ 768 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); 769 770 while (!list_empty(&ls->ls_new_rsb)) { 771 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 772 res_hashchain); 773 list_del(&rsb->res_hashchain); 774 dlm_free_rsb(rsb); 775 } 776 777 /* 778 * Free structures on any other lists 779 */ 780 781 dlm_purge_requestqueue(ls); 782 kfree(ls->ls_recover_args); 783 dlm_clear_members(ls); 784 dlm_clear_members_gone(ls); 785 kfree(ls->ls_node_array); 786 log_rinfo(ls, "release_lockspace final free"); 787 kobject_put(&ls->ls_kobj); 788 /* The ls structure will be freed when the kobject is done with */ 789 790 module_put(THIS_MODULE); 791 return 0; 792 } 793 794 /* 795 * Called when a system has released all its locks and is not going to use the 796 * lockspace any longer. We free everything we're managing for this lockspace. 797 * Remaining nodes will go through the recovery process as if we'd died. The 798 * lockspace must continue to function as usual, participating in recoveries, 799 * until this returns. 800 * 801 * Force has 4 possible values: 802 * 0 - don't destroy lockspace if it has any LKBs 803 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 804 * 2 - destroy lockspace regardless of LKBs 805 * 3 - destroy lockspace as part of a forced shutdown 806 */ 807 808 int dlm_release_lockspace(void *lockspace, int force) 809 { 810 struct dlm_ls *ls; 811 int error; 812 813 ls = dlm_find_lockspace_local(lockspace); 814 if (!ls) 815 return -EINVAL; 816 dlm_put_lockspace(ls); 817 818 mutex_lock(&ls_lock); 819 error = release_lockspace(ls, force); 820 if (!error) 821 ls_count--; 822 if (!ls_count) 823 dlm_midcomms_stop(); 824 mutex_unlock(&ls_lock); 825 826 return error; 827 } 828 829 void dlm_stop_lockspaces(void) 830 { 831 struct dlm_ls *ls; 832 int count; 833 834 restart: 835 count = 0; 836 spin_lock_bh(&lslist_lock); 837 list_for_each_entry(ls, &lslist, ls_list) { 838 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 839 count++; 840 continue; 841 } 842 spin_unlock_bh(&lslist_lock); 843 log_error(ls, "no userland control daemon, stopping lockspace"); 844 dlm_ls_stop(ls); 845 goto restart; 846 } 847 spin_unlock_bh(&lslist_lock); 848 849 if (count) 850 log_print("dlm user daemon left %d lockspaces", count); 851 } 852