1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "block-map.h" 7 8 #include <linux/bio.h> 9 #include <linux/ratelimit.h> 10 11 #include "errors.h" 12 #include "logger.h" 13 #include "memory-alloc.h" 14 #include "permassert.h" 15 16 #include "action-manager.h" 17 #include "admin-state.h" 18 #include "completion.h" 19 #include "constants.h" 20 #include "data-vio.h" 21 #include "encodings.h" 22 #include "io-submitter.h" 23 #include "physical-zone.h" 24 #include "recovery-journal.h" 25 #include "slab-depot.h" 26 #include "status-codes.h" 27 #include "types.h" 28 #include "vdo.h" 29 #include "vio.h" 30 #include "wait-queue.h" 31 32 /** 33 * DOC: Block map eras 34 * 35 * The block map era, or maximum age, is used as follows: 36 * 37 * Each block map page, when dirty, records the earliest recovery journal block sequence number of 38 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every 39 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras 40 * according to the sequence number they record. 41 * 42 * In the current (newest) era, block map pages are not written unless there is cache pressure. In 43 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in 44 * this era are issued for write. In all older eras, pages are issued for write immediately. 
 */

/* Uniquely identifies a single slot in the block map tree. */
struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

/* A page_descriptor overlaid on a u64 so it can be used as an int_map key. */
union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

/* One segment of a block map tree: one tree_page pointer per tree level. */
struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

/* The forest of block map trees, one tree per root. */
struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

/* The position of a cursor within a single page of the tree. */
struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursors;

/* A cursor for traversing one block map tree. */
struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

/* The shared state for a group of cursors traversing all the trees at once. */
struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};

/* Sentinel pbn meaning a page_info currently holds no page. */
static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

/* Cache-pressure reporting cadences (in units of pressure_report counts). */
#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
122 * Prevents any compiler shenanigans from affecting other threads reading those stats. 123 */ 124 #define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta)) 125 126 static inline bool is_dirty(const struct page_info *info) 127 { 128 return info->state == PS_DIRTY; 129 } 130 131 static inline bool is_present(const struct page_info *info) 132 { 133 return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY); 134 } 135 136 static inline bool is_in_flight(const struct page_info *info) 137 { 138 return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING); 139 } 140 141 static inline bool is_incoming(const struct page_info *info) 142 { 143 return info->state == PS_INCOMING; 144 } 145 146 static inline bool is_outgoing(const struct page_info *info) 147 { 148 return info->state == PS_OUTGOING; 149 } 150 151 static inline bool is_valid(const struct page_info *info) 152 { 153 return is_present(info) || is_outgoing(info); 154 } 155 156 static char *get_page_buffer(struct page_info *info) 157 { 158 struct vdo_page_cache *cache = info->cache; 159 160 return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE]; 161 } 162 163 static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter) 164 { 165 struct vdo_page_completion *completion; 166 167 if (waiter == NULL) 168 return NULL; 169 170 completion = container_of(waiter, struct vdo_page_completion, waiter); 171 vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION); 172 return completion; 173 } 174 175 /** 176 * initialize_info() - Initialize all page info structures and put them on the free list. 177 * @cache: The page cache. 178 * 179 * Return: VDO_SUCCESS or an error. 
180 */ 181 static int initialize_info(struct vdo_page_cache *cache) 182 { 183 struct page_info *info; 184 185 INIT_LIST_HEAD(&cache->free_list); 186 for (info = cache->infos; info < cache->infos + cache->page_count; info++) { 187 int result; 188 189 info->cache = cache; 190 info->state = PS_FREE; 191 info->pbn = NO_PAGE; 192 193 result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP, 194 VIO_PRIORITY_METADATA, info, 195 get_page_buffer(info), &info->vio); 196 if (result != VDO_SUCCESS) 197 return result; 198 199 /* The thread ID should never change. */ 200 info->vio->completion.callback_thread_id = cache->zone->thread_id; 201 202 INIT_LIST_HEAD(&info->state_entry); 203 list_add_tail(&info->state_entry, &cache->free_list); 204 INIT_LIST_HEAD(&info->lru_entry); 205 } 206 207 return VDO_SUCCESS; 208 } 209 210 /** 211 * allocate_cache_components() - Allocate components of the cache which require their own 212 * allocation. 213 * @cache: The page cache. 214 * 215 * The caller is responsible for all clean up on errors. 216 * 217 * Return: VDO_SUCCESS or an error code. 218 */ 219 static int __must_check allocate_cache_components(struct vdo_page_cache *cache) 220 { 221 u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE; 222 int result; 223 224 result = vdo_allocate(cache->page_count, "page infos", &cache->infos); 225 if (result != VDO_SUCCESS) 226 return result; 227 228 result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages); 229 if (result != VDO_SUCCESS) 230 return result; 231 232 result = vdo_int_map_create(cache->page_count, &cache->page_map); 233 if (result != VDO_SUCCESS) 234 return result; 235 236 return initialize_info(cache); 237 } 238 239 /** 240 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's 241 * thread. 242 * @cache: The page cache. 243 * @function_name: The funtion name to report if the assertion fails. 
244 */ 245 static inline void assert_on_cache_thread(struct vdo_page_cache *cache, 246 const char *function_name) 247 { 248 thread_id_t thread_id = vdo_get_callback_thread_id(); 249 250 VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id), 251 "%s() must only be called on cache thread %d, not thread %d", 252 function_name, cache->zone->thread_id, thread_id); 253 } 254 255 /** assert_io_allowed() - Assert that a page cache may issue I/O. */ 256 static inline void assert_io_allowed(struct vdo_page_cache *cache) 257 { 258 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state), 259 "VDO page cache may issue I/O"); 260 } 261 262 /** report_cache_pressure() - Log and, if enabled, report cache pressure. */ 263 static void report_cache_pressure(struct vdo_page_cache *cache) 264 { 265 ADD_ONCE(cache->stats.cache_pressure, 1); 266 if (cache->waiter_count > cache->page_count) { 267 if ((cache->pressure_report % LOG_INTERVAL) == 0) 268 vdo_log_info("page cache pressure %u", cache->stats.cache_pressure); 269 270 if (++cache->pressure_report >= DISPLAY_INTERVAL) 271 cache->pressure_report = 0; 272 } 273 } 274 275 /** 276 * get_page_state_name() - Return the name of a page state. 277 * @state: The page state to describe. 278 * 279 * If the page state is invalid a static string is returned and the invalid state is logged. 280 * 281 * Return: A pointer to a static page state name. 
282 */ 283 static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state) 284 { 285 int result; 286 static const char * const state_names[] = { 287 "FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING" 288 }; 289 290 BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT); 291 292 result = VDO_ASSERT(state < ARRAY_SIZE(state_names), 293 "Unknown page_state value %d", state); 294 if (result != VDO_SUCCESS) 295 return "[UNKNOWN PAGE STATE]"; 296 297 return state_names[state]; 298 } 299 300 /** 301 * update_counter() - Update the counter associated with a given state. 302 * @info: The page info to count. 303 * @delta: The delta to apply to the counter. 304 */ 305 static void update_counter(struct page_info *info, s32 delta) 306 { 307 struct block_map_statistics *stats = &info->cache->stats; 308 309 switch (info->state) { 310 case PS_FREE: 311 ADD_ONCE(stats->free_pages, delta); 312 return; 313 314 case PS_INCOMING: 315 ADD_ONCE(stats->incoming_pages, delta); 316 return; 317 318 case PS_OUTGOING: 319 ADD_ONCE(stats->outgoing_pages, delta); 320 return; 321 322 case PS_FAILED: 323 ADD_ONCE(stats->failed_pages, delta); 324 return; 325 326 case PS_RESIDENT: 327 ADD_ONCE(stats->clean_pages, delta); 328 return; 329 330 case PS_DIRTY: 331 ADD_ONCE(stats->dirty_pages, delta); 332 return; 333 334 default: 335 return; 336 } 337 } 338 339 /** update_lru() - Update the lru information for an active page. */ 340 static void update_lru(struct page_info *info) 341 { 342 if (info->cache->lru_list.prev != &info->lru_entry) 343 list_move_tail(&info->lru_entry, &info->cache->lru_list); 344 } 345 346 /** 347 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting 348 * counters. 349 * @info: The page info to update. 350 * @new_state: The new state to set. 
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	/* Keep the per-state statistics in step with the transition. */
	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		/* Free and failed pages are both available for reuse. */
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		/* Dirty pages stay on whatever state list they currently occupy. */
		return;

	default:
		/* Incoming/resident pages belong to no state list. */
		list_del_init(&info->state_entry);
	}
}

/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	/* Remove the old mapping (if any) before recording the new pbn. */
	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}
	return VDO_SUCCESS;
}

