// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "recovery-journal.h"

#include <linux/atomic.h>
#include <linux/bio.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "block-map.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "slab-depot.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/* Mask selecting the low byte of a recovery count; see compute_recovery_count_byte(). */
static const u64 RECOVERY_COUNT_MASK = 0xff;

/*
 * The number of reserved blocks must be large enough to prevent a new recovery journal
 * block write from overwriting a block which appears to still be a valid head block of the
 * journal. Currently, that means reserving enough space for all 2048 data_vios.
 */
#define RECOVERY_JOURNAL_RESERVED_BLOCKS				\
	((MAXIMUM_VDO_USER_VIOS / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2)

/**
 * DOC: Lock Counters.
 *
 * A lock_counter is intended to keep all of the locks for the blocks in the recovery journal. The
 * per-zone counters are all kept in a single array which is arranged by zone (i.e. zone 0's lock 0
 * is at index 0, zone 0's lock 1 is at index 1, and zone 1's lock 0 is at index 'locks'). This
 * arrangement is intended to minimize cache-line contention for counters from different zones.
 *
 * The locks are implemented as a single object instead of as a lock counter per lock both to
 * afford this opportunity to reduce cache line contention and also to eliminate the need to have a
 * completion per lock.
 *
 * Lock sets are laid out with the set for recovery journal first, followed by the logical zones,
 * and then the physical zones.
51 */ 52 53 enum lock_counter_state { 54 LOCK_COUNTER_STATE_NOT_NOTIFYING, 55 LOCK_COUNTER_STATE_NOTIFYING, 56 LOCK_COUNTER_STATE_SUSPENDED, 57 }; 58 59 /** 60 * get_zone_count_ptr() - Get a pointer to the zone count for a given lock on a given zone. 61 * @journal: The recovery journal. 62 * @lock_number: The lock to get. 63 * @zone_type: The zone type whose count is desired. 64 * 65 * Return: A pointer to the zone count for the given lock and zone. 66 */ 67 static inline atomic_t *get_zone_count_ptr(struct recovery_journal *journal, 68 block_count_t lock_number, 69 enum vdo_zone_type zone_type) 70 { 71 return ((zone_type == VDO_ZONE_TYPE_LOGICAL) 72 ? &journal->lock_counter.logical_zone_counts[lock_number] 73 : &journal->lock_counter.physical_zone_counts[lock_number]); 74 } 75 76 /** 77 * get_counter() - Get the zone counter for a given lock on a given zone. 78 * @journal: The recovery journal. 79 * @lock_number: The lock to get. 80 * @zone_type: The zone type whose count is desired. 81 * @zone_id: The zone index whose count is desired. 82 * 83 * Return: The counter for the given lock and zone. 84 */ 85 static inline u16 *get_counter(struct recovery_journal *journal, 86 block_count_t lock_number, enum vdo_zone_type zone_type, 87 zone_count_t zone_id) 88 { 89 struct lock_counter *counter = &journal->lock_counter; 90 block_count_t zone_counter = (counter->locks * zone_id) + lock_number; 91 92 if (zone_type == VDO_ZONE_TYPE_JOURNAL) 93 return &counter->journal_counters[zone_counter]; 94 95 if (zone_type == VDO_ZONE_TYPE_LOGICAL) 96 return &counter->logical_counters[zone_counter]; 97 98 return &counter->physical_counters[zone_counter]; 99 } 100 101 static atomic_t *get_decrement_counter(struct recovery_journal *journal, 102 block_count_t lock_number) 103 { 104 return &journal->lock_counter.journal_decrement_counts[lock_number]; 105 } 106 107 /** 108 * is_journal_zone_locked() - Check whether the journal zone is locked for a given lock. 
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 *
 * The journal zone is considered locked for as long as its counter for the lock differs from
 * the number of decrements recorded against that lock.
 *
 * Return: true if the journal zone is locked.
 */
static bool is_journal_zone_locked(struct recovery_journal *journal,
				   block_count_t lock_number)
{
	/* The journal zone's counters are always looked up with zone ID 0. */
	u16 journal_value = *get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	u32 decrements = atomic_read(get_decrement_counter(journal, lock_number));

	/* Pairs with barrier in vdo_release_journal_entry_lock() */
	smp_rmb();
	VDO_ASSERT_LOG_ONLY((decrements <= journal_value),
			    "journal zone lock counter must not underflow");
	return (journal_value != decrements);
}

/**
 * vdo_release_recovery_journal_block_reference() - Release a reference to a recovery journal
 *                                                  block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 *
 * If this is the last reference for a given zone type, an attempt will be made to reap the
 * journal.
 */
void vdo_release_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	u16 *current_value;
	block_count_t lock_number;
	int prior_state;

	/* Nothing is released for sequence number 0. */
	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);

	VDO_ASSERT_LOG_ONLY((*current_value >= 1),
			    "decrement of lock counter must not underflow");
	*current_value -= 1;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL) {
		/* The journal zone still holds this lock, so there is nothing to notify about. */
		if (is_journal_zone_locked(journal, lock_number))
			return;
	} else {
		atomic_t *zone_count;

		/* This zone still holds other references on the lock. */
		if (*current_value != 0)
			return;

		zone_count = get_zone_count_ptr(journal, lock_number, zone_type);

		/* The zone count for this lock and zone type has not yet dropped to zero. */
		if (atomic_add_return(-1, zone_count) > 0)
			return;
	}

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&journal->lock_counter.state,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	/* Only the caller which moved the state out of NOT_NOTIFYING launches the completion. */
	if (prior_state != LOCK_COUNTER_STATE_NOT_NOTIFYING)
		return;

	vdo_launch_completion(&journal->lock_counter.completion);
}

/**
 * get_journal_block() - Get the first journal block on a list.
 * @list: The list of journal blocks.
 *
 * Return: The first block on the list, or NULL if the list is empty.
 */
static inline struct recovery_journal_block * __must_check get_journal_block(struct list_head *list)
{
	return list_first_entry_or_null(list, struct recovery_journal_block, list_node);
}

/**
 * pop_free_list() - Get a block from the end of the free list.
 * @journal: The journal.
 *
 * Return: The block or NULL if the list is empty.
199 */ 200 static struct recovery_journal_block * __must_check pop_free_list(struct recovery_journal *journal) 201 { 202 struct recovery_journal_block *block; 203 204 if (list_empty(&journal->free_tail_blocks)) 205 return NULL; 206 207 block = list_last_entry(&journal->free_tail_blocks, 208 struct recovery_journal_block, list_node); 209 list_del_init(&block->list_node); 210 return block; 211 } 212 213 /** 214 * is_block_dirty() - Check whether a recovery block is dirty. 215 * @block: The block to check. 216 * 217 * Indicates it has any uncommitted entries, which includes both entries not written and entries 218 * written but not yet acknowledged. 219 * 220 * Return: true if the block has any uncommitted entries. 221 */ 222 static inline bool __must_check is_block_dirty(const struct recovery_journal_block *block) 223 { 224 return (block->uncommitted_entry_count > 0); 225 } 226 227 /** 228 * is_block_empty() - Check whether a journal block is empty. 229 * @block: The block to check. 230 * 231 * Return: true if the block has no entries. 232 */ 233 static inline bool __must_check is_block_empty(const struct recovery_journal_block *block) 234 { 235 return (block->entry_count == 0); 236 } 237 238 /** 239 * is_block_full() - Check whether a journal block is full. 240 * @block: The block to check. 241 * 242 * Return: true if the block is full. 243 */ 244 static inline bool __must_check is_block_full(const struct recovery_journal_block *block) 245 { 246 return ((block == NULL) || (block->journal->entries_per_block == block->entry_count)); 247 } 248 249 /** 250 * assert_on_journal_thread() - Assert that we are running on the journal thread. 251 * @journal: The journal. 252 * @function_name: The function doing the check (for logging). 
253 */ 254 static void assert_on_journal_thread(struct recovery_journal *journal, 255 const char *function_name) 256 { 257 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == journal->thread_id), 258 "%s() called on journal thread", function_name); 259 } 260 261 /** 262 * continue_waiter() - Release a data_vio from the journal. 263 * 264 * Invoked whenever a data_vio is to be released from the journal, either because its entry was 265 * committed to disk, or because there was an error. Implements waiter_callback_fn. 266 */ 267 static void continue_waiter(struct vdo_waiter *waiter, void *context) 268 { 269 continue_data_vio_with_error(vdo_waiter_as_data_vio(waiter), *((int *) context)); 270 } 271 272 /** 273 * has_block_waiters() - Check whether the journal has any waiters on any blocks. 274 * @journal: The journal in question. 275 * 276 * Return: true if any block has a waiter. 277 */ 278 static inline bool has_block_waiters(struct recovery_journal *journal) 279 { 280 struct recovery_journal_block *block = get_journal_block(&journal->active_tail_blocks); 281 282 /* 283 * Either the first active tail block (if it exists) has waiters, or no active tail block 284 * has waiters. 285 */ 286 return ((block != NULL) && 287 (vdo_waitq_has_waiters(&block->entry_waiters) || 288 vdo_waitq_has_waiters(&block->commit_waiters))); 289 } 290 291 static void recycle_journal_blocks(struct recovery_journal *journal); 292 static void recycle_journal_block(struct recovery_journal_block *block); 293 static void notify_commit_waiters(struct recovery_journal *journal); 294 295 /** 296 * suspend_lock_counter() - Prevent the lock counter from notifying. 297 * @counter: The counter. 298 * 299 * Return: true if the lock counter was not notifying and hence the suspend was efficacious. 
 */
