1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. 6 ** 7 ** 8 ******************************************************************************* 9 ******************************************************************************/ 10 11 #include "dlm_internal.h" 12 #include "member.h" 13 #include "lock.h" 14 #include "dir.h" 15 #include "config.h" 16 #include "requestqueue.h" 17 #include "util.h" 18 19 struct rq_entry { 20 struct list_head list; 21 uint32_t recover_seq; 22 int nodeid; 23 struct dlm_message request; 24 }; 25 26 /* 27 * Requests received while the lockspace is in recovery get added to the 28 * request queue and processed when recovery is complete. This happens when 29 * the lockspace is suspended on some nodes before it is on others, or the 30 * lockspace is enabled on some while still suspended on others. 31 */ 32 33 void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, 34 const struct dlm_message *ms) 35 { 36 struct rq_entry *e; 37 int length = le16_to_cpu(ms->m_header.h_length) - 38 sizeof(struct dlm_message); 39 40 e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC); 41 if (!e) { 42 log_print("dlm_add_requestqueue: out of memory len %d", length); 43 return; 44 } 45 46 e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF; 47 e->nodeid = nodeid; 48 memcpy(&e->request, ms, sizeof(*ms)); 49 memcpy(&e->request.m_extra, ms->m_extra, length); 50 51 list_add_tail(&e->list, &ls->ls_requestqueue); 52 } 53 54 /* 55 * Called by dlm_recoverd to process normal messages saved while recovery was 56 * happening. Normal locking has been enabled before this is called. dlm_recv 57 * upon receiving a message, will wait for all saved messages to be drained 58 * here before processing the message it got. If a new dlm_ls_stop() arrives 59 * while we're processing these saved messages, it may block trying to suspend 60 * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that 61 * case, we don't abort since locking_stopped is still 0. If dlm_recv is not 62 * waiting for us, then this processing may be aborted due to locking_stopped. 63 */ 64 65 int dlm_process_requestqueue(struct dlm_ls *ls) 66 { 67 struct rq_entry *e; 68 struct dlm_message *ms; 69 int error = 0; 70 71 write_lock_bh(&ls->ls_requestqueue_lock); 72 for (;;) { 73 if (list_empty(&ls->ls_requestqueue)) { 74 clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags); 75 error = 0; 76 break; 77 } 78 e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list); 79 80 ms = &e->request; 81 82 log_limit(ls, "dlm_process_requestqueue msg %d from %d " 83 "lkid %x remid %x result %d seq %u", 84 le32_to_cpu(ms->m_type), 85 le32_to_cpu(ms->m_header.h_nodeid), 86 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 87 from_dlm_errno(le32_to_cpu(ms->m_result)), 88 e->recover_seq); 89 90 dlm_receive_message_saved(ls, &e->request, e->recover_seq); 91 list_del(&e->list); 92 kfree(e); 93 94 if (dlm_locking_stopped(ls)) { 95 log_debug(ls, "process_requestqueue abort running"); 96 error = -EINTR; 97 break; 98 } 99 write_unlock_bh(&ls->ls_requestqueue_lock); 100 schedule(); 101 write_lock_bh(&ls->ls_requestqueue_lock); 102 } 103 write_unlock_bh(&ls->ls_requestqueue_lock); 104 105 return error; 106 } 107 108 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) 109 { 110 __le32 type = ms->m_type; 111 112 /* the ls is being cleaned up and freed by release_lockspace */ 113 if (!atomic_read(&ls->ls_count)) 114 return 1; 115 116 if (dlm_is_removed(ls, nodeid)) 117 return 1; 118 119 /* directory operations are always purged because the directory is 120 always rebuilt during recovery and the lookups resent */ 121 122 if (type == cpu_to_le32(DLM_MSG_REMOVE) || 123 type == cpu_to_le32(DLM_MSG_LOOKUP) || 124 type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY)) 125 return 1; 126 127 if (!dlm_no_directory(ls)) 128 return 0; 129 130 return 1; 131 } 132 133 void dlm_purge_requestqueue(struct dlm_ls *ls) 134 { 135 struct dlm_message *ms; 136 struct rq_entry *e, *safe; 137 138 write_lock_bh(&ls->ls_requestqueue_lock); 139 list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) { 140 ms = &e->request; 141 142 if (purge_request(ls, ms, e->nodeid)) { 143 list_del(&e->list); 144 kfree(e); 145 } 146 } 147 write_unlock_bh(&ls->ls_requestqueue_lock); 148 } 149 150