/** reset_page_info() - Reset page info to represent an unallocated page.
*/ 404 static int reset_page_info(struct page_info *info) 405 { 406 int result; 407 408 result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy"); 409 if (result != VDO_SUCCESS) 410 return result; 411 412 result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting), 413 "VDO Page must not have waiters"); 414 if (result != VDO_SUCCESS) 415 return result; 416 417 result = set_info_pbn(info, NO_PAGE); 418 set_info_state(info, PS_FREE); 419 list_del_init(&info->lru_entry); 420 return result; 421 } 422 423 /** 424 * find_free_page() - Find a free page. 425 * @cache: The page cache. 426 * 427 * Return: A pointer to the page info structure (if found), NULL otherwise. 428 */ 429 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache) 430 { 431 struct page_info *info; 432 433 info = list_first_entry_or_null(&cache->free_list, struct page_info, 434 state_entry); 435 if (info != NULL) 436 list_del_init(&info->state_entry); 437 438 return info; 439 } 440 441 /** 442 * find_page() - Find the page info (if any) associated with a given pbn. 443 * @cache: The page cache. 444 * @pbn: The absolute physical block number of the page. 445 * 446 * Return: The page info for the page if available, or NULL if not. 447 */ 448 static struct page_info * __must_check find_page(struct vdo_page_cache *cache, 449 physical_block_number_t pbn) 450 { 451 if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn)) 452 return cache->last_found; 453 454 cache->last_found = vdo_int_map_get(cache->page_map, pbn); 455 return cache->last_found; 456 } 457 458 /** 459 * select_lru_page() - Determine which page is least recently used. 460 * @cache: The page cache. 461 * 462 * Picks the least recently used from among the non-busy entries at the front of each of the lru 463 * list. Since whenever we mark a page busy we also put it to the end of the list it is unlikely 464 * that the entries at the front are busy unless the queue is very short, but not impossible. 
465 * 466 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be 467 * found. The page can be dirty or resident. 468 */ 469 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache) 470 { 471 struct page_info *info; 472 473 list_for_each_entry(info, &cache->lru_list, lru_entry) 474 if ((info->busy == 0) && !is_in_flight(info)) 475 return info; 476 477 return NULL; 478 } 479 480 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */ 481 482 /** 483 * complete_with_page() - Helper to complete the VDO Page Completion request successfully. 484 * @info: The page info representing the result page. 485 * @vdo_page_comp: The VDO page completion to complete. 486 */ 487 static void complete_with_page(struct page_info *info, 488 struct vdo_page_completion *vdo_page_comp) 489 { 490 bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info); 491 492 if (!available) { 493 vdo_log_error_strerror(VDO_BAD_PAGE, 494 "Requested cache page %llu in state %s is not %s", 495 (unsigned long long) info->pbn, 496 get_page_state_name(info->state), 497 vdo_page_comp->writable ? "present" : "valid"); 498 vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE); 499 return; 500 } 501 502 vdo_page_comp->info = info; 503 vdo_page_comp->ready = true; 504 vdo_finish_completion(&vdo_page_comp->completion); 505 } 506 507 /** 508 * complete_waiter_with_error() - Complete a page completion with an error code. 509 * @waiter: The page completion, as a waiter. 510 * @result_ptr: A pointer to the error code. 511 * 512 * Implements waiter_callback_fn. 513 */ 514 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr) 515 { 516 int *result = result_ptr; 517 518 vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result); 519 } 520 521 /** 522 * complete_waiter_with_page() - Complete a page completion with a page. 523 * @waiter: The page completion, as a waiter. 
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	complete_with_page(page_info, page_completion_from_waiter(waiter));
}

/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 * @info: The loaded page info.
 * @waitq: The list of waiting data_vios.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	update_lru(info);
	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}

/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @cache: The page cache.
 * @context: A string describing what triggered the error.
 * @result: The error result to set on the cache.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	/* Fail everyone waiting for a free page, then everyone waiting on each page. */
	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

/*
 * Finish draining the zone once all lookups, flush waiters, pool vios, and outstanding page
 * cache I/O have quiesced.
 */
static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

/* Put the whole vdo into read-only mode on behalf of this zone. */
static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

/*
 * Validate a completed page; on failure, push the zone into read-only mode.
 * Returns true if the page was valid.
 */
static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	/* A failed metadata read is unrecoverable: the whole vdo goes read-only. */
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	/* Fail all waiters before recycling the info so none see a half-reset page. */
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		/* The block holds a valid page, but not the one we asked for. */
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	/* An invalid (never-written) page is formatted fresh in memory. */
	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}

/* Bio endio handler for a cache page read; continue on the cache's thread. */
static void load_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}

/**
 * launch_page_load() - Begin the process of loading a page.
 * @info: The page info to launch.
 * @pbn: The absolute physical block number of the page to load.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	/* During rebuild, read failures are tolerated and treated as blank pages. */
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}

static void write_pages(struct vdo_completion *completion);

/** handle_flush_error() - Handle errors flushing the layer.
*/ 809 static void handle_flush_error(struct vdo_completion *completion) 810 { 811 struct page_info *info = completion->parent; 812 813 vio_record_metadata_io_error(as_vio(completion)); 814 set_persistent_error(info->cache, "flush failed", completion->result); 815 write_pages(completion); 816 } 817 818 static void flush_endio(struct bio *bio) 819 { 820 struct vio *vio = bio->bi_private; 821 struct page_info *info = vio->completion.parent; 822 823 continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id); 824 } 825 826 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */ 827 static void save_pages(struct vdo_page_cache *cache) 828 { 829 struct page_info *info; 830 struct vio *vio; 831 832 if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0)) 833 return; 834 835 assert_io_allowed(cache); 836 837 info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry); 838 839 cache->pages_in_flush = cache->pages_to_flush; 840 cache->pages_to_flush = 0; 841 ADD_ONCE(cache->stats.flush_count, 1); 842 843 vio = info->vio; 844 845 /* 846 * We must make sure that the recovery journal entries that changed these pages were 847 * successfully persisted, and thus must issue a flush before each batch of pages is 848 * written to ensure this. 849 */ 850 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error); 851 } 852 853 /** 854 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved. 855 * @info: The page info to save. 856 * 857 * Once in the list, a page may not be used until it has been written out. 
858 */ 859 static void schedule_page_save(struct page_info *info) 860 { 861 if (info->busy > 0) { 862 info->write_status = WRITE_STATUS_DEFERRED; 863 return; 864 } 865 866 info->cache->pages_to_flush++; 867 info->cache->outstanding_writes++; 868 set_info_state(info, PS_OUTGOING); 869 } 870 871 /** 872 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving 873 * pages if another save is not in progress. 874 * @info: The page info to save. 875 */ 876 static void launch_page_save(struct page_info *info) 877 { 878 schedule_page_save(info); 879 save_pages(info->cache); 880 } 881 882 /** 883 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is 884 * requesting a given page number. 885 * @waiter: The page completion waiter to check. 886 * @context: A pointer to the pbn of the desired page. 887 * 888 * Implements waiter_match_fn. 889 * 890 * Return: true if the page completion is for the desired page number. 891 */ 892 static bool completion_needs_page(struct vdo_waiter *waiter, void *context) 893 { 894 physical_block_number_t *pbn = context; 895 896 return (page_completion_from_waiter(waiter)->pbn == *pbn); 897 } 898 899 /** 900 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and 901 * any other completions that match it in page number. 902 * @info: The page info to allocate a page for. 
903 */ 904 static void allocate_free_page(struct page_info *info) 905 { 906 int result; 907 struct vdo_waiter *oldest_waiter; 908 physical_block_number_t pbn; 909 struct vdo_page_cache *cache = info->cache; 910 911 assert_on_cache_thread(cache, __func__); 912 913 if (!vdo_waitq_has_waiters(&cache->free_waiters)) { 914 if (cache->stats.cache_pressure > 0) { 915 vdo_log_info("page cache pressure relieved"); 916 WRITE_ONCE(cache->stats.cache_pressure, 0); 917 } 918 919 return; 920 } 921 922 result = reset_page_info(info); 923 if (result != VDO_SUCCESS) { 924 set_persistent_error(cache, "cannot reset page info", result); 925 return; 926 } 927 928 oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters); 929 pbn = page_completion_from_waiter(oldest_waiter)->pbn; 930 931 /* 932 * Remove all entries which match the page number in question and push them onto the page 933 * info's waitq. 934 */ 935 vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page, 936 &pbn, &info->waiting); 937 cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting); 938 939 result = launch_page_load(info, pbn); 940 if (result != VDO_SUCCESS) { 941 vdo_waitq_notify_all_waiters(&info->waiting, 942 complete_waiter_with_error, &result); 943 } 944 } 945 946 /** 947 * discard_a_page() - Begin the process of discarding a page. 948 * @cache: The page cache. 949 * 950 * If no page is discardable, increments a count of deferred frees so that the next release of a 951 * page which is no longer busy will kick off another discard cycle. This is an indication that the 952 * cache is not big enough. 953 * 954 * If the selected page is not dirty, immediately allocates the page to the oldest completion 955 * waiting for a free page. 
956 */ 957 static void discard_a_page(struct vdo_page_cache *cache) 958 { 959 struct page_info *info = select_lru_page(cache); 960 961 if (info == NULL) { 962 report_cache_pressure(cache); 963 return; 964 } 965 966 if (!is_dirty(info)) { 967 allocate_free_page(info); 968 return; 969 } 970 971 VDO_ASSERT_LOG_ONLY(!is_in_flight(info), 972 "page selected for discard is not in flight"); 973 974 cache->discard_count++; 975 info->write_status = WRITE_STATUS_DISCARD; 976 launch_page_save(info); 977 } 978 979 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp) 980 { 981 struct vdo_page_cache *cache = vdo_page_comp->cache; 982 983 cache->waiter_count++; 984 vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter); 985 discard_a_page(cache); 986 } 987 988 /** 989 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free 990 * page. 991 * @cache: The page cache. 992 */ 993 static void discard_page_if_needed(struct vdo_page_cache *cache) 994 { 995 if (cache->waiter_count > cache->discard_count) 996 discard_a_page(cache); 997 } 998 999 /** 1000 * write_has_finished() - Inform the cache that a write has finished (possibly with an error). 1001 * @info: The info structure for the page whose write just completed. 1002 * 1003 * Return: true if the page write was a discard. 1004 */ 1005 static bool write_has_finished(struct page_info *info) 1006 { 1007 bool was_discard = (info->write_status == WRITE_STATUS_DISCARD); 1008 1009 assert_on_cache_thread(info->cache, __func__); 1010 info->cache->outstanding_writes--; 1011 1012 info->write_status = WRITE_STATUS_NORMAL; 1013 return was_discard; 1014 } 1015 1016 /** 1017 * handle_page_write_error() - Handler for page write errors. 1018 * @completion: The page write vio. 
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	/* The page is still dirty since its data never made it to disk. */
	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}

