1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmdomain.c 5 * 6 * defines domain join / leave apis 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 #include <linux/module.h> 28 #include <linux/types.h> 29 #include <linux/slab.h> 30 #include <linux/highmem.h> 31 #include <linux/utsname.h> 32 #include <linux/init.h> 33 #include <linux/spinlock.h> 34 #include <linux/delay.h> 35 #include <linux/err.h> 36 37 #include "cluster/heartbeat.h" 38 #include "cluster/nodemanager.h" 39 #include "cluster/tcp.h" 40 41 #include "dlmapi.h" 42 #include "dlmcommon.h" 43 44 #include "dlmdomain.h" 45 46 #include "dlmver.h" 47 48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 49 #include "cluster/masklog.h" 50 51 static void dlm_free_pagevec(void **vec, int pages) 52 { 53 while (pages--) 54 free_page((unsigned long)vec[pages]); 55 kfree(vec); 56 } 57 58 static void **dlm_alloc_pagevec(int pages) 59 { 60 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); 61 int i; 62 63 if (!vec) 64 return NULL; 65 66 for (i = 0; i < pages; i++) 67 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) 68 goto out_free; 69 70 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", 71 pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE); 72 return vec; 73 out_free: 74 dlm_free_pagevec(vec, i); 75 return NULL; 76 } 77 78 /* 79 * 80 * spinlock lock ordering: if multiple locks are needed, obey this ordering: 81 * dlm_domain_lock 82 * struct dlm_ctxt->spinlock 83 * struct dlm_lock_resource->spinlock 84 * struct dlm_ctxt->master_lock 85 * struct dlm_ctxt->ast_lock 86 * dlm_master_list_entry->spinlock 87 * dlm_lock->spinlock 88 * 89 */ 90 91 DEFINE_SPINLOCK(dlm_domain_lock); 92 LIST_HEAD(dlm_domains); 93 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 94 95 #define DLM_DOMAIN_BACKOFF_MS 200 96 97 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data); 98 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data); 99 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data); 100 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data); 101 102 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 103 104 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) 105 { 106 hlist_del_init(&lockres->hash_node); 107 dlm_lockres_put(lockres); 108 } 109 110 void __dlm_insert_lockres(struct dlm_ctxt *dlm, 111 struct dlm_lock_resource *res) 112 { 113 struct hlist_head *bucket; 114 struct qstr *q; 115 116 assert_spin_locked(&dlm->spinlock); 117 118 q = &res->lockname; 119 bucket = dlm_lockres_hash(dlm, q->hash); 120 121 /* get a reference for our hashtable */ 122 dlm_lockres_get(res); 123 124 hlist_add_head(&res->hash_node, bucket); 125 } 126 127 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 128 const char *name, 129 unsigned int len, 130 unsigned int hash) 131 { 132 struct hlist_head *bucket; 133 struct hlist_node *list; 134 135 mlog_entry("%.*s\n", len, name); 136 137 assert_spin_locked(&dlm->spinlock); 138 139 bucket = dlm_lockres_hash(dlm, hash); 140 141 hlist_for_each(list, bucket) { 142 struct dlm_lock_resource *res = hlist_entry(list, 143 struct dlm_lock_resource, hash_node); 144 if (res->lockname.name[0] != name[0]) 145 continue; 146 if (unlikely(res->lockname.len != len)) 147 continue; 148 if (memcmp(res->lockname.name + 1, name + 1, len - 1)) 149 continue; 150 dlm_lockres_get(res); 151 return res; 152 } 153 return NULL; 154 } 155 156 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 157 const char *name, 158 unsigned int len) 159 { 160 struct dlm_lock_resource *res; 161 unsigned int hash = dlm_lockid_hash(name, len); 162 163 spin_lock(&dlm->spinlock); 164 res = __dlm_lookup_lockres(dlm, name, len, hash); 165 spin_unlock(&dlm->spinlock); 166 return res; 167 } 168 169 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len) 170 { 171 struct dlm_ctxt *tmp = NULL; 172 struct list_head *iter; 173 174 assert_spin_locked(&dlm_domain_lock); 175 176 /* tmp->name here is always NULL terminated, 177 * but domain may not be! */ 178 list_for_each(iter, &dlm_domains) { 179 tmp = list_entry (iter, struct dlm_ctxt, list); 180 if (strlen(tmp->name) == len && 181 memcmp(tmp->name, domain, len)==0) 182 break; 183 tmp = NULL; 184 } 185 186 return tmp; 187 } 188 189 /* For null terminated domain strings ONLY */ 190 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain) 191 { 192 assert_spin_locked(&dlm_domain_lock); 193 194 return __dlm_lookup_domain_full(domain, strlen(domain)); 195 } 196 197 198 /* returns true on one of two conditions: 199 * 1) the domain does not exist 200 * 2) the domain exists and it's state is "joined" */ 201 static int dlm_wait_on_domain_helper(const char *domain) 202 { 203 int ret = 0; 204 struct dlm_ctxt *tmp = NULL; 205 206 spin_lock(&dlm_domain_lock); 207 208 tmp = __dlm_lookup_domain(domain); 209 if (!tmp) 210 ret = 1; 211 else if (tmp->dlm_state == DLM_CTXT_JOINED) 212 ret = 1; 213 214 spin_unlock(&dlm_domain_lock); 215 return ret; 216 } 217 218 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) 219 { 220 if (dlm->lockres_hash) 221 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 222 223 if (dlm->name) 224 kfree(dlm->name); 225 226 kfree(dlm); 227 } 228 229 /* A little strange - this function will be called while holding 230 * dlm_domain_lock and is expected to be holding it on the way out. We 231 * will however drop and reacquire it multiple times */ 232 static void dlm_ctxt_release(struct kref *kref) 233 { 234 struct dlm_ctxt *dlm; 235 236 dlm = container_of(kref, struct dlm_ctxt, dlm_refs); 237 238 BUG_ON(dlm->num_joins); 239 BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED); 240 241 /* we may still be in the list if we hit an error during join. */ 242 list_del_init(&dlm->list); 243 244 spin_unlock(&dlm_domain_lock); 245 246 mlog(0, "freeing memory from domain %s\n", dlm->name); 247 248 wake_up(&dlm_domain_events); 249 250 dlm_free_ctxt_mem(dlm); 251 252 spin_lock(&dlm_domain_lock); 253 } 254 255 void dlm_put(struct dlm_ctxt *dlm) 256 { 257 spin_lock(&dlm_domain_lock); 258 kref_put(&dlm->dlm_refs, dlm_ctxt_release); 259 spin_unlock(&dlm_domain_lock); 260 } 261 262 static void __dlm_get(struct dlm_ctxt *dlm) 263 { 264 kref_get(&dlm->dlm_refs); 265 } 266 267 /* given a questionable reference to a dlm object, gets a reference if 268 * it can find it in the list, otherwise returns NULL in which case 269 * you shouldn't trust your pointer. */ 270 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm) 271 { 272 struct list_head *iter; 273 struct dlm_ctxt *target = NULL; 274 275 spin_lock(&dlm_domain_lock); 276 277 list_for_each(iter, &dlm_domains) { 278 target = list_entry (iter, struct dlm_ctxt, list); 279 280 if (target == dlm) { 281 __dlm_get(target); 282 break; 283 } 284 285 target = NULL; 286 } 287 288 spin_unlock(&dlm_domain_lock); 289 290 return target; 291 } 292 293 int dlm_domain_fully_joined(struct dlm_ctxt *dlm) 294 { 295 int ret; 296 297 spin_lock(&dlm_domain_lock); 298 ret = (dlm->dlm_state == DLM_CTXT_JOINED) || 299 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN); 300 spin_unlock(&dlm_domain_lock); 301 302 return ret; 303 } 304 305 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) 306 { 307 if (dlm->dlm_worker) { 308 flush_workqueue(dlm->dlm_worker); 309 destroy_workqueue(dlm->dlm_worker); 310 dlm->dlm_worker = NULL; 311 } 312 } 313 314 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) 315 { 316 dlm_unregister_domain_handlers(dlm); 317 dlm_complete_thread(dlm); 318 dlm_complete_recovery_thread(dlm); 319 dlm_destroy_dlm_worker(dlm); 320 321 /* We've left the domain. Now we can take ourselves out of the 322 * list and allow the kref stuff to help us free the 323 * memory. */ 324 spin_lock(&dlm_domain_lock); 325 list_del_init(&dlm->list); 326 spin_unlock(&dlm_domain_lock); 327 328 /* Wake up anyone waiting for us to remove this domain */ 329 wake_up(&dlm_domain_events); 330 } 331 332 static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) 333 { 334 int i; 335 struct dlm_lock_resource *res; 336 337 mlog(0, "Migrating locks from domain %s\n", dlm->name); 338 restart: 339 spin_lock(&dlm->spinlock); 340 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 341 while (!hlist_empty(dlm_lockres_hash(dlm, i))) { 342 res = hlist_entry(dlm_lockres_hash(dlm, i)->first, 343 struct dlm_lock_resource, hash_node); 344 /* need reference when manually grabbing lockres */ 345 dlm_lockres_get(res); 346 /* this should unhash the lockres 347 * and exit with dlm->spinlock */ 348 mlog(0, "purging res=%p\n", res); 349 if (dlm_lockres_is_dirty(dlm, res)) { 350 /* HACK! this should absolutely go. 351 * need to figure out why some empty 352 * lockreses are still marked dirty */ 353 mlog(ML_ERROR, "lockres %.*s dirty!\n", 354 res->lockname.len, res->lockname.name); 355 356 spin_unlock(&dlm->spinlock); 357 dlm_kick_thread(dlm, res); 358 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); 359 dlm_lockres_put(res); 360 goto restart; 361 } 362 dlm_purge_lockres(dlm, res); 363 dlm_lockres_put(res); 364 } 365 } 366 spin_unlock(&dlm->spinlock); 367 368 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 369 } 370 371 static int dlm_no_joining_node(struct dlm_ctxt *dlm) 372 { 373 int ret; 374 375 spin_lock(&dlm->spinlock); 376 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN; 377 spin_unlock(&dlm->spinlock); 378 379 return ret; 380 } 381 382 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm) 383 { 384 /* Yikes, a double spinlock! I need domain_lock for the dlm 385 * state and the dlm spinlock for join state... Sorry! */ 386 again: 387 spin_lock(&dlm_domain_lock); 388 spin_lock(&dlm->spinlock); 389 390 if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 391 mlog(0, "Node %d is joining, we wait on it.\n", 392 dlm->joining_node); 393 spin_unlock(&dlm->spinlock); 394 spin_unlock(&dlm_domain_lock); 395 396 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm)); 397 goto again; 398 } 399 400 dlm->dlm_state = DLM_CTXT_LEAVING; 401 spin_unlock(&dlm->spinlock); 402 spin_unlock(&dlm_domain_lock); 403 } 404 405 static void __dlm_print_nodes(struct dlm_ctxt *dlm) 406 { 407 int node = -1; 408 409 assert_spin_locked(&dlm->spinlock); 410 411 printk(KERN_INFO "ocfs2_dlm: Nodes in domain (\"%s\"): ", dlm->name); 412 413 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 414 node + 1)) < O2NM_MAX_NODES) { 415 printk("%d ", node); 416 } 417 printk("\n"); 418 } 419 420 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data) 421 { 422 struct dlm_ctxt *dlm = data; 423 unsigned int node; 424 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; 425 426 mlog_entry("%p %u %p", msg, len, data); 427 428 if (!dlm_grab(dlm)) 429 return 0; 430 431 node = exit_msg->node_idx; 432 433 printk(KERN_INFO "ocfs2_dlm: Node %u leaves domain %s\n", node, dlm->name); 434 435 spin_lock(&dlm->spinlock); 436 clear_bit(node, dlm->domain_map); 437 __dlm_print_nodes(dlm); 438 439 /* notify anything attached to the heartbeat events */ 440 dlm_hb_event_notify_attached(dlm, node, 0); 441 442 spin_unlock(&dlm->spinlock); 443 444 dlm_put(dlm); 445 446 return 0; 447 } 448 449 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, 450 unsigned int node) 451 { 452 int status; 453 struct dlm_exit_domain leave_msg; 454 455 mlog(0, "Asking node %u if we can leave the domain %s me = %u\n", 456 node, dlm->name, dlm->node_num); 457 458 memset(&leave_msg, 0, sizeof(leave_msg)); 459 leave_msg.node_idx = dlm->node_num; 460 461 status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key, 462 &leave_msg, sizeof(leave_msg), node, 463 NULL); 464 465 mlog(0, "status return %d from o2net_send_message\n", status); 466 467 return status; 468 } 469 470 471 static void dlm_leave_domain(struct dlm_ctxt *dlm) 472 { 473 int node, clear_node, status; 474 475 /* At this point we've migrated away all our locks and won't 476 * accept mastership of new ones. The dlm is responsible for 477 * almost nothing now. We make sure not to confuse any joining 478 * nodes and then commence shutdown procedure. */ 479 480 spin_lock(&dlm->spinlock); 481 /* Clear ourselves from the domain map */ 482 clear_bit(dlm->node_num, dlm->domain_map); 483 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 484 0)) < O2NM_MAX_NODES) { 485 /* Drop the dlm spinlock. This is safe wrt the domain_map. 486 * -nodes cannot be added now as the 487 * query_join_handlers knows to respond with OK_NO_MAP 488 * -we catch the right network errors if a node is 489 * removed from the map while we're sending him the 490 * exit message. */ 491 spin_unlock(&dlm->spinlock); 492 493 clear_node = 1; 494 495 status = dlm_send_one_domain_exit(dlm, node); 496 if (status < 0 && 497 status != -ENOPROTOOPT && 498 status != -ENOTCONN) { 499 mlog(ML_NOTICE, "Error %d sending domain exit message " 500 "to node %d\n", status, node); 501 502 /* Not sure what to do here but lets sleep for 503 * a bit in case this was a transient 504 * error... */ 505 msleep(DLM_DOMAIN_BACKOFF_MS); 506 clear_node = 0; 507 } 508 509 spin_lock(&dlm->spinlock); 510 /* If we're not clearing the node bit then we intend 511 * to loop back around to try again. */ 512 if (clear_node) 513 clear_bit(node, dlm->domain_map); 514 } 515 spin_unlock(&dlm->spinlock); 516 } 517 518 int dlm_joined(struct dlm_ctxt *dlm) 519 { 520 int ret = 0; 521 522 spin_lock(&dlm_domain_lock); 523 524 if (dlm->dlm_state == DLM_CTXT_JOINED) 525 ret = 1; 526 527 spin_unlock(&dlm_domain_lock); 528 529 return ret; 530 } 531 532 int dlm_shutting_down(struct dlm_ctxt *dlm) 533 { 534 int ret = 0; 535 536 spin_lock(&dlm_domain_lock); 537 538 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN) 539 ret = 1; 540 541 spin_unlock(&dlm_domain_lock); 542 543 return ret; 544 } 545 546 void dlm_unregister_domain(struct dlm_ctxt *dlm) 547 { 548 int leave = 0; 549 550 spin_lock(&dlm_domain_lock); 551 BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED); 552 BUG_ON(!dlm->num_joins); 553 554 dlm->num_joins--; 555 if (!dlm->num_joins) { 556 /* We mark it "in shutdown" now so new register 557 * requests wait until we've completely left the 558 * domain. Don't use DLM_CTXT_LEAVING yet as we still 559 * want new domain joins to communicate with us at 560 * least until we've completed migration of our 561 * resources. */ 562 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN; 563 leave = 1; 564 } 565 spin_unlock(&dlm_domain_lock); 566 567 if (leave) { 568 mlog(0, "shutting down domain %s\n", dlm->name); 569 570 /* We changed dlm state, notify the thread */ 571 dlm_kick_thread(dlm, NULL); 572 573 dlm_migrate_all_locks(dlm); 574 dlm_mark_domain_leaving(dlm); 575 dlm_leave_domain(dlm); 576 dlm_complete_dlm_shutdown(dlm); 577 } 578 dlm_put(dlm); 579 } 580 EXPORT_SYMBOL_GPL(dlm_unregister_domain); 581 582 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) 583 { 584 struct dlm_query_join_request *query; 585 enum dlm_query_join_response response; 586 struct dlm_ctxt *dlm = NULL; 587 588 query = (struct dlm_query_join_request *) msg->buf; 589 590 mlog(0, "node %u wants to join domain %s\n", query->node_idx, 591 query->domain); 592 593 /* 594 * If heartbeat doesn't consider the node live, tell it 595 * to back off and try again. This gives heartbeat a chance 596 * to catch up. 597 */ 598 if (!o2hb_check_node_heartbeating(query->node_idx)) { 599 mlog(0, "node %u is not in our live map yet\n", 600 query->node_idx); 601 602 response = JOIN_DISALLOW; 603 goto respond; 604 } 605 606 response = JOIN_OK_NO_MAP; 607 608 spin_lock(&dlm_domain_lock); 609 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 610 /* Once the dlm ctxt is marked as leaving then we don't want 611 * to be put in someone's domain map. 612 * Also, explicitly disallow joining at certain troublesome 613 * times (ie. during recovery). */ 614 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { 615 int bit = query->node_idx; 616 spin_lock(&dlm->spinlock); 617 618 if (dlm->dlm_state == DLM_CTXT_NEW && 619 dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) { 620 /*If this is a brand new context and we 621 * haven't started our join process yet, then 622 * the other node won the race. */ 623 response = JOIN_OK_NO_MAP; 624 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 625 /* Disallow parallel joins. */ 626 response = JOIN_DISALLOW; 627 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 628 mlog(ML_NOTICE, "node %u trying to join, but recovery " 629 "is ongoing.\n", bit); 630 response = JOIN_DISALLOW; 631 } else if (test_bit(bit, dlm->recovery_map)) { 632 mlog(ML_NOTICE, "node %u trying to join, but it " 633 "still needs recovery.\n", bit); 634 response = JOIN_DISALLOW; 635 } else if (test_bit(bit, dlm->domain_map)) { 636 mlog(ML_NOTICE, "node %u trying to join, but it " 637 "is still in the domain! needs recovery?\n", 638 bit); 639 response = JOIN_DISALLOW; 640 } else { 641 /* Alright we're fully a part of this domain 642 * so we keep some state as to who's joining 643 * and indicate to him that needs to be fixed 644 * up. */ 645 response = JOIN_OK; 646 __dlm_set_joining_node(dlm, query->node_idx); 647 } 648 649 spin_unlock(&dlm->spinlock); 650 } 651 spin_unlock(&dlm_domain_lock); 652 653 respond: 654 mlog(0, "We respond with %u\n", response); 655 656 return response; 657 } 658 659 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) 660 { 661 struct dlm_assert_joined *assert; 662 struct dlm_ctxt *dlm = NULL; 663 664 assert = (struct dlm_assert_joined *) msg->buf; 665 666 mlog(0, "node %u asserts join on domain %s\n", assert->node_idx, 667 assert->domain); 668 669 spin_lock(&dlm_domain_lock); 670 dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len); 671 /* XXX should we consider no dlm ctxt an error? */ 672 if (dlm) { 673 spin_lock(&dlm->spinlock); 674 675 /* Alright, this node has officially joined our 676 * domain. Set him in the map and clean up our 677 * leftover join state. */ 678 BUG_ON(dlm->joining_node != assert->node_idx); 679 set_bit(assert->node_idx, dlm->domain_map); 680 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 681 682 printk(KERN_INFO "ocfs2_dlm: Node %u joins domain %s\n", 683 assert->node_idx, dlm->name); 684 __dlm_print_nodes(dlm); 685 686 /* notify anything attached to the heartbeat events */ 687 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1); 688 689 spin_unlock(&dlm->spinlock); 690 } 691 spin_unlock(&dlm_domain_lock); 692 693 return 0; 694 } 695 696 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data) 697 { 698 struct dlm_cancel_join *cancel; 699 struct dlm_ctxt *dlm = NULL; 700 701 cancel = (struct dlm_cancel_join *) msg->buf; 702 703 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx, 704 cancel->domain); 705 706 spin_lock(&dlm_domain_lock); 707 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len); 708 709 if (dlm) { 710 spin_lock(&dlm->spinlock); 711 712 /* Yikes, this guy wants to cancel his join. No 713 * problem, we simply cleanup our join state. */ 714 BUG_ON(dlm->joining_node != cancel->node_idx); 715 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 716 717 spin_unlock(&dlm->spinlock); 718 } 719 spin_unlock(&dlm_domain_lock); 720 721 return 0; 722 } 723 724 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm, 725 unsigned int node) 726 { 727 int status; 728 struct dlm_cancel_join cancel_msg; 729 730 memset(&cancel_msg, 0, sizeof(cancel_msg)); 731 cancel_msg.node_idx = dlm->node_num; 732 cancel_msg.name_len = strlen(dlm->name); 733 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len); 734 735 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 736 &cancel_msg, sizeof(cancel_msg), node, 737 NULL); 738 if (status < 0) { 739 mlog_errno(status); 740 goto bail; 741 } 742 743 bail: 744 return status; 745 } 746 747 /* map_size should be in bytes. */ 748 static int dlm_send_join_cancels(struct dlm_ctxt *dlm, 749 unsigned long *node_map, 750 unsigned int map_size) 751 { 752 int status, tmpstat; 753 unsigned int node; 754 755 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) * 756 sizeof(unsigned long))) { 757 mlog(ML_ERROR, 758 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n", 759 map_size, BITS_TO_LONGS(O2NM_MAX_NODES)); 760 return -EINVAL; 761 } 762 763 status = 0; 764 node = -1; 765 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 766 node + 1)) < O2NM_MAX_NODES) { 767 if (node == dlm->node_num) 768 continue; 769 770 tmpstat = dlm_send_one_join_cancel(dlm, node); 771 if (tmpstat) { 772 mlog(ML_ERROR, "Error return %d cancelling join on " 773 "node %d\n", tmpstat, node); 774 if (!status) 775 status = tmpstat; 776 } 777 } 778 779 if (status) 780 mlog_errno(status); 781 return status; 782 } 783 784 static int dlm_request_join(struct dlm_ctxt *dlm, 785 int node, 786 enum dlm_query_join_response *response) 787 { 788 int status, retval; 789 struct dlm_query_join_request join_msg; 790 791 mlog(0, "querying node %d\n", node); 792 793 memset(&join_msg, 0, sizeof(join_msg)); 794 join_msg.node_idx = dlm->node_num; 795 join_msg.name_len = strlen(dlm->name); 796 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 797 798 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 799 sizeof(join_msg), node, &retval); 800 if (status < 0 && status != -ENOPROTOOPT) { 801 mlog_errno(status); 802 goto bail; 803 } 804 805 /* -ENOPROTOOPT from the net code means the other side isn't 806 listening for our message type -- that's fine, it means 807 his dlm isn't up, so we can consider him a 'yes' but not 808 joined into the domain. */ 809 if (status == -ENOPROTOOPT) { 810 status = 0; 811 *response = JOIN_OK_NO_MAP; 812 } else if (retval == JOIN_DISALLOW || 813 retval == JOIN_OK || 814 retval == JOIN_OK_NO_MAP) { 815 *response = retval; 816 } else { 817 status = -EINVAL; 818 mlog(ML_ERROR, "invalid response %d from node %u\n", retval, 819 node); 820 } 821 822 mlog(0, "status %d, node %d response is %d\n", status, node, 823 *response); 824 825 bail: 826 return status; 827 } 828 829 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, 830 unsigned int node) 831 { 832 int status; 833 struct dlm_assert_joined assert_msg; 834 835 mlog(0, "Sending join assert to node %u\n", node); 836 837 memset(&assert_msg, 0, sizeof(assert_msg)); 838 assert_msg.node_idx = dlm->node_num; 839 assert_msg.name_len = strlen(dlm->name); 840 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len); 841 842 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 843 &assert_msg, sizeof(assert_msg), node, 844 NULL); 845 if (status < 0) 846 mlog_errno(status); 847 848 return status; 849 } 850 851 static void dlm_send_join_asserts(struct dlm_ctxt *dlm, 852 unsigned long *node_map) 853 { 854 int status, node, live; 855 856 status = 0; 857 node = -1; 858 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 859 node + 1)) < O2NM_MAX_NODES) { 860 if (node == dlm->node_num) 861 continue; 862 863 do { 864 /* It is very important that this message be 865 * received so we spin until either the node 866 * has died or it gets the message. */ 867 status = dlm_send_one_join_assert(dlm, node); 868 869 spin_lock(&dlm->spinlock); 870 live = test_bit(node, dlm->live_nodes_map); 871 spin_unlock(&dlm->spinlock); 872 873 if (status) { 874 mlog(ML_ERROR, "Error return %d asserting " 875 "join on node %d\n", status, node); 876 877 /* give us some time between errors... */ 878 if (live) 879 msleep(DLM_DOMAIN_BACKOFF_MS); 880 } 881 } while (status && live); 882 } 883 } 884 885 struct domain_join_ctxt { 886 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 887 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 888 }; 889 890 static int dlm_should_restart_join(struct dlm_ctxt *dlm, 891 struct domain_join_ctxt *ctxt, 892 enum dlm_query_join_response response) 893 { 894 int ret; 895 896 if (response == JOIN_DISALLOW) { 897 mlog(0, "Latest response of disallow -- should restart\n"); 898 return 1; 899 } 900 901 spin_lock(&dlm->spinlock); 902 /* For now, we restart the process if the node maps have 903 * changed at all */ 904 ret = memcmp(ctxt->live_map, dlm->live_nodes_map, 905 sizeof(dlm->live_nodes_map)); 906 spin_unlock(&dlm->spinlock); 907 908 if (ret) 909 mlog(0, "Node maps changed -- should restart\n"); 910 911 return ret; 912 } 913 914 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) 915 { 916 int status = 0, tmpstat, node; 917 struct domain_join_ctxt *ctxt; 918 enum dlm_query_join_response response; 919 920 mlog_entry("%p", dlm); 921 922 ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL); 923 if (!ctxt) { 924 status = -ENOMEM; 925 mlog_errno(status); 926 goto bail; 927 } 928 929 /* group sem locking should work for us here -- we're already 930 * registered for heartbeat events so filling this should be 931 * atomic wrt getting those handlers called. */ 932 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map)); 933 934 spin_lock(&dlm->spinlock); 935 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map)); 936 937 __dlm_set_joining_node(dlm, dlm->node_num); 938 939 spin_unlock(&dlm->spinlock); 940 941 node = -1; 942 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES, 943 node + 1)) < O2NM_MAX_NODES) { 944 if (node == dlm->node_num) 945 continue; 946 947 status = dlm_request_join(dlm, node, &response); 948 if (status < 0) { 949 mlog_errno(status); 950 goto bail; 951 } 952 953 /* Ok, either we got a response or the node doesn't have a 954 * dlm up. */ 955 if (response == JOIN_OK) 956 set_bit(node, ctxt->yes_resp_map); 957 958 if (dlm_should_restart_join(dlm, ctxt, response)) { 959 status = -EAGAIN; 960 goto bail; 961 } 962 } 963 964 mlog(0, "Yay, done querying nodes!\n"); 965 966 /* Yay, everyone agree's we can join the domain. My domain is 967 * comprised of all nodes who were put in the 968 * yes_resp_map. Copy that into our domain map and send a join 969 * assert message to clean up everyone elses state. */ 970 spin_lock(&dlm->spinlock); 971 memcpy(dlm->domain_map, ctxt->yes_resp_map, 972 sizeof(ctxt->yes_resp_map)); 973 set_bit(dlm->node_num, dlm->domain_map); 974 spin_unlock(&dlm->spinlock); 975 976 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 977 978 /* Joined state *must* be set before the joining node 979 * information, otherwise the query_join handler may read no 980 * current joiner but a state of NEW and tell joining nodes 981 * we're not in the domain. */ 982 spin_lock(&dlm_domain_lock); 983 dlm->dlm_state = DLM_CTXT_JOINED; 984 dlm->num_joins++; 985 spin_unlock(&dlm_domain_lock); 986 987 bail: 988 spin_lock(&dlm->spinlock); 989 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 990 if (!status) 991 __dlm_print_nodes(dlm); 992 spin_unlock(&dlm->spinlock); 993 994 if (ctxt) { 995 /* Do we need to send a cancel message to any nodes? */ 996 if (status < 0) { 997 tmpstat = dlm_send_join_cancels(dlm, 998 ctxt->yes_resp_map, 999 sizeof(ctxt->yes_resp_map)); 1000 if (tmpstat < 0) 1001 mlog_errno(tmpstat); 1002 } 1003 kfree(ctxt); 1004 } 1005 1006 mlog(0, "returning %d\n", status); 1007 return status; 1008 } 1009 1010 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm) 1011 { 1012 o2hb_unregister_callback(&dlm->dlm_hb_up); 1013 o2hb_unregister_callback(&dlm->dlm_hb_down); 1014 o2net_unregister_handler_list(&dlm->dlm_domain_handlers); 1015 } 1016 1017 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) 1018 { 1019 int status; 1020 1021 mlog(0, "registering handlers.\n"); 1022 1023 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB, 1024 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI); 1025 status = o2hb_register_callback(&dlm->dlm_hb_down); 1026 if (status) 1027 goto bail; 1028 1029 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB, 1030 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI); 1031 status = o2hb_register_callback(&dlm->dlm_hb_up); 1032 if (status) 1033 goto bail; 1034 1035 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1036 sizeof(struct dlm_master_request), 1037 dlm_master_request_handler, 1038 dlm, &dlm->dlm_domain_handlers); 1039 if (status) 1040 goto bail; 1041 1042 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1043 sizeof(struct dlm_assert_master), 1044 dlm_assert_master_handler, 1045 dlm, &dlm->dlm_domain_handlers); 1046 if (status) 1047 goto bail; 1048 1049 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1050 sizeof(struct dlm_create_lock), 1051 dlm_create_lock_handler, 1052 dlm, &dlm->dlm_domain_handlers); 1053 if (status) 1054 goto bail; 1055 1056 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1057 DLM_CONVERT_LOCK_MAX_LEN, 1058 dlm_convert_lock_handler, 1059 dlm, &dlm->dlm_domain_handlers); 1060 if (status) 1061 goto bail; 1062 1063 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1064 DLM_UNLOCK_LOCK_MAX_LEN, 1065 dlm_unlock_lock_handler, 1066 dlm, &dlm->dlm_domain_handlers); 1067 if (status) 1068 goto bail; 1069 1070 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1071 DLM_PROXY_AST_MAX_LEN, 1072 dlm_proxy_ast_handler, 1073 dlm, &dlm->dlm_domain_handlers); 1074 if (status) 1075 goto bail; 1076 1077 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1078 sizeof(struct dlm_exit_domain), 1079 dlm_exit_domain_handler, 1080 dlm, &dlm->dlm_domain_handlers); 1081 if (status) 1082 goto bail; 1083 1084 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1085 sizeof(struct dlm_migrate_request), 1086 dlm_migrate_request_handler, 1087 dlm, &dlm->dlm_domain_handlers); 1088 if (status) 1089 goto bail; 1090 1091 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1092 DLM_MIG_LOCKRES_MAX_LEN, 1093 dlm_mig_lockres_handler, 1094 dlm, &dlm->dlm_domain_handlers); 1095 if (status) 1096 goto bail; 1097 1098 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1099 sizeof(struct dlm_master_requery), 1100 dlm_master_requery_handler, 1101 dlm, &dlm->dlm_domain_handlers); 1102 if (status) 1103 goto bail; 1104 1105 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1106 sizeof(struct dlm_lock_request), 1107 dlm_request_all_locks_handler, 1108 dlm, &dlm->dlm_domain_handlers); 1109 if (status) 1110 goto bail; 1111 1112 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1113 sizeof(struct dlm_reco_data_done), 1114 dlm_reco_data_done_handler, 1115 dlm, &dlm->dlm_domain_handlers); 1116 if (status) 1117 goto bail; 1118 1119 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1120 sizeof(struct dlm_begin_reco), 1121 dlm_begin_reco_handler, 1122 dlm, &dlm->dlm_domain_handlers); 1123 if (status) 1124 goto bail; 1125 1126 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1127 sizeof(struct dlm_finalize_reco), 1128 dlm_finalize_reco_handler, 1129 dlm, &dlm->dlm_domain_handlers); 1130 if (status) 1131 goto bail; 1132 1133 bail: 1134 if (status) 1135 dlm_unregister_domain_handlers(dlm); 1136 1137 return status; 1138 } 1139 1140 static int dlm_join_domain(struct dlm_ctxt *dlm) 1141 { 1142 int status; 1143 1144 BUG_ON(!dlm); 1145 1146 mlog(0, "Join domain %s\n", dlm->name); 1147 1148 status = dlm_register_domain_handlers(dlm); 1149 if (status) { 1150 mlog_errno(status); 1151 goto bail; 1152 } 1153 1154 status = dlm_launch_thread(dlm); 1155 if (status < 0) { 1156 mlog_errno(status); 1157 goto bail; 1158 } 1159 1160 status = dlm_launch_recovery_thread(dlm); 1161 if (status < 0) { 1162 mlog_errno(status); 1163 goto bail; 1164 } 1165 1166 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq"); 1167 if (!dlm->dlm_worker) { 1168 status = -ENOMEM; 1169 mlog_errno(status); 1170 goto bail; 1171 } 1172 1173 do { 1174 unsigned int backoff; 1175 status = dlm_try_to_join_domain(dlm); 1176 1177 /* If we're racing another node to the join, then we 1178 * need to back off temporarily and let them 1179 * complete. */ 1180 if (status == -EAGAIN) { 1181 if (signal_pending(current)) { 1182 status = -ERESTARTSYS; 1183 goto bail; 1184 } 1185 1186 /* 1187 * <chip> After you! 1188 * <dale> No, after you! 1189 * <chip> I insist! 1190 * <dale> But you first! 1191 * ... 1192 */ 1193 backoff = (unsigned int)(jiffies & 0x3); 1194 backoff *= DLM_DOMAIN_BACKOFF_MS; 1195 mlog(0, "backoff %d\n", backoff); 1196 msleep(backoff); 1197 } 1198 } while (status == -EAGAIN); 1199 1200 if (status < 0) { 1201 mlog_errno(status); 1202 goto bail; 1203 } 1204 1205 status = 0; 1206 bail: 1207 wake_up(&dlm_domain_events); 1208 1209 if (status) { 1210 dlm_unregister_domain_handlers(dlm); 1211 dlm_complete_thread(dlm); 1212 dlm_complete_recovery_thread(dlm); 1213 dlm_destroy_dlm_worker(dlm); 1214 } 1215 1216 return status; 1217 } 1218 1219 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, 1220 u32 key) 1221 { 1222 int i; 1223 struct dlm_ctxt *dlm = NULL; 1224 1225 dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL); 1226 if (!dlm) { 1227 mlog_errno(-ENOMEM); 1228 goto leave; 1229 } 1230 1231 dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL); 1232 if (dlm->name == NULL) { 1233 mlog_errno(-ENOMEM); 1234 kfree(dlm); 1235 dlm = NULL; 1236 goto leave; 1237 } 1238 1239 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); 1240 if (!dlm->lockres_hash) { 1241 mlog_errno(-ENOMEM); 1242 kfree(dlm->name); 1243 kfree(dlm); 1244 dlm = NULL; 1245 goto leave; 1246 } 1247 1248 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1249 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); 1250 1251 strcpy(dlm->name, domain); 1252 dlm->key = key; 1253 dlm->node_num = o2nm_this_node(); 1254 1255 spin_lock_init(&dlm->spinlock); 1256 spin_lock_init(&dlm->master_lock); 1257 spin_lock_init(&dlm->ast_lock); 1258 INIT_LIST_HEAD(&dlm->list); 1259 INIT_LIST_HEAD(&dlm->dirty_list); 1260 INIT_LIST_HEAD(&dlm->reco.resources); 1261 INIT_LIST_HEAD(&dlm->reco.received); 1262 INIT_LIST_HEAD(&dlm->reco.node_data); 1263 INIT_LIST_HEAD(&dlm->purge_list); 1264 INIT_LIST_HEAD(&dlm->dlm_domain_handlers); 1265 dlm->reco.state = 0; 1266 1267 INIT_LIST_HEAD(&dlm->pending_asts); 1268 INIT_LIST_HEAD(&dlm->pending_basts); 1269 1270 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n", 1271 dlm->recovery_map, &(dlm->recovery_map[0])); 1272 1273 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map)); 1274 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map)); 1275 memset(dlm->domain_map, 0, sizeof(dlm->domain_map)); 1276 1277 dlm->dlm_thread_task = NULL; 1278 dlm->dlm_reco_thread_task = NULL; 1279 dlm->dlm_worker = NULL; 1280 init_waitqueue_head(&dlm->dlm_thread_wq); 1281 init_waitqueue_head(&dlm->dlm_reco_thread_wq); 1282 init_waitqueue_head(&dlm->reco.event); 1283 init_waitqueue_head(&dlm->ast_wq); 1284 init_waitqueue_head(&dlm->migration_wq); 1285 INIT_LIST_HEAD(&dlm->master_list); 1286 INIT_LIST_HEAD(&dlm->mle_hb_events); 1287 1288 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; 1289 init_waitqueue_head(&dlm->dlm_join_events); 1290 1291 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 1292 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 1293 atomic_set(&dlm->local_resources, 0); 1294 atomic_set(&dlm->remote_resources, 0); 1295 atomic_set(&dlm->unknown_resources, 0); 1296 1297 spin_lock_init(&dlm->work_lock); 1298 INIT_LIST_HEAD(&dlm->work_list); 1299 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm); 1300 1301 kref_init(&dlm->dlm_refs); 1302 dlm->dlm_state = DLM_CTXT_NEW; 1303 1304 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks); 1305 1306 mlog(0, "context init: refcount %u\n", 1307 atomic_read(&dlm->dlm_refs.refcount)); 1308 1309 leave: 1310 return dlm; 1311 } 1312 1313 /* 1314 * dlm_register_domain: one-time setup per "domain" 1315 */ 1316 struct dlm_ctxt * dlm_register_domain(const char *domain, 1317 u32 key) 1318 { 1319 int ret; 1320 struct dlm_ctxt *dlm = NULL; 1321 struct dlm_ctxt *new_ctxt = NULL; 1322 1323 if (strlen(domain) > O2NM_MAX_NAME_LEN) { 1324 ret = -ENAMETOOLONG; 1325 mlog(ML_ERROR, "domain name length too long\n"); 1326 goto leave; 1327 } 1328 1329 if (!o2hb_check_local_node_heartbeating()) { 1330 mlog(ML_ERROR, "the local node has not been configured, or is " 1331 "not heartbeating\n"); 1332 ret = -EPROTO; 1333 goto leave; 1334 } 1335 1336 mlog(0, "register called for domain \"%s\"\n", domain); 1337 1338 retry: 1339 dlm = NULL; 1340 if (signal_pending(current)) { 1341 ret = -ERESTARTSYS; 1342 mlog_errno(ret); 1343 goto leave; 1344 } 1345 1346 spin_lock(&dlm_domain_lock); 1347 1348 dlm = __dlm_lookup_domain(domain); 1349 if (dlm) { 1350 if (dlm->dlm_state != DLM_CTXT_JOINED) { 1351 spin_unlock(&dlm_domain_lock); 1352 1353 mlog(0, "This ctxt is not joined yet!\n"); 1354 wait_event_interruptible(dlm_domain_events, 1355 dlm_wait_on_domain_helper( 1356 domain)); 1357 goto retry; 1358 } 1359 1360 __dlm_get(dlm); 1361 dlm->num_joins++; 1362 1363 spin_unlock(&dlm_domain_lock); 1364 1365 ret = 0; 1366 goto leave; 1367 } 1368 1369 /* doesn't exist */ 1370 if (!new_ctxt) { 1371 spin_unlock(&dlm_domain_lock); 1372 1373 new_ctxt = dlm_alloc_ctxt(domain, key); 1374 if (new_ctxt) 1375 goto retry; 1376 1377 ret = -ENOMEM; 1378 mlog_errno(ret); 1379 goto leave; 1380 } 1381 1382 /* a little variable switch-a-roo here... */ 1383 dlm = new_ctxt; 1384 new_ctxt = NULL; 1385 1386 /* add the new domain */ 1387 list_add_tail(&dlm->list, &dlm_domains); 1388 spin_unlock(&dlm_domain_lock); 1389 1390 ret = dlm_join_domain(dlm); 1391 if (ret) { 1392 mlog_errno(ret); 1393 dlm_put(dlm); 1394 goto leave; 1395 } 1396 1397 ret = 0; 1398 leave: 1399 if (new_ctxt) 1400 dlm_free_ctxt_mem(new_ctxt); 1401 1402 if (ret < 0) 1403 dlm = ERR_PTR(ret); 1404 1405 return dlm; 1406 } 1407 EXPORT_SYMBOL_GPL(dlm_register_domain); 1408 1409 static LIST_HEAD(dlm_join_handlers); 1410 1411 static void dlm_unregister_net_handlers(void) 1412 { 1413 o2net_unregister_handler_list(&dlm_join_handlers); 1414 } 1415 1416 static int dlm_register_net_handlers(void) 1417 { 1418 int status = 0; 1419 1420 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1421 sizeof(struct dlm_query_join_request), 1422 dlm_query_join_handler, 1423 NULL, &dlm_join_handlers); 1424 if (status) 1425 goto bail; 1426 1427 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1428 sizeof(struct dlm_assert_joined), 1429 dlm_assert_joined_handler, 1430 NULL, &dlm_join_handlers); 1431 if (status) 1432 goto bail; 1433 1434 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1435 sizeof(struct dlm_cancel_join), 1436 dlm_cancel_join_handler, 1437 NULL, &dlm_join_handlers); 1438 1439 bail: 1440 if (status < 0) 1441 dlm_unregister_net_handlers(); 1442 1443 return status; 1444 } 1445 1446 /* Domain eviction callback handling. 1447 * 1448 * The file system requires notification of node death *before* the 1449 * dlm completes it's recovery work, otherwise it may be able to 1450 * acquire locks on resources requiring recovery. Since the dlm can 1451 * evict a node from it's domain *before* heartbeat fires, a similar 1452 * mechanism is required. */ 1453 1454 /* Eviction is not expected to happen often, so a per-domain lock is 1455 * not necessary. Eviction callbacks are allowed to sleep for short 1456 * periods of time. */ 1457 static DECLARE_RWSEM(dlm_callback_sem); 1458 1459 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm, 1460 int node_num) 1461 { 1462 struct list_head *iter; 1463 struct dlm_eviction_cb *cb; 1464 1465 down_read(&dlm_callback_sem); 1466 list_for_each(iter, &dlm->dlm_eviction_callbacks) { 1467 cb = list_entry(iter, struct dlm_eviction_cb, ec_item); 1468 1469 cb->ec_func(node_num, cb->ec_data); 1470 } 1471 up_read(&dlm_callback_sem); 1472 } 1473 1474 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb, 1475 dlm_eviction_func *f, 1476 void *data) 1477 { 1478 INIT_LIST_HEAD(&cb->ec_item); 1479 cb->ec_func = f; 1480 cb->ec_data = data; 1481 } 1482 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb); 1483 1484 void dlm_register_eviction_cb(struct dlm_ctxt *dlm, 1485 struct dlm_eviction_cb *cb) 1486 { 1487 down_write(&dlm_callback_sem); 1488 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks); 1489 up_write(&dlm_callback_sem); 1490 } 1491 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb); 1492 1493 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb) 1494 { 1495 down_write(&dlm_callback_sem); 1496 list_del_init(&cb->ec_item); 1497 up_write(&dlm_callback_sem); 1498 } 1499 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb); 1500 1501 static int __init dlm_init(void) 1502 { 1503 int status; 1504 1505 dlm_print_version(); 1506 1507 status = dlm_init_mle_cache(); 1508 if (status) 1509 return -1; 1510 1511 status = dlm_register_net_handlers(); 1512 if (status) { 1513 dlm_destroy_mle_cache(); 1514 return -1; 1515 } 1516 1517 return 0; 1518 } 1519 1520 static void __exit dlm_exit (void) 1521 { 1522 dlm_unregister_net_handlers(); 1523 dlm_destroy_mle_cache(); 1524 } 1525 1526 MODULE_AUTHOR("Oracle"); 1527 MODULE_LICENSE("GPL"); 1528 1529 module_init(dlm_init); 1530 module_exit(dlm_exit); 1531