xref: /linux/drivers/md/dm-vdo/repair.c (revision 75079df919efcc30eb5bf0427c83fb578f4fe4fc)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "repair.h"
7 
8 #include <linux/min_heap.h>
9 #include <linux/minmax.h>
10 
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 
15 #include "block-map.h"
16 #include "completion.h"
17 #include "constants.h"
18 #include "encodings.h"
19 #include "int-map.h"
20 #include "io-submitter.h"
21 #include "recovery-journal.h"
22 #include "slab-depot.h"
23 #include "types.h"
24 #include "vdo.h"
25 #include "wait-queue.h"
26 
/*
 * An explicitly numbered block mapping. Numbering the mappings allows them to be sorted by logical
 * block number during repair while still preserving the relative order of journal entries with
 * the same logical block number.
 *
 * mapping_is_less_than() orders these by block_map_slot (pbn, then slot) and breaks ties with
 * 'number'.
 */
struct numbered_block_mapping {
	/* The block map slot (page PBN and slot within that page) used as the primary sort key */
	struct block_map_slot block_map_slot;
	/* The block map entry associated with the slot (presumably the value to replay into it) */
	struct block_map_entry block_map_entry;
	/* A serial number to use during replay */
	u32 number;
} __packed;
38 
/*
 * The absolute position of an entry in the recovery journal, including the sector number and the
 * entry number within the sector.
 *
 * Points are ordered by before_recovery_point() (sequence number, then sector, then entry) and
 * stepped forward by increment_recovery_point().
 */
struct recovery_point {
	/* Block sequence number */
	sequence_number_t sequence_number;
	/*
	 * Sector number. Points used for replay start at sector 1, and increment_recovery_point()
	 * wraps back to sector 1 (not 0) when it moves on to the next block.
	 */
	u8 sector_count;
	/* Entry number */
	journal_entry_count_t entry_count;
	/* Whether or not the increment portion of the current entry has been applied */
	bool increment_applied;
};
53 
/*
 * The state of a repair operation (a recovery or a read-only rebuild). The structure ends with a
 * flexible array of page completions, so it must be allocated with room for page_count of them.
 */
struct repair_completion {
	/* The completion header */
	struct vdo_completion completion;

	/* A buffer to hold the data read off disk */
	char *journal_data;

	/* For loading the journal */
	/* The number of vios in the vios array whose components must be freed on cleanup */
	data_vio_count_t vio_count;
	data_vio_count_t vios_complete;
	struct vio *vios;

	/* The number of entries to be applied to the block map */
	size_t block_map_entry_count;
	/* The sequence number of the first valid block for block map recovery */
	sequence_number_t block_map_head;
	/* The sequence number of the first valid block for slab journal replay */
	sequence_number_t slab_journal_head;
	/* The sequence number of the last valid block of the journal (if known) */
	sequence_number_t tail;
	/*
	 * The highest sequence number of the journal. During recovery (vs read-only rebuild), not
	 * the same as the tail, since the tail ignores blocks after the first hole.
	 */
	sequence_number_t highest_tail;

	/* The number of logical blocks currently known to be in use */
	block_count_t logical_blocks_used;
	/* The number of block map data blocks known to be allocated */
	block_count_t block_map_data_blocks;

	/* These fields are for playing the journal into the block map */
	/* The entry data for the block map recovery */
	struct numbered_block_mapping *entries;
	/* The number of entries in the entry array */
	size_t entry_count;
	/* The number of pending (non-ready) page requests */
	page_count_t outstanding;
	/* The number of page completions in the page_completions array */
	page_count_t page_count;
	/* True while page requests are still being launched; finish_if_done() waits on it */
	bool launching;
	/*
	 * A heap of the journal entries. It re-orders and sorts journal entries in ascending LBN
	 * order, then original journal order. This permits efficient iteration over the journal
	 * entries in order. sort_next_heap_element() pops the next entry from it.
	 */
	struct min_heap replay_heap;
	/* Fields tracking progress through the journal entries. */
	struct numbered_block_mapping *current_entry;
	struct numbered_block_mapping *current_unfetched_entry;
	/* Current requested page's PBN */
	physical_block_number_t pbn;

	/* These fields are only used during recovery. */
	/* A location just beyond the last valid entry of the journal */
	struct recovery_point tail_recovery_point;
	/* The location of the next recovery journal entry to apply */
	struct recovery_point next_recovery_point;
	/* The journal point to give to the next synthesized decref */
	struct journal_point next_journal_point;
	/* The number of entries played into slab journals */
	size_t entries_added_to_slab_journals;

	/* These fields are only used during read-only rebuild */
	/* The index of the next leaf page to look up; advanced by get_pbn_to_fetch() */
	page_count_t page_to_fetch;
	/* the number of leaf pages in the block map */
	page_count_t leaf_pages;
	/*
	 * The end of the block map: the PBN of the last leaf page, and the number of in-bounds
	 * slots on that page (i.e. the index of the first slot beyond the logical space).
	 */
	struct block_map_slot last_slot;

	/*
	 * The page completions used for playing the journal into the block map, and, during
	 * read-only rebuild, for rebuilding the reference counts from the block map.
	 */
	struct vdo_page_completion page_completions[];
};
130 
131 /*
132  * This is a min_heap callback function that orders numbered_block_mappings using the
133  * 'block_map_slot' field as the primary key and the mapping 'number' field as the secondary key.
134  * Using the mapping number preserves the journal order of entries for the same slot, allowing us
135  * to sort by slot while still ensuring we replay all entries with the same slot in the exact order
136  * as they appeared in the journal.
137  */
138 static bool mapping_is_less_than(const void *item1, const void *item2)
139 {
140 	const struct numbered_block_mapping *mapping1 =
141 		(const struct numbered_block_mapping *) item1;
142 	const struct numbered_block_mapping *mapping2 =
143 		(const struct numbered_block_mapping *) item2;
144 
145 	if (mapping1->block_map_slot.pbn != mapping2->block_map_slot.pbn)
146 		return mapping1->block_map_slot.pbn < mapping2->block_map_slot.pbn;
147 
148 	if (mapping1->block_map_slot.slot != mapping2->block_map_slot.slot)
149 		return mapping1->block_map_slot.slot < mapping2->block_map_slot.slot;
150 
151 	if (mapping1->number != mapping2->number)
152 		return mapping1->number < mapping2->number;
153 
154 	return 0;
155 }
156 
157 static void swap_mappings(void *item1, void *item2)
158 {
159 	struct numbered_block_mapping *mapping1 = item1;
160 	struct numbered_block_mapping *mapping2 = item2;
161 
162 	swap(*mapping1, *mapping2);
163 }
164 
/*
 * The min_heap callbacks for a heap of numbered_block_mappings: elements are compared with
 * mapping_is_less_than() and exchanged with swap_mappings().
 */
