1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * dlmrecovery.c 6 * 7 * recovery stuff 8 * 9 * Copyright (C) 2004 Oracle. All rights reserved. 10 */ 11 12 13 #include <linux/module.h> 14 #include <linux/fs.h> 15 #include <linux/types.h> 16 #include <linux/slab.h> 17 #include <linux/highmem.h> 18 #include <linux/init.h> 19 #include <linux/sysctl.h> 20 #include <linux/random.h> 21 #include <linux/blkdev.h> 22 #include <linux/socket.h> 23 #include <linux/inet.h> 24 #include <linux/timer.h> 25 #include <linux/kthread.h> 26 #include <linux/delay.h> 27 28 29 #include "cluster/heartbeat.h" 30 #include "cluster/nodemanager.h" 31 #include "cluster/tcp.h" 32 33 #include "dlmapi.h" 34 #include "dlmcommon.h" 35 #include "dlmdomain.h" 36 37 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_RECOVERY) 38 #include "cluster/masklog.h" 39 40 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node); 41 42 static int dlm_recovery_thread(void *data); 43 static int dlm_do_recovery(struct dlm_ctxt *dlm); 44 45 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm); 46 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node); 47 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node); 48 static int dlm_request_all_locks(struct dlm_ctxt *dlm, 49 u8 request_from, u8 dead_node); 50 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm); 51 52 static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res); 53 static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres, 54 const char *lockname, int namelen, 55 int total_locks, u64 cookie, 56 u8 flags, u8 master); 57 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, 58 struct dlm_migratable_lockres *mres, 59 u8 send_to, 60 struct dlm_lock_resource *res, 61 int total_locks); 62 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, 63 struct dlm_lock_resource *res, 64 struct dlm_migratable_lockres *mres); 65 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm); 66 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, 67 u8 dead_node, u8 send_to); 68 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node); 69 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, 70 struct list_head *list, u8 dead_node); 71 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, 72 u8 dead_node, u8 new_master); 73 static void dlm_reco_ast(void *astdata); 74 static void dlm_reco_bast(void *astdata, int blocked_type); 75 static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st); 76 static void dlm_request_all_locks_worker(struct dlm_work_item *item, 77 void *data); 78 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data); 79 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, 80 struct dlm_lock_resource *res, 81 u8 *real_master); 82 83 static u64 dlm_get_next_mig_cookie(void); 84 85 static DEFINE_SPINLOCK(dlm_reco_state_lock); 86 static DEFINE_SPINLOCK(dlm_mig_cookie_lock); 87 static u64 dlm_mig_cookie = 1; 88 89 static u64 dlm_get_next_mig_cookie(void) 90 { 91 u64 c; 92 spin_lock(&dlm_mig_cookie_lock); 93 c = dlm_mig_cookie; 94 if (dlm_mig_cookie == (~0ULL)) 95 dlm_mig_cookie = 1; 96 else 97 dlm_mig_cookie++; 98 spin_unlock(&dlm_mig_cookie_lock); 99 return c; 100 } 101 102 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, 103 u8 dead_node) 104 { 105 assert_spin_locked(&dlm->spinlock); 106 if (dlm->reco.dead_node != dead_node) 107 mlog(0, "%s: changing dead_node from %u to %u\n", 108 dlm->name, dlm->reco.dead_node, dead_node); 109 dlm->reco.dead_node = dead_node; 110 } 111 112 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, 113 u8 master) 114 { 115 assert_spin_locked(&dlm->spinlock); 116 mlog(0, "%s: changing new_master from %u to %u\n", 117 dlm->name, dlm->reco.new_master, master); 118 dlm->reco.new_master = master; 119 } 120 121 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) 122 { 123 assert_spin_locked(&dlm->spinlock); 124 clear_bit(dlm->reco.dead_node, dlm->recovery_map); 125 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 126 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); 127 } 128 129 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) 130 { 131 spin_lock(&dlm->spinlock); 132 __dlm_reset_recovery(dlm); 133 spin_unlock(&dlm->spinlock); 134 } 135 136 /* Worker function used during recovery. */ 137 void dlm_dispatch_work(struct work_struct *work) 138 { 139 struct dlm_ctxt *dlm = 140 container_of(work, struct dlm_ctxt, dispatched_work); 141 LIST_HEAD(tmp_list); 142 struct dlm_work_item *item, *next; 143 dlm_workfunc_t *workfunc; 144 int tot=0; 145 146 spin_lock(&dlm->work_lock); 147 list_splice_init(&dlm->work_list, &tmp_list); 148 spin_unlock(&dlm->work_lock); 149 150 list_for_each_entry(item, &tmp_list, list) { 151 tot++; 152 } 153 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); 154 155 list_for_each_entry_safe(item, next, &tmp_list, list) { 156 workfunc = item->func; 157 list_del_init(&item->list); 158 159 /* already have ref on dlm to avoid having 160 * it disappear. just double-check. */ 161 BUG_ON(item->dlm != dlm); 162 163 /* this is allowed to sleep and 164 * call network stuff */ 165 workfunc(item, item->data); 166 167 dlm_put(dlm); 168 kfree(item); 169 } 170 } 171 172 /* 173 * RECOVERY THREAD 174 */ 175 176 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) 177 { 178 /* wake the recovery thread 179 * this will wake the reco thread in one of three places 180 * 1) sleeping with no recovery happening 181 * 2) sleeping with recovery mastered elsewhere 182 * 3) recovery mastered here, waiting on reco data */ 183 184 wake_up(&dlm->dlm_reco_thread_wq); 185 } 186 187 /* Launch the recovery thread */ 188 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm) 189 { 190 mlog(0, "starting dlm recovery thread...\n"); 191 192 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, 193 "dlm_reco-%s", dlm->name); 194 if (IS_ERR(dlm->dlm_reco_thread_task)) { 195 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); 196 dlm->dlm_reco_thread_task = NULL; 197 return -EINVAL; 198 } 199 200 return 0; 201 } 202 203 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) 204 { 205 if (dlm->dlm_reco_thread_task) { 206 mlog(0, "waiting for dlm recovery thread to exit\n"); 207 kthread_stop(dlm->dlm_reco_thread_task); 208 dlm->dlm_reco_thread_task = NULL; 209 } 210 } 211 212 213 214 /* 215 * this is lame, but here's how recovery works... 216 * 1) all recovery threads cluster wide will work on recovering 217 * ONE node at a time 218 * 2) negotiate who will take over all the locks for the dead node. 219 * thats right... ALL the locks. 220 * 3) once a new master is chosen, everyone scans all locks 221 * and moves aside those mastered by the dead guy 222 * 4) each of these locks should be locked until recovery is done 223 * 5) the new master collects up all of secondary lock queue info 224 * one lock at a time, forcing each node to communicate back 225 * before continuing 226 * 6) each secondary lock queue responds with the full known lock info 227 * 7) once the new master has run all its locks, it sends a ALLDONE! 228 * message to everyone 229 * 8) upon receiving this message, the secondary queue node unlocks 230 * and responds to the ALLDONE 231 * 9) once the new master gets responses from everyone, he unlocks 232 * everything and recovery for this dead node is done 233 *10) go back to 2) while there are still dead nodes 234 * 235 */ 236 237 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) 238 { 239 struct dlm_reco_node_data *ndata; 240 struct dlm_lock_resource *res; 241 242 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n", 243 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 244 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", 245 dlm->reco.dead_node, dlm->reco.new_master); 246 247 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 248 char *st = "unknown"; 249 switch (ndata->state) { 250 case DLM_RECO_NODE_DATA_INIT: 251 st = "init"; 252 break; 253 case DLM_RECO_NODE_DATA_REQUESTING: 254 st = "requesting"; 255 break; 256 case DLM_RECO_NODE_DATA_DEAD: 257 st = "dead"; 258 break; 259 case DLM_RECO_NODE_DATA_RECEIVING: 260 st = "receiving"; 261 break; 262 case DLM_RECO_NODE_DATA_REQUESTED: 263 st = "requested"; 264 break; 265 case DLM_RECO_NODE_DATA_DONE: 266 st = "done"; 267 break; 268 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 269 st = "finalize-sent"; 270 break; 271 default: 272 st = "bad"; 273 break; 274 } 275 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", 276 dlm->name, ndata->node_num, st); 277 } 278 list_for_each_entry(res, &dlm->reco.resources, recovering) { 279 mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n", 280 dlm->name, res->lockname.len, res->lockname.name); 281 } 282 } 283 284 #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) 285 286 static int dlm_recovery_thread(void *data) 287 { 288 int status; 289 struct dlm_ctxt *dlm = data; 290 unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS); 291 292 mlog(0, "dlm thread running for %s...\n", dlm->name); 293 294 while (!kthread_should_stop()) { 295 if (dlm_domain_fully_joined(dlm)) { 296 status = dlm_do_recovery(dlm); 297 if (status == -EAGAIN) { 298 /* do not sleep, recheck immediately. */ 299 continue; 300 } 301 if (status < 0) 302 mlog_errno(status); 303 } 304 305 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, 306 kthread_should_stop(), 307 timeout); 308 } 309 310 mlog(0, "quitting DLM recovery thread\n"); 311 return 0; 312 } 313 314 /* returns true when the recovery master has contacted us */ 315 static int dlm_reco_master_ready(struct dlm_ctxt *dlm) 316 { 317 int ready; 318 spin_lock(&dlm->spinlock); 319 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); 320 spin_unlock(&dlm->spinlock); 321 return ready; 322 } 323 324 /* returns true if node is no longer in the domain 325 * could be dead or just not joined */ 326 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) 327 { 328 int dead; 329 spin_lock(&dlm->spinlock); 330 dead = !test_bit(node, dlm->domain_map); 331 spin_unlock(&dlm->spinlock); 332 return dead; 333 } 334 335 /* returns true if node is no longer in the domain 336 * could be dead or just not joined */ 337 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) 338 { 339 int recovered; 340 spin_lock(&dlm->spinlock); 341 recovered = !test_bit(node, dlm->recovery_map); 342 spin_unlock(&dlm->spinlock); 343 return recovered; 344 } 345 346 347 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 348 { 349 if (dlm_is_node_dead(dlm, node)) 350 return; 351 352 printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in " 353 "domain %s\n", node, dlm->name); 354 355 if (timeout) 356 wait_event_timeout(dlm->dlm_reco_thread_wq, 357 dlm_is_node_dead(dlm, node), 358 msecs_to_jiffies(timeout)); 359 else 360 wait_event(dlm->dlm_reco_thread_wq, 361 dlm_is_node_dead(dlm, node)); 362 } 363 364 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) 365 { 366 if (dlm_is_node_recovered(dlm, node)) 367 return; 368 369 printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in " 370 "domain %s\n", node, dlm->name); 371 372 if (timeout) 373 wait_event_timeout(dlm->dlm_reco_thread_wq, 374 dlm_is_node_recovered(dlm, node), 375 msecs_to_jiffies(timeout)); 376 else 377 wait_event(dlm->dlm_reco_thread_wq, 378 dlm_is_node_recovered(dlm, node)); 379 } 380 381 /* callers of the top-level api calls (dlmlock/dlmunlock) should 382 * block on the dlm->reco.event when recovery is in progress. 383 * the dlm recovery thread will set this state when it begins 384 * recovering a dead node (as the new master or not) and clear 385 * the state and wake as soon as all affected lock resources have 386 * been marked with the RECOVERY flag */ 387 static int dlm_in_recovery(struct dlm_ctxt *dlm) 388 { 389 int in_recovery; 390 spin_lock(&dlm->spinlock); 391 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 392 spin_unlock(&dlm->spinlock); 393 return in_recovery; 394 } 395 396 397 void dlm_wait_for_recovery(struct dlm_ctxt *dlm) 398 { 399 if (dlm_in_recovery(dlm)) { 400 mlog(0, "%s: reco thread %d in recovery: " 401 "state=%d, master=%u, dead=%u\n", 402 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 403 dlm->reco.state, dlm->reco.new_master, 404 dlm->reco.dead_node); 405 } 406 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); 407 } 408 409 static void dlm_begin_recovery(struct dlm_ctxt *dlm) 410 { 411 assert_spin_locked(&dlm->spinlock); 412 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 413 printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n", 414 dlm->name, dlm->reco.dead_node); 415 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; 416 } 417 418 static void dlm_end_recovery(struct dlm_ctxt *dlm) 419 { 420 spin_lock(&dlm->spinlock); 421 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); 422 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; 423 spin_unlock(&dlm->spinlock); 424 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); 425 wake_up(&dlm->reco.event); 426 } 427 428 static void dlm_print_recovery_master(struct dlm_ctxt *dlm) 429 { 430 printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the " 431 "dead node %u in domain %s\n", dlm->reco.new_master, 432 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), 433 dlm->reco.dead_node, dlm->name); 434 } 435 436 static int dlm_do_recovery(struct dlm_ctxt *dlm) 437 { 438 int status = 0; 439 int ret; 440 441 spin_lock(&dlm->spinlock); 442 443 if (dlm->migrate_done) { 444 mlog(0, "%s: no need do recovery after migrating all " 445 "lock resources\n", dlm->name); 446 spin_unlock(&dlm->spinlock); 447 return 0; 448 } 449 450 /* check to see if the new master has died */ 451 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && 452 test_bit(dlm->reco.new_master, dlm->recovery_map)) { 453 mlog(0, "new master %u died while recovering %u!\n", 454 dlm->reco.new_master, dlm->reco.dead_node); 455 /* unset the new_master, leave dead_node */ 456 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); 457 } 458 459 /* select a target to recover */ 460 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 461 int bit; 462 463 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES, 0); 464 if (bit >= O2NM_MAX_NODES || bit < 0) 465 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 466 else 467 dlm_set_reco_dead_node(dlm, bit); 468 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { 469 /* BUG? */ 470 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", 471 dlm->reco.dead_node); 472 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 473 } 474 475 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 476 // mlog(0, "nothing to recover! sleeping now!\n"); 477 spin_unlock(&dlm->spinlock); 478 /* return to main thread loop and sleep. */ 479 return 0; 480 } 481 mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n", 482 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 483 dlm->reco.dead_node); 484 485 /* take write barrier */ 486 /* (stops the list reshuffling thread, proxy ast handling) */ 487 dlm_begin_recovery(dlm); 488 489 spin_unlock(&dlm->spinlock); 490 491 if (dlm->reco.new_master == dlm->node_num) 492 goto master_here; 493 494 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 495 /* choose a new master, returns 0 if this node 496 * is the master, -EEXIST if it's another node. 497 * this does not return until a new master is chosen 498 * or recovery completes entirely. */ 499 ret = dlm_pick_recovery_master(dlm); 500 if (!ret) { 501 /* already notified everyone. go. */ 502 goto master_here; 503 } 504 mlog(0, "another node will master this recovery session.\n"); 505 } 506 507 dlm_print_recovery_master(dlm); 508 509 /* it is safe to start everything back up here 510 * because all of the dead node's lock resources 511 * have been marked as in-recovery */ 512 dlm_end_recovery(dlm); 513 514 /* sleep out in main dlm_recovery_thread loop. */ 515 return 0; 516 517 master_here: 518 dlm_print_recovery_master(dlm); 519 520 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 521 if (status < 0) { 522 /* we should never hit this anymore */ 523 mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, " 524 "retrying.\n", dlm->name, status, dlm->reco.dead_node); 525 /* yield a bit to allow any final network messages 526 * to get handled on remaining nodes */ 527 msleep(100); 528 } else { 529 /* success! see if any other nodes need recovery */ 530 mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n", 531 dlm->name, dlm->reco.dead_node, dlm->node_num); 532 spin_lock(&dlm->spinlock); 533 __dlm_reset_recovery(dlm); 534 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 535 spin_unlock(&dlm->spinlock); 536 } 537 dlm_end_recovery(dlm); 538 539 /* continue and look for another dead node */ 540 return -EAGAIN; 541 } 542 543 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) 544 { 545 int status = 0; 546 struct dlm_reco_node_data *ndata; 547 int all_nodes_done; 548 int destroy = 0; 549 int pass = 0; 550 551 do { 552 /* we have become recovery master. there is no escaping 553 * this, so just keep trying until we get it. */ 554 status = dlm_init_recovery_area(dlm, dead_node); 555 if (status < 0) { 556 mlog(ML_ERROR, "%s: failed to alloc recovery area, " 557 "retrying\n", dlm->name); 558 msleep(1000); 559 } 560 } while (status != 0); 561 562 /* safe to access the node data list without a lock, since this 563 * process is the only one to change the list */ 564 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 565 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); 566 ndata->state = DLM_RECO_NODE_DATA_REQUESTING; 567 568 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, 569 ndata->node_num); 570 571 if (ndata->node_num == dlm->node_num) { 572 ndata->state = DLM_RECO_NODE_DATA_DONE; 573 continue; 574 } 575 576 do { 577 status = dlm_request_all_locks(dlm, ndata->node_num, 578 dead_node); 579 if (status < 0) { 580 mlog_errno(status); 581 if (dlm_is_host_down(status)) { 582 /* node died, ignore it for recovery */ 583 status = 0; 584 ndata->state = DLM_RECO_NODE_DATA_DEAD; 585 /* wait for the domain map to catch up 586 * with the network state. */ 587 wait_event_timeout(dlm->dlm_reco_thread_wq, 588 dlm_is_node_dead(dlm, 589 ndata->node_num), 590 msecs_to_jiffies(1000)); 591 mlog(0, "waited 1 sec for %u, " 592 "dead? %s\n", ndata->node_num, 593 dlm_is_node_dead(dlm, ndata->node_num) ? 594 "yes" : "no"); 595 } else { 596 /* -ENOMEM on the other node */ 597 mlog(0, "%s: node %u returned " 598 "%d during recovery, retrying " 599 "after a short wait\n", 600 dlm->name, ndata->node_num, 601 status); 602 msleep(100); 603 } 604 } 605 } while (status != 0); 606 607 spin_lock(&dlm_reco_state_lock); 608 switch (ndata->state) { 609 case DLM_RECO_NODE_DATA_INIT: 610 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 611 case DLM_RECO_NODE_DATA_REQUESTED: 612 BUG(); 613 break; 614 case DLM_RECO_NODE_DATA_DEAD: 615 mlog(0, "node %u died after requesting " 616 "recovery info for node %u\n", 617 ndata->node_num, dead_node); 618 /* fine. don't need this node's info. 619 * continue without it. */ 620 break; 621 case DLM_RECO_NODE_DATA_REQUESTING: 622 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; 623 mlog(0, "now receiving recovery data from " 624 "node %u for dead node %u\n", 625 ndata->node_num, dead_node); 626 break; 627 case DLM_RECO_NODE_DATA_RECEIVING: 628 mlog(0, "already receiving recovery data from " 629 "node %u for dead node %u\n", 630 ndata->node_num, dead_node); 631 break; 632 case DLM_RECO_NODE_DATA_DONE: 633 mlog(0, "already DONE receiving recovery data " 634 "from node %u for dead node %u\n", 635 ndata->node_num, dead_node); 636 break; 637 } 638 spin_unlock(&dlm_reco_state_lock); 639 } 640 641 mlog(0, "%s: Done requesting all lock info\n", dlm->name); 642 643 /* nodes should be sending reco data now 644 * just need to wait */ 645 646 while (1) { 647 /* check all the nodes now to see if we are 648 * done, or if anyone died */ 649 all_nodes_done = 1; 650 spin_lock(&dlm_reco_state_lock); 651 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 652 mlog(0, "checking recovery state of node %u\n", 653 ndata->node_num); 654 switch (ndata->state) { 655 case DLM_RECO_NODE_DATA_INIT: 656 case DLM_RECO_NODE_DATA_REQUESTING: 657 mlog(ML_ERROR, "bad ndata state for " 658 "node %u: state=%d\n", 659 ndata->node_num, ndata->state); 660 BUG(); 661 break; 662 case DLM_RECO_NODE_DATA_DEAD: 663 mlog(0, "node %u died after " 664 "requesting recovery info for " 665 "node %u\n", ndata->node_num, 666 dead_node); 667 break; 668 case DLM_RECO_NODE_DATA_RECEIVING: 669 case DLM_RECO_NODE_DATA_REQUESTED: 670 mlog(0, "%s: node %u still in state %s\n", 671 dlm->name, ndata->node_num, 672 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? 673 "receiving" : "requested"); 674 all_nodes_done = 0; 675 break; 676 case DLM_RECO_NODE_DATA_DONE: 677 mlog(0, "%s: node %u state is done\n", 678 dlm->name, ndata->node_num); 679 break; 680 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 681 mlog(0, "%s: node %u state is finalize\n", 682 dlm->name, ndata->node_num); 683 break; 684 } 685 } 686 spin_unlock(&dlm_reco_state_lock); 687 688 mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass, 689 all_nodes_done?"yes":"no"); 690 if (all_nodes_done) { 691 int ret; 692 693 /* Set this flag on recovery master to avoid 694 * a new recovery for another dead node start 695 * before the recovery is not done. That may 696 * cause recovery hung.*/ 697 spin_lock(&dlm->spinlock); 698 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; 699 spin_unlock(&dlm->spinlock); 700 701 /* all nodes are now in DLM_RECO_NODE_DATA_DONE state 702 * just send a finalize message to everyone and 703 * clean up */ 704 mlog(0, "all nodes are done! send finalize\n"); 705 ret = dlm_send_finalize_reco_message(dlm); 706 if (ret < 0) 707 mlog_errno(ret); 708 709 spin_lock(&dlm->spinlock); 710 dlm_finish_local_lockres_recovery(dlm, dead_node, 711 dlm->node_num); 712 spin_unlock(&dlm->spinlock); 713 mlog(0, "should be done with recovery!\n"); 714 715 mlog(0, "finishing recovery of %s at %lu, " 716 "dead=%u, this=%u, new=%u\n", dlm->name, 717 jiffies, dlm->reco.dead_node, 718 dlm->node_num, dlm->reco.new_master); 719 destroy = 1; 720 status = 0; 721 /* rescan everything marked dirty along the way */ 722 dlm_kick_thread(dlm, NULL); 723 break; 724 } 725 /* wait to be signalled, with periodic timeout 726 * to check for node death */ 727 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, 728 kthread_should_stop(), 729 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS)); 730 731 } 732 733 if (destroy) 734 dlm_destroy_recovery_area(dlm); 735 736 return status; 737 } 738 739 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) 740 { 741 int num=0; 742 struct dlm_reco_node_data *ndata; 743 744 spin_lock(&dlm->spinlock); 745 memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); 746 /* nodes can only be removed (by dying) after dropping 747 * this lock, and death will be trapped later, so this should do */ 748 spin_unlock(&dlm->spinlock); 749 750 while (1) { 751 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); 752 if (num >= O2NM_MAX_NODES) { 753 break; 754 } 755 BUG_ON(num == dead_node); 756 757 ndata = kzalloc(sizeof(*ndata), GFP_NOFS); 758 if (!ndata) { 759 dlm_destroy_recovery_area(dlm); 760 return -ENOMEM; 761 } 762 ndata->node_num = num; 763 ndata->state = DLM_RECO_NODE_DATA_INIT; 764 spin_lock(&dlm_reco_state_lock); 765 list_add_tail(&ndata->list, &dlm->reco.node_data); 766 spin_unlock(&dlm_reco_state_lock); 767 num++; 768 } 769 770 return 0; 771 } 772 773 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm) 774 { 775 struct dlm_reco_node_data *ndata, *next; 776 LIST_HEAD(tmplist); 777 778 spin_lock(&dlm_reco_state_lock); 779 list_splice_init(&dlm->reco.node_data, &tmplist); 780 spin_unlock(&dlm_reco_state_lock); 781 782 list_for_each_entry_safe(ndata, next, &tmplist, list) { 783 list_del_init(&ndata->list); 784 kfree(ndata); 785 } 786 } 787 788 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, 789 u8 dead_node) 790 { 791 struct dlm_lock_request lr; 792 int ret; 793 int status; 794 795 mlog(0, "\n"); 796 797 798 mlog(0, "dlm_request_all_locks: dead node is %u, sending request " 799 "to %u\n", dead_node, request_from); 800 801 memset(&lr, 0, sizeof(lr)); 802 lr.node_idx = dlm->node_num; 803 lr.dead_node = dead_node; 804 805 // send message 806 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, 807 &lr, sizeof(lr), request_from, &status); 808 809 /* negative status is handled by caller */ 810 if (ret < 0) 811 mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u " 812 "to recover dead node %u\n", dlm->name, ret, 813 request_from, dead_node); 814 else 815 ret = status; 816 // return from here, then 817 // sleep until all received or error 818 return ret; 819 820 } 821 822 int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data, 823 void **ret_data) 824 { 825 struct dlm_ctxt *dlm = data; 826 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; 827 char *buf = NULL; 828 struct dlm_work_item *item = NULL; 829 830 if (!dlm_grab(dlm)) 831 return -EINVAL; 832 833 if (lr->dead_node != dlm->reco.dead_node) { 834 mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " 835 "dead_node is %u\n", dlm->name, lr->node_idx, 836 lr->dead_node, dlm->reco.dead_node); 837 dlm_print_reco_node_status(dlm); 838 /* this is a hack */ 839 dlm_put(dlm); 840 return -ENOMEM; 841 } 842 BUG_ON(lr->dead_node != dlm->reco.dead_node); 843 844 item = kzalloc(sizeof(*item), GFP_NOFS); 845 if (!item) { 846 dlm_put(dlm); 847 return -ENOMEM; 848 } 849 850 /* this will get freed by dlm_request_all_locks_worker */ 851 buf = (char *) __get_free_page(GFP_NOFS); 852 if (!buf) { 853 kfree(item); 854 dlm_put(dlm); 855 return -ENOMEM; 856 } 857 858 /* queue up work for dlm_request_all_locks_worker */ 859 dlm_grab(dlm); /* get an extra ref for the work item */ 860 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); 861 item->u.ral.reco_master = lr->node_idx; 862 item->u.ral.dead_node = lr->dead_node; 863 spin_lock(&dlm->work_lock); 864 list_add_tail(&item->list, &dlm->work_list); 865 spin_unlock(&dlm->work_lock); 866 queue_work(dlm->dlm_worker, &dlm->dispatched_work); 867 868 dlm_put(dlm); 869 return 0; 870 } 871 872 static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) 873 { 874 struct dlm_migratable_lockres *mres; 875 struct dlm_lock_resource *res; 876 struct dlm_ctxt *dlm; 877 LIST_HEAD(resources); 878 int ret; 879 u8 dead_node, reco_master; 880 int skip_all_done = 0; 881 882 dlm = item->dlm; 883 dead_node = item->u.ral.dead_node; 884 reco_master = item->u.ral.reco_master; 885 mres = (struct dlm_migratable_lockres *)data; 886 887 mlog(0, "%s: recovery worker started, dead=%u, master=%u\n", 888 dlm->name, dead_node, reco_master); 889 890 if (dead_node != dlm->reco.dead_node || 891 reco_master != dlm->reco.new_master) { 892 /* worker could have been created before the recovery master 893 * died. if so, do not continue, but do not error. */ 894 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 895 mlog(ML_NOTICE, "%s: will not send recovery state, " 896 "recovery master %u died, thread=(dead=%u,mas=%u)" 897 " current=(dead=%u,mas=%u)\n", dlm->name, 898 reco_master, dead_node, reco_master, 899 dlm->reco.dead_node, dlm->reco.new_master); 900 } else { 901 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, " 902 "master=%u), request(dead=%u, master=%u)\n", 903 dlm->name, dlm->reco.dead_node, 904 dlm->reco.new_master, dead_node, reco_master); 905 } 906 goto leave; 907 } 908 909 /* lock resources should have already been moved to the 910 * dlm->reco.resources list. now move items from that list 911 * to a temp list if the dead owner matches. note that the 912 * whole cluster recovers only one node at a time, so we 913 * can safely move UNKNOWN lock resources for each recovery 914 * session. */ 915 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); 916 917 /* now we can begin blasting lockreses without the dlm lock */ 918 919 /* any errors returned will be due to the new_master dying, 920 * the dlm_reco_thread should detect this */ 921 list_for_each_entry(res, &resources, recovering) { 922 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, 923 DLM_MRES_RECOVERY); 924 if (ret < 0) { 925 mlog(ML_ERROR, "%s: node %u went down while sending " 926 "recovery state for dead node %u, ret=%d\n", dlm->name, 927 reco_master, dead_node, ret); 928 skip_all_done = 1; 929 break; 930 } 931 } 932 933 /* move the resources back to the list */ 934 spin_lock(&dlm->spinlock); 935 list_splice_init(&resources, &dlm->reco.resources); 936 spin_unlock(&dlm->spinlock); 937 938 if (!skip_all_done) { 939 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); 940 if (ret < 0) { 941 mlog(ML_ERROR, "%s: node %u went down while sending " 942 "recovery all-done for dead node %u, ret=%d\n", 943 dlm->name, reco_master, dead_node, ret); 944 } 945 } 946 leave: 947 free_page((unsigned long)data); 948 } 949 950 951 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) 952 { 953 int ret, tmpret; 954 struct dlm_reco_data_done done_msg; 955 956 memset(&done_msg, 0, sizeof(done_msg)); 957 done_msg.node_idx = dlm->node_num; 958 done_msg.dead_node = dead_node; 959 mlog(0, "sending DATA DONE message to %u, " 960 "my node=%u, dead node=%u\n", send_to, done_msg.node_idx, 961 done_msg.dead_node); 962 963 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 964 sizeof(done_msg), send_to, &tmpret); 965 if (ret < 0) { 966 mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u " 967 "to recover dead node %u\n", dlm->name, ret, send_to, 968 dead_node); 969 if (!dlm_is_host_down(ret)) { 970 BUG(); 971 } 972 } else 973 ret = tmpret; 974 return ret; 975 } 976 977 978 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data, 979 void **ret_data) 980 { 981 struct dlm_ctxt *dlm = data; 982 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; 983 struct dlm_reco_node_data *ndata = NULL; 984 int ret = -EINVAL; 985 986 if (!dlm_grab(dlm)) 987 return -EINVAL; 988 989 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " 990 "node_idx=%u, this node=%u\n", done->dead_node, 991 dlm->reco.dead_node, done->node_idx, dlm->node_num); 992 993 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), 994 "Got DATA DONE: dead_node=%u, reco.dead_node=%u, " 995 "node_idx=%u, this node=%u\n", done->dead_node, 996 dlm->reco.dead_node, done->node_idx, dlm->node_num); 997 998 spin_lock(&dlm_reco_state_lock); 999 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 1000 if (ndata->node_num != done->node_idx) 1001 continue; 1002 1003 switch (ndata->state) { 1004 /* should have moved beyond INIT but not to FINALIZE yet */ 1005 case DLM_RECO_NODE_DATA_INIT: 1006 case DLM_RECO_NODE_DATA_DEAD: 1007 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 1008 mlog(ML_ERROR, "bad ndata state for node %u:" 1009 " state=%d\n", ndata->node_num, 1010 ndata->state); 1011 BUG(); 1012 break; 1013 /* these states are possible at this point, anywhere along 1014 * the line of recovery */ 1015 case DLM_RECO_NODE_DATA_DONE: 1016 case DLM_RECO_NODE_DATA_RECEIVING: 1017 case DLM_RECO_NODE_DATA_REQUESTED: 1018 case DLM_RECO_NODE_DATA_REQUESTING: 1019 mlog(0, "node %u is DONE sending " 1020 "recovery data!\n", 1021 ndata->node_num); 1022 1023 ndata->state = DLM_RECO_NODE_DATA_DONE; 1024 ret = 0; 1025 break; 1026 } 1027 } 1028 spin_unlock(&dlm_reco_state_lock); 1029 1030 /* wake the recovery thread, some node is done */ 1031 if (!ret) 1032 dlm_kick_recovery_thread(dlm); 1033 1034 if (ret < 0) 1035 mlog(ML_ERROR, "failed to find recovery node data for node " 1036 "%u\n", done->node_idx); 1037 dlm_put(dlm); 1038 1039 mlog(0, "leaving reco data done handler, ret=%d\n", ret); 1040 return ret; 1041 } 1042 1043 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, 1044 struct list_head *list, 1045 u8 dead_node) 1046 { 1047 struct dlm_lock_resource *res, *next; 1048 struct dlm_lock *lock; 1049 1050 spin_lock(&dlm->spinlock); 1051 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 1052 /* always prune any $RECOVERY entries for dead nodes, 1053 * otherwise hangs can occur during later recovery */ 1054 if (dlm_is_recovery_lock(res->lockname.name, 1055 res->lockname.len)) { 1056 spin_lock(&res->spinlock); 1057 list_for_each_entry(lock, &res->granted, list) { 1058 if (lock->ml.node == dead_node) { 1059 mlog(0, "AHA! there was " 1060 "a $RECOVERY lock for dead " 1061 "node %u (%s)!\n", 1062 dead_node, dlm->name); 1063 list_del_init(&lock->list); 1064 dlm_lock_put(lock); 1065 /* Can't schedule DLM_UNLOCK_FREE_LOCK 1066 * - do manually */ 1067 dlm_lock_put(lock); 1068 break; 1069 } 1070 } 1071 spin_unlock(&res->spinlock); 1072 continue; 1073 } 1074 1075 if (res->owner == dead_node) { 1076 mlog(0, "found lockres owned by dead node while " 1077 "doing recovery for node %u. sending it.\n", 1078 dead_node); 1079 list_move_tail(&res->recovering, list); 1080 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 1081 mlog(0, "found UNKNOWN owner while doing recovery " 1082 "for node %u. sending it.\n", dead_node); 1083 list_move_tail(&res->recovering, list); 1084 } 1085 } 1086 spin_unlock(&dlm->spinlock); 1087 } 1088 1089 static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res) 1090 { 1091 int total_locks = 0; 1092 struct list_head *iter, *queue = &res->granted; 1093 int i; 1094 1095 for (i=0; i<3; i++) { 1096 list_for_each(iter, queue) 1097 total_locks++; 1098 queue++; 1099 } 1100 return total_locks; 1101 } 1102 1103 1104 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, 1105 struct dlm_migratable_lockres *mres, 1106 u8 send_to, 1107 struct dlm_lock_resource *res, 1108 int total_locks) 1109 { 1110 u64 mig_cookie = be64_to_cpu(mres->mig_cookie); 1111 int mres_total_locks = be32_to_cpu(mres->total_locks); 1112 int sz, ret = 0, status = 0; 1113 u8 orig_flags = mres->flags, 1114 orig_master = mres->master; 1115 1116 BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS); 1117 if (!mres->num_locks) 1118 return 0; 1119 1120 sz = sizeof(struct dlm_migratable_lockres) + 1121 (mres->num_locks * sizeof(struct dlm_migratable_lock)); 1122 1123 /* add an all-done flag if we reached the last lock */ 1124 orig_flags = mres->flags; 1125 BUG_ON(total_locks > mres_total_locks); 1126 if (total_locks == mres_total_locks) 1127 mres->flags |= DLM_MRES_ALL_DONE; 1128 1129 mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n", 1130 dlm->name, res->lockname.len, res->lockname.name, 1131 orig_flags & DLM_MRES_MIGRATION ? "migration" : "recovery", 1132 send_to); 1133 1134 /* send it */ 1135 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, 1136 sz, send_to, &status); 1137 if (ret < 0) { 1138 /* XXX: negative status is not handled. 1139 * this will end up killing this node. */ 1140 mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to " 1141 "node %u (%s)\n", dlm->name, mres->lockname_len, 1142 mres->lockname, ret, send_to, 1143 (orig_flags & DLM_MRES_MIGRATION ? 1144 "migration" : "recovery")); 1145 } else { 1146 /* might get an -ENOMEM back here */ 1147 ret = status; 1148 if (ret < 0) { 1149 mlog_errno(ret); 1150 1151 if (ret == -EFAULT) { 1152 mlog(ML_ERROR, "node %u told me to kill " 1153 "myself!\n", send_to); 1154 BUG(); 1155 } 1156 } 1157 } 1158 1159 /* zero and reinit the message buffer */ 1160 dlm_init_migratable_lockres(mres, res->lockname.name, 1161 res->lockname.len, mres_total_locks, 1162 mig_cookie, orig_flags, orig_master); 1163 return ret; 1164 } 1165 1166 static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres, 1167 const char *lockname, int namelen, 1168 int total_locks, u64 cookie, 1169 u8 flags, u8 master) 1170 { 1171 /* mres here is one full page */ 1172 clear_page(mres); 1173 mres->lockname_len = namelen; 1174 memcpy(mres->lockname, lockname, namelen); 1175 mres->num_locks = 0; 1176 mres->total_locks = cpu_to_be32(total_locks); 1177 mres->mig_cookie = cpu_to_be64(cookie); 1178 mres->flags = flags; 1179 mres->master = master; 1180 } 1181 1182 static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock, 1183 struct dlm_migratable_lockres *mres, 1184 int queue) 1185 { 1186 if (!lock->lksb) 1187 return; 1188 1189 /* Ignore lvb in all locks in the blocked list */ 1190 if (queue == DLM_BLOCKED_LIST) 1191 return; 1192 1193 /* Only consider lvbs in locks with granted EX or PR lock levels */ 1194 if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE) 1195 return; 1196 1197 if (dlm_lvb_is_empty(mres->lvb)) { 1198 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); 1199 return; 1200 } 1201 1202 /* Ensure the lvb copied for migration matches in other valid locks */ 1203 if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN)) 1204 return; 1205 1206 mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, " 1207 "node=%u\n", 1208 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 1209 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 1210 lock->lockres->lockname.len, lock->lockres->lockname.name, 1211 lock->ml.node); 1212 dlm_print_one_lock_resource(lock->lockres); 1213 BUG(); 1214 } 1215 1216 /* returns 1 if this lock fills the network structure, 1217 * 0 otherwise */ 1218 static int dlm_add_lock_to_array(struct dlm_lock *lock, 1219 struct dlm_migratable_lockres *mres, int queue) 1220 { 1221 struct dlm_migratable_lock *ml; 1222 int lock_num = mres->num_locks; 1223 1224 ml = &(mres->ml[lock_num]); 1225 ml->cookie = lock->ml.cookie; 1226 ml->type = lock->ml.type; 1227 ml->convert_type = lock->ml.convert_type; 1228 ml->highest_blocked = lock->ml.highest_blocked; 1229 ml->list = queue; 1230 if (lock->lksb) { 1231 ml->flags = lock->lksb->flags; 1232 dlm_prepare_lvb_for_migration(lock, mres, queue); 1233 } 1234 ml->node = lock->ml.node; 1235 mres->num_locks++; 1236 /* we reached the max, send this network message */ 1237 if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS) 1238 return 1; 1239 return 0; 1240 } 1241 1242 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, 1243 struct dlm_migratable_lockres *mres) 1244 { 1245 struct dlm_lock dummy; 1246 memset(&dummy, 0, sizeof(dummy)); 1247 dummy.ml.cookie = 0; 1248 dummy.ml.type = LKM_IVMODE; 1249 dummy.ml.convert_type = LKM_IVMODE; 1250 dummy.ml.highest_blocked = LKM_IVMODE; 1251 dummy.lksb = NULL; 1252 dummy.ml.node = dlm->node_num; 1253 dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST); 1254 } 1255 1256 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, 1257 struct dlm_migratable_lock *ml, 1258 u8 *nodenum) 1259 { 1260 if (unlikely(ml->cookie == 0 && 1261 ml->type == LKM_IVMODE && 1262 ml->convert_type == LKM_IVMODE && 1263 ml->highest_blocked == LKM_IVMODE && 1264 ml->list == DLM_BLOCKED_LIST)) { 1265 *nodenum = ml->node; 1266 return 1; 1267 } 1268 return 0; 1269 } 1270 1271 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1272 struct dlm_migratable_lockres *mres, 1273 u8 send_to, u8 flags) 1274 { 1275 struct list_head *queue; 1276 int total_locks, i; 1277 u64 mig_cookie = 0; 1278 struct dlm_lock *lock; 1279 int ret = 0; 1280 1281 BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION))); 1282 1283 mlog(0, "sending to %u\n", send_to); 1284 1285 total_locks = dlm_num_locks_in_lockres(res); 1286 if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) { 1287 /* rare, but possible */ 1288 mlog(0, "argh. lockres has %d locks. this will " 1289 "require more than one network packet to " 1290 "migrate\n", total_locks); 1291 mig_cookie = dlm_get_next_mig_cookie(); 1292 } 1293 1294 dlm_init_migratable_lockres(mres, res->lockname.name, 1295 res->lockname.len, total_locks, 1296 mig_cookie, flags, res->owner); 1297 1298 total_locks = 0; 1299 for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) { 1300 queue = dlm_list_idx_to_ptr(res, i); 1301 list_for_each_entry(lock, queue, list) { 1302 /* add another lock. */ 1303 total_locks++; 1304 if (!dlm_add_lock_to_array(lock, mres, i)) 1305 continue; 1306 1307 /* this filled the lock message, 1308 * we must send it immediately. */ 1309 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, 1310 res, total_locks); 1311 if (ret < 0) 1312 goto error; 1313 } 1314 } 1315 if (total_locks == 0) { 1316 /* send a dummy lock to indicate a mastery reference only */ 1317 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n", 1318 dlm->name, res->lockname.len, res->lockname.name, 1319 send_to, flags & DLM_MRES_RECOVERY ? "recovery" : 1320 "migration"); 1321 dlm_add_dummy_lock(dlm, mres); 1322 } 1323 /* flush any remaining locks */ 1324 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1325 if (ret < 0) 1326 goto error; 1327 return ret; 1328 1329 error: 1330 mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n", 1331 dlm->name, ret); 1332 if (!dlm_is_host_down(ret)) 1333 BUG(); 1334 mlog(0, "%s: node %u went down while sending %s " 1335 "lockres %.*s\n", dlm->name, send_to, 1336 flags & DLM_MRES_RECOVERY ? "recovery" : "migration", 1337 res->lockname.len, res->lockname.name); 1338 return ret; 1339 } 1340 1341 1342 1343 /* 1344 * this message will contain no more than one page worth of 1345 * recovery data, and it will work on only one lockres. 1346 * there may be many locks in this page, and we may need to wait 1347 * for additional packets to complete all the locks (rare, but 1348 * possible). 1349 */ 1350 /* 1351 * NOTE: the allocation error cases here are scary 1352 * we really cannot afford to fail an alloc in recovery 1353 * do we spin? returning an error only delays the problem really 1354 */ 1355 1356 int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, 1357 void **ret_data) 1358 { 1359 struct dlm_ctxt *dlm = data; 1360 struct dlm_migratable_lockres *mres = 1361 (struct dlm_migratable_lockres *)msg->buf; 1362 int ret = 0; 1363 u8 real_master; 1364 u8 extra_refs = 0; 1365 char *buf = NULL; 1366 struct dlm_work_item *item = NULL; 1367 struct dlm_lock_resource *res = NULL; 1368 unsigned int hash; 1369 1370 if (!dlm_grab(dlm)) 1371 return -EINVAL; 1372 1373 if (!dlm_joined(dlm)) { 1374 mlog(ML_ERROR, "Domain %s not joined! " 1375 "lockres %.*s, master %u\n", 1376 dlm->name, mres->lockname_len, 1377 mres->lockname, mres->master); 1378 dlm_put(dlm); 1379 return -EINVAL; 1380 } 1381 1382 BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION))); 1383 1384 real_master = mres->master; 1385 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1386 /* cannot migrate a lockres with no master */ 1387 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1388 } 1389 1390 mlog(0, "%s message received from node %u\n", 1391 (mres->flags & DLM_MRES_RECOVERY) ? 1392 "recovery" : "migration", mres->master); 1393 if (mres->flags & DLM_MRES_ALL_DONE) 1394 mlog(0, "all done flag. all lockres data received!\n"); 1395 1396 ret = -ENOMEM; 1397 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS); 1398 item = kzalloc(sizeof(*item), GFP_NOFS); 1399 if (!buf || !item) 1400 goto leave; 1401 1402 /* lookup the lock to see if we have a secondary queue for this 1403 * already... just add the locks in and this will have its owner 1404 * and RECOVERY flag changed when it completes. */ 1405 hash = dlm_lockid_hash(mres->lockname, mres->lockname_len); 1406 spin_lock(&dlm->spinlock); 1407 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, 1408 hash); 1409 if (res) { 1410 /* this will get a ref on res */ 1411 /* mark it as recovering/migrating and hash it */ 1412 spin_lock(&res->spinlock); 1413 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 1414 mlog(0, "%s: node is attempting to migrate " 1415 "lockres %.*s, but marked as dropping " 1416 " ref!\n", dlm->name, 1417 mres->lockname_len, mres->lockname); 1418 ret = -EINVAL; 1419 spin_unlock(&res->spinlock); 1420 spin_unlock(&dlm->spinlock); 1421 dlm_lockres_put(res); 1422 goto leave; 1423 } 1424 1425 if (mres->flags & DLM_MRES_RECOVERY) { 1426 res->state |= DLM_LOCK_RES_RECOVERING; 1427 } else { 1428 if (res->state & DLM_LOCK_RES_MIGRATING) { 1429 /* this is at least the second 1430 * lockres message */ 1431 mlog(0, "lock %.*s is already migrating\n", 1432 mres->lockname_len, 1433 mres->lockname); 1434 } else if (res->state & DLM_LOCK_RES_RECOVERING) { 1435 /* caller should BUG */ 1436 mlog(ML_ERROR, "node is attempting to migrate " 1437 "lock %.*s, but marked as recovering!\n", 1438 mres->lockname_len, mres->lockname); 1439 ret = -EFAULT; 1440 spin_unlock(&res->spinlock); 1441 spin_unlock(&dlm->spinlock); 1442 dlm_lockres_put(res); 1443 goto leave; 1444 } 1445 res->state |= DLM_LOCK_RES_MIGRATING; 1446 } 1447 spin_unlock(&res->spinlock); 1448 spin_unlock(&dlm->spinlock); 1449 } else { 1450 spin_unlock(&dlm->spinlock); 1451 /* need to allocate, just like if it was 1452 * mastered here normally */ 1453 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); 1454 if (!res) 1455 goto leave; 1456 1457 /* to match the ref that we would have gotten if 1458 * dlm_lookup_lockres had succeeded */ 1459 dlm_lockres_get(res); 1460 1461 /* mark it as recovering/migrating and hash it */ 1462 if (mres->flags & DLM_MRES_RECOVERY) 1463 res->state |= DLM_LOCK_RES_RECOVERING; 1464 else 1465 res->state |= DLM_LOCK_RES_MIGRATING; 1466 1467 spin_lock(&dlm->spinlock); 1468 __dlm_insert_lockres(dlm, res); 1469 spin_unlock(&dlm->spinlock); 1470 1471 /* Add an extra ref for this lock-less lockres lest the 1472 * dlm_thread purges it before we get the chance to add 1473 * locks to it */ 1474 dlm_lockres_get(res); 1475 1476 /* There are three refs that need to be put. 1477 * 1. Taken above. 1478 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres(). 1479 * 3. dlm_lookup_lockres() 1480 * The first one is handled at the end of this function. The 1481 * other two are handled in the worker thread after locks have 1482 * been attached. Yes, we don't wait for purge time to match 1483 * kref_init. The lockres will still have atleast one ref 1484 * added because it is in the hash __dlm_insert_lockres() */ 1485 extra_refs++; 1486 1487 /* now that the new lockres is inserted, 1488 * make it usable by other processes */ 1489 spin_lock(&res->spinlock); 1490 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1491 spin_unlock(&res->spinlock); 1492 wake_up(&res->wq); 1493 } 1494 1495 /* at this point we have allocated everything we need, 1496 * and we have a hashed lockres with an extra ref and 1497 * the proper res->state flags. */ 1498 ret = 0; 1499 spin_lock(&res->spinlock); 1500 /* drop this either when master requery finds a different master 1501 * or when a lock is added by the recovery worker */ 1502 dlm_lockres_grab_inflight_ref(dlm, res); 1503 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1504 /* migration cannot have an unknown master */ 1505 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1506 mlog(0, "recovery has passed me a lockres with an " 1507 "unknown owner.. will need to requery: " 1508 "%.*s\n", mres->lockname_len, mres->lockname); 1509 } else { 1510 /* take a reference now to pin the lockres, drop it 1511 * when locks are added in the worker */ 1512 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1513 } 1514 spin_unlock(&res->spinlock); 1515 1516 /* queue up work for dlm_mig_lockres_worker */ 1517 dlm_grab(dlm); /* get an extra ref for the work item */ 1518 memcpy(buf, msg->buf, be16_to_cpu(msg->data_len)); /* copy the whole message */ 1519 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); 1520 item->u.ml.lockres = res; /* already have a ref */ 1521 item->u.ml.real_master = real_master; 1522 item->u.ml.extra_ref = extra_refs; 1523 spin_lock(&dlm->work_lock); 1524 list_add_tail(&item->list, &dlm->work_list); 1525 spin_unlock(&dlm->work_lock); 1526 queue_work(dlm->dlm_worker, &dlm->dispatched_work); 1527 1528 leave: 1529 /* One extra ref taken needs to be put here */ 1530 if (extra_refs) 1531 dlm_lockres_put(res); 1532 1533 dlm_put(dlm); 1534 if (ret < 0) { 1535 kfree(buf); 1536 kfree(item); 1537 mlog_errno(ret); 1538 } 1539 1540 return ret; 1541 } 1542 1543 1544 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) 1545 { 1546 struct dlm_ctxt *dlm; 1547 struct dlm_migratable_lockres *mres; 1548 int ret = 0; 1549 struct dlm_lock_resource *res; 1550 u8 real_master; 1551 u8 extra_ref; 1552 1553 dlm = item->dlm; 1554 mres = (struct dlm_migratable_lockres *)data; 1555 1556 res = item->u.ml.lockres; 1557 real_master = item->u.ml.real_master; 1558 extra_ref = item->u.ml.extra_ref; 1559 1560 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1561 /* this case is super-rare. only occurs if 1562 * node death happens during migration. */ 1563 again: 1564 ret = dlm_lockres_master_requery(dlm, res, &real_master); 1565 if (ret < 0) { 1566 mlog(0, "dlm_lockres_master_requery ret=%d\n", 1567 ret); 1568 goto again; 1569 } 1570 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1571 mlog(0, "lockres %.*s not claimed. " 1572 "this node will take it.\n", 1573 res->lockname.len, res->lockname.name); 1574 } else { 1575 spin_lock(&res->spinlock); 1576 dlm_lockres_drop_inflight_ref(dlm, res); 1577 spin_unlock(&res->spinlock); 1578 mlog(0, "master needs to respond to sender " 1579 "that node %u still owns %.*s\n", 1580 real_master, res->lockname.len, 1581 res->lockname.name); 1582 /* cannot touch this lockres */ 1583 goto leave; 1584 } 1585 } 1586 1587 ret = dlm_process_recovery_data(dlm, res, mres); 1588 if (ret < 0) 1589 mlog(0, "dlm_process_recovery_data returned %d\n", ret); 1590 else 1591 mlog(0, "dlm_process_recovery_data succeeded\n"); 1592 1593 if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) == 1594 (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) { 1595 ret = dlm_finish_migration(dlm, res, mres->master); 1596 if (ret < 0) 1597 mlog_errno(ret); 1598 } 1599 1600 leave: 1601 /* See comment in dlm_mig_lockres_handler() */ 1602 if (res) { 1603 if (extra_ref) 1604 dlm_lockres_put(res); 1605 dlm_lockres_put(res); 1606 } 1607 kfree(data); 1608 } 1609 1610 1611 1612 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, 1613 struct dlm_lock_resource *res, 1614 u8 *real_master) 1615 { 1616 struct dlm_node_iter iter; 1617 int nodenum; 1618 int ret = 0; 1619 1620 *real_master = DLM_LOCK_RES_OWNER_UNKNOWN; 1621 1622 /* we only reach here if one of the two nodes in a 1623 * migration died while the migration was in progress. 1624 * at this point we need to requery the master. we 1625 * know that the new_master got as far as creating 1626 * an mle on at least one node, but we do not know 1627 * if any nodes had actually cleared the mle and set 1628 * the master to the new_master. the old master 1629 * is supposed to set the owner to UNKNOWN in the 1630 * event of a new_master death, so the only possible 1631 * responses that we can get from nodes here are 1632 * that the master is new_master, or that the master 1633 * is UNKNOWN. 1634 * if all nodes come back with UNKNOWN then we know 1635 * the lock needs remastering here. 1636 * if any node comes back with a valid master, check 1637 * to see if that master is the one that we are 1638 * recovering. if so, then the new_master died and 1639 * we need to remaster this lock. if not, then the 1640 * new_master survived and that node will respond to 1641 * other nodes about the owner. 1642 * if there is an owner, this node needs to dump this 1643 * lockres and alert the sender that this lockres 1644 * was rejected. */ 1645 spin_lock(&dlm->spinlock); 1646 dlm_node_iter_init(dlm->domain_map, &iter); 1647 spin_unlock(&dlm->spinlock); 1648 1649 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 1650 /* do not send to self */ 1651 if (nodenum == dlm->node_num) 1652 continue; 1653 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); 1654 if (ret < 0) { 1655 mlog_errno(ret); 1656 if (!dlm_is_host_down(ret)) 1657 BUG(); 1658 /* host is down, so answer for that node would be 1659 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ 1660 } 1661 if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1662 mlog(0, "lock master is %u\n", *real_master); 1663 break; 1664 } 1665 } 1666 return ret; 1667 } 1668 1669 1670 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1671 u8 nodenum, u8 *real_master) 1672 { 1673 int ret = -EINVAL; 1674 struct dlm_master_requery req; 1675 int status = DLM_LOCK_RES_OWNER_UNKNOWN; 1676 1677 memset(&req, 0, sizeof(req)); 1678 req.node_idx = dlm->node_num; 1679 req.namelen = res->lockname.len; 1680 memcpy(req.name, res->lockname.name, res->lockname.len); 1681 1682 resend: 1683 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, 1684 &req, sizeof(req), nodenum, &status); 1685 if (ret < 0) 1686 mlog(ML_ERROR, "Error %d when sending message %u (key " 1687 "0x%x) to node %u\n", ret, DLM_MASTER_REQUERY_MSG, 1688 dlm->key, nodenum); 1689 else if (status == -ENOMEM) { 1690 mlog_errno(status); 1691 msleep(50); 1692 goto resend; 1693 } else { 1694 BUG_ON(status < 0); 1695 BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN); 1696 *real_master = (u8) (status & 0xff); 1697 mlog(0, "node %u responded to master requery with %u\n", 1698 nodenum, *real_master); 1699 ret = 0; 1700 } 1701 return ret; 1702 } 1703 1704 1705 /* this function cannot error, so unless the sending 1706 * or receiving of the message failed, the owner can 1707 * be trusted */ 1708 int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, 1709 void **ret_data) 1710 { 1711 struct dlm_ctxt *dlm = data; 1712 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1713 struct dlm_lock_resource *res = NULL; 1714 unsigned int hash; 1715 int master = DLM_LOCK_RES_OWNER_UNKNOWN; 1716 u32 flags = DLM_ASSERT_MASTER_REQUERY; 1717 int dispatched = 0; 1718 1719 if (!dlm_grab(dlm)) { 1720 /* since the domain has gone away on this 1721 * node, the proper response is UNKNOWN */ 1722 return master; 1723 } 1724 1725 hash = dlm_lockid_hash(req->name, req->namelen); 1726 1727 spin_lock(&dlm->spinlock); 1728 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); 1729 if (res) { 1730 spin_lock(&res->spinlock); 1731 master = res->owner; 1732 if (master == dlm->node_num) { 1733 int ret = dlm_dispatch_assert_master(dlm, res, 1734 0, 0, flags); 1735 if (ret < 0) { 1736 mlog_errno(ret); 1737 spin_unlock(&res->spinlock); 1738 dlm_lockres_put(res); 1739 spin_unlock(&dlm->spinlock); 1740 dlm_put(dlm); 1741 /* sender will take care of this and retry */ 1742 return ret; 1743 } else { 1744 dispatched = 1; 1745 __dlm_lockres_grab_inflight_worker(dlm, res); 1746 spin_unlock(&res->spinlock); 1747 } 1748 } else { 1749 /* put.. incase we are not the master */ 1750 spin_unlock(&res->spinlock); 1751 dlm_lockres_put(res); 1752 } 1753 } 1754 spin_unlock(&dlm->spinlock); 1755 1756 if (!dispatched) 1757 dlm_put(dlm); 1758 return master; 1759 } 1760 1761 static inline struct list_head * 1762 dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num) 1763 { 1764 struct list_head *ret; 1765 BUG_ON(list_num < 0); 1766 BUG_ON(list_num > 2); 1767 ret = &(res->granted); 1768 ret += list_num; 1769 return ret; 1770 } 1771 /* TODO: do ast flush business 1772 * TODO: do MIGRATING and RECOVERING spinning 1773 */ 1774 1775 /* 1776 * NOTE about in-flight requests during migration: 1777 * 1778 * Before attempting the migrate, the master has marked the lockres as 1779 * MIGRATING and then flushed all of its pending ASTS. So any in-flight 1780 * requests either got queued before the MIGRATING flag got set, in which 1781 * case the lock data will reflect the change and a return message is on 1782 * the way, or the request failed to get in before MIGRATING got set. In 1783 * this case, the caller will be told to spin and wait for the MIGRATING 1784 * flag to be dropped, then recheck the master. 1785 * This holds true for the convert, cancel and unlock cases, and since lvb 1786 * updates are tied to these same messages, it applies to lvb updates as 1787 * well. For the lock case, there is no way a lock can be on the master 1788 * queue and not be on the secondary queue since the lock is always added 1789 * locally first. This means that the new target node will never be sent 1790 * a lock that he doesn't already have on the list. 1791 * In total, this means that the local lock is correct and should not be 1792 * updated to match the one sent by the master. Any messages sent back 1793 * from the master before the MIGRATING flag will bring the lock properly 1794 * up-to-date, and the change will be ordered properly for the waiter. 1795 * We will *not* attempt to modify the lock underneath the waiter. 1796 */ 1797 1798 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, 1799 struct dlm_lock_resource *res, 1800 struct dlm_migratable_lockres *mres) 1801 { 1802 struct dlm_migratable_lock *ml; 1803 struct list_head *queue, *iter; 1804 struct list_head *tmpq = NULL; 1805 struct dlm_lock *newlock = NULL; 1806 struct dlm_lockstatus *lksb = NULL; 1807 int ret = 0; 1808 int i, j, bad; 1809 struct dlm_lock *lock; 1810 u8 from = O2NM_MAX_NODES; 1811 __be64 c; 1812 1813 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1814 for (i=0; i<mres->num_locks; i++) { 1815 ml = &(mres->ml[i]); 1816 1817 if (dlm_is_dummy_lock(dlm, ml, &from)) { 1818 /* placeholder, just need to set the refmap bit */ 1819 BUG_ON(mres->num_locks != 1); 1820 mlog(0, "%s:%.*s: dummy lock for %u\n", 1821 dlm->name, mres->lockname_len, mres->lockname, 1822 from); 1823 spin_lock(&res->spinlock); 1824 dlm_lockres_set_refmap_bit(dlm, res, from); 1825 spin_unlock(&res->spinlock); 1826 break; 1827 } 1828 BUG_ON(ml->highest_blocked != LKM_IVMODE); 1829 newlock = NULL; 1830 lksb = NULL; 1831 1832 queue = dlm_list_num_to_pointer(res, ml->list); 1833 tmpq = NULL; 1834 1835 /* if the lock is for the local node it needs to 1836 * be moved to the proper location within the queue. 1837 * do not allocate a new lock structure. */ 1838 if (ml->node == dlm->node_num) { 1839 /* MIGRATION ONLY! */ 1840 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); 1841 1842 lock = NULL; 1843 spin_lock(&res->spinlock); 1844 for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) { 1845 tmpq = dlm_list_idx_to_ptr(res, j); 1846 list_for_each(iter, tmpq) { 1847 lock = list_entry(iter, 1848 struct dlm_lock, list); 1849 if (lock->ml.cookie == ml->cookie) 1850 break; 1851 lock = NULL; 1852 } 1853 if (lock) 1854 break; 1855 } 1856 1857 /* lock is always created locally first, and 1858 * destroyed locally last. it must be on the list */ 1859 if (!lock) { 1860 c = ml->cookie; 1861 mlog(ML_ERROR, "Could not find local lock " 1862 "with cookie %u:%llu, node %u, " 1863 "list %u, flags 0x%x, type %d, " 1864 "conv %d, highest blocked %d\n", 1865 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1866 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1867 ml->node, ml->list, ml->flags, ml->type, 1868 ml->convert_type, ml->highest_blocked); 1869 __dlm_print_one_lock_resource(res); 1870 BUG(); 1871 } 1872 1873 if (lock->ml.node != ml->node) { 1874 c = lock->ml.cookie; 1875 mlog(ML_ERROR, "Mismatched node# in lock " 1876 "cookie %u:%llu, name %.*s, node %u\n", 1877 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1878 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1879 res->lockname.len, res->lockname.name, 1880 lock->ml.node); 1881 c = ml->cookie; 1882 mlog(ML_ERROR, "Migrate lock cookie %u:%llu, " 1883 "node %u, list %u, flags 0x%x, type %d, " 1884 "conv %d, highest blocked %d\n", 1885 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1886 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1887 ml->node, ml->list, ml->flags, ml->type, 1888 ml->convert_type, ml->highest_blocked); 1889 __dlm_print_one_lock_resource(res); 1890 BUG(); 1891 } 1892 1893 if (tmpq != queue) { 1894 c = ml->cookie; 1895 mlog(0, "Lock cookie %u:%llu was on list %u " 1896 "instead of list %u for %.*s\n", 1897 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1898 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1899 j, ml->list, res->lockname.len, 1900 res->lockname.name); 1901 __dlm_print_one_lock_resource(res); 1902 spin_unlock(&res->spinlock); 1903 continue; 1904 } 1905 1906 /* see NOTE above about why we do not update 1907 * to match the master here */ 1908 1909 /* move the lock to its proper place */ 1910 /* do not alter lock refcount. switching lists. */ 1911 list_move_tail(&lock->list, queue); 1912 spin_unlock(&res->spinlock); 1913 1914 mlog(0, "just reordered a local lock!\n"); 1915 continue; 1916 } 1917 1918 /* lock is for another node. */ 1919 newlock = dlm_new_lock(ml->type, ml->node, 1920 be64_to_cpu(ml->cookie), NULL); 1921 if (!newlock) { 1922 ret = -ENOMEM; 1923 goto leave; 1924 } 1925 lksb = newlock->lksb; 1926 dlm_lock_attach_lockres(newlock, res); 1927 1928 if (ml->convert_type != LKM_IVMODE) { 1929 BUG_ON(queue != &res->converting); 1930 newlock->ml.convert_type = ml->convert_type; 1931 } 1932 lksb->flags |= (ml->flags & 1933 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); 1934 1935 if (ml->type == LKM_NLMODE) 1936 goto skip_lvb; 1937 1938 /* 1939 * If the lock is in the blocked list it can't have a valid lvb, 1940 * so skip it 1941 */ 1942 if (ml->list == DLM_BLOCKED_LIST) 1943 goto skip_lvb; 1944 1945 if (!dlm_lvb_is_empty(mres->lvb)) { 1946 if (lksb->flags & DLM_LKSB_PUT_LVB) { 1947 /* other node was trying to update 1948 * lvb when node died. recreate the 1949 * lksb with the updated lvb. */ 1950 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); 1951 /* the lock resource lvb update must happen 1952 * NOW, before the spinlock is dropped. 1953 * we no longer wait for the AST to update 1954 * the lvb. */ 1955 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1956 } else { 1957 /* otherwise, the node is sending its 1958 * most recent valid lvb info */ 1959 BUG_ON(ml->type != LKM_EXMODE && 1960 ml->type != LKM_PRMODE); 1961 if (!dlm_lvb_is_empty(res->lvb) && 1962 (ml->type == LKM_EXMODE || 1963 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { 1964 int i; 1965 mlog(ML_ERROR, "%s:%.*s: received bad " 1966 "lvb! type=%d\n", dlm->name, 1967 res->lockname.len, 1968 res->lockname.name, ml->type); 1969 printk("lockres lvb=["); 1970 for (i=0; i<DLM_LVB_LEN; i++) 1971 printk("%02x", res->lvb[i]); 1972 printk("]\nmigrated lvb=["); 1973 for (i=0; i<DLM_LVB_LEN; i++) 1974 printk("%02x", mres->lvb[i]); 1975 printk("]\n"); 1976 dlm_print_one_lock_resource(res); 1977 BUG(); 1978 } 1979 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1980 } 1981 } 1982 skip_lvb: 1983 1984 /* NOTE: 1985 * wrt lock queue ordering and recovery: 1986 * 1. order of locks on granted queue is 1987 * meaningless. 1988 * 2. order of locks on converting queue is 1989 * LOST with the node death. sorry charlie. 1990 * 3. order of locks on the blocked queue is 1991 * also LOST. 1992 * order of locks does not affect integrity, it 1993 * just means that a lock request may get pushed 1994 * back in line as a result of the node death. 1995 * also note that for a given node the lock order 1996 * for its secondary queue locks is preserved 1997 * relative to each other, but clearly *not* 1998 * preserved relative to locks from other nodes. 1999 */ 2000 bad = 0; 2001 spin_lock(&res->spinlock); 2002 list_for_each_entry(lock, queue, list) { 2003 if (lock->ml.cookie == ml->cookie) { 2004 c = lock->ml.cookie; 2005 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " 2006 "exists on this lockres!\n", dlm->name, 2007 res->lockname.len, res->lockname.name, 2008 dlm_get_lock_cookie_node(be64_to_cpu(c)), 2009 dlm_get_lock_cookie_seq(be64_to_cpu(c))); 2010 2011 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " 2012 "node=%u, cookie=%u:%llu, queue=%d\n", 2013 ml->type, ml->convert_type, ml->node, 2014 dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)), 2015 dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)), 2016 ml->list); 2017 2018 __dlm_print_one_lock_resource(res); 2019 bad = 1; 2020 break; 2021 } 2022 } 2023 if (!bad) { 2024 dlm_lock_get(newlock); 2025 if (mres->flags & DLM_MRES_RECOVERY && 2026 ml->list == DLM_CONVERTING_LIST && 2027 newlock->ml.type > 2028 newlock->ml.convert_type) { 2029 /* newlock is doing downconvert, add it to the 2030 * head of converting list */ 2031 list_add(&newlock->list, queue); 2032 } else 2033 list_add_tail(&newlock->list, queue); 2034 mlog(0, "%s:%.*s: added lock for node %u, " 2035 "setting refmap bit\n", dlm->name, 2036 res->lockname.len, res->lockname.name, ml->node); 2037 dlm_lockres_set_refmap_bit(dlm, res, ml->node); 2038 } 2039 spin_unlock(&res->spinlock); 2040 } 2041 mlog(0, "done running all the locks\n"); 2042 2043 leave: 2044 /* balance the ref taken when the work was queued */ 2045 spin_lock(&res->spinlock); 2046 dlm_lockres_drop_inflight_ref(dlm, res); 2047 spin_unlock(&res->spinlock); 2048 2049 if (ret < 0) 2050 mlog_errno(ret); 2051 2052 return ret; 2053 } 2054 2055 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, 2056 struct dlm_lock_resource *res) 2057 { 2058 int i; 2059 struct list_head *queue; 2060 struct dlm_lock *lock, *next; 2061 2062 assert_spin_locked(&dlm->spinlock); 2063 assert_spin_locked(&res->spinlock); 2064 res->state |= DLM_LOCK_RES_RECOVERING; 2065 if (!list_empty(&res->recovering)) { 2066 mlog(0, 2067 "Recovering res %s:%.*s, is already on recovery list!\n", 2068 dlm->name, res->lockname.len, res->lockname.name); 2069 list_del_init(&res->recovering); 2070 dlm_lockres_put(res); 2071 } 2072 /* We need to hold a reference while on the recovery list */ 2073 dlm_lockres_get(res); 2074 list_add_tail(&res->recovering, &dlm->reco.resources); 2075 2076 /* find any pending locks and put them back on proper list */ 2077 for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) { 2078 queue = dlm_list_idx_to_ptr(res, i); 2079 list_for_each_entry_safe(lock, next, queue, list) { 2080 dlm_lock_get(lock); 2081 if (lock->convert_pending) { 2082 /* move converting lock back to granted */ 2083 mlog(0, "node died with convert pending " 2084 "on %.*s. move back to granted list.\n", 2085 res->lockname.len, res->lockname.name); 2086 dlm_revert_pending_convert(res, lock); 2087 lock->convert_pending = 0; 2088 } else if (lock->lock_pending) { 2089 /* remove pending lock requests completely */ 2090 BUG_ON(i != DLM_BLOCKED_LIST); 2091 mlog(0, "node died with lock pending " 2092 "on %.*s. remove from blocked list and skip.\n", 2093 res->lockname.len, res->lockname.name); 2094 /* lock will be floating until ref in 2095 * dlmlock_remote is freed after the network 2096 * call returns. ok for it to not be on any 2097 * list since no ast can be called 2098 * (the master is dead). */ 2099 dlm_revert_pending_lock(res, lock); 2100 lock->lock_pending = 0; 2101 } else if (lock->unlock_pending) { 2102 /* if an unlock was in progress, treat as 2103 * if this had completed successfully 2104 * before sending this lock state to the 2105 * new master. note that the dlm_unlock 2106 * call is still responsible for calling 2107 * the unlockast. that will happen after 2108 * the network call times out. for now, 2109 * just move lists to prepare the new 2110 * recovery master. */ 2111 BUG_ON(i != DLM_GRANTED_LIST); 2112 mlog(0, "node died with unlock pending " 2113 "on %.*s. remove from blocked list and skip.\n", 2114 res->lockname.len, res->lockname.name); 2115 dlm_commit_pending_unlock(res, lock); 2116 lock->unlock_pending = 0; 2117 } else if (lock->cancel_pending) { 2118 /* if a cancel was in progress, treat as 2119 * if this had completed successfully 2120 * before sending this lock state to the 2121 * new master */ 2122 BUG_ON(i != DLM_CONVERTING_LIST); 2123 mlog(0, "node died with cancel pending " 2124 "on %.*s. move back to granted list.\n", 2125 res->lockname.len, res->lockname.name); 2126 dlm_commit_pending_cancel(res, lock); 2127 lock->cancel_pending = 0; 2128 } 2129 dlm_lock_put(lock); 2130 } 2131 } 2132 } 2133 2134 2135 2136 /* removes all recovered locks from the recovery list. 2137 * sets the res->owner to the new master. 2138 * unsets the RECOVERY flag and wakes waiters. */ 2139 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, 2140 u8 dead_node, u8 new_master) 2141 { 2142 int i; 2143 struct hlist_head *bucket; 2144 struct dlm_lock_resource *res, *next; 2145 2146 assert_spin_locked(&dlm->spinlock); 2147 2148 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 2149 if (res->owner == dead_node) { 2150 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", 2151 dlm->name, res->lockname.len, res->lockname.name, 2152 res->owner, new_master); 2153 list_del_init(&res->recovering); 2154 spin_lock(&res->spinlock); 2155 /* new_master has our reference from 2156 * the lock state sent during recovery */ 2157 dlm_change_lockres_owner(dlm, res, new_master); 2158 res->state &= ~DLM_LOCK_RES_RECOVERING; 2159 if (__dlm_lockres_has_locks(res)) 2160 __dlm_dirty_lockres(dlm, res); 2161 spin_unlock(&res->spinlock); 2162 wake_up(&res->wq); 2163 dlm_lockres_put(res); 2164 } 2165 } 2166 2167 /* this will become unnecessary eventually, but 2168 * for now we need to run the whole hash, clear 2169 * the RECOVERING state and set the owner 2170 * if necessary */ 2171 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2172 bucket = dlm_lockres_hash(dlm, i); 2173 hlist_for_each_entry(res, bucket, hash_node) { 2174 if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) { 2175 spin_lock(&res->spinlock); 2176 res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING; 2177 spin_unlock(&res->spinlock); 2178 wake_up(&res->wq); 2179 } 2180 2181 if (!(res->state & DLM_LOCK_RES_RECOVERING)) 2182 continue; 2183 2184 if (res->owner != dead_node && 2185 res->owner != dlm->node_num) 2186 continue; 2187 2188 if (!list_empty(&res->recovering)) { 2189 list_del_init(&res->recovering); 2190 dlm_lockres_put(res); 2191 } 2192 2193 /* new_master has our reference from 2194 * the lock state sent during recovery */ 2195 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", 2196 dlm->name, res->lockname.len, res->lockname.name, 2197 res->owner, new_master); 2198 spin_lock(&res->spinlock); 2199 dlm_change_lockres_owner(dlm, res, new_master); 2200 res->state &= ~DLM_LOCK_RES_RECOVERING; 2201 if (__dlm_lockres_has_locks(res)) 2202 __dlm_dirty_lockres(dlm, res); 2203 spin_unlock(&res->spinlock); 2204 wake_up(&res->wq); 2205 } 2206 } 2207 } 2208 2209 static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local) 2210 { 2211 if (local) { 2212 if (lock->ml.type != LKM_EXMODE && 2213 lock->ml.type != LKM_PRMODE) 2214 return 1; 2215 } else if (lock->ml.type == LKM_EXMODE) 2216 return 1; 2217 return 0; 2218 } 2219 2220 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm, 2221 struct dlm_lock_resource *res, u8 dead_node) 2222 { 2223 struct list_head *queue; 2224 struct dlm_lock *lock; 2225 int blank_lvb = 0, local = 0; 2226 int i; 2227 u8 search_node; 2228 2229 assert_spin_locked(&dlm->spinlock); 2230 assert_spin_locked(&res->spinlock); 2231 2232 if (res->owner == dlm->node_num) 2233 /* if this node owned the lockres, and if the dead node 2234 * had an EX when he died, blank out the lvb */ 2235 search_node = dead_node; 2236 else { 2237 /* if this is a secondary lockres, and we had no EX or PR 2238 * locks granted, we can no longer trust the lvb */ 2239 search_node = dlm->node_num; 2240 local = 1; /* check local state for valid lvb */ 2241 } 2242 2243 for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) { 2244 queue = dlm_list_idx_to_ptr(res, i); 2245 list_for_each_entry(lock, queue, list) { 2246 if (lock->ml.node == search_node) { 2247 if (dlm_lvb_needs_invalidation(lock, local)) { 2248 /* zero the lksb lvb and lockres lvb */ 2249 blank_lvb = 1; 2250 memset(lock->lksb->lvb, 0, DLM_LVB_LEN); 2251 } 2252 } 2253 } 2254 } 2255 2256 if (blank_lvb) { 2257 mlog(0, "clearing %.*s lvb, dead node %u had EX\n", 2258 res->lockname.len, res->lockname.name, dead_node); 2259 memset(res->lvb, 0, DLM_LVB_LEN); 2260 } 2261 } 2262 2263 static void dlm_free_dead_locks(struct dlm_ctxt *dlm, 2264 struct dlm_lock_resource *res, u8 dead_node) 2265 { 2266 struct dlm_lock *lock, *next; 2267 unsigned int freed = 0; 2268 2269 /* this node is the lockres master: 2270 * 1) remove any stale locks for the dead node 2271 * 2) if the dead node had an EX when he died, blank out the lvb 2272 */ 2273 assert_spin_locked(&dlm->spinlock); 2274 assert_spin_locked(&res->spinlock); 2275 2276 /* We do two dlm_lock_put(). One for removing from list and the other is 2277 * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */ 2278 2279 /* TODO: check pending_asts, pending_basts here */ 2280 list_for_each_entry_safe(lock, next, &res->granted, list) { 2281 if (lock->ml.node == dead_node) { 2282 list_del_init(&lock->list); 2283 dlm_lock_put(lock); 2284 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2285 dlm_lock_put(lock); 2286 freed++; 2287 } 2288 } 2289 list_for_each_entry_safe(lock, next, &res->converting, list) { 2290 if (lock->ml.node == dead_node) { 2291 list_del_init(&lock->list); 2292 dlm_lock_put(lock); 2293 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2294 dlm_lock_put(lock); 2295 freed++; 2296 } 2297 } 2298 list_for_each_entry_safe(lock, next, &res->blocked, list) { 2299 if (lock->ml.node == dead_node) { 2300 list_del_init(&lock->list); 2301 dlm_lock_put(lock); 2302 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2303 dlm_lock_put(lock); 2304 freed++; 2305 } 2306 } 2307 2308 if (freed) { 2309 mlog(0, "%s:%.*s: freed %u locks for dead node %u, " 2310 "dropping ref from lockres\n", dlm->name, 2311 res->lockname.len, res->lockname.name, freed, dead_node); 2312 if(!test_bit(dead_node, res->refmap)) { 2313 mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, " 2314 "but ref was not set\n", dlm->name, 2315 res->lockname.len, res->lockname.name, freed, dead_node); 2316 __dlm_print_one_lock_resource(res); 2317 } 2318 res->state |= DLM_LOCK_RES_RECOVERY_WAITING; 2319 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2320 } else if (test_bit(dead_node, res->refmap)) { 2321 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2322 "no locks and had not purged before dying\n", dlm->name, 2323 res->lockname.len, res->lockname.name, dead_node); 2324 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2325 } 2326 2327 /* do not kick thread yet */ 2328 __dlm_dirty_lockres(dlm, res); 2329 } 2330 2331 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) 2332 { 2333 struct dlm_lock_resource *res; 2334 int i; 2335 struct hlist_head *bucket; 2336 struct hlist_node *tmp; 2337 struct dlm_lock *lock; 2338 2339 2340 /* purge any stale mles */ 2341 dlm_clean_master_list(dlm, dead_node); 2342 2343 /* 2344 * now clean up all lock resources. there are two rules: 2345 * 2346 * 1) if the dead node was the master, move the lockres 2347 * to the recovering list. set the RECOVERING flag. 2348 * this lockres needs to be cleaned up before it can 2349 * be used further. 2350 * 2351 * 2) if this node was the master, remove all locks from 2352 * each of the lockres queues that were owned by the 2353 * dead node. once recovery finishes, the dlm thread 2354 * can be kicked again to see if any ASTs or BASTs 2355 * need to be fired as a result. 2356 */ 2357 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2358 bucket = dlm_lockres_hash(dlm, i); 2359 hlist_for_each_entry_safe(res, tmp, bucket, hash_node) { 2360 /* always prune any $RECOVERY entries for dead nodes, 2361 * otherwise hangs can occur during later recovery */ 2362 if (dlm_is_recovery_lock(res->lockname.name, 2363 res->lockname.len)) { 2364 spin_lock(&res->spinlock); 2365 list_for_each_entry(lock, &res->granted, list) { 2366 if (lock->ml.node == dead_node) { 2367 mlog(0, "AHA! there was " 2368 "a $RECOVERY lock for dead " 2369 "node %u (%s)!\n", 2370 dead_node, dlm->name); 2371 list_del_init(&lock->list); 2372 dlm_lock_put(lock); 2373 /* Can't schedule 2374 * DLM_UNLOCK_FREE_LOCK 2375 * - do manually */ 2376 dlm_lock_put(lock); 2377 break; 2378 } 2379 } 2380 2381 if ((res->owner == dead_node) && 2382 (res->state & DLM_LOCK_RES_DROPPING_REF)) { 2383 dlm_lockres_get(res); 2384 __dlm_do_purge_lockres(dlm, res); 2385 spin_unlock(&res->spinlock); 2386 wake_up(&res->wq); 2387 dlm_lockres_put(res); 2388 continue; 2389 } else if (res->owner == dlm->node_num) 2390 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2391 spin_unlock(&res->spinlock); 2392 continue; 2393 } 2394 spin_lock(&res->spinlock); 2395 /* zero the lvb if necessary */ 2396 dlm_revalidate_lvb(dlm, res, dead_node); 2397 if (res->owner == dead_node) { 2398 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 2399 mlog(0, "%s:%.*s: owned by " 2400 "dead node %u, this node was " 2401 "dropping its ref when master died. " 2402 "continue, purging the lockres.\n", 2403 dlm->name, res->lockname.len, 2404 res->lockname.name, dead_node); 2405 dlm_lockres_get(res); 2406 __dlm_do_purge_lockres(dlm, res); 2407 spin_unlock(&res->spinlock); 2408 wake_up(&res->wq); 2409 dlm_lockres_put(res); 2410 continue; 2411 } 2412 dlm_move_lockres_to_recovery_list(dlm, res); 2413 } else if (res->owner == dlm->node_num) { 2414 dlm_free_dead_locks(dlm, res, dead_node); 2415 __dlm_lockres_calc_usage(dlm, res); 2416 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 2417 if (test_bit(dead_node, res->refmap)) { 2418 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2419 "no locks and had not purged before dying\n", 2420 dlm->name, res->lockname.len, 2421 res->lockname.name, dead_node); 2422 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2423 } 2424 } 2425 spin_unlock(&res->spinlock); 2426 } 2427 } 2428 2429 } 2430 2431 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) 2432 { 2433 assert_spin_locked(&dlm->spinlock); 2434 2435 if (dlm->reco.new_master == idx) { 2436 mlog(0, "%s: recovery master %d just died\n", 2437 dlm->name, idx); 2438 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2439 /* finalize1 was reached, so it is safe to clear 2440 * the new_master and dead_node. that recovery 2441 * is complete. */ 2442 mlog(0, "%s: dead master %d had reached " 2443 "finalize1 state, clearing\n", dlm->name, idx); 2444 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 2445 __dlm_reset_recovery(dlm); 2446 } 2447 } 2448 2449 /* Clean up join state on node death. */ 2450 if (dlm->joining_node == idx) { 2451 mlog(0, "Clearing join state for node %u\n", idx); 2452 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 2453 } 2454 2455 /* check to see if the node is already considered dead */ 2456 if (!test_bit(idx, dlm->live_nodes_map)) { 2457 mlog(0, "for domain %s, node %d is already dead. " 2458 "another node likely did recovery already.\n", 2459 dlm->name, idx); 2460 return; 2461 } 2462 2463 /* check to see if we do not care about this node */ 2464 if (!test_bit(idx, dlm->domain_map)) { 2465 /* This also catches the case that we get a node down 2466 * but haven't joined the domain yet. */ 2467 mlog(0, "node %u already removed from domain!\n", idx); 2468 return; 2469 } 2470 2471 clear_bit(idx, dlm->live_nodes_map); 2472 2473 /* make sure local cleanup occurs before the heartbeat events */ 2474 if (!test_bit(idx, dlm->recovery_map)) 2475 dlm_do_local_recovery_cleanup(dlm, idx); 2476 2477 /* notify anything attached to the heartbeat events */ 2478 dlm_hb_event_notify_attached(dlm, idx, 0); 2479 2480 mlog(0, "node %u being removed from domain map!\n", idx); 2481 clear_bit(idx, dlm->domain_map); 2482 clear_bit(idx, dlm->exit_domain_map); 2483 /* wake up migration waiters if a node goes down. 2484 * perhaps later we can genericize this for other waiters. */ 2485 wake_up(&dlm->migration_wq); 2486 2487 set_bit(idx, dlm->recovery_map); 2488 } 2489 2490 void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data) 2491 { 2492 struct dlm_ctxt *dlm = data; 2493 2494 if (!dlm_grab(dlm)) 2495 return; 2496 2497 /* 2498 * This will notify any dlm users that a node in our domain 2499 * went away without notifying us first. 2500 */ 2501 if (test_bit(idx, dlm->domain_map)) 2502 dlm_fire_domain_eviction_callbacks(dlm, idx); 2503 2504 spin_lock(&dlm->spinlock); 2505 __dlm_hb_node_down(dlm, idx); 2506 spin_unlock(&dlm->spinlock); 2507 2508 dlm_put(dlm); 2509 } 2510 2511 void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data) 2512 { 2513 struct dlm_ctxt *dlm = data; 2514 2515 if (!dlm_grab(dlm)) 2516 return; 2517 2518 spin_lock(&dlm->spinlock); 2519 set_bit(idx, dlm->live_nodes_map); 2520 /* do NOT notify mle attached to the heartbeat events. 2521 * new nodes are not interesting in mastery until joined. */ 2522 spin_unlock(&dlm->spinlock); 2523 2524 dlm_put(dlm); 2525 } 2526 2527 static void dlm_reco_ast(void *astdata) 2528 { 2529 struct dlm_ctxt *dlm = astdata; 2530 mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n", 2531 dlm->node_num, dlm->name); 2532 } 2533 static void dlm_reco_bast(void *astdata, int blocked_type) 2534 { 2535 struct dlm_ctxt *dlm = astdata; 2536 mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n", 2537 dlm->node_num, dlm->name); 2538 } 2539 static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st) 2540 { 2541 mlog(0, "unlockast for recovery lock fired!\n"); 2542 } 2543 2544 /* 2545 * dlm_pick_recovery_master will continually attempt to use 2546 * dlmlock() on the special "$RECOVERY" lockres with the 2547 * LKM_NOQUEUE flag to get an EX. every thread that enters 2548 * this function on each node racing to become the recovery 2549 * master will not stop attempting this until either: 2550 * a) this node gets the EX (and becomes the recovery master), 2551 * or b) dlm->reco.new_master gets set to some nodenum 2552 * != O2NM_INVALID_NODE_NUM (another node will do the reco). 2553 * so each time a recovery master is needed, the entire cluster 2554 * will sync at this point. if the new master dies, that will 2555 * be detected in dlm_do_recovery */ 2556 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) 2557 { 2558 enum dlm_status ret; 2559 struct dlm_lockstatus lksb; 2560 int status = -EINVAL; 2561 2562 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2563 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2564 again: 2565 memset(&lksb, 0, sizeof(lksb)); 2566 2567 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2568 DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN, 2569 dlm_reco_ast, dlm, dlm_reco_bast); 2570 2571 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n", 2572 dlm->name, ret, lksb.status); 2573 2574 if (ret == DLM_NORMAL) { 2575 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2576 dlm->name, dlm->node_num); 2577 2578 /* got the EX lock. check to see if another node 2579 * just became the reco master */ 2580 if (dlm_reco_master_ready(dlm)) { 2581 mlog(0, "%s: got reco EX lock, but %u will " 2582 "do the recovery\n", dlm->name, 2583 dlm->reco.new_master); 2584 status = -EEXIST; 2585 } else { 2586 status = 0; 2587 2588 /* see if recovery was already finished elsewhere */ 2589 spin_lock(&dlm->spinlock); 2590 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 2591 status = -EINVAL; 2592 mlog(0, "%s: got reco EX lock, but " 2593 "node got recovered already\n", dlm->name); 2594 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2595 mlog(ML_ERROR, "%s: new master is %u " 2596 "but no dead node!\n", 2597 dlm->name, dlm->reco.new_master); 2598 BUG(); 2599 } 2600 } 2601 spin_unlock(&dlm->spinlock); 2602 } 2603 2604 /* if this node has actually become the recovery master, 2605 * set the master and send the messages to begin recovery */ 2606 if (!status) { 2607 mlog(0, "%s: dead=%u, this=%u, sending " 2608 "begin_reco now\n", dlm->name, 2609 dlm->reco.dead_node, dlm->node_num); 2610 status = dlm_send_begin_reco_message(dlm, 2611 dlm->reco.dead_node); 2612 /* this always succeeds */ 2613 BUG_ON(status); 2614 2615 /* set the new_master to this node */ 2616 spin_lock(&dlm->spinlock); 2617 dlm_set_reco_master(dlm, dlm->node_num); 2618 spin_unlock(&dlm->spinlock); 2619 } 2620 2621 /* recovery lock is a special case. ast will not get fired, 2622 * so just go ahead and unlock it. */ 2623 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); 2624 if (ret == DLM_DENIED) { 2625 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n"); 2626 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); 2627 } 2628 if (ret != DLM_NORMAL) { 2629 /* this would really suck. this could only happen 2630 * if there was a network error during the unlock 2631 * because of node death. this means the unlock 2632 * is actually "done" and the lock structure is 2633 * even freed. we can continue, but only 2634 * because this specific lock name is special. */ 2635 mlog(ML_ERROR, "dlmunlock returned %d\n", ret); 2636 } 2637 } else if (ret == DLM_NOTQUEUED) { 2638 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", 2639 dlm->name, dlm->node_num); 2640 /* another node is master. wait on 2641 * reco.new_master != O2NM_INVALID_NODE_NUM 2642 * for at most one second */ 2643 wait_event_timeout(dlm->dlm_reco_thread_wq, 2644 dlm_reco_master_ready(dlm), 2645 msecs_to_jiffies(1000)); 2646 if (!dlm_reco_master_ready(dlm)) { 2647 mlog(0, "%s: reco master taking awhile\n", 2648 dlm->name); 2649 goto again; 2650 } 2651 /* another node has informed this one that it is reco master */ 2652 mlog(0, "%s: reco master %u is ready to recover %u\n", 2653 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); 2654 status = -EEXIST; 2655 } else if (ret == DLM_RECOVERING) { 2656 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n", 2657 dlm->name, dlm->node_num); 2658 goto again; 2659 } else { 2660 struct dlm_lock_resource *res; 2661 2662 /* dlmlock returned something other than NOTQUEUED or NORMAL */ 2663 mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), " 2664 "lksb.status=%s\n", dlm->name, dlm_errname(ret), 2665 dlm_errname(lksb.status)); 2666 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, 2667 DLM_RECOVERY_LOCK_NAME_LEN); 2668 if (res) { 2669 dlm_print_one_lock_resource(res); 2670 dlm_lockres_put(res); 2671 } else { 2672 mlog(ML_ERROR, "recovery lock not found\n"); 2673 } 2674 BUG(); 2675 } 2676 2677 return status; 2678 } 2679 2680 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) 2681 { 2682 struct dlm_begin_reco br; 2683 int ret = 0; 2684 struct dlm_node_iter iter; 2685 int nodenum; 2686 int status; 2687 2688 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); 2689 2690 spin_lock(&dlm->spinlock); 2691 dlm_node_iter_init(dlm->domain_map, &iter); 2692 spin_unlock(&dlm->spinlock); 2693 2694 clear_bit(dead_node, iter.node_map); 2695 2696 memset(&br, 0, sizeof(br)); 2697 br.node_idx = dlm->node_num; 2698 br.dead_node = dead_node; 2699 2700 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2701 ret = 0; 2702 if (nodenum == dead_node) { 2703 mlog(0, "not sending begin reco to dead node " 2704 "%u\n", dead_node); 2705 continue; 2706 } 2707 if (nodenum == dlm->node_num) { 2708 mlog(0, "not sending begin reco to self\n"); 2709 continue; 2710 } 2711 retry: 2712 ret = -EINVAL; 2713 mlog(0, "attempting to send begin reco msg to %d\n", 2714 nodenum); 2715 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, 2716 &br, sizeof(br), nodenum, &status); 2717 /* negative status is handled ok by caller here */ 2718 if (ret >= 0) 2719 ret = status; 2720 if (dlm_is_host_down(ret)) { 2721 /* node is down. not involved in recovery 2722 * so just keep going */ 2723 mlog(ML_NOTICE, "%s: node %u was down when sending " 2724 "begin reco msg (%d)\n", dlm->name, nodenum, ret); 2725 ret = 0; 2726 } 2727 2728 /* 2729 * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8, 2730 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN. 2731 * We are handling both for compatibility reasons. 2732 */ 2733 if (ret == -EAGAIN || ret == EAGAIN) { 2734 mlog(0, "%s: trying to start recovery of node " 2735 "%u, but node %u is waiting for last recovery " 2736 "to complete, backoff for a bit\n", dlm->name, 2737 dead_node, nodenum); 2738 msleep(100); 2739 goto retry; 2740 } 2741 if (ret < 0) { 2742 struct dlm_lock_resource *res; 2743 2744 /* this is now a serious problem, possibly ENOMEM 2745 * in the network stack. must retry */ 2746 mlog_errno(ret); 2747 mlog(ML_ERROR, "begin reco of dlm %s to node %u " 2748 "returned %d\n", dlm->name, nodenum, ret); 2749 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, 2750 DLM_RECOVERY_LOCK_NAME_LEN); 2751 if (res) { 2752 dlm_print_one_lock_resource(res); 2753 dlm_lockres_put(res); 2754 } else { 2755 mlog(ML_ERROR, "recovery lock not found\n"); 2756 } 2757 /* sleep for a bit in hopes that we can avoid 2758 * another ENOMEM */ 2759 msleep(100); 2760 goto retry; 2761 } 2762 } 2763 2764 return ret; 2765 } 2766 2767 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data, 2768 void **ret_data) 2769 { 2770 struct dlm_ctxt *dlm = data; 2771 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; 2772 2773 /* ok to return 0, domain has gone away */ 2774 if (!dlm_grab(dlm)) 2775 return 0; 2776 2777 spin_lock(&dlm->spinlock); 2778 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2779 mlog(0, "%s: node %u wants to recover node %u (%u:%u) " 2780 "but this node is in finalize state, waiting on finalize2\n", 2781 dlm->name, br->node_idx, br->dead_node, 2782 dlm->reco.dead_node, dlm->reco.new_master); 2783 spin_unlock(&dlm->spinlock); 2784 dlm_put(dlm); 2785 return -EAGAIN; 2786 } 2787 spin_unlock(&dlm->spinlock); 2788 2789 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", 2790 dlm->name, br->node_idx, br->dead_node, 2791 dlm->reco.dead_node, dlm->reco.new_master); 2792 2793 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); 2794 2795 spin_lock(&dlm->spinlock); 2796 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2797 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { 2798 mlog(0, "%s: new_master %u died, changing " 2799 "to %u\n", dlm->name, dlm->reco.new_master, 2800 br->node_idx); 2801 } else { 2802 mlog(0, "%s: new_master %u NOT DEAD, changing " 2803 "to %u\n", dlm->name, dlm->reco.new_master, 2804 br->node_idx); 2805 /* may not have seen the new master as dead yet */ 2806 } 2807 } 2808 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { 2809 mlog(ML_NOTICE, "%s: dead_node previously set to %u, " 2810 "node %u changing it to %u\n", dlm->name, 2811 dlm->reco.dead_node, br->node_idx, br->dead_node); 2812 } 2813 dlm_set_reco_master(dlm, br->node_idx); 2814 dlm_set_reco_dead_node(dlm, br->dead_node); 2815 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2816 mlog(0, "recovery master %u sees %u as dead, but this " 2817 "node has not yet. marking %u as dead\n", 2818 br->node_idx, br->dead_node, br->dead_node); 2819 if (!test_bit(br->dead_node, dlm->domain_map) || 2820 !test_bit(br->dead_node, dlm->live_nodes_map)) 2821 mlog(0, "%u not in domain/live_nodes map " 2822 "so setting it in reco map manually\n", 2823 br->dead_node); 2824 /* force the recovery cleanup in __dlm_hb_node_down 2825 * both of these will be cleared in a moment */ 2826 set_bit(br->dead_node, dlm->domain_map); 2827 set_bit(br->dead_node, dlm->live_nodes_map); 2828 __dlm_hb_node_down(dlm, br->dead_node); 2829 } 2830 spin_unlock(&dlm->spinlock); 2831 2832 dlm_kick_recovery_thread(dlm); 2833 2834 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n", 2835 dlm->name, br->node_idx, br->dead_node, 2836 dlm->reco.dead_node, dlm->reco.new_master); 2837 2838 dlm_put(dlm); 2839 return 0; 2840 } 2841 2842 #define DLM_FINALIZE_STAGE2 0x01 2843 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) 2844 { 2845 int ret = 0; 2846 struct dlm_finalize_reco fr; 2847 struct dlm_node_iter iter; 2848 int nodenum; 2849 int status; 2850 int stage = 1; 2851 2852 mlog(0, "finishing recovery for node %s:%u, " 2853 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); 2854 2855 spin_lock(&dlm->spinlock); 2856 dlm_node_iter_init(dlm->domain_map, &iter); 2857 spin_unlock(&dlm->spinlock); 2858 2859 stage2: 2860 memset(&fr, 0, sizeof(fr)); 2861 fr.node_idx = dlm->node_num; 2862 fr.dead_node = dlm->reco.dead_node; 2863 if (stage == 2) 2864 fr.flags |= DLM_FINALIZE_STAGE2; 2865 2866 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2867 if (nodenum == dlm->node_num) 2868 continue; 2869 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, 2870 &fr, sizeof(fr), nodenum, &status); 2871 if (ret >= 0) 2872 ret = status; 2873 if (ret < 0) { 2874 mlog(ML_ERROR, "Error %d when sending message %u (key " 2875 "0x%x) to node %u\n", ret, DLM_FINALIZE_RECO_MSG, 2876 dlm->key, nodenum); 2877 if (dlm_is_host_down(ret)) { 2878 /* this has no effect on this recovery 2879 * session, so set the status to zero to 2880 * finish out the last recovery */ 2881 mlog(ML_ERROR, "node %u went down after this " 2882 "node finished recovery.\n", nodenum); 2883 ret = 0; 2884 continue; 2885 } 2886 break; 2887 } 2888 } 2889 if (stage == 1) { 2890 /* reset the node_iter back to the top and send finalize2 */ 2891 iter.curnode = -1; 2892 stage = 2; 2893 goto stage2; 2894 } 2895 2896 return ret; 2897 } 2898 2899 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data, 2900 void **ret_data) 2901 { 2902 struct dlm_ctxt *dlm = data; 2903 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2904 int stage = 1; 2905 2906 /* ok to return 0, domain has gone away */ 2907 if (!dlm_grab(dlm)) 2908 return 0; 2909 2910 if (fr->flags & DLM_FINALIZE_STAGE2) 2911 stage = 2; 2912 2913 mlog(0, "%s: node %u finalizing recovery stage%d of " 2914 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, 2915 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); 2916 2917 spin_lock(&dlm->spinlock); 2918 2919 if (dlm->reco.new_master != fr->node_idx) { 2920 mlog(ML_ERROR, "node %u sent recovery finalize msg, but node " 2921 "%u is supposed to be the new master, dead=%u\n", 2922 fr->node_idx, dlm->reco.new_master, fr->dead_node); 2923 BUG(); 2924 } 2925 if (dlm->reco.dead_node != fr->dead_node) { 2926 mlog(ML_ERROR, "node %u sent recovery finalize msg for dead " 2927 "node %u, but node %u is supposed to be dead\n", 2928 fr->node_idx, fr->dead_node, dlm->reco.dead_node); 2929 BUG(); 2930 } 2931 2932 switch (stage) { 2933 case 1: 2934 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); 2935 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2936 mlog(ML_ERROR, "%s: received finalize1 from " 2937 "new master %u for dead node %u, but " 2938 "this node has already received it!\n", 2939 dlm->name, fr->node_idx, fr->dead_node); 2940 dlm_print_reco_node_status(dlm); 2941 BUG(); 2942 } 2943 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; 2944 spin_unlock(&dlm->spinlock); 2945 break; 2946 case 2: 2947 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { 2948 mlog(ML_ERROR, "%s: received finalize2 from " 2949 "new master %u for dead node %u, but " 2950 "this node did not have finalize1!\n", 2951 dlm->name, fr->node_idx, fr->dead_node); 2952 dlm_print_reco_node_status(dlm); 2953 BUG(); 2954 } 2955 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 2956 __dlm_reset_recovery(dlm); 2957 spin_unlock(&dlm->spinlock); 2958 dlm_kick_recovery_thread(dlm); 2959 break; 2960 } 2961 2962 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", 2963 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); 2964 2965 dlm_put(dlm); 2966 return 0; 2967 } 2968