xref: /linux/drivers/md/dm-vdo/block-map.c (revision bf52ca5912c07664276c7b94db820fa2d638b681)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "block-map.h"
7 
8 #include <linux/bio.h>
9 #include <linux/ratelimit.h>
10 
11 #include "errors.h"
12 #include "logger.h"
13 #include "memory-alloc.h"
14 #include "permassert.h"
15 
16 #include "action-manager.h"
17 #include "admin-state.h"
18 #include "completion.h"
19 #include "constants.h"
20 #include "data-vio.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "recovery-journal.h"
25 #include "slab-depot.h"
26 #include "status-codes.h"
27 #include "types.h"
28 #include "vdo.h"
29 #include "vio.h"
30 #include "wait-queue.h"
31 
32 /**
33  * DOC: Block map eras
34  *
35  * The block map era, or maximum age, is used as follows:
36  *
37  * Each block map page, when dirty, records the earliest recovery journal block sequence number of
38  * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
39  * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
40  * according to the sequence number they record.
41  *
42  * In the current (newest) era, block map pages are not written unless there is cache pressure. In
43  * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
44  * this era are issued for write. In all older eras, pages are issued for write immediately.
45  */
46 
47 struct page_descriptor {
48 	root_count_t root_index;
49 	height_t height;
50 	page_number_t page_index;
51 	slot_number_t slot;
52 } __packed;
53 
54 union page_key {
55 	struct page_descriptor descriptor;
56 	u64 key;
57 };
58 
59 struct write_if_not_dirtied_context {
60 	struct block_map_zone *zone;
61 	u8 generation;
62 };
63 
64 struct block_map_tree_segment {
65 	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
66 };
67 
/* A single tree, reached through its array of segments. */
struct block_map_tree {
	struct block_map_tree_segment *segments;
};
71 
72 struct forest {
73 	struct block_map *map;
74 	size_t segments;
75 	struct boundary *boundaries;
76 	struct tree_page **pages;
77 	struct block_map_tree trees[];
78 };
79 
80 struct cursor_level {
81 	page_number_t page_index;
82 	slot_number_t slot;
83 };
84 
85 struct cursors;
86 
87 struct cursor {
88 	struct vdo_waiter waiter;
89 	struct block_map_tree *tree;
90 	height_t height;
91 	struct cursors *parent;
92 	struct boundary boundary;
93 	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
94 	struct pooled_vio *vio;
95 };
96 
97 struct cursors {
98 	struct block_map_zone *zone;
99 	struct vio_pool *pool;
100 	vdo_entry_callback_fn entry_callback;
101 	struct vdo_completion *completion;
102 	root_count_t active_roots;
103 	struct cursor cursors[];
104 };
105 
106 static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;
107 
108 /* Used to indicate that the page holding the location of a tree root has been "loaded". */
109 static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;
110 
111 const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
112 	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
113 	.pbn_high_nibble = 0,
114 	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
115 };
116 
/* Cache pressure is logged when the pressure report counter is a multiple of this interval. */
#define LOG_INTERVAL 4000
/* The pressure report counter wraps back to zero on reaching this value. */
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 *
 * Note that @value is evaluated twice, so it must not have side effects.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
