xref: /linux/drivers/md/dm-vdo/indexer/volume.c (revision 5d83b9cbe7cf40746948a419c5018f4d617a86fa)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "volume.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/dm-bufio.h>
10 #include <linux/err.h>
11 
12 #include "errors.h"
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "permassert.h"
16 #include "string-utils.h"
17 #include "thread-utils.h"
18 
19 #include "chapter-index.h"
20 #include "config.h"
21 #include "geometry.h"
22 #include "hash-utils.h"
23 #include "index.h"
24 #include "sparse-cache.h"
25 
26 /*
27  * The first block of the volume layout is reserved for the volume header, which is no longer used.
28  * The remainder of the volume is divided into chapters consisting of several pages of records, and
29  * several pages of static index to use to find those records. The index pages are recorded first,
30  * followed by the record pages. The chapters are written in order as they are filled, so the
31  * volume storage acts as a circular log of the most recent chapters, with each new chapter
32  * overwriting the oldest saved one.
33  *
34  * When a new chapter is filled and closed, the records from that chapter are sorted and
35  * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
36  * index is generated to store which record page contains each record. The in-memory index page map
37  * is also updated to indicate which delta lists fall on each chapter index page. This means that
38  * when a record is read, the volume only has to load a single index page and a single record page,
39  * rather than search the entire chapter. These index and record pages are written to storage, and
40  * the index pages are transferred to the page cache under the theory that the most recently
41  * written chapter is likely to be accessed again soon.
42  *
43  * When reading a record, the volume index will indicate which chapter should contain it. The
44  * volume uses the index page map to determine which chapter index page needs to be loaded, and
45  * then reads the relevant record page number from the chapter index. Both index and record pages
46  * are stored in a page cache when read for the common case that subsequent records need the same
47  * pages. The page cache evicts the least recently accessed entries when caching new pages. In
48  * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
49  * additional caching depending on available system resources.
50  *
51  * Record requests are handled from cached pages when possible. If a page needs to be read, it is
52  * placed on a queue along with the request that wants to read it. Any requests for the same page
53  * that arrive while the read is pending are added to the queue entry. A separate reader thread
54  * handles the queued reads, adding the page to the cache and updating any requests queued with it
55  * so they can continue processing. This allows the index zone threads to continue processing new
56  * requests rather than wait for the storage reads.
57  *
58  * When an index rebuild is necessary, the volume reads each stored chapter to determine which
59  * range of chapters contain valid records, so that those records can be used to reconstruct the
60  * in-memory volume index.
61  */
62 
/* The maximum allowable number of contiguous bad chapters */
#define MAX_BAD_CHAPTERS 100
/*
 * The largest value that fits in the low 15 bits of a u16. Entries of the page cache index are
 * u16 values whose top bit is VOLUME_CACHE_QUEUED_FLAG (see get_page_and_index()).
 * NOTE(review): presumably the upper limit on the number of cache slots; the use of this
 * constant is not visible in this part of the file.
 */
#define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
/*
 * Set in a page cache index entry while a read of that page is queued. The remaining bits then
 * hold a read queue position rather than a cache slot (see enqueue_read()).
 */
#define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
/* Read queue positions wrap around at this value (see next_queue_position()). */
#define VOLUME_CACHE_MAX_QUEUED_READS 4096

/*
 * NOTE(review): presumably a sentinel for a chapter that could not be read; it is not referenced
 * in this part of the file.
 */
static const u64 BAD_CHAPTER = U64_MAX;
70 
/*
 * The invalidate counter is two 32-bit fields stored together in one 64-bit value, so that both
 * can be read or written with a single access (see get_invalidate_counter() and
 * set_invalidate_counter()). The page field is the physical page number of the cached page being
 * searched. The counter field is a sequence number which is incremented when the owning zone
 * begins a cache search and again when it completes one, so an odd sequence number means a
 * search is in progress (see search_pending()). Any other thread will only read the counter in
 * wait_for_pending_searches() while waiting to update the cache contents.
 *
 * Which half of the 64-bit value each field occupies depends on byte order. The code in this file
 * only accesses the fields by name or compares whole values, so it does not rely on that layout.
 */
