/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmod.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdebug.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

enum dlm_mle_type {
	DLM_MLE_BLOCK,
	DLM_MLE_MASTER,
	DLM_MLE_MIGRATION
};

struct dlm_lock_name
{
	u8 len;
	u8 name[DLM_LOCKID_NAME_MAX];
};

struct dlm_master_list_entry
{
	struct list_head list;
	struct list_head hb_events;
	struct dlm_ctxt *dlm;
	spinlock_t spinlock;
	wait_queue_head_t wq;
	atomic_t woken;
	struct kref mle_refs;
	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	u8 master;
	u8 new_master;
	enum dlm_mle_type type;
	struct o2hb_callback_func mle_hb_up;
	struct o2hb_callback_func mle_hb_down;
	union {
		struct dlm_lock_resource *res;
		struct dlm_lock_name name;
	} u;
};

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
				struct dlm_master_list_entry *mle,
				const char *name,
				unsigned int namelen)
{
	struct dlm_lock_resource *res;

	if (dlm != mle->dlm)
		return 0;

	if (mle->type == DLM_MLE_BLOCK ||
	    mle->type == DLM_MLE_MIGRATION) {
		if (namelen != mle->u.name.len ||
		    memcmp(name, mle->u.name.name, namelen) != 0)
			return 0;
	} else {
		res = mle->u.res;
		if (namelen != res->lockname.len ||
		    memcmp(res->lockname.name, name, namelen) != 0)
			return 0;
	}
	return
1; 128 } 129 130 #if 0 131 /* Code here is included but defined out as it aids debugging */ 132 133 void dlm_print_one_mle(struct dlm_master_list_entry *mle) 134 { 135 int i = 0, refs; 136 char *type; 137 char attached; 138 u8 master; 139 unsigned int namelen; 140 const char *name; 141 struct kref *k; 142 143 k = &mle->mle_refs; 144 if (mle->type == DLM_MLE_BLOCK) 145 type = "BLK"; 146 else if (mle->type == DLM_MLE_MASTER) 147 type = "MAS"; 148 else 149 type = "MIG"; 150 refs = atomic_read(&k->refcount); 151 master = mle->master; 152 attached = (list_empty(&mle->hb_events) ? 'N' : 'Y'); 153 154 if (mle->type != DLM_MLE_MASTER) { 155 namelen = mle->u.name.len; 156 name = mle->u.name.name; 157 } else { 158 namelen = mle->u.res->lockname.len; 159 name = mle->u.res->lockname.name; 160 } 161 162 mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", 163 i, type, refs, master, mle->new_master, attached, 164 namelen, namelen, name); 165 } 166 167 static void dlm_dump_mles(struct dlm_ctxt *dlm) 168 { 169 struct dlm_master_list_entry *mle; 170 struct list_head *iter; 171 172 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); 173 mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); 174 spin_lock(&dlm->master_lock); 175 list_for_each(iter, &dlm->master_list) { 176 mle = list_entry(iter, struct dlm_master_list_entry, list); 177 dlm_print_one_mle(mle); 178 } 179 spin_unlock(&dlm->master_lock); 180 } 181 182 int dlm_dump_all_mles(const char __user *data, unsigned int len) 183 { 184 struct list_head *iter; 185 struct dlm_ctxt *dlm; 186 187 spin_lock(&dlm_domain_lock); 188 list_for_each(iter, &dlm_domains) { 189 dlm = list_entry (iter, struct dlm_ctxt, list); 190 mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name); 191 dlm_dump_mles(dlm); 192 } 193 spin_unlock(&dlm_domain_lock); 194 return len; 195 } 196 EXPORT_SYMBOL_GPL(dlm_dump_all_mles); 197 198 #endif /* 0 */ 199 200 201 static kmem_cache_t *dlm_mle_cache = NULL; 202 203 204 static void dlm_mle_release(struct kref *kref); 205 static void dlm_init_mle(struct dlm_master_list_entry *mle, 206 enum dlm_mle_type type, 207 struct dlm_ctxt *dlm, 208 struct dlm_lock_resource *res, 209 const char *name, 210 unsigned int namelen); 211 static void dlm_put_mle(struct dlm_master_list_entry *mle); 212 static void __dlm_put_mle(struct dlm_master_list_entry *mle); 213 static int dlm_find_mle(struct dlm_ctxt *dlm, 214 struct dlm_master_list_entry **mle, 215 char *name, unsigned int namelen); 216 217 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); 218 219 220 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 221 struct dlm_lock_resource *res, 222 struct dlm_master_list_entry *mle, 223 int *blocked); 224 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, 225 struct dlm_lock_resource *res, 226 struct dlm_master_list_entry *mle, 227 int blocked); 228 static int dlm_add_migration_mle(struct dlm_ctxt *dlm, 229 struct dlm_lock_resource *res, 230 struct dlm_master_list_entry *mle, 231 struct dlm_master_list_entry **oldmle, 232 const char *name, unsigned int namelen, 233 u8 new_master, u8 master); 234 235 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm, 236 struct dlm_lock_resource *res); 237 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, 238 struct dlm_lock_resource *res); 239 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, 240 struct dlm_lock_resource *res, 241 u8 target); 242 static int dlm_pre_master_reco_lockres(struct 
dlm_ctxt *dlm, 243 struct dlm_lock_resource *res); 244 245 246 int dlm_is_host_down(int errno) 247 { 248 switch (errno) { 249 case -EBADF: 250 case -ECONNREFUSED: 251 case -ENOTCONN: 252 case -ECONNRESET: 253 case -EPIPE: 254 case -EHOSTDOWN: 255 case -EHOSTUNREACH: 256 case -ETIMEDOUT: 257 case -ECONNABORTED: 258 case -ENETDOWN: 259 case -ENETUNREACH: 260 case -ENETRESET: 261 case -ESHUTDOWN: 262 case -ENOPROTOOPT: 263 case -EINVAL: /* if returned from our tcp code, 264 this means there is no socket */ 265 return 1; 266 } 267 return 0; 268 } 269 270 271 /* 272 * MASTER LIST FUNCTIONS 273 */ 274 275 276 /* 277 * regarding master list entries and heartbeat callbacks: 278 * 279 * in order to avoid sleeping and allocation that occurs in 280 * heartbeat, master list entries are simply attached to the 281 * dlm's established heartbeat callbacks. the mle is attached 282 * when it is created, and since the dlm->spinlock is held at 283 * that time, any heartbeat event will be properly discovered 284 * by the mle. the mle needs to be detached from the 285 * dlm->mle_hb_events list as soon as heartbeat events are no 286 * longer useful to the mle, and before the mle is freed. 287 * 288 * as a general rule, heartbeat events are no longer needed by 289 * the mle once an "answer" regarding the lock master has been 290 * received. 291 */ 292 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm, 293 struct dlm_master_list_entry *mle) 294 { 295 assert_spin_locked(&dlm->spinlock); 296 297 list_add_tail(&mle->hb_events, &dlm->mle_hb_events); 298 } 299 300 301 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, 302 struct dlm_master_list_entry *mle) 303 { 304 if (!list_empty(&mle->hb_events)) 305 list_del_init(&mle->hb_events); 306 } 307 308 309 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, 310 struct dlm_master_list_entry *mle) 311 { 312 spin_lock(&dlm->spinlock); 313 __dlm_mle_detach_hb_events(dlm, mle); 314 spin_unlock(&dlm->spinlock); 315 } 316 317 /* remove from list and free */ 318 static void __dlm_put_mle(struct dlm_master_list_entry *mle) 319 { 320 struct dlm_ctxt *dlm; 321 dlm = mle->dlm; 322 323 assert_spin_locked(&dlm->spinlock); 324 assert_spin_locked(&dlm->master_lock); 325 BUG_ON(!atomic_read(&mle->mle_refs.refcount)); 326 327 kref_put(&mle->mle_refs, dlm_mle_release); 328 } 329 330 331 /* must not have any spinlocks coming in */ 332 static void dlm_put_mle(struct dlm_master_list_entry *mle) 333 { 334 struct dlm_ctxt *dlm; 335 dlm = mle->dlm; 336 337 spin_lock(&dlm->spinlock); 338 spin_lock(&dlm->master_lock); 339 __dlm_put_mle(mle); 340 spin_unlock(&dlm->master_lock); 341 spin_unlock(&dlm->spinlock); 342 } 343 344 static inline void dlm_get_mle(struct dlm_master_list_entry *mle) 345 { 346 kref_get(&mle->mle_refs); 347 } 348 349 static void dlm_init_mle(struct dlm_master_list_entry *mle, 350 enum dlm_mle_type type, 351 struct dlm_ctxt *dlm, 352 struct dlm_lock_resource *res, 353 const char *name, 354 unsigned int namelen) 355 { 356 assert_spin_locked(&dlm->spinlock); 357 358 mle->dlm = dlm; 359 mle->type = type; 360 INIT_LIST_HEAD(&mle->list); 361 INIT_LIST_HEAD(&mle->hb_events); 362 memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); 363 spin_lock_init(&mle->spinlock); 364 init_waitqueue_head(&mle->wq); 365 atomic_set(&mle->woken, 0); 366 kref_init(&mle->mle_refs); 367 memset(mle->response_map, 0, sizeof(mle->response_map)); 368 mle->master = O2NM_MAX_NODES; 369 mle->new_master = O2NM_MAX_NODES; 370 371 if (mle->type == DLM_MLE_MASTER) { 372 
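		/* a MASTER entry points straight at the existing lockres via
		 * u.res; the BLOCK and MIGRATION cases below only carry the
		 * name in u.name, since no local lockres may exist yet. */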
BUG_ON(!res); 373 mle->u.res = res; 374 } else if (mle->type == DLM_MLE_BLOCK) { 375 BUG_ON(!name); 376 memcpy(mle->u.name.name, name, namelen); 377 mle->u.name.len = namelen; 378 } else /* DLM_MLE_MIGRATION */ { 379 BUG_ON(!name); 380 memcpy(mle->u.name.name, name, namelen); 381 mle->u.name.len = namelen; 382 } 383 384 /* copy off the node_map and register hb callbacks on our copy */ 385 memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map)); 386 memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map)); 387 clear_bit(dlm->node_num, mle->vote_map); 388 clear_bit(dlm->node_num, mle->node_map); 389 390 /* attach the mle to the domain node up/down events */ 391 __dlm_mle_attach_hb_events(dlm, mle); 392 } 393 394 395 /* returns 1 if found, 0 if not */ 396 static int dlm_find_mle(struct dlm_ctxt *dlm, 397 struct dlm_master_list_entry **mle, 398 char *name, unsigned int namelen) 399 { 400 struct dlm_master_list_entry *tmpmle; 401 struct list_head *iter; 402 403 assert_spin_locked(&dlm->master_lock); 404 405 list_for_each(iter, &dlm->master_list) { 406 tmpmle = list_entry(iter, struct dlm_master_list_entry, list); 407 if (!dlm_mle_equal(dlm, tmpmle, name, namelen)) 408 continue; 409 dlm_get_mle(tmpmle); 410 *mle = tmpmle; 411 return 1; 412 } 413 return 0; 414 } 415 416 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up) 417 { 418 struct dlm_master_list_entry *mle; 419 struct list_head *iter; 420 421 assert_spin_locked(&dlm->spinlock); 422 423 list_for_each(iter, &dlm->mle_hb_events) { 424 mle = list_entry(iter, struct dlm_master_list_entry, 425 hb_events); 426 if (node_up) 427 dlm_mle_node_up(dlm, mle, NULL, idx); 428 else 429 dlm_mle_node_down(dlm, mle, NULL, idx); 430 } 431 } 432 433 static void dlm_mle_node_down(struct dlm_ctxt *dlm, 434 struct dlm_master_list_entry *mle, 435 struct o2nm_node *node, int idx) 436 { 437 spin_lock(&mle->spinlock); 438 439 if (!test_bit(idx, mle->node_map)) 440 mlog(0, "node %u already removed from nodemap!\n", idx); 441 else 442 clear_bit(idx, mle->node_map); 443 444 spin_unlock(&mle->spinlock); 445 } 446 447 static void dlm_mle_node_up(struct dlm_ctxt *dlm, 448 struct dlm_master_list_entry *mle, 449 struct o2nm_node *node, int idx) 450 { 451 spin_lock(&mle->spinlock); 452 453 if (test_bit(idx, mle->node_map)) 454 mlog(0, "node %u already in node map!\n", idx); 455 else 456 set_bit(idx, mle->node_map); 457 458 spin_unlock(&mle->spinlock); 459 } 460 461 462 int dlm_init_mle_cache(void) 463 { 464 dlm_mle_cache = kmem_cache_create("dlm_mle_cache", 465 sizeof(struct dlm_master_list_entry), 466 0, SLAB_HWCACHE_ALIGN, 467 NULL, NULL); 468 if (dlm_mle_cache == NULL) 469 return -ENOMEM; 470 return 0; 471 } 472 473 void dlm_destroy_mle_cache(void) 474 { 475 if (dlm_mle_cache) 476 kmem_cache_destroy(dlm_mle_cache); 477 } 478 479 static void dlm_mle_release(struct kref *kref) 480 { 481 struct dlm_master_list_entry *mle; 482 struct dlm_ctxt *dlm; 483 484 mlog_entry_void(); 485 486 mle = container_of(kref, struct dlm_master_list_entry, mle_refs); 487 dlm = mle->dlm; 488 489 if (mle->type != DLM_MLE_MASTER) { 490 mlog(0, "calling mle_release for %.*s, type %d\n", 491 mle->u.name.len, mle->u.name.name, mle->type); 492 } else { 493 mlog(0, "calling mle_release for %.*s, type %d\n", 494 mle->u.res->lockname.len, 495 mle->u.res->lockname.name, mle->type); 496 } 497 assert_spin_locked(&dlm->spinlock); 498 assert_spin_locked(&dlm->master_lock); 499 500 /* remove from list if not already */ 501 if (!list_empty(&mle->list)) 502 
list_del_init(&mle->list); 503 504 /* detach the mle from the domain node up/down events */ 505 __dlm_mle_detach_hb_events(dlm, mle); 506 507 /* NOTE: kfree under spinlock here. 508 * if this is bad, we can move this to a freelist. */ 509 kmem_cache_free(dlm_mle_cache, mle); 510 } 511 512 513 /* 514 * LOCK RESOURCE FUNCTIONS 515 */ 516 517 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, 518 struct dlm_lock_resource *res, 519 u8 owner) 520 { 521 assert_spin_locked(&res->spinlock); 522 523 mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner); 524 525 if (owner == dlm->node_num) 526 atomic_inc(&dlm->local_resources); 527 else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN) 528 atomic_inc(&dlm->unknown_resources); 529 else 530 atomic_inc(&dlm->remote_resources); 531 532 res->owner = owner; 533 } 534 535 void dlm_change_lockres_owner(struct dlm_ctxt *dlm, 536 struct dlm_lock_resource *res, u8 owner) 537 { 538 assert_spin_locked(&res->spinlock); 539 540 if (owner == res->owner) 541 return; 542 543 if (res->owner == dlm->node_num) 544 atomic_dec(&dlm->local_resources); 545 else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) 546 atomic_dec(&dlm->unknown_resources); 547 else 548 atomic_dec(&dlm->remote_resources); 549 550 dlm_set_lockres_owner(dlm, res, owner); 551 } 552 553 554 static void dlm_lockres_release(struct kref *kref) 555 { 556 struct dlm_lock_resource *res; 557 558 res = container_of(kref, struct dlm_lock_resource, refs); 559 560 /* This should not happen -- all lockres' have a name 561 * associated with them at init time. */ 562 BUG_ON(!res->lockname.name); 563 564 mlog(0, "destroying lockres %.*s\n", res->lockname.len, 565 res->lockname.name); 566 567 /* By the time we're ready to blow this guy away, we shouldn't 568 * be on any lists. */ 569 BUG_ON(!hlist_unhashed(&res->hash_node)); 570 BUG_ON(!list_empty(&res->granted)); 571 BUG_ON(!list_empty(&res->converting)); 572 BUG_ON(!list_empty(&res->blocked)); 573 BUG_ON(!list_empty(&res->dirty)); 574 BUG_ON(!list_empty(&res->recovering)); 575 BUG_ON(!list_empty(&res->purge)); 576 577 kfree(res->lockname.name); 578 579 kfree(res); 580 } 581 582 void dlm_lockres_get(struct dlm_lock_resource *res) 583 { 584 kref_get(&res->refs); 585 } 586 587 void dlm_lockres_put(struct dlm_lock_resource *res) 588 { 589 kref_put(&res->refs, dlm_lockres_release); 590 } 591 592 static void dlm_init_lockres(struct dlm_ctxt *dlm, 593 struct dlm_lock_resource *res, 594 const char *name, unsigned int namelen) 595 { 596 char *qname; 597 598 /* If we memset here, we lose our reference to the kmalloc'd 599 * res->lockname.name, so be sure to init every field 600 * correctly! 
*/ 601 602 qname = (char *) res->lockname.name; 603 memcpy(qname, name, namelen); 604 605 res->lockname.len = namelen; 606 res->lockname.hash = full_name_hash(name, namelen); 607 608 init_waitqueue_head(&res->wq); 609 spin_lock_init(&res->spinlock); 610 INIT_HLIST_NODE(&res->hash_node); 611 INIT_LIST_HEAD(&res->granted); 612 INIT_LIST_HEAD(&res->converting); 613 INIT_LIST_HEAD(&res->blocked); 614 INIT_LIST_HEAD(&res->dirty); 615 INIT_LIST_HEAD(&res->recovering); 616 INIT_LIST_HEAD(&res->purge); 617 atomic_set(&res->asts_reserved, 0); 618 res->migration_pending = 0; 619 620 kref_init(&res->refs); 621 622 /* just for consistency */ 623 spin_lock(&res->spinlock); 624 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); 625 spin_unlock(&res->spinlock); 626 627 res->state = DLM_LOCK_RES_IN_PROGRESS; 628 629 res->last_used = 0; 630 631 memset(res->lvb, 0, DLM_LVB_LEN); 632 } 633 634 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, 635 const char *name, 636 unsigned int namelen) 637 { 638 struct dlm_lock_resource *res; 639 640 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 641 if (!res) 642 return NULL; 643 644 res->lockname.name = kmalloc(namelen, GFP_KERNEL); 645 if (!res->lockname.name) { 646 kfree(res); 647 return NULL; 648 } 649 650 dlm_init_lockres(dlm, res, name, namelen); 651 return res; 652 } 653 654 /* 655 * lookup a lock resource by name. 656 * may already exist in the hashtable. 657 * lockid is null terminated 658 * 659 * if not, allocate enough for the lockres and for 660 * the temporary structure used in doing the mastering. 661 * 662 * also, do a lookup in the dlm->master_list to see 663 * if another node has begun mastering the same lock. 664 * if so, there should be a block entry in there 665 * for this name, and we should *not* attempt to master 666 * the lock here. need to wait around for that node 667 * to assert_master (or die). 668 * 669 */ 670 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, 671 const char *lockid, 672 int flags) 673 { 674 struct dlm_lock_resource *tmpres=NULL, *res=NULL; 675 struct dlm_master_list_entry *mle = NULL; 676 struct dlm_master_list_entry *alloc_mle = NULL; 677 int blocked = 0; 678 int ret, nodenum; 679 struct dlm_node_iter iter; 680 unsigned int namelen; 681 int tries = 0; 682 int bit, wait_on_recovery = 0; 683 684 BUG_ON(!lockid); 685 686 namelen = strlen(lockid); 687 688 mlog(0, "get lockres %s (len %d)\n", lockid, namelen); 689 690 lookup: 691 spin_lock(&dlm->spinlock); 692 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); 693 if (tmpres) { 694 spin_unlock(&dlm->spinlock); 695 mlog(0, "found in hash!\n"); 696 if (res) 697 dlm_lockres_put(res); 698 res = tmpres; 699 goto leave; 700 } 701 702 if (!res) { 703 spin_unlock(&dlm->spinlock); 704 mlog(0, "allocating a new resource\n"); 705 /* nothing found and we need to allocate one. */ 706 alloc_mle = (struct dlm_master_list_entry *) 707 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 708 if (!alloc_mle) 709 goto leave; 710 res = dlm_new_lockres(dlm, lockid, namelen); 711 if (!res) 712 goto leave; 713 goto lookup; 714 } 715 716 mlog(0, "no lockres found, allocated our own: %p\n", res); 717 718 if (flags & LKM_LOCAL) { 719 /* caller knows it's safe to assume it's not mastered elsewhere 720 * DONE! 
return right away */ 721 spin_lock(&res->spinlock); 722 dlm_change_lockres_owner(dlm, res, dlm->node_num); 723 __dlm_insert_lockres(dlm, res); 724 spin_unlock(&res->spinlock); 725 spin_unlock(&dlm->spinlock); 726 /* lockres still marked IN_PROGRESS */ 727 goto wake_waiters; 728 } 729 730 /* check master list to see if another node has started mastering it */ 731 spin_lock(&dlm->master_lock); 732 733 /* if we found a block, wait for lock to be mastered by another node */ 734 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); 735 if (blocked) { 736 if (mle->type == DLM_MLE_MASTER) { 737 mlog(ML_ERROR, "master entry for nonexistent lock!\n"); 738 BUG(); 739 } else if (mle->type == DLM_MLE_MIGRATION) { 740 /* migration is in progress! */ 741 /* the good news is that we now know the 742 * "current" master (mle->master). */ 743 744 spin_unlock(&dlm->master_lock); 745 assert_spin_locked(&dlm->spinlock); 746 747 /* set the lockres owner and hash it */ 748 spin_lock(&res->spinlock); 749 dlm_set_lockres_owner(dlm, res, mle->master); 750 __dlm_insert_lockres(dlm, res); 751 spin_unlock(&res->spinlock); 752 spin_unlock(&dlm->spinlock); 753 754 /* master is known, detach */ 755 dlm_mle_detach_hb_events(dlm, mle); 756 dlm_put_mle(mle); 757 mle = NULL; 758 goto wake_waiters; 759 } 760 } else { 761 /* go ahead and try to master lock on this node */ 762 mle = alloc_mle; 763 /* make sure this does not get freed below */ 764 alloc_mle = NULL; 765 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); 766 set_bit(dlm->node_num, mle->maybe_map); 767 list_add(&mle->list, &dlm->master_list); 768 769 /* still holding the dlm spinlock, check the recovery map 770 * to see if there are any nodes that still need to be 771 * considered. these will not appear in the mle nodemap 772 * but they might own this lockres. wait on them. */ 773 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); 774 if (bit < O2NM_MAX_NODES) { 775 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" 776 "recover before lock mastery can begin\n", 777 dlm->name, namelen, (char *)lockid, bit); 778 wait_on_recovery = 1; 779 } 780 } 781 782 /* at this point there is either a DLM_MLE_BLOCK or a 783 * DLM_MLE_MASTER on the master list, so it's safe to add the 784 * lockres to the hashtable. anyone who finds the lock will 785 * still have to wait on the IN_PROGRESS. */ 786 787 /* finally add the lockres to its hash bucket */ 788 __dlm_insert_lockres(dlm, res); 789 /* get an extra ref on the mle in case this is a BLOCK 790 * if so, the creator of the BLOCK may try to put the last 791 * ref at this time in the assert master handler, so we 792 * need an extra one to keep from a bad ptr deref. */ 793 dlm_get_mle(mle); 794 spin_unlock(&dlm->master_lock); 795 spin_unlock(&dlm->spinlock); 796 797 while (wait_on_recovery) { 798 /* any cluster changes that occurred after dropping the 799 * dlm spinlock would be detectable be a change on the mle, 800 * so we only need to clear out the recovery map once. 
*/ 801 if (dlm_is_recovery_lock(lockid, namelen)) { 802 mlog(ML_NOTICE, "%s: recovery map is not empty, but " 803 "must master $RECOVERY lock now\n", dlm->name); 804 if (!dlm_pre_master_reco_lockres(dlm, res)) 805 wait_on_recovery = 0; 806 else { 807 mlog(0, "%s: waiting 500ms for heartbeat state " 808 "change\n", dlm->name); 809 msleep(500); 810 } 811 continue; 812 } 813 814 dlm_kick_recovery_thread(dlm); 815 msleep(100); 816 dlm_wait_for_recovery(dlm); 817 818 spin_lock(&dlm->spinlock); 819 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); 820 if (bit < O2NM_MAX_NODES) { 821 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" 822 "recover before lock mastery can begin\n", 823 dlm->name, namelen, (char *)lockid, bit); 824 wait_on_recovery = 1; 825 } else 826 wait_on_recovery = 0; 827 spin_unlock(&dlm->spinlock); 828 } 829 830 /* must wait for lock to be mastered elsewhere */ 831 if (blocked) 832 goto wait; 833 834 redo_request: 835 ret = -EINVAL; 836 dlm_node_iter_init(mle->vote_map, &iter); 837 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 838 ret = dlm_do_master_request(mle, nodenum); 839 if (ret < 0) 840 mlog_errno(ret); 841 if (mle->master != O2NM_MAX_NODES) { 842 /* found a master ! */ 843 if (mle->master <= nodenum) 844 break; 845 /* if our master request has not reached the master 846 * yet, keep going until it does. this is how the 847 * master will know that asserts are needed back to 848 * the lower nodes. */ 849 mlog(0, "%s:%.*s: requests only up to %u but master " 850 "is %u, keep going\n", dlm->name, namelen, 851 lockid, nodenum, mle->master); 852 } 853 } 854 855 wait: 856 /* keep going until the response map includes all nodes */ 857 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); 858 if (ret < 0) { 859 mlog(0, "%s:%.*s: node map changed, redo the " 860 "master request now, blocked=%d\n", 861 dlm->name, res->lockname.len, 862 res->lockname.name, blocked); 863 if (++tries > 20) { 864 mlog(ML_ERROR, "%s:%.*s: spinning on " 865 "dlm_wait_for_lock_mastery, blocked=%d\n", 866 dlm->name, res->lockname.len, 867 res->lockname.name, blocked); 868 dlm_print_one_lock_resource(res); 869 /* dlm_print_one_mle(mle); */ 870 tries = 0; 871 } 872 goto redo_request; 873 } 874 875 mlog(0, "lockres mastered by %u\n", res->owner); 876 /* make sure we never continue without this */ 877 BUG_ON(res->owner == O2NM_MAX_NODES); 878 879 /* master is known, detach if not already detached */ 880 dlm_mle_detach_hb_events(dlm, mle); 881 dlm_put_mle(mle); 882 /* put the extra ref */ 883 dlm_put_mle(mle); 884 885 wake_waiters: 886 spin_lock(&res->spinlock); 887 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 888 spin_unlock(&res->spinlock); 889 wake_up(&res->wq); 890 891 leave: 892 /* need to free the unused mle */ 893 if (alloc_mle) 894 kmem_cache_free(dlm_mle_cache, alloc_mle); 895 896 return res; 897 } 898 899 900 #define DLM_MASTERY_TIMEOUT_MS 5000 901 902 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 903 struct dlm_lock_resource *res, 904 struct dlm_master_list_entry *mle, 905 int *blocked) 906 { 907 u8 m; 908 int ret, bit; 909 int map_changed, voting_done; 910 int assert, sleep; 911 912 recheck: 913 ret = 0; 914 assert = 0; 915 916 /* check if another node has already become the owner */ 917 spin_lock(&res->spinlock); 918 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { 919 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name, 920 res->lockname.len, res->lockname.name, res->owner); 921 spin_unlock(&res->spinlock); 922 /* this will cause the master to re-assert across 923 
* the whole cluster, freeing up mles */ 924 ret = dlm_do_master_request(mle, res->owner); 925 if (ret < 0) { 926 /* give recovery a chance to run */ 927 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 928 msleep(500); 929 goto recheck; 930 } 931 ret = 0; 932 goto leave; 933 } 934 spin_unlock(&res->spinlock); 935 936 spin_lock(&mle->spinlock); 937 m = mle->master; 938 map_changed = (memcmp(mle->vote_map, mle->node_map, 939 sizeof(mle->vote_map)) != 0); 940 voting_done = (memcmp(mle->vote_map, mle->response_map, 941 sizeof(mle->vote_map)) == 0); 942 943 /* restart if we hit any errors */ 944 if (map_changed) { 945 int b; 946 mlog(0, "%s: %.*s: node map changed, restarting\n", 947 dlm->name, res->lockname.len, res->lockname.name); 948 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked); 949 b = (mle->type == DLM_MLE_BLOCK); 950 if ((*blocked && !b) || (!*blocked && b)) { 951 mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 952 dlm->name, res->lockname.len, res->lockname.name, 953 *blocked, b); 954 *blocked = b; 955 } 956 spin_unlock(&mle->spinlock); 957 if (ret < 0) { 958 mlog_errno(ret); 959 goto leave; 960 } 961 mlog(0, "%s:%.*s: restart lock mastery succeeded, " 962 "rechecking now\n", dlm->name, res->lockname.len, 963 res->lockname.name); 964 goto recheck; 965 } 966 967 if (m != O2NM_MAX_NODES) { 968 /* another node has done an assert! 969 * all done! */ 970 sleep = 0; 971 } else { 972 sleep = 1; 973 /* have all nodes responded? */ 974 if (voting_done && !*blocked) { 975 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); 976 if (dlm->node_num <= bit) { 977 /* my node number is lowest. 978 * now tell other nodes that I am 979 * mastering this. */ 980 mle->master = dlm->node_num; 981 assert = 1; 982 sleep = 0; 983 } 984 /* if voting is done, but we have not received 985 * an assert master yet, we must sleep */ 986 } 987 } 988 989 spin_unlock(&mle->spinlock); 990 991 /* sleep if we haven't finished voting yet */ 992 if (sleep) { 993 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS); 994 995 /* 996 if (atomic_read(&mle->mle_refs.refcount) < 2) 997 mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle, 998 atomic_read(&mle->mle_refs.refcount), 999 res->lockname.len, res->lockname.name); 1000 */ 1001 atomic_set(&mle->woken, 0); 1002 (void)wait_event_timeout(mle->wq, 1003 (atomic_read(&mle->woken) == 1), 1004 timeo); 1005 if (res->owner == O2NM_MAX_NODES) { 1006 mlog(0, "waiting again\n"); 1007 goto recheck; 1008 } 1009 mlog(0, "done waiting, master is %u\n", res->owner); 1010 ret = 0; 1011 goto leave; 1012 } 1013 1014 ret = 0; /* done */ 1015 if (assert) { 1016 m = dlm->node_num; 1017 mlog(0, "about to master %.*s here, this=%u\n", 1018 res->lockname.len, res->lockname.name, m); 1019 ret = dlm_do_assert_master(dlm, res->lockname.name, 1020 res->lockname.len, mle->vote_map, 0); 1021 if (ret) { 1022 /* This is a failure in the network path, 1023 * not in the response to the assert_master 1024 * (any nonzero response is a BUG on this node). 1025 * Most likely a socket just got disconnected 1026 * due to node death. */ 1027 mlog_errno(ret); 1028 } 1029 /* no longer need to restart lock mastery. 1030 * all living nodes have been contacted. 
*/ 1031 ret = 0; 1032 } 1033 1034 /* set the lockres owner */ 1035 spin_lock(&res->spinlock); 1036 dlm_change_lockres_owner(dlm, res, m); 1037 spin_unlock(&res->spinlock); 1038 1039 leave: 1040 return ret; 1041 } 1042 1043 struct dlm_bitmap_diff_iter 1044 { 1045 int curnode; 1046 unsigned long *orig_bm; 1047 unsigned long *cur_bm; 1048 unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1049 }; 1050 1051 enum dlm_node_state_change 1052 { 1053 NODE_DOWN = -1, 1054 NODE_NO_CHANGE = 0, 1055 NODE_UP 1056 }; 1057 1058 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter, 1059 unsigned long *orig_bm, 1060 unsigned long *cur_bm) 1061 { 1062 unsigned long p1, p2; 1063 int i; 1064 1065 iter->curnode = -1; 1066 iter->orig_bm = orig_bm; 1067 iter->cur_bm = cur_bm; 1068 1069 for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) { 1070 p1 = *(iter->orig_bm + i); 1071 p2 = *(iter->cur_bm + i); 1072 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1); 1073 } 1074 } 1075 1076 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter, 1077 enum dlm_node_state_change *state) 1078 { 1079 int bit; 1080 1081 if (iter->curnode >= O2NM_MAX_NODES) 1082 return -ENOENT; 1083 1084 bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES, 1085 iter->curnode+1); 1086 if (bit >= O2NM_MAX_NODES) { 1087 iter->curnode = O2NM_MAX_NODES; 1088 return -ENOENT; 1089 } 1090 1091 /* if it was there in the original then this node died */ 1092 if (test_bit(bit, iter->orig_bm)) 1093 *state = NODE_DOWN; 1094 else 1095 *state = NODE_UP; 1096 1097 iter->curnode = bit; 1098 return bit; 1099 } 1100 1101 1102 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, 1103 struct dlm_lock_resource *res, 1104 struct dlm_master_list_entry *mle, 1105 int blocked) 1106 { 1107 struct dlm_bitmap_diff_iter bdi; 1108 enum dlm_node_state_change sc; 1109 int node; 1110 int ret = 0; 1111 1112 mlog(0, "something happened such that the " 1113 "master process may need to be restarted!\n"); 1114 1115 assert_spin_locked(&mle->spinlock); 1116 1117 dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map); 1118 node = dlm_bitmap_diff_iter_next(&bdi, &sc); 1119 while (node >= 0) { 1120 if (sc == NODE_UP) { 1121 /* a node came up. clear any old vote from 1122 * the response map and set it in the vote map 1123 * then restart the mastery. */ 1124 mlog(ML_NOTICE, "node %d up while restarting\n", node); 1125 1126 /* redo the master request, but only for the new node */ 1127 mlog(0, "sending request to new node\n"); 1128 clear_bit(node, mle->response_map); 1129 set_bit(node, mle->vote_map); 1130 } else { 1131 mlog(ML_ERROR, "node down! 
%d\n", node); 1132 1133 /* if the node wasn't involved in mastery skip it, 1134 * but clear it out from the maps so that it will 1135 * not affect mastery of this lockres */ 1136 clear_bit(node, mle->response_map); 1137 clear_bit(node, mle->vote_map); 1138 if (!test_bit(node, mle->maybe_map)) 1139 goto next; 1140 1141 /* if we're already blocked on lock mastery, and the 1142 * dead node wasn't the expected master, or there is 1143 * another node in the maybe_map, keep waiting */ 1144 if (blocked) { 1145 int lowest = find_next_bit(mle->maybe_map, 1146 O2NM_MAX_NODES, 0); 1147 1148 /* act like it was never there */ 1149 clear_bit(node, mle->maybe_map); 1150 1151 if (node != lowest) 1152 goto next; 1153 1154 mlog(ML_ERROR, "expected master %u died while " 1155 "this node was blocked waiting on it!\n", 1156 node); 1157 lowest = find_next_bit(mle->maybe_map, 1158 O2NM_MAX_NODES, 1159 lowest+1); 1160 if (lowest < O2NM_MAX_NODES) { 1161 mlog(0, "still blocked. waiting " 1162 "on %u now\n", lowest); 1163 goto next; 1164 } 1165 1166 /* mle is an MLE_BLOCK, but there is now 1167 * nothing left to block on. we need to return 1168 * all the way back out and try again with 1169 * an MLE_MASTER. dlm_do_local_recovery_cleanup 1170 * has already run, so the mle refcount is ok */ 1171 mlog(0, "no longer blocking. we can " 1172 "try to master this here\n"); 1173 mle->type = DLM_MLE_MASTER; 1174 memset(mle->maybe_map, 0, 1175 sizeof(mle->maybe_map)); 1176 memset(mle->response_map, 0, 1177 sizeof(mle->maybe_map)); 1178 memcpy(mle->vote_map, mle->node_map, 1179 sizeof(mle->node_map)); 1180 mle->u.res = res; 1181 set_bit(dlm->node_num, mle->maybe_map); 1182 1183 ret = -EAGAIN; 1184 goto next; 1185 } 1186 1187 clear_bit(node, mle->maybe_map); 1188 if (node > dlm->node_num) 1189 goto next; 1190 1191 mlog(0, "dead node in map!\n"); 1192 /* yuck. go back and re-contact all nodes 1193 * in the vote_map, removing this node. */ 1194 memset(mle->response_map, 0, 1195 sizeof(mle->response_map)); 1196 } 1197 ret = -EAGAIN; 1198 next: 1199 node = dlm_bitmap_diff_iter_next(&bdi, &sc); 1200 } 1201 return ret; 1202 } 1203 1204 1205 /* 1206 * DLM_MASTER_REQUEST_MSG 1207 * 1208 * returns: 0 on success, 1209 * -errno on a network error 1210 * 1211 * on error, the caller should assume the target node is "dead" 1212 * 1213 */ 1214 1215 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) 1216 { 1217 struct dlm_ctxt *dlm = mle->dlm; 1218 struct dlm_master_request request; 1219 int ret, response=0, resend; 1220 1221 memset(&request, 0, sizeof(request)); 1222 request.node_idx = dlm->node_num; 1223 1224 BUG_ON(mle->type == DLM_MLE_MIGRATION); 1225 1226 if (mle->type != DLM_MLE_MASTER) { 1227 request.namelen = mle->u.name.len; 1228 memcpy(request.name, mle->u.name.name, request.namelen); 1229 } else { 1230 request.namelen = mle->u.res->lockname.len; 1231 memcpy(request.name, mle->u.res->lockname.name, 1232 request.namelen); 1233 } 1234 1235 again: 1236 ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, 1237 sizeof(request), to, &response); 1238 if (ret < 0) { 1239 if (ret == -ESRCH) { 1240 /* should never happen */ 1241 mlog(ML_ERROR, "TCP stack not ready!\n"); 1242 BUG(); 1243 } else if (ret == -EINVAL) { 1244 mlog(ML_ERROR, "bad args passed to o2net!\n"); 1245 BUG(); 1246 } else if (ret == -ENOMEM) { 1247 mlog(ML_ERROR, "out of memory while trying to send " 1248 "network message! 
retrying\n"); 1249 /* this is totally crude */ 1250 msleep(50); 1251 goto again; 1252 } else if (!dlm_is_host_down(ret)) { 1253 /* not a network error. bad. */ 1254 mlog_errno(ret); 1255 mlog(ML_ERROR, "unhandled error!"); 1256 BUG(); 1257 } 1258 /* all other errors should be network errors, 1259 * and likely indicate node death */ 1260 mlog(ML_ERROR, "link to %d went down!\n", to); 1261 goto out; 1262 } 1263 1264 ret = 0; 1265 resend = 0; 1266 spin_lock(&mle->spinlock); 1267 switch (response) { 1268 case DLM_MASTER_RESP_YES: 1269 set_bit(to, mle->response_map); 1270 mlog(0, "node %u is the master, response=YES\n", to); 1271 mle->master = to; 1272 break; 1273 case DLM_MASTER_RESP_NO: 1274 mlog(0, "node %u not master, response=NO\n", to); 1275 set_bit(to, mle->response_map); 1276 break; 1277 case DLM_MASTER_RESP_MAYBE: 1278 mlog(0, "node %u not master, response=MAYBE\n", to); 1279 set_bit(to, mle->response_map); 1280 set_bit(to, mle->maybe_map); 1281 break; 1282 case DLM_MASTER_RESP_ERROR: 1283 mlog(0, "node %u hit an error, resending\n", to); 1284 resend = 1; 1285 response = 0; 1286 break; 1287 default: 1288 mlog(ML_ERROR, "bad response! %u\n", response); 1289 BUG(); 1290 } 1291 spin_unlock(&mle->spinlock); 1292 if (resend) { 1293 /* this is also totally crude */ 1294 msleep(50); 1295 goto again; 1296 } 1297 1298 out: 1299 return ret; 1300 } 1301 1302 /* 1303 * locks that can be taken here: 1304 * dlm->spinlock 1305 * res->spinlock 1306 * mle->spinlock 1307 * dlm->master_list 1308 * 1309 * if possible, TRIM THIS DOWN!!! 1310 */ 1311 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) 1312 { 1313 u8 response = DLM_MASTER_RESP_MAYBE; 1314 struct dlm_ctxt *dlm = data; 1315 struct dlm_lock_resource *res = NULL; 1316 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; 1317 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; 1318 char *name; 1319 unsigned int namelen; 1320 int found, ret; 1321 int set_maybe; 1322 int dispatch_assert = 0; 1323 1324 if (!dlm_grab(dlm)) 1325 return DLM_MASTER_RESP_NO; 1326 1327 if (!dlm_domain_fully_joined(dlm)) { 1328 response = DLM_MASTER_RESP_NO; 1329 goto send_response; 1330 } 1331 1332 name = request->name; 1333 namelen = request->namelen; 1334 1335 if (namelen > DLM_LOCKID_NAME_MAX) { 1336 response = DLM_IVBUFLEN; 1337 goto send_response; 1338 } 1339 1340 way_up_top: 1341 spin_lock(&dlm->spinlock); 1342 res = __dlm_lookup_lockres(dlm, name, namelen); 1343 if (res) { 1344 spin_unlock(&dlm->spinlock); 1345 1346 /* take care of the easy cases up front */ 1347 spin_lock(&res->spinlock); 1348 if (res->state & DLM_LOCK_RES_RECOVERING) { 1349 spin_unlock(&res->spinlock); 1350 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " 1351 "being recovered\n"); 1352 response = DLM_MASTER_RESP_ERROR; 1353 if (mle) 1354 kmem_cache_free(dlm_mle_cache, mle); 1355 goto send_response; 1356 } 1357 1358 if (res->owner == dlm->node_num) { 1359 spin_unlock(&res->spinlock); 1360 // mlog(0, "this node is the master\n"); 1361 response = DLM_MASTER_RESP_YES; 1362 if (mle) 1363 kmem_cache_free(dlm_mle_cache, mle); 1364 1365 /* this node is the owner. 1366 * there is some extra work that needs to 1367 * happen now. the requesting node has 1368 * caused all nodes up to this one to 1369 * create mles. this node now needs to 1370 * go back and clean those up. 
*/ 1371 dispatch_assert = 1; 1372 goto send_response; 1373 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { 1374 spin_unlock(&res->spinlock); 1375 // mlog(0, "node %u is the master\n", res->owner); 1376 response = DLM_MASTER_RESP_NO; 1377 if (mle) 1378 kmem_cache_free(dlm_mle_cache, mle); 1379 goto send_response; 1380 } 1381 1382 /* ok, there is no owner. either this node is 1383 * being blocked, or it is actively trying to 1384 * master this lock. */ 1385 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { 1386 mlog(ML_ERROR, "lock with no owner should be " 1387 "in-progress!\n"); 1388 BUG(); 1389 } 1390 1391 // mlog(0, "lockres is in progress...\n"); 1392 spin_lock(&dlm->master_lock); 1393 found = dlm_find_mle(dlm, &tmpmle, name, namelen); 1394 if (!found) { 1395 mlog(ML_ERROR, "no mle found for this lock!\n"); 1396 BUG(); 1397 } 1398 set_maybe = 1; 1399 spin_lock(&tmpmle->spinlock); 1400 if (tmpmle->type == DLM_MLE_BLOCK) { 1401 // mlog(0, "this node is waiting for " 1402 // "lockres to be mastered\n"); 1403 response = DLM_MASTER_RESP_NO; 1404 } else if (tmpmle->type == DLM_MLE_MIGRATION) { 1405 mlog(0, "node %u is master, but trying to migrate to " 1406 "node %u.\n", tmpmle->master, tmpmle->new_master); 1407 if (tmpmle->master == dlm->node_num) { 1408 response = DLM_MASTER_RESP_YES; 1409 mlog(ML_ERROR, "no owner on lockres, but this " 1410 "node is trying to migrate it to %u?!\n", 1411 tmpmle->new_master); 1412 BUG(); 1413 } else { 1414 /* the real master can respond on its own */ 1415 response = DLM_MASTER_RESP_NO; 1416 } 1417 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1418 set_maybe = 0; 1419 if (tmpmle->master == dlm->node_num) { 1420 response = DLM_MASTER_RESP_YES; 1421 /* this node will be the owner. 1422 * go back and clean the mles on any 1423 * other nodes */ 1424 dispatch_assert = 1; 1425 } else 1426 response = DLM_MASTER_RESP_NO; 1427 } else { 1428 // mlog(0, "this node is attempting to " 1429 // "master lockres\n"); 1430 response = DLM_MASTER_RESP_MAYBE; 1431 } 1432 if (set_maybe) 1433 set_bit(request->node_idx, tmpmle->maybe_map); 1434 spin_unlock(&tmpmle->spinlock); 1435 1436 spin_unlock(&dlm->master_lock); 1437 spin_unlock(&res->spinlock); 1438 1439 /* keep the mle attached to heartbeat events */ 1440 dlm_put_mle(tmpmle); 1441 if (mle) 1442 kmem_cache_free(dlm_mle_cache, mle); 1443 goto send_response; 1444 } 1445 1446 /* 1447 * lockres doesn't exist on this node 1448 * if there is an MLE_BLOCK, return NO 1449 * if there is an MLE_MASTER, return MAYBE 1450 * otherwise, add an MLE_BLOCK, return NO 1451 */ 1452 spin_lock(&dlm->master_lock); 1453 found = dlm_find_mle(dlm, &tmpmle, name, namelen); 1454 if (!found) { 1455 /* this lockid has never been seen on this node yet */ 1456 // mlog(0, "no mle found\n"); 1457 if (!mle) { 1458 spin_unlock(&dlm->master_lock); 1459 spin_unlock(&dlm->spinlock); 1460 1461 mle = (struct dlm_master_list_entry *) 1462 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 1463 if (!mle) { 1464 response = DLM_MASTER_RESP_ERROR; 1465 mlog_errno(-ENOMEM); 1466 goto send_response; 1467 } 1468 spin_lock(&dlm->spinlock); 1469 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, 1470 name, namelen); 1471 spin_unlock(&dlm->spinlock); 1472 goto way_up_top; 1473 } 1474 1475 // mlog(0, "this is second time thru, already allocated, " 1476 // "add the block.\n"); 1477 set_bit(request->node_idx, mle->maybe_map); 1478 list_add(&mle->list, &dlm->master_list); 1479 response = DLM_MASTER_RESP_NO; 1480 } else { 1481 // mlog(0, "mle was found\n"); 1482 set_maybe = 1; 
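		/* no lockres exists here, but an mle does: answer NO if this
		 * node is merely blocked or a migration is pending (the real
		 * master answers for itself), and MAYBE if this node is
		 * itself still trying to master the name.  either way the
		 * requesting node gets recorded in maybe_map as another
		 * possible master. */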
1483 spin_lock(&tmpmle->spinlock); 1484 if (tmpmle->master == dlm->node_num) { 1485 mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n"); 1486 BUG(); 1487 } 1488 if (tmpmle->type == DLM_MLE_BLOCK) 1489 response = DLM_MASTER_RESP_NO; 1490 else if (tmpmle->type == DLM_MLE_MIGRATION) { 1491 mlog(0, "migration mle was found (%u->%u)\n", 1492 tmpmle->master, tmpmle->new_master); 1493 /* real master can respond on its own */ 1494 response = DLM_MASTER_RESP_NO; 1495 } else 1496 response = DLM_MASTER_RESP_MAYBE; 1497 if (set_maybe) 1498 set_bit(request->node_idx, tmpmle->maybe_map); 1499 spin_unlock(&tmpmle->spinlock); 1500 } 1501 spin_unlock(&dlm->master_lock); 1502 spin_unlock(&dlm->spinlock); 1503 1504 if (found) { 1505 /* keep the mle attached to heartbeat events */ 1506 dlm_put_mle(tmpmle); 1507 } 1508 send_response: 1509 1510 if (dispatch_assert) { 1511 if (response != DLM_MASTER_RESP_YES) 1512 mlog(ML_ERROR, "invalid response %d\n", response); 1513 if (!res) { 1514 mlog(ML_ERROR, "bad lockres while trying to assert!\n"); 1515 BUG(); 1516 } 1517 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", 1518 dlm->node_num, res->lockname.len, res->lockname.name); 1519 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 1520 DLM_ASSERT_MASTER_MLE_CLEANUP); 1521 if (ret < 0) { 1522 mlog(ML_ERROR, "failed to dispatch assert master work\n"); 1523 response = DLM_MASTER_RESP_ERROR; 1524 } 1525 } 1526 1527 dlm_put(dlm); 1528 return response; 1529 } 1530 1531 /* 1532 * DLM_ASSERT_MASTER_MSG 1533 */ 1534 1535 1536 /* 1537 * NOTE: this can be used for debugging 1538 * can periodically run all locks owned by this node 1539 * and re-assert across the cluster... 1540 */ 1541 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 1542 unsigned int namelen, void *nodemap, 1543 u32 flags) 1544 { 1545 struct dlm_assert_master assert; 1546 int to, tmpret; 1547 struct dlm_node_iter iter; 1548 int ret = 0; 1549 int reassert; 1550 1551 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1552 again: 1553 reassert = 0; 1554 1555 /* note that if this nodemap is empty, it returns 0 */ 1556 dlm_node_iter_init(nodemap, &iter); 1557 while ((to = dlm_node_iter_next(&iter)) >= 0) { 1558 int r = 0; 1559 mlog(0, "sending assert master to %d (%.*s)\n", to, 1560 namelen, lockname); 1561 memset(&assert, 0, sizeof(assert)); 1562 assert.node_idx = dlm->node_num; 1563 assert.namelen = namelen; 1564 memcpy(assert.name, lockname, namelen); 1565 assert.flags = cpu_to_be32(flags); 1566 1567 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, 1568 &assert, sizeof(assert), to, &r); 1569 if (tmpret < 0) { 1570 mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); 1571 if (!dlm_is_host_down(tmpret)) { 1572 mlog(ML_ERROR, "unhandled error!\n"); 1573 BUG(); 1574 } 1575 /* a node died. finish out the rest of the nodes. */ 1576 mlog(ML_ERROR, "link to %d went down!\n", to); 1577 /* any nonzero status return will do */ 1578 ret = tmpret; 1579 } else if (r < 0) { 1580 /* ok, something horribly messed. kill thyself. 
*/ 1581 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1582 "got %d.\n", namelen, lockname, to, r); 1583 dlm_dump_lock_resources(dlm); 1584 BUG(); 1585 } else if (r == EAGAIN) { 1586 mlog(0, "%.*s: node %u create mles on other " 1587 "nodes and requests a re-assert\n", 1588 namelen, lockname, to); 1589 reassert = 1; 1590 } 1591 } 1592 1593 if (reassert) 1594 goto again; 1595 1596 return ret; 1597 } 1598 1599 /* 1600 * locks that can be taken here: 1601 * dlm->spinlock 1602 * res->spinlock 1603 * mle->spinlock 1604 * dlm->master_list 1605 * 1606 * if possible, TRIM THIS DOWN!!! 1607 */ 1608 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) 1609 { 1610 struct dlm_ctxt *dlm = data; 1611 struct dlm_master_list_entry *mle = NULL; 1612 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; 1613 struct dlm_lock_resource *res = NULL; 1614 char *name; 1615 unsigned int namelen; 1616 u32 flags; 1617 int master_request = 0; 1618 int ret = 0; 1619 1620 if (!dlm_grab(dlm)) 1621 return 0; 1622 1623 name = assert->name; 1624 namelen = assert->namelen; 1625 flags = be32_to_cpu(assert->flags); 1626 1627 if (namelen > DLM_LOCKID_NAME_MAX) { 1628 mlog(ML_ERROR, "Invalid name length!"); 1629 goto done; 1630 } 1631 1632 spin_lock(&dlm->spinlock); 1633 1634 if (flags) 1635 mlog(0, "assert_master with flags: %u\n", flags); 1636 1637 /* find the MLE */ 1638 spin_lock(&dlm->master_lock); 1639 if (!dlm_find_mle(dlm, &mle, name, namelen)) { 1640 /* not an error, could be master just re-asserting */ 1641 mlog(0, "just got an assert_master from %u, but no " 1642 "MLE for it! (%.*s)\n", assert->node_idx, 1643 namelen, name); 1644 } else { 1645 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0); 1646 if (bit >= O2NM_MAX_NODES) { 1647 /* not necessarily an error, though less likely. 1648 * could be master just re-asserting. */ 1649 mlog(ML_ERROR, "no bits set in the maybe_map, but %u " 1650 "is asserting! (%.*s)\n", assert->node_idx, 1651 namelen, name); 1652 } else if (bit != assert->node_idx) { 1653 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { 1654 mlog(0, "master %u was found, %u should " 1655 "back off\n", assert->node_idx, bit); 1656 } else { 1657 /* with the fix for bug 569, a higher node 1658 * number winning the mastery will respond 1659 * YES to mastery requests, but this node 1660 * had no way of knowing. let it pass. */ 1661 mlog(ML_ERROR, "%u is the lowest node, " 1662 "%u is asserting. (%.*s) %u must " 1663 "have begun after %u won.\n", bit, 1664 assert->node_idx, namelen, name, bit, 1665 assert->node_idx); 1666 } 1667 } 1668 } 1669 spin_unlock(&dlm->master_lock); 1670 1671 /* ok everything checks out with the MLE 1672 * now check to see if there is a lockres */ 1673 res = __dlm_lookup_lockres(dlm, name, namelen); 1674 if (res) { 1675 spin_lock(&res->spinlock); 1676 if (res->state & DLM_LOCK_RES_RECOVERING) { 1677 mlog(ML_ERROR, "%u asserting but %.*s is " 1678 "RECOVERING!\n", assert->node_idx, namelen, name); 1679 goto kill; 1680 } 1681 if (!mle) { 1682 if (res->owner != assert->node_idx) { 1683 mlog(ML_ERROR, "assert_master from " 1684 "%u, but current owner is " 1685 "%u! 
(%.*s)\n", 1686 assert->node_idx, res->owner, 1687 namelen, name); 1688 goto kill; 1689 } 1690 } else if (mle->type != DLM_MLE_MIGRATION) { 1691 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { 1692 /* owner is just re-asserting */ 1693 if (res->owner == assert->node_idx) { 1694 mlog(0, "owner %u re-asserting on " 1695 "lock %.*s\n", assert->node_idx, 1696 namelen, name); 1697 goto ok; 1698 } 1699 mlog(ML_ERROR, "got assert_master from " 1700 "node %u, but %u is the owner! " 1701 "(%.*s)\n", assert->node_idx, 1702 res->owner, namelen, name); 1703 goto kill; 1704 } 1705 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { 1706 mlog(ML_ERROR, "got assert from %u, but lock " 1707 "with no owner should be " 1708 "in-progress! (%.*s)\n", 1709 assert->node_idx, 1710 namelen, name); 1711 goto kill; 1712 } 1713 } else /* mle->type == DLM_MLE_MIGRATION */ { 1714 /* should only be getting an assert from new master */ 1715 if (assert->node_idx != mle->new_master) { 1716 mlog(ML_ERROR, "got assert from %u, but " 1717 "new master is %u, and old master " 1718 "was %u (%.*s)\n", 1719 assert->node_idx, mle->new_master, 1720 mle->master, namelen, name); 1721 goto kill; 1722 } 1723 1724 } 1725 ok: 1726 spin_unlock(&res->spinlock); 1727 } 1728 spin_unlock(&dlm->spinlock); 1729 1730 // mlog(0, "woo! got an assert_master from node %u!\n", 1731 // assert->node_idx); 1732 if (mle) { 1733 int extra_ref = 0; 1734 int nn = -1; 1735 1736 spin_lock(&mle->spinlock); 1737 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) 1738 extra_ref = 1; 1739 else { 1740 /* MASTER mle: if any bits set in the response map 1741 * then the calling node needs to re-assert to clear 1742 * up nodes that this node contacted */ 1743 while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, 1744 nn+1)) < O2NM_MAX_NODES) { 1745 if (nn != dlm->node_num && nn != assert->node_idx) 1746 master_request = 1; 1747 } 1748 } 1749 mle->master = assert->node_idx; 1750 atomic_set(&mle->woken, 1); 1751 wake_up(&mle->wq); 1752 spin_unlock(&mle->spinlock); 1753 1754 if (mle->type == DLM_MLE_MIGRATION && res) { 1755 mlog(0, "finishing off migration of lockres %.*s, " 1756 "from %u to %u\n", 1757 res->lockname.len, res->lockname.name, 1758 dlm->node_num, mle->new_master); 1759 spin_lock(&res->spinlock); 1760 res->state &= ~DLM_LOCK_RES_MIGRATING; 1761 dlm_change_lockres_owner(dlm, res, mle->new_master); 1762 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1763 spin_unlock(&res->spinlock); 1764 } 1765 /* master is known, detach if not already detached */ 1766 dlm_mle_detach_hb_events(dlm, mle); 1767 dlm_put_mle(mle); 1768 1769 if (extra_ref) { 1770 /* the assert master message now balances the extra 1771 * ref given by the master / migration request message. 1772 * if this is the last put, it will be removed 1773 * from the list. */ 1774 dlm_put_mle(mle); 1775 } 1776 } 1777 1778 done: 1779 ret = 0; 1780 if (res) 1781 dlm_lockres_put(res); 1782 dlm_put(dlm); 1783 if (master_request) { 1784 mlog(0, "need to tell master to reassert\n"); 1785 ret = EAGAIN; // positive. negative would shoot down the node. 1786 } 1787 return ret; 1788 1789 kill: 1790 /* kill the caller! */ 1791 spin_unlock(&res->spinlock); 1792 spin_unlock(&dlm->spinlock); 1793 dlm_lockres_put(res); 1794 mlog(ML_ERROR, "Bad message received from another node. Dumping state " 1795 "and killing the other node now! 
This node is OK and can continue.\n"); 1796 dlm_dump_lock_resources(dlm); 1797 dlm_put(dlm); 1798 return -EINVAL; 1799 } 1800 1801 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, 1802 struct dlm_lock_resource *res, 1803 int ignore_higher, u8 request_from, u32 flags) 1804 { 1805 struct dlm_work_item *item; 1806 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1807 if (!item) 1808 return -ENOMEM; 1809 1810 1811 /* queue up work for dlm_assert_master_worker */ 1812 dlm_grab(dlm); /* get an extra ref for the work item */ 1813 dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL); 1814 item->u.am.lockres = res; /* already have a ref */ 1815 /* can optionally ignore node numbers higher than this node */ 1816 item->u.am.ignore_higher = ignore_higher; 1817 item->u.am.request_from = request_from; 1818 item->u.am.flags = flags; 1819 1820 if (ignore_higher) 1821 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, 1822 res->lockname.name); 1823 1824 spin_lock(&dlm->work_lock); 1825 list_add_tail(&item->list, &dlm->work_list); 1826 spin_unlock(&dlm->work_lock); 1827 1828 schedule_work(&dlm->dispatched_work); 1829 return 0; 1830 } 1831 1832 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) 1833 { 1834 struct dlm_ctxt *dlm = data; 1835 int ret = 0; 1836 struct dlm_lock_resource *res; 1837 unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1838 int ignore_higher; 1839 int bit; 1840 u8 request_from; 1841 u32 flags; 1842 1843 dlm = item->dlm; 1844 res = item->u.am.lockres; 1845 ignore_higher = item->u.am.ignore_higher; 1846 request_from = item->u.am.request_from; 1847 flags = item->u.am.flags; 1848 1849 spin_lock(&dlm->spinlock); 1850 memcpy(nodemap, dlm->domain_map, sizeof(nodemap)); 1851 spin_unlock(&dlm->spinlock); 1852 1853 clear_bit(dlm->node_num, nodemap); 1854 if (ignore_higher) { 1855 /* if is this just to clear up mles for nodes below 1856 * this node, do not send the message to the original 1857 * caller or any node number higher than this */ 1858 clear_bit(request_from, nodemap); 1859 bit = dlm->node_num; 1860 while (1) { 1861 bit = find_next_bit(nodemap, O2NM_MAX_NODES, 1862 bit+1); 1863 if (bit >= O2NM_MAX_NODES) 1864 break; 1865 clear_bit(bit, nodemap); 1866 } 1867 } 1868 1869 /* this call now finishes out the nodemap 1870 * even if one or more nodes die */ 1871 mlog(0, "worker about to master %.*s here, this=%u\n", 1872 res->lockname.len, res->lockname.name, dlm->node_num); 1873 ret = dlm_do_assert_master(dlm, res->lockname.name, 1874 res->lockname.len, 1875 nodemap, flags); 1876 if (ret < 0) { 1877 /* no need to restart, we are done */ 1878 mlog_errno(ret); 1879 } 1880 1881 dlm_lockres_put(res); 1882 1883 mlog(0, "finished with dlm_assert_master_worker\n"); 1884 } 1885 1886 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread. 1887 * We cannot wait for node recovery to complete to begin mastering this 1888 * lockres because this lockres is used to kick off recovery! ;-) 1889 * So, do a pre-check on all living nodes to see if any of those nodes 1890 * think that $RECOVERY is currently mastered by a dead node. If so, 1891 * we wait a short time to allow that node to get notified by its own 1892 * heartbeat stack, then check again. All $RECOVERY lock resources 1893 * mastered by dead nodes are purged when the hearbeat callback is 1894 * fired, so we can know for sure that it is safe to continue once 1895 * the node returns a live node or no node. 
*/ 1896 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, 1897 struct dlm_lock_resource *res) 1898 { 1899 struct dlm_node_iter iter; 1900 int nodenum; 1901 int ret = 0; 1902 u8 master = DLM_LOCK_RES_OWNER_UNKNOWN; 1903 1904 spin_lock(&dlm->spinlock); 1905 dlm_node_iter_init(dlm->domain_map, &iter); 1906 spin_unlock(&dlm->spinlock); 1907 1908 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 1909 /* do not send to self */ 1910 if (nodenum == dlm->node_num) 1911 continue; 1912 ret = dlm_do_master_requery(dlm, res, nodenum, &master); 1913 if (ret < 0) { 1914 mlog_errno(ret); 1915 if (!dlm_is_host_down(ret)) 1916 BUG(); 1917 /* host is down, so answer for that node would be 1918 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ 1919 } 1920 1921 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1922 /* check to see if this master is in the recovery map */ 1923 spin_lock(&dlm->spinlock); 1924 if (test_bit(master, dlm->recovery_map)) { 1925 mlog(ML_NOTICE, "%s: node %u has not seen " 1926 "node %u go down yet, and thinks the " 1927 "dead node is mastering the recovery " 1928 "lock. must wait.\n", dlm->name, 1929 nodenum, master); 1930 ret = -EAGAIN; 1931 } 1932 spin_unlock(&dlm->spinlock); 1933 mlog(0, "%s: reco lock master is %u\n", dlm->name, 1934 master); 1935 break; 1936 } 1937 } 1938 return ret; 1939 } 1940 1941 1942 /* 1943 * DLM_MIGRATE_LOCKRES 1944 */ 1945 1946 1947 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1948 u8 target) 1949 { 1950 struct dlm_master_list_entry *mle = NULL; 1951 struct dlm_master_list_entry *oldmle = NULL; 1952 struct dlm_migratable_lockres *mres = NULL; 1953 int ret = -EINVAL; 1954 const char *name; 1955 unsigned int namelen; 1956 int mle_added = 0; 1957 struct list_head *queue, *iter; 1958 int i; 1959 struct dlm_lock *lock; 1960 int empty = 1; 1961 1962 if (!dlm_grab(dlm)) 1963 return -EINVAL; 1964 1965 name = res->lockname.name; 1966 namelen = res->lockname.len; 1967 1968 mlog(0, "migrating %.*s to %u\n", namelen, name, target); 1969 1970 /* 1971 * ensure this lockres is a proper candidate for migration 1972 */ 1973 spin_lock(&res->spinlock); 1974 if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 1975 mlog(0, "cannot migrate lockres with unknown owner!\n"); 1976 spin_unlock(&res->spinlock); 1977 goto leave; 1978 } 1979 if (res->owner != dlm->node_num) { 1980 mlog(0, "cannot migrate lockres this node doesn't own!\n"); 1981 spin_unlock(&res->spinlock); 1982 goto leave; 1983 } 1984 mlog(0, "checking queues...\n"); 1985 queue = &res->granted; 1986 for (i=0; i<3; i++) { 1987 list_for_each(iter, queue) { 1988 lock = list_entry (iter, struct dlm_lock, list); 1989 empty = 0; 1990 if (lock->ml.node == dlm->node_num) { 1991 mlog(0, "found a lock owned by this node " 1992 "still on the %s queue! will not " 1993 "migrate this lockres\n", 1994 i==0 ? "granted" : 1995 (i==1 ? "converting" : "blocked")); 1996 spin_unlock(&res->spinlock); 1997 ret = -ENOTEMPTY; 1998 goto leave; 1999 } 2000 } 2001 queue++; 2002 } 2003 mlog(0, "all locks on this lockres are nonlocal. continuing\n"); 2004 spin_unlock(&res->spinlock); 2005 2006 /* no work to do */ 2007 if (empty) { 2008 mlog(0, "no locks were found on this lockres! 
done!\n"); 2009 ret = 0; 2010 goto leave; 2011 } 2012 2013 /* 2014 * preallocate up front 2015 * if this fails, abort 2016 */ 2017 2018 ret = -ENOMEM; 2019 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL); 2020 if (!mres) { 2021 mlog_errno(ret); 2022 goto leave; 2023 } 2024 2025 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2026 GFP_KERNEL); 2027 if (!mle) { 2028 mlog_errno(ret); 2029 goto leave; 2030 } 2031 ret = 0; 2032 2033 /* 2034 * find a node to migrate the lockres to 2035 */ 2036 2037 mlog(0, "picking a migration node\n"); 2038 spin_lock(&dlm->spinlock); 2039 /* pick a new node */ 2040 if (!test_bit(target, dlm->domain_map) || 2041 target >= O2NM_MAX_NODES) { 2042 target = dlm_pick_migration_target(dlm, res); 2043 } 2044 mlog(0, "node %u chosen for migration\n", target); 2045 2046 if (target >= O2NM_MAX_NODES || 2047 !test_bit(target, dlm->domain_map)) { 2048 /* target chosen is not alive */ 2049 ret = -EINVAL; 2050 } 2051 2052 if (ret) { 2053 spin_unlock(&dlm->spinlock); 2054 goto fail; 2055 } 2056 2057 mlog(0, "continuing with target = %u\n", target); 2058 2059 /* 2060 * clear any existing master requests and 2061 * add the migration mle to the list 2062 */ 2063 spin_lock(&dlm->master_lock); 2064 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name, 2065 namelen, target, dlm->node_num); 2066 spin_unlock(&dlm->master_lock); 2067 spin_unlock(&dlm->spinlock); 2068 2069 if (ret == -EEXIST) { 2070 mlog(0, "another process is already migrating it\n"); 2071 goto fail; 2072 } 2073 mle_added = 1; 2074 2075 /* 2076 * set the MIGRATING flag and flush asts 2077 * if we fail after this we need to re-dirty the lockres 2078 */ 2079 if (dlm_mark_lockres_migrating(dlm, res, target) < 0) { 2080 mlog(ML_ERROR, "tried to migrate %.*s to %u, but " 2081 "the target went down.\n", res->lockname.len, 2082 res->lockname.name, target); 2083 spin_lock(&res->spinlock); 2084 res->state &= ~DLM_LOCK_RES_MIGRATING; 2085 spin_unlock(&res->spinlock); 2086 ret = -EINVAL; 2087 } 2088 2089 fail: 2090 if (oldmle) { 2091 /* master is known, detach if not already detached */ 2092 dlm_mle_detach_hb_events(dlm, oldmle); 2093 dlm_put_mle(oldmle); 2094 } 2095 2096 if (ret < 0) { 2097 if (mle_added) { 2098 dlm_mle_detach_hb_events(dlm, mle); 2099 dlm_put_mle(mle); 2100 } else if (mle) { 2101 kmem_cache_free(dlm_mle_cache, mle); 2102 } 2103 goto leave; 2104 } 2105 2106 /* 2107 * at this point, we have a migration target, an mle 2108 * in the master list, and the MIGRATING flag set on 2109 * the lockres 2110 */ 2111 2112 2113 /* get an extra reference on the mle. 2114 * otherwise the assert_master from the new 2115 * master will destroy this. 2116 * also, make sure that all callers of dlm_get_mle 2117 * take both dlm->spinlock and dlm->master_lock */ 2118 spin_lock(&dlm->spinlock); 2119 spin_lock(&dlm->master_lock); 2120 dlm_get_mle(mle); 2121 spin_unlock(&dlm->master_lock); 2122 spin_unlock(&dlm->spinlock); 2123 2124 /* notify new node and send all lock state */ 2125 /* call send_one_lockres with migration flag. 2126 * this serves as notice to the target node that a 2127 * migration is starting. 
*/ 2128 ret = dlm_send_one_lockres(dlm, res, mres, target, 2129 DLM_MRES_MIGRATION); 2130 2131 if (ret < 0) { 2132 mlog(0, "migration to node %u failed with %d\n", 2133 target, ret); 2134 /* migration failed, detach and clean up mle */ 2135 dlm_mle_detach_hb_events(dlm, mle); 2136 dlm_put_mle(mle); 2137 dlm_put_mle(mle); 2138 goto leave; 2139 } 2140 2141 /* at this point, the target sends a message to all nodes, 2142 * (using dlm_do_migrate_request). this node is skipped since 2143 * we had to put an mle in the list to begin the process. this 2144 * node now waits for target to do an assert master. this node 2145 * will be the last one notified, ensuring that the migration 2146 * is complete everywhere. if the target dies while this is 2147 * going on, some nodes could potentially see the target as the 2148 * master, so it is important that my recovery finds the migration 2149 * mle and sets the master to UNKNOWN. */ 2150 2151 2152 /* wait for new node to assert master */ 2153 while (1) { 2154 ret = wait_event_interruptible_timeout(mle->wq, 2155 (atomic_read(&mle->woken) == 1), 2156 msecs_to_jiffies(5000)); 2157 2158 if (ret >= 0) { 2159 if (atomic_read(&mle->woken) == 1 || 2160 res->owner == target) 2161 break; 2162 2163 mlog(0, "timed out during migration\n"); 2164 /* avoid hang during shutdown when migrating lockres 2165 * to a node which also goes down */ 2166 if (dlm_is_node_dead(dlm, target)) { 2167 mlog(0, "%s:%.*s: expected migration target %u " 2168 "is no longer up. restarting.\n", 2169 dlm->name, res->lockname.len, 2170 res->lockname.name, target); 2171 ret = -ERESTARTSYS; 2172 } 2173 } 2174 if (ret == -ERESTARTSYS) { 2175 /* migration failed, detach and clean up mle */ 2176 dlm_mle_detach_hb_events(dlm, mle); 2177 dlm_put_mle(mle); 2178 dlm_put_mle(mle); 2179 goto leave; 2180 } 2181 /* TODO: if node died: stop, clean up, return error */ 2182 } 2183 2184 /* all done, set the owner, clear the flag */ 2185 spin_lock(&res->spinlock); 2186 dlm_set_lockres_owner(dlm, res, target); 2187 res->state &= ~DLM_LOCK_RES_MIGRATING; 2188 dlm_remove_nonlocal_locks(dlm, res); 2189 spin_unlock(&res->spinlock); 2190 wake_up(&res->wq); 2191 2192 /* master is known, detach if not already detached */ 2193 dlm_mle_detach_hb_events(dlm, mle); 2194 dlm_put_mle(mle); 2195 ret = 0; 2196 2197 dlm_lockres_calc_usage(dlm, res); 2198 2199 leave: 2200 /* re-dirty the lockres if we failed */ 2201 if (ret < 0) 2202 dlm_kick_thread(dlm, res); 2203 2204 /* TODO: cleanup */ 2205 if (mres) 2206 free_page((unsigned long)mres); 2207 2208 dlm_put(dlm); 2209 2210 mlog(0, "returning %d\n", ret); 2211 return ret; 2212 } 2213 EXPORT_SYMBOL_GPL(dlm_migrate_lockres); 2214 2215 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) 2216 { 2217 int ret; 2218 spin_lock(&dlm->ast_lock); 2219 spin_lock(&lock->spinlock); 2220 ret = (list_empty(&lock->bast_list) && !lock->bast_pending); 2221 spin_unlock(&lock->spinlock); 2222 spin_unlock(&dlm->ast_lock); 2223 return ret; 2224 } 2225 2226 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm, 2227 struct dlm_lock_resource *res, 2228 u8 mig_target) 2229 { 2230 int can_proceed; 2231 spin_lock(&res->spinlock); 2232 can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING); 2233 spin_unlock(&res->spinlock); 2234 2235 /* target has died, so make the caller break out of the 2236 * wait_event, but caller must recheck the domain_map */ 2237 spin_lock(&dlm->spinlock); 2238 if (!test_bit(mig_target, dlm->domain_map)) 2239 can_proceed = 1; 2240 spin_unlock(&dlm->spinlock);
2241 return can_proceed; 2242 } 2243 2244 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 2245 { 2246 int ret; 2247 spin_lock(&res->spinlock); 2248 ret = !!(res->state & DLM_LOCK_RES_DIRTY); 2249 spin_unlock(&res->spinlock); 2250 return ret; 2251 } 2252 2253 2254 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, 2255 struct dlm_lock_resource *res, 2256 u8 target) 2257 { 2258 int ret = 0; 2259 2260 mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n", 2261 res->lockname.len, res->lockname.name, dlm->node_num, 2262 target); 2263 /* need to set MIGRATING flag on lockres. this is done by 2264 * ensuring that all asts have been flushed for this lockres. */ 2265 spin_lock(&res->spinlock); 2266 BUG_ON(res->migration_pending); 2267 res->migration_pending = 1; 2268 /* strategy is to reserve an extra ast then release 2269 * it below, letting the release do all of the work */ 2270 __dlm_lockres_reserve_ast(res); 2271 spin_unlock(&res->spinlock); 2272 2273 /* now flush all the pending asts.. hang out for a bit */ 2274 dlm_kick_thread(dlm, res); 2275 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); 2276 dlm_lockres_release_ast(dlm, res); 2277 2278 mlog(0, "about to wait on migration_wq, dirty=%s\n", 2279 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no"); 2280 /* if the extra ref we just put was the final one, this 2281 * will pass thru immediately. otherwise, we need to wait 2282 * for the last ast to finish. */ 2283 again: 2284 ret = wait_event_interruptible_timeout(dlm->migration_wq, 2285 dlm_migration_can_proceed(dlm, res, target), 2286 msecs_to_jiffies(1000)); 2287 if (ret < 0) { 2288 mlog(0, "woken again: migrating? %s, dead? %s\n", 2289 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no", 2290 test_bit(target, dlm->domain_map) ? "no":"yes"); 2291 } else { 2292 mlog(0, "all is well: migrating? %s, dead? %s\n", 2293 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no", 2294 test_bit(target, dlm->domain_map) ? "no":"yes"); 2295 } 2296 if (!dlm_migration_can_proceed(dlm, res, target)) { 2297 mlog(0, "trying again...\n"); 2298 goto again; 2299 } 2300 2301 /* did the target go down or die? */ 2302 spin_lock(&dlm->spinlock); 2303 if (!test_bit(target, dlm->domain_map)) { 2304 mlog(ML_ERROR, "aha. migration target %u just went down\n", 2305 target); 2306 ret = -EHOSTDOWN; 2307 } 2308 spin_unlock(&dlm->spinlock); 2309 2310 /* 2311 * at this point: 2312 * 2313 * o the DLM_LOCK_RES_MIGRATING flag is set 2314 * o there are no pending asts on this lockres 2315 * o all processes trying to reserve an ast on this 2316 * lockres must wait for the MIGRATING flag to clear 2317 */ 2318 return ret; 2319 } 2320 2321 /* last step in the migration process. 2322 * original master calls this to free all of the dlm_lock 2323 * structures that used to be for other nodes. 
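* (the full lock state was already shipped to the new master via dlm_send_one_lockres(), so the copies kept here for other nodes are now stale.)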
*/ 2324 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, 2325 struct dlm_lock_resource *res) 2326 { 2327 struct list_head *iter, *iter2; 2328 struct list_head *queue = &res->granted; 2329 int i; 2330 struct dlm_lock *lock; 2331 2332 assert_spin_locked(&res->spinlock); 2333 2334 BUG_ON(res->owner == dlm->node_num); 2335 2336 for (i=0; i<3; i++) { 2337 list_for_each_safe(iter, iter2, queue) { 2338 lock = list_entry (iter, struct dlm_lock, list); 2339 if (lock->ml.node != dlm->node_num) { 2340 mlog(0, "putting lock for node %u\n", 2341 lock->ml.node); 2342 /* be extra careful */ 2343 BUG_ON(!list_empty(&lock->ast_list)); 2344 BUG_ON(!list_empty(&lock->bast_list)); 2345 BUG_ON(lock->ast_pending); 2346 BUG_ON(lock->bast_pending); 2347 list_del_init(&lock->list); 2348 dlm_lock_put(lock); 2349 } 2350 } 2351 queue++; 2352 } 2353 } 2354 2355 /* for now this is not too intelligent. we will 2356 * need stats to make this do the right thing. 2357 * this just finds the first lock on one of the 2358 * queues and uses that node as the target. */ 2359 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm, 2360 struct dlm_lock_resource *res) 2361 { 2362 int i; 2363 struct list_head *queue = &res->granted; 2364 struct list_head *iter; 2365 struct dlm_lock *lock; 2366 int nodenum; 2367 2368 assert_spin_locked(&dlm->spinlock); 2369 2370 spin_lock(&res->spinlock); 2371 for (i=0; i<3; i++) { 2372 list_for_each(iter, queue) { 2373 /* up to the caller to make sure this node 2374 * is alive */ 2375 lock = list_entry (iter, struct dlm_lock, list); 2376 if (lock->ml.node != dlm->node_num) { 2377 spin_unlock(&res->spinlock); 2378 return lock->ml.node; 2379 } 2380 } 2381 queue++; 2382 } 2383 spin_unlock(&res->spinlock); 2384 mlog(0, "have not found a suitable target yet! checking domain map\n"); 2385 2386 /* ok now we're getting desperate. pick anyone alive. */ 2387 nodenum = -1; 2388 while (1) { 2389 nodenum = find_next_bit(dlm->domain_map, 2390 O2NM_MAX_NODES, nodenum+1); 2391 mlog(0, "found %d in domain map\n", nodenum); 2392 if (nodenum >= O2NM_MAX_NODES) 2393 break; 2394 if (nodenum != dlm->node_num) { 2395 mlog(0, "picking %d\n", nodenum); 2396 return nodenum; 2397 } 2398 } 2399 2400 mlog(0, "giving up. 
no master to migrate to\n"); 2401 return DLM_LOCK_RES_OWNER_UNKNOWN; 2402 } 2403 2404 2405 2406 /* this is called by the new master once all lockres 2407 * data has been received */ 2408 static int dlm_do_migrate_request(struct dlm_ctxt *dlm, 2409 struct dlm_lock_resource *res, 2410 u8 master, u8 new_master, 2411 struct dlm_node_iter *iter) 2412 { 2413 struct dlm_migrate_request migrate; 2414 int ret, status = 0; 2415 int nodenum; 2416 2417 memset(&migrate, 0, sizeof(migrate)); 2418 migrate.namelen = res->lockname.len; 2419 memcpy(migrate.name, res->lockname.name, migrate.namelen); 2420 migrate.new_master = new_master; 2421 migrate.master = master; 2422 2423 ret = 0; 2424 2425 /* send message to all nodes, except the master and myself */ 2426 while ((nodenum = dlm_node_iter_next(iter)) >= 0) { 2427 if (nodenum == master || 2428 nodenum == new_master) 2429 continue; 2430 2431 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key, 2432 &migrate, sizeof(migrate), nodenum, 2433 &status); 2434 if (ret < 0) 2435 mlog_errno(ret); 2436 else if (status < 0) { 2437 mlog(0, "migrate request (node %u) returned %d!\n", 2438 nodenum, status); 2439 ret = status; 2440 } 2441 } 2442 2443 if (ret < 0) 2444 mlog_errno(ret); 2445 2446 mlog(0, "returning ret=%d\n", ret); 2447 return ret; 2448 } 2449 2450 2451 /* if there is an existing mle for this lockres, we now know who the master is. 2452 * (the one who sent us *this* message) we can clear it up right away. 2453 * since the process that put the mle on the list still has a reference to it, 2454 * we can unhash it now, set the master and wake the process. as a result, 2455 * we will have no mle in the list to start with. now we can add an mle for 2456 * the migration and this should be the only one found for those scanning the 2457 * list. */ 2458 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) 2459 { 2460 struct dlm_ctxt *dlm = data; 2461 struct dlm_lock_resource *res = NULL; 2462 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; 2463 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; 2464 const char *name; 2465 unsigned int namelen; 2466 int ret = 0; 2467 2468 if (!dlm_grab(dlm)) 2469 return -EINVAL; 2470 2471 name = migrate->name; 2472 namelen = migrate->namelen; 2473 2474 /* preallocate.. if this fails, abort */ 2475 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2476 GFP_KERNEL); 2477 2478 if (!mle) { 2479 ret = -ENOMEM; 2480 goto leave; 2481 } 2482 2483 /* check for pre-existing lock */ 2484 spin_lock(&dlm->spinlock); 2485 res = __dlm_lookup_lockres(dlm, name, namelen); 2486 spin_lock(&dlm->master_lock); 2487 2488 if (res) { 2489 spin_lock(&res->spinlock); 2490 if (res->state & DLM_LOCK_RES_RECOVERING) { 2491 /* if all is working ok, this can only mean that we got 2492 * a migrate request from a node that we now see as 2493 * dead. what can we do here? drop it to the floor? */ 2494 spin_unlock(&res->spinlock); 2495 mlog(ML_ERROR, "Got a migrate request, but the " 2496 "lockres is marked as recovering!"); 2497 kmem_cache_free(dlm_mle_cache, mle); 2498 ret = -EINVAL; /* need a better solution */ 2499 goto unlock; 2500 } 2501 res->state |= DLM_LOCK_RES_MIGRATING; 2502 spin_unlock(&res->spinlock); 2503 } 2504 2505 /* ignore status. only nonzero status would BUG. 
*/ 2506 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, 2507 name, namelen, 2508 migrate->new_master, 2509 migrate->master); 2510 2511 unlock: 2512 spin_unlock(&dlm->master_lock); 2513 spin_unlock(&dlm->spinlock); 2514 2515 if (oldmle) { 2516 /* master is known, detach if not already detached */ 2517 dlm_mle_detach_hb_events(dlm, oldmle); 2518 dlm_put_mle(oldmle); 2519 } 2520 2521 if (res) 2522 dlm_lockres_put(res); 2523 leave: 2524 dlm_put(dlm); 2525 return ret; 2526 } 2527 2528 /* must be holding dlm->spinlock and dlm->master_lock. 2529 * when adding a migration mle, we can clear any other mles 2530 * in the master list because we know with certainty that 2531 * the master is "master". so we remove any old mle from 2532 * the list after setting its master field, and then add 2533 * the new migration mle. this way we can hold to the rule 2534 * of having only one mle for a given lock name at all times. */ 2535 static int dlm_add_migration_mle(struct dlm_ctxt *dlm, 2536 struct dlm_lock_resource *res, 2537 struct dlm_master_list_entry *mle, 2538 struct dlm_master_list_entry **oldmle, 2539 const char *name, unsigned int namelen, 2540 u8 new_master, u8 master) 2541 { 2542 int found; 2543 int ret = 0; 2544 2545 *oldmle = NULL; 2546 2547 mlog_entry_void(); 2548 2549 assert_spin_locked(&dlm->spinlock); 2550 assert_spin_locked(&dlm->master_lock); 2551 2552 /* caller is responsible for any ref taken here on oldmle */ 2553 found = dlm_find_mle(dlm, oldmle, (char *)name, namelen); 2554 if (found) { 2555 struct dlm_master_list_entry *tmp = *oldmle; 2556 spin_lock(&tmp->spinlock); 2557 if (tmp->type == DLM_MLE_MIGRATION) { 2558 if (master == dlm->node_num) { 2559 /* ah another process raced me to it */ 2560 mlog(0, "tried to migrate %.*s, but some " 2561 "process beat me to it\n", 2562 namelen, name); 2563 ret = -EEXIST; 2564 } else { 2565 /* bad. 2 NODES are trying to migrate! */ 2566 mlog(ML_ERROR, "migration error mle: " 2567 "master=%u new_master=%u // request: " 2568 "master=%u new_master=%u // " 2569 "lockres=%.*s\n", 2570 tmp->master, tmp->new_master, 2571 master, new_master, 2572 namelen, name); 2573 BUG(); 2574 } 2575 } else { 2576 /* this is essentially what assert_master does */ 2577 tmp->master = master; 2578 atomic_set(&tmp->woken, 1); 2579 wake_up(&tmp->wq); 2580 /* remove it from the list so that only one 2581 * mle will be found */ 2582 list_del_init(&tmp->list); 2583 } 2584 spin_unlock(&tmp->spinlock); 2585 } 2586 2587 /* now add a migration mle to the tail of the list */ 2588 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); 2589 mle->new_master = new_master; 2590 mle->master = master; 2591 /* do this for consistency with other mle types */ 2592 set_bit(new_master, mle->maybe_map); 2593 list_add(&mle->list, &dlm->master_list); 2594 2595 return ret; 2596 } 2597 2598 2599 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) 2600 { 2601 struct list_head *iter, *iter2; 2602 struct dlm_master_list_entry *mle; 2603 struct dlm_lock_resource *res; 2604 2605 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); 2606 top: 2607 assert_spin_locked(&dlm->spinlock); 2608 2609 /* clean the master list */ 2610 spin_lock(&dlm->master_lock); 2611 list_for_each_safe(iter, iter2, &dlm->master_list) { 2612 mle = list_entry(iter, struct dlm_master_list_entry, list); 2613 2614 BUG_ON(mle->type != DLM_MLE_BLOCK && 2615 mle->type != DLM_MLE_MASTER && 2616 mle->type != DLM_MLE_MIGRATION); 2617 2618 /* MASTER mles are initiated locally.
the waiting 2619 * process will notice the node map change 2620 * shortly. let that happen as normal. */ 2621 if (mle->type == DLM_MLE_MASTER) 2622 continue; 2623 2624 2625 /* BLOCK mles are initiated by other nodes. 2626 * need to clean up if the dead node would have 2627 * been the master. */ 2628 if (mle->type == DLM_MLE_BLOCK) { 2629 int bit; 2630 2631 spin_lock(&mle->spinlock); 2632 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); 2633 if (bit != dead_node) { 2634 mlog(0, "mle found, but dead node %u would " 2635 "not have been master\n", dead_node); 2636 spin_unlock(&mle->spinlock); 2637 } else { 2638 /* must drop the refcount by one since the 2639 * assert_master will never arrive. this 2640 * may result in the mle being unlinked and 2641 * freed, but there may still be a process 2642 * waiting in the dlmlock path which is fine. */ 2643 mlog(ML_ERROR, "node %u was expected master\n", 2644 dead_node); 2645 atomic_set(&mle->woken, 1); 2646 spin_unlock(&mle->spinlock); 2647 wake_up(&mle->wq); 2648 /* do not need events any longer, so detach 2649 * from heartbeat */ 2650 __dlm_mle_detach_hb_events(dlm, mle); 2651 __dlm_put_mle(mle); 2652 } 2653 continue; 2654 } 2655 2656 /* everything else is a MIGRATION mle */ 2657 2658 /* the rule for MIGRATION mles is that the master 2659 * becomes UNKNOWN if *either* the original or 2660 * the new master dies. all UNKNOWN lockreses 2661 * are sent to whichever node becomes the recovery 2662 * master. the new master is responsible for 2663 * determining if there is still a master for 2664 * this lockres, or if he needs to take over 2665 * mastery. either way, this node should expect 2666 * another message to resolve this. */ 2667 if (mle->master != dead_node && 2668 mle->new_master != dead_node) 2669 continue; 2670 2671 /* if we have reached this point, this mle needs to 2672 * be removed from the list and freed. */ 2673 2674 /* remove from the list early. NOTE: unlinking 2675 * list_head while in list_for_each_safe */ 2676 spin_lock(&mle->spinlock); 2677 list_del_init(&mle->list); 2678 atomic_set(&mle->woken, 1); 2679 spin_unlock(&mle->spinlock); 2680 wake_up(&mle->wq); 2681 2682 mlog(0, "node %u died during migration from " 2683 "%u to %u!\n", dead_node, 2684 mle->master, mle->new_master); 2685 /* if there is a lockres associated with this 2686 * mle, find it and set its owner to UNKNOWN */ 2687 res = __dlm_lookup_lockres(dlm, mle->u.name.name, 2688 mle->u.name.len); 2689 if (res) { 2690 /* unfortunately if we hit this rare case, our 2691 * lock ordering is messed. we need to drop 2692 * the master lock so that we can take the 2693 * lockres lock, meaning that we will have to 2694 * restart from the head of list. 
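* (note that dlm->spinlock itself stays held by our caller for the whole scan; only dlm->master_lock is dropped and re-taken around the lockres work below.)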
*/ 2695 spin_unlock(&dlm->master_lock); 2696 2697 /* move lockres onto recovery list */ 2698 spin_lock(&res->spinlock); 2699 dlm_set_lockres_owner(dlm, res, 2700 DLM_LOCK_RES_OWNER_UNKNOWN); 2701 dlm_move_lockres_to_recovery_list(dlm, res); 2702 spin_unlock(&res->spinlock); 2703 dlm_lockres_put(res); 2704 2705 /* about to get rid of mle, detach from heartbeat */ 2706 __dlm_mle_detach_hb_events(dlm, mle); 2707 2708 /* dump the mle */ 2709 spin_lock(&dlm->master_lock); 2710 __dlm_put_mle(mle); 2711 spin_unlock(&dlm->master_lock); 2712 2713 /* restart */ 2714 goto top; 2715 } 2716 2717 /* this may be the last reference */ 2718 __dlm_put_mle(mle); 2719 } 2720 spin_unlock(&dlm->master_lock); 2721 } 2722 2723 2724 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 2725 u8 old_master) 2726 { 2727 struct dlm_node_iter iter; 2728 int ret = 0; 2729 2730 spin_lock(&dlm->spinlock); 2731 dlm_node_iter_init(dlm->domain_map, &iter); 2732 clear_bit(old_master, iter.node_map); 2733 clear_bit(dlm->node_num, iter.node_map); 2734 spin_unlock(&dlm->spinlock); 2735 2736 mlog(0, "now time to do a migrate request to other nodes\n"); 2737 ret = dlm_do_migrate_request(dlm, res, old_master, 2738 dlm->node_num, &iter); 2739 if (ret < 0) { 2740 mlog_errno(ret); 2741 goto leave; 2742 } 2743 2744 mlog(0, "doing assert master of %.*s to all except the original node\n", 2745 res->lockname.len, res->lockname.name); 2746 /* this call now finishes out the nodemap 2747 * even if one or more nodes die */ 2748 ret = dlm_do_assert_master(dlm, res->lockname.name, 2749 res->lockname.len, iter.node_map, 2750 DLM_ASSERT_MASTER_FINISH_MIGRATION); 2751 if (ret < 0) { 2752 /* no longer need to retry. all living nodes contacted. */ 2753 mlog_errno(ret); 2754 ret = 0; 2755 } 2756 2757 memset(iter.node_map, 0, sizeof(iter.node_map)); 2758 set_bit(old_master, iter.node_map); 2759 mlog(0, "doing assert master of %.*s back to %u\n", 2760 res->lockname.len, res->lockname.name, old_master); 2761 ret = dlm_do_assert_master(dlm, res->lockname.name, 2762 res->lockname.len, iter.node_map, 2763 DLM_ASSERT_MASTER_FINISH_MIGRATION); 2764 if (ret < 0) { 2765 mlog(0, "assert master to original master failed " 2766 "with %d.\n", ret); 2767 /* the only nonzero status here would be because of 2768 * a dead original node. we're done. */ 2769 ret = 0; 2770 } 2771 2772 /* all done, set the owner, clear the flag */ 2773 spin_lock(&res->spinlock); 2774 dlm_set_lockres_owner(dlm, res, dlm->node_num); 2775 res->state &= ~DLM_LOCK_RES_MIGRATING; 2776 spin_unlock(&res->spinlock); 2777 /* re-dirty it on the new master */ 2778 dlm_kick_thread(dlm, res); 2779 wake_up(&res->wq); 2780 leave: 2781 return ret; 2782 } 2783 2784 /* 2785 * LOCKRES AST REFCOUNT 2786 * this is integral to migration 2787 */ 2788 2789 /* for future intent to call an ast, reserve one ahead of time. 2790 * this should be called only after waiting on the lockres 2791 * with dlm_wait_on_lockres, and while still holding the 2792 * spinlock after the call. */ 2793 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res) 2794 { 2795 assert_spin_locked(&res->spinlock); 2796 if (res->state & DLM_LOCK_RES_MIGRATING) { 2797 __dlm_print_one_lock_resource(res); 2798 } 2799 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 2800 2801 atomic_inc(&res->asts_reserved); 2802 } 2803 2804 /* 2805 * used to drop the reserved ast, either because it went unused, 2806 * or because the ast/bast was actually called. 
2807 * 2808 * also, if there is a pending migration on this lockres, 2809 * and this was the last pending ast on the lockres, 2810 * atomically set the MIGRATING flag before we drop the lock. 2811 * this is how we ensure that migration can proceed with no 2812 * asts in progress. note that it is ok if the state of the 2813 * queues is such that a lock should be granted in the future 2814 * or that a bast should be fired, because the new master will 2815 * shuffle the lists on this lockres as soon as it is migrated. 2816 */ 2817 void dlm_lockres_release_ast(struct dlm_ctxt *dlm, 2818 struct dlm_lock_resource *res) 2819 { 2820 if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock)) 2821 return; 2822 2823 if (!res->migration_pending) { 2824 spin_unlock(&res->spinlock); 2825 return; 2826 } 2827 2828 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 2829 res->migration_pending = 0; 2830 res->state |= DLM_LOCK_RES_MIGRATING; 2831 spin_unlock(&res->spinlock); 2832 wake_up(&res->wq); 2833 wake_up(&dlm->migration_wq); 2834 } 2835
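/*
 * Illustrative sketch (not part of the original file): how the two ast
 * reservation helpers above are meant to be paired by a caller that is
 * about to queue an ast/bast.  The function name below is hypothetical
 * and exists only to show the pattern; in the real callers the
 * reservation is taken right after dlm_wait_on_lockres(), while
 * res->spinlock is still held, as the comment above
 * __dlm_lockres_reserve_ast() requires.
 */
#if 0	/* example only, not compiled */
static void example_ast_reservation_pattern(struct dlm_ctxt *dlm,
					    struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);
	/* BUGs if DLM_LOCK_RES_MIGRATING is already set, so a reservation
	 * can only be taken before migration has marked the lockres */
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* ... queue the actual ast/bast work here ... */

	/* drop the reservation.  if this was the last one and a migration
	 * is pending, this atomically sets DLM_LOCK_RES_MIGRATING and
	 * wakes dlm_mark_lockres_migrating() via dlm->migration_wq */
	dlm_lockres_release_ast(dlm, res);
}
#endif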