125 
126 static inline bool is_dirty(const struct page_info *info)
127 {
128 	return info->state == PS_DIRTY;
129 }
130 
131 static inline bool is_present(const struct page_info *info)
132 {
133 	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
134 }
135 
136 static inline bool is_in_flight(const struct page_info *info)
137 {
138 	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
139 }
140 
141 static inline bool is_incoming(const struct page_info *info)
142 {
143 	return info->state == PS_INCOMING;
144 }
145 
146 static inline bool is_outgoing(const struct page_info *info)
147 {
148 	return info->state == PS_OUTGOING;
149 }
150 
151 static inline bool is_valid(const struct page_info *info)
152 {
153 	return is_present(info) || is_outgoing(info);
154 }
155 
156 static char *get_page_buffer(struct page_info *info)
157 {
158 	struct vdo_page_cache *cache = info->cache;
159 
160 	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
161 }
162 
163 static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
164 {
165 	struct vdo_page_completion *completion;
166 
167 	if (waiter == NULL)
168 		return NULL;
169 
170 	completion = container_of(waiter, struct vdo_page_completion, waiter);
171 	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
172 	return completion;
173 }
174 
175 /**
176  * initialize_info() - Initialize all page info structures and put them on the free list.
177  *
178  * Return: VDO_SUCCESS or an error.
179  */
180 static int initialize_info(struct vdo_page_cache *cache)
181 {
182 	struct page_info *info;
183 
184 	INIT_LIST_HEAD(&cache->free_list);
185 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
186 		int result;
187 
188 		info->cache = cache;
189 		info->state = PS_FREE;
190 		info->pbn = NO_PAGE;
191 
192 		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
193 					     VIO_PRIORITY_METADATA, info,
194 					     get_page_buffer(info), &info->vio);
195 		if (result != VDO_SUCCESS)
196 			return result;
197 
198 		/* The thread ID should never change. */
199 		info->vio->completion.callback_thread_id = cache->zone->thread_id;
200 
201 		INIT_LIST_HEAD(&info->state_entry);
202 		list_add_tail(&info->state_entry, &cache->free_list);
203 		INIT_LIST_HEAD(&info->lru_entry);
204 	}
205 
206 	return VDO_SUCCESS;
207 }
208 
209 /**
210  * allocate_cache_components() - Allocate components of the cache which require their own
211  *                               allocation.
212  * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
213  *               written out.
214  *
215  * The caller is responsible for all clean up on errors.
216  *
217  * Return: VDO_SUCCESS or an error code.
218  */
219 static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
220 {
221 	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
222 	int result;
223 
224 	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
225 			      &cache->infos);
226 	if (result != VDO_SUCCESS)
227 		return result;
228 
229 	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
230 	if (result != VDO_SUCCESS)
231 		return result;
232 
233 	result = vdo_int_map_create(cache->page_count, &cache->page_map);
234 	if (result != VDO_SUCCESS)
235 		return result;
236 
237 	return initialize_info(cache);
238 }
239 
240 /**
241  * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
242  *                            thread.
243  */
244 static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
245 					  const char *function_name)
246 {
247 	thread_id_t thread_id = vdo_get_callback_thread_id();
248 
249 	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
250 			    "%s() must only be called on cache thread %d, not thread %d",
251 			    function_name, cache->zone->thread_id, thread_id);
252 }
253 
254 /** assert_io_allowed() - Assert that a page cache may issue I/O. */
255 static inline void assert_io_allowed(struct vdo_page_cache *cache)
256 {
257 	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
258 			    "VDO page cache may issue I/O");
259 }
260 
261 /** report_cache_pressure() - Log and, if enabled, report cache pressure. */
262 static void report_cache_pressure(struct vdo_page_cache *cache)
263 {
264 	ADD_ONCE(cache->stats.cache_pressure, 1);
265 	if (cache->waiter_count > cache->page_count) {
266 		if ((cache->pressure_report % LOG_INTERVAL) == 0)
267 			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);
268 
269 		if (++cache->pressure_report >= DISPLAY_INTERVAL)
270 			cache->pressure_report = 0;
271 	}
272 }
273 
274 /**
275  * get_page_state_name() - Return the name of a page state.
276  *
277  * If the page state is invalid a static string is returned and the invalid state is logged.
278  *
279  * Return: A pointer to a static page state name.
280  */
281 static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
282 {
283 	int result;
284 	static const char * const state_names[] = {
285 		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
286 	};
287 
288 	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);
289 
290 	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
291 			    "Unknown page_state value %d", state);
292 	if (result != VDO_SUCCESS)
293 		return "[UNKNOWN PAGE STATE]";
294 
295 	return state_names[state];
296 }
297 
298 /**
299  * update_counter() - Update the counter associated with a given state.
300  * @info: The page info to count.
301  * @delta: The delta to apply to the counter.
302  */
303 static void update_counter(struct page_info *info, s32 delta)
304 {
305 	struct block_map_statistics *stats = &info->cache->stats;
306 
307 	switch (info->state) {
308 	case PS_FREE:
309 		ADD_ONCE(stats->free_pages, delta);
310 		return;
311 
312 	case PS_INCOMING:
313 		ADD_ONCE(stats->incoming_pages, delta);
314 		return;
315 
316 	case PS_OUTGOING:
317 		ADD_ONCE(stats->outgoing_pages, delta);
318 		return;
319 
320 	case PS_FAILED:
321 		ADD_ONCE(stats->failed_pages, delta);
322 		return;
323 
324 	case PS_RESIDENT:
325 		ADD_ONCE(stats->clean_pages, delta);
326 		return;
327 
328 	case PS_DIRTY:
329 		ADD_ONCE(stats->dirty_pages, delta);
330 		return;
331 
332 	default:
333 		return;
334 	}
335 }
336 
337 /** update_lru() - Update the lru information for an active page. */
338 static void update_lru(struct page_info *info)
339 {
340 	if (info->cache->lru_list.prev != &info->lru_entry)
341 		list_move_tail(&info->lru_entry, &info->cache->lru_list);
342 }
343 
344 /**
345  * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
346  *                    counters.
347  */
348 static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
349 {
350 	if (new_state == info->state)
351 		return;
352 
353 	update_counter(info, -1);
354 	info->state = new_state;
355 	update_counter(info, 1);
356 
357 	switch (info->state) {
358 	case PS_FREE:
359 	case PS_FAILED:
360 		list_move_tail(&info->state_entry, &info->cache->free_list);
361 		return;
362 
363 	case PS_OUTGOING:
364 		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
365 		return;
366 
367 	case PS_DIRTY:
368 		return;
369 
370 	default:
371 		list_del_init(&info->state_entry);
372 	}
373 }
374 
375 /** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
376 static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
377 {
378 	struct vdo_page_cache *cache = info->cache;
379 
380 	/* Either the new or the old page number must be NO_PAGE. */
381 	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
382 				"Must free a page before reusing it.");
383 	if (result != VDO_SUCCESS)
384 		return result;
385 
386 	if (info->pbn != NO_PAGE)
387 		vdo_int_map_remove(cache->page_map, info->pbn);
388 
389 	info->pbn = pbn;
390 
391 	if (pbn != NO_PAGE) {
392 		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
393 		if (result != VDO_SUCCESS)
394 			return result;
395 	}
396 	return VDO_SUCCESS;
397 }
398 
399 /** reset_page_info() - Reset page info to represent an unallocated page. */
400 static int reset_page_info(struct page_info *info)
401 {
402 	int result;
403 
404 	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
405 	if (result != VDO_SUCCESS)
406 		return result;
407 
408 	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
409 			    "VDO Page must not have waiters");
410 	if (result != VDO_SUCCESS)
411 		return result;
412 
413 	result = set_info_pbn(info, NO_PAGE);
414 	set_info_state(info, PS_FREE);
415 	list_del_init(&info->lru_entry);
416 	return result;
417 }
418 
419 /**
420  * find_free_page() - Find a free page.
421  *
422  * Return: A pointer to the page info structure (if found), NULL otherwise.
423  */
424 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
425 {
426 	struct page_info *info;
427 
428 	info = list_first_entry_or_null(&cache->free_list, struct page_info,
429 					state_entry);
430 	if (info != NULL)
431 		list_del_init(&info->state_entry);
432 
433 	return info;
434 }
435 
436 /**
437  * find_page() - Find the page info (if any) associated with a given pbn.
438  * @pbn: The absolute physical block number of the page.
439  *
440  * Return: The page info for the page if available, or NULL if not.
441  */
442 static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
443 						 physical_block_number_t pbn)
444 {
445 	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
446 		return cache->last_found;
447 
448 	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
449 	return cache->last_found;
450 }
451 
452 /**
453  * select_lru_page() - Determine which page is least recently used.
454  *
455  * Picks the least recently used from among the non-busy entries at the front of each of the lru
456  * ring. Since whenever we mark a page busy we also put it to the end of the ring it is unlikely
457  * that the entries at the front are busy unless the queue is very short, but not impossible.
458  *
459  * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
460  *         found. The page can be dirty or resident.
461  */
462 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
463 {
464 	struct page_info *info;
465 
466 	list_for_each_entry(info, &cache->lru_list, lru_entry)
467 		if ((info->busy == 0) && !is_in_flight(info))
468 			return info;
469 
470 	return NULL;
471 }
472 
473 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */
474 
475 /**
476  * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
477  * @info: The page info representing the result page.
478  * @vdo_page_comp: The VDO page completion to complete.
479  */
480 static void complete_with_page(struct page_info *info,
481 			       struct vdo_page_completion *vdo_page_comp)
482 {
483 	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);
484 
485 	if (!available) {
486 		vdo_log_error_strerror(VDO_BAD_PAGE,
487 				       "Requested cache page %llu in state %s is not %s",
488 				       (unsigned long long) info->pbn,
489 				       get_page_state_name(info->state),
490 				       vdo_page_comp->writable ? "present" : "valid");
491 		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
492 		return;
493 	}
494 
495 	vdo_page_comp->info = info;
496 	vdo_page_comp->ready = true;
497 	vdo_finish_completion(&vdo_page_comp->completion);
498 }
499 
500 /**
501  * complete_waiter_with_error() - Complete a page completion with an error code.
502  * @waiter: The page completion, as a waiter.
503  * @result_ptr: A pointer to the error code.
504  *
505  * Implements waiter_callback_fn.
506  */
507 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
508 {
509 	int *result = result_ptr;
510 
511 	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
512 }
513 
/**
 * complete_waiter_with_page() - Hand a page to a waiting page completion.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	struct vdo_page_completion *page_completion = page_completion_from_waiter(waiter);

	complete_with_page(page_info, page_completion);
}
525 
526 /**
527  * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
528  *
529  * Upon completion the waitq will be empty.
530  *
531  * Return: The number of pages distributed.
532  */
533 static unsigned int distribute_page_over_waitq(struct page_info *info,
534 					       struct vdo_wait_queue *waitq)
535 {
536 	size_t num_pages;
537 
538 	update_lru(info);
539 	num_pages = vdo_waitq_num_waiters(waitq);
540 
541 	/*
542 	 * Increment the busy count once for each pending completion so that this page does not
543 	 * stop being busy until all completions have been processed.
544 	 */
545 	info->busy += num_pages;
546 
547 	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
548 	return num_pages;
549 }
550 
551 /**
552  * set_persistent_error() - Set a persistent error which all requests will receive in the future.
553  * @context: A string describing what triggered the error.
554  *
555  * Once triggered, all enqueued completions will get this error. Any future requests will result in
556  * this error as well.
557  */
558 static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
559 				 int result)
560 {
561 	struct page_info *info;
562 	/* If we're already read-only, there's no need to log. */
563 	struct vdo *vdo = cache->vdo;
564 
565 	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
566 		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
567 				       context);
568 		vdo_enter_read_only_mode(vdo, result);
569 	}
570 
571 	assert_on_cache_thread(cache, __func__);
572 
573 	vdo_waitq_notify_all_waiters(&cache->free_waiters,
574 				     complete_waiter_with_error, &result);
575 	cache->waiter_count = 0;
576 
577 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
578 		vdo_waitq_notify_all_waiters(&info->waiting,
579 					     complete_waiter_with_error, &result);
580 	}
581 }
582 
583 /**
584  * validate_completed_page() - Check that a page completion which is being freed to the cache
585  *                             referred to a valid page and is in a valid state.
586  * @writable: Whether a writable page is required.
587  *
588  * Return: VDO_SUCCESS if the page was valid, otherwise as error
589  */
590 static int __must_check validate_completed_page(struct vdo_page_completion *completion,
591 						bool writable)
592 {
593 	int result;
594 
595 	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
596 	if (result != VDO_SUCCESS)
597 		return result;
598 
599 	result = VDO_ASSERT(completion->info != NULL,
600 			    "VDO Page Completion must be complete");
601 	if (result != VDO_SUCCESS)
602 		return result;
603 
604 	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
605 			    "VDO Page Completion pbn must be consistent");
606 	if (result != VDO_SUCCESS)
607 		return result;
608 
609 	result = VDO_ASSERT(is_valid(completion->info),
610 			    "VDO Page Completion page must be valid");
611 	if (result != VDO_SUCCESS)
612 		return result;
613 
614 	if (writable) {
615 		result = VDO_ASSERT(completion->writable,
616 				    "VDO Page Completion must be writable");
617 		if (result != VDO_SUCCESS)
618 			return result;
619 	}
620 
621 	return VDO_SUCCESS;
622 }
623 
624 static void check_for_drain_complete(struct block_map_zone *zone)
625 {
626 	if (vdo_is_state_draining(&zone->state) &&
627 	    (zone->active_lookups == 0) &&
628 	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
629 	    !is_vio_pool_busy(zone->vio_pool) &&
630 	    (zone->page_cache.outstanding_reads == 0) &&
631 	    (zone->page_cache.outstanding_writes == 0)) {
632 		vdo_finish_draining_with_result(&zone->state,
633 						(vdo_is_read_only(zone->block_map->vdo) ?
634 						 VDO_READ_ONLY : VDO_SUCCESS));
635 	}
636 }
637 
638 static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
639 {
640 	vdo_enter_read_only_mode(zone->block_map->vdo, result);
641 
642 	/*
643 	 * We are in read-only mode, so we won't ever write any page out.
644 	 * Just take all waiters off the waitq so the zone can drain.
645 	 */
646 	vdo_waitq_init(&zone->flush_waiters);
647 	check_for_drain_complete(zone);
648 }
649 
650 static bool __must_check
651 validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
652 						bool writable)
653 {
654 	int result = validate_completed_page(completion, writable);
655 
656 	if (result == VDO_SUCCESS)
657 		return true;
658 
659 	enter_zone_read_only_mode(completion->info->cache->zone, result);
660 	return false;
661 }
662 
663 /**
664  * handle_load_error() - Handle page load errors.
665  * @completion: The page read vio.
666  */
667 static void handle_load_error(struct vdo_completion *completion)
668 {
669 	int result = completion->result;
670 	struct page_info *info = completion->parent;
671 	struct vdo_page_cache *cache = info->cache;
672 
673 	assert_on_cache_thread(cache, __func__);
674 	vio_record_metadata_io_error(as_vio(completion));
675 	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
676 	ADD_ONCE(cache->stats.failed_reads, 1);
677 	set_info_state(info, PS_FAILED);
678 	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
679 	reset_page_info(info);
680 
681 	/*
682 	 * Don't decrement until right before calling check_for_drain_complete() to
683 	 * ensure that the above work can't cause the page cache to be freed out from under us.
684 	 */
685 	cache->outstanding_reads--;
686 	check_for_drain_complete(cache->zone);
687 }
688 
689 /**
690  * page_is_loaded() - Callback used when a page has been loaded.
691  * @completion: The vio which has loaded the page. Its parent is the page_info.
692  */
693 static void page_is_loaded(struct vdo_completion *completion)
694 {
695 	struct page_info *info = completion->parent;
696 	struct vdo_page_cache *cache = info->cache;
697 	nonce_t nonce = info->cache->zone->block_map->nonce;
698 	struct block_map_page *page;
699 	enum block_map_page_validity validity;
700 
701 	assert_on_cache_thread(cache, __func__);
702 
703 	page = (struct block_map_page *) get_page_buffer(info);
704 	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
705 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
706 		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
707 		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
708 						    "Expected page %llu but got page %llu instead",
709 						    (unsigned long long) info->pbn,
710 						    (unsigned long long) pbn);
711 
712 		vdo_continue_completion(completion, result);
713 		return;
714 	}
715 
716 	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
717 		vdo_format_block_map_page(page, nonce, info->pbn, false);
718 
719 	info->recovery_lock = 0;
720 	set_info_state(info, PS_RESIDENT);
721 	distribute_page_over_waitq(info, &info->waiting);
722 
723 	/*
724 	 * Don't decrement until right before calling check_for_drain_complete() to
725 	 * ensure that the above work can't cause the page cache to be freed out from under us.
726 	 */
727 	cache->outstanding_reads--;
728 	check_for_drain_complete(cache->zone);
729 }
730 
731 /**
732  * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
733  * @completion: The page load completion.
734  */
735 static void handle_rebuild_read_error(struct vdo_completion *completion)
736 {
737 	struct page_info *info = completion->parent;
738 	struct vdo_page_cache *cache = info->cache;
739 
740 	assert_on_cache_thread(cache, __func__);
741 
742 	/*
743 	 * We are doing a read-only rebuild, so treat this as a successful read
744 	 * of an uninitialized page.
745 	 */
746 	vio_record_metadata_io_error(as_vio(completion));
747 	ADD_ONCE(cache->stats.failed_reads, 1);
748 	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
749 	vdo_reset_completion(completion);
750 	page_is_loaded(completion);
751 }
752 
753 static void load_cache_page_endio(struct bio *bio)
754 {
755 	struct vio *vio = bio->bi_private;
756 	struct page_info *info = vio->completion.parent;
757 
758 	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
759 }
760 
761 /**
762  * launch_page_load() - Begin the process of loading a page.
763  *
764  * Return: VDO_SUCCESS or an error code.
765  */
766 static int __must_check launch_page_load(struct page_info *info,
767 					 physical_block_number_t pbn)
768 {
769 	int result;
770 	vdo_action_fn callback;
771 	struct vdo_page_cache *cache = info->cache;
772 
773 	assert_io_allowed(cache);
774 
775 	result = set_info_pbn(info, pbn);
776 	if (result != VDO_SUCCESS)
777 		return result;
778 
779 	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
780 	if (result != VDO_SUCCESS)
781 		return result;
782 
783 	set_info_state(info, PS_INCOMING);
784 	cache->outstanding_reads++;
785 	ADD_ONCE(cache->stats.pages_loaded, 1);
786 	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
787 	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
788 				callback, REQ_OP_READ | REQ_PRIO);
789 	return VDO_SUCCESS;
790 }
791 
792 static void write_pages(struct vdo_completion *completion);
793 
794 /** handle_flush_error() - Handle errors flushing the layer. */
795 static void handle_flush_error(struct vdo_completion *completion)
796 {
797 	struct page_info *info = completion->parent;
798 
799 	vio_record_metadata_io_error(as_vio(completion));
800 	set_persistent_error(info->cache, "flush failed", completion->result);
801 	write_pages(completion);
802 }
803 
804 static void flush_endio(struct bio *bio)
805 {
806 	struct vio *vio = bio->bi_private;
807 	struct page_info *info = vio->completion.parent;
808 
809 	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
810 }
811 
812 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
813 static void save_pages(struct vdo_page_cache *cache)
814 {
815 	struct page_info *info;
816 	struct vio *vio;
817 
818 	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
819 		return;
820 
821 	assert_io_allowed(cache);
822 
823 	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);
824 
825 	cache->pages_in_flush = cache->pages_to_flush;
826 	cache->pages_to_flush = 0;
827 	ADD_ONCE(cache->stats.flush_count, 1);
828 
829 	vio = info->vio;
830 
831 	/*
832 	 * We must make sure that the recovery journal entries that changed these pages were
833 	 * successfully persisted, and thus must issue a flush before each batch of pages is
834 	 * written to ensure this.
835 	 */
836 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
837 }
838 
839 /**
840  * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
841  *
842  * Once in the list, a page may not be used until it has been written out.
843  */
844 static void schedule_page_save(struct page_info *info)
845 {
846 	if (info->busy > 0) {
847 		info->write_status = WRITE_STATUS_DEFERRED;
848 		return;
849 	}
850 
851 	info->cache->pages_to_flush++;
852 	info->cache->outstanding_writes++;
853 	set_info_state(info, PS_OUTGOING);
854 }
855 
856 /**
857  * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
858  * pages if another save is not in progress.
859  */
860 static void launch_page_save(struct page_info *info)
861 {
862 	schedule_page_save(info);
863 	save_pages(info->cache);
864 }
865 
866 /**
867  * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
868  *                           requesting a given page number.
869  * @context: A pointer to the pbn of the desired page.
870  *
871  * Implements waiter_match_fn.
872  *
873  * Return: true if the page completion is for the desired page number.
874  */
875 static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
876 {
877 	physical_block_number_t *pbn = context;
878 
879 	return (page_completion_from_waiter(waiter)->pbn == *pbn);
880 }
881 
882 /**
883  * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
884  *                        any other completions that match it in page number.
885  */
886 static void allocate_free_page(struct page_info *info)
887 {
888 	int result;
889 	struct vdo_waiter *oldest_waiter;
890 	physical_block_number_t pbn;
891 	struct vdo_page_cache *cache = info->cache;
892 
893 	assert_on_cache_thread(cache, __func__);
894 
895 	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
896 		if (cache->stats.cache_pressure > 0) {
897 			vdo_log_info("page cache pressure relieved");
898 			WRITE_ONCE(cache->stats.cache_pressure, 0);
899 		}
900 
901 		return;
902 	}
903 
904 	result = reset_page_info(info);
905 	if (result != VDO_SUCCESS) {
906 		set_persistent_error(cache, "cannot reset page info", result);
907 		return;
908 	}
909 
910 	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
911 	pbn = page_completion_from_waiter(oldest_waiter)->pbn;
912 
913 	/*
914 	 * Remove all entries which match the page number in question and push them onto the page
915 	 * info's waitq.
916 	 */
917 	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
918 					   &pbn, &info->waiting);
919 	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);
920 
921 	result = launch_page_load(info, pbn);
922 	if (result != VDO_SUCCESS) {
923 		vdo_waitq_notify_all_waiters(&info->waiting,
924 					     complete_waiter_with_error, &result);
925 	}
926 }
927 
928 /**
929  * discard_a_page() - Begin the process of discarding a page.
930  *
931  * If no page is discardable, increments a count of deferred frees so that the next release of a
932  * page which is no longer busy will kick off another discard cycle. This is an indication that the
933  * cache is not big enough.
934  *
935  * If the selected page is not dirty, immediately allocates the page to the oldest completion
936  * waiting for a free page.
937  */
938 static void discard_a_page(struct vdo_page_cache *cache)
939 {
940 	struct page_info *info = select_lru_page(cache);
941 
942 	if (info == NULL) {
943 		report_cache_pressure(cache);
944 		return;
945 	}
946 
947 	if (!is_dirty(info)) {
948 		allocate_free_page(info);
949 		return;
950 	}
951 
952 	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
953 			    "page selected for discard is not in flight");
954 
955 	cache->discard_count++;
956 	info->write_status = WRITE_STATUS_DISCARD;
957 	launch_page_save(info);
958 }
959 
960 /**
961  * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
962  *                                 a different page.
963  */
964 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
965 {
966 	struct vdo_page_cache *cache = vdo_page_comp->cache;
967 
968 	cache->waiter_count++;
969 	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
970 	discard_a_page(cache);
971 }
972 
973 /**
974  * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
975  *                            page.
976  * @cache: The page cache.
977  */
978 static void discard_page_if_needed(struct vdo_page_cache *cache)
979 {
980 	if (cache->waiter_count > cache->discard_count)
981 		discard_a_page(cache);
982 }
983 
984 /**
985  * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
986  * @info: The info structure for the page whose write just completed.
987  *
988  * Return: true if the page write was a discard.
989  */
990 static bool write_has_finished(struct page_info *info)
991 {
992 	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);
993 
994 	assert_on_cache_thread(info->cache, __func__);
995 	info->cache->outstanding_writes--;
996 
997 	info->write_status = WRITE_STATUS_NORMAL;
998 	return was_discard;
999 }
1000 
1001 /**
1002  * handle_page_write_error() - Handler for page write errors.
1003  * @completion: The page write vio.
1004  */
1005 static void handle_page_write_error(struct vdo_completion *completion)
1006 {
1007 	int result = completion->result;
1008 	struct page_info *info = completion->parent;
1009 	struct vdo_page_cache *cache = info->cache;
1010 
1011 	vio_record_metadata_io_error(as_vio(completion));
1012 
1013 	/* If we're already read-only, write failures are to be expected. */
1014 	if (result != VDO_READ_ONLY) {
1015 		vdo_log_ratelimit(vdo_log_error,
1016 				  "failed to write block map page %llu",
1017 				  (unsigned long long) info->pbn);
1018 	}
1019 
1020 	set_info_state(info, PS_DIRTY);
1021 	ADD_ONCE(cache->stats.failed_writes, 1);
1022 	set_persistent_error(cache, "cannot write page", result);
1023 
1024 	if (!write_has_finished(info))
1025 		discard_page_if_needed(cache);
1026 
1027 	check_for_drain_complete(cache->zone);
1028 }
1029 
1030 static void page_is_written_out(struct vdo_completion *completion);
1031 
1032 static void write_cache_page_endio(struct bio *bio)
1033 {
1034 	struct vio *vio = bio->bi_private;
1035 	struct page_info *info = vio->completion.parent;
1036 
1037 	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
1038 }
1039 
1040 /**
1041  * page_is_written_out() - Callback used when a page has been written out.
1042  * @completion: The vio which wrote the page. Its parent is a page_info.
1043  */
1044 static void page_is_written_out(struct vdo_completion *completion)
1045 {
1046 	bool was_discard, reclaimed;
1047 	u32 reclamations;
1048 	struct page_info *info = completion->parent;
1049 	struct vdo_page_cache *cache = info->cache;
1050 	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);
1051 
1052 	if (!page->header.initialized) {
1053 		page->header.initialized = true;
1054 		vdo_submit_metadata_vio(info->vio, info->pbn,
1055 					write_cache_page_endio,
1056 					handle_page_write_error,
1057 					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
1058 		return;
1059 	}
1060 
1061 	/* Handle journal updates and torn write protection. */
1062 	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
1063 						     info->recovery_lock,
1064 						     VDO_ZONE_TYPE_LOGICAL,
1065 						     cache->zone->zone_number);
1066 	info->recovery_lock = 0;
1067 	was_discard = write_has_finished(info);
1068 	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));
1069 
1070 	set_info_state(info, PS_RESIDENT);
1071 
1072 	reclamations = distribute_page_over_waitq(info, &info->waiting);
1073 	ADD_ONCE(cache->stats.reclaimed, reclamations);
1074 
1075 	if (was_discard)
1076 		cache->discard_count--;
1077 
1078 	if (reclaimed)
1079 		discard_page_if_needed(cache);
1080 	else
1081 		allocate_free_page(info);
1082 
1083 	check_for_drain_complete(cache->zone);
1084 }
1085 
1086 /**
1087  * write_pages() - Write the batch of pages which were covered by the layer flush which just
1088  *                 completed.
1089  * @flush_completion: The flush vio.
1090  *
1091  * This callback is registered in save_pages().
1092  */
1093 static void write_pages(struct vdo_completion *flush_completion)
1094 {
1095 	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;
1096 
1097 	/*
1098 	 * We need to cache these two values on the stack since it is possible for the last
1099 	 * page info to cause the page cache to get freed. Hence once we launch the last page,
1100 	 * it may be unsafe to dereference the cache.
1101 	 */
1102 	bool has_unflushed_pages = (cache->pages_to_flush > 0);
1103 	page_count_t pages_in_flush = cache->pages_in_flush;
1104 
1105 	cache->pages_in_flush = 0;
1106 	while (pages_in_flush-- > 0) {
1107 		struct page_info *info =
1108 			list_first_entry(&cache->outgoing_list, struct page_info,
1109 					 state_entry);
1110 
1111 		list_del_init(&info->state_entry);
1112 		if (vdo_is_read_only(info->cache->vdo)) {
1113 			struct vdo_completion *completion = &info->vio->completion;
1114 
1115 			vdo_reset_completion(completion);
1116 			completion->callback = page_is_written_out;
1117 			completion->error_handler = handle_page_write_error;
1118 			vdo_fail_completion(completion, VDO_READ_ONLY);
1119 			continue;
1120 		}
1121 		ADD_ONCE(info->cache->stats.pages_saved, 1);
1122 		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
1123 					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
1124 	}
1125 
1126 	if (has_unflushed_pages) {
1127 		/*
1128 		 * If there are unflushed pages, the cache can't have been freed, so this call is
1129 		 * safe.
1130 		 */
1131 		save_pages(cache);
1132 	}
1133 }
1134 
1135 /**
1136  * vdo_release_page_completion() - Release a VDO Page Completion.
1137  *
1138  * The page referenced by this completion (if any) will no longer be held busy by this completion.
1139  * If a page becomes discardable and there are completions awaiting free pages then a new round of
1140  * page discarding is started.
1141  */
1142 void vdo_release_page_completion(struct vdo_completion *completion)
1143 {
1144 	struct page_info *discard_info = NULL;
1145 	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
1146 	struct vdo_page_cache *cache;
1147 
1148 	if (completion->result == VDO_SUCCESS) {
1149 		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
1150 			return;
1151 
1152 		if (--page_completion->info->busy == 0)
1153 			discard_info = page_completion->info;
1154 	}
1155 
1156 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1157 			    "Page being released after leaving all queues");
1158 
1159 	page_completion->info = NULL;
1160 	cache = page_completion->cache;
1161 	assert_on_cache_thread(cache, __func__);
1162 
1163 	if (discard_info != NULL) {
1164 		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
1165 			discard_info->write_status = WRITE_STATUS_NORMAL;
1166 			launch_page_save(discard_info);
1167 		}
1168 
1169 		/*
1170 		 * if there are excess requests for pages (that have not already started discards)
1171 		 * we need to discard some page (which may be this one)
1172 		 */
1173 		discard_page_if_needed(cache);
1174 	}
1175 }
1176 
1177 /**
1178  * load_page_for_completion() - Helper function to load a page as described by a VDO Page
1179  *                              Completion.
1180  */
1181 static void load_page_for_completion(struct page_info *info,
1182 				     struct vdo_page_completion *vdo_page_comp)
1183 {
1184 	int result;
1185 
1186 	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
1187 	result = launch_page_load(info, vdo_page_comp->pbn);
1188 	if (result != VDO_SUCCESS) {
1189 		vdo_waitq_notify_all_waiters(&info->waiting,
1190 					     complete_waiter_with_error, &result);
1191 	}
1192 }
1193 
1194 /**
1195  * vdo_get_page() - Initialize a page completion and get a block map page.
1196  * @page_completion: The vdo_page_completion to initialize.
1197  * @zone: The block map zone of the desired page.
1198  * @pbn: The absolute physical block of the desired page.
1199  * @writable: Whether the page can be modified.
1200  * @parent: The object to notify when the fetch is complete.
1201  * @callback: The notification callback.
1202  * @error_handler: The handler for fetch errors.
1203  * @requeue: Whether we must requeue when notifying the parent.
1204  *
1205  * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
1206  * by the completion to be loaded from disk. When the callback is invoked, the page will be
1207  * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
1208  * when they are done with the page to clear the busy mark.
1209  */
1210 void vdo_get_page(struct vdo_page_completion *page_completion,
1211 		  struct block_map_zone *zone, physical_block_number_t pbn,
1212 		  bool writable, void *parent, vdo_action_fn callback,
1213 		  vdo_action_fn error_handler, bool requeue)
1214 {
1215 	struct vdo_page_cache *cache = &zone->page_cache;
1216 	struct vdo_completion *completion = &page_completion->completion;
1217 	struct page_info *info;
1218 
1219 	assert_on_cache_thread(cache, __func__);
1220 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1221 			    "New page completion was not already on a wait queue");
1222 
1223 	*page_completion = (struct vdo_page_completion) {
1224 		.pbn = pbn,
1225 		.writable = writable,
1226 		.cache = cache,
1227 	};
1228 
1229 	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
1230 	vdo_prepare_completion(completion, callback, error_handler,
1231 			       cache->zone->thread_id, parent);
1232 	completion->requeue = requeue;
1233 
1234 	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
1235 		vdo_fail_completion(completion, VDO_READ_ONLY);
1236 		return;
1237 	}
1238 
1239 	if (page_completion->writable)
1240 		ADD_ONCE(cache->stats.write_count, 1);
1241 	else
1242 		ADD_ONCE(cache->stats.read_count, 1);
1243 
1244 	info = find_page(cache, page_completion->pbn);
1245 	if (info != NULL) {
1246 		/* The page is in the cache already. */
1247 		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
1248 		    is_incoming(info) ||
1249 		    (is_outgoing(info) && page_completion->writable)) {
1250 			/* The page is unusable until it has finished I/O. */
1251 			ADD_ONCE(cache->stats.wait_for_page, 1);
1252 			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
1253 			return;
1254 		}
1255 
1256 		if (is_valid(info)) {
1257 			/* The page is usable. */
1258 			ADD_ONCE(cache->stats.found_in_cache, 1);
1259 			if (!is_present(info))
1260 				ADD_ONCE(cache->stats.read_outgoing, 1);
1261 			update_lru(info);
1262 			info->busy++;
1263 			complete_with_page(info, page_completion);
1264 			return;
1265 		}
1266 
1267 		/* Something horrible has gone wrong. */
1268 		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
1269 	}
1270 
1271 	/* The page must be fetched. */
1272 	info = find_free_page(cache);
1273 	if (info != NULL) {
1274 		ADD_ONCE(cache->stats.fetch_required, 1);
1275 		load_page_for_completion(info, page_completion);
1276 		return;
1277 	}
1278 
1279 	/* The page must wait for a page to be discarded. */
1280 	ADD_ONCE(cache->stats.discard_required, 1);
1281 	discard_page_for_completion(page_completion);
1282 }
1283 
1284 /**
1285  * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
1286  * @completion: The vdo_page_completion containing the page.
1287  */
1288 void vdo_request_page_write(struct vdo_completion *completion)
1289 {
1290 	struct page_info *info;
1291 	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);
1292 
1293 	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
1294 		return;
1295 
1296 	info = vdo_page_comp->info;
1297 	set_info_state(info, PS_DIRTY);
1298 	launch_page_save(info);
1299 }
1300 
1301 /**
1302  * vdo_get_cached_page() - Get the block map page from a page completion.
1303  * @completion: A vdo page completion whose callback has been called.
1304  * @page_ptr: A pointer to hold the page
1305  *
1306  * Return: VDO_SUCCESS or an error
1307  */
1308 int vdo_get_cached_page(struct vdo_completion *completion,
1309 			struct block_map_page **page_ptr)
1310 {
1311 	int result;
1312 	struct vdo_page_completion *vpc;
1313 
1314 	vpc = as_vdo_page_completion(completion);
1315 	result = validate_completed_page(vpc, true);
1316 	if (result == VDO_SUCCESS)
1317 		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);
1318 
1319 	return result;
1320 }
1321 
1322 /**
1323  * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
1324  *
1325  * There must not be any dirty pages in the cache.
1326  *
1327  * Return: A success or error code.
1328  */
1329 int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
1330 {
1331 	struct page_info *info;
1332 
1333 	assert_on_cache_thread(cache, __func__);
1334 
1335 	/* Make sure we don't throw away any dirty pages. */
1336 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
1337 		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");
1338 
1339 		if (result != VDO_SUCCESS)
1340 			return result;
1341 	}
1342 
1343 	/* Reset the page map by re-allocating it. */
1344 	vdo_int_map_free(vdo_forget(cache->page_map));
1345 	return vdo_int_map_create(cache->page_count, &cache->page_map);
1346 }
1347 
1348 /**
1349  * get_tree_page_by_index() - Get the tree page for a given height and page index.
1350  *
1351  * Return: The requested page.
1352  */
1353 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
1354 							      root_count_t root_index,
1355 							      height_t height,
1356 							      page_number_t page_index)
1357 {
1358 	page_number_t offset = 0;
1359 	size_t segment;
1360 
1361 	for (segment = 0; segment < forest->segments; segment++) {
1362 		page_number_t border = forest->boundaries[segment].levels[height - 1];
1363 
1364 		if (page_index < border) {
1365 			struct block_map_tree *tree = &forest->trees[root_index];
1366 
1367 			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
1368 		}
1369 
1370 		offset = border;
1371 	}
1372 
1373 	return NULL;
1374 }
1375 
1376 /* Get the page referred to by the lock's tree slot at its current height. */
1377 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
1378 					      const struct tree_lock *lock)
1379 {
1380 	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
1381 				      lock->height,
1382 				      lock->tree_slots[lock->height].page_index);
1383 }
1384 
1385 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
1386 bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
1387 			 physical_block_number_t pbn,
1388 			 struct block_map_page *page)
1389 {
1390 	struct block_map_page *loaded = (struct block_map_page *) buffer;
1391 	enum block_map_page_validity validity =
1392 		vdo_validate_block_map_page(loaded, nonce, pbn);
1393 
1394 	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
1395 		memcpy(page, loaded, VDO_BLOCK_SIZE);
1396 		return true;
1397 	}
1398 
1399 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
1400 		vdo_log_error_strerror(VDO_BAD_PAGE,
1401 				       "Expected page %llu but got page %llu instead",
1402 				       (unsigned long long) pbn,
1403 				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
1404 	}
1405 
1406 	return false;
1407 }
1408 
/**
 * in_cyclic_range() - Test whether a value lies between two bounds in a cyclic space of values
 *                     from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus. Anything numerically below @lower
 * is treated as having wrapped around, so it is shifted up by @modulus before comparing.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	u16 unwrapped_value = (value < lower) ? (u16) (value + modulus) : value;
	u16 unwrapped_upper = (upper < lower) ? (u16) (upper + modulus) : upper;

	return (unwrapped_value <= unwrapped_upper);
}
1429 
1430 /**
1431  * is_not_older() - Check whether a generation is strictly older than some other generation in the
1432  *                  context of a zone's current generation range.
1433  * @zone: The zone in which to do the comparison.
1434  * @a: The generation in question.
1435  * @b: The generation to compare to.
1436  *
1437  * Return: true if generation @a is not strictly older than generation @b in the context of @zone
1438  */
1439 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
1440 {
1441 	int result;
1442 
1443 	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
1444 			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
1445 			    "generation(s) %u, %u are out of range [%u, %u]",
1446 			    a, b, zone->oldest_generation, zone->generation);
1447 	if (result != VDO_SUCCESS) {
1448 		enter_zone_read_only_mode(zone, result);
1449 		return true;
1450 	}
1451 
1452 	return in_cyclic_range(b, a, zone->generation, 1 << 8);
1453 }
1454 
1455 static void release_generation(struct block_map_zone *zone, u8 generation)
1456 {
1457 	int result;
1458 
1459 	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
1460 			    "dirty page count underflow for generation %u", generation);
1461 	if (result != VDO_SUCCESS) {
1462 		enter_zone_read_only_mode(zone, result);
1463 		return;
1464 	}
1465 
1466 	zone->dirty_page_counts[generation]--;
1467 	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
1468 	       (zone->oldest_generation != zone->generation))
1469 		zone->oldest_generation++;
1470 }
1471 
1472 static void set_generation(struct block_map_zone *zone, struct tree_page *page,
1473 			   u8 new_generation)
1474 {
1475 	u32 new_count;
1476 	int result;
1477 	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
1478 	u8 old_generation = page->generation;
1479 
1480 	if (decrement_old && (old_generation == new_generation))
1481 		return;
1482 
1483 	page->generation = new_generation;
1484 	new_count = ++zone->dirty_page_counts[new_generation];
1485 	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
1486 			    new_generation);
1487 	if (result != VDO_SUCCESS) {
1488 		enter_zone_read_only_mode(zone, result);
1489 		return;
1490 	}
1491 
1492 	if (decrement_old)
1493 		release_generation(zone, old_generation);
1494 }
1495 
1496 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);
1497 
1498 /* Implements waiter_callback_fn */
1499 static void write_page_callback(struct vdo_waiter *waiter, void *context)
1500 {
1501 	write_page(container_of(waiter, struct tree_page, waiter), context);
1502 }
1503 
1504 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
1505 {
1506 	waiter->callback = write_page_callback;
1507 	acquire_vio_from_pool(zone->vio_pool, waiter);
1508 }
1509 
1510 /* Return: true if all possible generations were not already active */
1511 static bool attempt_increment(struct block_map_zone *zone)
1512 {
1513 	u8 generation = zone->generation + 1;
1514 
1515 	if (zone->oldest_generation == generation)
1516 		return false;
1517 
1518 	zone->generation = generation;
1519 	return true;
1520 }
1521 
1522 /* Launches a flush if one is not already in progress. */
1523 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
1524 {
1525 	if ((zone->flusher == NULL) && attempt_increment(zone)) {
1526 		zone->flusher = page;
1527 		acquire_vio(&page->waiter, zone);
1528 		return;
1529 	}
1530 
1531 	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
1532 }
1533 
1534 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
1535 {
1536 	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
1537 	struct write_if_not_dirtied_context *write_context = context;
1538 
1539 	if (page->generation == write_context->generation) {
1540 		acquire_vio(waiter, write_context->zone);
1541 		return;
1542 	}
1543 
1544 	enqueue_page(page, write_context->zone);
1545 }
1546 
1547 static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
1548 {
1549 	return_vio_to_pool(zone->vio_pool, vio);
1550 	check_for_drain_complete(zone);
1551 }
1552 
1553 /* This callback is registered in write_initialized_page(). */
1554 static void finish_page_write(struct vdo_completion *completion)
1555 {
1556 	bool dirty;
1557 	struct vio *vio = as_vio(completion);
1558 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1559 	struct tree_page *page = completion->parent;
1560 	struct block_map_zone *zone = pooled->context;
1561 
1562 	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
1563 						     page->writing_recovery_lock,
1564 						     VDO_ZONE_TYPE_LOGICAL,
1565 						     zone->zone_number);
1566 
1567 	dirty = (page->writing_generation != page->generation);
1568 	release_generation(zone, page->writing_generation);
1569 	page->writing = false;
1570 
1571 	if (zone->flusher == page) {
1572 		struct write_if_not_dirtied_context context = {
1573 			.zone = zone,
1574 			.generation = page->writing_generation,
1575 		};
1576 
1577 		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
1578 					     write_page_if_not_dirtied, &context);
1579 		if (dirty && attempt_increment(zone)) {
1580 			write_page(page, pooled);
1581 			return;
1582 		}
1583 
1584 		zone->flusher = NULL;
1585 	}
1586 
1587 	if (dirty) {
1588 		enqueue_page(page, zone);
1589 	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
1590 		   attempt_increment(zone)) {
1591 		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
1592 					     struct tree_page, waiter);
1593 		write_page(zone->flusher, pooled);
1594 		return;
1595 	}
1596 
1597 	return_to_pool(zone, pooled);
1598 }
1599 
1600 static void handle_write_error(struct vdo_completion *completion)
1601 {
1602 	int result = completion->result;
1603 	struct vio *vio = as_vio(completion);
1604 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1605 	struct block_map_zone *zone = pooled->context;
1606 
1607 	vio_record_metadata_io_error(vio);
1608 	enter_zone_read_only_mode(zone, result);
1609 	return_to_pool(zone, pooled);
1610 }
1611 
1612 static void write_page_endio(struct bio *bio);
1613 
1614 static void write_initialized_page(struct vdo_completion *completion)
1615 {
1616 	struct vio *vio = as_vio(completion);
1617 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1618 	struct block_map_zone *zone = pooled->context;
1619 	struct tree_page *tree_page = completion->parent;
1620 	struct block_map_page *page = (struct block_map_page *) vio->data;
1621 	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;
1622 
1623 	/*
1624 	 * Now that we know the page has been written at least once, mark the copy we are writing
1625 	 * as initialized.
1626 	 */
1627 	page->header.initialized = true;
1628 
1629 	if (zone->flusher == tree_page)
1630 		operation |= REQ_PREFLUSH;
1631 
1632 	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
1633 				write_page_endio, handle_write_error,
1634 				operation);
1635 }
1636 
1637 static void write_page_endio(struct bio *bio)
1638 {
1639 	struct pooled_vio *vio = bio->bi_private;
1640 	struct block_map_zone *zone = vio->context;
1641 	struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1642 
1643 	continue_vio_after_io(&vio->vio,
1644 			      (page->header.initialized ?
1645 			       finish_page_write : write_initialized_page),
1646 			      zone->thread_id);
1647 }
1648 
1649 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
1650 {
1651 	struct vdo_completion *completion = &vio->vio.completion;
1652 	struct block_map_zone *zone = vio->context;
1653 	struct block_map_page *page = vdo_as_block_map_page(tree_page);
1654 
1655 	if ((zone->flusher != tree_page) &&
1656 	    is_not_older(zone, tree_page->generation, zone->generation)) {
1657 		/*
1658 		 * This page was re-dirtied after the last flush was issued, hence we need to do
1659 		 * another flush.
1660 		 */
1661 		enqueue_page(tree_page, zone);
1662 		return_to_pool(zone, vio);
1663 		return;
1664 	}
1665 
1666 	completion->parent = tree_page;
1667 	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
1668 	completion->callback_thread_id = zone->thread_id;
1669 
1670 	tree_page->writing = true;
1671 	tree_page->writing_generation = tree_page->generation;
1672 	tree_page->writing_recovery_lock = tree_page->recovery_lock;
1673 
1674 	/* Clear this now so that we know this page is not on any dirty list. */
1675 	tree_page->recovery_lock = 0;
1676 
1677 	/*
1678 	 * We've already copied the page into the vio which will write it, so if it was not yet
1679 	 * initialized, the first write will indicate that (for torn write protection). It is now
1680 	 * safe to mark it as initialized in memory since if the write fails, the in memory state
1681 	 * will become irrelevant.
1682 	 */
1683 	if (page->header.initialized) {
1684 		write_initialized_page(completion);
1685 		return;
1686 	}
1687 
1688 	page->header.initialized = true;
1689 	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
1690 				write_page_endio, handle_write_error,
1691 				REQ_OP_WRITE | REQ_PRIO);
1692 }
1693 
1694 /* Release a lock on a page which was being loaded or allocated. */
1695 static void release_page_lock(struct data_vio *data_vio, char *what)
1696 {
1697 	struct block_map_zone *zone;
1698 	struct tree_lock *lock_holder;
1699 	struct tree_lock *lock = &data_vio->tree_lock;
1700 
1701 	VDO_ASSERT_LOG_ONLY(lock->locked,
1702 			    "release of unlocked block map page %s for key %llu in tree %u",
1703 			    what, (unsigned long long) lock->key, lock->root_index);
1704 
1705 	zone = data_vio->logical.zone->block_map_zone;
1706 	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
1707 	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
1708 			    "block map page %s mismatch for key %llu in tree %u",
1709 			    what, (unsigned long long) lock->key, lock->root_index);
1710 	lock->locked = false;
1711 }
1712 
1713 static void finish_lookup(struct data_vio *data_vio, int result)
1714 {
1715 	data_vio->tree_lock.height = 0;
1716 
1717 	--data_vio->logical.zone->block_map_zone->active_lookups;
1718 
1719 	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
1720 	data_vio->vio.completion.error_handler = handle_data_vio_error;
1721 	continue_data_vio_with_error(data_vio, result);
1722 }
1723 
1724 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
1725 {
1726 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1727 	int result = *((int *) context);
1728 
1729 	if (!data_vio->write) {
1730 		if (result == VDO_NO_SPACE)
1731 			result = VDO_SUCCESS;
1732 	} else if (result != VDO_NO_SPACE) {
1733 		result = VDO_READ_ONLY;
1734 	}
1735 
1736 	finish_lookup(data_vio, result);
1737 }
1738 
1739 static void abort_lookup(struct data_vio *data_vio, int result, char *what)
1740 {
1741 	if (result != VDO_NO_SPACE)
1742 		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);
1743 
1744 	if (data_vio->tree_lock.locked) {
1745 		release_page_lock(data_vio, what);
1746 		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
1747 					     abort_lookup_for_waiter,
1748 					     &result);
1749 	}
1750 
1751 	finish_lookup(data_vio, result);
1752 }
1753 
/* Abort a lookup which failed while loading a page; "load" is the label used in log messages. */
static void abort_load(struct data_vio *data_vio, int result)
{
	char *what = "load";

	abort_lookup(data_vio, result, what);
}
1758 
1759 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
1760 					       const struct data_location *mapping,
1761 					       height_t height)
1762 {
1763 	if (!vdo_is_valid_location(mapping) ||
1764 	    vdo_is_state_compressed(mapping->state) ||
1765 	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
1766 		return true;
1767 
1768 	/* Roots aren't physical data blocks, so we can't check their PBNs. */
1769 	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
1770 		return false;
1771 
1772 	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
1773 }
1774 
1775 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
1776 static void allocate_block_map_page(struct block_map_zone *zone,
1777 				    struct data_vio *data_vio);
1778 
1779 static void continue_with_loaded_page(struct data_vio *data_vio,
1780 				      struct block_map_page *page)
1781 {
1782 	struct tree_lock *lock = &data_vio->tree_lock;
1783 	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
1784 	struct data_location mapping =
1785 		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);
1786 
1787 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
1788 		vdo_log_error_strerror(VDO_BAD_MAPPING,
1789 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
1790 				       (unsigned long long) mapping.pbn, mapping.state,
1791 				       lock->tree_slots[lock->height - 1].page_index,
1792 				       lock->height - 1);
1793 		abort_load(data_vio, VDO_BAD_MAPPING);
1794 		return;
1795 	}
1796 
1797 	if (!vdo_is_mapped_location(&mapping)) {
1798 		/* The page we need is unallocated */
1799 		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
1800 					data_vio);
1801 		return;
1802 	}
1803 
1804 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
1805 	if (lock->height == 1) {
1806 		finish_lookup(data_vio, VDO_SUCCESS);
1807 		return;
1808 	}
1809 
1810 	/* We know what page we need to load next */
1811 	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1812 }
1813 
1814 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
1815 {
1816 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1817 
1818 	data_vio->tree_lock.height--;
1819 	continue_with_loaded_page(data_vio, context);
1820 }
1821 
1822 static void finish_block_map_page_load(struct vdo_completion *completion)
1823 {
1824 	physical_block_number_t pbn;
1825 	struct tree_page *tree_page;
1826 	struct block_map_page *page;
1827 	nonce_t nonce;
1828 	struct vio *vio = as_vio(completion);
1829 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1830 	struct data_vio *data_vio = completion->parent;
1831 	struct block_map_zone *zone = pooled->context;
1832 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1833 
1834 	tree_lock->height--;
1835 	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
1836 	tree_page = get_tree_page(zone, tree_lock);
1837 	page = (struct block_map_page *) tree_page->page_buffer;
1838 	nonce = zone->block_map->nonce;
1839 
1840 	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
1841 		vdo_format_block_map_page(page, nonce, pbn, false);
1842 	return_vio_to_pool(zone->vio_pool, pooled);
1843 
1844 	/* Release our claim to the load and wake any waiters */
1845 	release_page_lock(data_vio, "load");
1846 	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
1847 	continue_with_loaded_page(data_vio, page);
1848 }
1849 
1850 static void handle_io_error(struct vdo_completion *completion)
1851 {
1852 	int result = completion->result;
1853 	struct vio *vio = as_vio(completion);
1854 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1855 	struct data_vio *data_vio = completion->parent;
1856 	struct block_map_zone *zone = pooled->context;
1857 
1858 	vio_record_metadata_io_error(vio);
1859 	return_vio_to_pool(zone->vio_pool, pooled);
1860 	abort_load(data_vio, result);
1861 }
1862 
1863 static void load_page_endio(struct bio *bio)
1864 {
1865 	struct vio *vio = bio->bi_private;
1866 	struct data_vio *data_vio = vio->completion.parent;
1867 
1868 	continue_vio_after_io(vio, finish_block_map_page_load,
1869 			      data_vio->logical.zone->thread_id);
1870 }
1871 
1872 static void load_page(struct vdo_waiter *waiter, void *context)
1873 {
1874 	struct pooled_vio *pooled = context;
1875 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1876 	struct tree_lock *lock = &data_vio->tree_lock;
1877 	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;
1878 
1879 	pooled->vio.completion.parent = data_vio;
1880 	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
1881 				handle_io_error, REQ_OP_READ | REQ_PRIO);
1882 }
1883 
1884 /*
1885  * If the page is already locked, queue up to wait for the lock to be released. If the lock is
1886  * acquired, @data_vio->tree_lock.locked will be true.
1887  */
1888 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
1889 {
1890 	int result;
1891 	struct tree_lock *lock_holder;
1892 	struct tree_lock *lock = &data_vio->tree_lock;
1893 	height_t height = lock->height;
1894 	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
1895 	union page_key key;
1896 
1897 	key.descriptor = (struct page_descriptor) {
1898 		.root_index = lock->root_index,
1899 		.height = height,
1900 		.page_index = tree_slot.page_index,
1901 		.slot = tree_slot.block_map_slot.slot,
1902 	};
1903 	lock->key = key.key;
1904 
1905 	result = vdo_int_map_put(zone->loading_pages, lock->key,
1906 				 lock, false, (void **) &lock_holder);
1907 	if (result != VDO_SUCCESS)
1908 		return result;
1909 
1910 	if (lock_holder == NULL) {
1911 		/* We got the lock */
1912 		data_vio->tree_lock.locked = true;
1913 		return VDO_SUCCESS;
1914 	}
1915 
1916 	/* Someone else is loading or allocating the page we need */
1917 	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
1918 	return VDO_SUCCESS;
1919 }
1920 
1921 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */
1922 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
1923 {
1924 	int result;
1925 
1926 	result = attempt_page_lock(zone, data_vio);
1927 	if (result != VDO_SUCCESS) {
1928 		abort_load(data_vio, result);
1929 		return;
1930 	}
1931 
1932 	if (data_vio->tree_lock.locked) {
1933 		data_vio->waiter.callback = load_page;
1934 		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
1935 	}
1936 }
1937 
1938 static void allocation_failure(struct vdo_completion *completion)
1939 {
1940 	struct data_vio *data_vio = as_data_vio(completion);
1941 
1942 	if (vdo_requeue_completion_if_needed(completion,
1943 					     data_vio->logical.zone->thread_id))
1944 		return;
1945 
1946 	abort_lookup(data_vio, completion->result, "allocation");
1947 }
1948 
1949 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
1950 {
1951 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1952 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1953 	physical_block_number_t pbn = *((physical_block_number_t *) context);
1954 
1955 	tree_lock->height--;
1956 	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;
1957 
1958 	if (tree_lock->height == 0) {
1959 		finish_lookup(data_vio, VDO_SUCCESS);
1960 		return;
1961 	}
1962 
1963 	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1964 }
1965 
1966 /** expire_oldest_list() - Expire the oldest list. */
1967 static void expire_oldest_list(struct dirty_lists *dirty_lists)
1968 {
1969 	block_count_t i = dirty_lists->offset++;
1970 
1971 	dirty_lists->oldest_period++;
1972 	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
1973 		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
1974 				      &dirty_lists->expired[VDO_TREE_PAGE]);
1975 	}
1976 
1977 	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
1978 		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
1979 				      &dirty_lists->expired[VDO_CACHE_PAGE]);
1980 	}
1981 
1982 	if (dirty_lists->offset == dirty_lists->maximum_age)
1983 		dirty_lists->offset = 0;
1984 }
1985 
1986 
1987 /** update_period() - Update the dirty_lists period if necessary. */
1988 static void update_period(struct dirty_lists *dirty, sequence_number_t period)
1989 {
1990 	while (dirty->next_period <= period) {
1991 		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
1992 			expire_oldest_list(dirty);
1993 		dirty->next_period++;
1994 	}
1995 }
1996 
1997 /** write_expired_elements() - Write out the expired list. */
1998 static void write_expired_elements(struct block_map_zone *zone)
1999 {
2000 	struct tree_page *page, *ttmp;
2001 	struct page_info *info, *ptmp;
2002 	struct list_head *expired;
2003 	u8 generation = zone->generation;
2004 
2005 	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
2006 	list_for_each_entry_safe(page, ttmp, expired, entry) {
2007 		int result;
2008 
2009 		list_del_init(&page->entry);
2010 
2011 		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
2012 				    "Newly expired page not already waiting to write");
2013 		if (result != VDO_SUCCESS) {
2014 			enter_zone_read_only_mode(zone, result);
2015 			continue;
2016 		}
2017 
2018 		set_generation(zone, page, generation);
2019 		if (!page->writing)
2020 			enqueue_page(page, zone);
2021 	}
2022 
2023 	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
2024 	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
2025 		list_del_init(&info->state_entry);
2026 		schedule_page_save(info);
2027 	}
2028 
2029 	save_pages(&zone->page_cache);
2030 }
2031 
2032 /**
2033  * add_to_dirty_lists() - Add an element to the dirty lists.
2034  * @zone: The zone in which we are operating.
2035  * @entry: The list entry of the element to add.
2036  * @type: The type of page.
2037  * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
2038  * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
2039  *              lock.
2040  */
2041 static void add_to_dirty_lists(struct block_map_zone *zone,
2042 			       struct list_head *entry,
2043 			       enum block_map_page_type type,
2044 			       sequence_number_t old_period,
2045 			       sequence_number_t new_period)
2046 {
2047 	struct dirty_lists *dirty_lists = zone->dirty_lists;
2048 
2049 	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
2050 		return;
2051 
2052 	if (new_period < dirty_lists->oldest_period) {
2053 		list_move_tail(entry, &dirty_lists->expired[type]);
2054 	} else {
2055 		update_period(dirty_lists, new_period);
2056 		list_move_tail(entry,
2057 			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
2058 	}
2059 
2060 	write_expired_elements(zone);
2061 }
2062 
2063 /*
2064  * Record the allocation in the tree and wake any waiters now that the write lock has been
2065  * released.
2066  */
2067 static void finish_block_map_allocation(struct vdo_completion *completion)
2068 {
2069 	physical_block_number_t pbn;
2070 	struct tree_page *tree_page;
2071 	struct block_map_page *page;
2072 	sequence_number_t old_lock;
2073 	struct data_vio *data_vio = as_data_vio(completion);
2074 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2075 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2076 	height_t height = tree_lock->height;
2077 
2078 	assert_data_vio_in_logical_zone(data_vio);
2079 
2080 	tree_page = get_tree_page(zone, tree_lock);
2081 	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2082 
2083 	/* Record the allocation. */
2084 	page = (struct block_map_page *) tree_page->page_buffer;
2085 	old_lock = tree_page->recovery_lock;
2086 	vdo_update_block_map_page(page, data_vio, pbn,
2087 				  VDO_MAPPING_STATE_UNCOMPRESSED,
2088 				  &tree_page->recovery_lock);
2089 
2090 	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
2091 		/* This page is waiting to be written out. */
2092 		if (zone->flusher != tree_page) {
2093 			/*
2094 			 * The outstanding flush won't cover the update we just made,
2095 			 * so mark the page as needing another flush.
2096 			 */
2097 			set_generation(zone, tree_page, zone->generation);
2098 		}
2099 	} else {
2100 		/* Put the page on a dirty list */
2101 		if (old_lock == 0)
2102 			INIT_LIST_HEAD(&tree_page->entry);
2103 		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
2104 				   old_lock, tree_page->recovery_lock);
2105 	}
2106 
2107 	tree_lock->height--;
2108 	if (height > 1) {
2109 		/* Format the interior node we just allocated (in memory). */
2110 		tree_page = get_tree_page(zone, tree_lock);
2111 		vdo_format_block_map_page(tree_page->page_buffer,
2112 					  zone->block_map->nonce,
2113 					  pbn, false);
2114 	}
2115 
2116 	/* Release our claim to the allocation and wake any waiters */
2117 	release_page_lock(data_vio, "allocation");
2118 	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
2119 				     continue_allocation_for_waiter, &pbn);
2120 	if (tree_lock->height == 0) {
2121 		finish_lookup(data_vio, VDO_SUCCESS);
2122 		return;
2123 	}
2124 
2125 	allocate_block_map_page(zone, data_vio);
2126 }
2127 
2128 static void release_block_map_write_lock(struct vdo_completion *completion)
2129 {
2130 	struct data_vio *data_vio = as_data_vio(completion);
2131 
2132 	assert_data_vio_in_allocated_zone(data_vio);
2133 
2134 	release_data_vio_allocation_lock(data_vio, true);
2135 	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
2136 }
2137 
2138 /*
2139  * Newly allocated block map pages are set to have to MAXIMUM_REFERENCES after they are journaled,
2140  * to prevent deduplication against the block after we release the write lock on it, but before we
2141  * write out the page.
2142  */
2143 static void set_block_map_page_reference_count(struct vdo_completion *completion)
2144 {
2145 	struct data_vio *data_vio = as_data_vio(completion);
2146 
2147 	assert_data_vio_in_allocated_zone(data_vio);
2148 
2149 	completion->callback = release_block_map_write_lock;
2150 	vdo_modify_reference_count(completion, &data_vio->increment_updater);
2151 }
2152 
2153 static void journal_block_map_allocation(struct vdo_completion *completion)
2154 {
2155 	struct data_vio *data_vio = as_data_vio(completion);
2156 
2157 	assert_data_vio_in_journal_zone(data_vio);
2158 
2159 	set_data_vio_allocated_zone_callback(data_vio,
2160 					     set_block_map_page_reference_count);
2161 	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
2162 }
2163 
2164 static void allocate_block(struct vdo_completion *completion)
2165 {
2166 	struct data_vio *data_vio = as_data_vio(completion);
2167 	struct tree_lock *lock = &data_vio->tree_lock;
2168 	physical_block_number_t pbn;
2169 
2170 	assert_data_vio_in_allocated_zone(data_vio);
2171 
2172 	if (!vdo_allocate_block_in_zone(data_vio))
2173 		return;
2174 
2175 	pbn = data_vio->allocation.pbn;
2176 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
2177 	data_vio->increment_updater = (struct reference_updater) {
2178 		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
2179 		.increment = true,
2180 		.zpbn = {
2181 			.pbn = pbn,
2182 			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
2183 		},
2184 		.lock = data_vio->allocation.lock,
2185 	};
2186 
2187 	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
2188 }
2189 
2190 static void allocate_block_map_page(struct block_map_zone *zone,
2191 				    struct data_vio *data_vio)
2192 {
2193 	int result;
2194 
2195 	if (!data_vio->write || data_vio->is_discard) {
2196 		/* This is a pure read or a discard, so there's nothing left to do here. */
2197 		finish_lookup(data_vio, VDO_SUCCESS);
2198 		return;
2199 	}
2200 
2201 	result = attempt_page_lock(zone, data_vio);
2202 	if (result != VDO_SUCCESS) {
2203 		abort_lookup(data_vio, result, "allocation");
2204 		return;
2205 	}
2206 
2207 	if (!data_vio->tree_lock.locked)
2208 		return;
2209 
2210 	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
2211 				     allocate_block, allocation_failure);
2212 }
2213 
2214 /**
2215  * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
2216  *                             resides and cache that result in the data_vio.
2217  *
2218  * All ancestors in the tree will be allocated or loaded, as needed.
2219  */
2220 void vdo_find_block_map_slot(struct data_vio *data_vio)
2221 {
2222 	page_number_t page_index;
2223 	struct block_map_tree_slot tree_slot;
2224 	struct data_location mapping;
2225 	struct block_map_page *page = NULL;
2226 	struct tree_lock *lock = &data_vio->tree_lock;
2227 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2228 
2229 	zone->active_lookups++;
2230 	if (vdo_is_state_draining(&zone->state)) {
2231 		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
2232 		return;
2233 	}
2234 
2235 	lock->tree_slots[0].block_map_slot.slot =
2236 		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2237 	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
2238 	tree_slot = (struct block_map_tree_slot) {
2239 		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2240 		.block_map_slot = {
2241 			.pbn = 0,
2242 			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2243 		},
2244 	};
2245 
2246 	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
2247 		physical_block_number_t pbn;
2248 
2249 		lock->tree_slots[lock->height] = tree_slot;
2250 		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
2251 		pbn = vdo_get_block_map_page_pbn(page);
2252 		if (pbn != VDO_ZERO_BLOCK) {
2253 			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
2254 			break;
2255 		}
2256 
2257 		/* Calculate the index and slot for the next level. */
2258 		tree_slot.block_map_slot.slot =
2259 			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2260 		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2261 	}
2262 
2263 	/* The page at this height has been allocated and loaded. */
2264 	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
2265 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
2266 		vdo_log_error_strerror(VDO_BAD_MAPPING,
2267 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
2268 				       (unsigned long long) mapping.pbn, mapping.state,
2269 				       lock->tree_slots[lock->height - 1].page_index,
2270 				       lock->height - 1);
2271 		abort_load(data_vio, VDO_BAD_MAPPING);
2272 		return;
2273 	}
2274 
2275 	if (!vdo_is_mapped_location(&mapping)) {
2276 		/* The page we want one level down has not been allocated, so allocate it. */
2277 		allocate_block_map_page(zone, data_vio);
2278 		return;
2279 	}
2280 
2281 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
2282 	if (lock->height == 1) {
2283 		/* This is the ultimate block map page, so we're done */
2284 		finish_lookup(data_vio, VDO_SUCCESS);
2285 		return;
2286 	}
2287 
2288 	/* We know what page we need to load. */
2289 	load_block_map_page(zone, data_vio);
2290 }
2291 
2292 /*
2293  * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
2294  * pages have been loaded, otherwise, it may give the wrong answer (0).
2295  */
2296 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2297 						    page_number_t page_number)
2298 {
2299 	struct data_location mapping;
2300 	struct tree_page *tree_page;
2301 	struct block_map_page *page;
2302 	root_count_t root_index = page_number % map->root_count;
2303 	page_number_t page_index = page_number / map->root_count;
2304 	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2305 
2306 	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2307 
2308 	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2309 	page = (struct block_map_page *) tree_page->page_buffer;
2310 	if (!page->header.initialized)
2311 		return VDO_ZERO_BLOCK;
2312 
2313 	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
2314 	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
2315 		return VDO_ZERO_BLOCK;
2316 	return mapping.pbn;
2317 }
2318 
2319 /*
2320  * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2321  * method is used when correcting errors in the tree during read-only rebuild.
2322  */
2323 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
2324 {
2325 	bool waiting = vdo_waiter_is_waiting(&page->waiter);
2326 
2327 	if (waiting && (zone->flusher == page))
2328 		return;
2329 
2330 	set_generation(zone, page, zone->generation);
2331 	if (waiting || page->writing)
2332 		return;
2333 
2334 	enqueue_page(page, zone);
2335 }
2336 
2337 static int make_segment(struct forest *old_forest, block_count_t new_pages,
2338 			struct boundary *new_boundary, struct forest *forest)
2339 {
2340 	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
2341 	struct tree_page *page_ptr;
2342 	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
2343 	height_t height;
2344 	root_count_t root;
2345 	int result;
2346 
2347 	forest->segments = index + 1;
2348 
2349 	result = vdo_allocate(forest->segments, struct boundary,
2350 			      "forest boundary array", &forest->boundaries);
2351 	if (result != VDO_SUCCESS)
2352 		return result;
2353 
2354 	result = vdo_allocate(forest->segments, struct tree_page *,
2355 			      "forest page pointers", &forest->pages);
2356 	if (result != VDO_SUCCESS)
2357 		return result;
2358 
2359 	result = vdo_allocate(new_pages, struct tree_page,
2360 			      "new forest pages", &forest->pages[index]);
2361 	if (result != VDO_SUCCESS)
2362 		return result;
2363 
2364 	if (index > 0) {
2365 		memcpy(forest->boundaries, old_forest->boundaries,
2366 		       index * sizeof(struct boundary));
2367 		memcpy(forest->pages, old_forest->pages,
2368 		       index * sizeof(struct tree_page *));
2369 	}
2370 
2371 	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));
2372 
2373 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2374 		segment_sizes[height] = new_boundary->levels[height];
2375 		if (index > 0)
2376 			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
2377 	}
2378 
2379 	page_ptr = forest->pages[index];
2380 	for (root = 0; root < forest->map->root_count; root++) {
2381 		struct block_map_tree_segment *segment;
2382 		struct block_map_tree *tree = &(forest->trees[root]);
2383 		height_t height;
2384 
2385 		int result = vdo_allocate(forest->segments,
2386 					  struct block_map_tree_segment,
2387 					  "tree root segments", &tree->segments);
2388 		if (result != VDO_SUCCESS)
2389 			return result;
2390 
2391 		if (index > 0) {
2392 			memcpy(tree->segments, old_forest->trees[root].segments,
2393 			       index * sizeof(struct block_map_tree_segment));
2394 		}
2395 
2396 		segment = &(tree->segments[index]);
2397 		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2398 			if (segment_sizes[height] == 0)
2399 				continue;
2400 
2401 			segment->levels[height] = page_ptr;
2402 			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
2403 				/* Record the root. */
2404 				struct block_map_page *page =
2405 					vdo_format_block_map_page(page_ptr->page_buffer,
2406 								  forest->map->nonce,
2407 								  VDO_INVALID_PBN, true);
2408 				page->entries[0] =
2409 					vdo_pack_block_map_entry(forest->map->root_origin + root,
2410 								 VDO_MAPPING_STATE_UNCOMPRESSED);
2411 			}
2412 			page_ptr += segment_sizes[height];
2413 		}
2414 	}
2415 
2416 	return VDO_SUCCESS;
2417 }
2418 
2419 static void deforest(struct forest *forest, size_t first_page_segment)
2420 {
2421 	root_count_t root;
2422 
2423 	if (forest->pages != NULL) {
2424 		size_t segment;
2425 
2426 		for (segment = first_page_segment; segment < forest->segments; segment++)
2427 			vdo_free(forest->pages[segment]);
2428 		vdo_free(forest->pages);
2429 	}
2430 
2431 	for (root = 0; root < forest->map->root_count; root++)
2432 		vdo_free(forest->trees[root].segments);
2433 
2434 	vdo_free(forest->boundaries);
2435 	vdo_free(forest);
2436 }
2437 
2438 /**
2439  * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
2440  *                 there is one.
2441  * @entries: The number of entries the block map will hold.
2442  *
2443  * Return: VDO_SUCCESS or an error.
2444  */
2445 static int make_forest(struct block_map *map, block_count_t entries)
2446 {
2447 	struct forest *forest, *old_forest = map->forest;
2448 	struct boundary new_boundary, *old_boundary = NULL;
2449 	block_count_t new_pages;
2450 	int result;
2451 
2452 	if (old_forest != NULL)
2453 		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);
2454 
2455 	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
2456 						 entries, &new_boundary);
2457 	if (new_pages == 0) {
2458 		map->next_entry_count = entries;
2459 		return VDO_SUCCESS;
2460 	}
2461 
2462 	result = vdo_allocate_extended(struct forest, map->root_count,
2463 				       struct block_map_tree, __func__,
2464 				       &forest);
2465 	if (result != VDO_SUCCESS)
2466 		return result;
2467 
2468 	forest->map = map;
2469 	result = make_segment(old_forest, new_pages, &new_boundary, forest);
2470 	if (result != VDO_SUCCESS) {
2471 		deforest(forest, forest->segments - 1);
2472 		return result;
2473 	}
2474 
2475 	map->next_forest = forest;
2476 	map->next_entry_count = entries;
2477 	return VDO_SUCCESS;
2478 }
2479 
2480 /**
2481  * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2482  */
2483 static void replace_forest(struct block_map *map)
2484 {
2485 	if (map->next_forest != NULL) {
2486 		if (map->forest != NULL)
2487 			deforest(map->forest, map->forest->segments);
2488 		map->forest = vdo_forget(map->next_forest);
2489 	}
2490 
2491 	map->entry_count = map->next_entry_count;
2492 	map->next_entry_count = 0;
2493 }
2494 
2495 /**
2496  * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
2497  *                   traversal.
2498  */
2499 static void finish_cursor(struct cursor *cursor)
2500 {
2501 	struct cursors *cursors = cursor->parent;
2502 	struct vdo_completion *completion = cursors->completion;
2503 
2504 	return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio));
2505 	if (--cursors->active_roots > 0)
2506 		return;
2507 
2508 	vdo_free(cursors);
2509 
2510 	vdo_finish_completion(completion);
2511 }
2512 
2513 static void traverse(struct cursor *cursor);
2514 
2515 /**
2516  * continue_traversal() - Continue traversing a block map tree.
2517  * @completion: The VIO doing a read or write.
2518  */
2519 static void continue_traversal(struct vdo_completion *completion)
2520 {
2521 	vio_record_metadata_io_error(as_vio(completion));
2522 	traverse(completion->parent);
2523 }
2524 
2525 /**
2526  * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2527  * @completion: The VIO doing the read.
2528  */
2529 static void finish_traversal_load(struct vdo_completion *completion)
2530 {
2531 	struct cursor *cursor = completion->parent;
2532 	height_t height = cursor->height;
2533 	struct cursor_level *level = &cursor->levels[height];
2534 	struct tree_page *tree_page =
2535 		&(cursor->tree->segments[0].levels[height][level->page_index]);
2536 	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2537 
2538 	vdo_copy_valid_page(cursor->vio->vio.data,
2539 			    cursor->parent->zone->block_map->nonce,
2540 			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
2541 	traverse(cursor);
2542 }
2543 
2544 static void traversal_endio(struct bio *bio)
2545 {
2546 	struct vio *vio = bio->bi_private;
2547 	struct cursor *cursor = vio->completion.parent;
2548 
2549 	continue_vio_after_io(vio, finish_traversal_load,
2550 			      cursor->parent->zone->thread_id);
2551 }
2552 
2553 /**
2554  * traverse() - Traverse a single block map tree.
2555  *
2556  * This is the recursive heart of the traversal process.
2557  */
2558 static void traverse(struct cursor *cursor)
2559 {
2560 	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
2561 		height_t height = cursor->height;
2562 		struct cursor_level *level = &cursor->levels[height];
2563 		struct tree_page *tree_page =
2564 			&(cursor->tree->segments[0].levels[height][level->page_index]);
2565 		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2566 
2567 		if (!page->header.initialized)
2568 			continue;
2569 
2570 		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
2571 			struct cursor_level *next_level;
2572 			page_number_t entry_index =
2573 				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
2574 			struct data_location location =
2575 				vdo_unpack_block_map_entry(&page->entries[level->slot]);
2576 
2577 			if (!vdo_is_valid_location(&location)) {
2578 				/* This entry is invalid, so remove it from the page. */
2579 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2580 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2581 				continue;
2582 			}
2583 
2584 			if (!vdo_is_mapped_location(&location))
2585 				continue;
2586 
2587 			/* Erase mapped entries past the end of the logical space. */
2588 			if (entry_index >= cursor->boundary.levels[height]) {
2589 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2590 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2591 				continue;
2592 			}
2593 
2594 			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
2595 				int result = cursor->parent->entry_callback(location.pbn,
2596 									    cursor->parent->completion);
2597 				if (result != VDO_SUCCESS) {
2598 					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2599 					vdo_write_tree_page(tree_page, cursor->parent->zone);
2600 					continue;
2601 				}
2602 			}
2603 
2604 			if (cursor->height == 0)
2605 				continue;
2606 
2607 			cursor->height--;
2608 			next_level = &cursor->levels[cursor->height];
2609 			next_level->page_index = entry_index;
2610 			next_level->slot = 0;
2611 			level->slot++;
2612 			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
2613 						traversal_endio, continue_traversal,
2614 						REQ_OP_READ | REQ_PRIO);
2615 			return;
2616 		}
2617 	}
2618 
2619 	finish_cursor(cursor);
2620 }
2621 
2622 /**
2623  * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2624  *                   which to load pages.
2625  * @context: The pooled_vio just acquired.
2626  *
2627  * Implements waiter_callback_fn.
2628  */
2629 static void launch_cursor(struct vdo_waiter *waiter, void *context)
2630 {
2631 	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
2632 	struct pooled_vio *pooled = context;
2633 
2634 	cursor->vio = pooled;
2635 	pooled->vio.completion.parent = cursor;
2636 	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
2637 	traverse(cursor);
2638 }
2639 
2640 /**
2641  * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2642  *
2643  * Return: The list of page counts as a boundary structure.
2644  */
2645 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
2646 {
2647 	struct boundary boundary;
2648 	height_t height;
2649 	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
2650 	/*
2651 	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2652 	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2653 	 * roots starting from tree 0.
2654 	 */
2655 	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
2656 	page_count_t level_pages = leaf_pages / map->root_count;
2657 
2658 	if (root_index <= last_tree_root)
2659 		level_pages++;
2660 
2661 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
2662 		boundary.levels[height] = level_pages;
2663 		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
2664 	}
2665 
2666 	/* The root node always exists, even if the root is otherwise unused. */
2667 	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;
2668 
2669 	return boundary;
2670 }
2671 
2672 /**
2673  * vdo_traverse_forest() - Walk the entire forest of a block map.
2674  * @callback: A function to call with the pbn of each allocated node in the forest.
2675  * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2676  */
2677 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
2678 			 struct vdo_completion *completion)
2679 {
2680 	root_count_t root;
2681 	struct cursors *cursors;
2682 	int result;
2683 
2684 	result = vdo_allocate_extended(struct cursors, map->root_count,
2685 				       struct cursor, __func__, &cursors);
2686 	if (result != VDO_SUCCESS) {
2687 		vdo_fail_completion(completion, result);
2688 		return;
2689 	}
2690 
2691 	cursors->zone = &map->zones[0];
2692 	cursors->pool = cursors->zone->vio_pool;
2693 	cursors->entry_callback = callback;
2694 	cursors->completion = completion;
2695 	cursors->active_roots = map->root_count;
2696 	for (root = 0; root < map->root_count; root++) {
2697 		struct cursor *cursor = &cursors->cursors[root];
2698 
2699 		*cursor = (struct cursor) {
2700 			.tree = &map->forest->trees[root],
2701 			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
2702 			.parent = cursors,
2703 			.boundary = compute_boundary(map, root),
2704 		};
2705 
2706 		cursor->waiter.callback = launch_cursor;
2707 		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
2708 	}
2709 }
2710 
2711 /**
2712  * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
2713  * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
2714  *               written out.
2715  */
2716 static int __must_check initialize_block_map_zone(struct block_map *map,
2717 						  zone_count_t zone_number,
2718 						  page_count_t cache_size,
2719 						  block_count_t maximum_age)
2720 {
2721 	int result;
2722 	block_count_t i;
2723 	struct vdo *vdo = map->vdo;
2724 	struct block_map_zone *zone = &map->zones[zone_number];
2725 
2726 	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));
2727 
2728 	zone->zone_number = zone_number;
2729 	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
2730 	zone->block_map = map;
2731 
2732 	result = vdo_allocate_extended(struct dirty_lists, maximum_age,
2733 				       dirty_era_t, __func__,
2734 				       &zone->dirty_lists);
2735 	if (result != VDO_SUCCESS)
2736 		return result;
2737 
2738 	zone->dirty_lists->maximum_age = maximum_age;
2739 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
2740 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);
2741 
2742 	for (i = 0; i < maximum_age; i++) {
2743 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
2744 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
2745 	}
2746 
2747 	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
2748 	if (result != VDO_SUCCESS)
2749 		return result;
2750 
2751 	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE,
2752 			       zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
2753 			       VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
2754 	if (result != VDO_SUCCESS)
2755 		return result;
2756 
2757 	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2758 
2759 	zone->page_cache.zone = zone;
2760 	zone->page_cache.vdo = vdo;
2761 	zone->page_cache.page_count = cache_size / map->zone_count;
2762 	zone->page_cache.stats.free_pages = zone->page_cache.page_count;
2763 
2764 	result = allocate_cache_components(&zone->page_cache);
2765 	if (result != VDO_SUCCESS)
2766 		return result;
2767 
2768 	/* initialize empty circular queues */
2769 	INIT_LIST_HEAD(&zone->page_cache.lru_list);
2770 	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);
2771 
2772 	return VDO_SUCCESS;
2773 }
2774 
2775 /* Implements vdo_zone_thread_getter_fn */
2776 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2777 {
2778 	struct block_map *map = context;
2779 
2780 	return map->zones[zone_number].thread_id;
2781 }
2782 
2783 /* Implements vdo_action_preamble_fn */
2784 static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2785 {
2786 	struct block_map *map = context;
2787 
2788 	map->current_era_point = map->pending_era_point;
2789 	vdo_finish_completion(parent);
2790 }
2791 
2792 /* Implements vdo_zone_action_fn */
2793 static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2794 				       struct vdo_completion *parent)
2795 {
2796 	struct block_map *map = context;
2797 	struct block_map_zone *zone = &map->zones[zone_number];
2798 
2799 	update_period(zone->dirty_lists, map->current_era_point);
2800 	write_expired_elements(zone);
2801 	vdo_finish_completion(parent);
2802 }
2803 
2804 /*
2805  * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2806  * vdo_schedule_default_action() on the block map's action manager.
2807  *
2808  * Implements vdo_action_scheduler_fn.
2809  */
2810 static bool schedule_era_advance(void *context)
2811 {
2812 	struct block_map *map = context;
2813 
2814 	if (map->current_era_point == map->pending_era_point)
2815 		return false;
2816 
2817 	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
2818 				   advance_block_map_zone_era, NULL, NULL);
2819 }
2820 
2821 static void uninitialize_block_map_zone(struct block_map_zone *zone)
2822 {
2823 	struct vdo_page_cache *cache = &zone->page_cache;
2824 
2825 	vdo_free(vdo_forget(zone->dirty_lists));
2826 	free_vio_pool(vdo_forget(zone->vio_pool));
2827 	vdo_int_map_free(vdo_forget(zone->loading_pages));
2828 	if (cache->infos != NULL) {
2829 		struct page_info *info;
2830 
2831 		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2832 			free_vio(vdo_forget(info->vio));
2833 	}
2834 
2835 	vdo_int_map_free(vdo_forget(cache->page_map));
2836 	vdo_free(vdo_forget(cache->infos));
2837 	vdo_free(vdo_forget(cache->pages));
2838 }
2839 
2840 void vdo_free_block_map(struct block_map *map)
2841 {
2842 	zone_count_t zone;
2843 
2844 	if (map == NULL)
2845 		return;
2846 
2847 	for (zone = 0; zone < map->zone_count; zone++)
2848 		uninitialize_block_map_zone(&map->zones[zone]);
2849 
2850 	vdo_abandon_block_map_growth(map);
2851 	if (map->forest != NULL)
2852 		deforest(vdo_forget(map->forest), 0);
2853 	vdo_free(vdo_forget(map->action_manager));
2854 	vdo_free(map);
2855 }
2856 
2857 /* @journal may be NULL. */
2858 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2859 			 struct vdo *vdo, struct recovery_journal *journal,
2860 			 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2861 			 struct block_map **map_ptr)
2862 {
2863 	struct block_map *map;
2864 	int result;
2865 	zone_count_t zone = 0;
2866 
2867 	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2868 		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2869 		      sizeof(struct block_map_entry)));
2870 	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
2871 	if (result != VDO_SUCCESS)
2872 		return result;
2873 
2874 	result = vdo_allocate_extended(struct block_map,
2875 				       vdo->thread_config.logical_zone_count,
2876 				       struct block_map_zone, __func__, &map);
2877 	if (result != VDO_SUCCESS)
2878 		return result;
2879 
2880 	map->vdo = vdo;
2881 	map->root_origin = state.root_origin;
2882 	map->root_count = state.root_count;
2883 	map->entry_count = logical_blocks;
2884 	map->journal = journal;
2885 	map->nonce = nonce;
2886 
2887 	result = make_forest(map, map->entry_count);
2888 	if (result != VDO_SUCCESS) {
2889 		vdo_free_block_map(map);
2890 		return result;
2891 	}
2892 
2893 	replace_forest(map);
2894 
2895 	map->zone_count = vdo->thread_config.logical_zone_count;
2896 	for (zone = 0; zone < map->zone_count; zone++) {
2897 		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
2898 		if (result != VDO_SUCCESS) {
2899 			vdo_free_block_map(map);
2900 			return result;
2901 		}
2902 	}
2903 
2904 	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2905 					 vdo_get_recovery_journal_thread_id(journal),
2906 					 map, schedule_era_advance, vdo,
2907 					 &map->action_manager);
2908 	if (result != VDO_SUCCESS) {
2909 		vdo_free_block_map(map);
2910 		return result;
2911 	}
2912 
2913 	*map_ptr = map;
2914 	return VDO_SUCCESS;
2915 }
2916 
2917 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2918 {
2919 	return (struct block_map_state_2_0) {
2920 		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2921 		/* This is the flat page count, which has turned out to always be 0. */
2922 		.flat_page_count = 0,
2923 		.root_origin = map->root_origin,
2924 		.root_count = map->root_count,
2925 	};
2926 }
2927 
2928 /* The block map needs to know the journals' sequence number to initialize the eras. */
2929 void vdo_initialize_block_map_from_journal(struct block_map *map,
2930 					   struct recovery_journal *journal)
2931 {
2932 	zone_count_t z = 0;
2933 
2934 	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2935 	map->pending_era_point = map->current_era_point;
2936 
2937 	for (z = 0; z < map->zone_count; z++) {
2938 		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2939 
2940 		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2941 		dirty_lists->oldest_period = map->current_era_point;
2942 		dirty_lists->next_period = map->current_era_point + 1;
2943 		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
2944 	}
2945 }
2946 
2947 /* Compute the logical zone for the LBN of a data vio. */
2948 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
2949 {
2950 	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
2951 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2952 	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2953 
2954 	tree_lock->tree_slots[0].page_index = page_number;
2955 	tree_lock->root_index = page_number % map->root_count;
2956 	return (tree_lock->root_index % map->zone_count);
2957 }
2958 
2959 void vdo_advance_block_map_era(struct block_map *map,
2960 			       sequence_number_t recovery_block_number)
2961 {
2962 	if (map == NULL)
2963 		return;
2964 
2965 	map->pending_era_point = recovery_block_number;
2966 	vdo_schedule_default_action(map->action_manager);
2967 }
2968 
2969 /* Implements vdo_admin_initiator_fn */
2970 static void initiate_drain(struct admin_state *state)
2971 {
2972 	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
2973 
2974 	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
2975 			    "%s() called with no active lookups", __func__);
2976 
2977 	if (!vdo_is_state_suspending(state)) {
2978 		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
2979 			expire_oldest_list(zone->dirty_lists);
2980 		write_expired_elements(zone);
2981 	}
2982 
2983 	check_for_drain_complete(zone);
2984 }
2985 
2986 /* Implements vdo_zone_action_fn. */
2987 static void drain_zone(void *context, zone_count_t zone_number,
2988 		       struct vdo_completion *parent)
2989 {
2990 	struct block_map *map = context;
2991 	struct block_map_zone *zone = &map->zones[zone_number];
2992 
2993 	vdo_start_draining(&zone->state,
2994 			   vdo_get_current_manager_operation(map->action_manager),
2995 			   parent, initiate_drain);
2996 }
2997 
2998 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
2999 			 struct vdo_completion *parent)
3000 {
3001 	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
3002 			       parent);
3003 }
3004 
3005 /* Implements vdo_zone_action_fn. */
3006 static void resume_block_map_zone(void *context, zone_count_t zone_number,
3007 				  struct vdo_completion *parent)
3008 {
3009 	struct block_map *map = context;
3010 	struct block_map_zone *zone = &map->zones[zone_number];
3011 
3012 	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3013 }
3014 
3015 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3016 {
3017 	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3018 			       NULL, resume_block_map_zone, NULL, parent);
3019 }
3020 
3021 /* Allocate an expanded collection of trees, for a future growth. */
3022 int vdo_prepare_to_grow_block_map(struct block_map *map,
3023 				  block_count_t new_logical_blocks)
3024 {
3025 	if (map->next_entry_count == new_logical_blocks)
3026 		return VDO_SUCCESS;
3027 
3028 	if (map->next_entry_count > 0)
3029 		vdo_abandon_block_map_growth(map);
3030 
3031 	if (new_logical_blocks < map->entry_count) {
3032 		map->next_entry_count = map->entry_count;
3033 		return VDO_SUCCESS;
3034 	}
3035 
3036 	return make_forest(map, new_logical_blocks);
3037 }
3038 
/*
 * Replace the forest of the block map passed as @context, then finish the completion.
 *
 * Implements vdo_action_preamble_fn.
 */
