// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "slab-depot.h"

#include <linux/atomic.h>
#include <linux/bio.h>
#include <linux/err.h>
#include <linux/log2.h>
#include <linux/min_heap.h>
#include <linux/minmax.h>

#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"
#include "permassert.h"
#include "string-utils.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "priority-table.h"
#include "recovery-journal.h"
#include "repair.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

static const u64 BYTES_PER_WORD = sizeof(u64);
static const bool NORMAL_OPERATION = true;

/**
 * get_lock() - Get the lock object for a slab journal block by sequence number.
 * @journal: The vdo_slab journal to retrieve from.
 * @sequence_number: Sequence number of the block.
 *
 * Return: The lock object for the given sequence number.
 */
static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
							  sequence_number_t sequence_number)
{
	return &journal->locks[sequence_number % journal->size];
}

static bool is_slab_open(struct vdo_slab *slab)
{
	return (!vdo_is_state_quiescing(&slab->state) &&
		!vdo_is_state_quiescent(&slab->state));
}

/**
 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
 * @journal: The journal to check.
 *
 * Return: true if there are entry waiters and the slab is not rebuilding.
 */
static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
{
	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
		vdo_waitq_has_waiters(&journal->entry_waiters));
}

/**
 * is_reaping() - Check whether a reap is currently in progress.
 * @journal: The journal which may be reaping.
 *
 * Return: true if the journal is reaping.
 */
static inline bool __must_check is_reaping(struct slab_journal *journal)
{
	return (journal->head != journal->unreapable);
}

/**
 * initialize_tail_block() - Initialize tail block as a new block.
 * @journal: The journal whose tail block is being initialized.
 */
static void initialize_tail_block(struct slab_journal *journal)
{
	struct slab_journal_block_header *header = &journal->tail_header;

	header->sequence_number = journal->tail;
	header->entry_count = 0;
	header->has_block_map_increments = false;
}

/**
 * initialize_journal_state() - Set all journal fields appropriately to start journaling.
 * @journal: The journal to be reset, based on its tail sequence number.
 */
static void initialize_journal_state(struct slab_journal *journal)
{
	journal->unreapable = journal->head;
	journal->reap_lock = get_lock(journal, journal->unreapable);
	journal->next_commit = journal->tail;
	journal->summarized = journal->last_summarized = journal->tail;
	initialize_tail_block(journal);
}

/**
 * block_is_full() - Check whether a journal block is full.
 * @journal: The slab journal for the block.
 *
 * Return: True if the tail block is full.
 */
static bool __must_check block_is_full(struct slab_journal *journal)
{
	journal_entry_count_t count = journal->tail_header.entry_count;

	return (journal->tail_header.has_block_map_increments ?
		(journal->full_entries_per_block == count) :
		(journal->entries_per_block == count));
}

static void add_entries(struct slab_journal *journal);
static void update_tail_block_location(struct slab_journal *journal);
static void release_journal_locks(struct vdo_waiter *waiter, void *context);

/**
 * is_slab_journal_blank() - Check whether a slab's journal is blank.
 * @slab: The slab to check.
 *
 * A slab journal is blank if it has never had any entries recorded in it.
 *
 * Return: True if the slab's journal has never been modified.
 */
static bool is_slab_journal_blank(const struct vdo_slab *slab)
{
	return ((slab->journal.tail == 1) &&
		(slab->journal.tail_header.entry_count == 0));
}

/**
 * mark_slab_journal_dirty() - Put a slab journal on the dirty list of its allocator in the correct
 *                             order.
 * @journal: The journal to be marked dirty.
 * @lock: The recovery journal lock held by the slab journal.
 */
static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
{
	struct slab_journal *dirty_journal;
	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;

	VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");

	journal->recovery_lock = lock;
	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
		if (dirty_journal->recovery_lock <= journal->recovery_lock)
			break;
	}

	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
}

static void mark_slab_journal_clean(struct slab_journal *journal)
{
	journal->recovery_lock = 0;
	list_del_init(&journal->dirty_entry);
}

static void check_if_slab_drained(struct vdo_slab *slab)
{
	bool read_only;
	struct slab_journal *journal = &slab->journal;
	const struct admin_state_code *code;

	if (!vdo_is_state_draining(&slab->state) ||
	    must_make_entries_to_flush(journal) ||
	    is_reaping(journal) ||
	    journal->waiting_to_commit ||
	    !list_empty(&journal->uncommitted_blocks) ||
	    journal->updating_slab_summary ||
	    (slab->active_count > 0))
		return;

	/* When not suspending or recovering, the slab must be clean. */
	code = vdo_get_admin_state_code(&slab->state);
	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
	if (!read_only &&
	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
	    (code != VDO_ADMIN_STATE_RECOVERING))
		return;

	vdo_finish_draining_with_result(&slab->state,
					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
}

/* FULLNESS HINT COMPUTATION */

/**
 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
 *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
 *                           count.
 * @depot: The depot whose summary is being updated.
 * @free_blocks: The number of free blocks.
 *
 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
 * is 0, which would make it impossible to distinguish completely full from completely empty.
 *
 * Return: A fullness hint, which can be stored in 7 bits.
 */
static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
					     block_count_t free_blocks)
{
	block_count_t hint;

	VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");

	if (free_blocks == 0)
		return 0;

	hint = free_blocks >> depot->hint_shift;
	return ((hint == 0) ? 1 : hint);
}
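
/*
 * Worked example (editor's illustration, not in the original source): with
 * 2^23-block slabs, the formula above implies a hint_shift of 16, so a slab
 * with 500,000 free blocks yields 500000 >> 16 == 7, while any count from 1
 * to 65,535 shifts to 0 and is reported as 1, keeping "nearly full" distinct
 * from the hint 0 reserved for completely full slabs.
 */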

/**
 * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
 * @allocator: The allocator to check.
 */
static void check_summary_drain_complete(struct block_allocator *allocator)
{
	if (!vdo_is_state_draining(&allocator->summary_state) ||
	    (allocator->summary_write_count > 0))
		return;

	vdo_finish_operation(&allocator->summary_state,
			     (vdo_is_read_only(allocator->depot->vdo) ?
			      VDO_READ_ONLY : VDO_SUCCESS));
}

/**
 * notify_summary_waiters() - Wake all the waiters in a given queue.
 * @allocator: The block allocator summary which owns the queue.
 * @queue: The queue to notify.
 */
static void notify_summary_waiters(struct block_allocator *allocator,
				   struct vdo_wait_queue *queue)
{
	int result = (vdo_is_read_only(allocator->depot->vdo) ?
		      VDO_READ_ONLY : VDO_SUCCESS);

	vdo_waitq_notify_all_waiters(queue, NULL, &result);
}

static void launch_write(struct slab_summary_block *summary_block);

/**
 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
 *                                        whether or not the attempt succeeded.
 * @block: The block.
 */
static void finish_updating_slab_summary_block(struct slab_summary_block *block)
{
	notify_summary_waiters(block->allocator, &block->current_update_waiters);
	block->writing = false;
	block->allocator->summary_write_count--;
	if (vdo_waitq_has_waiters(&block->next_update_waiters))
		launch_write(block);
	else
		check_summary_drain_complete(block->allocator);
}

/**
 * finish_update() - This is the callback for a successful summary block write.
 * @completion: The write vio.
 */
static void finish_update(struct vdo_completion *completion)
{
	struct slab_summary_block *block =
		container_of(as_vio(completion), struct slab_summary_block, vio);

	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
	finish_updating_slab_summary_block(block);
}

/**
 * handle_write_error() - Handle an error writing a slab summary block.
 * @completion: The write VIO.
 */
static void handle_write_error(struct vdo_completion *completion)
{
	struct slab_summary_block *block =
		container_of(as_vio(completion), struct slab_summary_block, vio);

	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(completion->vdo, completion->result);
	finish_updating_slab_summary_block(block);
}

static void write_slab_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_summary_block *block =
		container_of(vio, struct slab_summary_block, vio);

	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
}

/**
 * launch_write() - Write a slab summary block unless it is currently out for writing.
 * @block: The block that needs to be committed.
 */
static void launch_write(struct slab_summary_block *block)
{
	struct block_allocator *allocator = block->allocator;
	struct slab_depot *depot = allocator->depot;
	physical_block_number_t pbn;

	if (block->writing)
		return;

	allocator->summary_write_count++;
	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
				       &block->current_update_waiters);
	block->writing = true;

	if (vdo_is_read_only(depot->vdo)) {
		finish_updating_slab_summary_block(block);
		return;
	}

	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);

	/*
	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
	 * covered by this summary update are stable. Otherwise, a subsequent recovery could
	 * encounter a slab summary update that refers to a slab journal tail block that has not
	 * actually been written. In such cases, the slab journal referenced will be treated as
	 * empty, causing any data within the slab which predates the existing recovery journal
	 * entries to be lost.
	 */
	pbn = (depot->summary_origin +
	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
	       block->index);
	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
}
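
/*
 * Editor's note (illustrative, not in the original source): the on-disk
 * summary is laid out zone by zone, so the PBN computed in launch_write()
 * for, say, zone 2, block index 3 is
 * summary_origin + (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * 2) + 3.
 */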

/**
 * update_slab_summary_entry() - Update the entry for a slab.
 * @slab: The slab whose entry is to be updated.
 * @waiter: The waiter that is updating the summary.
 * @tail_block_offset: The offset of the slab journal's tail block.
 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
 * @is_clean: Whether the slab is clean.
 * @free_blocks: The number of free blocks.
 */
static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
				      tail_block_offset_t tail_block_offset,
				      bool load_ref_counts, bool is_clean,
				      block_count_t free_blocks)
{
	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
	struct block_allocator *allocator = slab->allocator;
	struct slab_summary_block *block = &allocator->summary_blocks[index];
	int result;
	struct slab_summary_entry *entry;

	if (vdo_is_read_only(block->vio.completion.vdo)) {
		result = VDO_READ_ONLY;
		waiter->callback(waiter, &result);
		return;
	}

	if (vdo_is_state_draining(&allocator->summary_state) ||
	    vdo_is_state_quiescent(&allocator->summary_state)) {
		result = VDO_INVALID_ADMIN_STATE;
		waiter->callback(waiter, &result);
		return;
	}

	entry = &allocator->summary_entries[slab->slab_number];
	*entry = (struct slab_summary_entry) {
		.tail_block_offset = tail_block_offset,
		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
		.is_dirty = !is_clean,
		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
	};
	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
	launch_write(block);
}

/**
 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
 *                    complete.
 * @journal: The journal to be reaped.
 */
static void finish_reaping(struct slab_journal *journal)
{
	journal->head = journal->unreapable;
	add_entries(journal);
	check_if_slab_drained(journal->slab);
}

static void reap_slab_journal(struct slab_journal *journal);

/**
 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
 *                      reaping again in case we deferred reaping due to an outstanding vio.
 * @completion: The flush vio.
 */
static void complete_reaping(struct vdo_completion *completion)
{
	struct slab_journal *journal = completion->parent;

	return_vio_to_pool(vio_as_pooled_vio(as_vio(completion)));
	finish_reaping(journal);
	reap_slab_journal(journal);
}

/**
 * handle_flush_error() - Handle an error flushing the lower layer.
 * @completion: The flush vio.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(completion->vdo, completion->result);
	complete_reaping(completion);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_reaping,
			      journal->slab->allocator->thread_id);
}

/**
 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
 *                       prior to reaping.
 * @waiter: The journal as a flush waiter.
 * @context: The newly acquired flush vio.
 */
static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
{
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, flush_waiter);
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;

	vio->completion.parent = journal;
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

/**
 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
 * @journal: The slab journal.
 */
static void reap_slab_journal(struct slab_journal *journal)
{
	bool reaped = false;

	if (is_reaping(journal)) {
		/* We already have a reap in progress so wait for it to finish. */
		return;
	}

	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
	    !vdo_is_state_normal(&journal->slab->state) ||
	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
		/*
		 * We must not reap in the first two cases, and there's no point in read-only mode.
		 */
		return;
	}

	/*
	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
	 * block is referenced or reap reaches the most recently written block, referenced by the
	 * slab summary, which has the sequence number just before the tail.
	 */
	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
		reaped = true;
		journal->unreapable++;
		journal->reap_lock++;
		if (journal->reap_lock == &journal->locks[journal->size])
			journal->reap_lock = &journal->locks[0];
	}

	if (!reaped)
		return;

	/*
	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
	 * of whether a user flush has been received or not. In the absence of the flush, the
	 * reference block write which released the locks allowing the slab journal to reap may not
	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
	 * journal block writes can be issued while previous slab summary updates have not yet been
	 * made. Even though those slab journal block writes will be ignored if the slab summary
	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
	 * resulting in a loss of reference count updates.
	 */
	journal->flush_waiter.callback = flush_for_reaping;
	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
			      &journal->flush_waiter);
}
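
/*
 * Editor's illustration (not in the original source): journal->locks is a
 * ring of journal->size entries indexed by sequence number modulo size (see
 * get_lock()). With size == 8, advancing unreapable from 15 to 16 in the
 * loop above moves reap_lock from &locks[7] to &locks[8] == &locks[size],
 * which then wraps back to &locks[0].
 */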

/**
 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
 * @journal: The slab journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @adjustment: Amount to adjust the reference counter.
 *
 * Note that when the adjustment is negative, the slab journal will be reaped.
 */
static void adjust_slab_journal_block_reference(struct slab_journal *journal,
						sequence_number_t sequence_number,
						int adjustment)
{
	struct journal_lock *lock;

	if (sequence_number == 0)
		return;

	if (journal->slab->status == VDO_SLAB_REPLAYING) {
		/* Locks should not be used during offline replay. */
		return;
	}

	VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
	lock = get_lock(journal, sequence_number);
	if (adjustment < 0) {
		VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
				    "adjustment %d of lock count %u for slab journal block %llu must not underflow",
				    adjustment, lock->count,
				    (unsigned long long) sequence_number);
	}

	lock->count += adjustment;
	if (lock->count == 0)
		reap_slab_journal(journal);
}

/**
 * release_journal_locks() - Callback invoked after a slab summary update completes.
 * @waiter: The slab summary waiter that has just been notified.
 * @context: The result code of the update.
 *
 * Registered in the constructor on behalf of update_tail_block_location().
 *
 * Implements waiter_callback_fn.
 */
static void release_journal_locks(struct vdo_waiter *waiter, void *context)
{
	sequence_number_t first, i;
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, slab_summary_waiter);
	int result = *((int *) context);

	if (result != VDO_SUCCESS) {
		if (result != VDO_READ_ONLY) {
			/*
			 * Don't bother logging what might be lots of errors if we are already in
			 * read-only mode.
			 */
			vdo_log_error_strerror(result, "failed slab summary update %llu",
					       (unsigned long long) journal->summarized);
		}

		journal->updating_slab_summary = false;
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		check_if_slab_drained(journal->slab);
		return;
	}

	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
		journal->partial_write_in_progress = false;
		add_entries(journal);
	}

	first = journal->last_summarized;
	journal->last_summarized = journal->summarized;
	for (i = journal->summarized - 1; i >= first; i--) {
		/*
		 * Release the lock the summarized block held on the recovery journal. (During
		 * replay, recovery_start will always be 0.)
		 */
		if (journal->recovery_journal != NULL) {
			zone_count_t zone_number = journal->slab->allocator->zone_number;
			struct journal_lock *lock = get_lock(journal, i);

			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
								     lock->recovery_start,
								     VDO_ZONE_TYPE_PHYSICAL,
								     zone_number);
		}

		/*
		 * Release our own lock against reaping for blocks that are committed. (This
		 * function will not change locks during replay.)
		 */
		adjust_slab_journal_block_reference(journal, i, -1);
	}

	journal->updating_slab_summary = false;

	reap_slab_journal(journal);

	/* Check if the slab summary needs to be updated again. */
	update_tail_block_location(journal);
}
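
/*
 * Editor's illustration (not in the original source): if last_summarized was
 * 4 and the just-written summary covered through summarized == 7, the loop
 * in release_journal_locks() visits sequence numbers 6, 5, and 4, dropping
 * one recovery journal reference and one reap lock for each newly summarized
 * block.
 */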

/**
 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
 * @journal: The slab journal that is updating its tail block location.
 */
static void update_tail_block_location(struct slab_journal *journal)
{
	block_count_t free_block_count;
	struct vdo_slab *slab = journal->slab;

	if (journal->updating_slab_summary ||
	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
	    (journal->last_summarized >= journal->next_commit)) {
		check_if_slab_drained(slab);
		return;
	}

	if (slab->status != VDO_SLAB_REBUILT) {
		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;

		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
	} else {
		free_block_count = slab->free_blocks;
	}

	journal->summarized = journal->next_commit;
	journal->updating_slab_summary = true;

	/*
	 * Update slab summary as dirty.
	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
	 * loaded when the journal head has reaped past sequence number 1.
	 */
	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
				  journal->summarized % journal->size,
				  (journal->head > 1), false, free_block_count);
}

/**
 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
 * @slab: The slab to reopen.
 */
static void reopen_slab_journal(struct vdo_slab *slab)
{
	struct slab_journal *journal = &slab->journal;
	sequence_number_t block;

	VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
			    "vdo_slab journal's active block empty before reopening");
	journal->head = journal->tail;
	initialize_journal_state(journal);

	/* Ensure no locks are spuriously held on an empty journal. */
	for (block = 1; block <= journal->size; block++) {
		VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
				    "Scrubbed journal's block %llu is not locked",
				    (unsigned long long) block);
	}

	add_entries(journal);
}

static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
{
	const struct packed_slab_journal_block *block =
		(const struct packed_slab_journal_block *) vio->vio.data;

	return __le64_to_cpu(block->header.sequence_number);
}

/**
 * complete_write() - Handle post-commit processing.
 * @completion: The write vio as a completion.
 *
 * This is the callback registered by write_slab_journal_block().
 */
static void complete_write(struct vdo_completion *completion)
{
	int result = completion->result;
	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
	struct slab_journal *journal = completion->parent;
	sequence_number_t committed = get_committing_sequence_number(pooled);

	list_del_init(&pooled->list_entry);
	return_vio_to_pool(pooled);

	if (result != VDO_SUCCESS) {
		vio_record_metadata_io_error(as_vio(completion));
		vdo_log_error_strerror(result, "cannot write slab journal block %llu",
				       (unsigned long long) committed);
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		check_if_slab_drained(journal->slab);
		return;
	}

	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);

	if (list_empty(&journal->uncommitted_blocks)) {
		/* If no blocks are outstanding, then the commit point is at the tail. */
		journal->next_commit = journal->tail;
	} else {
		/* The commit point is always the beginning of the oldest incomplete block. */
		pooled = container_of(journal->uncommitted_blocks.next,
				      struct pooled_vio, list_entry);
		journal->next_commit = get_committing_sequence_number(pooled);
	}

	update_tail_block_location(journal);
}

static void write_slab_journal_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
}

/**
 * write_slab_journal_block() - Write a slab journal block.
 * @waiter: The vio pool waiter which was just notified.
 * @context: The vio pool entry for the write.
 *
 * Callback from acquire_vio_from_pool() registered in commit_tail().
 */
static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, resource_waiter);
	struct slab_journal_block_header *header = &journal->tail_header;
	int unused_entries = journal->entries_per_block - header->entry_count;
	physical_block_number_t block_number;
	const struct admin_state_code *operation;

	header->head = journal->head;
	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
	vdo_pack_slab_journal_block_header(header, &journal->block->header);

	/* Copy the tail block into the vio. */
	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);

	VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
	if (unused_entries > 0) {
		/*
		 * Release the per-entry locks for any unused entries in the block we are about to
		 * write.
		 */
		adjust_slab_journal_block_reference(journal, header->sequence_number,
						    -unused_entries);
		journal->partial_write_in_progress = !block_is_full(journal);
	}

	block_number = journal->slab->journal_origin +
		(header->sequence_number % journal->size);
	vio->completion.parent = journal;

	/*
	 * This block won't be read in recovery until the slab summary is updated to refer to it.
	 * The slab summary update does a flush which is sufficient to protect us from corruption
	 * due to out of order slab journal, reference block, or block map writes.
	 */
	vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
				complete_write, REQ_OP_WRITE);

	/* Since the write is submitted, the tail block structure can be reused. */
	journal->tail++;
	initialize_tail_block(journal);
	journal->waiting_to_commit = false;

	operation = vdo_get_admin_state_code(&journal->slab->state);
	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
		vdo_finish_operation(&journal->slab->state,
				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
				      VDO_READ_ONLY : VDO_SUCCESS));
		return;
	}

	add_entries(journal);
}

/**
 * commit_tail() - Commit the tail block of the slab journal.
 * @journal: The journal whose tail block should be committed.
 */
static void commit_tail(struct slab_journal *journal)
{
	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
		/*
		 * There are no entries at the moment, but there are some waiters, so defer
		 * initiating the flush until those entries are ready to write.
		 */
		return;
	}

	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
	    journal->waiting_to_commit ||
	    (journal->tail_header.entry_count == 0)) {
		/*
		 * There is nothing to do since the tail block is empty, or writing, or the journal
		 * is in read-only mode.
		 */
		return;
	}

	/*
	 * Since we are about to commit the tail block, this journal no longer needs to be on the
	 * list of journals which the recovery journal might ask to commit.
	 */
	mark_slab_journal_clean(journal);

	journal->waiting_to_commit = true;

	journal->resource_waiter.callback = write_slab_journal_block;
	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
			      &journal->resource_waiter);
}

/**
 * encode_slab_journal_entry() - Encode a slab journal entry.
 * @tail_header: The unpacked header for the block.
 * @payload: The journal block payload to hold the entry.
 * @sbn: The slab block number of the entry to encode.
 * @operation: The type of the entry.
 * @increment: True if this is an increment.
 */
static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
				      slab_journal_payload *payload,
				      slab_block_number sbn,
				      enum journal_operation operation,
				      bool increment)
{
	journal_entry_count_t entry_number = tail_header->entry_count++;

	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		if (!tail_header->has_block_map_increments) {
			memset(payload->full_entries.entry_types, 0,
			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
			tail_header->has_block_map_increments = true;
		}

		payload->full_entries.entry_types[entry_number / 8] |=
			((u8)1 << (entry_number % 8));
	}

	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
}
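
/*
 * Editor's illustration (not in the original source): entry_types is a
 * bitmap with one bit per entry, so marking entry 10 as a block map
 * increment sets bit 2 of byte 1 (10 / 8 == 1, 10 % 8 == 2), i.e.
 * entry_types[1] |= 0x04.
 */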

/**
 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
 *                          increment and a decrement to a single point which refers to one or the
 *                          other.
 * @recovery_point: The journal point to convert.
 * @increment: Whether the current entry is an increment.
 *
 * Return: The expanded journal point.
 *
 * Each data_vio has only a single recovery journal point, but may need to make both increment and
 * decrement entries in the same slab journal. In order to distinguish the two entries, the entry
 * count of the expanded journal point is twice the actual recovery journal entry count for
 * increments, and one more than that for decrements.
 */
static struct journal_point expand_journal_point(struct journal_point recovery_point,
						 bool increment)
{
	recovery_point.entry_count *= 2;
	if (!increment)
		recovery_point.entry_count++;

	return recovery_point;
}
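
/*
 * Editor's illustration (not in the original source): a recovery journal
 * point (sequence 5, entry 3) expands to (5, 6) for the increment and
 * (5, 7) for the decrement, so vdo_before_journal_point() always orders the
 * increment ahead of its matching decrement.
 */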

/**
 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
 *               block becomes full.
 * @journal: The slab journal to append to.
 * @pbn: The pbn being adjusted.
 * @operation: The type of entry to make.
 * @increment: True if this is an increment.
 * @recovery_point: The expanded recovery point.
 *
 * This function is synchronous.
 */
static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
		      enum journal_operation operation, bool increment,
		      struct journal_point recovery_point)
{
	struct packed_slab_journal_block *block = journal->block;
	int result;

	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
						     &recovery_point),
			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
			    (unsigned long long) recovery_point.sequence_number,
			    recovery_point.entry_count,
			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
			    journal->tail_header.recovery_point.entry_count);
	if (result != VDO_SUCCESS) {
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		return;
	}

	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		result = VDO_ASSERT((journal->tail_header.entry_count <
				     journal->full_entries_per_block),
				    "block has room for full entries");
		if (result != VDO_SUCCESS) {
			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
						 result);
			return;
		}
	}

	encode_slab_journal_entry(&journal->tail_header, &block->payload,
				  pbn - journal->slab->start, operation, increment);
	journal->tail_header.recovery_point = recovery_point;
	if (block_is_full(journal))
		commit_tail(journal);
}

static inline block_count_t journal_length(const struct slab_journal *journal)
{
	return journal->tail - journal->head;
}

/**
 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
 * @slab: The slab to play into.
 * @pbn: The PBN for the entry.
 * @operation: The type of entry to add.
 * @increment: True if this entry is an increment.
 * @recovery_point: The recovery journal point corresponding to this entry.
 * @parent: The completion to notify when there is space to add the entry if the entry could not be
 *          added immediately.
 *
 * Return: True if the entry was added immediately.
 */
bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
				  enum journal_operation operation, bool increment,
				  struct journal_point *recovery_point,
				  struct vdo_completion *parent)
{
	struct slab_journal *journal = &slab->journal;
	struct slab_journal_block_header *header = &journal->tail_header;
	struct journal_point expanded = expand_journal_point(*recovery_point, increment);

	/* Only accept entries after the current recovery point. */
	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
		return true;

	if ((header->entry_count >= journal->full_entries_per_block) &&
	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
		/*
		 * The tail block does not have room for the entry we are attempting to add so
		 * commit the tail block now.
		 */
		commit_tail(journal);
	}

	if (journal->waiting_to_commit) {
		vdo_start_operation_with_waiter(&journal->slab->state,
						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
						parent, NULL);
		return false;
	}

	if (journal_length(journal) >= journal->size) {
		/*
		 * We must have reaped the current head before the crash, since the blocked
		 * threshold keeps us from having more entries than fit in a slab journal; hence we
		 * can just advance the head (and unreapable block), as needed.
		 */
		journal->head++;
		journal->unreapable++;
	}

	if (journal->slab->status == VDO_SLAB_REBUILT)
		journal->slab->status = VDO_SLAB_REPLAYING;

	add_entry(journal, pbn, operation, increment, expanded);
	return true;
}

/**
 * requires_reaping() - Check whether the journal must be reaped before adding new entries.
 * @journal: The journal to check.
 *
 * Return: True if the journal must be reaped.
 */
static bool requires_reaping(const struct slab_journal *journal)
{
	return (journal_length(journal) >= journal->blocking_threshold);
}

/** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
static void finish_summary_update(struct vdo_waiter *waiter, void *context)
{
	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
	int result = *((int *) context);

	slab->active_count--;

	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
		vdo_log_error_strerror(result, "failed to update slab summary");
		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
	}

	check_if_slab_drained(slab);
}

static void write_reference_block(struct vdo_waiter *waiter, void *context);

/**
 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
 *                                  a VIO for it from the pool.
 * @waiter: The waiter of the block which is starting to write.
 * @context: The parent slab of the block.
 *
 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
 * currently in use.
 */
static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
{
	struct vdo_slab *slab = context;

	if (vdo_is_read_only(slab->allocator->depot->vdo))
		return;

	slab->active_count++;
	container_of(waiter, struct reference_block, waiter)->is_writing = true;
	waiter->callback = write_reference_block;
	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
}

static void save_dirty_reference_blocks(struct vdo_slab *slab)
{
	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
				     launch_reference_block_write, slab);
	check_if_slab_drained(slab);
}

/**
 * finish_reference_block_write() - After a reference block has written, clean it, release its
 *                                  locks, and return its VIO to the pool.
 * @completion: The VIO that just finished writing.
 */
static void finish_reference_block_write(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct reference_block *block = completion->parent;
	struct vdo_slab *slab = block->slab;
	tail_block_offset_t offset;

	slab->active_count--;

	/* Release the slab journal lock. */
	adjust_slab_journal_block_reference(&slab->journal,
					    block->slab_journal_lock_to_release, -1);
	return_vio_to_pool(pooled);

	/*
	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
	 * us to be dirtied again, but we don't want to double enqueue.
	 */
	block->is_writing = false;

	if (vdo_is_read_only(completion->vdo)) {
		check_if_slab_drained(slab);
		return;
	}

	/* Re-queue the block if it was re-dirtied while it was writing. */
	if (block->is_dirty) {
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
		if (vdo_is_state_draining(&slab->state)) {
			/* We must be saving, and this block will otherwise not be relaunched. */
			save_dirty_reference_blocks(slab);
		}

		return;
	}

	/*
	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
	 * and no summary update in progress.
	 */
	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
		check_if_slab_drained(slab);
		return;
	}

	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
	slab->active_count++;
	slab->summary_waiter.callback = finish_summary_update;
	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
				  true, true, slab->free_blocks);
}

/**
 * get_reference_counters_for_block() - Find the reference counters for a given block.
 * @block: The reference_block in question.
 *
 * Return: A pointer to the reference counters for this block.
 */
static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
{
	size_t block_index = block - block->slab->reference_blocks;

	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
}

/**
 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
 * @block: The block to copy.
 * @buffer: The char buffer to fill with the packed block.
 */
static void pack_reference_block(struct reference_block *block, void *buffer)
{
	struct packed_reference_block *packed = buffer;
	vdo_refcount_t *counters = get_reference_counters_for_block(block);
	sector_count_t i;
	struct packed_journal_point commit_point;

	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);

	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
		packed->sectors[i].commit_point = commit_point;
		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
	}
}

static void write_reference_block_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct reference_block *block = vio->completion.parent;
	thread_id_t thread_id = block->slab->allocator->thread_id;

	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
}

/**
 * handle_io_error() - Handle an I/O error reading or writing a reference count block.
 * @completion: The VIO doing the I/O as a completion.
 */
static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(vio_as_pooled_vio(vio));
	slab->active_count -= vio->io_size / VDO_BLOCK_SIZE;
	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
	check_if_slab_drained(slab);
}
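
/*
 * Editor's illustration (not in the original source): slab->counters is one
 * flat array covering the whole slab, so reference block N owns the slice
 * counters[N * COUNTS_PER_BLOCK .. (N + 1) * COUNTS_PER_BLOCK - 1], which
 * pack_reference_block() then splits into VDO_SECTORS_PER_BLOCK runs of
 * COUNTS_PER_SECTOR counts, each stamped with the same commit point.
 */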

/**
 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
 *                           its counters and associated data into the VIO, and launch the write.
 * @waiter: The waiter of the dirty block.
 * @context: The VIO returned by the pool.
 */
static void write_reference_block(struct vdo_waiter *waiter, void *context)
{
	size_t block_offset;
	physical_block_number_t pbn;
	struct pooled_vio *pooled = context;
	struct vdo_completion *completion = &pooled->vio.completion;
	struct reference_block *block = container_of(waiter, struct reference_block,
						     waiter);

	pack_reference_block(block, pooled->vio.data);
	block_offset = (block - block->slab->reference_blocks);
	pbn = (block->slab->ref_counts_origin + block_offset);
	block->slab_journal_lock_to_release = block->slab_journal_lock;
	completion->parent = block;

	/*
	 * Mark the block as clean, since we won't be committing any updates that happen after this
	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
	 * cause complications.
	 */
	block->is_dirty = false;

	/*
	 * Flush before writing to ensure that the recovery journal and slab journal entries which
	 * cover this reference update are stable. This prevents data corruption that can be caused
	 * by out of order writes.
	 */
	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);

	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
}

static void reclaim_journal_space(struct slab_journal *journal)
{
	block_count_t length = journal_length(journal);
	struct vdo_slab *slab = journal->slab;
	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
	block_count_t written;

	if ((length < journal->flushing_threshold) || (write_count == 0))
		return;

	/* The slab journal is over the first threshold, schedule some reference block writes. */
	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
	if (length < journal->flushing_deadline) {
		/* Schedule more writes the closer to the deadline we get. */
		write_count /= journal->flushing_deadline - length + 1;
		write_count = max_t(block_count_t, write_count, 1);
	}

	for (written = 0; written < write_count; written++) {
		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
					     launch_reference_block_write, slab);
	}
}

/**
 * reference_count_to_status() - Convert a reference count to a reference status.
 * @count: The count to convert.
 *
 * Return: The appropriate reference status.
 */
static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
{
	if (count == EMPTY_REFERENCE_COUNT)
		return RS_FREE;
	else if (count == 1)
		return RS_SINGLE;
	else if (count == PROVISIONAL_REFERENCE_COUNT)
		return RS_PROVISIONAL;
	else
		return RS_SHARED;
}
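
/*
 * Editor's summary (not in the original source): each vdo_refcount_t thus
 * encodes a status in-band: EMPTY_REFERENCE_COUNT maps to RS_FREE, a count
 * of exactly 1 to RS_SINGLE, the PROVISIONAL_REFERENCE_COUNT sentinel to
 * RS_PROVISIONAL, and every other value to RS_SHARED. Shared counts are
 * capped at MAXIMUM_REFERENCE_COUNT (254, per the error message in
 * increment_for_data() below).
 */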

/**
 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
 *                 if it wasn't already dirty.
 * @block: The reference block to mark as dirty.
 */
static void dirty_block(struct reference_block *block)
{
	if (block->is_dirty)
		return;

	block->is_dirty = true;
	if (!block->is_writing)
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
}

/**
 * get_reference_block() - Get the reference block that covers the given block index.
 * @slab: The slab containing the references.
 * @index: The index of the physical block.
 */
static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
								 slab_block_number index)
{
	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
}

/**
 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
 *                                block number.
 * @slab: The slab.
 * @pbn: The physical block number.
 * @slab_block_number_ptr: A pointer to the slab block number.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
						   physical_block_number_t pbn,
						   slab_block_number *slab_block_number_ptr)
{
	u64 slab_block_number;

	if (pbn < slab->start)
		return VDO_OUT_OF_RANGE;

	slab_block_number = pbn - slab->start;
	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
		return VDO_OUT_OF_RANGE;

	*slab_block_number_ptr = slab_block_number;
	return VDO_SUCCESS;
}

/**
 * get_reference_counter() - Get the reference counter that covers the given physical block number.
 * @slab: The slab to query.
 * @pbn: The physical block number.
 * @counter_ptr: A pointer to the reference counter.
 */
static int __must_check get_reference_counter(struct vdo_slab *slab,
					      physical_block_number_t pbn,
					      vdo_refcount_t **counter_ptr)
{
	slab_block_number index;
	int result = slab_block_number_from_pbn(slab, pbn, &index);

	if (result != VDO_SUCCESS)
		return result;

	*counter_ptr = &slab->counters[index];

	return VDO_SUCCESS;
}

static unsigned int calculate_slab_priority(struct vdo_slab *slab)
{
	block_count_t free_blocks = slab->free_blocks;
	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
	unsigned int priority;

	/*
	 * Wholly full slabs must be the only ones with lowest priority, 0.
	 *
	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
	 * have lower priority than previously opened slabs that have a significant number of free
	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
	 * unless there are very few free blocks that have been previously written to.
	 *
	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
	 * a better client of any underlying storage that is thinly-provisioned (though discarding
	 * would be better).
	 *
	 * For all other slabs, the priority is derived from the logarithm of the number of free
	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
	 */

	if (free_blocks == 0)
		return 0;

	if (is_slab_journal_blank(slab))
		return unopened_slab_priority;

	priority = (1 + ilog2(free_blocks));
	return ((priority < unopened_slab_priority) ? priority : priority + 1);
}
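
/*
 * Editor's illustration (not in the original source; the value of
 * unopened_slab_priority here is hypothetical): suppose
 * unopened_slab_priority is 14. A previously opened slab with 100 free
 * blocks gets priority 1 + ilog2(100) == 7, which is below 14 and kept
 * as-is, while one with 2^20 free blocks computes 21 and is bumped to 22,
 * skipping over the priority reserved for blank slabs.
 */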

/*
 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
 * blocks.
 */
static void prioritize_slab(struct vdo_slab *slab)
{
	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
			    "a slab must not already be on a list when prioritizing");
	slab->priority = calculate_slab_priority(slab);
	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
				   slab->priority, &slab->allocq_entry);
}

/**
 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
 * @slab: The slab.
 * @incremented: True if the free block count went up.
 */
static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
{
	struct block_allocator *allocator = slab->allocator;

	WRITE_ONCE(allocator->allocated_blocks,
		   allocator->allocated_blocks + (incremented ? -1 : 1));

	/* The open slab doesn't need to be reprioritized until it is closed. */
	if (slab == allocator->open_slab)
		return;

	/* Don't bother adjusting the priority table if unneeded. */
	if (slab->priority == calculate_slab_priority(slab))
		return;

	/*
	 * Reprioritize the slab to reflect the new free block count by removing it from the table
	 * and re-enqueuing it with the new priority.
	 */
	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
	prioritize_slab(slab);
}

/**
 * increment_for_data() - Increment the reference count for a data block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the data block before this increment.
 * @lock: The pbn_lock associated with this increment (may be NULL).
 * @counter_ptr: A pointer to the count for the data block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
			      slab_block_number block_number,
			      enum reference_status old_status,
			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
			      bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		*counter_ptr = 1;
		block->allocated_count++;
		slab->free_blocks--;
		if (adjust_block_count)
			adjust_free_block_count(slab, false);

		break;

	case RS_PROVISIONAL:
		*counter_ptr = 1;
		break;

	default:
		/* Single or shared */
		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Incrementing a block already having 254 references (slab %u, offset %u)",
						      slab->slab_number, block_number);
		}
		(*counter_ptr)++;
	}

	if (lock != NULL)
		vdo_unassign_pbn_lock_provisional_reference(lock);
	return VDO_SUCCESS;
}

/**
 * decrement_for_data() - Decrement the reference count for a data block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the data block before this decrement.
 * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
 * @counter_ptr: A pointer to the count for the data block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
			      slab_block_number block_number,
			      enum reference_status old_status,
			      struct reference_updater *updater,
			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
					      "Decrementing free block at offset %u in slab %u",
					      block_number, slab->slab_number);

	case RS_PROVISIONAL:
	case RS_SINGLE:
		if (updater->zpbn.zone != NULL) {
			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
									       updater->zpbn.pbn);

			if (lock != NULL) {
				/*
				 * There is a read lock on this block, so the block must not become
				 * unreferenced.
				 */
				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
				vdo_assign_pbn_lock_provisional_reference(lock);
				break;
			}
		}

		*counter_ptr = EMPTY_REFERENCE_COUNT;
		block->allocated_count--;
		slab->free_blocks++;
		if (adjust_block_count)
			adjust_free_block_count(slab, true);

		break;

	default:
		/* Shared */
		(*counter_ptr)--;
	}

	return VDO_SUCCESS;
}

/**
 * increment_for_block_map() - Increment the reference count for a block map page.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the block before this increment.
 * @lock: The pbn_lock associated with this increment (may be NULL).
 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
 * @counter_ptr: A pointer to the count for the block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
 * blocks never dedupe they should never be adjusted from any other state. The adjustment always
 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
 * blocks.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
				   slab_block_number block_number,
				   enum reference_status old_status,
				   struct pbn_lock *lock, bool normal_operation,
				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		if (normal_operation) {
			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Incrementing unallocated block map block (slab %u, offset %u)",
						      slab->slab_number, block_number);
		}

		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
		block->allocated_count++;
		slab->free_blocks--;
		if (adjust_block_count)
			adjust_free_block_count(slab, false);

		return VDO_SUCCESS;

	case RS_PROVISIONAL:
		if (!normal_operation)
			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Block map block had provisional reference during replay (slab %u, offset %u)",
						      slab->slab_number, block_number);

		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
		if (lock != NULL)
			vdo_unassign_pbn_lock_provisional_reference(lock);
		return VDO_SUCCESS;

	default:
		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
					      *counter_ptr, slab->slab_number,
					      block_number);
	}
}

static bool __must_check is_valid_journal_point(const struct journal_point *point)
{
	return ((point != NULL) && (point->sequence_number > 0));
}

/**
 * update_reference_count() - Update the reference count of a block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @slab_journal_point: The slab journal point at which this update is journaled.
 * @updater: The reference updater.
 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
 * @adjust_block_count: Whether to update the slab's free block count.
 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
 *                             of a provisional reference.
 *
 * Return: VDO_SUCCESS or an error.
 */
1592 */ 1593 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block, 1594 slab_block_number block_number, 1595 const struct journal_point *slab_journal_point, 1596 struct reference_updater *updater, 1597 bool normal_operation, bool adjust_block_count, 1598 bool *provisional_decrement_ptr) 1599 { 1600 vdo_refcount_t *counter_ptr = &slab->counters[block_number]; 1601 enum reference_status old_status = reference_count_to_status(*counter_ptr); 1602 int result; 1603 1604 if (!updater->increment) { 1605 result = decrement_for_data(slab, block, block_number, old_status, 1606 updater, counter_ptr, adjust_block_count); 1607 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) { 1608 if (provisional_decrement_ptr != NULL) 1609 *provisional_decrement_ptr = true; 1610 return VDO_SUCCESS; 1611 } 1612 } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) { 1613 result = increment_for_data(slab, block, block_number, old_status, 1614 updater->lock, counter_ptr, adjust_block_count); 1615 } else { 1616 result = increment_for_block_map(slab, block, block_number, old_status, 1617 updater->lock, normal_operation, 1618 counter_ptr, adjust_block_count); 1619 } 1620 1621 if (result != VDO_SUCCESS) 1622 return result; 1623 1624 if (is_valid_journal_point(slab_journal_point)) 1625 slab->slab_journal_point = *slab_journal_point; 1626 1627 return VDO_SUCCESS; 1628 } 1629 1630 static int __must_check adjust_reference_count(struct vdo_slab *slab, 1631 struct reference_updater *updater, 1632 const struct journal_point *slab_journal_point) 1633 { 1634 slab_block_number block_number; 1635 int result; 1636 struct reference_block *block; 1637 bool provisional_decrement = false; 1638 1639 if (!is_slab_open(slab)) 1640 return VDO_INVALID_ADMIN_STATE; 1641 1642 result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number); 1643 if (result != VDO_SUCCESS) 1644 return result; 1645 1646 block = get_reference_block(slab, block_number); 1647 result = update_reference_count(slab, block, block_number, slab_journal_point, 1648 updater, NORMAL_OPERATION, true, 1649 &provisional_decrement); 1650 if ((result != VDO_SUCCESS) || provisional_decrement) 1651 return result; 1652 1653 if (block->is_dirty && (block->slab_journal_lock > 0)) { 1654 sequence_number_t entry_lock = slab_journal_point->sequence_number; 1655 /* 1656 * This block is already dirty and a slab journal entry has been made for it since 1657 * the last time it was clean. We must release the per-entry slab journal lock for 1658 * the entry associated with the update we are now doing. 1659 */ 1660 result = VDO_ASSERT(is_valid_journal_point(slab_journal_point), 1661 "Reference count adjustments need slab journal points."); 1662 if (result != VDO_SUCCESS) 1663 return result; 1664 1665 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1); 1666 return VDO_SUCCESS; 1667 } 1668 1669 /* 1670 * This may be the first time we are applying an update for which there is a slab journal 1671 * entry to this block since the block was cleaned. Therefore, we convert the per-entry 1672 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock. 1673 */ 1674 if (is_valid_journal_point(slab_journal_point)) 1675 block->slab_journal_lock = slab_journal_point->sequence_number; 1676 else 1677 block->slab_journal_lock = 0; 1678 1679 dirty_block(block); 1680 return VDO_SUCCESS; 1681 } 1682 1683 /** 1684 * add_entry_from_waiter() - Add an entry to the slab journal. 
1685 * @waiter: The vio which should make an entry now. 1686 * @context: The slab journal to make an entry in. 1687 * 1688 * This callback is invoked by add_entries() once it has determined that we are ready to make 1689 * another entry in the slab journal. Implements waiter_callback_fn. 1690 */ 1691 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context) 1692 { 1693 int result; 1694 struct reference_updater *updater = 1695 container_of(waiter, struct reference_updater, waiter); 1696 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 1697 struct slab_journal *journal = context; 1698 struct slab_journal_block_header *header = &journal->tail_header; 1699 struct journal_point slab_journal_point = { 1700 .sequence_number = header->sequence_number, 1701 .entry_count = header->entry_count, 1702 }; 1703 sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number; 1704 1705 if (header->entry_count == 0) { 1706 /* 1707 * This is the first entry in the current tail block, so get a lock on the recovery 1708 * journal which we will hold until this tail block is committed. 1709 */ 1710 get_lock(journal, header->sequence_number)->recovery_start = recovery_block; 1711 if (journal->recovery_journal != NULL) { 1712 zone_count_t zone_number = journal->slab->allocator->zone_number; 1713 1714 vdo_acquire_recovery_journal_block_reference(journal->recovery_journal, 1715 recovery_block, 1716 VDO_ZONE_TYPE_PHYSICAL, 1717 zone_number); 1718 } 1719 1720 mark_slab_journal_dirty(journal, recovery_block); 1721 reclaim_journal_space(journal); 1722 } 1723 1724 add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment, 1725 expand_journal_point(data_vio->recovery_journal_point, 1726 updater->increment)); 1727 1728 if (journal->slab->status != VDO_SLAB_REBUILT) { 1729 /* 1730 * If the slab is unrecovered, scrubbing will take care of the count since the 1731 * update is now recorded in the journal. 1732 */ 1733 adjust_slab_journal_block_reference(journal, 1734 slab_journal_point.sequence_number, -1); 1735 result = VDO_SUCCESS; 1736 } else { 1737 /* Now that an entry has been made in the slab journal, update the counter. */ 1738 result = adjust_reference_count(journal->slab, updater, 1739 &slab_journal_point); 1740 } 1741 1742 if (updater->increment) 1743 continue_data_vio_with_error(data_vio, result); 1744 else 1745 vdo_continue_completion(&data_vio->decrement_completion, result); 1746 } 1747 1748 /** 1749 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map 1750 * increment. 1751 * @journal: The journal. 1752 * 1753 * Return: true if the first entry waiter's operation is a block map increment. 1754 */ 1755 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal) 1756 { 1757 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters); 1758 struct reference_updater *updater = 1759 container_of(waiter, struct reference_updater, waiter); 1760 1761 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING); 1762 } 1763 1764 /** 1765 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries. 1766 * @journal: The journal to which entries may be added. 1767 * 1768 * By processing the queue in order, we ensure that slab journal entries are made in the same order 1769 * as recovery journal entries for the same increment or decrement. 
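 *
 * Entry-making stops early if the slab is rebuilding, a partial write is outstanding, the
 * tail block is busy committing, the journal is over its blocking threshold, or the on-disk
 * journal is full; any remaining waiters stay queued until entries can be made again.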
1770 */ 1771 static void add_entries(struct slab_journal *journal) 1772 { 1773 if (journal->adding_entries) { 1774 /* Protect against re-entrancy. */ 1775 return; 1776 } 1777 1778 journal->adding_entries = true; 1779 while (vdo_waitq_has_waiters(&journal->entry_waiters)) { 1780 struct slab_journal_block_header *header = &journal->tail_header; 1781 1782 if (journal->partial_write_in_progress || 1783 (journal->slab->status == VDO_SLAB_REBUILDING)) { 1784 /* 1785 * Don't add entries while rebuilding or while a partial write is 1786 * outstanding, as it could result in reference count corruption. 1787 */ 1788 break; 1789 } 1790 1791 if (journal->waiting_to_commit) { 1792 /* 1793 * If we are waiting for resources to write the tail block, and the tail 1794 * block is full, we can't make another entry. 1795 */ 1796 WRITE_ONCE(journal->events->tail_busy_count, 1797 journal->events->tail_busy_count + 1); 1798 break; 1799 } else if (is_next_entry_a_block_map_increment(journal) && 1800 (header->entry_count >= journal->full_entries_per_block)) { 1801 /* 1802 * The tail block does not have room for a block map increment, so commit 1803 * it now. 1804 */ 1805 commit_tail(journal); 1806 if (journal->waiting_to_commit) { 1807 WRITE_ONCE(journal->events->tail_busy_count, 1808 journal->events->tail_busy_count + 1); 1809 break; 1810 } 1811 } 1812 1813 /* If the slab is over the blocking threshold, make the vio wait. */ 1814 if (requires_reaping(journal)) { 1815 WRITE_ONCE(journal->events->blocked_count, 1816 journal->events->blocked_count + 1); 1817 save_dirty_reference_blocks(journal->slab); 1818 break; 1819 } 1820 1821 if (header->entry_count == 0) { 1822 struct journal_lock *lock = 1823 get_lock(journal, header->sequence_number); 1824 1825 /* 1826 * Check if the on disk slab journal is full. Because of the blocking and 1827 * scrubbing thresholds, this should never happen. 1828 */ 1829 if (lock->count > 0) { 1830 VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail, 1831 "New block has locks, but journal is not full"); 1832 1833 /* 1834 * The blocking threshold must let the journal fill up if the new 1835 * block has locks; if the blocking threshold is smaller than the 1836 * journal size, the new block cannot possibly have locks already. 1837 */ 1838 VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size), 1839 "New block can have locks already iff blocking threshold is at the end of the journal"); 1840 1841 WRITE_ONCE(journal->events->disk_full_count, 1842 journal->events->disk_full_count + 1); 1843 save_dirty_reference_blocks(journal->slab); 1844 break; 1845 } 1846 1847 /* 1848 * Don't allow the new block to be reaped until all of the reference count 1849 * blocks are written and the journal block has been fully committed as 1850 * well. 1851 */ 1852 lock->count = journal->entries_per_block + 1; 1853 1854 if (header->sequence_number == 1) { 1855 struct vdo_slab *slab = journal->slab; 1856 block_count_t i; 1857 1858 /* 1859 * This is the first entry in this slab journal, ever. Dirty all of 1860 * the reference count blocks. Each will acquire a lock on the tail 1861 * block so that the journal won't be reaped until the reference 1862 * counts are initialized. The lock acquisition must be done by the 1863 * ref_counts since here we don't know how many reference blocks 1864 * the ref_counts has. 
1865 */ 1866 for (i = 0; i < slab->reference_block_count; i++) { 1867 slab->reference_blocks[i].slab_journal_lock = 1; 1868 dirty_block(&slab->reference_blocks[i]); 1869 } 1870 1871 adjust_slab_journal_block_reference(journal, 1, 1872 slab->reference_block_count); 1873 } 1874 } 1875 1876 vdo_waitq_notify_next_waiter(&journal->entry_waiters, 1877 add_entry_from_waiter, journal); 1878 } 1879 1880 journal->adding_entries = false; 1881 1882 /* If there are no waiters, and we are flushing or saving, commit the tail block. */ 1883 if (vdo_is_state_draining(&journal->slab->state) && 1884 !vdo_is_state_suspending(&journal->slab->state) && 1885 !vdo_waitq_has_waiters(&journal->entry_waiters)) 1886 commit_tail(journal); 1887 } 1888 1889 /** 1890 * reset_search_cursor() - Reset the free block search back to the first reference counter in the 1891 * first reference block of a slab. 1892 * @slab: The slab. 1893 */ 1894 static void reset_search_cursor(struct vdo_slab *slab) 1895 { 1896 struct search_cursor *cursor = &slab->search_cursor; 1897 1898 cursor->block = cursor->first_block; 1899 cursor->index = 0; 1900 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count); 1901 } 1902 1903 /** 1904 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in 1905 * a slab. 1906 * @slab: The slab. 1907 * 1908 * Wraps around to the first reference block if the current block is the last reference block. 1909 * 1910 * Return: True unless the cursor was at the last reference block. 1911 */ 1912 static bool advance_search_cursor(struct vdo_slab *slab) 1913 { 1914 struct search_cursor *cursor = &slab->search_cursor; 1915 1916 /* 1917 * If we just finished searching the last reference block, then wrap back around to the 1918 * start of the array. 1919 */ 1920 if (cursor->block == cursor->last_block) { 1921 reset_search_cursor(slab); 1922 return false; 1923 } 1924 1925 /* We're not already at the end, so advance to cursor to the next block. */ 1926 cursor->block++; 1927 cursor->index = cursor->end_index; 1928 1929 if (cursor->block == cursor->last_block) { 1930 /* The last reference block will usually be a runt. */ 1931 cursor->end_index = slab->block_count; 1932 } else { 1933 cursor->end_index += COUNTS_PER_BLOCK; 1934 } 1935 1936 return true; 1937 } 1938 1939 /** 1940 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild. 1941 * @depot: The slab depot. 1942 * @pbn: The physical block number to adjust. 1943 * @operation: The type opf operation. 1944 * 1945 * Return: VDO_SUCCESS or an error. 
1946 */ 1947 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot, 1948 physical_block_number_t pbn, 1949 enum journal_operation operation) 1950 { 1951 int result; 1952 slab_block_number block_number; 1953 struct reference_block *block; 1954 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 1955 struct reference_updater updater = { 1956 .operation = operation, 1957 .increment = true, 1958 }; 1959 1960 result = slab_block_number_from_pbn(slab, pbn, &block_number); 1961 if (result != VDO_SUCCESS) 1962 return result; 1963 1964 block = get_reference_block(slab, block_number); 1965 result = update_reference_count(slab, block, block_number, NULL, 1966 &updater, !NORMAL_OPERATION, false, NULL); 1967 if (result != VDO_SUCCESS) 1968 return result; 1969 1970 dirty_block(block); 1971 return VDO_SUCCESS; 1972 } 1973 1974 /** 1975 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal 1976 * entry into the reference count for a block. 1977 * @slab: The slab. 1978 * @entry_point: The slab journal point for the entry. 1979 * @entry: The slab journal entry being replayed. 1980 * 1981 * The adjustment will be ignored if it was already recorded in the reference count. 1982 * 1983 * Return: VDO_SUCCESS or an error code. 1984 */ 1985 static int replay_reference_count_change(struct vdo_slab *slab, 1986 const struct journal_point *entry_point, 1987 struct slab_journal_entry entry) 1988 { 1989 int result; 1990 struct reference_block *block = get_reference_block(slab, entry.sbn); 1991 sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR; 1992 struct reference_updater updater = { 1993 .operation = entry.operation, 1994 .increment = entry.increment, 1995 }; 1996 1997 if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) { 1998 /* This entry is already reflected in the existing counts, so do nothing. */ 1999 return VDO_SUCCESS; 2000 } 2001 2002 /* This entry is not yet counted in the reference counts. */ 2003 result = update_reference_count(slab, block, entry.sbn, entry_point, 2004 &updater, !NORMAL_OPERATION, false, NULL); 2005 if (result != VDO_SUCCESS) 2006 return result; 2007 2008 dirty_block(block); 2009 return VDO_SUCCESS; 2010 } 2011 2012 /** 2013 * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of 2014 * reference counters. 2015 * @word_ptr: A pointer to the eight counter bytes to check. 2016 * @start_index: The array index corresponding to word_ptr[0]. 2017 * @fail_index: The array index to return if no zero byte is found. 2018 * 2019 * The search does no bounds checking; the function relies on the array being sufficiently padded. 2020 * 2021 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if 2022 * no zero byte was found. 2023 */ 2024 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr, 2025 slab_block_number start_index, 2026 slab_block_number fail_index) 2027 { 2028 u64 word = get_unaligned_le64(word_ptr); 2029 2030 /* This looks like a loop, but GCC will unroll the eight iterations for us. */ 2031 unsigned int offset; 2032 2033 for (offset = 0; offset < BYTES_PER_WORD; offset++) { 2034 /* Assumes little-endian byte order, which we have on X86. 
*/
2035 		if ((word & 0xFF) == 0)
2036 			return (start_index + offset);
2037 		word >>= 8;
2038 	}
2039 
2040 	return fail_index;
2041 }
2042 
2043 /**
2044  * find_free_block() - Find the first block with a reference count of zero in the specified
2045  *                     range of reference counter indexes.
2046  * @slab: The slab counters to scan.
2047  * @index_ptr: A pointer to hold the array index of the free block.
2048  *
2049  * Return: True if a free block was found in the specified range.
2050  */
2051 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2052 {
2053 	slab_block_number zero_index;
2054 	slab_block_number next_index = slab->search_cursor.index;
2055 	slab_block_number end_index = slab->search_cursor.end_index;
2056 	u8 *next_counter = &slab->counters[next_index];
2057 	u8 *end_counter = &slab->counters[end_index];
2058 
2059 	/*
2060 	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2061 	 * safe.)
2062 	 */
2063 	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2064 	if (zero_index < end_index) {
2065 		*index_ptr = zero_index;
2066 		return true;
2067 	}
2068 
2069 	/*
2070 	 * On architectures where unaligned word access is expensive, this would be a good place to
2071 	 * advance to an alignment boundary.
2072 	 */
2073 	next_index += BYTES_PER_WORD;
2074 	next_counter += BYTES_PER_WORD;
2075 
2076 	/*
2077 	 * Now we're word-aligned; check a word at a time until we find a word containing a zero.
2078 	 * (Array is padded so reading past end is safe.)
2079 	 */
2080 	while (next_counter < end_counter) {
2081 		/*
2082 		 * The following code is currently an exact copy of the code preceding the loop,
2083 		 * but if you try to merge them by using a do loop, it runs slower because a jump
2084 		 * instruction gets added at the start of the iteration.
2085 		 */
2086 		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2087 		if (zero_index < end_index) {
2088 			*index_ptr = zero_index;
2089 			return true;
2090 		}
2091 
2092 		next_index += BYTES_PER_WORD;
2093 		next_counter += BYTES_PER_WORD;
2094 	}
2095 
2096 	return false;
2097 }
2098 
2099 /**
2100  * search_current_reference_block() - Search the reference block currently saved in the search
2101  *                                    cursor for a reference count of zero, starting at the saved
2102  *                                    counter index.
2103  * @slab: The slab to search.
2104  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2105  *
2106  * Return: True if an unreferenced counter was found.
2107  */
2108 static bool search_current_reference_block(const struct vdo_slab *slab,
2109 					   slab_block_number *free_index_ptr)
2110 {
2111 	/* Don't bother searching if the current block is known to be full. */
2112 	return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2113 		find_free_block(slab, free_index_ptr));
2114 }
2115 
2116 /**
2117  * search_reference_blocks() - Search each reference block for a reference count of zero.
2118  * @slab: The slab to search.
2119  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2120  *
2121  * Searches each reference block for a reference count of zero, starting at the reference block and
2122  * counter index saved in the search cursor and searching up to the end of the last reference
2123  * block. The search does not wrap.
2124  *
2125  * Return: True if an unreferenced counter was found.
2126 */ 2127 static bool search_reference_blocks(struct vdo_slab *slab, 2128 slab_block_number *free_index_ptr) 2129 { 2130 /* Start searching at the saved search position in the current block. */ 2131 if (search_current_reference_block(slab, free_index_ptr)) 2132 return true; 2133 2134 /* Search each reference block up to the end of the slab. */ 2135 while (advance_search_cursor(slab)) { 2136 if (search_current_reference_block(slab, free_index_ptr)) 2137 return true; 2138 } 2139 2140 return false; 2141 } 2142 2143 /** 2144 * make_provisional_reference() - Do the bookkeeping for making a provisional reference. 2145 * @slab: The slab. 2146 * @block_number: The index for the physical block to reference. 2147 */ 2148 static void make_provisional_reference(struct vdo_slab *slab, 2149 slab_block_number block_number) 2150 { 2151 struct reference_block *block = get_reference_block(slab, block_number); 2152 2153 /* 2154 * Make the initial transition from an unreferenced block to a 2155 * provisionally allocated block. 2156 */ 2157 slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT; 2158 2159 /* Account for the allocation. */ 2160 block->allocated_count++; 2161 slab->free_blocks--; 2162 } 2163 2164 /** 2165 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty. 2166 * @slab: The slab. 2167 */ 2168 static void dirty_all_reference_blocks(struct vdo_slab *slab) 2169 { 2170 block_count_t i; 2171 2172 for (i = 0; i < slab->reference_block_count; i++) 2173 dirty_block(&slab->reference_blocks[i]); 2174 } 2175 2176 static inline bool journal_points_equal(struct journal_point first, 2177 struct journal_point second) 2178 { 2179 return ((first.sequence_number == second.sequence_number) && 2180 (first.entry_count == second.entry_count)); 2181 } 2182 2183 /** 2184 * match_bytes() - Check an 8-byte word for bytes matching the value specified 2185 * @input: A word to examine the bytes of. 2186 * @match: The byte value sought. 2187 * 2188 * Return: 1 in each byte when the corresponding input byte matched, 0 otherwise. 2189 */ 2190 static inline u64 match_bytes(u64 input, u8 match) 2191 { 2192 u64 temp = input ^ (match * 0x0101010101010101ULL); 2193 /* top bit of each byte is set iff top bit of temp byte is clear; rest are 0 */ 2194 u64 test_top_bits = ~temp & 0x8080808080808080ULL; 2195 /* top bit of each byte is set iff low 7 bits of temp byte are clear; rest are useless */ 2196 u64 test_low_bits = 0x8080808080808080ULL - (temp & 0x7f7f7f7f7f7f7f7fULL); 2197 /* return 1 when both tests indicate temp byte is 0 */ 2198 return (test_top_bits & test_low_bits) >> 7; 2199 } 2200 2201 /** 2202 * count_valid_references() - Process a newly loaded refcount array 2203 * @counters: The array of counters from a metadata block. 2204 * 2205 * Scan an 8-byte-aligned array of counters, fixing up any provisional values that 2206 * weren't cleaned up at shutdown, changing them internally to zero. 2207 * 2208 * Return: The number of blocks with a non-zero reference count. 2209 */ 2210 static unsigned int count_valid_references(vdo_refcount_t *counters) 2211 { 2212 u64 *words = (u64 *)counters; 2213 /* It's easier to count occurrences of a specific byte than its absences. */ 2214 unsigned int empty_count = 0; 2215 /* For speed, we process 8 bytes at once. */ 2216 unsigned int words_left = COUNTS_PER_BLOCK / sizeof(u64); 2217 2218 /* 2219 * Sanity check assumptions used for optimizing this code: Counters are bytes. The counter 2220 * array is a multiple of the word size. 
2221 */ 2222 BUILD_BUG_ON(sizeof(vdo_refcount_t) != 1); 2223 BUILD_BUG_ON((COUNTS_PER_BLOCK % sizeof(u64)) != 0); 2224 2225 while (words_left > 0) { 2226 /* 2227 * This is used effectively as 8 byte-size counters. Byte 0 counts how many words 2228 * had the target value found in byte 0, etc. We just have to avoid overflow. 2229 */ 2230 u64 split_count = 0; 2231 /* 2232 * The counter "% 255" trick used below to fold split_count into empty_count 2233 * imposes a limit of 254 bytes examined each iteration of the outer loop. We 2234 * process a word at a time, so that limit gets rounded down to 31 u64 words. 2235 */ 2236 const unsigned int max_words_per_iteration = 254 / sizeof(u64); 2237 unsigned int iter_words_left = min_t(unsigned int, words_left, 2238 max_words_per_iteration); 2239 2240 words_left -= iter_words_left; 2241 2242 while (iter_words_left--) { 2243 u64 word = *words; 2244 u64 temp; 2245 2246 /* First, if we have any provisional refcount values, clear them. */ 2247 temp = match_bytes(word, PROVISIONAL_REFERENCE_COUNT); 2248 if (temp) { 2249 /* 2250 * 'temp' has 0x01 bytes where 'word' has PROVISIONAL; this xor 2251 * will alter just those bytes, changing PROVISIONAL to EMPTY. 2252 */ 2253 word ^= temp * (PROVISIONAL_REFERENCE_COUNT ^ EMPTY_REFERENCE_COUNT); 2254 *words = word; 2255 } 2256 2257 /* Now count the EMPTY_REFERENCE_COUNT bytes, updating the 8 counters. */ 2258 split_count += match_bytes(word, EMPTY_REFERENCE_COUNT); 2259 words++; 2260 } 2261 empty_count += split_count % 255; 2262 } 2263 2264 return COUNTS_PER_BLOCK - empty_count; 2265 } 2266 2267 /** 2268 * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure. 2269 * @packed: The written reference block to be unpacked. 2270 * @block: The internal reference block to be loaded. 2271 */ 2272 static void unpack_reference_block(struct packed_reference_block *packed, 2273 struct reference_block *block) 2274 { 2275 sector_count_t i; 2276 struct vdo_slab *slab = block->slab; 2277 vdo_refcount_t *counters = get_reference_counters_for_block(block); 2278 2279 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) { 2280 struct packed_reference_sector *sector = &packed->sectors[i]; 2281 2282 vdo_unpack_journal_point(§or->commit_point, &block->commit_points[i]); 2283 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts, 2284 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR)); 2285 /* The slab_journal_point must be the latest point found in any sector. */ 2286 if (vdo_before_journal_point(&slab->slab_journal_point, 2287 &block->commit_points[i])) 2288 slab->slab_journal_point = block->commit_points[i]; 2289 2290 if ((i > 0) && 2291 !journal_points_equal(block->commit_points[0], 2292 block->commit_points[i])) { 2293 size_t block_index = block - block->slab->reference_blocks; 2294 2295 vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u", 2296 i, block_index, block->slab->slab_number); 2297 } 2298 } 2299 2300 block->allocated_count = count_valid_references(counters); 2301 } 2302 2303 /** 2304 * finish_reference_block_load() - After a reference block has been read, unpack it. 2305 * @completion: The VIO that just finished reading. 
2306 */ 2307 static void finish_reference_block_load(struct vdo_completion *completion) 2308 { 2309 struct vio *vio = as_vio(completion); 2310 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 2311 struct reference_block *block = completion->parent; 2312 struct vdo_slab *slab = block->slab; 2313 unsigned int block_count = vio->io_size / VDO_BLOCK_SIZE; 2314 unsigned int i; 2315 char *data = vio->data; 2316 2317 for (i = 0; i < block_count; i++, block++, data += VDO_BLOCK_SIZE) { 2318 struct packed_reference_block *packed = (struct packed_reference_block *) data; 2319 2320 unpack_reference_block(packed, block); 2321 slab->free_blocks -= block->allocated_count; 2322 } 2323 return_vio_to_pool(pooled); 2324 slab->active_count -= block_count; 2325 2326 check_if_slab_drained(slab); 2327 } 2328 2329 static void load_reference_block_endio(struct bio *bio) 2330 { 2331 struct vio *vio = bio->bi_private; 2332 struct reference_block *block = vio->completion.parent; 2333 2334 continue_vio_after_io(vio, finish_reference_block_load, 2335 block->slab->allocator->thread_id); 2336 } 2337 2338 /** 2339 * load_reference_block_group() - After a block waiter has gotten a VIO from the VIO pool, load 2340 * a set of blocks. 2341 * @waiter: The waiter of the first block to load. 2342 * @context: The VIO returned by the pool. 2343 */ 2344 static void load_reference_block_group(struct vdo_waiter *waiter, void *context) 2345 { 2346 struct pooled_vio *pooled = context; 2347 struct vio *vio = &pooled->vio; 2348 struct reference_block *block = 2349 container_of(waiter, struct reference_block, waiter); 2350 u32 block_offset = block - block->slab->reference_blocks; 2351 u32 max_block_count = block->slab->reference_block_count - block_offset; 2352 u32 block_count = min_t(int, vio->block_count, max_block_count); 2353 2354 vio->completion.parent = block; 2355 vdo_submit_metadata_vio_with_size(vio, block->slab->ref_counts_origin + block_offset, 2356 load_reference_block_endio, handle_io_error, 2357 REQ_OP_READ, block_count * VDO_BLOCK_SIZE); 2358 } 2359 2360 /** 2361 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a 2362 * pre-allocated reference counter. 2363 * @slab: The slab. 2364 */ 2365 static void load_reference_blocks(struct vdo_slab *slab) 2366 { 2367 block_count_t i; 2368 u64 blocks_per_vio = slab->allocator->refcount_blocks_per_big_vio; 2369 struct vio_pool *pool = slab->allocator->refcount_big_vio_pool; 2370 2371 if (!pool) { 2372 pool = slab->allocator->vio_pool; 2373 blocks_per_vio = 1; 2374 } 2375 2376 slab->free_blocks = slab->block_count; 2377 slab->active_count = slab->reference_block_count; 2378 for (i = 0; i < slab->reference_block_count; i += blocks_per_vio) { 2379 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter; 2380 2381 waiter->callback = load_reference_block_group; 2382 acquire_vio_from_pool(pool, waiter); 2383 } 2384 } 2385 2386 /** 2387 * drain_slab() - Drain all reference count I/O. 2388 * @slab: The slab. 2389 * 2390 * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the 2391 * reference blocks may be loaded from disk or dirty reference blocks may be written out. 
2392 */ 2393 static void drain_slab(struct vdo_slab *slab) 2394 { 2395 bool save; 2396 bool load; 2397 const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state); 2398 2399 if (state == VDO_ADMIN_STATE_SUSPENDING) 2400 return; 2401 2402 if ((state != VDO_ADMIN_STATE_REBUILDING) && 2403 (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING)) 2404 commit_tail(&slab->journal); 2405 2406 if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL)) 2407 return; 2408 2409 save = false; 2410 load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts; 2411 if (state == VDO_ADMIN_STATE_SCRUBBING) { 2412 if (load) { 2413 load_reference_blocks(slab); 2414 return; 2415 } 2416 } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) { 2417 if (!load) { 2418 /* These reference counts were never written, so mark them all dirty. */ 2419 dirty_all_reference_blocks(slab); 2420 } 2421 save = true; 2422 } else if (state == VDO_ADMIN_STATE_REBUILDING) { 2423 /* 2424 * Write out the counters if the slab has written them before, or it has any 2425 * non-zero reference counts, or there are any slab journal blocks. 2426 */ 2427 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks; 2428 2429 if (load || (slab->free_blocks != data_blocks) || 2430 !is_slab_journal_blank(slab)) { 2431 dirty_all_reference_blocks(slab); 2432 save = true; 2433 } 2434 } else if (state == VDO_ADMIN_STATE_SAVING) { 2435 save = (slab->status == VDO_SLAB_REBUILT); 2436 } else { 2437 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS); 2438 return; 2439 } 2440 2441 if (save) 2442 save_dirty_reference_blocks(slab); 2443 } 2444 2445 static int allocate_slab_counters(struct vdo_slab *slab) 2446 { 2447 int result; 2448 size_t index, bytes; 2449 2450 result = VDO_ASSERT(slab->reference_blocks == NULL, 2451 "vdo_slab %u doesn't allocate refcounts twice", 2452 slab->slab_number); 2453 if (result != VDO_SUCCESS) 2454 return result; 2455 2456 result = vdo_allocate(slab->reference_block_count, struct reference_block, 2457 __func__, &slab->reference_blocks); 2458 if (result != VDO_SUCCESS) 2459 return result; 2460 2461 /* 2462 * Allocate such that the runt slab has a full-length memory array, plus a little padding 2463 * so we can word-search even at the very end. 
2464 */ 2465 bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD); 2466 result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array", 2467 &slab->counters); 2468 if (result != VDO_SUCCESS) { 2469 vdo_free(vdo_forget(slab->reference_blocks)); 2470 return result; 2471 } 2472 2473 slab->search_cursor.first_block = slab->reference_blocks; 2474 slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1]; 2475 reset_search_cursor(slab); 2476 2477 for (index = 0; index < slab->reference_block_count; index++) { 2478 slab->reference_blocks[index] = (struct reference_block) { 2479 .slab = slab, 2480 }; 2481 } 2482 2483 return VDO_SUCCESS; 2484 } 2485 2486 static int allocate_counters_if_clean(struct vdo_slab *slab) 2487 { 2488 if (vdo_is_state_clean_load(&slab->state)) 2489 return allocate_slab_counters(slab); 2490 2491 return VDO_SUCCESS; 2492 } 2493 2494 static void finish_loading_journal(struct vdo_completion *completion) 2495 { 2496 struct vio *vio = as_vio(completion); 2497 struct slab_journal *journal = completion->parent; 2498 struct vdo_slab *slab = journal->slab; 2499 struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data; 2500 struct slab_journal_block_header header; 2501 2502 vdo_unpack_slab_journal_block_header(&block->header, &header); 2503 2504 /* FIXME: should it be an error if the following conditional fails? */ 2505 if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) && 2506 (header.nonce == slab->allocator->nonce)) { 2507 journal->tail = header.sequence_number + 1; 2508 2509 /* 2510 * If the slab is clean, this implies the slab journal is empty, so advance the 2511 * head appropriately. 2512 */ 2513 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ? 2514 header.head : journal->tail); 2515 journal->tail_header = header; 2516 initialize_journal_state(journal); 2517 } 2518 2519 return_vio_to_pool(vio_as_pooled_vio(vio)); 2520 vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab)); 2521 } 2522 2523 static void read_slab_journal_tail_endio(struct bio *bio) 2524 { 2525 struct vio *vio = bio->bi_private; 2526 struct slab_journal *journal = vio->completion.parent; 2527 2528 continue_vio_after_io(vio, finish_loading_journal, 2529 journal->slab->allocator->thread_id); 2530 } 2531 2532 static void handle_load_error(struct vdo_completion *completion) 2533 { 2534 int result = completion->result; 2535 struct slab_journal *journal = completion->parent; 2536 struct vio *vio = as_vio(completion); 2537 2538 vio_record_metadata_io_error(vio); 2539 return_vio_to_pool(vio_as_pooled_vio(vio)); 2540 vdo_finish_loading_with_result(&journal->slab->state, result); 2541 } 2542 2543 /** 2544 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio 2545 * pool. 2546 * @waiter: The vio pool waiter which has just been notified. 2547 * @context: The vio pool entry given to the waiter. 2548 * 2549 * This is the success callback from acquire_vio_from_pool() when loading a slab journal. 
2550 */ 2551 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context) 2552 { 2553 struct slab_journal *journal = 2554 container_of(waiter, struct slab_journal, resource_waiter); 2555 struct vdo_slab *slab = journal->slab; 2556 struct pooled_vio *pooled = context; 2557 struct vio *vio = &pooled->vio; 2558 tail_block_offset_t last_commit_point = 2559 slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2560 2561 /* 2562 * Slab summary keeps the commit point offset, so the tail block is the block before that. 2563 * Calculation supports small journals in unit tests. 2564 */ 2565 tail_block_offset_t tail_block = ((last_commit_point == 0) ? 2566 (tail_block_offset_t)(journal->size - 1) : 2567 (last_commit_point - 1)); 2568 2569 vio->completion.parent = journal; 2570 vio->completion.callback_thread_id = slab->allocator->thread_id; 2571 vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block, 2572 read_slab_journal_tail_endio, handle_load_error, 2573 REQ_OP_READ); 2574 } 2575 2576 /** 2577 * load_slab_journal() - Load a slab's journal by reading the journal's tail. 2578 * @slab: The slab. 2579 */ 2580 static void load_slab_journal(struct vdo_slab *slab) 2581 { 2582 struct slab_journal *journal = &slab->journal; 2583 tail_block_offset_t last_commit_point; 2584 2585 last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2586 if ((last_commit_point == 0) && 2587 !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) { 2588 /* 2589 * This slab claims that it has a tail block at (journal->size - 1), but a head of 2590 * 1. This is impossible, due to the scrubbing threshold, on a real system, so 2591 * don't bother reading the (bogus) data off disk. 2592 */ 2593 VDO_ASSERT_LOG_ONLY(((journal->size < 16) || 2594 (journal->scrubbing_threshold < (journal->size - 1))), 2595 "Scrubbing threshold protects against reads of unwritten slab journal blocks"); 2596 vdo_finish_loading_with_result(&slab->state, 2597 allocate_counters_if_clean(slab)); 2598 return; 2599 } 2600 2601 journal->resource_waiter.callback = read_slab_journal_tail; 2602 acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter); 2603 } 2604 2605 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority) 2606 { 2607 struct slab_scrubber *scrubber = &slab->allocator->scrubber; 2608 2609 VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT), 2610 "slab to be scrubbed is unrecovered"); 2611 2612 if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING) 2613 return; 2614 2615 list_del_init(&slab->allocq_entry); 2616 if (!slab->was_queued_for_scrubbing) { 2617 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1); 2618 slab->was_queued_for_scrubbing = true; 2619 } 2620 2621 if (high_priority) { 2622 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING; 2623 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs); 2624 return; 2625 } 2626 2627 list_add_tail(&slab->allocq_entry, &scrubber->slabs); 2628 } 2629 2630 /* Queue a slab for allocation or scrubbing. 
*/ 2631 static void queue_slab(struct vdo_slab *slab) 2632 { 2633 struct block_allocator *allocator = slab->allocator; 2634 block_count_t free_blocks; 2635 int result; 2636 2637 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry), 2638 "a requeued slab must not already be on a list"); 2639 2640 if (vdo_is_read_only(allocator->depot->vdo)) 2641 return; 2642 2643 free_blocks = slab->free_blocks; 2644 result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks), 2645 "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)", 2646 slab->slab_number, (unsigned long long) free_blocks, 2647 (unsigned long long) allocator->depot->slab_config.data_blocks); 2648 if (result != VDO_SUCCESS) { 2649 vdo_enter_read_only_mode(allocator->depot->vdo, result); 2650 return; 2651 } 2652 2653 if (slab->status != VDO_SLAB_REBUILT) { 2654 register_slab_for_scrubbing(slab, false); 2655 return; 2656 } 2657 2658 if (!vdo_is_state_resuming(&slab->state)) { 2659 /* 2660 * If the slab is resuming, we've already accounted for it here, so don't do it 2661 * again. 2662 * FIXME: under what situation would the slab be resuming here? 2663 */ 2664 WRITE_ONCE(allocator->allocated_blocks, 2665 allocator->allocated_blocks - free_blocks); 2666 if (!is_slab_journal_blank(slab)) { 2667 WRITE_ONCE(allocator->statistics.slabs_opened, 2668 allocator->statistics.slabs_opened + 1); 2669 } 2670 } 2671 2672 if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING) 2673 reopen_slab_journal(slab); 2674 2675 prioritize_slab(slab); 2676 } 2677 2678 /** Implements vdo_admin_initiator_fn. */ 2679 static void initiate_slab_action(struct admin_state *state) 2680 { 2681 struct vdo_slab *slab = container_of(state, struct vdo_slab, state); 2682 2683 if (vdo_is_state_draining(state)) { 2684 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 2685 2686 if (operation == VDO_ADMIN_STATE_SCRUBBING) 2687 slab->status = VDO_SLAB_REBUILDING; 2688 2689 drain_slab(slab); 2690 check_if_slab_drained(slab); 2691 return; 2692 } 2693 2694 if (vdo_is_state_loading(state)) { 2695 load_slab_journal(slab); 2696 return; 2697 } 2698 2699 if (vdo_is_state_resuming(state)) { 2700 queue_slab(slab); 2701 vdo_finish_resuming(state); 2702 return; 2703 } 2704 2705 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE); 2706 } 2707 2708 /** 2709 * get_next_slab() - Get the next slab to scrub. 2710 * @scrubber: The slab scrubber. 2711 * 2712 * Return: The next slab to scrub or NULL if there are none. 2713 */ 2714 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber) 2715 { 2716 struct vdo_slab *slab; 2717 2718 slab = list_first_entry_or_null(&scrubber->high_priority_slabs, 2719 struct vdo_slab, allocq_entry); 2720 if (slab != NULL) 2721 return slab; 2722 2723 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab, 2724 allocq_entry); 2725 } 2726 2727 /** 2728 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub. 2729 * @scrubber: The scrubber to check. 2730 * 2731 * Return: True if the scrubber has slabs to scrub. 2732 */ 2733 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber) 2734 { 2735 return (get_next_slab(scrubber) != NULL); 2736 } 2737 2738 /** 2739 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio. 2740 * @scrubber: The scrubber. 
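 *
 * Frees the scrubber's journal read buffer as well as the vio's own components.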
2741 */ 2742 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber) 2743 { 2744 vdo_free(vdo_forget(scrubber->vio.data)); 2745 free_vio_components(&scrubber->vio); 2746 } 2747 2748 /** 2749 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because 2750 * there's been an error. 2751 * @scrubber: The scrubber. 2752 * @result: The result of the scrubbing operation. 2753 */ 2754 static void finish_scrubbing(struct slab_scrubber *scrubber, int result) 2755 { 2756 bool notify = vdo_waitq_has_waiters(&scrubber->waiters); 2757 bool done = !has_slabs_to_scrub(scrubber); 2758 struct block_allocator *allocator = 2759 container_of(scrubber, struct block_allocator, scrubber); 2760 2761 if (done) 2762 uninitialize_scrubber_vio(scrubber); 2763 2764 if (scrubber->high_priority_only) { 2765 scrubber->high_priority_only = false; 2766 vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result); 2767 } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) { 2768 /* All of our slabs were scrubbed, and we're the last allocator to finish. */ 2769 enum vdo_state prior_state = 2770 atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING, 2771 VDO_DIRTY); 2772 2773 /* 2774 * To be safe, even if the CAS failed, ensure anything that follows is ordered with 2775 * respect to whatever state change did happen. 2776 */ 2777 smp_mb__after_atomic(); 2778 2779 /* 2780 * We must check the VDO state here and not the depot's read_only_notifier since 2781 * the compare-swap-above could have failed due to a read-only entry which our own 2782 * thread does not yet know about. 2783 */ 2784 if (prior_state == VDO_DIRTY) 2785 vdo_log_info("VDO commencing normal operation"); 2786 else if (prior_state == VDO_RECOVERING) 2787 vdo_log_info("Exiting recovery mode"); 2788 free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool)); 2789 } 2790 2791 /* 2792 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to 2793 * happen. 2794 */ 2795 if (!vdo_finish_draining(&scrubber->admin_state)) 2796 WRITE_ONCE(scrubber->admin_state.current_state, 2797 VDO_ADMIN_STATE_SUSPENDED); 2798 2799 /* 2800 * We can't notify waiters until after we've finished draining or they'll just requeue. 2801 * Fortunately if there were waiters, we can't have been freed yet. 2802 */ 2803 if (notify) 2804 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 2805 } 2806 2807 static void scrub_next_slab(struct slab_scrubber *scrubber); 2808 2809 /** 2810 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed. 2811 * @completion: The slab rebuild completion. 2812 * 2813 * This callback is registered in apply_journal_entries(). 2814 */ 2815 static void slab_scrubbed(struct vdo_completion *completion) 2816 { 2817 struct slab_scrubber *scrubber = 2818 container_of(as_vio(completion), struct slab_scrubber, vio); 2819 struct vdo_slab *slab = scrubber->slab; 2820 2821 slab->status = VDO_SLAB_REBUILT; 2822 queue_slab(slab); 2823 reopen_slab_journal(slab); 2824 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1); 2825 scrub_next_slab(scrubber); 2826 } 2827 2828 /** 2829 * abort_scrubbing() - Abort scrubbing due to an error. 2830 * @scrubber: The slab scrubber. 2831 * @result: The error. 
2832 */ 2833 static void abort_scrubbing(struct slab_scrubber *scrubber, int result) 2834 { 2835 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result); 2836 finish_scrubbing(scrubber, result); 2837 } 2838 2839 /** 2840 * handle_scrubber_error() - Handle errors while rebuilding a slab. 2841 * @completion: The slab rebuild completion. 2842 */ 2843 static void handle_scrubber_error(struct vdo_completion *completion) 2844 { 2845 struct vio *vio = as_vio(completion); 2846 2847 vio_record_metadata_io_error(vio); 2848 abort_scrubbing(container_of(vio, struct slab_scrubber, vio), 2849 completion->result); 2850 } 2851 2852 /** 2853 * apply_block_entries() - Apply all the entries in a block to the reference counts. 2854 * @block: A block with entries to apply. 2855 * @entry_count: The number of entries to apply. 2856 * @block_number: The sequence number of the block. 2857 * @slab: The slab to apply the entries to. 2858 * 2859 * Return: VDO_SUCCESS or an error code. 2860 */ 2861 static int apply_block_entries(struct packed_slab_journal_block *block, 2862 journal_entry_count_t entry_count, 2863 sequence_number_t block_number, struct vdo_slab *slab) 2864 { 2865 struct journal_point entry_point = { 2866 .sequence_number = block_number, 2867 .entry_count = 0, 2868 }; 2869 int result; 2870 slab_block_number max_sbn = slab->end - slab->start; 2871 2872 while (entry_point.entry_count < entry_count) { 2873 struct slab_journal_entry entry = 2874 vdo_decode_slab_journal_entry(block, entry_point.entry_count); 2875 2876 if (entry.sbn > max_sbn) { 2877 /* This entry is out of bounds. */ 2878 return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL, 2879 "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)", 2880 (unsigned long long) block_number, 2881 entry_point.entry_count, 2882 entry.sbn, max_sbn); 2883 } 2884 2885 result = replay_reference_count_change(slab, &entry_point, entry); 2886 if (result != VDO_SUCCESS) { 2887 vdo_log_error_strerror(result, 2888 "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u", 2889 (unsigned long long) block_number, 2890 entry_point.entry_count, 2891 vdo_get_journal_operation_name(entry.operation), 2892 entry.sbn, slab->slab_number); 2893 return result; 2894 } 2895 entry_point.entry_count++; 2896 } 2897 2898 return VDO_SUCCESS; 2899 } 2900 2901 /** 2902 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries. 2903 * @completion: The metadata read vio completion. 2904 * 2905 * This is a callback registered in start_scrubbing(). 2906 */ 2907 static void apply_journal_entries(struct vdo_completion *completion) 2908 { 2909 int result; 2910 struct slab_scrubber *scrubber = 2911 container_of(as_vio(completion), struct slab_scrubber, vio); 2912 struct vdo_slab *slab = scrubber->slab; 2913 struct slab_journal *journal = &slab->journal; 2914 2915 /* Find the boundaries of the useful part of the journal. 
*/ 2916 sequence_number_t tail = journal->tail; 2917 tail_block_offset_t end_index = (tail - 1) % journal->size; 2918 char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE); 2919 struct packed_slab_journal_block *end_block = 2920 (struct packed_slab_journal_block *) end_data; 2921 2922 sequence_number_t head = __le64_to_cpu(end_block->header.head); 2923 tail_block_offset_t head_index = head % journal->size; 2924 block_count_t index = head_index; 2925 2926 struct journal_point ref_counts_point = slab->slab_journal_point; 2927 struct journal_point last_entry_applied = ref_counts_point; 2928 sequence_number_t sequence; 2929 2930 for (sequence = head; sequence < tail; sequence++) { 2931 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE); 2932 struct packed_slab_journal_block *block = 2933 (struct packed_slab_journal_block *) block_data; 2934 struct slab_journal_block_header header; 2935 2936 vdo_unpack_slab_journal_block_header(&block->header, &header); 2937 2938 if ((header.nonce != slab->allocator->nonce) || 2939 (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) || 2940 (header.sequence_number != sequence) || 2941 (header.entry_count > journal->entries_per_block) || 2942 (header.has_block_map_increments && 2943 (header.entry_count > journal->full_entries_per_block))) { 2944 /* The block is not what we expect it to be. */ 2945 vdo_log_error("vdo_slab journal block for slab %u was invalid", 2946 slab->slab_number); 2947 abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL); 2948 return; 2949 } 2950 2951 result = apply_block_entries(block, header.entry_count, sequence, slab); 2952 if (result != VDO_SUCCESS) { 2953 abort_scrubbing(scrubber, result); 2954 return; 2955 } 2956 2957 last_entry_applied.sequence_number = sequence; 2958 last_entry_applied.entry_count = header.entry_count - 1; 2959 index++; 2960 if (index == journal->size) 2961 index = 0; 2962 } 2963 2964 /* 2965 * At the end of rebuild, the reference counters should be accurate to the end of the 2966 * journal we just applied. 2967 */ 2968 result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied, 2969 &ref_counts_point), 2970 "Refcounts are not more accurate than the slab journal"); 2971 if (result != VDO_SUCCESS) { 2972 abort_scrubbing(scrubber, result); 2973 return; 2974 } 2975 2976 /* Save out the rebuilt reference blocks. */ 2977 vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error, 2978 slab->allocator->thread_id, completion->parent); 2979 vdo_start_operation_with_waiter(&slab->state, 2980 VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING, 2981 completion, initiate_slab_action); 2982 } 2983 2984 static void read_slab_journal_endio(struct bio *bio) 2985 { 2986 struct vio *vio = bio->bi_private; 2987 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio); 2988 2989 continue_vio_after_io(bio->bi_private, apply_journal_entries, 2990 scrubber->slab->allocator->thread_id); 2991 } 2992 2993 /** 2994 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed. 2995 * @completion: The scrubber's vio completion. 2996 * 2997 * This callback is registered in scrub_next_slab(). 
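 *
 * If the slab summary shows the slab as clean, the journal read is skipped and the slab is
 * reported as scrubbed immediately.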
2998 */ 2999 static void start_scrubbing(struct vdo_completion *completion) 3000 { 3001 struct slab_scrubber *scrubber = 3002 container_of(as_vio(completion), struct slab_scrubber, vio); 3003 struct vdo_slab *slab = scrubber->slab; 3004 3005 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) { 3006 slab_scrubbed(completion); 3007 return; 3008 } 3009 3010 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin, 3011 read_slab_journal_endio, handle_scrubber_error, 3012 REQ_OP_READ); 3013 } 3014 3015 /** 3016 * scrub_next_slab() - Scrub the next slab if there is one. 3017 * @scrubber: The scrubber. 3018 */ 3019 static void scrub_next_slab(struct slab_scrubber *scrubber) 3020 { 3021 struct vdo_completion *completion = &scrubber->vio.completion; 3022 struct vdo_slab *slab; 3023 3024 /* 3025 * Note: this notify call is always safe only because scrubbing can only be started when 3026 * the VDO is quiescent. 3027 */ 3028 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 3029 3030 if (vdo_is_read_only(completion->vdo)) { 3031 finish_scrubbing(scrubber, VDO_READ_ONLY); 3032 return; 3033 } 3034 3035 slab = get_next_slab(scrubber); 3036 if ((slab == NULL) || 3037 (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) { 3038 finish_scrubbing(scrubber, VDO_SUCCESS); 3039 return; 3040 } 3041 3042 if (vdo_finish_draining(&scrubber->admin_state)) 3043 return; 3044 3045 list_del_init(&slab->allocq_entry); 3046 scrubber->slab = slab; 3047 vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error, 3048 slab->allocator->thread_id, completion->parent); 3049 vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING, 3050 completion, initiate_slab_action); 3051 } 3052 3053 /** 3054 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing. 3055 * @allocator: The block_allocator to scrub. 3056 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL. 3057 */ 3058 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent) 3059 { 3060 struct slab_scrubber *scrubber = &allocator->scrubber; 3061 3062 scrubber->vio.completion.parent = parent; 3063 scrubber->high_priority_only = (parent != NULL); 3064 if (!has_slabs_to_scrub(scrubber)) { 3065 finish_scrubbing(scrubber, VDO_SUCCESS); 3066 return; 3067 } 3068 3069 if (scrubber->high_priority_only && 3070 vdo_is_priority_table_empty(allocator->prioritized_slabs) && 3071 list_empty(&scrubber->high_priority_slabs)) 3072 register_slab_for_scrubbing(get_next_slab(scrubber), true); 3073 3074 vdo_resume_if_quiescent(&scrubber->admin_state); 3075 scrub_next_slab(scrubber); 3076 } 3077 3078 static inline void assert_on_allocator_thread(thread_id_t thread_id, 3079 const char *function_name) 3080 { 3081 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id), 3082 "%s called on correct thread", function_name); 3083 } 3084 3085 static void register_slab_with_allocator(struct block_allocator *allocator, 3086 struct vdo_slab *slab) 3087 { 3088 allocator->slab_count++; 3089 allocator->last_slab = slab->slab_number; 3090 } 3091 3092 /** 3093 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot. 3094 * @depot: The depot over which to iterate. 3095 * @start: The number of the slab to start iterating from. 3096 * @end: The number of the last slab which may be returned. 3097 * @stride: The difference in slab number between successive slabs. 
3098 * 3099 * Iteration always occurs from higher to lower numbered slabs. 3100 * 3101 * Return: An initialized iterator structure. 3102 */ 3103 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot, 3104 slab_count_t start, slab_count_t end, 3105 slab_count_t stride) 3106 { 3107 struct vdo_slab **slabs = depot->slabs; 3108 3109 return (struct slab_iterator) { 3110 .slabs = slabs, 3111 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]), 3112 .end = end, 3113 .stride = stride, 3114 }; 3115 } 3116 3117 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator) 3118 { 3119 return get_depot_slab_iterator(allocator->depot, allocator->last_slab, 3120 allocator->zone_number, 3121 allocator->depot->zone_count); 3122 } 3123 3124 /** 3125 * next_slab() - Get the next slab from a slab_iterator and advance the iterator 3126 * @iterator: The slab_iterator. 3127 * 3128 * Return: The next slab or NULL if the iterator is exhausted. 3129 */ 3130 static struct vdo_slab *next_slab(struct slab_iterator *iterator) 3131 { 3132 struct vdo_slab *slab = iterator->next; 3133 3134 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride)) 3135 iterator->next = NULL; 3136 else 3137 iterator->next = iterator->slabs[slab->slab_number - iterator->stride]; 3138 3139 return slab; 3140 } 3141 3142 /** 3143 * abort_waiter() - Abort vios waiting to make journal entries when read-only. 3144 * @waiter: A waiting data_vio. 3145 * @context: Not used. 3146 * 3147 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone 3148 * into read-only mode. Implements waiter_callback_fn. 3149 */ 3150 static void abort_waiter(struct vdo_waiter *waiter, void __always_unused *context) 3151 { 3152 struct reference_updater *updater = 3153 container_of(waiter, struct reference_updater, waiter); 3154 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 3155 3156 if (updater->increment) { 3157 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 3158 return; 3159 } 3160 3161 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY); 3162 } 3163 3164 /* Implements vdo_read_only_notification_fn. */ 3165 static void notify_block_allocator_of_read_only_mode(void *listener, 3166 struct vdo_completion *parent) 3167 { 3168 struct block_allocator *allocator = listener; 3169 struct slab_iterator iterator; 3170 3171 assert_on_allocator_thread(allocator->thread_id, __func__); 3172 iterator = get_slab_iterator(allocator); 3173 while (iterator.next != NULL) { 3174 struct vdo_slab *slab = next_slab(&iterator); 3175 3176 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters, 3177 abort_waiter, &slab->journal); 3178 check_if_slab_drained(slab); 3179 } 3180 3181 vdo_finish_completion(parent); 3182 } 3183 3184 /** 3185 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if 3186 * the block it locks is unreferenced. 3187 * @slab: The slab which contains the block. 3188 * @pbn: The physical block to reference. 3189 * @lock: The lock. 3190 * 3191 * Return: VDO_SUCCESS or an error. 
3192 */ 3193 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn, 3194 struct pbn_lock *lock) 3195 { 3196 slab_block_number block_number; 3197 int result; 3198 3199 if (vdo_pbn_lock_has_provisional_reference(lock)) 3200 return VDO_SUCCESS; 3201 3202 if (!is_slab_open(slab)) 3203 return VDO_INVALID_ADMIN_STATE; 3204 3205 result = slab_block_number_from_pbn(slab, pbn, &block_number); 3206 if (result != VDO_SUCCESS) 3207 return result; 3208 3209 if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) { 3210 make_provisional_reference(slab, block_number); 3211 if (lock != NULL) 3212 vdo_assign_pbn_lock_provisional_reference(lock); 3213 } 3214 3215 if (vdo_pbn_lock_has_provisional_reference(lock)) 3216 adjust_free_block_count(slab, false); 3217 3218 return VDO_SUCCESS; 3219 } 3220 3221 static int __must_check allocate_slab_block(struct vdo_slab *slab, 3222 physical_block_number_t *block_number_ptr) 3223 { 3224 slab_block_number free_index; 3225 3226 if (!is_slab_open(slab)) 3227 return VDO_INVALID_ADMIN_STATE; 3228 3229 if (!search_reference_blocks(slab, &free_index)) 3230 return VDO_NO_SPACE; 3231 3232 VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT), 3233 "free block must have ref count of zero"); 3234 make_provisional_reference(slab, free_index); 3235 adjust_free_block_count(slab, false); 3236 3237 /* 3238 * Update the search hint so the next search will start at the array index just past the 3239 * free block we just found. 3240 */ 3241 slab->search_cursor.index = (free_index + 1); 3242 3243 *block_number_ptr = slab->start + free_index; 3244 return VDO_SUCCESS; 3245 } 3246 3247 /** 3248 * open_slab() - Prepare a slab to be allocated from. 3249 * @slab: The slab. 3250 */ 3251 static void open_slab(struct vdo_slab *slab) 3252 { 3253 reset_search_cursor(slab); 3254 if (is_slab_journal_blank(slab)) { 3255 WRITE_ONCE(slab->allocator->statistics.slabs_opened, 3256 slab->allocator->statistics.slabs_opened + 1); 3257 dirty_all_reference_blocks(slab); 3258 } else { 3259 WRITE_ONCE(slab->allocator->statistics.slabs_reopened, 3260 slab->allocator->statistics.slabs_reopened + 1); 3261 } 3262 3263 slab->allocator->open_slab = slab; 3264 } 3265 3266 3267 /* 3268 * The block allocated will have a provisional reference and the reference must be either confirmed 3269 * with a subsequent increment or vacated with a subsequent decrement via 3270 * vdo_release_block_reference(). 3271 */ 3272 int vdo_allocate_block(struct block_allocator *allocator, 3273 physical_block_number_t *block_number_ptr) 3274 { 3275 int result; 3276 3277 if (allocator->open_slab != NULL) { 3278 /* Try to allocate the next block in the currently open slab. */ 3279 result = allocate_slab_block(allocator->open_slab, block_number_ptr); 3280 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE)) 3281 return result; 3282 3283 /* Put the exhausted open slab back into the priority table. */ 3284 prioritize_slab(allocator->open_slab); 3285 } 3286 3287 /* Remove the highest priority slab from the priority table and make it the open slab. */ 3288 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs), 3289 struct vdo_slab, allocq_entry)); 3290 3291 /* 3292 * Try allocating again. If we're out of space immediately after opening a slab, then every 3293 * slab must be fully allocated. 3294 */ 3295 return allocate_slab_block(allocator->open_slab, block_number_ptr); 3296 } 3297 3298 /** 3299 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab. 
3300 * @allocator: The block_allocator on which to wait.
3301 * @waiter: The waiter.
3302 *
3303 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, or
3304 * VDO_READ_ONLY if the VDO is in read-only mode.
3305 */
3306 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3307 struct vdo_waiter *waiter)
3308 {
3309 if (vdo_is_read_only(allocator->depot->vdo))
3310 return VDO_READ_ONLY;
3311
3312 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3313 return VDO_NO_SPACE;
3314
3315 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3316 return VDO_SUCCESS;
3317 }
3318
3319 /**
3320 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3321 * journal entry and then updating the reference counter.
3322 * @completion: The data_vio completion for which to add the entry.
3323 * @updater: Which of the data_vio's reference updaters is being submitted.
3324 */
3325 void vdo_modify_reference_count(struct vdo_completion *completion,
3326 struct reference_updater *updater)
3327 {
3328 struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3329
3330 if (!is_slab_open(slab)) {
3331 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3332 return;
3333 }
3334
3335 if (vdo_is_read_only(completion->vdo)) {
3336 vdo_continue_completion(completion, VDO_READ_ONLY);
3337 return;
3338 }
3339
3340 vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3341 if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3342 register_slab_for_scrubbing(slab, true);
3343
3344 add_entries(&slab->journal);
3345 }
3346
3347 /* Release an unused provisional reference. */
3348 int vdo_release_block_reference(struct block_allocator *allocator,
3349 physical_block_number_t pbn)
3350 {
3351 struct reference_updater updater;
3352
3353 if (pbn == VDO_ZERO_BLOCK)
3354 return VDO_SUCCESS;
3355
3356 updater = (struct reference_updater) {
3357 .operation = VDO_JOURNAL_DATA_REMAPPING,
3358 .increment = false,
3359 .zpbn = {
3360 .pbn = pbn,
3361 },
3362 };
3363
3364 return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3365 &updater, NULL);
3366 }
3367
3368 /*
3369 * This is a min_heap callback function that orders slab_status structures using the 'is_clean' field as
3370 * the primary key and the 'emptiness' field as the secondary key.
3371 *
3372 * Slabs need to be pushed onto the lists in the same order they are to be popped off. Popping
3373 * should always get the most empty first, so pushing should be from most empty to least empty.
3374 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3375 * before larger ones.
3376 */
3377 static bool slab_status_is_less_than(const void *item1, const void *item2,
3378 void __always_unused *args)
3379 {
3380 const struct slab_status *info1 = item1;
3381 const struct slab_status *info2 = item2;
3382
3383 if (info1->is_clean != info2->is_clean)
3384 return info1->is_clean;
3385 if (info1->emptiness != info2->emptiness)
3386 return info1->emptiness > info2->emptiness;
3387 return info1->slab_number < info2->slab_number;
3388 }
3389
3390 static const struct min_heap_callbacks slab_status_min_heap = {
3391 .less = slab_status_is_less_than,
3392 .swp = NULL,
3393 };
3394
3395 /* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs().
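 * The actor's count is primed to 1 in apply_to_slabs(), so this callback cannot invoke the
 * actor's final callback until every slab's operation has at least been started.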
*/ 3396 static void slab_action_callback(struct vdo_completion *completion) 3397 { 3398 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3399 struct slab_actor *actor = &allocator->slab_actor; 3400 3401 if (--actor->slab_action_count == 0) { 3402 actor->callback(completion); 3403 return; 3404 } 3405 3406 vdo_reset_completion(completion); 3407 } 3408 3409 /* Preserve the error from part of an action and continue. */ 3410 static void handle_operation_error(struct vdo_completion *completion) 3411 { 3412 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3413 3414 if (allocator->state.waiter != NULL) 3415 vdo_set_completion_result(allocator->state.waiter, completion->result); 3416 completion->callback(completion); 3417 } 3418 3419 /* Perform an action on each of an allocator's slabs in parallel. */ 3420 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback) 3421 { 3422 struct slab_iterator iterator; 3423 3424 vdo_prepare_completion(&allocator->completion, slab_action_callback, 3425 handle_operation_error, allocator->thread_id, NULL); 3426 allocator->completion.requeue = false; 3427 3428 /* 3429 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so 3430 * clear it. 3431 */ 3432 allocator->open_slab = NULL; 3433 3434 /* Ensure that we don't finish before we're done starting. */ 3435 allocator->slab_actor = (struct slab_actor) { 3436 .slab_action_count = 1, 3437 .callback = callback, 3438 }; 3439 3440 iterator = get_slab_iterator(allocator); 3441 while (iterator.next != NULL) { 3442 const struct admin_state_code *operation = 3443 vdo_get_admin_state_code(&allocator->state); 3444 struct vdo_slab *slab = next_slab(&iterator); 3445 3446 list_del_init(&slab->allocq_entry); 3447 allocator->slab_actor.slab_action_count++; 3448 vdo_start_operation_with_waiter(&slab->state, operation, 3449 &allocator->completion, 3450 initiate_slab_action); 3451 } 3452 3453 slab_action_callback(&allocator->completion); 3454 } 3455 3456 static void finish_loading_allocator(struct vdo_completion *completion) 3457 { 3458 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3459 const struct admin_state_code *operation = 3460 vdo_get_admin_state_code(&allocator->state); 3461 3462 if (allocator->eraser != NULL) 3463 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 3464 3465 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) { 3466 void *context = 3467 vdo_get_current_action_context(allocator->depot->action_manager); 3468 3469 vdo_replay_into_slab_journals(allocator, context); 3470 return; 3471 } 3472 3473 vdo_finish_loading(&allocator->state); 3474 } 3475 3476 static void erase_next_slab_journal(struct block_allocator *allocator); 3477 3478 static void copy_callback(int read_err, unsigned long write_err, void *context) 3479 { 3480 struct block_allocator *allocator = context; 3481 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO); 3482 3483 if (result != VDO_SUCCESS) { 3484 vdo_fail_completion(&allocator->completion, result); 3485 return; 3486 } 3487 3488 erase_next_slab_journal(allocator); 3489 } 3490 3491 /* erase_next_slab_journal() - Erase the next slab journal. 
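 *
 * Each pass zeroes one slab's journal region via dm-kcopyd; copy_callback() then re-enters
 * this function for the next slab, and the allocator's completion is finished once the
 * iterator is exhausted.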
*/ 3492 static void erase_next_slab_journal(struct block_allocator *allocator) 3493 { 3494 struct vdo_slab *slab; 3495 physical_block_number_t pbn; 3496 struct dm_io_region regions[1]; 3497 struct slab_depot *depot = allocator->depot; 3498 block_count_t blocks = depot->slab_config.slab_journal_blocks; 3499 3500 if (allocator->slabs_to_erase.next == NULL) { 3501 vdo_finish_completion(&allocator->completion); 3502 return; 3503 } 3504 3505 slab = next_slab(&allocator->slabs_to_erase); 3506 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset; 3507 regions[0] = (struct dm_io_region) { 3508 .bdev = vdo_get_backing_device(depot->vdo), 3509 .sector = pbn * VDO_SECTORS_PER_BLOCK, 3510 .count = blocks * VDO_SECTORS_PER_BLOCK, 3511 }; 3512 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator); 3513 } 3514 3515 /* Implements vdo_admin_initiator_fn. */ 3516 static void initiate_load(struct admin_state *state) 3517 { 3518 struct block_allocator *allocator = 3519 container_of(state, struct block_allocator, state); 3520 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 3521 3522 if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) { 3523 /* 3524 * Must requeue because the kcopyd client cannot be freed in the same stack frame 3525 * as the kcopyd callback, lest it deadlock. 3526 */ 3527 vdo_prepare_completion_for_requeue(&allocator->completion, 3528 finish_loading_allocator, 3529 handle_operation_error, 3530 allocator->thread_id, NULL); 3531 allocator->eraser = dm_kcopyd_client_create(NULL); 3532 if (IS_ERR(allocator->eraser)) { 3533 vdo_fail_completion(&allocator->completion, 3534 PTR_ERR(allocator->eraser)); 3535 allocator->eraser = NULL; 3536 return; 3537 } 3538 allocator->slabs_to_erase = get_slab_iterator(allocator); 3539 3540 erase_next_slab_journal(allocator); 3541 return; 3542 } 3543 3544 apply_to_slabs(allocator, finish_loading_allocator); 3545 } 3546 3547 /** 3548 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have 3549 * been recovered from the recovery journal. 3550 * @completion: The allocator completion. 3551 */ 3552 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion) 3553 { 3554 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3555 3556 vdo_finish_loading_with_result(&allocator->state, completion->result); 3557 } 3558 3559 static int get_slab_statuses(struct block_allocator *allocator, 3560 struct slab_status **statuses_ptr) 3561 { 3562 int result; 3563 struct slab_status *statuses; 3564 struct slab_iterator iterator = get_slab_iterator(allocator); 3565 3566 result = vdo_allocate(allocator->slab_count, struct slab_status, __func__, 3567 &statuses); 3568 if (result != VDO_SUCCESS) 3569 return result; 3570 3571 *statuses_ptr = statuses; 3572 3573 while (iterator.next != NULL) { 3574 slab_count_t slab_number = next_slab(&iterator)->slab_number; 3575 3576 *statuses++ = (struct slab_status) { 3577 .slab_number = slab_number, 3578 .is_clean = !allocator->summary_entries[slab_number].is_dirty, 3579 .emptiness = allocator->summary_entries[slab_number].fullness_hint, 3580 }; 3581 } 3582 3583 return VDO_SUCCESS; 3584 } 3585 3586 /* Prepare slabs for allocation or scrubbing. 
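 *
 * Ordering example (hypothetical statuses): given A = {clean, emptiness 10},
 * B = {dirty, emptiness 50}, and C = {clean, emptiness 90}, the heap pops C, then A, then B:
 * clean slabs come out before dirty ones, and emptier slabs before fuller ones, per
 * slab_status_is_less_than().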
*/ 3587 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator) 3588 { 3589 struct slab_status current_slab_status; 3590 DEFINE_MIN_HEAP(struct slab_status, heap) heap; 3591 int result; 3592 struct slab_status *slab_statuses; 3593 struct slab_depot *depot = allocator->depot; 3594 3595 WRITE_ONCE(allocator->allocated_blocks, 3596 allocator->slab_count * depot->slab_config.data_blocks); 3597 result = get_slab_statuses(allocator, &slab_statuses); 3598 if (result != VDO_SUCCESS) 3599 return result; 3600 3601 /* Sort the slabs by cleanliness, then by emptiness hint. */ 3602 heap = (struct heap) { 3603 .data = slab_statuses, 3604 .nr = allocator->slab_count, 3605 .size = allocator->slab_count, 3606 }; 3607 min_heapify_all(&heap, &slab_status_min_heap, NULL); 3608 3609 while (heap.nr > 0) { 3610 bool high_priority; 3611 struct vdo_slab *slab; 3612 struct slab_journal *journal; 3613 3614 current_slab_status = slab_statuses[0]; 3615 min_heap_pop(&heap, &slab_status_min_heap, NULL); 3616 slab = depot->slabs[current_slab_status.slab_number]; 3617 3618 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) || 3619 (!allocator->summary_entries[slab->slab_number].load_ref_counts && 3620 current_slab_status.is_clean)) { 3621 queue_slab(slab); 3622 continue; 3623 } 3624 3625 slab->status = VDO_SLAB_REQUIRES_SCRUBBING; 3626 journal = &slab->journal; 3627 high_priority = ((current_slab_status.is_clean && 3628 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) || 3629 (journal_length(journal) >= journal->scrubbing_threshold)); 3630 register_slab_for_scrubbing(slab, high_priority); 3631 } 3632 3633 vdo_free(slab_statuses); 3634 return VDO_SUCCESS; 3635 } 3636 3637 static const char *status_to_string(enum slab_rebuild_status status) 3638 { 3639 switch (status) { 3640 case VDO_SLAB_REBUILT: 3641 return "REBUILT"; 3642 case VDO_SLAB_REQUIRES_SCRUBBING: 3643 return "SCRUBBING"; 3644 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING: 3645 return "PRIORITY_SCRUBBING"; 3646 case VDO_SLAB_REBUILDING: 3647 return "REBUILDING"; 3648 case VDO_SLAB_REPLAYING: 3649 return "REPLAYING"; 3650 default: 3651 return "UNKNOWN"; 3652 } 3653 } 3654 3655 void vdo_dump_block_allocator(const struct block_allocator *allocator) 3656 { 3657 unsigned int pause_counter = 0; 3658 struct slab_iterator iterator = get_slab_iterator(allocator); 3659 const struct slab_scrubber *scrubber = &allocator->scrubber; 3660 3661 vdo_log_info("block_allocator zone %u", allocator->zone_number); 3662 while (iterator.next != NULL) { 3663 struct vdo_slab *slab = next_slab(&iterator); 3664 struct slab_journal *journal = &slab->journal; 3665 3666 if (slab->reference_blocks != NULL) { 3667 /* Terse because there are a lot of slabs to dump and syslog is lossy. 
*/ 3668 vdo_log_info("slab %u: P%u, %llu free", slab->slab_number, 3669 slab->priority, 3670 (unsigned long long) slab->free_blocks); 3671 } else { 3672 vdo_log_info("slab %u: status %s", slab->slab_number, 3673 status_to_string(slab->status)); 3674 } 3675 3676 vdo_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s", 3677 vdo_waitq_num_waiters(&journal->entry_waiters), 3678 vdo_bool_to_string(journal->waiting_to_commit), 3679 vdo_bool_to_string(journal->updating_slab_summary), 3680 (unsigned long long) journal->head, 3681 (unsigned long long) journal->unreapable, 3682 (unsigned long long) journal->tail, 3683 (unsigned long long) journal->next_commit, 3684 (unsigned long long) journal->summarized, 3685 (unsigned long long) journal->last_summarized, 3686 (unsigned long long) journal->recovery_lock, 3687 vdo_bool_to_string(journal->recovery_lock != 0)); 3688 /* 3689 * Given the frequency with which the locks are just a tiny bit off, it might be 3690 * worth dumping all the locks, but that might be too much logging. 3691 */ 3692 3693 if (slab->counters != NULL) { 3694 /* Terse because there are a lot of slabs to dump and syslog is lossy. */ 3695 vdo_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)", 3696 slab->free_blocks, slab->block_count, 3697 slab->reference_block_count, 3698 vdo_waitq_num_waiters(&slab->dirty_blocks), 3699 slab->active_count, 3700 (unsigned long long) slab->slab_journal_point.sequence_number, 3701 slab->slab_journal_point.entry_count); 3702 } else { 3703 vdo_log_info(" no counters"); 3704 } 3705 3706 /* 3707 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number, 3708 * allowing the kernel log a chance to be flushed instead of being overrun. 3709 */ 3710 if (pause_counter++ == 31) { 3711 pause_counter = 0; 3712 vdo_pause_for_logger(); 3713 } 3714 } 3715 3716 vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s", 3717 READ_ONCE(scrubber->slab_count), 3718 vdo_waitq_num_waiters(&scrubber->waiters), 3719 vdo_get_admin_state_code(&scrubber->admin_state)->name, 3720 scrubber->high_priority_only ? 
", high_priority_only " : "");
3721 }
3722
3723 static void free_slab(struct vdo_slab *slab)
3724 {
3725 if (slab == NULL)
3726 return;
3727
3728 list_del(&slab->allocq_entry);
3729 vdo_free(vdo_forget(slab->journal.block));
3730 vdo_free(vdo_forget(slab->journal.locks));
3731 vdo_free(vdo_forget(slab->counters));
3732 vdo_free(vdo_forget(slab->reference_blocks));
3733 vdo_free(slab);
3734 }
3735
3736 static int initialize_slab_journal(struct vdo_slab *slab)
3737 {
3738 struct slab_journal *journal = &slab->journal;
3739 const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3740 int result;
3741
3742 result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3743 __func__, &journal->locks);
3744 if (result != VDO_SUCCESS)
3745 return result;
3746
3747 result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3748 (char **) &journal->block);
3749 if (result != VDO_SUCCESS)
3750 return result;
3751
3752 journal->slab = slab;
3753 journal->size = slab_config->slab_journal_blocks;
3754 journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3755 journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3756 journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3757 journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3758 journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3759 journal->events = &slab->allocator->slab_journal_statistics;
3760 journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3761 journal->tail = 1;
3762 journal->head = 1;
3763
3764 journal->flushing_deadline = journal->flushing_threshold;
3765 /*
3766 * Leave some time between the flushing deadline and the blocking threshold, so that
3767 * hopefully all the requested flushes will have completed before writers must block.
3768 */
3769 if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3770 journal->flushing_deadline = journal->blocking_threshold - 5;
3771
3772 journal->slab_summary_waiter.callback = release_journal_locks;
3773
3774 INIT_LIST_HEAD(&journal->dirty_entry);
3775 INIT_LIST_HEAD(&journal->uncommitted_blocks);
3776
3777 journal->tail_header.nonce = slab->allocator->nonce;
3778 journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3779 initialize_journal_state(journal);
3780 return VDO_SUCCESS;
3781 }
3782
3783 /**
3784 * make_slab() - Construct a new, empty slab.
3785 * @slab_origin: The physical block number within the block allocator partition of the first block
3786 * in the slab.
3787 * @allocator: The block allocator to which the slab belongs.
3788 * @slab_number: The slab number of the slab.
3789 * @is_new: True if this slab is being allocated as part of a resize.
3790 * @slab_ptr: A pointer to receive the new slab.
3791 *
3792 * Return: VDO_SUCCESS or an error code.
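 *
 * The resulting slab's on-disk layout is: data blocks beginning at 'start', reference count
 * blocks at 'ref_counts_origin', and the slab journal at 'journal_origin'.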
3793 */ 3794 static int __must_check make_slab(physical_block_number_t slab_origin, 3795 struct block_allocator *allocator, 3796 slab_count_t slab_number, bool is_new, 3797 struct vdo_slab **slab_ptr) 3798 { 3799 const struct slab_config *slab_config = &allocator->depot->slab_config; 3800 struct vdo_slab *slab; 3801 int result; 3802 3803 result = vdo_allocate(1, struct vdo_slab, __func__, &slab); 3804 if (result != VDO_SUCCESS) 3805 return result; 3806 3807 *slab = (struct vdo_slab) { 3808 .allocator = allocator, 3809 .start = slab_origin, 3810 .end = slab_origin + slab_config->slab_blocks, 3811 .slab_number = slab_number, 3812 .ref_counts_origin = slab_origin + slab_config->data_blocks, 3813 .journal_origin = 3814 vdo_get_slab_journal_start_block(slab_config, slab_origin), 3815 .block_count = slab_config->data_blocks, 3816 .free_blocks = slab_config->data_blocks, 3817 .reference_block_count = 3818 vdo_get_saved_reference_count_size(slab_config->data_blocks), 3819 }; 3820 INIT_LIST_HEAD(&slab->allocq_entry); 3821 3822 result = initialize_slab_journal(slab); 3823 if (result != VDO_SUCCESS) { 3824 free_slab(slab); 3825 return result; 3826 } 3827 3828 if (is_new) { 3829 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW); 3830 result = allocate_slab_counters(slab); 3831 if (result != VDO_SUCCESS) { 3832 free_slab(slab); 3833 return result; 3834 } 3835 } else { 3836 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 3837 } 3838 3839 *slab_ptr = slab; 3840 return VDO_SUCCESS; 3841 } 3842 3843 /** 3844 * allocate_slabs() - Allocate a new slab pointer array. 3845 * @depot: The depot. 3846 * @slab_count: The number of slabs the depot should have in the new array. 3847 * 3848 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as 3849 * needed. The newly allocated slabs will not be distributed for use by the block allocators. 3850 * 3851 * Return: VDO_SUCCESS or an error code. 3852 */ 3853 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count) 3854 { 3855 block_count_t slab_size; 3856 bool resizing = false; 3857 physical_block_number_t slab_origin; 3858 int result; 3859 3860 result = vdo_allocate(slab_count, struct vdo_slab *, 3861 "slab pointer array", &depot->new_slabs); 3862 if (result != VDO_SUCCESS) 3863 return result; 3864 3865 if (depot->slabs != NULL) { 3866 memcpy(depot->new_slabs, depot->slabs, 3867 depot->slab_count * sizeof(struct vdo_slab *)); 3868 resizing = true; 3869 } 3870 3871 slab_size = depot->slab_config.slab_blocks; 3872 slab_origin = depot->first_block + (depot->slab_count * slab_size); 3873 3874 for (depot->new_slab_count = depot->slab_count; 3875 depot->new_slab_count < slab_count; 3876 depot->new_slab_count++, slab_origin += slab_size) { 3877 struct block_allocator *allocator = 3878 &depot->allocators[depot->new_slab_count % depot->zone_count]; 3879 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count]; 3880 3881 result = make_slab(slab_origin, allocator, depot->new_slab_count, 3882 resizing, slab_ptr); 3883 if (result != VDO_SUCCESS) 3884 return result; 3885 } 3886 3887 return VDO_SUCCESS; 3888 } 3889 3890 /** 3891 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed. 3892 * @depot: The depot. 
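 *
 * Used both when tearing down the depot (see vdo_free_slab_depot()) and when a prepared
 * resize is discarded or must be redone (see vdo_prepare_to_grow_slab_depot()).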
3893 */ 3894 void vdo_abandon_new_slabs(struct slab_depot *depot) 3895 { 3896 slab_count_t i; 3897 3898 if (depot->new_slabs == NULL) 3899 return; 3900 3901 for (i = depot->slab_count; i < depot->new_slab_count; i++) 3902 free_slab(vdo_forget(depot->new_slabs[i])); 3903 depot->new_slab_count = 0; 3904 depot->new_size = 0; 3905 vdo_free(vdo_forget(depot->new_slabs)); 3906 } 3907 3908 /** Implements vdo_zone_thread_getter_fn. */ 3909 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number) 3910 { 3911 return ((struct slab_depot *) context)->allocators[zone_number].thread_id; 3912 } 3913 3914 /** 3915 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock 3916 * it may hold on a specified recovery journal block. 3917 * @journal: The slab journal. 3918 * @recovery_lock: The sequence number of the recovery journal block whose locks should be 3919 * released. 3920 * 3921 * Return: True if the journal released a lock on the specified block. 3922 */ 3923 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal, 3924 sequence_number_t recovery_lock) 3925 { 3926 if (recovery_lock > journal->recovery_lock) { 3927 VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock), 3928 "slab journal recovery lock is not older than the recovery journal head"); 3929 return false; 3930 } 3931 3932 if ((recovery_lock < journal->recovery_lock) || 3933 vdo_is_read_only(journal->slab->allocator->depot->vdo)) 3934 return false; 3935 3936 /* All locks are held by the block which is in progress; write it. */ 3937 commit_tail(journal); 3938 return true; 3939 } 3940 3941 /* 3942 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot 3943 * is seeking to release. 3944 * 3945 * Implements vdo_zone_action_fn. 3946 */ 3947 static void release_tail_block_locks(void *context, zone_count_t zone_number, 3948 struct vdo_completion *parent) 3949 { 3950 struct slab_journal *journal, *tmp; 3951 struct slab_depot *depot = context; 3952 struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals; 3953 3954 list_for_each_entry_safe(journal, tmp, list, dirty_entry) { 3955 if (!release_recovery_journal_lock(journal, 3956 depot->active_release_request)) 3957 break; 3958 } 3959 3960 vdo_finish_completion(parent); 3961 } 3962 3963 /** 3964 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks. 3965 * @context: The slab depot. 3966 * @parent: The parent operation. 3967 * 3968 * Implements vdo_action_preamble_fn. 3969 */ 3970 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent) 3971 { 3972 struct slab_depot *depot = context; 3973 3974 depot->active_release_request = depot->new_release_request; 3975 vdo_finish_completion(parent); 3976 } 3977 3978 /** 3979 * schedule_tail_block_commit() - Schedule a tail block commit if necessary. 3980 * @context: The slab depot. 3981 * 3982 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the 3983 * depot's action manager. 3984 * 3985 * Implements vdo_action_scheduler_fn. 
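 *
 * Return: true if a commit action was scheduled.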
3986 */ 3987 static bool schedule_tail_block_commit(void *context) 3988 { 3989 struct slab_depot *depot = context; 3990 3991 if (depot->new_release_request == depot->active_release_request) 3992 return false; 3993 3994 return vdo_schedule_action(depot->action_manager, 3995 prepare_for_tail_block_commit, 3996 release_tail_block_locks, 3997 NULL, NULL); 3998 } 3999 4000 /** 4001 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber. 4002 * @allocator: The allocator being initialized 4003 * 4004 * Return: VDO_SUCCESS or an error. 4005 */ 4006 static int initialize_slab_scrubber(struct block_allocator *allocator) 4007 { 4008 struct slab_scrubber *scrubber = &allocator->scrubber; 4009 block_count_t slab_journal_size = 4010 allocator->depot->slab_config.slab_journal_blocks; 4011 char *journal_data; 4012 int result; 4013 4014 result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size, 4015 char, __func__, &journal_data); 4016 if (result != VDO_SUCCESS) 4017 return result; 4018 4019 result = allocate_vio_components(allocator->completion.vdo, 4020 VIO_TYPE_SLAB_JOURNAL, 4021 VIO_PRIORITY_METADATA, 4022 allocator, slab_journal_size, 4023 journal_data, &scrubber->vio); 4024 if (result != VDO_SUCCESS) { 4025 vdo_free(journal_data); 4026 return result; 4027 } 4028 4029 INIT_LIST_HEAD(&scrubber->high_priority_slabs); 4030 INIT_LIST_HEAD(&scrubber->slabs); 4031 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED); 4032 return VDO_SUCCESS; 4033 } 4034 4035 /** 4036 * initialize_slab_summary_block() - Initialize a slab_summary_block. 4037 * @allocator: The allocator which owns the block. 4038 * @index: The index of this block in its zone's summary. 4039 * 4040 * Return: VDO_SUCCESS or an error. 4041 */ 4042 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator, 4043 block_count_t index) 4044 { 4045 struct slab_summary_block *block = &allocator->summary_blocks[index]; 4046 int result; 4047 4048 result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries); 4049 if (result != VDO_SUCCESS) 4050 return result; 4051 4052 result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY, 4053 VIO_PRIORITY_METADATA, NULL, 1, 4054 block->outgoing_entries, &block->vio); 4055 if (result != VDO_SUCCESS) 4056 return result; 4057 4058 block->allocator = allocator; 4059 block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index]; 4060 block->index = index; 4061 return VDO_SUCCESS; 4062 } 4063 4064 static int __must_check initialize_block_allocator(struct slab_depot *depot, 4065 zone_count_t zone) 4066 { 4067 int result; 4068 block_count_t i; 4069 struct block_allocator *allocator = &depot->allocators[zone]; 4070 struct vdo *vdo = depot->vdo; 4071 block_count_t max_free_blocks = depot->slab_config.data_blocks; 4072 unsigned int max_priority = (2 + ilog2(max_free_blocks)); 4073 u32 reference_block_count, refcount_reads_needed, refcount_blocks_per_vio; 4074 4075 *allocator = (struct block_allocator) { 4076 .depot = depot, 4077 .zone_number = zone, 4078 .thread_id = vdo->thread_config.physical_threads[zone], 4079 .nonce = vdo->states.vdo.nonce, 4080 }; 4081 4082 INIT_LIST_HEAD(&allocator->dirty_slab_journals); 4083 vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 4084 result = vdo_register_read_only_listener(vdo, allocator, 4085 notify_block_allocator_of_read_only_mode, 4086 allocator->thread_id); 4087 if (result != VDO_SUCCESS) 4088 return result; 4089 4090 
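/* Set up the allocator's completion and the vio pool used for slab journal metadata I/O. */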
vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4091 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, 1, allocator->thread_id,
4092 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4093 allocator, &allocator->vio_pool);
4094 if (result != VDO_SUCCESS)
4095 return result;
4096
4097 /* Initialize the refcount-reading vio pool. */
4098 reference_block_count = vdo_get_saved_reference_count_size(depot->slab_config.slab_blocks);
4099 refcount_reads_needed = DIV_ROUND_UP(reference_block_count, MAX_BLOCKS_PER_VIO);
4100 refcount_blocks_per_vio = DIV_ROUND_UP(reference_block_count, refcount_reads_needed);
4101 allocator->refcount_blocks_per_big_vio = refcount_blocks_per_vio;
4102 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_REFCOUNT_VIO_POOL_SIZE,
4103 allocator->refcount_blocks_per_big_vio, allocator->thread_id,
4104 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4105 NULL, &allocator->refcount_big_vio_pool);
4106 if (result != VDO_SUCCESS)
4107 return result;
4108
4109 result = initialize_slab_scrubber(allocator);
4110 if (result != VDO_SUCCESS)
4111 return result;
4112
4113 result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4114 if (result != VDO_SUCCESS)
4115 return result;
4116
4117 result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4118 struct slab_summary_block, __func__,
4119 &allocator->summary_blocks);
4120 if (result != VDO_SUCCESS)
4121 return result;
4122
4123 vdo_set_admin_state_code(&allocator->summary_state,
4124 VDO_ADMIN_STATE_NORMAL_OPERATION);
4125 allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4126
4127 /* Initialize each summary block. */
4128 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4129 result = initialize_slab_summary_block(allocator, i);
4130 if (result != VDO_SUCCESS)
4131 return result;
4132 }
4133
4134 /*
4135 * Performing well atop thin provisioned storage requires either that VDO discards freed
4136 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4137 * in preference to slabs that have never been opened. For reasons we have not been able to
4138 * fully understand, some SSD machines have been very sensitive (50% reduction in
4139 * test throughput) to very slight differences in the timing and locality of block
4140 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4141 * ideal in principle, but anything less than a very high threshold (max_priority - 1)
4142 * hurts on these machines.
4143 *
4144 * This sets the free block threshold for preferring to open an unopened slab to the binary
4145 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4146 * to about half the slab size.
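 *
 * Worked example (assuming, for illustration, a slab with 2^19 data blocks): max_priority
 * is 2 + ilog2(2^19) = 21, and the threshold computed below is
 * 1 + ilog2((3 << 19) / 4) = 1 + ilog2(393216) = 19.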
4147 */ 4148 allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4)); 4149 4150 return VDO_SUCCESS; 4151 } 4152 4153 static int allocate_components(struct slab_depot *depot, 4154 struct partition *summary_partition) 4155 { 4156 int result; 4157 zone_count_t zone; 4158 slab_count_t slab_count; 4159 u8 hint; 4160 u32 i; 4161 const struct thread_config *thread_config = &depot->vdo->thread_config; 4162 4163 result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id, 4164 thread_config->journal_thread, depot, 4165 schedule_tail_block_commit, 4166 depot->vdo, &depot->action_manager); 4167 if (result != VDO_SUCCESS) 4168 return result; 4169 4170 depot->origin = depot->first_block; 4171 4172 /* block size must be a multiple of entry size */ 4173 BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0); 4174 4175 depot->summary_origin = summary_partition->offset; 4176 depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift); 4177 result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES, 4178 struct slab_summary_entry, __func__, 4179 &depot->summary_entries); 4180 if (result != VDO_SUCCESS) 4181 return result; 4182 4183 4184 /* Initialize all the entries. */ 4185 hint = compute_fullness_hint(depot, depot->slab_config.data_blocks); 4186 for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) { 4187 /* 4188 * This default tail block offset must be reflected in 4189 * slabJournal.c::read_slab_journal_tail(). 4190 */ 4191 depot->summary_entries[i] = (struct slab_summary_entry) { 4192 .tail_block_offset = 0, 4193 .fullness_hint = hint, 4194 .load_ref_counts = false, 4195 .is_dirty = false, 4196 }; 4197 } 4198 4199 slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block, 4200 depot->slab_size_shift); 4201 if (thread_config->physical_zone_count > slab_count) { 4202 return vdo_log_error_strerror(VDO_BAD_CONFIGURATION, 4203 "%u physical zones exceeds slab count %u", 4204 thread_config->physical_zone_count, 4205 slab_count); 4206 } 4207 4208 /* Initialize the block allocators. */ 4209 for (zone = 0; zone < depot->zone_count; zone++) { 4210 result = initialize_block_allocator(depot, zone); 4211 if (result != VDO_SUCCESS) 4212 return result; 4213 } 4214 4215 /* Allocate slabs. */ 4216 result = allocate_slabs(depot, slab_count); 4217 if (result != VDO_SUCCESS) 4218 return result; 4219 4220 /* Use the new slabs. */ 4221 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4222 struct vdo_slab *slab = depot->new_slabs[i]; 4223 4224 register_slab_with_allocator(slab->allocator, slab); 4225 WRITE_ONCE(depot->slab_count, depot->slab_count + 1); 4226 } 4227 4228 depot->slabs = depot->new_slabs; 4229 depot->new_slabs = NULL; 4230 depot->new_slab_count = 0; 4231 4232 return VDO_SUCCESS; 4233 } 4234 4235 /** 4236 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super 4237 * block. 4238 * @state: The slab depot state from the super block. 4239 * @vdo: The VDO which will own the depot. 4240 * @summary_partition: The partition which holds the slab summary. 4241 * @depot_ptr: A pointer to hold the depot. 4242 * 4243 * Return: A success or error code. 
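 *
 * For example (illustrative numbers only): a slab size of 32768 blocks gives
 * slab_size_shift = 15, so the slab covering a PBN can be found with
 * (pbn - first_block) >> 15 instead of a division; see get_slab_number().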
4244 */ 4245 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo, 4246 struct partition *summary_partition, 4247 struct slab_depot **depot_ptr) 4248 { 4249 unsigned int slab_size_shift; 4250 struct slab_depot *depot; 4251 int result; 4252 4253 /* 4254 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift 4255 * requires that the slab size be a power of two. 4256 */ 4257 block_count_t slab_size = state.slab_config.slab_blocks; 4258 4259 if (!is_power_of_2(slab_size)) { 4260 return vdo_log_error_strerror(UDS_INVALID_ARGUMENT, 4261 "slab size must be a power of two"); 4262 } 4263 slab_size_shift = ilog2(slab_size); 4264 4265 result = vdo_allocate_extended(struct slab_depot, 4266 vdo->thread_config.physical_zone_count, 4267 struct block_allocator, __func__, &depot); 4268 if (result != VDO_SUCCESS) 4269 return result; 4270 4271 depot->vdo = vdo; 4272 depot->old_zone_count = state.zone_count; 4273 depot->zone_count = vdo->thread_config.physical_zone_count; 4274 depot->slab_config = state.slab_config; 4275 depot->first_block = state.first_block; 4276 depot->last_block = state.last_block; 4277 depot->slab_size_shift = slab_size_shift; 4278 4279 result = allocate_components(depot, summary_partition); 4280 if (result != VDO_SUCCESS) { 4281 vdo_free_slab_depot(depot); 4282 return result; 4283 } 4284 4285 *depot_ptr = depot; 4286 return VDO_SUCCESS; 4287 } 4288 4289 static void uninitialize_allocator_summary(struct block_allocator *allocator) 4290 { 4291 block_count_t i; 4292 4293 if (allocator->summary_blocks == NULL) 4294 return; 4295 4296 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) { 4297 free_vio_components(&allocator->summary_blocks[i].vio); 4298 vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries)); 4299 } 4300 4301 vdo_free(vdo_forget(allocator->summary_blocks)); 4302 } 4303 4304 /** 4305 * vdo_free_slab_depot() - Destroy a slab depot. 4306 * @depot: The depot to destroy. 4307 */ 4308 void vdo_free_slab_depot(struct slab_depot *depot) 4309 { 4310 zone_count_t zone = 0; 4311 4312 if (depot == NULL) 4313 return; 4314 4315 vdo_abandon_new_slabs(depot); 4316 4317 for (zone = 0; zone < depot->zone_count; zone++) { 4318 struct block_allocator *allocator = &depot->allocators[zone]; 4319 4320 if (allocator->eraser != NULL) 4321 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 4322 4323 uninitialize_allocator_summary(allocator); 4324 uninitialize_scrubber_vio(&allocator->scrubber); 4325 free_vio_pool(vdo_forget(allocator->vio_pool)); 4326 free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool)); 4327 vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs)); 4328 } 4329 4330 if (depot->slabs != NULL) { 4331 slab_count_t i; 4332 4333 for (i = 0; i < depot->slab_count; i++) 4334 free_slab(vdo_forget(depot->slabs[i])); 4335 } 4336 4337 vdo_free(vdo_forget(depot->slabs)); 4338 vdo_free(vdo_forget(depot->action_manager)); 4339 vdo_free(vdo_forget(depot->summary_entries)); 4340 vdo_free(depot); 4341 } 4342 4343 /** 4344 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block. 4345 * @depot: The depot to encode. 4346 * 4347 * Return: The depot state. 4348 */ 4349 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot) 4350 { 4351 /* 4352 * If this depot is currently using 0 zones, it must have been synchronously loaded by a 4353 * tool and is now being saved. 
We did not load and combine the slab summary, so we still 4354 * need to do that next time we load with the old zone count rather than 0. 4355 */ 4356 struct slab_depot_state_2_0 state; 4357 zone_count_t zones_to_record = depot->zone_count; 4358 4359 if (depot->zone_count == 0) 4360 zones_to_record = depot->old_zone_count; 4361 4362 state = (struct slab_depot_state_2_0) { 4363 .slab_config = depot->slab_config, 4364 .first_block = depot->first_block, 4365 .last_block = depot->last_block, 4366 .zone_count = zones_to_record, 4367 }; 4368 4369 return state; 4370 } 4371 4372 /** 4373 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot. 4374 * @depot: The slab depot. 4375 * 4376 * Context: This method may be called only before entering normal operation from the load thread. 4377 * 4378 * Return: VDO_SUCCESS or an error. 4379 */ 4380 int vdo_allocate_reference_counters(struct slab_depot *depot) 4381 { 4382 struct slab_iterator iterator = 4383 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1); 4384 4385 while (iterator.next != NULL) { 4386 int result = allocate_slab_counters(next_slab(&iterator)); 4387 4388 if (result != VDO_SUCCESS) 4389 return result; 4390 } 4391 4392 return VDO_SUCCESS; 4393 } 4394 4395 /** 4396 * get_slab_number() - Get the number of the slab that contains a specified block. 4397 * @depot: The slab depot. 4398 * @pbn: The physical block number. 4399 * @slab_number_ptr: A pointer to hold the slab number. 4400 * 4401 * Return: VDO_SUCCESS or an error. 4402 */ 4403 static int __must_check get_slab_number(const struct slab_depot *depot, 4404 physical_block_number_t pbn, 4405 slab_count_t *slab_number_ptr) 4406 { 4407 slab_count_t slab_number; 4408 4409 if (pbn < depot->first_block) 4410 return VDO_OUT_OF_RANGE; 4411 4412 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift; 4413 if (slab_number >= depot->slab_count) 4414 return VDO_OUT_OF_RANGE; 4415 4416 *slab_number_ptr = slab_number; 4417 return VDO_SUCCESS; 4418 } 4419 4420 /** 4421 * vdo_get_slab() - Get the slab object for the slab that contains a specified block. 4422 * @depot: The slab depot. 4423 * @pbn: The physical block number. 4424 * 4425 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block. 4426 * 4427 * Return: The slab containing the block, or NULL if the block number is the zero block or 4428 * otherwise out of range. 4429 */ 4430 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot, 4431 physical_block_number_t pbn) 4432 { 4433 slab_count_t slab_number; 4434 int result; 4435 4436 if (pbn == VDO_ZERO_BLOCK) 4437 return NULL; 4438 4439 result = get_slab_number(depot, pbn, &slab_number); 4440 if (result != VDO_SUCCESS) { 4441 vdo_enter_read_only_mode(depot->vdo, result); 4442 return NULL; 4443 } 4444 4445 return depot->slabs[slab_number]; 4446 } 4447 4448 /** 4449 * vdo_get_increment_limit() - Determine how many new references a block can acquire. 4450 * @depot: The slab depot. 4451 * @pbn: The physical block number that is being queried. 4452 * 4453 * Context: This method must be called from the physical zone thread of the PBN. 4454 * 4455 * Return: The number of available references. 
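 *
 * For example, with the usual 8-bit reference counters: a free block (count 0) can accept
 * MAXIMUM_REFERENCE_COUNT new references, while a provisionally referenced block reports
 * MAXIMUM_REFERENCE_COUNT - 1.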
4456 */ 4457 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn) 4458 { 4459 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 4460 vdo_refcount_t *counter_ptr = NULL; 4461 int result; 4462 4463 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT)) 4464 return 0; 4465 4466 result = get_reference_counter(slab, pbn, &counter_ptr); 4467 if (result != VDO_SUCCESS) 4468 return 0; 4469 4470 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT) 4471 return (MAXIMUM_REFERENCE_COUNT - 1); 4472 4473 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr); 4474 } 4475 4476 /** 4477 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block. 4478 * @depot: The depot. 4479 * @pbn: The physical block number to ask about. 4480 * 4481 * Return: True if the PBN corresponds to a data block. 4482 */ 4483 bool vdo_is_physical_data_block(const struct slab_depot *depot, 4484 physical_block_number_t pbn) 4485 { 4486 slab_count_t slab_number; 4487 slab_block_number sbn; 4488 4489 return ((pbn == VDO_ZERO_BLOCK) || 4490 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) && 4491 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) == 4492 VDO_SUCCESS))); 4493 } 4494 4495 /** 4496 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all 4497 * the slabs in the depot. 4498 * @depot: The slab depot. 4499 * 4500 * This is the total number of blocks with a non-zero reference count. 4501 * 4502 * Context: This may be called from any thread. 4503 * 4504 * Return: The total number of blocks with a non-zero reference count. 4505 */ 4506 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot) 4507 { 4508 block_count_t total = 0; 4509 zone_count_t zone; 4510 4511 for (zone = 0; zone < depot->zone_count; zone++) { 4512 /* The allocators are responsible for thread safety. */ 4513 total += READ_ONCE(depot->allocators[zone].allocated_blocks); 4514 } 4515 4516 return total; 4517 } 4518 4519 /** 4520 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the 4521 * depot. 4522 * @depot: The slab depot. 4523 * 4524 * Context: This may be called from any thread. 4525 * 4526 * Return: The total number of data blocks in all slabs. 4527 */ 4528 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot) 4529 { 4530 return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks); 4531 } 4532 4533 /** 4534 * finish_combining_zones() - Clean up after saving out the combined slab summary. 4535 * @completion: The vio which was used to write the summary data. 
4536 */
4537 static void finish_combining_zones(struct vdo_completion *completion)
4538 {
4539 int result = completion->result;
4540 struct vdo_completion *parent = completion->parent;
4541
4542 free_vio(as_vio(vdo_forget(completion)));
4543 vdo_fail_completion(parent, result);
4544 }
4545
4546 static void handle_combining_error(struct vdo_completion *completion)
4547 {
4548 vio_record_metadata_io_error(as_vio(completion));
4549 finish_combining_zones(completion);
4550 }
4551
4552 static void write_summary_endio(struct bio *bio)
4553 {
4554 struct vio *vio = bio->bi_private;
4555 struct vdo *vdo = vio->completion.vdo;
4556
4557 continue_vio_after_io(vio, finish_combining_zones,
4558 vdo->thread_config.admin_thread);
4559 }
4560
4561 /**
4562 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4563 * update every zone to the correct values for every slab.
4564 * @depot: The depot whose summary entries should be combined.
4565 */
4566 static void combine_summaries(struct slab_depot *depot)
4567 {
4568 /*
4569 * Combine all the old summary data into the portion of the buffer corresponding to the
4570 * first zone.
4571 */
4572 zone_count_t zone = 0;
4573 struct slab_summary_entry *entries = depot->summary_entries;
4574
4575 if (depot->old_zone_count > 1) {
4576 slab_count_t entry_number;
4577
4578 for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4579 if (zone != 0) {
4580 memcpy(entries + entry_number,
4581 entries + (zone * MAX_VDO_SLABS) + entry_number,
4582 sizeof(struct slab_summary_entry));
4583 }
4584
4585 zone++;
4586 if (zone == depot->old_zone_count)
4587 zone = 0;
4588 }
4589 }
4590
4591 /* Copy the combined data to each zone's region of the buffer. */
4592 for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4593 memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4594 MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4595 }
4596 }
4597
4598 /**
4599 * finish_loading_summary() - Finish loading slab summary data.
4600 * @completion: The vio which was used to read the summary data.
4601 *
4602 * Combines the slab summary data from all the previously written zones and copies the combined
4603 * summary to each partition's data region. Then writes the combined summary back out to disk. This
4604 * callback is registered in load_summary_endio().
4605 */
4606 static void finish_loading_summary(struct vdo_completion *completion)
4607 {
4608 struct slab_depot *depot = completion->vdo->depot;
4609
4610 /* Combine the summary from each zone so each zone is correct for all slabs. */
4611 combine_summaries(depot);
4612
4613 /* Write the combined summary back out. */
4614 vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4615 write_summary_endio, handle_combining_error,
4616 REQ_OP_WRITE);
4617 }
4618
4619 static void load_summary_endio(struct bio *bio)
4620 {
4621 struct vio *vio = bio->bi_private;
4622 struct vdo *vdo = vio->completion.vdo;
4623
4624 continue_vio_after_io(vio, finish_loading_summary,
4625 vdo->thread_config.admin_thread);
4626 }
4627
4628 /**
4629 * load_slab_summary() - Load the slab summary before the slab data.
4630 * @context: The slab depot.
4631 * @parent: The load operation.
4632 *
4633 * Implements vdo_action_preamble_fn.
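 *
 * When formatting or rebuilding there is no summary worth reading, so this proceeds directly
 * to finish_loading_summary(); otherwise the summary is read from disk and combined in that
 * callback before being rewritten.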
4634 */ 4635 static void load_slab_summary(void *context, struct vdo_completion *parent) 4636 { 4637 int result; 4638 struct vio *vio; 4639 struct slab_depot *depot = context; 4640 const struct admin_state_code *operation = 4641 vdo_get_current_manager_operation(depot->action_manager); 4642 4643 result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY, 4644 VIO_PRIORITY_METADATA, parent, 4645 VDO_SLAB_SUMMARY_BLOCKS, 4646 (char *) depot->summary_entries, &vio); 4647 if (result != VDO_SUCCESS) { 4648 vdo_fail_completion(parent, result); 4649 return; 4650 } 4651 4652 if ((operation == VDO_ADMIN_STATE_FORMATTING) || 4653 (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) { 4654 finish_loading_summary(&vio->completion); 4655 return; 4656 } 4657 4658 vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio, 4659 handle_combining_error, REQ_OP_READ); 4660 } 4661 4662 /* Implements vdo_zone_action_fn. */ 4663 static void load_allocator(void *context, zone_count_t zone_number, 4664 struct vdo_completion *parent) 4665 { 4666 struct slab_depot *depot = context; 4667 4668 vdo_start_loading(&depot->allocators[zone_number].state, 4669 vdo_get_current_manager_operation(depot->action_manager), 4670 parent, initiate_load); 4671 } 4672 4673 /** 4674 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the 4675 * super_block component. 4676 * @depot: The depot to load. 4677 * @operation: The type of load to perform. 4678 * @parent: The completion to notify when the load is complete. 4679 * @context: Additional context for the load operation; may be NULL. 4680 * 4681 * This method may be called only before entering normal operation from the load thread. 4682 */ 4683 void vdo_load_slab_depot(struct slab_depot *depot, 4684 const struct admin_state_code *operation, 4685 struct vdo_completion *parent, void *context) 4686 { 4687 if (!vdo_assert_load_operation(operation, parent)) 4688 return; 4689 4690 vdo_schedule_operation_with_context(depot->action_manager, operation, 4691 load_slab_summary, load_allocator, 4692 NULL, context, parent); 4693 } 4694 4695 /* Implements vdo_zone_action_fn. */ 4696 static void prepare_to_allocate(void *context, zone_count_t zone_number, 4697 struct vdo_completion *parent) 4698 { 4699 struct slab_depot *depot = context; 4700 struct block_allocator *allocator = &depot->allocators[zone_number]; 4701 int result; 4702 4703 result = vdo_prepare_slabs_for_allocation(allocator); 4704 if (result != VDO_SUCCESS) { 4705 vdo_fail_completion(parent, result); 4706 return; 4707 } 4708 4709 scrub_slabs(allocator, parent); 4710 } 4711 4712 /** 4713 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start 4714 * allocating blocks. 4715 * @depot: The depot to prepare. 4716 * @load_type: The load type. 4717 * @parent: The completion to notify when the operation is complete. 4718 * 4719 * This method may be called only before entering normal operation from the load thread. It must be 4720 * called before allocation may proceed. 4721 */ 4722 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot, 4723 enum slab_depot_load_type load_type, 4724 struct vdo_completion *parent) 4725 { 4726 depot->load_type = load_type; 4727 atomic_set(&depot->zones_to_scrub, depot->zone_count); 4728 vdo_schedule_action(depot->action_manager, NULL, 4729 prepare_to_allocate, NULL, parent); 4730 } 4731 4732 /** 4733 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory. 
4734 * @depot: The depot to update. 4735 * 4736 * This size is saved to disk as part of the super block. 4737 */ 4738 void vdo_update_slab_depot_size(struct slab_depot *depot) 4739 { 4740 depot->last_block = depot->new_last_block; 4741 } 4742 4743 /** 4744 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to 4745 * the given size. 4746 * @depot: The depot to prepare to resize. 4747 * @partition: The new depot partition. 4748 * 4749 * Return: VDO_SUCCESS or an error. 4750 */ 4751 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot, 4752 const struct partition *partition) 4753 { 4754 struct slab_depot_state_2_0 new_state; 4755 int result; 4756 slab_count_t new_slab_count; 4757 4758 if ((partition->count >> depot->slab_size_shift) <= depot->slab_count) 4759 return VDO_INCREMENT_TOO_SMALL; 4760 4761 /* Generate the depot configuration for the new block count. */ 4762 VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset, 4763 "New slab depot partition doesn't change origin"); 4764 result = vdo_configure_slab_depot(partition, depot->slab_config, 4765 depot->zone_count, &new_state); 4766 if (result != VDO_SUCCESS) 4767 return result; 4768 4769 new_slab_count = vdo_compute_slab_count(depot->first_block, 4770 new_state.last_block, 4771 depot->slab_size_shift); 4772 if (new_slab_count <= depot->slab_count) 4773 return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL, 4774 "Depot can only grow"); 4775 if (new_slab_count == depot->new_slab_count) { 4776 /* Check it out, we've already got all the new slabs allocated! */ 4777 return VDO_SUCCESS; 4778 } 4779 4780 vdo_abandon_new_slabs(depot); 4781 result = allocate_slabs(depot, new_slab_count); 4782 if (result != VDO_SUCCESS) { 4783 vdo_abandon_new_slabs(depot); 4784 return result; 4785 } 4786 4787 depot->new_size = partition->count; 4788 depot->old_last_block = depot->last_block; 4789 depot->new_last_block = new_state.last_block; 4790 4791 return VDO_SUCCESS; 4792 } 4793 4794 /** 4795 * finish_registration() - Finish registering new slabs now that all of the allocators have 4796 * received their new slabs. 4797 * @context: The slab depot. 4798 * 4799 * Implements vdo_action_conclusion_fn. 4800 */ 4801 static int finish_registration(void *context) 4802 { 4803 struct slab_depot *depot = context; 4804 4805 WRITE_ONCE(depot->slab_count, depot->new_slab_count); 4806 vdo_free(depot->slabs); 4807 depot->slabs = depot->new_slabs; 4808 depot->new_slabs = NULL; 4809 depot->new_slab_count = 0; 4810 return VDO_SUCCESS; 4811 } 4812 4813 /* Implements vdo_zone_action_fn. */ 4814 static void register_new_slabs(void *context, zone_count_t zone_number, 4815 struct vdo_completion *parent) 4816 { 4817 struct slab_depot *depot = context; 4818 struct block_allocator *allocator = &depot->allocators[zone_number]; 4819 slab_count_t i; 4820 4821 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4822 struct vdo_slab *slab = depot->new_slabs[i]; 4823 4824 if (slab->allocator == allocator) 4825 register_slab_with_allocator(allocator, slab); 4826 } 4827 4828 vdo_finish_completion(parent); 4829 } 4830 4831 /** 4832 * vdo_use_new_slabs() - Use the new slabs allocated for resize. 4833 * @depot: The depot. 4834 * @parent: The object to notify when complete. 
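 *
 * A typical grow sequence is sketched below (illustrative ordering only; error handling and
 * the surrounding admin operations are elided):
 *
 *   result = vdo_prepare_to_grow_slab_depot(depot, partition);
 *   ...
 *   vdo_use_new_slabs(depot, parent);
 *   ...
 *   vdo_update_slab_depot_size(depot);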
/**
 * vdo_use_new_slabs() - Use the new slabs allocated for resize.
 * @depot: The depot.
 * @parent: The object to notify when complete.
 */
void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
{
	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
	vdo_schedule_operation(depot->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       NULL, register_new_slabs,
			       finish_registration, parent);
}

/**
 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
 *                    currently working on.
 * @allocator: The block allocator owning the scrubber to stop.
 */
static void stop_scrubbing(struct block_allocator *allocator)
{
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
		vdo_finish_completion(&allocator->completion);
	} else {
		vdo_start_draining(&scrubber->admin_state,
				   VDO_ADMIN_STATE_SUSPENDING,
				   &allocator->completion, NULL);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_summary_drain(struct admin_state *state)
{
	check_summary_drain_complete(container_of(state, struct block_allocator,
						  summary_state));
}

static void do_drain_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
					   handle_operation_error, allocator->thread_id,
					   NULL);
	/*
	 * Each step requeues this completion, so the drain walks forward through
	 * the steps: scrubber, then slabs, then the slab summary.
	 */
	switch (++allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		stop_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_drain_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_start_draining(&allocator->summary_state,
				   vdo_get_admin_state_code(&allocator->state),
				   completion, initiate_summary_drain);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
				    "vio pool not busy");
		vdo_finish_draining_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
	do_drain_step(&allocator->completion);
}

/*
 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
 * written to disk. The type of drain will be determined from the state of the allocator's depot.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_allocator(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_draining(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_drain);
}
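/*
 * Illustrative sketch (not part of the driver): invoking the depot-level
 * drain entry point defined just below. The admin state codes shown are the
 * ones a suspend or a save would be expected to pass; the parent completion
 * is assumed to be the admin operation's completion.
 *
 *	vdo_drain_slab_depot(depot, VDO_ADMIN_STATE_SUSPENDING, parent);
 *		stops scrubbing, drains every slab, then the slab summary
 *	vdo_drain_slab_depot(depot, VDO_ADMIN_STATE_SAVING, parent);
 *		same walk, but all dirty depot metadata is also written out
 */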
/**
 * vdo_drain_slab_depot() - Drain all slab depot I/O.
 * @depot: The depot to drain.
 * @operation: The drain operation (flush, rebuild, suspend, or save).
 * @parent: The completion to finish when the drain is complete.
 *
 * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
 * the depot will be left in a suspended state.
 */
void vdo_drain_slab_depot(struct slab_depot *depot,
			  const struct admin_state_code *operation,
			  struct vdo_completion *parent)
{
	vdo_schedule_operation(depot->action_manager, operation,
			       NULL, drain_allocator, NULL, parent);
}

/**
 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
 * @allocator: The allocator being resumed.
 */
static void resume_scrubbing(struct block_allocator *allocator)
{
	int result;
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (!has_slabs_to_scrub(scrubber)) {
		vdo_finish_completion(&allocator->completion);
		return;
	}

	result = vdo_resume_if_quiescent(&scrubber->admin_state);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(&allocator->completion, result);
		return;
	}

	scrub_next_slab(scrubber);
	vdo_finish_completion(&allocator->completion);
}

static void do_resume_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
					   handle_operation_error,
					   allocator->thread_id, NULL);
	/* Resume walks backwards through the same steps the drain walked forwards. */
	switch (--allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_fail_completion(completion,
				    vdo_resume_if_quiescent(&allocator->summary_state));
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_resume_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		resume_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_START:
		vdo_finish_resuming_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_resume(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
	do_resume_step(&allocator->completion);
}

/* Implements vdo_zone_action_fn. */
static void resume_allocator(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_resuming(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_resume);
}

/**
 * vdo_resume_slab_depot() - Resume a suspended slab depot.
 * @depot: The depot to resume.
 * @parent: The completion to finish when the depot has resumed.
 */
void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
{
	if (vdo_is_read_only(depot->vdo)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_allocator, NULL, parent);
}
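/*
 * Illustrative sketch (not part of the driver): a suspend/resume round trip
 * over the entry points above. Each call must complete (via its parent
 * completion) before the next begins; the parent completions shown are
 * hypothetical.
 *
 *	vdo_drain_slab_depot(depot, VDO_ADMIN_STATE_SUSPENDING, suspend_parent);
 *	... device is suspended; note that vdo_resume_slab_depot() will
 *	... continue its parent with VDO_READ_ONLY if the vdo has since
 *	... entered read-only mode
 *	vdo_resume_slab_depot(depot, resume_parent);
 */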
/**
 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
 *                                                given recovery journal block.
 * @depot: The depot.
 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
 *                         released.
 *
 * Context: This method must be called from the journal zone thread.
 */
void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
						sequence_number_t recovery_block_number)
{
	if (depot == NULL)
		return;

	depot->new_release_request = recovery_block_number;
	vdo_schedule_default_action(depot->action_manager);
}

/* Implements vdo_zone_action_fn. */
static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
					struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	scrub_slabs(&depot->allocators[zone_number], NULL);
	vdo_launch_completion(parent);
}

/**
 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
 * @depot: The depot to scrub.
 * @parent: The object to notify when scrubbing has been launched for all zones.
 */
void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
				     struct vdo_completion *parent)
{
	vdo_schedule_action(depot->action_manager, NULL,
			    scrub_all_unrecovered_slabs,
			    NULL, parent);
}

/**
 * get_block_allocator_statistics() - Get the total of the statistics from all the block
 *                                    allocators in the depot.
 * @depot: The slab depot.
 *
 * Return: The statistics from all block allocators in the depot.
 */
static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot *depot)
{
	struct block_allocator_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct block_allocator *allocator = &depot->allocators[zone];
		const struct block_allocator_statistics *stats = &allocator->statistics;

		totals.slab_count += allocator->slab_count;
		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
	}

	return totals;
}

/**
 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The cumulative statistics for all ref_counts in the depot.
 */
static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot *depot)
{
	struct ref_counts_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		totals.blocks_written +=
			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
	}

	return totals;
}
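/*
 * The statistics helpers above and below all follow the same lockless
 * pattern: each zone's allocator updates its own counters on its own thread,
 * and the aggregators sum a READ_ONCE() snapshot of each counter, so totals
 * are approximate (counters may advance mid-sum) but cheap to gather. A
 * minimal sketch of the pattern, with a hypothetical counter name:
 *
 *	u64 total = 0;
 *
 *	for (zone = 0; zone < depot->zone_count; zone++)
 *		total += READ_ONCE(depot->allocators[zone].some_counter);
 */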
/**
 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The aggregated statistics for all slab journals in the depot.
 */
static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot *depot)
{
	struct slab_journal_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct slab_journal_statistics *stats =
			&depot->allocators[zone].slab_journal_statistics;

		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
		totals.flush_count += READ_ONCE(stats->flush_count);
		totals.blocked_count += READ_ONCE(stats->blocked_count);
		totals.blocks_written += READ_ONCE(stats->blocks_written);
		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
	}

	return totals;
}

/**
 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
 *                                   slab depot.
 * @depot: The slab depot.
 * @stats: The vdo statistics structure to partially fill.
 */
void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
				   struct vdo_statistics *stats)
{
	slab_count_t slab_count = READ_ONCE(depot->slab_count);
	slab_count_t unrecovered = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++) {
		/* The allocators are responsible for thread safety. */
		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
	}

	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
	stats->allocator = get_block_allocator_statistics(depot);
	stats->ref_counts = get_ref_counts_statistics(depot);
	stats->slab_journal = get_slab_journal_statistics(depot);
	stats->slab_summary = (struct slab_summary_statistics) {
		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
	};
}

/**
 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
 * @depot: The slab depot.
 */
void vdo_dump_slab_depot(const struct slab_depot *depot)
{
	vdo_log_info("vdo slab depot");
	vdo_log_info(" zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
		     (unsigned int) depot->zone_count,
		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
		     (unsigned long long) depot->active_release_request,
		     (unsigned long long) depot->new_release_request);
}
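/*
 * Illustrative sketch (not part of the driver): how a statistics consumer
 * might use the two entry points above. In the real driver, the message
 * handling code fills a struct vdo_statistics from each component; the log
 * line here is purely hypothetical.
 *
 *	struct vdo_statistics stats;
 *
 *	memset(&stats, 0, sizeof(stats));
 *	vdo_get_slab_depot_statistics(depot, &stats);
 *	vdo_log_info("recovery %u%% complete", stats.recovery_percentage);
 *	vdo_dump_slab_depot(depot);
 */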