xref: /linux/drivers/md/dm-vdo/indexer/index.c (revision 2697b79a469b68e3ad3640f55284359c1396278d)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 
7 #include "index.h"
8 
9 #include "logger.h"
10 #include "memory-alloc.h"
11 
12 #include "funnel-requestqueue.h"
13 #include "hash-utils.h"
14 #include "sparse-cache.h"
15 
16 static const u64 NO_LAST_SAVE = U64_MAX;
17 
18 /*
19  * When searching for deduplication records, the index first searches the volume index, and then
20  * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
21  * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
22  * committed (either the open chapter or a recently closed one), the index searches the in-memory
23  * representation of the chapter. Finally, if the volume index does not find a record and the index
24  * is sparse, the index will search the sparse cache.
25  *
26  * The index send two kinds of messages to coordinate between zones: chapter close messages for the
27  * chapter writer, and sparse cache barrier messages for the sparse cache.
28  *
29  * The chapter writer is responsible for committing chapters of records to storage. Since zones can
30  * get different numbers of records, some zones may fall behind others. Each time a zone fills up
31  * its available space in a chapter, it informs the chapter writer that the chapter is complete,
32  * and also informs all other zones that it has closed the chapter. Each other zone will then close
33  * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
34  * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
35  *
36  * The last zone to close the chapter also removes the oldest chapter from the volume index.
37  * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
38  * means that those zones will never ask the volume index about it. No zone is allowed to get more
39  * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
40  * chapter before the previous one has been closed by all zones, it is forced to wait.
41  *
42  * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
43  * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
44  * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
45  * paused its operations, the cache membership is changed and each zone is then informed that it
46  * can proceed. More details can be found in the sparse cache documentation.
47  *
48  * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
49  * barrier message to change the sparse cache membership, so the index simulates the message by
50  * invoking the handler directly.
51  */
52 
53 struct chapter_writer {
54 	/* The index to which we belong */
55 	struct uds_index *index;
56 	/* The thread to do the writing */
57 	struct thread *thread;
58 	/* The lock protecting the following fields */
59 	struct mutex mutex;
60 	/* The condition signalled on state changes */
61 	struct cond_var cond;
62 	/* Set to true to stop the thread */
63 	bool stop;
64 	/* The result from the most recent write */
65 	int result;
66 	/* The number of bytes allocated by the chapter writer */
67 	size_t memory_size;
68 	/* The number of zones which have submitted a chapter for writing */
69 	unsigned int zones_to_write;
70 	/* Open chapter index used by uds_close_open_chapter() */
71 	struct open_chapter_index *open_chapter_index;
72 	/* Collated records used by uds_close_open_chapter() */
73 	struct uds_volume_record *collated_records;
74 	/* The chapters to write (one per zone) */
75 	struct open_chapter_zone *chapters[];
76 };
77 
78 static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
79 {
80 	return uds_is_chapter_sparse(zone->index->volume->geometry,
81 				     zone->oldest_virtual_chapter,
82 				     zone->newest_virtual_chapter, virtual_chapter);
83 }
84 
85 static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
86 			       struct uds_index *index)
87 {
88 	int result;
89 	struct uds_request *request;
90 
91 	result = vdo_allocate(1, struct uds_request, __func__, &request);
92 	if (result != VDO_SUCCESS)
93 		return result;
94 
95 	request->index = index;
96 	request->unbatched = true;
97 	request->zone_number = zone;
98 	request->zone_message = message;
99 
100 	uds_enqueue_request(request, STAGE_MESSAGE);
101 	return UDS_SUCCESS;
102 }
103 
104 static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
105 {
106 	struct uds_zone_message message = {
107 		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
108 		.virtual_chapter = virtual_chapter,
109 	};
110 	unsigned int zone;
111 
112 	for (zone = 0; zone < index->zone_count; zone++) {
113 		int result = launch_zone_message(message, zone, index);
114 
115 		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
116 	}
117 }
118 
119 /*
120  * Determine whether this request should trigger a sparse cache barrier message to change the
121  * membership of the sparse cache. If a change in membership is desired, the function returns the
122  * chapter number to add.
123  */
124 static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
125 {
126 	u64 virtual_chapter;
127 	struct index_zone *zone;
128 
129 	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
130 						       &request->record_name);
131 	if (virtual_chapter == NO_CHAPTER)
132 		return NO_CHAPTER;
133 
134 	zone = index->zones[request->zone_number];
135 	if (!is_zone_chapter_sparse(zone, virtual_chapter))
136 		return NO_CHAPTER;
137 
138 	/*
139 	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
140 	 * barrier message and skipping this chapter if is it the same.
141 	 */
142 
143 	return virtual_chapter;
144 }
145 
146 /*
147  * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
148  * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
149  * of index does nothing here.
150  */
151 static int simulate_index_zone_barrier_message(struct index_zone *zone,
152 					       struct uds_request *request)
153 {
154 	u64 sparse_virtual_chapter;
155 
156 	if ((zone->index->zone_count > 1) ||
157 	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
158 		return UDS_SUCCESS;
159 
160 	sparse_virtual_chapter = triage_index_request(zone->index, request);
161 	if (sparse_virtual_chapter == NO_CHAPTER)
162 		return UDS_SUCCESS;
163 
164 	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
165 }
166 
167 /* This is the request processing function for the triage queue. */
168 static void triage_request(struct uds_request *request)
169 {
170 	struct uds_index *index = request->index;
171 	u64 sparse_virtual_chapter = triage_index_request(index, request);
172 
173 	if (sparse_virtual_chapter != NO_CHAPTER)
174 		enqueue_barrier_messages(index, sparse_virtual_chapter);
175 
176 	uds_enqueue_request(request, STAGE_INDEX);
177 }
178 
179 static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
180 {
181 	int result;
182 	struct chapter_writer *writer = index->chapter_writer;
183 
184 	mutex_lock(&writer->mutex);
185 	while (index->newest_virtual_chapter < current_chapter_number)
186 		uds_wait_cond(&writer->cond, &writer->mutex);
187 	result = writer->result;
188 	mutex_unlock(&writer->mutex);
189 
190 	if (result != UDS_SUCCESS)
191 		return vdo_log_error_strerror(result,
192 					      "Writing of previous open chapter failed");
193 
194 	return UDS_SUCCESS;
195 }
196 
197 static int swap_open_chapter(struct index_zone *zone)
198 {
199 	int result;
200 	struct open_chapter_zone *temporary_chapter;
201 
202 	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
203 	if (result != UDS_SUCCESS)
204 		return result;
205 
206 	temporary_chapter = zone->open_chapter;
207 	zone->open_chapter = zone->writing_chapter;
208 	zone->writing_chapter = temporary_chapter;
209 	return UDS_SUCCESS;
210 }
211 
212 /*
213  * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
214  * writing until all zones have closed it.
215  */
216 static unsigned int start_closing_chapter(struct uds_index *index,
217 					  unsigned int zone_number,
218 					  struct open_chapter_zone *chapter)
219 {
220 	unsigned int finished_zones;
221 	struct chapter_writer *writer = index->chapter_writer;
222 
223 	mutex_lock(&writer->mutex);
224 	finished_zones = ++writer->zones_to_write;
225 	writer->chapters[zone_number] = chapter;
226 	uds_broadcast_cond(&writer->cond);
227 	mutex_unlock(&writer->mutex);
228 
229 	return finished_zones;
230 }
231 
232 static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
233 {
234 	int result;
235 	unsigned int i;
236 	struct uds_zone_message zone_message = {
237 		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
238 		.virtual_chapter = closed_chapter,
239 	};
240 
241 	for (i = 0; i < zone->index->zone_count; i++) {
242 		if (zone->id == i)
243 			continue;
244 
245 		result = launch_zone_message(zone_message, i, zone->index);
246 		if (result != UDS_SUCCESS)
247 			return result;
248 	}
249 
250 	return UDS_SUCCESS;
251 }
252 
253 static int open_next_chapter(struct index_zone *zone)
254 {
255 	int result;
256 	u64 closed_chapter;
257 	u64 expiring;
258 	unsigned int finished_zones;
259 	u32 expire_chapters;
260 
261 	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
262 		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
263 		      zone->open_chapter->size,
264 		      zone->open_chapter->capacity - zone->open_chapter->size);
265 
266 	result = swap_open_chapter(zone);
267 	if (result != UDS_SUCCESS)
268 		return result;
269 
270 	closed_chapter = zone->newest_virtual_chapter++;
271 	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
272 					       zone->newest_virtual_chapter);
273 	uds_reset_open_chapter(zone->open_chapter);
274 
275 	finished_zones = start_closing_chapter(zone->index, zone->id,
276 					       zone->writing_chapter);
277 	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
278 		result = announce_chapter_closed(zone, closed_chapter);
279 		if (result != UDS_SUCCESS)
280 			return result;
281 	}
282 
283 	expiring = zone->oldest_virtual_chapter;
284 	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
285 						 zone->newest_virtual_chapter);
286 	zone->oldest_virtual_chapter += expire_chapters;
287 
288 	if (finished_zones < zone->index->zone_count)
289 		return UDS_SUCCESS;
290 
291 	while (expire_chapters-- > 0)
292 		uds_forget_chapter(zone->index->volume, expiring++);
293 
294 	return UDS_SUCCESS;
295 }
296 
297 static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
298 {
299 	if (zone->newest_virtual_chapter == virtual_chapter)
300 		return open_next_chapter(zone);
301 
302 	return UDS_SUCCESS;
303 }
304 
305 static int dispatch_index_zone_control_request(struct uds_request *request)
306 {
307 	struct uds_zone_message *message = &request->zone_message;
308 	struct index_zone *zone = request->index->zones[request->zone_number];
309 
310 	switch (message->type) {
311 	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
312 		return uds_update_sparse_cache(zone, message->virtual_chapter);
313 
314 	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
315 		return handle_chapter_closed(zone, message->virtual_chapter);
316 
317 	default:
318 		vdo_log_error("invalid message type: %d", message->type);
319 		return UDS_INVALID_ARGUMENT;
320 	}
321 }
322 
323 static void set_request_location(struct uds_request *request,
324 				 enum uds_index_region new_location)
325 {
326 	request->location = new_location;
327 	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
328 			  (new_location == UDS_LOCATION_IN_DENSE) ||
329 			  (new_location == UDS_LOCATION_IN_SPARSE));
330 }
331 
332 static void set_chapter_location(struct uds_request *request,
333 				 const struct index_zone *zone, u64 virtual_chapter)
334 {
335 	request->found = true;
336 	if (virtual_chapter == zone->newest_virtual_chapter)
337 		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
338 	else if (is_zone_chapter_sparse(zone, virtual_chapter))
339 		request->location = UDS_LOCATION_IN_SPARSE;
340 	else
341 		request->location = UDS_LOCATION_IN_DENSE;
342 }
343 
344 static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
345 				       u64 virtual_chapter, bool *found)
346 {
347 	int result;
348 	struct volume *volume;
349 	u16 record_page_number;
350 	u32 chapter;
351 
352 	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
353 					 &record_page_number);
354 	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
355 		return result;
356 
357 	request->virtual_chapter = virtual_chapter;
358 	volume = zone->index->volume;
359 	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
360 	return uds_search_cached_record_page(volume, request, chapter,
361 					     record_page_number, found);
362 }
363 
364 static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
365 				bool *found)
366 {
367 	struct volume *volume;
368 
369 	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
370 		*found = true;
371 		return UDS_SUCCESS;
372 	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
373 		*found = false;
374 		return UDS_SUCCESS;
375 	}
376 
377 	if (request->virtual_chapter == zone->newest_virtual_chapter) {
378 		uds_search_open_chapter(zone->open_chapter, &request->record_name,
379 					&request->old_metadata, found);
380 		return UDS_SUCCESS;
381 	}
382 
383 	if ((zone->newest_virtual_chapter > 0) &&
384 	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
385 	    (zone->writing_chapter->size > 0)) {
386 		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
387 					&request->old_metadata, found);
388 		return UDS_SUCCESS;
389 	}
390 
391 	volume = zone->index->volume;
392 	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
393 	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
394 				      request->zone_number))
395 		return search_sparse_cache_in_zone(zone, request,
396 						   request->virtual_chapter, found);
397 
398 	return uds_search_volume_page_cache(volume, request, found);
399 }
400 
401 static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
402 			      const struct uds_record_data *metadata)
403 {
404 	unsigned int remaining;
405 
406 	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
407 					 metadata);
408 	if (remaining == 0)
409 		return open_next_chapter(zone);
410 
411 	return UDS_SUCCESS;
412 }
413 
414 static int search_index_zone(struct index_zone *zone, struct uds_request *request)
415 {
416 	int result;
417 	struct volume_index_record record;
418 	bool overflow_record, found = false;
419 	struct uds_record_data *metadata;
420 	u64 chapter;
421 
422 	result = uds_get_volume_index_record(zone->index->volume_index,
423 					     &request->record_name, &record);
424 	if (result != UDS_SUCCESS)
425 		return result;
426 
427 	if (record.is_found) {
428 		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
429 			set_request_location(request, UDS_LOCATION_UNKNOWN);
430 
431 		request->virtual_chapter = record.virtual_chapter;
432 		result = get_record_from_zone(zone, request, &found);
433 		if (result != UDS_SUCCESS)
434 			return result;
435 	}
436 
437 	if (found)
438 		set_chapter_location(request, zone, record.virtual_chapter);
439 
440 	/*
441 	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
442 	 * one chapter and collided with an existing record), it will exist as a collision record
443 	 * in the volume index, but we won't find it in the volume. This case needs special
444 	 * handling.
445 	 */
446 	overflow_record = (record.is_found && record.is_collision && !found);
447 	chapter = zone->newest_virtual_chapter;
448 	if (found || overflow_record) {
449 		if ((request->type == UDS_QUERY_NO_UPDATE) ||
450 		    ((request->type == UDS_QUERY) && overflow_record)) {
451 			/* There is nothing left to do. */
452 			return UDS_SUCCESS;
453 		}
454 
455 		if (record.virtual_chapter != chapter) {
456 			/*
457 			 * Update the volume index to reference the new chapter for the block. If
458 			 * the record had been deleted or dropped from the chapter index, it will
459 			 * be back.
460 			 */
461 			result = uds_set_volume_index_record_chapter(&record, chapter);
462 		} else if (request->type != UDS_UPDATE) {
463 			/* The record is already in the open chapter. */
464 			return UDS_SUCCESS;
465 		}
466 	} else {
467 		/*
468 		 * The record wasn't in the volume index, so check whether the
469 		 * name is in a cached sparse chapter. If we found the name on
470 		 * a previous search, use that result instead.
471 		 */
472 		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
473 			found = true;
474 		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
475 			found = false;
476 		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
477 			   !uds_is_volume_index_sample(zone->index->volume_index,
478 						       &request->record_name)) {
479 			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
480 							     &found);
481 			if (result != UDS_SUCCESS)
482 				return result;
483 		}
484 
485 		if (found)
486 			set_request_location(request, UDS_LOCATION_IN_SPARSE);
487 
488 		if ((request->type == UDS_QUERY_NO_UPDATE) ||
489 		    ((request->type == UDS_QUERY) && !found)) {
490 			/* There is nothing left to do. */
491 			return UDS_SUCCESS;
492 		}
493 
494 		/*
495 		 * Add a new entry to the volume index referencing the open chapter. This needs to
496 		 * be done both for new records, and for records from cached sparse chapters.
497 		 */
498 		result = uds_put_volume_index_record(&record, chapter);
499 	}
500 
501 	if (result == UDS_OVERFLOW) {
502 		/*
503 		 * The volume index encountered a delta list overflow. The condition was already
504 		 * logged. We will go on without adding the record to the open chapter.
505 		 */
506 		return UDS_SUCCESS;
507 	}
508 
509 	if (result != UDS_SUCCESS)
510 		return result;
511 
512 	if (!found || (request->type == UDS_UPDATE)) {
513 		/* This is a new record or we're updating an existing record. */
514 		metadata = &request->new_metadata;
515 	} else {
516 		/* Move the existing record to the open chapter. */
517 		metadata = &request->old_metadata;
518 	}
519 
520 	return put_record_in_zone(zone, request, metadata);
521 }
522 
523 static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
524 {
525 	int result;
526 	struct volume_index_record record;
527 
528 	result = uds_get_volume_index_record(zone->index->volume_index,
529 					     &request->record_name, &record);
530 	if (result != UDS_SUCCESS)
531 		return result;
532 
533 	if (!record.is_found)
534 		return UDS_SUCCESS;
535 
536 	/* If the request was requeued, check whether the saved state is still valid. */
537 
538 	if (record.is_collision) {
539 		set_chapter_location(request, zone, record.virtual_chapter);
540 	} else {
541 		/* Non-collision records are hints, so resolve the name in the chapter. */
542 		bool found;
543 
544 		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
545 			set_request_location(request, UDS_LOCATION_UNKNOWN);
546 
547 		request->virtual_chapter = record.virtual_chapter;
548 		result = get_record_from_zone(zone, request, &found);
549 		if (result != UDS_SUCCESS)
550 			return result;
551 
552 		if (!found) {
553 			/* There is no record to remove. */
554 			return UDS_SUCCESS;
555 		}
556 	}
557 
558 	set_chapter_location(request, zone, record.virtual_chapter);
559 
560 	/*
561 	 * Delete the volume index entry for the named record only. Note that a later search might
562 	 * later return stale advice if there is a colliding name in the same chapter, but it's a
563 	 * very rare case (1 in 2^21).
564 	 */
565 	result = uds_remove_volume_index_record(&record);
566 	if (result != UDS_SUCCESS)
567 		return result;
568 
569 	/*
570 	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
571 	 * trouble if the record is added again later.
572 	 */
573 	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
574 		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);
575 
576 	return UDS_SUCCESS;
577 }
578 
579 static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
580 {
581 	int result;
582 	struct index_zone *zone = index->zones[request->zone_number];
583 
584 	if (!request->requeued) {
585 		result = simulate_index_zone_barrier_message(zone, request);
586 		if (result != UDS_SUCCESS)
587 			return result;
588 	}
589 
590 	switch (request->type) {
591 	case UDS_POST:
592 	case UDS_UPDATE:
593 	case UDS_QUERY:
594 	case UDS_QUERY_NO_UPDATE:
595 		result = search_index_zone(zone, request);
596 		break;
597 
598 	case UDS_DELETE:
599 		result = remove_from_index_zone(zone, request);
600 		break;
601 
602 	default:
603 		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
604 						  "invalid request type: %d",
605 						  request->type);
606 		break;
607 	}
608 
609 	return result;
610 }
611 
612 /* This is the request processing function invoked by each zone's thread. */
613 static void execute_zone_request(struct uds_request *request)
614 {
615 	int result;
616 	struct uds_index *index = request->index;
617 
618 	if (request->zone_message.type != UDS_MESSAGE_NONE) {
619 		result = dispatch_index_zone_control_request(request);
620 		if (result != UDS_SUCCESS) {
621 			vdo_log_error_strerror(result, "error executing message: %d",
622 					       request->zone_message.type);
623 		}
624 
625 		/* Once the message is processed it can be freed. */
626 		vdo_free(vdo_forget(request));
627 		return;
628 	}
629 
630 	index->need_to_save = true;
631 	if (request->requeued && (request->status != UDS_SUCCESS)) {
632 		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
633 		index->callback(request);
634 		return;
635 	}
636 
637 	result = dispatch_index_request(index, request);
638 	if (result == UDS_QUEUED) {
639 		/* The request has been requeued so don't let it complete. */
640 		return;
641 	}
642 
643 	if (!request->found)
644 		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
645 
646 	request->status = result;
647 	index->callback(request);
648 }
649 
650 static int initialize_index_queues(struct uds_index *index,
651 				   const struct index_geometry *geometry)
652 {
653 	int result;
654 	unsigned int i;
655 
656 	for (i = 0; i < index->zone_count; i++) {
657 		result = uds_make_request_queue("indexW", &execute_zone_request,
658 						&index->zone_queues[i]);
659 		if (result != UDS_SUCCESS)
660 			return result;
661 	}
662 
663 	/* The triage queue is only needed for sparse multi-zone indexes. */
664 	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
665 		result = uds_make_request_queue("triageW", &triage_request,
666 						&index->triage_queue);
667 		if (result != UDS_SUCCESS)
668 			return result;
669 	}
670 
671 	return UDS_SUCCESS;
672 }
673 
674 /* This is the driver function for the chapter writer thread. */
675 static void close_chapters(void *arg)
676 {
677 	int result;
678 	struct chapter_writer *writer = arg;
679 	struct uds_index *index = writer->index;
680 
681 	vdo_log_debug("chapter writer starting");
682 	mutex_lock(&writer->mutex);
683 	for (;;) {
684 		while (writer->zones_to_write < index->zone_count) {
685 			if (writer->stop && (writer->zones_to_write == 0)) {
686 				/*
687 				 * We've been told to stop, and all of the zones are in the same
688 				 * open chapter, so we can exit now.
689 				 */
690 				mutex_unlock(&writer->mutex);
691 				vdo_log_debug("chapter writer stopping");
692 				return;
693 			}
694 			uds_wait_cond(&writer->cond, &writer->mutex);
695 		}
696 
697 		/*
698 		 * Release the lock while closing a chapter. We probably don't need to do this, but
699 		 * it seems safer in principle. It's OK to access the chapter and chapter_number
700 		 * fields without the lock since those aren't allowed to change until we're done.
701 		 */
702 		mutex_unlock(&writer->mutex);
703 
704 		if (index->has_saved_open_chapter) {
705 			/*
706 			 * Remove the saved open chapter the first time we close an open chapter
707 			 * after loading from a clean shutdown, or after doing a clean save. The
708 			 * lack of the saved open chapter will indicate that a recovery is
709 			 * necessary.
710 			 */
711 			index->has_saved_open_chapter = false;
712 			result = uds_discard_open_chapter(index->layout);
713 			if (result == UDS_SUCCESS)
714 				vdo_log_debug("Discarding saved open chapter");
715 		}
716 
717 		result = uds_close_open_chapter(writer->chapters, index->zone_count,
718 						index->volume,
719 						writer->open_chapter_index,
720 						writer->collated_records,
721 						index->newest_virtual_chapter);
722 
723 		mutex_lock(&writer->mutex);
724 		index->newest_virtual_chapter++;
725 		index->oldest_virtual_chapter +=
726 			uds_chapters_to_expire(index->volume->geometry,
727 					       index->newest_virtual_chapter);
728 		writer->result = result;
729 		writer->zones_to_write = 0;
730 		uds_broadcast_cond(&writer->cond);
731 	}
732 }
733 
734 static void stop_chapter_writer(struct chapter_writer *writer)
735 {
736 	struct thread *writer_thread = NULL;
737 
738 	mutex_lock(&writer->mutex);
739 	if (writer->thread != NULL) {
740 		writer_thread = writer->thread;
741 		writer->thread = NULL;
742 		writer->stop = true;
743 		uds_broadcast_cond(&writer->cond);
744 	}
745 	mutex_unlock(&writer->mutex);
746 
747 	if (writer_thread != NULL)
748 		vdo_join_threads(writer_thread);
749 }
750 
751 static void free_chapter_writer(struct chapter_writer *writer)
752 {
753 	if (writer == NULL)
754 		return;
755 
756 	stop_chapter_writer(writer);
757 	uds_free_open_chapter_index(writer->open_chapter_index);
758 	vdo_free(writer->collated_records);
759 	vdo_free(writer);
760 }
761 
762 static int make_chapter_writer(struct uds_index *index,
763 			       struct chapter_writer **writer_ptr)
764 {
765 	int result;
766 	struct chapter_writer *writer;
767 	size_t collated_records_size =
768 		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);
769 
770 	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
771 				       struct open_chapter_zone *, "Chapter Writer",
772 				       &writer);
773 	if (result != VDO_SUCCESS)
774 		return result;
775 
776 	writer->index = index;
777 	mutex_init(&writer->mutex);
778 	uds_init_cond(&writer->cond);
779 
780 	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
781 					    &writer->collated_records);
782 	if (result != VDO_SUCCESS) {
783 		free_chapter_writer(writer);
784 		return result;
785 	}
786 
787 	result = uds_make_open_chapter_index(&writer->open_chapter_index,
788 					     index->volume->geometry,
789 					     index->volume->nonce);
790 	if (result != UDS_SUCCESS) {
791 		free_chapter_writer(writer);
792 		return result;
793 	}
794 
795 	writer->memory_size = (sizeof(struct chapter_writer) +
796 			       index->zone_count * sizeof(struct open_chapter_zone *) +
797 			       collated_records_size +
798 			       writer->open_chapter_index->memory_size);
799 
800 	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
801 	if (result != VDO_SUCCESS) {
802 		free_chapter_writer(writer);
803 		return result;
804 	}
805 
806 	*writer_ptr = writer;
807 	return UDS_SUCCESS;
808 }
809 
810 static int load_index(struct uds_index *index)
811 {
812 	int result;
813 	u64 last_save_chapter;
814 
815 	result = uds_load_index_state(index->layout, index);
816 	if (result != UDS_SUCCESS)
817 		return UDS_INDEX_NOT_SAVED_CLEANLY;
818 
819 	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);
820 
821 	vdo_log_info("loaded index from chapter %llu through chapter %llu",
822 		     (unsigned long long) index->oldest_virtual_chapter,
823 		     (unsigned long long) last_save_chapter);
824 
825 	return UDS_SUCCESS;
826 }
827 
828 static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
829 {
830 	int result;
831 	struct delta_index_page *chapter_index_page;
832 	struct index_geometry *geometry = index->volume->geometry;
833 	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
834 	u32 expected_list_number = 0;
835 	u32 index_page_number;
836 	u32 lowest_delta_list;
837 	u32 highest_delta_list;
838 
839 	for (index_page_number = 0;
840 	     index_page_number < geometry->index_pages_per_chapter;
841 	     index_page_number++) {
842 		result = uds_get_volume_index_page(index->volume, chapter,
843 						   index_page_number,
844 						   &chapter_index_page);
845 		if (result != UDS_SUCCESS) {
846 			return vdo_log_error_strerror(result,
847 						      "failed to read index page %u in chapter %u",
848 						      index_page_number, chapter);
849 		}
850 
851 		lowest_delta_list = chapter_index_page->lowest_list_number;
852 		highest_delta_list = chapter_index_page->highest_list_number;
853 		if (lowest_delta_list != expected_list_number) {
854 			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
855 						      "chapter %u index page %u is corrupt",
856 						      chapter, index_page_number);
857 		}
858 
859 		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
860 					  index_page_number, highest_delta_list);
861 		expected_list_number = highest_delta_list + 1;
862 	}
863 
864 	return UDS_SUCCESS;
865 }
866 
867 static int replay_record(struct uds_index *index, const struct uds_record_name *name,
868 			 u64 virtual_chapter, bool will_be_sparse_chapter)
869 {
870 	int result;
871 	struct volume_index_record record;
872 	bool update_record;
873 
874 	if (will_be_sparse_chapter &&
875 	    !uds_is_volume_index_sample(index->volume_index, name)) {
876 		/*
877 		 * This entry will be in a sparse chapter after the rebuild completes, and it is
878 		 * not a sample, so just skip over it.
879 		 */
880 		return UDS_SUCCESS;
881 	}
882 
883 	result = uds_get_volume_index_record(index->volume_index, name, &record);
884 	if (result != UDS_SUCCESS)
885 		return result;
886 
887 	if (record.is_found) {
888 		if (record.is_collision) {
889 			if (record.virtual_chapter == virtual_chapter) {
890 				/* The record is already correct. */
891 				return UDS_SUCCESS;
892 			}
893 
894 			update_record = true;
895 		} else if (record.virtual_chapter == virtual_chapter) {
896 			/*
897 			 * There is a volume index entry pointing to the current chapter, but we
898 			 * don't know if it is for the same name as the one we are currently
899 			 * working on or not. For now, we're just going to assume that it isn't.
900 			 * This will create one extra collision record if there was a deleted
901 			 * record in the current chapter.
902 			 */
903 			update_record = false;
904 		} else {
905 			/*
906 			 * If we're rebuilding, we don't normally want to go to disk to see if the
907 			 * record exists, since we will likely have just read the record from disk
908 			 * (i.e. we know it's there). The exception to this is when we find an
909 			 * entry in the volume index that has a different chapter. In this case, we
910 			 * need to search that chapter to determine if the volume index entry was
911 			 * for the same record or a different one.
912 			 */
913 			result = uds_search_volume_page_cache_for_rebuild(index->volume,
914 									  name,
915 									  record.virtual_chapter,
916 									  &update_record);
917 			if (result != UDS_SUCCESS)
918 				return result;
919 			}
920 	} else {
921 		update_record = false;
922 	}
923 
924 	if (update_record) {
925 		/*
926 		 * Update the volume index to reference the new chapter for the block. If the
927 		 * record had been deleted or dropped from the chapter index, it will be back.
928 		 */
929 		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
930 	} else {
931 		/*
932 		 * Add a new entry to the volume index referencing the open chapter. This should be
933 		 * done regardless of whether we are a brand new record or a sparse record, i.e.
934 		 * one that doesn't exist in the index but does on disk, since for a sparse record,
935 		 * we would want to un-sparsify if it did exist.
936 		 */
937 		result = uds_put_volume_index_record(&record, virtual_chapter);
938 	}
939 
940 	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
941 		/* The rebuilt index will lose these records. */
942 		return UDS_SUCCESS;
943 	}
944 
945 	return result;
946 }
947 
948 static bool check_for_suspend(struct uds_index *index)
949 {
950 	bool closing;
951 
952 	if (index->load_context == NULL)
953 		return false;
954 
955 	mutex_lock(&index->load_context->mutex);
956 	if (index->load_context->status != INDEX_SUSPENDING) {
957 		mutex_unlock(&index->load_context->mutex);
958 		return false;
959 	}
960 
961 	/* Notify that we are suspended and wait for the resume. */
962 	index->load_context->status = INDEX_SUSPENDED;
963 	uds_broadcast_cond(&index->load_context->cond);
964 
965 	while ((index->load_context->status != INDEX_OPENING) &&
966 	       (index->load_context->status != INDEX_FREEING))
967 		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);
968 
969 	closing = (index->load_context->status == INDEX_FREEING);
970 	mutex_unlock(&index->load_context->mutex);
971 	return closing;
972 }
973 
974 static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
975 {
976 	int result;
977 	u32 i;
978 	u32 j;
979 	const struct index_geometry *geometry;
980 	u32 physical_chapter;
981 
982 	if (check_for_suspend(index)) {
983 		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
984 			     (unsigned long long) virtual);
985 		return -EBUSY;
986 	}
987 
988 	geometry = index->volume->geometry;
989 	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
990 	uds_prefetch_volume_chapter(index->volume, physical_chapter);
991 	uds_set_volume_index_open_chapter(index->volume_index, virtual);
992 
993 	result = rebuild_index_page_map(index, virtual);
994 	if (result != UDS_SUCCESS) {
995 		return vdo_log_error_strerror(result,
996 					      "could not rebuild index page map for chapter %u",
997 					      physical_chapter);
998 	}
999 
1000 	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
1001 		u8 *record_page;
1002 		u32 record_page_number;
1003 
1004 		record_page_number = geometry->index_pages_per_chapter + i;
1005 		result = uds_get_volume_record_page(index->volume, physical_chapter,
1006 						    record_page_number, &record_page);
1007 		if (result != UDS_SUCCESS) {
1008 			return vdo_log_error_strerror(result, "could not get page %d",
1009 						      record_page_number);
1010 		}
1011 
1012 		for (j = 0; j < geometry->records_per_page; j++) {
1013 			const u8 *name_bytes;
1014 			struct uds_record_name name;
1015 
1016 			name_bytes = record_page + (j * BYTES_PER_RECORD);
1017 			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
1018 			result = replay_record(index, &name, virtual, sparse);
1019 			if (result != UDS_SUCCESS)
1020 				return result;
1021 		}
1022 	}
1023 
1024 	return UDS_SUCCESS;
1025 }
1026 
1027 static int replay_volume(struct uds_index *index)
1028 {
1029 	int result;
1030 	u64 old_map_update;
1031 	u64 new_map_update;
1032 	u64 virtual;
1033 	u64 from_virtual = index->oldest_virtual_chapter;
1034 	u64 upto_virtual = index->newest_virtual_chapter;
1035 	bool will_be_sparse;
1036 
1037 	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
1038 		     (unsigned long long) from_virtual,
1039 		     (unsigned long long) upto_virtual);
1040 
1041 	/*
1042 	 * The index failed to load, so the volume index is empty. Add records to the volume index
1043 	 * in order, skipping non-hooks in chapters which will be sparse to save time.
1044 	 *
1045 	 * Go through each record page of each chapter and add the records back to the volume
1046 	 * index. This should not cause anything to be written to either the open chapter or the
1047 	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
1048 	 * would have already been purged from the volume index when the chapter was opened.
1049 	 *
1050 	 * Also, go through each index page for each chapter and rebuild the index page map.
1051 	 */
1052 	old_map_update = index->volume->index_page_map->last_update;
1053 	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
1054 		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
1055 						       from_virtual, upto_virtual,
1056 						       virtual);
1057 		result = replay_chapter(index, virtual, will_be_sparse);
1058 		if (result != UDS_SUCCESS)
1059 			return result;
1060 	}
1061 
1062 	/* Also reap the chapter being replaced by the open chapter. */
1063 	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);
1064 
1065 	new_map_update = index->volume->index_page_map->last_update;
1066 	if (new_map_update != old_map_update) {
1067 		vdo_log_info("replay changed index page map update from %llu to %llu",
1068 			     (unsigned long long) old_map_update,
1069 			     (unsigned long long) new_map_update);
1070 	}
1071 
1072 	return UDS_SUCCESS;
1073 }
1074 
1075 static int rebuild_index(struct uds_index *index)
1076 {
1077 	int result;
1078 	u64 lowest;
1079 	u64 highest;
1080 	bool is_empty = false;
1081 	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;
1082 
1083 	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
1084 	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
1085 						    &is_empty);
1086 	if (result != UDS_SUCCESS) {
1087 		return vdo_log_fatal_strerror(result,
1088 					      "cannot rebuild index: unknown volume chapter boundaries");
1089 	}
1090 
1091 	if (is_empty) {
1092 		index->newest_virtual_chapter = 0;
1093 		index->oldest_virtual_chapter = 0;
1094 		index->volume->lookup_mode = LOOKUP_NORMAL;
1095 		return UDS_SUCCESS;
1096 	}
1097 
1098 	index->newest_virtual_chapter = highest + 1;
1099 	index->oldest_virtual_chapter = lowest;
1100 	if (index->newest_virtual_chapter ==
1101 	    (index->oldest_virtual_chapter + chapters_per_volume)) {
1102 		/* Skip the chapter shadowed by the open chapter. */
1103 		index->oldest_virtual_chapter++;
1104 	}
1105 
1106 	result = replay_volume(index);
1107 	if (result != UDS_SUCCESS)
1108 		return result;
1109 
1110 	index->volume->lookup_mode = LOOKUP_NORMAL;
1111 	return UDS_SUCCESS;
1112 }
1113 
1114 static void free_index_zone(struct index_zone *zone)
1115 {
1116 	if (zone == NULL)
1117 		return;
1118 
1119 	uds_free_open_chapter(zone->open_chapter);
1120 	uds_free_open_chapter(zone->writing_chapter);
1121 	vdo_free(zone);
1122 }
1123 
1124 static int make_index_zone(struct uds_index *index, unsigned int zone_number)
1125 {
1126 	int result;
1127 	struct index_zone *zone;
1128 
1129 	result = vdo_allocate(1, struct index_zone, "index zone", &zone);
1130 	if (result != VDO_SUCCESS)
1131 		return result;
1132 
1133 	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1134 				       &zone->open_chapter);
1135 	if (result != UDS_SUCCESS) {
1136 		free_index_zone(zone);
1137 		return result;
1138 	}
1139 
1140 	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1141 				       &zone->writing_chapter);
1142 	if (result != UDS_SUCCESS) {
1143 		free_index_zone(zone);
1144 		return result;
1145 	}
1146 
1147 	zone->index = index;
1148 	zone->id = zone_number;
1149 	index->zones[zone_number] = zone;
1150 
1151 	return UDS_SUCCESS;
1152 }
1153 
1154 int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
1155 		   struct index_load_context *load_context, index_callback_fn callback,
1156 		   struct uds_index **new_index)
1157 {
1158 	int result;
1159 	bool loaded = false;
1160 	bool new = (open_type == UDS_CREATE);
1161 	struct uds_index *index = NULL;
1162 	struct index_zone *zone;
1163 	u64 nonce;
1164 	unsigned int z;
1165 
1166 	result = vdo_allocate_extended(struct uds_index, config->zone_count,
1167 				       struct uds_request_queue *, "index", &index);
1168 	if (result != VDO_SUCCESS)
1169 		return result;
1170 
1171 	index->zone_count = config->zone_count;
1172 
1173 	result = uds_make_index_layout(config, new, &index->layout);
1174 	if (result != UDS_SUCCESS) {
1175 		uds_free_index(index);
1176 		return result;
1177 	}
1178 
1179 	result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
1180 			      &index->zones);
1181 	if (result != VDO_SUCCESS) {
1182 		uds_free_index(index);
1183 		return result;
1184 	}
1185 
1186 	result = uds_make_volume(config, index->layout, &index->volume);
1187 	if (result != UDS_SUCCESS) {
1188 		uds_free_index(index);
1189 		return result;
1190 	}
1191 
1192 	index->volume->lookup_mode = LOOKUP_NORMAL;
1193 	for (z = 0; z < index->zone_count; z++) {
1194 		result = make_index_zone(index, z);
1195 		if (result != UDS_SUCCESS) {
1196 			uds_free_index(index);
1197 			return vdo_log_error_strerror(result,
1198 						      "Could not create index zone");
1199 		}
1200 	}
1201 
1202 	nonce = uds_get_volume_nonce(index->layout);
1203 	result = uds_make_volume_index(config, nonce, &index->volume_index);
1204 	if (result != UDS_SUCCESS) {
1205 		uds_free_index(index);
1206 		return vdo_log_error_strerror(result, "could not make volume index");
1207 	}
1208 
1209 	index->load_context = load_context;
1210 	index->callback = callback;
1211 
1212 	result = initialize_index_queues(index, config->geometry);
1213 	if (result != UDS_SUCCESS) {
1214 		uds_free_index(index);
1215 		return result;
1216 	}
1217 
1218 	result = make_chapter_writer(index, &index->chapter_writer);
1219 	if (result != UDS_SUCCESS) {
1220 		uds_free_index(index);
1221 		return result;
1222 	}
1223 
1224 	if (!new) {
1225 		result = load_index(index);
1226 		switch (result) {
1227 		case UDS_SUCCESS:
1228 			loaded = true;
1229 			break;
1230 		case -ENOMEM:
1231 			/* We should not try a rebuild for this error. */
1232 			vdo_log_error_strerror(result, "index could not be loaded");
1233 			break;
1234 		default:
1235 			vdo_log_error_strerror(result, "index could not be loaded");
1236 			if (open_type == UDS_LOAD) {
1237 				result = rebuild_index(index);
1238 				if (result != UDS_SUCCESS) {
1239 					vdo_log_error_strerror(result,
1240 							       "index could not be rebuilt");
1241 				}
1242 			}
1243 			break;
1244 		}
1245 	}
1246 
1247 	if (result != UDS_SUCCESS) {
1248 		uds_free_index(index);
1249 		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
1250 	}
1251 
1252 	for (z = 0; z < index->zone_count; z++) {
1253 		zone = index->zones[z];
1254 		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
1255 		zone->newest_virtual_chapter = index->newest_virtual_chapter;
1256 	}
1257 
1258 	if (index->load_context != NULL) {
1259 		mutex_lock(&index->load_context->mutex);
1260 		index->load_context->status = INDEX_READY;
1261 		/*
1262 		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
1263 		 * us so it doesn't hang.
1264 		 */
1265 		uds_broadcast_cond(&index->load_context->cond);
1266 		mutex_unlock(&index->load_context->mutex);
1267 	}
1268 
1269 	index->has_saved_open_chapter = loaded;
1270 	index->need_to_save = !loaded;
1271 	*new_index = index;
1272 	return UDS_SUCCESS;
1273 }
1274 
1275 void uds_free_index(struct uds_index *index)
1276 {
1277 	unsigned int i;
1278 
1279 	if (index == NULL)
1280 		return;
1281 
1282 	uds_request_queue_finish(index->triage_queue);
1283 	for (i = 0; i < index->zone_count; i++)
1284 		uds_request_queue_finish(index->zone_queues[i]);
1285 
1286 	free_chapter_writer(index->chapter_writer);
1287 
1288 	uds_free_volume_index(index->volume_index);
1289 	if (index->zones != NULL) {
1290 		for (i = 0; i < index->zone_count; i++)
1291 			free_index_zone(index->zones[i]);
1292 		vdo_free(index->zones);
1293 	}
1294 
1295 	uds_free_volume(index->volume);
1296 	uds_free_index_layout(vdo_forget(index->layout));
1297 	vdo_free(index);
1298 }
1299 
1300 /* Wait for the chapter writer to complete any outstanding writes. */
1301 void uds_wait_for_idle_index(struct uds_index *index)
1302 {
1303 	struct chapter_writer *writer = index->chapter_writer;
1304 
1305 	mutex_lock(&writer->mutex);
1306 	while (writer->zones_to_write > 0)
1307 		uds_wait_cond(&writer->cond, &writer->mutex);
1308 	mutex_unlock(&writer->mutex);
1309 }
1310 
1311 /* This function assumes that all requests have been drained. */
1312 int uds_save_index(struct uds_index *index)
1313 {
1314 	int result;
1315 
1316 	if (!index->need_to_save)
1317 		return UDS_SUCCESS;
1318 
1319 	uds_wait_for_idle_index(index);
1320 	index->prev_save = index->last_save;
1321 	index->last_save = ((index->newest_virtual_chapter == 0) ?
1322 			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
1323 	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);
1324 
1325 	result = uds_save_index_state(index->layout, index);
1326 	if (result != UDS_SUCCESS) {
1327 		vdo_log_info("save index failed");
1328 		index->last_save = index->prev_save;
1329 	} else {
1330 		index->has_saved_open_chapter = true;
1331 		index->need_to_save = false;
1332 		vdo_log_info("finished save (vcn %llu)",
1333 			     (unsigned long long) index->last_save);
1334 	}
1335 
1336 	return result;
1337 }
1338 
1339 int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
1340 {
1341 	return uds_replace_volume_storage(index->volume, index->layout, bdev);
1342 }
1343 
1344 /* Accessing statistics should be safe from any thread. */
1345 void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
1346 {
1347 	struct volume_index_stats stats;
1348 
1349 	uds_get_volume_index_stats(index->volume_index, &stats);
1350 	counters->entries_indexed = stats.record_count;
1351 	counters->collisions = stats.collision_count;
1352 	counters->entries_discarded = stats.discard_count;
1353 
1354 	counters->memory_used = (index->volume_index->memory_size +
1355 				 index->volume->cache_size +
1356 				 index->chapter_writer->memory_size);
1357 }
1358 
1359 void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
1360 {
1361 	struct uds_index *index = request->index;
1362 	struct uds_request_queue *queue;
1363 
1364 	switch (stage) {
1365 	case STAGE_TRIAGE:
1366 		if (index->triage_queue != NULL) {
1367 			queue = index->triage_queue;
1368 			break;
1369 		}
1370 
1371 		fallthrough;
1372 
1373 	case STAGE_INDEX:
1374 		request->zone_number =
1375 			uds_get_volume_index_zone(index->volume_index, &request->record_name);
1376 		fallthrough;
1377 
1378 	case STAGE_MESSAGE:
1379 		queue = index->zone_queues[request->zone_number];
1380 		break;
1381 
1382 	default:
1383 		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
1384 		return;
1385 	}
1386 
1387 	uds_request_queue_enqueue(queue, request);
1388 }
1389