1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * journal.c 5 * 6 * Defines functions of journalling api 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/fs.h> 27 #include <linux/types.h> 28 #include <linux/slab.h> 29 #include <linux/highmem.h> 30 #include <linux/kthread.h> 31 #include <linux/time.h> 32 #include <linux/random.h> 33 34 #define MLOG_MASK_PREFIX ML_JOURNAL 35 #include <cluster/masklog.h> 36 37 #include "ocfs2.h" 38 39 #include "alloc.h" 40 #include "blockcheck.h" 41 #include "dir.h" 42 #include "dlmglue.h" 43 #include "extent_map.h" 44 #include "heartbeat.h" 45 #include "inode.h" 46 #include "journal.h" 47 #include "localalloc.h" 48 #include "slot_map.h" 49 #include "super.h" 50 #include "sysfile.h" 51 #include "uptodate.h" 52 #include "quota.h" 53 54 #include "buffer_head_io.h" 55 56 DEFINE_SPINLOCK(trans_inc_lock); 57 58 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000 59 60 static int ocfs2_force_read_journal(struct inode *inode); 61 static int ocfs2_recover_node(struct ocfs2_super *osb, 62 int node_num, int slot_num); 63 static int __ocfs2_recovery_thread(void *arg); 64 static int ocfs2_commit_cache(struct ocfs2_super *osb); 65 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota); 66 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, 67 int dirty, int replayed); 68 static int ocfs2_trylock_journal(struct ocfs2_super *osb, 69 int slot_num); 70 static int ocfs2_recover_orphans(struct ocfs2_super *osb, 71 int slot); 72 static int ocfs2_commit_thread(void *arg); 73 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, 74 int slot_num, 75 struct ocfs2_dinode *la_dinode, 76 struct ocfs2_dinode *tl_dinode, 77 struct ocfs2_quota_recovery *qrec); 78 79 static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) 80 { 81 return __ocfs2_wait_on_mount(osb, 0); 82 } 83 84 static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb) 85 { 86 return __ocfs2_wait_on_mount(osb, 1); 87 } 88 89 /* 90 * This replay_map is to track online/offline slots, so we could recover 91 * offline slots during recovery and mount 92 */ 93 94 enum ocfs2_replay_state { 95 REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */ 96 REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */ 97 REPLAY_DONE /* Replay was already queued */ 98 }; 99 100 struct ocfs2_replay_map { 101 unsigned int rm_slots; 102 enum ocfs2_replay_state rm_state; 103 unsigned char rm_replay_slots[0]; 104 }; 105 106 void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) 107 { 108 if (!osb->replay_map) 109 return; 110 111 /* If we've already queued the replay, we don't have any more to do */ 112 if (osb->replay_map->rm_state == REPLAY_DONE) 113 return; 
114 115 osb->replay_map->rm_state = state; 116 } 117 118 int ocfs2_compute_replay_slots(struct ocfs2_super *osb) 119 { 120 struct ocfs2_replay_map *replay_map; 121 int i, node_num; 122 123 /* If replay map is already set, we don't do it again */ 124 if (osb->replay_map) 125 return 0; 126 127 replay_map = kzalloc(sizeof(struct ocfs2_replay_map) + 128 (osb->max_slots * sizeof(char)), GFP_KERNEL); 129 130 if (!replay_map) { 131 mlog_errno(-ENOMEM); 132 return -ENOMEM; 133 } 134 135 spin_lock(&osb->osb_lock); 136 137 replay_map->rm_slots = osb->max_slots; 138 replay_map->rm_state = REPLAY_UNNEEDED; 139 140 /* set rm_replay_slots for offline slot(s) */ 141 for (i = 0; i < replay_map->rm_slots; i++) { 142 if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT) 143 replay_map->rm_replay_slots[i] = 1; 144 } 145 146 osb->replay_map = replay_map; 147 spin_unlock(&osb->osb_lock); 148 return 0; 149 } 150 151 void ocfs2_queue_replay_slots(struct ocfs2_super *osb) 152 { 153 struct ocfs2_replay_map *replay_map = osb->replay_map; 154 int i; 155 156 if (!replay_map) 157 return; 158 159 if (replay_map->rm_state != REPLAY_NEEDED) 160 return; 161 162 for (i = 0; i < replay_map->rm_slots; i++) 163 if (replay_map->rm_replay_slots[i]) 164 ocfs2_queue_recovery_completion(osb->journal, i, NULL, 165 NULL, NULL); 166 replay_map->rm_state = REPLAY_DONE; 167 } 168 169 void ocfs2_free_replay_slots(struct ocfs2_super *osb) 170 { 171 struct ocfs2_replay_map *replay_map = osb->replay_map; 172 173 if (!osb->replay_map) 174 return; 175 176 kfree(replay_map); 177 osb->replay_map = NULL; 178 } 179 180 int ocfs2_recovery_init(struct ocfs2_super *osb) 181 { 182 struct ocfs2_recovery_map *rm; 183 184 mutex_init(&osb->recovery_lock); 185 osb->disable_recovery = 0; 186 osb->recovery_thread_task = NULL; 187 init_waitqueue_head(&osb->recovery_event); 188 189 rm = kzalloc(sizeof(struct ocfs2_recovery_map) + 190 osb->max_slots * sizeof(unsigned int), 191 GFP_KERNEL); 192 if (!rm) { 193 mlog_errno(-ENOMEM); 194 return -ENOMEM; 195 } 196 197 rm->rm_entries = (unsigned int *)((char *)rm + 198 sizeof(struct ocfs2_recovery_map)); 199 osb->recovery_map = rm; 200 201 return 0; 202 } 203 204 /* we can't grab the goofy sem lock from inside wait_event, so we use 205 * memory barriers to make sure that we'll see the null task before 206 * being woken up */ 207 static int ocfs2_recovery_thread_running(struct ocfs2_super *osb) 208 { 209 mb(); 210 return osb->recovery_thread_task != NULL; 211 } 212 213 void ocfs2_recovery_exit(struct ocfs2_super *osb) 214 { 215 struct ocfs2_recovery_map *rm; 216 217 /* disable any new recovery threads and wait for any currently 218 * running ones to exit. Do this before setting the vol_state. */ 219 mutex_lock(&osb->recovery_lock); 220 osb->disable_recovery = 1; 221 mutex_unlock(&osb->recovery_lock); 222 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb)); 223 224 /* At this point, we know that no more recovery threads can be 225 * launched, so wait for any recovery completion work to 226 * complete. */ 227 flush_workqueue(ocfs2_wq); 228 229 /* 230 * Now that recovery is shut down, and the osb is about to be 231 * freed, the osb_lock is not taken here. 232 */ 233 rm = osb->recovery_map; 234 /* XXX: Should we bug if there are dirty entries? 
*/ 235 236 kfree(rm); 237 } 238 239 static int __ocfs2_recovery_map_test(struct ocfs2_super *osb, 240 unsigned int node_num) 241 { 242 int i; 243 struct ocfs2_recovery_map *rm = osb->recovery_map; 244 245 assert_spin_locked(&osb->osb_lock); 246 247 for (i = 0; i < rm->rm_used; i++) { 248 if (rm->rm_entries[i] == node_num) 249 return 1; 250 } 251 252 return 0; 253 } 254 255 /* Behaves like test-and-set. Returns the previous value */ 256 static int ocfs2_recovery_map_set(struct ocfs2_super *osb, 257 unsigned int node_num) 258 { 259 struct ocfs2_recovery_map *rm = osb->recovery_map; 260 261 spin_lock(&osb->osb_lock); 262 if (__ocfs2_recovery_map_test(osb, node_num)) { 263 spin_unlock(&osb->osb_lock); 264 return 1; 265 } 266 267 /* XXX: Can this be exploited? Not from o2dlm... */ 268 BUG_ON(rm->rm_used >= osb->max_slots); 269 270 rm->rm_entries[rm->rm_used] = node_num; 271 rm->rm_used++; 272 spin_unlock(&osb->osb_lock); 273 274 return 0; 275 } 276 277 static void ocfs2_recovery_map_clear(struct ocfs2_super *osb, 278 unsigned int node_num) 279 { 280 int i; 281 struct ocfs2_recovery_map *rm = osb->recovery_map; 282 283 spin_lock(&osb->osb_lock); 284 285 for (i = 0; i < rm->rm_used; i++) { 286 if (rm->rm_entries[i] == node_num) 287 break; 288 } 289 290 if (i < rm->rm_used) { 291 /* XXX: be careful with the pointer math */ 292 memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]), 293 (rm->rm_used - i - 1) * sizeof(unsigned int)); 294 rm->rm_used--; 295 } 296 297 spin_unlock(&osb->osb_lock); 298 } 299 300 static int ocfs2_commit_cache(struct ocfs2_super *osb) 301 { 302 int status = 0; 303 unsigned int flushed; 304 unsigned long old_id; 305 struct ocfs2_journal *journal = NULL; 306 307 mlog_entry_void(); 308 309 journal = osb->journal; 310 311 /* Flush all pending commits and checkpoint the journal. */ 312 down_write(&journal->j_trans_barrier); 313 314 if (atomic_read(&journal->j_num_trans) == 0) { 315 up_write(&journal->j_trans_barrier); 316 mlog(0, "No transactions for me to flush!\n"); 317 goto finally; 318 } 319 320 jbd2_journal_lock_updates(journal->j_journal); 321 status = jbd2_journal_flush(journal->j_journal); 322 jbd2_journal_unlock_updates(journal->j_journal); 323 if (status < 0) { 324 up_write(&journal->j_trans_barrier); 325 mlog_errno(status); 326 goto finally; 327 } 328 329 old_id = ocfs2_inc_trans_id(journal); 330 331 flushed = atomic_read(&journal->j_num_trans); 332 atomic_set(&journal->j_num_trans, 0); 333 up_write(&journal->j_trans_barrier); 334 335 mlog(0, "commit_thread: flushed transaction %lu (%u handles)\n", 336 journal->j_trans_id, flushed); 337 338 ocfs2_wake_downconvert_thread(osb); 339 wake_up(&journal->j_checkpointed); 340 finally: 341 mlog_exit(status); 342 return status; 343 } 344 345 /* pass it NULL and it will allocate a new handle object for you. If 346 * you pass it a handle however, it may still return error, in which 347 * case it has free'd the passed handle for you. */ 348 handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs) 349 { 350 journal_t *journal = osb->journal->j_journal; 351 handle_t *handle; 352 353 BUG_ON(!osb || !osb->journal->j_journal); 354 355 if (ocfs2_is_hard_readonly(osb)) 356 return ERR_PTR(-EROFS); 357 358 BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE); 359 BUG_ON(max_buffs <= 0); 360 361 /* Nested transaction? Just return the handle... 
*/ 362 if (journal_current_handle()) 363 return jbd2_journal_start(journal, max_buffs); 364 365 down_read(&osb->journal->j_trans_barrier); 366 367 handle = jbd2_journal_start(journal, max_buffs); 368 if (IS_ERR(handle)) { 369 up_read(&osb->journal->j_trans_barrier); 370 371 mlog_errno(PTR_ERR(handle)); 372 373 if (is_journal_aborted(journal)) { 374 ocfs2_abort(osb->sb, "Detected aborted journal"); 375 handle = ERR_PTR(-EROFS); 376 } 377 } else { 378 if (!ocfs2_mount_local(osb)) 379 atomic_inc(&(osb->journal->j_num_trans)); 380 } 381 382 return handle; 383 } 384 385 int ocfs2_commit_trans(struct ocfs2_super *osb, 386 handle_t *handle) 387 { 388 int ret, nested; 389 struct ocfs2_journal *journal = osb->journal; 390 391 BUG_ON(!handle); 392 393 nested = handle->h_ref > 1; 394 ret = jbd2_journal_stop(handle); 395 if (ret < 0) 396 mlog_errno(ret); 397 398 if (!nested) 399 up_read(&journal->j_trans_barrier); 400 401 return ret; 402 } 403 404 /* 405 * 'nblocks' is what you want to add to the current transaction. 406 * 407 * This might call jbd2_journal_restart() which will commit dirty buffers 408 * and then restart the transaction. Before calling 409 * ocfs2_extend_trans(), any changed blocks should have been 410 * dirtied. After calling it, all blocks which need to be changed must 411 * go through another set of journal_access/journal_dirty calls. 412 * 413 * WARNING: This will not release any semaphores or disk locks taken 414 * during the transaction, so make sure they were taken *before* 415 * start_trans or we'll have ordering deadlocks. 416 * 417 * WARNING2: Note that we do *not* drop j_trans_barrier here. This is 418 * good because transaction ids haven't yet been recorded on the 419 * cluster locks associated with this handle. 420 */ 421 int ocfs2_extend_trans(handle_t *handle, int nblocks) 422 { 423 int status, old_nblocks; 424 425 BUG_ON(!handle); 426 BUG_ON(nblocks < 0); 427 428 if (!nblocks) 429 return 0; 430 431 old_nblocks = handle->h_buffer_credits; 432 mlog_entry_void(); 433 434 mlog(0, "Trying to extend transaction by %d blocks\n", nblocks); 435 436 #ifdef CONFIG_OCFS2_DEBUG_FS 437 status = 1; 438 #else 439 status = jbd2_journal_extend(handle, nblocks); 440 if (status < 0) { 441 mlog_errno(status); 442 goto bail; 443 } 444 #endif 445 446 if (status > 0) { 447 mlog(0, 448 "jbd2_journal_extend failed, trying " 449 "jbd2_journal_restart\n"); 450 status = jbd2_journal_restart(handle, 451 old_nblocks + nblocks); 452 if (status < 0) { 453 mlog_errno(status); 454 goto bail; 455 } 456 } 457 458 status = 0; 459 bail: 460 461 mlog_exit(status); 462 return status; 463 } 464 465 struct ocfs2_triggers { 466 struct jbd2_buffer_trigger_type ot_triggers; 467 int ot_offset; 468 }; 469 470 static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers) 471 { 472 return container_of(triggers, struct ocfs2_triggers, ot_triggers); 473 } 474 475 static void ocfs2_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 476 struct buffer_head *bh, 477 void *data, size_t size) 478 { 479 struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers); 480 481 /* 482 * We aren't guaranteed to have the superblock here, so we 483 * must unconditionally compute the ecc data. 484 * __ocfs2_journal_access() will only set the triggers if 485 * metaecc is enabled. 486 */ 487 ocfs2_block_check_compute(data, size, data + ot->ot_offset); 488 } 489 490 /* 491 * Quota blocks have their own trigger because the struct ocfs2_block_check 492 * offset depends on the blocksize. 
493 */ 494 static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 495 struct buffer_head *bh, 496 void *data, size_t size) 497 { 498 struct ocfs2_disk_dqtrailer *dqt = 499 ocfs2_block_dqtrailer(size, data); 500 501 /* 502 * We aren't guaranteed to have the superblock here, so we 503 * must unconditionally compute the ecc data. 504 * __ocfs2_journal_access() will only set the triggers if 505 * metaecc is enabled. 506 */ 507 ocfs2_block_check_compute(data, size, &dqt->dq_check); 508 } 509 510 /* 511 * Directory blocks also have their own trigger because the 512 * struct ocfs2_block_check offset depends on the blocksize. 513 */ 514 static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 515 struct buffer_head *bh, 516 void *data, size_t size) 517 { 518 struct ocfs2_dir_block_trailer *trailer = 519 ocfs2_dir_trailer_from_size(size, data); 520 521 /* 522 * We aren't guaranteed to have the superblock here, so we 523 * must unconditionally compute the ecc data. 524 * __ocfs2_journal_access() will only set the triggers if 525 * metaecc is enabled. 526 */ 527 ocfs2_block_check_compute(data, size, &trailer->db_check); 528 } 529 530 static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers, 531 struct buffer_head *bh) 532 { 533 mlog(ML_ERROR, 534 "ocfs2_abort_trigger called by JBD2. bh = 0x%lx, " 535 "bh->b_blocknr = %llu\n", 536 (unsigned long)bh, 537 (unsigned long long)bh->b_blocknr); 538 539 /* We aren't guaranteed to have the superblock here - but if we 540 * don't, it'll just crash. */ 541 ocfs2_error(bh->b_assoc_map->host->i_sb, 542 "JBD2 has aborted our journal, ocfs2 cannot continue\n"); 543 } 544 545 static struct ocfs2_triggers di_triggers = { 546 .ot_triggers = { 547 .t_frozen = ocfs2_frozen_trigger, 548 .t_abort = ocfs2_abort_trigger, 549 }, 550 .ot_offset = offsetof(struct ocfs2_dinode, i_check), 551 }; 552 553 static struct ocfs2_triggers eb_triggers = { 554 .ot_triggers = { 555 .t_frozen = ocfs2_frozen_trigger, 556 .t_abort = ocfs2_abort_trigger, 557 }, 558 .ot_offset = offsetof(struct ocfs2_extent_block, h_check), 559 }; 560 561 static struct ocfs2_triggers rb_triggers = { 562 .ot_triggers = { 563 .t_frozen = ocfs2_frozen_trigger, 564 .t_abort = ocfs2_abort_trigger, 565 }, 566 .ot_offset = offsetof(struct ocfs2_refcount_block, rf_check), 567 }; 568 569 static struct ocfs2_triggers gd_triggers = { 570 .ot_triggers = { 571 .t_frozen = ocfs2_frozen_trigger, 572 .t_abort = ocfs2_abort_trigger, 573 }, 574 .ot_offset = offsetof(struct ocfs2_group_desc, bg_check), 575 }; 576 577 static struct ocfs2_triggers db_triggers = { 578 .ot_triggers = { 579 .t_frozen = ocfs2_db_frozen_trigger, 580 .t_abort = ocfs2_abort_trigger, 581 }, 582 }; 583 584 static struct ocfs2_triggers xb_triggers = { 585 .ot_triggers = { 586 .t_frozen = ocfs2_frozen_trigger, 587 .t_abort = ocfs2_abort_trigger, 588 }, 589 .ot_offset = offsetof(struct ocfs2_xattr_block, xb_check), 590 }; 591 592 static struct ocfs2_triggers dq_triggers = { 593 .ot_triggers = { 594 .t_frozen = ocfs2_dq_frozen_trigger, 595 .t_abort = ocfs2_abort_trigger, 596 }, 597 }; 598 599 static struct ocfs2_triggers dr_triggers = { 600 .ot_triggers = { 601 .t_frozen = ocfs2_frozen_trigger, 602 .t_abort = ocfs2_abort_trigger, 603 }, 604 .ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check), 605 }; 606 607 static struct ocfs2_triggers dl_triggers = { 608 .ot_triggers = { 609 .t_frozen = ocfs2_frozen_trigger, 610 .t_abort = ocfs2_abort_trigger, 611 }, 612 .ot_offset = offsetof(struct 
ocfs2_dx_leaf, dl_check), 613 }; 614 615 static int __ocfs2_journal_access(handle_t *handle, 616 struct ocfs2_caching_info *ci, 617 struct buffer_head *bh, 618 struct ocfs2_triggers *triggers, 619 int type) 620 { 621 int status; 622 struct ocfs2_super *osb = 623 OCFS2_SB(ocfs2_metadata_cache_get_super(ci)); 624 625 BUG_ON(!ci || !ci->ci_ops); 626 BUG_ON(!handle); 627 BUG_ON(!bh); 628 629 mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %zu\n", 630 (unsigned long long)bh->b_blocknr, type, 631 (type == OCFS2_JOURNAL_ACCESS_CREATE) ? 632 "OCFS2_JOURNAL_ACCESS_CREATE" : 633 "OCFS2_JOURNAL_ACCESS_WRITE", 634 bh->b_size); 635 636 /* we can safely remove this assertion after testing. */ 637 if (!buffer_uptodate(bh)) { 638 mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n"); 639 mlog(ML_ERROR, "b_blocknr=%llu\n", 640 (unsigned long long)bh->b_blocknr); 641 BUG(); 642 } 643 644 /* Set the current transaction information on the ci so 645 * that the locking code knows whether it can drop it's locks 646 * on this ci or not. We're protected from the commit 647 * thread updating the current transaction id until 648 * ocfs2_commit_trans() because ocfs2_start_trans() took 649 * j_trans_barrier for us. */ 650 ocfs2_set_ci_lock_trans(osb->journal, ci); 651 652 ocfs2_metadata_cache_io_lock(ci); 653 switch (type) { 654 case OCFS2_JOURNAL_ACCESS_CREATE: 655 case OCFS2_JOURNAL_ACCESS_WRITE: 656 status = jbd2_journal_get_write_access(handle, bh); 657 break; 658 659 case OCFS2_JOURNAL_ACCESS_UNDO: 660 status = jbd2_journal_get_undo_access(handle, bh); 661 break; 662 663 default: 664 status = -EINVAL; 665 mlog(ML_ERROR, "Unknown access type!\n"); 666 } 667 if (!status && ocfs2_meta_ecc(osb) && triggers) 668 jbd2_journal_set_triggers(bh, &triggers->ot_triggers); 669 ocfs2_metadata_cache_io_unlock(ci); 670 671 if (status < 0) 672 mlog(ML_ERROR, "Error %d getting %d access to buffer!\n", 673 status, type); 674 675 mlog_exit(status); 676 return status; 677 } 678 679 int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci, 680 struct buffer_head *bh, int type) 681 { 682 return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type); 683 } 684 685 int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci, 686 struct buffer_head *bh, int type) 687 { 688 return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type); 689 } 690 691 int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci, 692 struct buffer_head *bh, int type) 693 { 694 return __ocfs2_journal_access(handle, ci, bh, &rb_triggers, 695 type); 696 } 697 698 int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci, 699 struct buffer_head *bh, int type) 700 { 701 return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type); 702 } 703 704 int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci, 705 struct buffer_head *bh, int type) 706 { 707 return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type); 708 } 709 710 int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci, 711 struct buffer_head *bh, int type) 712 { 713 return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type); 714 } 715 716 int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci, 717 struct buffer_head *bh, int type) 718 { 719 return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type); 720 } 721 722 int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci, 723 struct buffer_head *bh, int type) 
724 { 725 return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type); 726 } 727 728 int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci, 729 struct buffer_head *bh, int type) 730 { 731 return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type); 732 } 733 734 int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci, 735 struct buffer_head *bh, int type) 736 { 737 return __ocfs2_journal_access(handle, ci, bh, NULL, type); 738 } 739 740 void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh) 741 { 742 int status; 743 744 mlog_entry("(bh->b_blocknr=%llu)\n", 745 (unsigned long long)bh->b_blocknr); 746 747 status = jbd2_journal_dirty_metadata(handle, bh); 748 BUG_ON(status); 749 750 mlog_exit_void(); 751 } 752 753 #define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE) 754 755 void ocfs2_set_journal_params(struct ocfs2_super *osb) 756 { 757 journal_t *journal = osb->journal->j_journal; 758 unsigned long commit_interval = OCFS2_DEFAULT_COMMIT_INTERVAL; 759 760 if (osb->osb_commit_interval) 761 commit_interval = osb->osb_commit_interval; 762 763 write_lock(&journal->j_state_lock); 764 journal->j_commit_interval = commit_interval; 765 if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER) 766 journal->j_flags |= JBD2_BARRIER; 767 else 768 journal->j_flags &= ~JBD2_BARRIER; 769 write_unlock(&journal->j_state_lock); 770 } 771 772 int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty) 773 { 774 int status = -1; 775 struct inode *inode = NULL; /* the journal inode */ 776 journal_t *j_journal = NULL; 777 struct ocfs2_dinode *di = NULL; 778 struct buffer_head *bh = NULL; 779 struct ocfs2_super *osb; 780 int inode_lock = 0; 781 782 mlog_entry_void(); 783 784 BUG_ON(!journal); 785 786 osb = journal->j_osb; 787 788 /* already have the inode for our journal */ 789 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 790 osb->slot_num); 791 if (inode == NULL) { 792 status = -EACCES; 793 mlog_errno(status); 794 goto done; 795 } 796 if (is_bad_inode(inode)) { 797 mlog(ML_ERROR, "access error (bad inode)\n"); 798 iput(inode); 799 inode = NULL; 800 status = -EACCES; 801 goto done; 802 } 803 804 SET_INODE_JOURNAL(inode); 805 OCFS2_I(inode)->ip_open_count++; 806 807 /* Skip recovery waits here - journal inode metadata never 808 * changes in a live cluster so it can be considered an 809 * exception to the rule. 
*/ 810 status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY); 811 if (status < 0) { 812 if (status != -ERESTARTSYS) 813 mlog(ML_ERROR, "Could not get lock on journal!\n"); 814 goto done; 815 } 816 817 inode_lock = 1; 818 di = (struct ocfs2_dinode *)bh->b_data; 819 820 if (inode->i_size < OCFS2_MIN_JOURNAL_SIZE) { 821 mlog(ML_ERROR, "Journal file size (%lld) is too small!\n", 822 inode->i_size); 823 status = -EINVAL; 824 goto done; 825 } 826 827 mlog(0, "inode->i_size = %lld\n", inode->i_size); 828 mlog(0, "inode->i_blocks = %llu\n", 829 (unsigned long long)inode->i_blocks); 830 mlog(0, "inode->ip_clusters = %u\n", OCFS2_I(inode)->ip_clusters); 831 832 /* call the kernels journal init function now */ 833 j_journal = jbd2_journal_init_inode(inode); 834 if (j_journal == NULL) { 835 mlog(ML_ERROR, "Linux journal layer error\n"); 836 status = -EINVAL; 837 goto done; 838 } 839 840 mlog(0, "Returned from jbd2_journal_init_inode\n"); 841 mlog(0, "j_journal->j_maxlen = %u\n", j_journal->j_maxlen); 842 843 *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) & 844 OCFS2_JOURNAL_DIRTY_FL); 845 846 journal->j_journal = j_journal; 847 journal->j_inode = inode; 848 journal->j_bh = bh; 849 850 ocfs2_set_journal_params(osb); 851 852 journal->j_state = OCFS2_JOURNAL_LOADED; 853 854 status = 0; 855 done: 856 if (status < 0) { 857 if (inode_lock) 858 ocfs2_inode_unlock(inode, 1); 859 brelse(bh); 860 if (inode) { 861 OCFS2_I(inode)->ip_open_count--; 862 iput(inode); 863 } 864 } 865 866 mlog_exit(status); 867 return status; 868 } 869 870 static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di) 871 { 872 le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1); 873 } 874 875 static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di) 876 { 877 return le32_to_cpu(di->id1.journal1.ij_recovery_generation); 878 } 879 880 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, 881 int dirty, int replayed) 882 { 883 int status; 884 unsigned int flags; 885 struct ocfs2_journal *journal = osb->journal; 886 struct buffer_head *bh = journal->j_bh; 887 struct ocfs2_dinode *fe; 888 889 mlog_entry_void(); 890 891 fe = (struct ocfs2_dinode *)bh->b_data; 892 893 /* The journal bh on the osb always comes from ocfs2_journal_init() 894 * and was validated there inside ocfs2_inode_lock_full(). It's a 895 * code bug if we mess it up. */ 896 BUG_ON(!OCFS2_IS_VALID_DINODE(fe)); 897 898 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 899 if (dirty) 900 flags |= OCFS2_JOURNAL_DIRTY_FL; 901 else 902 flags &= ~OCFS2_JOURNAL_DIRTY_FL; 903 fe->id1.journal1.ij_flags = cpu_to_le32(flags); 904 905 if (replayed) 906 ocfs2_bump_recovery_generation(fe); 907 908 ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); 909 status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode)); 910 if (status < 0) 911 mlog_errno(status); 912 913 mlog_exit(status); 914 return status; 915 } 916 917 /* 918 * If the journal has been kmalloc'd it needs to be freed after this 919 * call. 920 */ 921 void ocfs2_journal_shutdown(struct ocfs2_super *osb) 922 { 923 struct ocfs2_journal *journal = NULL; 924 int status = 0; 925 struct inode *inode = NULL; 926 int num_running_trans = 0; 927 928 mlog_entry_void(); 929 930 BUG_ON(!osb); 931 932 journal = osb->journal; 933 if (!journal) 934 goto done; 935 936 inode = journal->j_inode; 937 938 if (journal->j_state != OCFS2_JOURNAL_LOADED) 939 goto done; 940 941 /* need to inc inode use count - jbd2_journal_destroy will iput. 
*/ 942 if (!igrab(inode)) 943 BUG(); 944 945 num_running_trans = atomic_read(&(osb->journal->j_num_trans)); 946 if (num_running_trans > 0) 947 mlog(0, "Shutting down journal: must wait on %d " 948 "running transactions!\n", 949 num_running_trans); 950 951 /* Do a commit_cache here. It will flush our journal, *and* 952 * release any locks that are still held. 953 * set the SHUTDOWN flag and release the trans lock. 954 * the commit thread will take the trans lock for us below. */ 955 journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN; 956 957 /* The OCFS2_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not 958 * drop the trans_lock (which we want to hold until we 959 * completely destroy the journal. */ 960 if (osb->commit_task) { 961 /* Wait for the commit thread */ 962 mlog(0, "Waiting for ocfs2commit to exit....\n"); 963 kthread_stop(osb->commit_task); 964 osb->commit_task = NULL; 965 } 966 967 BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0); 968 969 if (ocfs2_mount_local(osb)) { 970 jbd2_journal_lock_updates(journal->j_journal); 971 status = jbd2_journal_flush(journal->j_journal); 972 jbd2_journal_unlock_updates(journal->j_journal); 973 if (status < 0) 974 mlog_errno(status); 975 } 976 977 if (status == 0) { 978 /* 979 * Do not toggle if flush was unsuccessful otherwise 980 * will leave dirty metadata in a "clean" journal 981 */ 982 status = ocfs2_journal_toggle_dirty(osb, 0, 0); 983 if (status < 0) 984 mlog_errno(status); 985 } 986 987 /* Shutdown the kernel journal system */ 988 jbd2_journal_destroy(journal->j_journal); 989 journal->j_journal = NULL; 990 991 OCFS2_I(inode)->ip_open_count--; 992 993 /* unlock our journal */ 994 ocfs2_inode_unlock(inode, 1); 995 996 brelse(journal->j_bh); 997 journal->j_bh = NULL; 998 999 journal->j_state = OCFS2_JOURNAL_FREE; 1000 1001 // up_write(&journal->j_trans_barrier); 1002 done: 1003 if (inode) 1004 iput(inode); 1005 mlog_exit_void(); 1006 } 1007 1008 static void ocfs2_clear_journal_error(struct super_block *sb, 1009 journal_t *journal, 1010 int slot) 1011 { 1012 int olderr; 1013 1014 olderr = jbd2_journal_errno(journal); 1015 if (olderr) { 1016 mlog(ML_ERROR, "File system error %d recorded in " 1017 "journal %u.\n", olderr, slot); 1018 mlog(ML_ERROR, "File system on device %s needs checking.\n", 1019 sb->s_id); 1020 1021 jbd2_journal_ack_err(journal); 1022 jbd2_journal_clear_err(journal); 1023 } 1024 } 1025 1026 int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed) 1027 { 1028 int status = 0; 1029 struct ocfs2_super *osb; 1030 1031 mlog_entry_void(); 1032 1033 BUG_ON(!journal); 1034 1035 osb = journal->j_osb; 1036 1037 status = jbd2_journal_load(journal->j_journal); 1038 if (status < 0) { 1039 mlog(ML_ERROR, "Failed to load journal!\n"); 1040 goto done; 1041 } 1042 1043 ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num); 1044 1045 status = ocfs2_journal_toggle_dirty(osb, 1, replayed); 1046 if (status < 0) { 1047 mlog_errno(status); 1048 goto done; 1049 } 1050 1051 /* Launch the commit thread */ 1052 if (!local) { 1053 osb->commit_task = kthread_run(ocfs2_commit_thread, osb, 1054 "ocfs2cmt"); 1055 if (IS_ERR(osb->commit_task)) { 1056 status = PTR_ERR(osb->commit_task); 1057 osb->commit_task = NULL; 1058 mlog(ML_ERROR, "unable to launch ocfs2commit thread, " 1059 "error=%d", status); 1060 goto done; 1061 } 1062 } else 1063 osb->commit_task = NULL; 1064 1065 done: 1066 mlog_exit(status); 1067 return status; 1068 } 1069 1070 1071 /* 'full' flag tells us whether we clear out all blocks or if we just 
1072 * mark the journal clean */ 1073 int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full) 1074 { 1075 int status; 1076 1077 mlog_entry_void(); 1078 1079 BUG_ON(!journal); 1080 1081 status = jbd2_journal_wipe(journal->j_journal, full); 1082 if (status < 0) { 1083 mlog_errno(status); 1084 goto bail; 1085 } 1086 1087 status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0); 1088 if (status < 0) 1089 mlog_errno(status); 1090 1091 bail: 1092 mlog_exit(status); 1093 return status; 1094 } 1095 1096 static int ocfs2_recovery_completed(struct ocfs2_super *osb) 1097 { 1098 int empty; 1099 struct ocfs2_recovery_map *rm = osb->recovery_map; 1100 1101 spin_lock(&osb->osb_lock); 1102 empty = (rm->rm_used == 0); 1103 spin_unlock(&osb->osb_lock); 1104 1105 return empty; 1106 } 1107 1108 void ocfs2_wait_for_recovery(struct ocfs2_super *osb) 1109 { 1110 wait_event(osb->recovery_event, ocfs2_recovery_completed(osb)); 1111 } 1112 1113 /* 1114 * JBD Might read a cached version of another nodes journal file. We 1115 * don't want this as this file changes often and we get no 1116 * notification on those changes. The only way to be sure that we've 1117 * got the most up to date version of those blocks then is to force 1118 * read them off disk. Just searching through the buffer cache won't 1119 * work as there may be pages backing this file which are still marked 1120 * up to date. We know things can't change on this file underneath us 1121 * as we have the lock by now :) 1122 */ 1123 static int ocfs2_force_read_journal(struct inode *inode) 1124 { 1125 int status = 0; 1126 int i; 1127 u64 v_blkno, p_blkno, p_blocks, num_blocks; 1128 #define CONCURRENT_JOURNAL_FILL 32ULL 1129 struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL]; 1130 1131 mlog_entry_void(); 1132 1133 memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL); 1134 1135 num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, inode->i_size); 1136 v_blkno = 0; 1137 while (v_blkno < num_blocks) { 1138 status = ocfs2_extent_map_get_blocks(inode, v_blkno, 1139 &p_blkno, &p_blocks, NULL); 1140 if (status < 0) { 1141 mlog_errno(status); 1142 goto bail; 1143 } 1144 1145 if (p_blocks > CONCURRENT_JOURNAL_FILL) 1146 p_blocks = CONCURRENT_JOURNAL_FILL; 1147 1148 /* We are reading journal data which should not 1149 * be put in the uptodate cache */ 1150 status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb), 1151 p_blkno, p_blocks, bhs); 1152 if (status < 0) { 1153 mlog_errno(status); 1154 goto bail; 1155 } 1156 1157 for(i = 0; i < p_blocks; i++) { 1158 brelse(bhs[i]); 1159 bhs[i] = NULL; 1160 } 1161 1162 v_blkno += p_blocks; 1163 } 1164 1165 bail: 1166 for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++) 1167 brelse(bhs[i]); 1168 mlog_exit(status); 1169 return status; 1170 } 1171 1172 struct ocfs2_la_recovery_item { 1173 struct list_head lri_list; 1174 int lri_slot; 1175 struct ocfs2_dinode *lri_la_dinode; 1176 struct ocfs2_dinode *lri_tl_dinode; 1177 struct ocfs2_quota_recovery *lri_qrec; 1178 }; 1179 1180 /* Does the second half of the recovery process. By this point, the 1181 * node is marked clean and can actually be considered recovered, 1182 * hence it's no longer in the recovery map, but there's still some 1183 * cleanup we can do which shouldn't happen within the recovery thread 1184 * as locking in that context becomes very difficult if we are to take 1185 * recovering nodes into account. 1186 * 1187 * NOTE: This function can and will sleep on recovery of other nodes 1188 * during cluster locking, just like any other ocfs2 process. 
1189 */ 1190 void ocfs2_complete_recovery(struct work_struct *work) 1191 { 1192 int ret; 1193 struct ocfs2_journal *journal = 1194 container_of(work, struct ocfs2_journal, j_recovery_work); 1195 struct ocfs2_super *osb = journal->j_osb; 1196 struct ocfs2_dinode *la_dinode, *tl_dinode; 1197 struct ocfs2_la_recovery_item *item, *n; 1198 struct ocfs2_quota_recovery *qrec; 1199 LIST_HEAD(tmp_la_list); 1200 1201 mlog_entry_void(); 1202 1203 mlog(0, "completing recovery from keventd\n"); 1204 1205 spin_lock(&journal->j_lock); 1206 list_splice_init(&journal->j_la_cleanups, &tmp_la_list); 1207 spin_unlock(&journal->j_lock); 1208 1209 list_for_each_entry_safe(item, n, &tmp_la_list, lri_list) { 1210 list_del_init(&item->lri_list); 1211 1212 mlog(0, "Complete recovery for slot %d\n", item->lri_slot); 1213 1214 ocfs2_wait_on_quotas(osb); 1215 1216 la_dinode = item->lri_la_dinode; 1217 if (la_dinode) { 1218 mlog(0, "Clean up local alloc %llu\n", 1219 (unsigned long long)le64_to_cpu(la_dinode->i_blkno)); 1220 1221 ret = ocfs2_complete_local_alloc_recovery(osb, 1222 la_dinode); 1223 if (ret < 0) 1224 mlog_errno(ret); 1225 1226 kfree(la_dinode); 1227 } 1228 1229 tl_dinode = item->lri_tl_dinode; 1230 if (tl_dinode) { 1231 mlog(0, "Clean up truncate log %llu\n", 1232 (unsigned long long)le64_to_cpu(tl_dinode->i_blkno)); 1233 1234 ret = ocfs2_complete_truncate_log_recovery(osb, 1235 tl_dinode); 1236 if (ret < 0) 1237 mlog_errno(ret); 1238 1239 kfree(tl_dinode); 1240 } 1241 1242 ret = ocfs2_recover_orphans(osb, item->lri_slot); 1243 if (ret < 0) 1244 mlog_errno(ret); 1245 1246 qrec = item->lri_qrec; 1247 if (qrec) { 1248 mlog(0, "Recovering quota files"); 1249 ret = ocfs2_finish_quota_recovery(osb, qrec, 1250 item->lri_slot); 1251 if (ret < 0) 1252 mlog_errno(ret); 1253 /* Recovery info is already freed now */ 1254 } 1255 1256 kfree(item); 1257 } 1258 1259 mlog(0, "Recovery completion\n"); 1260 mlog_exit_void(); 1261 } 1262 1263 /* NOTE: This function always eats your references to la_dinode and 1264 * tl_dinode, either manually on error, or by passing them to 1265 * ocfs2_complete_recovery */ 1266 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, 1267 int slot_num, 1268 struct ocfs2_dinode *la_dinode, 1269 struct ocfs2_dinode *tl_dinode, 1270 struct ocfs2_quota_recovery *qrec) 1271 { 1272 struct ocfs2_la_recovery_item *item; 1273 1274 item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS); 1275 if (!item) { 1276 /* Though we wish to avoid it, we are in fact safe in 1277 * skipping local alloc cleanup as fsck.ocfs2 is more 1278 * than capable of reclaiming unused space. */ 1279 if (la_dinode) 1280 kfree(la_dinode); 1281 1282 if (tl_dinode) 1283 kfree(tl_dinode); 1284 1285 if (qrec) 1286 ocfs2_free_quota_recovery(qrec); 1287 1288 mlog_errno(-ENOMEM); 1289 return; 1290 } 1291 1292 INIT_LIST_HEAD(&item->lri_list); 1293 item->lri_la_dinode = la_dinode; 1294 item->lri_slot = slot_num; 1295 item->lri_tl_dinode = tl_dinode; 1296 item->lri_qrec = qrec; 1297 1298 spin_lock(&journal->j_lock); 1299 list_add_tail(&item->lri_list, &journal->j_la_cleanups); 1300 queue_work(ocfs2_wq, &journal->j_recovery_work); 1301 spin_unlock(&journal->j_lock); 1302 } 1303 1304 /* Called by the mount code to queue recovery the last part of 1305 * recovery for it's own and offline slot(s). 
*/ 1306 void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) 1307 { 1308 struct ocfs2_journal *journal = osb->journal; 1309 1310 /* No need to queue up our truncate_log as regular cleanup will catch 1311 * that */ 1312 ocfs2_queue_recovery_completion(journal, osb->slot_num, 1313 osb->local_alloc_copy, NULL, NULL); 1314 ocfs2_schedule_truncate_log_flush(osb, 0); 1315 1316 osb->local_alloc_copy = NULL; 1317 osb->dirty = 0; 1318 1319 /* queue to recover orphan slots for all offline slots */ 1320 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); 1321 ocfs2_queue_replay_slots(osb); 1322 ocfs2_free_replay_slots(osb); 1323 } 1324 1325 void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) 1326 { 1327 if (osb->quota_rec) { 1328 ocfs2_queue_recovery_completion(osb->journal, 1329 osb->slot_num, 1330 NULL, 1331 NULL, 1332 osb->quota_rec); 1333 osb->quota_rec = NULL; 1334 } 1335 } 1336 1337 static int __ocfs2_recovery_thread(void *arg) 1338 { 1339 int status, node_num, slot_num; 1340 struct ocfs2_super *osb = arg; 1341 struct ocfs2_recovery_map *rm = osb->recovery_map; 1342 int *rm_quota = NULL; 1343 int rm_quota_used = 0, i; 1344 struct ocfs2_quota_recovery *qrec; 1345 1346 mlog_entry_void(); 1347 1348 status = ocfs2_wait_on_mount(osb); 1349 if (status < 0) { 1350 goto bail; 1351 } 1352 1353 rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS); 1354 if (!rm_quota) { 1355 status = -ENOMEM; 1356 goto bail; 1357 } 1358 restart: 1359 status = ocfs2_super_lock(osb, 1); 1360 if (status < 0) { 1361 mlog_errno(status); 1362 goto bail; 1363 } 1364 1365 status = ocfs2_compute_replay_slots(osb); 1366 if (status < 0) 1367 mlog_errno(status); 1368 1369 /* queue recovery for our own slot */ 1370 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, 1371 NULL, NULL); 1372 1373 spin_lock(&osb->osb_lock); 1374 while (rm->rm_used) { 1375 /* It's always safe to remove entry zero, as we won't 1376 * clear it until ocfs2_recover_node() has succeeded. */ 1377 node_num = rm->rm_entries[0]; 1378 spin_unlock(&osb->osb_lock); 1379 mlog(0, "checking node %d\n", node_num); 1380 slot_num = ocfs2_node_num_to_slot(osb, node_num); 1381 if (slot_num == -ENOENT) { 1382 status = 0; 1383 mlog(0, "no slot for this node, so no recovery" 1384 "required.\n"); 1385 goto skip_recovery; 1386 } 1387 mlog(0, "node %d was using slot %d\n", node_num, slot_num); 1388 1389 /* It is a bit subtle with quota recovery. We cannot do it 1390 * immediately because we have to obtain cluster locks from 1391 * quota files and we also don't want to just skip it because 1392 * then quota usage would be out of sync until some node takes 1393 * the slot. So we remember which nodes need quota recovery 1394 * and when everything else is done, we recover quotas. */ 1395 for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++); 1396 if (i == rm_quota_used) 1397 rm_quota[rm_quota_used++] = slot_num; 1398 1399 status = ocfs2_recover_node(osb, node_num, slot_num); 1400 skip_recovery: 1401 if (!status) { 1402 ocfs2_recovery_map_clear(osb, node_num); 1403 } else { 1404 mlog(ML_ERROR, 1405 "Error %d recovering node %d on device (%u,%u)!\n", 1406 status, node_num, 1407 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1408 mlog(ML_ERROR, "Volume requires unmount.\n"); 1409 } 1410 1411 spin_lock(&osb->osb_lock); 1412 } 1413 spin_unlock(&osb->osb_lock); 1414 mlog(0, "All nodes recovered\n"); 1415 1416 /* Refresh all journal recovery generations from disk */ 1417 status = ocfs2_check_journals_nolocks(osb); 1418 status = (status == -EROFS) ? 
0 : status; 1419 if (status < 0) 1420 mlog_errno(status); 1421 1422 /* Now it is right time to recover quotas... We have to do this under 1423 * superblock lock so that noone can start using the slot (and crash) 1424 * before we recover it */ 1425 for (i = 0; i < rm_quota_used; i++) { 1426 qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]); 1427 if (IS_ERR(qrec)) { 1428 status = PTR_ERR(qrec); 1429 mlog_errno(status); 1430 continue; 1431 } 1432 ocfs2_queue_recovery_completion(osb->journal, rm_quota[i], 1433 NULL, NULL, qrec); 1434 } 1435 1436 ocfs2_super_unlock(osb, 1); 1437 1438 /* queue recovery for offline slots */ 1439 ocfs2_queue_replay_slots(osb); 1440 1441 bail: 1442 mutex_lock(&osb->recovery_lock); 1443 if (!status && !ocfs2_recovery_completed(osb)) { 1444 mutex_unlock(&osb->recovery_lock); 1445 goto restart; 1446 } 1447 1448 ocfs2_free_replay_slots(osb); 1449 osb->recovery_thread_task = NULL; 1450 mb(); /* sync with ocfs2_recovery_thread_running */ 1451 wake_up(&osb->recovery_event); 1452 1453 mutex_unlock(&osb->recovery_lock); 1454 1455 if (rm_quota) 1456 kfree(rm_quota); 1457 1458 mlog_exit(status); 1459 /* no one is callint kthread_stop() for us so the kthread() api 1460 * requires that we call do_exit(). And it isn't exported, but 1461 * complete_and_exit() seems to be a minimal wrapper around it. */ 1462 complete_and_exit(NULL, status); 1463 return status; 1464 } 1465 1466 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num) 1467 { 1468 mlog_entry("(node_num=%d, osb->node_num = %d)\n", 1469 node_num, osb->node_num); 1470 1471 mutex_lock(&osb->recovery_lock); 1472 if (osb->disable_recovery) 1473 goto out; 1474 1475 /* People waiting on recovery will wait on 1476 * the recovery map to empty. */ 1477 if (ocfs2_recovery_map_set(osb, node_num)) 1478 mlog(0, "node %d already in recovery map.\n", node_num); 1479 1480 mlog(0, "starting recovery thread...\n"); 1481 1482 if (osb->recovery_thread_task) 1483 goto out; 1484 1485 osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb, 1486 "ocfs2rec"); 1487 if (IS_ERR(osb->recovery_thread_task)) { 1488 mlog_errno((int)PTR_ERR(osb->recovery_thread_task)); 1489 osb->recovery_thread_task = NULL; 1490 } 1491 1492 out: 1493 mutex_unlock(&osb->recovery_lock); 1494 wake_up(&osb->recovery_event); 1495 1496 mlog_exit_void(); 1497 } 1498 1499 static int ocfs2_read_journal_inode(struct ocfs2_super *osb, 1500 int slot_num, 1501 struct buffer_head **bh, 1502 struct inode **ret_inode) 1503 { 1504 int status = -EACCES; 1505 struct inode *inode = NULL; 1506 1507 BUG_ON(slot_num >= osb->max_slots); 1508 1509 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 1510 slot_num); 1511 if (!inode || is_bad_inode(inode)) { 1512 mlog_errno(status); 1513 goto bail; 1514 } 1515 SET_INODE_JOURNAL(inode); 1516 1517 status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE); 1518 if (status < 0) { 1519 mlog_errno(status); 1520 goto bail; 1521 } 1522 1523 status = 0; 1524 1525 bail: 1526 if (inode) { 1527 if (status || !ret_inode) 1528 iput(inode); 1529 else 1530 *ret_inode = inode; 1531 } 1532 return status; 1533 } 1534 1535 /* Does the actual journal replay and marks the journal inode as 1536 * clean. Will only replay if the journal inode is marked dirty. 
*/ 1537 static int ocfs2_replay_journal(struct ocfs2_super *osb, 1538 int node_num, 1539 int slot_num) 1540 { 1541 int status; 1542 int got_lock = 0; 1543 unsigned int flags; 1544 struct inode *inode = NULL; 1545 struct ocfs2_dinode *fe; 1546 journal_t *journal = NULL; 1547 struct buffer_head *bh = NULL; 1548 u32 slot_reco_gen; 1549 1550 status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode); 1551 if (status) { 1552 mlog_errno(status); 1553 goto done; 1554 } 1555 1556 fe = (struct ocfs2_dinode *)bh->b_data; 1557 slot_reco_gen = ocfs2_get_recovery_generation(fe); 1558 brelse(bh); 1559 bh = NULL; 1560 1561 /* 1562 * As the fs recovery is asynchronous, there is a small chance that 1563 * another node mounted (and recovered) the slot before the recovery 1564 * thread could get the lock. To handle that, we dirty read the journal 1565 * inode for that slot to get the recovery generation. If it is 1566 * different than what we expected, the slot has been recovered. 1567 * If not, it needs recovery. 1568 */ 1569 if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) { 1570 mlog(0, "Slot %u already recovered (old/new=%u/%u)\n", slot_num, 1571 osb->slot_recovery_generations[slot_num], slot_reco_gen); 1572 osb->slot_recovery_generations[slot_num] = slot_reco_gen; 1573 status = -EBUSY; 1574 goto done; 1575 } 1576 1577 /* Continue with recovery as the journal has not yet been recovered */ 1578 1579 status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY); 1580 if (status < 0) { 1581 mlog(0, "status returned from ocfs2_inode_lock=%d\n", status); 1582 if (status != -ERESTARTSYS) 1583 mlog(ML_ERROR, "Could not lock journal!\n"); 1584 goto done; 1585 } 1586 got_lock = 1; 1587 1588 fe = (struct ocfs2_dinode *) bh->b_data; 1589 1590 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 1591 slot_reco_gen = ocfs2_get_recovery_generation(fe); 1592 1593 if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) { 1594 mlog(0, "No recovery required for node %d\n", node_num); 1595 /* Refresh recovery generation for the slot */ 1596 osb->slot_recovery_generations[slot_num] = slot_reco_gen; 1597 goto done; 1598 } 1599 1600 /* we need to run complete recovery for offline orphan slots */ 1601 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); 1602 1603 mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", 1604 node_num, slot_num, 1605 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1606 1607 OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters); 1608 1609 status = ocfs2_force_read_journal(inode); 1610 if (status < 0) { 1611 mlog_errno(status); 1612 goto done; 1613 } 1614 1615 mlog(0, "calling journal_init_inode\n"); 1616 journal = jbd2_journal_init_inode(inode); 1617 if (journal == NULL) { 1618 mlog(ML_ERROR, "Linux journal layer error\n"); 1619 status = -EIO; 1620 goto done; 1621 } 1622 1623 status = jbd2_journal_load(journal); 1624 if (status < 0) { 1625 mlog_errno(status); 1626 if (!igrab(inode)) 1627 BUG(); 1628 jbd2_journal_destroy(journal); 1629 goto done; 1630 } 1631 1632 ocfs2_clear_journal_error(osb->sb, journal, slot_num); 1633 1634 /* wipe the journal */ 1635 mlog(0, "flushing the journal.\n"); 1636 jbd2_journal_lock_updates(journal); 1637 status = jbd2_journal_flush(journal); 1638 jbd2_journal_unlock_updates(journal); 1639 if (status < 0) 1640 mlog_errno(status); 1641 1642 /* This will mark the node clean */ 1643 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 1644 flags &= ~OCFS2_JOURNAL_DIRTY_FL; 1645 fe->id1.journal1.ij_flags = cpu_to_le32(flags); 1646 1647 /* Increment recovery 
generation to indicate successful recovery */ 1648 ocfs2_bump_recovery_generation(fe); 1649 osb->slot_recovery_generations[slot_num] = 1650 ocfs2_get_recovery_generation(fe); 1651 1652 ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); 1653 status = ocfs2_write_block(osb, bh, INODE_CACHE(inode)); 1654 if (status < 0) 1655 mlog_errno(status); 1656 1657 if (!igrab(inode)) 1658 BUG(); 1659 1660 jbd2_journal_destroy(journal); 1661 1662 done: 1663 /* drop the lock on this nodes journal */ 1664 if (got_lock) 1665 ocfs2_inode_unlock(inode, 1); 1666 1667 if (inode) 1668 iput(inode); 1669 1670 brelse(bh); 1671 1672 mlog_exit(status); 1673 return status; 1674 } 1675 1676 /* 1677 * Do the most important parts of node recovery: 1678 * - Replay it's journal 1679 * - Stamp a clean local allocator file 1680 * - Stamp a clean truncate log 1681 * - Mark the node clean 1682 * 1683 * If this function completes without error, a node in OCFS2 can be 1684 * said to have been safely recovered. As a result, failure during the 1685 * second part of a nodes recovery process (local alloc recovery) is 1686 * far less concerning. 1687 */ 1688 static int ocfs2_recover_node(struct ocfs2_super *osb, 1689 int node_num, int slot_num) 1690 { 1691 int status = 0; 1692 struct ocfs2_dinode *la_copy = NULL; 1693 struct ocfs2_dinode *tl_copy = NULL; 1694 1695 mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n", 1696 node_num, slot_num, osb->node_num); 1697 1698 /* Should not ever be called to recover ourselves -- in that 1699 * case we should've called ocfs2_journal_load instead. */ 1700 BUG_ON(osb->node_num == node_num); 1701 1702 status = ocfs2_replay_journal(osb, node_num, slot_num); 1703 if (status < 0) { 1704 if (status == -EBUSY) { 1705 mlog(0, "Skipping recovery for slot %u (node %u) " 1706 "as another node has recovered it\n", slot_num, 1707 node_num); 1708 status = 0; 1709 goto done; 1710 } 1711 mlog_errno(status); 1712 goto done; 1713 } 1714 1715 /* Stamp a clean local alloc file AFTER recovering the journal... */ 1716 status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy); 1717 if (status < 0) { 1718 mlog_errno(status); 1719 goto done; 1720 } 1721 1722 /* An error from begin_truncate_log_recovery is not 1723 * serious enough to warrant halting the rest of 1724 * recovery. */ 1725 status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy); 1726 if (status < 0) 1727 mlog_errno(status); 1728 1729 /* Likewise, this would be a strange but ultimately not so 1730 * harmful place to get an error... */ 1731 status = ocfs2_clear_slot(osb, slot_num); 1732 if (status < 0) 1733 mlog_errno(status); 1734 1735 /* This will kfree the memory pointed to by la_copy and tl_copy */ 1736 ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, 1737 tl_copy, NULL); 1738 1739 status = 0; 1740 done: 1741 1742 mlog_exit(status); 1743 return status; 1744 } 1745 1746 /* Test node liveness by trylocking his journal. If we get the lock, 1747 * we drop it here. Return 0 if we got the lock, -EAGAIN if node is 1748 * still alive (we couldn't get the lock) and < 0 on error. 
*/ 1749 static int ocfs2_trylock_journal(struct ocfs2_super *osb, 1750 int slot_num) 1751 { 1752 int status, flags; 1753 struct inode *inode = NULL; 1754 1755 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 1756 slot_num); 1757 if (inode == NULL) { 1758 mlog(ML_ERROR, "access error\n"); 1759 status = -EACCES; 1760 goto bail; 1761 } 1762 if (is_bad_inode(inode)) { 1763 mlog(ML_ERROR, "access error (bad inode)\n"); 1764 iput(inode); 1765 inode = NULL; 1766 status = -EACCES; 1767 goto bail; 1768 } 1769 SET_INODE_JOURNAL(inode); 1770 1771 flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE; 1772 status = ocfs2_inode_lock_full(inode, NULL, 1, flags); 1773 if (status < 0) { 1774 if (status != -EAGAIN) 1775 mlog_errno(status); 1776 goto bail; 1777 } 1778 1779 ocfs2_inode_unlock(inode, 1); 1780 bail: 1781 if (inode) 1782 iput(inode); 1783 1784 return status; 1785 } 1786 1787 /* Call this underneath ocfs2_super_lock. It also assumes that the 1788 * slot info struct has been updated from disk. */ 1789 int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) 1790 { 1791 unsigned int node_num; 1792 int status, i; 1793 u32 gen; 1794 struct buffer_head *bh = NULL; 1795 struct ocfs2_dinode *di; 1796 1797 /* This is called with the super block cluster lock, so we 1798 * know that the slot map can't change underneath us. */ 1799 1800 for (i = 0; i < osb->max_slots; i++) { 1801 /* Read journal inode to get the recovery generation */ 1802 status = ocfs2_read_journal_inode(osb, i, &bh, NULL); 1803 if (status) { 1804 mlog_errno(status); 1805 goto bail; 1806 } 1807 di = (struct ocfs2_dinode *)bh->b_data; 1808 gen = ocfs2_get_recovery_generation(di); 1809 brelse(bh); 1810 bh = NULL; 1811 1812 spin_lock(&osb->osb_lock); 1813 osb->slot_recovery_generations[i] = gen; 1814 1815 mlog(0, "Slot %u recovery generation is %u\n", i, 1816 osb->slot_recovery_generations[i]); 1817 1818 if (i == osb->slot_num) { 1819 spin_unlock(&osb->osb_lock); 1820 continue; 1821 } 1822 1823 status = ocfs2_slot_to_node_num_locked(osb, i, &node_num); 1824 if (status == -ENOENT) { 1825 spin_unlock(&osb->osb_lock); 1826 continue; 1827 } 1828 1829 if (__ocfs2_recovery_map_test(osb, node_num)) { 1830 spin_unlock(&osb->osb_lock); 1831 continue; 1832 } 1833 spin_unlock(&osb->osb_lock); 1834 1835 /* Ok, we have a slot occupied by another node which 1836 * is not in the recovery map. We trylock his journal 1837 * file here to test if he's alive. */ 1838 status = ocfs2_trylock_journal(osb, i); 1839 if (!status) { 1840 /* Since we're called from mount, we know that 1841 * the recovery thread can't race us on 1842 * setting / checking the recovery bits. */ 1843 ocfs2_recovery_thread(osb, node_num); 1844 } else if ((status < 0) && (status != -EAGAIN)) { 1845 mlog_errno(status); 1846 goto bail; 1847 } 1848 } 1849 1850 status = 0; 1851 bail: 1852 mlog_exit(status); 1853 return status; 1854 } 1855 1856 /* 1857 * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some 1858 * randomness to the timeout to minimize multple nodes firing the timer at the 1859 * same time. 1860 */ 1861 static inline unsigned long ocfs2_orphan_scan_timeout(void) 1862 { 1863 unsigned long time; 1864 1865 get_random_bytes(&time, sizeof(time)); 1866 time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000); 1867 return msecs_to_jiffies(time); 1868 } 1869 1870 /* 1871 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for 1872 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. 
This 1873 * is done to catch any orphans that are left over in orphan directories. 1874 * 1875 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT 1876 * seconds. It gets an EX lock on os_lockres and checks sequence number 1877 * stored in LVB. If the sequence number has changed, it means some other 1878 * node has done the scan. This node skips the scan and tracks the 1879 * sequence number. If the sequence number didn't change, it means a scan 1880 * hasn't happened. The node queues a scan and increments the 1881 * sequence number in the LVB. 1882 */ 1883 void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) 1884 { 1885 struct ocfs2_orphan_scan *os; 1886 int status, i; 1887 u32 seqno = 0; 1888 1889 os = &osb->osb_orphan_scan; 1890 1891 if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) 1892 goto out; 1893 1894 status = ocfs2_orphan_scan_lock(osb, &seqno); 1895 if (status < 0) { 1896 if (status != -EAGAIN) 1897 mlog_errno(status); 1898 goto out; 1899 } 1900 1901 /* Do no queue the tasks if the volume is being umounted */ 1902 if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) 1903 goto unlock; 1904 1905 if (os->os_seqno != seqno) { 1906 os->os_seqno = seqno; 1907 goto unlock; 1908 } 1909 1910 for (i = 0; i < osb->max_slots; i++) 1911 ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL, 1912 NULL); 1913 /* 1914 * We queued a recovery on orphan slots, increment the sequence 1915 * number and update LVB so other node will skip the scan for a while 1916 */ 1917 seqno++; 1918 os->os_count++; 1919 os->os_scantime = CURRENT_TIME; 1920 unlock: 1921 ocfs2_orphan_scan_unlock(osb, seqno); 1922 out: 1923 return; 1924 } 1925 1926 /* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */ 1927 void ocfs2_orphan_scan_work(struct work_struct *work) 1928 { 1929 struct ocfs2_orphan_scan *os; 1930 struct ocfs2_super *osb; 1931 1932 os = container_of(work, struct ocfs2_orphan_scan, 1933 os_orphan_scan_work.work); 1934 osb = os->os_osb; 1935 1936 mutex_lock(&os->os_lock); 1937 ocfs2_queue_orphan_scan(osb); 1938 if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) 1939 queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, 1940 ocfs2_orphan_scan_timeout()); 1941 mutex_unlock(&os->os_lock); 1942 } 1943 1944 void ocfs2_orphan_scan_stop(struct ocfs2_super *osb) 1945 { 1946 struct ocfs2_orphan_scan *os; 1947 1948 os = &osb->osb_orphan_scan; 1949 if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) { 1950 atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); 1951 mutex_lock(&os->os_lock); 1952 cancel_delayed_work(&os->os_orphan_scan_work); 1953 mutex_unlock(&os->os_lock); 1954 } 1955 } 1956 1957 void ocfs2_orphan_scan_init(struct ocfs2_super *osb) 1958 { 1959 struct ocfs2_orphan_scan *os; 1960 1961 os = &osb->osb_orphan_scan; 1962 os->os_osb = osb; 1963 os->os_count = 0; 1964 os->os_seqno = 0; 1965 mutex_init(&os->os_lock); 1966 INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work); 1967 } 1968 1969 void ocfs2_orphan_scan_start(struct ocfs2_super *osb) 1970 { 1971 struct ocfs2_orphan_scan *os; 1972 1973 os = &osb->osb_orphan_scan; 1974 os->os_scantime = CURRENT_TIME; 1975 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1976 atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); 1977 else { 1978 atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE); 1979 queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, 1980 ocfs2_orphan_scan_timeout()); 1981 } 1982 } 1983 1984 struct ocfs2_orphan_filldir_priv { 1985 struct inode *head; 1986 struct ocfs2_super 
struct ocfs2_orphan_filldir_priv {
        struct inode            *head;
        struct ocfs2_super      *osb;
};

static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len,
                                loff_t pos, u64 ino, unsigned type)
{
        struct ocfs2_orphan_filldir_priv *p = priv;
        struct inode *iter;

        if (name_len == 1 && !strncmp(".", name, 1))
                return 0;
        if (name_len == 2 && !strncmp("..", name, 2))
                return 0;

        /* Skip bad inodes so that recovery can continue */
        iter = ocfs2_iget(p->osb, ino,
                          OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0);
        if (IS_ERR(iter))
                return 0;

        mlog(0, "queue orphan %llu\n",
             (unsigned long long)OCFS2_I(iter)->ip_blkno);
        /* No locking is required for the next_orphan queue as there
         * is only ever a single process doing orphan recovery. */
        OCFS2_I(iter)->ip_next_orphan = p->head;
        p->head = iter;

        return 0;
}

static int ocfs2_queue_orphans(struct ocfs2_super *osb,
                               int slot,
                               struct inode **head)
{
        int status;
        struct inode *orphan_dir_inode = NULL;
        struct ocfs2_orphan_filldir_priv priv;
        loff_t pos = 0;

        priv.osb = osb;
        priv.head = *head;

        orphan_dir_inode = ocfs2_get_system_file_inode(osb,
                                                       ORPHAN_DIR_SYSTEM_INODE,
                                                       slot);
        if (!orphan_dir_inode) {
                status = -ENOENT;
                mlog_errno(status);
                return status;
        }

        mutex_lock(&orphan_dir_inode->i_mutex);
        status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0);
        if (status < 0) {
                mlog_errno(status);
                goto out;
        }

        status = ocfs2_dir_foreach(orphan_dir_inode, &pos, &priv,
                                   ocfs2_orphan_filldir);
        if (status) {
                mlog_errno(status);
                goto out_cluster;
        }

        *head = priv.head;

out_cluster:
        ocfs2_inode_unlock(orphan_dir_inode, 0);
out:
        mutex_unlock(&orphan_dir_inode->i_mutex);
        iput(orphan_dir_inode);
        return status;
}

static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
                                              int slot)
{
        int ret;

        spin_lock(&osb->osb_lock);
        ret = !osb->osb_orphan_wipes[slot];
        spin_unlock(&osb->osb_lock);
        return ret;
}

static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
                                             int slot)
{
        spin_lock(&osb->osb_lock);
        /* Mark ourselves such that new processes in delete_inode()
         * know to quit early. */
        ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
        while (osb->osb_orphan_wipes[slot]) {
                /* If any processes are already in the middle of an
                 * orphan wipe on this dir, then we need to wait for
                 * them. */
                spin_unlock(&osb->osb_lock);
                wait_event_interruptible(osb->osb_wipe_event,
                                         ocfs2_orphan_recovery_can_continue(osb, slot));
                spin_lock(&osb->osb_lock);
        }
        spin_unlock(&osb->osb_lock);
}

static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
                                              int slot)
{
        ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
}
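/*
 * Sketch of the other side of this handshake, assuming a helper such
 * as ocfs2_node_map_test_bit() (name assumed here; the real check
 * lives in the delete_inode path): a process about to wipe an orphan
 * backs off while recovery holds the dir, and otherwise bumps
 * osb_orphan_wipes[slot] so recovery waits for it in turn.
 */
#if 0
        spin_lock(&osb->osb_lock);
        if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs,
                                    slot)) {
                /* Recovery owns this orphan dir right now - skip the wipe. */
                spin_unlock(&osb->osb_lock);
                return 0;
        }
        osb->osb_orphan_wipes[slot]++;
        spin_unlock(&osb->osb_lock);
        /* ... wipe the inode, then decrement the count and wake
         * osb_wipe_event so ocfs2_mark_recovering_orphan_dir() can
         * make progress ... */
#endif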
/*
 * Orphan recovery. Each mounted node has its own orphan dir which we
 * must run during recovery. Our strategy here is to build a list of
 * the inodes in the orphan dir and iget/iput them. The VFS does most
 * of the rest of the work.
 *
 * Orphan recovery can happen at any time, not just at mount, so we
 * have a couple of extra considerations.
 *
 * - We grab as many inodes as we can under the orphan dir lock -
 *   doing iget() outside the orphan dir risks getting a reference on
 *   an invalid inode.
 * - We must be sure not to deadlock with other processes on the
 *   system wanting to run delete_inode(). This can happen when they go
 *   to lock the orphan dir and the orphan recovery process attempts to
 *   iget() inside the orphan dir lock. This can be avoided by
 *   advertising our state to ocfs2_delete_inode().
 */
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                                 int slot)
{
        int ret = 0;
        struct inode *inode = NULL;
        struct inode *iter;
        struct ocfs2_inode_info *oi;

        mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);

        ocfs2_mark_recovering_orphan_dir(osb, slot);
        ret = ocfs2_queue_orphans(osb, slot, &inode);
        ocfs2_clear_recovering_orphan_dir(osb, slot);

        /* An error here should be noted, but we want to continue with as
         * many queued inodes as we've got. */
        if (ret)
                mlog_errno(ret);

        while (inode) {
                oi = OCFS2_I(inode);
                mlog(0, "iput orphan %llu\n", (unsigned long long)oi->ip_blkno);

                iter = oi->ip_next_orphan;

                spin_lock(&oi->ip_lock);
                /* The remote delete code may have set these on the
                 * assumption that the other node would wipe them
                 * successfully. If they are still in the node's
                 * orphan dir, we need to reset that state. */
                oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE);

                /* Set the proper information to get us going into
                 * ocfs2_delete_inode. */
                oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
                spin_unlock(&oi->ip_lock);

                iput(inode);

                inode = iter;
        }

        return ret;
}

static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
{
        /* This check is good because ocfs2 will wait on our recovery
         * thread before changing it to something other than MOUNTED
         * or DISABLED. */
        wait_event(osb->osb_mount_event,
                   (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
                   atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
                   atomic_read(&osb->vol_state) == VOLUME_DISABLED);

        /* If there's an error on mount, then we may never get to the
         * MOUNTED flag, but this is set right before
         * dismount_volume() so we can trust it. */
        if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) {
                mlog(0, "mount error, exiting!\n");
                return -EBUSY;
        }

        return 0;
}
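/*
 * Usage sketch, assuming a recovery-path caller: blocking here keeps
 * such callers from touching the filesystem before the mount has
 * settled. Passing quota != 0 additionally insists on
 * VOLUME_MOUNTED_QUOTAS, which is what quota recovery wants.
 */
#if 0
        status = ocfs2_wait_on_mount(osb);
        if (status < 0)         /* -EBUSY: the mount failed, bail quietly */
                goto bail;
#endif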
static int ocfs2_commit_thread(void *arg)
{
        int status;
        struct ocfs2_super *osb = arg;
        struct ocfs2_journal *journal = osb->journal;

        /* We can trust j_num_trans here because _should_stop() is only set
         * in shutdown and nobody other than ourselves should be able to
         * start transactions. Committing on shutdown might take a few
         * iterations as final transactions put deleted inodes on the list. */
        while (!(kthread_should_stop() &&
                 atomic_read(&journal->j_num_trans) == 0)) {

                wait_event_interruptible(osb->checkpoint_event,
                                         atomic_read(&journal->j_num_trans)
                                         || kthread_should_stop());

                status = ocfs2_commit_cache(osb);
                if (status < 0)
                        mlog_errno(status);

                if (kthread_should_stop() && atomic_read(&journal->j_num_trans)) {
                        mlog(ML_KTHREAD,
                             "commit_thread: %u transactions pending on "
                             "shutdown\n",
                             atomic_read(&journal->j_num_trans));
                }
        }

        return 0;
}

/* Reads all the journal inodes without taking any cluster locks. Used
 * for hard readonly access to determine whether any journal requires
 * recovery. Also used to refresh the recovery generation numbers after
 * a journal has been recovered by another node.
 */
int ocfs2_check_journals_nolocks(struct ocfs2_super *osb)
{
        int ret = 0;
        unsigned int slot;
        struct buffer_head *di_bh = NULL;
        struct ocfs2_dinode *di;
        int journal_dirty = 0;

        for (slot = 0; slot < osb->max_slots; slot++) {
                ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL);
                if (ret) {
                        mlog_errno(ret);
                        goto out;
                }

                di = (struct ocfs2_dinode *) di_bh->b_data;

                osb->slot_recovery_generations[slot] =
                        ocfs2_get_recovery_generation(di);

                if (le32_to_cpu(di->id1.journal1.ij_flags) &
                    OCFS2_JOURNAL_DIRTY_FL)
                        journal_dirty = 1;

                brelse(di_bh);
                di_bh = NULL;
        }

out:
        if (journal_dirty)
                ret = -EROFS;
        return ret;
}
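/*
 * Usage sketch, assuming a hard-readonly mount path: such a mount
 * cannot replay journals, so a dirty journal in any slot means the
 * volume is not safe to expose even read-only.
 */
#if 0
        ret = ocfs2_check_journals_nolocks(osb);
        if (ret == -EROFS)
                mlog(ML_ERROR, "Recovery required on readonly device - "
                     "mount from a read-write node first.\n");
#endif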