static void page_is_written_out(struct vdo_completion *completion);

/* bio endio for a cache page write; bounces back to the cache (logical) zone thread. */
static void write_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}

/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		/*
		 * The first write of an uninitialized page has completed; rewrite it with the
		 * initialized flag set (preceded by a flush) for torn write protection.
		 */
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	/* A discarded page with no users or waiters can be handed out as a free page. */
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}

/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages().
 */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			/*
			 * In read-only mode, skip the actual I/O but still drive the
			 * completion through the normal write-done/error path.
			 */
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}
		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}

/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 * @completion: The page completion to release.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		/* Dropping the last busy reference makes the page a discard candidate. */
		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			/* A save was deferred while the page was busy; launch it now. */
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * if there are excess requests for pages (that have not already started discards)
		 * we need to discard some page (which may be this one)
		 */
		discard_page_if_needed(cache);
	}
}

/**
 * load_page_for_completion() - Queue a completion on a page_info and launch the page load,
 *                              failing all waiters if the load cannot be launched.
 * @info: The page_info which will hold the loaded page.
 * @vdo_page_comp: The completion waiting for the page.
 */
static void load_page_for_completion(struct page_info *info,
				     struct vdo_page_completion *vdo_page_comp)
{
	int result;

	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
	result = launch_page_load(info, vdo_page_comp->pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	/* (Re)initialize the completion; any previous contents are overwritten. */
	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}

/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
	struct page_info *info;
	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);

	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
		return;

	info = vdo_page_comp->info;
	set_info_state(info, PS_DIRTY);
	launch_page_save(info);
}

/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page
 *
 * Return: VDO_SUCCESS or an error
 */
int vdo_get_cached_page(struct vdo_completion *completion,
			struct block_map_page **page_ptr)
{
	int result;
	struct vdo_page_completion *vpc;

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result == VDO_SUCCESS)
		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);

	return result;
}

/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 * @cache: The page cache.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);

	/* Make sure we don't throw away any dirty pages. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

		if (result != VDO_SUCCESS)
			return result;
	}

	/* Reset the page map by re-allocating it. */
	vdo_int_map_free(vdo_forget(cache->page_map));
	return vdo_int_map_create(cache->page_count, &cache->page_map);
}

/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 * @forest: The block map forest.
 * @root_index: The root index of the tree to search.
 * @height: The height in the tree.
 * @page_index: The page index.
 *
 * Return: The requested page.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
							      root_count_t root_index,
							      height_t height,
							      page_number_t page_index)
{
	page_number_t offset = 0;
	size_t segment;

	/* Walk the segments until the one whose boundary covers page_index is found. */
	for (segment = 0; segment < forest->segments; segment++) {
		page_number_t border = forest->boundaries[segment].levels[height - 1];

		if (page_index < border) {
			struct block_map_tree *tree = &forest->trees[root_index];

			/* The index within a segment is relative to the segment's start. */
			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
		}

		offset = border;
	}

	return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
					      const struct tree_lock *lock)
{
	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
				      lock->height,
				      lock->tree_slots[lock->height].page_index);
}

/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
			 physical_block_number_t pbn,
			 struct block_map_page *page)
{
	struct block_map_page *loaded = (struct block_map_page *) buffer;
	enum block_map_page_validity validity =
		vdo_validate_block_map_page(loaded, nonce, pbn);

	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
		memcpy(page, loaded, VDO_BLOCK_SIZE);
		return true;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Expected page %llu but got page %llu instead",
				       (unsigned long long) pbn,
				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
	}

	return false;
}

/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	/* Unwrap the cycle so that a single linear comparison suffices. */
	if (value < lower)
		value += modulus;
	if (upper < lower)
		upper += modulus;
	return (value <= upper);
}

/**
 * is_not_older() - Check whether a generation is strictly older than some other generation in the
 *                  context of a zone's current generation range.
 * @zone: The zone in which to do the comparison.
 * @a: The generation in question.
 * @b: The generation to compare to.
 *
 * Return: true if generation @a is not strictly older than generation @b in the context of @zone
 */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
	int result;

	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
			    "generation(s) %u, %u are out of range [%u, %u]",
			    a, b, zone->oldest_generation, zone->generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return true;
	}

	return in_cyclic_range(b, a, zone->generation, 1 << 8);
}

/**
 * release_generation() - Drop a dirty page count for a generation, advancing the zone's oldest
 *                        generation past any generations which become empty.
 * @zone: The zone owning the generation counts.
 * @generation: The generation to release a count from.
 */
static void release_generation(struct block_map_zone *zone, u8 generation)
{
	int result;

	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
			    "dirty page count underflow for generation %u", generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	zone->dirty_page_counts[generation]--;
	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
	       (zone->oldest_generation != zone->generation))
		zone->oldest_generation++;
}

/**
 * set_generation() - Assign a tree page to a generation, updating the zone's dirty page counts
 *                    and releasing the page's old generation if it had one.
 * @zone: The zone owning the page.
 * @page: The tree page being (re)dirtied.
 * @new_generation: The generation to assign.
 */
static void set_generation(struct block_map_zone *zone, struct tree_page *page,
			   u8 new_generation)
{
	u32 new_count;
	int result;
	/* A page with a pending waiter is already counted in its old generation. */
	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
	u8 old_generation = page->generation;

	if (decrement_old && (old_generation == new_generation))
		return;

	page->generation = new_generation;
	new_count = ++zone->dirty_page_counts[new_generation];
	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
			    new_generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	if (decrement_old)
		release_generation(zone, old_generation);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);

/* Implements waiter_callback_fn */
static void write_page_callback(struct vdo_waiter *waiter, void *context)
{
	write_page(container_of(waiter, struct tree_page, waiter), context);
}

/* Request a vio from the zone's pool; write_page() runs once one is available. */
static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
{
	waiter->callback = write_page_callback;
	acquire_vio_from_pool(zone->vio_pool, waiter);
}

/* Return: true if all possible generations were not already active */
static bool attempt_increment(struct block_map_zone *zone)
{
	u8 generation = zone->generation + 1;

	/* The new generation would collide with the oldest still-dirty one. */
	if (zone->oldest_generation == generation)
		return false;

	zone->generation = generation;
	return true;
}

/* Launches a flush if one is not already in progress.
 */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
	if ((zone->flusher == NULL) && attempt_increment(zone)) {
		/* No flush is in progress; this page becomes the flusher. */
		zone->flusher = page;
		acquire_vio(&page->waiter, zone);
		return;
	}

	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
}

/*
 * Implements waiter_callback_fn. Pages still in the flusher's generation can ride the
 * flush just issued; re-dirtied pages must wait for the next flush.
 */
static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
	struct write_if_not_dirtied_context *write_context = context;

	if (page->generation == write_context->generation) {
		acquire_vio(waiter, write_context->zone);
		return;
	}

	enqueue_page(page, write_context->zone);
}

/* Return the vio to its pool and check whether the zone has finished draining. */
static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(vio);
	check_for_drain_complete(zone);
}

/* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	/* The page was re-dirtied while its copy was being written. */
	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			/* Reuse this vio to rewrite the re-dirtied flusher page. */
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		/* Promote the next flush waiter to flusher, reusing this vio. */
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}

static void handle_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	enter_zone_read_only_mode(zone, result);
	return_to_pool(zone, pooled);
}

static void write_page_endio(struct bio *bio);

/* Second write of a torn-write-protected page: mark initialized and write again. */
static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}

static void write_page_endio(struct bio *bio)
{
	struct pooled_vio *vio = bio->bi_private;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = (struct block_map_page *) vio->vio.data;

	/* An uninitialized copy means this was the first (torn-write) pass. */
	continue_vio_after_io(&vio->vio,
			      (page->header.initialized ?
			       finish_page_write : write_initialized_page),
			      zone->thread_id);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
	struct vdo_completion *completion = &vio->vio.completion;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = vdo_as_block_map_page(tree_page);

	if ((zone->flusher != tree_page) &&
	    is_not_older(zone, tree_page->generation, zone->generation)) {
		/*
		 * This page was re-dirtied after the last flush was issued, hence we need to do
		 * another flush.
		 */
		enqueue_page(tree_page, zone);
		return_to_pool(zone, vio);
		return;
	}

	completion->parent = tree_page;
	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
	completion->callback_thread_id = zone->thread_id;

	tree_page->writing = true;
	tree_page->writing_generation = tree_page->generation;
	tree_page->writing_recovery_lock = tree_page->recovery_lock;

	/* Clear this now so that we know this page is not on any dirty list. */
	tree_page->recovery_lock = 0;

	/*
	 * We've already copied the page into the vio which will write it, so if it was not yet
	 * initialized, the first write will indicate that (for torn write protection). It is now
	 * safe to mark it as initialized in memory since if the write fails, the in memory state
	 * will become irrelevant.
	 */
	if (page->header.initialized) {
		write_initialized_page(completion);
		return;
	}

	page->header.initialized = true;
	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO);
}

/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
	struct block_map_zone *zone;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;

	VDO_ASSERT_LOG_ONLY(lock->locked,
			    "release of unlocked block map page %s for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);

	zone = data_vio->logical.zone->block_map_zone;
	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
			    "block map page %s mismatch for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);
	lock->locked = false;
}