static const struct min_heap_callbacks repair_min_heap = {
	.elem_size = sizeof(struct numbered_block_mapping),
	.less = mapping_is_less_than,
	.swp = swap_mappings,
};
170 
171 static struct numbered_block_mapping *sort_next_heap_element(struct repair_completion *repair)
172 {
173 	struct min_heap *heap = &repair->replay_heap;
174 	struct numbered_block_mapping *last;
175 
176 	if (heap->nr == 0)
177 		return NULL;
178 
179 	/*
180 	 * Swap the next heap element with the last one on the heap, popping it off the heap,
181 	 * restore the heap invariant, and return a pointer to the popped element.
182 	 */
183 	last = &repair->entries[--heap->nr];
184 	swap_mappings(heap->data, last);
185 	min_heapify(heap, 0, &repair_min_heap);
186 	return last;
187 }
188 
189 /**
190  * as_repair_completion() - Convert a generic completion to a repair_completion.
191  * @completion: The completion to convert.
192  *
193  * Return: The repair_completion.
194  */
195 static inline struct repair_completion * __must_check
196 as_repair_completion(struct vdo_completion *completion)
197 {
198 	vdo_assert_completion_type(completion, VDO_REPAIR_COMPLETION);
199 	return container_of(completion, struct repair_completion, completion);
200 }
201 
202 static void prepare_repair_completion(struct repair_completion *repair,
203 				      vdo_action_fn callback, enum vdo_zone_type zone_type)
204 {
205 	struct vdo_completion *completion = &repair->completion;
206 	const struct thread_config *thread_config = &completion->vdo->thread_config;
207 	thread_id_t thread_id;
208 
209 	/* All blockmap access is done on single thread, so use logical zone 0. */
210 	thread_id = ((zone_type == VDO_ZONE_TYPE_LOGICAL) ?
211 		     thread_config->logical_threads[0] :
212 		     thread_config->admin_thread);
213 	vdo_reset_completion(completion);
214 	vdo_set_completion_callback(completion, callback, thread_id);
215 }
216 
217 static void launch_repair_completion(struct repair_completion *repair,
218 				     vdo_action_fn callback, enum vdo_zone_type zone_type)
219 {
220 	prepare_repair_completion(repair, callback, zone_type);
221 	vdo_launch_completion(&repair->completion);
222 }
223 
224 static void uninitialize_vios(struct repair_completion *repair)
225 {
226 	while (repair->vio_count > 0)
227 		free_vio_components(&repair->vios[--repair->vio_count]);
228 
229 	vdo_free(vdo_forget(repair->vios));
230 }
231 
232 static void free_repair_completion(struct repair_completion *repair)
233 {
234 	if (repair == NULL)
235 		return;
236 
237 	/*
238 	 * We do this here because this function is the only common bottleneck for all clean up
239 	 * paths.
240 	 */
241 	repair->completion.vdo->block_map->zones[0].page_cache.rebuilding = false;
242 
243 	uninitialize_vios(repair);
244 	vdo_free(vdo_forget(repair->journal_data));
245 	vdo_free(vdo_forget(repair->entries));
246 	vdo_free(repair);
247 }
248 
249 static void finish_repair(struct vdo_completion *completion)
250 {
251 	struct vdo_completion *parent = completion->parent;
252 	struct vdo *vdo = completion->vdo;
253 	struct repair_completion *repair = as_repair_completion(completion);
254 
255 	vdo_assert_on_admin_thread(vdo, __func__);
256 
257 	if (vdo->load_state != VDO_REBUILD_FOR_UPGRADE)
258 		vdo->states.vdo.complete_recoveries++;
259 
260 	vdo_initialize_recovery_journal_post_repair(vdo->recovery_journal,
261 						    vdo->states.vdo.complete_recoveries,
262 						    repair->highest_tail,
263 						    repair->logical_blocks_used,
264 						    repair->block_map_data_blocks);
265 	free_repair_completion(vdo_forget(repair));
266 
267 	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
268 		vdo_log_info("Read-only rebuild complete");
269 		vdo_launch_completion(parent);
270 		return;
271 	}
272 
273 	/* FIXME: shouldn't this say either "recovery" or "repair"? */
274 	vdo_log_info("Rebuild complete");
275 
276 	/*
277 	 * Now that we've freed the repair completion and its vast array of journal entries, we
278 	 * can allocate refcounts.
279 	 */
280 	vdo_continue_completion(parent, vdo_allocate_reference_counters(vdo->depot));
281 }
282 
283 /**
284  * abort_repair() - Handle a repair error.
285  * @completion: The repair completion.
286  */
287 static void abort_repair(struct vdo_completion *completion)
288 {
289 	struct vdo_completion *parent = completion->parent;
290 	int result = completion->result;
291 	struct repair_completion *repair = as_repair_completion(completion);
292 
293 	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state))
294 		vdo_log_info("Read-only rebuild aborted");
295 	else
296 		vdo_log_warning("Recovery aborted");
297 
298 	free_repair_completion(vdo_forget(repair));
299 	vdo_continue_completion(parent, result);
300 }
301 
302 /**
303  * abort_on_error() - Abort a repair if there is an error.
304  * @result: The result to check.
305  * @repair: The repair completion.
306  *
307  * Return: true if the result was an error.
308  */
309 static bool __must_check abort_on_error(int result, struct repair_completion *repair)
310 {
311 	if (result == VDO_SUCCESS)
312 		return false;
313 
314 	vdo_fail_completion(&repair->completion, result);
315 	return true;
316 }
317 
318 /**
319  * drain_slab_depot() - Flush out all dirty refcounts blocks now that they have been rebuilt or
320  *                      recovered.
321  */
322 static void drain_slab_depot(struct vdo_completion *completion)
323 {
324 	struct vdo *vdo = completion->vdo;
325 	struct repair_completion *repair = as_repair_completion(completion);
326 	const struct admin_state_code *operation;
327 
328 	vdo_assert_on_admin_thread(vdo, __func__);
329 
330 	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
331 	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
332 		vdo_log_info("Saving rebuilt state");
333 		operation = VDO_ADMIN_STATE_REBUILDING;
334 	} else {
335 		vdo_log_info("Replayed %zu journal entries into slab journals",
336 			     repair->entries_added_to_slab_journals);
337 		operation = VDO_ADMIN_STATE_RECOVERING;
338 	}
339 
340 	vdo_drain_slab_depot(vdo->depot, operation, completion);
341 }
342 
343 /**
344  * flush_block_map_updates() - Flush the block map now that all the reference counts are rebuilt.
345  * @completion: The repair completion.
346  *
347  * This callback is registered in finish_if_done().
348  */
349 static void flush_block_map_updates(struct vdo_completion *completion)
350 {
351 	vdo_assert_on_admin_thread(completion->vdo, __func__);
352 
353 	vdo_log_info("Flushing block map changes");
354 	prepare_repair_completion(as_repair_completion(completion), drain_slab_depot,
355 				  VDO_ZONE_TYPE_ADMIN);
356 	vdo_drain_block_map(completion->vdo->block_map, VDO_ADMIN_STATE_RECOVERING,
357 			    completion);
358 }
359 
/*
 * Forward declaration: the page load callbacks below call fetch_page(), and fetch_page() in turn
 * registers those callbacks.
 */
static bool fetch_page(struct repair_completion *repair,
		       struct vdo_completion *completion);
