1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 #include "dlm_internal.h" 14 #include "lockspace.h" 15 #include "member.h" 16 #include "recoverd.h" 17 #include "recover.h" 18 #include "rcom.h" 19 #include "config.h" 20 #include "lowcomms.h" 21 22 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 23 { 24 struct dlm_member *memb = NULL; 25 struct list_head *tmp; 26 struct list_head *newlist = &new->list; 27 struct list_head *head = &ls->ls_nodes; 28 29 list_for_each(tmp, head) { 30 memb = list_entry(tmp, struct dlm_member, list); 31 if (new->nodeid < memb->nodeid) 32 break; 33 } 34 35 if (!memb) 36 list_add_tail(newlist, head); 37 else { 38 /* FIXME: can use list macro here */ 39 newlist->prev = tmp->prev; 40 newlist->next = tmp; 41 tmp->prev->next = newlist; 42 tmp->prev = newlist; 43 } 44 } 45 46 static int dlm_add_member(struct dlm_ls *ls, int nodeid) 47 { 48 struct dlm_member *memb; 49 int w, error; 50 51 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS); 52 if (!memb) 53 return -ENOMEM; 54 55 w = dlm_node_weight(ls->ls_name, nodeid); 56 if (w < 0) { 57 kfree(memb); 58 return w; 59 } 60 61 error = dlm_lowcomms_connect_node(nodeid); 62 if (error < 0) { 63 kfree(memb); 64 return error; 65 } 66 67 memb->nodeid = nodeid; 68 memb->weight = w; 69 add_ordered_member(ls, memb); 70 ls->ls_num_nodes++; 71 return 0; 72 } 73 74 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 75 { 76 list_move(&memb->list, &ls->ls_nodes_gone); 77 ls->ls_num_nodes--; 78 } 79 80 int dlm_is_member(struct dlm_ls *ls, int nodeid) 81 { 82 struct dlm_member *memb; 83 84 list_for_each_entry(memb, &ls->ls_nodes, list) { 85 if (memb->nodeid == nodeid) 86 return 1; 87 } 88 return 0; 89 } 90 91 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 92 { 93 struct dlm_member *memb; 94 95 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 96 if (memb->nodeid == nodeid) 97 return 1; 98 } 99 return 0; 100 } 101 102 static void clear_memb_list(struct list_head *head) 103 { 104 struct dlm_member *memb; 105 106 while (!list_empty(head)) { 107 memb = list_entry(head->next, struct dlm_member, list); 108 list_del(&memb->list); 109 kfree(memb); 110 } 111 } 112 113 void dlm_clear_members(struct dlm_ls *ls) 114 { 115 clear_memb_list(&ls->ls_nodes); 116 ls->ls_num_nodes = 0; 117 } 118 119 void dlm_clear_members_gone(struct dlm_ls *ls) 120 { 121 clear_memb_list(&ls->ls_nodes_gone); 122 } 123 124 static void make_member_array(struct dlm_ls *ls) 125 { 126 struct dlm_member *memb; 127 int i, w, x = 0, total = 0, all_zero = 0, *array; 128 129 kfree(ls->ls_node_array); 130 ls->ls_node_array = NULL; 131 132 list_for_each_entry(memb, &ls->ls_nodes, list) { 133 if (memb->weight) 134 total += memb->weight; 135 } 136 137 /* all nodes revert to weight of 1 if all have weight 0 */ 138 139 if (!total) { 140 total = ls->ls_num_nodes; 141 all_zero = 1; 142 } 143 144 ls->ls_total_weight = total; 145 146 array = kmalloc(sizeof(int) * total, GFP_NOFS); 147 if (!array) 148 return; 149 150 list_for_each_entry(memb, &ls->ls_nodes, list) { 151 if (!all_zero && !memb->weight) 152 continue; 153 154 if (all_zero) 155 w = 1; 156 else 157 w = memb->weight; 158 159 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 160 161 for (i = 0; i < w; i++) 162 array[x++] = memb->nodeid; 163 } 164 165 ls->ls_node_array = array; 166 } 167 168 /* send a status request to all members just to establish comms connections */ 169 170 static int ping_members(struct dlm_ls *ls) 171 { 172 struct dlm_member *memb; 173 int error = 0; 174 175 list_for_each_entry(memb, &ls->ls_nodes, list) { 176 error = dlm_recovery_stopped(ls); 177 if (error) 178 break; 179 error = dlm_rcom_status(ls, memb->nodeid); 180 if (error) 181 break; 182 } 183 if (error) 184 log_debug(ls, "ping_members aborted %d last nodeid %d", 185 error, ls->ls_recover_nodeid); 186 return error; 187 } 188 189 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 190 { 191 struct dlm_member *memb, *safe; 192 int i, error, found, pos = 0, neg = 0, low = -1; 193 194 /* previously removed members that we've not finished removing need to 195 count as a negative change so the "neg" recovery steps will happen */ 196 197 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 198 log_debug(ls, "prev removed member %d", memb->nodeid); 199 neg++; 200 } 201 202 /* move departed members from ls_nodes to ls_nodes_gone */ 203 204 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 205 found = 0; 206 for (i = 0; i < rv->node_count; i++) { 207 if (memb->nodeid == rv->nodeids[i]) { 208 found = 1; 209 break; 210 } 211 } 212 213 if (!found) { 214 neg++; 215 dlm_remove_member(ls, memb); 216 log_debug(ls, "remove member %d", memb->nodeid); 217 } 218 } 219 220 /* Add an entry to ls_nodes_gone for members that were removed and 221 then added again, so that previous state for these nodes will be 222 cleared during recovery. */ 223 224 for (i = 0; i < rv->new_count; i++) { 225 if (!dlm_is_member(ls, rv->new[i])) 226 continue; 227 log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]); 228 229 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS); 230 if (!memb) 231 return -ENOMEM; 232 memb->nodeid = rv->new[i]; 233 list_add_tail(&memb->list, &ls->ls_nodes_gone); 234 neg++; 235 } 236 237 /* add new members to ls_nodes */ 238 239 for (i = 0; i < rv->node_count; i++) { 240 if (dlm_is_member(ls, rv->nodeids[i])) 241 continue; 242 dlm_add_member(ls, rv->nodeids[i]); 243 pos++; 244 log_debug(ls, "add member %d", rv->nodeids[i]); 245 } 246 247 list_for_each_entry(memb, &ls->ls_nodes, list) { 248 if (low == -1 || memb->nodeid < low) 249 low = memb->nodeid; 250 } 251 ls->ls_low_nodeid = low; 252 253 make_member_array(ls); 254 dlm_set_recover_status(ls, DLM_RS_NODES); 255 *neg_out = neg; 256 257 error = ping_members(ls); 258 if (!error || error == -EPROTO) { 259 /* new_lockspace() may be waiting to know if the config 260 is good or bad */ 261 ls->ls_members_result = error; 262 complete(&ls->ls_members_done); 263 } 264 if (error) 265 goto out; 266 267 error = dlm_recover_members_wait(ls); 268 out: 269 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); 270 return error; 271 } 272 273 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before 274 dlm_ls_start() is called on any of them to start the new recovery. */ 275 276 int dlm_ls_stop(struct dlm_ls *ls) 277 { 278 int new; 279 280 /* 281 * Prevent dlm_recv from being in the middle of something when we do 282 * the stop. This includes ensuring dlm_recv isn't processing a 283 * recovery message (rcom), while dlm_recoverd is aborting and 284 * resetting things from an in-progress recovery. i.e. we want 285 * dlm_recoverd to abort its recovery without worrying about dlm_recv 286 * processing an rcom at the same time. Stopping dlm_recv also makes 287 * it easy for dlm_receive_message() to check locking stopped and add a 288 * message to the requestqueue without races. 289 */ 290 291 down_write(&ls->ls_recv_active); 292 293 /* 294 * Abort any recovery that's in progress (see RECOVERY_STOP, 295 * dlm_recovery_stopped()) and tell any other threads running in the 296 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 297 */ 298 299 spin_lock(&ls->ls_recover_lock); 300 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 301 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 302 ls->ls_recover_seq++; 303 spin_unlock(&ls->ls_recover_lock); 304 305 /* 306 * Let dlm_recv run again, now any normal messages will be saved on the 307 * requestqueue for later. 308 */ 309 310 up_write(&ls->ls_recv_active); 311 312 /* 313 * This in_recovery lock does two things: 314 * 1) Keeps this function from returning until all threads are out 315 * of locking routines and locking is truly stopped. 316 * 2) Keeps any new requests from being processed until it's unlocked 317 * when recovery is complete. 318 */ 319 320 if (new) 321 down_write(&ls->ls_in_recovery); 322 323 /* 324 * The recoverd suspend/resume makes sure that dlm_recoverd (if 325 * running) has noticed RECOVERY_STOP above and quit processing the 326 * previous recovery. 327 */ 328 329 dlm_recoverd_suspend(ls); 330 ls->ls_recover_status = 0; 331 dlm_recoverd_resume(ls); 332 333 if (!ls->ls_recover_begin) 334 ls->ls_recover_begin = jiffies; 335 return 0; 336 } 337 338 int dlm_ls_start(struct dlm_ls *ls) 339 { 340 struct dlm_recover *rv = NULL, *rv_old; 341 int *ids = NULL, *new = NULL; 342 int error, ids_count = 0, new_count = 0; 343 344 rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS); 345 if (!rv) 346 return -ENOMEM; 347 348 error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count, 349 &new, &new_count); 350 if (error < 0) 351 goto fail; 352 353 spin_lock(&ls->ls_recover_lock); 354 355 /* the lockspace needs to be stopped before it can be started */ 356 357 if (!dlm_locking_stopped(ls)) { 358 spin_unlock(&ls->ls_recover_lock); 359 log_error(ls, "start ignored: lockspace running"); 360 error = -EINVAL; 361 goto fail; 362 } 363 364 rv->nodeids = ids; 365 rv->node_count = ids_count; 366 rv->new = new; 367 rv->new_count = new_count; 368 rv->seq = ++ls->ls_recover_seq; 369 rv_old = ls->ls_recover_args; 370 ls->ls_recover_args = rv; 371 spin_unlock(&ls->ls_recover_lock); 372 373 if (rv_old) { 374 log_error(ls, "unused recovery %llx %d", 375 (unsigned long long)rv_old->seq, rv_old->node_count); 376 kfree(rv_old->nodeids); 377 kfree(rv_old->new); 378 kfree(rv_old); 379 } 380 381 dlm_recoverd_kick(ls); 382 return 0; 383 384 fail: 385 kfree(rv); 386 kfree(ids); 387 kfree(new); 388 return error; 389 } 390 391