1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "open-chapter.h" 7 8 #include <linux/log2.h> 9 10 #include "logger.h" 11 #include "memory-alloc.h" 12 #include "numeric.h" 13 #include "permassert.h" 14 15 #include "config.h" 16 #include "hash-utils.h" 17 18 /* 19 * Each index zone has a dedicated open chapter zone structure which gets an equal share of the 20 * open chapter space. Records are assigned to zones based on their record name. Within each zone, 21 * records are stored in an array in the order they arrive. Additionally, a reference to each 22 * record is stored in a hash table to help determine if a new record duplicates an existing one. 23 * If new metadata for an existing name arrives, the record is altered in place. The array of 24 * records is 1-based so that record number 0 can be used to indicate an unused hash slot. 25 * 26 * Deleted records are marked with a flag rather than actually removed to simplify hash table 27 * management. The array of deleted flags overlays the array of hash slots, but the flags are 28 * indexed by record number instead of by record name. The number of hash slots will always be a 29 * power of two that is greater than the number of records to be indexed, guaranteeing that hash 30 * insertion cannot fail, and that there are sufficient flags for all records. 31 * 32 * Once any open chapter zone fills its available space, the chapter is closed. The records from 33 * each zone are interleaved to attempt to preserve temporal locality and assigned to record pages. 34 * Empty or deleted records are replaced by copies of a valid record so that the record pages only 35 * contain valid records. The chapter then constructs a delta index which maps each record name to 36 * the record page on which that record can be found, which is split into index pages. These 37 * structures are then passed to the volume to be recorded on storage. 38 * 39 * When the index is saved, the open chapter records are saved in a single array, once again 40 * interleaved to attempt to preserve temporal locality. When the index is reloaded, there may be a 41 * different number of zones than previously, so the records must be parcelled out to their new 42 * zones. In addition, depending on the distribution of record names, a new zone may have more 43 * records than it has space. In this case, the latest records for that zone will be discarded. 44 */ 45 46 static const u8 OPEN_CHAPTER_MAGIC[] = "ALBOC"; 47 static const u8 OPEN_CHAPTER_VERSION[] = "02.00"; 48 49 #define OPEN_CHAPTER_MAGIC_LENGTH (sizeof(OPEN_CHAPTER_MAGIC) - 1) 50 #define OPEN_CHAPTER_VERSION_LENGTH (sizeof(OPEN_CHAPTER_VERSION) - 1) 51 #define LOAD_RATIO 2 52 53 static inline size_t records_size(const struct open_chapter_zone *open_chapter) 54 { 55 return sizeof(struct uds_volume_record) * (1 + open_chapter->capacity); 56 } 57 58 static inline size_t slots_size(size_t slot_count) 59 { 60 return sizeof(struct open_chapter_zone_slot) * slot_count; 61 } 62 63 int uds_make_open_chapter(const struct index_geometry *geometry, unsigned int zone_count, 64 struct open_chapter_zone **open_chapter_ptr) 65 { 66 int result; 67 struct open_chapter_zone *open_chapter; 68 size_t capacity = geometry->records_per_chapter / zone_count; 69 size_t slot_count = (1 << bits_per(capacity * LOAD_RATIO)); 70 71 result = vdo_allocate_extended(struct open_chapter_zone, slot_count, 72 struct open_chapter_zone_slot, "open chapter", 73 &open_chapter); 74 if (result != VDO_SUCCESS) 75 return result; 76 77 open_chapter->slot_count = slot_count; 78 open_chapter->capacity = capacity; 79 result = vdo_allocate_cache_aligned(records_size(open_chapter), "record pages", 80 &open_chapter->records); 81 if (result != VDO_SUCCESS) { 82 uds_free_open_chapter(open_chapter); 83 return result; 84 } 85 86 *open_chapter_ptr = open_chapter; 87 return UDS_SUCCESS; 88 } 89 90 void uds_reset_open_chapter(struct open_chapter_zone *open_chapter) 91 { 92 open_chapter->size = 0; 93 open_chapter->deletions = 0; 94 95 memset(open_chapter->records, 0, records_size(open_chapter)); 96 memset(open_chapter->slots, 0, slots_size(open_chapter->slot_count)); 97 } 98 99 static unsigned int probe_chapter_slots(struct open_chapter_zone *open_chapter, 100 const struct uds_record_name *name) 101 { 102 struct uds_volume_record *record; 103 unsigned int slot_count = open_chapter->slot_count; 104 unsigned int slot = uds_name_to_hash_slot(name, slot_count); 105 unsigned int record_number; 106 unsigned int attempts = 1; 107 108 while (true) { 109 record_number = open_chapter->slots[slot].record_number; 110 111 /* 112 * If the hash slot is empty, we've reached the end of a chain without finding the 113 * record and should terminate the search. 114 */ 115 if (record_number == 0) 116 return slot; 117 118 /* 119 * If the name of the record referenced by the slot matches and has not been 120 * deleted, then we've found the requested name. 121 */ 122 record = &open_chapter->records[record_number]; 123 if ((memcmp(&record->name, name, UDS_RECORD_NAME_SIZE) == 0) && 124 !open_chapter->slots[record_number].deleted) 125 return slot; 126 127 /* 128 * Quadratic probing: advance the probe by 1, 2, 3, etc. and try again. This 129 * performs better than linear probing and works best for 2^N slots. 130 */ 131 slot = (slot + attempts++) % slot_count; 132 } 133 } 134 135 void uds_search_open_chapter(struct open_chapter_zone *open_chapter, 136 const struct uds_record_name *name, 137 struct uds_record_data *metadata, bool *found) 138 { 139 unsigned int slot; 140 unsigned int record_number; 141 142 slot = probe_chapter_slots(open_chapter, name); 143 record_number = open_chapter->slots[slot].record_number; 144 if (record_number == 0) { 145 *found = false; 146 } else { 147 *found = true; 148 *metadata = open_chapter->records[record_number].data; 149 } 150 } 151 152 /* Add a record to the open chapter zone and return the remaining space. */ 153 int uds_put_open_chapter(struct open_chapter_zone *open_chapter, 154 const struct uds_record_name *name, 155 const struct uds_record_data *metadata) 156 { 157 unsigned int slot; 158 unsigned int record_number; 159 struct uds_volume_record *record; 160 161 if (open_chapter->size >= open_chapter->capacity) 162 return 0; 163 164 slot = probe_chapter_slots(open_chapter, name); 165 record_number = open_chapter->slots[slot].record_number; 166 167 if (record_number == 0) { 168 record_number = ++open_chapter->size; 169 open_chapter->slots[slot].record_number = record_number; 170 } 171 172 record = &open_chapter->records[record_number]; 173 record->name = *name; 174 record->data = *metadata; 175 176 return open_chapter->capacity - open_chapter->size; 177 } 178 179 void uds_remove_from_open_chapter(struct open_chapter_zone *open_chapter, 180 const struct uds_record_name *name) 181 { 182 unsigned int slot; 183 unsigned int record_number; 184 185 slot = probe_chapter_slots(open_chapter, name); 186 record_number = open_chapter->slots[slot].record_number; 187 188 if (record_number > 0) { 189 open_chapter->slots[record_number].deleted = true; 190 open_chapter->deletions += 1; 191 } 192 } 193 194 void uds_free_open_chapter(struct open_chapter_zone *open_chapter) 195 { 196 if (open_chapter != NULL) { 197 vdo_free(open_chapter->records); 198 vdo_free(open_chapter); 199 } 200 } 201 202 /* Map each record name to its record page number in the delta chapter index. */ 203 static int fill_delta_chapter_index(struct open_chapter_zone **chapter_zones, 204 unsigned int zone_count, 205 struct open_chapter_index *index, 206 struct uds_volume_record *collated_records) 207 { 208 int result; 209 unsigned int records_per_chapter; 210 unsigned int records_per_page; 211 unsigned int record_index; 212 unsigned int records = 0; 213 u32 page_number; 214 unsigned int z; 215 int overflow_count = 0; 216 struct uds_volume_record *fill_record = NULL; 217 218 /* 219 * The record pages should not have any empty space, so find a record with which to fill 220 * the chapter zone if it was closed early, and also to replace any deleted records. The 221 * last record in any filled zone is guaranteed to not have been deleted, so use one of 222 * those. 223 */ 224 for (z = 0; z < zone_count; z++) { 225 struct open_chapter_zone *zone = chapter_zones[z]; 226 227 if (zone->size == zone->capacity) { 228 fill_record = &zone->records[zone->size]; 229 break; 230 } 231 } 232 233 records_per_chapter = index->geometry->records_per_chapter; 234 records_per_page = index->geometry->records_per_page; 235 236 for (records = 0; records < records_per_chapter; records++) { 237 struct uds_volume_record *record = &collated_records[records]; 238 struct open_chapter_zone *open_chapter; 239 240 /* The record arrays in the zones are 1-based. */ 241 record_index = 1 + (records / zone_count); 242 page_number = records / records_per_page; 243 open_chapter = chapter_zones[records % zone_count]; 244 245 /* Use the fill record in place of an unused record. */ 246 if (record_index > open_chapter->size || 247 open_chapter->slots[record_index].deleted) { 248 *record = *fill_record; 249 continue; 250 } 251 252 *record = open_chapter->records[record_index]; 253 result = uds_put_open_chapter_index_record(index, &record->name, 254 page_number); 255 switch (result) { 256 case UDS_SUCCESS: 257 break; 258 case UDS_OVERFLOW: 259 overflow_count++; 260 break; 261 default: 262 vdo_log_error_strerror(result, 263 "failed to build open chapter index"); 264 return result; 265 } 266 } 267 268 if (overflow_count > 0) 269 vdo_log_warning("Failed to add %d entries to chapter index", 270 overflow_count); 271 272 return UDS_SUCCESS; 273 } 274 275 int uds_close_open_chapter(struct open_chapter_zone **chapter_zones, 276 unsigned int zone_count, struct volume *volume, 277 struct open_chapter_index *chapter_index, 278 struct uds_volume_record *collated_records, 279 u64 virtual_chapter_number) 280 { 281 int result; 282 283 uds_empty_open_chapter_index(chapter_index, virtual_chapter_number); 284 result = fill_delta_chapter_index(chapter_zones, zone_count, chapter_index, 285 collated_records); 286 if (result != UDS_SUCCESS) 287 return result; 288 289 return uds_write_chapter(volume, chapter_index, collated_records); 290 } 291 292 int uds_save_open_chapter(struct uds_index *index, struct buffered_writer *writer) 293 { 294 int result; 295 struct open_chapter_zone *open_chapter; 296 struct uds_volume_record *record; 297 u8 record_count_data[sizeof(u32)]; 298 u32 record_count = 0; 299 unsigned int record_index; 300 unsigned int z; 301 302 result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_MAGIC, 303 OPEN_CHAPTER_MAGIC_LENGTH); 304 if (result != UDS_SUCCESS) 305 return result; 306 307 result = uds_write_to_buffered_writer(writer, OPEN_CHAPTER_VERSION, 308 OPEN_CHAPTER_VERSION_LENGTH); 309 if (result != UDS_SUCCESS) 310 return result; 311 312 for (z = 0; z < index->zone_count; z++) { 313 open_chapter = index->zones[z]->open_chapter; 314 record_count += open_chapter->size - open_chapter->deletions; 315 } 316 317 put_unaligned_le32(record_count, record_count_data); 318 result = uds_write_to_buffered_writer(writer, record_count_data, 319 sizeof(record_count_data)); 320 if (result != UDS_SUCCESS) 321 return result; 322 323 record_index = 1; 324 while (record_count > 0) { 325 for (z = 0; z < index->zone_count; z++) { 326 open_chapter = index->zones[z]->open_chapter; 327 if (record_index > open_chapter->size) 328 continue; 329 330 if (open_chapter->slots[record_index].deleted) 331 continue; 332 333 record = &open_chapter->records[record_index]; 334 result = uds_write_to_buffered_writer(writer, (u8 *) record, 335 sizeof(*record)); 336 if (result != UDS_SUCCESS) 337 return result; 338 339 record_count--; 340 } 341 342 record_index++; 343 } 344 345 return uds_flush_buffered_writer(writer); 346 } 347 348 u64 uds_compute_saved_open_chapter_size(struct index_geometry *geometry) 349 { 350 unsigned int records_per_chapter = geometry->records_per_chapter; 351 352 return OPEN_CHAPTER_MAGIC_LENGTH + OPEN_CHAPTER_VERSION_LENGTH + sizeof(u32) + 353 records_per_chapter * sizeof(struct uds_volume_record); 354 } 355 356 static int load_version20(struct uds_index *index, struct buffered_reader *reader) 357 { 358 int result; 359 u32 record_count; 360 u8 record_count_data[sizeof(u32)]; 361 struct uds_volume_record record; 362 363 /* 364 * Track which zones cannot accept any more records. If the open chapter had a different 365 * number of zones previously, some new zones may have more records than they have space 366 * for. These overflow records will be discarded. 367 */ 368 bool full_flags[MAX_ZONES] = { 369 false, 370 }; 371 372 result = uds_read_from_buffered_reader(reader, (u8 *) &record_count_data, 373 sizeof(record_count_data)); 374 if (result != UDS_SUCCESS) 375 return result; 376 377 record_count = get_unaligned_le32(record_count_data); 378 while (record_count-- > 0) { 379 unsigned int zone = 0; 380 381 result = uds_read_from_buffered_reader(reader, (u8 *) &record, 382 sizeof(record)); 383 if (result != UDS_SUCCESS) 384 return result; 385 386 if (index->zone_count > 1) 387 zone = uds_get_volume_index_zone(index->volume_index, 388 &record.name); 389 390 if (!full_flags[zone]) { 391 struct open_chapter_zone *open_chapter; 392 unsigned int remaining; 393 394 open_chapter = index->zones[zone]->open_chapter; 395 remaining = uds_put_open_chapter(open_chapter, &record.name, 396 &record.data); 397 /* Do not allow any zone to fill completely. */ 398 full_flags[zone] = (remaining <= 1); 399 } 400 } 401 402 return UDS_SUCCESS; 403 } 404 405 int uds_load_open_chapter(struct uds_index *index, struct buffered_reader *reader) 406 { 407 u8 version[OPEN_CHAPTER_VERSION_LENGTH]; 408 int result; 409 410 result = uds_verify_buffered_data(reader, OPEN_CHAPTER_MAGIC, 411 OPEN_CHAPTER_MAGIC_LENGTH); 412 if (result != UDS_SUCCESS) 413 return result; 414 415 result = uds_read_from_buffered_reader(reader, version, sizeof(version)); 416 if (result != UDS_SUCCESS) 417 return result; 418 419 if (memcmp(OPEN_CHAPTER_VERSION, version, sizeof(version)) != 0) { 420 return vdo_log_error_strerror(UDS_CORRUPT_DATA, 421 "Invalid open chapter version: %.*s", 422 (int) sizeof(version), version); 423 } 424 425 return load_version20(index, reader); 426 } 427