union invalidate_counter {
	/* Both fields viewed as a single unit. */
	u64 value;
	struct {
		/* The physical page being searched. */
		u32 page;
		/* The sequence number; odd while a search is pending. */
		u32 counter;
	};
};
85 
86 static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
87 {
88 	return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
89 }
90 
91 static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
92 {
93 	return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
94 }
95 
96 static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
97 {
98 	return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
99 }
100 
101 static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
102 {
103 	/* Page zero is the header page, so the first chapter index page is page one. */
104 	return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
105 }
106 
107 static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
108 							      unsigned int zone_number)
109 {
110 	return (union invalidate_counter) {
111 		.value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
112 	};
113 }
114 
115 static inline void set_invalidate_counter(struct page_cache *cache,
116 					  unsigned int zone_number,
117 					  union invalidate_counter invalidate_counter)
118 {
119 	WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
120 		   invalidate_counter.value);
121 }
122 
123 static inline bool search_pending(union invalidate_counter invalidate_counter)
124 {
125 	return (invalidate_counter.counter & 1) != 0;
126 }
127 
128 /* Lock the cache for a zone in order to search for a page. */
129 static void begin_pending_search(struct page_cache *cache, u32 physical_page,
130 				 unsigned int zone_number)
131 {
132 	union invalidate_counter invalidate_counter =
133 		get_invalidate_counter(cache, zone_number);
134 
135 	invalidate_counter.page = physical_page;
136 	invalidate_counter.counter++;
137 	set_invalidate_counter(cache, zone_number, invalidate_counter);
138 	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
139 			    "Search is pending for zone %u", zone_number);
140 	/*
141 	 * This memory barrier ensures that the write to the invalidate counter is seen by other
142 	 * threads before this thread accesses the cached page. The corresponding read memory
143 	 * barrier is in wait_for_pending_searches().
144 	 */
145 	smp_mb();
146 }
147 
148 /* Unlock the cache for a zone by clearing its invalidate counter. */
149 static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
150 {
151 	union invalidate_counter invalidate_counter;
152 
153 	/*
154 	 * This memory barrier ensures that this thread completes reads of the
155 	 * cached page before other threads see the write to the invalidate
156 	 * counter.
157 	 */
158 	smp_mb();
159 
160 	invalidate_counter = get_invalidate_counter(cache, zone_number);
161 	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
162 			    "Search is pending for zone %u", zone_number);
163 	invalidate_counter.counter++;
164 	set_invalidate_counter(cache, zone_number, invalidate_counter);
165 }
166 
167 static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
168 {
169 	union invalidate_counter initial_counters[MAX_ZONES];
170 	unsigned int i;
171 
172 	/*
173 	 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
174 	 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
175 	 * search_pending_counter. The corresponding write memory barrier is in
176 	 * begin_pending_search().
177 	 */
178 	smp_mb();
179 
180 	for (i = 0; i < cache->zone_count; i++)
181 		initial_counters[i] = get_invalidate_counter(cache, i);
182 	for (i = 0; i < cache->zone_count; i++) {
183 		if (search_pending(initial_counters[i]) &&
184 		    (initial_counters[i].page == physical_page)) {
185 			/*
186 			 * There is an active search using the physical page. We need to wait for
187 			 * the search to finish.
188 			 *
189 			 * FIXME: Investigate using wait_event() to wait for the search to finish.
190 			 */
191 			while (initial_counters[i].value ==
192 			       get_invalidate_counter(cache, i).value)
193 				cond_resched();
194 		}
195 	}
196 }
197 
198 static void release_page_buffer(struct cached_page *page)
199 {
200 	if (page->buffer != NULL)
201 		dm_bufio_release(vdo_forget(page->buffer));
202 }
203 
204 static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
205 {
206 	/* Do not clear read_pending because the read queue relies on it. */
207 	release_page_buffer(page);
208 	page->physical_page = cache->indexable_pages;
209 	WRITE_ONCE(page->last_used, 0);
210 }
211 
212 static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
213 {
214 	/*
215 	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
216 	 * thread holding the read_threads_mutex.
217 	 */
218 	if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
219 		WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
220 }
221 
222 /* Select a page to remove from the cache to make space for a new entry. */
223 static struct cached_page *select_victim_in_cache(struct page_cache *cache)
224 {
225 	struct cached_page *page;
226 	int oldest_index = 0;
227 	s64 oldest_time = S64_MAX;
228 	s64 last_used;
229 	u16 i;
230 
231 	/* Find the oldest unclaimed page. We hold the read_threads_mutex. */
232 	for (i = 0; i < cache->cache_slots; i++) {
233 		/* A page with a pending read must not be replaced. */
234 		if (cache->cache[i].read_pending)
235 			continue;
236 
237 		last_used = READ_ONCE(cache->cache[i].last_used);
238 		if (last_used <= oldest_time) {
239 			oldest_time = last_used;
240 			oldest_index = i;
241 		}
242 	}
243 
244 	page = &cache->cache[oldest_index];
245 	if (page->physical_page != cache->indexable_pages) {
246 		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
247 		wait_for_pending_searches(cache, page->physical_page);
248 	}
249 
250 	page->read_pending = true;
251 	clear_cache_page(cache, page);
252 	return page;
253 }
254 
255 /* Make a newly filled cache entry available to other threads. */
256 static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
257 			     struct cached_page *page)
258 {
259 	int result;
260 
261 	/* We hold the read_threads_mutex. */
262 	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
263 	if (result != VDO_SUCCESS)
264 		return result;
265 
266 	page->physical_page = physical_page;
267 	make_page_most_recent(cache, page);
268 	page->read_pending = false;
269 
270 	/*
271 	 * We hold the read_threads_mutex, but we must have a write memory barrier before making
272 	 * the cached_page available to the readers that do not hold the mutex. The corresponding
273 	 * read memory barrier is in get_page_and_index().
274 	 */
275 	smp_wmb();
276 
277 	/* This assignment also clears the queued flag. */
278 	WRITE_ONCE(cache->index[physical_page], page - cache->cache);
279 	return UDS_SUCCESS;
280 }
281 
282 static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
283 				 struct cached_page *page)
284 {
285 	int result;
286 
287 	/* We hold the read_threads_mutex. */
288 	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
289 	if (result != VDO_SUCCESS)
290 		return;
291 
292 	clear_cache_page(cache, page);
293 	page->read_pending = false;
294 
295 	/* Clear the mapping and the queued flag for the new page. */
296 	WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
297 }
298 
299 static inline u16 next_queue_position(u16 position)
300 {
301 	return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
302 }
303 
304 static inline void advance_queue_position(u16 *position)
305 {
306 	*position = next_queue_position(*position);
307 }
308 
309 static inline bool read_queue_is_full(struct page_cache *cache)
310 {
311 	return cache->read_queue_first == next_queue_position(cache->read_queue_last);
312 }
313 
314 static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
315 			 u32 physical_page)
316 {
317 	struct queued_read *queue_entry;
318 	u16 last = cache->read_queue_last;
319 	u16 read_queue_index;
320 
321 	/* We hold the read_threads_mutex. */
322 	if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
323 		/* This page has no existing entry in the queue. */
324 		if (read_queue_is_full(cache))
325 			return false;
326 
327 		/* Fill in the read queue entry. */
328 		cache->read_queue[last].physical_page = physical_page;
329 		cache->read_queue[last].invalid = false;
330 		cache->read_queue[last].first_request = NULL;
331 		cache->read_queue[last].last_request = NULL;
332 
333 		/* Point the cache index to the read queue entry. */
334 		read_queue_index = last;
335 		WRITE_ONCE(cache->index[physical_page],
336 			   read_queue_index | VOLUME_CACHE_QUEUED_FLAG);
337 
338 		advance_queue_position(&cache->read_queue_last);
339 	} else {
340 		/* It's already queued, so add this request to the existing entry. */
341 		read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
342 	}
343 
344 	request->next_request = NULL;
345 	queue_entry = &cache->read_queue[read_queue_index];
346 	if (queue_entry->first_request == NULL)
347 		queue_entry->first_request = request;
348 	else
349 		queue_entry->last_request->next_request = request;
350 	queue_entry->last_request = request;
351 
352 	return true;
353 }
354 
355 static void enqueue_page_read(struct volume *volume, struct uds_request *request,
356 			      u32 physical_page)
357 {
358 	/* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
359 	while (!enqueue_read(&volume->page_cache, request, physical_page)) {
360 		vdo_log_debug("Read queue full, waiting for reads to finish");
361 		uds_wait_cond(&volume->read_threads_read_done_cond,
362 			      &volume->read_threads_mutex);
363 	}
364 
365 	uds_signal_cond(&volume->read_threads_cond);
366 }
367 
368 /*
369  * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
370  * Must be followed by release_queued_requests().
371  */
372 static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
373 {
374 	/* We hold the read_threads_mutex. */
375 	struct queued_read *entry;
376 	u16 index_value;
377 	bool queued;
378 
379 	/* No items to dequeue */
380 	if (cache->read_queue_next_read == cache->read_queue_last)
381 		return NULL;
382 
383 	entry = &cache->read_queue[cache->read_queue_next_read];
384 	index_value = cache->index[entry->physical_page];
385 	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
386 	/* Check to see if it's still queued before resetting. */
387 	if (entry->invalid && queued)
388 		WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);
389 
390 	/*
391 	 * If a synchronous read has taken this page, set invalid to true so it doesn't get
392 	 * overwritten. Requests will just be requeued.
393 	 */
394 	if (!queued)
395 		entry->invalid = true;
396 
397 	entry->reserved = true;
398 	advance_queue_position(&cache->read_queue_next_read);
399 	return entry;
400 }
401 
402 static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
403 {
404 	struct queued_read *queue_entry = NULL;
405 
406 	while (!volume->read_threads_exiting) {
407 		queue_entry = reserve_read_queue_entry(&volume->page_cache);
408 		if (queue_entry != NULL)
409 			break;
410 
411 		uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
412 	}
413 
414 	return queue_entry;
415 }
416 
417 static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
418 				   u32 chapter, u32 index_page_number,
419 				   struct delta_index_page *chapter_index_page)
420 {
421 	u64 ci_virtual;
422 	u32 ci_chapter;
423 	u32 lowest_list;
424 	u32 highest_list;
425 	struct index_geometry *geometry = volume->geometry;
426 	int result;
427 
428 	result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
429 						   index_page, volume->nonce);
430 	if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
431 		return result;
432 
433 	if (result != UDS_SUCCESS) {
434 		return vdo_log_error_strerror(result,
435 					      "Reading chapter index page for chapter %u page %u",
436 					      chapter, index_page_number);
437 	}
438 
439 	uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
440 				   &lowest_list, &highest_list);
441 	ci_virtual = chapter_index_page->virtual_chapter_number;
442 	ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
443 	if ((chapter == ci_chapter) &&
444 	    (lowest_list == chapter_index_page->lowest_list_number) &&
445 	    (highest_list == chapter_index_page->highest_list_number))
446 		return UDS_SUCCESS;
447 
448 	vdo_log_warning("Index page map updated to %llu",
449 			(unsigned long long) volume->index_page_map->last_update);
450 	vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
451 			chapter, index_page_number, lowest_list, highest_list,
452 			(unsigned long long) ci_virtual,
453 			chapter_index_page->lowest_list_number,
454 			chapter_index_page->highest_list_number);
455 	return vdo_log_error_strerror(UDS_CORRUPT_DATA,
456 				      "index page map mismatch with chapter index");
457 }
458 
459 static int initialize_index_page(const struct volume *volume, u32 physical_page,
460 				 struct cached_page *page)
461 {
462 	u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
463 	u32 index_page_number = map_to_page_number(volume->geometry, physical_page);
464 
465 	return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
466 				       chapter, index_page_number, &page->index_page);
467 }
468 
469 static bool search_record_page(const u8 record_page[],
470 			       const struct uds_record_name *name,
471 			       const struct index_geometry *geometry,
472 			       struct uds_record_data *metadata)
473 {
474 	/*
475 	 * The array of records is sorted by name and stored as a binary tree in heap order, so the
476 	 * root of the tree is the first array element.
477 	 */
478 	u32 node = 0;
479 	const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;
480 
481 	while (node < geometry->records_per_page) {
482 		int result;
483 		const struct uds_volume_record *record = &records[node];
484 
485 		result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
486 		if (result == 0) {
487 			if (metadata != NULL)
488 				*metadata = record->data;
489 			return true;
490 		}
491 
492 		/* The children of node N are at indexes 2N+1 and 2N+2. */
493 		node = ((2 * node) + ((result < 0) ? 1 : 2));
494 	}
495 
496 	return false;
497 }
498 
499 /*
500  * If we've read in a record page, we're going to do an immediate search, to speed up processing by
501  * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
502  * we've read in an index page, we save the record page number so we don't have to resolve the
503  * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
504  * to allow the index code to know where to begin processing the request again.
505  */
506 static int search_page(struct cached_page *page, const struct volume *volume,
507 		       struct uds_request *request, u32 physical_page)
508 {
509 	int result;
510 	enum uds_index_region location;
511 	u16 record_page_number;
512 
513 	if (is_record_page(volume->geometry, physical_page)) {
514 		if (search_record_page(dm_bufio_get_block_data(page->buffer),
515 				       &request->record_name, volume->geometry,
516 				       &request->old_metadata))
517 			location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
518 		else
519 			location = UDS_LOCATION_UNAVAILABLE;
520 	} else {
521 		result = uds_search_chapter_index_page(&page->index_page,
522 						       volume->geometry,
523 						       &request->record_name,
524 						       &record_page_number);
525 		if (result != UDS_SUCCESS)
526 			return result;
527 
528 		if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
529 			location = UDS_LOCATION_UNAVAILABLE;
530 		} else {
531 			location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
532 			*((u16 *) &request->old_metadata) = record_page_number;
533 		}
534 	}
535 
536 	request->location = location;
537 	request->found = false;
538 	return UDS_SUCCESS;
539 }
540 
541 static int process_entry(struct volume *volume, struct queued_read *entry)
542 {
543 	u32 page_number = entry->physical_page;
544 	struct uds_request *request;
545 	struct cached_page *page = NULL;
546 	u8 *page_data;
547 	int result;
548 
549 	if (entry->invalid) {
550 		vdo_log_debug("Requeuing requests for invalid page");
551 		return UDS_SUCCESS;
552 	}
553 
554 	page = select_victim_in_cache(&volume->page_cache);
555 
556 	mutex_unlock(&volume->read_threads_mutex);
557 	page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
558 	mutex_lock(&volume->read_threads_mutex);
559 	if (IS_ERR(page_data)) {
560 		result = -PTR_ERR(page_data);
561 		vdo_log_warning_strerror(result,
562 					 "error reading physical page %u from volume",
563 					 page_number);
564 		cancel_page_in_cache(&volume->page_cache, page_number, page);
565 		return result;
566 	}
567 
568 	if (entry->invalid) {
569 		vdo_log_warning("Page %u invalidated after read", page_number);
570 		cancel_page_in_cache(&volume->page_cache, page_number, page);
571 		return UDS_SUCCESS;
572 	}
573 
574 	if (!is_record_page(volume->geometry, page_number)) {
575 		result = initialize_index_page(volume, page_number, page);
576 		if (result != UDS_SUCCESS) {
577 			vdo_log_warning("Error initializing chapter index page");
578 			cancel_page_in_cache(&volume->page_cache, page_number, page);
579 			return result;
580 		}
581 	}
582 
583 	result = put_page_in_cache(&volume->page_cache, page_number, page);
584 	if (result != UDS_SUCCESS) {
585 		vdo_log_warning("Error putting page %u in cache", page_number);
586 		cancel_page_in_cache(&volume->page_cache, page_number, page);
587 		return result;
588 	}
589 
590 	request = entry->first_request;
591 	while ((request != NULL) && (result == UDS_SUCCESS)) {
592 		result = search_page(page, volume, request, page_number);
593 		request = request->next_request;
594 	}
595 
596 	return result;
597 }
598 
599 static void release_queued_requests(struct volume *volume, struct queued_read *entry,
600 				    int result)
601 {
602 	struct page_cache *cache = &volume->page_cache;
603 	u16 next_read = cache->read_queue_next_read;
604 	struct uds_request *request;
605 	struct uds_request *next;
606 
607 	for (request = entry->first_request; request != NULL; request = next) {
608 		next = request->next_request;
609 		request->status = result;
610 		request->requeued = true;
611 		uds_enqueue_request(request, STAGE_INDEX);
612 	}
613 
614 	entry->reserved = false;
615 
616 	/* Move the read_queue_first pointer as far as we can. */
617 	while ((cache->read_queue_first != next_read) &&
618 	       (!cache->read_queue[cache->read_queue_first].reserved))
619 		advance_queue_position(&cache->read_queue_first);
620 	uds_broadcast_cond(&volume->read_threads_read_done_cond);
621 }
622 
623 static void read_thread_function(void *arg)
624 {
625 	struct volume *volume = arg;
626 
627 	vdo_log_debug("reader starting");
628 	mutex_lock(&volume->read_threads_mutex);
629 	while (true) {
630 		struct queued_read *queue_entry;
631 		int result;
632 
633 		queue_entry = wait_to_reserve_read_queue_entry(volume);
634 		if (volume->read_threads_exiting)
635 			break;
636 
637 		result = process_entry(volume, queue_entry);
638 		release_queued_requests(volume, queue_entry, result);
639 	}
640 	mutex_unlock(&volume->read_threads_mutex);
641 	vdo_log_debug("reader done");
642 }
643 
644 static void get_page_and_index(struct page_cache *cache, u32 physical_page,
645 			       int *queue_index, struct cached_page **page_ptr)
646 {
647 	u16 index_value;
648 	u16 index;
649 	bool queued;
650 
651 	/*
652 	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
653 	 * thread holding the read_threads_mutex.
654 	 *
655 	 * Holding only a search_pending_counter is the most frequent case.
656 	 */
657 	/*
658 	 * It would be unlikely for the compiler to turn the usage of index_value into two reads of
659 	 * cache->index, but it would be possible and very bad if those reads did not return the
660 	 * same bits.
661 	 */
662 	index_value = READ_ONCE(cache->index[physical_page]);
663 	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
664 	index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;
665 
666 	if (!queued && (index < cache->cache_slots)) {
667 		*page_ptr = &cache->cache[index];
668 		/*
669 		 * We have acquired access to the cached page, but unless we hold the
670 		 * read_threads_mutex, we need a read memory barrier now. The corresponding write
671 		 * memory barrier is in put_page_in_cache().
672 		 */
673 		smp_rmb();
674 	} else {
675 		*page_ptr = NULL;
676 	}
677 
678 	*queue_index = queued ? index : -1;
679 }
680 
681 static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
682 				struct cached_page **page)
683 {
684 	/*
685 	 * ASSERTION: We are in a zone thread.
686 	 * ASSERTION: We holding a search_pending_counter or the read_threads_mutex.
687 	 */
688 	int queue_index = -1;
689 
690 	get_page_and_index(cache, physical_page, &queue_index, page);
691 }
692 
693 static int read_page_locked(struct volume *volume, u32 physical_page,
694 			    struct cached_page **page_ptr)
695 {
696 	int result = UDS_SUCCESS;
697 	struct cached_page *page = NULL;
698 	u8 *page_data;
699 
700 	page = select_victim_in_cache(&volume->page_cache);
701 	page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
702 	if (IS_ERR(page_data)) {
703 		result = -PTR_ERR(page_data);
704 		vdo_log_warning_strerror(result,
705 					 "error reading physical page %u from volume",
706 					 physical_page);
707 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
708 		return result;
709 	}
710 
711 	if (!is_record_page(volume->geometry, physical_page)) {
712 		result = initialize_index_page(volume, physical_page, page);
713 		if (result != UDS_SUCCESS) {
714 			if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
715 				vdo_log_warning("Corrupt index page %u", physical_page);
716 			cancel_page_in_cache(&volume->page_cache, physical_page, page);
717 			return result;
718 		}
719 	}
720 
721 	result = put_page_in_cache(&volume->page_cache, physical_page, page);
722 	if (result != UDS_SUCCESS) {
723 		vdo_log_warning("Error putting page %u in cache", physical_page);
724 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
725 		return result;
726 	}
727 
728 	*page_ptr = page;
729 	return UDS_SUCCESS;
730 }
731 
732 /* Retrieve a page from the cache while holding the read threads mutex. */
733 static int get_volume_page_locked(struct volume *volume, u32 physical_page,
734 				  struct cached_page **page_ptr)
735 {
736 	int result;
737 	struct cached_page *page = NULL;
738 
739 	get_page_from_cache(&volume->page_cache, physical_page, &page);
740 	if (page == NULL) {
741 		result = read_page_locked(volume, physical_page, &page);
742 		if (result != UDS_SUCCESS)
743 			return result;
744 	} else {
745 		make_page_most_recent(&volume->page_cache, page);
746 	}
747 
748 	*page_ptr = page;
749 	return UDS_SUCCESS;
750 }
751 
752 /* Retrieve a page from the cache while holding a search_pending lock. */
753 static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
754 				     u32 physical_page, struct cached_page **page_ptr)
755 {
756 	struct cached_page *page;
757 
758 	get_page_from_cache(&volume->page_cache, physical_page, &page);
759 	if (page != NULL) {
760 		if (request->zone_number == 0) {
761 			/* Only one zone is allowed to update the LRU. */
762 			make_page_most_recent(&volume->page_cache, page);
763 		}
764 
765 		*page_ptr = page;
766 		return UDS_SUCCESS;
767 	}
768 
769 	/* Prepare to enqueue a read for the page. */
770 	end_pending_search(&volume->page_cache, request->zone_number);
771 	mutex_lock(&volume->read_threads_mutex);
772 
773 	/*
774 	 * Do the lookup again while holding the read mutex (no longer the fast case so this should
775 	 * be fine to repeat). We need to do this because a page may have been added to the cache
776 	 * by a reader thread between the time we searched above and the time we went to actually
777 	 * try to enqueue it below. This could result in us enqueuing another read for a page which
778 	 * is already in the cache, which would mean we end up with two entries in the cache for
779 	 * the same page.
780 	 */
781 	get_page_from_cache(&volume->page_cache, physical_page, &page);
782 	if (page == NULL) {
783 		enqueue_page_read(volume, request, physical_page);
784 		/*
785 		 * The performance gain from unlocking first, while "search pending" mode is off,
786 		 * turns out to be significant in some cases. The page is not available yet so
787 		 * the order does not matter for correctness as it does below.
788 		 */
789 		mutex_unlock(&volume->read_threads_mutex);
790 		begin_pending_search(&volume->page_cache, physical_page,
791 				     request->zone_number);
792 		return UDS_QUEUED;
793 	}
794 
795 	/*
796 	 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
797 	 * "search pending" state in careful order so no other thread can mess with the data before
798 	 * the caller gets to look at it.
799 	 */
800 	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
801 	mutex_unlock(&volume->read_threads_mutex);
802 	*page_ptr = page;
803 	return UDS_SUCCESS;
804 }
805 
806 static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
807 			   struct cached_page **page_ptr)
808 {
809 	int result;
810 	u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
811 
812 	mutex_lock(&volume->read_threads_mutex);
813 	result = get_volume_page_locked(volume, physical_page, page_ptr);
814 	mutex_unlock(&volume->read_threads_mutex);
815 	return result;
816 }
817 
818 int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
819 			       u8 **data_ptr)
820 {
821 	int result;
822 	struct cached_page *page = NULL;
823 
824 	result = get_volume_page(volume, chapter, page_number, &page);
825 	if (result == UDS_SUCCESS)
826 		*data_ptr = dm_bufio_get_block_data(page->buffer);
827 	return result;
828 }
829 
830 int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
831 			      struct delta_index_page **index_page_ptr)
832 {
833 	int result;
834 	struct cached_page *page = NULL;
835 
836 	result = get_volume_page(volume, chapter, page_number, &page);
837 	if (result == UDS_SUCCESS)
838 		*index_page_ptr = &page->index_page;
839 	return result;
840 }
841 
842 /*
843  * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
844  * if the page in question must be read from storage.
845  */
846 static int search_cached_index_page(struct volume *volume, struct uds_request *request,
847 				    u32 chapter, u32 index_page_number,
848 				    u16 *record_page_number)
849 {
850 	int result;
851 	struct cached_page *page = NULL;
852 	u32 physical_page = map_to_physical_page(volume->geometry, chapter,
853 						 index_page_number);
854 
855 	/*
856 	 * Make sure the invalidate counter is updated before we try and read the mapping. This
857 	 * prevents this thread from reading a page in the cache which has already been marked for
858 	 * invalidation by the reader thread, before the reader thread has noticed that the
859 	 * invalidate_counter has been incremented.
860 	 */
861 	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
862 
863 	result = get_volume_page_protected(volume, request, physical_page, &page);
864 	if (result != UDS_SUCCESS) {
865 		end_pending_search(&volume->page_cache, request->zone_number);
866 		return result;
867 	}
868 
869 	result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
870 					       &request->record_name,
871 					       record_page_number);
872 	end_pending_search(&volume->page_cache, request->zone_number);
873 	return result;
874 }
875 
876 /*
877  * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
878  * the page in question must be read from storage.
879  */
880 int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
881 				  u32 chapter, u16 record_page_number, bool *found)
882 {
883 	struct cached_page *record_page;
884 	struct index_geometry *geometry = volume->geometry;
885 	int result;
886 	u32 physical_page, page_number;
887 
888 	*found = false;
889 	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
890 		return UDS_SUCCESS;
891 
892 	result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
893 			    "0 <= %d < %u", record_page_number,
894 			    geometry->record_pages_per_chapter);
895 	if (result != VDO_SUCCESS)
896 		return result;
897 
898 	page_number = geometry->index_pages_per_chapter + record_page_number;
899 
900 	physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
901 
902 	/*
903 	 * Make sure the invalidate counter is updated before we try and read the mapping. This
904 	 * prevents this thread from reading a page in the cache which has already been marked for
905 	 * invalidation by the reader thread, before the reader thread has noticed that the
906 	 * invalidate_counter has been incremented.
907 	 */
908 	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
909 
910 	result = get_volume_page_protected(volume, request, physical_page, &record_page);
911 	if (result != UDS_SUCCESS) {
912 		end_pending_search(&volume->page_cache, request->zone_number);
913 		return result;
914 	}
915 
916 	if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
917 			       &request->record_name, geometry, &request->old_metadata))
918 		*found = true;
919 
920 	end_pending_search(&volume->page_cache, request->zone_number);
921 	return UDS_SUCCESS;
922 }
923 
924 void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
925 {
926 	const struct index_geometry *geometry = volume->geometry;
927 	u32 physical_page = map_to_physical_page(geometry, chapter, 0);
928 
929 	dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
930 }
931 
932 int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
933 				       struct dm_buffer *volume_buffers[],
934 				       struct delta_index_page index_pages[])
935 {
936 	int result;
937 	u32 i;
938 	const struct index_geometry *geometry = volume->geometry;
939 	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
940 	u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);
941 
942 	dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
943 	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
944 		u8 *index_page;
945 
946 		index_page = dm_bufio_read(volume->client, physical_page + i,
947 					   &volume_buffers[i]);
948 		if (IS_ERR(index_page)) {
949 			result = -PTR_ERR(index_page);
950 			vdo_log_warning_strerror(result,
951 						 "error reading physical page %u",
952 						 physical_page);
953 			return result;
954 		}
955 
956 		result = init_chapter_index_page(volume, index_page, physical_chapter, i,
957 						 &index_pages[i]);
958 		if (result != UDS_SUCCESS)
959 			return result;
960 	}
961 
962 	return UDS_SUCCESS;
963 }
964 
965 int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
966 				 bool *found)
967 {
968 	int result;
969 	u32 physical_chapter =
970 		uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
971 	u32 index_page_number;
972 	u16 record_page_number;
973 
974 	index_page_number = uds_find_index_page_number(volume->index_page_map,
975 						       &request->record_name,
976 						       physical_chapter);
977 
978 	if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
979 		record_page_number = *((u16 *) &request->old_metadata);
980 	} else {
981 		result = search_cached_index_page(volume, request, physical_chapter,
982 						  index_page_number,
983 						  &record_page_number);
984 		if (result != UDS_SUCCESS)
985 			return result;
986 	}
987 
988 	return uds_search_cached_record_page(volume, request, physical_chapter,
989 					     record_page_number, found);
990 }
991 
992 int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
993 					     const struct uds_record_name *name,
994 					     u64 virtual_chapter, bool *found)
995 {
996 	int result;
997 	struct index_geometry *geometry = volume->geometry;
998 	struct cached_page *page;
999 	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
1000 	u32 index_page_number;
1001 	u16 record_page_number;
1002 	u32 page_number;
1003 
1004 	*found = false;
1005 	index_page_number =
1006 		uds_find_index_page_number(volume->index_page_map, name,
1007 					   physical_chapter);
1008 	result = get_volume_page(volume, physical_chapter, index_page_number, &page);
1009 	if (result != UDS_SUCCESS)
1010 		return result;
1011 
1012 	result = uds_search_chapter_index_page(&page->index_page, geometry, name,
1013 					       &record_page_number);
1014 	if (result != UDS_SUCCESS)
1015 		return result;
1016 
1017 	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
1018 		return UDS_SUCCESS;
1019 
1020 	page_number = geometry->index_pages_per_chapter + record_page_number;
1021 	result = get_volume_page(volume, physical_chapter, page_number, &page);
1022 	if (result != UDS_SUCCESS)
1023 		return result;
1024 
1025 	*found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
1026 				    geometry, NULL);
1027 	return UDS_SUCCESS;
1028 }
1029 
1030 static void invalidate_page(struct page_cache *cache, u32 physical_page)
1031 {
1032 	struct cached_page *page;
1033 	int queue_index = -1;
1034 
1035 	/* We hold the read_threads_mutex. */
1036 	get_page_and_index(cache, physical_page, &queue_index, &page);
1037 	if (page != NULL) {
1038 		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
1039 		wait_for_pending_searches(cache, page->physical_page);
1040 		clear_cache_page(cache, page);
1041 	} else if (queue_index > -1) {
1042 		vdo_log_debug("setting pending read to invalid");
1043 		cache->read_queue[queue_index].invalid = true;
1044 	}
1045 }
1046 
1047 void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
1048 {
1049 	u32 physical_chapter =
1050 		uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
1051 	u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
1052 	u32 i;
1053 
1054 	vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
1055 	mutex_lock(&volume->read_threads_mutex);
1056 	for (i = 0; i < volume->geometry->pages_per_chapter; i++)
1057 		invalidate_page(&volume->page_cache, first_page + i);
1058 	mutex_unlock(&volume->read_threads_mutex);
1059 }
1060 
1061 /*
1062  * Donate an index pages from a newly written chapter to the page cache since it is likely to be
1063  * used again soon. The caller must already hold the reader thread mutex.
1064  */
1065 static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
1066 				    u32 index_page_number, struct dm_buffer *page_buffer)
1067 {
1068 	int result;
1069 	struct cached_page *page = NULL;
1070 	u32 physical_page =
1071 		map_to_physical_page(volume->geometry, physical_chapter,
1072 				     index_page_number);
1073 
1074 	page = select_victim_in_cache(&volume->page_cache);
1075 	page->buffer = page_buffer;
1076 	result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
1077 					 physical_chapter, index_page_number,
1078 					 &page->index_page);
1079 	if (result != UDS_SUCCESS) {
1080 		vdo_log_warning("Error initialize chapter index page");
1081 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1082 		return result;
1083 	}
1084 
1085 	result = put_page_in_cache(&volume->page_cache, physical_page, page);
1086 	if (result != UDS_SUCCESS) {
1087 		vdo_log_warning("Error putting page %u in cache", physical_page);
1088 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1089 		return result;
1090 	}
1091 
1092 	return UDS_SUCCESS;
1093 }
1094 
/*
 * Pack the open chapter index into the chapter's index pages, one dm-bufio buffer per page. For
 * each page this marks the buffer dirty, records the page in the index page map, and donates the
 * buffer to the page cache.
 *
 * Returns UDS_SUCCESS, or the first error from preparing, packing, or donating a page.
 */