/* Finish a block map lookup, returning the data_vio to its normal callback path. */
static void finish_lookup(struct data_vio *data_vio, int result)
{
	data_vio->tree_lock.height = 0;

	--data_vio->logical.zone->block_map_zone->active_lookups;

	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
	data_vio->vio.completion.error_handler = handle_data_vio_error;
	continue_data_vio_with_error(data_vio, result);
}

/* Implements waiter_callback_fn; translates the abort reason for each waiter. */
static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	if (!data_vio->write) {
		/* Reads of an unallocated page simply see unmapped data. */
		if (result == VDO_NO_SPACE)
			result = VDO_SUCCESS;
	} else if (result != VDO_NO_SPACE) {
		result = VDO_READ_ONLY;
	}

	finish_lookup(data_vio, result);
}

/**
 * abort_lookup() - Abort a block map lookup, releasing the page lock (if held) and failing any
 *                  waiters queued behind it.
 * @data_vio: The data_vio whose lookup is being aborted.
 * @result: The error causing the abort.
 * @what: A description ("load" or "allocation") for log messages.
 */
static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
	if (result != VDO_NO_SPACE)
		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

	if (data_vio->tree_lock.locked) {
		release_page_lock(data_vio, what);
		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
					     abort_lookup_for_waiter,
					     &result);
	}

	finish_lookup(data_vio, result);
}

static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}

/* Check whether a tree entry read from disk is plausible for the given height. */
static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
					       const struct data_location *mapping,
					       height_t height)
{
	if (!vdo_is_valid_location(mapping) ||
	    vdo_is_state_compressed(mapping->state) ||
	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
		return true;

	/* Roots aren't physical data blocks, so we can't check their PBNs. */
	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
		return false;

	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
}

static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
static void allocate_block_map_page(struct block_map_zone *zone,
				    struct data_vio *data_vio);

/* Descend one level of the tree using the entry from a freshly available page. */
static void continue_with_loaded_page(struct data_vio *data_vio,
				      struct block_map_page *page)
{
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
	struct data_location mapping =
		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we need is unallocated */
		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
					data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		/* The next level down is a leaf, so the lookup is complete. */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load next */
	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

/* Implements waiter_callback_fn; resumes a lookup which was waiting on this page load. */
static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);

	data_vio->tree_lock.height--;
	continue_with_loaded_page(data_vio, context);
}

/* Callback after a tree page read completes; validates, installs, and resumes waiters. */
static void finish_block_map_page_load(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	nonce_t nonce;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	tree_lock->height--;
	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
	tree_page = get_tree_page(zone, tree_lock);
	page = (struct block_map_page *) tree_page->page_buffer;
	nonce = zone->block_map->nonce;

	/* An invalid on-disk copy is replaced with a freshly formatted page. */
	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
		vdo_format_block_map_page(page, nonce, pbn, false);
	return_vio_to_pool(pooled);

	/* Release our claim to the load and wake any waiters */
	release_page_lock(data_vio, "load");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
	continue_with_loaded_page(data_vio, page);
}

static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct data_vio *data_vio = completion->parent;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(pooled);
	abort_load(data_vio, result);
}

static void load_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct data_vio *data_vio = vio->completion.parent;

	continue_vio_after_io(vio, finish_block_map_page_load,
			      data_vio->logical.zone->thread_id);
}

/* Implements waiter_callback_fn; issues the read once a pooled vio is available. */
static void load_page(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;

	pooled->vio.completion.parent = data_vio;
	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
				handle_io_error, REQ_OP_READ | REQ_PRIO);
}

/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */
static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;
	height_t height = lock->height;
	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
	union page_key key;

	/* The packed descriptor uniquely identifies the page within the forest. */
	key.descriptor = (struct page_descriptor) {
		.root_index = lock->root_index,
		.height = height,
		.page_index = tree_slot.page_index,
		.slot = tree_slot.block_map_slot.slot,
	};
	lock->key = key.key;

	result = vdo_int_map_put(zone->loading_pages, lock->key,
				 lock, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS)
		return result;

	if (lock_holder == NULL) {
		/* We got the lock */
		data_vio->tree_lock.locked = true;
		return VDO_SUCCESS;
	}

	/* Someone else is loading or allocating the page we need */
	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
	return VDO_SUCCESS;
}

/* Load a block map tree page from disk, for the next level in the data vio tree lock.
 */
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_load(data_vio, result);
		return;
	}

	/* If the lock was not acquired, this data_vio is queued behind the holder. */
	if (data_vio->tree_lock.locked) {
		data_vio->waiter.callback = load_page;
		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
	}
}

/* Error handler for a failed block map page allocation. */
static void allocation_failure(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	/* Make sure we are on the logical zone thread before aborting. */
	if (vdo_requeue_completion_if_needed(completion,
					     data_vio->logical.zone->thread_id))
		return;

	abort_lookup(data_vio, completion->result, "allocation");
}

/* Implements waiter_callback_fn; resumes a lookup with the newly allocated page's PBN. */
static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	physical_block_number_t pbn = *((physical_block_number_t *) context);

	tree_lock->height--;
	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;

	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

/** expire_oldest_list() - Expire the oldest list.
 */
static void expire_oldest_list(struct dirty_lists *dirty_lists)
{
	block_count_t i = dirty_lists->offset++;

	dirty_lists->oldest_period++;
	/* Move both page types from the expiring era onto the expired lists. */
	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
				      &dirty_lists->expired[VDO_TREE_PAGE]);
	}

	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
				      &dirty_lists->expired[VDO_CACHE_PAGE]);
	}

	/* The eras array is used as a ring buffer of maximum_age entries. */
	if (dirty_lists->offset == dirty_lists->maximum_age)
		dirty_lists->offset = 0;
}


/** update_period() - Update the dirty_lists period if necessary. */
static void update_period(struct dirty_lists *dirty, sequence_number_t period)
{
	while (dirty->next_period <= period) {
		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
			expire_oldest_list(dirty);
		dirty->next_period++;
	}
}

/** write_expired_elements() - Write out the expired list.
 */
static void write_expired_elements(struct block_map_zone *zone)
{
	struct tree_page *page, *ttmp;
	struct page_info *info, *ptmp;
	struct list_head *expired;
	u8 generation = zone->generation;

	/* First issue writes for all expired tree pages. */
	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
	list_for_each_entry_safe(page, ttmp, expired, entry) {
		int result;

		list_del_init(&page->entry);

		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
				    "Newly expired page not already waiting to write");
		if (result != VDO_SUCCESS) {
			enter_zone_read_only_mode(zone, result);
			continue;
		}

		set_generation(zone, page, generation);
		if (!page->writing)
			enqueue_page(page, zone);
	}

	/* Then schedule saves for all expired cache pages. */
	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
		list_del_init(&info->state_entry);
		schedule_page_save(info);
	}

	save_pages(&zone->page_cache);
}

/**
 * add_to_dirty_lists() - Add an element to the dirty lists.
 * @zone: The zone in which we are operating.
 * @entry: The list entry of the element to add.
 * @type: The type of page.
 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
 *              lock.
 */
static void add_to_dirty_lists(struct block_map_zone *zone,
			       struct list_head *entry,
			       enum block_map_page_type type,
			       sequence_number_t old_period,
			       sequence_number_t new_period)
{
	struct dirty_lists *dirty_lists = zone->dirty_lists;

	/* A page already dirty in an equal or older period stays where it is. */
	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
		return;

	if (new_period < dirty_lists->oldest_period) {
		/* The new period has already expired, so the page must be written immediately. */
		list_move_tail(entry, &dirty_lists->expired[type]);
	} else {
		update_period(dirty_lists, new_period);
		list_move_tail(entry,
			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
	}

	write_expired_elements(zone);
}

/*
 * Record the allocation in the tree and wake any waiters now that the write lock has been
 * released.
 */
static void finish_block_map_allocation(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	sequence_number_t old_lock;
	struct data_vio *data_vio = as_data_vio(completion);
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	height_t height = tree_lock->height;

	assert_data_vio_in_logical_zone(data_vio);

	tree_page = get_tree_page(zone, tree_lock);
	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;

	/* Record the allocation. */
	page = (struct block_map_page *) tree_page->page_buffer;
	old_lock = tree_page->recovery_lock;
	vdo_update_block_map_page(page, data_vio, pbn,
				  VDO_MAPPING_STATE_UNCOMPRESSED,
				  &tree_page->recovery_lock);

	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
		/* This page is waiting to be written out. */
		if (zone->flusher != tree_page) {
			/*
			 * The outstanding flush won't cover the update we just made,
			 * so mark the page as needing another flush.
			 */
			set_generation(zone, tree_page, zone->generation);
		}
	} else {
		/* Put the page on a dirty list */
		if (old_lock == 0)
			INIT_LIST_HEAD(&tree_page->entry);
		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
				   old_lock, tree_page->recovery_lock);
	}

	tree_lock->height--;
	if (height > 1) {
		/* Format the interior node we just allocated (in memory). */
		tree_page = get_tree_page(zone, tree_lock);
		vdo_format_block_map_page(tree_page->page_buffer,
					  zone->block_map->nonce,
					  pbn, false);
	}

	/* Release our claim to the allocation and wake any waiters */
	release_page_lock(data_vio, "allocation");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
				     continue_allocation_for_waiter, &pbn);
	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(zone, data_vio);
}

/**
 * release_block_map_write_lock() - Release the allocation lock on the new page's block and
 *                                  continue on the logical zone thread.
 * @completion: The data_vio doing the allocation.
 */
static void release_block_map_write_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	release_data_vio_allocation_lock(data_vio, true);
	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
}