362 
363 /**
364  * handle_page_load_error() - Handle an error loading a page.
365  * @completion: The vdo_page_completion.
366  */
367 static void handle_page_load_error(struct vdo_completion *completion)
368 {
369 	struct repair_completion *repair = completion->parent;
370 
371 	repair->outstanding--;
372 	vdo_set_completion_result(&repair->completion, completion->result);
373 	vdo_release_page_completion(completion);
374 	fetch_page(repair, completion);
375 }
376 
377 /**
378  * unmap_entry() - Unmap an invalid entry and indicate that its page must be written out.
379  * @page: The page containing the entries
380  * @completion: The page_completion for writing the page
381  * @slot: The slot to unmap
382  */
383 static void unmap_entry(struct block_map_page *page, struct vdo_completion *completion,
384 			slot_number_t slot)
385 {
386 	page->entries[slot] = UNMAPPED_BLOCK_MAP_ENTRY;
387 	vdo_request_page_write(completion);
388 }
389 
390 /**
391  * remove_out_of_bounds_entries() - Unmap entries which outside the logical space.
392  * @page: The page containing the entries
393  * @completion: The page_completion for writing the page
394  * @start: The first slot to check
395  */
396 static void remove_out_of_bounds_entries(struct block_map_page *page,
397 					 struct vdo_completion *completion,
398 					 slot_number_t start)
399 {
400 	slot_number_t slot;
401 
402 	for (slot = start; slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; slot++) {
403 		struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
404 
405 		if (vdo_is_mapped_location(&mapping))
406 			unmap_entry(page, completion, slot);
407 	}
408 }
409 
410 /**
411  * process_slot() - Update the reference counts for a single entry.
412  * @page: The page containing the entries
413  * @completion: The page_completion for writing the page
414  * @slot: The slot to check
415  *
416  * Return: true if the entry was a valid mapping
417  */
418 static bool process_slot(struct block_map_page *page, struct vdo_completion *completion,
419 			 slot_number_t slot)
420 {
421 	struct slab_depot *depot = completion->vdo->depot;
422 	int result;
423 	struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
424 
425 	if (!vdo_is_valid_location(&mapping)) {
426 		/* This entry is invalid, so remove it from the page. */
427 		unmap_entry(page, completion, slot);
428 		return false;
429 	}
430 
431 	if (!vdo_is_mapped_location(&mapping))
432 		return false;
433 
434 
435 	if (mapping.pbn == VDO_ZERO_BLOCK)
436 		return true;
437 
438 	if (!vdo_is_physical_data_block(depot, mapping.pbn)) {
439 		/*
440 		 * This is a nonsense mapping. Remove it from the map so we're at least consistent
441 		 * and mark the page dirty.
442 		 */
443 		unmap_entry(page, completion, slot);
444 		return false;
445 	}
446 
447 	result = vdo_adjust_reference_count_for_rebuild(depot, mapping.pbn,
448 							VDO_JOURNAL_DATA_REMAPPING);
449 	if (result == VDO_SUCCESS)
450 		return true;
451 
452 	vdo_log_error_strerror(result,
453 			       "Could not adjust reference count for PBN %llu, slot %u mapped to PBN %llu",
454 			       (unsigned long long) vdo_get_block_map_page_pbn(page),
455 			       slot, (unsigned long long) mapping.pbn);
456 	unmap_entry(page, completion, slot);
457 	return false;
458 }
459 
460 /**
461  * rebuild_reference_counts_from_page() - Rebuild reference counts from a block map page.
462  * @repair: The repair completion.
463  * @completion: The page completion holding the page.
464  */
465 static void rebuild_reference_counts_from_page(struct repair_completion *repair,
466 					       struct vdo_completion *completion)
467 {
468 	slot_number_t slot, last_slot;
469 	struct block_map_page *page;
470 	int result;
471 
472 	result = vdo_get_cached_page(completion, &page);
473 	if (result != VDO_SUCCESS) {
474 		vdo_set_completion_result(&repair->completion, result);
475 		return;
476 	}
477 
478 	if (!page->header.initialized)
479 		return;
480 
481 	/* Remove any bogus entries which exist beyond the end of the logical space. */
482 	if (vdo_get_block_map_page_pbn(page) == repair->last_slot.pbn) {
483 		last_slot = repair->last_slot.slot;
484 		remove_out_of_bounds_entries(page, completion, last_slot);
485 	} else {
486 		last_slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
487 	}
488 
489 	/* Inform the slab depot of all entries on this page. */
490 	for (slot = 0; slot < last_slot; slot++) {
491 		if (process_slot(page, completion, slot))
492 			repair->logical_blocks_used++;
493 	}
494 }
495 
496 /**
497  * page_loaded() - Process a page which has just been loaded.
498  * @completion: The vdo_page_completion for the fetched page.
499  *
500  * This callback is registered by fetch_page().
501  */
502 static void page_loaded(struct vdo_completion *completion)
503 {
504 	struct repair_completion *repair = completion->parent;
505 
506 	repair->outstanding--;
507 	rebuild_reference_counts_from_page(repair, completion);
508 	vdo_release_page_completion(completion);
509 
510 	/* Advance progress to the next page, and fetch the next page we haven't yet requested. */
511 	fetch_page(repair, completion);
512 }
513 
514 static physical_block_number_t get_pbn_to_fetch(struct repair_completion *repair,
515 						struct block_map *block_map)
516 {
517 	physical_block_number_t pbn = VDO_ZERO_BLOCK;
518 
519 	if (repair->completion.result != VDO_SUCCESS)
520 		return VDO_ZERO_BLOCK;
521 
522 	while ((pbn == VDO_ZERO_BLOCK) && (repair->page_to_fetch < repair->leaf_pages))
523 		pbn = vdo_find_block_map_page_pbn(block_map, repair->page_to_fetch++);
524 
525 	if (vdo_is_physical_data_block(repair->completion.vdo->depot, pbn))
526 		return pbn;
527 
528 	vdo_set_completion_result(&repair->completion, VDO_BAD_MAPPING);
529 	return VDO_ZERO_BLOCK;
530 }
531 
532 /**
533  * fetch_page() - Fetch a page from the block map.
534  * @repair: The repair_completion.
535  * @completion: The page completion to use.
536  *
537  * Return true if the rebuild is complete
538  */
539 static bool fetch_page(struct repair_completion *repair,
540 		       struct vdo_completion *completion)
541 {
542 	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;
543 	struct block_map *block_map = repair->completion.vdo->block_map;
544 	physical_block_number_t pbn = get_pbn_to_fetch(repair, block_map);
545 
546 	if (pbn != VDO_ZERO_BLOCK) {
547 		repair->outstanding++;
548 		/*
549 		 * We must set the requeue flag here to ensure that we don't blow the stack if all
550 		 * the requested pages are already in the cache or get load errors.
551 		 */
552 		vdo_get_page(page_completion, &block_map->zones[0], pbn, true, repair,
553 			     page_loaded, handle_page_load_error, true);
554 	}
555 
556 	if (repair->outstanding > 0)
557 		return false;
558 
559 	launch_repair_completion(repair, flush_block_map_updates, VDO_ZONE_TYPE_ADMIN);
560 	return true;
561 }
562 
563 /**
564  * rebuild_from_leaves() - Rebuild reference counts from the leaf block map pages.
565  * @completion: The repair completion.
566  *
567  * Rebuilds reference counts from the leaf block map pages now that reference counts have been
568  * rebuilt from the interior tree pages (which have been loaded in the process). This callback is
569  * registered in rebuild_reference_counts().
570  */
571 static void rebuild_from_leaves(struct vdo_completion *completion)
572 {
573 	page_count_t i;
574 	struct repair_completion *repair = as_repair_completion(completion);
575 	struct block_map *map = completion->vdo->block_map;
576 
577 	repair->logical_blocks_used = 0;
578 
579 	/*
580 	 * The PBN calculation doesn't work until the tree pages have been loaded, so we can't set
581 	 * this value at the start of repair.
582 	 */
583 	repair->leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
584 	repair->last_slot = (struct block_map_slot) {
585 		.slot = map->entry_count % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
586 		.pbn = vdo_find_block_map_page_pbn(map, repair->leaf_pages - 1),
587 	};
588 	if (repair->last_slot.slot == 0)
589 		repair->last_slot.slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
590 
591 	for (i = 0; i < repair->page_count; i++) {
592 		if (fetch_page(repair, &repair->page_completions[i].completion)) {
593 			/*
594 			 * The rebuild has already moved on, so it isn't safe nor is there a need
595 			 * to launch any more fetches.
596 			 */
597 			return;
598 		}
599 	}
600 }
601 
602 /**
603  * process_entry() - Process a single entry from the block map tree.
604  * @pbn: A pbn which holds a block map tree page.
605  * @completion: The parent completion of the traversal.
606  *
607  * Implements vdo_entry_callback_fn.
608  *
609  * Return: VDO_SUCCESS or an error.
610  */
611 static int process_entry(physical_block_number_t pbn, struct vdo_completion *completion)
612 {
613 	struct repair_completion *repair = as_repair_completion(completion);
614 	struct slab_depot *depot = completion->vdo->depot;
615 	int result;
616 
617 	if ((pbn == VDO_ZERO_BLOCK) || !vdo_is_physical_data_block(depot, pbn)) {
618 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
619 					      "PBN %llu out of range",
620 					      (unsigned long long) pbn);
621 	}
622 
623 	result = vdo_adjust_reference_count_for_rebuild(depot, pbn,
624 							VDO_JOURNAL_BLOCK_MAP_REMAPPING);
625 	if (result != VDO_SUCCESS) {
626 		return vdo_log_error_strerror(result,
627 					      "Could not adjust reference count for block map tree PBN %llu",
628 					      (unsigned long long) pbn);
629 	}
630 
631 	repair->block_map_data_blocks++;
632 	return VDO_SUCCESS;
633 }
634 
635 static void rebuild_reference_counts(struct vdo_completion *completion)
636 {
637 	struct repair_completion *repair = as_repair_completion(completion);
638 	struct vdo *vdo = completion->vdo;
639 	struct vdo_page_cache *cache = &vdo->block_map->zones[0].page_cache;
640 
641 	/* We must allocate ref_counts before we can rebuild them. */
642 	if (abort_on_error(vdo_allocate_reference_counters(vdo->depot), repair))
643 		return;
644 
645 	/*
646 	 * Completion chaining from page cache hits can lead to stack overflow during the rebuild,
647 	 * so clear out the cache before this rebuild phase.
648 	 */
649 	if (abort_on_error(vdo_invalidate_page_cache(cache), repair))
650 		return;
651 
652 	prepare_repair_completion(repair, rebuild_from_leaves, VDO_ZONE_TYPE_LOGICAL);
653 	vdo_traverse_forest(vdo->block_map, process_entry, completion);
654 }
655 
656 /**
657  * increment_recovery_point() - Move the given recovery point forward by one entry.
658  */
659 static void increment_recovery_point(struct recovery_point *point)
660 {
661 	if (++point->entry_count < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
662 		return;
663 
664 	point->entry_count = 0;
665 	if (point->sector_count < (VDO_SECTORS_PER_BLOCK - 1)) {
666 		point->sector_count++;
667 		return;
668 	}
669 
670 	point->sequence_number++;
671 	point->sector_count = 1;
672 }
673 
674 /**
675  * advance_points() - Advance the current recovery and journal points.
676  * @repair: The repair_completion whose points are to be advanced.
677  * @entries_per_block: The number of entries in a recovery journal block.
678  */
679 static void advance_points(struct repair_completion *repair,
680 			   journal_entry_count_t entries_per_block)
681 {
682 	if (!repair->next_recovery_point.increment_applied) {
683 		repair->next_recovery_point.increment_applied	= true;
684 		return;
685 	}
686 
687 	increment_recovery_point(&repair->next_recovery_point);
688 	vdo_advance_journal_point(&repair->next_journal_point, entries_per_block);
689 	repair->next_recovery_point.increment_applied	= false;
690 }
691 
692 /**
693  * before_recovery_point() - Check whether the first point precedes the second point.
694  * @first: The first recovery point.
695  * @second: The second recovery point.
696  *
697  * Return: true if the first point precedes the second point.
698  */
699 static bool __must_check before_recovery_point(const struct recovery_point *first,
700 					       const struct recovery_point *second)
701 {
702 	if (first->sequence_number < second->sequence_number)
703 		return true;
704 
705 	if (first->sequence_number > second->sequence_number)
706 		return false;
707 
708 	if (first->sector_count < second->sector_count)
709 		return true;
710 
711 	return ((first->sector_count == second->sector_count) &&
712 		(first->entry_count < second->entry_count));
713 }
714 
715 static struct packed_journal_sector * __must_check get_sector(struct recovery_journal *journal,
716 							      char *journal_data,
717 							      sequence_number_t sequence,
718 							      u8 sector_number)
719 {
720 	off_t offset;
721 
722 	offset = ((vdo_get_recovery_journal_block_number(journal, sequence) * VDO_BLOCK_SIZE) +
723 		  (VDO_SECTOR_SIZE * sector_number));
724 	return (struct packed_journal_sector *) (journal_data + offset);
725 }
726 
727 /**
728  * get_entry() - Unpack the recovery journal entry associated with the given recovery point.
729  * @repair: The repair completion.
730  * @point: The recovery point.
731  *
732  * Return: The unpacked contents of the matching recovery journal entry.
733  */
734 static struct recovery_journal_entry get_entry(const struct repair_completion *repair,
735 					       const struct recovery_point *point)
736 {
737 	struct packed_journal_sector *sector;
738 
739 	sector = get_sector(repair->completion.vdo->recovery_journal,
740 			    repair->journal_data, point->sequence_number,
741 			    point->sector_count);
742 	return vdo_unpack_recovery_journal_entry(&sector->entries[point->entry_count]);
743 }
744 
745 /**
746  * validate_recovery_journal_entry() - Validate a recovery journal entry.
747  * @vdo: The vdo.
748  * @entry: The entry to validate.
749  *
750  * Return: VDO_SUCCESS or an error.
751  */
752 static int validate_recovery_journal_entry(const struct vdo *vdo,
753 					   const struct recovery_journal_entry *entry)
754 {
755 	if ((entry->slot.pbn >= vdo->states.vdo.config.physical_blocks) ||
756 	    (entry->slot.slot >= VDO_BLOCK_MAP_ENTRIES_PER_PAGE) ||
757 	    !vdo_is_valid_location(&entry->mapping) ||
758 	    !vdo_is_valid_location(&entry->unmapping) ||
759 	    !vdo_is_physical_data_block(vdo->depot, entry->mapping.pbn) ||
760 	    !vdo_is_physical_data_block(vdo->depot, entry->unmapping.pbn)) {
761 		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
762 					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not within bounds",
763 					      vdo_get_journal_operation_name(entry->operation),
764 					      (unsigned long long) entry->slot.pbn,
765 					      entry->slot.slot,
766 					      (unsigned long long) entry->unmapping.pbn,
767 					      (unsigned long long) entry->mapping.pbn);
768 	}
769 
770 	if ((entry->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) &&
771 	    (vdo_is_state_compressed(entry->mapping.state) ||
772 	     (entry->mapping.pbn == VDO_ZERO_BLOCK) ||
773 	     (entry->unmapping.state != VDO_MAPPING_STATE_UNMAPPED) ||
774 	     (entry->unmapping.pbn != VDO_ZERO_BLOCK))) {
775 		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
776 					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not a valid tree mapping",
777 					      vdo_get_journal_operation_name(entry->operation),
778 					      (unsigned long long) entry->slot.pbn,
779 					      entry->slot.slot,
780 					      (unsigned long long) entry->unmapping.pbn,
781 					      (unsigned long long) entry->mapping.pbn);
782 	}
783 
784 	return VDO_SUCCESS;
785 }
786 
787 /**
788  * add_slab_journal_entries() - Replay recovery journal entries into the slab journals of the
789  *                              allocator currently being recovered.
790  * @completion: The allocator completion.
791  *
792  * Waits for slab journal tailblock space when necessary. This method is its own callback.
793  */
794 static void add_slab_journal_entries(struct vdo_completion *completion)
795 {
796 	struct recovery_point *recovery_point;
797 	struct repair_completion *repair = completion->parent;
798 	struct vdo *vdo = completion->vdo;
799 	struct recovery_journal *journal = vdo->recovery_journal;
800 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
801 
802 	/* Get ready in case we need to enqueue again. */
803 	vdo_prepare_completion(completion, add_slab_journal_entries,
804 			       vdo_notify_slab_journals_are_recovered,
805 			       completion->callback_thread_id, repair);
806 	for (recovery_point = &repair->next_recovery_point;
807 	     before_recovery_point(recovery_point, &repair->tail_recovery_point);
808 	     advance_points(repair, journal->entries_per_block)) {
809 		int result;
810 		physical_block_number_t pbn;
811 		struct vdo_slab *slab;
812 		struct recovery_journal_entry entry = get_entry(repair, recovery_point);
813 		bool increment = !repair->next_recovery_point.increment_applied;
814 
815 		if (increment) {
816 			result = validate_recovery_journal_entry(vdo, &entry);
817 			if (result != VDO_SUCCESS) {
818 				vdo_enter_read_only_mode(vdo, result);
819 				vdo_fail_completion(completion, result);
820 				return;
821 			}
822 
823 			pbn = entry.mapping.pbn;
824 		} else {
825 			pbn = entry.unmapping.pbn;
826 		}
827 
828 		if (pbn == VDO_ZERO_BLOCK)
829 			continue;
830 
831 		slab = vdo_get_slab(vdo->depot, pbn);
832 		if (slab->allocator != allocator)
833 			continue;
834 
835 		if (!vdo_attempt_replay_into_slab(slab, pbn, entry.operation, increment,
836 						  &repair->next_journal_point,
837 						  completion))
838 			return;
839 
840 		repair->entries_added_to_slab_journals++;
841 	}
842 
843 	vdo_notify_slab_journals_are_recovered(completion);
844 }
845 
846 /**
847  * vdo_replay_into_slab_journals() - Replay recovery journal entries in the slab journals of slabs
848  *                                   owned by a given block_allocator.
849  * @allocator: The allocator whose slab journals are to be recovered.
850  * @context: The slab depot load context supplied by a recovery when it loads the depot.
851  */
852 void vdo_replay_into_slab_journals(struct block_allocator *allocator, void *context)
853 {
854 	struct vdo_completion *completion = &allocator->completion;
855 	struct repair_completion *repair = context;
856 	struct vdo *vdo = completion->vdo;
857 
858 	vdo_assert_on_physical_zone_thread(vdo, allocator->zone_number, __func__);
859 	if (repair->entry_count == 0) {
860 		/* there's nothing to replay */
861 		repair->logical_blocks_used = vdo->recovery_journal->logical_blocks_used;
862 		repair->block_map_data_blocks = vdo->recovery_journal->block_map_data_blocks;
863 		vdo_notify_slab_journals_are_recovered(completion);
864 		return;
865 	}
866 
867 	repair->next_recovery_point = (struct recovery_point) {
868 		.sequence_number = repair->slab_journal_head,
869 		.sector_count = 1,
870 		.entry_count = 0,
871 	};
872 
873 	repair->next_journal_point = (struct journal_point) {
874 		.sequence_number = repair->slab_journal_head,
875 		.entry_count = 0,
876 	};
877 
878 	vdo_log_info("Replaying entries into slab journals for zone %u",
879 		     allocator->zone_number);
880 	completion->parent = repair;
881 	add_slab_journal_entries(completion);
882 }
883 
884 static void load_slab_depot(struct vdo_completion *completion)
885 {
886 	struct repair_completion *repair = as_repair_completion(completion);
887 	const struct admin_state_code *operation;
888 
889 	vdo_assert_on_admin_thread(completion->vdo, __func__);
890 
891 	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state)) {
892 		prepare_repair_completion(repair, rebuild_reference_counts,
893 					  VDO_ZONE_TYPE_LOGICAL);
894 		operation = VDO_ADMIN_STATE_LOADING_FOR_REBUILD;
895 	} else {
896 		prepare_repair_completion(repair, drain_slab_depot, VDO_ZONE_TYPE_ADMIN);
897 		operation = VDO_ADMIN_STATE_LOADING_FOR_RECOVERY;
898 	}
899 
900 	vdo_load_slab_depot(completion->vdo->depot, operation, completion, repair);
901 }
902 
903 static void flush_block_map(struct vdo_completion *completion)
904 {
905 	struct repair_completion *repair = as_repair_completion(completion);
906 	const struct admin_state_code *operation;
907 
908 	vdo_assert_on_admin_thread(completion->vdo, __func__);
909 
910 	vdo_log_info("Flushing block map changes");
911 	prepare_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
912 	operation = (vdo_state_requires_read_only_rebuild(completion->vdo->load_state) ?
913 		     VDO_ADMIN_STATE_REBUILDING :
914 		     VDO_ADMIN_STATE_RECOVERING);
915 	vdo_drain_block_map(completion->vdo->block_map, operation, completion);
916 }
917 
918 static bool finish_if_done(struct repair_completion *repair)
919 {
920 	/* Pages are still being launched or there is still work to do */
921 	if (repair->launching || (repair->outstanding > 0))
922 		return false;
923 
924 	if (repair->completion.result != VDO_SUCCESS) {
925 		page_count_t i;
926 
927 		for (i = 0; i < repair->page_count; i++) {
928 			struct vdo_page_completion *page_completion =
929 				&repair->page_completions[i];
930 
931 			if (page_completion->ready)
932 				vdo_release_page_completion(&page_completion->completion);
933 		}
934 
935 		vdo_launch_completion(&repair->completion);
936 		return true;
937 	}
938 
939 	if (repair->current_entry >= repair->entries)
940 		return false;
941 
942 	launch_repair_completion(repair, flush_block_map, VDO_ZONE_TYPE_ADMIN);
943 	return true;
944 }
945 
946 static void abort_block_map_recovery(struct repair_completion *repair, int result)
947 {
948 	vdo_set_completion_result(&repair->completion, result);
949 	finish_if_done(repair);
950 }
951 
952 /**
953  * find_entry_starting_next_page() - Find the first journal entry after a given entry which is not
954  *                                   on the same block map page.
955  * @current_entry: The entry to search from.
956  * @needs_sort: Whether sorting is needed to proceed.
957  *
958  * Return: Pointer to the first later journal entry on a different block map page, or a pointer to
959  *         just before the journal entries if no subsequent entry is on a different block map page.
960  */
961 static struct numbered_block_mapping *
962 find_entry_starting_next_page(struct repair_completion *repair,
963 			      struct numbered_block_mapping *current_entry, bool needs_sort)
964 {
965 	size_t current_page;
966 
967 	/* If current_entry is invalid, return immediately. */
968 	if (current_entry < repair->entries)
969 		return current_entry;
970 
971 	current_page = current_entry->block_map_slot.pbn;
972 
973 	/* Decrement current_entry until it's out of bounds or on a different page. */
974 	while ((current_entry >= repair->entries) &&
975 	       (current_entry->block_map_slot.pbn == current_page)) {
976 		if (needs_sort) {
977 			struct numbered_block_mapping *just_sorted_entry =
978 				sort_next_heap_element(repair);
979 			VDO_ASSERT_LOG_ONLY(just_sorted_entry < current_entry,
980 					    "heap is returning elements in an unexpected order");
981 		}
982 
983 		current_entry--;
984 	}
985 
986 	return current_entry;
987 }
988 
989 /*
990  * Apply a range of journal entries [starting_entry, ending_entry) journal
991  * entries to a block map page.
992  */
993 static void apply_journal_entries_to_page(struct block_map_page *page,
994 					  struct numbered_block_mapping *starting_entry,
995 					  struct numbered_block_mapping *ending_entry)
996 {
997 	struct numbered_block_mapping *current_entry = starting_entry;
998 
999 	while (current_entry != ending_entry) {
1000 		page->entries[current_entry->block_map_slot.slot] = current_entry->block_map_entry;
1001 		current_entry--;
1002 	}
1003 }
1004 
1005 static void recover_ready_pages(struct repair_completion *repair,
1006 				struct vdo_completion *completion);
1007 
1008 static void block_map_page_loaded(struct vdo_completion *completion)
1009 {
1010 	struct repair_completion *repair = as_repair_completion(completion->parent);
1011 
1012 	repair->outstanding--;
1013 	if (!repair->launching)
1014 		recover_ready_pages(repair, completion);
1015 }
1016 
1017 static void handle_block_map_page_load_error(struct vdo_completion *completion)
1018 {
1019 	struct repair_completion *repair = as_repair_completion(completion->parent);
1020 
1021 	repair->outstanding--;
1022 	abort_block_map_recovery(repair, completion->result);
1023 }
1024 
1025 static void fetch_block_map_page(struct repair_completion *repair,
1026 				 struct vdo_completion *completion)
1027 {
1028 	physical_block_number_t pbn;
1029 
1030 	if (repair->current_unfetched_entry < repair->entries)
1031 		/* Nothing left to fetch. */
1032 		return;
1033 
1034 	/* Fetch the next page we haven't yet requested. */
1035 	pbn = repair->current_unfetched_entry->block_map_slot.pbn;
1036 	repair->current_unfetched_entry =
1037 		find_entry_starting_next_page(repair, repair->current_unfetched_entry,
1038 					      true);
1039 	repair->outstanding++;
1040 	vdo_get_page(((struct vdo_page_completion *) completion),
1041 		     &repair->completion.vdo->block_map->zones[0], pbn, true,
1042 		     &repair->completion, block_map_page_loaded,
1043 		     handle_block_map_page_load_error, false);
1044 }
1045 
1046 static struct vdo_page_completion *get_next_page_completion(struct repair_completion *repair,
1047 							    struct vdo_page_completion *completion)
1048 {
1049 	completion++;
1050 	if (completion == (&repair->page_completions[repair->page_count]))
1051 		completion = &repair->page_completions[0];
1052 	return completion;
1053 }
1054 
/*
 * Apply journal entries to block map pages which have been loaded, starting with the given page
 * completion and continuing around the ring of page completions for as long as the next one is
 * ready. Each page completion is reused to fetch a further page once its entries are applied.
 *
 * Pages are only processed in order: nothing is done unless the given completion holds the page
 * recorded in repair->pbn, which is the page of repair->current_entry.
 */