static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
			     struct open_chapter_index *chapter_index)
{
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
	/* Running total of delta lists packed so far; also the first list for the next page. */
	u32 delta_list_number = 0;
	u32 index_page_number;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		u8 *page_data;
		u32 physical_page = first_index_page + index_page_number;
		u32 lists_packed;
		bool last_page;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare index page");
		}

		last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
		result = uds_pack_open_chapter_index_page(chapter_index, page_data,
							  delta_list_number, last_page,
							  &lists_packed);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to pack index page");
		}

		dm_bufio_mark_buffer_dirty(page_buffer);

		if (lists_packed == 0) {
			vdo_log_debug("no delta lists packed on chapter %u page %u",
				      physical_chapter_number, index_page_number);
		} else {
			delta_list_number += lists_packed;
		}

		/*
		 * The last argument is the number of the last delta list packed so far.
		 * NOTE(review): if no lists have been packed yet, delta_list_number is 0 and this
		 * u32 subtraction wraps; confirm the map tolerates that value.
		 */
		uds_update_index_page_map(volume->index_page_map,
					  chapter_index->virtual_chapter_number,
					  physical_chapter_number, index_page_number,
					  delta_list_number - 1);

		/*
		 * On success the buffer is not released here: donate_index_page_locked() stores it
		 * in the cache entry it installs.
		 */
		mutex_lock(&volume->read_threads_mutex);
		result = donate_index_page_locked(volume, physical_chapter_number,
						  index_page_number, page_buffer);
		mutex_unlock(&volume->read_threads_mutex);
		if (result != UDS_SUCCESS) {
			/*
			 * NOTE(review): the donate failure path calls cancel_page_in_cache();
			 * confirm that it does not also release this buffer.
			 */
			dm_bufio_release(page_buffer);
			return result;
		}
	}

	return UDS_SUCCESS;
}
1155 
1156 static u32 encode_tree(u8 record_page[],
1157 		       const struct uds_volume_record *sorted_pointers[],
1158 		       u32 next_record, u32 node, u32 node_count)
1159 {
1160 	if (node < node_count) {
1161 		u32 child = (2 * node) + 1;
1162 
1163 		next_record = encode_tree(record_page, sorted_pointers, next_record,
1164 					  child, node_count);
1165 
1166 		/*
1167 		 * In-order traversal: copy the contents of the next record into the page at the
1168 		 * node offset.
1169 		 */
1170 		memcpy(&record_page[node * BYTES_PER_RECORD],
1171 		       sorted_pointers[next_record++], BYTES_PER_RECORD);
1172 
1173 		next_record = encode_tree(record_page, sorted_pointers, next_record,
1174 					  child + 1, node_count);
1175 	}
1176 
1177 	return next_record;
1178 }
1179 
1180 static int encode_record_page(const struct volume *volume,
1181 			      const struct uds_volume_record records[], u8 record_page[])
1182 {
1183 	int result;
1184 	u32 i;
1185 	u32 records_per_page = volume->geometry->records_per_page;
1186 	const struct uds_volume_record **record_pointers = volume->record_pointers;
1187 
1188 	for (i = 0; i < records_per_page; i++)
1189 		record_pointers[i] = &records[i];
1190 
1191 	/*
1192 	 * Sort the record pointers by using just the names in the records, which is less work than
1193 	 * sorting the entire record values.
1194 	 */
1195 	BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
1196 	result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
1197 				records_per_page, UDS_RECORD_NAME_SIZE);
1198 	if (result != UDS_SUCCESS)
1199 		return result;
1200 
1201 	encode_tree(record_page, record_pointers, 0, 0, records_per_page);
1202 	return UDS_SUCCESS;
1203 }
1204 
1205 static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
1206 			      const struct uds_volume_record *records)
1207 {
1208 	u32 record_page_number;
1209 	struct index_geometry *geometry = volume->geometry;
1210 	struct dm_buffer *page_buffer;
1211 	const struct uds_volume_record *next_record = records;
1212 	u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
1213 						     geometry->index_pages_per_chapter);
1214 
1215 	for (record_page_number = 0;
1216 	     record_page_number < geometry->record_pages_per_chapter;
1217 	     record_page_number++) {
1218 		u8 *page_data;
1219 		u32 physical_page = first_record_page + record_page_number;
1220 		int result;
1221 
1222 		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
1223 		if (IS_ERR(page_data)) {
1224 			return vdo_log_warning_strerror(-PTR_ERR(page_data),
1225 							"failed to prepare record page");
1226 		}
1227 
1228 		result = encode_record_page(volume, next_record, page_data);
1229 		if (result != UDS_SUCCESS) {
1230 			dm_bufio_release(page_buffer);
1231 			return vdo_log_warning_strerror(result,
1232 							"failed to encode record page %u",
1233 							record_page_number);
1234 		}
1235 
1236 		next_record += geometry->records_per_page;
1237 		dm_bufio_mark_buffer_dirty(page_buffer);
1238 		dm_bufio_release(page_buffer);
1239 	}
1240 
1241 	return UDS_SUCCESS;
1242 }
1243 
1244 int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
1245 		      const struct uds_volume_record *records)
1246 {
1247 	int result;
1248 	u32 physical_chapter_number =
1249 		uds_map_to_physical_chapter(volume->geometry,
1250 					    chapter_index->virtual_chapter_number);
1251 
1252 	result = write_index_pages(volume, physical_chapter_number, chapter_index);
1253 	if (result != UDS_SUCCESS)
1254 		return result;
1255 
1256 	result = write_record_pages(volume, physical_chapter_number, records);
1257 	if (result != UDS_SUCCESS)
1258 		return result;
1259 
1260 	result = -dm_bufio_write_dirty_buffers(volume->client);
1261 	if (result != UDS_SUCCESS)
1262 		vdo_log_error_strerror(result, "cannot sync chapter to volume");
1263 
1264 	return result;
1265 }
1266 
1267 static void probe_chapter(struct volume *volume, u32 chapter_number,
1268 			  u64 *virtual_chapter_number)
1269 {
1270 	const struct index_geometry *geometry = volume->geometry;
1271 	u32 expected_list_number = 0;
1272 	u32 i;
1273 	u64 vcn = BAD_CHAPTER;
1274 
1275 	*virtual_chapter_number = BAD_CHAPTER;
1276 	dm_bufio_prefetch(volume->client,
1277 			  map_to_physical_page(geometry, chapter_number, 0),
1278 			  geometry->index_pages_per_chapter);
1279 
1280 	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
1281 		struct delta_index_page *page;
1282 		int result;
1283 
1284 		result = uds_get_volume_index_page(volume, chapter_number, i, &page);
1285 		if (result != UDS_SUCCESS)
1286 			return;
1287 
1288 		if (page->virtual_chapter_number == BAD_CHAPTER) {
1289 			vdo_log_error("corrupt index page in chapter %u",
1290 				      chapter_number);
1291 			return;
1292 		}
1293 
1294 		if (vcn == BAD_CHAPTER) {
1295 			vcn = page->virtual_chapter_number;
1296 		} else if (page->virtual_chapter_number != vcn) {
1297 			vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
1298 				      chapter_number, i, (unsigned long long) vcn,
1299 				      (unsigned long long) page->virtual_chapter_number);
1300 			return;
1301 		}
1302 
1303 		if (expected_list_number != page->lowest_list_number) {
1304 			vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
1305 				      chapter_number, i, expected_list_number,
1306 				      page->lowest_list_number);
1307 			return;
1308 		}
1309 		expected_list_number = page->highest_list_number + 1;
1310 
1311 		result = uds_validate_chapter_index_page(page, geometry);
1312 		if (result != UDS_SUCCESS)
1313 			return;
1314 	}
1315 
1316 	if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
1317 		vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
1318 			      (unsigned long long) vcn, geometry->chapters_per_volume);
1319 		return;
1320 	}
1321 
1322 	*virtual_chapter_number = vcn;
1323 }
1324 
1325 /* Find the last valid physical chapter in the volume. */
1326 static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
1327 {
1328 	u32 span = 1;
1329 	u32 tries = 0;
1330 
1331 	while (limit > 0) {
1332 		u32 chapter = (span > limit) ? 0 : limit - span;
1333 		u64 vcn = 0;
1334 
1335 		probe_chapter(volume, chapter, &vcn);
1336 		if (vcn == BAD_CHAPTER) {
1337 			limit = chapter;
1338 			if (++tries > 1)
1339 				span *= 2;
1340 		} else {
1341 			if (span == 1)
1342 				break;
1343 			span /= 2;
1344 			tries = 0;
1345 		}
1346 	}
1347 
1348 	*limit_ptr = limit;
1349 }
1350 
/*
 * Determine the lowest and highest virtual chapter numbers present among the first
 * chapter_limit physical chapters.
 *
 * @volume: the volume to probe
 * @chapter_limit: the number of physical chapters to consider
 * @lowest_vcn: set to the lowest virtual chapter number found (only on UDS_SUCCESS)
 * @highest_vcn: set to the highest virtual chapter number found (only on UDS_SUCCESS)
 *
 * Returns UDS_SUCCESS, or UDS_CORRUPT_DATA if the backward scan for the highest chapter has to
 * probe too many chapters.
 */
