xref: /linux/drivers/md/dm-vdo/block-map.c (revision d358e5254674b70f34c847715ca509e46eb81e6f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "block-map.h"
7 
8 #include <linux/bio.h>
9 #include <linux/ratelimit.h>
10 
11 #include "errors.h"
12 #include "logger.h"
13 #include "memory-alloc.h"
14 #include "permassert.h"
15 
16 #include "action-manager.h"
17 #include "admin-state.h"
18 #include "completion.h"
19 #include "constants.h"
20 #include "data-vio.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "recovery-journal.h"
25 #include "slab-depot.h"
26 #include "status-codes.h"
27 #include "types.h"
28 #include "vdo.h"
29 #include "vio.h"
30 #include "wait-queue.h"
31 
32 /**
33  * DOC: Block map eras
34  *
35  * The block map era, or maximum age, is used as follows:
36  *
37  * Each block map page, when dirty, records the earliest recovery journal block sequence number of
38  * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
39  * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
40  * according to the sequence number they record.
41  *
42  * In the current (newest) era, block map pages are not written unless there is cache pressure. In
43  * the next oldest era, each time a new journal block is written, 1/@maximum_age of the pages in
44  * this era are issued for write. In all older eras, pages are issued for write immediately.
45  */
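
/*
 * Illustrative sketch (not part of the driver): given the description above, a page whose
 * dirtying is covered by recovery journal sequence number @seq would belong, roughly, to era:
 *
 *	u64 era = seq / maximum_age;
 *
 * Pages in the current era are written only under cache pressure, pages one era older are
 * trickled out as new journal blocks are written, and pages in any older era are written
 * immediately. The names seq and maximum_age here are placeholders, not fields of the driver.
 */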
46 
47 struct page_descriptor {
48 	root_count_t root_index;
49 	height_t height;
50 	page_number_t page_index;
51 	slot_number_t slot;
52 } __packed;
53 
54 union page_key {
55 	struct page_descriptor descriptor;
56 	u64 key;
57 };
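
/*
 * Illustrative note (an assumption, not driver code): struct page_descriptor is packed so that
 * it occupies the same eight bytes as the u64 in this union, letting a descriptor be used
 * directly as an integer key, for example:
 *
 *	union page_key key = {
 *		.descriptor = {
 *			.root_index = root,
 *			.height = height,
 *			.page_index = page_index,
 *			.slot = slot,
 *		},
 *	};
 *	u64 map_key = key.key;
 *
 * Here root, height, page_index, and slot are hypothetical local variables.
 */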
58 
59 struct write_if_not_dirtied_context {
60 	struct block_map_zone *zone;
61 	u8 generation;
62 };
63 
64 struct block_map_tree_segment {
65 	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
66 };
67 
68 struct block_map_tree {
69 	struct block_map_tree_segment *segments;
70 };
71 
72 struct forest {
73 	struct block_map *map;
74 	size_t segments;
75 	struct boundary *boundaries;
76 	struct tree_page **pages;
77 	struct block_map_tree trees[];
78 };
79 
80 struct cursor_level {
81 	page_number_t page_index;
82 	slot_number_t slot;
83 };
84 
85 struct cursors;
86 
87 struct cursor {
88 	struct vdo_waiter waiter;
89 	struct block_map_tree *tree;
90 	height_t height;
91 	struct cursors *parent;
92 	struct boundary boundary;
93 	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
94 	struct pooled_vio *vio;
95 };
96 
97 struct cursors {
98 	struct block_map_zone *zone;
99 	struct vio_pool *pool;
100 	vdo_entry_callback_fn entry_callback;
101 	struct vdo_completion *completion;
102 	root_count_t active_roots;
103 	struct cursor cursors[];
104 };
105 
106 static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;
107 
108 /* Used to indicate that the page holding the location of a tree root has been "loaded". */
109 static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;
110 
111 const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
112 	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
113 	.pbn_high_nibble = 0,
114 	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
115 };
116 
117 #define LOG_INTERVAL 4000
118 #define DISPLAY_INTERVAL 100000
119 
120 /*
121  * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
122  * Prevents any compiler shenanigans from affecting other threads reading those stats.
123  */
124 #define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
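
/*
 * Example (illustrative only): ADD_ONCE(cache->stats.pages_loaded, 1) expands to
 * WRITE_ONCE(cache->stats.pages_loaded, (cache->stats.pages_loaded) + (1)), so the update is a
 * single store and a concurrent reader sees either the old or the new value.
 */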
125 
126 static inline bool is_dirty(const struct page_info *info)
127 {
128 	return info->state == PS_DIRTY;
129 }
130 
131 static inline bool is_present(const struct page_info *info)
132 {
133 	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
134 }
135 
136 static inline bool is_in_flight(const struct page_info *info)
137 {
138 	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
139 }
140 
141 static inline bool is_incoming(const struct page_info *info)
142 {
143 	return info->state == PS_INCOMING;
144 }
145 
146 static inline bool is_outgoing(const struct page_info *info)
147 {
148 	return info->state == PS_OUTGOING;
149 }
150 
151 static inline bool is_valid(const struct page_info *info)
152 {
153 	return is_present(info) || is_outgoing(info);
154 }
155 
156 static char *get_page_buffer(struct page_info *info)
157 {
158 	struct vdo_page_cache *cache = info->cache;
159 
160 	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
161 }
162 
163 static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
164 {
165 	struct vdo_page_completion *completion;
166 
167 	if (waiter == NULL)
168 		return NULL;
169 
170 	completion = container_of(waiter, struct vdo_page_completion, waiter);
171 	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
172 	return completion;
173 }
174 
175 /**
176  * initialize_info() - Initialize all page info structures and put them on the free list.
177  * @cache: The page cache.
178  *
179  * Return: VDO_SUCCESS or an error.
180  */
181 static int initialize_info(struct vdo_page_cache *cache)
182 {
183 	struct page_info *info;
184 
185 	INIT_LIST_HEAD(&cache->free_list);
186 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
187 		int result;
188 
189 		info->cache = cache;
190 		info->state = PS_FREE;
191 		info->pbn = NO_PAGE;
192 
193 		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
194 					     VIO_PRIORITY_METADATA, info,
195 					     get_page_buffer(info), &info->vio);
196 		if (result != VDO_SUCCESS)
197 			return result;
198 
199 		/* The thread ID should never change. */
200 		info->vio->completion.callback_thread_id = cache->zone->thread_id;
201 
202 		INIT_LIST_HEAD(&info->state_entry);
203 		list_add_tail(&info->state_entry, &cache->free_list);
204 		INIT_LIST_HEAD(&info->lru_entry);
205 	}
206 
207 	return VDO_SUCCESS;
208 }
209 
210 /**
211  * allocate_cache_components() - Allocate components of the cache which require their own
212  *                               allocation.
213  * @cache: The page cache.
214  *
215  * The caller is responsible for all clean up on errors.
216  *
217  * Return: VDO_SUCCESS or an error code.
218  */
219 static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
220 {
221 	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
222 	int result;
223 
224 	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
225 			      &cache->infos);
226 	if (result != VDO_SUCCESS)
227 		return result;
228 
229 	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
230 	if (result != VDO_SUCCESS)
231 		return result;
232 
233 	result = vdo_int_map_create(cache->page_count, &cache->page_map);
234 	if (result != VDO_SUCCESS)
235 		return result;
236 
237 	return initialize_info(cache);
238 }
239 
240 /**
241  * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
242  *                            thread.
243  * @cache: The page cache.
244  * @function_name: The funtion name to report if the assertion fails.
245  */
246 static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
247 					  const char *function_name)
248 {
249 	thread_id_t thread_id = vdo_get_callback_thread_id();
250 
251 	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
252 			    "%s() must only be called on cache thread %d, not thread %d",
253 			    function_name, cache->zone->thread_id, thread_id);
254 }
255 
256 /** assert_io_allowed() - Assert that a page cache may issue I/O. */
257 static inline void assert_io_allowed(struct vdo_page_cache *cache)
258 {
259 	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
260 			    "VDO page cache may issue I/O");
261 }
262 
263 /** report_cache_pressure() - Log and, if enabled, report cache pressure. */
264 static void report_cache_pressure(struct vdo_page_cache *cache)
265 {
266 	ADD_ONCE(cache->stats.cache_pressure, 1);
267 	if (cache->waiter_count > cache->page_count) {
268 		if ((cache->pressure_report % LOG_INTERVAL) == 0)
269 			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);
270 
271 		if (++cache->pressure_report >= DISPLAY_INTERVAL)
272 			cache->pressure_report = 0;
273 	}
274 }
275 
276 /**
277  * get_page_state_name() - Return the name of a page state.
278  * @state: The page state to describe.
279  *
280  * If the page state is invalid a static string is returned and the invalid state is logged.
281  *
282  * Return: A pointer to a static page state name.
283  */
284 static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
285 {
286 	int result;
287 	static const char * const state_names[] = {
288 		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
289 	};
290 
291 	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);
292 
293 	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
294 			    "Unknown page_state value %d", state);
295 	if (result != VDO_SUCCESS)
296 		return "[UNKNOWN PAGE STATE]";
297 
298 	return state_names[state];
299 }
300 
301 /**
302  * update_counter() - Update the counter associated with a given state.
303  * @info: The page info to count.
304  * @delta: The delta to apply to the counter.
305  */
306 static void update_counter(struct page_info *info, s32 delta)
307 {
308 	struct block_map_statistics *stats = &info->cache->stats;
309 
310 	switch (info->state) {
311 	case PS_FREE:
312 		ADD_ONCE(stats->free_pages, delta);
313 		return;
314 
315 	case PS_INCOMING:
316 		ADD_ONCE(stats->incoming_pages, delta);
317 		return;
318 
319 	case PS_OUTGOING:
320 		ADD_ONCE(stats->outgoing_pages, delta);
321 		return;
322 
323 	case PS_FAILED:
324 		ADD_ONCE(stats->failed_pages, delta);
325 		return;
326 
327 	case PS_RESIDENT:
328 		ADD_ONCE(stats->clean_pages, delta);
329 		return;
330 
331 	case PS_DIRTY:
332 		ADD_ONCE(stats->dirty_pages, delta);
333 		return;
334 
335 	default:
336 		return;
337 	}
338 }
339 
340 /** update_lru() - Update the lru information for an active page. */
341 static void update_lru(struct page_info *info)
342 {
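	/* lru_list.prev is the list tail; only move the page if it is not already at the end. */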
343 	if (info->cache->lru_list.prev != &info->lru_entry)
344 		list_move_tail(&info->lru_entry, &info->cache->lru_list);
345 }
346 
347 /**
348  * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
349  *                    counters.
350  * @info: The page info to update.
351  * @new_state: The new state to set.
352  */
353 static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
354 {
355 	if (new_state == info->state)
356 		return;
357 
358 	update_counter(info, -1);
359 	info->state = new_state;
360 	update_counter(info, 1);
361 
362 	switch (info->state) {
363 	case PS_FREE:
364 	case PS_FAILED:
365 		list_move_tail(&info->state_entry, &info->cache->free_list);
366 		return;
367 
368 	case PS_OUTGOING:
369 		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
370 		return;
371 
372 	case PS_DIRTY:
373 		return;
374 
375 	default:
376 		list_del_init(&info->state_entry);
377 	}
378 }
379 
380 /** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
381 static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
382 {
383 	struct vdo_page_cache *cache = info->cache;
384 
385 	/* Either the new or the old page number must be NO_PAGE. */
386 	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
387 				"Must free a page before reusing it.");
388 	if (result != VDO_SUCCESS)
389 		return result;
390 
391 	if (info->pbn != NO_PAGE)
392 		vdo_int_map_remove(cache->page_map, info->pbn);
393 
394 	info->pbn = pbn;
395 
396 	if (pbn != NO_PAGE) {
397 		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
398 		if (result != VDO_SUCCESS)
399 			return result;
400 	}
401 	return VDO_SUCCESS;
402 }
403 
404 /** reset_page_info() - Reset page info to represent an unallocated page. */
405 static int reset_page_info(struct page_info *info)
406 {
407 	int result;
408 
409 	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
410 	if (result != VDO_SUCCESS)
411 		return result;
412 
413 	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
414 			    "VDO Page must not have waiters");
415 	if (result != VDO_SUCCESS)
416 		return result;
417 
418 	result = set_info_pbn(info, NO_PAGE);
419 	set_info_state(info, PS_FREE);
420 	list_del_init(&info->lru_entry);
421 	return result;
422 }
423 
424 /**
425  * find_free_page() - Find a free page.
426  * @cache: The page cache.
427  *
428  * Return: A pointer to the page info structure (if found), NULL otherwise.
429  */
430 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
431 {
432 	struct page_info *info;
433 
434 	info = list_first_entry_or_null(&cache->free_list, struct page_info,
435 					state_entry);
436 	if (info != NULL)
437 		list_del_init(&info->state_entry);
438 
439 	return info;
440 }
441 
442 /**
443  * find_page() - Find the page info (if any) associated with a given pbn.
444  * @cache: The page cache.
445  * @pbn: The absolute physical block number of the page.
446  *
447  * Return: The page info for the page if available, or NULL if not.
448  */
449 static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
450 						 physical_block_number_t pbn)
451 {
452 	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
453 		return cache->last_found;
454 
455 	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
456 	return cache->last_found;
457 }
458 
459 /**
460  * select_lru_page() - Determine which page is least recently used.
461  * @cache: The page cache.
462  *
463  * Picks the least recently used page from among the non-busy entries at the front of the LRU
464  * list. Since marking a page busy also moves it to the end of the list, it is unlikely (though
465  * not impossible) that the entries at the front are busy unless the list is very short.
466  *
467  * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
468  *         found. The page can be dirty or resident.
469  */
470 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
471 {
472 	struct page_info *info;
473 
474 	list_for_each_entry(info, &cache->lru_list, lru_entry)
475 		if ((info->busy == 0) && !is_in_flight(info))
476 			return info;
477 
478 	return NULL;
479 }
480 
481 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */
482 
483 /**
484  * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
485  * @info: The page info representing the result page.
486  * @vdo_page_comp: The VDO page completion to complete.
487  */
488 static void complete_with_page(struct page_info *info,
489 			       struct vdo_page_completion *vdo_page_comp)
490 {
491 	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);
492 
493 	if (!available) {
494 		vdo_log_error_strerror(VDO_BAD_PAGE,
495 				       "Requested cache page %llu in state %s is not %s",
496 				       (unsigned long long) info->pbn,
497 				       get_page_state_name(info->state),
498 				       vdo_page_comp->writable ? "present" : "valid");
499 		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
500 		return;
501 	}
502 
503 	vdo_page_comp->info = info;
504 	vdo_page_comp->ready = true;
505 	vdo_finish_completion(&vdo_page_comp->completion);
506 }
507 
508 /**
509  * complete_waiter_with_error() - Complete a page completion with an error code.
510  * @waiter: The page completion, as a waiter.
511  * @result_ptr: A pointer to the error code.
512  *
513  * Implements waiter_callback_fn.
514  */
515 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
516 {
517 	int *result = result_ptr;
518 
519 	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
520 }
521 
522 /**
523  * complete_waiter_with_page() - Complete a page completion with a page.
524  * @waiter: The page completion, as a waiter.
525  * @page_info: The page info to complete with.
526  *
527  * Implements waiter_callback_fn.
528  */
529 static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
530 {
531 	complete_with_page(page_info, page_completion_from_waiter(waiter));
532 }
533 
534 /**
535  * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
536  * @info: The loaded page info.
537  * @waitq: The list of waiting data_vios.
538  *
539  * Upon completion the waitq will be empty.
540  *
541  * Return: The number of pages distributed.
542  */
543 static unsigned int distribute_page_over_waitq(struct page_info *info,
544 					       struct vdo_wait_queue *waitq)
545 {
546 	size_t num_pages;
547 
548 	update_lru(info);
549 	num_pages = vdo_waitq_num_waiters(waitq);
550 
551 	/*
552 	 * Increment the busy count once for each pending completion so that this page does not
553 	 * stop being busy until all completions have been processed.
554 	 */
555 	info->busy += num_pages;
556 
557 	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
558 	return num_pages;
559 }
560 
561 /**
562  * set_persistent_error() - Set a persistent error which all requests will receive in the future.
563  * @cache: The page cache.
564  * @context: A string describing what triggered the error.
565  * @result: The error result to set on the cache.
566  *
567  * Once triggered, all enqueued completions will get this error. Any future requests will result in
568  * this error as well.
569  */
570 static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
571 				 int result)
572 {
573 	struct page_info *info;
574 	/* If we're already read-only, there's no need to log. */
575 	struct vdo *vdo = cache->vdo;
576 
577 	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
578 		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
579 				       context);
580 		vdo_enter_read_only_mode(vdo, result);
581 	}
582 
583 	assert_on_cache_thread(cache, __func__);
584 
585 	vdo_waitq_notify_all_waiters(&cache->free_waiters,
586 				     complete_waiter_with_error, &result);
587 	cache->waiter_count = 0;
588 
589 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
590 		vdo_waitq_notify_all_waiters(&info->waiting,
591 					     complete_waiter_with_error, &result);
592 	}
593 }
594 
595 /**
596  * validate_completed_page() - Check that a page completion which is being freed to the cache
597  *                             referred to a valid page and is in a valid state.
598  * @completion: The page completion to check.
599  * @writable: Whether a writable page is required.
600  *
601  * Return: VDO_SUCCESS if the page was valid, otherwise as error
602  */
603 static int __must_check validate_completed_page(struct vdo_page_completion *completion,
604 						bool writable)
605 {
606 	int result;
607 
608 	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
609 	if (result != VDO_SUCCESS)
610 		return result;
611 
612 	result = VDO_ASSERT(completion->info != NULL,
613 			    "VDO Page Completion must be complete");
614 	if (result != VDO_SUCCESS)
615 		return result;
616 
617 	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
618 			    "VDO Page Completion pbn must be consistent");
619 	if (result != VDO_SUCCESS)
620 		return result;
621 
622 	result = VDO_ASSERT(is_valid(completion->info),
623 			    "VDO Page Completion page must be valid");
624 	if (result != VDO_SUCCESS)
625 		return result;
626 
627 	if (writable) {
628 		result = VDO_ASSERT(completion->writable,
629 				    "VDO Page Completion must be writable");
630 		if (result != VDO_SUCCESS)
631 			return result;
632 	}
633 
634 	return VDO_SUCCESS;
635 }
636 
637 static void check_for_drain_complete(struct block_map_zone *zone)
638 {
639 	if (vdo_is_state_draining(&zone->state) &&
640 	    (zone->active_lookups == 0) &&
641 	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
642 	    !is_vio_pool_busy(zone->vio_pool) &&
643 	    (zone->page_cache.outstanding_reads == 0) &&
644 	    (zone->page_cache.outstanding_writes == 0)) {
645 		vdo_finish_draining_with_result(&zone->state,
646 						(vdo_is_read_only(zone->block_map->vdo) ?
647 						 VDO_READ_ONLY : VDO_SUCCESS));
648 	}
649 }
650 
651 static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
652 {
653 	vdo_enter_read_only_mode(zone->block_map->vdo, result);
654 
655 	/*
656 	 * We are in read-only mode, so we won't ever write any page out.
657 	 * Just take all waiters off the waitq so the zone can drain.
658 	 */
659 	vdo_waitq_init(&zone->flush_waiters);
660 	check_for_drain_complete(zone);
661 }
662 
663 static bool __must_check
664 validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
665 						bool writable)
666 {
667 	int result = validate_completed_page(completion, writable);
668 
669 	if (result == VDO_SUCCESS)
670 		return true;
671 
672 	enter_zone_read_only_mode(completion->info->cache->zone, result);
673 	return false;
674 }
675 
676 /**
677  * handle_load_error() - Handle page load errors.
678  * @completion: The page read vio.
679  */
680 static void handle_load_error(struct vdo_completion *completion)
681 {
682 	int result = completion->result;
683 	struct page_info *info = completion->parent;
684 	struct vdo_page_cache *cache = info->cache;
685 
686 	assert_on_cache_thread(cache, __func__);
687 	vio_record_metadata_io_error(as_vio(completion));
688 	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
689 	ADD_ONCE(cache->stats.failed_reads, 1);
690 	set_info_state(info, PS_FAILED);
691 	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
692 	reset_page_info(info);
693 
694 	/*
695 	 * Don't decrement until right before calling check_for_drain_complete() to
696 	 * ensure that the above work can't cause the page cache to be freed out from under us.
697 	 */
698 	cache->outstanding_reads--;
699 	check_for_drain_complete(cache->zone);
700 }
701 
702 /**
703  * page_is_loaded() - Callback used when a page has been loaded.
704  * @completion: The vio which has loaded the page. Its parent is the page_info.
705  */
706 static void page_is_loaded(struct vdo_completion *completion)
707 {
708 	struct page_info *info = completion->parent;
709 	struct vdo_page_cache *cache = info->cache;
710 	nonce_t nonce = info->cache->zone->block_map->nonce;
711 	struct block_map_page *page;
712 	enum block_map_page_validity validity;
713 
714 	assert_on_cache_thread(cache, __func__);
715 
716 	page = (struct block_map_page *) get_page_buffer(info);
717 	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
718 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
719 		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
720 		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
721 						    "Expected page %llu but got page %llu instead",
722 						    (unsigned long long) info->pbn,
723 						    (unsigned long long) pbn);
724 
725 		vdo_continue_completion(completion, result);
726 		return;
727 	}
728 
729 	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
730 		vdo_format_block_map_page(page, nonce, info->pbn, false);
731 
732 	info->recovery_lock = 0;
733 	set_info_state(info, PS_RESIDENT);
734 	distribute_page_over_waitq(info, &info->waiting);
735 
736 	/*
737 	 * Don't decrement until right before calling check_for_drain_complete() to
738 	 * ensure that the above work can't cause the page cache to be freed out from under us.
739 	 */
740 	cache->outstanding_reads--;
741 	check_for_drain_complete(cache->zone);
742 }
743 
744 /**
745  * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
746  * @completion: The page load completion.
747  */
748 static void handle_rebuild_read_error(struct vdo_completion *completion)
749 {
750 	struct page_info *info = completion->parent;
751 	struct vdo_page_cache *cache = info->cache;
752 
753 	assert_on_cache_thread(cache, __func__);
754 
755 	/*
756 	 * We are doing a read-only rebuild, so treat this as a successful read
757 	 * of an uninitialized page.
758 	 */
759 	vio_record_metadata_io_error(as_vio(completion));
760 	ADD_ONCE(cache->stats.failed_reads, 1);
761 	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
762 	vdo_reset_completion(completion);
763 	page_is_loaded(completion);
764 }
765 
766 static void load_cache_page_endio(struct bio *bio)
767 {
768 	struct vio *vio = bio->bi_private;
769 	struct page_info *info = vio->completion.parent;
770 
771 	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
772 }
773 
774 /**
775  * launch_page_load() - Begin the process of loading a page.
776  * @info: The page info to launch.
777  * @pbn: The absolute physical block number of the page to load.
778  *
779  * Return: VDO_SUCCESS or an error code.
780  */
781 static int __must_check launch_page_load(struct page_info *info,
782 					 physical_block_number_t pbn)
783 {
784 	int result;
785 	vdo_action_fn callback;
786 	struct vdo_page_cache *cache = info->cache;
787 
788 	assert_io_allowed(cache);
789 
790 	result = set_info_pbn(info, pbn);
791 	if (result != VDO_SUCCESS)
792 		return result;
793 
794 	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
795 	if (result != VDO_SUCCESS)
796 		return result;
797 
798 	set_info_state(info, PS_INCOMING);
799 	cache->outstanding_reads++;
800 	ADD_ONCE(cache->stats.pages_loaded, 1);
801 	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
802 	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
803 				callback, REQ_OP_READ | REQ_PRIO);
804 	return VDO_SUCCESS;
805 }
806 
807 static void write_pages(struct vdo_completion *completion);
808 
809 /** handle_flush_error() - Handle errors flushing the layer. */
810 static void handle_flush_error(struct vdo_completion *completion)
811 {
812 	struct page_info *info = completion->parent;
813 
814 	vio_record_metadata_io_error(as_vio(completion));
815 	set_persistent_error(info->cache, "flush failed", completion->result);
816 	write_pages(completion);
817 }
818 
819 static void flush_endio(struct bio *bio)
820 {
821 	struct vio *vio = bio->bi_private;
822 	struct page_info *info = vio->completion.parent;
823 
824 	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
825 }
826 
827 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
828 static void save_pages(struct vdo_page_cache *cache)
829 {
830 	struct page_info *info;
831 	struct vio *vio;
832 
833 	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
834 		return;
835 
836 	assert_io_allowed(cache);
837 
838 	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);
839 
840 	cache->pages_in_flush = cache->pages_to_flush;
841 	cache->pages_to_flush = 0;
842 	ADD_ONCE(cache->stats.flush_count, 1);
843 
844 	vio = info->vio;
845 
846 	/*
847 	 * We must make sure that the recovery journal entries that changed these pages were
848 	 * successfully persisted, and thus must issue a flush before each batch of pages is
849 	 * written to ensure this.
850 	 */
851 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
852 }
853 
854 /**
855  * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
856  * @info: The page info to save.
857  *
858  * Once in the list, a page may not be used until it has been written out.
859  */
860 static void schedule_page_save(struct page_info *info)
861 {
862 	if (info->busy > 0) {
863 		info->write_status = WRITE_STATUS_DEFERRED;
864 		return;
865 	}
866 
867 	info->cache->pages_to_flush++;
868 	info->cache->outstanding_writes++;
869 	set_info_state(info, PS_OUTGOING);
870 }
871 
872 /**
873  * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
874  * pages if another save is not in progress.
875  * @info: The page info to save.
876  */
877 static void launch_page_save(struct page_info *info)
878 {
879 	schedule_page_save(info);
880 	save_pages(info->cache);
881 }
882 
883 /**
884  * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
885  *                           requesting a given page number.
886  * @waiter: The page completion waiter to check.
887  * @context: A pointer to the pbn of the desired page.
888  *
889  * Implements waiter_match_fn.
890  *
891  * Return: true if the page completion is for the desired page number.
892  */
893 static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
894 {
895 	physical_block_number_t *pbn = context;
896 
897 	return (page_completion_from_waiter(waiter)->pbn == *pbn);
898 }
899 
900 /**
901  * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
902  *                        any other completions that match it in page number.
903  * @info: The page info to allocate a page for.
904  */
905 static void allocate_free_page(struct page_info *info)
906 {
907 	int result;
908 	struct vdo_waiter *oldest_waiter;
909 	physical_block_number_t pbn;
910 	struct vdo_page_cache *cache = info->cache;
911 
912 	assert_on_cache_thread(cache, __func__);
913 
914 	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
915 		if (cache->stats.cache_pressure > 0) {
916 			vdo_log_info("page cache pressure relieved");
917 			WRITE_ONCE(cache->stats.cache_pressure, 0);
918 		}
919 
920 		return;
921 	}
922 
923 	result = reset_page_info(info);
924 	if (result != VDO_SUCCESS) {
925 		set_persistent_error(cache, "cannot reset page info", result);
926 		return;
927 	}
928 
929 	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
930 	pbn = page_completion_from_waiter(oldest_waiter)->pbn;
931 
932 	/*
933 	 * Remove all entries which match the page number in question and push them onto the page
934 	 * info's waitq.
935 	 */
936 	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
937 					   &pbn, &info->waiting);
938 	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);
939 
940 	result = launch_page_load(info, pbn);
941 	if (result != VDO_SUCCESS) {
942 		vdo_waitq_notify_all_waiters(&info->waiting,
943 					     complete_waiter_with_error, &result);
944 	}
945 }
946 
947 /**
948  * discard_a_page() - Begin the process of discarding a page.
949  * @cache: The page cache.
950  *
951  * If no page is discardable, increments a count of deferred frees so that the next release of a
952  * page which is no longer busy will kick off another discard cycle. This is an indication that the
953  * cache is not big enough.
954  *
955  * If the selected page is not dirty, immediately allocates the page to the oldest completion
956  * waiting for a free page.
957  */
958 static void discard_a_page(struct vdo_page_cache *cache)
959 {
960 	struct page_info *info = select_lru_page(cache);
961 
962 	if (info == NULL) {
963 		report_cache_pressure(cache);
964 		return;
965 	}
966 
967 	if (!is_dirty(info)) {
968 		allocate_free_page(info);
969 		return;
970 	}
971 
972 	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
973 			    "page selected for discard is not in flight");
974 
975 	cache->discard_count++;
976 	info->write_status = WRITE_STATUS_DISCARD;
977 	launch_page_save(info);
978 }
979 
980 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
981 {
982 	struct vdo_page_cache *cache = vdo_page_comp->cache;
983 
984 	cache->waiter_count++;
985 	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
986 	discard_a_page(cache);
987 }
988 
989 /**
990  * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
991  *                            page.
992  * @cache: The page cache.
993  */
994 static void discard_page_if_needed(struct vdo_page_cache *cache)
995 {
996 	if (cache->waiter_count > cache->discard_count)
997 		discard_a_page(cache);
998 }
999 
1000 /**
1001  * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
1002  * @info: The info structure for the page whose write just completed.
1003  *
1004  * Return: true if the page write was a discard.
1005  */
1006 static bool write_has_finished(struct page_info *info)
1007 {
1008 	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);
1009 
1010 	assert_on_cache_thread(info->cache, __func__);
1011 	info->cache->outstanding_writes--;
1012 
1013 	info->write_status = WRITE_STATUS_NORMAL;
1014 	return was_discard;
1015 }
1016 
1017 /**
1018  * handle_page_write_error() - Handler for page write errors.
1019  * @completion: The page write vio.
1020  */
1021 static void handle_page_write_error(struct vdo_completion *completion)
1022 {
1023 	int result = completion->result;
1024 	struct page_info *info = completion->parent;
1025 	struct vdo_page_cache *cache = info->cache;
1026 
1027 	vio_record_metadata_io_error(as_vio(completion));
1028 
1029 	/* If we're already read-only, write failures are to be expected. */
1030 	if (result != VDO_READ_ONLY) {
1031 		vdo_log_ratelimit(vdo_log_error,
1032 				  "failed to write block map page %llu",
1033 				  (unsigned long long) info->pbn);
1034 	}
1035 
1036 	set_info_state(info, PS_DIRTY);
1037 	ADD_ONCE(cache->stats.failed_writes, 1);
1038 	set_persistent_error(cache, "cannot write page", result);
1039 
1040 	if (!write_has_finished(info))
1041 		discard_page_if_needed(cache);
1042 
1043 	check_for_drain_complete(cache->zone);
1044 }
1045 
1046 static void page_is_written_out(struct vdo_completion *completion);
1047 
1048 static void write_cache_page_endio(struct bio *bio)
1049 {
1050 	struct vio *vio = bio->bi_private;
1051 	struct page_info *info = vio->completion.parent;
1052 
1053 	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
1054 }
1055 
1056 /**
1057  * page_is_written_out() - Callback used when a page has been written out.
1058  * @completion: The vio which wrote the page. Its parent is a page_info.
1059  */
1060 static void page_is_written_out(struct vdo_completion *completion)
1061 {
1062 	bool was_discard, reclaimed;
1063 	u32 reclamations;
1064 	struct page_info *info = completion->parent;
1065 	struct vdo_page_cache *cache = info->cache;
1066 	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);
1067 
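
	/*
	 * On the first write of this page, mark the header initialized and immediately
	 * rewrite it with a preflush; see the torn write protection note below.
	 */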
1068 	if (!page->header.initialized) {
1069 		page->header.initialized = true;
1070 		vdo_submit_metadata_vio(info->vio, info->pbn,
1071 					write_cache_page_endio,
1072 					handle_page_write_error,
1073 					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
1074 		return;
1075 	}
1076 
1077 	/* Handle journal updates and torn write protection. */
1078 	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
1079 						     info->recovery_lock,
1080 						     VDO_ZONE_TYPE_LOGICAL,
1081 						     cache->zone->zone_number);
1082 	info->recovery_lock = 0;
1083 	was_discard = write_has_finished(info);
1084 	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));
1085 
1086 	set_info_state(info, PS_RESIDENT);
1087 
1088 	reclamations = distribute_page_over_waitq(info, &info->waiting);
1089 	ADD_ONCE(cache->stats.reclaimed, reclamations);
1090 
1091 	if (was_discard)
1092 		cache->discard_count--;
1093 
1094 	if (reclaimed)
1095 		discard_page_if_needed(cache);
1096 	else
1097 		allocate_free_page(info);
1098 
1099 	check_for_drain_complete(cache->zone);
1100 }
1101 
1102 /**
1103  * write_pages() - Write the batch of pages which were covered by the layer flush which just
1104  *                 completed.
1105  * @flush_completion: The flush vio.
1106  *
1107  * This callback is registered in save_pages().
1108  */
1109 static void write_pages(struct vdo_completion *flush_completion)
1110 {
1111 	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;
1112 
1113 	/*
1114 	 * We need to cache these two values on the stack since it is possible for the last
1115 	 * page info to cause the page cache to get freed. Hence once we launch the last page,
1116 	 * it may be unsafe to dereference the cache.
1117 	 */
1118 	bool has_unflushed_pages = (cache->pages_to_flush > 0);
1119 	page_count_t pages_in_flush = cache->pages_in_flush;
1120 
1121 	cache->pages_in_flush = 0;
1122 	while (pages_in_flush-- > 0) {
1123 		struct page_info *info =
1124 			list_first_entry(&cache->outgoing_list, struct page_info,
1125 					 state_entry);
1126 
1127 		list_del_init(&info->state_entry);
1128 		if (vdo_is_read_only(info->cache->vdo)) {
1129 			struct vdo_completion *completion = &info->vio->completion;
1130 
1131 			vdo_reset_completion(completion);
1132 			completion->callback = page_is_written_out;
1133 			completion->error_handler = handle_page_write_error;
1134 			vdo_fail_completion(completion, VDO_READ_ONLY);
1135 			continue;
1136 		}
1137 		ADD_ONCE(info->cache->stats.pages_saved, 1);
1138 		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
1139 					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
1140 	}
1141 
1142 	if (has_unflushed_pages) {
1143 		/*
1144 		 * If there are unflushed pages, the cache can't have been freed, so this call is
1145 		 * safe.
1146 		 */
1147 		save_pages(cache);
1148 	}
1149 }
1150 
1151 /**
1152  * vdo_release_page_completion() - Release a VDO Page Completion.
1153  * @completion: The page completion to release.
1154  *
1155  * The page referenced by this completion (if any) will no longer be held busy by this completion.
1156  * If a page becomes discardable and there are completions awaiting free pages then a new round of
1157  * page discarding is started.
1158  */
1159 void vdo_release_page_completion(struct vdo_completion *completion)
1160 {
1161 	struct page_info *discard_info = NULL;
1162 	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
1163 	struct vdo_page_cache *cache;
1164 
1165 	if (completion->result == VDO_SUCCESS) {
1166 		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
1167 			return;
1168 
1169 		if (--page_completion->info->busy == 0)
1170 			discard_info = page_completion->info;
1171 	}
1172 
1173 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1174 			    "Page being released after leaving all queues");
1175 
1176 	page_completion->info = NULL;
1177 	cache = page_completion->cache;
1178 	assert_on_cache_thread(cache, __func__);
1179 
1180 	if (discard_info != NULL) {
1181 		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
1182 			discard_info->write_status = WRITE_STATUS_NORMAL;
1183 			launch_page_save(discard_info);
1184 		}
1185 
1186 		/*
1187 		 * If there are excess requests for pages (that have not already started discards),
1188 		 * we need to discard some page (which may be this one).
1189 		 */
1190 		discard_page_if_needed(cache);
1191 	}
1192 }
1193 
1194 static void load_page_for_completion(struct page_info *info,
1195 				     struct vdo_page_completion *vdo_page_comp)
1196 {
1197 	int result;
1198 
1199 	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
1200 	result = launch_page_load(info, vdo_page_comp->pbn);
1201 	if (result != VDO_SUCCESS) {
1202 		vdo_waitq_notify_all_waiters(&info->waiting,
1203 					     complete_waiter_with_error, &result);
1204 	}
1205 }
1206 
1207 /**
1208  * vdo_get_page() - Initialize a page completion and get a block map page.
1209  * @page_completion: The vdo_page_completion to initialize.
1210  * @zone: The block map zone of the desired page.
1211  * @pbn: The absolute physical block of the desired page.
1212  * @writable: Whether the page can be modified.
1213  * @parent: The object to notify when the fetch is complete.
1214  * @callback: The notification callback.
1215  * @error_handler: The handler for fetch errors.
1216  * @requeue: Whether we must requeue when notifying the parent.
1217  *
1218  * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
1219  * by the completion to be loaded from disk. When the callback is invoked, the page will be
1220  * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
1221  * when they are done with the page to clear the busy mark.
1222  */
1223 void vdo_get_page(struct vdo_page_completion *page_completion,
1224 		  struct block_map_zone *zone, physical_block_number_t pbn,
1225 		  bool writable, void *parent, vdo_action_fn callback,
1226 		  vdo_action_fn error_handler, bool requeue)
1227 {
1228 	struct vdo_page_cache *cache = &zone->page_cache;
1229 	struct vdo_completion *completion = &page_completion->completion;
1230 	struct page_info *info;
1231 
1232 	assert_on_cache_thread(cache, __func__);
1233 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1234 			    "New page completion was not already on a wait queue");
1235 
1236 	*page_completion = (struct vdo_page_completion) {
1237 		.pbn = pbn,
1238 		.writable = writable,
1239 		.cache = cache,
1240 	};
1241 
1242 	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
1243 	vdo_prepare_completion(completion, callback, error_handler,
1244 			       cache->zone->thread_id, parent);
1245 	completion->requeue = requeue;
1246 
1247 	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
1248 		vdo_fail_completion(completion, VDO_READ_ONLY);
1249 		return;
1250 	}
1251 
1252 	if (page_completion->writable)
1253 		ADD_ONCE(cache->stats.write_count, 1);
1254 	else
1255 		ADD_ONCE(cache->stats.read_count, 1);
1256 
1257 	info = find_page(cache, page_completion->pbn);
1258 	if (info != NULL) {
1259 		/* The page is in the cache already. */
1260 		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
1261 		    is_incoming(info) ||
1262 		    (is_outgoing(info) && page_completion->writable)) {
1263 			/* The page is unusable until it has finished I/O. */
1264 			ADD_ONCE(cache->stats.wait_for_page, 1);
1265 			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
1266 			return;
1267 		}
1268 
1269 		if (is_valid(info)) {
1270 			/* The page is usable. */
1271 			ADD_ONCE(cache->stats.found_in_cache, 1);
1272 			if (!is_present(info))
1273 				ADD_ONCE(cache->stats.read_outgoing, 1);
1274 			update_lru(info);
1275 			info->busy++;
1276 			complete_with_page(info, page_completion);
1277 			return;
1278 		}
1279 
1280 		/* Something horrible has gone wrong. */
1281 		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
1282 	}
1283 
1284 	/* The page must be fetched. */
1285 	info = find_free_page(cache);
1286 	if (info != NULL) {
1287 		ADD_ONCE(cache->stats.fetch_required, 1);
1288 		load_page_for_completion(info, page_completion);
1289 		return;
1290 	}
1291 
1292 	/* The page must wait for a page to be discarded. */
1293 	ADD_ONCE(cache->stats.discard_required, 1);
1294 	discard_page_for_completion(page_completion);
1295 }
1296 
1297 /**
1298  * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
1299  * @completion: The vdo_page_completion containing the page.
1300  */
1301 void vdo_request_page_write(struct vdo_completion *completion)
1302 {
1303 	struct page_info *info;
1304 	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);
1305 
1306 	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
1307 		return;
1308 
1309 	info = vdo_page_comp->info;
1310 	set_info_state(info, PS_DIRTY);
1311 	launch_page_save(info);
1312 }
1313 
1314 /**
1315  * vdo_get_cached_page() - Get the block map page from a page completion.
1316  * @completion: A vdo page completion whose callback has been called.
1317  * @page_ptr: A pointer to hold the page
1318  *
1319  * Return: VDO_SUCCESS or an error
1320  */
1321 int vdo_get_cached_page(struct vdo_completion *completion,
1322 			struct block_map_page **page_ptr)
1323 {
1324 	int result;
1325 	struct vdo_page_completion *vpc;
1326 
1327 	vpc = as_vdo_page_completion(completion);
1328 	result = validate_completed_page(vpc, true);
1329 	if (result == VDO_SUCCESS)
1330 		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);
1331 
1332 	return result;
1333 }
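
/*
 * Illustrative usage sketch (hypothetical caller, not part of the driver): a typical
 * fetch-modify-release cycle built on vdo_get_page(), vdo_get_cached_page(),
 * vdo_request_page_write(), and vdo_release_page_completion() might look like:
 *
 *	static void my_page_ready(struct vdo_completion *completion)
 *	{
 *		struct block_map_page *page;
 *
 *		if (vdo_get_cached_page(completion, &page) == VDO_SUCCESS) {
 *			... modify the page ...
 *			vdo_request_page_write(completion);
 *		}
 *
 *		vdo_release_page_completion(completion);
 *	}
 *
 *	... on the zone's logical thread ...
 *	vdo_get_page(&my_page_completion, zone, pbn, true, parent,
 *		     my_page_ready, my_error_handler, false);
 *
 * my_page_completion, my_page_ready, my_error_handler, parent, zone, and pbn are placeholders
 * supplied by the hypothetical caller.
 */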
1334 
1335 /**
1336  * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
1337  * @cache: The page cache.
1338  *
1339  * There must not be any dirty pages in the cache.
1340  *
1341  * Return: A success or error code.
1342  */
1343 int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
1344 {
1345 	struct page_info *info;
1346 
1347 	assert_on_cache_thread(cache, __func__);
1348 
1349 	/* Make sure we don't throw away any dirty pages. */
1350 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
1351 		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");
1352 
1353 		if (result != VDO_SUCCESS)
1354 			return result;
1355 	}
1356 
1357 	/* Reset the page map by re-allocating it. */
1358 	vdo_int_map_free(vdo_forget(cache->page_map));
1359 	return vdo_int_map_create(cache->page_count, &cache->page_map);
1360 }
1361 
1362 /**
1363  * get_tree_page_by_index() - Get the tree page for a given height and page index.
1364  * @forest: The block map forest.
1365  * @root_index: The root index of the tree to search.
1366  * @height: The height in the tree.
1367  * @page_index: The page index.
1368  *
1369  * Return: The requested page.
1370  */
1371 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
1372 							      root_count_t root_index,
1373 							      height_t height,
1374 							      page_number_t page_index)
1375 {
1376 	page_number_t offset = 0;
1377 	size_t segment;
1378 
1379 	for (segment = 0; segment < forest->segments; segment++) {
1380 		page_number_t border = forest->boundaries[segment].levels[height - 1];
1381 
1382 		if (page_index < border) {
1383 			struct block_map_tree *tree = &forest->trees[root_index];
1384 
1385 			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
1386 		}
1387 
1388 		offset = border;
1389 	}
1390 
1391 	return NULL;
1392 }
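
/*
 * Worked example (illustrative numbers only): with two segments whose boundaries at this
 * height are 100 and 250, page_index 180 fails the first segment's test (180 >= 100), so
 * offset becomes 100; it passes the second segment's test (180 < 250) and resolves to
 * entry 80 of segment 1 in the requested root's tree.
 */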
1393 
1394 /* Get the page referred to by the lock's tree slot at its current height. */
1395 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
1396 					      const struct tree_lock *lock)
1397 {
1398 	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
1399 				      lock->height,
1400 				      lock->tree_slots[lock->height].page_index);
1401 }
1402 
1403 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
1404 bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
1405 			 physical_block_number_t pbn,
1406 			 struct block_map_page *page)
1407 {
1408 	struct block_map_page *loaded = (struct block_map_page *) buffer;
1409 	enum block_map_page_validity validity =
1410 		vdo_validate_block_map_page(loaded, nonce, pbn);
1411 
1412 	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
1413 		memcpy(page, loaded, VDO_BLOCK_SIZE);
1414 		return true;
1415 	}
1416 
1417 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
1418 		vdo_log_error_strerror(VDO_BAD_PAGE,
1419 				       "Expected page %llu but got page %llu instead",
1420 				       (unsigned long long) pbn,
1421 				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
1422 	}
1423 
1424 	return false;
1425 }
1426 
1427 /**
1428  * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
1429  *                     a cyclic range of values from 0 to (modulus - 1).
1430  * @lower: The lowest value to accept.
1431  * @value: The value to check.
1432  * @upper: The highest value to accept.
1433  * @modulus: The size of the cyclic space, no more than 2^15.
1434  *
1435  * The value and both bounds must be smaller than the modulus.
1436  *
1437  * Return: true if the value is in range.
1438  */
1439 static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
1440 {
1441 	if (value < lower)
1442 		value += modulus;
1443 	if (upper < lower)
1444 		upper += modulus;
1445 	return (value <= upper);
1446 }
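
/*
 * Worked examples (illustrative): with modulus 256, in_cyclic_range(250, 2, 5, 256) is true,
 * since value 2 and upper 5 are promoted to 258 and 261 before the comparison;
 * in_cyclic_range(250, 100, 5, 256) is false, since 100 is promoted to 356, which exceeds 261.
 */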
1447 
1448 /**
1449  * is_not_older() - Check whether a generation is strictly older than some other generation in the
1450  *                  context of a zone's current generation range.
1451  * @zone: The zone in which to do the comparison.
1452  * @a: The generation in question.
1453  * @b: The generation to compare to.
1454  *
1455  * Return: true if generation @a is not strictly older than generation @b in the context of @zone
1456  */
1457 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
1458 {
1459 	int result;
1460 
1461 	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
1462 			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
1463 			    "generation(s) %u, %u are out of range [%u, %u]",
1464 			    a, b, zone->oldest_generation, zone->generation);
1465 	if (result != VDO_SUCCESS) {
1466 		enter_zone_read_only_mode(zone, result);
1467 		return true;
1468 	}
1469 
1470 	return in_cyclic_range(b, a, zone->generation, 1 << 8);
1471 }
1472 
1473 static void release_generation(struct block_map_zone *zone, u8 generation)
1474 {
1475 	int result;
1476 
1477 	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
1478 			    "dirty page count underflow for generation %u", generation);
1479 	if (result != VDO_SUCCESS) {
1480 		enter_zone_read_only_mode(zone, result);
1481 		return;
1482 	}
1483 
1484 	zone->dirty_page_counts[generation]--;
1485 	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
1486 	       (zone->oldest_generation != zone->generation))
1487 		zone->oldest_generation++;
1488 }
1489 
1490 static void set_generation(struct block_map_zone *zone, struct tree_page *page,
1491 			   u8 new_generation)
1492 {
1493 	u32 new_count;
1494 	int result;
1495 	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
1496 	u8 old_generation = page->generation;
1497 
1498 	if (decrement_old && (old_generation == new_generation))
1499 		return;
1500 
1501 	page->generation = new_generation;
1502 	new_count = ++zone->dirty_page_counts[new_generation];
1503 	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
1504 			    new_generation);
1505 	if (result != VDO_SUCCESS) {
1506 		enter_zone_read_only_mode(zone, result);
1507 		return;
1508 	}
1509 
1510 	if (decrement_old)
1511 		release_generation(zone, old_generation);
1512 }
1513 
1514 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);
1515 
1516 /* Implements waiter_callback_fn */
1517 static void write_page_callback(struct vdo_waiter *waiter, void *context)
1518 {
1519 	write_page(container_of(waiter, struct tree_page, waiter), context);
1520 }
1521 
1522 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
1523 {
1524 	waiter->callback = write_page_callback;
1525 	acquire_vio_from_pool(zone->vio_pool, waiter);
1526 }
1527 
1528 /* Return: true if all possible generations were not already active */
1529 static bool attempt_increment(struct block_map_zone *zone)
1530 {
1531 	u8 generation = zone->generation + 1;
1532 
1533 	if (zone->oldest_generation == generation)
1534 		return false;
1535 
1536 	zone->generation = generation;
1537 	return true;
1538 }
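
/*
 * Worked example (illustrative): generations are 8-bit values, so if zone->generation is 255
 * and zone->oldest_generation is 0, the increment would wrap to 0 and collide with the oldest
 * active generation; attempt_increment() then returns false rather than reusing it.
 */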
1539 
1540 /* Launches a flush if one is not already in progress. */
1541 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
1542 {
1543 	if ((zone->flusher == NULL) && attempt_increment(zone)) {
1544 		zone->flusher = page;
1545 		acquire_vio(&page->waiter, zone);
1546 		return;
1547 	}
1548 
1549 	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
1550 }
1551 
1552 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
1553 {
1554 	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
1555 	struct write_if_not_dirtied_context *write_context = context;
1556 
1557 	if (page->generation == write_context->generation) {
1558 		acquire_vio(waiter, write_context->zone);
1559 		return;
1560 	}
1561 
1562 	enqueue_page(page, write_context->zone);
1563 }
1564 
1565 static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
1566 {
1567 	return_vio_to_pool(vio);
1568 	check_for_drain_complete(zone);
1569 }
1570 
1571 /* This callback is registered in write_initialized_page(). */
1572 static void finish_page_write(struct vdo_completion *completion)
1573 {
1574 	bool dirty;
1575 	struct vio *vio = as_vio(completion);
1576 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1577 	struct tree_page *page = completion->parent;
1578 	struct block_map_zone *zone = pooled->context;
1579 
1580 	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
1581 						     page->writing_recovery_lock,
1582 						     VDO_ZONE_TYPE_LOGICAL,
1583 						     zone->zone_number);
1584 
1585 	dirty = (page->writing_generation != page->generation);
1586 	release_generation(zone, page->writing_generation);
1587 	page->writing = false;
1588 
1589 	if (zone->flusher == page) {
1590 		struct write_if_not_dirtied_context context = {
1591 			.zone = zone,
1592 			.generation = page->writing_generation,
1593 		};
1594 
1595 		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
1596 					     write_page_if_not_dirtied, &context);
1597 		if (dirty && attempt_increment(zone)) {
1598 			write_page(page, pooled);
1599 			return;
1600 		}
1601 
1602 		zone->flusher = NULL;
1603 	}
1604 
1605 	if (dirty) {
1606 		enqueue_page(page, zone);
1607 	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
1608 		   attempt_increment(zone)) {
1609 		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
1610 					     struct tree_page, waiter);
1611 		write_page(zone->flusher, pooled);
1612 		return;
1613 	}
1614 
1615 	return_to_pool(zone, pooled);
1616 }
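
/*
 * To summarize finish_page_write(): the recovery journal reference taken for this write is
 * released and the dirty count for the page's writing generation is dropped. If this page was the
 * flusher, its flush waiters are notified first (see write_page_if_not_dirtied()). Then, if the
 * page was re-dirtied while the write was in flight, it is written again, immediately with this
 * vio if it was the flusher and a new generation can be started, otherwise via enqueue_page(). If
 * it was clean, no flusher remains, pages are waiting for a flush, and a new generation can be
 * started, the next waiter becomes the flusher and is written with this vio. Otherwise the vio
 * simply goes back to the pool.
 */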
1617 
1618 static void handle_write_error(struct vdo_completion *completion)
1619 {
1620 	int result = completion->result;
1621 	struct vio *vio = as_vio(completion);
1622 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1623 	struct block_map_zone *zone = pooled->context;
1624 
1625 	vio_record_metadata_io_error(vio);
1626 	enter_zone_read_only_mode(zone, result);
1627 	return_to_pool(zone, pooled);
1628 }
1629 
1630 static void write_page_endio(struct bio *bio);
1631 
1632 static void write_initialized_page(struct vdo_completion *completion)
1633 {
1634 	struct vio *vio = as_vio(completion);
1635 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1636 	struct block_map_zone *zone = pooled->context;
1637 	struct tree_page *tree_page = completion->parent;
1638 	struct block_map_page *page = (struct block_map_page *) vio->data;
1639 	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;
1640 
1641 	/*
1642 	 * Now that we know the page has been written at least once, mark the copy we are writing
1643 	 * as initialized.
1644 	 */
1645 	page->header.initialized = true;
1646 
1647 	if (zone->flusher == tree_page)
1648 		operation |= REQ_PREFLUSH;
1649 
1650 	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
1651 				write_page_endio, handle_write_error,
1652 				operation);
1653 }
1654 
1655 static void write_page_endio(struct bio *bio)
1656 {
1657 	struct pooled_vio *vio = bio->bi_private;
1658 	struct block_map_zone *zone = vio->context;
1659 	struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1660 
1661 	continue_vio_after_io(&vio->vio,
1662 			      (page->header.initialized ?
1663 			       finish_page_write : write_initialized_page),
1664 			      zone->thread_id);
1665 }
1666 
1667 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
1668 {
1669 	struct vdo_completion *completion = &vio->vio.completion;
1670 	struct block_map_zone *zone = vio->context;
1671 	struct block_map_page *page = vdo_as_block_map_page(tree_page);
1672 
1673 	if ((zone->flusher != tree_page) &&
1674 	    is_not_older(zone, tree_page->generation, zone->generation)) {
1675 		/*
1676 		 * This page was re-dirtied after the last flush was issued, hence we need to do
1677 		 * another flush.
1678 		 */
1679 		enqueue_page(tree_page, zone);
1680 		return_to_pool(zone, vio);
1681 		return;
1682 	}
1683 
1684 	completion->parent = tree_page;
1685 	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
1686 	completion->callback_thread_id = zone->thread_id;
1687 
1688 	tree_page->writing = true;
1689 	tree_page->writing_generation = tree_page->generation;
1690 	tree_page->writing_recovery_lock = tree_page->recovery_lock;
1691 
1692 	/* Clear this now so that we know this page is not on any dirty list. */
1693 	tree_page->recovery_lock = 0;
1694 
1695 	/*
1696 	 * We've already copied the page into the vio which will write it, so if it was not yet
1697 	 * initialized, the first write will indicate that (for torn write protection). It is now
1698 	 * safe to mark it as initialized in memory since if the write fails, the in memory state
1699 	 * will become irrelevant.
1700 	 */
1701 	if (page->header.initialized) {
1702 		write_initialized_page(completion);
1703 		return;
1704 	}
1705 
1706 	page->header.initialized = true;
1707 	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
1708 				write_page_endio, handle_write_error,
1709 				REQ_OP_WRITE | REQ_PRIO);
1710 }
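
/*
 * Note that a never-before-written page goes out twice: the copy in the vio still has
 * header.initialized == false, so the first submission above writes it that way, and
 * write_page_endio() then routes the completion to write_initialized_page(), which sets the flag
 * in the vio's copy and writes the page again (with a preflush if this page is the zone's
 * flusher). This is the torn write protection referred to in the comment above.
 */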
1711 
1712 /* Release a lock on a page which was being loaded or allocated. */
1713 static void release_page_lock(struct data_vio *data_vio, char *what)
1714 {
1715 	struct block_map_zone *zone;
1716 	struct tree_lock *lock_holder;
1717 	struct tree_lock *lock = &data_vio->tree_lock;
1718 
1719 	VDO_ASSERT_LOG_ONLY(lock->locked,
1720 			    "release of unlocked block map page %s for key %llu in tree %u",
1721 			    what, (unsigned long long) lock->key, lock->root_index);
1722 
1723 	zone = data_vio->logical.zone->block_map_zone;
1724 	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
1725 	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
1726 			    "block map page %s mismatch for key %llu in tree %u",
1727 			    what, (unsigned long long) lock->key, lock->root_index);
1728 	lock->locked = false;
1729 }
1730 
1731 static void finish_lookup(struct data_vio *data_vio, int result)
1732 {
1733 	data_vio->tree_lock.height = 0;
1734 
1735 	--data_vio->logical.zone->block_map_zone->active_lookups;
1736 
1737 	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
1738 	data_vio->vio.completion.error_handler = handle_data_vio_error;
1739 	continue_data_vio_with_error(data_vio, result);
1740 }
1741 
1742 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
1743 {
1744 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1745 	int result = *((int *) context);
1746 
1747 	if (!data_vio->write) {
1748 		if (result == VDO_NO_SPACE)
1749 			result = VDO_SUCCESS;
1750 	} else if (result != VDO_NO_SPACE) {
1751 		result = VDO_READ_ONLY;
1752 	}
1753 
1754 	finish_lookup(data_vio, result);
1755 }
1756 
1757 static void abort_lookup(struct data_vio *data_vio, int result, char *what)
1758 {
1759 	if (result != VDO_NO_SPACE)
1760 		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);
1761 
1762 	if (data_vio->tree_lock.locked) {
1763 		release_page_lock(data_vio, what);
1764 		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
1765 					     abort_lookup_for_waiter,
1766 					     &result);
1767 	}
1768 
1769 	finish_lookup(data_vio, result);
1770 }
1771 
1772 static void abort_load(struct data_vio *data_vio, int result)
1773 {
1774 	abort_lookup(data_vio, result, "load");
1775 }
1776 
1777 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
1778 					       const struct data_location *mapping,
1779 					       height_t height)
1780 {
1781 	if (!vdo_is_valid_location(mapping) ||
1782 	    vdo_is_state_compressed(mapping->state) ||
1783 	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
1784 		return true;
1785 
1786 	/* Roots aren't physical data blocks, so we can't check their PBNs. */
1787 	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
1788 		return false;
1789 
1790 	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
1791 }
1792 
1793 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
1794 static void allocate_block_map_page(struct block_map_zone *zone,
1795 				    struct data_vio *data_vio);
1796 
1797 static void continue_with_loaded_page(struct data_vio *data_vio,
1798 				      struct block_map_page *page)
1799 {
1800 	struct tree_lock *lock = &data_vio->tree_lock;
1801 	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
1802 	struct data_location mapping =
1803 		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);
1804 
1805 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
1806 		vdo_log_error_strerror(VDO_BAD_MAPPING,
1807 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
1808 				       (unsigned long long) mapping.pbn, mapping.state,
1809 				       lock->tree_slots[lock->height - 1].page_index,
1810 				       lock->height - 1);
1811 		abort_load(data_vio, VDO_BAD_MAPPING);
1812 		return;
1813 	}
1814 
1815 	if (!vdo_is_mapped_location(&mapping)) {
1816 		/* The page we need is unallocated */
1817 		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
1818 					data_vio);
1819 		return;
1820 	}
1821 
1822 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
1823 	if (lock->height == 1) {
1824 		finish_lookup(data_vio, VDO_SUCCESS);
1825 		return;
1826 	}
1827 
1828 	/* We know what page we need to load next */
1829 	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1830 }
1831 
1832 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
1833 {
1834 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1835 
1836 	data_vio->tree_lock.height--;
1837 	continue_with_loaded_page(data_vio, context);
1838 }
1839 
1840 static void finish_block_map_page_load(struct vdo_completion *completion)
1841 {
1842 	physical_block_number_t pbn;
1843 	struct tree_page *tree_page;
1844 	struct block_map_page *page;
1845 	nonce_t nonce;
1846 	struct vio *vio = as_vio(completion);
1847 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1848 	struct data_vio *data_vio = completion->parent;
1849 	struct block_map_zone *zone = pooled->context;
1850 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1851 
1852 	tree_lock->height--;
1853 	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
1854 	tree_page = get_tree_page(zone, tree_lock);
1855 	page = (struct block_map_page *) tree_page->page_buffer;
1856 	nonce = zone->block_map->nonce;
1857 
1858 	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
1859 		vdo_format_block_map_page(page, nonce, pbn, false);
1860 	return_vio_to_pool(pooled);
1861 
1862 	/* Release our claim to the load and wake any waiters */
1863 	release_page_lock(data_vio, "load");
1864 	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
1865 	continue_with_loaded_page(data_vio, page);
1866 }
1867 
1868 static void handle_io_error(struct vdo_completion *completion)
1869 {
1870 	int result = completion->result;
1871 	struct vio *vio = as_vio(completion);
1872 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1873 	struct data_vio *data_vio = completion->parent;
1874 
1875 	vio_record_metadata_io_error(vio);
1876 	return_vio_to_pool(pooled);
1877 	abort_load(data_vio, result);
1878 }
1879 
1880 static void load_page_endio(struct bio *bio)
1881 {
1882 	struct vio *vio = bio->bi_private;
1883 	struct data_vio *data_vio = vio->completion.parent;
1884 
1885 	continue_vio_after_io(vio, finish_block_map_page_load,
1886 			      data_vio->logical.zone->thread_id);
1887 }
1888 
1889 static void load_page(struct vdo_waiter *waiter, void *context)
1890 {
1891 	struct pooled_vio *pooled = context;
1892 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1893 	struct tree_lock *lock = &data_vio->tree_lock;
1894 	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;
1895 
1896 	pooled->vio.completion.parent = data_vio;
1897 	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
1898 				handle_io_error, REQ_OP_READ | REQ_PRIO);
1899 }
1900 
1901 /*
1902  * If the page is already locked, queue up to wait for the lock to be released. If the lock is
1903  * acquired, @data_vio->tree_lock.locked will be true.
1904  */
1905 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
1906 {
1907 	int result;
1908 	struct tree_lock *lock_holder;
1909 	struct tree_lock *lock = &data_vio->tree_lock;
1910 	height_t height = lock->height;
1911 	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
1912 	union page_key key;
1913 
1914 	key.descriptor = (struct page_descriptor) {
1915 		.root_index = lock->root_index,
1916 		.height = height,
1917 		.page_index = tree_slot.page_index,
1918 		.slot = tree_slot.block_map_slot.slot,
1919 	};
1920 	lock->key = key.key;
1921 
1922 	result = vdo_int_map_put(zone->loading_pages, lock->key,
1923 				 lock, false, (void **) &lock_holder);
1924 	if (result != VDO_SUCCESS)
1925 		return result;
1926 
1927 	if (lock_holder == NULL) {
1928 		/* We got the lock */
1929 		data_vio->tree_lock.locked = true;
1930 		return VDO_SUCCESS;
1931 	}
1932 
1933 	/* Someone else is loading or allocating the page we need */
1934 	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
1935 	return VDO_SUCCESS;
1936 }
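
/*
 * The lock key above packs (root_index, height, page_index, slot) into a single u64 through the
 * page_descriptor/page_key union, so every data_vio which needs the same missing tree page
 * computes the same key. The first one to install itself in zone->loading_pages performs the load
 * or allocation; the rest wait on that holder's waiter queue and are resumed by
 * continue_load_for_waiter() or continue_allocation_for_waiter().
 */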
1937 
1938 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */
1939 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
1940 {
1941 	int result;
1942 
1943 	result = attempt_page_lock(zone, data_vio);
1944 	if (result != VDO_SUCCESS) {
1945 		abort_load(data_vio, result);
1946 		return;
1947 	}
1948 
1949 	if (data_vio->tree_lock.locked) {
1950 		data_vio->waiter.callback = load_page;
1951 		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
1952 	}
1953 }
1954 
1955 static void allocation_failure(struct vdo_completion *completion)
1956 {
1957 	struct data_vio *data_vio = as_data_vio(completion);
1958 
1959 	if (vdo_requeue_completion_if_needed(completion,
1960 					     data_vio->logical.zone->thread_id))
1961 		return;
1962 
1963 	abort_lookup(data_vio, completion->result, "allocation");
1964 }
1965 
1966 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
1967 {
1968 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1969 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1970 	physical_block_number_t pbn = *((physical_block_number_t *) context);
1971 
1972 	tree_lock->height--;
1973 	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;
1974 
1975 	if (tree_lock->height == 0) {
1976 		finish_lookup(data_vio, VDO_SUCCESS);
1977 		return;
1978 	}
1979 
1980 	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1981 }
1982 
1983 /** expire_oldest_list() - Expire the oldest list. */
1984 static void expire_oldest_list(struct dirty_lists *dirty_lists)
1985 {
1986 	block_count_t i = dirty_lists->offset++;
1987 
1988 	dirty_lists->oldest_period++;
1989 	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
1990 		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
1991 				      &dirty_lists->expired[VDO_TREE_PAGE]);
1992 	}
1993 
1994 	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
1995 		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
1996 				      &dirty_lists->expired[VDO_CACHE_PAGE]);
1997 	}
1998 
1999 	if (dirty_lists->offset == dirty_lists->maximum_age)
2000 		dirty_lists->offset = 0;
2001 }
2002 
2003 
2004 /** update_period() - Update the dirty_lists period if necessary. */
2005 static void update_period(struct dirty_lists *dirty, sequence_number_t period)
2006 {
2007 	while (dirty->next_period <= period) {
2008 		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
2009 			expire_oldest_list(dirty);
2010 		dirty->next_period++;
2011 	}
2012 }
2013 
2014 /** write_expired_elements() - Write out the expired list. */
2015 static void write_expired_elements(struct block_map_zone *zone)
2016 {
2017 	struct tree_page *page, *ttmp;
2018 	struct page_info *info, *ptmp;
2019 	struct list_head *expired;
2020 	u8 generation = zone->generation;
2021 
2022 	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
2023 	list_for_each_entry_safe(page, ttmp, expired, entry) {
2024 		int result;
2025 
2026 		list_del_init(&page->entry);
2027 
2028 		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
2029 				    "Newly expired page not already waiting to write");
2030 		if (result != VDO_SUCCESS) {
2031 			enter_zone_read_only_mode(zone, result);
2032 			continue;
2033 		}
2034 
2035 		set_generation(zone, page, generation);
2036 		if (!page->writing)
2037 			enqueue_page(page, zone);
2038 	}
2039 
2040 	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
2041 	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
2042 		list_del_init(&info->state_entry);
2043 		schedule_page_save(info);
2044 	}
2045 
2046 	save_pages(&zone->page_cache);
2047 }
2048 
2049 /**
2050  * add_to_dirty_lists() - Add an element to the dirty lists.
2051  * @zone: The zone in which we are operating.
2052  * @entry: The list entry of the element to add.
2053  * @type: The type of page.
2054  * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
2055  * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
2056  *              lock.
2057  */
2058 static void add_to_dirty_lists(struct block_map_zone *zone,
2059 			       struct list_head *entry,
2060 			       enum block_map_page_type type,
2061 			       sequence_number_t old_period,
2062 			       sequence_number_t new_period)
2063 {
2064 	struct dirty_lists *dirty_lists = zone->dirty_lists;
2065 
2066 	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
2067 		return;
2068 
2069 	if (new_period < dirty_lists->oldest_period) {
2070 		list_move_tail(entry, &dirty_lists->expired[type]);
2071 	} else {
2072 		update_period(dirty_lists, new_period);
2073 		list_move_tail(entry,
2074 			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
2075 	}
2076 
2077 	write_expired_elements(zone);
2078 }
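
/*
 * The dirty lists form a circular window of maximum_age eras indexed by (period % maximum_age).
 * For example, with a maximum_age of 5, a page dirtied in period 7 sits on eras[2];
 * expire_oldest_list() splices that era onto the expired list once the journal period reaches 12,
 * and write_expired_elements() then issues its pages, so a page dirtied in period P is written
 * out no later than period P + maximum_age.
 */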
2079 
2080 /*
2081  * Record the allocation in the tree and wake any waiters now that the write lock has been
2082  * released.
2083  */
2084 static void finish_block_map_allocation(struct vdo_completion *completion)
2085 {
2086 	physical_block_number_t pbn;
2087 	struct tree_page *tree_page;
2088 	struct block_map_page *page;
2089 	sequence_number_t old_lock;
2090 	struct data_vio *data_vio = as_data_vio(completion);
2091 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2092 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2093 	height_t height = tree_lock->height;
2094 
2095 	assert_data_vio_in_logical_zone(data_vio);
2096 
2097 	tree_page = get_tree_page(zone, tree_lock);
2098 	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2099 
2100 	/* Record the allocation. */
2101 	page = (struct block_map_page *) tree_page->page_buffer;
2102 	old_lock = tree_page->recovery_lock;
2103 	vdo_update_block_map_page(page, data_vio, pbn,
2104 				  VDO_MAPPING_STATE_UNCOMPRESSED,
2105 				  &tree_page->recovery_lock);
2106 
2107 	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
2108 		/* This page is waiting to be written out. */
2109 		if (zone->flusher != tree_page) {
2110 			/*
2111 			 * The outstanding flush won't cover the update we just made,
2112 			 * so mark the page as needing another flush.
2113 			 */
2114 			set_generation(zone, tree_page, zone->generation);
2115 		}
2116 	} else {
2117 		/* Put the page on a dirty list */
2118 		if (old_lock == 0)
2119 			INIT_LIST_HEAD(&tree_page->entry);
2120 		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
2121 				   old_lock, tree_page->recovery_lock);
2122 	}
2123 
2124 	tree_lock->height--;
2125 	if (height > 1) {
2126 		/* Format the interior node we just allocated (in memory). */
2127 		tree_page = get_tree_page(zone, tree_lock);
2128 		vdo_format_block_map_page(tree_page->page_buffer,
2129 					  zone->block_map->nonce,
2130 					  pbn, false);
2131 	}
2132 
2133 	/* Release our claim to the allocation and wake any waiters */
2134 	release_page_lock(data_vio, "allocation");
2135 	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
2136 				     continue_allocation_for_waiter, &pbn);
2137 	if (tree_lock->height == 0) {
2138 		finish_lookup(data_vio, VDO_SUCCESS);
2139 		return;
2140 	}
2141 
2142 	allocate_block_map_page(zone, data_vio);
2143 }
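
/*
 * Each pass through finish_block_map_allocation() fills in exactly one level of the tree: the new
 * page's PBN is recorded in its parent (which is re-dirtied or re-flushed as needed), the newly
 * allocated interior page is formatted in memory, and the data_vio descends one level. It then
 * either reaches height 0 and finishes the lookup, or loops back through
 * allocate_block_map_page() for the next missing level.
 */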
2144 
2145 static void release_block_map_write_lock(struct vdo_completion *completion)
2146 {
2147 	struct data_vio *data_vio = as_data_vio(completion);
2148 
2149 	assert_data_vio_in_allocated_zone(data_vio);
2150 
2151 	release_data_vio_allocation_lock(data_vio, true);
2152 	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
2153 }
2154 
2155 /*
2156  * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
2157  * to prevent deduplication against the block after we release the write lock on it, but before we
2158  * write out the page.
2159  */
2160 static void set_block_map_page_reference_count(struct vdo_completion *completion)
2161 {
2162 	struct data_vio *data_vio = as_data_vio(completion);
2163 
2164 	assert_data_vio_in_allocated_zone(data_vio);
2165 
2166 	completion->callback = release_block_map_write_lock;
2167 	vdo_modify_reference_count(completion, &data_vio->increment_updater);
2168 }
2169 
2170 static void journal_block_map_allocation(struct vdo_completion *completion)
2171 {
2172 	struct data_vio *data_vio = as_data_vio(completion);
2173 
2174 	assert_data_vio_in_journal_zone(data_vio);
2175 
2176 	set_data_vio_allocated_zone_callback(data_vio,
2177 					     set_block_map_page_reference_count);
2178 	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
2179 }
2180 
2181 static void allocate_block(struct vdo_completion *completion)
2182 {
2183 	struct data_vio *data_vio = as_data_vio(completion);
2184 	struct tree_lock *lock = &data_vio->tree_lock;
2185 	physical_block_number_t pbn;
2186 
2187 	assert_data_vio_in_allocated_zone(data_vio);
2188 
2189 	if (!vdo_allocate_block_in_zone(data_vio))
2190 		return;
2191 
2192 	pbn = data_vio->allocation.pbn;
2193 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
2194 	data_vio->increment_updater = (struct reference_updater) {
2195 		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
2196 		.increment = true,
2197 		.zpbn = {
2198 			.pbn = pbn,
2199 			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
2200 		},
2201 		.lock = data_vio->allocation.lock,
2202 	};
2203 
2204 	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
2205 }
2206 
2207 static void allocate_block_map_page(struct block_map_zone *zone,
2208 				    struct data_vio *data_vio)
2209 {
2210 	int result;
2211 
2212 	if (!data_vio->write || data_vio->is_discard) {
2213 		/* This is a pure read or a discard, so there's nothing left to do here. */
2214 		finish_lookup(data_vio, VDO_SUCCESS);
2215 		return;
2216 	}
2217 
2218 	result = attempt_page_lock(zone, data_vio);
2219 	if (result != VDO_SUCCESS) {
2220 		abort_lookup(data_vio, result, "allocation");
2221 		return;
2222 	}
2223 
2224 	if (!data_vio->tree_lock.locked)
2225 		return;
2226 
2227 	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
2228 				     allocate_block, allocation_failure);
2229 }
2230 
2231 /**
2232  * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
2233  *                             resides and cache that result in the data_vio.
2234  * @data_vio: The data vio.
2235  *
2236  * All ancestors in the tree will be allocated or loaded, as needed.
2237  */
2238 void vdo_find_block_map_slot(struct data_vio *data_vio)
2239 {
2240 	page_number_t page_index;
2241 	struct block_map_tree_slot tree_slot;
2242 	struct data_location mapping;
2243 	struct block_map_page *page = NULL;
2244 	struct tree_lock *lock = &data_vio->tree_lock;
2245 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2246 
2247 	zone->active_lookups++;
2248 	if (vdo_is_state_draining(&zone->state)) {
2249 		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
2250 		return;
2251 	}
2252 
2253 	lock->tree_slots[0].block_map_slot.slot =
2254 		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2255 	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
2256 	tree_slot = (struct block_map_tree_slot) {
2257 		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2258 		.block_map_slot = {
2259 			.pbn = 0,
2260 			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2261 		},
2262 	};
2263 
2264 	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
2265 		physical_block_number_t pbn;
2266 
2267 		lock->tree_slots[lock->height] = tree_slot;
2268 		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
2269 		pbn = vdo_get_block_map_page_pbn(page);
2270 		if (pbn != VDO_ZERO_BLOCK) {
2271 			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
2272 			break;
2273 		}
2274 
2275 		/* Calculate the index and slot for the next level. */
2276 		tree_slot.block_map_slot.slot =
2277 			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2278 		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2279 	}
2280 
2281 	/* The page at this height has been allocated and loaded. */
2282 	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
2283 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
2284 		vdo_log_error_strerror(VDO_BAD_MAPPING,
2285 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
2286 				       (unsigned long long) mapping.pbn, mapping.state,
2287 				       lock->tree_slots[lock->height - 1].page_index,
2288 				       lock->height - 1);
2289 		abort_load(data_vio, VDO_BAD_MAPPING);
2290 		return;
2291 	}
2292 
2293 	if (!vdo_is_mapped_location(&mapping)) {
2294 		/* The page we want one level down has not been allocated, so allocate it. */
2295 		allocate_block_map_page(zone, data_vio);
2296 		return;
2297 	}
2298 
2299 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
2300 	if (lock->height == 1) {
2301 		/* This is the ultimate block map page, so we're done */
2302 		finish_lookup(data_vio, VDO_SUCCESS);
2303 		return;
2304 	}
2305 
2306 	/* We know what page we need to load. */
2307 	load_block_map_page(zone, data_vio);
2308 }
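
/*
 * The slot arithmetic above is a radix decomposition. Writing E for
 * VDO_BLOCK_MAP_ENTRIES_PER_PAGE: the leaf slot is lbn % E and the leaf page number is lbn / E;
 * vdo_compute_logical_zone() already chose root = page_number % root_count, and within that root
 * the leaf page index is page_number / root_count. Each level above then takes index % E as its
 * slot and index / E as the next page index, and the descent starts from the lowest level whose
 * page is already allocated.
 */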
2309 
2310 /*
2311  * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
2312  * pages have been loaded; otherwise, it may give the wrong answer (0).
2313  */
2314 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2315 						    page_number_t page_number)
2316 {
2317 	struct data_location mapping;
2318 	struct tree_page *tree_page;
2319 	struct block_map_page *page;
2320 	root_count_t root_index = page_number % map->root_count;
2321 	page_number_t page_index = page_number / map->root_count;
2322 	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2323 
2324 	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2325 
2326 	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2327 	page = (struct block_map_page *) tree_page->page_buffer;
2328 	if (!page->header.initialized)
2329 		return VDO_ZERO_BLOCK;
2330 
2331 	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
2332 	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
2333 		return VDO_ZERO_BLOCK;
2334 	return mapping.pbn;
2335 }
2336 
2337 /*
2338  * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2339  * method is used when correcting errors in the tree during read-only rebuild.
2340  */
2341 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
2342 {
2343 	bool waiting = vdo_waiter_is_waiting(&page->waiter);
2344 
2345 	if (waiting && (zone->flusher == page))
2346 		return;
2347 
2348 	set_generation(zone, page, zone->generation);
2349 	if (waiting || page->writing)
2350 		return;
2351 
2352 	enqueue_page(page, zone);
2353 }
2354 
2355 static int make_segment(struct forest *old_forest, block_count_t new_pages,
2356 			struct boundary *new_boundary, struct forest *forest)
2357 {
2358 	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
2359 	struct tree_page *page_ptr;
2360 	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
2361 	height_t height;
2362 	root_count_t root;
2363 	int result;
2364 
2365 	forest->segments = index + 1;
2366 
2367 	result = vdo_allocate(forest->segments, struct boundary,
2368 			      "forest boundary array", &forest->boundaries);
2369 	if (result != VDO_SUCCESS)
2370 		return result;
2371 
2372 	result = vdo_allocate(forest->segments, struct tree_page *,
2373 			      "forest page pointers", &forest->pages);
2374 	if (result != VDO_SUCCESS)
2375 		return result;
2376 
2377 	result = vdo_allocate(new_pages, struct tree_page,
2378 			      "new forest pages", &forest->pages[index]);
2379 	if (result != VDO_SUCCESS)
2380 		return result;
2381 
2382 	if (index > 0) {
2383 		memcpy(forest->boundaries, old_forest->boundaries,
2384 		       index * sizeof(struct boundary));
2385 		memcpy(forest->pages, old_forest->pages,
2386 		       index * sizeof(struct tree_page *));
2387 	}
2388 
2389 	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));
2390 
2391 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2392 		segment_sizes[height] = new_boundary->levels[height];
2393 		if (index > 0)
2394 			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
2395 	}
2396 
2397 	page_ptr = forest->pages[index];
2398 	for (root = 0; root < forest->map->root_count; root++) {
2399 		struct block_map_tree_segment *segment;
2400 		struct block_map_tree *tree = &(forest->trees[root]);
2401 		height_t height;
2402 
2403 		int result = vdo_allocate(forest->segments,
2404 					  struct block_map_tree_segment,
2405 					  "tree root segments", &tree->segments);
2406 		if (result != VDO_SUCCESS)
2407 			return result;
2408 
2409 		if (index > 0) {
2410 			memcpy(tree->segments, old_forest->trees[root].segments,
2411 			       index * sizeof(struct block_map_tree_segment));
2412 		}
2413 
2414 		segment = &(tree->segments[index]);
2415 		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2416 			if (segment_sizes[height] == 0)
2417 				continue;
2418 
2419 			segment->levels[height] = page_ptr;
2420 			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
2421 				/* Record the root. */
2422 				struct block_map_page *page =
2423 					vdo_format_block_map_page(page_ptr->page_buffer,
2424 								  forest->map->nonce,
2425 								  VDO_INVALID_PBN, true);
2426 				page->entries[0] =
2427 					vdo_pack_block_map_entry(forest->map->root_origin + root,
2428 								 VDO_MAPPING_STATE_UNCOMPRESSED);
2429 			}
2430 			page_ptr += segment_sizes[height];
2431 		}
2432 	}
2433 
2434 	return VDO_SUCCESS;
2435 }
2436 
2437 static void deforest(struct forest *forest, size_t first_page_segment)
2438 {
2439 	root_count_t root;
2440 
2441 	if (forest->pages != NULL) {
2442 		size_t segment;
2443 
2444 		for (segment = first_page_segment; segment < forest->segments; segment++)
2445 			vdo_free(forest->pages[segment]);
2446 		vdo_free(forest->pages);
2447 	}
2448 
2449 	for (root = 0; root < forest->map->root_count; root++)
2450 		vdo_free(forest->trees[root].segments);
2451 
2452 	vdo_free(forest->boundaries);
2453 	vdo_free(forest);
2454 }
2455 
2456 /**
2457  * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
2458  *                 there is one.
2459  * @map: The block map.
2460  * @entries: The number of entries the block map will hold.
2461  *
2462  * Return: VDO_SUCCESS or an error.
2463  */
2464 static int make_forest(struct block_map *map, block_count_t entries)
2465 {
2466 	struct forest *forest, *old_forest = map->forest;
2467 	struct boundary new_boundary, *old_boundary = NULL;
2468 	block_count_t new_pages;
2469 	int result;
2470 
2471 	if (old_forest != NULL)
2472 		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);
2473 
2474 	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
2475 						 entries, &new_boundary);
2476 	if (new_pages == 0) {
2477 		map->next_entry_count = entries;
2478 		return VDO_SUCCESS;
2479 	}
2480 
2481 	result = vdo_allocate_extended(struct forest, map->root_count,
2482 				       struct block_map_tree, __func__,
2483 				       &forest);
2484 	if (result != VDO_SUCCESS)
2485 		return result;
2486 
2487 	forest->map = map;
2488 	result = make_segment(old_forest, new_pages, &new_boundary, forest);
2489 	if (result != VDO_SUCCESS) {
2490 		deforest(forest, forest->segments - 1);
2491 		return result;
2492 	}
2493 
2494 	map->next_forest = forest;
2495 	map->next_entry_count = entries;
2496 	return VDO_SUCCESS;
2497 }
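
/*
 * Growth never copies tree pages. make_segment() allocates only the new pages and copies forward
 * the boundary, page-pointer, and per-tree segment arrays, so the new forest shares the earlier
 * segments' tree_page arrays with the old forest. That is why deforest() takes a
 * first_page_segment argument: replace_forest() frees the superseded forest without touching any
 * page arrays, while vdo_abandon_block_map_growth() frees only the pages of the abandoned new
 * segment.
 */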
2498 
2499 /**
2500  * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2501  * @map: The block map.
2502  */
2503 static void replace_forest(struct block_map *map)
2504 {
2505 	if (map->next_forest != NULL) {
2506 		if (map->forest != NULL)
2507 			deforest(map->forest, map->forest->segments);
2508 		map->forest = vdo_forget(map->next_forest);
2509 	}
2510 
2511 	map->entry_count = map->next_entry_count;
2512 	map->next_entry_count = 0;
2513 }
2514 
2515 /**
2516  * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
2517  *                   traversal.
2518  * @cursor: The cursor to complete.
2519  */
2520 static void finish_cursor(struct cursor *cursor)
2521 {
2522 	struct cursors *cursors = cursor->parent;
2523 	struct vdo_completion *completion = cursors->completion;
2524 
2525 	return_vio_to_pool(vdo_forget(cursor->vio));
2526 	if (--cursors->active_roots > 0)
2527 		return;
2528 
2529 	vdo_free(cursors);
2530 
2531 	vdo_finish_completion(completion);
2532 }
2533 
2534 static void traverse(struct cursor *cursor);
2535 
2536 /**
2537  * continue_traversal() - Continue traversing a block map tree.
2538  * @completion: The VIO doing a read or write.
2539  */
2540 static void continue_traversal(struct vdo_completion *completion)
2541 {
2542 	vio_record_metadata_io_error(as_vio(completion));
2543 	traverse(completion->parent);
2544 }
2545 
2546 /**
2547  * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2548  * @completion: The VIO doing the read.
2549  */
2550 static void finish_traversal_load(struct vdo_completion *completion)
2551 {
2552 	struct cursor *cursor = completion->parent;
2553 	height_t height = cursor->height;
2554 	struct cursor_level *level = &cursor->levels[height];
2555 	struct tree_page *tree_page =
2556 		&(cursor->tree->segments[0].levels[height][level->page_index]);
2557 	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2558 
2559 	vdo_copy_valid_page(cursor->vio->vio.data,
2560 			    cursor->parent->zone->block_map->nonce,
2561 			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
2562 	traverse(cursor);
2563 }
2564 
2565 static void traversal_endio(struct bio *bio)
2566 {
2567 	struct vio *vio = bio->bi_private;
2568 	struct cursor *cursor = vio->completion.parent;
2569 
2570 	continue_vio_after_io(vio, finish_traversal_load,
2571 			      cursor->parent->zone->thread_id);
2572 }
2573 
2574 /**
2575  * traverse() - Traverse a single block map tree.
2576  * @cursor: A cursor tracking traversal progress.
2577  *
2578  * This is the recursive heart of the traversal process.
2579  */
2580 static void traverse(struct cursor *cursor)
2581 {
2582 	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
2583 		height_t height = cursor->height;
2584 		struct cursor_level *level = &cursor->levels[height];
2585 		struct tree_page *tree_page =
2586 			&(cursor->tree->segments[0].levels[height][level->page_index]);
2587 		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2588 
2589 		if (!page->header.initialized)
2590 			continue;
2591 
2592 		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
2593 			struct cursor_level *next_level;
2594 			page_number_t entry_index =
2595 				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
2596 			struct data_location location =
2597 				vdo_unpack_block_map_entry(&page->entries[level->slot]);
2598 
2599 			if (!vdo_is_valid_location(&location)) {
2600 				/* This entry is invalid, so remove it from the page. */
2601 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2602 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2603 				continue;
2604 			}
2605 
2606 			if (!vdo_is_mapped_location(&location))
2607 				continue;
2608 
2609 			/* Erase mapped entries past the end of the logical space. */
2610 			if (entry_index >= cursor->boundary.levels[height]) {
2611 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2612 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2613 				continue;
2614 			}
2615 
2616 			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
2617 				int result = cursor->parent->entry_callback(location.pbn,
2618 									    cursor->parent->completion);
2619 				if (result != VDO_SUCCESS) {
2620 					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2621 					vdo_write_tree_page(tree_page, cursor->parent->zone);
2622 					continue;
2623 				}
2624 			}
2625 
2626 			if (cursor->height == 0)
2627 				continue;
2628 
2629 			cursor->height--;
2630 			next_level = &cursor->levels[cursor->height];
2631 			next_level->page_index = entry_index;
2632 			next_level->slot = 0;
2633 			level->slot++;
2634 			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
2635 						traversal_endio, continue_traversal,
2636 						REQ_OP_READ | REQ_PRIO);
2637 			return;
2638 		}
2639 	}
2640 
2641 	finish_cursor(cursor);
2642 }
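
/*
 * The traversal is depth-first: the cursor keeps a page_index and slot for every level, so after
 * each asynchronous page read it resumes exactly where it left off. Along the way it also repairs
 * the tree: invalid entries, mapped entries beyond the logical-space boundary, and entries the
 * callback rejects are all replaced with UNMAPPED_BLOCK_MAP_ENTRY and the containing page is
 * rewritten via vdo_write_tree_page().
 */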
2643 
2644 /**
2645  * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2646  *                   which to load pages.
2647  * @waiter: The parent of the cursor to launch.
2648  * @context: The pooled_vio just acquired.
2649  *
2650  * Implements waiter_callback_fn.
2651  */
2652 static void launch_cursor(struct vdo_waiter *waiter, void *context)
2653 {
2654 	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
2655 	struct pooled_vio *pooled = context;
2656 
2657 	cursor->vio = pooled;
2658 	pooled->vio.completion.parent = cursor;
2659 	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
2660 	traverse(cursor);
2661 }
2662 
2663 /**
2664  * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2665  * @map: The block map.
2666  * @root_index: The tree root index.
2667  *
2668  * Return: The list of page counts as a boundary structure.
2669  */
2670 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
2671 {
2672 	struct boundary boundary;
2673 	height_t height;
2674 	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
2675 	/*
2676 	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2677 	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2678 	 * roots starting from tree 0.
2679 	 */
2680 	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
2681 	page_count_t level_pages = leaf_pages / map->root_count;
2682 
2683 	if (root_index <= last_tree_root)
2684 		level_pages++;
2685 
2686 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
2687 		boundary.levels[height] = level_pages;
2688 		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
2689 	}
2690 
2691 	/* The root node always exists, even if the root is otherwise unused. */
2692 	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;
2693 
2694 	return boundary;
2695 }
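
/*
 * For example, 10 leaf pages spread over 4 roots gives last_tree_root = 9 % 4 = 1, so roots 0 and
 * 1 get 3 leaf pages each while roots 2 and 3 get 2, accounting for all 10. Each level above the
 * leaves then needs DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE) pages, and the root
 * level is always exactly one page.
 */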
2696 
2697 /**
2698  * vdo_traverse_forest() - Walk the entire forest of a block map.
2699  * @map: The block map.
2700  * @callback: A function to call with the pbn of each allocated node in the forest.
2701  * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2702  */
2703 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
2704 			 struct vdo_completion *completion)
2705 {
2706 	root_count_t root;
2707 	struct cursors *cursors;
2708 	int result;
2709 
2710 	result = vdo_allocate_extended(struct cursors, map->root_count,
2711 				       struct cursor, __func__, &cursors);
2712 	if (result != VDO_SUCCESS) {
2713 		vdo_fail_completion(completion, result);
2714 		return;
2715 	}
2716 
2717 	cursors->zone = &map->zones[0];
2718 	cursors->pool = cursors->zone->vio_pool;
2719 	cursors->entry_callback = callback;
2720 	cursors->completion = completion;
2721 	cursors->active_roots = map->root_count;
2722 	for (root = 0; root < map->root_count; root++) {
2723 		struct cursor *cursor = &cursors->cursors[root];
2724 
2725 		*cursor = (struct cursor) {
2726 			.tree = &map->forest->trees[root],
2727 			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
2728 			.parent = cursors,
2729 			.boundary = compute_boundary(map, root),
2730 		};
2731 
2732 		cursor->waiter.callback = launch_cursor;
2733 		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
2734 	}
2735 }
2736 
2737 /**
2738  * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
2739  * @map: The block map.
2740  * @zone_number: The zone to initialize.
2741  * @cache_size: The total block map cache size.
2742  * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
2743  *               written out.
2744  */
2745 static int __must_check initialize_block_map_zone(struct block_map *map,
2746 						  zone_count_t zone_number,
2747 						  page_count_t cache_size,
2748 						  block_count_t maximum_age)
2749 {
2750 	int result;
2751 	block_count_t i;
2752 	struct vdo *vdo = map->vdo;
2753 	struct block_map_zone *zone = &map->zones[zone_number];
2754 
2755 	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));
2756 
2757 	zone->zone_number = zone_number;
2758 	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
2759 	zone->block_map = map;
2760 
2761 	result = vdo_allocate_extended(struct dirty_lists, maximum_age,
2762 				       dirty_era_t, __func__,
2763 				       &zone->dirty_lists);
2764 	if (result != VDO_SUCCESS)
2765 		return result;
2766 
2767 	zone->dirty_lists->maximum_age = maximum_age;
2768 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
2769 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);
2770 
2771 	for (i = 0; i < maximum_age; i++) {
2772 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
2773 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
2774 	}
2775 
2776 	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
2777 	if (result != VDO_SUCCESS)
2778 		return result;
2779 
2780 	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 1,
2781 			       zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
2782 			       VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
2783 	if (result != VDO_SUCCESS)
2784 		return result;
2785 
2786 	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2787 
2788 	zone->page_cache.zone = zone;
2789 	zone->page_cache.vdo = vdo;
2790 	zone->page_cache.page_count = cache_size / map->zone_count;
2791 	zone->page_cache.stats.free_pages = zone->page_cache.page_count;
2792 
2793 	result = allocate_cache_components(&zone->page_cache);
2794 	if (result != VDO_SUCCESS)
2795 		return result;
2796 
2797 	/* initialize empty circular queues */
2798 	INIT_LIST_HEAD(&zone->page_cache.lru_list);
2799 	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);
2800 
2801 	return VDO_SUCCESS;
2802 }
2803 
2804 /* Implements vdo_zone_thread_getter_fn */
2805 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2806 {
2807 	struct block_map *map = context;
2808 
2809 	return map->zones[zone_number].thread_id;
2810 }
2811 
2812 /* Implements vdo_action_preamble_fn */
2813 static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2814 {
2815 	struct block_map *map = context;
2816 
2817 	map->current_era_point = map->pending_era_point;
2818 	vdo_finish_completion(parent);
2819 }
2820 
2821 /* Implements vdo_zone_action_fn */
2822 static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2823 				       struct vdo_completion *parent)
2824 {
2825 	struct block_map *map = context;
2826 	struct block_map_zone *zone = &map->zones[zone_number];
2827 
2828 	update_period(zone->dirty_lists, map->current_era_point);
2829 	write_expired_elements(zone);
2830 	vdo_finish_completion(parent);
2831 }
2832 
2833 /*
2834  * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2835  * vdo_schedule_default_action() on the block map's action manager.
2836  *
2837  * Implements vdo_action_scheduler_fn.
2838  */
2839 static bool schedule_era_advance(void *context)
2840 {
2841 	struct block_map *map = context;
2842 
2843 	if (map->current_era_point == map->pending_era_point)
2844 		return false;
2845 
2846 	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
2847 				   advance_block_map_zone_era, NULL, NULL);
2848 }
2849 
2850 static void uninitialize_block_map_zone(struct block_map_zone *zone)
2851 {
2852 	struct vdo_page_cache *cache = &zone->page_cache;
2853 
2854 	vdo_free(vdo_forget(zone->dirty_lists));
2855 	free_vio_pool(vdo_forget(zone->vio_pool));
2856 	vdo_int_map_free(vdo_forget(zone->loading_pages));
2857 	if (cache->infos != NULL) {
2858 		struct page_info *info;
2859 
2860 		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2861 			free_vio(vdo_forget(info->vio));
2862 	}
2863 
2864 	vdo_int_map_free(vdo_forget(cache->page_map));
2865 	vdo_free(vdo_forget(cache->infos));
2866 	vdo_free(vdo_forget(cache->pages));
2867 }
2868 
2869 void vdo_free_block_map(struct block_map *map)
2870 {
2871 	zone_count_t zone;
2872 
2873 	if (map == NULL)
2874 		return;
2875 
2876 	for (zone = 0; zone < map->zone_count; zone++)
2877 		uninitialize_block_map_zone(&map->zones[zone]);
2878 
2879 	vdo_abandon_block_map_growth(map);
2880 	if (map->forest != NULL)
2881 		deforest(vdo_forget(map->forest), 0);
2882 	vdo_free(vdo_forget(map->action_manager));
2883 	vdo_free(map);
2884 }
2885 
2886 /* @journal may be NULL. */
2887 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2888 			 struct vdo *vdo, struct recovery_journal *journal,
2889 			 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2890 			 struct block_map **map_ptr)
2891 {
2892 	struct block_map *map;
2893 	int result;
2894 	zone_count_t zone = 0;
2895 
2896 	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2897 		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2898 		      sizeof(struct block_map_entry)));
2899 	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
2900 	if (result != VDO_SUCCESS)
2901 		return result;
2902 
2903 	result = vdo_allocate_extended(struct block_map,
2904 				       vdo->thread_config.logical_zone_count,
2905 				       struct block_map_zone, __func__, &map);
2906 	if (result != VDO_SUCCESS)
2907 		return result;
2908 
2909 	map->vdo = vdo;
2910 	map->root_origin = state.root_origin;
2911 	map->root_count = state.root_count;
2912 	map->entry_count = logical_blocks;
2913 	map->journal = journal;
2914 	map->nonce = nonce;
2915 
2916 	result = make_forest(map, map->entry_count);
2917 	if (result != VDO_SUCCESS) {
2918 		vdo_free_block_map(map);
2919 		return result;
2920 	}
2921 
2922 	replace_forest(map);
2923 
2924 	map->zone_count = vdo->thread_config.logical_zone_count;
2925 	for (zone = 0; zone < map->zone_count; zone++) {
2926 		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
2927 		if (result != VDO_SUCCESS) {
2928 			vdo_free_block_map(map);
2929 			return result;
2930 		}
2931 	}
2932 
2933 	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2934 					 vdo_get_recovery_journal_thread_id(journal),
2935 					 map, schedule_era_advance, vdo,
2936 					 &map->action_manager);
2937 	if (result != VDO_SUCCESS) {
2938 		vdo_free_block_map(map);
2939 		return result;
2940 	}
2941 
2942 	*map_ptr = map;
2943 	return VDO_SUCCESS;
2944 }
2945 
2946 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2947 {
2948 	return (struct block_map_state_2_0) {
2949 		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2950 		/* This is the flat page count, which has turned out to always be 0. */
2951 		.flat_page_count = 0,
2952 		.root_origin = map->root_origin,
2953 		.root_count = map->root_count,
2954 	};
2955 }
2956 
2957 /* The block map needs to know the journal's sequence number to initialize the eras. */
2958 void vdo_initialize_block_map_from_journal(struct block_map *map,
2959 					   struct recovery_journal *journal)
2960 {
2961 	zone_count_t z = 0;
2962 
2963 	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2964 	map->pending_era_point = map->current_era_point;
2965 
2966 	for (z = 0; z < map->zone_count; z++) {
2967 		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2968 
2969 		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2970 		dirty_lists->oldest_period = map->current_era_point;
2971 		dirty_lists->next_period = map->current_era_point + 1;
2972 		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
2973 	}
2974 }
2975 
2976 /* Compute the logical zone for the LBN of a data vio. */
2977 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
2978 {
2979 	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
2980 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2981 	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2982 
2983 	tree_lock->tree_slots[0].page_index = page_number;
2984 	tree_lock->root_index = page_number % map->root_count;
2985 	return (tree_lock->root_index % map->zone_count);
2986 }
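
/*
 * Consecutive block map pages therefore rotate round-robin across the roots, and the roots in
 * turn rotate across the logical zones, so lookups for adjacent regions of the logical space are
 * spread over both the trees and the logical threads.
 */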
2987 
2988 void vdo_advance_block_map_era(struct block_map *map,
2989 			       sequence_number_t recovery_block_number)
2990 {
2991 	if (map == NULL)
2992 		return;
2993 
2994 	map->pending_era_point = recovery_block_number;
2995 	vdo_schedule_default_action(map->action_manager);
2996 }
2997 
2998 /* Implements vdo_admin_initiator_fn */
2999 static void initiate_drain(struct admin_state *state)
3000 {
3001 	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
3002 
3003 	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
3004 			    "%s() called with no active lookups", __func__);
3005 
3006 	if (!vdo_is_state_suspending(state)) {
3007 		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
3008 			expire_oldest_list(zone->dirty_lists);
3009 		write_expired_elements(zone);
3010 	}
3011 
3012 	check_for_drain_complete(zone);
3013 }
3014 
3015 /* Implements vdo_zone_action_fn. */
3016 static void drain_zone(void *context, zone_count_t zone_number,
3017 		       struct vdo_completion *parent)
3018 {
3019 	struct block_map *map = context;
3020 	struct block_map_zone *zone = &map->zones[zone_number];
3021 
3022 	vdo_start_draining(&zone->state,
3023 			   vdo_get_current_manager_operation(map->action_manager),
3024 			   parent, initiate_drain);
3025 }
3026 
3027 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
3028 			 struct vdo_completion *parent)
3029 {
3030 	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
3031 			       parent);
3032 }
3033 
3034 /* Implements vdo_zone_action_fn. */
3035 static void resume_block_map_zone(void *context, zone_count_t zone_number,
3036 				  struct vdo_completion *parent)
3037 {
3038 	struct block_map *map = context;
3039 	struct block_map_zone *zone = &map->zones[zone_number];
3040 
3041 	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3042 }
3043 
3044 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3045 {
3046 	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3047 			       NULL, resume_block_map_zone, NULL, parent);
3048 }
3049 
3050 /* Allocate an expanded collection of trees in preparation for a future growth. */
3051 int vdo_prepare_to_grow_block_map(struct block_map *map,
3052 				  block_count_t new_logical_blocks)
3053 {
3054 	if (map->next_entry_count == new_logical_blocks)
3055 		return VDO_SUCCESS;
3056 
3057 	if (map->next_entry_count > 0)
3058 		vdo_abandon_block_map_growth(map);
3059 
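	/* The block map never shrinks; a smaller logical size just keeps the current forest. */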
3060 	if (new_logical_blocks < map->entry_count) {
3061 		map->next_entry_count = map->entry_count;
3062 		return VDO_SUCCESS;
3063 	}
3064 
3065 	return make_forest(map, new_logical_blocks);
3066 }
3067 
3068 /* Implements vdo_action_preamble_fn */
3069 static void grow_forest(void *context, struct vdo_completion *completion)
3070 {
3071 	replace_forest(context);
3072 	vdo_finish_completion(completion);
3073 }
3074 
3075 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
3076 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
3077 {
3078 	vdo_schedule_operation(map->action_manager,
3079 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
3080 			       grow_forest, NULL, NULL, parent);
3081 }
3082 
3083 void vdo_abandon_block_map_growth(struct block_map *map)
3084 {
3085 	struct forest *forest = vdo_forget(map->next_forest);
3086 
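	/*
	 * Only the last segment was allocated for the abandoned expansion; the earlier segments
	 * are shared with the forest still in use.
	 */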
3087 	if (forest != NULL)
3088 		deforest(forest, forest->segments - 1);
3089 
3090 	map->next_entry_count = 0;
3091 }
3092 
3093 /* Release the page completion and then continue the requester. */
3094 static inline void finish_processing_page(struct vdo_completion *completion, int result)
3095 {
3096 	struct vdo_completion *parent = completion->parent;
3097 
3098 	vdo_release_page_completion(completion);
3099 	vdo_continue_completion(parent, result);
3100 }
3101 
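/* Handle an error from a page fetch by passing it on to the waiting requester. */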
3102 static void handle_page_error(struct vdo_completion *completion)
3103 {
3104 	finish_processing_page(completion, completion->result);
3105 }
3106 
3107 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
3108 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
3109 			       vdo_action_fn action)
3110 {
3111 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3112 
3113 	if (vdo_is_state_draining(&zone->state)) {
3114 		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
3115 		return;
3116 	}
3117 
3118 	vdo_get_page(&data_vio->page_completion, zone,
3119 		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
3120 		     modifiable, &data_vio->vio.completion,
3121 		     action, handle_page_error, false);
3122 }
3123 
3124 /**
3125  * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3126  * @data_vio: The data vio.
3127  *
3128  * This indicates the block map entry for the logical block is either unmapped or corrupted.
3129  */
3130 static void clear_mapped_location(struct data_vio *data_vio)
3131 {
3132 	data_vio->mapped = (struct zoned_pbn) {
3133 		.state = VDO_MAPPING_STATE_UNMAPPED,
3134 	};
3135 }
3136 
3137 /**
3138  * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
3139  *                         data_vio.
3140  * @data_vio: The data vio.
3141  * @entry: The new mapped entry to set.
3142  *
3143  * Return: VDO_SUCCESS, VDO_BAD_MAPPING if the map entry is invalid, or an error code for any
3144  *         other failure.
3145  */
3146 static int __must_check set_mapped_location(struct data_vio *data_vio,
3147 					    const struct block_map_entry *entry)
3148 {
3149 	/* Unpack the PBN for logging purposes even if the entry is invalid. */
3150 	struct data_location mapped = vdo_unpack_block_map_entry(entry);
3151 
3152 	if (vdo_is_valid_location(&mapped)) {
3153 		int result;
3154 
3155 		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
3156 					       mapped.pbn, &data_vio->mapped.zone);
3157 		if (result == VDO_SUCCESS) {
3158 			data_vio->mapped.pbn = mapped.pbn;
3159 			data_vio->mapped.state = mapped.state;
3160 			return VDO_SUCCESS;
3161 		}
3162 
3163 		/*
3164 		 * Return all errors not specifically known to be errors from validating the
3165 		 * location.
3166 		 */
3167 		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
3168 			return result;
3169 	}
3170 
3171 	/*
3172 	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
3173 	 * to VDO_BAD_MAPPING.
3174 	 */
3175 	vdo_log_error_strerror(VDO_BAD_MAPPING,
3176 			       "PBN %llu with state %u read from the block map was invalid",
3177 			       (unsigned long long) mapped.pbn, mapped.state);
3178 
3179 	/*
3180 	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
3181 	 * known data loss.
3182 	 */
3183 	if (!data_vio->write)
3184 		return VDO_BAD_MAPPING;
3185 
3186 	/*
3187 	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
3188 	 * entry rather than fail the write.
3189 	 */
3190 	clear_mapped_location(data_vio);
3191 	return VDO_SUCCESS;
3192 }
3193 
3194 /* This callback is registered in vdo_get_mapped_block(). */
3195 static void get_mapping_from_fetched_page(struct vdo_completion *completion)
3196 {
3197 	int result;
3198 	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
3199 	const struct block_map_page *page;
3200 	const struct block_map_entry *entry;
3201 	struct data_vio *data_vio = as_data_vio(completion->parent);
3202 	struct block_map_tree_slot *tree_slot;
3203 
3204 	if (completion->result != VDO_SUCCESS) {
3205 		finish_processing_page(completion, completion->result);
3206 		return;
3207 	}
3208 
3209 	result = validate_completed_page(vpc, false);
3210 	if (result != VDO_SUCCESS) {
3211 		finish_processing_page(completion, result);
3212 		return;
3213 	}
3214 
3215 	page = (const struct block_map_page *) get_page_buffer(vpc->info);
3216 	tree_slot = &data_vio->tree_lock.tree_slots[0];
3217 	entry = &page->entries[tree_slot->block_map_slot.slot];
3218 
3219 	result = set_mapped_location(data_vio, entry);
3220 	finish_processing_page(completion, result);
3221 }
3222 
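/*
 * Update an entry on a block map page and adjust the recovery journal locks held by the page and
 * the data_vio to cover the new mapping's journal entry.
 */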
3223 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
3224 			       physical_block_number_t pbn,
3225 			       enum block_mapping_state mapping_state,
3226 			       sequence_number_t *recovery_lock)
3227 {
3228 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3229 	struct block_map *block_map = zone->block_map;
3230 	struct recovery_journal *journal = block_map->journal;
3231 	sequence_number_t old_locked, new_locked;
3232 	struct tree_lock *tree_lock = &data_vio->tree_lock;
3233 
3234 	/* Encode the new mapping. */
3235 	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
3236 		vdo_pack_block_map_entry(pbn, mapping_state);
3237 
3238 	/* Adjust references on the recovery journal blocks. */
3239 	old_locked = *recovery_lock;
3240 	new_locked = data_vio->recovery_sequence_number;
3241 
3242 	if ((old_locked == 0) || (old_locked > new_locked)) {
3243 		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
3244 							     VDO_ZONE_TYPE_LOGICAL,
3245 							     zone->zone_number);
3246 
3247 		if (old_locked > 0) {
3248 			vdo_release_recovery_journal_block_reference(journal, old_locked,
3249 								     VDO_ZONE_TYPE_LOGICAL,
3250 								     zone->zone_number);
3251 		}
3252 
3253 		*recovery_lock = new_locked;
3254 	}
3255 
3256 	/*
3257 	 * The page now holds a reference on this journal block (or an earlier one), so the
3258 	 * per-entry lock the data_vio took when making its journal entry can be released.
3259 	 */
3260 	vdo_release_journal_entry_lock(journal, new_locked);
3261 	data_vio->recovery_sequence_number = 0;
3262 }
3263 
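/* This callback is registered in vdo_put_mapped_block(). */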
3264 static void put_mapping_in_fetched_page(struct vdo_completion *completion)
3265 {
3266 	struct data_vio *data_vio = as_data_vio(completion->parent);
3267 	sequence_number_t old_lock;
3268 	struct vdo_page_completion *vpc;
3269 	struct page_info *info;
3270 	int result;
3271 
3272 	if (completion->result != VDO_SUCCESS) {
3273 		finish_processing_page(completion, completion->result);
3274 		return;
3275 	}
3276 
3277 	vpc = as_vdo_page_completion(completion);
3278 	result = validate_completed_page(vpc, true);
3279 	if (result != VDO_SUCCESS) {
3280 		finish_processing_page(completion, result);
3281 		return;
3282 	}
3283 
3284 	info = vpc->info;
3285 	old_lock = info->recovery_lock;
3286 	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
3287 				  data_vio, data_vio->new_mapped.pbn,
3288 				  data_vio->new_mapped.state, &info->recovery_lock);
3289 	set_info_state(info, PS_DIRTY);
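	/*
	 * Re-file the page on the zone's dirty lists according to its updated recovery lock so it
	 * will be written out with the correct era.
	 */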
3290 	add_to_dirty_lists(info->cache->zone, &info->state_entry,
3291 			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
3292 	finish_processing_page(completion, VDO_SUCCESS);
3293 }
3294 
3295 /* Read a stored block mapping into a data_vio. */
3296 void vdo_get_mapped_block(struct data_vio *data_vio)
3297 {
3298 	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
3299 		/*
3300 		 * We know that the block map page for this LBN has not been allocated, so the
3301 		 * block must be unmapped.
3302 		 */
3303 		clear_mapped_location(data_vio);
3304 		continue_data_vio(data_vio);
3305 		return;
3306 	}
3307 
3308 	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
3309 }
3310 
3311 /* Update a stored block mapping to reflect a data_vio's new mapping. */
3312 void vdo_put_mapped_block(struct data_vio *data_vio)
3313 {
3314 	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
3315 }
3316 
3317 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
3318 {
3319 	zone_count_t zone = 0;
3320 	struct block_map_statistics totals;
3321 
3322 	memset(&totals, 0, sizeof(struct block_map_statistics));
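	/*
	 * Sum an unlocked snapshot of each zone's counters; READ_ONCE keeps the compiler from
	 * tearing or re-reading the individual loads.
	 */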
3323 	for (zone = 0; zone < map->zone_count; zone++) {
3324 		const struct block_map_statistics *stats =
3325 			&(map->zones[zone].page_cache.stats);
3326 
3327 		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
3328 		totals.clean_pages += READ_ONCE(stats->clean_pages);
3329 		totals.free_pages += READ_ONCE(stats->free_pages);
3330 		totals.failed_pages += READ_ONCE(stats->failed_pages);
3331 		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
3332 		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
3333 		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
3334 		totals.read_count += READ_ONCE(stats->read_count);
3335 		totals.write_count += READ_ONCE(stats->write_count);
3336 		totals.failed_reads += READ_ONCE(stats->failed_reads);
3337 		totals.failed_writes += READ_ONCE(stats->failed_writes);
3338 		totals.reclaimed += READ_ONCE(stats->reclaimed);
3339 		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
3340 		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
3341 		totals.discard_required += READ_ONCE(stats->discard_required);
3342 		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
3343 		totals.fetch_required += READ_ONCE(stats->fetch_required);
3344 		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
3345 		totals.pages_saved += READ_ONCE(stats->pages_saved);
3346 		totals.flush_count += READ_ONCE(stats->flush_count);
3347 	}
3348 
3349 	return totals;
3350 }
3351