1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "open-chapter.h"
7
8 #include <linux/log2.h>
9
10 #include "logger.h"
11 #include "memory-alloc.h"
12 #include "numeric.h"
13 #include "permassert.h"
14
15 #include "config.h"
16 #include "hash-utils.h"
17
18 /*
19 * Each index zone has a dedicated open chapter zone structure which gets an equal share of the
20 * open chapter space. Records are assigned to zones based on their record name. Within each zone,
21 * records are stored in an array in the order they arrive. Additionally, a reference to each
22 * record is stored in a hash table to help determine if a new record duplicates an existing one.
23 * If new metadata for an existing name arrives, the record is altered in place. The array of
24 * records is 1-based so that record number 0 can be used to indicate an unused hash slot.
25 *
26 * Deleted records are marked with a flag rather than actually removed to simplify hash table
27 * management. The array of deleted flags overlays the array of hash slots, but the flags are
28 * indexed by record number instead of by record name. The number of hash slots will always be a
29 * power of two that is greater than the number of records to be indexed, guaranteeing that hash
30 * insertion cannot fail, and that there are sufficient flags for all records.
31 *
32 * Once any open chapter zone fills its available space, the chapter is closed. The records from
33 * each zone are interleaved to attempt to preserve temporal locality and assigned to record pages.
34 * Empty or deleted records are replaced by copies of a valid record so that the record pages only
35 * contain valid records. The chapter then constructs a delta index which maps each record name to
36 * the record page on which that record can be found, which is split into index pages. These
37 * structures are then passed to the volume to be recorded on storage.
38 *
39 * When the index is saved, the open chapter records are saved in a single array, once again
40 * interleaved to attempt to preserve temporal locality. When the index is reloaded, there may be a
41 * different number of zones than previously, so the records must be parcelled out to their new
42 * zones. In addition, depending on the distribution of record names, a new zone may have more
43 * records than it has space. In this case, the latest records for that zone will be discarded.
44 */
45
46 static const u8 OPEN_CHAPTER_MAGIC[] = "ALBOC";
47 static const u8 OPEN_CHAPTER_VERSION[] = "02.00";
48
49 #define OPEN_CHAPTER_MAGIC_LENGTH (sizeof(OPEN_CHAPTER_MAGIC) - 1)
50 #define OPEN_CHAPTER_VERSION_LENGTH (sizeof(OPEN_CHAPTER_VERSION) - 1)
51 #define LOAD_RATIO 2
52
records_size(const struct open_chapter_zone * open_chapter)53 static inline size_t records_size(const struct open_chapter_zone *open_chapter)
54 {
55 return sizeof(struct uds_volume_record) * (1 + open_chapter->capacity);
56 }
57
slots_size(size_t slot_count)58 static inline size_t slots_size(size_t slot_count)
59 {
60 return sizeof(struct open_chapter_zone_slot) * slot_count;
61 }
62
uds_make_open_chapter(const struct index_geometry * geometry,unsigned int zone_count,struct open_chapter_zone ** open_chapter_ptr)63 int uds_make_open_chapter(const struct index_geometry *geometry, unsigned int zone_count,
64 struct open_chapter_zone **open_chapter_ptr)
65 {
66 int result;
67 struct open_chapter_zone *open_chapter;
68 size_t capacity = geometry->records_per_chapter / zone_count;
69 size_t slot_count = (1 << bits_per(capacity * LOAD_RATIO));
70
71 result = vdo_allocate_extended(struct open_chapter_zone, slot_count,
72 struct open_chapter_zone_slot, "open chapter",
73 &open_chapter);
74 if (result != VDO_SUCCESS)
75 return result;
76
77 open_chapter->slot_count = slot_count;
78 open_chapter->capacity = capacity;
79 result = vdo_allocate_cache_aligned(records_size(open_chapter), "record pages",
80 &open_chapter->records);
81 if (result != VDO_SUCCESS) {
82 uds_free_open_chapter(open_chapter);
83 return result;
84 }
85
86 *open_chapter_ptr = open_chapter;
87 return UDS_SUCCESS;
88 }
89
uds_reset_open_chapter(struct open_chapter_zone * open_chapter)90 void uds_reset_open_chapter(struct open_chapter_zone *open_chapter)
91 {
92 open_chapter->size = 0;
93 open_chapter->deletions = 0;
94
95 memset(open_chapter->records, 0, records_size(open_chapter));
96 memset(open_chapter->slots, 0, slots_size(open_chapter->slot_count));
97 }
98
probe_chapter_slots(struct open_chapter_zone * open_chapter,const struct uds_record_name * name)99 static unsigned int probe_chapter_slots(struct open_chapter_zone *open_chapter,
100 const struct uds_record_name *name)
101 {
102 struct uds_volume_record *record;
103 unsigned int slot_count = open_chapter->slot_count;
104 unsigned int slot = uds_name_to_hash_slot(name, slot_count);
105 unsigned int record_number;
106 unsigned int attempts = 1;
107
108 while (true) {
109 record_number = open_chapter->slots[slot].record_number;
110
111 /*
112 * If the hash slot is empty, we've reached the end of a chain without finding the
113 * record and should terminate the search.
114 */
115 if (record_number == 0)
116 return slot;
117
118 /*
119 * If the name of the record referenced by the slot matches and has not been
120 * deleted, then we've found the requested name.
121 */
122 record = &open_chapter->records[record_number];
123 if ((memcmp(&record->name, name, UDS_RECORD_NAME_SIZE) == 0) &&
124 !open_chapter->slots[record_number].deleted)
125 return slot;
126
127 /*
128 * Quadratic probing: advance the probe by 1, 2, 3, etc. and try again. This
129 * performs better than linear probing and works best for 2^N slots.
130 */
131 slot = (slot + attempts++) % slot_count;
132 }
133 }
134
uds_search_open_chapter(struct open_chapter_zone * open_chapter,const struct uds_record_name * name,struct uds_record_data * metadata,bool * found)135 void uds_search_open_chapter(struct open_chapter_zone *open_chapter,
136 const struct uds_record_name *name,
137 struct uds_record_data *metadata, bool *found)
138 {
139 unsigned int slot;
140 unsigned int record_number;
141
142 slot = probe_chapter_slots(open_chapter, name);
143 record_number = open_chapter->slots[slot].record_number;
144 if (record_number == 0) {
145 *found = false;
146 } else {
147 *found = true;
148 *metadata = open_chapter->records[record_number].data;
149 }
150 }
151
152 /* Add a record to the open chapter zone and return the remaining space. */
uds_put_open_chapter(struct open_chapter_zone * open_chapter,const struct uds_record_name * name,const struct uds_record_data * metadata)153 int uds_put_open_chapter(struct open_chapter_zone *open_chapter,
154 const struct uds_record_name *name,
155 const struct uds_record_data *metadata)
156 {
157 unsigned int slot;
158 unsigned int record_number;
159 struct uds_volume_record *record;
160
161 if (open_chapter->size >= open_chapter->capacity)
162 return 0;
163
164 slot = probe_chapter_slots(open_chapter, name);
165 record_number = open_chapter->slots[slot].record_number;
166
167 if (record_number == 0) {
168 record_number = ++open_chapter->size;
169 open_chapter->slots[slot].record_number = record_number;
170 }
171
172 record = &open_chapter->records[record_number];
173 record->name = *name;
174 record->data = *metadata;
175
176 return open_chapter->capacity - open_chapter->size;
177 }
178
uds_remove_from_open_chapter(struct open_chapter_zone * open_chapter,const struct uds_record_name * name)179 void uds_remove_from_open_chapter(struct open_chapter_zone *open_chapter,
180 const struct uds_record_name *name)
181 {
182 unsigned int slot;
183 unsigned int record_number;
184
185 slot = probe_chapter_slots(open_chapter, name);
186 record_number = open_chapter->slots[slot].record_number;
187
188 if (record_number > 0) {
189 open_chapter->slots[record_number].deleted = true;
190 open_chapter->deletions += 1;
191 }
192 }
193
uds_free_open_chapter(struct open_chapter_zone * open_chapter)194 void uds_free_open_chapter(struct open_chapter_zone *open_chapter)
195 {
196 if (open_chapter != NULL) {
197 vdo_free(open_chapter->records);
198 vdo_free(open_chapter);
199 }
200 }
201
202 /* Map each record name to its record page number in the delta chapter index. */
fill_delta_chapter_index(struct open_chapter_zone ** chapter_zones,unsigned int zone_count,struct open_chapter_index * index,struct uds_volume_record * collated_records)203 static int fill_delta_chapter_index(struct open_chapter_zone **chapter_zones,
204 unsigned int zone_count,
205 struct open_chapter_index *index,
206 struct uds_volume_record *collated_records)
207 {
208 int result;
209 unsigned int records_per_chapter;
210 unsigned int records_per_page;
211 unsigned int record_index;
212 unsigned int records = 0;
213 u32 page_number;
214 unsigned int z;
215 int overflow_count = 0;
216 struct uds_volume_record *fill_record = NULL;
217
218 /*
219 * The record pages should not have any empty space, so find a record with which to fill
220 * the chapter zone if it was closed early, and also to replace any deleted records. The
221 * last record in any filled zone is guaranteed to not have been deleted, so use one of
222 * those.
223 */
224 for (z = 0; z < zone_count; z++) {
225 struct open_chapter_zone *zone = chapter_zones[z];
226
227 if (zone->size == zone->capacity) {
228 fill_record = &zone->records[zone->size];
229 break;
230 }
231 }
232
233 records_per_chapter = index->geometry->records_per_chapter;
234 records_per_page = index->geometry->records_per_page;
235
236 for (records = 0; records < records_per_chapter; records++) {
237 struct uds_volume_record *record = &collated_records[records];
238 struct open_chapter_zone *open_chapter;
239
240 /* The record arrays in the zones are 1-based. */
241 record_index = 1 + (records / zone_count);
242 page_number = records / records_per_page;
243 open_chapter = chapter_zones[records % zone_count];
244
245 /* Use the fill record in place of an unused record. */
246 if (record_index > open_chapter->size ||
247 open_chapter->slots[record_index].deleted) {
248 *record = *fill_record;
249 continue;
250 }
251
252 *record = open_chapter->records[record_index];
253 result = uds_put_open_chapter_index_record(index, &record->name,
254 page_number);
255 switch (result) {
256 case UDS_SUCCESS:
257 break;
258 case UDS_OVERFLOW:
259 overflow_count++;
260 break;
261 default:
262 vdo_log_error_strerror(result,
263 "failed to build open chapter index");
264 return result;
265 }
266 }
267
268 if (overflow_count > 0)
269 vdo_log_warning("Failed to add %d entries to chapter index",
270 overflow_count);
271
272 return UDS_SUCCESS;
273 }
274
uds_close_open_chapter(struct open_chapter_zone ** chapter_zones,unsigned int zone_count,struct volume * volume,struct open_chapter_index * chapter_index,struct uds_volume_record * collated_records,u64 virtual_chapter_number)275 int uds_close_open_chapter(struct open_chapter_zone **chapter_zones,
276 unsigned int zone_count, struct volume *volume,
277 struct open_chapter_index *chapter_index,
278 struct uds_volume_record *collated_records,
279 u64 virtual_chapter_number)
280 {
281 int result;
282
283 uds_empty_open_chapter_index(chapter_index, virtual_chapter_number);
284 result = fill_delta_chapter_index(chapter_zones, zone_count, chapter_index,
285 collated_records);
286 if (result != UDS_SUCCESS)
287 return result;
288
289 return uds_write_chapter(volume, chapter_index, collated_records);
290 }
291
uds_save_open_chapter(struct uds_index * index,struct buffered_writer * writer)292 int uds_save_open_chapter(struct uds_index *index, struct buffered_writer *writer)
293 {
294 int result;
295 struct open_chapter_zone *open_chapter;
296 struct uds_volume_record *record;
297 u8 record_count_data[sizeof(u32)];
298 u32 record_count = 0;
299 unsigned int record_index;
300 unsigned int z;
301
302 result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_MAGIC,
303 OPEN_CHAPTER_MAGIC_LENGTH);
304 if (result != UDS_SUCCESS)
305 return result;
306
307 result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_VERSION,
308 OPEN_CHAPTER_VERSION_LENGTH);
309 if (result != UDS_SUCCESS)
310 return result;
311
312 for (z = 0; z < index->zone_count; z++) {
313 open_chapter = index->zones[z]->open_chapter;
314 record_count += open_chapter->size - open_chapter->deletions;
315 }
316
317 put_unaligned_le32(record_count, record_count_data);
318 result = uds_write_to_buffered_writer(writer, record_count_data,
319 sizeof(record_count_data));
320 if (result != UDS_SUCCESS)
321 return result;
322
323 record_index = 1;
324 while (record_count > 0) {
325 for (z = 0; z < index->zone_count; z++) {
326 open_chapter = index->zones[z]->open_chapter;
327 if (record_index > open_chapter->size)
328 continue;
329
330 if (open_chapter->slots[record_index].deleted)
331 continue;
332
333 record = &open_chapter->records[record_index];
334 result = uds_write_to_buffered_writer(writer, (u8 *) record,
335 sizeof(*record));
336 if (result != UDS_SUCCESS)
337 return result;
338
339 record_count--;
340 }
341
342 record_index++;
343 }
344
345 return uds_flush_buffered_writer(writer);
346 }
347
uds_compute_saved_open_chapter_size(struct index_geometry * geometry)348 u64 uds_compute_saved_open_chapter_size(struct index_geometry *geometry)
349 {
350 unsigned int records_per_chapter = geometry->records_per_chapter;
351
352 return OPEN_CHAPTER_MAGIC_LENGTH + OPEN_CHAPTER_VERSION_LENGTH + sizeof(u32) +
353 records_per_chapter * sizeof(struct uds_volume_record);
354 }
355
load_version20(struct uds_index * index,struct buffered_reader * reader)356 static int load_version20(struct uds_index *index, struct buffered_reader *reader)
357 {
358 int result;
359 u32 record_count;
360 u8 record_count_data[sizeof(u32)];
361 struct uds_volume_record record;
362
363 /*
364 * Track which zones cannot accept any more records. If the open chapter had a different
365 * number of zones previously, some new zones may have more records than they have space
366 * for. These overflow records will be discarded.
367 */
368 bool full_flags[MAX_ZONES] = {
369 false,
370 };
371
372 result = uds_read_from_buffered_reader(reader, (u8 *) &record_count_data,
373 sizeof(record_count_data));
374 if (result != UDS_SUCCESS)
375 return result;
376
377 record_count = get_unaligned_le32(record_count_data);
378 while (record_count-- > 0) {
379 unsigned int zone = 0;
380
381 result = uds_read_from_buffered_reader(reader, (u8 *) &record,
382 sizeof(record));
383 if (result != UDS_SUCCESS)
384 return result;
385
386 if (index->zone_count > 1)
387 zone = uds_get_volume_index_zone(index->volume_index,
388 &record.name);
389
390 if (!full_flags[zone]) {
391 struct open_chapter_zone *open_chapter;
392 unsigned int remaining;
393
394 open_chapter = index->zones[zone]->open_chapter;
395 remaining = uds_put_open_chapter(open_chapter, &record.name,
396 &record.data);
397 /* Do not allow any zone to fill completely. */
398 full_flags[zone] = (remaining <= 1);
399 }
400 }
401
402 return UDS_SUCCESS;
403 }
404
uds_load_open_chapter(struct uds_index * index,struct buffered_reader * reader)405 int uds_load_open_chapter(struct uds_index *index, struct buffered_reader *reader)
406 {
407 u8 version[OPEN_CHAPTER_VERSION_LENGTH];
408 int result;
409
410 result = uds_verify_buffered_data(reader, OPEN_CHAPTER_MAGIC,
411 OPEN_CHAPTER_MAGIC_LENGTH);
412 if (result != UDS_SUCCESS)
413 return result;
414
415 result = uds_read_from_buffered_reader(reader, version, sizeof(version));
416 if (result != UDS_SUCCESS)
417 return result;
418
419 if (memcmp(OPEN_CHAPTER_VERSION, version, sizeof(version)) != 0) {
420 return vdo_log_error_strerror(UDS_CORRUPT_DATA,
421 "Invalid open chapter version: %.*s",
422 (int) sizeof(version), version);
423 }
424
425 return load_version20(index, reader);
426 }
427