// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "volume.h"

#include <linux/atomic.h>
#include <linux/dm-bufio.h>
#include <linux/err.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"
#include "thread-utils.h"

#include "chapter-index.h"
#include "config.h"
#include "geometry.h"
#include "hash-utils.h"
#include "index.h"
#include "sparse-cache.h"

/*
 * The first block of the volume layout is reserved for the volume header, which is no longer
 * used. The remainder of the volume is divided into chapters consisting of several pages of
 * records, and several pages of static index to use to find those records. The index pages are
 * recorded first, followed by the record pages. The chapters are written in order as they are
 * filled, so the volume storage acts as a circular log of the most recent chapters, with each new
 * chapter overwriting the oldest saved one.
 *
 * When a new chapter is filled and closed, the records from that chapter are sorted and
 * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
 * index is generated to store which record page contains each record. The in-memory index page
 * map is also updated to indicate which delta lists fall on each chapter index page. This means
 * that when a record is read, the volume only has to load a single index page and a single
 * record page, rather than search the entire chapter. These index and record pages are written
 * to storage, and the index pages are transferred to the page cache under the theory that the
 * most recently written chapter is likely to be accessed again soon.
 *
 * When reading a record, the volume index will indicate which chapter should contain it. The
 * volume uses the index page map to determine which chapter index page needs to be loaded, and
 * then reads the relevant record page number from the chapter index. Both index and record pages
 * are stored in a page cache when read for the common case that subsequent records need the same
 * pages. The page cache evicts the least recently accessed entries when caching new pages. In
 * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
 * additional caching depending on available system resources.
 *
 * Record requests are handled from cached pages when possible. If a page needs to be read, it is
 * placed on a queue along with the request that wants to read it. Any requests for the same page
 * that arrive while the read is pending are added to the queue entry. A separate reader thread
 * handles the queued reads, adding the page to the cache and updating any requests queued with
 * it so they can continue processing. This allows the index zone threads to continue processing
 * new requests rather than wait for the storage reads.
 *
 * When an index rebuild is necessary, the volume reads each stored chapter to determine which
 * range of chapters contain valid records, so that those records can be used to reconstruct the
 * in-memory volume index.
 */
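
/*
 * A worked example of the layout, using purely illustrative numbers (the real values come from
 * the index geometry): with the single header page assumed by map_to_physical_page() below and a
 * hypothetical chapter geometry of 4 index pages plus 16 record pages (pages_per_chapter = 20),
 * chapter 0 occupies physical pages 1 through 20, with pages 1-4 holding its chapter index and
 * pages 5-20 holding its records; chapter 1 then starts at physical page 21, and so on.
 */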

/* The maximum allowable number of contiguous bad chapters */
#define MAX_BAD_CHAPTERS 100
#define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
#define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
#define VOLUME_CACHE_MAX_QUEUED_READS 4096

static const u64 BAD_CHAPTER = U64_MAX;

/*
 * The invalidate counter is two 32-bit fields stored together atomically. The low order 32 bits
 * are the physical page number of the cached page being read. The high order 32 bits are a
 * sequence number. This value is written when the zone that owns it begins or completes a cache
 * search. Any other thread will only read the counter in wait_for_pending_searches() while
 * waiting to update the cache contents.
 */
union invalidate_counter {
	u64 value;
	struct {
		u32 page;
		u32 counter;
	};
};

static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
{
	return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
}

static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
{
	return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
}

static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
{
	return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
}

static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
{
	/* Page zero is the header page, so the first chapter index page is page one. */
	return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
}

static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
							      unsigned int zone_number)
{
	return (union invalidate_counter) {
		.value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
	};
}

static inline void set_invalidate_counter(struct page_cache *cache,
					  unsigned int zone_number,
					  union invalidate_counter invalidate_counter)
{
	WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
		   invalidate_counter.value);
}

static inline bool search_pending(union invalidate_counter invalidate_counter)
{
	return (invalidate_counter.counter & 1) != 0;
}

/* Lock the cache for a zone in order to search for a page. */
static void begin_pending_search(struct page_cache *cache, u32 physical_page,
				 unsigned int zone_number)
{
	union invalidate_counter invalidate_counter =
		get_invalidate_counter(cache, zone_number);

	invalidate_counter.page = physical_page;
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	/*
	 * This memory barrier ensures that the write to the invalidate counter is seen by other
	 * threads before this thread accesses the cached page. The corresponding read memory
	 * barrier is in wait_for_pending_searches().
	 */
	smp_mb();
}

/* Unlock the cache for a zone by clearing its invalidate counter. */
static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
{
	union invalidate_counter invalidate_counter;

	/*
	 * This memory barrier ensures that this thread completes reads of the cached page before
	 * other threads see the write to the invalidate counter.
	 */
	smp_mb();

	invalidate_counter = get_invalidate_counter(cache, zone_number);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
}
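
/*
 * An illustrative sketch (not a new code path) of how a zone thread is expected to use the pair
 * above, mirroring search_cached_index_page() and uds_search_cached_record_page() below:
 *
 *	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
 *	result = get_volume_page_protected(volume, request, physical_page, &page);
 *	... examine the cached page contents ...
 *	end_pending_search(&volume->page_cache, request->zone_number);
 *
 * While the odd counter value is visible, wait_for_pending_searches() will spin before anyone
 * invalidates that physical page.
 */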

static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
{
	union invalidate_counter initial_counters[MAX_ZONES];
	unsigned int i;

	/*
	 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
	 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
	 * search_pending_counter. The corresponding write memory barrier is in
	 * begin_pending_search().
	 */
	smp_mb();

	for (i = 0; i < cache->zone_count; i++)
		initial_counters[i] = get_invalidate_counter(cache, i);
	for (i = 0; i < cache->zone_count; i++) {
		if (search_pending(initial_counters[i]) &&
		    (initial_counters[i].page == physical_page)) {
			/*
			 * There is an active search using the physical page. We need to wait for
			 * the search to finish.
			 *
			 * FIXME: Investigate using wait_event() to wait for the search to finish.
			 */
			while (initial_counters[i].value ==
			       get_invalidate_counter(cache, i).value)
				cond_resched();
		}
	}
}

static void release_page_buffer(struct cached_page *page)
{
	if (page->buffer != NULL)
		dm_bufio_release(vdo_forget(page->buffer));
}

static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
{
	/* Do not clear read_pending because the read queue relies on it. */
	release_page_buffer(page);
	page->physical_page = cache->indexable_pages;
	WRITE_ONCE(page->last_used, 0);
}

static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
{
	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 */
	if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
		WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
}

/* Select a page to remove from the cache to make space for a new entry. */
static struct cached_page *select_victim_in_cache(struct page_cache *cache)
{
	struct cached_page *page;
	int oldest_index = 0;
	s64 oldest_time = S64_MAX;
	s64 last_used;
	u16 i;

	/* Find the oldest unclaimed page. We hold the read_threads_mutex. */
	for (i = 0; i < cache->cache_slots; i++) {
		/* A page with a pending read must not be replaced. */
		if (cache->cache[i].read_pending)
			continue;

		last_used = READ_ONCE(cache->cache[i].last_used);
		if (last_used <= oldest_time) {
			oldest_time = last_used;
			oldest_index = i;
		}
	}

	page = &cache->cache[oldest_index];
	if (page->physical_page != cache->indexable_pages) {
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
	}

	page->read_pending = true;
	clear_cache_page(cache, page);
	return page;
}

/* Make a newly filled cache entry available to other threads. */
static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
			     struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return result;

	page->physical_page = physical_page;
	make_page_most_recent(cache, page);
	page->read_pending = false;

	/*
	 * We hold the read_threads_mutex, but we must have a write memory barrier before making
	 * the cached_page available to the readers that do not hold the mutex. The corresponding
	 * read memory barrier is in get_page_and_index().
	 */
	smp_wmb();

	/* This assignment also clears the queued flag. */
	WRITE_ONCE(cache->index[physical_page], page - cache->cache);
	return UDS_SUCCESS;
}

static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
				 struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return;

	clear_cache_page(cache, page);
	page->read_pending = false;

	/* Clear the mapping and the queued flag for the new page. */
	WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
}

static inline u16 next_queue_position(u16 position)
{
	return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
}

static inline void advance_queue_position(u16 *position)
{
	*position = next_queue_position(*position);
}

static inline bool read_queue_is_full(struct page_cache *cache)
{
	return cache->read_queue_first == next_queue_position(cache->read_queue_last);
}

static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
			 u32 physical_page)
{
	struct queued_read *queue_entry;
	u16 last = cache->read_queue_last;
	u16 read_queue_index;

	/* We hold the read_threads_mutex. */
	if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
		/* This page has no existing entry in the queue. */
		if (read_queue_is_full(cache))
			return false;

		/* Fill in the read queue entry. */
		cache->read_queue[last].physical_page = physical_page;
		cache->read_queue[last].invalid = false;
		cache->read_queue[last].first_request = NULL;
		cache->read_queue[last].last_request = NULL;

		/* Point the cache index to the read queue entry. */
		read_queue_index = last;
		WRITE_ONCE(cache->index[physical_page],
			   read_queue_index | VOLUME_CACHE_QUEUED_FLAG);

		advance_queue_position(&cache->read_queue_last);
	} else {
		/* It's already queued, so add this request to the existing entry. */
		read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
	}

	request->next_request = NULL;
	queue_entry = &cache->read_queue[read_queue_index];
	if (queue_entry->first_request == NULL)
		queue_entry->first_request = request;
	else
		queue_entry->last_request->next_request = request;
	queue_entry->last_request = request;

	return true;
}

static void enqueue_page_read(struct volume *volume, struct uds_request *request,
			      u32 physical_page)
{
	/* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
	while (!enqueue_read(&volume->page_cache, request, physical_page)) {
		vdo_log_debug("Read queue full, waiting for reads to finish");
		uds_wait_cond(&volume->read_threads_read_done_cond,
			      &volume->read_threads_mutex);
	}

	uds_signal_cond(&volume->read_threads_cond);
}

/*
 * Reserve the next read queue entry for processing, but do not actually remove it from the
 * queue. Must be followed by release_queued_requests().
 */
static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
{
	/* We hold the read_threads_mutex. */
	struct queued_read *entry;
	u16 index_value;
	bool queued;

	/* No items to dequeue */
	if (cache->read_queue_next_read == cache->read_queue_last)
		return NULL;

	entry = &cache->read_queue[cache->read_queue_next_read];
	index_value = cache->index[entry->physical_page];
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	/* Check to see if it's still queued before resetting. */
	if (entry->invalid && queued)
		WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);

	/*
	 * If a synchronous read has taken this page, set invalid to true so it doesn't get
	 * overwritten. Requests will just be requeued.
	 */
	if (!queued)
		entry->invalid = true;

	entry->reserved = true;
	advance_queue_position(&cache->read_queue_next_read);
	return entry;
}

static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
{
	struct queued_read *queue_entry = NULL;

	while (!volume->read_threads_exiting) {
		queue_entry = reserve_read_queue_entry(&volume->page_cache);
		if (queue_entry != NULL)
			break;

		uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
	}

	return queue_entry;
}

static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
				   u32 chapter, u32 index_page_number,
				   struct delta_index_page *chapter_index_page)
{
	u64 ci_virtual;
	u32 ci_chapter;
	u32 lowest_list;
	u32 highest_list;
	struct index_geometry *geometry = volume->geometry;
	int result;

	result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
						   index_page, volume->nonce);
	if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
		return result;

	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "Reading chapter index page for chapter %u page %u",
					      chapter, index_page_number);
	}

	uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
				   &lowest_list, &highest_list);
	ci_virtual = chapter_index_page->virtual_chapter_number;
	ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
	if ((chapter == ci_chapter) &&
	    (lowest_list == chapter_index_page->lowest_list_number) &&
	    (highest_list == chapter_index_page->highest_list_number))
		return UDS_SUCCESS;

	vdo_log_warning("Index page map updated to %llu",
			(unsigned long long) volume->index_page_map->last_update);
	vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
			chapter, index_page_number, lowest_list, highest_list,
			(unsigned long long) ci_virtual,
			chapter_index_page->lowest_list_number,
			chapter_index_page->highest_list_number);
	return vdo_log_error_strerror(UDS_CORRUPT_DATA,
				      "index page map mismatch with chapter index");
}

static int initialize_index_page(const struct volume *volume, u32 physical_page,
				 struct cached_page *page)
{
	u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
	u32 index_page_number = map_to_page_number(volume->geometry, physical_page);

	return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
				       chapter, index_page_number, &page->index_page);
}

static bool search_record_page(const u8 record_page[],
			       const struct uds_record_name *name,
			       const struct index_geometry *geometry,
			       struct uds_record_data *metadata)
{
	/*
	 * The array of records is sorted by name and stored as a binary tree in heap order, so
	 * the root of the tree is the first array element.
	 */
	u32 node = 0;
	const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;

	while (node < geometry->records_per_page) {
		int result;
		const struct uds_volume_record *record = &records[node];

		result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
		if (result == 0) {
			if (metadata != NULL)
				*metadata = record->data;
			return true;
		}

		/* The children of node N are at indexes 2N+1 and 2N+2. */
		node = ((2 * node) + ((result < 0) ? 1 : 2));
	}

	return false;
}
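
/*
 * A small worked example of the heap ordering above (illustrative only): if a page held seven
 * records r0 < r1 < ... < r6 in sorted name order, encode_tree() below would lay them out in the
 * array as
 *
 *	index:  0   1   2   3   4   5   6
 *	record: r3  r1  r5  r0  r2  r4  r6
 *
 * so the search above starts at the median r3 and each comparison descends to child 2N+1 or
 * 2N+2, examining only a logarithmic number of the records on the page.
 */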

/*
 * If we've read in a record page, we're going to do an immediate search, to speed up processing
 * by avoiding get_record_from_zone(), and to ensure that requests make progress even when
 * queued. If we've read in an index page, we save the record page number so we don't have to
 * resolve the index page again. We use the location, virtual_chapter, and old_metadata fields in
 * the request to allow the index code to know where to begin processing the request again.
 */
static int search_page(struct cached_page *page, const struct volume *volume,
		       struct uds_request *request, u32 physical_page)
{
	int result;
	enum uds_index_region location;
	u16 record_page_number;

	if (is_record_page(volume->geometry, physical_page)) {
		if (search_record_page(dm_bufio_get_block_data(page->buffer),
				       &request->record_name, volume->geometry,
				       &request->old_metadata))
			location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
		else
			location = UDS_LOCATION_UNAVAILABLE;
	} else {
		result = uds_search_chapter_index_page(&page->index_page,
						       volume->geometry,
						       &request->record_name,
						       &record_page_number);
		if (result != UDS_SUCCESS)
			return result;

		if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
			location = UDS_LOCATION_UNAVAILABLE;
		} else {
			location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
			*((u16 *) &request->old_metadata) = record_page_number;
		}
	}

	request->location = location;
	request->found = false;
	return UDS_SUCCESS;
}

static int process_entry(struct volume *volume, struct queued_read *entry)
{
	u32 page_number = entry->physical_page;
	struct uds_request *request;
	struct cached_page *page = NULL;
	u8 *page_data;
	int result;

	if (entry->invalid) {
		vdo_log_debug("Requeuing requests for invalid page");
		return UDS_SUCCESS;
	}

	page = select_victim_in_cache(&volume->page_cache);

	mutex_unlock(&volume->read_threads_mutex);
	page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
	mutex_lock(&volume->read_threads_mutex);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	if (entry->invalid) {
		vdo_log_warning("Page %u invalidated after read", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return UDS_SUCCESS;
	}

	if (!is_record_page(volume->geometry, page_number)) {
		result = initialize_index_page(volume, page_number, page);
		if (result != UDS_SUCCESS) {
			vdo_log_warning("Error initializing chapter index page");
			cancel_page_in_cache(&volume->page_cache, page_number, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, page_number, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	request = entry->first_request;
	while ((request != NULL) && (result == UDS_SUCCESS)) {
		result = search_page(page, volume, request, page_number);
		request = request->next_request;
	}

	return result;
}

static void release_queued_requests(struct volume *volume, struct queued_read *entry,
				    int result)
{
	struct page_cache *cache = &volume->page_cache;
	u16 next_read = cache->read_queue_next_read;
	struct uds_request *request;
	struct uds_request *next;

	for (request = entry->first_request; request != NULL; request = next) {
		next = request->next_request;
		request->status = result;
		request->requeued = true;
		uds_enqueue_request(request, STAGE_INDEX);
	}

	entry->reserved = false;

	/* Move the read_queue_first pointer as far as we can. */
	while ((cache->read_queue_first != next_read) &&
	       (!cache->read_queue[cache->read_queue_first].reserved))
		advance_queue_position(&cache->read_queue_first);
	uds_broadcast_cond(&volume->read_threads_read_done_cond);
}

static void read_thread_function(void *arg)
{
	struct volume *volume = arg;

	vdo_log_debug("reader starting");
	mutex_lock(&volume->read_threads_mutex);
	while (true) {
		struct queued_read *queue_entry;
		int result;

		queue_entry = wait_to_reserve_read_queue_entry(volume);
		if (volume->read_threads_exiting)
			break;

		result = process_entry(volume, queue_entry);
		release_queued_requests(volume, queue_entry, result);
	}
	mutex_unlock(&volume->read_threads_mutex);
	vdo_log_debug("reader done");
}

static void get_page_and_index(struct page_cache *cache, u32 physical_page,
			       int *queue_index, struct cached_page **page_ptr)
{
	u16 index_value;
	u16 index;
	bool queued;

	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 *
	 * Holding only a search_pending_counter is the most frequent case.
	 */
	/*
	 * It would be unlikely for the compiler to turn the usage of index_value into two reads
	 * of cache->index, but it would be possible and very bad if those reads did not return
	 * the same bits.
	 */
	index_value = READ_ONCE(cache->index[physical_page]);
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;

	if (!queued && (index < cache->cache_slots)) {
		*page_ptr = &cache->cache[index];
		/*
		 * We have acquired access to the cached page, but unless we hold the
		 * read_threads_mutex, we need a read memory barrier now. The corresponding write
		 * memory barrier is in put_page_in_cache().
		 */
		smp_rmb();
	} else {
		*page_ptr = NULL;
	}

	*queue_index = queued ? index : -1;
}
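
/*
 * An informal summary of the cache->index[] encoding decoded above and written by enqueue_read()
 * and put_page_in_cache() (no new behavior): an entry equal to cache->cache_slots means the page
 * is neither cached nor queued; a value below cache->cache_slots with VOLUME_CACHE_QUEUED_FLAG
 * clear is the slot number of the cached_page holding that physical page; a value with
 * VOLUME_CACHE_QUEUED_FLAG set carries the read_queue index of the pending read for it.
 */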

static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
				struct cached_page **page)
{
	/*
	 * ASSERTION: We are in a zone thread.
	 * ASSERTION: We are holding a search_pending_counter or the read_threads_mutex.
	 */
	int queue_index = -1;

	get_page_and_index(cache, physical_page, &queue_index, page);
}

static int read_page_locked(struct volume *volume, u32 physical_page,
			    struct cached_page **page_ptr)
{
	int result = UDS_SUCCESS;
	struct cached_page *page = NULL;
	u8 *page_data;

	page = select_victim_in_cache(&volume->page_cache);
	page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	if (!is_record_page(volume->geometry, physical_page)) {
		result = initialize_index_page(volume, physical_page, page);
		if (result != UDS_SUCCESS) {
			if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
				vdo_log_warning("Corrupt index page %u", physical_page);
			cancel_page_in_cache(&volume->page_cache, physical_page, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, physical_page, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	*page_ptr = page;
	return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding the read threads mutex. */
static int get_volume_page_locked(struct volume *volume, u32 physical_page,
				  struct cached_page **page_ptr)
{
	int result;
	struct cached_page *page = NULL;

	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page == NULL) {
		result = read_page_locked(volume, physical_page, &page);
		if (result != UDS_SUCCESS)
			return result;
	} else {
		make_page_most_recent(&volume->page_cache, page);
	}

	*page_ptr = page;
	return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding a search_pending lock. */
static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
				     u32 physical_page, struct cached_page **page_ptr)
{
	struct cached_page *page;

	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page != NULL) {
		if (request->zone_number == 0) {
			/* Only one zone is allowed to update the LRU. */
			make_page_most_recent(&volume->page_cache, page);
		}

		*page_ptr = page;
		return UDS_SUCCESS;
	}

	/* Prepare to enqueue a read for the page. */
	end_pending_search(&volume->page_cache, request->zone_number);
	mutex_lock(&volume->read_threads_mutex);

	/*
	 * Do the lookup again while holding the read mutex (no longer the fast case so this
	 * should be fine to repeat). We need to do this because a page may have been added to
	 * the cache by a reader thread between the time we searched above and the time we went
	 * to actually try to enqueue it below. This could result in us enqueuing another read
	 * for a page which is already in the cache, which would mean we end up with two entries
	 * in the cache for the same page.
	 */
	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page == NULL) {
		enqueue_page_read(volume, request, physical_page);
		/*
		 * The performance gain from unlocking first, while "search pending" mode is off,
		 * turns out to be significant in some cases. The page is not available yet so
		 * the order does not matter for correctness as it does below.
		 */
		mutex_unlock(&volume->read_threads_mutex);
		begin_pending_search(&volume->page_cache, physical_page,
				     request->zone_number);
		return UDS_QUEUED;
	}

	/*
	 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked"
	 * and "search pending" state in careful order so no other thread can mess with the data
	 * before the caller gets to look at it.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
	mutex_unlock(&volume->read_threads_mutex);
	*page_ptr = page;
	return UDS_SUCCESS;
}

static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
			   struct cached_page **page_ptr)
{
	int result;
	u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

	mutex_lock(&volume->read_threads_mutex);
	result = get_volume_page_locked(volume, physical_page, page_ptr);
	mutex_unlock(&volume->read_threads_mutex);
	return result;
}

int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
			       u8 **data_ptr)
{
	int result;
	struct cached_page *page = NULL;

	result = get_volume_page(volume, chapter, page_number, &page);
	if (result == UDS_SUCCESS)
		*data_ptr = dm_bufio_get_block_data(page->buffer);
	return result;
}

int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
			      struct delta_index_page **index_page_ptr)
{
	int result;
	struct cached_page *page = NULL;

	result = get_volume_page(volume, chapter, page_number, &page);
	if (result == UDS_SUCCESS)
		*index_page_ptr = &page->index_page;
	return result;
}

/*
 * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
 * if the page in question must be read from storage.
 */
static int search_cached_index_page(struct volume *volume, struct uds_request *request,
				    u32 chapter, u32 index_page_number,
				    u16 *record_page_number)
{
	int result;
	struct cached_page *page = NULL;
	u32 physical_page = map_to_physical_page(volume->geometry, chapter,
						 index_page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked
	 * for invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &page);
	if (result != UDS_SUCCESS) {
		end_pending_search(&volume->page_cache, request->zone_number);
		return result;
	}

	result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
					       &request->record_name,
					       record_page_number);
	end_pending_search(&volume->page_cache, request->zone_number);
	return result;
}

/*
 * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED
 * if the page in question must be read from storage.
 */
int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
				  u32 chapter, u16 record_page_number, bool *found)
{
	struct cached_page *record_page;
	struct index_geometry *geometry = volume->geometry;
	int result;
	u32 physical_page, page_number;

	*found = false;
	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
			    "0 <= %d < %u", record_page_number,
			    geometry->record_pages_per_chapter);
	if (result != VDO_SUCCESS)
		return result;

	page_number = geometry->index_pages_per_chapter + record_page_number;

	physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked
	 * for invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &record_page);
	if (result != UDS_SUCCESS) {
		end_pending_search(&volume->page_cache, request->zone_number);
		return result;
	}

	if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
			       &request->record_name, geometry, &request->old_metadata))
		*found = true;

	end_pending_search(&volume->page_cache, request->zone_number);
	return UDS_SUCCESS;
}

void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
{
	const struct index_geometry *geometry = volume->geometry;
	u32 physical_page = map_to_physical_page(geometry, chapter, 0);

	dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
}

int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
				       struct dm_buffer *volume_buffers[],
				       struct delta_index_page index_pages[])
{
	int result;
	u32 i;
	const struct index_geometry *geometry = volume->geometry;
	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
	u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);

	dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
		u8 *index_page;

		index_page = dm_bufio_read(volume->client, physical_page + i,
					   &volume_buffers[i]);
		if (IS_ERR(index_page)) {
			result = -PTR_ERR(index_page);
			vdo_log_warning_strerror(result,
						 "error reading physical page %u",
						 physical_page);
			return result;
		}

		result = init_chapter_index_page(volume, index_page, physical_chapter, i,
						 &index_pages[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
				 bool *found)
{
	int result;
	u32 physical_chapter =
		uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;

	index_page_number = uds_find_index_page_number(volume->index_page_map,
						       &request->record_name,
						       physical_chapter);

	if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
		record_page_number = *((u16 *) &request->old_metadata);
	} else {
		result = search_cached_index_page(volume, request, physical_chapter,
						  index_page_number,
						  &record_page_number);
		if (result != UDS_SUCCESS)
			return result;
	}

	return uds_search_cached_record_page(volume, request, physical_chapter,
					     record_page_number, found);
}

int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
					     const struct uds_record_name *name,
					     u64 virtual_chapter, bool *found)
{
	int result;
	struct index_geometry *geometry = volume->geometry;
	struct cached_page *page;
	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;
	u32 page_number;

	*found = false;
	index_page_number =
		uds_find_index_page_number(volume->index_page_map, name,
					   physical_chapter);
	result = get_volume_page(volume, physical_chapter, index_page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	result = uds_search_chapter_index_page(&page->index_page, geometry, name,
					       &record_page_number);
	if (result != UDS_SUCCESS)
		return result;

	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	page_number = geometry->index_pages_per_chapter + record_page_number;
	result = get_volume_page(volume, physical_chapter, page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	*found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
				    geometry, NULL);
	return UDS_SUCCESS;
}

static void invalidate_page(struct page_cache *cache, u32 physical_page)
{
	struct cached_page *page;
	int queue_index = -1;

	/* We hold the read_threads_mutex. */
	get_page_and_index(cache, physical_page, &queue_index, &page);
	if (page != NULL) {
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
		clear_cache_page(cache, page);
	} else if (queue_index > -1) {
		vdo_log_debug("setting pending read to invalid");
		cache->read_queue[queue_index].invalid = true;
	}
}

void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
{
	u32 physical_chapter =
		uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
	u32 i;

	vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
	mutex_lock(&volume->read_threads_mutex);
	for (i = 0; i < volume->geometry->pages_per_chapter; i++)
		invalidate_page(&volume->page_cache, first_page + i);
	mutex_unlock(&volume->read_threads_mutex);
}

/*
 * Donate an index page from a newly written chapter to the page cache since it is likely to be
 * used again soon. The caller must already hold the reader thread mutex.
 */
static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
				    u32 index_page_number, struct dm_buffer *page_buffer)
{
	int result;
	struct cached_page *page = NULL;
	u32 physical_page =
		map_to_physical_page(volume->geometry, physical_chapter,
				     index_page_number);

	page = select_victim_in_cache(&volume->page_cache);
	page->buffer = page_buffer;
	result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
					 physical_chapter, index_page_number,
					 &page->index_page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error initializing chapter index page");
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	result = put_page_in_cache(&volume->page_cache, physical_page, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	return UDS_SUCCESS;
}

static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
			     struct open_chapter_index *chapter_index)
{
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
	u32 delta_list_number = 0;
	u32 index_page_number;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		u8 *page_data;
		u32 physical_page = first_index_page + index_page_number;
		u32 lists_packed;
		bool last_page;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare index page");
		}

		last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
		result = uds_pack_open_chapter_index_page(chapter_index, page_data,
							  delta_list_number, last_page,
							  &lists_packed);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to pack index page");
		}

		dm_bufio_mark_buffer_dirty(page_buffer);

		if (lists_packed == 0) {
			vdo_log_debug("no delta lists packed on chapter %u page %u",
				      physical_chapter_number, index_page_number);
		} else {
			delta_list_number += lists_packed;
		}

		uds_update_index_page_map(volume->index_page_map,
					  chapter_index->virtual_chapter_number,
					  physical_chapter_number, index_page_number,
					  delta_list_number - 1);

		mutex_lock(&volume->read_threads_mutex);
		result = donate_index_page_locked(volume, physical_chapter_number,
						  index_page_number, page_buffer);
		mutex_unlock(&volume->read_threads_mutex);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return result;
		}
	}

	return UDS_SUCCESS;
}

static u32 encode_tree(u8 record_page[],
		       const struct uds_volume_record *sorted_pointers[],
		       u32 next_record, u32 node, u32 node_count)
{
	if (node < node_count) {
		u32 child = (2 * node) + 1;

		next_record = encode_tree(record_page, sorted_pointers, next_record,
					  child, node_count);

		/*
		 * In-order traversal: copy the contents of the next record into the page at the
		 * node offset.
		 */
		memcpy(&record_page[node * BYTES_PER_RECORD],
		       sorted_pointers[next_record++], BYTES_PER_RECORD);

		next_record = encode_tree(record_page, sorted_pointers, next_record,
					  child + 1, node_count);
	}

	return next_record;
}

static int encode_record_page(const struct volume *volume,
			      const struct uds_volume_record records[], u8 record_page[])
{
	int result;
	u32 i;
	u32 records_per_page = volume->geometry->records_per_page;
	const struct uds_volume_record **record_pointers = volume->record_pointers;

	for (i = 0; i < records_per_page; i++)
		record_pointers[i] = &records[i];

	/*
	 * Sort the record pointers by using just the names in the records, which is less work
	 * than sorting the entire record values.
	 */
	BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
	result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
				records_per_page, UDS_RECORD_NAME_SIZE);
	if (result != UDS_SUCCESS)
		return result;

	encode_tree(record_page, record_pointers, 0, 0, records_per_page);
	return UDS_SUCCESS;
}

static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
			      const struct uds_volume_record *records)
{
	u32 record_page_number;
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	const struct uds_volume_record *next_record = records;
	u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
						     geometry->index_pages_per_chapter);

	for (record_page_number = 0;
	     record_page_number < geometry->record_pages_per_chapter;
	     record_page_number++) {
		u8 *page_data;
		u32 physical_page = first_record_page + record_page_number;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare record page");
		}

		result = encode_record_page(volume, next_record, page_data);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to encode record page %u",
							record_page_number);
		}

		next_record += geometry->records_per_page;
		dm_bufio_mark_buffer_dirty(page_buffer);
		dm_bufio_release(page_buffer);
	}

	return UDS_SUCCESS;
}

int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
		      const struct uds_volume_record *records)
{
	int result;
	u32 physical_chapter_number =
		uds_map_to_physical_chapter(volume->geometry,
					    chapter_index->virtual_chapter_number);

	result = write_index_pages(volume, physical_chapter_number, chapter_index);
	if (result != UDS_SUCCESS)
		return result;

	result = write_record_pages(volume, physical_chapter_number, records);
	if (result != UDS_SUCCESS)
		return result;

	result = -dm_bufio_write_dirty_buffers(volume->client);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "cannot sync chapter to volume");

	return result;
}

static void probe_chapter(struct volume *volume, u32 chapter_number,
			  u64 *virtual_chapter_number)
{
	const struct index_geometry *geometry = volume->geometry;
	u32 expected_list_number = 0;
	u32 i;
	u64 vcn = BAD_CHAPTER;

	*virtual_chapter_number = BAD_CHAPTER;
	dm_bufio_prefetch(volume->client,
			  map_to_physical_page(geometry, chapter_number, 0),
			  geometry->index_pages_per_chapter);

	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
		struct delta_index_page *page;
		int result;

		result = uds_get_volume_index_page(volume, chapter_number, i, &page);
		if (result != UDS_SUCCESS)
			return;

		if (page->virtual_chapter_number == BAD_CHAPTER) {
			vdo_log_error("corrupt index page in chapter %u",
				      chapter_number);
			return;
		}

		if (vcn == BAD_CHAPTER) {
			vcn = page->virtual_chapter_number;
		} else if (page->virtual_chapter_number != vcn) {
			vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
				      chapter_number, i, (unsigned long long) vcn,
				      (unsigned long long) page->virtual_chapter_number);
			return;
		}

		if (expected_list_number != page->lowest_list_number) {
			vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
				      chapter_number, i, expected_list_number,
				      page->lowest_list_number);
			return;
		}
		expected_list_number = page->highest_list_number + 1;

		result = uds_validate_chapter_index_page(page, geometry);
		if (result != UDS_SUCCESS)
			return;
	}

	if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
		vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
			      (unsigned long long) vcn, geometry->chapters_per_volume);
		return;
	}

	*virtual_chapter_number = vcn;
}

/* Find the last valid physical chapter in the volume. */
static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
{
	u32 span = 1;
	u32 tries = 0;

	while (limit > 0) {
		u32 chapter = (span > limit) ? 0 : limit - span;
		u64 vcn = 0;

		probe_chapter(volume, chapter, &vcn);
		if (vcn == BAD_CHAPTER) {
			limit = chapter;
			if (++tries > 1)
				span *= 2;
		} else {
			if (span == 1)
				break;
			span /= 2;
			tries = 0;
		}
	}

	*limit_ptr = limit;
}

static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
			       u64 *highest_vcn)
{
	struct index_geometry *geometry = volume->geometry;
	u64 zero_vcn;
	u64 lowest = BAD_CHAPTER;
	u64 highest = BAD_CHAPTER;
	u64 moved_chapter = BAD_CHAPTER;
	u32 left_chapter = 0;
	u32 right_chapter = 0;
	u32 bad_chapters = 0;

	/*
	 * This method assumes there is at most one run of contiguous bad chapters caused by
	 * unflushed writes. Either the bad spot is at the beginning and end, or somewhere in the
	 * middle. Wherever it is, the highest and lowest VCNs are adjacent to it. Otherwise the
	 * volume is cleanly saved and somewhere in the middle of it the highest VCN immediately
	 * precedes the lowest one.
	 */

	/* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
	probe_chapter(volume, 0, &zero_vcn);

	/*
	 * Binary search for end of the discontinuity in the monotonically increasing virtual
	 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect
	 * we're searching for the index of the smallest value less than zero_vcn. In the case
	 * we go off the end it means that chapter 0 has the lowest vcn.
	 *
	 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
	 * skip over the moved chapter when searching, adding it to the range at the end if
	 * necessary.
	 */
	if (geometry->remapped_physical > 0) {
		u64 remapped_vcn;

		probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
		if (remapped_vcn == geometry->remapped_virtual)
			moved_chapter = geometry->remapped_physical;
	}

	left_chapter = 0;
	right_chapter = chapter_limit;

	while (left_chapter < right_chapter) {
		u64 probe_vcn;
		u32 chapter = (left_chapter + right_chapter) / 2;

		if (chapter == moved_chapter)
			chapter--;

		probe_chapter(volume, chapter, &probe_vcn);
		if (zero_vcn <= probe_vcn) {
			left_chapter = chapter + 1;
			if (left_chapter == moved_chapter)
				left_chapter++;
		} else {
			right_chapter = chapter;
		}
	}

	/* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
	if (left_chapter >= chapter_limit)
		left_chapter = 0;

	/* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
	probe_chapter(volume, left_chapter, &lowest);

	/* The moved chapter might be the lowest in the range. */
	if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
		lowest = geometry->remapped_virtual;

	/*
	 * Circularly scan backwards, moving over any bad chapters until encountering a good
	 * one, which is the chapter with the highest vcn.
	 */
	while (highest == BAD_CHAPTER) {
		right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
		if (right_chapter == moved_chapter)
			continue;

		probe_chapter(volume, right_chapter, &highest);
		if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
			vdo_log_error("too many bad chapters in volume: %u",
				      bad_chapters);
			return UDS_CORRUPT_DATA;
		}
	}

	*lowest_vcn = lowest;
	*highest_vcn = highest;
	return UDS_SUCCESS;
}

/*
 * Find the highest and lowest contiguous chapters present in the volume and determine their
 * virtual chapter numbers. This is used by rebuild.
 */
int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
				       u64 *highest_vcn, bool *is_empty)
{
	u32 chapter_limit = volume->geometry->chapters_per_volume;

	find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
	if (chapter_limit == 0) {
		*lowest_vcn = 0;
		*highest_vcn = 0;
		*is_empty = true;
		return UDS_SUCCESS;
	}

	*is_empty = false;
	return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
}

int __must_check uds_replace_volume_storage(struct volume *volume,
					    struct index_layout *layout,
					    struct block_device *bdev)
{
	int result;
	u32 i;

	result = uds_replace_index_layout_storage(layout, bdev);
	if (result != UDS_SUCCESS)
		return result;

	/* Release all outstanding dm_bufio objects */
	for (i = 0; i < volume->page_cache.indexable_pages; i++)
		volume->page_cache.index[i] = volume->page_cache.cache_slots;
	for (i = 0; i < volume->page_cache.cache_slots; i++)
		clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
	if (volume->sparse_cache != NULL)
		uds_invalidate_sparse_cache(volume->sparse_cache);
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
				     volume->reserved_buffers, &volume->client);
}

static int __must_check initialize_page_cache(struct page_cache *cache,
					      const struct index_geometry *geometry,
					      u32 chapters_in_cache,
					      unsigned int zone_count)
{
	int result;
	u32 i;

	cache->indexable_pages = geometry->pages_per_volume + 1;
	cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
	cache->zone_count = zone_count;
	atomic64_set(&cache->clock, 1);

	result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
			    "requested cache size, %u, within limit %u",
			    cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
			      "volume read queue", &cache->read_queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->zone_count, struct search_pending_counter,
			      "Volume Cache Zones", &cache->search_pending_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
			      &cache->index);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
			      &cache->cache);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize index values to invalid values. */
	for (i = 0; i < cache->indexable_pages; i++)
		cache->index[i] = cache->cache_slots;

	for (i = 0; i < cache->cache_slots; i++)
		clear_cache_page(cache, &cache->cache[i]);

	return UDS_SUCCESS;
}

int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
		    struct volume **new_volume)
{
	unsigned int i;
	struct volume *volume = NULL;
	struct index_geometry *geometry;
	unsigned int reserved_buffers;
	int result;

	result = vdo_allocate(1, struct volume, "volume", &volume);
	if (result != VDO_SUCCESS)
		return result;

	volume->nonce = uds_get_volume_nonce(layout);

	result = uds_copy_index_geometry(config->geometry, &volume->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return vdo_log_warning_strerror(result,
						"failed to allocate geometry: error");
	}
	geometry = volume->geometry;

	/*
	 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
	 * for each entry in the sparse cache.
	 */
	reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
	reserved_buffers += 1;
	if (uds_is_sparse_index_geometry(geometry))
		reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
	volume->reserved_buffers = reserved_buffers;
	result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
				       volume->reserved_buffers, &volume->client);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = uds_make_radix_sorter(geometry->records_per_page,
				       &volume->radix_sorter);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = vdo_allocate(geometry->records_per_page,
			      const struct uds_volume_record *, "record pointers",
			      &volume->record_pointers);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	if (uds_is_sparse_index_geometry(geometry)) {
		size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;

		result = uds_make_sparse_cache(geometry, config->cache_chapters,
					       config->zone_count,
					       &volume->sparse_cache);
		if (result != UDS_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		volume->cache_size =
			page_size * geometry->index_pages_per_chapter * config->cache_chapters;
	}

	result = initialize_page_cache(&volume->page_cache, geometry,
				       config->cache_chapters, config->zone_count);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
	result = uds_make_index_page_map(geometry, &volume->index_page_map);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	mutex_init(&volume->read_threads_mutex);
	uds_init_cond(&volume->read_threads_read_done_cond);
	uds_init_cond(&volume->read_threads_cond);

	result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
			      &volume->reader_threads);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	for (i = 0; i < config->read_threads; i++) {
		result = vdo_create_thread(read_thread_function, (void *) volume,
					   "reader", &volume->reader_threads[i]);
		if (result != VDO_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		volume->read_thread_count = i + 1;
	}

	*new_volume = volume;
	return UDS_SUCCESS;
}

static void uninitialize_page_cache(struct page_cache *cache)
{
	u16 i;

	if (cache->cache != NULL) {
		for (i = 0; i < cache->cache_slots; i++)
			release_page_buffer(&cache->cache[i]);
	}
	vdo_free(cache->index);
	vdo_free(cache->cache);
	vdo_free(cache->search_pending_counters);
	vdo_free(cache->read_queue);
}

void uds_free_volume(struct volume *volume)
{
	if (volume == NULL)
		return;

	if (volume->reader_threads != NULL) {
		unsigned int i;

		/* This works even if some threads weren't started. */
		mutex_lock(&volume->read_threads_mutex);
		volume->read_threads_exiting = true;
		uds_broadcast_cond(&volume->read_threads_cond);
		mutex_unlock(&volume->read_threads_mutex);
		for (i = 0; i < volume->read_thread_count; i++)
			vdo_join_threads(volume->reader_threads[i]);
		vdo_free(volume->reader_threads);
		volume->reader_threads = NULL;
	}

	/* Must destroy the client AFTER freeing the cached pages. */
	uninitialize_page_cache(&volume->page_cache);
	uds_free_sparse_cache(volume->sparse_cache);
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	uds_free_index_page_map(volume->index_page_map);
	uds_free_radix_sorter(volume->radix_sorter);
	vdo_free(volume->geometry);
	vdo_free(volume->record_pointers);
	vdo_free(volume);
}