static bool suspend_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_SUSPENDED);
	/* same as before_atomic */
	smp_mb__after_atomic();

	/*
	 * The counter is suspended if it already was, or if the exchange above just suspended it.
	 * A counter which was notifying is left alone and the suspend fails.
	 */
	return ((prior_state == LOCK_COUNTER_STATE_SUSPENDED) ||
		(prior_state == LOCK_COUNTER_STATE_NOT_NOTIFYING));
}

/**
 * is_read_only() - Check whether the journal's vdo is in read-only mode.
 * @journal: The journal; its vdo is reached through the completion of its flush vio.
 *
 * Return: The result of vdo_is_read_only() for that vdo.
 */
static inline bool is_read_only(struct recovery_journal *journal)
{
	return vdo_is_read_only(journal->flush_vio->completion.vdo);
}

/**
 * check_for_drain_complete() - Check whether the journal has drained.
 * @journal: The journal which may have just drained.
 *
 * In read-only mode, this first releases all commit waiters, recycles the journal blocks, and
 * releases every data_vio waiting for an entry with VDO_READ_ONLY. The drain is then finished
 * only if the journal is draining, is not reaping, has no remaining waiters, and the lock counter
 * could be suspended.
 */
static void check_for_drain_complete(struct recovery_journal *journal)
{
	int result = VDO_SUCCESS;

	if (is_read_only(journal)) {
		result = VDO_READ_ONLY;
		/*
		 * Clean up any full active blocks which were not written due to read-only mode.
		 *
		 * FIXME: This would probably be better as a short-circuit in write_block().
		 */
		notify_commit_waiters(journal);
		recycle_journal_blocks(journal);

		/* Release any data_vios waiting to be assigned entries. */
		vdo_waitq_notify_all_waiters(&journal->entry_waiters,
					     continue_waiter, &result);
	}

	/*
	 * The lock counter is suspended last: the earlier conditions short-circuit, so the counter
	 * is only touched once nothing else is outstanding.
	 */
	if (!vdo_is_state_draining(&journal->state) ||
	    journal->reaping ||
	    has_block_waiters(journal) ||
	    vdo_waitq_has_waiters(&journal->entry_waiters) ||
	    !suspend_lock_counter(&journal->lock_counter))
		return;

	if (vdo_is_state_saving(&journal->state)) {
		if (journal->active_block != NULL) {
			VDO_ASSERT_LOG_ONLY(((result == VDO_READ_ONLY) ||
					     !is_block_dirty(journal->active_block)),
					    "journal being saved has clean active block");
			recycle_journal_block(journal->active_block);
		}

		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "all blocks in a journal being saved must be inactive");
	}

	vdo_finish_draining_with_result(&journal->state, result);
}

/**
 * notify_recovery_journal_of_read_only_mode() - Notify a recovery journal that the VDO has gone
 *                                               read-only.
 * @listener: The journal.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_recovery_journal_of_read_only_mode(void *listener,
						      struct vdo_completion *parent)
{
	/* Going read-only may be the last thing a pending drain was waiting for. */
	check_for_drain_complete(listener);
	vdo_finish_completion(parent);
}