static void recover_ready_pages(struct repair_completion *repair,
				struct vdo_completion *completion)
{
	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;

	/* Stop if the repair completion has just been launched (finished or failed). */
	if (finish_if_done(repair))
		return;

	/* This is not the page needed next; it will be handled when its turn comes. */
	if (repair->pbn != page_completion->pbn)
		return;

	while (page_completion->ready) {
		struct numbered_block_mapping *start_of_next_page;
		struct block_map_page *page;
		int result;

		result = vdo_get_cached_page(completion, &page);
		if (result != VDO_SUCCESS) {
			abort_block_map_recovery(repair, result);
			return;
		}

		/* Apply every entry for this page, i.e. [current_entry, start_of_next_page). */
		start_of_next_page =
			find_entry_starting_next_page(repair, repair->current_entry,
						      false);
		apply_journal_entries_to_page(page, repair->current_entry,
					      start_of_next_page);
		repair->current_entry = start_of_next_page;
		vdo_request_page_write(completion);
		vdo_release_page_completion(completion);

		if (finish_if_done(repair))
			return;

		/*
		 * Record the page needed next, reuse this completion to fetch another page, and
		 * move on to the following completion in the ring.
		 */
		repair->pbn = repair->current_entry->block_map_slot.pbn;
		fetch_block_map_page(repair, completion);
		page_completion = get_next_page_completion(repair, page_completion);
		completion = &page_completion->completion;
	}
}
1095 
1096 static void recover_block_map(struct vdo_completion *completion)
1097 {
1098 	struct repair_completion *repair = as_repair_completion(completion);
1099 	struct vdo *vdo = completion->vdo;
1100 	struct numbered_block_mapping *first_sorted_entry;
1101 	page_count_t i;
1102 
1103 	vdo_assert_on_logical_zone_thread(vdo, 0, __func__);
1104 
1105 	/* Suppress block map errors. */
1106 	vdo->block_map->zones[0].page_cache.rebuilding =
1107 		vdo_state_requires_read_only_rebuild(vdo->load_state);
1108 
1109 	if (repair->block_map_entry_count == 0) {
1110 		vdo_log_info("Replaying 0 recovery entries into block map");
1111 		vdo_free(vdo_forget(repair->journal_data));
1112 		launch_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
1113 		return;
1114 	}
1115 
1116 	/*
1117 	 * Organize the journal entries into a binary heap so we can iterate over them in sorted
1118 	 * order incrementally, avoiding an expensive sort call.
1119 	 */
1120 	repair->replay_heap = (struct min_heap) {
1121 		.data = repair->entries,
1122 		.nr = repair->block_map_entry_count,
1123 		.size = repair->block_map_entry_count,
1124 	};
1125 	min_heapify_all(&repair->replay_heap, &repair_min_heap);
1126 
1127 	vdo_log_info("Replaying %zu recovery entries into block map",
1128 		     repair->block_map_entry_count);
1129 
1130 	repair->current_entry = &repair->entries[repair->block_map_entry_count - 1];
1131 	first_sorted_entry = sort_next_heap_element(repair);
1132 	VDO_ASSERT_LOG_ONLY(first_sorted_entry == repair->current_entry,
1133 			    "heap is returning elements in an unexpected order");
1134 
1135 	/* Prevent any page from being processed until all pages have been launched. */
1136 	repair->launching = true;
1137 	repair->pbn = repair->current_entry->block_map_slot.pbn;
1138 	repair->current_unfetched_entry = repair->current_entry;
1139 	for (i = 0; i < repair->page_count; i++) {
1140 		if (repair->current_unfetched_entry < repair->entries)
1141 			break;
1142 
1143 		fetch_block_map_page(repair, &repair->page_completions[i].completion);
1144 	}
1145 	repair->launching = false;
1146 
1147 	/* Process any ready pages. */
1148 	recover_ready_pages(repair, &repair->page_completions[0].completion);
1149 }
1150 
1151 /**
1152  * get_recovery_journal_block_header() - Get the block header for a block at a position in the
1153  *                                       journal data and unpack it.
1154  * @journal: The recovery journal.
1155  * @data: The recovery journal data.
1156  * @sequence: The sequence number.
1157  *
1158  * Return: The unpacked header.
1159  */
1160 static struct recovery_block_header __must_check
1161 get_recovery_journal_block_header(struct recovery_journal *journal, char *data,
1162 				  sequence_number_t sequence)
1163 {
1164 	physical_block_number_t pbn =
1165 		vdo_get_recovery_journal_block_number(journal, sequence);
1166 	char *header = &data[pbn * VDO_BLOCK_SIZE];
1167 
1168 	return vdo_unpack_recovery_block_header((struct packed_journal_header *) header);
1169 }
1170 
1171 /**
1172  * is_valid_recovery_journal_block() - Determine whether the given header describes a valid block
1173  *                                     for the given journal.
1174  * @journal: The journal to use.
1175  * @header: The unpacked block header to check.
1176  * @old_ok: Whether an old format header is valid.
1177  *
1178  * A block is not valid if it is unformatted, or if it is older than the last successful recovery
1179  * or reformat.
1180  *
1181  * Return: True if the header is valid.
1182  */
1183 static bool __must_check is_valid_recovery_journal_block(const struct recovery_journal *journal,
1184 							 const struct recovery_block_header *header,
1185 							 bool old_ok)
1186 {
1187 	if ((header->nonce != journal->nonce) ||
1188 	    (header->recovery_count != journal->recovery_count))
1189 		return false;
1190 
1191 	if (header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL_2)
1192 		return (header->entry_count <= journal->entries_per_block);
1193 
1194 	return (old_ok &&
1195 		(header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL) &&
1196 		(header->entry_count <= RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK));
1197 }
1198 
1199 /**
1200  * is_exact_recovery_journal_block() - Determine whether the given header describes the exact block
1201  *                                     indicated.
1202  * @journal: The journal to use.
1203  * @header: The unpacked block header to check.
1204  * @sequence: The expected sequence number.
1205  * @type: The expected metadata type.
1206  *
1207  * Return: True if the block matches.
1208  */
1209 static bool __must_check is_exact_recovery_journal_block(const struct recovery_journal *journal,
1210 							 const struct recovery_block_header *header,
1211 							 sequence_number_t sequence,
1212 							 enum vdo_metadata_type type)
1213 {
1214 	return ((header->metadata_type == type) &&
1215 		(header->sequence_number == sequence) &&
1216 		(is_valid_recovery_journal_block(journal, header, true)));
1217 }
1218 
1219 /**
1220  * find_recovery_journal_head_and_tail() - Find the tail and head of the journal.
1221  *
1222  * Return: True if there were valid journal blocks.
1223  */
1224 static bool find_recovery_journal_head_and_tail(struct repair_completion *repair)
1225 {
1226 	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;
1227 	bool found_entries = false;
1228 	physical_block_number_t i;
1229 
1230 	/*
1231 	 * Ensure that we don't replay old entries since we know the tail recorded in the super
1232 	 * block must be a lower bound. Not doing so can result in extra data loss by setting the
1233 	 * tail too early.
1234 	 */
1235 	repair->highest_tail = journal->tail;
1236 	for (i = 0; i < journal->size; i++) {
1237 		struct recovery_block_header header =
1238 			get_recovery_journal_block_header(journal, repair->journal_data, i);
1239 
1240 		if (!is_valid_recovery_journal_block(journal, &header, true)) {
1241 			/* This block is old or incorrectly formatted */
1242 			continue;
1243 		}
1244 
1245 		if (vdo_get_recovery_journal_block_number(journal, header.sequence_number) != i) {
1246 			/* This block is in the wrong location */
1247 			continue;
1248 		}
1249 
1250 		if (header.sequence_number >= repair->highest_tail) {
1251 			found_entries = true;
1252 			repair->highest_tail = header.sequence_number;
1253 		}
1254 
1255 		if (!found_entries)
1256 			continue;
1257 
1258 		if (header.block_map_head > repair->block_map_head)
1259 			repair->block_map_head = header.block_map_head;
1260 
1261 		if (header.slab_journal_head > repair->slab_journal_head)
1262 			repair->slab_journal_head = header.slab_journal_head;
1263 	}
1264 
1265 	return found_entries;
1266 }
1267 
1268 /**
1269  * unpack_entry() - Unpack a recovery journal entry in either format.
1270  * @vdo: The vdo.
1271  * @packed: The entry to unpack.
1272  * @format: The expected format of the entry.
1273  * @entry: The unpacked entry.
1274  *
1275  * Return: true if the entry should be applied.3
1276  */
1277 static bool unpack_entry(struct vdo *vdo, char *packed, enum vdo_metadata_type format,
1278 			 struct recovery_journal_entry *entry)
1279 {
1280 	if (format == VDO_METADATA_RECOVERY_JOURNAL_2) {
1281 		struct packed_recovery_journal_entry *packed_entry =
1282 			(struct packed_recovery_journal_entry *) packed;
1283 
1284 		*entry = vdo_unpack_recovery_journal_entry(packed_entry);
1285 	} else {
1286 		physical_block_number_t low32, high4;
1287 
1288 		struct packed_recovery_journal_entry_1 *packed_entry =
1289 			(struct packed_recovery_journal_entry_1 *) packed;
1290 
1291 		if (packed_entry->operation == VDO_JOURNAL_DATA_INCREMENT)
1292 			entry->operation = VDO_JOURNAL_DATA_REMAPPING;
1293 		else if (packed_entry->operation == VDO_JOURNAL_BLOCK_MAP_INCREMENT)
1294 			entry->operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING;
1295 		else
1296 			return false;
1297 
1298 		low32 = __le32_to_cpu(packed_entry->pbn_low_word);
1299 		high4 = packed_entry->pbn_high_nibble;
1300 		entry->slot = (struct block_map_slot) {
1301 			.pbn = ((high4 << 32) | low32),
1302 			.slot = (packed_entry->slot_low | (packed_entry->slot_high << 6)),
1303 		};
1304 		entry->mapping = vdo_unpack_block_map_entry(&packed_entry->block_map_entry);
1305 		entry->unmapping = (struct data_location) {
1306 			.pbn = VDO_ZERO_BLOCK,
1307 			.state = VDO_MAPPING_STATE_UNMAPPED,
1308 		};
1309 	}
1310 
1311 	return (validate_recovery_journal_entry(vdo, entry) == VDO_SUCCESS);
1312 }
1313 
1314 /**
1315  * append_sector_entries() - Append an array of recovery journal entries from a journal block
1316  *                           sector to the array of numbered mappings in the repair completion,
1317  *                           numbering each entry in the order they are appended.
1318  * @repair: The repair completion.
1319  * @entries: The entries in the sector.
1320  * @format: The format of the sector.
1321  * @entry_count: The number of entries to append.
1322  */
1323 static void append_sector_entries(struct repair_completion *repair, char *entries,
1324 				  enum vdo_metadata_type format,
1325 				  journal_entry_count_t entry_count)
1326 {
1327 	journal_entry_count_t i;
1328 	struct vdo *vdo = repair->completion.vdo;
1329 	off_t increment = ((format == VDO_METADATA_RECOVERY_JOURNAL_2)
1330 			   ? sizeof(struct packed_recovery_journal_entry)
1331 			   : sizeof(struct packed_recovery_journal_entry_1));
1332 
1333 	for (i = 0; i < entry_count; i++, entries += increment) {
1334 		struct recovery_journal_entry entry;
1335 
1336 		if (!unpack_entry(vdo, entries, format, &entry))
1337 			/* When recovering from read-only mode, ignore damaged entries. */
1338 			continue;
1339 
1340 		repair->entries[repair->block_map_entry_count] =
1341 			(struct numbered_block_mapping) {
1342 			.block_map_slot = entry.slot,
1343 			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
1344 								    entry.mapping.state),
1345 			.number = repair->block_map_entry_count,
1346 		};
1347 		repair->block_map_entry_count++;
1348 	}
1349 }
1350 
1351 static journal_entry_count_t entries_per_sector(enum vdo_metadata_type format,
1352 						u8 sector_number)
1353 {
1354 	if (format == VDO_METADATA_RECOVERY_JOURNAL_2)
1355 		return RECOVERY_JOURNAL_ENTRIES_PER_SECTOR;
1356 
1357 	return ((sector_number == (VDO_SECTORS_PER_BLOCK - 1))
1358 		? RECOVERY_JOURNAL_1_ENTRIES_IN_LAST_SECTOR
1359 		: RECOVERY_JOURNAL_1_ENTRIES_PER_SECTOR);
1360 }
1361 
1362 static void extract_entries_from_block(struct repair_completion *repair,
1363 				       struct recovery_journal *journal,
1364 				       sequence_number_t sequence,
1365 				       enum vdo_metadata_type format,
1366 				       journal_entry_count_t entries)
1367 {
1368 	sector_count_t i;
1369 	struct recovery_block_header header =
1370 		get_recovery_journal_block_header(journal, repair->journal_data,
1371 						  sequence);
1372 
1373 	if (!is_exact_recovery_journal_block(journal, &header, sequence, format)) {
1374 		/* This block is invalid, so skip it. */
1375 		return;
1376 	}
1377 
1378 	entries = min(entries, header.entry_count);
1379 	for (i = 1; i < VDO_SECTORS_PER_BLOCK; i++) {
1380 		struct packed_journal_sector *sector =
1381 			get_sector(journal, repair->journal_data, sequence, i);
1382 		journal_entry_count_t sector_entries =
1383 			min(entries, entries_per_sector(format, i));
1384 
1385 		if (vdo_is_valid_recovery_journal_sector(&header, sector, i)) {
1386 			/* Only extract as many as the block header calls for. */
1387 			append_sector_entries(repair, (char *) sector->entries, format,
1388 					      min_t(journal_entry_count_t,
1389 						    sector->entry_count,
1390 						    sector_entries));
1391 		}
1392 
1393 		/*
1394 		 * Even if the sector wasn't full, count it as full when counting up to the
1395 		 * entry count the block header claims.
1396 		 */
1397 		entries -= sector_entries;
1398 	}
1399 }
1400 
1401 static int parse_journal_for_rebuild(struct repair_completion *repair)
1402 {
1403 	int result;
1404 	sequence_number_t i;
1405 	block_count_t count;
1406 	enum vdo_metadata_type format;
1407 	struct vdo *vdo = repair->completion.vdo;
1408 	struct recovery_journal *journal = vdo->recovery_journal;
1409 	journal_entry_count_t entries_per_block = journal->entries_per_block;
1410 
1411 	format = get_recovery_journal_block_header(journal, repair->journal_data,
1412 						   repair->highest_tail).metadata_type;
1413 	if (format == VDO_METADATA_RECOVERY_JOURNAL)
1414 		entries_per_block = RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK;
1415 
1416 	/*
1417 	 * Allocate an array of numbered_block_mapping structures large enough to transcribe every
1418 	 * packed_recovery_journal_entry from every valid journal block.
1419 	 */
1420 	count = ((repair->highest_tail - repair->block_map_head + 1) * entries_per_block);
1421 	result = vdo_allocate(count, struct numbered_block_mapping, __func__,
1422 			      &repair->entries);
1423 	if (result != VDO_SUCCESS)
1424 		return result;
1425 
1426 	for (i = repair->block_map_head; i <= repair->highest_tail; i++)
1427 		extract_entries_from_block(repair, journal, i, format, entries_per_block);
1428 
1429 	return VDO_SUCCESS;
1430 }
1431 
1432 static int validate_heads(struct repair_completion *repair)
1433 {
1434 	/* Both reap heads must be behind the tail. */
1435 	if ((repair->block_map_head <= repair->tail) &&
1436 	    (repair->slab_journal_head <= repair->tail))
1437 		return VDO_SUCCESS;
1438 
1439 
1440 	return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
1441 				      "Journal tail too early. block map head: %llu, slab journal head: %llu, tail: %llu",
1442 				      (unsigned long long) repair->block_map_head,
1443 				      (unsigned long long) repair->slab_journal_head,
1444 				      (unsigned long long) repair->tail);
1445 }
1446 
1447 /**
1448  * extract_new_mappings() - Find all valid new mappings to be applied to the block map.
1449  *
1450  * The mappings are extracted from the journal and stored in a sortable array so that all of the
1451  * mappings to be applied to a given block map page can be done in a single page fetch.
1452  */
1453 static int extract_new_mappings(struct repair_completion *repair)
1454 {
1455 	int result;
1456 	struct vdo *vdo = repair->completion.vdo;
1457 	struct recovery_point recovery_point = {
1458 		.sequence_number = repair->block_map_head,
1459 		.sector_count = 1,
1460 		.entry_count = 0,
1461 	};
1462 
1463 	/*
1464 	 * Allocate an array of numbered_block_mapping structs just large enough to transcribe
1465 	 * every packed_recovery_journal_entry from every valid journal block.
1466 	 */
1467 	result = vdo_allocate(repair->entry_count, struct numbered_block_mapping,
1468 			      __func__, &repair->entries);
1469 	if (result != VDO_SUCCESS)
1470 		return result;
1471 
1472 	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
1473 	     increment_recovery_point(&recovery_point)) {
1474 		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);
1475 
1476 		result = validate_recovery_journal_entry(vdo, &entry);
1477 		if (result != VDO_SUCCESS) {
1478 			vdo_enter_read_only_mode(vdo, result);
1479 			return result;
1480 		}
1481 
1482 		repair->entries[repair->block_map_entry_count] =
1483 			(struct numbered_block_mapping) {
1484 			.block_map_slot = entry.slot,
1485 			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
1486 								    entry.mapping.state),
1487 			.number = repair->block_map_entry_count,
1488 		};
1489 		repair->block_map_entry_count++;
1490 	}
1491 
1492 	result = VDO_ASSERT((repair->block_map_entry_count <= repair->entry_count),
1493 			    "approximate entry count is an upper bound");
1494 	if (result != VDO_SUCCESS)
1495 		vdo_enter_read_only_mode(vdo, result);
1496 
1497 	return result;
1498 }
1499 
1500 /**
1501  * compute_usages() - Compute the lbns in use and block map data blocks counts from the tail of
1502  *                    the journal.
1503  */
1504 static noinline int compute_usages(struct repair_completion *repair)
1505 {
1506 	/*
1507 	 * This function is declared noinline to avoid a spurious valgrind error regarding the
1508 	 * following structure being uninitialized.
1509 	 */
1510 	struct recovery_point recovery_point = {
1511 		.sequence_number = repair->tail,
1512 		.sector_count = 1,
1513 		.entry_count = 0,
1514 	};
1515 
1516 	struct vdo *vdo = repair->completion.vdo;
1517 	struct recovery_journal *journal = vdo->recovery_journal;
1518 	struct recovery_block_header header =
1519 		get_recovery_journal_block_header(journal, repair->journal_data,
1520 						  repair->tail);
1521 
1522 	repair->logical_blocks_used = header.logical_blocks_used;
1523 	repair->block_map_data_blocks = header.block_map_data_blocks;
1524 
1525 	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
1526 	     increment_recovery_point(&recovery_point)) {
1527 		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);
1528 		int result;
1529 
1530 		result = validate_recovery_journal_entry(vdo, &entry);
1531 		if (result != VDO_SUCCESS) {
1532 			vdo_enter_read_only_mode(vdo, result);
1533 			return result;
1534 		}
1535 
1536 		if (entry.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
1537 			repair->block_map_data_blocks++;
1538 			continue;
1539 		}
1540 
1541 		if (vdo_is_mapped_location(&entry.mapping))
1542 			repair->logical_blocks_used++;
1543 
1544 		if (vdo_is_mapped_location(&entry.unmapping))
1545 			repair->logical_blocks_used--;
1546 	}
1547 
1548 	return VDO_SUCCESS;
1549 }
1550 
/*
 * Parse the journal for a normal recovery. Starting from the earlier of the two heads, each block
 * up to highest_tail is examined to find the last usable block and sector, which are recorded in
 * repair->tail and repair->tail_recovery_point, while repair->entry_count accumulates the number
 * of entries seen. The scan stops at the first bad, torn, or partially filled block.
 *
 * If any entries were found, the heads are validated, the new block map mappings are extracted,
 * and the usage counts are computed.
 *
 * Return: VDO_SUCCESS or an error code; VDO_UNSUPPORTED_VERSION (with the vdo put into read-only
 *         mode) if an old format journal block is encountered.
 */