/*
 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
 * to prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
2158 */ 2159 static void set_block_map_page_reference_count(struct vdo_completion *completion) 2160 { 2161 struct data_vio *data_vio = as_data_vio(completion); 2162 2163 assert_data_vio_in_allocated_zone(data_vio); 2164 2165 completion->callback = release_block_map_write_lock; 2166 vdo_modify_reference_count(completion, &data_vio->increment_updater); 2167 } 2168 2169 static void journal_block_map_allocation(struct vdo_completion *completion) 2170 { 2171 struct data_vio *data_vio = as_data_vio(completion); 2172 2173 assert_data_vio_in_journal_zone(data_vio); 2174 2175 set_data_vio_allocated_zone_callback(data_vio, 2176 set_block_map_page_reference_count); 2177 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); 2178 } 2179 2180 static void allocate_block(struct vdo_completion *completion) 2181 { 2182 struct data_vio *data_vio = as_data_vio(completion); 2183 struct tree_lock *lock = &data_vio->tree_lock; 2184 physical_block_number_t pbn; 2185 2186 assert_data_vio_in_allocated_zone(data_vio); 2187 2188 if (!vdo_allocate_block_in_zone(data_vio)) 2189 return; 2190 2191 pbn = data_vio->allocation.pbn; 2192 lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn; 2193 data_vio->increment_updater = (struct reference_updater) { 2194 .operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING, 2195 .increment = true, 2196 .zpbn = { 2197 .pbn = pbn, 2198 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 2199 }, 2200 .lock = data_vio->allocation.lock, 2201 }; 2202 2203 launch_data_vio_journal_callback(data_vio, journal_block_map_allocation); 2204 } 2205 2206 static void allocate_block_map_page(struct block_map_zone *zone, 2207 struct data_vio *data_vio) 2208 { 2209 int result; 2210 2211 if (!data_vio->write || data_vio->is_discard) { 2212 /* This is a pure read or a discard, so there's nothing left to do here. 
*/ 2213 finish_lookup(data_vio, VDO_SUCCESS); 2214 return; 2215 } 2216 2217 result = attempt_page_lock(zone, data_vio); 2218 if (result != VDO_SUCCESS) { 2219 abort_lookup(data_vio, result, "allocation"); 2220 return; 2221 } 2222 2223 if (!data_vio->tree_lock.locked) 2224 return; 2225 2226 data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK, 2227 allocate_block, allocation_failure); 2228 } 2229 2230 /** 2231 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio 2232 * resides and cache that result in the data_vio. 2233 * @data_vio: The data vio. 2234 * 2235 * All ancestors in the tree will be allocated or loaded, as needed. 2236 */ 2237 void vdo_find_block_map_slot(struct data_vio *data_vio) 2238 { 2239 page_number_t page_index; 2240 struct block_map_tree_slot tree_slot; 2241 struct data_location mapping; 2242 struct block_map_page *page = NULL; 2243 struct tree_lock *lock = &data_vio->tree_lock; 2244 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 2245 2246 zone->active_lookups++; 2247 if (vdo_is_state_draining(&zone->state)) { 2248 finish_lookup(data_vio, VDO_SHUTTING_DOWN); 2249 return; 2250 } 2251 2252 lock->tree_slots[0].block_map_slot.slot = 2253 data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2254 page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count); 2255 tree_slot = (struct block_map_tree_slot) { 2256 .page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2257 .block_map_slot = { 2258 .pbn = 0, 2259 .slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2260 }, 2261 }; 2262 2263 for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) { 2264 physical_block_number_t pbn; 2265 2266 lock->tree_slots[lock->height] = tree_slot; 2267 page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer); 2268 pbn = vdo_get_block_map_page_pbn(page); 2269 if (pbn != VDO_ZERO_BLOCK) { 2270 
lock->tree_slots[lock->height].block_map_slot.pbn = pbn; 2271 break; 2272 } 2273 2274 /* Calculate the index and slot for the next level. */ 2275 tree_slot.block_map_slot.slot = 2276 tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2277 tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2278 } 2279 2280 /* The page at this height has been allocated and loaded. */ 2281 mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]); 2282 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 2283 vdo_log_error_strerror(VDO_BAD_MAPPING, 2284 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 2285 (unsigned long long) mapping.pbn, mapping.state, 2286 lock->tree_slots[lock->height - 1].page_index, 2287 lock->height - 1); 2288 abort_load(data_vio, VDO_BAD_MAPPING); 2289 return; 2290 } 2291 2292 if (!vdo_is_mapped_location(&mapping)) { 2293 /* The page we want one level down has not been allocated, so allocate it. */ 2294 allocate_block_map_page(zone, data_vio); 2295 return; 2296 } 2297 2298 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 2299 if (lock->height == 1) { 2300 /* This is the ultimate block map page, so we're done */ 2301 finish_lookup(data_vio, VDO_SUCCESS); 2302 return; 2303 } 2304 2305 /* We know what page we need to load. */ 2306 load_block_map_page(zone, data_vio); 2307 } 2308 2309 /* 2310 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree 2311 * pages have been loaded, otherwise, it may give the wrong answer (0). 
2312 */ 2313 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map, 2314 page_number_t page_number) 2315 { 2316 struct data_location mapping; 2317 struct tree_page *tree_page; 2318 struct block_map_page *page; 2319 root_count_t root_index = page_number % map->root_count; 2320 page_number_t page_index = page_number / map->root_count; 2321 slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2322 2323 page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2324 2325 tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index); 2326 page = (struct block_map_page *) tree_page->page_buffer; 2327 if (!page->header.initialized) 2328 return VDO_ZERO_BLOCK; 2329 2330 mapping = vdo_unpack_block_map_entry(&page->entries[slot]); 2331 if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state)) 2332 return VDO_ZERO_BLOCK; 2333 return mapping.pbn; 2334 } 2335 2336 /* 2337 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This 2338 * method is used when correcting errors in the tree during read-only rebuild. 2339 */ 2340 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone) 2341 { 2342 bool waiting = vdo_waiter_is_waiting(&page->waiter); 2343 2344 if (waiting && (zone->flusher == page)) 2345 return; 2346 2347 set_generation(zone, page, zone->generation); 2348 if (waiting || page->writing) 2349 return; 2350 2351 enqueue_page(page, zone); 2352 } 2353 2354 static int make_segment(struct forest *old_forest, block_count_t new_pages, 2355 struct boundary *new_boundary, struct forest *forest) 2356 { 2357 size_t index = (old_forest == NULL) ? 
0 : old_forest->segments; 2358 struct tree_page *page_ptr; 2359 page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT]; 2360 height_t height; 2361 root_count_t root; 2362 int result; 2363 2364 forest->segments = index + 1; 2365 2366 result = vdo_allocate(forest->segments, "forest boundary array", &forest->boundaries); 2367 if (result != VDO_SUCCESS) 2368 return result; 2369 2370 result = vdo_allocate(forest->segments, "forest page pointers", &forest->pages); 2371 if (result != VDO_SUCCESS) 2372 return result; 2373 2374 result = vdo_allocate(new_pages, "new forest pages", &forest->pages[index]); 2375 if (result != VDO_SUCCESS) 2376 return result; 2377 2378 if (index > 0) { 2379 memcpy(forest->boundaries, old_forest->boundaries, 2380 index * sizeof(struct boundary)); 2381 memcpy(forest->pages, old_forest->pages, 2382 index * sizeof(struct tree_page *)); 2383 } 2384 2385 memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary)); 2386 2387 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2388 segment_sizes[height] = new_boundary->levels[height]; 2389 if (index > 0) 2390 segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height]; 2391 } 2392 2393 page_ptr = forest->pages[index]; 2394 for (root = 0; root < forest->map->root_count; root++) { 2395 struct block_map_tree_segment *segment; 2396 struct block_map_tree *tree = &(forest->trees[root]); 2397 height_t height; 2398 2399 result = vdo_allocate(forest->segments, "tree root segments", &tree->segments); 2400 if (result != VDO_SUCCESS) 2401 return result; 2402 2403 if (index > 0) { 2404 memcpy(tree->segments, old_forest->trees[root].segments, 2405 index * sizeof(struct block_map_tree_segment)); 2406 } 2407 2408 segment = &(tree->segments[index]); 2409 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2410 if (segment_sizes[height] == 0) 2411 continue; 2412 2413 segment->levels[height] = page_ptr; 2414 if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) { 2415 /* 
Record the root. */ 2416 struct block_map_page *page = 2417 vdo_format_block_map_page(page_ptr->page_buffer, 2418 forest->map->nonce, 2419 VDO_INVALID_PBN, true); 2420 page->entries[0] = 2421 vdo_pack_block_map_entry(forest->map->root_origin + root, 2422 VDO_MAPPING_STATE_UNCOMPRESSED); 2423 } 2424 page_ptr += segment_sizes[height]; 2425 } 2426 } 2427 2428 return VDO_SUCCESS; 2429 } 2430 2431 static void deforest(struct forest *forest, size_t first_page_segment) 2432 { 2433 root_count_t root; 2434 2435 if (forest->pages != NULL) { 2436 size_t segment; 2437 2438 for (segment = first_page_segment; segment < forest->segments; segment++) 2439 vdo_free(forest->pages[segment]); 2440 vdo_free(forest->pages); 2441 } 2442 2443 for (root = 0; root < forest->map->root_count; root++) 2444 vdo_free(forest->trees[root].segments); 2445 2446 vdo_free(forest->boundaries); 2447 vdo_free(forest); 2448 } 2449 2450 /** 2451 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if 2452 * there is one. 2453 * @map: The block map. 2454 * @entries: The number of entries the block map will hold. 2455 * 2456 * Return: VDO_SUCCESS or an error. 
2457 */ 2458 static int make_forest(struct block_map *map, block_count_t entries) 2459 { 2460 struct forest *forest, *old_forest = map->forest; 2461 struct boundary new_boundary, *old_boundary = NULL; 2462 block_count_t new_pages; 2463 int result; 2464 2465 if (old_forest != NULL) 2466 old_boundary = &(old_forest->boundaries[old_forest->segments - 1]); 2467 2468 new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary, 2469 entries, &new_boundary); 2470 if (new_pages == 0) { 2471 map->next_entry_count = entries; 2472 return VDO_SUCCESS; 2473 } 2474 2475 result = vdo_allocate_extended(map->root_count, trees, __func__, &forest); 2476 if (result != VDO_SUCCESS) 2477 return result; 2478 2479 forest->map = map; 2480 result = make_segment(old_forest, new_pages, &new_boundary, forest); 2481 if (result != VDO_SUCCESS) { 2482 deforest(forest, forest->segments - 1); 2483 return result; 2484 } 2485 2486 map->next_forest = forest; 2487 map->next_entry_count = entries; 2488 return VDO_SUCCESS; 2489 } 2490 2491 /** 2492 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest. 2493 * @map: The block map. 2494 */ 2495 static void replace_forest(struct block_map *map) 2496 { 2497 if (map->next_forest != NULL) { 2498 if (map->forest != NULL) 2499 deforest(map->forest, map->forest->segments); 2500 map->forest = vdo_forget(map->next_forest); 2501 } 2502 2503 map->entry_count = map->next_entry_count; 2504 map->next_entry_count = 0; 2505 } 2506 2507 /** 2508 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the 2509 * traversal. 2510 * @cursor: The cursor to complete. 
2511 */ 2512 static void finish_cursor(struct cursor *cursor) 2513 { 2514 struct cursors *cursors = cursor->parent; 2515 struct vdo_completion *completion = cursors->completion; 2516 2517 return_vio_to_pool(vdo_forget(cursor->vio)); 2518 if (--cursors->active_roots > 0) 2519 return; 2520 2521 vdo_free(cursors); 2522 2523 vdo_finish_completion(completion); 2524 } 2525 2526 static void traverse(struct cursor *cursor); 2527 2528 /** 2529 * continue_traversal() - Continue traversing a block map tree. 2530 * @completion: The VIO doing a read or write. 2531 */ 2532 static void continue_traversal(struct vdo_completion *completion) 2533 { 2534 vio_record_metadata_io_error(as_vio(completion)); 2535 traverse(completion->parent); 2536 } 2537 2538 /** 2539 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded. 2540 * @completion: The VIO doing the read. 2541 */ 2542 static void finish_traversal_load(struct vdo_completion *completion) 2543 { 2544 struct cursor *cursor = completion->parent; 2545 height_t height = cursor->height; 2546 struct cursor_level *level = &cursor->levels[height]; 2547 struct tree_page *tree_page = 2548 &(cursor->tree->segments[0].levels[height][level->page_index]); 2549 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2550 2551 vdo_copy_valid_page(cursor->vio->vio.data, 2552 cursor->parent->zone->block_map->nonce, 2553 pbn_from_vio_bio(cursor->vio->vio.bio), page); 2554 traverse(cursor); 2555 } 2556 2557 static void traversal_endio(struct bio *bio) 2558 { 2559 struct vio *vio = bio->bi_private; 2560 struct cursor *cursor = vio->completion.parent; 2561 2562 continue_vio_after_io(vio, finish_traversal_load, 2563 cursor->parent->zone->thread_id); 2564 } 2565 2566 /** 2567 * traverse() - Traverse a single block map tree. 2568 * @cursor: A cursor tracking traversal progress. 2569 * 2570 * This is the recursive heart of the traversal process. 
2571 */ 2572 static void traverse(struct cursor *cursor) 2573 { 2574 for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) { 2575 height_t height = cursor->height; 2576 struct cursor_level *level = &cursor->levels[height]; 2577 struct tree_page *tree_page = 2578 &(cursor->tree->segments[0].levels[height][level->page_index]); 2579 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2580 2581 if (!page->header.initialized) 2582 continue; 2583 2584 for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) { 2585 struct cursor_level *next_level; 2586 page_number_t entry_index = 2587 (VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot; 2588 struct data_location location = 2589 vdo_unpack_block_map_entry(&page->entries[level->slot]); 2590 2591 if (!vdo_is_valid_location(&location)) { 2592 /* This entry is invalid, so remove it from the page. */ 2593 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2594 vdo_write_tree_page(tree_page, cursor->parent->zone); 2595 continue; 2596 } 2597 2598 if (!vdo_is_mapped_location(&location)) 2599 continue; 2600 2601 /* Erase mapped entries past the end of the logical space. 
*/ 2602 if (entry_index >= cursor->boundary.levels[height]) { 2603 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2604 vdo_write_tree_page(tree_page, cursor->parent->zone); 2605 continue; 2606 } 2607 2608 if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) { 2609 int result = cursor->parent->entry_callback(location.pbn, 2610 cursor->parent->completion); 2611 if (result != VDO_SUCCESS) { 2612 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2613 vdo_write_tree_page(tree_page, cursor->parent->zone); 2614 continue; 2615 } 2616 } 2617 2618 if (cursor->height == 0) 2619 continue; 2620 2621 cursor->height--; 2622 next_level = &cursor->levels[cursor->height]; 2623 next_level->page_index = entry_index; 2624 next_level->slot = 0; 2625 level->slot++; 2626 vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn, 2627 traversal_endio, continue_traversal, 2628 REQ_OP_READ | REQ_PRIO); 2629 return; 2630 } 2631 } 2632 2633 finish_cursor(cursor); 2634 } 2635 2636 /** 2637 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with 2638 * which to load pages. 2639 * @waiter: The parent of the cursor to launch. 2640 * @context: The pooled_vio just acquired. 2641 * 2642 * Implements waiter_callback_fn. 2643 */ 2644 static void launch_cursor(struct vdo_waiter *waiter, void *context) 2645 { 2646 struct cursor *cursor = container_of(waiter, struct cursor, waiter); 2647 struct pooled_vio *pooled = context; 2648 2649 cursor->vio = pooled; 2650 pooled->vio.completion.parent = cursor; 2651 pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id; 2652 traverse(cursor); 2653 } 2654 2655 /** 2656 * compute_boundary() - Compute the number of pages used at each level of the given root's tree. 2657 * @map: The block map. 2658 * @root_index: The tree root index. 2659 * 2660 * Return: The list of page counts as a boundary structure. 
2661 */ 2662 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index) 2663 { 2664 struct boundary boundary; 2665 height_t height; 2666 page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count); 2667 /* 2668 * Compute the leaf pages for this root. If the number of leaf pages does not distribute 2669 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to 2670 * roots starting from tree 0. 2671 */ 2672 page_count_t last_tree_root = (leaf_pages - 1) % map->root_count; 2673 page_count_t level_pages = leaf_pages / map->root_count; 2674 2675 if (root_index <= last_tree_root) 2676 level_pages++; 2677 2678 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) { 2679 boundary.levels[height] = level_pages; 2680 level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE); 2681 } 2682 2683 /* The root node always exists, even if the root is otherwise unused. */ 2684 boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1; 2685 2686 return boundary; 2687 } 2688 2689 /** 2690 * vdo_traverse_forest() - Walk the entire forest of a block map. 2691 * @map: The block map. 2692 * @callback: A function to call with the pbn of each allocated node in the forest. 2693 * @completion: The completion to notify on each traversed PBN, and when traversal completes. 
2694 */ 2695 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback, 2696 struct vdo_completion *completion) 2697 { 2698 root_count_t root; 2699 struct cursors *cursors; 2700 int result; 2701 2702 result = vdo_allocate_extended(map->root_count, cursors, __func__, &cursors); 2703 if (result != VDO_SUCCESS) { 2704 vdo_fail_completion(completion, result); 2705 return; 2706 } 2707 2708 cursors->zone = &map->zones[0]; 2709 cursors->pool = cursors->zone->vio_pool; 2710 cursors->entry_callback = callback; 2711 cursors->completion = completion; 2712 cursors->active_roots = map->root_count; 2713 for (root = 0; root < map->root_count; root++) { 2714 struct cursor *cursor = &cursors->cursors[root]; 2715 2716 *cursor = (struct cursor) { 2717 .tree = &map->forest->trees[root], 2718 .height = VDO_BLOCK_MAP_TREE_HEIGHT - 1, 2719 .parent = cursors, 2720 .boundary = compute_boundary(map, root), 2721 }; 2722 2723 cursor->waiter.callback = launch_cursor; 2724 acquire_vio_from_pool(cursors->pool, &cursor->waiter); 2725 } 2726 } 2727 2728 /** 2729 * initialize_block_map_zone() - Initialize the per-zone portions of the block map. 2730 * @map: The block map. 2731 * @zone_number: The zone to initialize. 2732 * @cache_size: The total block map cache size. 2733 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be 2734 * written out. 
2735 */ 2736 static int __must_check initialize_block_map_zone(struct block_map *map, 2737 zone_count_t zone_number, 2738 page_count_t cache_size, 2739 block_count_t maximum_age) 2740 { 2741 int result; 2742 block_count_t i; 2743 struct vdo *vdo = map->vdo; 2744 struct block_map_zone *zone = &map->zones[zone_number]; 2745 2746 BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64)); 2747 2748 zone->zone_number = zone_number; 2749 zone->thread_id = vdo->thread_config.logical_threads[zone_number]; 2750 zone->block_map = map; 2751 2752 result = vdo_allocate_extended(maximum_age, eras, __func__, &zone->dirty_lists); 2753 if (result != VDO_SUCCESS) 2754 return result; 2755 2756 zone->dirty_lists->maximum_age = maximum_age; 2757 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]); 2758 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]); 2759 2760 for (i = 0; i < maximum_age; i++) { 2761 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]); 2762 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]); 2763 } 2764 2765 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages); 2766 if (result != VDO_SUCCESS) 2767 return result; 2768 2769 result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 1, 2770 zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR, 2771 VIO_PRIORITY_METADATA, zone, &zone->vio_pool); 2772 if (result != VDO_SUCCESS) 2773 return result; 2774 2775 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 2776 2777 zone->page_cache.zone = zone; 2778 zone->page_cache.vdo = vdo; 2779 zone->page_cache.page_count = cache_size / map->zone_count; 2780 zone->page_cache.stats.free_pages = zone->page_cache.page_count; 2781 2782 result = allocate_cache_components(&zone->page_cache); 2783 if (result != VDO_SUCCESS) 2784 return result; 2785 2786 /* initialize empty circular queues */ 2787 INIT_LIST_HEAD(&zone->page_cache.lru_list); 2788 INIT_LIST_HEAD(&zone->page_cache.outgoing_list); 2789 2790 return VDO_SUCCESS; 2791 
} 2792 2793 /* Implements vdo_zone_thread_getter_fn */ 2794 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number) 2795 { 2796 struct block_map *map = context; 2797 2798 return map->zones[zone_number].thread_id; 2799 } 2800 2801 /* Implements vdo_action_preamble_fn */ 2802 static void prepare_for_era_advance(void *context, struct vdo_completion *parent) 2803 { 2804 struct block_map *map = context; 2805 2806 map->current_era_point = map->pending_era_point; 2807 vdo_finish_completion(parent); 2808 } 2809 2810 /* Implements vdo_zone_action_fn */ 2811 static void advance_block_map_zone_era(void *context, zone_count_t zone_number, 2812 struct vdo_completion *parent) 2813 { 2814 struct block_map *map = context; 2815 struct block_map_zone *zone = &map->zones[zone_number]; 2816 2817 update_period(zone->dirty_lists, map->current_era_point); 2818 write_expired_elements(zone); 2819 vdo_finish_completion(parent); 2820 } 2821 2822 /* 2823 * Schedule an era advance if necessary. This method should not be called directly. Rather, call 2824 * vdo_schedule_default_action() on the block map's action manager. 2825 * 2826 * Implements vdo_action_scheduler_fn. 
2827 */ 2828 static bool schedule_era_advance(void *context) 2829 { 2830 struct block_map *map = context; 2831 2832 if (map->current_era_point == map->pending_era_point) 2833 return false; 2834 2835 return vdo_schedule_action(map->action_manager, prepare_for_era_advance, 2836 advance_block_map_zone_era, NULL, NULL); 2837 } 2838 2839 static void uninitialize_block_map_zone(struct block_map_zone *zone) 2840 { 2841 struct vdo_page_cache *cache = &zone->page_cache; 2842 2843 vdo_free(vdo_forget(zone->dirty_lists)); 2844 free_vio_pool(vdo_forget(zone->vio_pool)); 2845 vdo_int_map_free(vdo_forget(zone->loading_pages)); 2846 if (cache->infos != NULL) { 2847 struct page_info *info; 2848 2849 for (info = cache->infos; info < cache->infos + cache->page_count; info++) 2850 free_vio(vdo_forget(info->vio)); 2851 } 2852 2853 vdo_int_map_free(vdo_forget(cache->page_map)); 2854 vdo_free(vdo_forget(cache->infos)); 2855 vdo_free(vdo_forget(cache->pages)); 2856 } 2857 2858 void vdo_free_block_map(struct block_map *map) 2859 { 2860 zone_count_t zone; 2861 2862 if (map == NULL) 2863 return; 2864 2865 for (zone = 0; zone < map->zone_count; zone++) 2866 uninitialize_block_map_zone(&map->zones[zone]); 2867 2868 vdo_abandon_block_map_growth(map); 2869 if (map->forest != NULL) 2870 deforest(vdo_forget(map->forest), 0); 2871 vdo_free(vdo_forget(map->action_manager)); 2872 vdo_free(map); 2873 } 2874 2875 /* @journal may be NULL. 
*/ 2876 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks, 2877 struct vdo *vdo, struct recovery_journal *journal, 2878 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age, 2879 struct block_map **map_ptr) 2880 { 2881 struct block_map *map; 2882 int result; 2883 zone_count_t zone = 0; 2884 2885 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE != 2886 ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) / 2887 sizeof(struct block_map_entry))); 2888 result = VDO_ASSERT(cache_size > 0, "block map cache size is specified"); 2889 if (result != VDO_SUCCESS) 2890 return result; 2891 2892 result = vdo_allocate_extended(vdo->thread_config.logical_zone_count, 2893 zones, __func__, &map); 2894 if (result != VDO_SUCCESS) 2895 return result; 2896 2897 map->vdo = vdo; 2898 map->root_origin = state.root_origin; 2899 map->root_count = state.root_count; 2900 map->entry_count = logical_blocks; 2901 map->journal = journal; 2902 map->nonce = nonce; 2903 2904 result = make_forest(map, map->entry_count); 2905 if (result != VDO_SUCCESS) { 2906 vdo_free_block_map(map); 2907 return result; 2908 } 2909 2910 replace_forest(map); 2911 2912 map->zone_count = vdo->thread_config.logical_zone_count; 2913 for (zone = 0; zone < map->zone_count; zone++) { 2914 result = initialize_block_map_zone(map, zone, cache_size, maximum_age); 2915 if (result != VDO_SUCCESS) { 2916 vdo_free_block_map(map); 2917 return result; 2918 } 2919 } 2920 2921 result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id, 2922 vdo_get_recovery_journal_thread_id(journal), 2923 map, schedule_era_advance, vdo, 2924 &map->action_manager); 2925 if (result != VDO_SUCCESS) { 2926 vdo_free_block_map(map); 2927 return result; 2928 } 2929 2930 *map_ptr = map; 2931 return VDO_SUCCESS; 2932 } 2933 2934 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map) 2935 { 2936 return (struct block_map_state_2_0) { 2937 .flat_page_origin = 
VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN, 2938 /* This is the flat page count, which has turned out to always be 0. */ 2939 .flat_page_count = 0, 2940 .root_origin = map->root_origin, 2941 .root_count = map->root_count, 2942 }; 2943 } 2944 2945 /* The block map needs to know the journals' sequence number to initialize the eras. */ 2946 void vdo_initialize_block_map_from_journal(struct block_map *map, 2947 struct recovery_journal *journal) 2948 { 2949 zone_count_t z = 0; 2950 2951 map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal); 2952 map->pending_era_point = map->current_era_point; 2953 2954 for (z = 0; z < map->zone_count; z++) { 2955 struct dirty_lists *dirty_lists = map->zones[z].dirty_lists; 2956 2957 VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set"); 2958 dirty_lists->oldest_period = map->current_era_point; 2959 dirty_lists->next_period = map->current_era_point + 1; 2960 dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age; 2961 } 2962 } 2963 2964 /* Compute the logical zone for the LBN of a data vio. 
 */
zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
{
	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	/* Record the block map page index and owning tree root in the data_vio's tree lock. */
	tree_lock->tree_slots[0].page_index = page_number;
	tree_lock->root_index = page_number % map->root_count;
	/* Roots are distributed round-robin across the logical zones. */
	return (tree_lock->root_index % map->zone_count);
}

/**
 * vdo_advance_block_map_era() - Record the newest era point and schedule the default action
 *                               manager action to propagate it to the zones.
 * @map: The block map; a NULL map is tolerated and ignored.
 * @recovery_block_number: The sequence number of the journal block defining the new era point.
 */
void vdo_advance_block_map_era(struct block_map *map,
			       sequence_number_t recovery_block_number)
{
	if (map == NULL)
		return;

	map->pending_era_point = recovery_block_number;
	vdo_schedule_default_action(map->action_manager);
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);

	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
			    "%s() called with no active lookups", __func__);

	/* Unless suspending, expire every dirty period and issue the resulting writes. */
	if (!vdo_is_state_suspending(state)) {
		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
			expire_oldest_list(zone->dirty_lists);
		write_expired_elements(zone);
	}

	check_for_drain_complete(zone);
}