/**
 * enter_journal_read_only_mode() - Put the journal in read-only mode.
 * @journal: The journal which has failed.
 * @error_code: The error result triggering this call.
 *
 * All attempts to add entries after this function is called will fail. All VIOs waiting for
 * commits will be awakened with an error.
391 */ 392 static void enter_journal_read_only_mode(struct recovery_journal *journal, 393 int error_code) 394 { 395 vdo_enter_read_only_mode(journal->flush_vio->completion.vdo, error_code); 396 check_for_drain_complete(journal); 397 } 398 399 /** 400 * vdo_get_recovery_journal_current_sequence_number() - Obtain the recovery journal's current 401 * sequence number. 402 * @journal: The journal in question. 403 * 404 * Exposed only so the block map can be initialized therefrom. 405 * 406 * Return: The sequence number of the tail block. 407 */ 408 sequence_number_t vdo_get_recovery_journal_current_sequence_number(struct recovery_journal *journal) 409 { 410 return journal->tail; 411 } 412 413 /** 414 * get_recovery_journal_head() - Get the head of the recovery journal. 415 * @journal: The journal. 416 * 417 * The head is the lowest sequence number of the block map head and the slab journal head. 418 * 419 * Return: the head of the journal. 420 */ 421 static inline sequence_number_t get_recovery_journal_head(const struct recovery_journal *journal) 422 { 423 return min(journal->block_map_head, journal->slab_journal_head); 424 } 425 426 /** 427 * compute_recovery_count_byte() - Compute the recovery count byte for a given recovery count. 428 * @recovery_count: The recovery count. 429 * 430 * Return: The byte corresponding to the recovery count. 431 */ 432 static inline u8 __must_check compute_recovery_count_byte(u64 recovery_count) 433 { 434 return (u8)(recovery_count & RECOVERY_COUNT_MASK); 435 } 436 437 /** 438 * check_slab_journal_commit_threshold() - Check whether the journal is over the threshold, and if 439 * so, force the oldest slab journal tail block to commit. 440 * @journal: The journal. 
441 */ 442 static void check_slab_journal_commit_threshold(struct recovery_journal *journal) 443 { 444 block_count_t current_length = journal->tail - journal->slab_journal_head; 445 446 if (current_length > journal->slab_journal_commit_threshold) { 447 journal->events.slab_journal_commits_requested++; 448 vdo_commit_oldest_slab_journal_tail_blocks(journal->depot, 449 journal->slab_journal_head); 450 } 451 } 452 453 static void reap_recovery_journal(struct recovery_journal *journal); 454 static void assign_entries(struct recovery_journal *journal); 455 456 /** 457 * finish_reaping() - Finish reaping the journal. 458 * @journal: The journal being reaped. 459 */ 460 static void finish_reaping(struct recovery_journal *journal) 461 { 462 block_count_t blocks_reaped; 463 sequence_number_t old_head = get_recovery_journal_head(journal); 464 465 journal->block_map_head = journal->block_map_reap_head; 466 journal->slab_journal_head = journal->slab_journal_reap_head; 467 blocks_reaped = get_recovery_journal_head(journal) - old_head; 468 journal->available_space += blocks_reaped * journal->entries_per_block; 469 journal->reaping = false; 470 check_slab_journal_commit_threshold(journal); 471 assign_entries(journal); 472 check_for_drain_complete(journal); 473 } 474 475 /** 476 * complete_reaping() - Finish reaping the journal after flushing the lower layer. 477 * @completion: The journal's flush VIO. 478 * 479 * This is the callback registered in reap_recovery_journal(). 480 */ 481 static void complete_reaping(struct vdo_completion *completion) 482 { 483 struct recovery_journal *journal = completion->parent; 484 485 finish_reaping(journal); 486 487 /* Try reaping again in case more locks were released while flush was out. */ 488 reap_recovery_journal(journal); 489 } 490 491 /** 492 * handle_flush_error() - Handle an error when flushing the lower layer due to reaping. 493 * @completion: The journal's flush VIO. 
494 */ 495 static void handle_flush_error(struct vdo_completion *completion) 496 { 497 struct recovery_journal *journal = completion->parent; 498 499 vio_record_metadata_io_error(as_vio(completion)); 500 journal->reaping = false; 501 enter_journal_read_only_mode(journal, completion->result); 502 } 503 504 static void flush_endio(struct bio *bio) 505 { 506 struct vio *vio = bio->bi_private; 507 struct recovery_journal *journal = vio->completion.parent; 508 509 continue_vio_after_io(vio, complete_reaping, journal->thread_id); 510 } 511 512 /** 513 * initialize_journal_state() - Set all journal fields appropriately to start journaling from the 514 * current active block. 515 * @journal: The journal to be reset based on its active block. 516 */ 517 static void initialize_journal_state(struct recovery_journal *journal) 518 { 519 journal->append_point.sequence_number = journal->tail; 520 journal->last_write_acknowledged = journal->tail; 521 journal->block_map_head = journal->tail; 522 journal->slab_journal_head = journal->tail; 523 journal->block_map_reap_head = journal->tail; 524 journal->slab_journal_reap_head = journal->tail; 525 journal->block_map_head_block_number = 526 vdo_get_recovery_journal_block_number(journal, journal->block_map_head); 527 journal->slab_journal_head_block_number = 528 vdo_get_recovery_journal_block_number(journal, 529 journal->slab_journal_head); 530 journal->available_space = 531 (journal->entries_per_block * vdo_get_recovery_journal_length(journal->size)); 532 } 533 534 /** 535 * vdo_get_recovery_journal_length() - Get the number of usable recovery journal blocks. 536 * @journal_size: The size of the recovery journal in blocks. 537 * 538 * Return: the number of recovery journal blocks usable for entries. 
539 */ 540 block_count_t vdo_get_recovery_journal_length(block_count_t journal_size) 541 { 542 block_count_t reserved_blocks = journal_size / 4; 543 544 if (reserved_blocks > RECOVERY_JOURNAL_RESERVED_BLOCKS) 545 reserved_blocks = RECOVERY_JOURNAL_RESERVED_BLOCKS; 546 return (journal_size - reserved_blocks); 547 } 548 549 /** 550 * reap_recovery_journal_callback() - Attempt to reap the journal. 551 * @completion: The lock counter completion. 552 * 553 * Attempts to reap the journal now that all the locks on some journal block have been released. 554 * This is the callback registered with the lock counter. 555 */ 556 static void reap_recovery_journal_callback(struct vdo_completion *completion) 557 { 558 struct recovery_journal *journal = (struct recovery_journal *) completion->parent; 559 /* 560 * The acknowledgment must be done before reaping so that there is no race between 561 * acknowledging the notification and unlocks wishing to notify. 562 */ 563 smp_wmb(); 564 atomic_set(&journal->lock_counter.state, LOCK_COUNTER_STATE_NOT_NOTIFYING); 565 566 if (vdo_is_state_quiescing(&journal->state)) { 567 /* 568 * Don't start reaping when the journal is trying to quiesce. Do check if this 569 * notification is the last thing the is waiting on. 570 */ 571 check_for_drain_complete(journal); 572 return; 573 } 574 575 reap_recovery_journal(journal); 576 check_slab_journal_commit_threshold(journal); 577 } 578 579 /** 580 * initialize_lock_counter() - Initialize a lock counter. 581 * 582 * @journal: The recovery journal. 583 * @vdo: The vdo. 584 * 585 * Return: VDO_SUCCESS or an error. 
586 */ 587 static int __must_check initialize_lock_counter(struct recovery_journal *journal, 588 struct vdo *vdo) 589 { 590 int result; 591 struct thread_config *config = &vdo->thread_config; 592 struct lock_counter *counter = &journal->lock_counter; 593 594 result = vdo_allocate(journal->size, u16, __func__, &counter->journal_counters); 595 if (result != VDO_SUCCESS) 596 return result; 597 598 result = vdo_allocate(journal->size, atomic_t, __func__, 599 &counter->journal_decrement_counts); 600 if (result != VDO_SUCCESS) 601 return result; 602 603 result = vdo_allocate(journal->size * config->logical_zone_count, u16, __func__, 604 &counter->logical_counters); 605 if (result != VDO_SUCCESS) 606 return result; 607 608 result = vdo_allocate(journal->size, atomic_t, __func__, 609 &counter->logical_zone_counts); 610 if (result != VDO_SUCCESS) 611 return result; 612 613 result = vdo_allocate(journal->size * config->physical_zone_count, u16, __func__, 614 &counter->physical_counters); 615 if (result != VDO_SUCCESS) 616 return result; 617 618 result = vdo_allocate(journal->size, atomic_t, __func__, 619 &counter->physical_zone_counts); 620 if (result != VDO_SUCCESS) 621 return result; 622 623 vdo_initialize_completion(&counter->completion, vdo, 624 VDO_LOCK_COUNTER_COMPLETION); 625 vdo_prepare_completion(&counter->completion, reap_recovery_journal_callback, 626 reap_recovery_journal_callback, config->journal_thread, 627 journal); 628 counter->logical_zones = config->logical_zone_count; 629 counter->physical_zones = config->physical_zone_count; 630 counter->locks = journal->size; 631 return VDO_SUCCESS; 632 } 633 634 /** 635 * set_journal_tail() - Set the journal's tail sequence number. 636 * @journal: The journal whose tail is to be set. 637 * @tail: The new tail value. 638 */ 639 static void set_journal_tail(struct recovery_journal *journal, sequence_number_t tail) 640 { 641 /* VDO does not support sequence numbers above 1 << 48 in the slab journal. 
*/ 642 if (tail >= (1ULL << 48)) 643 enter_journal_read_only_mode(journal, VDO_JOURNAL_OVERFLOW); 644 645 journal->tail = tail; 646 } 647 648 /** 649 * initialize_recovery_block() - Initialize a journal block. 650 * @vdo: The vdo from which to construct vios. 651 * @journal: The journal to which the block will belong. 652 * @block: The block to initialize. 653 * 654 * Return: VDO_SUCCESS or an error. 655 */ 656 static int initialize_recovery_block(struct vdo *vdo, struct recovery_journal *journal, 657 struct recovery_journal_block *block) 658 { 659 char *data; 660 int result; 661 662 /* 663 * Ensure that a block is large enough to store RECOVERY_JOURNAL_ENTRIES_PER_BLOCK entries. 664 */ 665 BUILD_BUG_ON(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK > 666 ((VDO_BLOCK_SIZE - sizeof(struct packed_journal_header)) / 667 sizeof(struct packed_recovery_journal_entry))); 668 669 /* 670 * Allocate a full block for the journal block even though not all of the space is used 671 * since the VIO needs to write a full disk block. 672 */ 673 result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &data); 674 if (result != VDO_SUCCESS) 675 return result; 676 677 result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL, 678 VIO_PRIORITY_HIGH, block, 1, data, &block->vio); 679 if (result != VDO_SUCCESS) { 680 vdo_free(data); 681 return result; 682 } 683 684 list_add_tail(&block->list_node, &journal->free_tail_blocks); 685 block->journal = journal; 686 return VDO_SUCCESS; 687 } 688 689 /** 690 * vdo_decode_recovery_journal() - Make a recovery journal and initialize it with the state that 691 * was decoded from the super block. 692 * 693 * @state: The decoded state of the journal. 694 * @nonce: The nonce of the VDO. 695 * @vdo: The VDO. 696 * @partition: The partition for the journal. 697 * @recovery_count: The VDO's number of completed recoveries. 698 * @journal_size: The number of blocks in the journal on disk. 699 * @journal_ptr: The pointer to hold the new recovery journal. 
700 * 701 * Return: A success or error code. 702 */ 703 int vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state, nonce_t nonce, 704 struct vdo *vdo, struct partition *partition, 705 u64 recovery_count, block_count_t journal_size, 706 struct recovery_journal **journal_ptr) 707 { 708 block_count_t i; 709 struct recovery_journal *journal; 710 int result; 711 712 result = vdo_allocate_extended(struct recovery_journal, 713 RECOVERY_JOURNAL_RESERVED_BLOCKS, 714 struct recovery_journal_block, __func__, 715 &journal); 716 if (result != VDO_SUCCESS) 717 return result; 718 719 INIT_LIST_HEAD(&journal->free_tail_blocks); 720 INIT_LIST_HEAD(&journal->active_tail_blocks); 721 vdo_waitq_init(&journal->pending_writes); 722 723 journal->thread_id = vdo->thread_config.journal_thread; 724 journal->origin = partition->offset; 725 journal->nonce = nonce; 726 journal->recovery_count = compute_recovery_count_byte(recovery_count); 727 journal->size = journal_size; 728 journal->slab_journal_commit_threshold = (journal_size * 2) / 3; 729 journal->logical_blocks_used = state.logical_blocks_used; 730 journal->block_map_data_blocks = state.block_map_data_blocks; 731 journal->entries_per_block = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK; 732 set_journal_tail(journal, state.journal_start); 733 initialize_journal_state(journal); 734 /* TODO: this will have to change if we make initial resume of a VDO a real resume */ 735 vdo_set_admin_state_code(&journal->state, VDO_ADMIN_STATE_SUSPENDED); 736 737 for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) { 738 struct recovery_journal_block *block = &journal->blocks[i]; 739 740 result = initialize_recovery_block(vdo, journal, block); 741 if (result != VDO_SUCCESS) { 742 vdo_free_recovery_journal(journal); 743 return result; 744 } 745 } 746 747 result = initialize_lock_counter(journal, vdo); 748 if (result != VDO_SUCCESS) { 749 vdo_free_recovery_journal(journal); 750 return result; 751 } 752 753 result = create_metadata_vio(vdo, 
VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH, 754 journal, NULL, &journal->flush_vio); 755 if (result != VDO_SUCCESS) { 756 vdo_free_recovery_journal(journal); 757 return result; 758 } 759 760 result = vdo_register_read_only_listener(vdo, journal, 761 notify_recovery_journal_of_read_only_mode, 762 journal->thread_id); 763 if (result != VDO_SUCCESS) { 764 vdo_free_recovery_journal(journal); 765 return result; 766 } 767 768 result = vdo_make_default_thread(vdo, journal->thread_id); 769 if (result != VDO_SUCCESS) { 770 vdo_free_recovery_journal(journal); 771 return result; 772 } 773 774 journal->flush_vio->completion.callback_thread_id = journal->thread_id; 775 *journal_ptr = journal; 776 return VDO_SUCCESS; 777 } 778 779 /** 780 * vdo_free_recovery_journal() - Free a recovery journal. 781 * @journal: The recovery journal to free. 782 */ 783 void vdo_free_recovery_journal(struct recovery_journal *journal) 784 { 785 block_count_t i; 786 787 if (journal == NULL) 788 return; 789 790 vdo_free(vdo_forget(journal->lock_counter.logical_zone_counts)); 791 vdo_free(vdo_forget(journal->lock_counter.physical_zone_counts)); 792 vdo_free(vdo_forget(journal->lock_counter.journal_counters)); 793 vdo_free(vdo_forget(journal->lock_counter.journal_decrement_counts)); 794 vdo_free(vdo_forget(journal->lock_counter.logical_counters)); 795 vdo_free(vdo_forget(journal->lock_counter.physical_counters)); 796 free_vio(vdo_forget(journal->flush_vio)); 797 798 /* 799 * FIXME: eventually, the journal should be constructed in a quiescent state which 800 * requires opening before use. 
801 */ 802 if (!vdo_is_state_quiescent(&journal->state)) { 803 VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks), 804 "journal being freed has no active tail blocks"); 805 } else if (!vdo_is_state_saved(&journal->state) && 806 !list_empty(&journal->active_tail_blocks)) { 807 vdo_log_warning("journal being freed has uncommitted entries"); 808 } 809 810 for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) { 811 struct recovery_journal_block *block = &journal->blocks[i]; 812 813 vdo_free(vdo_forget(block->vio.data)); 814 free_vio_components(&block->vio); 815 } 816 817 vdo_free(journal); 818 } 819 820 /** 821 * vdo_initialize_recovery_journal_post_repair() - Initialize the journal after a repair. 822 * @journal: The journal in question. 823 * @recovery_count: The number of completed recoveries. 824 * @tail: The new tail block sequence number. 825 * @logical_blocks_used: The new number of logical blocks used. 826 * @block_map_data_blocks: The new number of block map data blocks. 827 */ 828 void vdo_initialize_recovery_journal_post_repair(struct recovery_journal *journal, 829 u64 recovery_count, 830 sequence_number_t tail, 831 block_count_t logical_blocks_used, 832 block_count_t block_map_data_blocks) 833 { 834 set_journal_tail(journal, tail + 1); 835 journal->recovery_count = compute_recovery_count_byte(recovery_count); 836 initialize_journal_state(journal); 837 journal->logical_blocks_used = logical_blocks_used; 838 journal->block_map_data_blocks = block_map_data_blocks; 839 } 840 841 /** 842 * vdo_get_journal_block_map_data_blocks_used() - Get the number of block map pages, allocated from 843 * data blocks, currently in use. 844 * @journal: The journal in question. 845 * 846 * Return: The number of block map pages allocated from slabs. 
847 */ 848 block_count_t vdo_get_journal_block_map_data_blocks_used(struct recovery_journal *journal) 849 { 850 return journal->block_map_data_blocks; 851 } 852 853 /** 854 * vdo_get_recovery_journal_thread_id() - Get the ID of a recovery journal's thread. 855 * @journal: The journal to query. 856 * 857 * Return: The ID of the journal's thread. 858 */ 859 thread_id_t vdo_get_recovery_journal_thread_id(struct recovery_journal *journal) 860 { 861 return journal->thread_id; 862 } 863 864 /** 865 * vdo_open_recovery_journal() - Prepare the journal for new entries. 866 * @journal: The journal in question. 867 * @depot: The slab depot for this VDO. 868 * @block_map: The block map for this VDO. 869 */ 870 void vdo_open_recovery_journal(struct recovery_journal *journal, 871 struct slab_depot *depot, struct block_map *block_map) 872 { 873 journal->depot = depot; 874 journal->block_map = block_map; 875 WRITE_ONCE(journal->state.current_state, VDO_ADMIN_STATE_NORMAL_OPERATION); 876 } 877 878 /** 879 * vdo_record_recovery_journal() - Record the state of a recovery journal for encoding in the super 880 * block. 881 * @journal: the recovery journal. 882 * 883 * Return: the state of the journal. 884 */ 885 struct recovery_journal_state_7_0 886 vdo_record_recovery_journal(const struct recovery_journal *journal) 887 { 888 struct recovery_journal_state_7_0 state = { 889 .logical_blocks_used = journal->logical_blocks_used, 890 .block_map_data_blocks = journal->block_map_data_blocks, 891 }; 892 893 if (vdo_is_state_saved(&journal->state)) { 894 /* 895 * If the journal is saved, we should start one past the active block (since the 896 * active block is not guaranteed to be empty). 897 */ 898 state.journal_start = journal->tail; 899 } else { 900 /* 901 * When we're merely suspended or have gone read-only, we must record the first 902 * block that might have entries that need to be applied. 
903 */ 904 state.journal_start = get_recovery_journal_head(journal); 905 } 906 907 return state; 908 } 909 910 /** 911 * get_block_header() - Get a pointer to the packed journal block header in the block buffer. 912 * @block: The recovery block. 913 * 914 * Return: The block's header. 915 */ 916 static inline struct packed_journal_header * 917 get_block_header(const struct recovery_journal_block *block) 918 { 919 return (struct packed_journal_header *) block->vio.data; 920 } 921 922 /** 923 * set_active_sector() - Set the current sector of the current block and initialize it. 924 * @block: The block to update. 925 * @sector: A pointer to the first byte of the new sector. 926 */ 927 static void set_active_sector(struct recovery_journal_block *block, void *sector) 928 { 929 block->sector = sector; 930 block->sector->check_byte = get_block_header(block)->check_byte; 931 block->sector->recovery_count = block->journal->recovery_count; 932 block->sector->entry_count = 0; 933 } 934 935 /** 936 * advance_tail() - Advance the tail of the journal. 937 * @journal: The journal whose tail should be advanced. 938 * 939 * Return: true if the tail was advanced. 
940 */ 941 static bool advance_tail(struct recovery_journal *journal) 942 { 943 struct recovery_block_header unpacked; 944 struct packed_journal_header *header; 945 struct recovery_journal_block *block; 946 947 block = journal->active_block = pop_free_list(journal); 948 if (block == NULL) 949 return false; 950 951 list_move_tail(&block->list_node, &journal->active_tail_blocks); 952 953 unpacked = (struct recovery_block_header) { 954 .metadata_type = VDO_METADATA_RECOVERY_JOURNAL_2, 955 .block_map_data_blocks = journal->block_map_data_blocks, 956 .logical_blocks_used = journal->logical_blocks_used, 957 .nonce = journal->nonce, 958 .recovery_count = journal->recovery_count, 959 .sequence_number = journal->tail, 960 .check_byte = vdo_compute_recovery_journal_check_byte(journal, 961 journal->tail), 962 }; 963 964 header = get_block_header(block); 965 memset(block->vio.data, 0x0, VDO_BLOCK_SIZE); 966 block->sequence_number = journal->tail; 967 block->entry_count = 0; 968 block->uncommitted_entry_count = 0; 969 block->block_number = vdo_get_recovery_journal_block_number(journal, 970 journal->tail); 971 972 vdo_pack_recovery_block_header(&unpacked, header); 973 set_active_sector(block, vdo_get_journal_block_sector(header, 1)); 974 set_journal_tail(journal, journal->tail + 1); 975 vdo_advance_block_map_era(journal->block_map, journal->tail); 976 return true; 977 } 978 979 /** 980 * initialize_lock_count() - Initialize the value of the journal zone's counter for a given lock. 981 * @journal: The recovery journal. 982 * 983 * Context: This must be called from the journal zone. 
984 */ 985 static void initialize_lock_count(struct recovery_journal *journal) 986 { 987 u16 *journal_value; 988 block_count_t lock_number = journal->active_block->block_number; 989 atomic_t *decrement_counter = get_decrement_counter(journal, lock_number); 990 991 journal_value = get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0); 992 VDO_ASSERT_LOG_ONLY((*journal_value == atomic_read(decrement_counter)), 993 "count to be initialized not in use"); 994 *journal_value = journal->entries_per_block + 1; 995 atomic_set(decrement_counter, 0); 996 } 997 998 /** 999 * prepare_to_assign_entry() - Prepare the currently active block to receive an entry and check 1000 * whether an entry of the given type may be assigned at this time. 1001 * @journal: The journal receiving an entry. 1002 * 1003 * Return: true if there is space in the journal to store an entry of the specified type. 1004 */ 1005 static bool prepare_to_assign_entry(struct recovery_journal *journal) 1006 { 1007 if (journal->available_space == 0) 1008 return false; 1009 1010 if (is_block_full(journal->active_block) && !advance_tail(journal)) 1011 return false; 1012 1013 if (!is_block_empty(journal->active_block)) 1014 return true; 1015 1016 if ((journal->tail - get_recovery_journal_head(journal)) > journal->size) { 1017 /* Cannot use this block since the journal is full. */ 1018 journal->events.disk_full++; 1019 return false; 1020 } 1021 1022 /* 1023 * Don't allow the new block to be reaped until all of its entries have been committed to 1024 * the block map and until the journal block has been fully committed as well. Because the 1025 * block map update is done only after any slab journal entries have been made, the 1026 * per-entry lock for the block map entry serves to protect those as well. 1027 */ 1028 initialize_lock_count(journal); 1029 return true; 1030 } 1031 1032 static void write_blocks(struct recovery_journal *journal); 1033 1034 /** 1035 * schedule_block_write() - Queue a block for writing. 
1036 * @journal: The journal in question. 1037 * @block: The block which is now ready to write. 1038 * 1039 * The block is expected to be full. If the block is currently writing, this is a noop as the block 1040 * will be queued for writing when the write finishes. The block must not currently be queued for 1041 * writing. 1042 */ 1043 static void schedule_block_write(struct recovery_journal *journal, 1044 struct recovery_journal_block *block) 1045 { 1046 if (!block->committing) 1047 vdo_waitq_enqueue_waiter(&journal->pending_writes, &block->write_waiter); 1048 /* 1049 * At the end of adding entries, or discovering this partial block is now full and ready to 1050 * rewrite, we will call write_blocks() and write a whole batch. 1051 */ 1052 } 1053 1054 /** 1055 * release_journal_block_reference() - Release a reference to a journal block. 1056 * @block: The journal block from which to release a reference. 1057 */ 1058 static void release_journal_block_reference(struct recovery_journal_block *block) 1059 { 1060 vdo_release_recovery_journal_block_reference(block->journal, 1061 block->sequence_number, 1062 VDO_ZONE_TYPE_JOURNAL, 0); 1063 } 1064 1065 static void update_usages(struct recovery_journal *journal, struct data_vio *data_vio) 1066 { 1067 if (data_vio->increment_updater.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) { 1068 journal->block_map_data_blocks++; 1069 return; 1070 } 1071 1072 if (data_vio->new_mapped.state != VDO_MAPPING_STATE_UNMAPPED) 1073 journal->logical_blocks_used++; 1074 1075 if (data_vio->mapped.state != VDO_MAPPING_STATE_UNMAPPED) 1076 journal->logical_blocks_used--; 1077 } 1078 1079 /** 1080 * assign_entry() - Assign an entry waiter to the active block. 1081 * 1082 * Implements waiter_callback_fn. 
1083 */ 1084 static void assign_entry(struct vdo_waiter *waiter, void *context) 1085 { 1086 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1087 struct recovery_journal_block *block = context; 1088 struct recovery_journal *journal = block->journal; 1089 1090 /* Record the point at which we will make the journal entry. */ 1091 data_vio->recovery_journal_point = (struct journal_point) { 1092 .sequence_number = block->sequence_number, 1093 .entry_count = block->entry_count, 1094 }; 1095 1096 update_usages(journal, data_vio); 1097 journal->available_space--; 1098 1099 if (!vdo_waitq_has_waiters(&block->entry_waiters)) 1100 journal->events.blocks.started++; 1101 1102 vdo_waitq_enqueue_waiter(&block->entry_waiters, &data_vio->waiter); 1103 block->entry_count++; 1104 block->uncommitted_entry_count++; 1105 journal->events.entries.started++; 1106 1107 if (is_block_full(block)) { 1108 /* 1109 * The block is full, so we can write it anytime henceforth. If it is already 1110 * committing, we'll queue it for writing when it comes back. 1111 */ 1112 schedule_block_write(journal, block); 1113 } 1114 1115 /* Force out slab journal tail blocks when threshold is reached. */ 1116 check_slab_journal_commit_threshold(journal); 1117 } 1118 1119 static void assign_entries(struct recovery_journal *journal) 1120 { 1121 if (journal->adding_entries) { 1122 /* Protect against re-entrancy. */ 1123 return; 1124 } 1125 1126 journal->adding_entries = true; 1127 while (vdo_waitq_has_waiters(&journal->entry_waiters) && 1128 prepare_to_assign_entry(journal)) { 1129 vdo_waitq_notify_next_waiter(&journal->entry_waiters, 1130 assign_entry, journal->active_block); 1131 } 1132 1133 /* Now that we've finished with entries, see if we have a batch of blocks to write. */ 1134 write_blocks(journal); 1135 journal->adding_entries = false; 1136 } 1137 1138 /** 1139 * recycle_journal_block() - Prepare an in-memory journal block to be reused now that it has been 1140 * fully committed. 
1141 * @block: The block to be recycled. 1142 */ 1143 static void recycle_journal_block(struct recovery_journal_block *block) 1144 { 1145 struct recovery_journal *journal = block->journal; 1146 block_count_t i; 1147 1148 list_move_tail(&block->list_node, &journal->free_tail_blocks); 1149 1150 /* Release any unused entry locks. */ 1151 for (i = block->entry_count; i < journal->entries_per_block; i++) 1152 release_journal_block_reference(block); 1153 1154 /* 1155 * Release our own lock against reaping now that the block is completely committed, or 1156 * we're giving up because we're in read-only mode. 1157 */ 1158 if (block->entry_count > 0) 1159 release_journal_block_reference(block); 1160 1161 if (block == journal->active_block) 1162 journal->active_block = NULL; 1163 } 1164 1165 /** 1166 * continue_committed_waiter() - invoked whenever a VIO is to be released from the journal because 1167 * its entry was committed to disk. 1168 * 1169 * Implements waiter_callback_fn. 1170 */ 1171 static void continue_committed_waiter(struct vdo_waiter *waiter, void *context) 1172 { 1173 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1174 struct recovery_journal *journal = context; 1175 int result = (is_read_only(journal) ? VDO_READ_ONLY : VDO_SUCCESS); 1176 bool has_decrement; 1177 1178 VDO_ASSERT_LOG_ONLY(vdo_before_journal_point(&journal->commit_point, 1179 &data_vio->recovery_journal_point), 1180 "DataVIOs released from recovery journal in order. 
Recovery journal point is (%llu, %u), but commit waiter point is (%llu, %u)", 1181 (unsigned long long) journal->commit_point.sequence_number, 1182 journal->commit_point.entry_count, 1183 (unsigned long long) data_vio->recovery_journal_point.sequence_number, 1184 data_vio->recovery_journal_point.entry_count); 1185 1186 journal->commit_point = data_vio->recovery_journal_point; 1187 data_vio->last_async_operation = VIO_ASYNC_OP_UPDATE_REFERENCE_COUNTS; 1188 if (result != VDO_SUCCESS) { 1189 continue_data_vio_with_error(data_vio, result); 1190 return; 1191 } 1192 1193 /* 1194 * The increment must be launched first since it must come before the 1195 * decrement if they are in the same slab. 1196 */ 1197 has_decrement = (data_vio->decrement_updater.zpbn.pbn != VDO_ZERO_BLOCK); 1198 if ((data_vio->increment_updater.zpbn.pbn != VDO_ZERO_BLOCK) || !has_decrement) 1199 continue_data_vio(data_vio); 1200 1201 if (has_decrement) 1202 vdo_launch_completion(&data_vio->decrement_completion); 1203 } 1204 1205 /** 1206 * notify_commit_waiters() - Notify any VIOs whose entries have now committed. 1207 * @journal: The recovery journal to update. 1208 */ 1209 static void notify_commit_waiters(struct recovery_journal *journal) 1210 { 1211 struct recovery_journal_block *block; 1212 1213 list_for_each_entry(block, &journal->active_tail_blocks, list_node) { 1214 if (block->committing) 1215 return; 1216 1217 vdo_waitq_notify_all_waiters(&block->commit_waiters, 1218 continue_committed_waiter, journal); 1219 if (is_read_only(journal)) { 1220 vdo_waitq_notify_all_waiters(&block->entry_waiters, 1221 continue_committed_waiter, 1222 journal); 1223 } else if (is_block_dirty(block) || !is_block_full(block)) { 1224 /* Stop at partially-committed or partially-filled blocks. */ 1225 return; 1226 } 1227 } 1228 } 1229 1230 /** 1231 * recycle_journal_blocks() - Recycle any journal blocks which have been fully committed. 1232 * @journal: The recovery journal to update. 
1233 */ 1234 static void recycle_journal_blocks(struct recovery_journal *journal) 1235 { 1236 struct recovery_journal_block *block, *tmp; 1237 1238 list_for_each_entry_safe(block, tmp, &journal->active_tail_blocks, list_node) { 1239 if (block->committing) { 1240 /* Don't recycle committing blocks. */ 1241 return; 1242 } 1243 1244 if (!is_read_only(journal) && 1245 (is_block_dirty(block) || !is_block_full(block))) { 1246 /* 1247 * Don't recycle partially written or partially full blocks, except in 1248 * read-only mode. 1249 */ 1250 return; 1251 } 1252 1253 recycle_journal_block(block); 1254 } 1255 } 1256 1257 /** 1258 * complete_write() - Handle post-commit processing. 1259 * @completion: The completion of the VIO writing this block. 1260 * 1261 * This is the callback registered by write_block(). If more entries accumulated in the block being 1262 * committed while the commit was in progress, another commit will be initiated. 1263 */ 1264 static void complete_write(struct vdo_completion *completion) 1265 { 1266 struct recovery_journal_block *block = completion->parent; 1267 struct recovery_journal *journal = block->journal; 1268 struct recovery_journal_block *last_active_block; 1269 1270 assert_on_journal_thread(journal, __func__); 1271 1272 journal->pending_write_count -= 1; 1273 journal->events.blocks.committed += 1; 1274 journal->events.entries.committed += block->entries_in_commit; 1275 block->uncommitted_entry_count -= block->entries_in_commit; 1276 block->entries_in_commit = 0; 1277 block->committing = false; 1278 1279 /* If this block is the latest block to be acknowledged, record that fact. 
*/ 1280 if (block->sequence_number > journal->last_write_acknowledged) 1281 journal->last_write_acknowledged = block->sequence_number; 1282 1283 last_active_block = get_journal_block(&journal->active_tail_blocks); 1284 VDO_ASSERT_LOG_ONLY((block->sequence_number >= last_active_block->sequence_number), 1285 "completed journal write is still active"); 1286 1287 notify_commit_waiters(journal); 1288 1289 /* 1290 * Is this block now full? Reaping, and adding entries, might have already sent it off for 1291 * rewriting; else, queue it for rewrite. 1292 */ 1293 if (is_block_dirty(block) && is_block_full(block)) 1294 schedule_block_write(journal, block); 1295 1296 recycle_journal_blocks(journal); 1297 write_blocks(journal); 1298 1299 check_for_drain_complete(journal); 1300 } 1301 1302 static void handle_write_error(struct vdo_completion *completion) 1303 { 1304 struct recovery_journal_block *block = completion->parent; 1305 struct recovery_journal *journal = block->journal; 1306 1307 vio_record_metadata_io_error(as_vio(completion)); 1308 vdo_log_error_strerror(completion->result, 1309 "cannot write recovery journal block %llu", 1310 (unsigned long long) block->sequence_number); 1311 enter_journal_read_only_mode(journal, completion->result); 1312 complete_write(completion); 1313 } 1314 1315 static void complete_write_endio(struct bio *bio) 1316 { 1317 struct vio *vio = bio->bi_private; 1318 struct recovery_journal_block *block = vio->completion.parent; 1319 struct recovery_journal *journal = block->journal; 1320 1321 continue_vio_after_io(vio, complete_write, journal->thread_id); 1322 } 1323 1324 /** 1325 * add_queued_recovery_entries() - Actually add entries from the queue to the given block. 1326 * @block: The journal block. 
1327 */ 1328 static void add_queued_recovery_entries(struct recovery_journal_block *block) 1329 { 1330 while (vdo_waitq_has_waiters(&block->entry_waiters)) { 1331 struct data_vio *data_vio = 1332 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&block->entry_waiters)); 1333 struct tree_lock *lock = &data_vio->tree_lock; 1334 struct packed_recovery_journal_entry *packed_entry; 1335 struct recovery_journal_entry new_entry; 1336 1337 if (block->sector->entry_count == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR) 1338 set_active_sector(block, 1339 (char *) block->sector + VDO_SECTOR_SIZE); 1340 1341 /* Compose and encode the entry. */ 1342 packed_entry = &block->sector->entries[block->sector->entry_count++]; 1343 new_entry = (struct recovery_journal_entry) { 1344 .mapping = { 1345 .pbn = data_vio->increment_updater.zpbn.pbn, 1346 .state = data_vio->increment_updater.zpbn.state, 1347 }, 1348 .unmapping = { 1349 .pbn = data_vio->decrement_updater.zpbn.pbn, 1350 .state = data_vio->decrement_updater.zpbn.state, 1351 }, 1352 .operation = data_vio->increment_updater.operation, 1353 .slot = lock->tree_slots[lock->height].block_map_slot, 1354 }; 1355 *packed_entry = vdo_pack_recovery_journal_entry(&new_entry); 1356 data_vio->recovery_sequence_number = block->sequence_number; 1357 1358 /* Enqueue the data_vio to wait for its entry to commit. */ 1359 vdo_waitq_enqueue_waiter(&block->commit_waiters, &data_vio->waiter); 1360 } 1361 } 1362 1363 /** 1364 * write_block() - Issue a block for writing. 1365 * 1366 * Implements waiter_callback_fn. 
1367 */ 1368 static void write_block(struct vdo_waiter *waiter, void *context __always_unused) 1369 { 1370 struct recovery_journal_block *block = 1371 container_of(waiter, struct recovery_journal_block, write_waiter); 1372 struct recovery_journal *journal = block->journal; 1373 struct packed_journal_header *header = get_block_header(block); 1374 1375 if (block->committing || !vdo_waitq_has_waiters(&block->entry_waiters) || 1376 is_read_only(journal)) 1377 return; 1378 1379 block->entries_in_commit = vdo_waitq_num_waiters(&block->entry_waiters); 1380 add_queued_recovery_entries(block); 1381 1382 journal->pending_write_count += 1; 1383 journal->events.blocks.written += 1; 1384 journal->events.entries.written += block->entries_in_commit; 1385 1386 header->block_map_head = __cpu_to_le64(journal->block_map_head); 1387 header->slab_journal_head = __cpu_to_le64(journal->slab_journal_head); 1388 header->entry_count = __cpu_to_le16(block->entry_count); 1389 1390 block->committing = true; 1391 1392 /* 1393 * We must issue a flush and a FUA for every commit. The flush is necessary to ensure that 1394 * the data being referenced is stable. The FUA is necessary to ensure that the journal 1395 * block itself is stable before allowing overwrites of the lbn's previous data. 1396 */ 1397 vdo_submit_metadata_vio(&block->vio, journal->origin + block->block_number, 1398 complete_write_endio, handle_write_error, 1399 REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH | REQ_SYNC | REQ_FUA); 1400 } 1401 1402 1403 /** 1404 * write_blocks() - Attempt to commit blocks, according to write policy. 1405 * @journal: The recovery journal. 1406 */ 1407 static void write_blocks(struct recovery_journal *journal) 1408 { 1409 assert_on_journal_thread(journal, __func__); 1410 /* 1411 * We call this function after adding entries to the journal and after finishing a block 1412 * write. 
Thus, when this function terminates we must either have no VIOs waiting in the 1413 * journal or have some outstanding IO to provide a future wakeup. 1414 * 1415 * We want to only issue full blocks if there are no pending writes. However, if there are 1416 * no outstanding writes and some unwritten entries, we must issue a block, even if it's 1417 * the active block and it isn't full. 1418 */ 1419 if (journal->pending_write_count > 0) 1420 return; 1421 1422 /* Write all the full blocks. */ 1423 vdo_waitq_notify_all_waiters(&journal->pending_writes, write_block, NULL); 1424 1425 /* 1426 * Do we need to write the active block? Only if we have no outstanding writes, even after 1427 * issuing all of the full writes. 1428 */ 1429 if ((journal->pending_write_count == 0) && (journal->active_block != NULL)) 1430 write_block(&journal->active_block->write_waiter, NULL); 1431 } 1432 1433 /** 1434 * vdo_add_recovery_journal_entry() - Add an entry to a recovery journal. 1435 * @journal: The journal in which to make an entry. 1436 * @data_vio: The data_vio for which to add the entry. The entry will be taken 1437 * from the logical and new_mapped fields of the data_vio. The 1438 * data_vio's recovery_sequence_number field will be set to the 1439 * sequence number of the journal block in which the entry was 1440 * made. 1441 * 1442 * This method is asynchronous. The data_vio will not be called back until the entry is committed 1443 * to the on-disk journal. 
1444 */ 1445 void vdo_add_recovery_journal_entry(struct recovery_journal *journal, 1446 struct data_vio *data_vio) 1447 { 1448 assert_on_journal_thread(journal, __func__); 1449 if (!vdo_is_state_normal(&journal->state)) { 1450 continue_data_vio_with_error(data_vio, VDO_INVALID_ADMIN_STATE); 1451 return; 1452 } 1453 1454 if (is_read_only(journal)) { 1455 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 1456 return; 1457 } 1458 1459 VDO_ASSERT_LOG_ONLY(data_vio->recovery_sequence_number == 0, 1460 "journal lock not held for new entry"); 1461 1462 vdo_advance_journal_point(&journal->append_point, journal->entries_per_block); 1463 vdo_waitq_enqueue_waiter(&journal->entry_waiters, &data_vio->waiter); 1464 assign_entries(journal); 1465 } 1466 1467 /** 1468 * is_lock_locked() - Check whether a lock is locked for a zone type. 1469 * @journal: The recovery journal. 1470 * @lock_number: The lock to check. 1471 * @zone_type: The type of the zone. 1472 * 1473 * If the recovery journal has a lock on the lock number, both logical and physical zones are 1474 * considered locked. 1475 * 1476 * Return: true if the specified lock has references (is locked). 1477 */ 1478 static bool is_lock_locked(struct recovery_journal *journal, block_count_t lock_number, 1479 enum vdo_zone_type zone_type) 1480 { 1481 atomic_t *zone_count; 1482 bool locked; 1483 1484 if (is_journal_zone_locked(journal, lock_number)) 1485 return true; 1486 1487 zone_count = get_zone_count_ptr(journal, lock_number, zone_type); 1488 locked = (atomic_read(zone_count) != 0); 1489 /* Pairs with implicit barrier in vdo_release_recovery_journal_block_reference() */ 1490 smp_rmb(); 1491 return locked; 1492 } 1493 1494 /** 1495 * reap_recovery_journal() - Conduct a sweep on a recovery journal to reclaim unreferenced blocks. 1496 * @journal: The recovery journal. 
1497 */ 1498 static void reap_recovery_journal(struct recovery_journal *journal) 1499 { 1500 if (journal->reaping) { 1501 /* 1502 * We already have an outstanding reap in progress. We need to wait for it to 1503 * finish. 1504 */ 1505 return; 1506 } 1507 1508 if (vdo_is_state_quiescent(&journal->state)) { 1509 /* We are supposed to not do IO. Don't botch it by reaping. */ 1510 return; 1511 } 1512 1513 /* 1514 * Start reclaiming blocks only when the journal head has no references. Then stop when a 1515 * block is referenced. 1516 */ 1517 while ((journal->block_map_reap_head < journal->last_write_acknowledged) && 1518 !is_lock_locked(journal, journal->block_map_head_block_number, 1519 VDO_ZONE_TYPE_LOGICAL)) { 1520 journal->block_map_reap_head++; 1521 if (++journal->block_map_head_block_number == journal->size) 1522 journal->block_map_head_block_number = 0; 1523 } 1524 1525 while ((journal->slab_journal_reap_head < journal->last_write_acknowledged) && 1526 !is_lock_locked(journal, journal->slab_journal_head_block_number, 1527 VDO_ZONE_TYPE_PHYSICAL)) { 1528 journal->slab_journal_reap_head++; 1529 if (++journal->slab_journal_head_block_number == journal->size) 1530 journal->slab_journal_head_block_number = 0; 1531 } 1532 1533 if ((journal->block_map_reap_head == journal->block_map_head) && 1534 (journal->slab_journal_reap_head == journal->slab_journal_head)) { 1535 /* Nothing happened. */ 1536 return; 1537 } 1538 1539 /* 1540 * If the block map head will advance, we must flush any block map page modified by the 1541 * entries we are reaping. If the slab journal head will advance, we must flush the slab 1542 * summary update covering the slab journal that just released some lock. 
1543 */ 1544 journal->reaping = true; 1545 vdo_submit_flush_vio(journal->flush_vio, flush_endio, handle_flush_error); 1546 } 1547 1548 /** 1549 * vdo_acquire_recovery_journal_block_reference() - Acquire a reference to a recovery journal block 1550 * from somewhere other than the journal itself. 1551 * @journal: The recovery journal. 1552 * @sequence_number: The journal sequence number of the referenced block. 1553 * @zone_type: The type of the zone making the adjustment. 1554 * @zone_id: The ID of the zone making the adjustment. 1555 */ 1556 void vdo_acquire_recovery_journal_block_reference(struct recovery_journal *journal, 1557 sequence_number_t sequence_number, 1558 enum vdo_zone_type zone_type, 1559 zone_count_t zone_id) 1560 { 1561 block_count_t lock_number; 1562 u16 *current_value; 1563 1564 if (sequence_number == 0) 1565 return; 1566 1567 VDO_ASSERT_LOG_ONLY((zone_type != VDO_ZONE_TYPE_JOURNAL), 1568 "invalid lock count increment from journal zone"); 1569 1570 lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number); 1571 current_value = get_counter(journal, lock_number, zone_type, zone_id); 1572 VDO_ASSERT_LOG_ONLY(*current_value < U16_MAX, 1573 "increment of lock counter must not overflow"); 1574 1575 if (*current_value == 0) { 1576 /* 1577 * This zone is acquiring this lock for the first time. Extra barriers because this 1578 * was original developed using an atomic add operation that implicitly had them. 1579 */ 1580 smp_mb__before_atomic(); 1581 atomic_inc(get_zone_count_ptr(journal, lock_number, zone_type)); 1582 /* same as before_atomic */ 1583 smp_mb__after_atomic(); 1584 } 1585 1586 *current_value += 1; 1587 } 1588 1589 /** 1590 * vdo_release_journal_entry_lock() - Release a single per-entry reference count for a recovery 1591 * journal block. 1592 * @journal: The recovery journal. 1593 * @sequence_number: The journal sequence number of the referenced block. 
1594 */ 1595 void vdo_release_journal_entry_lock(struct recovery_journal *journal, 1596 sequence_number_t sequence_number) 1597 { 1598 block_count_t lock_number; 1599 1600 if (sequence_number == 0) 1601 return; 1602 1603 lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number); 1604 /* 1605 * Extra barriers because this was originally developed using an atomic add operation that 1606 * implicitly had them. 1607 */ 1608 smp_mb__before_atomic(); 1609 atomic_inc(get_decrement_counter(journal, lock_number)); 1610 /* same as before_atomic */ 1611 smp_mb__after_atomic(); 1612 } 1613 1614 /** 1615 * initiate_drain() - Initiate a drain. 1616 * 1617 * Implements vdo_admin_initiator_fn. 1618 */ 1619 static void initiate_drain(struct admin_state *state) 1620 { 1621 check_for_drain_complete(container_of(state, struct recovery_journal, state)); 1622 } 1623 1624 /** 1625 * vdo_drain_recovery_journal() - Drain recovery journal I/O. 1626 * @journal: The journal to drain. 1627 * @operation: The drain operation (suspend or save). 1628 * @parent: The completion to notify once the journal is drained. 1629 * 1630 * All uncommitted entries will be written out. 1631 */ 1632 void vdo_drain_recovery_journal(struct recovery_journal *journal, 1633 const struct admin_state_code *operation, 1634 struct vdo_completion *parent) 1635 { 1636 assert_on_journal_thread(journal, __func__); 1637 vdo_start_draining(&journal->state, operation, parent, initiate_drain); 1638 } 1639 1640 /** 1641 * resume_lock_counter() - Re-allow notifications from a suspended lock counter. 1642 * @counter: The counter. 1643 * 1644 * Return: true if the lock counter was suspended. 1645 */ 1646 static bool resume_lock_counter(struct lock_counter *counter) 1647 { 1648 int prior_state; 1649 1650 /* 1651 * Extra barriers because this was original developed using a CAS operation that implicitly 1652 * had them. 
1653 */ 1654 smp_mb__before_atomic(); 1655 prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_SUSPENDED, 1656 LOCK_COUNTER_STATE_NOT_NOTIFYING); 1657 /* same as before_atomic */ 1658 smp_mb__after_atomic(); 1659 1660 return (prior_state == LOCK_COUNTER_STATE_SUSPENDED); 1661 } 1662 1663 /** 1664 * vdo_resume_recovery_journal() - Resume a recovery journal which has been drained. 1665 * @journal: The journal to resume. 1666 * @parent: The completion to finish once the journal is resumed. 1667 */ 1668 void vdo_resume_recovery_journal(struct recovery_journal *journal, 1669 struct vdo_completion *parent) 1670 { 1671 bool saved; 1672 1673 assert_on_journal_thread(journal, __func__); 1674 saved = vdo_is_state_saved(&journal->state); 1675 vdo_set_completion_result(parent, vdo_resume_if_quiescent(&journal->state)); 1676 if (is_read_only(journal)) { 1677 vdo_continue_completion(parent, VDO_READ_ONLY); 1678 return; 1679 } 1680 1681 if (saved) 1682 initialize_journal_state(journal); 1683 1684 if (resume_lock_counter(&journal->lock_counter)) { 1685 /* We might have missed a notification. */ 1686 reap_recovery_journal(journal); 1687 } 1688 1689 vdo_launch_completion(parent); 1690 } 1691 1692 /** 1693 * vdo_get_recovery_journal_logical_blocks_used() - Get the number of logical blocks in use by the 1694 * VDO. 1695 * @journal: The journal. 1696 * 1697 * Return: The number of logical blocks in use by the VDO. 1698 */ 1699 block_count_t vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal *journal) 1700 { 1701 return journal->logical_blocks_used; 1702 } 1703 1704 /** 1705 * vdo_get_recovery_journal_statistics() - Get the current statistics from the recovery journal. 1706 * @journal: The recovery journal to query. 1707 * 1708 * Return: A copy of the current statistics for the journal. 
1709 */ 1710 struct recovery_journal_statistics 1711 vdo_get_recovery_journal_statistics(const struct recovery_journal *journal) 1712 { 1713 return journal->events; 1714 } 1715 1716 /** 1717 * dump_recovery_block() - Dump the contents of the recovery block to the log. 1718 * @block: The block to dump. 1719 */ 1720 static void dump_recovery_block(const struct recovery_journal_block *block) 1721 { 1722 vdo_log_info(" sequence number %llu; entries %u; %s; %zu entry waiters; %zu commit waiters", 1723 (unsigned long long) block->sequence_number, block->entry_count, 1724 (block->committing ? "committing" : "waiting"), 1725 vdo_waitq_num_waiters(&block->entry_waiters), 1726 vdo_waitq_num_waiters(&block->commit_waiters)); 1727 } 1728 1729 /** 1730 * vdo_dump_recovery_journal_statistics() - Dump some current statistics and other debug info from 1731 * the recovery journal. 1732 * @journal: The recovery journal to dump. 1733 */ 1734 void vdo_dump_recovery_journal_statistics(const struct recovery_journal *journal) 1735 { 1736 const struct recovery_journal_block *block; 1737 struct recovery_journal_statistics stats = vdo_get_recovery_journal_statistics(journal); 1738 1739 vdo_log_info("Recovery Journal"); 1740 vdo_log_info(" block_map_head=%llu slab_journal_head=%llu last_write_acknowledged=%llu tail=%llu block_map_reap_head=%llu slab_journal_reap_head=%llu disk_full=%llu slab_journal_commits_requested=%llu entry_waiters=%zu", 1741 (unsigned long long) journal->block_map_head, 1742 (unsigned long long) journal->slab_journal_head, 1743 (unsigned long long) journal->last_write_acknowledged, 1744 (unsigned long long) journal->tail, 1745 (unsigned long long) journal->block_map_reap_head, 1746 (unsigned long long) journal->slab_journal_reap_head, 1747 (unsigned long long) stats.disk_full, 1748 (unsigned long long) stats.slab_journal_commits_requested, 1749 vdo_waitq_num_waiters(&journal->entry_waiters)); 1750 vdo_log_info(" entries: started=%llu written=%llu committed=%llu", 1751 
(unsigned long long) stats.entries.started, 1752 (unsigned long long) stats.entries.written, 1753 (unsigned long long) stats.entries.committed); 1754 vdo_log_info(" blocks: started=%llu written=%llu committed=%llu", 1755 (unsigned long long) stats.blocks.started, 1756 (unsigned long long) stats.blocks.written, 1757 (unsigned long long) stats.blocks.committed); 1758 1759 vdo_log_info(" active blocks:"); 1760 list_for_each_entry(block, &journal->active_tail_blocks, list_node) 1761 dump_recovery_block(block); 1762 } 1763