static int parse_journal_for_recovery(struct repair_completion *repair)
{
	int result;
	sequence_number_t i, head;
	bool found_entries = false;
	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;

	head = min(repair->block_map_head, repair->slab_journal_head);
	for (i = head; i <= repair->highest_tail; i++) {
		struct recovery_block_header header;
		journal_entry_count_t block_entries;
		u8 j;

		/* Provisionally treat this block as the tail, with no valid sectors seen yet. */
		repair->tail = i;
		repair->tail_recovery_point = (struct recovery_point) {
			.sequence_number = i,
			.sector_count = 0,
			.entry_count = 0,
		};

		header = get_recovery_journal_block_header(journal, repair->journal_data, i);
		if (header.metadata_type == VDO_METADATA_RECOVERY_JOURNAL) {
			/* This is an old format block, so we need to upgrade */
			vdo_log_error_strerror(VDO_UNSUPPORTED_VERSION,
					       "Recovery journal is in the old format, a read-only rebuild is required.");
			vdo_enter_read_only_mode(repair->completion.vdo,
						 VDO_UNSUPPORTED_VERSION);
			return VDO_UNSUPPORTED_VERSION;
		}

		if (!is_exact_recovery_journal_block(journal, &header, i,
						     VDO_METADATA_RECOVERY_JOURNAL_2)) {
			/* A bad block header was found so this must be the end of the journal. */
			break;
		}

		/* Entries the header claims which have not yet been found in a valid sector. */
		block_entries = header.entry_count;

		/* Examine each sector in turn to determine the last valid sector. */
		for (j = 1; j < VDO_SECTORS_PER_BLOCK; j++) {
			struct packed_journal_sector *sector =
				get_sector(journal, repair->journal_data, i, j);
			/* Never count more entries than the block header still allows. */
			journal_entry_count_t sector_entries =
				min_t(journal_entry_count_t, sector->entry_count,
				      block_entries);

			/* A bad sector means that this block was torn. */
			if (!vdo_is_valid_recovery_journal_sector(&header, sector, j))
				break;

			if (sector_entries > 0) {
				found_entries = true;
				repair->tail_recovery_point.sector_count++;
				repair->tail_recovery_point.entry_count = sector_entries;
				block_entries -= sector_entries;
				repair->entry_count += sector_entries;
			}

			/* If this sector is short, the later sectors can't matter. */
			if ((sector_entries < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR) ||
			    (block_entries == 0))
				break;
		}

		/* If this block was not filled, or if it tore, no later block can matter. */
		if ((header.entry_count != journal->entries_per_block) || (block_entries > 0))
			break;
	}

	/* With no entries at all there is nothing to extract; just check the heads. */
	if (!found_entries)
		return validate_heads(repair);

	/* Set the tail to the last valid tail block, if there is one. */
	if (repair->tail_recovery_point.sector_count == 0)
		repair->tail--;

	result = validate_heads(repair);
	if (result != VDO_SUCCESS)
		return result;

	vdo_log_info("Highest-numbered recovery journal block has sequence number %llu, and the highest-numbered usable block is %llu",
		     (unsigned long long) repair->highest_tail,
		     (unsigned long long) repair->tail);

	result = extract_new_mappings(repair);
	if (result != VDO_SUCCESS)
		return result;

	return compute_usages(repair);
}
1641 
1642 static int parse_journal(struct repair_completion *repair)
1643 {
1644 	if (!find_recovery_journal_head_and_tail(repair))
1645 		return VDO_SUCCESS;
1646 
1647 	return (vdo_state_requires_read_only_rebuild(repair->completion.vdo->load_state) ?
1648 		parse_journal_for_rebuild(repair) :
1649 		parse_journal_for_recovery(repair));
1650 }
1651 
1652 static void finish_journal_load(struct vdo_completion *completion)
1653 {
1654 	struct repair_completion *repair = completion->parent;
1655 
1656 	if (++repair->vios_complete != repair->vio_count)
1657 		return;
1658 
1659 	vdo_log_info("Finished reading recovery journal");
1660 	uninitialize_vios(repair);
1661 	prepare_repair_completion(repair, recover_block_map, VDO_ZONE_TYPE_LOGICAL);
1662 	vdo_continue_completion(&repair->completion, parse_journal(repair));
1663 }
1664 
1665 static void handle_journal_load_error(struct vdo_completion *completion)
1666 {
1667 	struct repair_completion *repair = completion->parent;
1668 
1669 	/* Preserve the error */
1670 	vdo_set_completion_result(&repair->completion, completion->result);
1671 	vio_record_metadata_io_error(as_vio(completion));
1672 	completion->callback(completion);
1673 }
1674 
1675 static void read_journal_endio(struct bio *bio)
1676 {
1677 	struct vio *vio = bio->bi_private;
1678 	struct vdo *vdo = vio->completion.vdo;
1679 
1680 	continue_vio_after_io(vio, finish_journal_load, vdo->thread_config.admin_thread);
1681 }
1682 
/**
 * vdo_repair() - Load the recovery journal and then recover or rebuild a vdo.
 * @parent: The completion to notify when the operation is complete
 *
 * A repair completion is allocated along with a buffer large enough for the whole journal and
 * enough vios to read it, and then all of the reads are submitted. The rest of the repair is
 * driven from the read callbacks. If the repair completion itself cannot be allocated, @parent
 * is failed directly.
 */