/* Implements vdo_zone_action_fn. */
static void drain_zone(void *context, zone_count_t zone_number,
		       struct vdo_completion *parent)
{
	struct block_map *map = context;
	struct block_map_zone *zone = &map->zones[zone_number];

	vdo_start_draining(&zone->state,
			   vdo_get_current_manager_operation(map->action_manager),
			   parent, initiate_drain);
}

/**
 * vdo_drain_block_map() - Schedule the supplied drain operation across all block map zones.
 * @map: The block map to drain.
 * @operation: The admin state code for the drain.
 * @parent: The completion to notify when the drain is complete.
 */
void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
			 struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
			       parent);
}

/* Implements vdo_zone_action_fn. */
static void resume_block_map_zone(void *context, zone_count_t zone_number,
				  struct vdo_completion *parent)
{
	struct block_map *map = context;
	struct block_map_zone *zone = &map->zones[zone_number];

	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
}

/**
 * vdo_resume_block_map() - Resume all zones of a quiescent block map.
 * @map: The block map to resume.
 * @parent: The completion to notify when the resume is complete.
 */
void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_block_map_zone, NULL, parent);
}

/* Allocate an expanded collection of trees, for a future growth.
 */
int vdo_prepare_to_grow_block_map(struct block_map *map,
				  block_count_t new_logical_blocks)
{
	/* Already prepared for exactly this size; nothing to do. */
	if (map->next_entry_count == new_logical_blocks)
		return VDO_SUCCESS;

	/* Discard any preparation made for a different size. */
	if (map->next_entry_count > 0)
		vdo_abandon_block_map_growth(map);

	/* A smaller target needs no new forest; just record the current size as the target. */
	if (new_logical_blocks < map->entry_count) {
		map->next_entry_count = map->entry_count;
		return VDO_SUCCESS;
	}

	return make_forest(map, new_logical_blocks);
}

