1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include "dlm_internal.h" 13 #include "lockspace.h" 14 #include "member.h" 15 #include "lowcomms.h" 16 #include "rcom.h" 17 #include "config.h" 18 #include "memory.h" 19 #include "recover.h" 20 #include "util.h" 21 #include "lock.h" 22 #include "dir.h" 23 24 /* 25 * We use the upper 16 bits of the hash value to select the directory node. 26 * Low bits are used for distribution of rsb's among hash buckets on each node. 27 * 28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of 29 * num_nodes to the hash value. This value in the desired range is used as an 30 * offset into the sorted list of nodeid's to give the particular nodeid. 31 */ 32 33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 34 { 35 uint32_t node; 36 37 if (ls->ls_num_nodes == 1) 38 return dlm_our_nodeid(); 39 else { 40 node = (hash >> 16) % ls->ls_total_weight; 41 return ls->ls_node_array[node]; 42 } 43 } 44 45 int dlm_dir_nodeid(struct dlm_rsb *r) 46 { 47 return r->res_dir_nodeid; 48 } 49 50 void dlm_recover_dir_nodeid(struct dlm_ls *ls) 51 { 52 struct dlm_rsb *r; 53 54 down_read(&ls->ls_root_sem); 55 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 56 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); 57 } 58 up_read(&ls->ls_root_sem); 59 } 60 61 int dlm_recover_directory(struct dlm_ls *ls) 62 { 63 struct dlm_member *memb; 64 char *b, *last_name = NULL; 65 int error = -ENOMEM, last_len, nodeid, result; 66 uint16_t namelen; 67 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; 68 69 log_rinfo(ls, "dlm_recover_directory"); 70 71 if (dlm_no_directory(ls)) 72 goto out_status; 73 74 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 75 if (!last_name) 76 goto out; 77 78 list_for_each_entry(memb, &ls->ls_nodes, list) { 79 if (memb->nodeid == dlm_our_nodeid()) 80 continue; 81 82 memset(last_name, 0, DLM_RESNAME_MAXLEN); 83 last_len = 0; 84 85 for (;;) { 86 int left; 87 error = dlm_recovery_stopped(ls); 88 if (error) { 89 error = -EINTR; 90 goto out_free; 91 } 92 93 error = dlm_rcom_names(ls, memb->nodeid, 94 last_name, last_len); 95 if (error) 96 goto out_free; 97 98 cond_resched(); 99 100 /* 101 * pick namelen/name pairs out of received buffer 102 */ 103 104 b = ls->ls_recover_buf->rc_buf; 105 left = ls->ls_recover_buf->rc_header.h_length; 106 left -= sizeof(struct dlm_rcom); 107 108 for (;;) { 109 __be16 v; 110 111 error = -EINVAL; 112 if (left < sizeof(__be16)) 113 goto out_free; 114 115 memcpy(&v, b, sizeof(__be16)); 116 namelen = be16_to_cpu(v); 117 b += sizeof(__be16); 118 left -= sizeof(__be16); 119 120 /* namelen of 0xFFFFF marks end of names for 121 this node; namelen of 0 marks end of the 122 buffer */ 123 124 if (namelen == 0xFFFF) 125 goto done; 126 if (!namelen) 127 break; 128 129 if (namelen > left) 130 goto out_free; 131 132 if (namelen > DLM_RESNAME_MAXLEN) 133 goto out_free; 134 135 error = dlm_master_lookup(ls, memb->nodeid, 136 b, namelen, 137 DLM_LU_RECOVER_DIR, 138 &nodeid, &result); 139 if (error) { 140 log_error(ls, "recover_dir lookup %d", 141 error); 142 goto out_free; 143 } 144 145 /* The name was found in rsbtbl, but the 146 * master nodeid is different from 147 * memb->nodeid which says it is the master. 148 * This should not happen. */ 149 150 if (result == DLM_LU_MATCH && 151 nodeid != memb->nodeid) { 152 count_bad++; 153 log_error(ls, "recover_dir lookup %d " 154 "nodeid %d memb %d bad %u", 155 result, nodeid, memb->nodeid, 156 count_bad); 157 print_hex_dump_bytes("dlm_recover_dir ", 158 DUMP_PREFIX_NONE, 159 b, namelen); 160 } 161 162 /* The name was found in rsbtbl, and the 163 * master nodeid matches memb->nodeid. */ 164 165 if (result == DLM_LU_MATCH && 166 nodeid == memb->nodeid) { 167 count_match++; 168 } 169 170 /* The name was not found in rsbtbl and was 171 * added with memb->nodeid as the master. */ 172 173 if (result == DLM_LU_ADD) { 174 count_add++; 175 } 176 177 last_len = namelen; 178 memcpy(last_name, b, namelen); 179 b += namelen; 180 left -= namelen; 181 count++; 182 } 183 } 184 done: 185 ; 186 } 187 188 out_status: 189 error = 0; 190 dlm_set_recover_status(ls, DLM_RS_DIR); 191 192 log_rinfo(ls, "dlm_recover_directory %u in %u new", 193 count, count_add); 194 out_free: 195 kfree(last_name); 196 out: 197 return error; 198 } 199 200 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 201 { 202 struct dlm_rsb *r; 203 uint32_t hash, bucket; 204 int rv; 205 206 hash = jhash(name, len, 0); 207 bucket = hash & (ls->ls_rsbtbl_size - 1); 208 209 spin_lock(&ls->ls_rsbtbl[bucket].lock); 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); 211 if (rv) 212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 213 name, len, &r); 214 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 215 216 if (!rv) 217 return r; 218 219 down_read(&ls->ls_root_sem); 220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 221 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 222 up_read(&ls->ls_root_sem); 223 log_debug(ls, "find_rsb_root revert to root_list %s", 224 r->res_name); 225 return r; 226 } 227 } 228 up_read(&ls->ls_root_sem); 229 return NULL; 230 } 231 232 /* Find the rsb where we left off (or start again), then send rsb names 233 for rsb's we're master of and whose directory node matches the requesting 234 node. inbuf is the rsb name last sent, inlen is the name's length */ 235 236 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 237 char *outbuf, int outlen, int nodeid) 238 { 239 struct list_head *list; 240 struct dlm_rsb *r; 241 int offset = 0, dir_nodeid; 242 __be16 be_namelen; 243 244 down_read(&ls->ls_root_sem); 245 246 if (inlen > 1) { 247 r = find_rsb_root(ls, inbuf, inlen); 248 if (!r) { 249 inbuf[inlen - 1] = '\0'; 250 log_error(ls, "copy_master_names from %d start %d %s", 251 nodeid, inlen, inbuf); 252 goto out; 253 } 254 list = r->res_root_list.next; 255 } else { 256 list = ls->ls_root_list.next; 257 } 258 259 for (offset = 0; list != &ls->ls_root_list; list = list->next) { 260 r = list_entry(list, struct dlm_rsb, res_root_list); 261 if (r->res_nodeid) 262 continue; 263 264 dir_nodeid = dlm_dir_nodeid(r); 265 if (dir_nodeid != nodeid) 266 continue; 267 268 /* 269 * The block ends when we can't fit the following in the 270 * remaining buffer space: 271 * namelen (uint16_t) + 272 * name (r->res_length) + 273 * end-of-block record 0x0000 (uint16_t) 274 */ 275 276 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { 277 /* Write end-of-block record */ 278 be_namelen = cpu_to_be16(0); 279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 280 offset += sizeof(__be16); 281 ls->ls_recover_dir_sent_msg++; 282 goto out; 283 } 284 285 be_namelen = cpu_to_be16(r->res_length); 286 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 287 offset += sizeof(__be16); 288 memcpy(outbuf + offset, r->res_name, r->res_length); 289 offset += r->res_length; 290 ls->ls_recover_dir_sent_res++; 291 } 292 293 /* 294 * If we've reached the end of the list (and there's room) write a 295 * terminating record. 296 */ 297 298 if ((list == &ls->ls_root_list) && 299 (offset + sizeof(uint16_t) <= outlen)) { 300 be_namelen = cpu_to_be16(0xFFFF); 301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 302 offset += sizeof(__be16); 303 ls->ls_recover_dir_sent_msg++; 304 } 305 out: 306 up_read(&ls->ls_root_sem); 307 } 308 309