// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse index has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
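 *
 * As an illustrative sketch of the barrier flow for a multi-zone sparse index (the call sequence
 * within this file, not literal code):
 *
 *   triage_request()
 *     -> enqueue_barrier_messages()
 *          -> launch_zone_message() for each zone
 *   then, on each zone's thread: execute_zone_request()
 *     -> dispatch_index_zone_control_request()
 *          -> uds_update_sparse_cache() (where the zones rendezvous)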
 */

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};

static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

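/* Allocate a request and enqueue it on the specified zone's queue as a control message. */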
static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = vdo_allocate(1, struct uds_request, __func__, &request);
	if (result != VDO_SUCCESS)
		return result;

	request->index = index;
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

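/* Send a sparse cache barrier message to every zone so they rendezvous on the membership change. */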
static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if it is the same.
	 */

	return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

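/* Wait for the chapter writer to finish writing all chapters before this one. */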
static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	mutex_unlock(&writer->mutex);

	if (result != UDS_SUCCESS)
		return vdo_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

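/* Wait for the previous chapter write to finish, then swap the open and writing chapters. */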
static int swap_open_chapter(struct index_zone *zone)
{
	int result;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	swap(zone->open_chapter, zone->writing_chapter);
	return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter writer won't
 * begin writing the chapter until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

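/* Inform every other zone that this zone has closed the chapter. */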
static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

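/*
 * Close the zone's open chapter, hand it to the chapter writer, and advance to the next virtual
 * chapter, expiring the oldest chapters once every zone has caught up.
 */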
static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

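/* Handle another zone's close announcement by closing this zone's chapter if it hasn't already. */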
static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

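/* Search the cached sparse chapter index, and if the record is found, the cached record page. */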
static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

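/*
 * Look for the record in the open chapter, the writing chapter, the sparse cache, or the volume
 * page cache, depending on which chapter the volume index indicated.
 */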
static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

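/* Add the record to the open chapter, closing the chapter if it is now full. */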
static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

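/*
 * Handle a lookup or update request: consult the volume index, find or fail to find the record,
 * and then update the volume index and open chapter as the request type demands.
 */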
static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the
		 * name is in a cached sparse chapter. If we found the name on
		 * a previous search, use that result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}

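/* Remove the volume index entry for a record, resolving non-collision hints first. */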
static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search might
	 * return stale advice if there is a colliding name in the same chapter, but it's a very
	 * rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}

static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != VDO_SUCCESS)
		return result;

	writer->index = index;
	mutex_init(&writer->mutex);
	uds_init_cond(&writer->cond);

	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}

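/* Load the index state from a clean save on storage. */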
static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

	vdo_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

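/* Rebuild the index page map entries for one chapter by reading each of its index pages. */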
static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct index_geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		if (lowest_delta_list != expected_list_number) {
			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}

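/* Add a single record read from the volume back into the volume index during rebuild. */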
static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
		}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This should be
		 * done regardless of whether we are a brand new record or a sparse record, i.e.
		 * one that doesn't exist in the index but does on disk, since for a sparse record,
		 * we would want to un-sparsify if it did exist.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}

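/*
 * Check whether the index has been asked to suspend, and if so, wait for the resume. Returns
 * true if the index is being freed and the caller should abandon its work.
 */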
static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	mutex_lock(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		mutex_unlock(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	mutex_unlock(&index->load_context->mutex);
	return closing;
}

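/* Replay every record in one chapter of the volume into the volume index. */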
static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct index_geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

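/* Rebuild the volume index and the index page map by replaying the chapters on storage. */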
static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		vdo_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}

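/* Determine the chapter boundaries from the volume and rebuild the index state from them. */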
static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return vdo_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}

static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	vdo_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = vdo_allocate(1, struct index_zone, "index zone", &zone);
	if (result != VDO_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}

int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = vdo_allocate_extended(struct uds_index, config->zone_count,
				       struct uds_request_queue *, "index", &index);
	if (result != VDO_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result = uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
			      &index->zones);
	if (result != VDO_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return vdo_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			vdo_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			vdo_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					vdo_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		mutex_lock(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		mutex_unlock(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		vdo_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(vdo_forget(index->layout));
	vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		vdo_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		vdo_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}