/* Implements vdo_action_preamble_fn: swap in the forest built by the preparation step. */
static void grow_forest(void *context, struct vdo_completion *completion)
{
	replace_forest(context);
	vdo_finish_completion(completion);
}

/* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       grow_forest, NULL, NULL, parent);
}

/**
 * vdo_abandon_block_map_growth() - Throw away any forest prepared for a growth that will not
 *                                  happen.
 * @map: The block map whose pending growth is abandoned.
 */
void vdo_abandon_block_map_growth(struct block_map *map)
{
	struct forest *forest = vdo_forget(map->next_forest);

	/*
	 * NOTE(review): presumably this frees only the newly added final segment -- confirm
	 * against deforest()'s argument semantics.
	 */
	if (forest != NULL)
		deforest(forest, forest->segments - 1);

	map->next_entry_count = 0;
}

/* Release the page completion and then continue the requester. */
static inline void finish_processing_page(struct vdo_completion *completion, int result)
{
	struct vdo_completion *parent = completion->parent;

	vdo_release_page_completion(completion);
	vdo_continue_completion(parent, result);
}

/* Error handler for page fetches: propagate the completion's own error to the requester. */
static void handle_page_error(struct vdo_completion *completion)
{
	finish_processing_page(completion, completion->result);
}

/* Fetch the mapping page for a block map update, and call the provided handler when fetched.
 */
