xref: /linux/drivers/md/dm-vdo/repair.c (revision 566ab427f827b0256d3e8ce0235d088e6a9c28bd)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "repair.h"
7 
8 #include <linux/min_heap.h>
9 #include <linux/minmax.h>
10 
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 
15 #include "block-map.h"
16 #include "completion.h"
17 #include "constants.h"
18 #include "encodings.h"
19 #include "int-map.h"
20 #include "io-submitter.h"
21 #include "recovery-journal.h"
22 #include "slab-depot.h"
23 #include "types.h"
24 #include "vdo.h"
25 #include "wait-queue.h"
26 
27 /*
28  * An explicitly numbered block mapping. Numbering the mappings allows them to be sorted by logical
29  * block number during repair while still preserving the relative order of journal entries with
30  * the same logical block number.
31  */
32 struct numbered_block_mapping {
33 	struct block_map_slot block_map_slot;
34 	struct block_map_entry block_map_entry;
35 	/* A serial number to use during replay */
36 	u32 number;
37 } __packed;
38 
39 /*
40  * The absolute position of an entry in the recovery journal, including the sector number and the
41  * entry number within the sector.
42  */
43 struct recovery_point {
44 	/* Block sequence number */
45 	sequence_number_t sequence_number;
46 	/* Sector number */
47 	u8 sector_count;
48 	/* Entry number */
49 	journal_entry_count_t entry_count;
50 	/* Whether or not the increment portion of the current entry has been applied */
51 	bool increment_applied;
52 };
53 
54 DEFINE_MIN_HEAP(struct numbered_block_mapping, replay_heap);
55 
56 struct repair_completion {
57 	/* The completion header */
58 	struct vdo_completion completion;
59 
60 	/* A buffer to hold the data read off disk */
61 	char *journal_data;
62 
63 	/* For loading the journal */
64 	data_vio_count_t vio_count;
65 	data_vio_count_t vios_complete;
66 	struct vio *vios;
67 
68 	/* The number of entries to be applied to the block map */
69 	size_t block_map_entry_count;
70 	/* The sequence number of the first valid block for block map recovery */
71 	sequence_number_t block_map_head;
72 	/* The sequence number of the first valid block for slab journal replay */
73 	sequence_number_t slab_journal_head;
74 	/* The sequence number of the last valid block of the journal (if known) */
75 	sequence_number_t tail;
76 	/*
77 	 * The highest sequence number of the journal. During recovery (vs read-only rebuild), not
78 	 * the same as the tail, since the tail ignores blocks after the first hole.
79 	 */
80 	sequence_number_t highest_tail;
81 
82 	/* The number of logical blocks currently known to be in use */
83 	block_count_t logical_blocks_used;
84 	/* The number of block map data blocks known to be allocated */
85 	block_count_t block_map_data_blocks;
86 
87 	/* These fields are for playing the journal into the block map */
88 	/* The entry data for the block map recovery */
89 	struct numbered_block_mapping *entries;
90 	/* The number of entries in the entry array */
91 	size_t entry_count;
92 	/* number of pending (non-ready) requests*/
93 	page_count_t outstanding;
94 	/* number of page completions */
95 	page_count_t page_count;
96 	bool launching;
97 	/*
98 	 * a heap wrapping journal_entries. It re-orders and sorts journal entries in ascending LBN
99 	 * order, then original journal order. This permits efficient iteration over the journal
100 	 * entries in order.
101 	 */
102 	struct replay_heap replay_heap;
103 	/* Fields tracking progress through the journal entries. */
104 	struct numbered_block_mapping *current_entry;
105 	struct numbered_block_mapping *current_unfetched_entry;
106 	/* Current requested page's PBN */
107 	physical_block_number_t pbn;
108 
109 	/* These fields are only used during recovery. */
110 	/* A location just beyond the last valid entry of the journal */
111 	struct recovery_point tail_recovery_point;
112 	/* The location of the next recovery journal entry to apply */
113 	struct recovery_point next_recovery_point;
114 	/* The journal point to give to the next synthesized decref */
115 	struct journal_point next_journal_point;
116 	/* The number of entries played into slab journals */
117 	size_t entries_added_to_slab_journals;
118 
119 	/* These fields are only used during read-only rebuild */
120 	page_count_t page_to_fetch;
121 	/* the number of leaf pages in the block map */
122 	page_count_t leaf_pages;
123 	/* the last slot of the block map */
124 	struct block_map_slot last_slot;
125 
126 	/*
127 	 * The page completions used for playing the journal into the block map, and, during
128 	 * read-only rebuild, for rebuilding the reference counts from the block map.
129 	 */
130 	struct vdo_page_completion page_completions[];
131 };
132 
133 /*
134  * This is a min_heap callback function that orders numbered_block_mappings using the
135  * 'block_map_slot' field as the primary key and the mapping 'number' field as the secondary key.
136  * Using the mapping number preserves the journal order of entries for the same slot, allowing us
137  * to sort by slot while still ensuring we replay all entries with the same slot in the exact order
138  * as they appeared in the journal.
139  */
140 static bool mapping_is_less_than(const void *item1, const void *item2, void __always_unused *args)
141 {
142 	const struct numbered_block_mapping *mapping1 =
143 		(const struct numbered_block_mapping *) item1;
144 	const struct numbered_block_mapping *mapping2 =
145 		(const struct numbered_block_mapping *) item2;
146 
147 	if (mapping1->block_map_slot.pbn != mapping2->block_map_slot.pbn)
148 		return mapping1->block_map_slot.pbn < mapping2->block_map_slot.pbn;
149 
150 	if (mapping1->block_map_slot.slot != mapping2->block_map_slot.slot)
151 		return mapping1->block_map_slot.slot < mapping2->block_map_slot.slot;
152 
153 	if (mapping1->number != mapping2->number)
154 		return mapping1->number < mapping2->number;
155 
156 	return 0;
157 }
158 
159 static void swap_mappings(void *item1, void *item2, void __always_unused *args)
160 {
161 	struct numbered_block_mapping *mapping1 = item1;
162 	struct numbered_block_mapping *mapping2 = item2;
163 
164 	swap(*mapping1, *mapping2);
165 }
166 
167 static const struct min_heap_callbacks repair_min_heap = {
168 	.less = mapping_is_less_than,
169 	.swp = swap_mappings,
170 };
171 
172 static struct numbered_block_mapping *sort_next_heap_element(struct repair_completion *repair)
173 {
174 	struct replay_heap *heap = &repair->replay_heap;
175 	struct numbered_block_mapping *last;
176 
177 	if (heap->nr == 0)
178 		return NULL;
179 
180 	/*
181 	 * Swap the next heap element with the last one on the heap, popping it off the heap,
182 	 * restore the heap invariant, and return a pointer to the popped element.
183 	 */
184 	last = &repair->entries[--heap->nr];
185 	swap_mappings(heap->data, last, NULL);
186 	min_heap_sift_down(heap, 0, &repair_min_heap, NULL);
187 	return last;
188 }
189 
190 /**
191  * as_repair_completion() - Convert a generic completion to a repair_completion.
192  * @completion: The completion to convert.
193  *
194  * Return: The repair_completion.
195  */
196 static inline struct repair_completion * __must_check
197 as_repair_completion(struct vdo_completion *completion)
198 {
199 	vdo_assert_completion_type(completion, VDO_REPAIR_COMPLETION);
200 	return container_of(completion, struct repair_completion, completion);
201 }
202 
203 static void prepare_repair_completion(struct repair_completion *repair,
204 				      vdo_action_fn callback, enum vdo_zone_type zone_type)
205 {
206 	struct vdo_completion *completion = &repair->completion;
207 	const struct thread_config *thread_config = &completion->vdo->thread_config;
208 	thread_id_t thread_id;
209 
210 	/* All blockmap access is done on single thread, so use logical zone 0. */
211 	thread_id = ((zone_type == VDO_ZONE_TYPE_LOGICAL) ?
212 		     thread_config->logical_threads[0] :
213 		     thread_config->admin_thread);
214 	vdo_reset_completion(completion);
215 	vdo_set_completion_callback(completion, callback, thread_id);
216 }
217 
218 static void launch_repair_completion(struct repair_completion *repair,
219 				     vdo_action_fn callback, enum vdo_zone_type zone_type)
220 {
221 	prepare_repair_completion(repair, callback, zone_type);
222 	vdo_launch_completion(&repair->completion);
223 }
224 
225 static void uninitialize_vios(struct repair_completion *repair)
226 {
227 	while (repair->vio_count > 0)
228 		free_vio_components(&repair->vios[--repair->vio_count]);
229 
230 	vdo_free(vdo_forget(repair->vios));
231 }
232 
233 static void free_repair_completion(struct repair_completion *repair)
234 {
235 	if (repair == NULL)
236 		return;
237 
238 	/*
239 	 * We do this here because this function is the only common bottleneck for all clean up
240 	 * paths.
241 	 */
242 	repair->completion.vdo->block_map->zones[0].page_cache.rebuilding = false;
243 
244 	uninitialize_vios(repair);
245 	vdo_free(vdo_forget(repair->journal_data));
246 	vdo_free(vdo_forget(repair->entries));
247 	vdo_free(repair);
248 }
249 
250 static void finish_repair(struct vdo_completion *completion)
251 {
252 	struct vdo_completion *parent = completion->parent;
253 	struct vdo *vdo = completion->vdo;
254 	struct repair_completion *repair = as_repair_completion(completion);
255 
256 	vdo_assert_on_admin_thread(vdo, __func__);
257 
258 	if (vdo->load_state != VDO_REBUILD_FOR_UPGRADE)
259 		vdo->states.vdo.complete_recoveries++;
260 
261 	vdo_initialize_recovery_journal_post_repair(vdo->recovery_journal,
262 						    vdo->states.vdo.complete_recoveries,
263 						    repair->highest_tail,
264 						    repair->logical_blocks_used,
265 						    repair->block_map_data_blocks);
266 	free_repair_completion(vdo_forget(repair));
267 
268 	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
269 		vdo_log_info("Read-only rebuild complete");
270 		vdo_launch_completion(parent);
271 		return;
272 	}
273 
274 	/* FIXME: shouldn't this say either "recovery" or "repair"? */
275 	vdo_log_info("Rebuild complete");
276 
277 	/*
278 	 * Now that we've freed the repair completion and its vast array of journal entries, we
279 	 * can allocate refcounts.
280 	 */
281 	vdo_continue_completion(parent, vdo_allocate_reference_counters(vdo->depot));
282 }
283 
284 /**
285  * abort_repair() - Handle a repair error.
286  * @completion: The repair completion.
287  */
288 static void abort_repair(struct vdo_completion *completion)
289 {
290 	struct vdo_completion *parent = completion->parent;
291 	int result = completion->result;
292 	struct repair_completion *repair = as_repair_completion(completion);
293 
294 	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state))
295 		vdo_log_info("Read-only rebuild aborted");
296 	else
297 		vdo_log_warning("Recovery aborted");
298 
299 	free_repair_completion(vdo_forget(repair));
300 	vdo_continue_completion(parent, result);
301 }
302 
303 /**
304  * abort_on_error() - Abort a repair if there is an error.
305  * @result: The result to check.
306  * @repair: The repair completion.
307  *
308  * Return: true if the result was an error.
309  */
310 static bool __must_check abort_on_error(int result, struct repair_completion *repair)
311 {
312 	if (result == VDO_SUCCESS)
313 		return false;
314 
315 	vdo_fail_completion(&repair->completion, result);
316 	return true;
317 }
318 
319 /**
320  * drain_slab_depot() - Flush out all dirty refcounts blocks now that they have been rebuilt or
321  *                      recovered.
322  * @completion: The repair completion.
323  */
324 static void drain_slab_depot(struct vdo_completion *completion)
325 {
326 	struct vdo *vdo = completion->vdo;
327 	struct repair_completion *repair = as_repair_completion(completion);
328 	const struct admin_state_code *operation;
329 
330 	vdo_assert_on_admin_thread(vdo, __func__);
331 
332 	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
333 	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
334 		vdo_log_info("Saving rebuilt state");
335 		operation = VDO_ADMIN_STATE_REBUILDING;
336 	} else {
337 		vdo_log_info("Replayed %zu journal entries into slab journals",
338 			     repair->entries_added_to_slab_journals);
339 		operation = VDO_ADMIN_STATE_RECOVERING;
340 	}
341 
342 	vdo_drain_slab_depot(vdo->depot, operation, completion);
343 }
344 
345 /**
346  * flush_block_map_updates() - Flush the block map now that all the reference counts are rebuilt.
347  * @completion: The repair completion.
348  *
349  * This callback is registered in finish_if_done().
350  */
351 static void flush_block_map_updates(struct vdo_completion *completion)
352 {
353 	vdo_assert_on_admin_thread(completion->vdo, __func__);
354 
355 	vdo_log_info("Flushing block map changes");
356 	prepare_repair_completion(as_repair_completion(completion), drain_slab_depot,
357 				  VDO_ZONE_TYPE_ADMIN);
358 	vdo_drain_block_map(completion->vdo->block_map, VDO_ADMIN_STATE_RECOVERING,
359 			    completion);
360 }
361 
362 static bool fetch_page(struct repair_completion *repair,
363 		       struct vdo_completion *completion);
364 
365 /**
366  * handle_page_load_error() - Handle an error loading a page.
367  * @completion: The vdo_page_completion.
368  */
369 static void handle_page_load_error(struct vdo_completion *completion)
370 {
371 	struct repair_completion *repair = completion->parent;
372 
373 	repair->outstanding--;
374 	vdo_set_completion_result(&repair->completion, completion->result);
375 	vdo_release_page_completion(completion);
376 	fetch_page(repair, completion);
377 }
378 
379 /**
380  * unmap_entry() - Unmap an invalid entry and indicate that its page must be written out.
381  * @page: The page containing the entries
382  * @completion: The page_completion for writing the page
383  * @slot: The slot to unmap
384  */
385 static void unmap_entry(struct block_map_page *page, struct vdo_completion *completion,
386 			slot_number_t slot)
387 {
388 	page->entries[slot] = UNMAPPED_BLOCK_MAP_ENTRY;
389 	vdo_request_page_write(completion);
390 }
391 
392 /**
393  * remove_out_of_bounds_entries() - Unmap entries which outside the logical space.
394  * @page: The page containing the entries
395  * @completion: The page_completion for writing the page
396  * @start: The first slot to check
397  */
398 static void remove_out_of_bounds_entries(struct block_map_page *page,
399 					 struct vdo_completion *completion,
400 					 slot_number_t start)
401 {
402 	slot_number_t slot;
403 
404 	for (slot = start; slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; slot++) {
405 		struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
406 
407 		if (vdo_is_mapped_location(&mapping))
408 			unmap_entry(page, completion, slot);
409 	}
410 }
411 
412 /**
413  * process_slot() - Update the reference counts for a single entry.
414  * @page: The page containing the entries
415  * @completion: The page_completion for writing the page
416  * @slot: The slot to check
417  *
418  * Return: true if the entry was a valid mapping
419  */
420 static bool process_slot(struct block_map_page *page, struct vdo_completion *completion,
421 			 slot_number_t slot)
422 {
423 	struct slab_depot *depot = completion->vdo->depot;
424 	int result;
425 	struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
426 
427 	if (!vdo_is_valid_location(&mapping)) {
428 		/* This entry is invalid, so remove it from the page. */
429 		unmap_entry(page, completion, slot);
430 		return false;
431 	}
432 
433 	if (!vdo_is_mapped_location(&mapping))
434 		return false;
435 
436 
437 	if (mapping.pbn == VDO_ZERO_BLOCK)
438 		return true;
439 
440 	if (!vdo_is_physical_data_block(depot, mapping.pbn)) {
441 		/*
442 		 * This is a nonsense mapping. Remove it from the map so we're at least consistent
443 		 * and mark the page dirty.
444 		 */
445 		unmap_entry(page, completion, slot);
446 		return false;
447 	}
448 
449 	result = vdo_adjust_reference_count_for_rebuild(depot, mapping.pbn,
450 							VDO_JOURNAL_DATA_REMAPPING);
451 	if (result == VDO_SUCCESS)
452 		return true;
453 
454 	vdo_log_error_strerror(result,
455 			       "Could not adjust reference count for PBN %llu, slot %u mapped to PBN %llu",
456 			       (unsigned long long) vdo_get_block_map_page_pbn(page),
457 			       slot, (unsigned long long) mapping.pbn);
458 	unmap_entry(page, completion, slot);
459 	return false;
460 }
461 
462 /**
463  * rebuild_reference_counts_from_page() - Rebuild reference counts from a block map page.
464  * @repair: The repair completion.
465  * @completion: The page completion holding the page.
466  */
467 static void rebuild_reference_counts_from_page(struct repair_completion *repair,
468 					       struct vdo_completion *completion)
469 {
470 	slot_number_t slot, last_slot;
471 	struct block_map_page *page;
472 	int result;
473 
474 	result = vdo_get_cached_page(completion, &page);
475 	if (result != VDO_SUCCESS) {
476 		vdo_set_completion_result(&repair->completion, result);
477 		return;
478 	}
479 
480 	if (!page->header.initialized)
481 		return;
482 
483 	/* Remove any bogus entries which exist beyond the end of the logical space. */
484 	if (vdo_get_block_map_page_pbn(page) == repair->last_slot.pbn) {
485 		last_slot = repair->last_slot.slot;
486 		remove_out_of_bounds_entries(page, completion, last_slot);
487 	} else {
488 		last_slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
489 	}
490 
491 	/* Inform the slab depot of all entries on this page. */
492 	for (slot = 0; slot < last_slot; slot++) {
493 		if (process_slot(page, completion, slot))
494 			repair->logical_blocks_used++;
495 	}
496 }
497 
498 /**
499  * page_loaded() - Process a page which has just been loaded.
500  * @completion: The vdo_page_completion for the fetched page.
501  *
502  * This callback is registered by fetch_page().
503  */
504 static void page_loaded(struct vdo_completion *completion)
505 {
506 	struct repair_completion *repair = completion->parent;
507 
508 	repair->outstanding--;
509 	rebuild_reference_counts_from_page(repair, completion);
510 	vdo_release_page_completion(completion);
511 
512 	/* Advance progress to the next page, and fetch the next page we haven't yet requested. */
513 	fetch_page(repair, completion);
514 }
515 
516 static physical_block_number_t get_pbn_to_fetch(struct repair_completion *repair,
517 						struct block_map *block_map)
518 {
519 	physical_block_number_t pbn = VDO_ZERO_BLOCK;
520 
521 	if (repair->completion.result != VDO_SUCCESS)
522 		return VDO_ZERO_BLOCK;
523 
524 	while ((pbn == VDO_ZERO_BLOCK) && (repair->page_to_fetch < repair->leaf_pages))
525 		pbn = vdo_find_block_map_page_pbn(block_map, repair->page_to_fetch++);
526 
527 	if (vdo_is_physical_data_block(repair->completion.vdo->depot, pbn))
528 		return pbn;
529 
530 	vdo_set_completion_result(&repair->completion, VDO_BAD_MAPPING);
531 	return VDO_ZERO_BLOCK;
532 }
533 
534 /**
535  * fetch_page() - Fetch a page from the block map.
536  * @repair: The repair_completion.
537  * @completion: The page completion to use.
538  *
539  * Return true if the rebuild is complete
540  */
541 static bool fetch_page(struct repair_completion *repair,
542 		       struct vdo_completion *completion)
543 {
544 	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;
545 	struct block_map *block_map = repair->completion.vdo->block_map;
546 	physical_block_number_t pbn = get_pbn_to_fetch(repair, block_map);
547 
548 	if (pbn != VDO_ZERO_BLOCK) {
549 		repair->outstanding++;
550 		/*
551 		 * We must set the requeue flag here to ensure that we don't blow the stack if all
552 		 * the requested pages are already in the cache or get load errors.
553 		 */
554 		vdo_get_page(page_completion, &block_map->zones[0], pbn, true, repair,
555 			     page_loaded, handle_page_load_error, true);
556 	}
557 
558 	if (repair->outstanding > 0)
559 		return false;
560 
561 	launch_repair_completion(repair, flush_block_map_updates, VDO_ZONE_TYPE_ADMIN);
562 	return true;
563 }
564 
565 /**
566  * rebuild_from_leaves() - Rebuild reference counts from the leaf block map pages.
567  * @completion: The repair completion.
568  *
569  * Rebuilds reference counts from the leaf block map pages now that reference counts have been
570  * rebuilt from the interior tree pages (which have been loaded in the process). This callback is
571  * registered in rebuild_reference_counts().
572  */
573 static void rebuild_from_leaves(struct vdo_completion *completion)
574 {
575 	page_count_t i;
576 	struct repair_completion *repair = as_repair_completion(completion);
577 	struct block_map *map = completion->vdo->block_map;
578 
579 	repair->logical_blocks_used = 0;
580 
581 	/*
582 	 * The PBN calculation doesn't work until the tree pages have been loaded, so we can't set
583 	 * this value at the start of repair.
584 	 */
585 	repair->leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
586 	repair->last_slot = (struct block_map_slot) {
587 		.slot = map->entry_count % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
588 		.pbn = vdo_find_block_map_page_pbn(map, repair->leaf_pages - 1),
589 	};
590 	if (repair->last_slot.slot == 0)
591 		repair->last_slot.slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
592 
593 	for (i = 0; i < repair->page_count; i++) {
594 		if (fetch_page(repair, &repair->page_completions[i].completion)) {
595 			/*
596 			 * The rebuild has already moved on, so it isn't safe nor is there a need
597 			 * to launch any more fetches.
598 			 */
599 			return;
600 		}
601 	}
602 }
603 
604 /**
605  * process_entry() - Process a single entry from the block map tree.
606  * @pbn: A pbn which holds a block map tree page.
607  * @completion: The parent completion of the traversal.
608  *
609  * Implements vdo_entry_callback_fn.
610  *
611  * Return: VDO_SUCCESS or an error.
612  */
613 static int process_entry(physical_block_number_t pbn, struct vdo_completion *completion)
614 {
615 	struct repair_completion *repair = as_repair_completion(completion);
616 	struct slab_depot *depot = completion->vdo->depot;
617 	int result;
618 
619 	if ((pbn == VDO_ZERO_BLOCK) || !vdo_is_physical_data_block(depot, pbn)) {
620 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
621 					      "PBN %llu out of range",
622 					      (unsigned long long) pbn);
623 	}
624 
625 	result = vdo_adjust_reference_count_for_rebuild(depot, pbn,
626 							VDO_JOURNAL_BLOCK_MAP_REMAPPING);
627 	if (result != VDO_SUCCESS) {
628 		return vdo_log_error_strerror(result,
629 					      "Could not adjust reference count for block map tree PBN %llu",
630 					      (unsigned long long) pbn);
631 	}
632 
633 	repair->block_map_data_blocks++;
634 	return VDO_SUCCESS;
635 }
636 
637 static void rebuild_reference_counts(struct vdo_completion *completion)
638 {
639 	struct repair_completion *repair = as_repair_completion(completion);
640 	struct vdo *vdo = completion->vdo;
641 	struct vdo_page_cache *cache = &vdo->block_map->zones[0].page_cache;
642 
643 	/* We must allocate ref_counts before we can rebuild them. */
644 	if (abort_on_error(vdo_allocate_reference_counters(vdo->depot), repair))
645 		return;
646 
647 	/*
648 	 * Completion chaining from page cache hits can lead to stack overflow during the rebuild,
649 	 * so clear out the cache before this rebuild phase.
650 	 */
651 	if (abort_on_error(vdo_invalidate_page_cache(cache), repair))
652 		return;
653 
654 	prepare_repair_completion(repair, rebuild_from_leaves, VDO_ZONE_TYPE_LOGICAL);
655 	vdo_traverse_forest(vdo->block_map, process_entry, completion);
656 }
657 
658 static void increment_recovery_point(struct recovery_point *point)
659 {
660 	if (++point->entry_count < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
661 		return;
662 
663 	point->entry_count = 0;
664 	if (point->sector_count < (VDO_SECTORS_PER_BLOCK - 1)) {
665 		point->sector_count++;
666 		return;
667 	}
668 
669 	point->sequence_number++;
670 	point->sector_count = 1;
671 }
672 
673 /**
674  * advance_points() - Advance the current recovery and journal points.
675  * @repair: The repair_completion whose points are to be advanced.
676  * @entries_per_block: The number of entries in a recovery journal block.
677  */
678 static void advance_points(struct repair_completion *repair,
679 			   journal_entry_count_t entries_per_block)
680 {
681 	if (!repair->next_recovery_point.increment_applied) {
682 		repair->next_recovery_point.increment_applied	= true;
683 		return;
684 	}
685 
686 	increment_recovery_point(&repair->next_recovery_point);
687 	vdo_advance_journal_point(&repair->next_journal_point, entries_per_block);
688 	repair->next_recovery_point.increment_applied	= false;
689 }
690 
691 /**
692  * before_recovery_point() - Check whether the first point precedes the second point.
693  * @first: The first recovery point.
694  * @second: The second recovery point.
695  *
696  * Return: true if the first point precedes the second point.
697  */
698 static bool __must_check before_recovery_point(const struct recovery_point *first,
699 					       const struct recovery_point *second)
700 {
701 	if (first->sequence_number < second->sequence_number)
702 		return true;
703 
704 	if (first->sequence_number > second->sequence_number)
705 		return false;
706 
707 	if (first->sector_count < second->sector_count)
708 		return true;
709 
710 	return ((first->sector_count == second->sector_count) &&
711 		(first->entry_count < second->entry_count));
712 }
713 
714 static struct packed_journal_sector * __must_check get_sector(struct recovery_journal *journal,
715 							      char *journal_data,
716 							      sequence_number_t sequence,
717 							      u8 sector_number)
718 {
719 	off_t offset;
720 
721 	offset = ((vdo_get_recovery_journal_block_number(journal, sequence) * VDO_BLOCK_SIZE) +
722 		  (VDO_SECTOR_SIZE * sector_number));
723 	return (struct packed_journal_sector *) (journal_data + offset);
724 }
725 
726 /**
727  * get_entry() - Unpack the recovery journal entry associated with the given recovery point.
728  * @repair: The repair completion.
729  * @point: The recovery point.
730  *
731  * Return: The unpacked contents of the matching recovery journal entry.
732  */
733 static struct recovery_journal_entry get_entry(const struct repair_completion *repair,
734 					       const struct recovery_point *point)
735 {
736 	struct packed_journal_sector *sector;
737 
738 	sector = get_sector(repair->completion.vdo->recovery_journal,
739 			    repair->journal_data, point->sequence_number,
740 			    point->sector_count);
741 	return vdo_unpack_recovery_journal_entry(&sector->entries[point->entry_count]);
742 }
743 
744 /**
745  * validate_recovery_journal_entry() - Validate a recovery journal entry.
746  * @vdo: The vdo.
747  * @entry: The entry to validate.
748  *
749  * Return: VDO_SUCCESS or an error.
750  */
751 static int validate_recovery_journal_entry(const struct vdo *vdo,
752 					   const struct recovery_journal_entry *entry)
753 {
754 	if ((entry->slot.pbn >= vdo->states.vdo.config.physical_blocks) ||
755 	    (entry->slot.slot >= VDO_BLOCK_MAP_ENTRIES_PER_PAGE) ||
756 	    !vdo_is_valid_location(&entry->mapping) ||
757 	    !vdo_is_valid_location(&entry->unmapping) ||
758 	    !vdo_is_physical_data_block(vdo->depot, entry->mapping.pbn) ||
759 	    !vdo_is_physical_data_block(vdo->depot, entry->unmapping.pbn)) {
760 		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
761 					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not within bounds",
762 					      vdo_get_journal_operation_name(entry->operation),
763 					      (unsigned long long) entry->slot.pbn,
764 					      entry->slot.slot,
765 					      (unsigned long long) entry->unmapping.pbn,
766 					      (unsigned long long) entry->mapping.pbn);
767 	}
768 
769 	if ((entry->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) &&
770 	    (vdo_is_state_compressed(entry->mapping.state) ||
771 	     (entry->mapping.pbn == VDO_ZERO_BLOCK) ||
772 	     (entry->unmapping.state != VDO_MAPPING_STATE_UNMAPPED) ||
773 	     (entry->unmapping.pbn != VDO_ZERO_BLOCK))) {
774 		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
775 					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not a valid tree mapping",
776 					      vdo_get_journal_operation_name(entry->operation),
777 					      (unsigned long long) entry->slot.pbn,
778 					      entry->slot.slot,
779 					      (unsigned long long) entry->unmapping.pbn,
780 					      (unsigned long long) entry->mapping.pbn);
781 	}
782 
783 	return VDO_SUCCESS;
784 }
785 
786 /**
787  * add_slab_journal_entries() - Replay recovery journal entries into the slab journals of the
788  *                              allocator currently being recovered.
789  * @completion: The allocator completion.
790  *
791  * Waits for slab journal tailblock space when necessary. This method is its own callback.
792  */
793 static void add_slab_journal_entries(struct vdo_completion *completion)
794 {
795 	struct recovery_point *recovery_point;
796 	struct repair_completion *repair = completion->parent;
797 	struct vdo *vdo = completion->vdo;
798 	struct recovery_journal *journal = vdo->recovery_journal;
799 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
800 
801 	/* Get ready in case we need to enqueue again. */
802 	vdo_prepare_completion(completion, add_slab_journal_entries,
803 			       vdo_notify_slab_journals_are_recovered,
804 			       completion->callback_thread_id, repair);
805 	for (recovery_point = &repair->next_recovery_point;
806 	     before_recovery_point(recovery_point, &repair->tail_recovery_point);
807 	     advance_points(repair, journal->entries_per_block)) {
808 		int result;
809 		physical_block_number_t pbn;
810 		struct vdo_slab *slab;
811 		struct recovery_journal_entry entry = get_entry(repair, recovery_point);
812 		bool increment = !repair->next_recovery_point.increment_applied;
813 
814 		if (increment) {
815 			result = validate_recovery_journal_entry(vdo, &entry);
816 			if (result != VDO_SUCCESS) {
817 				vdo_enter_read_only_mode(vdo, result);
818 				vdo_fail_completion(completion, result);
819 				return;
820 			}
821 
822 			pbn = entry.mapping.pbn;
823 		} else {
824 			pbn = entry.unmapping.pbn;
825 		}
826 
827 		if (pbn == VDO_ZERO_BLOCK)
828 			continue;
829 
830 		slab = vdo_get_slab(vdo->depot, pbn);
831 		if (slab->allocator != allocator)
832 			continue;
833 
834 		if (!vdo_attempt_replay_into_slab(slab, pbn, entry.operation, increment,
835 						  &repair->next_journal_point,
836 						  completion))
837 			return;
838 
839 		repair->entries_added_to_slab_journals++;
840 	}
841 
842 	vdo_notify_slab_journals_are_recovered(completion);
843 }
844 
845 /**
846  * vdo_replay_into_slab_journals() - Replay recovery journal entries in the slab journals of slabs
847  *                                   owned by a given block_allocator.
848  * @allocator: The allocator whose slab journals are to be recovered.
849  * @context: The slab depot load context supplied by a recovery when it loads the depot.
850  */
851 void vdo_replay_into_slab_journals(struct block_allocator *allocator, void *context)
852 {
853 	struct vdo_completion *completion = &allocator->completion;
854 	struct repair_completion *repair = context;
855 	struct vdo *vdo = completion->vdo;
856 
857 	vdo_assert_on_physical_zone_thread(vdo, allocator->zone_number, __func__);
858 	if (repair->entry_count == 0) {
859 		/* there's nothing to replay */
860 		repair->logical_blocks_used = vdo->recovery_journal->logical_blocks_used;
861 		repair->block_map_data_blocks = vdo->recovery_journal->block_map_data_blocks;
862 		vdo_notify_slab_journals_are_recovered(completion);
863 		return;
864 	}
865 
866 	repair->next_recovery_point = (struct recovery_point) {
867 		.sequence_number = repair->slab_journal_head,
868 		.sector_count = 1,
869 		.entry_count = 0,
870 	};
871 
872 	repair->next_journal_point = (struct journal_point) {
873 		.sequence_number = repair->slab_journal_head,
874 		.entry_count = 0,
875 	};
876 
877 	vdo_log_info("Replaying entries into slab journals for zone %u",
878 		     allocator->zone_number);
879 	completion->parent = repair;
880 	add_slab_journal_entries(completion);
881 }
882 
883 static void load_slab_depot(struct vdo_completion *completion)
884 {
885 	struct repair_completion *repair = as_repair_completion(completion);
886 	const struct admin_state_code *operation;
887 
888 	vdo_assert_on_admin_thread(completion->vdo, __func__);
889 
890 	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state)) {
891 		prepare_repair_completion(repair, rebuild_reference_counts,
892 					  VDO_ZONE_TYPE_LOGICAL);
893 		operation = VDO_ADMIN_STATE_LOADING_FOR_REBUILD;
894 	} else {
895 		prepare_repair_completion(repair, drain_slab_depot, VDO_ZONE_TYPE_ADMIN);
896 		operation = VDO_ADMIN_STATE_LOADING_FOR_RECOVERY;
897 	}
898 
899 	vdo_load_slab_depot(completion->vdo->depot, operation, completion, repair);
900 }
901 
902 static void flush_block_map(struct vdo_completion *completion)
903 {
904 	struct repair_completion *repair = as_repair_completion(completion);
905 	const struct admin_state_code *operation;
906 
907 	vdo_assert_on_admin_thread(completion->vdo, __func__);
908 
909 	vdo_log_info("Flushing block map changes");
910 	prepare_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
911 	operation = (vdo_state_requires_read_only_rebuild(completion->vdo->load_state) ?
912 		     VDO_ADMIN_STATE_REBUILDING :
913 		     VDO_ADMIN_STATE_RECOVERING);
914 	vdo_drain_block_map(completion->vdo->block_map, operation, completion);
915 }
916 
917 static bool finish_if_done(struct repair_completion *repair)
918 {
919 	/* Pages are still being launched or there is still work to do */
920 	if (repair->launching || (repair->outstanding > 0))
921 		return false;
922 
923 	if (repair->completion.result != VDO_SUCCESS) {
924 		page_count_t i;
925 
926 		for (i = 0; i < repair->page_count; i++) {
927 			struct vdo_page_completion *page_completion =
928 				&repair->page_completions[i];
929 
930 			if (page_completion->ready)
931 				vdo_release_page_completion(&page_completion->completion);
932 		}
933 
934 		vdo_launch_completion(&repair->completion);
935 		return true;
936 	}
937 
938 	if (repair->current_entry >= repair->entries)
939 		return false;
940 
941 	launch_repair_completion(repair, flush_block_map, VDO_ZONE_TYPE_ADMIN);
942 	return true;
943 }
944 
945 static void abort_block_map_recovery(struct repair_completion *repair, int result)
946 {
947 	vdo_set_completion_result(&repair->completion, result);
948 	finish_if_done(repair);
949 }
950 
951 /**
952  * find_entry_starting_next_page() - Find the first journal entry after a given entry which is not
953  *                                   on the same block map page.
954  * @repair: The repair completion.
955  * @current_entry: The entry to search from.
956  * @needs_sort: Whether sorting is needed to proceed.
957  *
958  * Return: Pointer to the first later journal entry on a different block map page, or a pointer to
959  *         just before the journal entries if no subsequent entry is on a different block map page.
960  */
961 static struct numbered_block_mapping *
962 find_entry_starting_next_page(struct repair_completion *repair,
963 			      struct numbered_block_mapping *current_entry, bool needs_sort)
964 {
965 	size_t current_page;
966 
967 	/* If current_entry is invalid, return immediately. */
968 	if (current_entry < repair->entries)
969 		return current_entry;
970 
971 	current_page = current_entry->block_map_slot.pbn;
972 
973 	/* Decrement current_entry until it's out of bounds or on a different page. */
974 	while ((current_entry >= repair->entries) &&
975 	       (current_entry->block_map_slot.pbn == current_page)) {
976 		if (needs_sort) {
977 			struct numbered_block_mapping *just_sorted_entry =
978 				sort_next_heap_element(repair);
979 			VDO_ASSERT_LOG_ONLY(just_sorted_entry < current_entry,
980 					    "heap is returning elements in an unexpected order");
981 		}
982 
983 		current_entry--;
984 	}
985 
986 	return current_entry;
987 }
988 
989 /*
990  * Apply a range of journal entries [starting_entry, ending_entry) journal
991  * entries to a block map page.
992  */
993 static void apply_journal_entries_to_page(struct block_map_page *page,
994 					  struct numbered_block_mapping *starting_entry,
995 					  struct numbered_block_mapping *ending_entry)
996 {
997 	struct numbered_block_mapping *current_entry = starting_entry;
998 
999 	while (current_entry != ending_entry) {
1000 		page->entries[current_entry->block_map_slot.slot] = current_entry->block_map_entry;
1001 		current_entry--;
1002 	}
1003 }
1004 
1005 static void recover_ready_pages(struct repair_completion *repair,
1006 				struct vdo_completion *completion);
1007 
1008 static void block_map_page_loaded(struct vdo_completion *completion)
1009 {
1010 	struct repair_completion *repair = as_repair_completion(completion->parent);
1011 
1012 	repair->outstanding--;
1013 	if (!repair->launching)
1014 		recover_ready_pages(repair, completion);
1015 }
1016 
1017 static void handle_block_map_page_load_error(struct vdo_completion *completion)
1018 {
1019 	struct repair_completion *repair = as_repair_completion(completion->parent);
1020 
1021 	repair->outstanding--;
1022 	abort_block_map_recovery(repair, completion->result);
1023 }
1024 
1025 static void fetch_block_map_page(struct repair_completion *repair,
1026 				 struct vdo_completion *completion)
1027 {
1028 	physical_block_number_t pbn;
1029 
1030 	if (repair->current_unfetched_entry < repair->entries)
1031 		/* Nothing left to fetch. */
1032 		return;
1033 
1034 	/* Fetch the next page we haven't yet requested. */
1035 	pbn = repair->current_unfetched_entry->block_map_slot.pbn;
1036 	repair->current_unfetched_entry =
1037 		find_entry_starting_next_page(repair, repair->current_unfetched_entry,
1038 					      true);
1039 	repair->outstanding++;
1040 	vdo_get_page(((struct vdo_page_completion *) completion),
1041 		     &repair->completion.vdo->block_map->zones[0], pbn, true,
1042 		     &repair->completion, block_map_page_loaded,
1043 		     handle_block_map_page_load_error, false);
1044 }
1045 
1046 static struct vdo_page_completion *get_next_page_completion(struct repair_completion *repair,
1047 							    struct vdo_page_completion *completion)
1048 {
1049 	completion++;
1050 	if (completion == (&repair->page_completions[repair->page_count]))
1051 		completion = &repair->page_completions[0];
1052 	return completion;
1053 }
1054 
1055 static void recover_ready_pages(struct repair_completion *repair,
1056 				struct vdo_completion *completion)
1057 {
1058 	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;
1059 
1060 	if (finish_if_done(repair))
1061 		return;
1062 
1063 	if (repair->pbn != page_completion->pbn)
1064 		return;
1065 
1066 	while (page_completion->ready) {
1067 		struct numbered_block_mapping *start_of_next_page;
1068 		struct block_map_page *page;
1069 		int result;
1070 
1071 		result = vdo_get_cached_page(completion, &page);
1072 		if (result != VDO_SUCCESS) {
1073 			abort_block_map_recovery(repair, result);
1074 			return;
1075 		}
1076 
1077 		start_of_next_page =
1078 			find_entry_starting_next_page(repair, repair->current_entry,
1079 						      false);
1080 		apply_journal_entries_to_page(page, repair->current_entry,
1081 					      start_of_next_page);
1082 		repair->current_entry = start_of_next_page;
1083 		vdo_request_page_write(completion);
1084 		vdo_release_page_completion(completion);
1085 
1086 		if (finish_if_done(repair))
1087 			return;
1088 
1089 		repair->pbn = repair->current_entry->block_map_slot.pbn;
1090 		fetch_block_map_page(repair, completion);
1091 		page_completion = get_next_page_completion(repair, page_completion);
1092 		completion = &page_completion->completion;
1093 	}
1094 }
1095 
1096 static void recover_block_map(struct vdo_completion *completion)
1097 {
1098 	struct repair_completion *repair = as_repair_completion(completion);
1099 	struct vdo *vdo = completion->vdo;
1100 	struct numbered_block_mapping *first_sorted_entry;
1101 	page_count_t i;
1102 
1103 	vdo_assert_on_logical_zone_thread(vdo, 0, __func__);
1104 
1105 	/* Suppress block map errors. */
1106 	vdo->block_map->zones[0].page_cache.rebuilding =
1107 		vdo_state_requires_read_only_rebuild(vdo->load_state);
1108 
1109 	if (repair->block_map_entry_count == 0) {
1110 		vdo_log_info("Replaying 0 recovery entries into block map");
1111 		vdo_free(vdo_forget(repair->journal_data));
1112 		launch_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
1113 		return;
1114 	}
1115 
1116 	/*
1117 	 * Organize the journal entries into a binary heap so we can iterate over them in sorted
1118 	 * order incrementally, avoiding an expensive sort call.
1119 	 */
1120 	repair->replay_heap = (struct replay_heap) {
1121 		.data = repair->entries,
1122 		.nr = repair->block_map_entry_count,
1123 		.size = repair->block_map_entry_count,
1124 	};
1125 	min_heapify_all(&repair->replay_heap, &repair_min_heap, NULL);
1126 
1127 	vdo_log_info("Replaying %zu recovery entries into block map",
1128 		     repair->block_map_entry_count);
1129 
1130 	repair->current_entry = &repair->entries[repair->block_map_entry_count - 1];
1131 	first_sorted_entry = sort_next_heap_element(repair);
1132 	VDO_ASSERT_LOG_ONLY(first_sorted_entry == repair->current_entry,
1133 			    "heap is returning elements in an unexpected order");
1134 
1135 	/* Prevent any page from being processed until all pages have been launched. */
1136 	repair->launching = true;
1137 	repair->pbn = repair->current_entry->block_map_slot.pbn;
1138 	repair->current_unfetched_entry = repair->current_entry;
1139 	for (i = 0; i < repair->page_count; i++) {
1140 		if (repair->current_unfetched_entry < repair->entries)
1141 			break;
1142 
1143 		fetch_block_map_page(repair, &repair->page_completions[i].completion);
1144 	}
1145 	repair->launching = false;
1146 
1147 	/* Process any ready pages. */
1148 	recover_ready_pages(repair, &repair->page_completions[0].completion);
1149 }
1150 
1151 /**
1152  * get_recovery_journal_block_header() - Get the block header for a block at a position in the
1153  *                                       journal data and unpack it.
1154  * @journal: The recovery journal.
1155  * @data: The recovery journal data.
1156  * @sequence: The sequence number.
1157  *
1158  * Return: The unpacked header.
1159  */
1160 static struct recovery_block_header __must_check
1161 get_recovery_journal_block_header(struct recovery_journal *journal, char *data,
1162 				  sequence_number_t sequence)
1163 {
1164 	physical_block_number_t pbn =
1165 		vdo_get_recovery_journal_block_number(journal, sequence);
1166 	char *header = &data[pbn * VDO_BLOCK_SIZE];
1167 
1168 	return vdo_unpack_recovery_block_header((struct packed_journal_header *) header);
1169 }
1170 
1171 /**
1172  * is_valid_recovery_journal_block() - Determine whether the given header describes a valid block
1173  *                                     for the given journal.
1174  * @journal: The journal to use.
1175  * @header: The unpacked block header to check.
1176  * @old_ok: Whether an old format header is valid.
1177  *
1178  * A block is not valid if it is unformatted, or if it is older than the last successful recovery
1179  * or reformat.
1180  *
1181  * Return: True if the header is valid.
1182  */
1183 static bool __must_check is_valid_recovery_journal_block(const struct recovery_journal *journal,
1184 							 const struct recovery_block_header *header,
1185 							 bool old_ok)
1186 {
1187 	if ((header->nonce != journal->nonce) ||
1188 	    (header->recovery_count != journal->recovery_count))
1189 		return false;
1190 
1191 	if (header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL_2)
1192 		return (header->entry_count <= journal->entries_per_block);
1193 
1194 	return (old_ok &&
1195 		(header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL) &&
1196 		(header->entry_count <= RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK));
1197 }
1198 
1199 /**
1200  * is_exact_recovery_journal_block() - Determine whether the given header describes the exact block
1201  *                                     indicated.
1202  * @journal: The journal to use.
1203  * @header: The unpacked block header to check.
1204  * @sequence: The expected sequence number.
1205  *
1206  * Return: True if the block matches.
1207  */
1208 static bool __must_check is_exact_recovery_journal_block(const struct recovery_journal *journal,
1209 							 const struct recovery_block_header *header,
1210 							 sequence_number_t sequence)
1211 {
1212 	return ((header->sequence_number == sequence) &&
1213 		(is_valid_recovery_journal_block(journal, header, true)));
1214 }
1215 
1216 /**
1217  * find_recovery_journal_head_and_tail() - Find the tail and head of the journal.
1218  * @repair: The repair completion.
1219  *
1220  * Return: True if there were valid journal blocks.
1221  */
1222 static bool find_recovery_journal_head_and_tail(struct repair_completion *repair)
1223 {
1224 	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;
1225 	bool found_entries = false;
1226 	physical_block_number_t i;
1227 
1228 	/*
1229 	 * Ensure that we don't replay old entries since we know the tail recorded in the super
1230 	 * block must be a lower bound. Not doing so can result in extra data loss by setting the
1231 	 * tail too early.
1232 	 */
1233 	repair->highest_tail = journal->tail;
1234 	for (i = 0; i < journal->size; i++) {
1235 		struct recovery_block_header header =
1236 			get_recovery_journal_block_header(journal, repair->journal_data, i);
1237 
1238 		if (!is_valid_recovery_journal_block(journal, &header, true)) {
1239 			/* This block is old or incorrectly formatted */
1240 			continue;
1241 		}
1242 
1243 		if (vdo_get_recovery_journal_block_number(journal, header.sequence_number) != i) {
1244 			/* This block is in the wrong location */
1245 			continue;
1246 		}
1247 
1248 		if (header.sequence_number >= repair->highest_tail) {
1249 			found_entries = true;
1250 			repair->highest_tail = header.sequence_number;
1251 		}
1252 
1253 		if (!found_entries)
1254 			continue;
1255 
1256 		if (header.block_map_head > repair->block_map_head)
1257 			repair->block_map_head = header.block_map_head;
1258 
1259 		if (header.slab_journal_head > repair->slab_journal_head)
1260 			repair->slab_journal_head = header.slab_journal_head;
1261 	}
1262 
1263 	return found_entries;
1264 }
1265 
1266 /**
1267  * unpack_entry() - Unpack a recovery journal entry in either format.
1268  * @vdo: The vdo.
1269  * @packed: The entry to unpack.
1270  * @format: The expected format of the entry.
1271  * @entry: The unpacked entry.
1272  *
1273  * Return: true if the entry should be applied.3
1274  */
1275 static bool unpack_entry(struct vdo *vdo, char *packed, enum vdo_metadata_type format,
1276 			 struct recovery_journal_entry *entry)
1277 {
1278 	if (format == VDO_METADATA_RECOVERY_JOURNAL_2) {
1279 		struct packed_recovery_journal_entry *packed_entry =
1280 			(struct packed_recovery_journal_entry *) packed;
1281 
1282 		*entry = vdo_unpack_recovery_journal_entry(packed_entry);
1283 	} else {
1284 		physical_block_number_t low32, high4;
1285 
1286 		struct packed_recovery_journal_entry_1 *packed_entry =
1287 			(struct packed_recovery_journal_entry_1 *) packed;
1288 
1289 		if (packed_entry->operation == VDO_JOURNAL_DATA_INCREMENT)
1290 			entry->operation = VDO_JOURNAL_DATA_REMAPPING;
1291 		else if (packed_entry->operation == VDO_JOURNAL_BLOCK_MAP_INCREMENT)
1292 			entry->operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING;
1293 		else
1294 			return false;
1295 
1296 		low32 = __le32_to_cpu(packed_entry->pbn_low_word);
1297 		high4 = packed_entry->pbn_high_nibble;
1298 		entry->slot = (struct block_map_slot) {
1299 			.pbn = ((high4 << 32) | low32),
1300 			.slot = (packed_entry->slot_low | (packed_entry->slot_high << 6)),
1301 		};
1302 		entry->mapping = vdo_unpack_block_map_entry(&packed_entry->block_map_entry);
1303 		entry->unmapping = (struct data_location) {
1304 			.pbn = VDO_ZERO_BLOCK,
1305 			.state = VDO_MAPPING_STATE_UNMAPPED,
1306 		};
1307 	}
1308 
1309 	return (validate_recovery_journal_entry(vdo, entry) == VDO_SUCCESS);
1310 }
1311 
1312 /**
1313  * append_sector_entries() - Append an array of recovery journal entries from a journal block
1314  *                           sector to the array of numbered mappings in the repair completion,
1315  *                           numbering each entry in the order they are appended.
1316  * @repair: The repair completion.
1317  * @entries: The entries in the sector.
1318  * @format: The format of the sector.
1319  * @entry_count: The number of entries to append.
1320  */
1321 static void append_sector_entries(struct repair_completion *repair, char *entries,
1322 				  enum vdo_metadata_type format,
1323 				  journal_entry_count_t entry_count)
1324 {
1325 	journal_entry_count_t i;
1326 	struct vdo *vdo = repair->completion.vdo;
1327 	off_t increment = ((format == VDO_METADATA_RECOVERY_JOURNAL_2)
1328 			   ? sizeof(struct packed_recovery_journal_entry)
1329 			   : sizeof(struct packed_recovery_journal_entry_1));
1330 
1331 	for (i = 0; i < entry_count; i++, entries += increment) {
1332 		struct recovery_journal_entry entry;
1333 
1334 		if (!unpack_entry(vdo, entries, format, &entry))
1335 			/* When recovering from read-only mode, ignore damaged entries. */
1336 			continue;
1337 
1338 		repair->entries[repair->block_map_entry_count] =
1339 			(struct numbered_block_mapping) {
1340 			.block_map_slot = entry.slot,
1341 			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
1342 								    entry.mapping.state),
1343 			.number = repair->block_map_entry_count,
1344 		};
1345 		repair->block_map_entry_count++;
1346 	}
1347 }
1348 
1349 static journal_entry_count_t entries_per_sector(enum vdo_metadata_type format,
1350 						u8 sector_number)
1351 {
1352 	if (format == VDO_METADATA_RECOVERY_JOURNAL_2)
1353 		return RECOVERY_JOURNAL_ENTRIES_PER_SECTOR;
1354 
1355 	return ((sector_number == (VDO_SECTORS_PER_BLOCK - 1))
1356 		? RECOVERY_JOURNAL_1_ENTRIES_IN_LAST_SECTOR
1357 		: RECOVERY_JOURNAL_1_ENTRIES_PER_SECTOR);
1358 }
1359 
1360 static void extract_entries_from_block(struct repair_completion *repair,
1361 				       struct recovery_journal *journal,
1362 				       sequence_number_t sequence,
1363 				       enum vdo_metadata_type format,
1364 				       journal_entry_count_t entries)
1365 {
1366 	sector_count_t i;
1367 	struct recovery_block_header header =
1368 		get_recovery_journal_block_header(journal, repair->journal_data,
1369 						  sequence);
1370 
1371 	if (!is_exact_recovery_journal_block(journal, &header, sequence) ||
1372 	    (header.metadata_type != format)) {
1373 		/* This block is invalid, so skip it. */
1374 		return;
1375 	}
1376 
1377 	entries = min(entries, header.entry_count);
1378 	for (i = 1; i < VDO_SECTORS_PER_BLOCK; i++) {
1379 		struct packed_journal_sector *sector =
1380 			get_sector(journal, repair->journal_data, sequence, i);
1381 		journal_entry_count_t sector_entries =
1382 			min(entries, entries_per_sector(format, i));
1383 
1384 		if (vdo_is_valid_recovery_journal_sector(&header, sector, i)) {
1385 			/* Only extract as many as the block header calls for. */
1386 			append_sector_entries(repair, (char *) sector->entries, format,
1387 					      min_t(journal_entry_count_t,
1388 						    sector->entry_count,
1389 						    sector_entries));
1390 		}
1391 
1392 		/*
1393 		 * Even if the sector wasn't full, count it as full when counting up to the
1394 		 * entry count the block header claims.
1395 		 */
1396 		entries -= sector_entries;
1397 	}
1398 }
1399 
1400 static int parse_journal_for_rebuild(struct repair_completion *repair)
1401 {
1402 	int result;
1403 	sequence_number_t i;
1404 	block_count_t count;
1405 	enum vdo_metadata_type format;
1406 	struct vdo *vdo = repair->completion.vdo;
1407 	struct recovery_journal *journal = vdo->recovery_journal;
1408 	journal_entry_count_t entries_per_block = journal->entries_per_block;
1409 
1410 	format = get_recovery_journal_block_header(journal, repair->journal_data,
1411 						   repair->highest_tail).metadata_type;
1412 	if (format == VDO_METADATA_RECOVERY_JOURNAL)
1413 		entries_per_block = RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK;
1414 
1415 	/*
1416 	 * Allocate an array of numbered_block_mapping structures large enough to transcribe every
1417 	 * packed_recovery_journal_entry from every valid journal block.
1418 	 */
1419 	count = ((repair->highest_tail - repair->block_map_head + 1) * entries_per_block);
1420 	result = vdo_allocate(count, struct numbered_block_mapping, __func__,
1421 			      &repair->entries);
1422 	if (result != VDO_SUCCESS)
1423 		return result;
1424 
1425 	for (i = repair->block_map_head; i <= repair->highest_tail; i++)
1426 		extract_entries_from_block(repair, journal, i, format, entries_per_block);
1427 
1428 	return VDO_SUCCESS;
1429 }
1430 
1431 static int validate_heads(struct repair_completion *repair)
1432 {
1433 	/* Both reap heads must be behind the tail. */
1434 	if ((repair->block_map_head <= repair->tail) &&
1435 	    (repair->slab_journal_head <= repair->tail))
1436 		return VDO_SUCCESS;
1437 
1438 
1439 	return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
1440 				      "Journal tail too early. block map head: %llu, slab journal head: %llu, tail: %llu",
1441 				      (unsigned long long) repair->block_map_head,
1442 				      (unsigned long long) repair->slab_journal_head,
1443 				      (unsigned long long) repair->tail);
1444 }
1445 
1446 /**
1447  * extract_new_mappings() - Find all valid new mappings to be applied to the block map.
1448  * @repair: The repair completion.
1449  *
1450  * The mappings are extracted from the journal and stored in a sortable array so that all of the
1451  * mappings to be applied to a given block map page can be done in a single page fetch.
1452  */
1453 static int extract_new_mappings(struct repair_completion *repair)
1454 {
1455 	int result;
1456 	struct vdo *vdo = repair->completion.vdo;
1457 	struct recovery_point recovery_point = {
1458 		.sequence_number = repair->block_map_head,
1459 		.sector_count = 1,
1460 		.entry_count = 0,
1461 	};
1462 
1463 	/*
1464 	 * Allocate an array of numbered_block_mapping structs just large enough to transcribe
1465 	 * every packed_recovery_journal_entry from every valid journal block.
1466 	 */
1467 	result = vdo_allocate(repair->entry_count, struct numbered_block_mapping,
1468 			      __func__, &repair->entries);
1469 	if (result != VDO_SUCCESS)
1470 		return result;
1471 
1472 	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
1473 	     increment_recovery_point(&recovery_point)) {
1474 		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);
1475 
1476 		result = validate_recovery_journal_entry(vdo, &entry);
1477 		if (result != VDO_SUCCESS) {
1478 			vdo_enter_read_only_mode(vdo, result);
1479 			return result;
1480 		}
1481 
1482 		repair->entries[repair->block_map_entry_count] =
1483 			(struct numbered_block_mapping) {
1484 			.block_map_slot = entry.slot,
1485 			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
1486 								    entry.mapping.state),
1487 			.number = repair->block_map_entry_count,
1488 		};
1489 		repair->block_map_entry_count++;
1490 	}
1491 
1492 	result = VDO_ASSERT((repair->block_map_entry_count <= repair->entry_count),
1493 			    "approximate entry count is an upper bound");
1494 	if (result != VDO_SUCCESS)
1495 		vdo_enter_read_only_mode(vdo, result);
1496 
1497 	return result;
1498 }
1499 
1500 /**
1501  * compute_usages() - Compute the lbns in use and block map data blocks counts from the tail of
1502  *                    the journal.
1503  * @repair: The repair completion.
1504  */
1505 static noinline int compute_usages(struct repair_completion *repair)
1506 {
1507 	/*
1508 	 * This function is declared noinline to avoid a spurious valgrind error regarding the
1509 	 * following structure being uninitialized.
1510 	 */
1511 	struct recovery_point recovery_point = {
1512 		.sequence_number = repair->tail,
1513 		.sector_count = 1,
1514 		.entry_count = 0,
1515 	};
1516 
1517 	struct vdo *vdo = repair->completion.vdo;
1518 	struct recovery_journal *journal = vdo->recovery_journal;
1519 	struct recovery_block_header header =
1520 		get_recovery_journal_block_header(journal, repair->journal_data,
1521 						  repair->tail);
1522 
1523 	repair->logical_blocks_used = header.logical_blocks_used;
1524 	repair->block_map_data_blocks = header.block_map_data_blocks;
1525 
1526 	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
1527 	     increment_recovery_point(&recovery_point)) {
1528 		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);
1529 		int result;
1530 
1531 		result = validate_recovery_journal_entry(vdo, &entry);
1532 		if (result != VDO_SUCCESS) {
1533 			vdo_enter_read_only_mode(vdo, result);
1534 			return result;
1535 		}
1536 
1537 		if (entry.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
1538 			repair->block_map_data_blocks++;
1539 			continue;
1540 		}
1541 
1542 		if (vdo_is_mapped_location(&entry.mapping))
1543 			repair->logical_blocks_used++;
1544 
1545 		if (vdo_is_mapped_location(&entry.unmapping))
1546 			repair->logical_blocks_used--;
1547 	}
1548 
1549 	return VDO_SUCCESS;
1550 }
1551 
1552 static int parse_journal_for_recovery(struct repair_completion *repair)
1553 {
1554 	int result;
1555 	sequence_number_t i, head;
1556 	bool found_entries = false;
1557 	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;
1558 	struct recovery_block_header header;
1559 	enum vdo_metadata_type expected_format;
1560 
1561 	head = min(repair->block_map_head, repair->slab_journal_head);
1562 	header = get_recovery_journal_block_header(journal, repair->journal_data, head);
1563 	expected_format = header.metadata_type;
1564 	for (i = head; i <= repair->highest_tail; i++) {
1565 		journal_entry_count_t block_entries;
1566 		u8 j;
1567 
1568 		repair->tail = i;
1569 		repair->tail_recovery_point = (struct recovery_point) {
1570 			.sequence_number = i,
1571 			.sector_count = 0,
1572 			.entry_count = 0,
1573 		};
1574 
1575 		header = get_recovery_journal_block_header(journal, repair->journal_data, i);
1576 		if (!is_exact_recovery_journal_block(journal, &header, i)) {
1577 			/* A bad block header was found so this must be the end of the journal. */
1578 			break;
1579 		} else if (header.metadata_type != expected_format) {
1580 			/* There is a mix of old and new format blocks, so we need to rebuild. */
1581 			vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
1582 					       "Recovery journal is in an invalid format, a read-only rebuild is required.");
1583 			vdo_enter_read_only_mode(repair->completion.vdo, VDO_CORRUPT_JOURNAL);
1584 			return VDO_CORRUPT_JOURNAL;
1585 		}
1586 
1587 		block_entries = header.entry_count;
1588 
1589 		/* Examine each sector in turn to determine the last valid sector. */
1590 		for (j = 1; j < VDO_SECTORS_PER_BLOCK; j++) {
1591 			struct packed_journal_sector *sector =
1592 				get_sector(journal, repair->journal_data, i, j);
1593 			journal_entry_count_t sector_entries =
1594 				min_t(journal_entry_count_t, sector->entry_count,
1595 				      block_entries);
1596 
1597 			/* A bad sector means that this block was torn. */
1598 			if (!vdo_is_valid_recovery_journal_sector(&header, sector, j))
1599 				break;
1600 
1601 			if (sector_entries > 0) {
1602 				found_entries = true;
1603 				repair->tail_recovery_point.sector_count++;
1604 				repair->tail_recovery_point.entry_count = sector_entries;
1605 				block_entries -= sector_entries;
1606 				repair->entry_count += sector_entries;
1607 			}
1608 
1609 			/* If this sector is short, the later sectors can't matter. */
1610 			if ((sector_entries < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR) ||
1611 			    (block_entries == 0))
1612 				break;
1613 		}
1614 
1615 		/* If this block was not filled, or if it tore, no later block can matter. */
1616 		if ((header.entry_count != journal->entries_per_block) || (block_entries > 0))
1617 			break;
1618 	}
1619 
1620 	if (!found_entries) {
1621 		return validate_heads(repair);
1622 	} else if (expected_format == VDO_METADATA_RECOVERY_JOURNAL) {
1623 		/* All journal blocks have the old format, so we need to upgrade. */
1624 		vdo_log_error_strerror(VDO_UNSUPPORTED_VERSION,
1625 				       "Recovery journal is in the old format. Downgrade and complete recovery, then upgrade with a clean volume");
1626 		return VDO_UNSUPPORTED_VERSION;
1627 	}
1628 
1629 	/* Set the tail to the last valid tail block, if there is one. */
1630 	if (repair->tail_recovery_point.sector_count == 0)
1631 		repair->tail--;
1632 
1633 	result = validate_heads(repair);
1634 	if (result != VDO_SUCCESS)
1635 		return result;
1636 
1637 	vdo_log_info("Highest-numbered recovery journal block has sequence number %llu, and the highest-numbered usable block is %llu",
1638 		     (unsigned long long) repair->highest_tail,
1639 		     (unsigned long long) repair->tail);
1640 
1641 	result = extract_new_mappings(repair);
1642 	if (result != VDO_SUCCESS)
1643 		return result;
1644 
1645 	return compute_usages(repair);
1646 }
1647 
1648 static int parse_journal(struct repair_completion *repair)
1649 {
1650 	if (!find_recovery_journal_head_and_tail(repair))
1651 		return VDO_SUCCESS;
1652 
1653 	return (vdo_state_requires_read_only_rebuild(repair->completion.vdo->load_state) ?
1654 		parse_journal_for_rebuild(repair) :
1655 		parse_journal_for_recovery(repair));
1656 }
1657 
1658 static void finish_journal_load(struct vdo_completion *completion)
1659 {
1660 	struct repair_completion *repair = completion->parent;
1661 
1662 	if (++repair->vios_complete != repair->vio_count)
1663 		return;
1664 
1665 	vdo_log_info("Finished reading recovery journal");
1666 	uninitialize_vios(repair);
1667 	prepare_repair_completion(repair, recover_block_map, VDO_ZONE_TYPE_LOGICAL);
1668 	vdo_continue_completion(&repair->completion, parse_journal(repair));
1669 }
1670 
1671 static void handle_journal_load_error(struct vdo_completion *completion)
1672 {
1673 	struct repair_completion *repair = completion->parent;
1674 
1675 	/* Preserve the error */
1676 	vdo_set_completion_result(&repair->completion, completion->result);
1677 	vio_record_metadata_io_error(as_vio(completion));
1678 	completion->callback(completion);
1679 }
1680 
1681 static void read_journal_endio(struct bio *bio)
1682 {
1683 	struct vio *vio = bio->bi_private;
1684 	struct vdo *vdo = vio->completion.vdo;
1685 
1686 	continue_vio_after_io(vio, finish_journal_load, vdo->thread_config.admin_thread);
1687 }
1688 
1689 /**
1690  * vdo_repair() - Load the recovery journal and then recover or rebuild a vdo.
1691  * @parent: The completion to notify when the operation is complete
1692  */
1693 void vdo_repair(struct vdo_completion *parent)
1694 {
1695 	int result;
1696 	char *ptr;
1697 	struct repair_completion *repair;
1698 	struct vdo *vdo = parent->vdo;
1699 	struct recovery_journal *journal = vdo->recovery_journal;
1700 	physical_block_number_t pbn = journal->origin;
1701 	block_count_t remaining = journal->size;
1702 	block_count_t vio_count = DIV_ROUND_UP(remaining, MAX_BLOCKS_PER_VIO);
1703 	page_count_t page_count = min_t(page_count_t,
1704 					vdo->device_config->cache_size >> 1,
1705 					MAXIMUM_SIMULTANEOUS_VDO_BLOCK_MAP_RESTORATION_READS);
1706 
1707 	vdo_assert_on_admin_thread(vdo, __func__);
1708 
1709 	if (vdo->load_state == VDO_FORCE_REBUILD) {
1710 		vdo_log_warning("Rebuilding reference counts to clear read-only mode");
1711 		vdo->states.vdo.read_only_recoveries++;
1712 	} else if (vdo->load_state == VDO_REBUILD_FOR_UPGRADE) {
1713 		vdo_log_warning("Rebuilding reference counts for upgrade");
1714 	} else {
1715 		vdo_log_warning("Device was dirty, rebuilding reference counts");
1716 	}
1717 
1718 	result = vdo_allocate_extended(struct repair_completion, page_count,
1719 				       struct vdo_page_completion, __func__,
1720 				       &repair);
1721 	if (result != VDO_SUCCESS) {
1722 		vdo_fail_completion(parent, result);
1723 		return;
1724 	}
1725 
1726 	vdo_initialize_completion(&repair->completion, vdo, VDO_REPAIR_COMPLETION);
1727 	repair->completion.error_handler = abort_repair;
1728 	repair->completion.parent = parent;
1729 	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
1730 	repair->page_count = page_count;
1731 
1732 	result = vdo_allocate(remaining * VDO_BLOCK_SIZE, char, __func__,
1733 			      &repair->journal_data);
1734 	if (abort_on_error(result, repair))
1735 		return;
1736 
1737 	result = vdo_allocate(vio_count, struct vio, __func__, &repair->vios);
1738 	if (abort_on_error(result, repair))
1739 		return;
1740 
1741 	ptr = repair->journal_data;
1742 	for (repair->vio_count = 0; repair->vio_count < vio_count; repair->vio_count++) {
1743 		block_count_t blocks = min_t(block_count_t, remaining,
1744 					     MAX_BLOCKS_PER_VIO);
1745 
1746 		result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
1747 						 VIO_PRIORITY_METADATA,
1748 						 repair, blocks, ptr,
1749 						 &repair->vios[repair->vio_count]);
1750 		if (abort_on_error(result, repair))
1751 			return;
1752 
1753 		ptr += (blocks * VDO_BLOCK_SIZE);
1754 		remaining -= blocks;
1755 	}
1756 
1757 	for (vio_count = 0; vio_count < repair->vio_count;
1758 	     vio_count++, pbn += MAX_BLOCKS_PER_VIO) {
1759 		vdo_submit_metadata_vio(&repair->vios[vio_count], pbn, read_journal_endio,
1760 					handle_journal_load_error, REQ_OP_READ);
1761 	}
1762 }
1763