static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
			       u64 *highest_vcn)
{
	struct index_geometry *geometry = volume->geometry;
	u64 zero_vcn;
	u64 lowest = BAD_CHAPTER;
	u64 highest = BAD_CHAPTER;
	/* Physical chapter to skip while searching; BAD_CHAPTER when there is none. */
	u64 moved_chapter = BAD_CHAPTER;
	u32 left_chapter = 0;
	u32 right_chapter = 0;
	/* Counts every probe made by the backward scan below, good or bad. */
	u32 bad_chapters = 0;

	/*
	 * This method assumes there is at most one run of contiguous bad chapters caused by
	 * unflushed writes. Either the bad spot is at the beginning and end, or somewhere in the
	 * middle. Wherever it is, the highest and lowest VCNs are adjacent to it. Otherwise the
	 * volume is cleanly saved and somewhere in the middle of it the highest VCN immediately
	 * precedes the lowest one.
	 */

	/* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
	probe_chapter(volume, 0, &zero_vcn);

	/*
	 * Binary search for end of the discontinuity in the monotonically increasing virtual
	 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
	 * searching for the index of the smallest value less than zero_vcn. In the case we go off
	 * the end it means that chapter 0 has the lowest vcn.
	 *
	 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
	 * skip over the moved chapter when searching, adding it to the range at the end if
	 * necessary.
	 */
	if (geometry->remapped_physical > 0) {
		u64 remapped_vcn;

		/* Only treat the chapter as moved if it really holds the remapped virtual chapter. */
		probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
		if (remapped_vcn == geometry->remapped_virtual)
			moved_chapter = geometry->remapped_physical;
	}

	left_chapter = 0;
	right_chapter = chapter_limit;

	while (left_chapter < right_chapter) {
		u64 probe_vcn;
		u32 chapter = (left_chapter + right_chapter) / 2;

		/*
		 * Step off the moved chapter. This cannot underflow: moved_chapter is only set from
		 * a remapped_physical that is greater than zero.
		 */
		if (chapter == moved_chapter)
			chapter--;

		probe_chapter(volume, chapter, &probe_vcn);
		if (zero_vcn <= probe_vcn) {
			/* Still on chapter 0's side of the discontinuity; search to the right. */
			left_chapter = chapter + 1;
			if (left_chapter == moved_chapter)
				left_chapter++;
		} else {
			right_chapter = chapter;
		}
	}

	/* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
	if (left_chapter >= chapter_limit)
		left_chapter = 0;

	/* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
	probe_chapter(volume, left_chapter, &lowest);

	/* The moved chapter might be the lowest in the range. */
	if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
		lowest = geometry->remapped_virtual;

	/*
	 * Circularly scan backwards, moving over any bad chapters until encountering a good one,
	 * which is the chapter with the highest vcn. Skipping the moved chapter does not count
	 * against the probe limit.
	 */
	while (highest == BAD_CHAPTER) {
		right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
		if (right_chapter == moved_chapter)
			continue;

		probe_chapter(volume, right_chapter, &highest);
		/* The limit is checked after every probe, including one that found a good chapter. */
		if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
			vdo_log_error("too many bad chapters in volume: %u",
				      bad_chapters);
			return UDS_CORRUPT_DATA;
		}
	}

	*lowest_vcn = lowest;
	*highest_vcn = highest;
	return UDS_SUCCESS;
}
1444 
1445 /*
1446  * Find the highest and lowest contiguous chapters present in the volume and determine their
1447  * virtual chapter numbers. This is used by rebuild.
1448  */
1449 int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
1450 				       u64 *highest_vcn, bool *is_empty)
1451 {
1452 	u32 chapter_limit = volume->geometry->chapters_per_volume;
1453 
1454 	find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
1455 	if (chapter_limit == 0) {
1456 		*lowest_vcn = 0;
1457 		*highest_vcn = 0;
1458 		*is_empty = true;
1459 		return UDS_SUCCESS;
1460 	}
1461 
1462 	*is_empty = false;
1463 	return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
1464 }
1465 
1466 int __must_check uds_replace_volume_storage(struct volume *volume,
1467 					    struct index_layout *layout,
1468 					    struct block_device *bdev)
1469 {
1470 	int result;
1471 	u32 i;
1472 
1473 	result = uds_replace_index_layout_storage(layout, bdev);
1474 	if (result != UDS_SUCCESS)
1475 		return result;
1476 
1477 	/* Release all outstanding dm_bufio objects */
1478 	for (i = 0; i < volume->page_cache.indexable_pages; i++)
1479 		volume->page_cache.index[i] = volume->page_cache.cache_slots;
1480 	for (i = 0; i < volume->page_cache.cache_slots; i++)
1481 		clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
1482 	if (volume->sparse_cache != NULL)
1483 		uds_invalidate_sparse_cache(volume->sparse_cache);
1484 	if (volume->client != NULL)
1485 		dm_bufio_client_destroy(vdo_forget(volume->client));
1486 
1487 	return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
1488 				     volume->reserved_buffers, &volume->client);
1489 }
1490 
1491 static int __must_check initialize_page_cache(struct page_cache *cache,
1492 					      const struct index_geometry *geometry,
1493 					      u32 chapters_in_cache,
1494 					      unsigned int zone_count)
1495 {
1496 	int result;
1497 	u32 i;
1498 
1499 	cache->indexable_pages = geometry->pages_per_volume + 1;
1500 	cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
1501 	cache->zone_count = zone_count;
1502 	atomic64_set(&cache->clock, 1);
1503 
1504 	result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
1505 			    "requested cache size, %u, within limit %u",
1506 			    cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
1507 	if (result != VDO_SUCCESS)
1508 		return result;
1509 
1510 	result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
1511 			      "volume read queue", &cache->read_queue);
1512 	if (result != VDO_SUCCESS)
1513 		return result;
1514 
1515 	result = vdo_allocate(cache->zone_count, struct search_pending_counter,
1516 			      "Volume Cache Zones", &cache->search_pending_counters);
1517 	if (result != VDO_SUCCESS)
1518 		return result;
1519 
1520 	result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
1521 			      &cache->index);
1522 	if (result != VDO_SUCCESS)
1523 		return result;
1524 
1525 	result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
1526 			      &cache->cache);
1527 	if (result != VDO_SUCCESS)
1528 		return result;
1529 
1530 	/* Initialize index values to invalid values. */
1531 	for (i = 0; i < cache->indexable_pages; i++)
1532 		cache->index[i] = cache->cache_slots;
1533 
1534 	for (i = 0; i < cache->cache_slots; i++)
1535 		clear_cache_page(cache, &cache->cache[i]);
1536 
1537 	return UDS_SUCCESS;
1538 }
1539 
/*
 * Allocate and initialize a volume, including its dm-bufio client, radix sorter, caches, index
 * page map, and reader threads.
 *
 * @config: the index configuration supplying geometry, cache size, zone and thread counts
 * @layout: the index layout used for the nonce and for opening the dm-bufio client
 * @new_volume: receives the new volume on success; not written on failure
 *
 * Returns UDS_SUCCESS or an error code. Every failure path tears down the partially built
 * volume with uds_free_volume().
 */
