1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "block-map.h" 7 8 #include <linux/bio.h> 9 #include <linux/ratelimit.h> 10 11 #include "errors.h" 12 #include "logger.h" 13 #include "memory-alloc.h" 14 #include "permassert.h" 15 16 #include "action-manager.h" 17 #include "admin-state.h" 18 #include "completion.h" 19 #include "constants.h" 20 #include "data-vio.h" 21 #include "encodings.h" 22 #include "io-submitter.h" 23 #include "physical-zone.h" 24 #include "recovery-journal.h" 25 #include "slab-depot.h" 26 #include "status-codes.h" 27 #include "types.h" 28 #include "vdo.h" 29 #include "vio.h" 30 #include "wait-queue.h" 31 32 /** 33 * DOC: Block map eras 34 * 35 * The block map era, or maximum age, is used as follows: 36 * 37 * Each block map page, when dirty, records the earliest recovery journal block sequence number of 38 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every 39 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras 40 * according to the sequence number they record. 41 * 42 * In the current (newest) era, block map pages are not written unless there is cache pressure. In 43 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in 44 * this era are issued for write. In all older eras, pages are issued for write immediately. 45 */ 46 47 struct page_descriptor { 48 root_count_t root_index; 49 height_t height; 50 page_number_t page_index; 51 slot_number_t slot; 52 } __packed; 53 54 union page_key { 55 struct page_descriptor descriptor; 56 u64 key; 57 }; 58 59 struct write_if_not_dirtied_context { 60 struct block_map_zone *zone; 61 u8 generation; 62 }; 63 64 struct block_map_tree_segment { 65 struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT]; 66 }; 67 68 struct block_map_tree { 69 struct block_map_tree_segment *segments; 70 }; 71 72 struct forest { 73 struct block_map *map; 74 size_t segments; 75 struct boundary *boundaries; 76 struct tree_page **pages; 77 struct block_map_tree trees[]; 78 }; 79 80 struct cursor_level { 81 page_number_t page_index; 82 slot_number_t slot; 83 }; 84 85 struct cursors; 86 87 struct cursor { 88 struct vdo_waiter waiter; 89 struct block_map_tree *tree; 90 height_t height; 91 struct cursors *parent; 92 struct boundary boundary; 93 struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT]; 94 struct pooled_vio *vio; 95 }; 96 97 struct cursors { 98 struct block_map_zone *zone; 99 struct vio_pool *pool; 100 vdo_entry_callback_fn entry_callback; 101 struct vdo_completion *completion; 102 root_count_t active_roots; 103 struct cursor cursors[]; 104 }; 105 106 static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF; 107 108 /* Used to indicate that the page holding the location of a tree root has been "loaded". */ 109 static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF; 110 111 const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = { 112 .mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F, 113 .pbn_high_nibble = 0, 114 .pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX), 115 }; 116 117 #define LOG_INTERVAL 4000 118 #define DISPLAY_INTERVAL 100000 119 120 /* 121 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread. 122 * Prevents any compiler shenanigans from affecting other threads reading those stats. 
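 *
 * For example, ADD_ONCE(cache->stats.cache_pressure, 1) expands to
 * WRITE_ONCE(cache->stats.cache_pressure, cache->stats.cache_pressure + 1);
 * the WRITE_ONCE() keeps the compiler from splitting or re-ordering the store
 * out from under a concurrent READ_ONCE() of the same statistic.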
123 */ 124 #define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta)) 125 126 static inline bool is_dirty(const struct page_info *info) 127 { 128 return info->state == PS_DIRTY; 129 } 130 131 static inline bool is_present(const struct page_info *info) 132 { 133 return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY); 134 } 135 136 static inline bool is_in_flight(const struct page_info *info) 137 { 138 return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING); 139 } 140 141 static inline bool is_incoming(const struct page_info *info) 142 { 143 return info->state == PS_INCOMING; 144 } 145 146 static inline bool is_outgoing(const struct page_info *info) 147 { 148 return info->state == PS_OUTGOING; 149 } 150 151 static inline bool is_valid(const struct page_info *info) 152 { 153 return is_present(info) || is_outgoing(info); 154 } 155 156 static char *get_page_buffer(struct page_info *info) 157 { 158 struct vdo_page_cache *cache = info->cache; 159 160 return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE]; 161 } 162 163 static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter) 164 { 165 struct vdo_page_completion *completion; 166 167 if (waiter == NULL) 168 return NULL; 169 170 completion = container_of(waiter, struct vdo_page_completion, waiter); 171 vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION); 172 return completion; 173 } 174 175 /** 176 * initialize_info() - Initialize all page info structures and put them on the free list. 177 * @cache: The page cache. 178 * 179 * Return: VDO_SUCCESS or an error. 180 */ 181 static int initialize_info(struct vdo_page_cache *cache) 182 { 183 struct page_info *info; 184 185 INIT_LIST_HEAD(&cache->free_list); 186 for (info = cache->infos; info < cache->infos + cache->page_count; info++) { 187 int result; 188 189 info->cache = cache; 190 info->state = PS_FREE; 191 info->pbn = NO_PAGE; 192 193 result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP, 194 VIO_PRIORITY_METADATA, info, 195 get_page_buffer(info), &info->vio); 196 if (result != VDO_SUCCESS) 197 return result; 198 199 /* The thread ID should never change. */ 200 info->vio->completion.callback_thread_id = cache->zone->thread_id; 201 202 INIT_LIST_HEAD(&info->state_entry); 203 list_add_tail(&info->state_entry, &cache->free_list); 204 INIT_LIST_HEAD(&info->lru_entry); 205 } 206 207 return VDO_SUCCESS; 208 } 209 210 /** 211 * allocate_cache_components() - Allocate components of the cache which require their own 212 * allocation. 213 * @cache: The page cache. 214 * 215 * The caller is responsible for all clean up on errors. 216 * 217 * Return: VDO_SUCCESS or an error code. 218 */ 219 static int __must_check allocate_cache_components(struct vdo_page_cache *cache) 220 { 221 u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE; 222 int result; 223 224 result = vdo_allocate(cache->page_count, struct page_info, "page infos", 225 &cache->infos); 226 if (result != VDO_SUCCESS) 227 return result; 228 229 result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages); 230 if (result != VDO_SUCCESS) 231 return result; 232 233 result = vdo_int_map_create(cache->page_count, &cache->page_map); 234 if (result != VDO_SUCCESS) 235 return result; 236 237 return initialize_info(cache); 238 } 239 240 /** 241 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's 242 * thread. 243 * @cache: The page cache. 
 * @function_name: The function name to report if the assertion fails.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}

/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}

/**
 * get_page_state_name() - Return the name of a page state.
 * @state: The page state to describe.
 *
 * If the page state is invalid a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}

/**
 * update_counter() - Update the counter associated with a given state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		return;
	}
}

/** update_lru() - Update the lru information for an active page. */
static void update_lru(struct page_info *info)
{
	if (info->cache->lru_list.prev != &info->lru_entry)
		list_move_tail(&info->lru_entry, &info->cache->lru_list);
}

/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 * @info: The page info to update.
 * @new_state: The new state to set.
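 *
 * For example, a PS_RESIDENT -> PS_DIRTY transition decrements clean_pages and
 * increments dirty_pages via update_counter(); the PS_DIRTY case below then
 * returns without touching any list, so the info keeps its current position.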
352 */ 353 static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state) 354 { 355 if (new_state == info->state) 356 return; 357 358 update_counter(info, -1); 359 info->state = new_state; 360 update_counter(info, 1); 361 362 switch (info->state) { 363 case PS_FREE: 364 case PS_FAILED: 365 list_move_tail(&info->state_entry, &info->cache->free_list); 366 return; 367 368 case PS_OUTGOING: 369 list_move_tail(&info->state_entry, &info->cache->outgoing_list); 370 return; 371 372 case PS_DIRTY: 373 return; 374 375 default: 376 list_del_init(&info->state_entry); 377 } 378 } 379 380 /** set_info_pbn() - Set the pbn for an info, updating the map as needed. */ 381 static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn) 382 { 383 struct vdo_page_cache *cache = info->cache; 384 385 /* Either the new or the old page number must be NO_PAGE. */ 386 int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE), 387 "Must free a page before reusing it."); 388 if (result != VDO_SUCCESS) 389 return result; 390 391 if (info->pbn != NO_PAGE) 392 vdo_int_map_remove(cache->page_map, info->pbn); 393 394 info->pbn = pbn; 395 396 if (pbn != NO_PAGE) { 397 result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL); 398 if (result != VDO_SUCCESS) 399 return result; 400 } 401 return VDO_SUCCESS; 402 } 403 404 /** reset_page_info() - Reset page info to represent an unallocated page. */ 405 static int reset_page_info(struct page_info *info) 406 { 407 int result; 408 409 result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy"); 410 if (result != VDO_SUCCESS) 411 return result; 412 413 result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting), 414 "VDO Page must not have waiters"); 415 if (result != VDO_SUCCESS) 416 return result; 417 418 result = set_info_pbn(info, NO_PAGE); 419 set_info_state(info, PS_FREE); 420 list_del_init(&info->lru_entry); 421 return result; 422 } 423 424 /** 425 * find_free_page() - Find a free page. 426 * @cache: The page cache. 427 * 428 * Return: A pointer to the page info structure (if found), NULL otherwise. 429 */ 430 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache) 431 { 432 struct page_info *info; 433 434 info = list_first_entry_or_null(&cache->free_list, struct page_info, 435 state_entry); 436 if (info != NULL) 437 list_del_init(&info->state_entry); 438 439 return info; 440 } 441 442 /** 443 * find_page() - Find the page info (if any) associated with a given pbn. 444 * @cache: The page cache. 445 * @pbn: The absolute physical block number of the page. 446 * 447 * Return: The page info for the page if available, or NULL if not. 448 */ 449 static struct page_info * __must_check find_page(struct vdo_page_cache *cache, 450 physical_block_number_t pbn) 451 { 452 if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn)) 453 return cache->last_found; 454 455 cache->last_found = vdo_int_map_get(cache->page_map, pbn); 456 return cache->last_found; 457 } 458 459 /** 460 * select_lru_page() - Determine which page is least recently used. 461 * @cache: The page cache. 462 * 463 * Picks the least recently used from among the non-busy entries at the front of each of the lru 464 * list. Since whenever we mark a page busy we also put it to the end of the list it is unlikely 465 * that the entries at the front are busy unless the queue is very short, but not impossible. 
466 * 467 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be 468 * found. The page can be dirty or resident. 469 */ 470 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache) 471 { 472 struct page_info *info; 473 474 list_for_each_entry(info, &cache->lru_list, lru_entry) 475 if ((info->busy == 0) && !is_in_flight(info)) 476 return info; 477 478 return NULL; 479 } 480 481 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */ 482 483 /** 484 * complete_with_page() - Helper to complete the VDO Page Completion request successfully. 485 * @info: The page info representing the result page. 486 * @vdo_page_comp: The VDO page completion to complete. 487 */ 488 static void complete_with_page(struct page_info *info, 489 struct vdo_page_completion *vdo_page_comp) 490 { 491 bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info); 492 493 if (!available) { 494 vdo_log_error_strerror(VDO_BAD_PAGE, 495 "Requested cache page %llu in state %s is not %s", 496 (unsigned long long) info->pbn, 497 get_page_state_name(info->state), 498 vdo_page_comp->writable ? "present" : "valid"); 499 vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE); 500 return; 501 } 502 503 vdo_page_comp->info = info; 504 vdo_page_comp->ready = true; 505 vdo_finish_completion(&vdo_page_comp->completion); 506 } 507 508 /** 509 * complete_waiter_with_error() - Complete a page completion with an error code. 510 * @waiter: The page completion, as a waiter. 511 * @result_ptr: A pointer to the error code. 512 * 513 * Implements waiter_callback_fn. 514 */ 515 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr) 516 { 517 int *result = result_ptr; 518 519 vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result); 520 } 521 522 /** 523 * complete_waiter_with_page() - Complete a page completion with a page. 524 * @waiter: The page completion, as a waiter. 525 * @page_info: The page info to complete with. 526 * 527 * Implements waiter_callback_fn. 528 */ 529 static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info) 530 { 531 complete_with_page(page_info, page_completion_from_waiter(waiter)); 532 } 533 534 /** 535 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result. 536 * @info: The loaded page info. 537 * @waitq: The list of waiting data_vios. 538 * 539 * Upon completion the waitq will be empty. 540 * 541 * Return: The number of pages distributed. 542 */ 543 static unsigned int distribute_page_over_waitq(struct page_info *info, 544 struct vdo_wait_queue *waitq) 545 { 546 size_t num_pages; 547 548 update_lru(info); 549 num_pages = vdo_waitq_num_waiters(waitq); 550 551 /* 552 * Increment the busy count once for each pending completion so that this page does not 553 * stop being busy until all completions have been processed. 554 */ 555 info->busy += num_pages; 556 557 vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info); 558 return num_pages; 559 } 560 561 /** 562 * set_persistent_error() - Set a persistent error which all requests will receive in the future. 563 * @cache: The page cache. 564 * @context: A string describing what triggered the error. 565 * @result: The error result to set on the cache. 566 * 567 * Once triggered, all enqueued completions will get this error. Any future requests will result in 568 * this error as well. 
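 *
 * Both the cache-wide free_waiters queue and each page's private waitq are
 * notified with the error, and the VDO is placed in read-only mode if it was
 * not already.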
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
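 *
 * The page is marked PS_FAILED, all of its waiters are notified with the
 * error, and the page info is reset so its cache slot can be reused.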
679 */ 680 static void handle_load_error(struct vdo_completion *completion) 681 { 682 int result = completion->result; 683 struct page_info *info = completion->parent; 684 struct vdo_page_cache *cache = info->cache; 685 686 assert_on_cache_thread(cache, __func__); 687 vio_record_metadata_io_error(as_vio(completion)); 688 vdo_enter_read_only_mode(cache->zone->block_map->vdo, result); 689 ADD_ONCE(cache->stats.failed_reads, 1); 690 set_info_state(info, PS_FAILED); 691 vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result); 692 reset_page_info(info); 693 694 /* 695 * Don't decrement until right before calling check_for_drain_complete() to 696 * ensure that the above work can't cause the page cache to be freed out from under us. 697 */ 698 cache->outstanding_reads--; 699 check_for_drain_complete(cache->zone); 700 } 701 702 /** 703 * page_is_loaded() - Callback used when a page has been loaded. 704 * @completion: The vio which has loaded the page. Its parent is the page_info. 705 */ 706 static void page_is_loaded(struct vdo_completion *completion) 707 { 708 struct page_info *info = completion->parent; 709 struct vdo_page_cache *cache = info->cache; 710 nonce_t nonce = info->cache->zone->block_map->nonce; 711 struct block_map_page *page; 712 enum block_map_page_validity validity; 713 714 assert_on_cache_thread(cache, __func__); 715 716 page = (struct block_map_page *) get_page_buffer(info); 717 validity = vdo_validate_block_map_page(page, nonce, info->pbn); 718 if (validity == VDO_BLOCK_MAP_PAGE_BAD) { 719 physical_block_number_t pbn = vdo_get_block_map_page_pbn(page); 720 int result = vdo_log_error_strerror(VDO_BAD_PAGE, 721 "Expected page %llu but got page %llu instead", 722 (unsigned long long) info->pbn, 723 (unsigned long long) pbn); 724 725 vdo_continue_completion(completion, result); 726 return; 727 } 728 729 if (validity == VDO_BLOCK_MAP_PAGE_INVALID) 730 vdo_format_block_map_page(page, nonce, info->pbn, false); 731 732 info->recovery_lock = 0; 733 set_info_state(info, PS_RESIDENT); 734 distribute_page_over_waitq(info, &info->waiting); 735 736 /* 737 * Don't decrement until right before calling check_for_drain_complete() to 738 * ensure that the above work can't cause the page cache to be freed out from under us. 739 */ 740 cache->outstanding_reads--; 741 check_for_drain_complete(cache->zone); 742 } 743 744 /** 745 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild. 746 * @completion: The page load completion. 747 */ 748 static void handle_rebuild_read_error(struct vdo_completion *completion) 749 { 750 struct page_info *info = completion->parent; 751 struct vdo_page_cache *cache = info->cache; 752 753 assert_on_cache_thread(cache, __func__); 754 755 /* 756 * We are doing a read-only rebuild, so treat this as a successful read 757 * of an uninitialized page. 758 */ 759 vio_record_metadata_io_error(as_vio(completion)); 760 ADD_ONCE(cache->stats.failed_reads, 1); 761 memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE); 762 vdo_reset_completion(completion); 763 page_is_loaded(completion); 764 } 765 766 static void load_cache_page_endio(struct bio *bio) 767 { 768 struct vio *vio = bio->bi_private; 769 struct page_info *info = vio->completion.parent; 770 771 continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id); 772 } 773 774 /** 775 * launch_page_load() - Begin the process of loading a page. 776 * @info: The page info to launch. 777 * @pbn: The absolute physical block number of the page to load. 
778 * 779 * Return: VDO_SUCCESS or an error code. 780 */ 781 static int __must_check launch_page_load(struct page_info *info, 782 physical_block_number_t pbn) 783 { 784 int result; 785 vdo_action_fn callback; 786 struct vdo_page_cache *cache = info->cache; 787 788 assert_io_allowed(cache); 789 790 result = set_info_pbn(info, pbn); 791 if (result != VDO_SUCCESS) 792 return result; 793 794 result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading."); 795 if (result != VDO_SUCCESS) 796 return result; 797 798 set_info_state(info, PS_INCOMING); 799 cache->outstanding_reads++; 800 ADD_ONCE(cache->stats.pages_loaded, 1); 801 callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error); 802 vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio, 803 callback, REQ_OP_READ | REQ_PRIO); 804 return VDO_SUCCESS; 805 } 806 807 static void write_pages(struct vdo_completion *completion); 808 809 /** handle_flush_error() - Handle errors flushing the layer. */ 810 static void handle_flush_error(struct vdo_completion *completion) 811 { 812 struct page_info *info = completion->parent; 813 814 vio_record_metadata_io_error(as_vio(completion)); 815 set_persistent_error(info->cache, "flush failed", completion->result); 816 write_pages(completion); 817 } 818 819 static void flush_endio(struct bio *bio) 820 { 821 struct vio *vio = bio->bi_private; 822 struct page_info *info = vio->completion.parent; 823 824 continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id); 825 } 826 827 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */ 828 static void save_pages(struct vdo_page_cache *cache) 829 { 830 struct page_info *info; 831 struct vio *vio; 832 833 if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0)) 834 return; 835 836 assert_io_allowed(cache); 837 838 info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry); 839 840 cache->pages_in_flush = cache->pages_to_flush; 841 cache->pages_to_flush = 0; 842 ADD_ONCE(cache->stats.flush_count, 1); 843 844 vio = info->vio; 845 846 /* 847 * We must make sure that the recovery journal entries that changed these pages were 848 * successfully persisted, and thus must issue a flush before each batch of pages is 849 * written to ensure this. 850 */ 851 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error); 852 } 853 854 /** 855 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved. 856 * @info: The page info to save. 857 * 858 * Once in the list, a page may not be used until it has been written out. 859 */ 860 static void schedule_page_save(struct page_info *info) 861 { 862 if (info->busy > 0) { 863 info->write_status = WRITE_STATUS_DEFERRED; 864 return; 865 } 866 867 info->cache->pages_to_flush++; 868 info->cache->outstanding_writes++; 869 set_info_state(info, PS_OUTGOING); 870 } 871 872 /** 873 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving 874 * pages if another save is not in progress. 875 * @info: The page info to save. 876 */ 877 static void launch_page_save(struct page_info *info) 878 { 879 schedule_page_save(info); 880 save_pages(info->cache); 881 } 882 883 /** 884 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is 885 * requesting a given page number. 886 * @waiter: The page completion waiter to check. 887 * @context: A pointer to the pbn of the desired page. 888 * 889 * Implements waiter_match_fn. 
890 * 891 * Return: true if the page completion is for the desired page number. 892 */ 893 static bool completion_needs_page(struct vdo_waiter *waiter, void *context) 894 { 895 physical_block_number_t *pbn = context; 896 897 return (page_completion_from_waiter(waiter)->pbn == *pbn); 898 } 899 900 /** 901 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and 902 * any other completions that match it in page number. 903 * @info: The page info to allocate a page for. 904 */ 905 static void allocate_free_page(struct page_info *info) 906 { 907 int result; 908 struct vdo_waiter *oldest_waiter; 909 physical_block_number_t pbn; 910 struct vdo_page_cache *cache = info->cache; 911 912 assert_on_cache_thread(cache, __func__); 913 914 if (!vdo_waitq_has_waiters(&cache->free_waiters)) { 915 if (cache->stats.cache_pressure > 0) { 916 vdo_log_info("page cache pressure relieved"); 917 WRITE_ONCE(cache->stats.cache_pressure, 0); 918 } 919 920 return; 921 } 922 923 result = reset_page_info(info); 924 if (result != VDO_SUCCESS) { 925 set_persistent_error(cache, "cannot reset page info", result); 926 return; 927 } 928 929 oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters); 930 pbn = page_completion_from_waiter(oldest_waiter)->pbn; 931 932 /* 933 * Remove all entries which match the page number in question and push them onto the page 934 * info's waitq. 935 */ 936 vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page, 937 &pbn, &info->waiting); 938 cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting); 939 940 result = launch_page_load(info, pbn); 941 if (result != VDO_SUCCESS) { 942 vdo_waitq_notify_all_waiters(&info->waiting, 943 complete_waiter_with_error, &result); 944 } 945 } 946 947 /** 948 * discard_a_page() - Begin the process of discarding a page. 949 * @cache: The page cache. 950 * 951 * If no page is discardable, increments a count of deferred frees so that the next release of a 952 * page which is no longer busy will kick off another discard cycle. This is an indication that the 953 * cache is not big enough. 954 * 955 * If the selected page is not dirty, immediately allocates the page to the oldest completion 956 * waiting for a free page. 957 */ 958 static void discard_a_page(struct vdo_page_cache *cache) 959 { 960 struct page_info *info = select_lru_page(cache); 961 962 if (info == NULL) { 963 report_cache_pressure(cache); 964 return; 965 } 966 967 if (!is_dirty(info)) { 968 allocate_free_page(info); 969 return; 970 } 971 972 VDO_ASSERT_LOG_ONLY(!is_in_flight(info), 973 "page selected for discard is not in flight"); 974 975 cache->discard_count++; 976 info->write_status = WRITE_STATUS_DISCARD; 977 launch_page_save(info); 978 } 979 980 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp) 981 { 982 struct vdo_page_cache *cache = vdo_page_comp->cache; 983 984 cache->waiter_count++; 985 vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter); 986 discard_a_page(cache); 987 } 988 989 /** 990 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free 991 * page. 992 * @cache: The page cache. 993 */ 994 static void discard_page_if_needed(struct vdo_page_cache *cache) 995 { 996 if (cache->waiter_count > cache->discard_count) 997 discard_a_page(cache); 998 } 999 1000 /** 1001 * write_has_finished() - Inform the cache that a write has finished (possibly with an error). 
1002 * @info: The info structure for the page whose write just completed. 1003 * 1004 * Return: true if the page write was a discard. 1005 */ 1006 static bool write_has_finished(struct page_info *info) 1007 { 1008 bool was_discard = (info->write_status == WRITE_STATUS_DISCARD); 1009 1010 assert_on_cache_thread(info->cache, __func__); 1011 info->cache->outstanding_writes--; 1012 1013 info->write_status = WRITE_STATUS_NORMAL; 1014 return was_discard; 1015 } 1016 1017 /** 1018 * handle_page_write_error() - Handler for page write errors. 1019 * @completion: The page write vio. 1020 */ 1021 static void handle_page_write_error(struct vdo_completion *completion) 1022 { 1023 int result = completion->result; 1024 struct page_info *info = completion->parent; 1025 struct vdo_page_cache *cache = info->cache; 1026 1027 vio_record_metadata_io_error(as_vio(completion)); 1028 1029 /* If we're already read-only, write failures are to be expected. */ 1030 if (result != VDO_READ_ONLY) { 1031 vdo_log_ratelimit(vdo_log_error, 1032 "failed to write block map page %llu", 1033 (unsigned long long) info->pbn); 1034 } 1035 1036 set_info_state(info, PS_DIRTY); 1037 ADD_ONCE(cache->stats.failed_writes, 1); 1038 set_persistent_error(cache, "cannot write page", result); 1039 1040 if (!write_has_finished(info)) 1041 discard_page_if_needed(cache); 1042 1043 check_for_drain_complete(cache->zone); 1044 } 1045 1046 static void page_is_written_out(struct vdo_completion *completion); 1047 1048 static void write_cache_page_endio(struct bio *bio) 1049 { 1050 struct vio *vio = bio->bi_private; 1051 struct page_info *info = vio->completion.parent; 1052 1053 continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id); 1054 } 1055 1056 /** 1057 * page_is_written_out() - Callback used when a page has been written out. 1058 * @completion: The vio which wrote the page. Its parent is a page_info. 1059 */ 1060 static void page_is_written_out(struct vdo_completion *completion) 1061 { 1062 bool was_discard, reclaimed; 1063 u32 reclamations; 1064 struct page_info *info = completion->parent; 1065 struct vdo_page_cache *cache = info->cache; 1066 struct block_map_page *page = (struct block_map_page *) get_page_buffer(info); 1067 1068 if (!page->header.initialized) { 1069 page->header.initialized = true; 1070 vdo_submit_metadata_vio(info->vio, info->pbn, 1071 write_cache_page_endio, 1072 handle_page_write_error, 1073 REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH); 1074 return; 1075 } 1076 1077 /* Handle journal updates and torn write protection. */ 1078 vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal, 1079 info->recovery_lock, 1080 VDO_ZONE_TYPE_LOGICAL, 1081 cache->zone->zone_number); 1082 info->recovery_lock = 0; 1083 was_discard = write_has_finished(info); 1084 reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting)); 1085 1086 set_info_state(info, PS_RESIDENT); 1087 1088 reclamations = distribute_page_over_waitq(info, &info->waiting); 1089 ADD_ONCE(cache->stats.reclaimed, reclamations); 1090 1091 if (was_discard) 1092 cache->discard_count--; 1093 1094 if (reclaimed) 1095 discard_page_if_needed(cache); 1096 else 1097 allocate_free_page(info); 1098 1099 check_for_drain_complete(cache->zone); 1100 } 1101 1102 /** 1103 * write_pages() - Write the batch of pages which were covered by the layer flush which just 1104 * completed. 1105 * @flush_completion: The flush vio. 1106 * 1107 * This callback is registered in save_pages(). 
1108 */ 1109 static void write_pages(struct vdo_completion *flush_completion) 1110 { 1111 struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache; 1112 1113 /* 1114 * We need to cache these two values on the stack since it is possible for the last 1115 * page info to cause the page cache to get freed. Hence once we launch the last page, 1116 * it may be unsafe to dereference the cache. 1117 */ 1118 bool has_unflushed_pages = (cache->pages_to_flush > 0); 1119 page_count_t pages_in_flush = cache->pages_in_flush; 1120 1121 cache->pages_in_flush = 0; 1122 while (pages_in_flush-- > 0) { 1123 struct page_info *info = 1124 list_first_entry(&cache->outgoing_list, struct page_info, 1125 state_entry); 1126 1127 list_del_init(&info->state_entry); 1128 if (vdo_is_read_only(info->cache->vdo)) { 1129 struct vdo_completion *completion = &info->vio->completion; 1130 1131 vdo_reset_completion(completion); 1132 completion->callback = page_is_written_out; 1133 completion->error_handler = handle_page_write_error; 1134 vdo_fail_completion(completion, VDO_READ_ONLY); 1135 continue; 1136 } 1137 ADD_ONCE(info->cache->stats.pages_saved, 1); 1138 vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio, 1139 handle_page_write_error, REQ_OP_WRITE | REQ_PRIO); 1140 } 1141 1142 if (has_unflushed_pages) { 1143 /* 1144 * If there are unflushed pages, the cache can't have been freed, so this call is 1145 * safe. 1146 */ 1147 save_pages(cache); 1148 } 1149 } 1150 1151 /** 1152 * vdo_release_page_completion() - Release a VDO Page Completion. 1153 * @completion: The page completion to release. 1154 * 1155 * The page referenced by this completion (if any) will no longer be held busy by this completion. 1156 * If a page becomes discardable and there are completions awaiting free pages then a new round of 1157 * page discarding is started. 
1158 */ 1159 void vdo_release_page_completion(struct vdo_completion *completion) 1160 { 1161 struct page_info *discard_info = NULL; 1162 struct vdo_page_completion *page_completion = as_vdo_page_completion(completion); 1163 struct vdo_page_cache *cache; 1164 1165 if (completion->result == VDO_SUCCESS) { 1166 if (!validate_completed_page_or_enter_read_only_mode(page_completion, false)) 1167 return; 1168 1169 if (--page_completion->info->busy == 0) 1170 discard_info = page_completion->info; 1171 } 1172 1173 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL), 1174 "Page being released after leaving all queues"); 1175 1176 page_completion->info = NULL; 1177 cache = page_completion->cache; 1178 assert_on_cache_thread(cache, __func__); 1179 1180 if (discard_info != NULL) { 1181 if (discard_info->write_status == WRITE_STATUS_DEFERRED) { 1182 discard_info->write_status = WRITE_STATUS_NORMAL; 1183 launch_page_save(discard_info); 1184 } 1185 1186 /* 1187 * if there are excess requests for pages (that have not already started discards) 1188 * we need to discard some page (which may be this one) 1189 */ 1190 discard_page_if_needed(cache); 1191 } 1192 } 1193 1194 static void load_page_for_completion(struct page_info *info, 1195 struct vdo_page_completion *vdo_page_comp) 1196 { 1197 int result; 1198 1199 vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter); 1200 result = launch_page_load(info, vdo_page_comp->pbn); 1201 if (result != VDO_SUCCESS) { 1202 vdo_waitq_notify_all_waiters(&info->waiting, 1203 complete_waiter_with_error, &result); 1204 } 1205 } 1206 1207 /** 1208 * vdo_get_page() - Initialize a page completion and get a block map page. 1209 * @page_completion: The vdo_page_completion to initialize. 1210 * @zone: The block map zone of the desired page. 1211 * @pbn: The absolute physical block of the desired page. 1212 * @writable: Whether the page can be modified. 1213 * @parent: The object to notify when the fetch is complete. 1214 * @callback: The notification callback. 1215 * @error_handler: The handler for fetch errors. 1216 * @requeue: Whether we must requeue when notifying the parent. 1217 * 1218 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated 1219 * by the completion to be loaded from disk. When the callback is invoked, the page will be 1220 * resident in the cache and marked busy. All callers must call vdo_release_page_completion() 1221 * when they are done with the page to clear the busy mark. 
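 *
 * A minimal, hypothetical usage sketch (the caller-side names my_vpc,
 * my_parent, my_page_ready, my_error_handler, and examine_or_modify are
 * illustrative only; error handling is elided):
 *
 *   static void my_page_ready(struct vdo_completion *completion)
 *   {
 *           struct block_map_page *page;
 *
 *           if (vdo_get_cached_page(completion, &page) == VDO_SUCCESS)
 *                   examine_or_modify(page);
 *
 *           vdo_release_page_completion(completion);
 *   }
 *
 *   vdo_get_page(&my_vpc, zone, pbn, true, my_parent, my_page_ready,
 *                my_error_handler, false);
 *
 * A writable page that was modified should also be flagged for write-out with
 * vdo_request_page_write(completion), typically before it is released.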
1222 */ 1223 void vdo_get_page(struct vdo_page_completion *page_completion, 1224 struct block_map_zone *zone, physical_block_number_t pbn, 1225 bool writable, void *parent, vdo_action_fn callback, 1226 vdo_action_fn error_handler, bool requeue) 1227 { 1228 struct vdo_page_cache *cache = &zone->page_cache; 1229 struct vdo_completion *completion = &page_completion->completion; 1230 struct page_info *info; 1231 1232 assert_on_cache_thread(cache, __func__); 1233 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL), 1234 "New page completion was not already on a wait queue"); 1235 1236 *page_completion = (struct vdo_page_completion) { 1237 .pbn = pbn, 1238 .writable = writable, 1239 .cache = cache, 1240 }; 1241 1242 vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION); 1243 vdo_prepare_completion(completion, callback, error_handler, 1244 cache->zone->thread_id, parent); 1245 completion->requeue = requeue; 1246 1247 if (page_completion->writable && vdo_is_read_only(cache->vdo)) { 1248 vdo_fail_completion(completion, VDO_READ_ONLY); 1249 return; 1250 } 1251 1252 if (page_completion->writable) 1253 ADD_ONCE(cache->stats.write_count, 1); 1254 else 1255 ADD_ONCE(cache->stats.read_count, 1); 1256 1257 info = find_page(cache, page_completion->pbn); 1258 if (info != NULL) { 1259 /* The page is in the cache already. */ 1260 if ((info->write_status == WRITE_STATUS_DEFERRED) || 1261 is_incoming(info) || 1262 (is_outgoing(info) && page_completion->writable)) { 1263 /* The page is unusable until it has finished I/O. */ 1264 ADD_ONCE(cache->stats.wait_for_page, 1); 1265 vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter); 1266 return; 1267 } 1268 1269 if (is_valid(info)) { 1270 /* The page is usable. */ 1271 ADD_ONCE(cache->stats.found_in_cache, 1); 1272 if (!is_present(info)) 1273 ADD_ONCE(cache->stats.read_outgoing, 1); 1274 update_lru(info); 1275 info->busy++; 1276 complete_with_page(info, page_completion); 1277 return; 1278 } 1279 1280 /* Something horrible has gone wrong. */ 1281 VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state."); 1282 } 1283 1284 /* The page must be fetched. */ 1285 info = find_free_page(cache); 1286 if (info != NULL) { 1287 ADD_ONCE(cache->stats.fetch_required, 1); 1288 load_page_for_completion(info, page_completion); 1289 return; 1290 } 1291 1292 /* The page must wait for a page to be discarded. */ 1293 ADD_ONCE(cache->stats.discard_required, 1); 1294 discard_page_for_completion(page_completion); 1295 } 1296 1297 /** 1298 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy. 1299 * @completion: The vdo_page_completion containing the page. 1300 */ 1301 void vdo_request_page_write(struct vdo_completion *completion) 1302 { 1303 struct page_info *info; 1304 struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion); 1305 1306 if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true)) 1307 return; 1308 1309 info = vdo_page_comp->info; 1310 set_info_state(info, PS_DIRTY); 1311 launch_page_save(info); 1312 } 1313 1314 /** 1315 * vdo_get_cached_page() - Get the block map page from a page completion. 1316 * @completion: A vdo page completion whose callback has been called. 
1317 * @page_ptr: A pointer to hold the page 1318 * 1319 * Return: VDO_SUCCESS or an error 1320 */ 1321 int vdo_get_cached_page(struct vdo_completion *completion, 1322 struct block_map_page **page_ptr) 1323 { 1324 int result; 1325 struct vdo_page_completion *vpc; 1326 1327 vpc = as_vdo_page_completion(completion); 1328 result = validate_completed_page(vpc, true); 1329 if (result == VDO_SUCCESS) 1330 *page_ptr = (struct block_map_page *) get_page_buffer(vpc->info); 1331 1332 return result; 1333 } 1334 1335 /** 1336 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache. 1337 * @cache: The page cache. 1338 * 1339 * There must not be any dirty pages in the cache. 1340 * 1341 * Return: A success or error code. 1342 */ 1343 int vdo_invalidate_page_cache(struct vdo_page_cache *cache) 1344 { 1345 struct page_info *info; 1346 1347 assert_on_cache_thread(cache, __func__); 1348 1349 /* Make sure we don't throw away any dirty pages. */ 1350 for (info = cache->infos; info < cache->infos + cache->page_count; info++) { 1351 int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages"); 1352 1353 if (result != VDO_SUCCESS) 1354 return result; 1355 } 1356 1357 /* Reset the page map by re-allocating it. */ 1358 vdo_int_map_free(vdo_forget(cache->page_map)); 1359 return vdo_int_map_create(cache->page_count, &cache->page_map); 1360 } 1361 1362 /** 1363 * get_tree_page_by_index() - Get the tree page for a given height and page index. 1364 * @forest: The block map forest. 1365 * @root_index: The root index of the tree to search. 1366 * @height: The height in the tree. 1367 * @page_index: The page index. 1368 * 1369 * Return: The requested page. 1370 */ 1371 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest, 1372 root_count_t root_index, 1373 height_t height, 1374 page_number_t page_index) 1375 { 1376 page_number_t offset = 0; 1377 size_t segment; 1378 1379 for (segment = 0; segment < forest->segments; segment++) { 1380 page_number_t border = forest->boundaries[segment].levels[height - 1]; 1381 1382 if (page_index < border) { 1383 struct block_map_tree *tree = &forest->trees[root_index]; 1384 1385 return &(tree->segments[segment].levels[height - 1][page_index - offset]); 1386 } 1387 1388 offset = border; 1389 } 1390 1391 return NULL; 1392 } 1393 1394 /* Get the page referred to by the lock's tree slot at its current height. */ 1395 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone, 1396 const struct tree_lock *lock) 1397 { 1398 return get_tree_page_by_index(zone->block_map->forest, lock->root_index, 1399 lock->height, 1400 lock->tree_slots[lock->height].page_index); 1401 } 1402 1403 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. 
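 * @buffer: The buffer holding the candidate page data.
 * @nonce: The VDO nonce the page is expected to carry.
 * @pbn: The physical block number the page is expected to describe.
 * @page: The destination to copy into if the buffer holds a valid page.
 *
 * Return: true if the buffer held a valid page and was copied into @page.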
*/ 1404 bool vdo_copy_valid_page(char *buffer, nonce_t nonce, 1405 physical_block_number_t pbn, 1406 struct block_map_page *page) 1407 { 1408 struct block_map_page *loaded = (struct block_map_page *) buffer; 1409 enum block_map_page_validity validity = 1410 vdo_validate_block_map_page(loaded, nonce, pbn); 1411 1412 if (validity == VDO_BLOCK_MAP_PAGE_VALID) { 1413 memcpy(page, loaded, VDO_BLOCK_SIZE); 1414 return true; 1415 } 1416 1417 if (validity == VDO_BLOCK_MAP_PAGE_BAD) { 1418 vdo_log_error_strerror(VDO_BAD_PAGE, 1419 "Expected page %llu but got page %llu instead", 1420 (unsigned long long) pbn, 1421 (unsigned long long) vdo_get_block_map_page_pbn(loaded)); 1422 } 1423 1424 return false; 1425 } 1426 1427 /** 1428 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within 1429 * a cyclic range of values from 0 to (modulus - 1). 1430 * @lower: The lowest value to accept. 1431 * @value: The value to check. 1432 * @upper: The highest value to accept. 1433 * @modulus: The size of the cyclic space, no more than 2^15. 1434 * 1435 * The value and both bounds must be smaller than the modulus. 1436 * 1437 * Return: true if the value is in range. 1438 */ 1439 static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus) 1440 { 1441 if (value < lower) 1442 value += modulus; 1443 if (upper < lower) 1444 upper += modulus; 1445 return (value <= upper); 1446 } 1447 1448 /** 1449 * is_not_older() - Check whether a generation is strictly older than some other generation in the 1450 * context of a zone's current generation range. 1451 * @zone: The zone in which to do the comparison. 1452 * @a: The generation in question. 1453 * @b: The generation to compare to. 1454 * 1455 * Return: true if generation @a is not strictly older than generation @b in the context of @zone 1456 */ 1457 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b) 1458 { 1459 int result; 1460 1461 result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) && 1462 in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)), 1463 "generation(s) %u, %u are out of range [%u, %u]", 1464 a, b, zone->oldest_generation, zone->generation); 1465 if (result != VDO_SUCCESS) { 1466 enter_zone_read_only_mode(zone, result); 1467 return true; 1468 } 1469 1470 return in_cyclic_range(b, a, zone->generation, 1 << 8); 1471 } 1472 1473 static void release_generation(struct block_map_zone *zone, u8 generation) 1474 { 1475 int result; 1476 1477 result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0), 1478 "dirty page count underflow for generation %u", generation); 1479 if (result != VDO_SUCCESS) { 1480 enter_zone_read_only_mode(zone, result); 1481 return; 1482 } 1483 1484 zone->dirty_page_counts[generation]--; 1485 while ((zone->dirty_page_counts[zone->oldest_generation] == 0) && 1486 (zone->oldest_generation != zone->generation)) 1487 zone->oldest_generation++; 1488 } 1489 1490 static void set_generation(struct block_map_zone *zone, struct tree_page *page, 1491 u8 new_generation) 1492 { 1493 u32 new_count; 1494 int result; 1495 bool decrement_old = vdo_waiter_is_waiting(&page->waiter); 1496 u8 old_generation = page->generation; 1497 1498 if (decrement_old && (old_generation == new_generation)) 1499 return; 1500 1501 page->generation = new_generation; 1502 new_count = ++zone->dirty_page_counts[new_generation]; 1503 result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u", 1504 new_generation); 1505 if 
(result != VDO_SUCCESS) { 1506 enter_zone_read_only_mode(zone, result); 1507 return; 1508 } 1509 1510 if (decrement_old) 1511 release_generation(zone, old_generation); 1512 } 1513 1514 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio); 1515 1516 /* Implements waiter_callback_fn */ 1517 static void write_page_callback(struct vdo_waiter *waiter, void *context) 1518 { 1519 write_page(container_of(waiter, struct tree_page, waiter), context); 1520 } 1521 1522 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone) 1523 { 1524 waiter->callback = write_page_callback; 1525 acquire_vio_from_pool(zone->vio_pool, waiter); 1526 } 1527 1528 /* Return: true if all possible generations were not already active */ 1529 static bool attempt_increment(struct block_map_zone *zone) 1530 { 1531 u8 generation = zone->generation + 1; 1532 1533 if (zone->oldest_generation == generation) 1534 return false; 1535 1536 zone->generation = generation; 1537 return true; 1538 } 1539 1540 /* Launches a flush if one is not already in progress. */ 1541 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone) 1542 { 1543 if ((zone->flusher == NULL) && attempt_increment(zone)) { 1544 zone->flusher = page; 1545 acquire_vio(&page->waiter, zone); 1546 return; 1547 } 1548 1549 vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter); 1550 } 1551 1552 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context) 1553 { 1554 struct tree_page *page = container_of(waiter, struct tree_page, waiter); 1555 struct write_if_not_dirtied_context *write_context = context; 1556 1557 if (page->generation == write_context->generation) { 1558 acquire_vio(waiter, write_context->zone); 1559 return; 1560 } 1561 1562 enqueue_page(page, write_context->zone); 1563 } 1564 1565 static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio) 1566 { 1567 return_vio_to_pool(vio); 1568 check_for_drain_complete(zone); 1569 } 1570 1571 /* This callback is registered in write_initialized_page(). 
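 * finish_page_write() releases the recovery journal reference taken for the
 * copy that was just written, drops the page's writing generation, and then
 * either re-queues the page (if it was re-dirtied while the write was in
 * flight), hands the flusher role to the next waiter, or returns the vio to
 * the pool.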
*/ 1572 static void finish_page_write(struct vdo_completion *completion) 1573 { 1574 bool dirty; 1575 struct vio *vio = as_vio(completion); 1576 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1577 struct tree_page *page = completion->parent; 1578 struct block_map_zone *zone = pooled->context; 1579 1580 vdo_release_recovery_journal_block_reference(zone->block_map->journal, 1581 page->writing_recovery_lock, 1582 VDO_ZONE_TYPE_LOGICAL, 1583 zone->zone_number); 1584 1585 dirty = (page->writing_generation != page->generation); 1586 release_generation(zone, page->writing_generation); 1587 page->writing = false; 1588 1589 if (zone->flusher == page) { 1590 struct write_if_not_dirtied_context context = { 1591 .zone = zone, 1592 .generation = page->writing_generation, 1593 }; 1594 1595 vdo_waitq_notify_all_waiters(&zone->flush_waiters, 1596 write_page_if_not_dirtied, &context); 1597 if (dirty && attempt_increment(zone)) { 1598 write_page(page, pooled); 1599 return; 1600 } 1601 1602 zone->flusher = NULL; 1603 } 1604 1605 if (dirty) { 1606 enqueue_page(page, zone); 1607 } else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) && 1608 attempt_increment(zone)) { 1609 zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters), 1610 struct tree_page, waiter); 1611 write_page(zone->flusher, pooled); 1612 return; 1613 } 1614 1615 return_to_pool(zone, pooled); 1616 } 1617 1618 static void handle_write_error(struct vdo_completion *completion) 1619 { 1620 int result = completion->result; 1621 struct vio *vio = as_vio(completion); 1622 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1623 struct block_map_zone *zone = pooled->context; 1624 1625 vio_record_metadata_io_error(vio); 1626 enter_zone_read_only_mode(zone, result); 1627 return_to_pool(zone, pooled); 1628 } 1629 1630 static void write_page_endio(struct bio *bio); 1631 1632 static void write_initialized_page(struct vdo_completion *completion) 1633 { 1634 struct vio *vio = as_vio(completion); 1635 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1636 struct block_map_zone *zone = pooled->context; 1637 struct tree_page *tree_page = completion->parent; 1638 struct block_map_page *page = (struct block_map_page *) vio->data; 1639 blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO; 1640 1641 /* 1642 * Now that we know the page has been written at least once, mark the copy we are writing 1643 * as initialized. 1644 */ 1645 page->header.initialized = true; 1646 1647 if (zone->flusher == tree_page) 1648 operation |= REQ_PREFLUSH; 1649 1650 vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page), 1651 write_page_endio, handle_write_error, 1652 operation); 1653 } 1654 1655 static void write_page_endio(struct bio *bio) 1656 { 1657 struct pooled_vio *vio = bio->bi_private; 1658 struct block_map_zone *zone = vio->context; 1659 struct block_map_page *page = (struct block_map_page *) vio->vio.data; 1660 1661 continue_vio_after_io(&vio->vio, 1662 (page->header.initialized ? 
1663 finish_page_write : write_initialized_page), 1664 zone->thread_id); 1665 } 1666 1667 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio) 1668 { 1669 struct vdo_completion *completion = &vio->vio.completion; 1670 struct block_map_zone *zone = vio->context; 1671 struct block_map_page *page = vdo_as_block_map_page(tree_page); 1672 1673 if ((zone->flusher != tree_page) && 1674 is_not_older(zone, tree_page->generation, zone->generation)) { 1675 /* 1676 * This page was re-dirtied after the last flush was issued, hence we need to do 1677 * another flush. 1678 */ 1679 enqueue_page(tree_page, zone); 1680 return_to_pool(zone, vio); 1681 return; 1682 } 1683 1684 completion->parent = tree_page; 1685 memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE); 1686 completion->callback_thread_id = zone->thread_id; 1687 1688 tree_page->writing = true; 1689 tree_page->writing_generation = tree_page->generation; 1690 tree_page->writing_recovery_lock = tree_page->recovery_lock; 1691 1692 /* Clear this now so that we know this page is not on any dirty list. */ 1693 tree_page->recovery_lock = 0; 1694 1695 /* 1696 * We've already copied the page into the vio which will write it, so if it was not yet 1697 * initialized, the first write will indicate that (for torn write protection). It is now 1698 * safe to mark it as initialized in memory since if the write fails, the in memory state 1699 * will become irrelevant. 1700 */ 1701 if (page->header.initialized) { 1702 write_initialized_page(completion); 1703 return; 1704 } 1705 1706 page->header.initialized = true; 1707 vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page), 1708 write_page_endio, handle_write_error, 1709 REQ_OP_WRITE | REQ_PRIO); 1710 } 1711 1712 /* Release a lock on a page which was being loaded or allocated. 
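 * The data_vio must currently hold the lock: its tree_lock key is removed
 * from the zone's loading_pages map and must match the registered holder.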
*/ 1713 static void release_page_lock(struct data_vio *data_vio, char *what) 1714 { 1715 struct block_map_zone *zone; 1716 struct tree_lock *lock_holder; 1717 struct tree_lock *lock = &data_vio->tree_lock; 1718 1719 VDO_ASSERT_LOG_ONLY(lock->locked, 1720 "release of unlocked block map page %s for key %llu in tree %u", 1721 what, (unsigned long long) lock->key, lock->root_index); 1722 1723 zone = data_vio->logical.zone->block_map_zone; 1724 lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key); 1725 VDO_ASSERT_LOG_ONLY((lock_holder == lock), 1726 "block map page %s mismatch for key %llu in tree %u", 1727 what, (unsigned long long) lock->key, lock->root_index); 1728 lock->locked = false; 1729 } 1730 1731 static void finish_lookup(struct data_vio *data_vio, int result) 1732 { 1733 data_vio->tree_lock.height = 0; 1734 1735 --data_vio->logical.zone->block_map_zone->active_lookups; 1736 1737 set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot); 1738 data_vio->vio.completion.error_handler = handle_data_vio_error; 1739 continue_data_vio_with_error(data_vio, result); 1740 } 1741 1742 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context) 1743 { 1744 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1745 int result = *((int *) context); 1746 1747 if (!data_vio->write) { 1748 if (result == VDO_NO_SPACE) 1749 result = VDO_SUCCESS; 1750 } else if (result != VDO_NO_SPACE) { 1751 result = VDO_READ_ONLY; 1752 } 1753 1754 finish_lookup(data_vio, result); 1755 } 1756 1757 static void abort_lookup(struct data_vio *data_vio, int result, char *what) 1758 { 1759 if (result != VDO_NO_SPACE) 1760 enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result); 1761 1762 if (data_vio->tree_lock.locked) { 1763 release_page_lock(data_vio, what); 1764 vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters, 1765 abort_lookup_for_waiter, 1766 &result); 1767 } 1768 1769 finish_lookup(data_vio, result); 1770 } 1771 1772 static void abort_load(struct data_vio *data_vio, int result) 1773 { 1774 abort_lookup(data_vio, result, "load"); 1775 } 1776 1777 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo, 1778 const struct data_location *mapping, 1779 height_t height) 1780 { 1781 if (!vdo_is_valid_location(mapping) || 1782 vdo_is_state_compressed(mapping->state) || 1783 (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK))) 1784 return true; 1785 1786 /* Roots aren't physical data blocks, so we can't check their PBNs. 
*/ 1787 if (height == VDO_BLOCK_MAP_TREE_HEIGHT) 1788 return false; 1789 1790 return !vdo_is_physical_data_block(vdo->depot, mapping->pbn); 1791 } 1792 1793 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio); 1794 static void allocate_block_map_page(struct block_map_zone *zone, 1795 struct data_vio *data_vio); 1796 1797 static void continue_with_loaded_page(struct data_vio *data_vio, 1798 struct block_map_page *page) 1799 { 1800 struct tree_lock *lock = &data_vio->tree_lock; 1801 struct block_map_tree_slot slot = lock->tree_slots[lock->height]; 1802 struct data_location mapping = 1803 vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]); 1804 1805 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 1806 vdo_log_error_strerror(VDO_BAD_MAPPING, 1807 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 1808 (unsigned long long) mapping.pbn, mapping.state, 1809 lock->tree_slots[lock->height - 1].page_index, 1810 lock->height - 1); 1811 abort_load(data_vio, VDO_BAD_MAPPING); 1812 return; 1813 } 1814 1815 if (!vdo_is_mapped_location(&mapping)) { 1816 /* The page we need is unallocated */ 1817 allocate_block_map_page(data_vio->logical.zone->block_map_zone, 1818 data_vio); 1819 return; 1820 } 1821 1822 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 1823 if (lock->height == 1) { 1824 finish_lookup(data_vio, VDO_SUCCESS); 1825 return; 1826 } 1827 1828 /* We know what page we need to load next */ 1829 load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1830 } 1831 1832 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context) 1833 { 1834 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1835 1836 data_vio->tree_lock.height--; 1837 continue_with_loaded_page(data_vio, context); 1838 } 1839 1840 static void finish_block_map_page_load(struct vdo_completion *completion) 1841 { 1842 physical_block_number_t pbn; 1843 struct tree_page *tree_page; 1844 struct block_map_page *page; 1845 nonce_t nonce; 1846 struct vio *vio = as_vio(completion); 1847 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 1848 struct data_vio *data_vio = completion->parent; 1849 struct block_map_zone *zone = pooled->context; 1850 struct tree_lock *tree_lock = &data_vio->tree_lock; 1851 1852 tree_lock->height--; 1853 pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn; 1854 tree_page = get_tree_page(zone, tree_lock); 1855 page = (struct block_map_page *) tree_page->page_buffer; 1856 nonce = zone->block_map->nonce; 1857 1858 if (!vdo_copy_valid_page(vio->data, nonce, pbn, page)) 1859 vdo_format_block_map_page(page, nonce, pbn, false); 1860 return_vio_to_pool(pooled); 1861 1862 /* Release our claim to the load and wake any waiters */ 1863 release_page_lock(data_vio, "load"); 1864 vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page); 1865 continue_with_loaded_page(data_vio, page); 1866 } 1867 1868 static void handle_io_error(struct vdo_completion *completion) 1869 { 1870 int result = completion->result; 1871 struct vio *vio = as_vio(completion); 1872 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1873 struct data_vio *data_vio = completion->parent; 1874 1875 vio_record_metadata_io_error(vio); 1876 return_vio_to_pool(pooled); 1877 abort_load(data_vio, result); 1878 } 1879 1880 static void load_page_endio(struct bio *bio) 1881 { 1882 struct vio *vio = bio->bi_private; 1883 struct data_vio 
*data_vio = vio->completion.parent; 1884 1885 continue_vio_after_io(vio, finish_block_map_page_load, 1886 data_vio->logical.zone->thread_id); 1887 } 1888 1889 static void load_page(struct vdo_waiter *waiter, void *context) 1890 { 1891 struct pooled_vio *pooled = context; 1892 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1893 struct tree_lock *lock = &data_vio->tree_lock; 1894 physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn; 1895 1896 pooled->vio.completion.parent = data_vio; 1897 vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio, 1898 handle_io_error, REQ_OP_READ | REQ_PRIO); 1899 } 1900 1901 /* 1902 * If the page is already locked, queue up to wait for the lock to be released. If the lock is 1903 * acquired, @data_vio->tree_lock.locked will be true. 1904 */ 1905 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio) 1906 { 1907 int result; 1908 struct tree_lock *lock_holder; 1909 struct tree_lock *lock = &data_vio->tree_lock; 1910 height_t height = lock->height; 1911 struct block_map_tree_slot tree_slot = lock->tree_slots[height]; 1912 union page_key key; 1913 1914 key.descriptor = (struct page_descriptor) { 1915 .root_index = lock->root_index, 1916 .height = height, 1917 .page_index = tree_slot.page_index, 1918 .slot = tree_slot.block_map_slot.slot, 1919 }; 1920 lock->key = key.key; 1921 1922 result = vdo_int_map_put(zone->loading_pages, lock->key, 1923 lock, false, (void **) &lock_holder); 1924 if (result != VDO_SUCCESS) 1925 return result; 1926 1927 if (lock_holder == NULL) { 1928 /* We got the lock */ 1929 data_vio->tree_lock.locked = true; 1930 return VDO_SUCCESS; 1931 } 1932 1933 /* Someone else is loading or allocating the page we need */ 1934 vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter); 1935 return VDO_SUCCESS; 1936 } 1937 1938 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */ 1939 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio) 1940 { 1941 int result; 1942 1943 result = attempt_page_lock(zone, data_vio); 1944 if (result != VDO_SUCCESS) { 1945 abort_load(data_vio, result); 1946 return; 1947 } 1948 1949 if (data_vio->tree_lock.locked) { 1950 data_vio->waiter.callback = load_page; 1951 acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter); 1952 } 1953 } 1954 1955 static void allocation_failure(struct vdo_completion *completion) 1956 { 1957 struct data_vio *data_vio = as_data_vio(completion); 1958 1959 if (vdo_requeue_completion_if_needed(completion, 1960 data_vio->logical.zone->thread_id)) 1961 return; 1962 1963 abort_lookup(data_vio, completion->result, "allocation"); 1964 } 1965 1966 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context) 1967 { 1968 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1969 struct tree_lock *tree_lock = &data_vio->tree_lock; 1970 physical_block_number_t pbn = *((physical_block_number_t *) context); 1971 1972 tree_lock->height--; 1973 data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn; 1974 1975 if (tree_lock->height == 0) { 1976 finish_lookup(data_vio, VDO_SUCCESS); 1977 return; 1978 } 1979 1980 allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1981 } 1982 1983 /** expire_oldest_list() - Expire the oldest list. 
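 * @dirty_lists: The dirty lists from which to expire the oldest era.
 *
 * Dirty pages are binned by period into dirty_lists->eras[period % maximum_age] (see
 * update_period() and add_to_dirty_lists()). Expiring the oldest era splices its tree-page and
 * cache-page lists onto the expired lists and advances oldest_period. For example, with a
 * maximum_age of 4, periods 8..11 occupy slots 0..3; opening period 12 expires slot 0 (period 8)
 * so that slot can be reused.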
*/ 1984 static void expire_oldest_list(struct dirty_lists *dirty_lists) 1985 { 1986 block_count_t i = dirty_lists->offset++; 1987 1988 dirty_lists->oldest_period++; 1989 if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) { 1990 list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE], 1991 &dirty_lists->expired[VDO_TREE_PAGE]); 1992 } 1993 1994 if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) { 1995 list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE], 1996 &dirty_lists->expired[VDO_CACHE_PAGE]); 1997 } 1998 1999 if (dirty_lists->offset == dirty_lists->maximum_age) 2000 dirty_lists->offset = 0; 2001 } 2002 2003 2004 /** update_period() - Update the dirty_lists period if necessary. */ 2005 static void update_period(struct dirty_lists *dirty, sequence_number_t period) 2006 { 2007 while (dirty->next_period <= period) { 2008 if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age) 2009 expire_oldest_list(dirty); 2010 dirty->next_period++; 2011 } 2012 } 2013 2014 /** write_expired_elements() - Write out the expired list. */ 2015 static void write_expired_elements(struct block_map_zone *zone) 2016 { 2017 struct tree_page *page, *ttmp; 2018 struct page_info *info, *ptmp; 2019 struct list_head *expired; 2020 u8 generation = zone->generation; 2021 2022 expired = &zone->dirty_lists->expired[VDO_TREE_PAGE]; 2023 list_for_each_entry_safe(page, ttmp, expired, entry) { 2024 int result; 2025 2026 list_del_init(&page->entry); 2027 2028 result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter), 2029 "Newly expired page not already waiting to write"); 2030 if (result != VDO_SUCCESS) { 2031 enter_zone_read_only_mode(zone, result); 2032 continue; 2033 } 2034 2035 set_generation(zone, page, generation); 2036 if (!page->writing) 2037 enqueue_page(page, zone); 2038 } 2039 2040 expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE]; 2041 list_for_each_entry_safe(info, ptmp, expired, state_entry) { 2042 list_del_init(&info->state_entry); 2043 schedule_page_save(info); 2044 } 2045 2046 save_pages(&zone->page_cache); 2047 } 2048 2049 /** 2050 * add_to_dirty_lists() - Add an element to the dirty lists. 2051 * @zone: The zone in which we are operating. 2052 * @entry: The list entry of the element to add. 2053 * @type: The type of page. 2054 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty. 2055 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a 2056 * lock. 2057 */ 2058 static void add_to_dirty_lists(struct block_map_zone *zone, 2059 struct list_head *entry, 2060 enum block_map_page_type type, 2061 sequence_number_t old_period, 2062 sequence_number_t new_period) 2063 { 2064 struct dirty_lists *dirty_lists = zone->dirty_lists; 2065 2066 if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period))) 2067 return; 2068 2069 if (new_period < dirty_lists->oldest_period) { 2070 list_move_tail(entry, &dirty_lists->expired[type]); 2071 } else { 2072 update_period(dirty_lists, new_period); 2073 list_move_tail(entry, 2074 &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]); 2075 } 2076 2077 write_expired_elements(zone); 2078 } 2079 2080 /* 2081 * Record the allocation in the tree and wake any waiters now that the write lock has been 2082 * released. 
 */
static void finish_block_map_allocation(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	sequence_number_t old_lock;
	struct data_vio *data_vio = as_data_vio(completion);
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	height_t height = tree_lock->height;

	assert_data_vio_in_logical_zone(data_vio);

	tree_page = get_tree_page(zone, tree_lock);
	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;

	/* Record the allocation. */
	page = (struct block_map_page *) tree_page->page_buffer;
	old_lock = tree_page->recovery_lock;
	vdo_update_block_map_page(page, data_vio, pbn,
				  VDO_MAPPING_STATE_UNCOMPRESSED,
				  &tree_page->recovery_lock);

	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
		/* This page is waiting to be written out. */
		if (zone->flusher != tree_page) {
			/*
			 * The outstanding flush won't cover the update we just made,
			 * so mark the page as needing another flush.
			 */
			set_generation(zone, tree_page, zone->generation);
		}
	} else {
		/* Put the page on a dirty list */
		if (old_lock == 0)
			INIT_LIST_HEAD(&tree_page->entry);
		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
				   old_lock, tree_page->recovery_lock);
	}

	tree_lock->height--;
	if (height > 1) {
		/* Format the interior node we just allocated (in memory). */
		tree_page = get_tree_page(zone, tree_lock);
		vdo_format_block_map_page(tree_page->page_buffer,
					  zone->block_map->nonce,
					  pbn, false);
	}

	/* Release our claim to the allocation and wake any waiters */
	release_page_lock(data_vio, "allocation");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
				     continue_allocation_for_waiter, &pbn);
	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(zone, data_vio);
}

static void release_block_map_write_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	release_data_vio_allocation_lock(data_vio, true);
	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
}

/*
 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
 * to prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
2159 */ 2160 static void set_block_map_page_reference_count(struct vdo_completion *completion) 2161 { 2162 struct data_vio *data_vio = as_data_vio(completion); 2163 2164 assert_data_vio_in_allocated_zone(data_vio); 2165 2166 completion->callback = release_block_map_write_lock; 2167 vdo_modify_reference_count(completion, &data_vio->increment_updater); 2168 } 2169 2170 static void journal_block_map_allocation(struct vdo_completion *completion) 2171 { 2172 struct data_vio *data_vio = as_data_vio(completion); 2173 2174 assert_data_vio_in_journal_zone(data_vio); 2175 2176 set_data_vio_allocated_zone_callback(data_vio, 2177 set_block_map_page_reference_count); 2178 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); 2179 } 2180 2181 static void allocate_block(struct vdo_completion *completion) 2182 { 2183 struct data_vio *data_vio = as_data_vio(completion); 2184 struct tree_lock *lock = &data_vio->tree_lock; 2185 physical_block_number_t pbn; 2186 2187 assert_data_vio_in_allocated_zone(data_vio); 2188 2189 if (!vdo_allocate_block_in_zone(data_vio)) 2190 return; 2191 2192 pbn = data_vio->allocation.pbn; 2193 lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn; 2194 data_vio->increment_updater = (struct reference_updater) { 2195 .operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING, 2196 .increment = true, 2197 .zpbn = { 2198 .pbn = pbn, 2199 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 2200 }, 2201 .lock = data_vio->allocation.lock, 2202 }; 2203 2204 launch_data_vio_journal_callback(data_vio, journal_block_map_allocation); 2205 } 2206 2207 static void allocate_block_map_page(struct block_map_zone *zone, 2208 struct data_vio *data_vio) 2209 { 2210 int result; 2211 2212 if (!data_vio->write || data_vio->is_discard) { 2213 /* This is a pure read or a discard, so there's nothing left to do here. */ 2214 finish_lookup(data_vio, VDO_SUCCESS); 2215 return; 2216 } 2217 2218 result = attempt_page_lock(zone, data_vio); 2219 if (result != VDO_SUCCESS) { 2220 abort_lookup(data_vio, result, "allocation"); 2221 return; 2222 } 2223 2224 if (!data_vio->tree_lock.locked) 2225 return; 2226 2227 data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK, 2228 allocate_block, allocation_failure); 2229 } 2230 2231 /** 2232 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio 2233 * resides and cache that result in the data_vio. 2234 * @data_vio: The data vio. 2235 * 2236 * All ancestors in the tree will be allocated or loaded, as needed. 
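 *
 * The lookup decomposes the logical block number as follows: the slot within the leaf page is
 * lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; the leaf page number (already computed by
 * vdo_compute_logical_zone()) selects a root tree (page_number % root_count) and a per-root page
 * index (page_number / root_count), which is then repeatedly split into a slot and a page index
 * for each interior level of that tree.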
2237 */ 2238 void vdo_find_block_map_slot(struct data_vio *data_vio) 2239 { 2240 page_number_t page_index; 2241 struct block_map_tree_slot tree_slot; 2242 struct data_location mapping; 2243 struct block_map_page *page = NULL; 2244 struct tree_lock *lock = &data_vio->tree_lock; 2245 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 2246 2247 zone->active_lookups++; 2248 if (vdo_is_state_draining(&zone->state)) { 2249 finish_lookup(data_vio, VDO_SHUTTING_DOWN); 2250 return; 2251 } 2252 2253 lock->tree_slots[0].block_map_slot.slot = 2254 data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2255 page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count); 2256 tree_slot = (struct block_map_tree_slot) { 2257 .page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2258 .block_map_slot = { 2259 .pbn = 0, 2260 .slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2261 }, 2262 }; 2263 2264 for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) { 2265 physical_block_number_t pbn; 2266 2267 lock->tree_slots[lock->height] = tree_slot; 2268 page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer); 2269 pbn = vdo_get_block_map_page_pbn(page); 2270 if (pbn != VDO_ZERO_BLOCK) { 2271 lock->tree_slots[lock->height].block_map_slot.pbn = pbn; 2272 break; 2273 } 2274 2275 /* Calculate the index and slot for the next level. */ 2276 tree_slot.block_map_slot.slot = 2277 tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2278 tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2279 } 2280 2281 /* The page at this height has been allocated and loaded. */ 2282 mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]); 2283 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 2284 vdo_log_error_strerror(VDO_BAD_MAPPING, 2285 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 2286 (unsigned long long) mapping.pbn, mapping.state, 2287 lock->tree_slots[lock->height - 1].page_index, 2288 lock->height - 1); 2289 abort_load(data_vio, VDO_BAD_MAPPING); 2290 return; 2291 } 2292 2293 if (!vdo_is_mapped_location(&mapping)) { 2294 /* The page we want one level down has not been allocated, so allocate it. */ 2295 allocate_block_map_page(zone, data_vio); 2296 return; 2297 } 2298 2299 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 2300 if (lock->height == 1) { 2301 /* This is the ultimate block map page, so we're done */ 2302 finish_lookup(data_vio, VDO_SUCCESS); 2303 return; 2304 } 2305 2306 /* We know what page we need to load. */ 2307 load_block_map_page(zone, data_vio); 2308 } 2309 2310 /* 2311 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree 2312 * pages have been loaded, otherwise, it may give the wrong answer (0). 
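 * (The "wrong answer (0)" is VDO_ZERO_BLOCK, which is also the legitimate return value for a
 * leaf page which has never been allocated.)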
2313 */ 2314 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map, 2315 page_number_t page_number) 2316 { 2317 struct data_location mapping; 2318 struct tree_page *tree_page; 2319 struct block_map_page *page; 2320 root_count_t root_index = page_number % map->root_count; 2321 page_number_t page_index = page_number / map->root_count; 2322 slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2323 2324 page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2325 2326 tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index); 2327 page = (struct block_map_page *) tree_page->page_buffer; 2328 if (!page->header.initialized) 2329 return VDO_ZERO_BLOCK; 2330 2331 mapping = vdo_unpack_block_map_entry(&page->entries[slot]); 2332 if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state)) 2333 return VDO_ZERO_BLOCK; 2334 return mapping.pbn; 2335 } 2336 2337 /* 2338 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This 2339 * method is used when correcting errors in the tree during read-only rebuild. 2340 */ 2341 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone) 2342 { 2343 bool waiting = vdo_waiter_is_waiting(&page->waiter); 2344 2345 if (waiting && (zone->flusher == page)) 2346 return; 2347 2348 set_generation(zone, page, zone->generation); 2349 if (waiting || page->writing) 2350 return; 2351 2352 enqueue_page(page, zone); 2353 } 2354 2355 static int make_segment(struct forest *old_forest, block_count_t new_pages, 2356 struct boundary *new_boundary, struct forest *forest) 2357 { 2358 size_t index = (old_forest == NULL) ? 0 : old_forest->segments; 2359 struct tree_page *page_ptr; 2360 page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT]; 2361 height_t height; 2362 root_count_t root; 2363 int result; 2364 2365 forest->segments = index + 1; 2366 2367 result = vdo_allocate(forest->segments, struct boundary, 2368 "forest boundary array", &forest->boundaries); 2369 if (result != VDO_SUCCESS) 2370 return result; 2371 2372 result = vdo_allocate(forest->segments, struct tree_page *, 2373 "forest page pointers", &forest->pages); 2374 if (result != VDO_SUCCESS) 2375 return result; 2376 2377 result = vdo_allocate(new_pages, struct tree_page, 2378 "new forest pages", &forest->pages[index]); 2379 if (result != VDO_SUCCESS) 2380 return result; 2381 2382 if (index > 0) { 2383 memcpy(forest->boundaries, old_forest->boundaries, 2384 index * sizeof(struct boundary)); 2385 memcpy(forest->pages, old_forest->pages, 2386 index * sizeof(struct tree_page *)); 2387 } 2388 2389 memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary)); 2390 2391 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2392 segment_sizes[height] = new_boundary->levels[height]; 2393 if (index > 0) 2394 segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height]; 2395 } 2396 2397 page_ptr = forest->pages[index]; 2398 for (root = 0; root < forest->map->root_count; root++) { 2399 struct block_map_tree_segment *segment; 2400 struct block_map_tree *tree = &(forest->trees[root]); 2401 height_t height; 2402 2403 int result = vdo_allocate(forest->segments, 2404 struct block_map_tree_segment, 2405 "tree root segments", &tree->segments); 2406 if (result != VDO_SUCCESS) 2407 return result; 2408 2409 if (index > 0) { 2410 memcpy(tree->segments, old_forest->trees[root].segments, 2411 index * sizeof(struct block_map_tree_segment)); 2412 } 2413 2414 segment = &(tree->segments[index]); 2415 
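		/*
		 * Carve this root's share of the newly allocated pages into one contiguous run
		 * per height. The single page at the top height is formatted so that its first
		 * entry records where this tree's root lives on disk (root_origin + root).
		 */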
for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2416 if (segment_sizes[height] == 0) 2417 continue; 2418 2419 segment->levels[height] = page_ptr; 2420 if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) { 2421 /* Record the root. */ 2422 struct block_map_page *page = 2423 vdo_format_block_map_page(page_ptr->page_buffer, 2424 forest->map->nonce, 2425 VDO_INVALID_PBN, true); 2426 page->entries[0] = 2427 vdo_pack_block_map_entry(forest->map->root_origin + root, 2428 VDO_MAPPING_STATE_UNCOMPRESSED); 2429 } 2430 page_ptr += segment_sizes[height]; 2431 } 2432 } 2433 2434 return VDO_SUCCESS; 2435 } 2436 2437 static void deforest(struct forest *forest, size_t first_page_segment) 2438 { 2439 root_count_t root; 2440 2441 if (forest->pages != NULL) { 2442 size_t segment; 2443 2444 for (segment = first_page_segment; segment < forest->segments; segment++) 2445 vdo_free(forest->pages[segment]); 2446 vdo_free(forest->pages); 2447 } 2448 2449 for (root = 0; root < forest->map->root_count; root++) 2450 vdo_free(forest->trees[root].segments); 2451 2452 vdo_free(forest->boundaries); 2453 vdo_free(forest); 2454 } 2455 2456 /** 2457 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if 2458 * there is one. 2459 * @map: The block map. 2460 * @entries: The number of entries the block map will hold. 2461 * 2462 * Return: VDO_SUCCESS or an error. 2463 */ 2464 static int make_forest(struct block_map *map, block_count_t entries) 2465 { 2466 struct forest *forest, *old_forest = map->forest; 2467 struct boundary new_boundary, *old_boundary = NULL; 2468 block_count_t new_pages; 2469 int result; 2470 2471 if (old_forest != NULL) 2472 old_boundary = &(old_forest->boundaries[old_forest->segments - 1]); 2473 2474 new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary, 2475 entries, &new_boundary); 2476 if (new_pages == 0) { 2477 map->next_entry_count = entries; 2478 return VDO_SUCCESS; 2479 } 2480 2481 result = vdo_allocate_extended(struct forest, map->root_count, 2482 struct block_map_tree, __func__, 2483 &forest); 2484 if (result != VDO_SUCCESS) 2485 return result; 2486 2487 forest->map = map; 2488 result = make_segment(old_forest, new_pages, &new_boundary, forest); 2489 if (result != VDO_SUCCESS) { 2490 deforest(forest, forest->segments - 1); 2491 return result; 2492 } 2493 2494 map->next_forest = forest; 2495 map->next_entry_count = entries; 2496 return VDO_SUCCESS; 2497 } 2498 2499 /** 2500 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest. 2501 * @map: The block map. 2502 */ 2503 static void replace_forest(struct block_map *map) 2504 { 2505 if (map->next_forest != NULL) { 2506 if (map->forest != NULL) 2507 deforest(map->forest, map->forest->segments); 2508 map->forest = vdo_forget(map->next_forest); 2509 } 2510 2511 map->entry_count = map->next_entry_count; 2512 map->next_entry_count = 0; 2513 } 2514 2515 /** 2516 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the 2517 * traversal. 2518 * @cursor: The cursor to complete. 
2519 */ 2520 static void finish_cursor(struct cursor *cursor) 2521 { 2522 struct cursors *cursors = cursor->parent; 2523 struct vdo_completion *completion = cursors->completion; 2524 2525 return_vio_to_pool(vdo_forget(cursor->vio)); 2526 if (--cursors->active_roots > 0) 2527 return; 2528 2529 vdo_free(cursors); 2530 2531 vdo_finish_completion(completion); 2532 } 2533 2534 static void traverse(struct cursor *cursor); 2535 2536 /** 2537 * continue_traversal() - Continue traversing a block map tree. 2538 * @completion: The VIO doing a read or write. 2539 */ 2540 static void continue_traversal(struct vdo_completion *completion) 2541 { 2542 vio_record_metadata_io_error(as_vio(completion)); 2543 traverse(completion->parent); 2544 } 2545 2546 /** 2547 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded. 2548 * @completion: The VIO doing the read. 2549 */ 2550 static void finish_traversal_load(struct vdo_completion *completion) 2551 { 2552 struct cursor *cursor = completion->parent; 2553 height_t height = cursor->height; 2554 struct cursor_level *level = &cursor->levels[height]; 2555 struct tree_page *tree_page = 2556 &(cursor->tree->segments[0].levels[height][level->page_index]); 2557 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2558 2559 vdo_copy_valid_page(cursor->vio->vio.data, 2560 cursor->parent->zone->block_map->nonce, 2561 pbn_from_vio_bio(cursor->vio->vio.bio), page); 2562 traverse(cursor); 2563 } 2564 2565 static void traversal_endio(struct bio *bio) 2566 { 2567 struct vio *vio = bio->bi_private; 2568 struct cursor *cursor = vio->completion.parent; 2569 2570 continue_vio_after_io(vio, finish_traversal_load, 2571 cursor->parent->zone->thread_id); 2572 } 2573 2574 /** 2575 * traverse() - Traverse a single block map tree. 2576 * @cursor: A cursor tracking traversal progress. 2577 * 2578 * This is the recursive heart of the traversal process. 2579 */ 2580 static void traverse(struct cursor *cursor) 2581 { 2582 for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) { 2583 height_t height = cursor->height; 2584 struct cursor_level *level = &cursor->levels[height]; 2585 struct tree_page *tree_page = 2586 &(cursor->tree->segments[0].levels[height][level->page_index]); 2587 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2588 2589 if (!page->header.initialized) 2590 continue; 2591 2592 for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) { 2593 struct cursor_level *next_level; 2594 page_number_t entry_index = 2595 (VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot; 2596 struct data_location location = 2597 vdo_unpack_block_map_entry(&page->entries[level->slot]); 2598 2599 if (!vdo_is_valid_location(&location)) { 2600 /* This entry is invalid, so remove it from the page. */ 2601 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2602 vdo_write_tree_page(tree_page, cursor->parent->zone); 2603 continue; 2604 } 2605 2606 if (!vdo_is_mapped_location(&location)) 2607 continue; 2608 2609 /* Erase mapped entries past the end of the logical space. 
*/ 2610 if (entry_index >= cursor->boundary.levels[height]) { 2611 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2612 vdo_write_tree_page(tree_page, cursor->parent->zone); 2613 continue; 2614 } 2615 2616 if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) { 2617 int result = cursor->parent->entry_callback(location.pbn, 2618 cursor->parent->completion); 2619 if (result != VDO_SUCCESS) { 2620 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2621 vdo_write_tree_page(tree_page, cursor->parent->zone); 2622 continue; 2623 } 2624 } 2625 2626 if (cursor->height == 0) 2627 continue; 2628 2629 cursor->height--; 2630 next_level = &cursor->levels[cursor->height]; 2631 next_level->page_index = entry_index; 2632 next_level->slot = 0; 2633 level->slot++; 2634 vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn, 2635 traversal_endio, continue_traversal, 2636 REQ_OP_READ | REQ_PRIO); 2637 return; 2638 } 2639 } 2640 2641 finish_cursor(cursor); 2642 } 2643 2644 /** 2645 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with 2646 * which to load pages. 2647 * @waiter: The parent of the cursor to launch. 2648 * @context: The pooled_vio just acquired. 2649 * 2650 * Implements waiter_callback_fn. 2651 */ 2652 static void launch_cursor(struct vdo_waiter *waiter, void *context) 2653 { 2654 struct cursor *cursor = container_of(waiter, struct cursor, waiter); 2655 struct pooled_vio *pooled = context; 2656 2657 cursor->vio = pooled; 2658 pooled->vio.completion.parent = cursor; 2659 pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id; 2660 traverse(cursor); 2661 } 2662 2663 /** 2664 * compute_boundary() - Compute the number of pages used at each level of the given root's tree. 2665 * @map: The block map. 2666 * @root_index: The tree root index. 2667 * 2668 * Return: The list of page counts as a boundary structure. 2669 */ 2670 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index) 2671 { 2672 struct boundary boundary; 2673 height_t height; 2674 page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count); 2675 /* 2676 * Compute the leaf pages for this root. If the number of leaf pages does not distribute 2677 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to 2678 * roots starting from tree 0. 2679 */ 2680 page_count_t last_tree_root = (leaf_pages - 1) % map->root_count; 2681 page_count_t level_pages = leaf_pages / map->root_count; 2682 2683 if (root_index <= last_tree_root) 2684 level_pages++; 2685 2686 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) { 2687 boundary.levels[height] = level_pages; 2688 level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE); 2689 } 2690 2691 /* The root node always exists, even if the root is otherwise unused. */ 2692 boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1; 2693 2694 return boundary; 2695 } 2696 2697 /** 2698 * vdo_traverse_forest() - Walk the entire forest of a block map. 2699 * @map: The block map. 2700 * @callback: A function to call with the pbn of each allocated node in the forest. 2701 * @completion: The completion to notify on each traversed PBN, and when traversal completes. 
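 *
 * A minimal usage sketch (illustrative only; the callback signature is assumed to match its
 * invocation in traverse()):
 *
 *   static int note_tree_page(physical_block_number_t pbn,
 *                             struct vdo_completion *completion)
 *   {
 *       // e.g. record the pbn as referenced during a rebuild
 *       return VDO_SUCCESS;
 *   }
 *
 *   vdo_traverse_forest(map, note_tree_page, completion);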
2702 */ 2703 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback, 2704 struct vdo_completion *completion) 2705 { 2706 root_count_t root; 2707 struct cursors *cursors; 2708 int result; 2709 2710 result = vdo_allocate_extended(struct cursors, map->root_count, 2711 struct cursor, __func__, &cursors); 2712 if (result != VDO_SUCCESS) { 2713 vdo_fail_completion(completion, result); 2714 return; 2715 } 2716 2717 cursors->zone = &map->zones[0]; 2718 cursors->pool = cursors->zone->vio_pool; 2719 cursors->entry_callback = callback; 2720 cursors->completion = completion; 2721 cursors->active_roots = map->root_count; 2722 for (root = 0; root < map->root_count; root++) { 2723 struct cursor *cursor = &cursors->cursors[root]; 2724 2725 *cursor = (struct cursor) { 2726 .tree = &map->forest->trees[root], 2727 .height = VDO_BLOCK_MAP_TREE_HEIGHT - 1, 2728 .parent = cursors, 2729 .boundary = compute_boundary(map, root), 2730 }; 2731 2732 cursor->waiter.callback = launch_cursor; 2733 acquire_vio_from_pool(cursors->pool, &cursor->waiter); 2734 } 2735 } 2736 2737 /** 2738 * initialize_block_map_zone() - Initialize the per-zone portions of the block map. 2739 * @map: The block map. 2740 * @zone_number: The zone to initialize. 2741 * @cache_size: The total block map cache size. 2742 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be 2743 * written out. 2744 */ 2745 static int __must_check initialize_block_map_zone(struct block_map *map, 2746 zone_count_t zone_number, 2747 page_count_t cache_size, 2748 block_count_t maximum_age) 2749 { 2750 int result; 2751 block_count_t i; 2752 struct vdo *vdo = map->vdo; 2753 struct block_map_zone *zone = &map->zones[zone_number]; 2754 2755 BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64)); 2756 2757 zone->zone_number = zone_number; 2758 zone->thread_id = vdo->thread_config.logical_threads[zone_number]; 2759 zone->block_map = map; 2760 2761 result = vdo_allocate_extended(struct dirty_lists, maximum_age, 2762 dirty_era_t, __func__, 2763 &zone->dirty_lists); 2764 if (result != VDO_SUCCESS) 2765 return result; 2766 2767 zone->dirty_lists->maximum_age = maximum_age; 2768 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]); 2769 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]); 2770 2771 for (i = 0; i < maximum_age; i++) { 2772 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]); 2773 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]); 2774 } 2775 2776 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages); 2777 if (result != VDO_SUCCESS) 2778 return result; 2779 2780 result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 1, 2781 zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR, 2782 VIO_PRIORITY_METADATA, zone, &zone->vio_pool); 2783 if (result != VDO_SUCCESS) 2784 return result; 2785 2786 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 2787 2788 zone->page_cache.zone = zone; 2789 zone->page_cache.vdo = vdo; 2790 zone->page_cache.page_count = cache_size / map->zone_count; 2791 zone->page_cache.stats.free_pages = zone->page_cache.page_count; 2792 2793 result = allocate_cache_components(&zone->page_cache); 2794 if (result != VDO_SUCCESS) 2795 return result; 2796 2797 /* initialize empty circular queues */ 2798 INIT_LIST_HEAD(&zone->page_cache.lru_list); 2799 INIT_LIST_HEAD(&zone->page_cache.outgoing_list); 2800 2801 return VDO_SUCCESS; 2802 } 2803 2804 /* Implements vdo_zone_thread_getter_fn */ 2805 static thread_id_t 
get_block_map_zone_thread_id(void *context, zone_count_t zone_number) 2806 { 2807 struct block_map *map = context; 2808 2809 return map->zones[zone_number].thread_id; 2810 } 2811 2812 /* Implements vdo_action_preamble_fn */ 2813 static void prepare_for_era_advance(void *context, struct vdo_completion *parent) 2814 { 2815 struct block_map *map = context; 2816 2817 map->current_era_point = map->pending_era_point; 2818 vdo_finish_completion(parent); 2819 } 2820 2821 /* Implements vdo_zone_action_fn */ 2822 static void advance_block_map_zone_era(void *context, zone_count_t zone_number, 2823 struct vdo_completion *parent) 2824 { 2825 struct block_map *map = context; 2826 struct block_map_zone *zone = &map->zones[zone_number]; 2827 2828 update_period(zone->dirty_lists, map->current_era_point); 2829 write_expired_elements(zone); 2830 vdo_finish_completion(parent); 2831 } 2832 2833 /* 2834 * Schedule an era advance if necessary. This method should not be called directly. Rather, call 2835 * vdo_schedule_default_action() on the block map's action manager. 2836 * 2837 * Implements vdo_action_scheduler_fn. 2838 */ 2839 static bool schedule_era_advance(void *context) 2840 { 2841 struct block_map *map = context; 2842 2843 if (map->current_era_point == map->pending_era_point) 2844 return false; 2845 2846 return vdo_schedule_action(map->action_manager, prepare_for_era_advance, 2847 advance_block_map_zone_era, NULL, NULL); 2848 } 2849 2850 static void uninitialize_block_map_zone(struct block_map_zone *zone) 2851 { 2852 struct vdo_page_cache *cache = &zone->page_cache; 2853 2854 vdo_free(vdo_forget(zone->dirty_lists)); 2855 free_vio_pool(vdo_forget(zone->vio_pool)); 2856 vdo_int_map_free(vdo_forget(zone->loading_pages)); 2857 if (cache->infos != NULL) { 2858 struct page_info *info; 2859 2860 for (info = cache->infos; info < cache->infos + cache->page_count; info++) 2861 free_vio(vdo_forget(info->vio)); 2862 } 2863 2864 vdo_int_map_free(vdo_forget(cache->page_map)); 2865 vdo_free(vdo_forget(cache->infos)); 2866 vdo_free(vdo_forget(cache->pages)); 2867 } 2868 2869 void vdo_free_block_map(struct block_map *map) 2870 { 2871 zone_count_t zone; 2872 2873 if (map == NULL) 2874 return; 2875 2876 for (zone = 0; zone < map->zone_count; zone++) 2877 uninitialize_block_map_zone(&map->zones[zone]); 2878 2879 vdo_abandon_block_map_growth(map); 2880 if (map->forest != NULL) 2881 deforest(vdo_forget(map->forest), 0); 2882 vdo_free(vdo_forget(map->action_manager)); 2883 vdo_free(map); 2884 } 2885 2886 /* @journal may be NULL. 
*/ 2887 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks, 2888 struct vdo *vdo, struct recovery_journal *journal, 2889 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age, 2890 struct block_map **map_ptr) 2891 { 2892 struct block_map *map; 2893 int result; 2894 zone_count_t zone = 0; 2895 2896 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE != 2897 ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) / 2898 sizeof(struct block_map_entry))); 2899 result = VDO_ASSERT(cache_size > 0, "block map cache size is specified"); 2900 if (result != VDO_SUCCESS) 2901 return result; 2902 2903 result = vdo_allocate_extended(struct block_map, 2904 vdo->thread_config.logical_zone_count, 2905 struct block_map_zone, __func__, &map); 2906 if (result != VDO_SUCCESS) 2907 return result; 2908 2909 map->vdo = vdo; 2910 map->root_origin = state.root_origin; 2911 map->root_count = state.root_count; 2912 map->entry_count = logical_blocks; 2913 map->journal = journal; 2914 map->nonce = nonce; 2915 2916 result = make_forest(map, map->entry_count); 2917 if (result != VDO_SUCCESS) { 2918 vdo_free_block_map(map); 2919 return result; 2920 } 2921 2922 replace_forest(map); 2923 2924 map->zone_count = vdo->thread_config.logical_zone_count; 2925 for (zone = 0; zone < map->zone_count; zone++) { 2926 result = initialize_block_map_zone(map, zone, cache_size, maximum_age); 2927 if (result != VDO_SUCCESS) { 2928 vdo_free_block_map(map); 2929 return result; 2930 } 2931 } 2932 2933 result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id, 2934 vdo_get_recovery_journal_thread_id(journal), 2935 map, schedule_era_advance, vdo, 2936 &map->action_manager); 2937 if (result != VDO_SUCCESS) { 2938 vdo_free_block_map(map); 2939 return result; 2940 } 2941 2942 *map_ptr = map; 2943 return VDO_SUCCESS; 2944 } 2945 2946 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map) 2947 { 2948 return (struct block_map_state_2_0) { 2949 .flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN, 2950 /* This is the flat page count, which has turned out to always be 0. */ 2951 .flat_page_count = 0, 2952 .root_origin = map->root_origin, 2953 .root_count = map->root_count, 2954 }; 2955 } 2956 2957 /* The block map needs to know the journals' sequence number to initialize the eras. */ 2958 void vdo_initialize_block_map_from_journal(struct block_map *map, 2959 struct recovery_journal *journal) 2960 { 2961 zone_count_t z = 0; 2962 2963 map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal); 2964 map->pending_era_point = map->current_era_point; 2965 2966 for (z = 0; z < map->zone_count; z++) { 2967 struct dirty_lists *dirty_lists = map->zones[z].dirty_lists; 2968 2969 VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set"); 2970 dirty_lists->oldest_period = map->current_era_point; 2971 dirty_lists->next_period = map->current_era_point + 1; 2972 dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age; 2973 } 2974 } 2975 2976 /* Compute the logical zone for the LBN of a data vio. 
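 *
 * The leaf page number is lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE, the root tree is that page
 * number modulo root_count, and the zone is the root index modulo zone_count. As a toy example
 * (hypothetical geometry, not the real constants): with 4 entries per page, 3 roots, and 2
 * logical zones, lbn 42 falls on leaf page 10, which belongs to root 1 and is handled by zone 1.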
*/ 2977 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio) 2978 { 2979 struct block_map *map = vdo_from_data_vio(data_vio)->block_map; 2980 struct tree_lock *tree_lock = &data_vio->tree_lock; 2981 page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2982 2983 tree_lock->tree_slots[0].page_index = page_number; 2984 tree_lock->root_index = page_number % map->root_count; 2985 return (tree_lock->root_index % map->zone_count); 2986 } 2987 2988 void vdo_advance_block_map_era(struct block_map *map, 2989 sequence_number_t recovery_block_number) 2990 { 2991 if (map == NULL) 2992 return; 2993 2994 map->pending_era_point = recovery_block_number; 2995 vdo_schedule_default_action(map->action_manager); 2996 } 2997 2998 /* Implements vdo_admin_initiator_fn */ 2999 static void initiate_drain(struct admin_state *state) 3000 { 3001 struct block_map_zone *zone = container_of(state, struct block_map_zone, state); 3002 3003 VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0), 3004 "%s() called with no active lookups", __func__); 3005 3006 if (!vdo_is_state_suspending(state)) { 3007 while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period) 3008 expire_oldest_list(zone->dirty_lists); 3009 write_expired_elements(zone); 3010 } 3011 3012 check_for_drain_complete(zone); 3013 } 3014 3015 /* Implements vdo_zone_action_fn. */ 3016 static void drain_zone(void *context, zone_count_t zone_number, 3017 struct vdo_completion *parent) 3018 { 3019 struct block_map *map = context; 3020 struct block_map_zone *zone = &map->zones[zone_number]; 3021 3022 vdo_start_draining(&zone->state, 3023 vdo_get_current_manager_operation(map->action_manager), 3024 parent, initiate_drain); 3025 } 3026 3027 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation, 3028 struct vdo_completion *parent) 3029 { 3030 vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL, 3031 parent); 3032 } 3033 3034 /* Implements vdo_zone_action_fn. */ 3035 static void resume_block_map_zone(void *context, zone_count_t zone_number, 3036 struct vdo_completion *parent) 3037 { 3038 struct block_map *map = context; 3039 struct block_map_zone *zone = &map->zones[zone_number]; 3040 3041 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state)); 3042 } 3043 3044 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent) 3045 { 3046 vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING, 3047 NULL, resume_block_map_zone, NULL, parent); 3048 } 3049 3050 /* Allocate an expanded collection of trees, for a future growth. */ 3051 int vdo_prepare_to_grow_block_map(struct block_map *map, 3052 block_count_t new_logical_blocks) 3053 { 3054 if (map->next_entry_count == new_logical_blocks) 3055 return VDO_SUCCESS; 3056 3057 if (map->next_entry_count > 0) 3058 vdo_abandon_block_map_growth(map); 3059 3060 if (new_logical_blocks < map->entry_count) { 3061 map->next_entry_count = map->entry_count; 3062 return VDO_SUCCESS; 3063 } 3064 3065 return make_forest(map, new_logical_blocks); 3066 } 3067 3068 /* Implements vdo_action_preamble_fn */ 3069 static void grow_forest(void *context, struct vdo_completion *completion) 3070 { 3071 replace_forest(context); 3072 vdo_finish_completion(completion); 3073 } 3074 3075 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. 
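 *
 * A typical growth sequence (sketch):
 *
 *   result = vdo_prepare_to_grow_block_map(map, new_logical_blocks);
 *   if (result != VDO_SUCCESS)
 *       return result;
 *   // ... later, as a suspended operation ...
 *   vdo_grow_block_map(map, parent);
 *
 * If the growth is not wanted after all, vdo_abandon_block_map_growth() discards the prepared
 * forest instead.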
*/ 3076 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent) 3077 { 3078 vdo_schedule_operation(map->action_manager, 3079 VDO_ADMIN_STATE_SUSPENDED_OPERATION, 3080 grow_forest, NULL, NULL, parent); 3081 } 3082 3083 void vdo_abandon_block_map_growth(struct block_map *map) 3084 { 3085 struct forest *forest = vdo_forget(map->next_forest); 3086 3087 if (forest != NULL) 3088 deforest(forest, forest->segments - 1); 3089 3090 map->next_entry_count = 0; 3091 } 3092 3093 /* Release the page completion and then continue the requester. */ 3094 static inline void finish_processing_page(struct vdo_completion *completion, int result) 3095 { 3096 struct vdo_completion *parent = completion->parent; 3097 3098 vdo_release_page_completion(completion); 3099 vdo_continue_completion(parent, result); 3100 } 3101 3102 static void handle_page_error(struct vdo_completion *completion) 3103 { 3104 finish_processing_page(completion, completion->result); 3105 } 3106 3107 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */ 3108 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable, 3109 vdo_action_fn action) 3110 { 3111 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3112 3113 if (vdo_is_state_draining(&zone->state)) { 3114 continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN); 3115 return; 3116 } 3117 3118 vdo_get_page(&data_vio->page_completion, zone, 3119 data_vio->tree_lock.tree_slots[0].block_map_slot.pbn, 3120 modifiable, &data_vio->vio.completion, 3121 action, handle_page_error, false); 3122 } 3123 3124 /** 3125 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped. 3126 * @data_vio: The data vio. 3127 * 3128 * This indicates the block map entry for the logical block is either unmapped or corrupted. 3129 */ 3130 static void clear_mapped_location(struct data_vio *data_vio) 3131 { 3132 data_vio->mapped = (struct zoned_pbn) { 3133 .state = VDO_MAPPING_STATE_UNMAPPED, 3134 }; 3135 } 3136 3137 /** 3138 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a 3139 * data_vio. 3140 * @data_vio: The data vio. 3141 * @entry: The new mapped entry to set. 3142 * 3143 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any 3144 * other failure 3145 */ 3146 static int __must_check set_mapped_location(struct data_vio *data_vio, 3147 const struct block_map_entry *entry) 3148 { 3149 /* Unpack the PBN for logging purposes even if the entry is invalid. */ 3150 struct data_location mapped = vdo_unpack_block_map_entry(entry); 3151 3152 if (vdo_is_valid_location(&mapped)) { 3153 int result; 3154 3155 result = vdo_get_physical_zone(vdo_from_data_vio(data_vio), 3156 mapped.pbn, &data_vio->mapped.zone); 3157 if (result == VDO_SUCCESS) { 3158 data_vio->mapped.pbn = mapped.pbn; 3159 data_vio->mapped.state = mapped.state; 3160 return VDO_SUCCESS; 3161 } 3162 3163 /* 3164 * Return all errors not specifically known to be errors from validating the 3165 * location. 3166 */ 3167 if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING)) 3168 return result; 3169 } 3170 3171 /* 3172 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases 3173 * to VDO_BAD_MAPPING. 
3174 */ 3175 vdo_log_error_strerror(VDO_BAD_MAPPING, 3176 "PBN %llu with state %u read from the block map was invalid", 3177 (unsigned long long) mapped.pbn, mapped.state); 3178 3179 /* 3180 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding 3181 * known data loss. 3182 */ 3183 if (!data_vio->write) 3184 return VDO_BAD_MAPPING; 3185 3186 /* 3187 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped 3188 * entry rather than fail the write. 3189 */ 3190 clear_mapped_location(data_vio); 3191 return VDO_SUCCESS; 3192 } 3193 3194 /* This callback is registered in vdo_get_mapped_block(). */ 3195 static void get_mapping_from_fetched_page(struct vdo_completion *completion) 3196 { 3197 int result; 3198 struct vdo_page_completion *vpc = as_vdo_page_completion(completion); 3199 const struct block_map_page *page; 3200 const struct block_map_entry *entry; 3201 struct data_vio *data_vio = as_data_vio(completion->parent); 3202 struct block_map_tree_slot *tree_slot; 3203 3204 if (completion->result != VDO_SUCCESS) { 3205 finish_processing_page(completion, completion->result); 3206 return; 3207 } 3208 3209 result = validate_completed_page(vpc, false); 3210 if (result != VDO_SUCCESS) { 3211 finish_processing_page(completion, result); 3212 return; 3213 } 3214 3215 page = (const struct block_map_page *) get_page_buffer(vpc->info); 3216 tree_slot = &data_vio->tree_lock.tree_slots[0]; 3217 entry = &page->entries[tree_slot->block_map_slot.slot]; 3218 3219 result = set_mapped_location(data_vio, entry); 3220 finish_processing_page(completion, result); 3221 } 3222 3223 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio, 3224 physical_block_number_t pbn, 3225 enum block_mapping_state mapping_state, 3226 sequence_number_t *recovery_lock) 3227 { 3228 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3229 struct block_map *block_map = zone->block_map; 3230 struct recovery_journal *journal = block_map->journal; 3231 sequence_number_t old_locked, new_locked; 3232 struct tree_lock *tree_lock = &data_vio->tree_lock; 3233 3234 /* Encode the new mapping. */ 3235 page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] = 3236 vdo_pack_block_map_entry(pbn, mapping_state); 3237 3238 /* Adjust references on the recovery journal blocks. */ 3239 old_locked = *recovery_lock; 3240 new_locked = data_vio->recovery_sequence_number; 3241 3242 if ((old_locked == 0) || (old_locked > new_locked)) { 3243 vdo_acquire_recovery_journal_block_reference(journal, new_locked, 3244 VDO_ZONE_TYPE_LOGICAL, 3245 zone->zone_number); 3246 3247 if (old_locked > 0) { 3248 vdo_release_recovery_journal_block_reference(journal, old_locked, 3249 VDO_ZONE_TYPE_LOGICAL, 3250 zone->zone_number); 3251 } 3252 3253 *recovery_lock = new_locked; 3254 } 3255 3256 /* 3257 * FIXME: explain this more 3258 * Release the transferred lock from the data_vio. 
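 * (Presumably: the data_vio has been holding a per-entry lock on the journal block containing
 * its entry since that entry was made; now that the entry has been applied here and the page's
 * own recovery lock protects the affected journal blocks, the data_vio's entry lock can be
 * dropped and its recovery_sequence_number cleared.)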
3259 */ 3260 vdo_release_journal_entry_lock(journal, new_locked); 3261 data_vio->recovery_sequence_number = 0; 3262 } 3263 3264 static void put_mapping_in_fetched_page(struct vdo_completion *completion) 3265 { 3266 struct data_vio *data_vio = as_data_vio(completion->parent); 3267 sequence_number_t old_lock; 3268 struct vdo_page_completion *vpc; 3269 struct page_info *info; 3270 int result; 3271 3272 if (completion->result != VDO_SUCCESS) { 3273 finish_processing_page(completion, completion->result); 3274 return; 3275 } 3276 3277 vpc = as_vdo_page_completion(completion); 3278 result = validate_completed_page(vpc, true); 3279 if (result != VDO_SUCCESS) { 3280 finish_processing_page(completion, result); 3281 return; 3282 } 3283 3284 info = vpc->info; 3285 old_lock = info->recovery_lock; 3286 vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info), 3287 data_vio, data_vio->new_mapped.pbn, 3288 data_vio->new_mapped.state, &info->recovery_lock); 3289 set_info_state(info, PS_DIRTY); 3290 add_to_dirty_lists(info->cache->zone, &info->state_entry, 3291 VDO_CACHE_PAGE, old_lock, info->recovery_lock); 3292 finish_processing_page(completion, VDO_SUCCESS); 3293 } 3294 3295 /* Read a stored block mapping into a data_vio. */ 3296 void vdo_get_mapped_block(struct data_vio *data_vio) 3297 { 3298 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 3299 /* 3300 * We know that the block map page for this LBN has not been allocated, so the 3301 * block must be unmapped. 3302 */ 3303 clear_mapped_location(data_vio); 3304 continue_data_vio(data_vio); 3305 return; 3306 } 3307 3308 fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page); 3309 } 3310 3311 /* Update a stored block mapping to reflect a data_vio's new mapping. 
 */
void vdo_put_mapped_block(struct data_vio *data_vio)
{
	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
}

struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
{
	zone_count_t zone = 0;
	struct block_map_statistics totals;

	memset(&totals, 0, sizeof(struct block_map_statistics));
	for (zone = 0; zone < map->zone_count; zone++) {
		const struct block_map_statistics *stats =
			&(map->zones[zone].page_cache.stats);

		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
		totals.clean_pages += READ_ONCE(stats->clean_pages);
		totals.free_pages += READ_ONCE(stats->free_pages);
		totals.failed_pages += READ_ONCE(stats->failed_pages);
		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
		totals.read_count += READ_ONCE(stats->read_count);
		totals.write_count += READ_ONCE(stats->write_count);
		totals.failed_reads += READ_ONCE(stats->failed_reads);
		totals.failed_writes += READ_ONCE(stats->failed_writes);
		totals.reclaimed += READ_ONCE(stats->reclaimed);
		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
		totals.discard_required += READ_ONCE(stats->discard_required);
		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
		totals.fetch_required += READ_ONCE(stats->fetch_required);
		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
		totals.pages_saved += READ_ONCE(stats->pages_saved);
		totals.flush_count += READ_ONCE(stats->flush_count);
	}

	return totals;
}