1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "lowcomms.h" 18 #include "rcom.h" 19 #include "config.h" 20 #include "memory.h" 21 #include "recover.h" 22 #include "util.h" 23 #include "lock.h" 24 #include "dir.h" 25 26 27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) 28 { 29 spin_lock(&ls->ls_recover_list_lock); 30 list_add(&de->list, &ls->ls_recover_list); 31 spin_unlock(&ls->ls_recover_list_lock); 32 } 33 34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) 35 { 36 int found = 0; 37 struct dlm_direntry *de; 38 39 spin_lock(&ls->ls_recover_list_lock); 40 list_for_each_entry(de, &ls->ls_recover_list, list) { 41 if (de->length == len) { 42 list_del(&de->list); 43 de->master_nodeid = 0; 44 memset(de->name, 0, len); 45 found = 1; 46 break; 47 } 48 } 49 spin_unlock(&ls->ls_recover_list_lock); 50 51 if (!found) 52 de = allocate_direntry(ls, len); 53 return de; 54 } 55 56 void dlm_clear_free_entries(struct dlm_ls *ls) 57 { 58 struct dlm_direntry *de; 59 60 spin_lock(&ls->ls_recover_list_lock); 61 while (!list_empty(&ls->ls_recover_list)) { 62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, 63 list); 64 list_del(&de->list); 65 free_direntry(de); 66 } 67 spin_unlock(&ls->ls_recover_list_lock); 68 } 69 70 /* 71 * We use the upper 16 bits of the hash value to select the directory node. 72 * Low bits are used for distribution of rsb's among hash buckets on each node. 73 * 74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of 75 * num_nodes to the hash value. This value in the desired range is used as an 76 * offset into the sorted list of nodeid's to give the particular nodeid. 77 */ 78 79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 80 { 81 struct list_head *tmp; 82 struct dlm_member *memb = NULL; 83 uint32_t node, n = 0; 84 int nodeid; 85 86 if (ls->ls_num_nodes == 1) { 87 nodeid = dlm_our_nodeid(); 88 goto out; 89 } 90 91 if (ls->ls_node_array) { 92 node = (hash >> 16) % ls->ls_total_weight; 93 nodeid = ls->ls_node_array[node]; 94 goto out; 95 } 96 97 /* make_member_array() failed to kmalloc ls_node_array... */ 98 99 node = (hash >> 16) % ls->ls_num_nodes; 100 101 list_for_each(tmp, &ls->ls_nodes) { 102 if (n++ != node) 103 continue; 104 memb = list_entry(tmp, struct dlm_member, list); 105 break; 106 } 107 108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", 109 ls->ls_num_nodes, n, node);); 110 nodeid = memb->nodeid; 111 out: 112 return nodeid; 113 } 114 115 int dlm_dir_nodeid(struct dlm_rsb *r) 116 { 117 return dlm_hash2nodeid(r->res_ls, r->res_hash); 118 } 119 120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) 121 { 122 uint32_t val; 123 124 val = jhash(name, len, 0); 125 val &= (ls->ls_dirtbl_size - 1); 126 127 return val; 128 } 129 130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) 131 { 132 uint32_t bucket; 133 134 bucket = dir_hash(ls, de->name, de->length); 135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); 136 } 137 138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, 139 int namelen, uint32_t bucket) 140 { 141 struct dlm_direntry *de; 142 143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { 144 if (de->length == namelen && !memcmp(name, de->name, namelen)) 145 goto out; 146 } 147 de = NULL; 148 out: 149 return de; 150 } 151 152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) 153 { 154 struct dlm_direntry *de; 155 uint32_t bucket; 156 157 bucket = dir_hash(ls, name, namelen); 158 159 write_lock(&ls->ls_dirtbl[bucket].lock); 160 161 de = search_bucket(ls, name, namelen, bucket); 162 163 if (!de) { 164 log_error(ls, "remove fr %u none", nodeid); 165 goto out; 166 } 167 168 if (de->master_nodeid != nodeid) { 169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); 170 goto out; 171 } 172 173 list_del(&de->list); 174 free_direntry(de); 175 out: 176 write_unlock(&ls->ls_dirtbl[bucket].lock); 177 } 178 179 void dlm_dir_clear(struct dlm_ls *ls) 180 { 181 struct list_head *head; 182 struct dlm_direntry *de; 183 int i; 184 185 DLM_ASSERT(list_empty(&ls->ls_recover_list), ); 186 187 for (i = 0; i < ls->ls_dirtbl_size; i++) { 188 write_lock(&ls->ls_dirtbl[i].lock); 189 head = &ls->ls_dirtbl[i].list; 190 while (!list_empty(head)) { 191 de = list_entry(head->next, struct dlm_direntry, list); 192 list_del(&de->list); 193 put_free_de(ls, de); 194 } 195 write_unlock(&ls->ls_dirtbl[i].lock); 196 } 197 } 198 199 int dlm_recover_directory(struct dlm_ls *ls) 200 { 201 struct dlm_member *memb; 202 struct dlm_direntry *de; 203 char *b, *last_name = NULL; 204 int error = -ENOMEM, last_len, count = 0; 205 uint16_t namelen; 206 207 log_debug(ls, "dlm_recover_directory"); 208 209 if (dlm_no_directory(ls)) 210 goto out_status; 211 212 dlm_dir_clear(ls); 213 214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL); 215 if (!last_name) 216 goto out; 217 218 list_for_each_entry(memb, &ls->ls_nodes, list) { 219 memset(last_name, 0, DLM_RESNAME_MAXLEN); 220 last_len = 0; 221 222 for (;;) { 223 error = dlm_recovery_stopped(ls); 224 if (error) 225 goto out_free; 226 227 error = dlm_rcom_names(ls, memb->nodeid, 228 last_name, last_len); 229 if (error) 230 goto out_free; 231 232 schedule(); 233 234 /* 235 * pick namelen/name pairs out of received buffer 236 */ 237 238 b = ls->ls_recover_buf + sizeof(struct dlm_rcom); 239 240 for (;;) { 241 memcpy(&namelen, b, sizeof(uint16_t)); 242 namelen = be16_to_cpu(namelen); 243 b += sizeof(uint16_t); 244 245 /* namelen of 0xFFFFF marks end of names for 246 this node; namelen of 0 marks end of the 247 buffer */ 248 249 if (namelen == 0xFFFF) 250 goto done; 251 if (!namelen) 252 break; 253 254 error = -ENOMEM; 255 de = get_free_de(ls, namelen); 256 if (!de) 257 goto out_free; 258 259 de->master_nodeid = memb->nodeid; 260 de->length = namelen; 261 last_len = namelen; 262 memcpy(de->name, b, namelen); 263 memcpy(last_name, b, namelen); 264 b += namelen; 265 266 add_entry_to_hash(ls, de); 267 count++; 268 } 269 } 270 done: 271 ; 272 } 273 274 out_status: 275 error = 0; 276 dlm_set_recover_status(ls, DLM_RS_DIR); 277 log_debug(ls, "dlm_recover_directory %d entries", count); 278 out_free: 279 kfree(last_name); 280 out: 281 dlm_clear_free_entries(ls); 282 return error; 283 } 284 285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name, 286 int namelen, int *r_nodeid) 287 { 288 struct dlm_direntry *de, *tmp; 289 uint32_t bucket; 290 291 bucket = dir_hash(ls, name, namelen); 292 293 write_lock(&ls->ls_dirtbl[bucket].lock); 294 de = search_bucket(ls, name, namelen, bucket); 295 if (de) { 296 *r_nodeid = de->master_nodeid; 297 write_unlock(&ls->ls_dirtbl[bucket].lock); 298 if (*r_nodeid == nodeid) 299 return -EEXIST; 300 return 0; 301 } 302 303 write_unlock(&ls->ls_dirtbl[bucket].lock); 304 305 de = allocate_direntry(ls, namelen); 306 if (!de) 307 return -ENOMEM; 308 309 de->master_nodeid = nodeid; 310 de->length = namelen; 311 memcpy(de->name, name, namelen); 312 313 write_lock(&ls->ls_dirtbl[bucket].lock); 314 tmp = search_bucket(ls, name, namelen, bucket); 315 if (tmp) { 316 free_direntry(de); 317 de = tmp; 318 } else { 319 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); 320 } 321 *r_nodeid = de->master_nodeid; 322 write_unlock(&ls->ls_dirtbl[bucket].lock); 323 return 0; 324 } 325 326 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, 327 int *r_nodeid) 328 { 329 return get_entry(ls, nodeid, name, namelen, r_nodeid); 330 } 331 332 /* Copy the names of master rsb's into the buffer provided. 333 Only select names whose dir node is the given nodeid. */ 334 335 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 336 char *outbuf, int outlen, int nodeid) 337 { 338 struct list_head *list; 339 struct dlm_rsb *start_r = NULL, *r = NULL; 340 int offset = 0, start_namelen, error, dir_nodeid; 341 char *start_name; 342 uint16_t be_namelen; 343 344 /* 345 * Find the rsb where we left off (or start again) 346 */ 347 348 start_namelen = inlen; 349 start_name = inbuf; 350 351 if (start_namelen > 1) { 352 /* 353 * We could also use a find_rsb_root() function here that 354 * searched the ls_root_list. 355 */ 356 error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER, 357 &start_r); 358 DLM_ASSERT(!error && start_r, 359 printk("error %d\n", error);); 360 DLM_ASSERT(!list_empty(&start_r->res_root_list), 361 dlm_print_rsb(start_r);); 362 dlm_put_rsb(start_r); 363 } 364 365 /* 366 * Send rsb names for rsb's we're master of and whose directory node 367 * matches the requesting node. 368 */ 369 370 down_read(&ls->ls_root_sem); 371 if (start_r) 372 list = start_r->res_root_list.next; 373 else 374 list = ls->ls_root_list.next; 375 376 for (offset = 0; list != &ls->ls_root_list; list = list->next) { 377 r = list_entry(list, struct dlm_rsb, res_root_list); 378 if (r->res_nodeid) 379 continue; 380 381 dir_nodeid = dlm_dir_nodeid(r); 382 if (dir_nodeid != nodeid) 383 continue; 384 385 /* 386 * The block ends when we can't fit the following in the 387 * remaining buffer space: 388 * namelen (uint16_t) + 389 * name (r->res_length) + 390 * end-of-block record 0x0000 (uint16_t) 391 */ 392 393 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { 394 /* Write end-of-block record */ 395 be_namelen = 0; 396 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); 397 offset += sizeof(uint16_t); 398 goto out; 399 } 400 401 be_namelen = cpu_to_be16(r->res_length); 402 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); 403 offset += sizeof(uint16_t); 404 memcpy(outbuf + offset, r->res_name, r->res_length); 405 offset += r->res_length; 406 } 407 408 /* 409 * If we've reached the end of the list (and there's room) write a 410 * terminating record. 411 */ 412 413 if ((list == &ls->ls_root_list) && 414 (offset + sizeof(uint16_t) <= outlen)) { 415 be_namelen = 0xFFFF; 416 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); 417 offset += sizeof(uint16_t); 418 } 419 420 out: 421 up_read(&ls->ls_root_sem); 422 } 423 424