xref: /linux/drivers/md/dm-vdo/indexer/open-chapter.c (revision fbf5df34a4dbcd09d433dd4f0916bf9b2ddb16de)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "open-chapter.h"
7 
8 #include <linux/log2.h>
9 
10 #include "logger.h"
11 #include "memory-alloc.h"
12 #include "numeric.h"
13 #include "permassert.h"
14 
15 #include "config.h"
16 #include "hash-utils.h"
17 
18 /*
19  * Each index zone has a dedicated open chapter zone structure which gets an equal share of the
20  * open chapter space. Records are assigned to zones based on their record name. Within each zone,
21  * records are stored in an array in the order they arrive. Additionally, a reference to each
22  * record is stored in a hash table to help determine if a new record duplicates an existing one.
23  * If new metadata for an existing name arrives, the record is altered in place. The array of
24  * records is 1-based so that record number 0 can be used to indicate an unused hash slot.
25  *
26  * Deleted records are marked with a flag rather than actually removed to simplify hash table
27  * management. The array of deleted flags overlays the array of hash slots, but the flags are
28  * indexed by record number instead of by record name. The number of hash slots will always be a
29  * power of two that is greater than the number of records to be indexed, guaranteeing that hash
30  * insertion cannot fail, and that there are sufficient flags for all records.
31  *
32  * Once any open chapter zone fills its available space, the chapter is closed. The records from
33  * each zone are interleaved to attempt to preserve temporal locality and assigned to record pages.
34  * Empty or deleted records are replaced by copies of a valid record so that the record pages only
35  * contain valid records. The chapter then constructs a delta index which maps each record name to
36  * the record page on which that record can be found, which is split into index pages. These
37  * structures are then passed to the volume to be recorded on storage.
38  *
39  * When the index is saved, the open chapter records are saved in a single array, once again
40  * interleaved to attempt to preserve temporal locality. When the index is reloaded, there may be a
41  * different number of zones than previously, so the records must be parcelled out to their new
42  * zones. In addition, depending on the distribution of record names, a new zone may have more
43  * records than it has space. In this case, the latest records for that zone will be discarded.
44  */
45 
/* Byte strings written at the start of a saved open chapter and checked again on load. */
static const u8 OPEN_CHAPTER_MAGIC[] = "ALBOC";
static const u8 OPEN_CHAPTER_VERSION[] = "02.00";