static void grow_forest(void *context, struct vdo_completion *completion)
{
	struct block_map *block_map = context;

	replace_forest(block_map);
	vdo_finish_completion(completion);
}
3045 
3046 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
3047 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
3048 {
3049 	vdo_schedule_operation(map->action_manager,
3050 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
3051 			       grow_forest, NULL, NULL, parent);
3052 }
3053 
3054 void vdo_abandon_block_map_growth(struct block_map *map)
3055 {
3056 	struct forest *forest = vdo_forget(map->next_forest);
3057 
3058 	if (forest != NULL)
3059 		deforest(forest, forest->segments - 1);
3060 
3061 	map->next_entry_count = 0;
3062 }
3063 
3064 /* Release the page completion and then continue the requester. */
3065 static inline void finish_processing_page(struct vdo_completion *completion, int result)
3066 {
3067 	struct vdo_completion *parent = completion->parent;
3068 
3069 	vdo_release_page_completion(completion);
3070 	vdo_continue_completion(parent, result);
3071 }
3072 
3073 static void handle_page_error(struct vdo_completion *completion)
3074 {
3075 	finish_processing_page(completion, completion->result);
3076 }
3077 
3078 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
3079 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
3080 			       vdo_action_fn action)
3081 {
3082 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3083 
3084 	if (vdo_is_state_draining(&zone->state)) {
3085 		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
3086 		return;
3087 	}
3088 
3089 	vdo_get_page(&data_vio->page_completion, zone,
3090 		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
3091 		     modifiable, &data_vio->vio.completion,
3092 		     action, handle_page_error, false);
3093 }
3094 
3095 /**
3096  * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3097  *
3098  * This indicates the block map entry for the logical block is either unmapped or corrupted.
3099  */
3100 static void clear_mapped_location(struct data_vio *data_vio)
3101 {
3102 	data_vio->mapped = (struct zoned_pbn) {
3103 		.state = VDO_MAPPING_STATE_UNMAPPED,
3104 	};
3105 }
3106 
3107 /**
3108  * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
3109  *                         data_vio.
3110  *
3111  * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any
3112  *         other failure
3113  */
3114 static int __must_check set_mapped_location(struct data_vio *data_vio,
3115 					    const struct block_map_entry *entry)
3116 {
3117 	/* Unpack the PBN for logging purposes even if the entry is invalid. */
3118 	struct data_location mapped = vdo_unpack_block_map_entry(entry);
3119 
3120 	if (vdo_is_valid_location(&mapped)) {
3121 		int result;
3122 
3123 		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
3124 					       mapped.pbn, &data_vio->mapped.zone);
3125 		if (result == VDO_SUCCESS) {
3126 			data_vio->mapped.pbn = mapped.pbn;
3127 			data_vio->mapped.state = mapped.state;
3128 			return VDO_SUCCESS;
3129 		}
3130 
3131 		/*
3132 		 * Return all errors not specifically known to be errors from validating the
3133 		 * location.
3134 		 */
3135 		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
3136 			return result;
3137 	}
3138 
3139 	/*
3140 	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
3141 	 * to VDO_BAD_MAPPING.
3142 	 */
3143 	vdo_log_error_strerror(VDO_BAD_MAPPING,
3144 			       "PBN %llu with state %u read from the block map was invalid",
3145 			       (unsigned long long) mapped.pbn, mapped.state);
3146 
3147 	/*
3148 	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
3149 	 * known data loss.
3150 	 */
3151 	if (!data_vio->write)
3152 		return VDO_BAD_MAPPING;
3153 
3154 	/*
3155 	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
3156 	 * entry rather than fail the write.
3157 	 */
3158 	clear_mapped_location(data_vio);
3159 	return VDO_SUCCESS;
3160 }
3161 
3162 /* This callback is registered in vdo_get_mapped_block(). */
3163 static void get_mapping_from_fetched_page(struct vdo_completion *completion)
3164 {
3165 	int result;
3166 	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
3167 	const struct block_map_page *page;
3168 	const struct block_map_entry *entry;
3169 	struct data_vio *data_vio = as_data_vio(completion->parent);
3170 	struct block_map_tree_slot *tree_slot;
3171 
3172 	if (completion->result != VDO_SUCCESS) {
3173 		finish_processing_page(completion, completion->result);
3174 		return;
3175 	}
3176 
3177 	result = validate_completed_page(vpc, false);
3178 	if (result != VDO_SUCCESS) {
3179 		finish_processing_page(completion, result);
3180 		return;
3181 	}
3182 
3183 	page = (const struct block_map_page *) get_page_buffer(vpc->info);
3184 	tree_slot = &data_vio->tree_lock.tree_slots[0];
3185 	entry = &page->entries[tree_slot->block_map_slot.slot];
3186 
3187 	result = set_mapped_location(data_vio, entry);
3188 	finish_processing_page(completion, result);
3189 }
3190 
3191 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
3192 			       physical_block_number_t pbn,
3193 			       enum block_mapping_state mapping_state,
3194 			       sequence_number_t *recovery_lock)
3195 {
3196 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3197 	struct block_map *block_map = zone->block_map;
3198 	struct recovery_journal *journal = block_map->journal;
3199 	sequence_number_t old_locked, new_locked;
3200 	struct tree_lock *tree_lock = &data_vio->tree_lock;
3201 
3202 	/* Encode the new mapping. */
3203 	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
3204 		vdo_pack_block_map_entry(pbn, mapping_state);
3205 
3206 	/* Adjust references on the recovery journal blocks. */
3207 	old_locked = *recovery_lock;
3208 	new_locked = data_vio->recovery_sequence_number;
3209 
3210 	if ((old_locked == 0) || (old_locked > new_locked)) {
3211 		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
3212 							     VDO_ZONE_TYPE_LOGICAL,
3213 							     zone->zone_number);
3214 
3215 		if (old_locked > 0) {
3216 			vdo_release_recovery_journal_block_reference(journal, old_locked,
3217 								     VDO_ZONE_TYPE_LOGICAL,
3218 								     zone->zone_number);
3219 		}
3220 
3221 		*recovery_lock = new_locked;
3222 	}
3223 
3224 	/*
3225 	 * FIXME: explain this more
3226 	 * Release the transferred lock from the data_vio.
3227 	 */
3228 	vdo_release_journal_entry_lock(journal, new_locked);
3229 	data_vio->recovery_sequence_number = 0;
3230 }
3231 
3232 static void put_mapping_in_fetched_page(struct vdo_completion *completion)
3233 {
3234 	struct data_vio *data_vio = as_data_vio(completion->parent);
3235 	sequence_number_t old_lock;
3236 	struct vdo_page_completion *vpc;
3237 	struct page_info *info;
3238 	int result;
3239 
3240 	if (completion->result != VDO_SUCCESS) {
3241 		finish_processing_page(completion, completion->result);
3242 		return;
3243 	}
3244 
3245 	vpc = as_vdo_page_completion(completion);
3246 	result = validate_completed_page(vpc, true);
3247 	if (result != VDO_SUCCESS) {
3248 		finish_processing_page(completion, result);
3249 		return;
3250 	}
3251 
3252 	info = vpc->info;
3253 	old_lock = info->recovery_lock;
3254 	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
3255 				  data_vio, data_vio->new_mapped.pbn,
3256 				  data_vio->new_mapped.state, &info->recovery_lock);
3257 	set_info_state(info, PS_DIRTY);
3258 	add_to_dirty_lists(info->cache->zone, &info->state_entry,
3259 			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
3260 	finish_processing_page(completion, VDO_SUCCESS);
3261 }
3262 
3263 /* Read a stored block mapping into a data_vio. */
3264 void vdo_get_mapped_block(struct data_vio *data_vio)
3265 {
3266 	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
3267 		/*
3268 		 * We know that the block map page for this LBN has not been allocated, so the
3269 		 * block must be unmapped.
3270 		 */
3271 		clear_mapped_location(data_vio);
3272 		continue_data_vio(data_vio);
3273 		return;
3274 	}
3275 
3276 	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
3277 }
3278 
3279 /* Update a stored block mapping to reflect a data_vio's new mapping. */
3280 void vdo_put_mapped_block(struct data_vio *data_vio)
3281 {
3282 	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
3283 }
3284 
3285 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
3286 {
3287 	zone_count_t zone = 0;
3288 	struct block_map_statistics totals;
3289 
3290 	memset(&totals, 0, sizeof(struct block_map_statistics));
3291 	for (zone = 0; zone < map->zone_count; zone++) {
3292 		const struct block_map_statistics *stats =
3293 			&(map->zones[zone].page_cache.stats);
3294 
3295 		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
3296 		totals.clean_pages += READ_ONCE(stats->clean_pages);
3297 		totals.free_pages += READ_ONCE(stats->free_pages);
3298 		totals.failed_pages += READ_ONCE(stats->failed_pages);
3299 		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
3300 		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
3301 		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
3302 		totals.read_count += READ_ONCE(stats->read_count);
3303 		totals.write_count += READ_ONCE(stats->write_count);
3304 		totals.failed_reads += READ_ONCE(stats->failed_reads);
3305 		totals.failed_writes += READ_ONCE(stats->failed_writes);
3306 		totals.reclaimed += READ_ONCE(stats->reclaimed);
3307 		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
3308 		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
3309 		totals.discard_required += READ_ONCE(stats->discard_required);
3310 		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
3311 		totals.fetch_required += READ_ONCE(stats->fetch_required);
3312 		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
3313 		totals.pages_saved += READ_ONCE(stats->pages_saved);
3314 		totals.flush_count += READ_ONCE(stats->flush_count);
3315 	}
3316 
3317 	return totals;
3318 }
3319