xref: /linux/drivers/md/dm-vdo/block-map.c (revision 5014bebee0cffda14fafae5a2534d08120b7b9e8)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "block-map.h"
7 
8 #include <linux/bio.h>
9 #include <linux/ratelimit.h>
10 
11 #include "errors.h"
12 #include "logger.h"
13 #include "memory-alloc.h"
14 #include "permassert.h"
15 
16 #include "action-manager.h"
17 #include "admin-state.h"
18 #include "completion.h"
19 #include "constants.h"
20 #include "data-vio.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "recovery-journal.h"
25 #include "slab-depot.h"
26 #include "status-codes.h"
27 #include "types.h"
28 #include "vdo.h"
29 #include "vio.h"
30 #include "wait-queue.h"
31 
32 /**
33  * DOC: Block map eras
34  *
35  * The block map era, or maximum age, is used as follows:
36  *
37  * Each block map page, when dirty, records the earliest recovery journal block sequence number of
38  * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
39  * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
40  * according to the sequence number they record.
41  *
42  * In the current (newest) era, block map pages are not written unless there is cache pressure. In
43  * the next oldest era, each time a new journal block is written, 1/@maximum_age of the pages in
44  * this era are issued for write. In all older eras, pages are issued for write immediately.
45  */
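
/*
 * Illustrative sketch (not part of the original source): conceptually, the era
 * of a dirty page is its recorded sequence number divided by the maximum age.
 * The helper name below is hypothetical; the real bookkeeping is done by the
 * dirty lists rather than by computing era numbers directly.
 */
static inline u64 example_era_of(sequence_number_t recorded_sequence,
				 block_count_t maximum_age)
{
	/* Every @maximum_age sequence numbers start a new era. */
	return recorded_sequence / maximum_age;
}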
46 
47 struct page_descriptor {
48 	root_count_t root_index;
49 	height_t height;
50 	page_number_t page_index;
51 	slot_number_t slot;
52 } __packed;
53 
54 union page_key {
55 	struct page_descriptor descriptor;
56 	u64 key;
57 };
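
/*
 * Illustrative sketch (assumption, not in the original source): because
 * page_descriptor is __packed and its fields total 8 bytes, a descriptor can
 * be reinterpreted directly as a 64-bit key, e.g. for an int_map lookup. The
 * helper name is hypothetical.
 */
static inline u64 example_page_key(struct page_descriptor descriptor)
{
	union page_key key = { .descriptor = descriptor };

	return key.key;
}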
58 
59 struct write_if_not_dirtied_context {
60 	struct block_map_zone *zone;
61 	u8 generation;
62 };
63 
64 struct block_map_tree_segment {
65 	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
66 };
67 
68 struct block_map_tree {
69 	struct block_map_tree_segment *segments;
70 };
71 
72 struct forest {
73 	struct block_map *map;
74 	size_t segments;
75 	struct boundary *boundaries;
76 	struct tree_page **pages;
77 	struct block_map_tree trees[];
78 };
79 
80 struct cursor_level {
81 	page_number_t page_index;
82 	slot_number_t slot;
83 };
84 
85 struct cursors;
86 
87 struct cursor {
88 	struct vdo_waiter waiter;
89 	struct block_map_tree *tree;
90 	height_t height;
91 	struct cursors *parent;
92 	struct boundary boundary;
93 	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
94 	struct pooled_vio *vio;
95 };
96 
97 struct cursors {
98 	struct block_map_zone *zone;
99 	struct vio_pool *pool;
100 	vdo_entry_callback_fn entry_callback;
101 	struct vdo_completion *completion;
102 	root_count_t active_roots;
103 	struct cursor cursors[];
104 };
105 
106 static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;
107 
108 /* Used to indicate that the page holding the location of a tree root has been "loaded". */
109 static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;
110 
111 const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
112 	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
113 	.pbn_high_nibble = 0,
114 	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
115 };
116 
117 #define LOG_INTERVAL 4000
118 #define DISPLAY_INTERVAL 100000
119 
120 /*
121  * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
122  * Prevents any compiler shenanigans from affecting other threads reading those stats.
123  */
124 #define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
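
/*
 * Example (illustrative only): a zone-thread mutation of a statistic which
 * other threads typically sample with READ_ONCE().
 *
 *	ADD_ONCE(cache->stats.pages_loaded, 1);
 */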
125 
126 static inline bool is_dirty(const struct page_info *info)
127 {
128 	return info->state == PS_DIRTY;
129 }
130 
131 static inline bool is_present(const struct page_info *info)
132 {
133 	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
134 }
135 
136 static inline bool is_in_flight(const struct page_info *info)
137 {
138 	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
139 }
140 
141 static inline bool is_incoming(const struct page_info *info)
142 {
143 	return info->state == PS_INCOMING;
144 }
145 
146 static inline bool is_outgoing(const struct page_info *info)
147 {
148 	return info->state == PS_OUTGOING;
149 }
150 
151 static inline bool is_valid(const struct page_info *info)
152 {
153 	return is_present(info) || is_outgoing(info);
154 }
155 
156 static char *get_page_buffer(struct page_info *info)
157 {
158 	struct vdo_page_cache *cache = info->cache;
159 
160 	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
161 }
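
/*
 * Worked example (for illustration only): the page buffers are carved out of
 * one contiguous allocation, so the info at index 2 (info == cache->infos + 2)
 * maps to the buffer starting at byte offset 2 * VDO_BLOCK_SIZE.
 */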
162 
163 static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
164 {
165 	struct vdo_page_completion *completion;
166 
167 	if (waiter == NULL)
168 		return NULL;
169 
170 	completion = container_of(waiter, struct vdo_page_completion, waiter);
171 	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
172 	return completion;
173 }
174 
175 /**
176  * initialize_info() - Initialize all page info structures and put them on the free list.
177  *
178  * Return: VDO_SUCCESS or an error.
179  */
180 static int initialize_info(struct vdo_page_cache *cache)
181 {
182 	struct page_info *info;
183 
184 	INIT_LIST_HEAD(&cache->free_list);
185 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
186 		int result;
187 
188 		info->cache = cache;
189 		info->state = PS_FREE;
190 		info->pbn = NO_PAGE;
191 
192 		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
193 					     VIO_PRIORITY_METADATA, info,
194 					     get_page_buffer(info), &info->vio);
195 		if (result != VDO_SUCCESS)
196 			return result;
197 
198 		/* The thread ID should never change. */
199 		info->vio->completion.callback_thread_id = cache->zone->thread_id;
200 
201 		INIT_LIST_HEAD(&info->state_entry);
202 		list_add_tail(&info->state_entry, &cache->free_list);
203 		INIT_LIST_HEAD(&info->lru_entry);
204 	}
205 
206 	return VDO_SUCCESS;
207 }
208 
209 /**
210  * allocate_cache_components() - Allocate components of the cache which require their own
211  *                               allocation.
212  *
213  * The caller is responsible for all clean up on errors.
214  *
215  * Return: VDO_SUCCESS or an error code.
216  */
217 static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
218 {
219 	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
220 	int result;
221 
222 	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
223 			      &cache->infos);
224 	if (result != VDO_SUCCESS)
225 		return result;
226 
227 	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
228 	if (result != VDO_SUCCESS)
229 		return result;
230 
231 	result = vdo_int_map_create(cache->page_count, &cache->page_map);
232 	if (result != VDO_SUCCESS)
233 		return result;
234 
235 	return initialize_info(cache);
236 }
237 
238 /**
239  * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
240  *                            thread.
241  */
242 static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
243 					  const char *function_name)
244 {
245 	thread_id_t thread_id = vdo_get_callback_thread_id();
246 
247 	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
248 			    "%s() must only be called on cache thread %d, not thread %d",
249 			    function_name, cache->zone->thread_id, thread_id);
250 }
251 
252 /** assert_io_allowed() - Assert that a page cache may issue I/O. */
253 static inline void assert_io_allowed(struct vdo_page_cache *cache)
254 {
255 	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
256 			    "VDO page cache may issue I/O");
257 }
258 
259 /** report_cache_pressure() - Log and, if enabled, report cache pressure. */
260 static void report_cache_pressure(struct vdo_page_cache *cache)
261 {
262 	ADD_ONCE(cache->stats.cache_pressure, 1);
263 	if (cache->waiter_count > cache->page_count) {
264 		if ((cache->pressure_report % LOG_INTERVAL) == 0)
265 			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);
266 
267 		if (++cache->pressure_report >= DISPLAY_INTERVAL)
268 			cache->pressure_report = 0;
269 	}
270 }
271 
272 /**
273  * get_page_state_name() - Return the name of a page state.
274  *
275  * If the page state is invalid, a static string is returned and the invalid state is logged.
276  *
277  * Return: A pointer to a static page state name.
278  */
279 static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
280 {
281 	int result;
282 	static const char * const state_names[] = {
283 		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
284 	};
285 
286 	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);
287 
288 	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
289 			    "Unknown page_state value %d", state);
290 	if (result != VDO_SUCCESS)
291 		return "[UNKNOWN PAGE STATE]";
292 
293 	return state_names[state];
294 }
295 
296 /**
297  * update_counter() - Update the counter associated with a given state.
298  * @info: The page info to count.
299  * @delta: The delta to apply to the counter.
300  */
301 static void update_counter(struct page_info *info, s32 delta)
302 {
303 	struct block_map_statistics *stats = &info->cache->stats;
304 
305 	switch (info->state) {
306 	case PS_FREE:
307 		ADD_ONCE(stats->free_pages, delta);
308 		return;
309 
310 	case PS_INCOMING:
311 		ADD_ONCE(stats->incoming_pages, delta);
312 		return;
313 
314 	case PS_OUTGOING:
315 		ADD_ONCE(stats->outgoing_pages, delta);
316 		return;
317 
318 	case PS_FAILED:
319 		ADD_ONCE(stats->failed_pages, delta);
320 		return;
321 
322 	case PS_RESIDENT:
323 		ADD_ONCE(stats->clean_pages, delta);
324 		return;
325 
326 	case PS_DIRTY:
327 		ADD_ONCE(stats->dirty_pages, delta);
328 		return;
329 
330 	default:
331 		return;
332 	}
333 }
334 
335 /** update_lru() - Update the lru information for an active page. */
336 static void update_lru(struct page_info *info)
337 {
338 	if (info->cache->lru_list.prev != &info->lru_entry)
339 		list_move_tail(&info->lru_entry, &info->cache->lru_list);
340 }
341 
342 /**
343  * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
344  *                    counters.
345  */
346 static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
347 {
348 	if (new_state == info->state)
349 		return;
350 
351 	update_counter(info, -1);
352 	info->state = new_state;
353 	update_counter(info, 1);
354 
355 	switch (info->state) {
356 	case PS_FREE:
357 	case PS_FAILED:
358 		list_move_tail(&info->state_entry, &info->cache->free_list);
359 		return;
360 
361 	case PS_OUTGOING:
362 		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
363 		return;
364 
365 	case PS_DIRTY:
366 		return;
367 
368 	default:
369 		list_del_init(&info->state_entry);
370 	}
371 }
372 
373 /** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
374 static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
375 {
376 	struct vdo_page_cache *cache = info->cache;
377 
378 	/* Either the new or the old page number must be NO_PAGE. */
379 	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
380 				"Must free a page before reusing it.");
381 	if (result != VDO_SUCCESS)
382 		return result;
383 
384 	if (info->pbn != NO_PAGE)
385 		vdo_int_map_remove(cache->page_map, info->pbn);
386 
387 	info->pbn = pbn;
388 
389 	if (pbn != NO_PAGE) {
390 		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
391 		if (result != VDO_SUCCESS)
392 			return result;
393 	}
394 	return VDO_SUCCESS;
395 }
396 
397 /** reset_page_info() - Reset page info to represent an unallocated page. */
398 static int reset_page_info(struct page_info *info)
399 {
400 	int result;
401 
402 	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
403 	if (result != VDO_SUCCESS)
404 		return result;
405 
406 	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
407 			    "VDO Page must not have waiters");
408 	if (result != VDO_SUCCESS)
409 		return result;
410 
411 	result = set_info_pbn(info, NO_PAGE);
412 	set_info_state(info, PS_FREE);
413 	list_del_init(&info->lru_entry);
414 	return result;
415 }
416 
417 /**
418  * find_free_page() - Find a free page.
419  *
420  * Return: A pointer to the page info structure (if found), NULL otherwise.
421  */
422 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
423 {
424 	struct page_info *info;
425 
426 	info = list_first_entry_or_null(&cache->free_list, struct page_info,
427 					state_entry);
428 	if (info != NULL)
429 		list_del_init(&info->state_entry);
430 
431 	return info;
432 }
433 
434 /**
435  * find_page() - Find the page info (if any) associated with a given pbn.
436  * @pbn: The absolute physical block number of the page.
437  *
438  * Return: The page info for the page if available, or NULL if not.
439  */
440 static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
441 						 physical_block_number_t pbn)
442 {
443 	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
444 		return cache->last_found;
445 
446 	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
447 	return cache->last_found;
448 }
449 
450 /**
451  * select_lru_page() - Determine which page is least recently used.
452  *
453  * Picks the least recently used page from among the non-busy entries at the front of the LRU
454  * list. Since we move a page to the end of the list whenever we mark it busy, it is unlikely
455  * that the entries at the front are busy unless the queue is very short, but it is not impossible.
456  *
457  * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
458  *         found. The page can be dirty or resident.
459  */
460 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
461 {
462 	struct page_info *info;
463 
464 	list_for_each_entry(info, &cache->lru_list, lru_entry)
465 		if ((info->busy == 0) && !is_in_flight(info))
466 			return info;
467 
468 	return NULL;
469 }
470 
471 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */
472 
473 /**
474  * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
475  * @info: The page info representing the result page.
476  * @vdo_page_comp: The VDO page completion to complete.
477  */
478 static void complete_with_page(struct page_info *info,
479 			       struct vdo_page_completion *vdo_page_comp)
480 {
481 	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);
482 
483 	if (!available) {
484 		vdo_log_error_strerror(VDO_BAD_PAGE,
485 				       "Requested cache page %llu in state %s is not %s",
486 				       (unsigned long long) info->pbn,
487 				       get_page_state_name(info->state),
488 				       vdo_page_comp->writable ? "present" : "valid");
489 		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
490 		return;
491 	}
492 
493 	vdo_page_comp->info = info;
494 	vdo_page_comp->ready = true;
495 	vdo_finish_completion(&vdo_page_comp->completion);
496 }
497 
498 /**
499  * complete_waiter_with_error() - Complete a page completion with an error code.
500  * @waiter: The page completion, as a waiter.
501  * @result_ptr: A pointer to the error code.
502  *
503  * Implements waiter_callback_fn.
504  */
505 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
506 {
507 	int *result = result_ptr;
508 
509 	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
510 }
511 
512 /**
513  * complete_waiter_with_page() - Complete a page completion with a page.
514  * @waiter: The page completion, as a waiter.
515  * @page_info: The page info to complete with.
516  *
517  * Implements waiter_callback_fn.
518  */
519 static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
520 {
521 	complete_with_page(page_info, page_completion_from_waiter(waiter));
522 }
523 
524 /**
525  * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
526  *
527  * Upon completion the waitq will be empty.
528  *
529  * Return: The number of pages distributed.
530  */
531 static unsigned int distribute_page_over_waitq(struct page_info *info,
532 					       struct vdo_wait_queue *waitq)
533 {
534 	size_t num_pages;
535 
536 	update_lru(info);
537 	num_pages = vdo_waitq_num_waiters(waitq);
538 
539 	/*
540 	 * Increment the busy count once for each pending completion so that this page does not
541 	 * stop being busy until all completions have been processed.
542 	 */
543 	info->busy += num_pages;
544 
545 	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
546 	return num_pages;
547 }
548 
549 /**
550  * set_persistent_error() - Set a persistent error which all requests will receive in the future.
551  * @context: A string describing what triggered the error.
552  *
553  * Once triggered, all enqueued completions will get this error. Any future requests will result in
554  * this error as well.
555  */
556 static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
557 				 int result)
558 {
559 	struct page_info *info;
560 	/* If we're already read-only, there's no need to log. */
561 	struct vdo *vdo = cache->vdo;
562 
563 	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
564 		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
565 				       context);
566 		vdo_enter_read_only_mode(vdo, result);
567 	}
568 
569 	assert_on_cache_thread(cache, __func__);
570 
571 	vdo_waitq_notify_all_waiters(&cache->free_waiters,
572 				     complete_waiter_with_error, &result);
573 	cache->waiter_count = 0;
574 
575 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
576 		vdo_waitq_notify_all_waiters(&info->waiting,
577 					     complete_waiter_with_error, &result);
578 	}
579 }
580 
581 /**
582  * validate_completed_page() - Check that a page completion which is being freed to the cache
583  *                             referred to a valid page and is in a valid state.
584  * @writable: Whether a writable page is required.
585  *
586  * Return: VDO_SUCCESS if the page was valid, otherwise as error
587  */
588 static int __must_check validate_completed_page(struct vdo_page_completion *completion,
589 						bool writable)
590 {
591 	int result;
592 
593 	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
594 	if (result != VDO_SUCCESS)
595 		return result;
596 
597 	result = VDO_ASSERT(completion->info != NULL,
598 			    "VDO Page Completion must be complete");
599 	if (result != VDO_SUCCESS)
600 		return result;
601 
602 	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
603 			    "VDO Page Completion pbn must be consistent");
604 	if (result != VDO_SUCCESS)
605 		return result;
606 
607 	result = VDO_ASSERT(is_valid(completion->info),
608 			    "VDO Page Completion page must be valid");
609 	if (result != VDO_SUCCESS)
610 		return result;
611 
612 	if (writable) {
613 		result = VDO_ASSERT(completion->writable,
614 				    "VDO Page Completion must be writable");
615 		if (result != VDO_SUCCESS)
616 			return result;
617 	}
618 
619 	return VDO_SUCCESS;
620 }
621 
622 static void check_for_drain_complete(struct block_map_zone *zone)
623 {
624 	if (vdo_is_state_draining(&zone->state) &&
625 	    (zone->active_lookups == 0) &&
626 	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
627 	    !is_vio_pool_busy(zone->vio_pool) &&
628 	    (zone->page_cache.outstanding_reads == 0) &&
629 	    (zone->page_cache.outstanding_writes == 0)) {
630 		vdo_finish_draining_with_result(&zone->state,
631 						(vdo_is_read_only(zone->block_map->vdo) ?
632 						 VDO_READ_ONLY : VDO_SUCCESS));
633 	}
634 }
635 
636 static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
637 {
638 	vdo_enter_read_only_mode(zone->block_map->vdo, result);
639 
640 	/*
641 	 * We are in read-only mode, so we won't ever write any page out.
642 	 * Just take all waiters off the waitq so the zone can drain.
643 	 */
644 	vdo_waitq_init(&zone->flush_waiters);
645 	check_for_drain_complete(zone);
646 }
647 
648 static bool __must_check
649 validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
650 						bool writable)
651 {
652 	int result = validate_completed_page(completion, writable);
653 
654 	if (result == VDO_SUCCESS)
655 		return true;
656 
657 	enter_zone_read_only_mode(completion->info->cache->zone, result);
658 	return false;
659 }
660 
661 /**
662  * handle_load_error() - Handle page load errors.
663  * @completion: The page read vio.
664  */
665 static void handle_load_error(struct vdo_completion *completion)
666 {
667 	int result = completion->result;
668 	struct page_info *info = completion->parent;
669 	struct vdo_page_cache *cache = info->cache;
670 
671 	assert_on_cache_thread(cache, __func__);
672 	vio_record_metadata_io_error(as_vio(completion));
673 	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
674 	ADD_ONCE(cache->stats.failed_reads, 1);
675 	set_info_state(info, PS_FAILED);
676 	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
677 	reset_page_info(info);
678 
679 	/*
680 	 * Don't decrement until right before calling check_for_drain_complete() to
681 	 * ensure that the above work can't cause the page cache to be freed out from under us.
682 	 */
683 	cache->outstanding_reads--;
684 	check_for_drain_complete(cache->zone);
685 }
686 
687 /**
688  * page_is_loaded() - Callback used when a page has been loaded.
689  * @completion: The vio which has loaded the page. Its parent is the page_info.
690  */
691 static void page_is_loaded(struct vdo_completion *completion)
692 {
693 	struct page_info *info = completion->parent;
694 	struct vdo_page_cache *cache = info->cache;
695 	nonce_t nonce = info->cache->zone->block_map->nonce;
696 	struct block_map_page *page;
697 	enum block_map_page_validity validity;
698 
699 	assert_on_cache_thread(cache, __func__);
700 
701 	page = (struct block_map_page *) get_page_buffer(info);
702 	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
703 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
704 		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
705 		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
706 						    "Expected page %llu but got page %llu instead",
707 						    (unsigned long long) info->pbn,
708 						    (unsigned long long) pbn);
709 
710 		vdo_continue_completion(completion, result);
711 		return;
712 	}
713 
714 	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
715 		vdo_format_block_map_page(page, nonce, info->pbn, false);
716 
717 	info->recovery_lock = 0;
718 	set_info_state(info, PS_RESIDENT);
719 	distribute_page_over_waitq(info, &info->waiting);
720 
721 	/*
722 	 * Don't decrement until right before calling check_for_drain_complete() to
723 	 * ensure that the above work can't cause the page cache to be freed out from under us.
724 	 */
725 	cache->outstanding_reads--;
726 	check_for_drain_complete(cache->zone);
727 }
728 
729 /**
730  * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
731  * @completion: The page load completion.
732  */
733 static void handle_rebuild_read_error(struct vdo_completion *completion)
734 {
735 	struct page_info *info = completion->parent;
736 	struct vdo_page_cache *cache = info->cache;
737 
738 	assert_on_cache_thread(cache, __func__);
739 
740 	/*
741 	 * We are doing a read-only rebuild, so treat this as a successful read
742 	 * of an uninitialized page.
743 	 */
744 	vio_record_metadata_io_error(as_vio(completion));
745 	ADD_ONCE(cache->stats.failed_reads, 1);
746 	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
747 	vdo_reset_completion(completion);
748 	page_is_loaded(completion);
749 }
750 
751 static void load_cache_page_endio(struct bio *bio)
752 {
753 	struct vio *vio = bio->bi_private;
754 	struct page_info *info = vio->completion.parent;
755 
756 	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
757 }
758 
759 /**
760  * launch_page_load() - Begin the process of loading a page.
761  *
762  * Return: VDO_SUCCESS or an error code.
763  */
764 static int __must_check launch_page_load(struct page_info *info,
765 					 physical_block_number_t pbn)
766 {
767 	int result;
768 	vdo_action_fn callback;
769 	struct vdo_page_cache *cache = info->cache;
770 
771 	assert_io_allowed(cache);
772 
773 	result = set_info_pbn(info, pbn);
774 	if (result != VDO_SUCCESS)
775 		return result;
776 
777 	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
778 	if (result != VDO_SUCCESS)
779 		return result;
780 
781 	set_info_state(info, PS_INCOMING);
782 	cache->outstanding_reads++;
783 	ADD_ONCE(cache->stats.pages_loaded, 1);
784 	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
785 	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
786 				callback, REQ_OP_READ | REQ_PRIO);
787 	return VDO_SUCCESS;
788 }
789 
790 static void write_pages(struct vdo_completion *completion);
791 
792 /** handle_flush_error() - Handle errors flushing the layer. */
793 static void handle_flush_error(struct vdo_completion *completion)
794 {
795 	struct page_info *info = completion->parent;
796 
797 	vio_record_metadata_io_error(as_vio(completion));
798 	set_persistent_error(info->cache, "flush failed", completion->result);
799 	write_pages(completion);
800 }
801 
802 static void flush_endio(struct bio *bio)
803 {
804 	struct vio *vio = bio->bi_private;
805 	struct page_info *info = vio->completion.parent;
806 
807 	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
808 }
809 
810 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
811 static void save_pages(struct vdo_page_cache *cache)
812 {
813 	struct page_info *info;
814 	struct vio *vio;
815 
816 	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
817 		return;
818 
819 	assert_io_allowed(cache);
820 
821 	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);
822 
823 	cache->pages_in_flush = cache->pages_to_flush;
824 	cache->pages_to_flush = 0;
825 	ADD_ONCE(cache->stats.flush_count, 1);
826 
827 	vio = info->vio;
828 
829 	/*
830 	 * We must make sure that the recovery journal entries that changed these pages were
831 	 * successfully persisted, and thus must issue a flush before each batch of pages is
832 	 * written to ensure this.
833 	 */
834 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
835 }
836 
837 /**
838  * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
839  *
840  * Once in the list, a page may not be used until it has been written out.
841  */
842 static void schedule_page_save(struct page_info *info)
843 {
844 	if (info->busy > 0) {
845 		info->write_status = WRITE_STATUS_DEFERRED;
846 		return;
847 	}
848 
849 	info->cache->pages_to_flush++;
850 	info->cache->outstanding_writes++;
851 	set_info_state(info, PS_OUTGOING);
852 }
853 
854 /**
855  * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
856  * pages if another save is not in progress.
857  */
858 static void launch_page_save(struct page_info *info)
859 {
860 	schedule_page_save(info);
861 	save_pages(info->cache);
862 }
863 
864 /**
865  * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
866  *                           requesting a given page number.
867  * @context: A pointer to the pbn of the desired page.
868  *
869  * Implements waiter_match_fn.
870  *
871  * Return: true if the page completion is for the desired page number.
872  */
873 static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
874 {
875 	physical_block_number_t *pbn = context;
876 
877 	return (page_completion_from_waiter(waiter)->pbn == *pbn);
878 }
879 
880 /**
881  * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
882  *                        any other completions that match it in page number.
883  */
884 static void allocate_free_page(struct page_info *info)
885 {
886 	int result;
887 	struct vdo_waiter *oldest_waiter;
888 	physical_block_number_t pbn;
889 	struct vdo_page_cache *cache = info->cache;
890 
891 	assert_on_cache_thread(cache, __func__);
892 
893 	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
894 		if (cache->stats.cache_pressure > 0) {
895 			vdo_log_info("page cache pressure relieved");
896 			WRITE_ONCE(cache->stats.cache_pressure, 0);
897 		}
898 
899 		return;
900 	}
901 
902 	result = reset_page_info(info);
903 	if (result != VDO_SUCCESS) {
904 		set_persistent_error(cache, "cannot reset page info", result);
905 		return;
906 	}
907 
908 	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
909 	pbn = page_completion_from_waiter(oldest_waiter)->pbn;
910 
911 	/*
912 	 * Remove all entries which match the page number in question and push them onto the page
913 	 * info's waitq.
914 	 */
915 	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
916 					   &pbn, &info->waiting);
917 	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);
918 
919 	result = launch_page_load(info, pbn);
920 	if (result != VDO_SUCCESS) {
921 		vdo_waitq_notify_all_waiters(&info->waiting,
922 					     complete_waiter_with_error, &result);
923 	}
924 }
925 
926 /**
927  * discard_a_page() - Begin the process of discarding a page.
928  *
929  * If no page is discardable, increments a count of deferred frees so that the next release of a
930  * page which is no longer busy will kick off another discard cycle. This is an indication that the
931  * cache is not big enough.
932  *
933  * If the selected page is not dirty, immediately allocates the page to the oldest completion
934  * waiting for a free page.
935  */
936 static void discard_a_page(struct vdo_page_cache *cache)
937 {
938 	struct page_info *info = select_lru_page(cache);
939 
940 	if (info == NULL) {
941 		report_cache_pressure(cache);
942 		return;
943 	}
944 
945 	if (!is_dirty(info)) {
946 		allocate_free_page(info);
947 		return;
948 	}
949 
950 	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
951 			    "page selected for discard is not in flight");
952 
953 	cache->discard_count++;
954 	info->write_status = WRITE_STATUS_DISCARD;
955 	launch_page_save(info);
956 }
957 
958 /**
959  * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
960  *                                 a different page.
961  */
962 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
963 {
964 	struct vdo_page_cache *cache = vdo_page_comp->cache;
965 
966 	cache->waiter_count++;
967 	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
968 	discard_a_page(cache);
969 }
970 
971 /**
972  * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
973  *                            page.
974  * @cache: The page cache.
975  */
976 static void discard_page_if_needed(struct vdo_page_cache *cache)
977 {
978 	if (cache->waiter_count > cache->discard_count)
979 		discard_a_page(cache);
980 }
981 
982 /**
983  * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
984  * @info: The info structure for the page whose write just completed.
985  *
986  * Return: true if the page write was a discard.
987  */
988 static bool write_has_finished(struct page_info *info)
989 {
990 	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);
991 
992 	assert_on_cache_thread(info->cache, __func__);
993 	info->cache->outstanding_writes--;
994 
995 	info->write_status = WRITE_STATUS_NORMAL;
996 	return was_discard;
997 }
998 
999 /**
1000  * handle_page_write_error() - Handler for page write errors.
1001  * @completion: The page write vio.
1002  */
1003 static void handle_page_write_error(struct vdo_completion *completion)
1004 {
1005 	int result = completion->result;
1006 	struct page_info *info = completion->parent;
1007 	struct vdo_page_cache *cache = info->cache;
1008 
1009 	vio_record_metadata_io_error(as_vio(completion));
1010 
1011 	/* If we're already read-only, write failures are to be expected. */
1012 	if (result != VDO_READ_ONLY) {
1013 		vdo_log_ratelimit(vdo_log_error,
1014 				  "failed to write block map page %llu",
1015 				  (unsigned long long) info->pbn);
1016 	}
1017 
1018 	set_info_state(info, PS_DIRTY);
1019 	ADD_ONCE(cache->stats.failed_writes, 1);
1020 	set_persistent_error(cache, "cannot write page", result);
1021 
1022 	if (!write_has_finished(info))
1023 		discard_page_if_needed(cache);
1024 
1025 	check_for_drain_complete(cache->zone);
1026 }
1027 
1028 static void page_is_written_out(struct vdo_completion *completion);
1029 
1030 static void write_cache_page_endio(struct bio *bio)
1031 {
1032 	struct vio *vio = bio->bi_private;
1033 	struct page_info *info = vio->completion.parent;
1034 
1035 	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
1036 }
1037 
1038 /**
1039  * page_is_written_out() - Callback used when a page has been written out.
1040  * @completion: The vio which wrote the page. Its parent is a page_info.
1041  */
1042 static void page_is_written_out(struct vdo_completion *completion)
1043 {
1044 	bool was_discard, reclaimed;
1045 	u32 reclamations;
1046 	struct page_info *info = completion->parent;
1047 	struct vdo_page_cache *cache = info->cache;
1048 	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);
1049 
1050 	if (!page->header.initialized) {
1051 		page->header.initialized = true;
1052 		vdo_submit_metadata_vio(info->vio, info->pbn,
1053 					write_cache_page_endio,
1054 					handle_page_write_error,
1055 					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
1056 		return;
1057 	}
1058 
1059 	/* Handle journal updates and torn write protection. */
1060 	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
1061 						     info->recovery_lock,
1062 						     VDO_ZONE_TYPE_LOGICAL,
1063 						     cache->zone->zone_number);
1064 	info->recovery_lock = 0;
1065 	was_discard = write_has_finished(info);
1066 	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));
1067 
1068 	set_info_state(info, PS_RESIDENT);
1069 
1070 	reclamations = distribute_page_over_waitq(info, &info->waiting);
1071 	ADD_ONCE(cache->stats.reclaimed, reclamations);
1072 
1073 	if (was_discard)
1074 		cache->discard_count--;
1075 
1076 	if (reclaimed)
1077 		discard_page_if_needed(cache);
1078 	else
1079 		allocate_free_page(info);
1080 
1081 	check_for_drain_complete(cache->zone);
1082 }
1083 
1084 /**
1085  * write_pages() - Write the batch of pages which were covered by the layer flush which just
1086  *                 completed.
1087  * @flush_completion: The flush vio.
1088  *
1089  * This callback is registered in save_pages().
1090  */
1091 static void write_pages(struct vdo_completion *flush_completion)
1092 {
1093 	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;
1094 
1095 	/*
1096 	 * We need to cache these two values on the stack since it is possible for the last
1097 	 * page info to cause the page cache to get freed. Hence once we launch the last page,
1098 	 * it may be unsafe to dereference the cache.
1099 	 */
1100 	bool has_unflushed_pages = (cache->pages_to_flush > 0);
1101 	page_count_t pages_in_flush = cache->pages_in_flush;
1102 
1103 	cache->pages_in_flush = 0;
1104 	while (pages_in_flush-- > 0) {
1105 		struct page_info *info =
1106 			list_first_entry(&cache->outgoing_list, struct page_info,
1107 					 state_entry);
1108 
1109 		list_del_init(&info->state_entry);
1110 		if (vdo_is_read_only(info->cache->vdo)) {
1111 			struct vdo_completion *completion = &info->vio->completion;
1112 
1113 			vdo_reset_completion(completion);
1114 			completion->callback = page_is_written_out;
1115 			completion->error_handler = handle_page_write_error;
1116 			vdo_fail_completion(completion, VDO_READ_ONLY);
1117 			continue;
1118 		}
1119 		ADD_ONCE(info->cache->stats.pages_saved, 1);
1120 		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
1121 					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
1122 	}
1123 
1124 	if (has_unflushed_pages) {
1125 		/*
1126 		 * If there are unflushed pages, the cache can't have been freed, so this call is
1127 		 * safe.
1128 		 */
1129 		save_pages(cache);
1130 	}
1131 }
1132 
1133 /**
1134  * vdo_release_page_completion() - Release a VDO Page Completion.
1135  *
1136  * The page referenced by this completion (if any) will no longer be held busy by this completion.
1137  * If a page becomes discardable and there are completions awaiting free pages then a new round of
1138  * page discarding is started.
1139  */
1140 void vdo_release_page_completion(struct vdo_completion *completion)
1141 {
1142 	struct page_info *discard_info = NULL;
1143 	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
1144 	struct vdo_page_cache *cache;
1145 
1146 	if (completion->result == VDO_SUCCESS) {
1147 		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
1148 			return;
1149 
1150 		if (--page_completion->info->busy == 0)
1151 			discard_info = page_completion->info;
1152 	}
1153 
1154 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1155 			    "Page being released after leaving all queues");
1156 
1157 	page_completion->info = NULL;
1158 	cache = page_completion->cache;
1159 	assert_on_cache_thread(cache, __func__);
1160 
1161 	if (discard_info != NULL) {
1162 		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
1163 			discard_info->write_status = WRITE_STATUS_NORMAL;
1164 			launch_page_save(discard_info);
1165 		}
1166 
1167 		/*
1168 		 * If there are excess requests for pages (that have not already started discards),
1169 		 * we need to discard some page (which may be this one).
1170 		 */
1171 		discard_page_if_needed(cache);
1172 	}
1173 }
1174 
1175 /**
1176  * load_page_for_completion() - Helper function to load a page as described by a VDO Page
1177  *                              Completion.
1178  */
1179 static void load_page_for_completion(struct page_info *info,
1180 				     struct vdo_page_completion *vdo_page_comp)
1181 {
1182 	int result;
1183 
1184 	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
1185 	result = launch_page_load(info, vdo_page_comp->pbn);
1186 	if (result != VDO_SUCCESS) {
1187 		vdo_waitq_notify_all_waiters(&info->waiting,
1188 					     complete_waiter_with_error, &result);
1189 	}
1190 }
1191 
1192 /**
1193  * vdo_get_page() - Initialize a page completion and get a block map page.
1194  * @page_completion: The vdo_page_completion to initialize.
1195  * @zone: The block map zone of the desired page.
1196  * @pbn: The absolute physical block of the desired page.
1197  * @writable: Whether the page can be modified.
1198  * @parent: The object to notify when the fetch is complete.
1199  * @callback: The notification callback.
1200  * @error_handler: The handler for fetch errors.
1201  * @requeue: Whether we must requeue when notifying the parent.
1202  *
1203  * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
1204  * by the completion to be loaded from disk. When the callback is invoked, the page will be
1205  * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
1206  * when they are done with the page to clear the busy mark.
1207  */
1208 void vdo_get_page(struct vdo_page_completion *page_completion,
1209 		  struct block_map_zone *zone, physical_block_number_t pbn,
1210 		  bool writable, void *parent, vdo_action_fn callback,
1211 		  vdo_action_fn error_handler, bool requeue)
1212 {
1213 	struct vdo_page_cache *cache = &zone->page_cache;
1214 	struct vdo_completion *completion = &page_completion->completion;
1215 	struct page_info *info;
1216 
1217 	assert_on_cache_thread(cache, __func__);
1218 	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1219 			    "New page completion was not already on a wait queue");
1220 
1221 	*page_completion = (struct vdo_page_completion) {
1222 		.pbn = pbn,
1223 		.writable = writable,
1224 		.cache = cache,
1225 	};
1226 
1227 	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
1228 	vdo_prepare_completion(completion, callback, error_handler,
1229 			       cache->zone->thread_id, parent);
1230 	completion->requeue = requeue;
1231 
1232 	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
1233 		vdo_fail_completion(completion, VDO_READ_ONLY);
1234 		return;
1235 	}
1236 
1237 	if (page_completion->writable)
1238 		ADD_ONCE(cache->stats.write_count, 1);
1239 	else
1240 		ADD_ONCE(cache->stats.read_count, 1);
1241 
1242 	info = find_page(cache, page_completion->pbn);
1243 	if (info != NULL) {
1244 		/* The page is in the cache already. */
1245 		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
1246 		    is_incoming(info) ||
1247 		    (is_outgoing(info) && page_completion->writable)) {
1248 			/* The page is unusable until it has finished I/O. */
1249 			ADD_ONCE(cache->stats.wait_for_page, 1);
1250 			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
1251 			return;
1252 		}
1253 
1254 		if (is_valid(info)) {
1255 			/* The page is usable. */
1256 			ADD_ONCE(cache->stats.found_in_cache, 1);
1257 			if (!is_present(info))
1258 				ADD_ONCE(cache->stats.read_outgoing, 1);
1259 			update_lru(info);
1260 			info->busy++;
1261 			complete_with_page(info, page_completion);
1262 			return;
1263 		}
1264 
1265 		/* Something horrible has gone wrong. */
1266 		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
1267 	}
1268 
1269 	/* The page must be fetched. */
1270 	info = find_free_page(cache);
1271 	if (info != NULL) {
1272 		ADD_ONCE(cache->stats.fetch_required, 1);
1273 		load_page_for_completion(info, page_completion);
1274 		return;
1275 	}
1276 
1277 	/* The page must wait for a page to be discarded. */
1278 	ADD_ONCE(cache->stats.discard_required, 1);
1279 	discard_page_for_completion(page_completion);
1280 }
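
/*
 * Illustrative usage sketch (hypothetical, not part of the original source):
 * a typical caller requests a writable page, retrieves it in the callback,
 * and then releases the completion so the page's busy count drops. A real
 * caller would normally supply a distinct error handler.
 */
static void example_page_fetched(struct vdo_completion *completion)
{
	struct block_map_page *page;

	if (vdo_get_cached_page(completion, &page) == VDO_SUCCESS) {
		/* Read or modify the block map page here. */
	}

	vdo_release_page_completion(completion);
}

static void example_fetch_page(struct vdo_page_completion *page_completion,
			       struct block_map_zone *zone,
			       physical_block_number_t pbn, void *parent)
{
	vdo_get_page(page_completion, zone, pbn, true, parent,
		     example_page_fetched, example_page_fetched, false);
}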
1281 
1282 /**
1283  * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
1284  * @completion: The vdo_page_completion containing the page.
1285  */
1286 void vdo_request_page_write(struct vdo_completion *completion)
1287 {
1288 	struct page_info *info;
1289 	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);
1290 
1291 	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
1292 		return;
1293 
1294 	info = vdo_page_comp->info;
1295 	set_info_state(info, PS_DIRTY);
1296 	launch_page_save(info);
1297 }
1298 
1299 /**
1300  * vdo_get_cached_page() - Get the block map page from a page completion.
1301  * @completion: A vdo page completion whose callback has been called.
1302  * @page_ptr: A pointer to hold the page
1303  *
1304  * Return: VDO_SUCCESS or an error
1305  */
1306 int vdo_get_cached_page(struct vdo_completion *completion,
1307 			struct block_map_page **page_ptr)
1308 {
1309 	int result;
1310 	struct vdo_page_completion *vpc;
1311 
1312 	vpc = as_vdo_page_completion(completion);
1313 	result = validate_completed_page(vpc, true);
1314 	if (result == VDO_SUCCESS)
1315 		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);
1316 
1317 	return result;
1318 }
1319 
1320 /**
1321  * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
1322  *
1323  * There must not be any dirty pages in the cache.
1324  *
1325  * Return: A success or error code.
1326  */
1327 int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
1328 {
1329 	struct page_info *info;
1330 
1331 	assert_on_cache_thread(cache, __func__);
1332 
1333 	/* Make sure we don't throw away any dirty pages. */
1334 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
1335 		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");
1336 
1337 		if (result != VDO_SUCCESS)
1338 			return result;
1339 	}
1340 
1341 	/* Reset the page map by re-allocating it. */
1342 	vdo_int_map_free(vdo_forget(cache->page_map));
1343 	return vdo_int_map_create(cache->page_count, &cache->page_map);
1344 }
1345 
1346 /**
1347  * get_tree_page_by_index() - Get the tree page for a given height and page index.
1348  *
1349  * Return: The requested page.
1350  */
1351 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
1352 							      root_count_t root_index,
1353 							      height_t height,
1354 							      page_number_t page_index)
1355 {
1356 	page_number_t offset = 0;
1357 	size_t segment;
1358 
1359 	for (segment = 0; segment < forest->segments; segment++) {
1360 		page_number_t border = forest->boundaries[segment].levels[height - 1];
1361 
1362 		if (page_index < border) {
1363 			struct block_map_tree *tree = &forest->trees[root_index];
1364 
1365 			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
1366 		}
1367 
1368 		offset = border;
1369 	}
1370 
1371 	return NULL;
1372 }
1373 
1374 /* Get the page referred to by the lock's tree slot at its current height. */
1375 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
1376 					      const struct tree_lock *lock)
1377 {
1378 	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
1379 				      lock->height,
1380 				      lock->tree_slots[lock->height].page_index);
1381 }
1382 
1383 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
1384 bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
1385 			 physical_block_number_t pbn,
1386 			 struct block_map_page *page)
1387 {
1388 	struct block_map_page *loaded = (struct block_map_page *) buffer;
1389 	enum block_map_page_validity validity =
1390 		vdo_validate_block_map_page(loaded, nonce, pbn);
1391 
1392 	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
1393 		memcpy(page, loaded, VDO_BLOCK_SIZE);
1394 		return true;
1395 	}
1396 
1397 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
1398 		vdo_log_error_strerror(VDO_BAD_PAGE,
1399 				       "Expected page %llu but got page %llu instead",
1400 				       (unsigned long long) pbn,
1401 				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
1402 	}
1403 
1404 	return false;
1405 }
1406 
1407 /**
1408  * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
1409  *                     a cyclic range of values from 0 to (modulus - 1).
1410  * @lower: The lowest value to accept.
1411  * @value: The value to check.
1412  * @upper: The highest value to accept.
1413  * @modulus: The size of the cyclic space, no more than 2^15.
1414  *
1415  * The value and both bounds must be smaller than the modulus.
1416  *
1417  * Return: true if the value is in range.
1418  */
1419 static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
1420 {
1421 	if (value < lower)
1422 		value += modulus;
1423 	if (upper < lower)
1424 		upper += modulus;
1425 	return (value <= upper);
1426 }
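
/*
 * Worked example (for illustration only): with modulus 256, lower = 250,
 * upper = 10, and value = 3, both value and upper are shifted up by the
 * modulus to 259 and 266 respectively, so 3 is correctly treated as falling
 * inside the wrapped range [250, 10].
 */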
1427 
1428 /**
1429  * is_not_older() - Check whether a generation is strictly older than some other generation in the
1430  *                  context of a zone's current generation range.
1431  * @zone: The zone in which to do the comparison.
1432  * @a: The generation in question.
1433  * @b: The generation to compare to.
1434  *
1435  * Return: true if generation @a is not strictly older than generation @b in the context of @zone
1436  */
1437 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
1438 {
1439 	int result;
1440 
1441 	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
1442 			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
1443 			    "generation(s) %u, %u are out of range [%u, %u]",
1444 			    a, b, zone->oldest_generation, zone->generation);
1445 	if (result != VDO_SUCCESS) {
1446 		enter_zone_read_only_mode(zone, result);
1447 		return true;
1448 	}
1449 
1450 	return in_cyclic_range(b, a, zone->generation, 1 << 8);
1451 }
1452 
1453 static void release_generation(struct block_map_zone *zone, u8 generation)
1454 {
1455 	int result;
1456 
1457 	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
1458 			    "dirty page count underflow for generation %u", generation);
1459 	if (result != VDO_SUCCESS) {
1460 		enter_zone_read_only_mode(zone, result);
1461 		return;
1462 	}
1463 
1464 	zone->dirty_page_counts[generation]--;
1465 	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
1466 	       (zone->oldest_generation != zone->generation))
1467 		zone->oldest_generation++;
1468 }
1469 
1470 static void set_generation(struct block_map_zone *zone, struct tree_page *page,
1471 			   u8 new_generation)
1472 {
1473 	u32 new_count;
1474 	int result;
1475 	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
1476 	u8 old_generation = page->generation;
1477 
1478 	if (decrement_old && (old_generation == new_generation))
1479 		return;
1480 
1481 	page->generation = new_generation;
1482 	new_count = ++zone->dirty_page_counts[new_generation];
1483 	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
1484 			    new_generation);
1485 	if (result != VDO_SUCCESS) {
1486 		enter_zone_read_only_mode(zone, result);
1487 		return;
1488 	}
1489 
1490 	if (decrement_old)
1491 		release_generation(zone, old_generation);
1492 }
1493 
1494 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);
1495 
1496 /* Implements waiter_callback_fn */
1497 static void write_page_callback(struct vdo_waiter *waiter, void *context)
1498 {
1499 	write_page(container_of(waiter, struct tree_page, waiter), context);
1500 }
1501 
1502 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
1503 {
1504 	waiter->callback = write_page_callback;
1505 	acquire_vio_from_pool(zone->vio_pool, waiter);
1506 }
1507 
1508 /* Return: true if all possible generations were not already active */
1509 static bool attempt_increment(struct block_map_zone *zone)
1510 {
1511 	u8 generation = zone->generation + 1;
1512 
1513 	if (zone->oldest_generation == generation)
1514 		return false;
1515 
1516 	zone->generation = generation;
1517 	return true;
1518 }
1519 
1520 /* Launches a flush if one is not already in progress. */
1521 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
1522 {
1523 	if ((zone->flusher == NULL) && attempt_increment(zone)) {
1524 		zone->flusher = page;
1525 		acquire_vio(&page->waiter, zone);
1526 		return;
1527 	}
1528 
1529 	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
1530 }
1531 
1532 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
1533 {
1534 	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
1535 	struct write_if_not_dirtied_context *write_context = context;
1536 
1537 	if (page->generation == write_context->generation) {
1538 		acquire_vio(waiter, write_context->zone);
1539 		return;
1540 	}
1541 
1542 	enqueue_page(page, write_context->zone);
1543 }
1544 
1545 static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
1546 {
1547 	return_vio_to_pool(vio);
1548 	check_for_drain_complete(zone);
1549 }
1550 
1551 /* This callback is registered in write_initialized_page(). */
1552 static void finish_page_write(struct vdo_completion *completion)
1553 {
1554 	bool dirty;
1555 	struct vio *vio = as_vio(completion);
1556 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1557 	struct tree_page *page = completion->parent;
1558 	struct block_map_zone *zone = pooled->context;
1559 
1560 	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
1561 						     page->writing_recovery_lock,
1562 						     VDO_ZONE_TYPE_LOGICAL,
1563 						     zone->zone_number);
1564 
1565 	dirty = (page->writing_generation != page->generation);
1566 	release_generation(zone, page->writing_generation);
1567 	page->writing = false;
1568 
1569 	if (zone->flusher == page) {
1570 		struct write_if_not_dirtied_context context = {
1571 			.zone = zone,
1572 			.generation = page->writing_generation,
1573 		};
1574 
1575 		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
1576 					     write_page_if_not_dirtied, &context);
1577 		if (dirty && attempt_increment(zone)) {
1578 			write_page(page, pooled);
1579 			return;
1580 		}
1581 
1582 		zone->flusher = NULL;
1583 	}
1584 
1585 	if (dirty) {
1586 		enqueue_page(page, zone);
1587 	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
1588 		   attempt_increment(zone)) {
1589 		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
1590 					     struct tree_page, waiter);
1591 		write_page(zone->flusher, pooled);
1592 		return;
1593 	}
1594 
1595 	return_to_pool(zone, pooled);
1596 }
1597 
1598 static void handle_write_error(struct vdo_completion *completion)
1599 {
1600 	int result = completion->result;
1601 	struct vio *vio = as_vio(completion);
1602 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1603 	struct block_map_zone *zone = pooled->context;
1604 
1605 	vio_record_metadata_io_error(vio);
1606 	enter_zone_read_only_mode(zone, result);
1607 	return_to_pool(zone, pooled);
1608 }
1609 
1610 static void write_page_endio(struct bio *bio);
1611 
1612 static void write_initialized_page(struct vdo_completion *completion)
1613 {
1614 	struct vio *vio = as_vio(completion);
1615 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1616 	struct block_map_zone *zone = pooled->context;
1617 	struct tree_page *tree_page = completion->parent;
1618 	struct block_map_page *page = (struct block_map_page *) vio->data;
1619 	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;
1620 
1621 	/*
1622 	 * Now that we know the page has been written at least once, mark the copy we are writing
1623 	 * as initialized.
1624 	 */
1625 	page->header.initialized = true;
1626 
1627 	if (zone->flusher == tree_page)
1628 		operation |= REQ_PREFLUSH;
1629 
1630 	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
1631 				write_page_endio, handle_write_error,
1632 				operation);
1633 }
1634 
1635 static void write_page_endio(struct bio *bio)
1636 {
1637 	struct pooled_vio *vio = bio->bi_private;
1638 	struct block_map_zone *zone = vio->context;
1639 	struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1640 
1641 	continue_vio_after_io(&vio->vio,
1642 			      (page->header.initialized ?
1643 			       finish_page_write : write_initialized_page),
1644 			      zone->thread_id);
1645 }
1646 
1647 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
1648 {
1649 	struct vdo_completion *completion = &vio->vio.completion;
1650 	struct block_map_zone *zone = vio->context;
1651 	struct block_map_page *page = vdo_as_block_map_page(tree_page);
1652 
1653 	if ((zone->flusher != tree_page) &&
1654 	    is_not_older(zone, tree_page->generation, zone->generation)) {
1655 		/*
1656 		 * This page was re-dirtied after the last flush was issued, hence we need to do
1657 		 * another flush.
1658 		 */
1659 		enqueue_page(tree_page, zone);
1660 		return_to_pool(zone, vio);
1661 		return;
1662 	}
1663 
1664 	completion->parent = tree_page;
1665 	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
1666 	completion->callback_thread_id = zone->thread_id;
1667 
1668 	tree_page->writing = true;
1669 	tree_page->writing_generation = tree_page->generation;
1670 	tree_page->writing_recovery_lock = tree_page->recovery_lock;
1671 
1672 	/* Clear this now so that we know this page is not on any dirty list. */
1673 	tree_page->recovery_lock = 0;
1674 
1675 	/*
1676 	 * We've already copied the page into the vio which will write it, so if it was not yet
1677 	 * initialized, the first write will indicate that (for torn write protection). It is now
1678 	 * safe to mark it as initialized in memory since, if the write fails, the in-memory state
1679 	 * will become irrelevant.
1680 	 */
1681 	if (page->header.initialized) {
1682 		write_initialized_page(completion);
1683 		return;
1684 	}
1685 
1686 	page->header.initialized = true;
1687 	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
1688 				write_page_endio, handle_write_error,
1689 				REQ_OP_WRITE | REQ_PRIO);
1690 }
1691 
1692 /* Release a lock on a page which was being loaded or allocated. */
1693 static void release_page_lock(struct data_vio *data_vio, char *what)
1694 {
1695 	struct block_map_zone *zone;
1696 	struct tree_lock *lock_holder;
1697 	struct tree_lock *lock = &data_vio->tree_lock;
1698 
1699 	VDO_ASSERT_LOG_ONLY(lock->locked,
1700 			    "release of unlocked block map page %s for key %llu in tree %u",
1701 			    what, (unsigned long long) lock->key, lock->root_index);
1702 
1703 	zone = data_vio->logical.zone->block_map_zone;
1704 	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
1705 	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
1706 			    "block map page %s mismatch for key %llu in tree %u",
1707 			    what, (unsigned long long) lock->key, lock->root_index);
1708 	lock->locked = false;
1709 }
1710 
1711 static void finish_lookup(struct data_vio *data_vio, int result)
1712 {
1713 	data_vio->tree_lock.height = 0;
1714 
1715 	--data_vio->logical.zone->block_map_zone->active_lookups;
1716 
1717 	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
1718 	data_vio->vio.completion.error_handler = handle_data_vio_error;
1719 	continue_data_vio_with_error(data_vio, result);
1720 }
1721 
1722 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
1723 {
1724 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1725 	int result = *((int *) context);
1726 
1727 	if (!data_vio->write) {
1728 		if (result == VDO_NO_SPACE)
1729 			result = VDO_SUCCESS;
1730 	} else if (result != VDO_NO_SPACE) {
1731 		result = VDO_READ_ONLY;
1732 	}
1733 
1734 	finish_lookup(data_vio, result);
1735 }
1736 
1737 static void abort_lookup(struct data_vio *data_vio, int result, char *what)
1738 {
1739 	if (result != VDO_NO_SPACE)
1740 		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);
1741 
1742 	if (data_vio->tree_lock.locked) {
1743 		release_page_lock(data_vio, what);
1744 		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
1745 					     abort_lookup_for_waiter,
1746 					     &result);
1747 	}
1748 
1749 	finish_lookup(data_vio, result);
1750 }
1751 
1752 static void abort_load(struct data_vio *data_vio, int result)
1753 {
1754 	abort_lookup(data_vio, result, "load");
1755 }
1756 
1757 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
1758 					       const struct data_location *mapping,
1759 					       height_t height)
1760 {
1761 	if (!vdo_is_valid_location(mapping) ||
1762 	    vdo_is_state_compressed(mapping->state) ||
1763 	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
1764 		return true;
1765 
1766 	/* Roots aren't physical data blocks, so we can't check their PBNs. */
1767 	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
1768 		return false;
1769 
1770 	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
1771 }
1772 
1773 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
1774 static void allocate_block_map_page(struct block_map_zone *zone,
1775 				    struct data_vio *data_vio);
1776 
1777 static void continue_with_loaded_page(struct data_vio *data_vio,
1778 				      struct block_map_page *page)
1779 {
1780 	struct tree_lock *lock = &data_vio->tree_lock;
1781 	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
1782 	struct data_location mapping =
1783 		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);
1784 
1785 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
1786 		vdo_log_error_strerror(VDO_BAD_MAPPING,
1787 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
1788 				       (unsigned long long) mapping.pbn, mapping.state,
1789 				       lock->tree_slots[lock->height - 1].page_index,
1790 				       lock->height - 1);
1791 		abort_load(data_vio, VDO_BAD_MAPPING);
1792 		return;
1793 	}
1794 
1795 	if (!vdo_is_mapped_location(&mapping)) {
1796 		/* The page we need is unallocated */
1797 		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
1798 					data_vio);
1799 		return;
1800 	}
1801 
1802 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
1803 	if (lock->height == 1) {
1804 		finish_lookup(data_vio, VDO_SUCCESS);
1805 		return;
1806 	}
1807 
1808 	/* We know what page we need to load next */
1809 	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1810 }
1811 
1812 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
1813 {
1814 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1815 
1816 	data_vio->tree_lock.height--;
1817 	continue_with_loaded_page(data_vio, context);
1818 }
1819 
1820 static void finish_block_map_page_load(struct vdo_completion *completion)
1821 {
1822 	physical_block_number_t pbn;
1823 	struct tree_page *tree_page;
1824 	struct block_map_page *page;
1825 	nonce_t nonce;
1826 	struct vio *vio = as_vio(completion);
1827 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1828 	struct data_vio *data_vio = completion->parent;
1829 	struct block_map_zone *zone = pooled->context;
1830 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1831 
1832 	tree_lock->height--;
1833 	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
1834 	tree_page = get_tree_page(zone, tree_lock);
1835 	page = (struct block_map_page *) tree_page->page_buffer;
1836 	nonce = zone->block_map->nonce;
1837 
1838 	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
1839 		vdo_format_block_map_page(page, nonce, pbn, false);
1840 	return_vio_to_pool(pooled);
1841 
1842 	/* Release our claim to the load and wake any waiters */
1843 	release_page_lock(data_vio, "load");
1844 	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
1845 	continue_with_loaded_page(data_vio, page);
1846 }
1847 
1848 static void handle_io_error(struct vdo_completion *completion)
1849 {
1850 	int result = completion->result;
1851 	struct vio *vio = as_vio(completion);
1852 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1853 	struct data_vio *data_vio = completion->parent;
1854 
1855 	vio_record_metadata_io_error(vio);
1856 	return_vio_to_pool(pooled);
1857 	abort_load(data_vio, result);
1858 }
1859 
1860 static void load_page_endio(struct bio *bio)
1861 {
1862 	struct vio *vio = bio->bi_private;
1863 	struct data_vio *data_vio = vio->completion.parent;
1864 
1865 	continue_vio_after_io(vio, finish_block_map_page_load,
1866 			      data_vio->logical.zone->thread_id);
1867 }
1868 
1869 static void load_page(struct vdo_waiter *waiter, void *context)
1870 {
1871 	struct pooled_vio *pooled = context;
1872 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1873 	struct tree_lock *lock = &data_vio->tree_lock;
1874 	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;
1875 
1876 	pooled->vio.completion.parent = data_vio;
1877 	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
1878 				handle_io_error, REQ_OP_READ | REQ_PRIO);
1879 }
1880 
1881 /*
1882  * If the page is already locked, queue up to wait for the lock to be released. If the lock is
1883  * acquired, @data_vio->tree_lock.locked will be true.
1884  */
1885 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
1886 {
1887 	int result;
1888 	struct tree_lock *lock_holder;
1889 	struct tree_lock *lock = &data_vio->tree_lock;
1890 	height_t height = lock->height;
1891 	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
1892 	union page_key key;
1893 
1894 	key.descriptor = (struct page_descriptor) {
1895 		.root_index = lock->root_index,
1896 		.height = height,
1897 		.page_index = tree_slot.page_index,
1898 		.slot = tree_slot.block_map_slot.slot,
1899 	};
1900 	lock->key = key.key;
1901 
1902 	result = vdo_int_map_put(zone->loading_pages, lock->key,
1903 				 lock, false, (void **) &lock_holder);
1904 	if (result != VDO_SUCCESS)
1905 		return result;
1906 
1907 	if (lock_holder == NULL) {
1908 		/* We got the lock */
1909 		data_vio->tree_lock.locked = true;
1910 		return VDO_SUCCESS;
1911 	}
1912 
1913 	/* Someone else is loading or allocating the page we need */
1914 	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
1915 	return VDO_SUCCESS;
1916 }
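
/*
 * Illustrative sketch (assumed helper): attempt_page_lock() identifies the page being loaded or
 * allocated by packing its tree coordinates into a single u64 through the page_key union, so
 * one int_map entry covers the (root, height, page, slot) tuple. The packing is equivalent to:
 */
static inline u64 example_page_lock_key(root_count_t root_index, height_t height,
					page_number_t page_index, slot_number_t slot)
{
	union page_key key = {
		.descriptor = {
			.root_index = root_index,
			.height = height,
			.page_index = page_index,
			.slot = slot,
		},
	};

	/* The packed descriptor and its u64 view occupy the same eight bytes. */
	return key.key;
}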
1917 
1918 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */
1919 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
1920 {
1921 	int result;
1922 
1923 	result = attempt_page_lock(zone, data_vio);
1924 	if (result != VDO_SUCCESS) {
1925 		abort_load(data_vio, result);
1926 		return;
1927 	}
1928 
1929 	if (data_vio->tree_lock.locked) {
1930 		data_vio->waiter.callback = load_page;
1931 		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
1932 	}
1933 }
1934 
1935 static void allocation_failure(struct vdo_completion *completion)
1936 {
1937 	struct data_vio *data_vio = as_data_vio(completion);
1938 
1939 	if (vdo_requeue_completion_if_needed(completion,
1940 					     data_vio->logical.zone->thread_id))
1941 		return;
1942 
1943 	abort_lookup(data_vio, completion->result, "allocation");
1944 }
1945 
1946 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
1947 {
1948 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1949 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1950 	physical_block_number_t pbn = *((physical_block_number_t *) context);
1951 
1952 	tree_lock->height--;
1953 	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;
1954 
1955 	if (tree_lock->height == 0) {
1956 		finish_lookup(data_vio, VDO_SUCCESS);
1957 		return;
1958 	}
1959 
1960 	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1961 }
1962 
1963 /** expire_oldest_list() - Expire the oldest list. */
1964 static void expire_oldest_list(struct dirty_lists *dirty_lists)
1965 {
1966 	block_count_t i = dirty_lists->offset++;
1967 
1968 	dirty_lists->oldest_period++;
1969 	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
1970 		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
1971 				      &dirty_lists->expired[VDO_TREE_PAGE]);
1972 	}
1973 
1974 	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
1975 		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
1976 				      &dirty_lists->expired[VDO_CACHE_PAGE]);
1977 	}
1978 
1979 	if (dirty_lists->offset == dirty_lists->maximum_age)
1980 		dirty_lists->offset = 0;
1981 }
1982 
1983 
1984 /** update_period() - Update the dirty_lists period if necessary. */
1985 static void update_period(struct dirty_lists *dirty, sequence_number_t period)
1986 {
1987 	while (dirty->next_period <= period) {
1988 		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
1989 			expire_oldest_list(dirty);
1990 		dirty->next_period++;
1991 	}
1992 }
1993 
1994 /** write_expired_elements() - Write out the expired list. */
1995 static void write_expired_elements(struct block_map_zone *zone)
1996 {
1997 	struct tree_page *page, *ttmp;
1998 	struct page_info *info, *ptmp;
1999 	struct list_head *expired;
2000 	u8 generation = zone->generation;
2001 
2002 	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
2003 	list_for_each_entry_safe(page, ttmp, expired, entry) {
2004 		int result;
2005 
2006 		list_del_init(&page->entry);
2007 
2008 		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
2009 				    "Newly expired page not already waiting to write");
2010 		if (result != VDO_SUCCESS) {
2011 			enter_zone_read_only_mode(zone, result);
2012 			continue;
2013 		}
2014 
2015 		set_generation(zone, page, generation);
2016 		if (!page->writing)
2017 			enqueue_page(page, zone);
2018 	}
2019 
2020 	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
2021 	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
2022 		list_del_init(&info->state_entry);
2023 		schedule_page_save(info);
2024 	}
2025 
2026 	save_pages(&zone->page_cache);
2027 }
2028 
2029 /**
2030  * add_to_dirty_lists() - Add an element to the dirty lists.
2031  * @zone: The zone in which we are operating.
2032  * @entry: The list entry of the element to add.
2033  * @type: The type of page.
2034  * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
2035  * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
2036  *              lock.
2037  */
2038 static void add_to_dirty_lists(struct block_map_zone *zone,
2039 			       struct list_head *entry,
2040 			       enum block_map_page_type type,
2041 			       sequence_number_t old_period,
2042 			       sequence_number_t new_period)
2043 {
2044 	struct dirty_lists *dirty_lists = zone->dirty_lists;
2045 
2046 	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
2047 		return;
2048 
2049 	if (new_period < dirty_lists->oldest_period) {
2050 		list_move_tail(entry, &dirty_lists->expired[type]);
2051 	} else {
2052 		update_period(dirty_lists, new_period);
2053 		list_move_tail(entry,
2054 			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
2055 	}
2056 
2057 	write_expired_elements(zone);
2058 }
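
/*
 * Illustrative sketch (assumed helper): add_to_dirty_lists() buckets an element by the period
 * in which it was dirtied, modulo the number of eras. With a maximum_age of 5, for example, an
 * element dirtied in period 12 lands in eras[12 % 5] == eras[2] and stays there until
 * expire_oldest_list() splices that bucket onto the expired list.
 */
static inline block_count_t example_era_index(sequence_number_t new_period,
					      block_count_t maximum_age)
{
	/* The same bucketing used by the list_move_tail() above. */
	return new_period % maximum_age;
}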
2059 
2060 /*
2061  * Record the allocation in the tree and wake any waiters now that the write lock has been
2062  * released.
2063  */
2064 static void finish_block_map_allocation(struct vdo_completion *completion)
2065 {
2066 	physical_block_number_t pbn;
2067 	struct tree_page *tree_page;
2068 	struct block_map_page *page;
2069 	sequence_number_t old_lock;
2070 	struct data_vio *data_vio = as_data_vio(completion);
2071 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2072 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2073 	height_t height = tree_lock->height;
2074 
2075 	assert_data_vio_in_logical_zone(data_vio);
2076 
2077 	tree_page = get_tree_page(zone, tree_lock);
2078 	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2079 
2080 	/* Record the allocation. */
2081 	page = (struct block_map_page *) tree_page->page_buffer;
2082 	old_lock = tree_page->recovery_lock;
2083 	vdo_update_block_map_page(page, data_vio, pbn,
2084 				  VDO_MAPPING_STATE_UNCOMPRESSED,
2085 				  &tree_page->recovery_lock);
2086 
2087 	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
2088 		/* This page is waiting to be written out. */
2089 		if (zone->flusher != tree_page) {
2090 			/*
2091 			 * The outstanding flush won't cover the update we just made,
2092 			 * so mark the page as needing another flush.
2093 			 */
2094 			set_generation(zone, tree_page, zone->generation);
2095 		}
2096 	} else {
2097 		/* Put the page on a dirty list */
2098 		if (old_lock == 0)
2099 			INIT_LIST_HEAD(&tree_page->entry);
2100 		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
2101 				   old_lock, tree_page->recovery_lock);
2102 	}
2103 
2104 	tree_lock->height--;
2105 	if (height > 1) {
2106 		/* Format the interior node we just allocated (in memory). */
2107 		tree_page = get_tree_page(zone, tree_lock);
2108 		vdo_format_block_map_page(tree_page->page_buffer,
2109 					  zone->block_map->nonce,
2110 					  pbn, false);
2111 	}
2112 
2113 	/* Release our claim to the allocation and wake any waiters */
2114 	release_page_lock(data_vio, "allocation");
2115 	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
2116 				     continue_allocation_for_waiter, &pbn);
2117 	if (tree_lock->height == 0) {
2118 		finish_lookup(data_vio, VDO_SUCCESS);
2119 		return;
2120 	}
2121 
2122 	allocate_block_map_page(zone, data_vio);
2123 }
2124 
2125 static void release_block_map_write_lock(struct vdo_completion *completion)
2126 {
2127 	struct data_vio *data_vio = as_data_vio(completion);
2128 
2129 	assert_data_vio_in_allocated_zone(data_vio);
2130 
2131 	release_data_vio_allocation_lock(data_vio, true);
2132 	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
2133 }
2134 
2135 /*
2136  * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
2137  * to prevent deduplication against the block after we release the write lock on it, but before we
2138  * write out the page.
2139  */
2140 static void set_block_map_page_reference_count(struct vdo_completion *completion)
2141 {
2142 	struct data_vio *data_vio = as_data_vio(completion);
2143 
2144 	assert_data_vio_in_allocated_zone(data_vio);
2145 
2146 	completion->callback = release_block_map_write_lock;
2147 	vdo_modify_reference_count(completion, &data_vio->increment_updater);
2148 }
2149 
2150 static void journal_block_map_allocation(struct vdo_completion *completion)
2151 {
2152 	struct data_vio *data_vio = as_data_vio(completion);
2153 
2154 	assert_data_vio_in_journal_zone(data_vio);
2155 
2156 	set_data_vio_allocated_zone_callback(data_vio,
2157 					     set_block_map_page_reference_count);
2158 	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
2159 }
2160 
2161 static void allocate_block(struct vdo_completion *completion)
2162 {
2163 	struct data_vio *data_vio = as_data_vio(completion);
2164 	struct tree_lock *lock = &data_vio->tree_lock;
2165 	physical_block_number_t pbn;
2166 
2167 	assert_data_vio_in_allocated_zone(data_vio);
2168 
2169 	if (!vdo_allocate_block_in_zone(data_vio))
2170 		return;
2171 
2172 	pbn = data_vio->allocation.pbn;
2173 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
2174 	data_vio->increment_updater = (struct reference_updater) {
2175 		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
2176 		.increment = true,
2177 		.zpbn = {
2178 			.pbn = pbn,
2179 			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
2180 		},
2181 		.lock = data_vio->allocation.lock,
2182 	};
2183 
2184 	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
2185 }
2186 
2187 static void allocate_block_map_page(struct block_map_zone *zone,
2188 				    struct data_vio *data_vio)
2189 {
2190 	int result;
2191 
2192 	if (!data_vio->write || data_vio->is_discard) {
2193 		/* This is a pure read or a discard, so there's nothing left to do here. */
2194 		finish_lookup(data_vio, VDO_SUCCESS);
2195 		return;
2196 	}
2197 
2198 	result = attempt_page_lock(zone, data_vio);
2199 	if (result != VDO_SUCCESS) {
2200 		abort_lookup(data_vio, result, "allocation");
2201 		return;
2202 	}
2203 
2204 	if (!data_vio->tree_lock.locked)
2205 		return;
2206 
2207 	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
2208 				     allocate_block, allocation_failure);
2209 }
2210 
2211 /**
2212  * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
2213  *                             resides and cache that result in the data_vio.
2214  *
2215  * All ancestors in the tree will be allocated or loaded, as needed.
2216  */
2217 void vdo_find_block_map_slot(struct data_vio *data_vio)
2218 {
2219 	page_number_t page_index;
2220 	struct block_map_tree_slot tree_slot;
2221 	struct data_location mapping;
2222 	struct block_map_page *page = NULL;
2223 	struct tree_lock *lock = &data_vio->tree_lock;
2224 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2225 
2226 	zone->active_lookups++;
2227 	if (vdo_is_state_draining(&zone->state)) {
2228 		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
2229 		return;
2230 	}
2231 
2232 	lock->tree_slots[0].block_map_slot.slot =
2233 		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2234 	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
2235 	tree_slot = (struct block_map_tree_slot) {
2236 		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2237 		.block_map_slot = {
2238 			.pbn = 0,
2239 			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2240 		},
2241 	};
2242 
2243 	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
2244 		physical_block_number_t pbn;
2245 
2246 		lock->tree_slots[lock->height] = tree_slot;
2247 		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
2248 		pbn = vdo_get_block_map_page_pbn(page);
2249 		if (pbn != VDO_ZERO_BLOCK) {
2250 			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
2251 			break;
2252 		}
2253 
2254 		/* Calculate the index and slot for the next level. */
2255 		tree_slot.block_map_slot.slot =
2256 			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2257 		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2258 	}
2259 
2260 	/* The page at this height has been allocated and loaded. */
2261 	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
2262 	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
2263 		vdo_log_error_strerror(VDO_BAD_MAPPING,
2264 				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
2265 				       (unsigned long long) mapping.pbn, mapping.state,
2266 				       lock->tree_slots[lock->height - 1].page_index,
2267 				       lock->height - 1);
2268 		abort_load(data_vio, VDO_BAD_MAPPING);
2269 		return;
2270 	}
2271 
2272 	if (!vdo_is_mapped_location(&mapping)) {
2273 		/* The page we want one level down has not been allocated, so allocate it. */
2274 		allocate_block_map_page(zone, data_vio);
2275 		return;
2276 	}
2277 
2278 	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
2279 	if (lock->height == 1) {
2280 		/* This is the ultimate block map page, so we're done */
2281 		finish_lookup(data_vio, VDO_SUCCESS);
2282 		return;
2283 	}
2284 
2285 	/* We know what page we need to load. */
2286 	load_block_map_page(zone, data_vio);
2287 }
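
/*
 * Illustrative sketch (assumed helper): within one root, the (page_index, slot) pairs computed
 * at successive heights by the loop in vdo_find_block_map_slot() are just the successive
 * base-N digits of the leaf page index, where N is the number of entries per page. With 800
 * entries per page (a placeholder value for illustration), leaf page index 2500 yields slot 100
 * on interior page 3 at height 1, then slot 3 on page 0 at height 2, and slot 0 above that.
 */
static inline void example_next_tree_coordinates(page_number_t *page_index,
						 slot_number_t *slot,
						 page_number_t entries_per_page)
{
	/* Peel off the next digit, as the lookup loop above does for each level. */
	*slot = *page_index % entries_per_page;
	*page_index /= entries_per_page;
}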
2288 
2289 /*
2290  * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
2291  * pages have been loaded; otherwise, it may give the wrong answer (0).
2292  */
2293 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2294 						    page_number_t page_number)
2295 {
2296 	struct data_location mapping;
2297 	struct tree_page *tree_page;
2298 	struct block_map_page *page;
2299 	root_count_t root_index = page_number % map->root_count;
2300 	page_number_t page_index = page_number / map->root_count;
2301 	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2302 
2303 	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2304 
2305 	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2306 	page = (struct block_map_page *) tree_page->page_buffer;
2307 	if (!page->header.initialized)
2308 		return VDO_ZERO_BLOCK;
2309 
2310 	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
2311 	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
2312 		return VDO_ZERO_BLOCK;
2313 	return mapping.pbn;
2314 }
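
/*
 * Illustrative sketch (assumed helper): the decomposition used by vdo_find_block_map_page_pbn()
 * above, spelled out on its own. With 4 roots and 800 entries per page (placeholder values for
 * illustration only), leaf page 1000 maps to root 0, slot 250 of height-1 interior page 0.
 */
static inline void example_decompose_leaf_page_number(page_number_t page_number,
						      root_count_t root_count,
						      page_number_t entries_per_page,
						      root_count_t *root_index,
						      page_number_t *page_index,
						      slot_number_t *slot)
{
	*root_index = page_number % root_count;
	*page_index = page_number / root_count;
	*slot = *page_index % entries_per_page;
	*page_index /= entries_per_page;
}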
2315 
2316 /*
2317  * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2318  * method is used when correcting errors in the tree during read-only rebuild.
2319  */
2320 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
2321 {
2322 	bool waiting = vdo_waiter_is_waiting(&page->waiter);
2323 
2324 	if (waiting && (zone->flusher == page))
2325 		return;
2326 
2327 	set_generation(zone, page, zone->generation);
2328 	if (waiting || page->writing)
2329 		return;
2330 
2331 	enqueue_page(page, zone);
2332 }
2333 
2334 static int make_segment(struct forest *old_forest, block_count_t new_pages,
2335 			struct boundary *new_boundary, struct forest *forest)
2336 {
2337 	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
2338 	struct tree_page *page_ptr;
2339 	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
2340 	height_t height;
2341 	root_count_t root;
2342 	int result;
2343 
2344 	forest->segments = index + 1;
2345 
2346 	result = vdo_allocate(forest->segments, struct boundary,
2347 			      "forest boundary array", &forest->boundaries);
2348 	if (result != VDO_SUCCESS)
2349 		return result;
2350 
2351 	result = vdo_allocate(forest->segments, struct tree_page *,
2352 			      "forest page pointers", &forest->pages);
2353 	if (result != VDO_SUCCESS)
2354 		return result;
2355 
2356 	result = vdo_allocate(new_pages, struct tree_page,
2357 			      "new forest pages", &forest->pages[index]);
2358 	if (result != VDO_SUCCESS)
2359 		return result;
2360 
2361 	if (index > 0) {
2362 		memcpy(forest->boundaries, old_forest->boundaries,
2363 		       index * sizeof(struct boundary));
2364 		memcpy(forest->pages, old_forest->pages,
2365 		       index * sizeof(struct tree_page *));
2366 	}
2367 
2368 	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));
2369 
2370 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2371 		segment_sizes[height] = new_boundary->levels[height];
2372 		if (index > 0)
2373 			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
2374 	}
2375 
2376 	page_ptr = forest->pages[index];
2377 	for (root = 0; root < forest->map->root_count; root++) {
2378 		struct block_map_tree_segment *segment;
2379 		struct block_map_tree *tree = &(forest->trees[root]);
2380 		height_t height;
2381 
2382 		int result = vdo_allocate(forest->segments,
2383 					  struct block_map_tree_segment,
2384 					  "tree root segments", &tree->segments);
2385 		if (result != VDO_SUCCESS)
2386 			return result;
2387 
2388 		if (index > 0) {
2389 			memcpy(tree->segments, old_forest->trees[root].segments,
2390 			       index * sizeof(struct block_map_tree_segment));
2391 		}
2392 
2393 		segment = &(tree->segments[index]);
2394 		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2395 			if (segment_sizes[height] == 0)
2396 				continue;
2397 
2398 			segment->levels[height] = page_ptr;
2399 			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
2400 				/* Record the root. */
2401 				struct block_map_page *page =
2402 					vdo_format_block_map_page(page_ptr->page_buffer,
2403 								  forest->map->nonce,
2404 								  VDO_INVALID_PBN, true);
2405 				page->entries[0] =
2406 					vdo_pack_block_map_entry(forest->map->root_origin + root,
2407 								 VDO_MAPPING_STATE_UNCOMPRESSED);
2408 			}
2409 			page_ptr += segment_sizes[height];
2410 		}
2411 	}
2412 
2413 	return VDO_SUCCESS;
2414 }
2415 
2416 static void deforest(struct forest *forest, size_t first_page_segment)
2417 {
2418 	root_count_t root;
2419 
2420 	if (forest->pages != NULL) {
2421 		size_t segment;
2422 
2423 		for (segment = first_page_segment; segment < forest->segments; segment++)
2424 			vdo_free(forest->pages[segment]);
2425 		vdo_free(forest->pages);
2426 	}
2427 
2428 	for (root = 0; root < forest->map->root_count; root++)
2429 		vdo_free(forest->trees[root].segments);
2430 
2431 	vdo_free(forest->boundaries);
2432 	vdo_free(forest);
2433 }
2434 
2435 /**
2436  * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
2437  *                 there is one.
2438  * @entries: The number of entries the block map will hold.
2439  *
2440  * Return: VDO_SUCCESS or an error.
2441  */
2442 static int make_forest(struct block_map *map, block_count_t entries)
2443 {
2444 	struct forest *forest, *old_forest = map->forest;
2445 	struct boundary new_boundary, *old_boundary = NULL;
2446 	block_count_t new_pages;
2447 	int result;
2448 
2449 	if (old_forest != NULL)
2450 		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);
2451 
2452 	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
2453 						 entries, &new_boundary);
2454 	if (new_pages == 0) {
2455 		map->next_entry_count = entries;
2456 		return VDO_SUCCESS;
2457 	}
2458 
2459 	result = vdo_allocate_extended(struct forest, map->root_count,
2460 				       struct block_map_tree, __func__,
2461 				       &forest);
2462 	if (result != VDO_SUCCESS)
2463 		return result;
2464 
2465 	forest->map = map;
2466 	result = make_segment(old_forest, new_pages, &new_boundary, forest);
2467 	if (result != VDO_SUCCESS) {
2468 		deforest(forest, forest->segments - 1);
2469 		return result;
2470 	}
2471 
2472 	map->next_forest = forest;
2473 	map->next_entry_count = entries;
2474 	return VDO_SUCCESS;
2475 }
2476 
2477 /**
2478  * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2479  */
2480 static void replace_forest(struct block_map *map)
2481 {
2482 	if (map->next_forest != NULL) {
2483 		if (map->forest != NULL)
2484 			deforest(map->forest, map->forest->segments);
2485 		map->forest = vdo_forget(map->next_forest);
2486 	}
2487 
2488 	map->entry_count = map->next_entry_count;
2489 	map->next_entry_count = 0;
2490 }
2491 
2492 /**
2493  * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish
2494  *                   the traversal of the entire forest.
2495  */
2496 static void finish_cursor(struct cursor *cursor)
2497 {
2498 	struct cursors *cursors = cursor->parent;
2499 	struct vdo_completion *completion = cursors->completion;
2500 
2501 	return_vio_to_pool(vdo_forget(cursor->vio));
2502 	if (--cursors->active_roots > 0)
2503 		return;
2504 
2505 	vdo_free(cursors);
2506 
2507 	vdo_finish_completion(completion);
2508 }
2509 
2510 static void traverse(struct cursor *cursor);
2511 
2512 /**
2513  * continue_traversal() - Continue traversing a block map tree.
2514  * @completion: The VIO doing a read or write.
2515  */
2516 static void continue_traversal(struct vdo_completion *completion)
2517 {
2518 	vio_record_metadata_io_error(as_vio(completion));
2519 	traverse(completion->parent);
2520 }
2521 
2522 /**
2523  * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2524  * @completion: The VIO doing the read.
2525  */
2526 static void finish_traversal_load(struct vdo_completion *completion)
2527 {
2528 	struct cursor *cursor = completion->parent;
2529 	height_t height = cursor->height;
2530 	struct cursor_level *level = &cursor->levels[height];
2531 	struct tree_page *tree_page =
2532 		&(cursor->tree->segments[0].levels[height][level->page_index]);
2533 	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2534 
2535 	vdo_copy_valid_page(cursor->vio->vio.data,
2536 			    cursor->parent->zone->block_map->nonce,
2537 			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
2538 	traverse(cursor);
2539 }
2540 
2541 static void traversal_endio(struct bio *bio)
2542 {
2543 	struct vio *vio = bio->bi_private;
2544 	struct cursor *cursor = vio->completion.parent;
2545 
2546 	continue_vio_after_io(vio, finish_traversal_load,
2547 			      cursor->parent->zone->thread_id);
2548 }
2549 
2550 /**
2551  * traverse() - Traverse a single block map tree.
2552  *
2553  * This is the recursive heart of the traversal process.
2554  */
2555 static void traverse(struct cursor *cursor)
2556 {
2557 	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
2558 		height_t height = cursor->height;
2559 		struct cursor_level *level = &cursor->levels[height];
2560 		struct tree_page *tree_page =
2561 			&(cursor->tree->segments[0].levels[height][level->page_index]);
2562 		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2563 
2564 		if (!page->header.initialized)
2565 			continue;
2566 
2567 		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
2568 			struct cursor_level *next_level;
2569 			page_number_t entry_index =
2570 				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
2571 			struct data_location location =
2572 				vdo_unpack_block_map_entry(&page->entries[level->slot]);
2573 
2574 			if (!vdo_is_valid_location(&location)) {
2575 				/* This entry is invalid, so remove it from the page. */
2576 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2577 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2578 				continue;
2579 			}
2580 
2581 			if (!vdo_is_mapped_location(&location))
2582 				continue;
2583 
2584 			/* Erase mapped entries past the end of the logical space. */
2585 			if (entry_index >= cursor->boundary.levels[height]) {
2586 				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2587 				vdo_write_tree_page(tree_page, cursor->parent->zone);
2588 				continue;
2589 			}
2590 
2591 			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
2592 				int result = cursor->parent->entry_callback(location.pbn,
2593 									    cursor->parent->completion);
2594 				if (result != VDO_SUCCESS) {
2595 					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2596 					vdo_write_tree_page(tree_page, cursor->parent->zone);
2597 					continue;
2598 				}
2599 			}
2600 
2601 			if (cursor->height == 0)
2602 				continue;
2603 
2604 			cursor->height--;
2605 			next_level = &cursor->levels[cursor->height];
2606 			next_level->page_index = entry_index;
2607 			next_level->slot = 0;
2608 			level->slot++;
2609 			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
2610 						traversal_endio, continue_traversal,
2611 						REQ_OP_READ | REQ_PRIO);
2612 			return;
2613 		}
2614 	}
2615 
2616 	finish_cursor(cursor);
2617 }
2618 
2619 /**
2620  * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2621  *                   which to load pages.
2622  * @context: The pooled_vio just acquired.
2623  *
2624  * Implements waiter_callback_fn.
2625  */
2626 static void launch_cursor(struct vdo_waiter *waiter, void *context)
2627 {
2628 	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
2629 	struct pooled_vio *pooled = context;
2630 
2631 	cursor->vio = pooled;
2632 	pooled->vio.completion.parent = cursor;
2633 	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
2634 	traverse(cursor);
2635 }
2636 
2637 /**
2638  * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2639  *
2640  * Return: The list of page counts as a boundary structure.
2641  */
2642 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
2643 {
2644 	struct boundary boundary;
2645 	height_t height;
2646 	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
2647 	/*
2648 	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2649 	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2650 	 * roots starting from tree 0.
2651 	 */
2652 	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
2653 	page_count_t level_pages = leaf_pages / map->root_count;
2654 
2655 	if (root_index <= last_tree_root)
2656 		level_pages++;
2657 
2658 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
2659 		boundary.levels[height] = level_pages;
2660 		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
2661 	}
2662 
2663 	/* The root node always exists, even if the root is otherwise unused. */
2664 	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;
2665 
2666 	return boundary;
2667 }
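
/*
 * Illustrative sketch (assumed helper): compute_boundary() above deals the leaf pages across
 * the roots, giving one extra page to each root up to and including last_tree_root. With 10
 * leaf pages and 4 roots (placeholder numbers), last_tree_root = 9 % 4 = 1, so roots 0 and 1
 * get 3 leaf pages each while roots 2 and 3 get 2.
 */
static inline page_count_t example_leaf_pages_for_root(page_count_t leaf_pages,
						       root_count_t root_count,
						       root_count_t root_index)
{
	page_count_t level_pages = leaf_pages / root_count;

	if (root_index <= ((leaf_pages - 1) % root_count))
		level_pages++;

	return level_pages;
}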
2668 
2669 /**
2670  * vdo_traverse_forest() - Walk the entire forest of a block map.
2671  * @callback: A function to call with the pbn of each allocated node in the forest.
2672  * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2673  */
2674 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
2675 			 struct vdo_completion *completion)
2676 {
2677 	root_count_t root;
2678 	struct cursors *cursors;
2679 	int result;
2680 
2681 	result = vdo_allocate_extended(struct cursors, map->root_count,
2682 				       struct cursor, __func__, &cursors);
2683 	if (result != VDO_SUCCESS) {
2684 		vdo_fail_completion(completion, result);
2685 		return;
2686 	}
2687 
2688 	cursors->zone = &map->zones[0];
2689 	cursors->pool = cursors->zone->vio_pool;
2690 	cursors->entry_callback = callback;
2691 	cursors->completion = completion;
2692 	cursors->active_roots = map->root_count;
2693 	for (root = 0; root < map->root_count; root++) {
2694 		struct cursor *cursor = &cursors->cursors[root];
2695 
2696 		*cursor = (struct cursor) {
2697 			.tree = &map->forest->trees[root],
2698 			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
2699 			.parent = cursors,
2700 			.boundary = compute_boundary(map, root),
2701 		};
2702 
2703 		cursor->waiter.callback = launch_cursor;
2704 		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
2705 	}
2706 }
2707 
2708 /**
2709  * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
2710  * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
2711  *               written out.
2712  */
2713 static int __must_check initialize_block_map_zone(struct block_map *map,
2714 						  zone_count_t zone_number,
2715 						  page_count_t cache_size,
2716 						  block_count_t maximum_age)
2717 {
2718 	int result;
2719 	block_count_t i;
2720 	struct vdo *vdo = map->vdo;
2721 	struct block_map_zone *zone = &map->zones[zone_number];
2722 
2723 	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));
2724 
2725 	zone->zone_number = zone_number;
2726 	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
2727 	zone->block_map = map;
2728 
2729 	result = vdo_allocate_extended(struct dirty_lists, maximum_age,
2730 				       dirty_era_t, __func__,
2731 				       &zone->dirty_lists);
2732 	if (result != VDO_SUCCESS)
2733 		return result;
2734 
2735 	zone->dirty_lists->maximum_age = maximum_age;
2736 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
2737 	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);
2738 
2739 	for (i = 0; i < maximum_age; i++) {
2740 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
2741 		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
2742 	}
2743 
2744 	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
2745 	if (result != VDO_SUCCESS)
2746 		return result;
2747 
2748 	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 1,
2749 			       zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
2750 			       VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
2751 	if (result != VDO_SUCCESS)
2752 		return result;
2753 
2754 	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2755 
2756 	zone->page_cache.zone = zone;
2757 	zone->page_cache.vdo = vdo;
2758 	zone->page_cache.page_count = cache_size / map->zone_count;
2759 	zone->page_cache.stats.free_pages = zone->page_cache.page_count;
2760 
2761 	result = allocate_cache_components(&zone->page_cache);
2762 	if (result != VDO_SUCCESS)
2763 		return result;
2764 
2765 	/* initialize empty circular queues */
2766 	INIT_LIST_HEAD(&zone->page_cache.lru_list);
2767 	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);
2768 
2769 	return VDO_SUCCESS;
2770 }
2771 
2772 /* Implements vdo_zone_thread_getter_fn */
2773 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2774 {
2775 	struct block_map *map = context;
2776 
2777 	return map->zones[zone_number].thread_id;
2778 }
2779 
2780 /* Implements vdo_action_preamble_fn */
2781 static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2782 {
2783 	struct block_map *map = context;
2784 
2785 	map->current_era_point = map->pending_era_point;
2786 	vdo_finish_completion(parent);
2787 }
2788 
2789 /* Implements vdo_zone_action_fn */
2790 static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2791 				       struct vdo_completion *parent)
2792 {
2793 	struct block_map *map = context;
2794 	struct block_map_zone *zone = &map->zones[zone_number];
2795 
2796 	update_period(zone->dirty_lists, map->current_era_point);
2797 	write_expired_elements(zone);
2798 	vdo_finish_completion(parent);
2799 }
2800 
2801 /*
2802  * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2803  * vdo_schedule_default_action() on the block map's action manager.
2804  *
2805  * Implements vdo_action_scheduler_fn.
2806  */
2807 static bool schedule_era_advance(void *context)
2808 {
2809 	struct block_map *map = context;
2810 
2811 	if (map->current_era_point == map->pending_era_point)
2812 		return false;
2813 
2814 	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
2815 				   advance_block_map_zone_era, NULL, NULL);
2816 }
2817 
2818 static void uninitialize_block_map_zone(struct block_map_zone *zone)
2819 {
2820 	struct vdo_page_cache *cache = &zone->page_cache;
2821 
2822 	vdo_free(vdo_forget(zone->dirty_lists));
2823 	free_vio_pool(vdo_forget(zone->vio_pool));
2824 	vdo_int_map_free(vdo_forget(zone->loading_pages));
2825 	if (cache->infos != NULL) {
2826 		struct page_info *info;
2827 
2828 		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2829 			free_vio(vdo_forget(info->vio));
2830 	}
2831 
2832 	vdo_int_map_free(vdo_forget(cache->page_map));
2833 	vdo_free(vdo_forget(cache->infos));
2834 	vdo_free(vdo_forget(cache->pages));
2835 }
2836 
2837 void vdo_free_block_map(struct block_map *map)
2838 {
2839 	zone_count_t zone;
2840 
2841 	if (map == NULL)
2842 		return;
2843 
2844 	for (zone = 0; zone < map->zone_count; zone++)
2845 		uninitialize_block_map_zone(&map->zones[zone]);
2846 
2847 	vdo_abandon_block_map_growth(map);
2848 	if (map->forest != NULL)
2849 		deforest(vdo_forget(map->forest), 0);
2850 	vdo_free(vdo_forget(map->action_manager));
2851 	vdo_free(map);
2852 }
2853 
2854 /* @journal may be NULL. */
2855 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2856 			 struct vdo *vdo, struct recovery_journal *journal,
2857 			 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2858 			 struct block_map **map_ptr)
2859 {
2860 	struct block_map *map;
2861 	int result;
2862 	zone_count_t zone = 0;
2863 
2864 	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2865 		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2866 		      sizeof(struct block_map_entry)));
2867 	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
2868 	if (result != VDO_SUCCESS)
2869 		return result;
2870 
2871 	result = vdo_allocate_extended(struct block_map,
2872 				       vdo->thread_config.logical_zone_count,
2873 				       struct block_map_zone, __func__, &map);
2874 	if (result != VDO_SUCCESS)
2875 		return result;
2876 
2877 	map->vdo = vdo;
2878 	map->root_origin = state.root_origin;
2879 	map->root_count = state.root_count;
2880 	map->entry_count = logical_blocks;
2881 	map->journal = journal;
2882 	map->nonce = nonce;
2883 
2884 	result = make_forest(map, map->entry_count);
2885 	if (result != VDO_SUCCESS) {
2886 		vdo_free_block_map(map);
2887 		return result;
2888 	}
2889 
2890 	replace_forest(map);
2891 
2892 	map->zone_count = vdo->thread_config.logical_zone_count;
2893 	for (zone = 0; zone < map->zone_count; zone++) {
2894 		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
2895 		if (result != VDO_SUCCESS) {
2896 			vdo_free_block_map(map);
2897 			return result;
2898 		}
2899 	}
2900 
2901 	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2902 					 vdo_get_recovery_journal_thread_id(journal),
2903 					 map, schedule_era_advance, vdo,
2904 					 &map->action_manager);
2905 	if (result != VDO_SUCCESS) {
2906 		vdo_free_block_map(map);
2907 		return result;
2908 	}
2909 
2910 	*map_ptr = map;
2911 	return VDO_SUCCESS;
2912 }
2913 
2914 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2915 {
2916 	return (struct block_map_state_2_0) {
2917 		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2918 		/* This is the flat page count, which has turned out to always be 0. */
2919 		.flat_page_count = 0,
2920 		.root_origin = map->root_origin,
2921 		.root_count = map->root_count,
2922 	};
2923 }
2924 
2925 /* The block map needs to know the journal's sequence number to initialize the eras. */
2926 void vdo_initialize_block_map_from_journal(struct block_map *map,
2927 					   struct recovery_journal *journal)
2928 {
2929 	zone_count_t z = 0;
2930 
2931 	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2932 	map->pending_era_point = map->current_era_point;
2933 
2934 	for (z = 0; z < map->zone_count; z++) {
2935 		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2936 
2937 		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2938 		dirty_lists->oldest_period = map->current_era_point;
2939 		dirty_lists->next_period = map->current_era_point + 1;
2940 		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
2941 	}
2942 }
2943 
2944 /* Compute the logical zone for the LBN of a data vio. */
2945 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
2946 {
2947 	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
2948 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2949 	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2950 
2951 	tree_lock->tree_slots[0].page_index = page_number;
2952 	tree_lock->root_index = page_number % map->root_count;
2953 	return (tree_lock->root_index % map->zone_count);
2954 }
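
/*
 * Illustrative sketch (assumed helper): the zone assignment in vdo_compute_logical_zone() above,
 * with the LBN taken as a plain 64-bit number. With 800 entries per page, 4 roots and 2 logical
 * zones (placeholder values), LBN 100000 lives on leaf page 100000 / 800 = 125, which belongs to
 * root 125 % 4 = 1 and is therefore handled by zone 1 % 2 = 1.
 */
static inline zone_count_t example_zone_for_lbn(u64 lbn, page_number_t entries_per_page,
						root_count_t root_count,
						zone_count_t zone_count)
{
	page_number_t page_number = lbn / entries_per_page;

	return (page_number % root_count) % zone_count;
}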
2955 
2956 void vdo_advance_block_map_era(struct block_map *map,
2957 			       sequence_number_t recovery_block_number)
2958 {
2959 	if (map == NULL)
2960 		return;
2961 
2962 	map->pending_era_point = recovery_block_number;
2963 	vdo_schedule_default_action(map->action_manager);
2964 }
2965 
2966 /* Implements vdo_admin_initiator_fn */
2967 static void initiate_drain(struct admin_state *state)
2968 {
2969 	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
2970 
2971 	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
2972 			    "%s() called with no active lookups", __func__);
2973 
2974 	if (!vdo_is_state_suspending(state)) {
2975 		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
2976 			expire_oldest_list(zone->dirty_lists);
2977 		write_expired_elements(zone);
2978 	}
2979 
2980 	check_for_drain_complete(zone);
2981 }
2982 
2983 /* Implements vdo_zone_action_fn. */
2984 static void drain_zone(void *context, zone_count_t zone_number,
2985 		       struct vdo_completion *parent)
2986 {
2987 	struct block_map *map = context;
2988 	struct block_map_zone *zone = &map->zones[zone_number];
2989 
2990 	vdo_start_draining(&zone->state,
2991 			   vdo_get_current_manager_operation(map->action_manager),
2992 			   parent, initiate_drain);
2993 }
2994 
2995 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
2996 			 struct vdo_completion *parent)
2997 {
2998 	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
2999 			       parent);
3000 }
3001 
3002 /* Implements vdo_zone_action_fn. */
3003 static void resume_block_map_zone(void *context, zone_count_t zone_number,
3004 				  struct vdo_completion *parent)
3005 {
3006 	struct block_map *map = context;
3007 	struct block_map_zone *zone = &map->zones[zone_number];
3008 
3009 	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3010 }
3011 
3012 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3013 {
3014 	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3015 			       NULL, resume_block_map_zone, NULL, parent);
3016 }
3017 
3018 /* Allocate an expanded collection of trees, for a future growth. */
3019 int vdo_prepare_to_grow_block_map(struct block_map *map,
3020 				  block_count_t new_logical_blocks)
3021 {
3022 	if (map->next_entry_count == new_logical_blocks)
3023 		return VDO_SUCCESS;
3024 
3025 	if (map->next_entry_count > 0)
3026 		vdo_abandon_block_map_growth(map);
3027 
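	/* The block map never shrinks; a smaller size simply keeps the current forest. */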
	if (new_logical_blocks < map->entry_count) {
		map->next_entry_count = map->entry_count;
		return VDO_SUCCESS;
	}

	return make_forest(map, new_logical_blocks);
}

/* Implements vdo_action_preamble_fn */
static void grow_forest(void *context, struct vdo_completion *completion)
{
	replace_forest(context);
	vdo_finish_completion(completion);
}

/* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       grow_forest, NULL, NULL, parent);
}

void vdo_abandon_block_map_growth(struct block_map *map)
{
	struct forest *forest = vdo_forget(map->next_forest);

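	/*
	 * The expanded forest shares all but its newest segment with the
	 * forest currently in use, so only that final segment is torn down
	 * here.
	 */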
	if (forest != NULL)
		deforest(forest, forest->segments - 1);

	map->next_entry_count = 0;
}

/* Release the page completion and then continue the requester. */
static inline void finish_processing_page(struct vdo_completion *completion, int result)
{
	struct vdo_completion *parent = completion->parent;

	vdo_release_page_completion(completion);
	vdo_continue_completion(parent, result);
}

static void handle_page_error(struct vdo_completion *completion)
{
	finish_processing_page(completion, completion->result);
}

/* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
			       vdo_action_fn action)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

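	/*
	 * Refuse to start new page fetches while the zone is draining; failing
	 * the data_vio immediately keeps it from holding up the drain.
	 */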
	if (vdo_is_state_draining(&zone->state)) {
		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	vdo_get_page(&data_vio->page_completion, zone,
		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
		     modifiable, &data_vio->vio.completion,
		     action, handle_page_error, false);
}

/**
 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
 *
 * This indicates the block map entry for the logical block is either unmapped or corrupted.
 */
static void clear_mapped_location(struct data_vio *data_vio)
{
	data_vio->mapped = (struct zoned_pbn) {
		.state = VDO_MAPPING_STATE_UNMAPPED,
	};
}

/**
 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
 *                         data_vio.
 *
 * Return: VDO_SUCCESS, VDO_BAD_MAPPING if the map entry is invalid, or an error code for any
 *         other failure.
 */
static int __must_check set_mapped_location(struct data_vio *data_vio,
					    const struct block_map_entry *entry)
{
	/* Unpack the PBN for logging purposes even if the entry is invalid. */
	struct data_location mapped = vdo_unpack_block_map_entry(entry);

	if (vdo_is_valid_location(&mapped)) {
		int result;

		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
					       mapped.pbn, &data_vio->mapped.zone);
		if (result == VDO_SUCCESS) {
			data_vio->mapped.pbn = mapped.pbn;
			data_vio->mapped.state = mapped.state;
			return VDO_SUCCESS;
		}

		/*
		 * Return all errors not specifically known to be errors from validating the
		 * location.
		 */
		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
			return result;
	}

	/*
	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
	 * to VDO_BAD_MAPPING.
	 */
	vdo_log_error_strerror(VDO_BAD_MAPPING,
			       "PBN %llu with state %u read from the block map was invalid",
			       (unsigned long long) mapped.pbn, mapped.state);

	/*
	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
	 * known data loss.
	 */
	if (!data_vio->write)
		return VDO_BAD_MAPPING;

	/*
	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
	 * entry rather than fail the write.
	 */
	clear_mapped_location(data_vio);
	return VDO_SUCCESS;
}

/* This callback is registered in vdo_get_mapped_block(). */
static void get_mapping_from_fetched_page(struct vdo_completion *completion)
{
	int result;
	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
	const struct block_map_page *page;
	const struct block_map_entry *entry;
	struct data_vio *data_vio = as_data_vio(completion->parent);
	struct block_map_tree_slot *tree_slot;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	result = validate_completed_page(vpc, false);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	page = (const struct block_map_page *) get_page_buffer(vpc->info);
	tree_slot = &data_vio->tree_lock.tree_slots[0];
	entry = &page->entries[tree_slot->block_map_slot.slot];

	result = set_mapped_location(data_vio, entry);
	finish_processing_page(completion, result);
}

void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
			       physical_block_number_t pbn,
			       enum block_mapping_state mapping_state,
			       sequence_number_t *recovery_lock)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct block_map *block_map = zone->block_map;
	struct recovery_journal *journal = block_map->journal;
	sequence_number_t old_locked, new_locked;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	/* Encode the new mapping. */
	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
		vdo_pack_block_map_entry(pbn, mapping_state);

	/* Adjust references on the recovery journal blocks. */
	old_locked = *recovery_lock;
	new_locked = data_vio->recovery_sequence_number;

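	/*
	 * The page must hold a reference on the oldest recovery journal block
	 * containing an entry it reflects, so take a reference on the
	 * data_vio's block if the page currently holds none or only a newer
	 * one.
	 */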
	if ((old_locked == 0) || (old_locked > new_locked)) {
		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
							     VDO_ZONE_TYPE_LOGICAL,
							     zone->zone_number);

		if (old_locked > 0) {
			vdo_release_recovery_journal_block_reference(journal, old_locked,
								     VDO_ZONE_TYPE_LOGICAL,
								     zone->zone_number);
		}

		*recovery_lock = new_locked;
	}

	/*
	 * The data_vio's journal entry lock has effectively been transferred to
	 * the page: the page now holds a reference on a journal block no newer
	 * than the data_vio's entry, so that entry can no longer be reaped out
	 * from under the page. Release the data_vio's own lock and clear its
	 * sequence number so the lock is not released a second time.
	 */
	vdo_release_journal_entry_lock(journal, new_locked);
	data_vio->recovery_sequence_number = 0;
}

static void put_mapping_in_fetched_page(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion->parent);
	sequence_number_t old_lock;
	struct vdo_page_completion *vpc;
	struct page_info *info;
	int result;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	info = vpc->info;
	old_lock = info->recovery_lock;
	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
				  data_vio, data_vio->new_mapped.pbn,
				  data_vio->new_mapped.state, &info->recovery_lock);
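	/*
	 * The update may have changed the page's oldest recovery lock, so mark
	 * the page dirty and re-file it on the dirty lists under the new lock
	 * value so it gets written in the correct era.
	 */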
	set_info_state(info, PS_DIRTY);
	add_to_dirty_lists(info->cache->zone, &info->state_entry,
			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
	finish_processing_page(completion, VDO_SUCCESS);
}

/* Read a stored block mapping into a data_vio. */
void vdo_get_mapped_block(struct data_vio *data_vio)
{
	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
		/*
		 * We know that the block map page for this LBN has not been allocated, so the
		 * block must be unmapped.
		 */
		clear_mapped_location(data_vio);
		continue_data_vio(data_vio);
		return;
	}

	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
}

/* Update a stored block mapping to reflect a data_vio's new mapping. */
void vdo_put_mapped_block(struct data_vio *data_vio)
{
	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
}

struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
{
	zone_count_t zone = 0;
	struct block_map_statistics totals;

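	/*
	 * The totals are summed without locking; READ_ONCE ensures each
	 * counter is read exactly once, though the fields may be mutually
	 * inconsistent if the zones are actively updating them.
	 */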
	memset(&totals, 0, sizeof(struct block_map_statistics));
	for (zone = 0; zone < map->zone_count; zone++) {
		const struct block_map_statistics *stats =
			&(map->zones[zone].page_cache.stats);

		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
		totals.clean_pages += READ_ONCE(stats->clean_pages);
		totals.free_pages += READ_ONCE(stats->free_pages);
		totals.failed_pages += READ_ONCE(stats->failed_pages);
		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
		totals.read_count += READ_ONCE(stats->read_count);
		totals.write_count += READ_ONCE(stats->write_count);
		totals.failed_reads += READ_ONCE(stats->failed_reads);
		totals.failed_writes += READ_ONCE(stats->failed_writes);
		totals.reclaimed += READ_ONCE(stats->reclaimed);
		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
		totals.discard_required += READ_ONCE(stats->discard_required);
		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
		totals.fetch_required += READ_ONCE(stats->fetch_required);
		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
		totals.pages_saved += READ_ONCE(stats->pages_saved);
		totals.flush_count += READ_ONCE(stats->flush_count);
	}

	return totals;
}