void vdo_repair(struct vdo_completion *parent)
{
	int result;
	char *ptr;
	struct repair_completion *repair;
	struct vdo *vdo = parent->vdo;
	struct recovery_journal *journal = vdo->recovery_journal;
	physical_block_number_t pbn = journal->origin;
	block_count_t remaining = journal->size;
	block_count_t vio_count = DIV_ROUND_UP(remaining, MAX_BLOCKS_PER_VIO);
	/* Use at most half of the configured cache size for block map restoration reads. */
	page_count_t page_count = min_t(page_count_t,
					vdo->device_config->cache_size >> 1,
					MAXIMUM_SIMULTANEOUS_VDO_BLOCK_MAP_RESTORATION_READS);

	vdo_assert_on_admin_thread(vdo, __func__);

	if (vdo->load_state == VDO_FORCE_REBUILD) {
		vdo_log_warning("Rebuilding reference counts to clear read-only mode");
		vdo->states.vdo.read_only_recoveries++;
	} else if (vdo->load_state == VDO_REBUILD_FOR_UPGRADE) {
		vdo_log_warning("Rebuilding reference counts for upgrade");
	} else {
		vdo_log_warning("Device was dirty, rebuilding reference counts");
	}

	/* The repair completion is allocated with page_count trailing page completions. */
	result = vdo_allocate_extended(struct repair_completion, page_count,
				       struct vdo_page_completion, __func__,
				       &repair);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	vdo_initialize_completion(&repair->completion, vdo, VDO_REPAIR_COMPLETION);
	repair->completion.error_handler = abort_repair;
	repair->completion.parent = parent;
	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
	repair->page_count = page_count;

	/* A single buffer holds the data for every block of the journal. */
	result = vdo_allocate(remaining * VDO_BLOCK_SIZE, char, __func__,
			      &repair->journal_data);
	if (abort_on_error(result, repair))
		return;

	result = vdo_allocate(vio_count, struct vio, __func__, &repair->vios);
	if (abort_on_error(result, repair))
		return;

	/*
	 * Set up each vio to cover up to MAX_BLOCKS_PER_VIO blocks of the journal, each one using
	 * the next consecutive piece of the journal data buffer. repair->vio_count is the loop
	 * counter, so it always equals the number of vios set up so far.
	 */
	ptr = repair->journal_data;
	for (repair->vio_count = 0; repair->vio_count < vio_count; repair->vio_count++) {
		block_count_t blocks = min_t(block_count_t, remaining,
					     MAX_BLOCKS_PER_VIO);

		result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
						 VIO_PRIORITY_METADATA,
						 repair, blocks, ptr,
						 &repair->vios[repair->vio_count]);
		if (abort_on_error(result, repair))
			return;

		ptr += (blocks * VDO_BLOCK_SIZE);
		remaining -= blocks;
	}

	/* Submit all of the reads; the local vio_count is reused here as the loop index. */
	for (vio_count = 0; vio_count < repair->vio_count;
	     vio_count++, pbn += MAX_BLOCKS_PER_VIO) {
		vdo_submit_metadata_vio(&repair->vios[vio_count], pbn, read_journal_endio,
					handle_journal_load_error, REQ_OP_READ);
	}
}
1757