/*
 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
 * Copyright 2004-2011 Red Hat, Inc.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include <linux/fs.h>
#include <linux/dlm.h>
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/delay.h>
#include <linux/gfs2_ondisk.h>

#include "incore.h"
#include "glock.h"
#include "util.h"
#include "sys.h"

extern struct workqueue_struct *gfs2_control_wq;

static void gdlm_ast(void *arg)
{
	struct gfs2_glock *gl = arg;
	unsigned ret = gl->gl_state;

	BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);

	if (gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID)
		memset(gl->gl_lvb, 0, GDLM_LVB_SIZE);

	switch (gl->gl_lksb.sb_status) {
	case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
		gfs2_glock_free(gl);
		return;
	case -DLM_ECANCEL: /* Cancel while getting lock */
		ret |= LM_OUT_CANCELED;
		goto out;
	case -EAGAIN: /* Try lock fails */
	case -EDEADLK: /* Deadlock detected */
		goto out;
	case -ETIMEDOUT: /* Canceled due to timeout */
		ret |= LM_OUT_ERROR;
		goto out;
	case 0: /* Success */
		break;
	default: /* Something unexpected */
		BUG();
	}

	ret = gl->gl_req;
	if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
		if (gl->gl_req == LM_ST_SHARED)
			ret = LM_ST_DEFERRED;
		else if (gl->gl_req == LM_ST_DEFERRED)
			ret = LM_ST_SHARED;
		else
			BUG();
	}

	set_bit(GLF_INITIAL, &gl->gl_flags);
	gfs2_glock_complete(gl, ret);
	return;
out:
	if (!test_bit(GLF_INITIAL, &gl->gl_flags))
		gl->gl_lksb.sb_lkid = 0;
	gfs2_glock_complete(gl, ret);
}

static void gdlm_bast(void *arg, int mode)
{
	struct gfs2_glock *gl = arg;

	switch (mode) {
	case DLM_LOCK_EX:
		gfs2_glock_cb(gl, LM_ST_UNLOCKED);
		break;
	case DLM_LOCK_CW:
		gfs2_glock_cb(gl, LM_ST_DEFERRED);
		break;
	case DLM_LOCK_PR:
		gfs2_glock_cb(gl, LM_ST_SHARED);
		break;
	default:
		printk(KERN_ERR "unknown bast mode %d\n", mode);
		BUG();
	}
}

/* convert gfs lock-state to dlm lock-mode */

static int make_mode(const unsigned int lmstate)
{
	switch (lmstate) {
	case LM_ST_UNLOCKED:
		return DLM_LOCK_NL;
	case LM_ST_EXCLUSIVE:
		return DLM_LOCK_EX;
	case LM_ST_DEFERRED:
		return DLM_LOCK_CW;
	case LM_ST_SHARED:
		return DLM_LOCK_PR;
	}
	printk(KERN_ERR "unknown LM state %d\n", lmstate);
	BUG();
	return -1;
}

static u32 make_flags(const u32 lkid, const unsigned int gfs_flags,
		      const int req)
{
	u32 lkf = 0;

	if (gfs_flags & LM_FLAG_TRY)
		lkf |= DLM_LKF_NOQUEUE;

	if (gfs_flags & LM_FLAG_TRY_1CB) {
		lkf |= DLM_LKF_NOQUEUE;
		lkf |= DLM_LKF_NOQUEUEBAST;
	}

	if (gfs_flags & LM_FLAG_PRIORITY) {
		lkf |= DLM_LKF_NOORDER;
		lkf |= DLM_LKF_HEADQUE;
	}

	if (gfs_flags & LM_FLAG_ANY) {
		if (req == DLM_LOCK_PR)
			lkf |= DLM_LKF_ALTCW;
		else if (req == DLM_LOCK_CW)
			lkf |= DLM_LKF_ALTPR;
		else
			BUG();
	}

	if (lkid != 0)
		lkf |= DLM_LKF_CONVERT;

	lkf |= DLM_LKF_VALBLK;

	return lkf;
}

static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
		     unsigned int flags)
{
	struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
	int req;
	u32 lkf;

	req = make_mode(req_state);
	lkf = make_flags(gl->gl_lksb.sb_lkid, flags, req);

	/*
	 * Submit the actual lock request.
	 */

	return dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, gl->gl_strname,
			GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
}

static void gdlm_put_lock(struct gfs2_glock *gl)
{
	struct gfs2_sbd *sdp = gl->gl_sbd;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	if (gl->gl_lksb.sb_lkid == 0) {
		gfs2_glock_free(gl);
		return;
	}

	error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
			   NULL, gl);
	if (error) {
		printk(KERN_ERR "gdlm_unlock %x,%llx err=%d\n",
		       gl->gl_name.ln_type,
		       (unsigned long long)gl->gl_name.ln_number, error);
		return;
	}
}

static void gdlm_cancel(struct gfs2_glock *gl)
{
	struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
	dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
}

/*
 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 *
 *  1. dlm_controld sees lockspace members change
 *  2. dlm_controld blocks dlm-kernel locking activity
 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 *  4. dlm_controld starts and finishes its own user level recovery
 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 *  7. dlm_recoverd does its own lock recovery
 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 * 15. gfs2_control unblocks normal locking when all journals are recovered
 *
 * - failures during recovery
 *
 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 * recovering for a prior failure.  gfs2_control needs a way to detect
 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 * the recover_block and recover_start values.
 *
 * recover_done() provides a new lockspace generation number each time it
 * is called (step 9).  This generation number is saved as recover_start.
 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 * recover_block = recover_start.  So, while recover_block is equal to
 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 *
 * - more specific gfs2 steps in sequence above
 *
 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 *  6. recover_slot records any failed jids (maybe none)
 *  9. recover_done sets recover_start = new generation number
 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 * 12. gfs2_recover does journal recoveries for failed jids identified above
 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 *     again) then do nothing, otherwise if recover_start > recover_block
 *     then clear BLOCK_LOCKS.
 *
 * - parallel recovery steps across all nodes
 *
 * All nodes attempt to update the control_lock lvb with the new generation
 * number and jid bits, but only the first to get the control_lock EX will
 * do so; others will see that it's already done (lvb already contains new
 * generation number.)
 *
 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 * . One node gets control_lock first and writes the lvb, others see it's done
 * . All nodes attempt to recover jids for which they see control_lock bits set
 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 * . All nodes will eventually see all lvb bits clear and unblock locks
 *
 * - is there a problem with clearing an lvb bit that should be set
 *   and missing a journal recovery?
 *
 * 1. jid fails
 * 2. lvb bit set for step 1
 * 3. jid recovered for step 1
 * 4. jid taken again (new mount)
 * 5. jid fails (for step 4)
 * 6. lvb bit set for step 5 (will already be set)
 * 7. lvb bit cleared for step 3
 *
 * This is not a problem because the failure in step 5 does not
 * require recovery, because the mount in step 4 could not have
 * progressed far enough to unblock locks and access the fs.  The
 * control_mount() function waits for all recoveries to be complete
 * for the latest lockspace generation before ever unblocking locks
 * and returning.  The mount in step 4 waits until the recovery in
 * step 1 is done.
 *
 * - special case of first mounter: first node to mount the fs
 *
 * The first node to mount a gfs2 fs needs to check all the journals
 * and recover any that need recovery before other nodes are allowed
 * to mount the fs.  (Others may begin mounting, but they must wait
 * for the first mounter to be done before taking locks on the fs
 * or accessing the fs.)  This has two parts:
 *
 * 1. The mounted_lock tells a node it's the first to mount the fs.
 *    Each node holds the mounted_lock in PR while it's mounted.
 *    Each node tries to acquire the mounted_lock in EX when it mounts.
 *    If a node is granted the mounted_lock EX it means there are no
 *    other mounted nodes (no PR locks exist), and it is the first mounter.
 *    The mounted_lock is demoted to PR when first recovery is done, so
 *    others will fail to get an EX lock, but will get a PR lock.
 *
 * 2. The control_lock blocks others in control_mount() while the first
 *    mounter is doing first mount recovery of all journals.
 *    A mounting node needs to acquire control_lock in EX mode before
 *    it can proceed.  The first mounter holds control_lock in EX while doing
 *    the first mount recovery, blocking mounts from other nodes, then demotes
 *    control_lock to NL when it's done (others_may_mount/first_done),
 *    allowing other nodes to continue mounting.
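 *    (In the code below, DFL_FIRST_MOUNT marks the first mounter, and
 *    control_first_done() performs these EX->PR and EX->NL demotions.)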
 *
 * first mounter:
 * control_lock EX/NOQUEUE success
 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 * set first=1
 * do first mounter recovery
 * mounted_lock EX->PR
 * control_lock EX->NL, write lvb generation
 *
 * other mounter:
 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
 * mounted_lock PR/NOQUEUE success
 * read lvb generation
 * control_lock EX->NL
 * set first=0
 *
 * - mount during recovery
 *
 * If a node mounts while others are doing recovery (not first mounter),
 * the mounting node will get its initial recover_done() callback without
 * having seen any previous failures/callbacks.
 *
 * It must wait for all recoveries preceding its mount to be finished
 * before it unblocks locks.  It does this by repeating the "other mounter"
 * steps above until the lvb generation number is >= its mount generation
 * number (from initial recover_done) and all lvb bits are clear.
 *
 * - control_lock lvb format
 *
 * 4 bytes generation number: the latest dlm lockspace generation number
 * from recover_done callback.  Indicates the jid bitmap has been updated
 * to reflect all slot failures through that generation.
 * 4 bytes unused.
 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 * that jid N needs recovery.
 */

#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */

static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
			     char *lvb_bits)
{
	uint32_t gen;
	memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
	memcpy(&gen, lvb_bits, sizeof(uint32_t));
	*lvb_gen = le32_to_cpu(gen);
}

static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
			      char *lvb_bits)
{
	uint32_t gen;
	memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
	gen = cpu_to_le32(lvb_gen);
	memcpy(ls->ls_control_lvb, &gen, sizeof(uint32_t));
}

static int all_jid_bits_clear(char *lvb)
{
	int i;
	for (i = JID_BITMAP_OFFSET; i < GDLM_LVB_SIZE; i++) {
		if (lvb[i])
			return 0;
	}
	return 1;
}

static void sync_wait_cb(void *arg)
{
	struct lm_lockstruct *ls = arg;
	complete(&ls->ls_sync_wait);
}

static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
	if (error) {
		fs_err(sdp, "%s lkid %x error %d\n",
		       name, lksb->sb_lkid, error);
		return error;
	}

	wait_for_completion(&ls->ls_sync_wait);

	if (lksb->sb_status != -DLM_EUNLOCK) {
		fs_err(sdp, "%s lkid %x status %d\n",
		       name, lksb->sb_lkid, lksb->sb_status);
		return -1;
	}
	return 0;
}

static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
		     unsigned int num, struct dlm_lksb *lksb, char *name)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char strname[GDLM_STRNAME_BYTES];
	int error, status;

	memset(strname, 0, GDLM_STRNAME_BYTES);
	snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);

	error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
			 strname, GDLM_STRNAME_BYTES - 1,
			 0, sync_wait_cb, ls, NULL);
	if (error) {
		fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
		       name, lksb->sb_lkid, flags, mode, error);
		return error;
	}

	wait_for_completion(&ls->ls_sync_wait);

	status = lksb->sb_status;

	if (status && status != -EAGAIN) {
		fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
		       name, lksb->sb_lkid, flags, mode, status);
	}

	return status;
}

static int mounted_unlock(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
}

static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
			 &ls->ls_mounted_lksb, "mounted_lock");
}

static int control_unlock(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
}

static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
			 &ls->ls_control_lksb, "control_lock");
}

static void gfs2_control_func(struct work_struct *work)
{
	struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char lvb_bits[GDLM_LVB_SIZE];
	uint32_t block_gen, start_gen, lvb_gen, flags;
	int recover_set = 0;
	int write_lvb = 0;
	int recover_size;
	int i, error;

	spin_lock(&ls->ls_recover_spin);
	/*
	 * No MOUNT_DONE means we're still mounting; control_mount()
	 * will set this flag, after which this thread will take over
	 * all further clearing of BLOCK_LOCKS.
	 *
	 * FIRST_MOUNT means this node is doing first mounter recovery,
	 * for which recovery control is handled by
	 * control_mount()/control_first_done(), not this thread.
	 */
	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	    test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	block_gen = ls->ls_recover_block;
	start_gen = ls->ls_recover_start;
	spin_unlock(&ls->ls_recover_spin);

	/*
	 * Equal block_gen and start_gen implies we are between
	 * recover_prep and recover_done callbacks, which means
	 * dlm recovery is in progress and dlm locking is blocked.
	 * There's no point trying to do any work until recover_done.
	 */

	if (block_gen == start_gen)
		return;

	/*
	 * Propagate recover_submit[] and recover_result[] to lvb:
	 * dlm_recoverd adds to recover_submit[] jids needing recovery
	 * gfs2_recover adds to recover_result[] journal recovery results
	 *
	 * set lvb bit for jids in recover_submit[] if the lvb has not
	 * yet been updated for the generation of the failure
	 *
	 * clear lvb bit for jids in recover_result[] if the result of
	 * the journal recovery is SUCCESS
	 */

	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
	if (error) {
		fs_err(sdp, "control lock EX error %d\n", error);
		return;
	}

	control_lvb_read(ls, &lvb_gen, lvb_bits);

	spin_lock(&ls->ls_recover_spin);
	if (block_gen != ls->ls_recover_block ||
	    start_gen != ls->ls_recover_start) {
		fs_info(sdp, "recover generation %u block1 %u %u\n",
			start_gen, block_gen, ls->ls_recover_block);
		spin_unlock(&ls->ls_recover_spin);
		control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
		return;
	}

	recover_size = ls->ls_recover_size;

	if (lvb_gen <= start_gen) {
		/*
		 * Clear lvb bits for jids we've successfully recovered.
		 * Because all nodes attempt to recover failed journals,
		 * a journal can be recovered multiple times successfully
		 * in succession.  Only the first will really do recovery,
		 * the others find it clean, but still report a successful
		 * recovery.  So, another node may have already recovered
		 * the jid and cleared the lvb bit for it.
		 */
		for (i = 0; i < recover_size; i++) {
			if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
				continue;

			ls->ls_recover_result[i] = 0;

			if (!test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET))
				continue;

			__clear_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
			write_lvb = 1;
		}
	}

	if (lvb_gen == start_gen) {
		/*
		 * Failed slots before start_gen are already set in lvb.
		 */
		for (i = 0; i < recover_size; i++) {
			if (!ls->ls_recover_submit[i])
				continue;
			if (ls->ls_recover_submit[i] < lvb_gen)
				ls->ls_recover_submit[i] = 0;
		}
	} else if (lvb_gen < start_gen) {
		/*
		 * Failed slots before start_gen are not yet set in lvb.
		 */
		for (i = 0; i < recover_size; i++) {
			if (!ls->ls_recover_submit[i])
				continue;
			if (ls->ls_recover_submit[i] < start_gen) {
				ls->ls_recover_submit[i] = 0;
				__set_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
			}
		}
		/* even if there are no bits to set, we need to write the
		   latest generation to the lvb */
		write_lvb = 1;
	} else {
		/*
		 * we should be getting a recover_done() for lvb_gen soon
		 */
	}
	spin_unlock(&ls->ls_recover_spin);

	if (write_lvb) {
		control_lvb_write(ls, start_gen, lvb_bits);
		flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
	} else {
		flags = DLM_LKF_CONVERT;
	}

	error = control_lock(sdp, DLM_LOCK_NL, flags);
	if (error) {
		fs_err(sdp, "control lock NL error %d\n", error);
		return;
	}

	/*
	 * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
	 * and clear a jid bit in the lvb if the recovery is a success.
	 * Eventually all journals will be recovered, all jid bits will
	 * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
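	 * (gfs2_recover_set() queues the actual journal recovery work; its
	 * result is reported back to this file via gdlm_recovery_result().)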
	 */

	for (i = 0; i < recover_size; i++) {
		if (test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET)) {
			fs_info(sdp, "recover generation %u jid %d\n",
				start_gen, i);
			gfs2_recover_set(sdp, i);
			recover_set++;
		}
	}
	if (recover_set)
		return;

	/*
	 * No more jid bits set in lvb, all recovery is done, unblock locks
	 * (unless a new recover_prep callback has occurred blocking locks
	 * again while working above)
	 */

	spin_lock(&ls->ls_recover_spin);
	if (ls->ls_recover_block == block_gen &&
	    ls->ls_recover_start == start_gen) {
		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "recover generation %u done\n", start_gen);
		gfs2_glock_thaw(sdp);
	} else {
		fs_info(sdp, "recover generation %u block2 %u %u\n",
			start_gen, block_gen, ls->ls_recover_block);
		spin_unlock(&ls->ls_recover_spin);
	}
}

static int control_mount(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char lvb_bits[GDLM_LVB_SIZE];
	uint32_t start_gen, block_gen, mount_gen, lvb_gen;
	int mounted_mode;
	int retries = 0;
	int error;

	memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
	memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
	memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
	ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
	init_completion(&ls->ls_sync_wait);

	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
	if (error) {
		fs_err(sdp, "control_mount control_lock NL error %d\n", error);
		return error;
	}

	error = mounted_lock(sdp, DLM_LOCK_NL, 0);
	if (error) {
		fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
		control_unlock(sdp);
		return error;
	}
	mounted_mode = DLM_LOCK_NL;

restart:
	if (retries++ && signal_pending(current)) {
		error = -EINTR;
		goto fail;
	}

	/*
	 * We always start with both locks in NL. control_lock is
	 * demoted to NL below so we don't need to do it here.
	 */

	if (mounted_mode != DLM_LOCK_NL) {
		error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
		if (error)
			goto fail;
		mounted_mode = DLM_LOCK_NL;
	}

	/*
	 * Other nodes need to do some work in dlm recovery and gfs2_control
	 * before the recover_done and control_lock will be ready for us below.
	 * A delay here is not required but often avoids having to retry.
	 */

	msleep_interruptible(500);

	/*
	 * Acquire control_lock in EX and mounted_lock in either EX or PR.
	 * control_lock lvb keeps track of any pending journal recoveries.
	 * mounted_lock indicates if any other nodes have the fs mounted.
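	 * (This is the "first mounter" / "other mounter" lock sequence
	 * described in the comment at the top of this file.)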
	 */

	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
	if (error == -EAGAIN) {
		goto restart;
	} else if (error) {
		fs_err(sdp, "control_mount control_lock EX error %d\n", error);
		goto fail;
	}

	error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
	if (!error) {
		mounted_mode = DLM_LOCK_EX;
		goto locks_done;
	} else if (error != -EAGAIN) {
		fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
		goto fail;
	}

	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
	if (!error) {
		mounted_mode = DLM_LOCK_PR;
		goto locks_done;
	} else {
		/* not even -EAGAIN should happen here */
		fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
		goto fail;
	}

locks_done:
	/*
	 * If we got both locks above in EX, then we're the first mounter.
	 * If not, then we need to wait for the control_lock lvb to be
	 * updated by other mounted nodes to reflect our mount generation.
	 *
	 * In simple first mounter cases, first mounter will see zero lvb_gen,
	 * but in cases where all existing nodes leave/fail before mounting
	 * nodes finish control_mount, then all nodes will be mounting and
	 * lvb_gen will be non-zero.
	 */

	control_lvb_read(ls, &lvb_gen, lvb_bits);

	if (lvb_gen == 0xFFFFFFFF) {
		/* special value to force mount attempts to fail */
		fs_err(sdp, "control_mount control_lock disabled\n");
		error = -EINVAL;
		goto fail;
	}

	if (mounted_mode == DLM_LOCK_EX) {
		/* first mounter, keep both EX while doing first recovery */
		spin_lock(&ls->ls_recover_spin);
		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
		set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
		set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
		return 0;
	}

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
	if (error)
		goto fail;

	/*
	 * We are not first mounter, now we need to wait for the control_lock
	 * lvb generation to be >= the generation from our first recover_done
	 * and all lvb bits to be clear (no pending journal recoveries.)
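	 * (The generation from our first recover_done callback is saved as
	 * ls_recover_mount by gdlm_recover_done() below.)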
	 */

	if (!all_jid_bits_clear(lvb_bits)) {
		/* journals need recovery, wait until all are clear */
		fs_info(sdp, "control_mount wait for journal recovery\n");
		goto restart;
	}

	spin_lock(&ls->ls_recover_spin);
	block_gen = ls->ls_recover_block;
	start_gen = ls->ls_recover_start;
	mount_gen = ls->ls_recover_mount;

	if (lvb_gen < mount_gen) {
		/* wait for mounted nodes to update control_lock lvb to our
		   generation, which might include new recovery bits set */
		fs_info(sdp, "control_mount wait1 block %u start %u mount %u "
			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
			lvb_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	if (lvb_gen != start_gen) {
		/* wait for mounted nodes to update control_lock lvb to the
		   latest recovery generation */
		fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
			lvb_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	if (block_gen == start_gen) {
		/* dlm recovery in progress, wait for it to finish */
		fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
			lvb_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
	set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
	spin_unlock(&ls->ls_recover_spin);
	return 0;

fail:
	mounted_unlock(sdp);
	control_unlock(sdp);
	return error;
}

static int dlm_recovery_wait(void *word)
{
	schedule();
	return 0;
}

static int control_first_done(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char lvb_bits[GDLM_LVB_SIZE];
	uint32_t start_gen, block_gen;
	int error;

restart:
	spin_lock(&ls->ls_recover_spin);
	start_gen = ls->ls_recover_start;
	block_gen = ls->ls_recover_block;

	if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
	    !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	    !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		/* sanity check, should not happen */
		fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
		       start_gen, block_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		control_unlock(sdp);
		return -1;
	}

	if (start_gen == block_gen) {
		/*
		 * Wait for the end of a dlm recovery cycle to switch from
		 * first mounter recovery.  We can ignore any recover_slot
		 * callbacks between the recover_prep and next recover_done
		 * because we are still the first mounter and any failed nodes
		 * have not fully mounted, so they don't need recovery.
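		 * (DFL_DLM_RECOVERY is set by gdlm_recover_prep() and cleared
		 * by gdlm_recover_done(), which also wakes this waiter.)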
		 */
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "control_first_done wait gen %u\n", start_gen);

		wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
			    dlm_recovery_wait, TASK_UNINTERRUPTIBLE);
		goto restart;
	}

	clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
	set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
	spin_unlock(&ls->ls_recover_spin);

	memset(lvb_bits, 0, sizeof(lvb_bits));
	control_lvb_write(ls, start_gen, lvb_bits);

	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
	if (error)
		fs_err(sdp, "control_first_done mounted PR error %d\n", error);

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
	if (error)
		fs_err(sdp, "control_first_done control NL error %d\n", error);

	return error;
}

/*
 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
 * gfs2 jids start at 0, so jid = slot - 1)
 */

#define RECOVER_SIZE_INC 16

static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
			    int num_slots)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	uint32_t *submit = NULL;
	uint32_t *result = NULL;
	uint32_t old_size, new_size;
	int i, max_jid;

	max_jid = 0;
	for (i = 0; i < num_slots; i++) {
		if (max_jid < slots[i].slot - 1)
			max_jid = slots[i].slot - 1;
	}

	old_size = ls->ls_recover_size;

	if (old_size >= max_jid + 1)
		return 0;

	new_size = old_size + RECOVER_SIZE_INC;

	submit = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
	result = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
	if (!submit || !result) {
		kfree(submit);
		kfree(result);
		return -ENOMEM;
	}

	spin_lock(&ls->ls_recover_spin);
	memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
	memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
	kfree(ls->ls_recover_submit);
	kfree(ls->ls_recover_result);
	ls->ls_recover_submit = submit;
	ls->ls_recover_result = result;
	ls->ls_recover_size = new_size;
	spin_unlock(&ls->ls_recover_spin);
	return 0;
}

static void free_recover_size(struct lm_lockstruct *ls)
{
	kfree(ls->ls_recover_submit);
	kfree(ls->ls_recover_result);
	ls->ls_recover_submit = NULL;
	ls->ls_recover_result = NULL;
	ls->ls_recover_size = 0;
}

/* dlm calls before it does lock recovery */

static void gdlm_recover_prep(void *arg)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	spin_lock(&ls->ls_recover_spin);
	ls->ls_recover_block = ls->ls_recover_start;
	set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);

	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
	spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_prep has been completed on all lockspace members;
   identifies slot/jid of failed member */

static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int jid = slot->slot - 1;

	spin_lock(&ls->ls_recover_spin);
	if (ls->ls_recover_size < jid + 1) {
		fs_err(sdp, "recover_slot jid %d gen %u short size %d",
		       jid, ls->ls_recover_block, ls->ls_recover_size);
		spin_unlock(&ls->ls_recover_spin);
		return;
	}

	if (ls->ls_recover_submit[jid]) {
		fs_info(sdp, "recover_slot jid %d gen %u prev %u",
			jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
	}
	ls->ls_recover_submit[jid] = ls->ls_recover_block;
	spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_slot and after it completes lock recovery */

static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
			      int our_slot, uint32_t generation)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	/* ensure the ls jid arrays are large enough */
	set_recover_size(sdp, slots, num_slots);

	spin_lock(&ls->ls_recover_spin);
	ls->ls_recover_start = generation;

	if (!ls->ls_recover_mount) {
		ls->ls_recover_mount = generation;
		ls->ls_jid = our_slot - 1;
	}

	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);

	clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
	smp_mb__after_clear_bit();
	wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
	spin_unlock(&ls->ls_recover_spin);
}

/* gfs2_recover thread has a journal recovery result */

static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
				 unsigned int result)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		return;

	/* don't care about the recovery of own journal during mount */
	if (jid == ls->ls_jid)
		return;

	spin_lock(&ls->ls_recover_spin);
	if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	if (ls->ls_recover_size < jid + 1) {
		fs_err(sdp, "recovery_result jid %d short size %d",
		       jid, ls->ls_recover_size);
		spin_unlock(&ls->ls_recover_spin);
		return;
	}

	fs_info(sdp, "recover jid %d result %s\n", jid,
		result == LM_RD_GAVEUP ? "busy" : "success");

	ls->ls_recover_result[jid] = result;

	/* GAVEUP means another node is recovering the journal; delay our
	   next attempt to recover it, to give the other node a chance to
	   finish before trying again */

	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
				   result == LM_RD_GAVEUP ? HZ : 0);
	spin_unlock(&ls->ls_recover_spin);
}

const struct dlm_lockspace_ops gdlm_lockspace_ops = {
	.recover_prep = gdlm_recover_prep,
	.recover_slot = gdlm_recover_slot,
	.recover_done = gdlm_recover_done,
};

static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char cluster[GFS2_LOCKNAME_LEN];
	const char *fsname;
	uint32_t flags;
	int error, ops_result;

	/*
	 * initialize everything
	 */

	INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
	spin_lock_init(&ls->ls_recover_spin);
	ls->ls_recover_flags = 0;
	ls->ls_recover_mount = 0;
	ls->ls_recover_start = 0;
	ls->ls_recover_block = 0;
	ls->ls_recover_size = 0;
	ls->ls_recover_submit = NULL;
	ls->ls_recover_result = NULL;

	error = set_recover_size(sdp, NULL, 0);
	if (error)
		goto fail;

	/*
	 * prepare dlm_new_lockspace args
	 */

	fsname = strchr(table, ':');
	if (!fsname) {
		fs_info(sdp, "no fsname found\n");
		error = -EINVAL;
		goto fail_free;
	}
	memset(cluster, 0, sizeof(cluster));
	memcpy(cluster, table, strlen(table) - strlen(fsname));
	fsname++;

	flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
	if (ls->ls_nodir)
		flags |= DLM_LSFL_NODIR;

	/*
	 * create/join lockspace
	 */

	error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
				  &gdlm_lockspace_ops, sdp, &ops_result,
				  &ls->ls_dlm);
	if (error) {
		fs_err(sdp, "dlm_new_lockspace error %d\n", error);
		goto fail_free;
	}

	if (ops_result < 0) {
		/*
		 * dlm does not support ops callbacks,
		 * old dlm_controld/gfs_controld are used, try without ops.
		 */
		fs_info(sdp, "dlm lockspace ops not used\n");
		free_recover_size(ls);
		set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
		return 0;
	}

	if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
		fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
		error = -EINVAL;
		goto fail_release;
	}

	/*
	 * control_mount() uses control_lock to determine first mounter,
	 * and for later mounts, waits for any recoveries to be cleared.
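	 * (control_mount() also sets DFL_MOUNT_DONE, after which
	 * gfs2_control_func() takes over clearing DFL_BLOCK_LOCKS.)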
	 */

	error = control_mount(sdp);
	if (error) {
		fs_err(sdp, "mount control error %d\n", error);
		goto fail_release;
	}

	ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
	clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
	smp_mb__after_clear_bit();
	wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
	return 0;

fail_release:
	dlm_release_lockspace(ls->ls_dlm, 2);
fail_free:
	free_recover_size(ls);
fail:
	return error;
}

static void gdlm_first_done(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		return;

	error = control_first_done(sdp);
	if (error)
		fs_err(sdp, "mount first_done error %d\n", error);
}

static void gdlm_unmount(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		goto release;

	/* wait for gfs2_control_wq to be done with this mount */

	spin_lock(&ls->ls_recover_spin);
	set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
	spin_unlock(&ls->ls_recover_spin);
	flush_delayed_work_sync(&sdp->sd_control_work);

	/* mounted_lock and control_lock will be purged in dlm recovery */
release:
	if (ls->ls_dlm) {
		dlm_release_lockspace(ls->ls_dlm, 2);
		ls->ls_dlm = NULL;
	}

	free_recover_size(ls);
}

static const match_table_t dlm_tokens = {
	{ Opt_jid, "jid=%d"},
	{ Opt_id, "id=%d"},
	{ Opt_first, "first=%d"},
	{ Opt_nodir, "nodir=%d"},
	{ Opt_err, NULL },
};

const struct lm_lockops gfs2_dlm_ops = {
	.lm_proto_name = "lock_dlm",
	.lm_mount = gdlm_mount,
	.lm_first_done = gdlm_first_done,
	.lm_recovery_result = gdlm_recovery_result,
	.lm_unmount = gdlm_unmount,
	.lm_put_lock = gdlm_put_lock,
	.lm_lock = gdlm_lock,
	.lm_cancel = gdlm_cancel,
	.lm_tokens = &dlm_tokens,
};