static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
			       vdo_action_fn action)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

	/* A draining zone accepts no new lookups. */
	if (vdo_is_state_draining(&zone->state)) {
		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	vdo_get_page(&data_vio->page_completion, zone,
		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
		     modifiable, &data_vio->vio.completion,
		     action, handle_page_error, false);
}

/**
 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
 * @data_vio: The data vio.
 *
 * This indicates the block map entry for the logical block is either unmapped or corrupted.
 */
static void clear_mapped_location(struct data_vio *data_vio)
{
	data_vio->mapped = (struct zoned_pbn) {
		.state = VDO_MAPPING_STATE_UNMAPPED,
	};
}

/**
 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
 *                         data_vio.
 * @data_vio: The data vio.
 * @entry: The new mapped entry to set.
 *
 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any
 *         other failure
 */
static int __must_check set_mapped_location(struct data_vio *data_vio,
					    const struct block_map_entry *entry)
{
	/* Unpack the PBN for logging purposes even if the entry is invalid. */
	struct data_location mapped = vdo_unpack_block_map_entry(entry);

	if (vdo_is_valid_location(&mapped)) {
		int result;

		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
					       mapped.pbn, &data_vio->mapped.zone);
		if (result == VDO_SUCCESS) {
			data_vio->mapped.pbn = mapped.pbn;
			data_vio->mapped.state = mapped.state;
			return VDO_SUCCESS;
		}

		/*
		 * Return all errors not specifically known to be errors from validating the
		 * location.
		 */
		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
			return result;
	}

	/*
	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
	 * to VDO_BAD_MAPPING.
	 */
	vdo_log_error_strerror(VDO_BAD_MAPPING,
			       "PBN %llu with state %u read from the block map was invalid",
			       (unsigned long long) mapped.pbn, mapped.state);

	/*
	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
	 * known data loss.
	 */
	if (!data_vio->write)
		return VDO_BAD_MAPPING;

	/*
	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
	 * entry rather than fail the write.
	 */
	clear_mapped_location(data_vio);
	return VDO_SUCCESS;
}

/* This callback is registered in vdo_get_mapped_block.
 */
static void get_mapping_from_fetched_page(struct vdo_completion *completion)
{
	int result;
	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
	const struct block_map_page *page;
	const struct block_map_entry *entry;
	struct data_vio *data_vio = as_data_vio(completion->parent);
	struct block_map_tree_slot *tree_slot;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	result = validate_completed_page(vpc, false);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	/* Read the entry for this LBN's slot and record it as the data_vio's mapped location. */
	page = (const struct block_map_page *) get_page_buffer(vpc->info);
	tree_slot = &data_vio->tree_lock.tree_slots[0];
	entry = &page->entries[tree_slot->block_map_slot.slot];

	result = set_mapped_location(data_vio, entry);
	finish_processing_page(completion, result);
}

/**
 * vdo_update_block_map_page() - Encode a data_vio's new mapping into a block map page and adjust
 *                               the page's reference on the recovery journal.
 * @page: The block map page to update.
 * @data_vio: The data_vio whose mapping is being recorded.
 * @pbn: The physical block number of the new mapping.
 * @mapping_state: The mapping state of the new mapping.
 * @recovery_lock: In/out: the journal sequence number the page currently holds a reference on.
 */
void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
			       physical_block_number_t pbn,
			       enum block_mapping_state mapping_state,
			       sequence_number_t *recovery_lock)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct block_map *block_map = zone->block_map;
	struct recovery_journal *journal = block_map->journal;
	sequence_number_t old_locked, new_locked;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	/* Encode the new mapping. */
	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
		vdo_pack_block_map_entry(pbn, mapping_state);

	/* Adjust references on the recovery journal blocks. */
	old_locked = *recovery_lock;
	new_locked = data_vio->recovery_sequence_number;

	/* Only move the lock when the page holds none, or holds one on a later block. */
	if ((old_locked == 0) || (old_locked > new_locked)) {
		/* Acquire the new reference before releasing the old one. */
		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
							     VDO_ZONE_TYPE_LOGICAL,
							     zone->zone_number);

		if (old_locked > 0) {
			vdo_release_recovery_journal_block_reference(journal, old_locked,
								     VDO_ZONE_TYPE_LOGICAL,
								     zone->zone_number);
		}

		*recovery_lock = new_locked;
	}

	/*
	 * FIXME: explain this more
	 * Release the transferred lock from the data_vio.
	 */
	vdo_release_journal_entry_lock(journal, new_locked);
	data_vio->recovery_sequence_number = 0;
}

/* This callback is registered in vdo_put_mapped_block(). */
static void put_mapping_in_fetched_page(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion->parent);
	sequence_number_t old_lock;
	struct vdo_page_completion *vpc;
	struct page_info *info;
	int result;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	/* Write the new mapping, mark the page dirty, and queue it on the zone's dirty lists. */
	info = vpc->info;
	old_lock = info->recovery_lock;
	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
				  data_vio, data_vio->new_mapped.pbn,
				  data_vio->new_mapped.state, &info->recovery_lock);
	set_info_state(info, PS_DIRTY);
	add_to_dirty_lists(info->cache->zone, &info->state_entry,
			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
	finish_processing_page(completion, VDO_SUCCESS);
}

/* Read a stored block mapping into a data_vio.
*/ 3284 void vdo_get_mapped_block(struct data_vio *data_vio) 3285 { 3286 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 3287 /* 3288 * We know that the block map page for this LBN has not been allocated, so the 3289 * block must be unmapped. 3290 */ 3291 clear_mapped_location(data_vio); 3292 continue_data_vio(data_vio); 3293 return; 3294 } 3295 3296 fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page); 3297 } 3298 3299 /* Update a stored block mapping to reflect a data_vio's new mapping. */ 3300 void vdo_put_mapped_block(struct data_vio *data_vio) 3301 { 3302 fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page); 3303 } 3304 3305 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map) 3306 { 3307 zone_count_t zone = 0; 3308 struct block_map_statistics totals; 3309 3310 memset(&totals, 0, sizeof(struct block_map_statistics)); 3311 for (zone = 0; zone < map->zone_count; zone++) { 3312 const struct block_map_statistics *stats = 3313 &(map->zones[zone].page_cache.stats); 3314 3315 totals.dirty_pages += READ_ONCE(stats->dirty_pages); 3316 totals.clean_pages += READ_ONCE(stats->clean_pages); 3317 totals.free_pages += READ_ONCE(stats->free_pages); 3318 totals.failed_pages += READ_ONCE(stats->failed_pages); 3319 totals.incoming_pages += READ_ONCE(stats->incoming_pages); 3320 totals.outgoing_pages += READ_ONCE(stats->outgoing_pages); 3321 totals.cache_pressure += READ_ONCE(stats->cache_pressure); 3322 totals.read_count += READ_ONCE(stats->read_count); 3323 totals.write_count += READ_ONCE(stats->write_count); 3324 totals.failed_reads += READ_ONCE(stats->failed_reads); 3325 totals.failed_writes += READ_ONCE(stats->failed_writes); 3326 totals.reclaimed += READ_ONCE(stats->reclaimed); 3327 totals.read_outgoing += READ_ONCE(stats->read_outgoing); 3328 totals.found_in_cache += READ_ONCE(stats->found_in_cache); 3329 totals.discard_required += READ_ONCE(stats->discard_required); 3330 totals.wait_for_page 
+= READ_ONCE(stats->wait_for_page); 3331 totals.fetch_required += READ_ONCE(stats->fetch_required); 3332 totals.pages_loaded += READ_ONCE(stats->pages_loaded); 3333 totals.pages_saved += READ_ONCE(stats->pages_saved); 3334 totals.flush_count += READ_ONCE(stats->flush_count); 3335 } 3336 3337 return totals; 3338 } 3339