/* The saved lengths exclude the terminating NUL of the string literals above. */
#define OPEN_CHAPTER_MAGIC_LENGTH (sizeof(OPEN_CHAPTER_MAGIC) - 1)
#define OPEN_CHAPTER_VERSION_LENGTH (sizeof(OPEN_CHAPTER_VERSION) - 1)
/* Multiplier applied to a zone's record capacity when sizing its hash slot table. */
#define LOAD_RATIO 2
52 
53 static inline size_t records_size(const struct open_chapter_zone *open_chapter)
54 {
55 	return sizeof(struct uds_volume_record) * (1 + open_chapter->capacity);
56 }
57 
58 static inline size_t slots_size(size_t slot_count)
59 {
60 	return sizeof(struct open_chapter_zone_slot) * slot_count;
61 }
62 
63 int uds_make_open_chapter(const struct index_geometry *geometry, unsigned int zone_count,
64 			  struct open_chapter_zone **open_chapter_ptr)
65 {
66 	int result;
67 	struct open_chapter_zone *open_chapter;
68 	size_t capacity = geometry->records_per_chapter / zone_count;
69 	size_t slot_count = (1 << bits_per(capacity * LOAD_RATIO));
70 
71 	result = vdo_allocate_extended(slot_count, slots, "open chapter", &open_chapter);
72 	if (result != VDO_SUCCESS)
73 		return result;
74 
75 	open_chapter->slot_count = slot_count;
76 	open_chapter->capacity = capacity;
77 	result = vdo_allocate_cache_aligned(records_size(open_chapter), "record pages",
78 					    &open_chapter->records);
79 	if (result != VDO_SUCCESS) {
80 		uds_free_open_chapter(open_chapter);
81 		return result;
82 	}
83 
84 	*open_chapter_ptr = open_chapter;
85 	return UDS_SUCCESS;
86 }
87 
88 void uds_reset_open_chapter(struct open_chapter_zone *open_chapter)
89 {
90 	open_chapter->size = 0;
91 	open_chapter->deletions = 0;
92 
93 	memset(open_chapter->records, 0, records_size(open_chapter));
94 	memset(open_chapter->slots, 0, slots_size(open_chapter->slot_count));
95 }
96 
97 static unsigned int probe_chapter_slots(struct open_chapter_zone *open_chapter,
98 					const struct uds_record_name *name)
99 {
100 	struct uds_volume_record *record;
101 	unsigned int slot_count = open_chapter->slot_count;
102 	unsigned int slot = uds_name_to_hash_slot(name, slot_count);
103 	unsigned int record_number;
104 	unsigned int attempts = 1;
105 
106 	while (true) {
107 		record_number = open_chapter->slots[slot].record_number;
108 
109 		/*
110 		 * If the hash slot is empty, we've reached the end of a chain without finding the
111 		 * record and should terminate the search.
112 		 */
113 		if (record_number == 0)
114 			return slot;
115 
116 		/*
117 		 * If the name of the record referenced by the slot matches and has not been
118 		 * deleted, then we've found the requested name.
119 		 */
120 		record = &open_chapter->records[record_number];
121 		if ((memcmp(&record->name, name, UDS_RECORD_NAME_SIZE) == 0) &&
122 		    !open_chapter->slots[record_number].deleted)
123 			return slot;
124 
125 		/*
126 		 * Quadratic probing: advance the probe by 1, 2, 3, etc. and try again. This
127 		 * performs better than linear probing and works best for 2^N slots.
128 		 */
129 		slot = (slot + attempts++) % slot_count;
130 	}
131 }
132 
133 void uds_search_open_chapter(struct open_chapter_zone *open_chapter,
134 			     const struct uds_record_name *name,
135 			     struct uds_record_data *metadata, bool *found)
136 {
137 	unsigned int slot;
138 	unsigned int record_number;
139 
140 	slot = probe_chapter_slots(open_chapter, name);
141 	record_number = open_chapter->slots[slot].record_number;
142 	if (record_number == 0) {
143 		*found = false;
144 	} else {
145 		*found = true;
146 		*metadata = open_chapter->records[record_number].data;
147 	}
148 }
149 
150 /* Add a record to the open chapter zone and return the remaining space. */
151 int uds_put_open_chapter(struct open_chapter_zone *open_chapter,
152 			 const struct uds_record_name *name,
153 			 const struct uds_record_data *metadata)
154 {
155 	unsigned int slot;
156 	unsigned int record_number;
157 	struct uds_volume_record *record;
158 
159 	if (open_chapter->size >= open_chapter->capacity)
160 		return 0;
161 
162 	slot = probe_chapter_slots(open_chapter, name);
163 	record_number = open_chapter->slots[slot].record_number;
164 
165 	if (record_number == 0) {
166 		record_number = ++open_chapter->size;
167 		open_chapter->slots[slot].record_number = record_number;
168 	}
169 
170 	record = &open_chapter->records[record_number];
171 	record->name = *name;
172 	record->data = *metadata;
173 
174 	return open_chapter->capacity - open_chapter->size;
175 }
176 
177 void uds_remove_from_open_chapter(struct open_chapter_zone *open_chapter,
178 				  const struct uds_record_name *name)
179 {
180 	unsigned int slot;
181 	unsigned int record_number;
182 
183 	slot = probe_chapter_slots(open_chapter, name);
184 	record_number = open_chapter->slots[slot].record_number;
185 
186 	if (record_number > 0) {
187 		open_chapter->slots[record_number].deleted = true;
188 		open_chapter->deletions += 1;
189 	}
190 }
191 
192 void uds_free_open_chapter(struct open_chapter_zone *open_chapter)
193 {
194 	if (open_chapter != NULL) {
195 		vdo_free(open_chapter->records);
196 		vdo_free(open_chapter);
197 	}
198 }
199 
200 /* Map each record name to its record page number in the delta chapter index. */
201 static int fill_delta_chapter_index(struct open_chapter_zone **chapter_zones,
202 				    unsigned int zone_count,
203 				    struct open_chapter_index *index,
204 				    struct uds_volume_record *collated_records)
205 {
206 	int result;
207 	unsigned int records_per_chapter;
208 	unsigned int records_per_page;
209 	unsigned int record_index;
210 	unsigned int records = 0;
211 	u32 page_number;
212 	unsigned int z;
213 	int overflow_count = 0;
214 	struct uds_volume_record *fill_record = NULL;
215 
216 	/*
217 	 * The record pages should not have any empty space, so find a record with which to fill
218 	 * the chapter zone if it was closed early, and also to replace any deleted records. The
219 	 * last record in any filled zone is guaranteed to not have been deleted, so use one of
220 	 * those.
221 	 */
222 	for (z = 0; z < zone_count; z++) {
223 		struct open_chapter_zone *zone = chapter_zones[z];
224 
225 		if (zone->size == zone->capacity) {
226 			fill_record = &zone->records[zone->size];
227 			break;
228 		}
229 	}
230 
231 	records_per_chapter = index->geometry->records_per_chapter;
232 	records_per_page = index->geometry->records_per_page;
233 
234 	for (records = 0; records < records_per_chapter; records++) {
235 		struct uds_volume_record *record = &collated_records[records];
236 		struct open_chapter_zone *open_chapter;
237 
238 		/* The record arrays in the zones are 1-based. */
239 		record_index = 1 + (records / zone_count);
240 		page_number = records / records_per_page;
241 		open_chapter = chapter_zones[records % zone_count];
242 
243 		/* Use the fill record in place of an unused record. */
244 		if (record_index > open_chapter->size ||
245 		    open_chapter->slots[record_index].deleted) {
246 			*record = *fill_record;
247 			continue;
248 		}
249 
250 		*record = open_chapter->records[record_index];
251 		result = uds_put_open_chapter_index_record(index, &record->name,
252 							   page_number);
253 		switch (result) {
254 		case UDS_SUCCESS:
255 			break;
256 		case UDS_OVERFLOW:
257 			overflow_count++;
258 			break;
259 		default:
260 			vdo_log_error_strerror(result,
261 					       "failed to build open chapter index");
262 			return result;
263 		}
264 	}
265 
266 	if (overflow_count > 0)
267 		vdo_log_warning("Failed to add %d entries to chapter index",
268 				overflow_count);
269 
270 	return UDS_SUCCESS;
271 }
272 
273 int uds_close_open_chapter(struct open_chapter_zone **chapter_zones,
274 			   unsigned int zone_count, struct volume *volume,
275 			   struct open_chapter_index *chapter_index,
276 			   struct uds_volume_record *collated_records,
277 			   u64 virtual_chapter_number)
278 {
279 	int result;
280 
281 	uds_empty_open_chapter_index(chapter_index, virtual_chapter_number);
282 	result = fill_delta_chapter_index(chapter_zones, zone_count, chapter_index,
283 					  collated_records);
284 	if (result != UDS_SUCCESS)
285 		return result;
286 
287 	return uds_write_chapter(volume, chapter_index, collated_records);
288 }
289 
290 int uds_save_open_chapter(struct uds_index *index, struct buffered_writer *writer)
291 {
292 	int result;
293 	struct open_chapter_zone *open_chapter;
294 	struct uds_volume_record *record;
295 	u8 record_count_data[sizeof(u32)];
296 	u32 record_count = 0;
297 	unsigned int record_index;
298 	unsigned int z;
299 
300 	result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_MAGIC,
301 					      OPEN_CHAPTER_MAGIC_LENGTH);
302 	if (result != UDS_SUCCESS)
303 		return result;
304 
305 	result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_VERSION,
306 					      OPEN_CHAPTER_VERSION_LENGTH);
307 	if (result != UDS_SUCCESS)
308 		return result;
309 
310 	for (z = 0; z < index->zone_count; z++) {
311 		open_chapter = index->zones[z]->open_chapter;
312 		record_count += open_chapter->size - open_chapter->deletions;
313 	}
314 
315 	put_unaligned_le32(record_count, record_count_data);
316 	result = uds_write_to_buffered_writer(writer, record_count_data,
317 					      sizeof(record_count_data));
318 	if (result != UDS_SUCCESS)
319 		return result;
320 
321 	record_index = 1;
322 	while (record_count > 0) {
323 		for (z = 0; z < index->zone_count; z++) {
324 			open_chapter = index->zones[z]->open_chapter;
325 			if (record_index > open_chapter->size)
326 				continue;
327 
328 			if (open_chapter->slots[record_index].deleted)
329 				continue;
330 
331 			record = &open_chapter->records[record_index];
332 			result = uds_write_to_buffered_writer(writer, (u8 *) record,
333 							      sizeof(*record));
334 			if (result != UDS_SUCCESS)
335 				return result;
336 
337 			record_count--;
338 		}
339 
340 		record_index++;
341 	}
342 
343 	return uds_flush_buffered_writer(writer);
344 }
345 
346 u64 uds_compute_saved_open_chapter_size(struct index_geometry *geometry)
347 {
348 	unsigned int records_per_chapter = geometry->records_per_chapter;
349 
350 	return OPEN_CHAPTER_MAGIC_LENGTH + OPEN_CHAPTER_VERSION_LENGTH + sizeof(u32) +
351 		records_per_chapter * sizeof(struct uds_volume_record);
352 }
353 
354 static int load_version20(struct uds_index *index, struct buffered_reader *reader)
355 {
356 	int result;
357 	u32 record_count;
358 	u8 record_count_data[sizeof(u32)];
359 	struct uds_volume_record record;
360 
361 	/*
362 	 * Track which zones cannot accept any more records. If the open chapter had a different
363 	 * number of zones previously, some new zones may have more records than they have space
364 	 * for. These overflow records will be discarded.
365 	 */
366 	bool full_flags[MAX_ZONES] = {
367 		false,
368 	};
369 
370 	result = uds_read_from_buffered_reader(reader, (u8 *) &record_count_data,
371 					       sizeof(record_count_data));
372 	if (result != UDS_SUCCESS)
373 		return result;
374 
375 	record_count = get_unaligned_le32(record_count_data);
376 	while (record_count-- > 0) {
377 		unsigned int zone = 0;
378 
379 		result = uds_read_from_buffered_reader(reader, (u8 *) &record,
380 						       sizeof(record));
381 		if (result != UDS_SUCCESS)
382 			return result;
383 
384 		if (index->zone_count > 1)
385 			zone = uds_get_volume_index_zone(index->volume_index,
386 							 &record.name);
387 
388 		if (!full_flags[zone]) {
389 			struct open_chapter_zone *open_chapter;
390 			unsigned int remaining;
391 
392 			open_chapter = index->zones[zone]->open_chapter;
393 			remaining = uds_put_open_chapter(open_chapter, &record.name,
394 							 &record.data);
395 			/* Do not allow any zone to fill completely. */
396 			full_flags[zone] = (remaining <= 1);
397 		}
398 	}
399 
400 	return UDS_SUCCESS;
401 }
402 
403 int uds_load_open_chapter(struct uds_index *index, struct buffered_reader *reader)
404 {
405 	u8 version[OPEN_CHAPTER_VERSION_LENGTH];
406 	int result;
407 
408 	result = uds_verify_buffered_data(reader, OPEN_CHAPTER_MAGIC,
409 					  OPEN_CHAPTER_MAGIC_LENGTH);
410 	if (result != UDS_SUCCESS)
411 		return result;
412 
413 	result = uds_read_from_buffered_reader(reader, version, sizeof(version));
414 	if (result != UDS_SUCCESS)
415 		return result;
416 
417 	if (memcmp(OPEN_CHAPTER_VERSION, version, sizeof(version)) != 0) {
418 		return vdo_log_error_strerror(UDS_CORRUPT_DATA,
419 					      "Invalid open chapter version: %.*s",
420 					      (int) sizeof(version), version);
421 	}
422 
423 	return load_version20(index, reader);
424 }
425