Lines Matching +full:lock +full:- +full:state in fs/ocfs2/dlm/dlmrecovery.c (each matched line is shown with its source line number and its enclosing function)
1 // SPDX-License-Identifier: GPL-2.0-or-later
103 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_dead_node()
104 if (dlm->reco.dead_node != dead_node) in dlm_set_reco_dead_node()
106 dlm->name, dlm->reco.dead_node, dead_node); in dlm_set_reco_dead_node()
107 dlm->reco.dead_node = dead_node; in dlm_set_reco_dead_node()
113 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_master()
115 dlm->name, dlm->reco.new_master, master); in dlm_set_reco_master()
116 dlm->reco.new_master = master; in dlm_set_reco_master()
121 assert_spin_locked(&dlm->spinlock); in __dlm_reset_recovery()
122 clear_bit(dlm->reco.dead_node, dlm->recovery_map); in __dlm_reset_recovery()
137 spin_lock(&dlm->work_lock); in dlm_dispatch_work()
138 list_splice_init(&dlm->work_list, &tmp_list); in dlm_dispatch_work()
139 spin_unlock(&dlm->work_lock); in dlm_dispatch_work()
144 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); in dlm_dispatch_work()
147 workfunc = item->func; in dlm_dispatch_work()
148 list_del_init(&item->list); in dlm_dispatch_work()
151 * it disappear. just double-check. */ in dlm_dispatch_work()
152 BUG_ON(item->dlm != dlm); in dlm_dispatch_work()
156 workfunc(item, item->data); in dlm_dispatch_work()
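The dlm_dispatch_work() fragments above show a splice-then-run pattern: the shared work_list is emptied onto a private list while work_lock is held, then each item runs with the lock dropped. A minimal userspace sketch of the same pattern (a pthread mutex and a fixed array stand in for the kernel spinlock and list_head; all names here are illustrative):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t work_lock = PTHREAD_MUTEX_INITIALIZER;
static void (*work_list[16])(void);
static int n_work;

static void dispatch(void)
{
	void (*pending[16])(void);
	int i, tot;

	pthread_mutex_lock(&work_lock);		/* spin_lock(&dlm->work_lock) */
	tot = n_work;
	for (i = 0; i < tot; i++)
		pending[i] = work_list[i];	/* like list_splice_init() */
	n_work = 0;
	pthread_mutex_unlock(&work_lock);

	for (i = 0; i < tot; i++)		/* run without holding the lock */
		pending[i]();			/* workfunc(item, item->data) */
}

static void say_hello(void) { puts("work item ran"); }

int main(void)
{
	pthread_mutex_lock(&work_lock);
	work_list[n_work++] = say_hello;
	pthread_mutex_unlock(&work_lock);
	dispatch();
	return 0;
}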
175 wake_up(&dlm->dlm_reco_thread_wq); in dlm_kick_recovery_thread()
183 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, in dlm_launch_recovery_thread()
184 "dlm_reco-%s", dlm->name); in dlm_launch_recovery_thread()
185 if (IS_ERR(dlm->dlm_reco_thread_task)) { in dlm_launch_recovery_thread()
186 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); in dlm_launch_recovery_thread()
187 dlm->dlm_reco_thread_task = NULL; in dlm_launch_recovery_thread()
188 return -EINVAL; in dlm_launch_recovery_thread()
196 if (dlm->dlm_reco_thread_task) { in dlm_complete_recovery_thread()
198 kthread_stop(dlm->dlm_reco_thread_task); in dlm_complete_recovery_thread()
199 dlm->dlm_reco_thread_task = NULL; in dlm_complete_recovery_thread()
214 * 5) the new master collects up all of secondary lock queue info
215 * one lock at a time, forcing each node to communicate back
217 * 6) each secondary lock queue responds with the full known lock info
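Steps 5 and 6 above are what dlm_remaster_locks() below implements: the new master walks each live node through a small per-node state machine while requesting and receiving its lock info. A compilable toy model of the happy-path progression (the state names mirror the DLM_RECO_NODE_DATA_* constants checked later in the listing; the driver loop is invented purely for illustration):

#include <stdio.h>

enum ndata_state {	/* mirrors DLM_RECO_NODE_DATA_* (happy path only) */
	NDATA_INIT, NDATA_REQUESTING, NDATA_REQUESTED,
	NDATA_RECEIVING, NDATA_DONE,
};

static const char *ndata_name[] = {
	"init", "requesting", "requested", "receiving", "done",
};

int main(void)
{
	enum ndata_state st;

	/* the new master's view of one secondary node during steps 5-6 */
	for (st = NDATA_INIT; st <= NDATA_DONE; st++)
		printf("node 3: %s\n", ndata_name[st]);
	return 0;
}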
233 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n", in dlm_print_reco_node_status()
234 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_print_reco_node_status()
235 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", in dlm_print_reco_node_status()
236 dlm->reco.dead_node, dlm->reco.new_master); in dlm_print_reco_node_status()
238 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_print_reco_node_status()
240 switch (ndata->state) { in dlm_print_reco_node_status()
260 st = "finalize-sent"; in dlm_print_reco_node_status()
266 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", in dlm_print_reco_node_status()
267 dlm->name, ndata->node_num, st); in dlm_print_reco_node_status()
269 list_for_each_entry(res, &dlm->reco.resources, recovering) { in dlm_print_reco_node_status()
271 dlm->name, res->lockname.len, res->lockname.name); in dlm_print_reco_node_status()
283 mlog(0, "dlm thread running for %s...\n", dlm->name); in dlm_recovery_thread()
288 if (status == -EAGAIN) { in dlm_recovery_thread()
296 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_recovery_thread()
309 spin_lock(&dlm->spinlock); in dlm_reco_master_ready()
310 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); in dlm_reco_master_ready()
311 spin_unlock(&dlm->spinlock); in dlm_reco_master_ready()
320 spin_lock(&dlm->spinlock); in dlm_is_node_dead()
321 dead = !test_bit(node, dlm->domain_map); in dlm_is_node_dead()
322 spin_unlock(&dlm->spinlock); in dlm_is_node_dead()
331 spin_lock(&dlm->spinlock); in dlm_is_node_recovered()
332 recovered = !test_bit(node, dlm->recovery_map); in dlm_is_node_recovered()
333 spin_unlock(&dlm->spinlock); in dlm_is_node_recovered()
344 "domain %s\n", node, dlm->name); in dlm_wait_for_node_death()
347 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
351 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
361 "domain %s\n", node, dlm->name); in dlm_wait_for_node_recovery()
364 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
368 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
372 /* callers of the top-level api calls (dlmlock/dlmunlock) should
373 * block on the dlm->reco.event when recovery is in progress.
374 * the dlm recovery thread will set this state when it begins
376 * the state and wake as soon as all affected lock resources have
381 spin_lock(&dlm->spinlock); in dlm_in_recovery()
382 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_in_recovery()
383 spin_unlock(&dlm->spinlock); in dlm_in_recovery()
392 "state=%d, master=%u, dead=%u\n", in dlm_wait_for_recovery()
393 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_wait_for_recovery()
394 dlm->reco.state, dlm->reco.new_master, in dlm_wait_for_recovery()
395 dlm->reco.dead_node); in dlm_wait_for_recovery()
397 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); in dlm_wait_for_recovery()
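Per the comment above, top-level entry points such as dlmlock()/dlmunlock() gate on this wait. A minimal sketch of the caller side (the enclosing function is hypothetical, not part of dlmrecovery.c):

/* hypothetical caller, shown only to illustrate the gating pattern */
static int example_top_level_op(struct dlm_ctxt *dlm)
{
	/* sleeps on dlm->reco.event until DLM_RECO_STATE_ACTIVE clears */
	dlm_wait_for_recovery(dlm);

	/* recovery can begin again immediately after the wakeup, so the
	 * per-lockres RECOVERING state must still be checked under
	 * res->spinlock before any lock resource is used */
	return 0;
}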
402 assert_spin_locked(&dlm->spinlock); in dlm_begin_recovery()
403 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_begin_recovery()
405 dlm->name, dlm->reco.dead_node); in dlm_begin_recovery()
406 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; in dlm_begin_recovery()
411 spin_lock(&dlm->spinlock); in dlm_end_recovery()
412 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); in dlm_end_recovery()
413 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; in dlm_end_recovery()
414 spin_unlock(&dlm->spinlock); in dlm_end_recovery()
415 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); in dlm_end_recovery()
416 wake_up(&dlm->reco.event); in dlm_end_recovery()
422 "dead node %u in domain %s\n", dlm->reco.new_master, in dlm_print_recovery_master()
423 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), in dlm_print_recovery_master()
424 dlm->reco.dead_node, dlm->name); in dlm_print_recovery_master()
432 spin_lock(&dlm->spinlock); in dlm_do_recovery()
434 if (dlm->migrate_done) { in dlm_do_recovery()
436 "lock resources\n", dlm->name); in dlm_do_recovery()
437 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
442 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && in dlm_do_recovery()
443 test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_do_recovery()
445 dlm->reco.new_master, dlm->reco.dead_node); in dlm_do_recovery()
451 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
454 bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES); in dlm_do_recovery()
459 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { in dlm_do_recovery()
462 dlm->reco.dead_node); in dlm_do_recovery()
466 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
467 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
472 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_do_recovery()
473 dlm->reco.dead_node); in dlm_do_recovery()
479 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
481 if (dlm->reco.new_master == dlm->node_num) in dlm_do_recovery()
484 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
486 * is the master, -EEXIST if it's another node. in dlm_do_recovery()
500 * because all of the dead node's lock resources in dlm_do_recovery()
501 * have been marked as in-recovery */ in dlm_do_recovery()
510 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); in dlm_do_recovery()
514 "retrying.\n", dlm->name, status, dlm->reco.dead_node); in dlm_do_recovery()
521 dlm->name, dlm->reco.dead_node, dlm->node_num); in dlm_do_recovery()
522 spin_lock(&dlm->spinlock); in dlm_do_recovery()
524 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_do_recovery()
525 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
530 return -EAGAIN; in dlm_do_recovery()
547 "retrying\n", dlm->name); in dlm_remaster_locks()
552 /* safe to access the node data list without a lock, since this in dlm_remaster_locks()
554 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
555 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); in dlm_remaster_locks()
556 ndata->state = DLM_RECO_NODE_DATA_REQUESTING; in dlm_remaster_locks()
558 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, in dlm_remaster_locks()
559 ndata->node_num); in dlm_remaster_locks()
561 if (ndata->node_num == dlm->node_num) { in dlm_remaster_locks()
562 ndata->state = DLM_RECO_NODE_DATA_DONE; in dlm_remaster_locks()
567 status = dlm_request_all_locks(dlm, ndata->node_num, in dlm_remaster_locks()
574 ndata->state = DLM_RECO_NODE_DATA_DEAD; in dlm_remaster_locks()
576 * with the network state. */ in dlm_remaster_locks()
577 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
579 ndata->node_num), in dlm_remaster_locks()
582 "dead? %s\n", ndata->node_num, in dlm_remaster_locks()
583 str_yes_no(dlm_is_node_dead(dlm, ndata->node_num))); in dlm_remaster_locks()
585 /* -ENOMEM on the other node */ in dlm_remaster_locks()
589 dlm->name, ndata->node_num, in dlm_remaster_locks()
597 switch (ndata->state) { in dlm_remaster_locks()
606 ndata->node_num, dead_node); in dlm_remaster_locks()
611 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; in dlm_remaster_locks()
614 ndata->node_num, dead_node); in dlm_remaster_locks()
619 ndata->node_num, dead_node); in dlm_remaster_locks()
624 ndata->node_num, dead_node); in dlm_remaster_locks()
630 mlog(0, "%s: Done requesting all lock info\n", dlm->name); in dlm_remaster_locks()
640 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
641 mlog(0, "checking recovery state of node %u\n", in dlm_remaster_locks()
642 ndata->node_num); in dlm_remaster_locks()
643 switch (ndata->state) { in dlm_remaster_locks()
646 mlog(ML_ERROR, "bad ndata state for " in dlm_remaster_locks()
647 "node %u: state=%d\n", in dlm_remaster_locks()
648 ndata->node_num, ndata->state); in dlm_remaster_locks()
654 "node %u\n", ndata->node_num, in dlm_remaster_locks()
659 mlog(0, "%s: node %u still in state %s\n", in dlm_remaster_locks()
660 dlm->name, ndata->node_num, in dlm_remaster_locks()
661 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? in dlm_remaster_locks()
666 mlog(0, "%s: node %u state is done\n", in dlm_remaster_locks()
667 dlm->name, ndata->node_num); in dlm_remaster_locks()
670 mlog(0, "%s: node %u state is finalize\n", in dlm_remaster_locks()
671 dlm->name, ndata->node_num); in dlm_remaster_locks()
686 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
687 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_remaster_locks()
688 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
690 /* all nodes are now in DLM_RECO_NODE_DATA_DONE state in dlm_remaster_locks()
698 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
700 dlm->node_num); in dlm_remaster_locks()
701 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
705 "dead=%u, this=%u, new=%u\n", dlm->name, in dlm_remaster_locks()
706 jiffies, dlm->reco.dead_node, in dlm_remaster_locks()
707 dlm->node_num, dlm->reco.new_master); in dlm_remaster_locks()
716 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
733 spin_lock(&dlm->spinlock); in dlm_init_recovery_area()
734 bitmap_copy(dlm->reco.node_map, dlm->domain_map, O2NM_MAX_NODES); in dlm_init_recovery_area()
736 * this lock, and death will be trapped later, so this should do */ in dlm_init_recovery_area()
737 spin_unlock(&dlm->spinlock); in dlm_init_recovery_area()
740 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); in dlm_init_recovery_area()
749 return -ENOMEM; in dlm_init_recovery_area()
751 ndata->node_num = num; in dlm_init_recovery_area()
752 ndata->state = DLM_RECO_NODE_DATA_INIT; in dlm_init_recovery_area()
754 list_add_tail(&ndata->list, &dlm->reco.node_data); in dlm_init_recovery_area()
768 list_splice_init(&dlm->reco.node_data, &tmplist); in dlm_destroy_recovery_area()
772 list_del_init(&ndata->list); in dlm_destroy_recovery_area()
791 lr.node_idx = dlm->node_num; in dlm_request_all_locks()
795 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, in dlm_request_all_locks()
801 "to recover dead node %u\n", dlm->name, ret, in dlm_request_all_locks()
815 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; in dlm_request_all_locks_handler()
820 return -EINVAL; in dlm_request_all_locks_handler()
822 if (lr->dead_node != dlm->reco.dead_node) { in dlm_request_all_locks_handler()
824 "dead_node is %u\n", dlm->name, lr->node_idx, in dlm_request_all_locks_handler()
825 lr->dead_node, dlm->reco.dead_node); in dlm_request_all_locks_handler()
829 return -ENOMEM; in dlm_request_all_locks_handler()
831 BUG_ON(lr->dead_node != dlm->reco.dead_node); in dlm_request_all_locks_handler()
836 return -ENOMEM; in dlm_request_all_locks_handler()
844 return -ENOMEM; in dlm_request_all_locks_handler()
850 item->u.ral.reco_master = lr->node_idx; in dlm_request_all_locks_handler()
851 item->u.ral.dead_node = lr->dead_node; in dlm_request_all_locks_handler()
852 spin_lock(&dlm->work_lock); in dlm_request_all_locks_handler()
853 list_add_tail(&item->list, &dlm->work_list); in dlm_request_all_locks_handler()
854 spin_unlock(&dlm->work_lock); in dlm_request_all_locks_handler()
855 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_request_all_locks_handler()
871 dlm = item->dlm; in dlm_request_all_locks_worker()
872 dead_node = item->u.ral.dead_node; in dlm_request_all_locks_worker()
873 reco_master = item->u.ral.reco_master; in dlm_request_all_locks_worker()
877 dlm->name, dead_node, reco_master); in dlm_request_all_locks_worker()
879 if (dead_node != dlm->reco.dead_node || in dlm_request_all_locks_worker()
880 reco_master != dlm->reco.new_master) { in dlm_request_all_locks_worker()
883 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_request_all_locks_worker()
884 mlog(ML_NOTICE, "%s: will not send recovery state, " in dlm_request_all_locks_worker()
886 " current=(dead=%u,mas=%u)\n", dlm->name, in dlm_request_all_locks_worker()
888 dlm->reco.dead_node, dlm->reco.new_master); in dlm_request_all_locks_worker()
890 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, " in dlm_request_all_locks_worker()
892 dlm->name, dlm->reco.dead_node, in dlm_request_all_locks_worker()
893 dlm->reco.new_master, dead_node, reco_master); in dlm_request_all_locks_worker()
898 /* lock resources should have already been moved to the in dlm_request_all_locks_worker()
899 * dlm->reco.resources list. now move items from that list in dlm_request_all_locks_worker()
902 * can safely move UNKNOWN lock resources for each recovery in dlm_request_all_locks_worker()
906 /* now we can begin blasting lockreses without the dlm lock */ in dlm_request_all_locks_worker()
915 "recovery state for dead node %u, ret=%d\n", dlm->name, in dlm_request_all_locks_worker()
923 spin_lock(&dlm->spinlock); in dlm_request_all_locks_worker()
924 list_splice_init(&resources, &dlm->reco.resources); in dlm_request_all_locks_worker()
925 spin_unlock(&dlm->spinlock); in dlm_request_all_locks_worker()
931 "recovery all-done for dead node %u, ret=%d\n", in dlm_request_all_locks_worker()
932 dlm->name, reco_master, dead_node, ret); in dlm_request_all_locks_worker()
946 done_msg.node_idx = dlm->node_num; in dlm_send_all_done_msg()
952 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, in dlm_send_all_done_msg()
956 "to recover dead node %u\n", dlm->name, ret, send_to, in dlm_send_all_done_msg()
971 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; in dlm_reco_data_done_handler()
973 int ret = -EINVAL; in dlm_reco_data_done_handler()
976 return -EINVAL; in dlm_reco_data_done_handler()
979 "node_idx=%u, this node=%u\n", done->dead_node, in dlm_reco_data_done_handler()
980 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
982 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), in dlm_reco_data_done_handler()
984 "node_idx=%u, this node=%u\n", done->dead_node, in dlm_reco_data_done_handler()
985 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
988 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_reco_data_done_handler()
989 if (ndata->node_num != done->node_idx) in dlm_reco_data_done_handler()
992 switch (ndata->state) { in dlm_reco_data_done_handler()
997 mlog(ML_ERROR, "bad ndata state for node %u:" in dlm_reco_data_done_handler()
998 " state=%d\n", ndata->node_num, in dlm_reco_data_done_handler()
999 ndata->state); in dlm_reco_data_done_handler()
1010 ndata->node_num); in dlm_reco_data_done_handler()
1012 ndata->state = DLM_RECO_NODE_DATA_DONE; in dlm_reco_data_done_handler()
1025 "%u\n", done->node_idx); in dlm_reco_data_done_handler()
1037 struct dlm_lock *lock; in dlm_move_reco_locks_to_list() local
1039 spin_lock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1040 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_move_reco_locks_to_list()
1043 if (dlm_is_recovery_lock(res->lockname.name, in dlm_move_reco_locks_to_list()
1044 res->lockname.len)) { in dlm_move_reco_locks_to_list()
1045 spin_lock(&res->spinlock); in dlm_move_reco_locks_to_list()
1046 list_for_each_entry(lock, &res->granted, list) { in dlm_move_reco_locks_to_list()
1047 if (lock->ml.node == dead_node) { in dlm_move_reco_locks_to_list()
1049 "a $RECOVERY lock for dead " in dlm_move_reco_locks_to_list()
1051 dead_node, dlm->name); in dlm_move_reco_locks_to_list()
1052 list_del_init(&lock->list); in dlm_move_reco_locks_to_list()
1053 dlm_lock_put(lock); in dlm_move_reco_locks_to_list()
1055 * - do manually */ in dlm_move_reco_locks_to_list()
1056 dlm_lock_put(lock); in dlm_move_reco_locks_to_list()
1060 spin_unlock(&res->spinlock); in dlm_move_reco_locks_to_list()
1064 if (res->owner == dead_node) { in dlm_move_reco_locks_to_list()
1068 list_move_tail(&res->recovering, list); in dlm_move_reco_locks_to_list()
1069 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { in dlm_move_reco_locks_to_list()
1072 list_move_tail(&res->recovering, list); in dlm_move_reco_locks_to_list()
1075 spin_unlock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1081 struct list_head *iter, *queue = &res->granted; in dlm_num_locks_in_lockres()
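Only the declaration line of dlm_num_locks_in_lockres() matched the search. For context, a sketch of the whole counter: it relies on granted, converting and blocked being adjacent list_heads in struct dlm_lock_resource, so queue++ steps through all three (a reconstruction, not guaranteed verbatim):

static int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
{
	struct list_head *iter, *queue = &res->granted;
	int i, total_locks = 0;

	/* granted, converting, blocked: three consecutive queues */
	for (i = 0; i < 3; i++) {
		list_for_each(iter, queue)
			total_locks++;
		queue++;
	}
	return total_locks;
}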
1099 u64 mig_cookie = be64_to_cpu(mres->mig_cookie); in dlm_send_mig_lockres_msg()
1100 int mres_total_locks = be32_to_cpu(mres->total_locks); in dlm_send_mig_lockres_msg()
1102 u8 orig_flags = mres->flags, in dlm_send_mig_lockres_msg()
1103 orig_master = mres->master; in dlm_send_mig_lockres_msg()
1105 BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS); in dlm_send_mig_lockres_msg()
1106 if (!mres->num_locks) in dlm_send_mig_lockres_msg()
1109 /* add an all-done flag if we reached the last lock */ in dlm_send_mig_lockres_msg()
1110 orig_flags = mres->flags; in dlm_send_mig_lockres_msg()
1113 mres->flags |= DLM_MRES_ALL_DONE; in dlm_send_mig_lockres_msg()
1116 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_mig_lockres_msg()
1121 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, in dlm_send_mig_lockres_msg()
1122 struct_size(mres, ml, mres->num_locks), in dlm_send_mig_lockres_msg()
1128 "node %u (%s)\n", dlm->name, mres->lockname_len, in dlm_send_mig_lockres_msg()
1129 mres->lockname, ret, send_to, in dlm_send_mig_lockres_msg()
1133 /* might get an -ENOMEM back here */ in dlm_send_mig_lockres_msg()
1138 if (ret == -EFAULT) { in dlm_send_mig_lockres_msg()
1147 dlm_init_migratable_lockres(mres, res->lockname.name, in dlm_send_mig_lockres_msg()
1148 res->lockname.len, mres_total_locks, in dlm_send_mig_lockres_msg()
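The struct_size(mres, ml, mres->num_locks) call above sizes a fixed header plus a flexible array of per-lock entries for the wire message; when a resource holds more locks than fit, follow-on messages appear to be tied together by the shared mig_cookie. A self-contained demo of the sizing idiom (toy struct and macro, not the kernel's definitions):

#include <stdio.h>
#include <stdlib.h>

/* toy stand-in for struct dlm_migratable_lockres */
struct toy_mres {
	unsigned char num_locks;
	struct { unsigned long long cookie; } ml[];	/* flexible array */
};

/* same arithmetic as the kernel's struct_size(), minus overflow checks */
#define toy_struct_size(p, member, n) \
	(sizeof(*(p)) + sizeof((p)->member[0]) * (size_t)(n))

int main(void)
{
	size_t n = 3;
	struct toy_mres *m = malloc(toy_struct_size(m, ml, n));

	if (!m)
		return 1;
	m->num_locks = (unsigned char)n;
	printf("message bytes for %zu locks: %zu\n",
	       n, toy_struct_size(m, ml, m->num_locks));
	free(m);
	return 0;
}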
1160 mres->lockname_len = namelen; in dlm_init_migratable_lockres()
1161 memcpy(mres->lockname, lockname, namelen); in dlm_init_migratable_lockres()
1162 mres->num_locks = 0; in dlm_init_migratable_lockres()
1163 mres->total_locks = cpu_to_be32(total_locks); in dlm_init_migratable_lockres()
1164 mres->mig_cookie = cpu_to_be64(cookie); in dlm_init_migratable_lockres()
1165 mres->flags = flags; in dlm_init_migratable_lockres()
1166 mres->master = master; in dlm_init_migratable_lockres()
1169 static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock, in dlm_prepare_lvb_for_migration() argument
1173 if (!lock->lksb) in dlm_prepare_lvb_for_migration()
1180 /* Only consider lvbs in locks with granted EX or PR lock levels */ in dlm_prepare_lvb_for_migration()
1181 if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE) in dlm_prepare_lvb_for_migration()
1184 if (dlm_lvb_is_empty(mres->lvb)) { in dlm_prepare_lvb_for_migration()
1185 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); in dlm_prepare_lvb_for_migration()
1190 if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN)) in dlm_prepare_lvb_for_migration()
1193 mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, " in dlm_prepare_lvb_for_migration()
1195 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), in dlm_prepare_lvb_for_migration()
1196 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), in dlm_prepare_lvb_for_migration()
1197 lock->lockres->lockname.len, lock->lockres->lockname.name, in dlm_prepare_lvb_for_migration()
1198 lock->ml.node); in dlm_prepare_lvb_for_migration()
1199 dlm_print_one_lock_resource(lock->lockres); in dlm_prepare_lvb_for_migration()
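The %u:%llu cookie prints above come from dlm_get_lock_cookie_node() and dlm_get_lock_cookie_seq(). A small standalone demo of that split, assuming the node number lives in the top 8 bits of the 64-bit cookie and the sequence number in the low 56 (a layout inferred from those helpers; treat it as illustrative):

#include <stdint.h>
#include <stdio.h>

/* assumed layout: [8-bit node | 56-bit sequence] */
static uint8_t cookie_node(uint64_t c) { return (uint8_t)(c >> 56); }
static uint64_t cookie_seq(uint64_t c) { return c & ((1ULL << 56) - 1); }

int main(void)
{
	uint64_t c = ((uint64_t)3 << 56) | 42;	/* node 3, seq 42 */

	/* matches the "cookie=%u:%llu" format used in the mlog calls */
	printf("cookie=%u:%llu\n", (unsigned)cookie_node(c),
	       (unsigned long long)cookie_seq(c));
	return 0;
}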
1203 /* returns 1 if this lock fills the network structure,
1205 static int dlm_add_lock_to_array(struct dlm_lock *lock, in dlm_add_lock_to_array() argument
1209 int lock_num = mres->num_locks; in dlm_add_lock_to_array()
1211 ml = &(mres->ml[lock_num]); in dlm_add_lock_to_array()
1212 ml->cookie = lock->ml.cookie; in dlm_add_lock_to_array()
1213 ml->type = lock->ml.type; in dlm_add_lock_to_array()
1214 ml->convert_type = lock->ml.convert_type; in dlm_add_lock_to_array()
1215 ml->highest_blocked = lock->ml.highest_blocked; in dlm_add_lock_to_array()
1216 ml->list = queue; in dlm_add_lock_to_array()
1217 if (lock->lksb) { in dlm_add_lock_to_array()
1218 ml->flags = lock->lksb->flags; in dlm_add_lock_to_array()
1219 dlm_prepare_lvb_for_migration(lock, mres, queue); in dlm_add_lock_to_array()
1221 ml->node = lock->ml.node; in dlm_add_lock_to_array()
1222 mres->num_locks++; in dlm_add_lock_to_array()
1224 if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS) in dlm_add_lock_to_array()
1239 dummy.ml.node = dlm->node_num; in dlm_add_dummy_lock()
1247 if (unlikely(ml->cookie == 0 && in dlm_is_dummy_lock()
1248 ml->type == LKM_IVMODE && in dlm_is_dummy_lock()
1249 ml->convert_type == LKM_IVMODE && in dlm_is_dummy_lock()
1250 ml->highest_blocked == LKM_IVMODE && in dlm_is_dummy_lock()
1251 ml->list == DLM_BLOCKED_LIST)) { in dlm_is_dummy_lock()
1252 *nodenum = ml->node; in dlm_is_dummy_lock()
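dlm_is_dummy_lock() above recognizes the marker that the sender builds in dlm_add_dummy_lock() (one line of which matched earlier): a cookie-0, all-IVMODE entry on the blocked list, meaning "the dead node held only a mastery reference here". A condensed sketch of that sender side (reconstruction; error handling omitted):

static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
			       struct dlm_migratable_lockres *mres)
{
	struct dlm_lock dummy;

	/* exactly the pattern dlm_is_dummy_lock() tests for */
	memset(&dummy, 0, sizeof(dummy));
	dummy.ml.cookie = 0;
	dummy.ml.type = LKM_IVMODE;
	dummy.ml.convert_type = LKM_IVMODE;
	dummy.ml.highest_blocked = LKM_IVMODE;
	dummy.lksb = NULL;
	dummy.ml.node = dlm->node_num;
	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
}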
1265 struct dlm_lock *lock; in dlm_send_one_lockres() local
1281 dlm_init_migratable_lockres(mres, res->lockname.name, in dlm_send_one_lockres()
1282 res->lockname.len, total_locks, in dlm_send_one_lockres()
1283 mig_cookie, flags, res->owner); in dlm_send_one_lockres()
1288 list_for_each_entry(lock, queue, list) { in dlm_send_one_lockres()
1289 /* add another lock. */ in dlm_send_one_lockres()
1291 if (!dlm_add_lock_to_array(lock, mres, i)) in dlm_send_one_lockres()
1294 /* this filled the lock message, in dlm_send_one_lockres()
1303 /* send a dummy lock to indicate a mastery reference only */ in dlm_send_one_lockres()
1304 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n", in dlm_send_one_lockres()
1305 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_one_lockres()
1318 dlm->name, ret); in dlm_send_one_lockres()
1322 "lockres %.*s\n", dlm->name, send_to, in dlm_send_one_lockres()
1324 res->lockname.len, res->lockname.name); in dlm_send_one_lockres()
1348 (struct dlm_migratable_lockres *)msg->buf; in dlm_mig_lockres_handler()
1358 return -EINVAL; in dlm_mig_lockres_handler()
1363 dlm->name, mres->lockname_len, in dlm_mig_lockres_handler()
1364 mres->lockname, mres->master); in dlm_mig_lockres_handler()
1366 return -EINVAL; in dlm_mig_lockres_handler()
1369 BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION))); in dlm_mig_lockres_handler()
1371 real_master = mres->master; in dlm_mig_lockres_handler()
1374 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); in dlm_mig_lockres_handler()
1378 (mres->flags & DLM_MRES_RECOVERY) ? in dlm_mig_lockres_handler()
1379 "recovery" : "migration", mres->master); in dlm_mig_lockres_handler()
1380 if (mres->flags & DLM_MRES_ALL_DONE) in dlm_mig_lockres_handler()
1383 ret = -ENOMEM; in dlm_mig_lockres_handler()
1384 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS); in dlm_mig_lockres_handler()
1389 /* lookup the lock to see if we have a secondary queue for this in dlm_mig_lockres_handler()
1392 hash = dlm_lockid_hash(mres->lockname, mres->lockname_len); in dlm_mig_lockres_handler()
1393 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1394 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, in dlm_mig_lockres_handler()
1399 spin_lock(&res->spinlock); in dlm_mig_lockres_handler()
1400 if (res->state & DLM_LOCK_RES_DROPPING_REF) { in dlm_mig_lockres_handler()
1403 " ref!\n", dlm->name, in dlm_mig_lockres_handler()
1404 mres->lockname_len, mres->lockname); in dlm_mig_lockres_handler()
1405 ret = -EINVAL; in dlm_mig_lockres_handler()
1406 spin_unlock(&res->spinlock); in dlm_mig_lockres_handler()
1407 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1412 if (mres->flags & DLM_MRES_RECOVERY) { in dlm_mig_lockres_handler()
1413 res->state |= DLM_LOCK_RES_RECOVERING; in dlm_mig_lockres_handler()
1415 if (res->state & DLM_LOCK_RES_MIGRATING) { in dlm_mig_lockres_handler()
1418 mlog(0, "lock %.*s is already migrating\n", in dlm_mig_lockres_handler()
1419 mres->lockname_len, in dlm_mig_lockres_handler()
1420 mres->lockname); in dlm_mig_lockres_handler()
1421 } else if (res->state & DLM_LOCK_RES_RECOVERING) { in dlm_mig_lockres_handler()
1424 "lock %.*s, but marked as recovering!\n", in dlm_mig_lockres_handler()
1425 mres->lockname_len, mres->lockname); in dlm_mig_lockres_handler()
1426 ret = -EFAULT; in dlm_mig_lockres_handler()
1427 spin_unlock(&res->spinlock); in dlm_mig_lockres_handler()
1428 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1432 res->state |= DLM_LOCK_RES_MIGRATING; in dlm_mig_lockres_handler()
1434 spin_unlock(&res->spinlock); in dlm_mig_lockres_handler()
1435 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1437 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1440 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); in dlm_mig_lockres_handler()
1449 if (mres->flags & DLM_MRES_RECOVERY) in dlm_mig_lockres_handler()
1450 res->state |= DLM_LOCK_RES_RECOVERING; in dlm_mig_lockres_handler()
1452 res->state |= DLM_LOCK_RES_MIGRATING; in dlm_mig_lockres_handler()
1454 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1456 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1458 /* Add an extra ref for this lock-less lockres lest the in dlm_mig_lockres_handler()
1465 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres(). in dlm_mig_lockres_handler()
1476 spin_lock(&res->spinlock); in dlm_mig_lockres_handler()
1477 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; in dlm_mig_lockres_handler()
1478 spin_unlock(&res->spinlock); in dlm_mig_lockres_handler()
1479 wake_up(&res->wq); in dlm_mig_lockres_handler()
1484 * the proper res->state flags. */ in dlm_mig_lockres_handler()
1486 spin_lock(&res->spinlock); in dlm_mig_lockres_handler()
1488 * or when a lock is added by the recovery worker */ in dlm_mig_lockres_handler()
1490 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { in dlm_mig_lockres_handler()
1492 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); in dlm_mig_lockres_handler()
1495 "%.*s\n", mres->lockname_len, mres->lockname); in dlm_mig_lockres_handler()
1499 dlm_change_lockres_owner(dlm, res, dlm->node_num); in dlm_mig_lockres_handler()
1501 spin_unlock(&res->spinlock); in dlm_mig_lockres_handler()
1505 memcpy(buf, msg->buf, be16_to_cpu(msg->data_len)); /* copy the whole message */ in dlm_mig_lockres_handler()
1507 item->u.ml.lockres = res; /* already have a ref */ in dlm_mig_lockres_handler()
1508 item->u.ml.real_master = real_master; in dlm_mig_lockres_handler()
1509 item->u.ml.extra_ref = extra_refs; in dlm_mig_lockres_handler()
1510 spin_lock(&dlm->work_lock); in dlm_mig_lockres_handler()
1511 list_add_tail(&item->list, &dlm->work_list); in dlm_mig_lockres_handler()
1512 spin_unlock(&dlm->work_lock); in dlm_mig_lockres_handler()
1513 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_mig_lockres_handler()
1540 dlm = item->dlm; in dlm_mig_lockres_worker()
1543 res = item->u.ml.lockres; in dlm_mig_lockres_worker()
1544 real_master = item->u.ml.real_master; in dlm_mig_lockres_worker()
1545 extra_ref = item->u.ml.extra_ref; in dlm_mig_lockres_worker()
1548 /* this case is super-rare. only occurs if in dlm_mig_lockres_worker()
1560 res->lockname.len, res->lockname.name); in dlm_mig_lockres_worker()
1562 spin_lock(&res->spinlock); in dlm_mig_lockres_worker()
1564 spin_unlock(&res->spinlock); in dlm_mig_lockres_worker()
1567 real_master, res->lockname.len, in dlm_mig_lockres_worker()
1568 res->lockname.name); in dlm_mig_lockres_worker()
1580 if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) == in dlm_mig_lockres_worker()
1582 ret = dlm_finish_migration(dlm, res, mres->master); in dlm_mig_lockres_worker()
1622 * the lock needs remastering here. in dlm_lockres_master_requery()
1626 * we need to remaster this lock. if not, then the in dlm_lockres_master_requery()
1632 spin_lock(&dlm->spinlock); in dlm_lockres_master_requery()
1633 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_lockres_master_requery()
1634 spin_unlock(&dlm->spinlock); in dlm_lockres_master_requery()
1638 if (nodenum == dlm->node_num) in dlm_lockres_master_requery()
1649 mlog(0, "lock master is %u\n", *real_master); in dlm_lockres_master_requery()
1665 req.node_idx = dlm->node_num; in dlm_do_master_requery()
1666 req.namelen = res->lockname.len; in dlm_do_master_requery()
1667 memcpy(req.name, res->lockname.name, res->lockname.len); in dlm_do_master_requery()
1670 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, in dlm_do_master_requery()
1675 dlm->key, nodenum); in dlm_do_master_requery()
1676 else if (status == -ENOMEM) { in dlm_do_master_requery()
1699 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; in dlm_master_requery_handler()
1712 hash = dlm_lockid_hash(req->name, req->namelen); in dlm_master_requery_handler()
1714 spin_lock(&dlm->spinlock); in dlm_master_requery_handler()
1715 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); in dlm_master_requery_handler()
1717 spin_lock(&res->spinlock); in dlm_master_requery_handler()
1718 master = res->owner; in dlm_master_requery_handler()
1719 if (master == dlm->node_num) { in dlm_master_requery_handler()
1724 spin_unlock(&res->spinlock); in dlm_master_requery_handler()
1726 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1733 spin_unlock(&res->spinlock); in dlm_master_requery_handler()
1737 spin_unlock(&res->spinlock); in dlm_master_requery_handler()
1741 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1754 ret = &(res->granted); in dlm_list_num_to_pointer()
1763 * NOTE about in-flight requests during migration:
1766 * MIGRATING and then flushed all of its pending ASTS. So any in-flight
1768 * case the lock data will reflect the change and a return message is on
1774 * well. For the lock case, there is no way a lock can be on the master
1775 * queue and not be on the secondary queue since the lock is always added
1777 * a lock that he doesn't already have on the list.
1778 * In total, this means that the local lock is correct and should not be
1780 * from the master before the MIGRATING flag will bring the lock properly
1781 * up-to-date, and the change will be ordered properly for the waiter.
1782 * We will *not* attempt to modify the lock underneath the waiter.
1796 struct dlm_lock *lock; in dlm_process_recovery_data() local
1800 mlog(0, "running %d locks for this lockres\n", mres->num_locks); in dlm_process_recovery_data()
1801 for (i=0; i<mres->num_locks; i++) { in dlm_process_recovery_data()
1802 ml = &(mres->ml[i]); in dlm_process_recovery_data()
1806 BUG_ON(mres->num_locks != 1); in dlm_process_recovery_data()
1807 mlog(0, "%s:%.*s: dummy lock for %u\n", in dlm_process_recovery_data()
1808 dlm->name, mres->lockname_len, mres->lockname, in dlm_process_recovery_data()
1810 spin_lock(&res->spinlock); in dlm_process_recovery_data()
1812 spin_unlock(&res->spinlock); in dlm_process_recovery_data()
1815 BUG_ON(ml->highest_blocked != LKM_IVMODE); in dlm_process_recovery_data()
1819 queue = dlm_list_num_to_pointer(res, ml->list); in dlm_process_recovery_data()
1822 /* if the lock is for the local node it needs to in dlm_process_recovery_data()
1824 * do not allocate a new lock structure. */ in dlm_process_recovery_data()
1825 if (ml->node == dlm->node_num) { in dlm_process_recovery_data()
1827 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); in dlm_process_recovery_data()
1829 lock = NULL; in dlm_process_recovery_data()
1830 spin_lock(&res->spinlock); in dlm_process_recovery_data()
1834 lock = list_entry(iter, in dlm_process_recovery_data()
1836 if (lock->ml.cookie == ml->cookie) in dlm_process_recovery_data()
1838 lock = NULL; in dlm_process_recovery_data()
1840 if (lock) in dlm_process_recovery_data()
1844 /* lock is always created locally first, and in dlm_process_recovery_data()
1846 if (!lock) { in dlm_process_recovery_data()
1847 c = ml->cookie; in dlm_process_recovery_data()
1848 mlog(ML_ERROR, "Could not find local lock " in dlm_process_recovery_data()
1854 ml->node, ml->list, ml->flags, ml->type, in dlm_process_recovery_data()
1855 ml->convert_type, ml->highest_blocked); in dlm_process_recovery_data()
1860 if (lock->ml.node != ml->node) { in dlm_process_recovery_data()
1861 c = lock->ml.cookie; in dlm_process_recovery_data()
1862 mlog(ML_ERROR, "Mismatched node# in lock " in dlm_process_recovery_data()
1866 res->lockname.len, res->lockname.name, in dlm_process_recovery_data()
1867 lock->ml.node); in dlm_process_recovery_data()
1868 c = ml->cookie; in dlm_process_recovery_data()
1869 mlog(ML_ERROR, "Migrate lock cookie %u:%llu, " in dlm_process_recovery_data()
1874 ml->node, ml->list, ml->flags, ml->type, in dlm_process_recovery_data()
1875 ml->convert_type, ml->highest_blocked); in dlm_process_recovery_data()
1881 c = ml->cookie; in dlm_process_recovery_data()
1882 mlog(0, "Lock cookie %u:%llu was on list %u " in dlm_process_recovery_data()
1886 j, ml->list, res->lockname.len, in dlm_process_recovery_data()
1887 res->lockname.name); in dlm_process_recovery_data()
1889 spin_unlock(&res->spinlock); in dlm_process_recovery_data()
1896 /* move the lock to its proper place */ in dlm_process_recovery_data()
1897 /* do not alter lock refcount. switching lists. */ in dlm_process_recovery_data()
1898 list_move_tail(&lock->list, queue); in dlm_process_recovery_data()
1899 spin_unlock(&res->spinlock); in dlm_process_recovery_data()
1901 mlog(0, "just reordered a local lock!\n"); in dlm_process_recovery_data()
1905 /* lock is for another node. */ in dlm_process_recovery_data()
1906 newlock = dlm_new_lock(ml->type, ml->node, in dlm_process_recovery_data()
1907 be64_to_cpu(ml->cookie), NULL); in dlm_process_recovery_data()
1909 ret = -ENOMEM; in dlm_process_recovery_data()
1912 lksb = newlock->lksb; in dlm_process_recovery_data()
1915 if (ml->convert_type != LKM_IVMODE) { in dlm_process_recovery_data()
1916 BUG_ON(queue != &res->converting); in dlm_process_recovery_data()
1917 newlock->ml.convert_type = ml->convert_type; in dlm_process_recovery_data()
1919 lksb->flags |= (ml->flags & in dlm_process_recovery_data()
1922 if (ml->type == LKM_NLMODE) in dlm_process_recovery_data()
1926 * If the lock is in the blocked list it can't have a valid lvb, in dlm_process_recovery_data()
1929 if (ml->list == DLM_BLOCKED_LIST) in dlm_process_recovery_data()
1932 if (!dlm_lvb_is_empty(mres->lvb)) { in dlm_process_recovery_data()
1933 if (lksb->flags & DLM_LKSB_PUT_LVB) { in dlm_process_recovery_data()
1937 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); in dlm_process_recovery_data()
1938 /* the lock resource lvb update must happen in dlm_process_recovery_data()
1942 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); in dlm_process_recovery_data()
1946 BUG_ON(ml->type != LKM_EXMODE && in dlm_process_recovery_data()
1947 ml->type != LKM_PRMODE); in dlm_process_recovery_data()
1948 if (!dlm_lvb_is_empty(res->lvb) && in dlm_process_recovery_data()
1949 (ml->type == LKM_EXMODE || in dlm_process_recovery_data()
1950 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { in dlm_process_recovery_data()
1953 "lvb! type=%d\n", dlm->name, in dlm_process_recovery_data()
1954 res->lockname.len, in dlm_process_recovery_data()
1955 res->lockname.name, ml->type); in dlm_process_recovery_data()
1958 printk("%02x", res->lvb[i]); in dlm_process_recovery_data()
1961 printk("%02x", mres->lvb[i]); in dlm_process_recovery_data()
1966 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); in dlm_process_recovery_data()
1972 * wrt lock queue ordering and recovery: in dlm_process_recovery_data()
1980 * just means that a lock request may get pushed in dlm_process_recovery_data()
1982 * also note that for a given node the lock order in dlm_process_recovery_data()
1988 spin_lock(&res->spinlock); in dlm_process_recovery_data()
1989 list_for_each_entry(lock, queue, list) { in dlm_process_recovery_data()
1990 if (lock->ml.cookie == ml->cookie) { in dlm_process_recovery_data()
1991 c = lock->ml.cookie; in dlm_process_recovery_data()
1992 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " in dlm_process_recovery_data()
1993 "exists on this lockres!\n", dlm->name, in dlm_process_recovery_data()
1994 res->lockname.len, res->lockname.name, in dlm_process_recovery_data()
1998 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " in dlm_process_recovery_data()
2000 ml->type, ml->convert_type, ml->node, in dlm_process_recovery_data()
2001 dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)), in dlm_process_recovery_data()
2002 dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)), in dlm_process_recovery_data()
2003 ml->list); in dlm_process_recovery_data()
2012 if (mres->flags & DLM_MRES_RECOVERY && in dlm_process_recovery_data()
2013 ml->list == DLM_CONVERTING_LIST && in dlm_process_recovery_data()
2014 newlock->ml.type > in dlm_process_recovery_data()
2015 newlock->ml.convert_type) { in dlm_process_recovery_data()
2018 list_add(&newlock->list, queue); in dlm_process_recovery_data()
2020 list_add_tail(&newlock->list, queue); in dlm_process_recovery_data()
2021 mlog(0, "%s:%.*s: added lock for node %u, " in dlm_process_recovery_data()
2022 "setting refmap bit\n", dlm->name, in dlm_process_recovery_data()
2023 res->lockname.len, res->lockname.name, ml->node); in dlm_process_recovery_data()
2024 dlm_lockres_set_refmap_bit(dlm, res, ml->node); in dlm_process_recovery_data()
2026 spin_unlock(&res->spinlock); in dlm_process_recovery_data()
2032 spin_lock(&res->spinlock); in dlm_process_recovery_data()
2034 spin_unlock(&res->spinlock); in dlm_process_recovery_data()
2047 struct dlm_lock *lock, *next; in dlm_move_lockres_to_recovery_list() local
2049 assert_spin_locked(&dlm->spinlock); in dlm_move_lockres_to_recovery_list()
2050 assert_spin_locked(&res->spinlock); in dlm_move_lockres_to_recovery_list()
2051 res->state |= DLM_LOCK_RES_RECOVERING; in dlm_move_lockres_to_recovery_list()
2052 if (!list_empty(&res->recovering)) { in dlm_move_lockres_to_recovery_list()
2055 dlm->name, res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2056 list_del_init(&res->recovering); in dlm_move_lockres_to_recovery_list()
2061 list_add_tail(&res->recovering, &dlm->reco.resources); in dlm_move_lockres_to_recovery_list()
2064 for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) { in dlm_move_lockres_to_recovery_list()
2066 list_for_each_entry_safe(lock, next, queue, list) { in dlm_move_lockres_to_recovery_list()
2067 dlm_lock_get(lock); in dlm_move_lockres_to_recovery_list()
2068 if (lock->convert_pending) { in dlm_move_lockres_to_recovery_list()
2069 /* move converting lock back to granted */ in dlm_move_lockres_to_recovery_list()
2072 res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2073 dlm_revert_pending_convert(res, lock); in dlm_move_lockres_to_recovery_list()
2074 lock->convert_pending = 0; in dlm_move_lockres_to_recovery_list()
2075 } else if (lock->lock_pending) { in dlm_move_lockres_to_recovery_list()
2076 /* remove pending lock requests completely */ in dlm_move_lockres_to_recovery_list()
2078 mlog(0, "node died with lock pending " in dlm_move_lockres_to_recovery_list()
2080 res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2081 /* lock will be floating until ref in in dlm_move_lockres_to_recovery_list()
2086 dlm_revert_pending_lock(res, lock); in dlm_move_lockres_to_recovery_list()
2087 lock->lock_pending = 0; in dlm_move_lockres_to_recovery_list()
2088 } else if (lock->unlock_pending) { in dlm_move_lockres_to_recovery_list()
2091 * before sending this lock state to the in dlm_move_lockres_to_recovery_list()
2101 res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2102 dlm_commit_pending_unlock(res, lock); in dlm_move_lockres_to_recovery_list()
2103 lock->unlock_pending = 0; in dlm_move_lockres_to_recovery_list()
2104 } else if (lock->cancel_pending) { in dlm_move_lockres_to_recovery_list()
2107 * before sending this lock state to the in dlm_move_lockres_to_recovery_list()
2112 res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2113 dlm_commit_pending_cancel(res, lock); in dlm_move_lockres_to_recovery_list()
2114 lock->cancel_pending = 0; in dlm_move_lockres_to_recovery_list()
2116 dlm_lock_put(lock); in dlm_move_lockres_to_recovery_list()
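The four pending-flag branches above follow one rule: operations the dead master can never have processed (a convert or a new lock request still awaiting a reply) are reverted, while operations it may already have completed (an unlock or a cancel) are committed locally so the recovered lock state is consistent. A compilable summary of that decision table (the enum and function are illustrative, not kernel code):

#include <stdio.h>

enum pending_op { OP_CONVERT, OP_LOCK, OP_UNLOCK, OP_CANCEL };

static const char *resolve(enum pending_op op)
{
	switch (op) {
	case OP_CONVERT: return "revert: move lock back to granted";
	case OP_LOCK:    return "revert: drop the unanswered request";
	case OP_UNLOCK:  return "commit: treat the unlock as done";
	case OP_CANCEL:  return "commit: treat the cancel as done";
	}
	return "?";
}

int main(void)
{
	enum pending_op op;

	for (op = OP_CONVERT; op <= OP_CANCEL; op++)
		printf("%d -> %s\n", (int)op, resolve(op));
	return 0;
}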
2124 * sets the res->owner to the new master.
2133 assert_spin_locked(&dlm->spinlock); in dlm_finish_local_lockres_recovery()
2135 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_finish_local_lockres_recovery()
2136 if (res->owner == dead_node) { in dlm_finish_local_lockres_recovery()
2138 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2139 res->owner, new_master); in dlm_finish_local_lockres_recovery()
2140 list_del_init(&res->recovering); in dlm_finish_local_lockres_recovery()
2141 spin_lock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2143 * the lock state sent during recovery */ in dlm_finish_local_lockres_recovery()
2145 res->state &= ~DLM_LOCK_RES_RECOVERING; in dlm_finish_local_lockres_recovery()
2148 spin_unlock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2149 wake_up(&res->wq); in dlm_finish_local_lockres_recovery()
2156 * the RECOVERING state and set the owner in dlm_finish_local_lockres_recovery()
2161 if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) { in dlm_finish_local_lockres_recovery()
2162 spin_lock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2163 res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING; in dlm_finish_local_lockres_recovery()
2164 spin_unlock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2165 wake_up(&res->wq); in dlm_finish_local_lockres_recovery()
2168 if (!(res->state & DLM_LOCK_RES_RECOVERING)) in dlm_finish_local_lockres_recovery()
2171 if (res->owner != dead_node && in dlm_finish_local_lockres_recovery()
2172 res->owner != dlm->node_num) in dlm_finish_local_lockres_recovery()
2175 if (!list_empty(&res->recovering)) { in dlm_finish_local_lockres_recovery()
2176 list_del_init(&res->recovering); in dlm_finish_local_lockres_recovery()
2181 * the lock state sent during recovery */ in dlm_finish_local_lockres_recovery()
2183 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2184 res->owner, new_master); in dlm_finish_local_lockres_recovery()
2185 spin_lock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2187 res->state &= ~DLM_LOCK_RES_RECOVERING; in dlm_finish_local_lockres_recovery()
2190 spin_unlock(&res->spinlock); in dlm_finish_local_lockres_recovery()
2191 wake_up(&res->wq); in dlm_finish_local_lockres_recovery()
2196 static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local) in dlm_lvb_needs_invalidation() argument
2199 if (lock->ml.type != LKM_EXMODE && in dlm_lvb_needs_invalidation()
2200 lock->ml.type != LKM_PRMODE) in dlm_lvb_needs_invalidation()
2202 } else if (lock->ml.type == LKM_EXMODE) in dlm_lvb_needs_invalidation()
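Only fragments of dlm_lvb_needs_invalidation() matched. Filled out, the rule is: for this node's own locks (local), anything below PR cannot vouch for its lvb copy; for the dead node's locks, only an EX holder could have been changing the lvb. A sketch of the complete predicate (reconstruction, not guaranteed verbatim):

static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
{
	if (local) {
		/* our own lock: the lvb copy is only trustworthy at PR or EX */
		if (lock->ml.type != LKM_EXMODE &&
		    lock->ml.type != LKM_PRMODE)
			return 1;
	} else if (lock->ml.type == LKM_EXMODE) {
		/* dead node held EX: it may have dirtied the lvb */
		return 1;
	}
	return 0;
}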
2211 struct dlm_lock *lock; in dlm_revalidate_lvb() local
2216 assert_spin_locked(&dlm->spinlock); in dlm_revalidate_lvb()
2217 assert_spin_locked(&res->spinlock); in dlm_revalidate_lvb()
2219 if (res->owner == dlm->node_num) in dlm_revalidate_lvb()
2226 search_node = dlm->node_num; in dlm_revalidate_lvb()
2227 local = 1; /* check local state for valid lvb */ in dlm_revalidate_lvb()
2232 list_for_each_entry(lock, queue, list) { in dlm_revalidate_lvb()
2233 if (lock->ml.node == search_node) { in dlm_revalidate_lvb()
2234 if (dlm_lvb_needs_invalidation(lock, local)) { in dlm_revalidate_lvb()
2237 memset(lock->lksb->lvb, 0, DLM_LVB_LEN); in dlm_revalidate_lvb()
2245 res->lockname.len, res->lockname.name, dead_node); in dlm_revalidate_lvb()
2246 memset(res->lvb, 0, DLM_LVB_LEN); in dlm_revalidate_lvb()
2253 struct dlm_lock *lock, *next; in dlm_free_dead_locks() local
2260 assert_spin_locked(&dlm->spinlock); in dlm_free_dead_locks()
2261 assert_spin_locked(&res->spinlock); in dlm_free_dead_locks()
2267 list_for_each_entry_safe(lock, next, &res->granted, list) { in dlm_free_dead_locks()
2268 if (lock->ml.node == dead_node) { in dlm_free_dead_locks()
2269 list_del_init(&lock->list); in dlm_free_dead_locks()
2270 dlm_lock_put(lock); in dlm_free_dead_locks()
2271 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ in dlm_free_dead_locks()
2272 dlm_lock_put(lock); in dlm_free_dead_locks()
2276 list_for_each_entry_safe(lock, next, &res->converting, list) { in dlm_free_dead_locks()
2277 if (lock->ml.node == dead_node) { in dlm_free_dead_locks()
2278 list_del_init(&lock->list); in dlm_free_dead_locks()
2279 dlm_lock_put(lock); in dlm_free_dead_locks()
2280 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ in dlm_free_dead_locks()
2281 dlm_lock_put(lock); in dlm_free_dead_locks()
2285 list_for_each_entry_safe(lock, next, &res->blocked, list) { in dlm_free_dead_locks()
2286 if (lock->ml.node == dead_node) { in dlm_free_dead_locks()
2287 list_del_init(&lock->list); in dlm_free_dead_locks()
2288 dlm_lock_put(lock); in dlm_free_dead_locks()
2289 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ in dlm_free_dead_locks()
2290 dlm_lock_put(lock); in dlm_free_dead_locks()
2297 "dropping ref from lockres\n", dlm->name, in dlm_free_dead_locks()
2298 res->lockname.len, res->lockname.name, freed, dead_node); in dlm_free_dead_locks()
2299 if(!test_bit(dead_node, res->refmap)) { in dlm_free_dead_locks()
2301 "but ref was not set\n", dlm->name, in dlm_free_dead_locks()
2302 res->lockname.len, res->lockname.name, freed, dead_node); in dlm_free_dead_locks()
2305 res->state |= DLM_LOCK_RES_RECOVERY_WAITING; in dlm_free_dead_locks()
2307 } else if (test_bit(dead_node, res->refmap)) { in dlm_free_dead_locks()
2309 "no locks and had not purged before dying\n", dlm->name, in dlm_free_dead_locks()
2310 res->lockname.len, res->lockname.name, dead_node); in dlm_free_dead_locks()
2324 struct dlm_lock *lock; in dlm_do_local_recovery_cleanup() local
2331 * now clean up all lock resources. there are two rules: in dlm_do_local_recovery_cleanup()
2349 if (dlm_is_recovery_lock(res->lockname.name, in dlm_do_local_recovery_cleanup()
2350 res->lockname.len)) { in dlm_do_local_recovery_cleanup()
2351 spin_lock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2352 list_for_each_entry(lock, &res->granted, list) { in dlm_do_local_recovery_cleanup()
2353 if (lock->ml.node == dead_node) { in dlm_do_local_recovery_cleanup()
2355 "a $RECOVERY lock for dead " in dlm_do_local_recovery_cleanup()
2357 dead_node, dlm->name); in dlm_do_local_recovery_cleanup()
2358 list_del_init(&lock->list); in dlm_do_local_recovery_cleanup()
2359 dlm_lock_put(lock); in dlm_do_local_recovery_cleanup()
2362 * - do manually */ in dlm_do_local_recovery_cleanup()
2363 dlm_lock_put(lock); in dlm_do_local_recovery_cleanup()
2368 if ((res->owner == dead_node) && in dlm_do_local_recovery_cleanup()
2369 (res->state & DLM_LOCK_RES_DROPPING_REF)) { in dlm_do_local_recovery_cleanup()
2372 spin_unlock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2373 wake_up(&res->wq); in dlm_do_local_recovery_cleanup()
2376 } else if (res->owner == dlm->node_num) in dlm_do_local_recovery_cleanup()
2378 spin_unlock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2381 spin_lock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2384 if (res->owner == dead_node) { in dlm_do_local_recovery_cleanup()
2385 if (res->state & DLM_LOCK_RES_DROPPING_REF) { in dlm_do_local_recovery_cleanup()
2390 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2391 res->lockname.name, dead_node); in dlm_do_local_recovery_cleanup()
2394 spin_unlock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2395 wake_up(&res->wq); in dlm_do_local_recovery_cleanup()
2400 } else if (res->owner == dlm->node_num) { in dlm_do_local_recovery_cleanup()
2403 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { in dlm_do_local_recovery_cleanup()
2404 if (test_bit(dead_node, res->refmap)) { in dlm_do_local_recovery_cleanup()
2407 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2408 res->lockname.name, dead_node); in dlm_do_local_recovery_cleanup()
2412 spin_unlock(&res->spinlock); in dlm_do_local_recovery_cleanup()
2420 assert_spin_locked(&dlm->spinlock); in __dlm_hb_node_down()
2422 if (dlm->reco.new_master == idx) { in __dlm_hb_node_down()
2424 dlm->name, idx); in __dlm_hb_node_down()
2425 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in __dlm_hb_node_down()
2430 "finalize1 state, clearing\n", dlm->name, idx); in __dlm_hb_node_down()
2431 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in __dlm_hb_node_down()
2436 /* Clean up join state on node death. */ in __dlm_hb_node_down()
2437 if (dlm->joining_node == idx) { in __dlm_hb_node_down()
2438 mlog(0, "Clearing join state for node %u\n", idx); in __dlm_hb_node_down()
2443 if (!test_bit(idx, dlm->live_nodes_map)) { in __dlm_hb_node_down()
2446 dlm->name, idx); in __dlm_hb_node_down()
2451 if (!test_bit(idx, dlm->domain_map)) { in __dlm_hb_node_down()
2458 clear_bit(idx, dlm->live_nodes_map); in __dlm_hb_node_down()
2461 if (!test_bit(idx, dlm->recovery_map)) in __dlm_hb_node_down()
2468 clear_bit(idx, dlm->domain_map); in __dlm_hb_node_down()
2469 clear_bit(idx, dlm->exit_domain_map); in __dlm_hb_node_down()
2472 wake_up(&dlm->migration_wq); in __dlm_hb_node_down()
2474 set_bit(idx, dlm->recovery_map); in __dlm_hb_node_down()
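The bitmap bookkeeping in __dlm_hb_node_down() reduces to: drop the node from live_nodes_map; if it had actually joined the domain, also drop it from domain_map (and exit_domain_map) and mark it in recovery_map. A userspace sketch of just that bookkeeping (bool arrays stand in for the kernel bitmap API; O2NM_MAX_NODES assumed to be 255 as in o2nm):

#include <stdbool.h>
#include <stdio.h>

#define MAX_NODES 255	/* assumption: O2NM_MAX_NODES */

static bool live_nodes[MAX_NODES], domain[MAX_NODES], recovery[MAX_NODES];

static void hb_node_down(int idx)
{
	live_nodes[idx] = false;	/* clear_bit(idx, live_nodes_map) */
	if (!domain[idx])
		return;			/* never joined: nothing to recover */
	domain[idx] = false;		/* clear_bit(idx, domain_map) */
	recovery[idx] = true;		/* set_bit(idx, recovery_map) */
}

int main(void)
{
	live_nodes[7] = domain[7] = true;
	hb_node_down(7);
	printf("node 7: live=%d in-domain=%d needs-recovery=%d\n",
	       live_nodes[7], domain[7], recovery[7]);
	return 0;
}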
2488 if (test_bit(idx, dlm->domain_map)) in dlm_hb_node_down_cb()
2491 spin_lock(&dlm->spinlock); in dlm_hb_node_down_cb()
2493 spin_unlock(&dlm->spinlock); in dlm_hb_node_down_cb()
2505 spin_lock(&dlm->spinlock); in dlm_hb_node_up_cb()
2506 set_bit(idx, dlm->live_nodes_map); in dlm_hb_node_up_cb()
2509 spin_unlock(&dlm->spinlock); in dlm_hb_node_up_cb()
2517 mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n", in dlm_reco_ast()
2518 dlm->node_num, dlm->name); in dlm_reco_ast()
2523 mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n", in dlm_reco_bast()
2524 dlm->node_num, dlm->name); in dlm_reco_bast()
2528 mlog(0, "unlockast for recovery lock fired!\n"); in dlm_reco_unlock_ast()
2538 * or b) dlm->reco.new_master gets set to some nodenum
2547 int status = -EINVAL; in dlm_pick_recovery_master()
2550 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2559 dlm->name, ret, lksb.status); in dlm_pick_recovery_master()
2563 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2565 /* got the EX lock. check to see if another node in dlm_pick_recovery_master()
2568 mlog(0, "%s: got reco EX lock, but %u will " in dlm_pick_recovery_master()
2569 "do the recovery\n", dlm->name, in dlm_pick_recovery_master()
2570 dlm->reco.new_master); in dlm_pick_recovery_master()
2571 status = -EEXIST; in dlm_pick_recovery_master()
2576 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2577 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2578 status = -EINVAL; in dlm_pick_recovery_master()
2579 mlog(0, "%s: got reco EX lock, but " in dlm_pick_recovery_master()
2580 "node got recovered already\n", dlm->name); in dlm_pick_recovery_master()
2581 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2584 dlm->name, dlm->reco.new_master); in dlm_pick_recovery_master()
2588 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2595 "begin_reco now\n", dlm->name, in dlm_pick_recovery_master()
2596 dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2598 dlm->reco.dead_node); in dlm_pick_recovery_master()
2603 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2604 dlm_set_reco_master(dlm, dlm->node_num); in dlm_pick_recovery_master()
2605 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2608 /* recovery lock is a special case. ast will not get fired, in dlm_pick_recovery_master()
2619 * is actually "done" and the lock structure is in dlm_pick_recovery_master()
2621 * because this specific lock name is special. */ in dlm_pick_recovery_master()
2626 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2630 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_pick_recovery_master()
2635 dlm->name); in dlm_pick_recovery_master()
2640 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); in dlm_pick_recovery_master()
2641 status = -EEXIST; in dlm_pick_recovery_master()
2644 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2651 "lksb.status=%s\n", dlm->name, dlm_errname(ret), in dlm_pick_recovery_master()
2659 mlog(ML_ERROR, "recovery lock not found\n"); in dlm_pick_recovery_master()
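dlm_pick_recovery_master() above turns recovery mastership into an election: whoever wins an EX dlmlock() on the special $RECOVERY resource announces itself; losers wait for dlm->reco.new_master to be set. A condensed sketch of that flow (reconstruction; retries, the FINALIZE interplay, and most error paths are omitted):

static int sketch_pick_recovery_master(struct dlm_ctxt *dlm)
{
	struct dlm_lockstatus lksb;
	enum dlm_status ret;

	memset(&lksb, 0, sizeof(lksb));
	/* election via the $RECOVERY lockres */
	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE | LKM_RECOVERY,
		      DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
		      dlm_reco_ast, dlm, dlm_reco_bast);
	if (ret == DLM_NORMAL) {
		/* won the EX: announce mastership, then drop the lock */
		dlm_send_begin_reco_message(dlm, dlm->reco.dead_node);
		dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
		return 0;
	}
	if (ret == DLM_NOTQUEUED) {
		/* lost: wait for the winner to set dlm->reco.new_master */
		wait_event_timeout(dlm->dlm_reco_thread_wq,
				   dlm_reco_master_ready(dlm),
				   msecs_to_jiffies(1000));
		return -EEXIST;
	}
	return -EINVAL;
}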
2675 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); in dlm_send_begin_reco_message()
2677 spin_lock(&dlm->spinlock); in dlm_send_begin_reco_message()
2678 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_begin_reco_message()
2679 spin_unlock(&dlm->spinlock); in dlm_send_begin_reco_message()
2684 br.node_idx = dlm->node_num; in dlm_send_begin_reco_message()
2694 if (nodenum == dlm->node_num) { in dlm_send_begin_reco_message()
2701 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, in dlm_send_begin_reco_message()
2710 "begin reco msg (%d)\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2716 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN. in dlm_send_begin_reco_message()
2719 if (ret == -EAGAIN || ret == EAGAIN) { in dlm_send_begin_reco_message()
2722 "to complete, backoff for a bit\n", dlm->name, in dlm_send_begin_reco_message()
2734 "returned %d\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2741 mlog(ML_ERROR, "recovery lock not found\n"); in dlm_send_begin_reco_message()
2757 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; in dlm_begin_reco_handler()
2763 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2764 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_begin_reco_handler()
2766 "but this node is in finalize state, waiting on finalize2\n", in dlm_begin_reco_handler()
2767 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2768 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2769 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2771 return -EAGAIN; in dlm_begin_reco_handler()
2773 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2776 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2777 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2779 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); in dlm_begin_reco_handler()
2781 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2782 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2783 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_begin_reco_handler()
2785 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2786 br->node_idx); in dlm_begin_reco_handler()
2789 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2790 br->node_idx); in dlm_begin_reco_handler()
2794 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2796 "node %u changing it to %u\n", dlm->name, in dlm_begin_reco_handler()
2797 dlm->reco.dead_node, br->node_idx, br->dead_node); in dlm_begin_reco_handler()
2799 dlm_set_reco_master(dlm, br->node_idx); in dlm_begin_reco_handler()
2800 dlm_set_reco_dead_node(dlm, br->dead_node); in dlm_begin_reco_handler()
2801 if (!test_bit(br->dead_node, dlm->recovery_map)) { in dlm_begin_reco_handler()
2804 br->node_idx, br->dead_node, br->dead_node); in dlm_begin_reco_handler()
2805 if (!test_bit(br->dead_node, dlm->domain_map) || in dlm_begin_reco_handler()
2806 !test_bit(br->dead_node, dlm->live_nodes_map)) in dlm_begin_reco_handler()
2809 br->dead_node); in dlm_begin_reco_handler()
2812 set_bit(br->dead_node, dlm->domain_map); in dlm_begin_reco_handler()
2813 set_bit(br->dead_node, dlm->live_nodes_map); in dlm_begin_reco_handler()
2814 __dlm_hb_node_down(dlm, br->dead_node); in dlm_begin_reco_handler()
2816 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2821 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2822 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2839 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); in dlm_send_finalize_reco_message()
2841 spin_lock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2842 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_finalize_reco_message()
2843 spin_unlock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2847 fr.node_idx = dlm->node_num; in dlm_send_finalize_reco_message()
2848 fr.dead_node = dlm->reco.dead_node; in dlm_send_finalize_reco_message()
2853 if (nodenum == dlm->node_num) in dlm_send_finalize_reco_message()
2855 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, in dlm_send_finalize_reco_message()
2862 dlm->key, nodenum); in dlm_send_finalize_reco_message()
2877 iter.curnode = -1; in dlm_send_finalize_reco_message()
2889 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; in dlm_finalize_reco_handler()
2896 if (fr->flags & DLM_FINALIZE_STAGE2) in dlm_finalize_reco_handler()
2900 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, in dlm_finalize_reco_handler()
2901 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2903 spin_lock(&dlm->spinlock); in dlm_finalize_reco_handler()
2905 if (dlm->reco.new_master != fr->node_idx) { in dlm_finalize_reco_handler()
2908 fr->node_idx, dlm->reco.new_master, fr->dead_node); in dlm_finalize_reco_handler()
2911 if (dlm->reco.dead_node != fr->dead_node) { in dlm_finalize_reco_handler()
2914 fr->node_idx, fr->dead_node, dlm->reco.dead_node); in dlm_finalize_reco_handler()
2920 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); in dlm_finalize_reco_handler()
2921 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_finalize_reco_handler()
2925 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2929 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2930 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2933 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { in dlm_finalize_reco_handler()
2937 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2941 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2943 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2949 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()