int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
		    struct volume **new_volume)
{
	unsigned int i;
	struct volume *volume = NULL;
	struct index_geometry *geometry;
	unsigned int reserved_buffers;
	int result;

	result = vdo_allocate(1, struct volume, "volume", &volume);
	if (result != VDO_SUCCESS)
		return result;

	volume->nonce = uds_get_volume_nonce(layout);

	/* The volume keeps its own copy of the geometry; uds_free_volume() frees it. */
	result = uds_copy_index_geometry(config->geometry, &volume->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return vdo_log_warning_strerror(result,
						"failed to allocate geometry: error");
	}
	geometry = volume->geometry;

	/*
	 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
	 * for each entry in the sparse cache.
	 */
	reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
	reserved_buffers += 1;
	if (uds_is_sparse_index_geometry(geometry))
		reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
	/* Saved on the volume so uds_replace_volume_storage() can reopen with the same count. */
	volume->reserved_buffers = reserved_buffers;
	result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
				       volume->reserved_buffers, &volume->client);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	/* The sorter and pointer array are the scratch state used when encoding record pages. */
	result = uds_make_radix_sorter(geometry->records_per_page,
				       &volume->radix_sorter);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = vdo_allocate(geometry->records_per_page,
			      const struct uds_volume_record *, "record pointers",
			      &volume->record_pointers);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	/* Only sparse geometries get a sparse cache; its size is accounted in cache_size. */
	if (uds_is_sparse_index_geometry(geometry)) {
		size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;

		result = uds_make_sparse_cache(geometry, config->cache_chapters,
					       config->zone_count,
					       &volume->sparse_cache);
		if (result != UDS_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		volume->cache_size =
			page_size * geometry->index_pages_per_chapter * config->cache_chapters;
	}

	result = initialize_page_cache(&volume->page_cache, geometry,
				       config->cache_chapters, config->zone_count);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	/* Add the page cache's per-slot delta index page overhead to the reported size. */
	volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
	result = uds_make_index_page_map(geometry, &volume->index_page_map);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	/*
	 * Initialize the mutex and conditions before reader_threads is allocated:
	 * uds_free_volume() takes the mutex whenever reader_threads is non-NULL.
	 */
	mutex_init(&volume->read_threads_mutex);
	uds_init_cond(&volume->read_threads_read_done_cond);
	uds_init_cond(&volume->read_threads_cond);

	result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
			      &volume->reader_threads);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	for (i = 0; i < config->read_threads; i++) {
		result = vdo_create_thread(read_thread_function, (void *) volume,
					   "reader", &volume->reader_threads[i]);
		if (result != VDO_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		/* Track how many threads actually started so teardown joins only those. */
		volume->read_thread_count = i + 1;
	}

	*new_volume = volume;
	return UDS_SUCCESS;
}
1648 
1649 static void uninitialize_page_cache(struct page_cache *cache)
1650 {
1651 	u16 i;
1652 
1653 	if (cache->cache != NULL) {
1654 		for (i = 0; i < cache->cache_slots; i++)
1655 			release_page_buffer(&cache->cache[i]);
1656 	}
1657 	vdo_free(cache->index);
1658 	vdo_free(cache->cache);
1659 	vdo_free(cache->search_pending_counters);
1660 	vdo_free(cache->read_queue);
1661 }
1662 
1663 void uds_free_volume(struct volume *volume)
1664 {
1665 	if (volume == NULL)
1666 		return;
1667 
1668 	if (volume->reader_threads != NULL) {
1669 		unsigned int i;
1670 
1671 		/* This works even if some threads weren't started. */
1672 		mutex_lock(&volume->read_threads_mutex);
1673 		volume->read_threads_exiting = true;
1674 		uds_broadcast_cond(&volume->read_threads_cond);
1675 		mutex_unlock(&volume->read_threads_mutex);
1676 		for (i = 0; i < volume->read_thread_count; i++)
1677 			vdo_join_threads(volume->reader_threads[i]);
1678 		vdo_free(volume->reader_threads);
1679 		volume->reader_threads = NULL;
1680 	}
1681 
1682 	/* Must destroy the client AFTER freeing the cached pages. */
1683 	uninitialize_page_cache(&volume->page_cache);
1684 	uds_free_sparse_cache(volume->sparse_cache);
1685 	if (volume->client != NULL)
1686 		dm_bufio_client_destroy(vdo_forget(volume->client));
1687 
1688 	uds_free_index_page_map(volume->index_page_map);
1689 	uds_free_radix_sorter(volume->radix_sorter);
1690 	vdo_free(volume->geometry);
1691 	vdo_free(volume->record_pointers);
1692 	vdo_free(volume);
1693 }
1694