xref: /linux/drivers/md/dm-vdo/slab-depot.c (revision fa8a4d3659d0c1ad73d5f59b2e0a6d408de5b317)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "slab-depot.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14 
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20 
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37 
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40 
41 /**
42  * get_lock() - Get the lock object for a slab journal block by sequence number.
43  * @journal: vdo_slab journal to retrieve from.
44  * @sequence_number: Sequence number of the block.
45  *
46  * Return: The lock object for the given sequence number.
47  */
48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49 							  sequence_number_t sequence_number)
50 {
51 	return &journal->locks[sequence_number % journal->size];
52 }
53 
54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56 	return (!vdo_is_state_quiescing(&slab->state) &&
57 		!vdo_is_state_quiescent(&slab->state));
58 }
59 
60 /**
61  * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62  * @journal: The journal to check.
63  *
64  * Return: true if there are no entry waiters, or if the slab is unrecovered.
65  */
66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68 	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69 		vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71 
72 /**
73  * is_reaping() - Check whether a reap is currently in progress.
74  * @journal: The journal which may be reaping.
75  *
76  * Return: true if the journal is reaping.
77  */
78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80 	return (journal->head != journal->unreapable);
81 }
82 
83 /**
84  * initialize_tail_block() - Initialize tail block as a new block.
85  * @journal: The journal whose tail block is being initialized.
86  */
87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89 	struct slab_journal_block_header *header = &journal->tail_header;
90 
91 	header->sequence_number = journal->tail;
92 	header->entry_count = 0;
93 	header->has_block_map_increments = false;
94 }
95 
96 /**
97  * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98  * @journal: The journal to be reset, based on its tail sequence number.
99  */
100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102 	journal->unreapable = journal->head;
103 	journal->reap_lock = get_lock(journal, journal->unreapable);
104 	journal->next_commit = journal->tail;
105 	journal->summarized = journal->last_summarized = journal->tail;
106 	initialize_tail_block(journal);
107 }
108 
109 /**
110  * block_is_full() - Check whether a journal block is full.
111  * @journal: The slab journal for the block.
112  *
113  * Return: true if the tail block is full.
114  */
115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117 	journal_entry_count_t count = journal->tail_header.entry_count;
118 
119 	return (journal->tail_header.has_block_map_increments ?
120 		(journal->full_entries_per_block == count) :
121 		(journal->entries_per_block == count));
122 }
123 
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127 
128 /**
129  * is_slab_journal_blank() - Check whether a slab's journal is blank.
130  *
131  * A slab journal is blank if it has never had any entries recorded in it.
132  *
133  * Return: true if the slab's journal has never been modified.
134  */
135 static bool is_slab_journal_blank(const struct vdo_slab *slab)
136 {
137 	return ((slab->journal.tail == 1) &&
138 		(slab->journal.tail_header.entry_count == 0));
139 }
140 
141 /**
142  * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct
143  *                             order.
144  * @journal: The journal to be marked dirty.
145  * @lock: The recovery journal lock held by the slab journal.
146  */
147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
148 {
149 	struct slab_journal *dirty_journal;
150 	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
151 
152 	VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
153 
154 	journal->recovery_lock = lock;
155 	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
156 		if (dirty_journal->recovery_lock <= journal->recovery_lock)
157 			break;
158 	}
159 
160 	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
161 }
162 
163 static void mark_slab_journal_clean(struct slab_journal *journal)
164 {
165 	journal->recovery_lock = 0;
166 	list_del_init(&journal->dirty_entry);
167 }
168 
169 static void check_if_slab_drained(struct vdo_slab *slab)
170 {
171 	bool read_only;
172 	struct slab_journal *journal = &slab->journal;
173 	const struct admin_state_code *code;
174 
175 	if (!vdo_is_state_draining(&slab->state) ||
176 	    must_make_entries_to_flush(journal) ||
177 	    is_reaping(journal) ||
178 	    journal->waiting_to_commit ||
179 	    !list_empty(&journal->uncommitted_blocks) ||
180 	    journal->updating_slab_summary ||
181 	    (slab->active_count > 0))
182 		return;
183 
184 	/* When not suspending or recovering, the slab must be clean. */
185 	code = vdo_get_admin_state_code(&slab->state);
186 	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
187 	if (!read_only &&
188 	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
189 	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
190 	    (code != VDO_ADMIN_STATE_RECOVERING))
191 		return;
192 
193 	vdo_finish_draining_with_result(&slab->state,
194 					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
195 }
196 
197 /* FULLNESS HINT COMPUTATION */
198 
199 /**
200  * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
201  *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
202  *                           count.
203  * @depot: The depot whose summary being updated.
204  * @free_blocks: The number of free blocks.
205  *
206  * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
207  * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
208  * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
209  * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
210  * is 0, which would make it impossible to distinguish completely full from completely empty.
211  *
212  * Return: A fullness hint, which can be stored in 7 bits.
213  */
214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
215 					     block_count_t free_blocks)
216 {
217 	block_count_t hint;
218 
219 	VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
220 
221 	if (free_blocks == 0)
222 		return 0;
223 
224 	hint = free_blocks >> depot->hint_shift;
225 	return ((hint == 0) ? 1 : hint);
226 }
227 
228 /**
229  * check_summary_drain_complete() - Check whether an allocators summary has finished draining.
230  */
231 static void check_summary_drain_complete(struct block_allocator *allocator)
232 {
233 	if (!vdo_is_state_draining(&allocator->summary_state) ||
234 	    (allocator->summary_write_count > 0))
235 		return;
236 
237 	vdo_finish_operation(&allocator->summary_state,
238 			     (vdo_is_read_only(allocator->depot->vdo) ?
239 			      VDO_READ_ONLY : VDO_SUCCESS));
240 }
241 
242 /**
243  * notify_summary_waiters() - Wake all the waiters in a given queue.
244  * @allocator: The block allocator summary which owns the queue.
245  * @queue: The queue to notify.
246  */
247 static void notify_summary_waiters(struct block_allocator *allocator,
248 				   struct vdo_wait_queue *queue)
249 {
250 	int result = (vdo_is_read_only(allocator->depot->vdo) ?
251 		      VDO_READ_ONLY : VDO_SUCCESS);
252 
253 	vdo_waitq_notify_all_waiters(queue, NULL, &result);
254 }
255 
256 static void launch_write(struct slab_summary_block *summary_block);
257 
258 /**
259  * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
260  *                                        whether or not the attempt succeeded.
261  * @block: The block.
262  */
263 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
264 {
265 	notify_summary_waiters(block->allocator, &block->current_update_waiters);
266 	block->writing = false;
267 	block->allocator->summary_write_count--;
268 	if (vdo_waitq_has_waiters(&block->next_update_waiters))
269 		launch_write(block);
270 	else
271 		check_summary_drain_complete(block->allocator);
272 }
273 
274 /**
275  * finish_update() - This is the callback for a successful summary block write.
276  * @completion: The write vio.
277  */
278 static void finish_update(struct vdo_completion *completion)
279 {
280 	struct slab_summary_block *block =
281 		container_of(as_vio(completion), struct slab_summary_block, vio);
282 
283 	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
284 	finish_updating_slab_summary_block(block);
285 }
286 
287 /**
288  * handle_write_error() - Handle an error writing a slab summary block.
289  * @completion: The write VIO.
290  */
291 static void handle_write_error(struct vdo_completion *completion)
292 {
293 	struct slab_summary_block *block =
294 		container_of(as_vio(completion), struct slab_summary_block, vio);
295 
296 	vio_record_metadata_io_error(as_vio(completion));
297 	vdo_enter_read_only_mode(completion->vdo, completion->result);
298 	finish_updating_slab_summary_block(block);
299 }
300 
301 static void write_slab_summary_endio(struct bio *bio)
302 {
303 	struct vio *vio = bio->bi_private;
304 	struct slab_summary_block *block =
305 		container_of(vio, struct slab_summary_block, vio);
306 
307 	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
308 }
309 
310 /**
311  * launch_write() - Write a slab summary block unless it is currently out for writing.
312  * @block: The block that needs to be committed.
313  */
314 static void launch_write(struct slab_summary_block *block)
315 {
316 	struct block_allocator *allocator = block->allocator;
317 	struct slab_depot *depot = allocator->depot;
318 	physical_block_number_t pbn;
319 
320 	if (block->writing)
321 		return;
322 
323 	allocator->summary_write_count++;
324 	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
325 				       &block->current_update_waiters);
326 	block->writing = true;
327 
328 	if (vdo_is_read_only(depot->vdo)) {
329 		finish_updating_slab_summary_block(block);
330 		return;
331 	}
332 
333 	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
334 
335 	/*
336 	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
337 	 * covered by this summary update are stable. Otherwise, a subsequent recovery could
338 	 * encounter a slab summary update that refers to a slab journal tail block that has not
339 	 * actually been written. In such cases, the slab journal referenced will be treated as
340 	 * empty, causing any data within the slab which predates the existing recovery journal
341 	 * entries to be lost.
342 	 */
343 	pbn = (depot->summary_origin +
344 	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
345 	       block->index);
346 	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
347 				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
348 }
349 
350 /**
351  * update_slab_summary_entry() - Update the entry for a slab.
352  * @slab: The slab whose entry is to be updated
353  * @waiter: The waiter that is updating the summary.
354  * @tail_block_offset: The offset of the slab journal's tail block.
355  * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
356  * @is_clean: Whether the slab is clean.
357  * @free_blocks: The number of free blocks.
358  */
359 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
360 				      tail_block_offset_t tail_block_offset,
361 				      bool load_ref_counts, bool is_clean,
362 				      block_count_t free_blocks)
363 {
364 	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
365 	struct block_allocator *allocator = slab->allocator;
366 	struct slab_summary_block *block = &allocator->summary_blocks[index];
367 	int result;
368 	struct slab_summary_entry *entry;
369 
370 	if (vdo_is_read_only(block->vio.completion.vdo)) {
371 		result = VDO_READ_ONLY;
372 		waiter->callback(waiter, &result);
373 		return;
374 	}
375 
376 	if (vdo_is_state_draining(&allocator->summary_state) ||
377 	    vdo_is_state_quiescent(&allocator->summary_state)) {
378 		result = VDO_INVALID_ADMIN_STATE;
379 		waiter->callback(waiter, &result);
380 		return;
381 	}
382 
383 	entry = &allocator->summary_entries[slab->slab_number];
384 	*entry = (struct slab_summary_entry) {
385 		.tail_block_offset = tail_block_offset,
386 		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
387 		.is_dirty = !is_clean,
388 		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
389 	};
390 	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
391 	launch_write(block);
392 }
393 
394 /**
395  * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
396  *                    complete.
397  * @journal: The journal to be reaped.
398  */
399 static void finish_reaping(struct slab_journal *journal)
400 {
401 	journal->head = journal->unreapable;
402 	add_entries(journal);
403 	check_if_slab_drained(journal->slab);
404 }
405 
406 static void reap_slab_journal(struct slab_journal *journal);
407 
408 /**
409  * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
410  *                      reaping again in case we deferred reaping due to an outstanding vio.
411  * @completion: The flush vio.
412  */
413 static void complete_reaping(struct vdo_completion *completion)
414 {
415 	struct slab_journal *journal = completion->parent;
416 
417 	return_vio_to_pool(journal->slab->allocator->vio_pool,
418 			   vio_as_pooled_vio(as_vio(vdo_forget(completion))));
419 	finish_reaping(journal);
420 	reap_slab_journal(journal);
421 }
422 
423 /**
424  * handle_flush_error() - Handle an error flushing the lower layer.
425  * @completion: The flush vio.
426  */
427 static void handle_flush_error(struct vdo_completion *completion)
428 {
429 	vio_record_metadata_io_error(as_vio(completion));
430 	vdo_enter_read_only_mode(completion->vdo, completion->result);
431 	complete_reaping(completion);
432 }
433 
434 static void flush_endio(struct bio *bio)
435 {
436 	struct vio *vio = bio->bi_private;
437 	struct slab_journal *journal = vio->completion.parent;
438 
439 	continue_vio_after_io(vio, complete_reaping,
440 			      journal->slab->allocator->thread_id);
441 }
442 
443 /**
444  * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
445  *                       prior to reaping.
446  * @waiter: The journal as a flush waiter.
447  * @context: The newly acquired flush vio.
448  */
449 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
450 {
451 	struct slab_journal *journal =
452 		container_of(waiter, struct slab_journal, flush_waiter);
453 	struct pooled_vio *pooled = context;
454 	struct vio *vio = &pooled->vio;
455 
456 	vio->completion.parent = journal;
457 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
458 }
459 
460 /**
461  * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
462  * @journal: The slab journal.
463  */
464 static void reap_slab_journal(struct slab_journal *journal)
465 {
466 	bool reaped = false;
467 
468 	if (is_reaping(journal)) {
469 		/* We already have a reap in progress so wait for it to finish. */
470 		return;
471 	}
472 
473 	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
474 	    !vdo_is_state_normal(&journal->slab->state) ||
475 	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
476 		/*
477 		 * We must not reap in the first two cases, and there's no point in read-only mode.
478 		 */
479 		return;
480 	}
481 
482 	/*
483 	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
484 	 * block is referenced or reap reaches the most recently written block, referenced by the
485 	 * slab summary, which has the sequence number just before the tail.
486 	 */
487 	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
488 		reaped = true;
489 		journal->unreapable++;
490 		journal->reap_lock++;
491 		if (journal->reap_lock == &journal->locks[journal->size])
492 			journal->reap_lock = &journal->locks[0];
493 	}
494 
495 	if (!reaped)
496 		return;
497 
498 	/*
499 	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
500 	 * of whether a user flush has been received or not. In the absence of the flush, the
501 	 * reference block write which released the locks allowing the slab journal to reap may not
502 	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
503 	 * journal block writes can be issued while previous slab summary updates have not yet been
504 	 * made. Even though those slab journal block writes will be ignored if the slab summary
505 	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
506 	 * resulting in a loss of reference count updates.
507 	 */
508 	journal->flush_waiter.callback = flush_for_reaping;
509 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
510 			      &journal->flush_waiter);
511 }
512 
513 /**
514  * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
515  * @journal: The slab journal.
516  * @sequence_number: The journal sequence number of the referenced block.
517  * @adjustment: Amount to adjust the reference counter.
518  *
519  * Note that when the adjustment is negative, the slab journal will be reaped.
520  */
521 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
522 						sequence_number_t sequence_number,
523 						int adjustment)
524 {
525 	struct journal_lock *lock;
526 
527 	if (sequence_number == 0)
528 		return;
529 
530 	if (journal->slab->status == VDO_SLAB_REPLAYING) {
531 		/* Locks should not be used during offline replay. */
532 		return;
533 	}
534 
535 	VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
536 	lock = get_lock(journal, sequence_number);
537 	if (adjustment < 0) {
538 		VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
539 				    "adjustment %d of lock count %u for slab journal block %llu must not underflow",
540 				    adjustment, lock->count,
541 				    (unsigned long long) sequence_number);
542 	}
543 
544 	lock->count += adjustment;
545 	if (lock->count == 0)
546 		reap_slab_journal(journal);
547 }
548 
549 /**
550  * release_journal_locks() - Callback invoked after a slab summary update completes.
551  * @waiter: The slab summary waiter that has just been notified.
552  * @context: The result code of the update.
553  *
554  * Registered in the constructor on behalf of update_tail_block_location().
555  *
556  * Implements waiter_callback_fn.
557  */
558 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
559 {
560 	sequence_number_t first, i;
561 	struct slab_journal *journal =
562 		container_of(waiter, struct slab_journal, slab_summary_waiter);
563 	int result = *((int *) context);
564 
565 	if (result != VDO_SUCCESS) {
566 		if (result != VDO_READ_ONLY) {
567 			/*
568 			 * Don't bother logging what might be lots of errors if we are already in
569 			 * read-only mode.
570 			 */
571 			vdo_log_error_strerror(result, "failed slab summary update %llu",
572 					       (unsigned long long) journal->summarized);
573 		}
574 
575 		journal->updating_slab_summary = false;
576 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
577 		check_if_slab_drained(journal->slab);
578 		return;
579 	}
580 
581 	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
582 		journal->partial_write_in_progress = false;
583 		add_entries(journal);
584 	}
585 
586 	first = journal->last_summarized;
587 	journal->last_summarized = journal->summarized;
588 	for (i = journal->summarized - 1; i >= first; i--) {
589 		/*
590 		 * Release the lock the summarized block held on the recovery journal. (During
591 		 * replay, recovery_start will always be 0.)
592 		 */
593 		if (journal->recovery_journal != NULL) {
594 			zone_count_t zone_number = journal->slab->allocator->zone_number;
595 			struct journal_lock *lock = get_lock(journal, i);
596 
597 			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
598 								     lock->recovery_start,
599 								     VDO_ZONE_TYPE_PHYSICAL,
600 								     zone_number);
601 		}
602 
603 		/*
604 		 * Release our own lock against reaping for blocks that are committed. (This
605 		 * function will not change locks during replay.)
606 		 */
607 		adjust_slab_journal_block_reference(journal, i, -1);
608 	}
609 
610 	journal->updating_slab_summary = false;
611 
612 	reap_slab_journal(journal);
613 
614 	/* Check if the slab summary needs to be updated again. */
615 	update_tail_block_location(journal);
616 }
617 
618 /**
619  * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
620  * @journal: The slab journal that is updating its tail block location.
621  */
622 static void update_tail_block_location(struct slab_journal *journal)
623 {
624 	block_count_t free_block_count;
625 	struct vdo_slab *slab = journal->slab;
626 
627 	if (journal->updating_slab_summary ||
628 	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
629 	    (journal->last_summarized >= journal->next_commit)) {
630 		check_if_slab_drained(slab);
631 		return;
632 	}
633 
634 	if (slab->status != VDO_SLAB_REBUILT) {
635 		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
636 
637 		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
638 	} else {
639 		free_block_count = slab->free_blocks;
640 	}
641 
642 	journal->summarized = journal->next_commit;
643 	journal->updating_slab_summary = true;
644 
645 	/*
646 	 * Update slab summary as dirty.
647 	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
648 	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
649 	 * loaded when the journal head has reaped past sequence number 1.
650 	 */
651 	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
652 				  journal->summarized % journal->size,
653 				  (journal->head > 1), false, free_block_count);
654 }
655 
656 /**
657  * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
658  */
659 static void reopen_slab_journal(struct vdo_slab *slab)
660 {
661 	struct slab_journal *journal = &slab->journal;
662 	sequence_number_t block;
663 
664 	VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
665 			    "vdo_slab journal's active block empty before reopening");
666 	journal->head = journal->tail;
667 	initialize_journal_state(journal);
668 
669 	/* Ensure no locks are spuriously held on an empty journal. */
670 	for (block = 1; block <= journal->size; block++) {
671 		VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
672 				    "Scrubbed journal's block %llu is not locked",
673 				    (unsigned long long) block);
674 	}
675 
676 	add_entries(journal);
677 }
678 
679 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
680 {
681 	const struct packed_slab_journal_block *block =
682 		(const struct packed_slab_journal_block *) vio->vio.data;
683 
684 	return __le64_to_cpu(block->header.sequence_number);
685 }
686 
687 /**
688  * complete_write() - Handle post-commit processing.
689  * @completion: The write vio as a completion.
690  *
691  * This is the callback registered by write_slab_journal_block().
692  */
693 static void complete_write(struct vdo_completion *completion)
694 {
695 	int result = completion->result;
696 	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
697 	struct slab_journal *journal = completion->parent;
698 	sequence_number_t committed = get_committing_sequence_number(pooled);
699 
700 	list_del_init(&pooled->list_entry);
701 	return_vio_to_pool(journal->slab->allocator->vio_pool, vdo_forget(pooled));
702 
703 	if (result != VDO_SUCCESS) {
704 		vio_record_metadata_io_error(as_vio(completion));
705 		vdo_log_error_strerror(result, "cannot write slab journal block %llu",
706 				       (unsigned long long) committed);
707 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
708 		check_if_slab_drained(journal->slab);
709 		return;
710 	}
711 
712 	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
713 
714 	if (list_empty(&journal->uncommitted_blocks)) {
715 		/* If no blocks are outstanding, then the commit point is at the tail. */
716 		journal->next_commit = journal->tail;
717 	} else {
718 		/* The commit point is always the beginning of the oldest incomplete block. */
719 		pooled = container_of(journal->uncommitted_blocks.next,
720 				      struct pooled_vio, list_entry);
721 		journal->next_commit = get_committing_sequence_number(pooled);
722 	}
723 
724 	update_tail_block_location(journal);
725 }
726 
727 static void write_slab_journal_endio(struct bio *bio)
728 {
729 	struct vio *vio = bio->bi_private;
730 	struct slab_journal *journal = vio->completion.parent;
731 
732 	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
733 }
734 
735 /**
736  * write_slab_journal_block() - Write a slab journal block.
737  * @waiter: The vio pool waiter which was just notified.
738  * @context: The vio pool entry for the write.
739  *
740  * Callback from acquire_vio_from_pool() registered in commit_tail().
741  */
742 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
743 {
744 	struct pooled_vio *pooled = context;
745 	struct vio *vio = &pooled->vio;
746 	struct slab_journal *journal =
747 		container_of(waiter, struct slab_journal, resource_waiter);
748 	struct slab_journal_block_header *header = &journal->tail_header;
749 	int unused_entries = journal->entries_per_block - header->entry_count;
750 	physical_block_number_t block_number;
751 	const struct admin_state_code *operation;
752 
753 	header->head = journal->head;
754 	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
755 	vdo_pack_slab_journal_block_header(header, &journal->block->header);
756 
757 	/* Copy the tail block into the vio. */
758 	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
759 
760 	VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
761 	if (unused_entries > 0) {
762 		/*
763 		 * Release the per-entry locks for any unused entries in the block we are about to
764 		 * write.
765 		 */
766 		adjust_slab_journal_block_reference(journal, header->sequence_number,
767 						    -unused_entries);
768 		journal->partial_write_in_progress = !block_is_full(journal);
769 	}
770 
771 	block_number = journal->slab->journal_origin +
772 		(header->sequence_number % journal->size);
773 	vio->completion.parent = journal;
774 
775 	/*
776 	 * This block won't be read in recovery until the slab summary is updated to refer to it.
777 	 * The slab summary update does a flush which is sufficient to protect us from corruption
778 	 * due to out of order slab journal, reference block, or block map writes.
779 	 */
780 	vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
781 				complete_write, REQ_OP_WRITE);
782 
783 	/* Since the write is submitted, the tail block structure can be reused. */
784 	journal->tail++;
785 	initialize_tail_block(journal);
786 	journal->waiting_to_commit = false;
787 
788 	operation = vdo_get_admin_state_code(&journal->slab->state);
789 	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
790 		vdo_finish_operation(&journal->slab->state,
791 				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
792 				      VDO_READ_ONLY : VDO_SUCCESS));
793 		return;
794 	}
795 
796 	add_entries(journal);
797 }
798 
799 /**
800  * commit_tail() - Commit the tail block of the slab journal.
801  * @journal: The journal whose tail block should be committed.
802  */
803 static void commit_tail(struct slab_journal *journal)
804 {
805 	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
806 		/*
807 		 * There are no entries at the moment, but there are some waiters, so defer
808 		 * initiating the flush until those entries are ready to write.
809 		 */
810 		return;
811 	}
812 
813 	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
814 	    journal->waiting_to_commit ||
815 	    (journal->tail_header.entry_count == 0)) {
816 		/*
817 		 * There is nothing to do since the tail block is empty, or writing, or the journal
818 		 * is in read-only mode.
819 		 */
820 		return;
821 	}
822 
823 	/*
824 	 * Since we are about to commit the tail block, this journal no longer needs to be on the
825 	 * ring of journals which the recovery journal might ask to commit.
826 	 */
827 	mark_slab_journal_clean(journal);
828 
829 	journal->waiting_to_commit = true;
830 
831 	journal->resource_waiter.callback = write_slab_journal_block;
832 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
833 			      &journal->resource_waiter);
834 }
835 
836 /**
837  * encode_slab_journal_entry() - Encode a slab journal entry.
838  * @tail_header: The unpacked header for the block.
839  * @payload: The journal block payload to hold the entry.
840  * @sbn: The slab block number of the entry to encode.
841  * @operation: The type of the entry.
842  * @increment: True if this is an increment.
843  *
844  * Exposed for unit tests.
845  */
846 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
847 				      slab_journal_payload *payload,
848 				      slab_block_number sbn,
849 				      enum journal_operation operation,
850 				      bool increment)
851 {
852 	journal_entry_count_t entry_number = tail_header->entry_count++;
853 
854 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
855 		if (!tail_header->has_block_map_increments) {
856 			memset(payload->full_entries.entry_types, 0,
857 			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
858 			tail_header->has_block_map_increments = true;
859 		}
860 
861 		payload->full_entries.entry_types[entry_number / 8] |=
862 			((u8)1 << (entry_number % 8));
863 	}
864 
865 	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
866 }
867 
868 /**
869  * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
870  *                          increment and a decrement to a single point which refers to one or the
871  *                          other.
872  * @recovery_point: The journal point to convert.
873  * @increment: Whether the current entry is an increment.
874  *
875  * Return: The expanded journal point
876  *
877  * Because each data_vio has but a single recovery journal point, but may need to make both
878  * increment and decrement entries in the same slab journal. In order to distinguish the two
879  * entries, the entry count of the expanded journal point is twice the actual recovery journal
880  * entry count for increments, and one more than that for decrements.
881  */
882 static struct journal_point expand_journal_point(struct journal_point recovery_point,
883 						 bool increment)
884 {
885 	recovery_point.entry_count *= 2;
886 	if (!increment)
887 		recovery_point.entry_count++;
888 
889 	return recovery_point;
890 }
891 
892 /**
893  * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
894  *               block becomes full.
895  * @journal: The slab journal to append to.
896  * @pbn: The pbn being adjusted.
897  * @operation: The type of entry to make.
898  * @increment: True if this is an increment.
899  * @recovery_point: The expanded recovery point.
900  *
901  * This function is synchronous.
902  */
903 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
904 		      enum journal_operation operation, bool increment,
905 		      struct journal_point recovery_point)
906 {
907 	struct packed_slab_journal_block *block = journal->block;
908 	int result;
909 
910 	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
911 						     &recovery_point),
912 			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
913 			    (unsigned long long) recovery_point.sequence_number,
914 			    recovery_point.entry_count,
915 			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
916 			    journal->tail_header.recovery_point.entry_count);
917 	if (result != VDO_SUCCESS) {
918 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
919 		return;
920 	}
921 
922 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
923 		result = VDO_ASSERT((journal->tail_header.entry_count <
924 				     journal->full_entries_per_block),
925 				    "block has room for full entries");
926 		if (result != VDO_SUCCESS) {
927 			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
928 						 result);
929 			return;
930 		}
931 	}
932 
933 	encode_slab_journal_entry(&journal->tail_header, &block->payload,
934 				  pbn - journal->slab->start, operation, increment);
935 	journal->tail_header.recovery_point = recovery_point;
936 	if (block_is_full(journal))
937 		commit_tail(journal);
938 }
939 
940 static inline block_count_t journal_length(const struct slab_journal *journal)
941 {
942 	return journal->tail - journal->head;
943 }
944 
945 /**
946  * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
947  * @slab: The slab to play into.
948  * @pbn: The PBN for the entry.
949  * @operation: The type of entry to add.
950  * @increment: True if this entry is an increment.
951  * @recovery_point: The recovery journal point corresponding to this entry.
952  * @parent: The completion to notify when there is space to add the entry if the entry could not be
953  *          added immediately.
954  *
955  * Return: true if the entry was added immediately.
956  */
957 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
958 				  enum journal_operation operation, bool increment,
959 				  struct journal_point *recovery_point,
960 				  struct vdo_completion *parent)
961 {
962 	struct slab_journal *journal = &slab->journal;
963 	struct slab_journal_block_header *header = &journal->tail_header;
964 	struct journal_point expanded = expand_journal_point(*recovery_point, increment);
965 
966 	/* Only accept entries after the current recovery point. */
967 	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
968 		return true;
969 
970 	if ((header->entry_count >= journal->full_entries_per_block) &&
971 	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
972 		/*
973 		 * The tail block does not have room for the entry we are attempting to add so
974 		 * commit the tail block now.
975 		 */
976 		commit_tail(journal);
977 	}
978 
979 	if (journal->waiting_to_commit) {
980 		vdo_start_operation_with_waiter(&journal->slab->state,
981 						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
982 						parent, NULL);
983 		return false;
984 	}
985 
986 	if (journal_length(journal) >= journal->size) {
987 		/*
988 		 * We must have reaped the current head before the crash, since the blocked
989 		 * threshold keeps us from having more entries than fit in a slab journal; hence we
990 		 * can just advance the head (and unreapable block), as needed.
991 		 */
992 		journal->head++;
993 		journal->unreapable++;
994 	}
995 
996 	if (journal->slab->status == VDO_SLAB_REBUILT)
997 		journal->slab->status = VDO_SLAB_REPLAYING;
998 
999 	add_entry(journal, pbn, operation, increment, expanded);
1000 	return true;
1001 }
1002 
1003 /**
1004  * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1005  * @journal: The journal to check.
1006  *
1007  * Return: true if the journal must be reaped.
1008  */
1009 static bool requires_reaping(const struct slab_journal *journal)
1010 {
1011 	return (journal_length(journal) >= journal->blocking_threshold);
1012 }
1013 
1014 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1015 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1016 {
1017 	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1018 	int result = *((int *) context);
1019 
1020 	slab->active_count--;
1021 
1022 	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1023 		vdo_log_error_strerror(result, "failed to update slab summary");
1024 		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1025 	}
1026 
1027 	check_if_slab_drained(slab);
1028 }
1029 
1030 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1031 
1032 /**
1033  * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1034  *                                  a VIO for it from the pool.
1035  * @waiter: The waiter of the block which is starting to write.
1036  * @context: The parent slab of the block.
1037  *
1038  * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1039  * currently in use.
1040  */
1041 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1042 {
1043 	struct vdo_slab *slab = context;
1044 
1045 	if (vdo_is_read_only(slab->allocator->depot->vdo))
1046 		return;
1047 
1048 	slab->active_count++;
1049 	container_of(waiter, struct reference_block, waiter)->is_writing = true;
1050 	waiter->callback = write_reference_block;
1051 	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1052 }
1053 
1054 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1055 {
1056 	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1057 				     launch_reference_block_write, slab);
1058 	check_if_slab_drained(slab);
1059 }
1060 
/**
 * finish_reference_block_write() - After a reference block has written, clean it, release its
 *                                  locks, and return its VIO to the pool.
 * @completion: The VIO that just finished writing; its parent is the reference block which was
 *              written.
 *
 * If the block was dirtied again while it was being written, it is put back on the slab's dirty
 * queue. Otherwise, if the slab has no other activity and no dirty blocks waiting, a slab summary
 * update marking the slab clean is started.
 */
static void finish_reference_block_write(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct reference_block *block = completion->parent;
	struct vdo_slab *slab = block->slab;
	tail_block_offset_t offset;

	/* This write no longer counts as activity on the slab. */
	slab->active_count--;

	/* Release the slab journal lock which was recorded when this write was launched. */
	adjust_slab_journal_block_reference(&slab->journal,
					    block->slab_journal_lock_to_release, -1);
	return_vio_to_pool(slab->allocator->vio_pool, pooled);

	/*
	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
	 * us to be dirtied again, but we don't want to double enqueue.
	 */
	block->is_writing = false;

	/* In read-only mode nothing more will be written; just see whether the slab has drained. */
	if (vdo_is_read_only(completion->vdo)) {
		check_if_slab_drained(slab);
		return;
	}

	/* Re-queue the block if it was re-dirtied while it was writing. */
	if (block->is_dirty) {
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
		if (vdo_is_state_draining(&slab->state)) {
			/* We must be saving, and this block will otherwise not be relaunched. */
			save_dirty_reference_blocks(slab);
		}

		return;
	}

	/*
	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
	 * and no summary update in progress.
	 */
	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
		check_if_slab_drained(slab);
		return;
	}

	/*
	 * Keep the tail block offset currently recorded in the summary entry. The update counts
	 * as slab activity until finish_summary_update() runs.
	 */
	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
	slab->active_count++;
	slab->summary_waiter.callback = finish_summary_update;
	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
				  true, true, slab->free_blocks);
}
1118 
1119 /**
1120  * get_reference_counters_for_block() - Find the reference counters for a given block.
1121  * @block: The reference_block in question.
1122  *
1123  * Return: A pointer to the reference counters for this block.
1124  */
1125 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1126 {
1127 	size_t block_index = block - block->slab->reference_blocks;
1128 
1129 	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1130 }
1131 
1132 /**
1133  * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1134  * @block: The block to copy.
1135  * @buffer: The char buffer to fill with the packed block.
1136  */
1137 static void pack_reference_block(struct reference_block *block, void *buffer)
1138 {
1139 	struct packed_reference_block *packed = buffer;
1140 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
1141 	sector_count_t i;
1142 	struct packed_journal_point commit_point;
1143 
1144 	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1145 
1146 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1147 		packed->sectors[i].commit_point = commit_point;
1148 		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1149 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1150 	}
1151 }
1152 
1153 static void write_reference_block_endio(struct bio *bio)
1154 {
1155 	struct vio *vio = bio->bi_private;
1156 	struct reference_block *block = vio->completion.parent;
1157 	thread_id_t thread_id = block->slab->allocator->thread_id;
1158 
1159 	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1160 }
1161 
1162 /**
1163  * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1164  * @completion: The VIO doing the I/O as a completion.
1165  */
1166 static void handle_io_error(struct vdo_completion *completion)
1167 {
1168 	int result = completion->result;
1169 	struct vio *vio = as_vio(completion);
1170 	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1171 
1172 	vio_record_metadata_io_error(vio);
1173 	return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
1174 	slab->active_count--;
1175 	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1176 	check_if_slab_drained(slab);
1177 }
1178 
1179 /**
1180  * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1181  *                           its counters and associated data into the VIO, and launch the write.
1182  * @waiter: The waiter of the dirty block.
1183  * @context: The VIO returned by the pool.
1184  */
1185 static void write_reference_block(struct vdo_waiter *waiter, void *context)
1186 {
1187 	size_t block_offset;
1188 	physical_block_number_t pbn;
1189 	struct pooled_vio *pooled = context;
1190 	struct vdo_completion *completion = &pooled->vio.completion;
1191 	struct reference_block *block = container_of(waiter, struct reference_block,
1192 						     waiter);
1193 
1194 	pack_reference_block(block, pooled->vio.data);
1195 	block_offset = (block - block->slab->reference_blocks);
1196 	pbn = (block->slab->ref_counts_origin + block_offset);
1197 	block->slab_journal_lock_to_release = block->slab_journal_lock;
1198 	completion->parent = block;
1199 
1200 	/*
1201 	 * Mark the block as clean, since we won't be committing any updates that happen after this
1202 	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1203 	 * cause complications.
1204 	 */
1205 	block->is_dirty = false;
1206 
1207 	/*
1208 	 * Flush before writing to ensure that the recovery journal and slab journal entries which
1209 	 * cover this reference update are stable. This prevents data corruption that can be caused
1210 	 * by out of order writes.
1211 	 */
1212 	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1213 		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1214 
1215 	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1216 	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1217 				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1218 }
1219 
1220 static void reclaim_journal_space(struct slab_journal *journal)
1221 {
1222 	block_count_t length = journal_length(journal);
1223 	struct vdo_slab *slab = journal->slab;
1224 	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1225 	block_count_t written;
1226 
1227 	if ((length < journal->flushing_threshold) || (write_count == 0))
1228 		return;
1229 
1230 	/* The slab journal is over the first threshold, schedule some reference block writes. */
1231 	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1232 	if (length < journal->flushing_deadline) {
1233 		/* Schedule more writes the closer to the deadline we get. */
1234 		write_count /= journal->flushing_deadline - length + 1;
1235 		write_count = max_t(block_count_t, write_count, 1);
1236 	}
1237 
1238 	for (written = 0; written < write_count; written++) {
1239 		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1240 					     launch_reference_block_write, slab);
1241 	}
1242 }
1243 
1244 /**
1245  * reference_count_to_status() - Convert a reference count to a reference status.
1246  * @count: The count to convert.
1247  *
1248  * Return: The appropriate reference status.
1249  */
1250 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1251 {
1252 	if (count == EMPTY_REFERENCE_COUNT)
1253 		return RS_FREE;
1254 	else if (count == 1)
1255 		return RS_SINGLE;
1256 	else if (count == PROVISIONAL_REFERENCE_COUNT)
1257 		return RS_PROVISIONAL;
1258 	else
1259 		return RS_SHARED;
1260 }
1261 
1262 /**
1263  * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1264  *                 if it wasn't already dirty.
1265  * @block: The reference block to mark as dirty.
1266  */
1267 static void dirty_block(struct reference_block *block)
1268 {
1269 	if (block->is_dirty)
1270 		return;
1271 
1272 	block->is_dirty = true;
1273 	if (!block->is_writing)
1274 		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1275 }
1276 
1277 /**
1278  * get_reference_block() - Get the reference block that covers the given block index.
1279  */
1280 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1281 								 slab_block_number index)
1282 {
1283 	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1284 }
1285 
1286 /**
1287  * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1288  *                                block number.
1289  * @slab: The slab.
1290  * @physical_block_number: The physical block number.
1291  * @slab_block_number_ptr: A pointer to the slab block number.
1292  *
1293  * Return: VDO_SUCCESS or an error code.
1294  */
1295 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1296 						   physical_block_number_t pbn,
1297 						   slab_block_number *slab_block_number_ptr)
1298 {
1299 	u64 slab_block_number;
1300 
1301 	if (pbn < slab->start)
1302 		return VDO_OUT_OF_RANGE;
1303 
1304 	slab_block_number = pbn - slab->start;
1305 	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1306 		return VDO_OUT_OF_RANGE;
1307 
1308 	*slab_block_number_ptr = slab_block_number;
1309 	return VDO_SUCCESS;
1310 }
1311 
1312 /**
1313  * get_reference_counter() - Get the reference counter that covers the given physical block number.
1314  * @slab: The slab to query.
1315  * @pbn: The physical block number.
1316  * @counter_ptr: A pointer to the reference counter.
1317  */
1318 static int __must_check get_reference_counter(struct vdo_slab *slab,
1319 					      physical_block_number_t pbn,
1320 					      vdo_refcount_t **counter_ptr)
1321 {
1322 	slab_block_number index;
1323 	int result = slab_block_number_from_pbn(slab, pbn, &index);
1324 
1325 	if (result != VDO_SUCCESS)
1326 		return result;
1327 
1328 	*counter_ptr = &slab->counters[index];
1329 
1330 	return VDO_SUCCESS;
1331 }
1332 
1333 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1334 {
1335 	block_count_t free_blocks = slab->free_blocks;
1336 	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1337 	unsigned int priority;
1338 
1339 	/*
1340 	 * Wholly full slabs must be the only ones with lowest priority, 0.
1341 	 *
1342 	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1343 	 * have lower priority than previously opened slabs that have a significant number of free
1344 	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1345 	 * unless there are very few free blocks that have been previously written to.
1346 	 *
1347 	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1348 	 * a better client of any underlying storage that is thinly-provisioned (though discarding
1349 	 * would be better).
1350 	 *
1351 	 * For all other slabs, the priority is derived from the logarithm of the number of free
1352 	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1353 	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1354 	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1355 	 */
1356 
1357 	if (free_blocks == 0)
1358 		return 0;
1359 
1360 	if (is_slab_journal_blank(slab))
1361 		return unopened_slab_priority;
1362 
1363 	priority = (1 + ilog2(free_blocks));
1364 	return ((priority < unopened_slab_priority) ? priority : priority + 1);
1365 }
1366 
1367 /*
1368  * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1369  * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1370  * blocks.
1371  */
1372 static void prioritize_slab(struct vdo_slab *slab)
1373 {
1374 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1375 			    "a slab must not already be on a ring when prioritizing");
1376 	slab->priority = calculate_slab_priority(slab);
1377 	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1378 				   slab->priority, &slab->allocq_entry);
1379 }
1380 
1381 /**
1382  * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1383  * @incremented: true if the free block count went up.
1384  */
1385 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1386 {
1387 	struct block_allocator *allocator = slab->allocator;
1388 
1389 	WRITE_ONCE(allocator->allocated_blocks,
1390 		   allocator->allocated_blocks + (incremented ? -1 : 1));
1391 
1392 	/* The open slab doesn't need to be reprioritized until it is closed. */
1393 	if (slab == allocator->open_slab)
1394 		return;
1395 
1396 	/* Don't bother adjusting the priority table if unneeded. */
1397 	if (slab->priority == calculate_slab_priority(slab))
1398 		return;
1399 
1400 	/*
1401 	 * Reprioritize the slab to reflect the new free block count by removing it from the table
1402 	 * and re-enqueuing it with the new priority.
1403 	 */
1404 	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1405 	prioritize_slab(slab);
1406 }
1407 
1408 /**
1409  * increment_for_data() - Increment the reference count for a data block.
1410  * @slab: The slab which owns the block.
1411  * @block: The reference block which contains the block being updated.
1412  * @block_number: The block to update.
1413  * @old_status: The reference status of the data block before this increment.
1414  * @lock: The pbn_lock associated with this increment (may be NULL).
1415  * @counter_ptr: A pointer to the count for the data block (in, out).
1416  * @adjust_block_count: Whether to update the allocator's free block count.
1417  *
1418  * Return: VDO_SUCCESS or an error.
1419  */
1420 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1421 			      slab_block_number block_number,
1422 			      enum reference_status old_status,
1423 			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1424 			      bool adjust_block_count)
1425 {
1426 	switch (old_status) {
1427 	case RS_FREE:
1428 		*counter_ptr = 1;
1429 		block->allocated_count++;
1430 		slab->free_blocks--;
1431 		if (adjust_block_count)
1432 			adjust_free_block_count(slab, false);
1433 
1434 		break;
1435 
1436 	case RS_PROVISIONAL:
1437 		*counter_ptr = 1;
1438 		break;
1439 
1440 	default:
1441 		/* Single or shared */
1442 		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1443 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1444 						      "Incrementing a block already having 254 references (slab %u, offset %u)",
1445 						      slab->slab_number, block_number);
1446 		}
1447 		(*counter_ptr)++;
1448 	}
1449 
1450 	if (lock != NULL)
1451 		vdo_unassign_pbn_lock_provisional_reference(lock);
1452 	return VDO_SUCCESS;
1453 }
1454 
1455 /**
1456  * decrement_for_data() - Decrement the reference count for a data block.
1457  * @slab: The slab which owns the block.
1458  * @block: The reference block which contains the block being updated.
1459  * @block_number: The block to update.
1460  * @old_status: The reference status of the data block before this decrement.
1461  * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1462  * @lock: The pbn_lock associated with the block being decremented (may be NULL).
1463  * @counter_ptr: A pointer to the count for the data block (in, out).
1464  * @adjust_block_count: Whether to update the allocator's free block count.
1465  *
1466  * Return: VDO_SUCCESS or an error.
1467  */
1468 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1469 			      slab_block_number block_number,
1470 			      enum reference_status old_status,
1471 			      struct reference_updater *updater,
1472 			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
1473 {
1474 	switch (old_status) {
1475 	case RS_FREE:
1476 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1477 					      "Decrementing free block at offset %u in slab %u",
1478 					      block_number, slab->slab_number);
1479 
1480 	case RS_PROVISIONAL:
1481 	case RS_SINGLE:
1482 		if (updater->zpbn.zone != NULL) {
1483 			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1484 									       updater->zpbn.pbn);
1485 
1486 			if (lock != NULL) {
1487 				/*
1488 				 * There is a read lock on this block, so the block must not become
1489 				 * unreferenced.
1490 				 */
1491 				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1492 				vdo_assign_pbn_lock_provisional_reference(lock);
1493 				break;
1494 			}
1495 		}
1496 
1497 		*counter_ptr = EMPTY_REFERENCE_COUNT;
1498 		block->allocated_count--;
1499 		slab->free_blocks++;
1500 		if (adjust_block_count)
1501 			adjust_free_block_count(slab, true);
1502 
1503 		break;
1504 
1505 	default:
1506 		/* Shared */
1507 		(*counter_ptr)--;
1508 	}
1509 
1510 	return VDO_SUCCESS;
1511 }
1512 
1513 /**
1514  * increment_for_block_map() - Increment the reference count for a block map page.
1515  * @slab: The slab which owns the block.
1516  * @block: The reference block which contains the block being updated.
1517  * @block_number: The block to update.
1518  * @old_status: The reference status of the block before this increment.
1519  * @lock: The pbn_lock associated with this increment (may be NULL).
1520  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1521  * @counter_ptr: A pointer to the count for the block (in, out).
1522  * @adjust_block_count: Whether to update the allocator's free block count.
1523  *
1524  * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1525  * blocks never dedupe they should never be adjusted from any other state. The adjustment always
1526  * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1527  * blocks.
1528  *
1529  * Return: VDO_SUCCESS or an error.
1530  */
1531 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1532 				   slab_block_number block_number,
1533 				   enum reference_status old_status,
1534 				   struct pbn_lock *lock, bool normal_operation,
1535 				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
1536 {
1537 	switch (old_status) {
1538 	case RS_FREE:
1539 		if (normal_operation) {
1540 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1541 						      "Incrementing unallocated block map block (slab %u, offset %u)",
1542 						      slab->slab_number, block_number);
1543 		}
1544 
1545 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1546 		block->allocated_count++;
1547 		slab->free_blocks--;
1548 		if (adjust_block_count)
1549 			adjust_free_block_count(slab, false);
1550 
1551 		return VDO_SUCCESS;
1552 
1553 	case RS_PROVISIONAL:
1554 		if (!normal_operation)
1555 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1556 						      "Block map block had provisional reference during replay (slab %u, offset %u)",
1557 						      slab->slab_number, block_number);
1558 
1559 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1560 		if (lock != NULL)
1561 			vdo_unassign_pbn_lock_provisional_reference(lock);
1562 		return VDO_SUCCESS;
1563 
1564 	default:
1565 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1566 					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1567 					      *counter_ptr, slab->slab_number,
1568 					      block_number);
1569 	}
1570 }
1571 
1572 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1573 {
1574 	return ((point != NULL) && (point->sequence_number > 0));
1575 }
1576 
1577 /**
1578  * update_reference_count() - Update the reference count of a block.
1579  * @slab: The slab which owns the block.
1580  * @block: The reference block which contains the block being updated.
1581  * @block_number: The block to update.
1582  * @slab_journal_point: The slab journal point at which this update is journaled.
1583  * @updater: The reference updater.
1584  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1585  * @adjust_block_count: Whether to update the slab's free block count.
1586  * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1587  *                             of a provisional reference.
1588  *
1589  * Return: VDO_SUCCESS or an error.
1590  */
1591 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1592 				  slab_block_number block_number,
1593 				  const struct journal_point *slab_journal_point,
1594 				  struct reference_updater *updater,
1595 				  bool normal_operation, bool adjust_block_count,
1596 				  bool *provisional_decrement_ptr)
1597 {
1598 	vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1599 	enum reference_status old_status = reference_count_to_status(*counter_ptr);
1600 	int result;
1601 
1602 	if (!updater->increment) {
1603 		result = decrement_for_data(slab, block, block_number, old_status,
1604 					    updater, counter_ptr, adjust_block_count);
1605 		if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1606 			if (provisional_decrement_ptr != NULL)
1607 				*provisional_decrement_ptr = true;
1608 			return VDO_SUCCESS;
1609 		}
1610 	} else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1611 		result = increment_for_data(slab, block, block_number, old_status,
1612 					    updater->lock, counter_ptr, adjust_block_count);
1613 	} else {
1614 		result = increment_for_block_map(slab, block, block_number, old_status,
1615 						 updater->lock, normal_operation,
1616 						 counter_ptr, adjust_block_count);
1617 	}
1618 
1619 	if (result != VDO_SUCCESS)
1620 		return result;
1621 
1622 	if (is_valid_journal_point(slab_journal_point))
1623 		slab->slab_journal_point = *slab_journal_point;
1624 
1625 	return VDO_SUCCESS;
1626 }
1627 
1628 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1629 					       struct reference_updater *updater,
1630 					       const struct journal_point *slab_journal_point)
1631 {
1632 	slab_block_number block_number;
1633 	int result;
1634 	struct reference_block *block;
1635 	bool provisional_decrement = false;
1636 
1637 	if (!is_slab_open(slab))
1638 		return VDO_INVALID_ADMIN_STATE;
1639 
1640 	result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1641 	if (result != VDO_SUCCESS)
1642 		return result;
1643 
1644 	block = get_reference_block(slab, block_number);
1645 	result = update_reference_count(slab, block, block_number, slab_journal_point,
1646 					updater, NORMAL_OPERATION, true,
1647 					&provisional_decrement);
1648 	if ((result != VDO_SUCCESS) || provisional_decrement)
1649 		return result;
1650 
1651 	if (block->is_dirty && (block->slab_journal_lock > 0)) {
1652 		sequence_number_t entry_lock = slab_journal_point->sequence_number;
1653 		/*
1654 		 * This block is already dirty and a slab journal entry has been made for it since
1655 		 * the last time it was clean. We must release the per-entry slab journal lock for
1656 		 * the entry associated with the update we are now doing.
1657 		 */
1658 		result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1659 				    "Reference count adjustments need slab journal points.");
1660 		if (result != VDO_SUCCESS)
1661 			return result;
1662 
1663 		adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1664 		return VDO_SUCCESS;
1665 	}
1666 
1667 	/*
1668 	 * This may be the first time we are applying an update for which there is a slab journal
1669 	 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1670 	 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1671 	 */
1672 	if (is_valid_journal_point(slab_journal_point))
1673 		block->slab_journal_lock = slab_journal_point->sequence_number;
1674 	else
1675 		block->slab_journal_lock = 0;
1676 
1677 	dirty_block(block);
1678 	return VDO_SUCCESS;
1679 }
1680 
1681 /**
1682  * add_entry_from_waiter() - Add an entry to the slab journal.
1683  * @waiter: The vio which should make an entry now.
1684  * @context: The slab journal to make an entry in.
1685  *
1686  * This callback is invoked by add_entries() once it has determined that we are ready to make
1687  * another entry in the slab journal. Implements waiter_callback_fn.
1688  */
1689 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1690 {
1691 	int result;
1692 	struct reference_updater *updater =
1693 		container_of(waiter, struct reference_updater, waiter);
1694 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1695 	struct slab_journal *journal = context;
1696 	struct slab_journal_block_header *header = &journal->tail_header;
1697 	struct journal_point slab_journal_point = {
1698 		.sequence_number = header->sequence_number,
1699 		.entry_count = header->entry_count,
1700 	};
1701 	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1702 
1703 	if (header->entry_count == 0) {
1704 		/*
1705 		 * This is the first entry in the current tail block, so get a lock on the recovery
1706 		 * journal which we will hold until this tail block is committed.
1707 		 */
1708 		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1709 		if (journal->recovery_journal != NULL) {
1710 			zone_count_t zone_number = journal->slab->allocator->zone_number;
1711 
1712 			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1713 								     recovery_block,
1714 								     VDO_ZONE_TYPE_PHYSICAL,
1715 								     zone_number);
1716 		}
1717 
1718 		mark_slab_journal_dirty(journal, recovery_block);
1719 		reclaim_journal_space(journal);
1720 	}
1721 
1722 	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1723 		  expand_journal_point(data_vio->recovery_journal_point,
1724 				       updater->increment));
1725 
1726 	if (journal->slab->status != VDO_SLAB_REBUILT) {
1727 		/*
1728 		 * If the slab is unrecovered, scrubbing will take care of the count since the
1729 		 * update is now recorded in the journal.
1730 		 */
1731 		adjust_slab_journal_block_reference(journal,
1732 						    slab_journal_point.sequence_number, -1);
1733 		result = VDO_SUCCESS;
1734 	} else {
1735 		/* Now that an entry has been made in the slab journal, update the counter. */
1736 		result = adjust_reference_count(journal->slab, updater,
1737 						&slab_journal_point);
1738 	}
1739 
1740 	if (updater->increment)
1741 		continue_data_vio_with_error(data_vio, result);
1742 	else
1743 		vdo_continue_completion(&data_vio->decrement_completion, result);
1744 }
1745 
1746 /**
1747  * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1748  *                                         increment.
1749  * @journal: The journal.
1750  *
1751  * Return: true if the first entry waiter's operation is a block map increment.
1752  */
1753 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1754 {
1755 	struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1756 	struct reference_updater *updater =
1757 		container_of(waiter, struct reference_updater, waiter);
1758 
1759 	return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1760 }
1761 
1762 /**
1763  * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1764  * @journal: The journal to which entries may be added.
1765  *
1766  * By processing the queue in order, we ensure that slab journal entries are made in the same order
1767  * as recovery journal entries for the same increment or decrement.
1768  */
1769 static void add_entries(struct slab_journal *journal)
1770 {
1771 	if (journal->adding_entries) {
1772 		/* Protect against re-entrancy. */
1773 		return;
1774 	}
1775 
1776 	journal->adding_entries = true;
1777 	while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1778 		struct slab_journal_block_header *header = &journal->tail_header;
1779 
1780 		if (journal->partial_write_in_progress ||
1781 		    (journal->slab->status == VDO_SLAB_REBUILDING)) {
1782 			/*
1783 			 * Don't add entries while rebuilding or while a partial write is
1784 			 * outstanding, as it could result in reference count corruption.
1785 			 */
1786 			break;
1787 		}
1788 
1789 		if (journal->waiting_to_commit) {
1790 			/*
1791 			 * If we are waiting for resources to write the tail block, and the tail
1792 			 * block is full, we can't make another entry.
1793 			 */
1794 			WRITE_ONCE(journal->events->tail_busy_count,
1795 				   journal->events->tail_busy_count + 1);
1796 			break;
1797 		} else if (is_next_entry_a_block_map_increment(journal) &&
1798 			   (header->entry_count >= journal->full_entries_per_block)) {
1799 			/*
1800 			 * The tail block does not have room for a block map increment, so commit
1801 			 * it now.
1802 			 */
1803 			commit_tail(journal);
1804 			if (journal->waiting_to_commit) {
1805 				WRITE_ONCE(journal->events->tail_busy_count,
1806 					   journal->events->tail_busy_count + 1);
1807 				break;
1808 			}
1809 		}
1810 
1811 		/* If the slab is over the blocking threshold, make the vio wait. */
1812 		if (requires_reaping(journal)) {
1813 			WRITE_ONCE(journal->events->blocked_count,
1814 				   journal->events->blocked_count + 1);
1815 			save_dirty_reference_blocks(journal->slab);
1816 			break;
1817 		}
1818 
1819 		if (header->entry_count == 0) {
1820 			struct journal_lock *lock =
1821 				get_lock(journal, header->sequence_number);
1822 
1823 			/*
1824 			 * Check if the on disk slab journal is full. Because of the blocking and
1825 			 * scrubbing thresholds, this should never happen.
1826 			 */
1827 			if (lock->count > 0) {
1828 				VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1829 						    "New block has locks, but journal is not full");
1830 
1831 				/*
1832 				 * The blocking threshold must let the journal fill up if the new
1833 				 * block has locks; if the blocking threshold is smaller than the
1834 				 * journal size, the new block cannot possibly have locks already.
1835 				 */
1836 				VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1837 						    "New block can have locks already iff blocking threshold is at the end of the journal");
1838 
1839 				WRITE_ONCE(journal->events->disk_full_count,
1840 					   journal->events->disk_full_count + 1);
1841 				save_dirty_reference_blocks(journal->slab);
1842 				break;
1843 			}
1844 
1845 			/*
1846 			 * Don't allow the new block to be reaped until all of the reference count
1847 			 * blocks are written and the journal block has been fully committed as
1848 			 * well.
1849 			 */
1850 			lock->count = journal->entries_per_block + 1;
1851 
1852 			if (header->sequence_number == 1) {
1853 				struct vdo_slab *slab = journal->slab;
1854 				block_count_t i;
1855 
1856 				/*
1857 				 * This is the first entry in this slab journal, ever. Dirty all of
1858 				 * the reference count blocks. Each will acquire a lock on the tail
1859 				 * block so that the journal won't be reaped until the reference
1860 				 * counts are initialized. The lock acquisition must be done by the
1861 				 * ref_counts since here we don't know how many reference blocks
1862 				 * the ref_counts has.
1863 				 */
1864 				for (i = 0; i < slab->reference_block_count; i++) {
1865 					slab->reference_blocks[i].slab_journal_lock = 1;
1866 					dirty_block(&slab->reference_blocks[i]);
1867 				}
1868 
1869 				adjust_slab_journal_block_reference(journal, 1,
1870 								    slab->reference_block_count);
1871 			}
1872 		}
1873 
1874 		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1875 					     add_entry_from_waiter, journal);
1876 	}
1877 
1878 	journal->adding_entries = false;
1879 
1880 	/* If there are no waiters, and we are flushing or saving, commit the tail block. */
1881 	if (vdo_is_state_draining(&journal->slab->state) &&
1882 	    !vdo_is_state_suspending(&journal->slab->state) &&
1883 	    !vdo_waitq_has_waiters(&journal->entry_waiters))
1884 		commit_tail(journal);
1885 }
1886 
1887 /**
1888  * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1889  *                         first reference block of a slab.
1890  */
1891 static void reset_search_cursor(struct vdo_slab *slab)
1892 {
1893 	struct search_cursor *cursor = &slab->search_cursor;
1894 
1895 	cursor->block = cursor->first_block;
1896 	cursor->index = 0;
1897 	/* Unit tests have slabs with only one reference block (and it's a runt). */
1898 	cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1899 }
1900 
1901 /**
1902  * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1903  *                           a slab,
1904  *
1905  * Wraps around to the first reference block if the current block is the last reference block.
1906  *
1907  * Return: true unless the cursor was at the last reference block.
1908  */
1909 static bool advance_search_cursor(struct vdo_slab *slab)
1910 {
1911 	struct search_cursor *cursor = &slab->search_cursor;
1912 
1913 	/*
1914 	 * If we just finished searching the last reference block, then wrap back around to the
1915 	 * start of the array.
1916 	 */
1917 	if (cursor->block == cursor->last_block) {
1918 		reset_search_cursor(slab);
1919 		return false;
1920 	}
1921 
1922 	/* We're not already at the end, so advance to cursor to the next block. */
1923 	cursor->block++;
1924 	cursor->index = cursor->end_index;
1925 
1926 	if (cursor->block == cursor->last_block) {
1927 		/* The last reference block will usually be a runt. */
1928 		cursor->end_index = slab->block_count;
1929 	} else {
1930 		cursor->end_index += COUNTS_PER_BLOCK;
1931 	}
1932 
1933 	return true;
1934 }
1935 
1936 /**
1937  * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1938  *
1939  * Return: VDO_SUCCESS or an error.
1940  */
1941 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1942 					   physical_block_number_t pbn,
1943 					   enum journal_operation operation)
1944 {
1945 	int result;
1946 	slab_block_number block_number;
1947 	struct reference_block *block;
1948 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1949 	struct reference_updater updater = {
1950 		.operation = operation,
1951 		.increment = true,
1952 	};
1953 
1954 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
1955 	if (result != VDO_SUCCESS)
1956 		return result;
1957 
1958 	block = get_reference_block(slab, block_number);
1959 	result = update_reference_count(slab, block, block_number, NULL,
1960 					&updater, !NORMAL_OPERATION, false, NULL);
1961 	if (result != VDO_SUCCESS)
1962 		return result;
1963 
1964 	dirty_block(block);
1965 	return VDO_SUCCESS;
1966 }
1967 
1968 /**
1969  * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1970  *                                   entry into the reference count for a block.
1971  * @slab: The slab.
1972  * @entry_point: The slab journal point for the entry.
1973  * @entry: The slab journal entry being replayed.
1974  *
1975  * The adjustment will be ignored if it was already recorded in the reference count.
1976  *
1977  * Return: VDO_SUCCESS or an error code.
1978  */
1979 static int replay_reference_count_change(struct vdo_slab *slab,
1980 					 const struct journal_point *entry_point,
1981 					 struct slab_journal_entry entry)
1982 {
1983 	int result;
1984 	struct reference_block *block = get_reference_block(slab, entry.sbn);
1985 	sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1986 	struct reference_updater updater = {
1987 		.operation = entry.operation,
1988 		.increment = entry.increment,
1989 	};
1990 
1991 	if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1992 		/* This entry is already reflected in the existing counts, so do nothing. */
1993 		return VDO_SUCCESS;
1994 	}
1995 
1996 	/* This entry is not yet counted in the reference counts. */
1997 	result = update_reference_count(slab, block, entry.sbn, entry_point,
1998 					&updater, !NORMAL_OPERATION, false, NULL);
1999 	if (result != VDO_SUCCESS)
2000 		return result;
2001 
2002 	dirty_block(block);
2003 	return VDO_SUCCESS;
2004 }
2005 
2006 /**
2007  * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of
2008  *                            reference counters.
2009  * @word_ptr: A pointer to the eight counter bytes to check.
2010  * @start_index: The array index corresponding to word_ptr[0].
2011  * @fail_index: The array index to return if no zero byte is found.
2012  *
2013  * The search does no bounds checking; the function relies on the array being sufficiently padded.
2014  *
2015  * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2016  *         no zero byte was found.
2017  */
2018 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2019 						       slab_block_number start_index,
2020 						       slab_block_number fail_index)
2021 {
2022 	u64 word = get_unaligned_le64(word_ptr);
2023 
2024 	/* This looks like a loop, but GCC will unroll the eight iterations for us. */
2025 	unsigned int offset;
2026 
2027 	for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2028 		/* Assumes little-endian byte order, which we have on X86. */
2029 		if ((word & 0xFF) == 0)
2030 			return (start_index + offset);
2031 		word >>= 8;
2032 	}
2033 
2034 	return fail_index;
2035 }
2036 
2037 /**
2038  * find_free_block() - Find the first block with a reference count of zero in the specified
2039  *                     range of reference counter indexes.
2040  * @slab: The slab counters to scan.
2041  * @index_ptr: A pointer to hold the array index of the free block.
2042  *
2043  * Exposed for unit testing.
2044  *
2045  * Return: true if a free block was found in the specified range.
2046  */
2047 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2048 {
2049 	slab_block_number zero_index;
2050 	slab_block_number next_index = slab->search_cursor.index;
2051 	slab_block_number end_index = slab->search_cursor.end_index;
2052 	u8 *next_counter = &slab->counters[next_index];
2053 	u8 *end_counter = &slab->counters[end_index];
2054 
2055 	/*
2056 	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2057 	 * safe.)
2058 	 */
2059 	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2060 	if (zero_index < end_index) {
2061 		*index_ptr = zero_index;
2062 		return true;
2063 	}
2064 
2065 	/*
2066 	 * On architectures where unaligned word access is expensive, this would be a good place to
2067 	 * advance to an alignment boundary.
2068 	 */
2069 	next_index += BYTES_PER_WORD;
2070 	next_counter += BYTES_PER_WORD;
2071 
2072 	/*
2073 	 * Now we're word-aligned; check an word at a time until we find a word containing a zero.
2074 	 * (Array is padded so reading past end is safe.)
2075 	 */
2076 	while (next_counter < end_counter) {
2077 		/*
2078 		 * The following code is currently an exact copy of the code preceding the loop,
2079 		 * but if you try to merge them by using a do loop, it runs slower because a jump
2080 		 * instruction gets added at the start of the iteration.
2081 		 */
2082 		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2083 		if (zero_index < end_index) {
2084 			*index_ptr = zero_index;
2085 			return true;
2086 		}
2087 
2088 		next_index += BYTES_PER_WORD;
2089 		next_counter += BYTES_PER_WORD;
2090 	}
2091 
2092 	return false;
2093 }
2094 
2095 /**
2096  * search_current_reference_block() - Search the reference block currently saved in the search
2097  *                                    cursor for a reference count of zero, starting at the saved
2098  *                                    counter index.
2099  * @slab: The slab to search.
2100  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2101  *
2102  * Return: true if an unreferenced counter was found.
2103  */
2104 static bool search_current_reference_block(const struct vdo_slab *slab,
2105 					   slab_block_number *free_index_ptr)
2106 {
2107 	/* Don't bother searching if the current block is known to be full. */
2108 	return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2109 		find_free_block(slab, free_index_ptr));
2110 }
2111 
2112 /**
2113  * search_reference_blocks() - Search each reference block for a reference count of zero.
2114  * @slab: The slab to search.
2115  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2116  *
2117  * Searches each reference block for a reference count of zero, starting at the reference block and
2118  * counter index saved in the search cursor and searching up to the end of the last reference
2119  * block. The search does not wrap.
2120  *
2121  * Return: true if an unreferenced counter was found.
2122  */
2123 static bool search_reference_blocks(struct vdo_slab *slab,
2124 				    slab_block_number *free_index_ptr)
2125 {
2126 	/* Start searching at the saved search position in the current block. */
2127 	if (search_current_reference_block(slab, free_index_ptr))
2128 		return true;
2129 
2130 	/* Search each reference block up to the end of the slab. */
2131 	while (advance_search_cursor(slab)) {
2132 		if (search_current_reference_block(slab, free_index_ptr))
2133 			return true;
2134 	}
2135 
2136 	return false;
2137 }
2138 
2139 /**
2140  * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2141  */
2142 static void make_provisional_reference(struct vdo_slab *slab,
2143 				       slab_block_number block_number)
2144 {
2145 	struct reference_block *block = get_reference_block(slab, block_number);
2146 
2147 	/*
2148 	 * Make the initial transition from an unreferenced block to a
2149 	 * provisionally allocated block.
2150 	 */
2151 	slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2152 
2153 	/* Account for the allocation. */
2154 	block->allocated_count++;
2155 	slab->free_blocks--;
2156 }
2157 
2158 /**
2159  * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2160  */
2161 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2162 {
2163 	block_count_t i;
2164 
2165 	for (i = 0; i < slab->reference_block_count; i++)
2166 		dirty_block(&slab->reference_blocks[i]);
2167 }
2168 
2169 /**
2170  * clear_provisional_references() - Clear the provisional reference counts from a reference block.
2171  * @block: The block to clear.
2172  */
2173 static void clear_provisional_references(struct reference_block *block)
2174 {
2175 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2176 	block_count_t j;
2177 
2178 	for (j = 0; j < COUNTS_PER_BLOCK; j++) {
2179 		if (counters[j] == PROVISIONAL_REFERENCE_COUNT) {
2180 			counters[j] = EMPTY_REFERENCE_COUNT;
2181 			block->allocated_count--;
2182 		}
2183 	}
2184 }
2185 
2186 static inline bool journal_points_equal(struct journal_point first,
2187 					struct journal_point second)
2188 {
2189 	return ((first.sequence_number == second.sequence_number) &&
2190 		(first.entry_count == second.entry_count));
2191 }
2192 
2193 /**
2194  * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure.
2195  * @packed: The written reference block to be unpacked.
2196  * @block: The internal reference block to be loaded.
2197  */
2198 static void unpack_reference_block(struct packed_reference_block *packed,
2199 				   struct reference_block *block)
2200 {
2201 	block_count_t index;
2202 	sector_count_t i;
2203 	struct vdo_slab *slab = block->slab;
2204 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2205 
2206 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2207 		struct packed_reference_sector *sector = &packed->sectors[i];
2208 
2209 		vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2210 		memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2211 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2212 		/* The slab_journal_point must be the latest point found in any sector. */
2213 		if (vdo_before_journal_point(&slab->slab_journal_point,
2214 					     &block->commit_points[i]))
2215 			slab->slab_journal_point = block->commit_points[i];
2216 
2217 		if ((i > 0) &&
2218 		    !journal_points_equal(block->commit_points[0],
2219 					  block->commit_points[i])) {
2220 			size_t block_index = block - block->slab->reference_blocks;
2221 
2222 			vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2223 					i, block_index, block->slab->slab_number);
2224 		}
2225 	}
2226 
2227 	block->allocated_count = 0;
2228 	for (index = 0; index < COUNTS_PER_BLOCK; index++) {
2229 		if (counters[index] != EMPTY_REFERENCE_COUNT)
2230 			block->allocated_count++;
2231 	}
2232 }
2233 
2234 /**
2235  * finish_reference_block_load() - After a reference block has been read, unpack it.
2236  * @completion: The VIO that just finished reading.
2237  */
2238 static void finish_reference_block_load(struct vdo_completion *completion)
2239 {
2240 	struct vio *vio = as_vio(completion);
2241 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2242 	struct reference_block *block = completion->parent;
2243 	struct vdo_slab *slab = block->slab;
2244 
2245 	unpack_reference_block((struct packed_reference_block *) vio->data, block);
2246 	return_vio_to_pool(slab->allocator->vio_pool, pooled);
2247 	slab->active_count--;
2248 	clear_provisional_references(block);
2249 
2250 	slab->free_blocks -= block->allocated_count;
2251 	check_if_slab_drained(slab);
2252 }
2253 
2254 static void load_reference_block_endio(struct bio *bio)
2255 {
2256 	struct vio *vio = bio->bi_private;
2257 	struct reference_block *block = vio->completion.parent;
2258 
2259 	continue_vio_after_io(vio, finish_reference_block_load,
2260 			      block->slab->allocator->thread_id);
2261 }
2262 
2263 /**
2264  * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the
2265  *                          block.
2266  * @waiter: The waiter of the block to load.
2267  * @context: The VIO returned by the pool.
2268  */
2269 static void load_reference_block(struct vdo_waiter *waiter, void *context)
2270 {
2271 	struct pooled_vio *pooled = context;
2272 	struct vio *vio = &pooled->vio;
2273 	struct reference_block *block =
2274 		container_of(waiter, struct reference_block, waiter);
2275 	size_t block_offset = (block - block->slab->reference_blocks);
2276 
2277 	vio->completion.parent = block;
2278 	vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset,
2279 				load_reference_block_endio, handle_io_error,
2280 				REQ_OP_READ);
2281 }
2282 
2283 /**
2284  * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2285  *                           pre-allocated reference counter.
2286  */
2287 static void load_reference_blocks(struct vdo_slab *slab)
2288 {
2289 	block_count_t i;
2290 
2291 	slab->free_blocks = slab->block_count;
2292 	slab->active_count = slab->reference_block_count;
2293 	for (i = 0; i < slab->reference_block_count; i++) {
2294 		struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2295 
2296 		waiter->callback = load_reference_block;
2297 		acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
2298 	}
2299 }
2300 
2301 /**
2302  * drain_slab() - Drain all reference count I/O.
2303  *
2304  * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the
2305  * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2306  */
2307 static void drain_slab(struct vdo_slab *slab)
2308 {
2309 	bool save;
2310 	bool load;
2311 	const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2312 
2313 	if (state == VDO_ADMIN_STATE_SUSPENDING)
2314 		return;
2315 
2316 	if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2317 	    (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2318 		commit_tail(&slab->journal);
2319 
2320 	if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2321 		return;
2322 
2323 	save = false;
2324 	load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2325 	if (state == VDO_ADMIN_STATE_SCRUBBING) {
2326 		if (load) {
2327 			load_reference_blocks(slab);
2328 			return;
2329 		}
2330 	} else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2331 		if (!load) {
2332 			/* These reference counts were never written, so mark them all dirty. */
2333 			dirty_all_reference_blocks(slab);
2334 		}
2335 		save = true;
2336 	} else if (state == VDO_ADMIN_STATE_REBUILDING) {
2337 		/*
2338 		 * Write out the counters if the slab has written them before, or it has any
2339 		 * non-zero reference counts, or there are any slab journal blocks.
2340 		 */
2341 		block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2342 
2343 		if (load || (slab->free_blocks != data_blocks) ||
2344 		    !is_slab_journal_blank(slab)) {
2345 			dirty_all_reference_blocks(slab);
2346 			save = true;
2347 		}
2348 	} else if (state == VDO_ADMIN_STATE_SAVING) {
2349 		save = (slab->status == VDO_SLAB_REBUILT);
2350 	} else {
2351 		vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2352 		return;
2353 	}
2354 
2355 	if (save)
2356 		save_dirty_reference_blocks(slab);
2357 }
2358 
2359 static int allocate_slab_counters(struct vdo_slab *slab)
2360 {
2361 	int result;
2362 	size_t index, bytes;
2363 
2364 	result = VDO_ASSERT(slab->reference_blocks == NULL,
2365 			    "vdo_slab %u doesn't allocate refcounts twice",
2366 			    slab->slab_number);
2367 	if (result != VDO_SUCCESS)
2368 		return result;
2369 
2370 	result = vdo_allocate(slab->reference_block_count, struct reference_block,
2371 			      __func__, &slab->reference_blocks);
2372 	if (result != VDO_SUCCESS)
2373 		return result;
2374 
2375 	/*
2376 	 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2377 	 * so we can word-search even at the very end.
2378 	 */
2379 	bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2380 	result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array",
2381 			      &slab->counters);
2382 	if (result != VDO_SUCCESS) {
2383 		vdo_free(vdo_forget(slab->reference_blocks));
2384 		return result;
2385 	}
2386 
2387 	slab->search_cursor.first_block = slab->reference_blocks;
2388 	slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2389 	reset_search_cursor(slab);
2390 
2391 	for (index = 0; index < slab->reference_block_count; index++) {
2392 		slab->reference_blocks[index] = (struct reference_block) {
2393 			.slab = slab,
2394 		};
2395 	}
2396 
2397 	return VDO_SUCCESS;
2398 }
2399 
2400 static int allocate_counters_if_clean(struct vdo_slab *slab)
2401 {
2402 	if (vdo_is_state_clean_load(&slab->state))
2403 		return allocate_slab_counters(slab);
2404 
2405 	return VDO_SUCCESS;
2406 }
2407 
2408 static void finish_loading_journal(struct vdo_completion *completion)
2409 {
2410 	struct vio *vio = as_vio(completion);
2411 	struct slab_journal *journal = completion->parent;
2412 	struct vdo_slab *slab = journal->slab;
2413 	struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2414 	struct slab_journal_block_header header;
2415 
2416 	vdo_unpack_slab_journal_block_header(&block->header, &header);
2417 
2418 	/* FIXME: should it be an error if the following conditional fails? */
2419 	if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2420 	    (header.nonce == slab->allocator->nonce)) {
2421 		journal->tail = header.sequence_number + 1;
2422 
2423 		/*
2424 		 * If the slab is clean, this implies the slab journal is empty, so advance the
2425 		 * head appropriately.
2426 		 */
2427 		journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2428 				 header.head : journal->tail);
2429 		journal->tail_header = header;
2430 		initialize_journal_state(journal);
2431 	}
2432 
2433 	return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2434 	vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2435 }
2436 
2437 static void read_slab_journal_tail_endio(struct bio *bio)
2438 {
2439 	struct vio *vio = bio->bi_private;
2440 	struct slab_journal *journal = vio->completion.parent;
2441 
2442 	continue_vio_after_io(vio, finish_loading_journal,
2443 			      journal->slab->allocator->thread_id);
2444 }
2445 
2446 static void handle_load_error(struct vdo_completion *completion)
2447 {
2448 	int result = completion->result;
2449 	struct slab_journal *journal = completion->parent;
2450 	struct vio *vio = as_vio(completion);
2451 
2452 	vio_record_metadata_io_error(vio);
2453 	return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2454 	vdo_finish_loading_with_result(&journal->slab->state, result);
2455 }
2456 
2457 /**
2458  * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2459  *                            pool.
2460  * @waiter: The vio pool waiter which has just been notified.
2461  * @context: The vio pool entry given to the waiter.
2462  *
2463  * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2464  */
2465 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2466 {
2467 	struct slab_journal *journal =
2468 		container_of(waiter, struct slab_journal, resource_waiter);
2469 	struct vdo_slab *slab = journal->slab;
2470 	struct pooled_vio *pooled = context;
2471 	struct vio *vio = &pooled->vio;
2472 	tail_block_offset_t last_commit_point =
2473 		slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2474 
2475 	/*
2476 	 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2477 	 * Calculation supports small journals in unit tests.
2478 	 */
2479 	tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2480 					  (tail_block_offset_t)(journal->size - 1) :
2481 					  (last_commit_point - 1));
2482 
2483 	vio->completion.parent = journal;
2484 	vio->completion.callback_thread_id = slab->allocator->thread_id;
2485 	vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2486 				read_slab_journal_tail_endio, handle_load_error,
2487 				REQ_OP_READ);
2488 }
2489 
2490 /**
2491  * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2492  */
2493 static void load_slab_journal(struct vdo_slab *slab)
2494 {
2495 	struct slab_journal *journal = &slab->journal;
2496 	tail_block_offset_t last_commit_point;
2497 
2498 	last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2499 	if ((last_commit_point == 0) &&
2500 	    !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2501 		/*
2502 		 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2503 		 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2504 		 * don't bother reading the (bogus) data off disk.
2505 		 */
2506 		VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
2507 				     (journal->scrubbing_threshold < (journal->size - 1))),
2508 				    "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2509 		vdo_finish_loading_with_result(&slab->state,
2510 					       allocate_counters_if_clean(slab));
2511 		return;
2512 	}
2513 
2514 	journal->resource_waiter.callback = read_slab_journal_tail;
2515 	acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2516 }
2517 
2518 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2519 {
2520 	struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2521 
2522 	VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2523 			    "slab to be scrubbed is unrecovered");
2524 
2525 	if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2526 		return;
2527 
2528 	list_del_init(&slab->allocq_entry);
2529 	if (!slab->was_queued_for_scrubbing) {
2530 		WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2531 		slab->was_queued_for_scrubbing = true;
2532 	}
2533 
2534 	if (high_priority) {
2535 		slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2536 		list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2537 		return;
2538 	}
2539 
2540 	list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2541 }
2542 
2543 /* Queue a slab for allocation or scrubbing. */
2544 static void queue_slab(struct vdo_slab *slab)
2545 {
2546 	struct block_allocator *allocator = slab->allocator;
2547 	block_count_t free_blocks;
2548 	int result;
2549 
2550 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2551 			"a requeued slab must not already be on a ring");
2552 
2553 	if (vdo_is_read_only(allocator->depot->vdo))
2554 		return;
2555 
2556 	free_blocks = slab->free_blocks;
2557 	result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2558 			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2559 			    slab->slab_number, (unsigned long long) free_blocks,
2560 			    (unsigned long long) allocator->depot->slab_config.data_blocks);
2561 	if (result != VDO_SUCCESS) {
2562 		vdo_enter_read_only_mode(allocator->depot->vdo, result);
2563 		return;
2564 	}
2565 
2566 	if (slab->status != VDO_SLAB_REBUILT) {
2567 		register_slab_for_scrubbing(slab, false);
2568 		return;
2569 	}
2570 
2571 	if (!vdo_is_state_resuming(&slab->state)) {
2572 		/*
2573 		 * If the slab is resuming, we've already accounted for it here, so don't do it
2574 		 * again.
2575 		 * FIXME: under what situation would the slab be resuming here?
2576 		 */
2577 		WRITE_ONCE(allocator->allocated_blocks,
2578 			   allocator->allocated_blocks - free_blocks);
2579 		if (!is_slab_journal_blank(slab)) {
2580 			WRITE_ONCE(allocator->statistics.slabs_opened,
2581 				   allocator->statistics.slabs_opened + 1);
2582 		}
2583 	}
2584 
2585 	if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2586 		reopen_slab_journal(slab);
2587 
2588 	prioritize_slab(slab);
2589 }
2590 
2591 /**
2592  * initiate_slab_action() - Initiate a slab action.
2593  *
2594  * Implements vdo_admin_initiator_fn.
2595  */
2596 static void initiate_slab_action(struct admin_state *state)
2597 {
2598 	struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2599 
2600 	if (vdo_is_state_draining(state)) {
2601 		const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2602 
2603 		if (operation == VDO_ADMIN_STATE_SCRUBBING)
2604 			slab->status = VDO_SLAB_REBUILDING;
2605 
2606 		drain_slab(slab);
2607 		check_if_slab_drained(slab);
2608 		return;
2609 	}
2610 
2611 	if (vdo_is_state_loading(state)) {
2612 		load_slab_journal(slab);
2613 		return;
2614 	}
2615 
2616 	if (vdo_is_state_resuming(state)) {
2617 		queue_slab(slab);
2618 		vdo_finish_resuming(state);
2619 		return;
2620 	}
2621 
2622 	vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2623 }
2624 
2625 /**
2626  * get_next_slab() - Get the next slab to scrub.
2627  * @scrubber: The slab scrubber.
2628  *
2629  * Return: The next slab to scrub or NULL if there are none.
2630  */
2631 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2632 {
2633 	struct vdo_slab *slab;
2634 
2635 	slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2636 					struct vdo_slab, allocq_entry);
2637 	if (slab != NULL)
2638 		return slab;
2639 
2640 	return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2641 					allocq_entry);
2642 }
2643 
2644 /**
2645  * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2646  * @scrubber: The scrubber to check.
2647  *
2648  * Return: true if the scrubber has slabs to scrub.
2649  */
2650 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2651 {
2652 	return (get_next_slab(scrubber) != NULL);
2653 }
2654 
2655 /**
2656  * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2657  * @scrubber: The scrubber.
2658  */
2659 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2660 {
2661 	vdo_free(vdo_forget(scrubber->vio.data));
2662 	free_vio_components(&scrubber->vio);
2663 }
2664 
2665 /**
2666  * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2667  *                      there's been an error.
2668  * @scrubber: The scrubber.
2669  */
2670 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2671 {
2672 	bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2673 	bool done = !has_slabs_to_scrub(scrubber);
2674 	struct block_allocator *allocator =
2675 		container_of(scrubber, struct block_allocator, scrubber);
2676 
2677 	if (done)
2678 		uninitialize_scrubber_vio(scrubber);
2679 
2680 	if (scrubber->high_priority_only) {
2681 		scrubber->high_priority_only = false;
2682 		vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
2683 	} else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2684 		/* All of our slabs were scrubbed, and we're the last allocator to finish. */
2685 		enum vdo_state prior_state =
2686 			atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2687 				       VDO_DIRTY);
2688 
2689 		/*
2690 		 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2691 		 * respect to whatever state change did happen.
2692 		 */
2693 		smp_mb__after_atomic();
2694 
2695 		/*
2696 		 * We must check the VDO state here and not the depot's read_only_notifier since
2697 		 * the compare-swap-above could have failed due to a read-only entry which our own
2698 		 * thread does not yet know about.
2699 		 */
2700 		if (prior_state == VDO_DIRTY)
2701 			vdo_log_info("VDO commencing normal operation");
2702 		else if (prior_state == VDO_RECOVERING)
2703 			vdo_log_info("Exiting recovery mode");
2704 	}
2705 
2706 	/*
2707 	 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2708 	 * happen.
2709 	 */
2710 	if (!vdo_finish_draining(&scrubber->admin_state))
2711 		WRITE_ONCE(scrubber->admin_state.current_state,
2712 			   VDO_ADMIN_STATE_SUSPENDED);
2713 
2714 	/*
2715 	 * We can't notify waiters until after we've finished draining or they'll just requeue.
2716 	 * Fortunately if there were waiters, we can't have been freed yet.
2717 	 */
2718 	if (notify)
2719 		vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2720 }
2721 
2722 static void scrub_next_slab(struct slab_scrubber *scrubber);
2723 
2724 /**
2725  * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2726  * @completion: The slab rebuild completion.
2727  *
2728  * This callback is registered in apply_journal_entries().
2729  */
2730 static void slab_scrubbed(struct vdo_completion *completion)
2731 {
2732 	struct slab_scrubber *scrubber =
2733 		container_of(as_vio(completion), struct slab_scrubber, vio);
2734 	struct vdo_slab *slab = scrubber->slab;
2735 
2736 	slab->status = VDO_SLAB_REBUILT;
2737 	queue_slab(slab);
2738 	reopen_slab_journal(slab);
2739 	WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2740 	scrub_next_slab(scrubber);
2741 }
2742 
2743 /**
2744  * abort_scrubbing() - Abort scrubbing due to an error.
2745  * @scrubber: The slab scrubber.
2746  * @result: The error.
2747  */
2748 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2749 {
2750 	vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2751 	finish_scrubbing(scrubber, result);
2752 }
2753 
2754 /**
2755  * handle_scrubber_error() - Handle errors while rebuilding a slab.
2756  * @completion: The slab rebuild completion.
2757  */
2758 static void handle_scrubber_error(struct vdo_completion *completion)
2759 {
2760 	struct vio *vio = as_vio(completion);
2761 
2762 	vio_record_metadata_io_error(vio);
2763 	abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2764 			completion->result);
2765 }
2766 
2767 /**
2768  * apply_block_entries() - Apply all the entries in a block to the reference counts.
2769  * @block: A block with entries to apply.
2770  * @entry_count: The number of entries to apply.
2771  * @block_number: The sequence number of the block.
2772  * @slab: The slab to apply the entries to.
2773  *
2774  * Return: VDO_SUCCESS or an error code.
2775  */
2776 static int apply_block_entries(struct packed_slab_journal_block *block,
2777 			       journal_entry_count_t entry_count,
2778 			       sequence_number_t block_number, struct vdo_slab *slab)
2779 {
2780 	struct journal_point entry_point = {
2781 		.sequence_number = block_number,
2782 		.entry_count = 0,
2783 	};
2784 	int result;
2785 	slab_block_number max_sbn = slab->end - slab->start;
2786 
2787 	while (entry_point.entry_count < entry_count) {
2788 		struct slab_journal_entry entry =
2789 			vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2790 
2791 		if (entry.sbn > max_sbn) {
2792 			/* This entry is out of bounds. */
2793 			return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
2794 						      "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2795 						      (unsigned long long) block_number,
2796 						      entry_point.entry_count,
2797 						      entry.sbn, max_sbn);
2798 		}
2799 
2800 		result = replay_reference_count_change(slab, &entry_point, entry);
2801 		if (result != VDO_SUCCESS) {
2802 			vdo_log_error_strerror(result,
2803 					       "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2804 					       (unsigned long long) block_number,
2805 					       entry_point.entry_count,
2806 					       vdo_get_journal_operation_name(entry.operation),
2807 					       entry.sbn, slab->slab_number);
2808 			return result;
2809 		}
2810 		entry_point.entry_count++;
2811 	}
2812 
2813 	return VDO_SUCCESS;
2814 }
2815 
2816 /**
2817  * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2818  * @completion: The metadata read vio completion.
2819  *
2820  * This is a callback registered in start_scrubbing().
2821  */
2822 static void apply_journal_entries(struct vdo_completion *completion)
2823 {
2824 	int result;
2825 	struct slab_scrubber *scrubber =
2826 		container_of(as_vio(completion), struct slab_scrubber, vio);
2827 	struct vdo_slab *slab = scrubber->slab;
2828 	struct slab_journal *journal = &slab->journal;
2829 
2830 	/* Find the boundaries of the useful part of the journal. */
2831 	sequence_number_t tail = journal->tail;
2832 	tail_block_offset_t end_index = (tail - 1) % journal->size;
2833 	char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2834 	struct packed_slab_journal_block *end_block =
2835 		(struct packed_slab_journal_block *) end_data;
2836 
2837 	sequence_number_t head = __le64_to_cpu(end_block->header.head);
2838 	tail_block_offset_t head_index = head % journal->size;
2839 	block_count_t index = head_index;
2840 
2841 	struct journal_point ref_counts_point = slab->slab_journal_point;
2842 	struct journal_point last_entry_applied = ref_counts_point;
2843 	sequence_number_t sequence;
2844 
2845 	for (sequence = head; sequence < tail; sequence++) {
2846 		char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2847 		struct packed_slab_journal_block *block =
2848 			(struct packed_slab_journal_block *) block_data;
2849 		struct slab_journal_block_header header;
2850 
2851 		vdo_unpack_slab_journal_block_header(&block->header, &header);
2852 
2853 		if ((header.nonce != slab->allocator->nonce) ||
2854 		    (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2855 		    (header.sequence_number != sequence) ||
2856 		    (header.entry_count > journal->entries_per_block) ||
2857 		    (header.has_block_map_increments &&
2858 		     (header.entry_count > journal->full_entries_per_block))) {
2859 			/* The block is not what we expect it to be. */
2860 			vdo_log_error("vdo_slab journal block for slab %u was invalid",
2861 				      slab->slab_number);
2862 			abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2863 			return;
2864 		}
2865 
2866 		result = apply_block_entries(block, header.entry_count, sequence, slab);
2867 		if (result != VDO_SUCCESS) {
2868 			abort_scrubbing(scrubber, result);
2869 			return;
2870 		}
2871 
2872 		last_entry_applied.sequence_number = sequence;
2873 		last_entry_applied.entry_count = header.entry_count - 1;
2874 		index++;
2875 		if (index == journal->size)
2876 			index = 0;
2877 	}
2878 
2879 	/*
2880 	 * At the end of rebuild, the reference counters should be accurate to the end of the
2881 	 * journal we just applied.
2882 	 */
2883 	result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
2884 						      &ref_counts_point),
2885 			    "Refcounts are not more accurate than the slab journal");
2886 	if (result != VDO_SUCCESS) {
2887 		abort_scrubbing(scrubber, result);
2888 		return;
2889 	}
2890 
2891 	/* Save out the rebuilt reference blocks. */
2892 	vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2893 			       slab->allocator->thread_id, completion->parent);
2894 	vdo_start_operation_with_waiter(&slab->state,
2895 					VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2896 					completion, initiate_slab_action);
2897 }
2898 
2899 static void read_slab_journal_endio(struct bio *bio)
2900 {
2901 	struct vio *vio = bio->bi_private;
2902 	struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2903 
2904 	continue_vio_after_io(bio->bi_private, apply_journal_entries,
2905 			      scrubber->slab->allocator->thread_id);
2906 }
2907 
2908 /**
2909  * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2910  * @completion: The scrubber's vio completion.
2911  *
2912  * This callback is registered in scrub_next_slab().
2913  */
2914 static void start_scrubbing(struct vdo_completion *completion)
2915 {
2916 	struct slab_scrubber *scrubber =
2917 		container_of(as_vio(completion), struct slab_scrubber, vio);
2918 	struct vdo_slab *slab = scrubber->slab;
2919 
2920 	if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2921 		slab_scrubbed(completion);
2922 		return;
2923 	}
2924 
2925 	vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
2926 				read_slab_journal_endio, handle_scrubber_error,
2927 				REQ_OP_READ);
2928 }
2929 
2930 /**
2931  * scrub_next_slab() - Scrub the next slab if there is one.
2932  * @scrubber: The scrubber.
2933  */
2934 static void scrub_next_slab(struct slab_scrubber *scrubber)
2935 {
2936 	struct vdo_completion *completion = &scrubber->vio.completion;
2937 	struct vdo_slab *slab;
2938 
2939 	/*
2940 	 * Note: this notify call is always safe only because scrubbing can only be started when
2941 	 * the VDO is quiescent.
2942 	 */
2943 	vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2944 
2945 	if (vdo_is_read_only(completion->vdo)) {
2946 		finish_scrubbing(scrubber, VDO_READ_ONLY);
2947 		return;
2948 	}
2949 
2950 	slab = get_next_slab(scrubber);
2951 	if ((slab == NULL) ||
2952 	    (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
2953 		finish_scrubbing(scrubber, VDO_SUCCESS);
2954 		return;
2955 	}
2956 
2957 	if (vdo_finish_draining(&scrubber->admin_state))
2958 		return;
2959 
2960 	list_del_init(&slab->allocq_entry);
2961 	scrubber->slab = slab;
2962 	vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
2963 			       slab->allocator->thread_id, completion->parent);
2964 	vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
2965 					completion, initiate_slab_action);
2966 }
2967 
2968 /**
2969  * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
2970  * @allocator: The block_allocator to scrub.
2971  * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
2972  */
2973 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
2974 {
2975 	struct slab_scrubber *scrubber = &allocator->scrubber;
2976 
2977 	scrubber->vio.completion.parent = parent;
2978 	scrubber->high_priority_only = (parent != NULL);
2979 	if (!has_slabs_to_scrub(scrubber)) {
2980 		finish_scrubbing(scrubber, VDO_SUCCESS);
2981 		return;
2982 	}
2983 
2984 	if (scrubber->high_priority_only &&
2985 	    vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
2986 	    list_empty(&scrubber->high_priority_slabs))
2987 		register_slab_for_scrubbing(get_next_slab(scrubber), true);
2988 
2989 	vdo_resume_if_quiescent(&scrubber->admin_state);
2990 	scrub_next_slab(scrubber);
2991 }
2992 
2993 static inline void assert_on_allocator_thread(thread_id_t thread_id,
2994 					      const char *function_name)
2995 {
2996 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
2997 			    "%s called on correct thread", function_name);
2998 }
2999 
3000 static void register_slab_with_allocator(struct block_allocator *allocator,
3001 					 struct vdo_slab *slab)
3002 {
3003 	allocator->slab_count++;
3004 	allocator->last_slab = slab->slab_number;
3005 }
3006 
3007 /**
3008  * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3009  * @depot: The depot over which to iterate.
3010  * @start: The number of the slab to start iterating from.
3011  * @end: The number of the last slab which may be returned.
3012  * @stride: The difference in slab number between successive slabs.
3013  *
3014  * Iteration always occurs from higher to lower numbered slabs.
3015  *
3016  * Return: An initialized iterator structure.
3017  */
3018 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3019 						    slab_count_t start, slab_count_t end,
3020 						    slab_count_t stride)
3021 {
3022 	struct vdo_slab **slabs = depot->slabs;
3023 
3024 	return (struct slab_iterator) {
3025 		.slabs = slabs,
3026 		.next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3027 		.end = end,
3028 		.stride = stride,
3029 	};
3030 }
3031 
3032 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3033 {
3034 	return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3035 				       allocator->zone_number,
3036 				       allocator->depot->zone_count);
3037 }
3038 
3039 /**
3040  * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3041  * @iterator: The slab_iterator.
3042  *
3043  * Return: The next slab or NULL if the iterator is exhausted.
3044  */
3045 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3046 {
3047 	struct vdo_slab *slab = iterator->next;
3048 
3049 	if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3050 		iterator->next = NULL;
3051 	else
3052 		iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3053 
3054 	return slab;
3055 }
3056 
3057 /**
3058  * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3059  *
3060  * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3061  * into read-only mode. Implements waiter_callback_fn.
3062  */
3063 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3064 {
3065 	struct reference_updater *updater =
3066 		container_of(waiter, struct reference_updater, waiter);
3067 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3068 
3069 	if (updater->increment) {
3070 		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3071 		return;
3072 	}
3073 
3074 	vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3075 }
3076 
3077 /* Implements vdo_read_only_notification_fn. */
3078 static void notify_block_allocator_of_read_only_mode(void *listener,
3079 						     struct vdo_completion *parent)
3080 {
3081 	struct block_allocator *allocator = listener;
3082 	struct slab_iterator iterator;
3083 
3084 	assert_on_allocator_thread(allocator->thread_id, __func__);
3085 	iterator = get_slab_iterator(allocator);
3086 	while (iterator.next != NULL) {
3087 		struct vdo_slab *slab = next_slab(&iterator);
3088 
3089 		vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3090 					     abort_waiter, &slab->journal);
3091 		check_if_slab_drained(slab);
3092 	}
3093 
3094 	vdo_finish_completion(parent);
3095 }
3096 
3097 /**
3098  * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3099  *                                       the block it locks is unreferenced.
3100  * @slab: The slab which contains the block.
3101  * @pbn: The physical block to reference.
3102  * @lock: The lock.
3103  *
3104  * Return: VDO_SUCCESS or an error.
3105  */
3106 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3107 				      struct pbn_lock *lock)
3108 {
3109 	slab_block_number block_number;
3110 	int result;
3111 
3112 	if (vdo_pbn_lock_has_provisional_reference(lock))
3113 		return VDO_SUCCESS;
3114 
3115 	if (!is_slab_open(slab))
3116 		return VDO_INVALID_ADMIN_STATE;
3117 
3118 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
3119 	if (result != VDO_SUCCESS)
3120 		return result;
3121 
3122 	if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3123 		make_provisional_reference(slab, block_number);
3124 		if (lock != NULL)
3125 			vdo_assign_pbn_lock_provisional_reference(lock);
3126 	}
3127 
3128 	if (vdo_pbn_lock_has_provisional_reference(lock))
3129 		adjust_free_block_count(slab, false);
3130 
3131 	return VDO_SUCCESS;
3132 }
3133 
3134 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3135 					    physical_block_number_t *block_number_ptr)
3136 {
3137 	slab_block_number free_index;
3138 
3139 	if (!is_slab_open(slab))
3140 		return VDO_INVALID_ADMIN_STATE;
3141 
3142 	if (!search_reference_blocks(slab, &free_index))
3143 		return VDO_NO_SPACE;
3144 
3145 	VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3146 			    "free block must have ref count of zero");
3147 	make_provisional_reference(slab, free_index);
3148 	adjust_free_block_count(slab, false);
3149 
3150 	/*
3151 	 * Update the search hint so the next search will start at the array index just past the
3152 	 * free block we just found.
3153 	 */
3154 	slab->search_cursor.index = (free_index + 1);
3155 
3156 	*block_number_ptr = slab->start + free_index;
3157 	return VDO_SUCCESS;
3158 }
3159 
3160 /**
3161  * open_slab() - Prepare a slab to be allocated from.
3162  * @slab: The slab.
3163  */
3164 static void open_slab(struct vdo_slab *slab)
3165 {
3166 	reset_search_cursor(slab);
3167 	if (is_slab_journal_blank(slab)) {
3168 		WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3169 			   slab->allocator->statistics.slabs_opened + 1);
3170 		dirty_all_reference_blocks(slab);
3171 	} else {
3172 		WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3173 			   slab->allocator->statistics.slabs_reopened + 1);
3174 	}
3175 
3176 	slab->allocator->open_slab = slab;
3177 }
3178 
3179 
3180 /*
3181  * The block allocated will have a provisional reference and the reference must be either confirmed
3182  * with a subsequent increment or vacated with a subsequent decrement via
3183  * vdo_release_block_reference().
3184  */
3185 int vdo_allocate_block(struct block_allocator *allocator,
3186 		       physical_block_number_t *block_number_ptr)
3187 {
3188 	int result;
3189 
3190 	if (allocator->open_slab != NULL) {
3191 		/* Try to allocate the next block in the currently open slab. */
3192 		result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3193 		if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3194 			return result;
3195 
3196 		/* Put the exhausted open slab back into the priority table. */
3197 		prioritize_slab(allocator->open_slab);
3198 	}
3199 
3200 	/* Remove the highest priority slab from the priority table and make it the open slab. */
3201 	open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3202 			     struct vdo_slab, allocq_entry));
3203 
3204 	/*
3205 	 * Try allocating again. If we're out of space immediately after opening a slab, then every
3206 	 * slab must be fully allocated.
3207 	 */
3208 	return allocate_slab_block(allocator->open_slab, block_number_ptr);
3209 }
3210 
3211 /**
3212  * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3213  * @allocator: The block_allocator on which to wait.
3214  * @waiter: The waiter.
3215  *
3216  * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3217  *         some other error otherwise.
3218  */
3219 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3220 				  struct vdo_waiter *waiter)
3221 {
3222 	if (vdo_is_read_only(allocator->depot->vdo))
3223 		return VDO_READ_ONLY;
3224 
3225 	if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3226 		return VDO_NO_SPACE;
3227 
3228 	vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3229 	return VDO_SUCCESS;
3230 }
3231 
3232 /**
3233  * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3234  *                                journal entry and then updating the reference counter.
3235  *
3236  * @data_vio: The data_vio for which to add the entry.
3237  * @updater: Which of the data_vio's reference updaters is being submitted.
3238  */
3239 void vdo_modify_reference_count(struct vdo_completion *completion,
3240 				struct reference_updater *updater)
3241 {
3242 	struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3243 
3244 	if (!is_slab_open(slab)) {
3245 		vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3246 		return;
3247 	}
3248 
3249 	if (vdo_is_read_only(completion->vdo)) {
3250 		vdo_continue_completion(completion, VDO_READ_ONLY);
3251 		return;
3252 	}
3253 
3254 	vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3255 	if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3256 		register_slab_for_scrubbing(slab, true);
3257 
3258 	add_entries(&slab->journal);
3259 }
3260 
3261 /* Release an unused provisional reference. */
3262 int vdo_release_block_reference(struct block_allocator *allocator,
3263 				physical_block_number_t pbn)
3264 {
3265 	struct reference_updater updater;
3266 
3267 	if (pbn == VDO_ZERO_BLOCK)
3268 		return VDO_SUCCESS;
3269 
3270 	updater = (struct reference_updater) {
3271 		.operation = VDO_JOURNAL_DATA_REMAPPING,
3272 		.increment = false,
3273 		.zpbn = {
3274 			.pbn = pbn,
3275 		},
3276 	};
3277 
3278 	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3279 				      &updater, NULL);
3280 }
3281 
3282 /*
3283  * This is a min_heap callback function orders slab_status structures using the 'is_clean' field as
3284  * the primary key and the 'emptiness' field as the secondary key.
3285  *
3286  * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping
3287  * should always get the most empty first, so pushing should be from most empty to least empty.
3288  * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3289  * before larger ones.
3290  */
3291 static bool slab_status_is_less_than(const void *item1, const void *item2,
3292 					void __always_unused *args)
3293 {
3294 	const struct slab_status *info1 = item1;
3295 	const struct slab_status *info2 = item2;
3296 
3297 	if (info1->is_clean != info2->is_clean)
3298 		return info1->is_clean;
3299 	if (info1->emptiness != info2->emptiness)
3300 		return info1->emptiness > info2->emptiness;
3301 	return info1->slab_number < info2->slab_number;
3302 }
3303 
3304 static void swap_slab_statuses(void *item1, void *item2, void __always_unused *args)
3305 {
3306 	struct slab_status *info1 = item1;
3307 	struct slab_status *info2 = item2;
3308 
3309 	swap(*info1, *info2);
3310 }
3311 
3312 static const struct min_heap_callbacks slab_status_min_heap = {
3313 	.less = slab_status_is_less_than,
3314 	.swp = swap_slab_statuses,
3315 };
3316 
3317 /* Inform the slab actor that a action has finished on some slab; used by apply_to_slabs(). */
3318 static void slab_action_callback(struct vdo_completion *completion)
3319 {
3320 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3321 	struct slab_actor *actor = &allocator->slab_actor;
3322 
3323 	if (--actor->slab_action_count == 0) {
3324 		actor->callback(completion);
3325 		return;
3326 	}
3327 
3328 	vdo_reset_completion(completion);
3329 }
3330 
3331 /* Preserve the error from part of an action and continue. */
3332 static void handle_operation_error(struct vdo_completion *completion)
3333 {
3334 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3335 
3336 	if (allocator->state.waiter != NULL)
3337 		vdo_set_completion_result(allocator->state.waiter, completion->result);
3338 	completion->callback(completion);
3339 }
3340 
3341 /* Perform an action on each of an allocator's slabs in parallel. */
3342 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3343 {
3344 	struct slab_iterator iterator;
3345 
3346 	vdo_prepare_completion(&allocator->completion, slab_action_callback,
3347 			       handle_operation_error, allocator->thread_id, NULL);
3348 	allocator->completion.requeue = false;
3349 
3350 	/*
3351 	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3352 	 * clear it.
3353 	 */
3354 	allocator->open_slab = NULL;
3355 
3356 	/* Ensure that we don't finish before we're done starting. */
3357 	allocator->slab_actor = (struct slab_actor) {
3358 		.slab_action_count = 1,
3359 		.callback = callback,
3360 	};
3361 
3362 	iterator = get_slab_iterator(allocator);
3363 	while (iterator.next != NULL) {
3364 		const struct admin_state_code *operation =
3365 			vdo_get_admin_state_code(&allocator->state);
3366 		struct vdo_slab *slab = next_slab(&iterator);
3367 
3368 		list_del_init(&slab->allocq_entry);
3369 		allocator->slab_actor.slab_action_count++;
3370 		vdo_start_operation_with_waiter(&slab->state, operation,
3371 						&allocator->completion,
3372 						initiate_slab_action);
3373 	}
3374 
3375 	slab_action_callback(&allocator->completion);
3376 }
3377 
3378 static void finish_loading_allocator(struct vdo_completion *completion)
3379 {
3380 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3381 	const struct admin_state_code *operation =
3382 		vdo_get_admin_state_code(&allocator->state);
3383 
3384 	if (allocator->eraser != NULL)
3385 		dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3386 
3387 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3388 		void *context =
3389 			vdo_get_current_action_context(allocator->depot->action_manager);
3390 
3391 		vdo_replay_into_slab_journals(allocator, context);
3392 		return;
3393 	}
3394 
3395 	vdo_finish_loading(&allocator->state);
3396 }
3397 
3398 static void erase_next_slab_journal(struct block_allocator *allocator);
3399 
3400 static void copy_callback(int read_err, unsigned long write_err, void *context)
3401 {
3402 	struct block_allocator *allocator = context;
3403 	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3404 
3405 	if (result != VDO_SUCCESS) {
3406 		vdo_fail_completion(&allocator->completion, result);
3407 		return;
3408 	}
3409 
3410 	erase_next_slab_journal(allocator);
3411 }
3412 
3413 /* erase_next_slab_journal() - Erase the next slab journal. */
3414 static void erase_next_slab_journal(struct block_allocator *allocator)
3415 {
3416 	struct vdo_slab *slab;
3417 	physical_block_number_t pbn;
3418 	struct dm_io_region regions[1];
3419 	struct slab_depot *depot = allocator->depot;
3420 	block_count_t blocks = depot->slab_config.slab_journal_blocks;
3421 
3422 	if (allocator->slabs_to_erase.next == NULL) {
3423 		vdo_finish_completion(&allocator->completion);
3424 		return;
3425 	}
3426 
3427 	slab = next_slab(&allocator->slabs_to_erase);
3428 	pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3429 	regions[0] = (struct dm_io_region) {
3430 		.bdev = vdo_get_backing_device(depot->vdo),
3431 		.sector = pbn * VDO_SECTORS_PER_BLOCK,
3432 		.count = blocks * VDO_SECTORS_PER_BLOCK,
3433 	};
3434 	dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3435 }
3436 
3437 /* Implements vdo_admin_initiator_fn. */
3438 static void initiate_load(struct admin_state *state)
3439 {
3440 	struct block_allocator *allocator =
3441 		container_of(state, struct block_allocator, state);
3442 	const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3443 
3444 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3445 		/*
3446 		 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3447 		 * as the kcopyd callback, lest it deadlock.
3448 		 */
3449 		vdo_prepare_completion_for_requeue(&allocator->completion,
3450 						   finish_loading_allocator,
3451 						   handle_operation_error,
3452 						   allocator->thread_id, NULL);
3453 		allocator->eraser = dm_kcopyd_client_create(NULL);
3454 		if (IS_ERR(allocator->eraser)) {
3455 			vdo_fail_completion(&allocator->completion,
3456 					    PTR_ERR(allocator->eraser));
3457 			allocator->eraser = NULL;
3458 			return;
3459 		}
3460 		allocator->slabs_to_erase = get_slab_iterator(allocator);
3461 
3462 		erase_next_slab_journal(allocator);
3463 		return;
3464 	}
3465 
3466 	apply_to_slabs(allocator, finish_loading_allocator);
3467 }
3468 
3469 /**
3470  * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3471  *                                            been recovered from the recovery journal.
3472  * @completion The allocator completion
3473  */
3474 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3475 {
3476 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3477 
3478 	vdo_finish_loading_with_result(&allocator->state, completion->result);
3479 }
3480 
3481 static int get_slab_statuses(struct block_allocator *allocator,
3482 			     struct slab_status **statuses_ptr)
3483 {
3484 	int result;
3485 	struct slab_status *statuses;
3486 	struct slab_iterator iterator = get_slab_iterator(allocator);
3487 
3488 	result = vdo_allocate(allocator->slab_count, struct slab_status, __func__,
3489 			      &statuses);
3490 	if (result != VDO_SUCCESS)
3491 		return result;
3492 
3493 	*statuses_ptr = statuses;
3494 
3495 	while (iterator.next != NULL)  {
3496 		slab_count_t slab_number = next_slab(&iterator)->slab_number;
3497 
3498 		*statuses++ = (struct slab_status) {
3499 			.slab_number = slab_number,
3500 			.is_clean = !allocator->summary_entries[slab_number].is_dirty,
3501 			.emptiness = allocator->summary_entries[slab_number].fullness_hint,
3502 		};
3503 	}
3504 
3505 	return VDO_SUCCESS;
3506 }
3507 
3508 /* Prepare slabs for allocation or scrubbing. */
3509 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3510 {
3511 	struct slab_status current_slab_status;
3512 	DEFINE_MIN_HEAP(struct slab_status, heap) heap;
3513 	int result;
3514 	struct slab_status *slab_statuses;
3515 	struct slab_depot *depot = allocator->depot;
3516 
3517 	WRITE_ONCE(allocator->allocated_blocks,
3518 		   allocator->slab_count * depot->slab_config.data_blocks);
3519 	result = get_slab_statuses(allocator, &slab_statuses);
3520 	if (result != VDO_SUCCESS)
3521 		return result;
3522 
3523 	/* Sort the slabs by cleanliness, then by emptiness hint. */
3524 	heap = (struct heap) {
3525 		.data = slab_statuses,
3526 		.nr = allocator->slab_count,
3527 		.size = allocator->slab_count,
3528 	};
3529 	min_heapify_all(&heap, &slab_status_min_heap, NULL);
3530 
3531 	while (heap.nr > 0) {
3532 		bool high_priority;
3533 		struct vdo_slab *slab;
3534 		struct slab_journal *journal;
3535 
3536 		current_slab_status = slab_statuses[0];
3537 		min_heap_pop(&heap, &slab_status_min_heap, NULL);
3538 		slab = depot->slabs[current_slab_status.slab_number];
3539 
3540 		if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3541 		    (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3542 		     current_slab_status.is_clean)) {
3543 			queue_slab(slab);
3544 			continue;
3545 		}
3546 
3547 		slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3548 		journal = &slab->journal;
3549 		high_priority = ((current_slab_status.is_clean &&
3550 				 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3551 				 (journal_length(journal) >= journal->scrubbing_threshold));
3552 		register_slab_for_scrubbing(slab, high_priority);
3553 	}
3554 
3555 	vdo_free(slab_statuses);
3556 	return VDO_SUCCESS;
3557 }
3558 
3559 static const char *status_to_string(enum slab_rebuild_status status)
3560 {
3561 	switch (status) {
3562 	case VDO_SLAB_REBUILT:
3563 		return "REBUILT";
3564 	case VDO_SLAB_REQUIRES_SCRUBBING:
3565 		return "SCRUBBING";
3566 	case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3567 		return "PRIORITY_SCRUBBING";
3568 	case VDO_SLAB_REBUILDING:
3569 		return "REBUILDING";
3570 	case VDO_SLAB_REPLAYING:
3571 		return "REPLAYING";
3572 	default:
3573 		return "UNKNOWN";
3574 	}
3575 }
3576 
3577 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3578 {
3579 	unsigned int pause_counter = 0;
3580 	struct slab_iterator iterator = get_slab_iterator(allocator);
3581 	const struct slab_scrubber *scrubber = &allocator->scrubber;
3582 
3583 	vdo_log_info("block_allocator zone %u", allocator->zone_number);
3584 	while (iterator.next != NULL) {
3585 		struct vdo_slab *slab = next_slab(&iterator);
3586 		struct slab_journal *journal = &slab->journal;
3587 
3588 		if (slab->reference_blocks != NULL) {
3589 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3590 			vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
3591 				     slab->priority,
3592 				     (unsigned long long) slab->free_blocks);
3593 		} else {
3594 			vdo_log_info("slab %u: status %s", slab->slab_number,
3595 				     status_to_string(slab->status));
3596 		}
3597 
3598 		vdo_log_info("  slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3599 			     vdo_waitq_num_waiters(&journal->entry_waiters),
3600 			     vdo_bool_to_string(journal->waiting_to_commit),
3601 			     vdo_bool_to_string(journal->updating_slab_summary),
3602 			     (unsigned long long) journal->head,
3603 			     (unsigned long long) journal->unreapable,
3604 			     (unsigned long long) journal->tail,
3605 			     (unsigned long long) journal->next_commit,
3606 			     (unsigned long long) journal->summarized,
3607 			     (unsigned long long) journal->last_summarized,
3608 			     (unsigned long long) journal->recovery_lock,
3609 			     vdo_bool_to_string(journal->recovery_lock != 0));
3610 		/*
3611 		 * Given the frequency with which the locks are just a tiny bit off, it might be
3612 		 * worth dumping all the locks, but that might be too much logging.
3613 		 */
3614 
3615 		if (slab->counters != NULL) {
3616 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3617 			vdo_log_info("  slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3618 				     slab->free_blocks, slab->block_count,
3619 				     slab->reference_block_count,
3620 				     vdo_waitq_num_waiters(&slab->dirty_blocks),
3621 				     slab->active_count,
3622 				     (unsigned long long) slab->slab_journal_point.sequence_number,
3623 				     slab->slab_journal_point.entry_count);
3624 		} else {
3625 			vdo_log_info("  no counters");
3626 		}
3627 
3628 		/*
3629 		 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number,
3630 		 * allowing the kernel log a chance to be flushed instead of being overrun.
3631 		 */
3632 		if (pause_counter++ == 31) {
3633 			pause_counter = 0;
3634 			vdo_pause_for_logger();
3635 		}
3636 	}
3637 
3638 	vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3639 		     READ_ONCE(scrubber->slab_count),
3640 		     vdo_waitq_num_waiters(&scrubber->waiters),
3641 		     vdo_get_admin_state_code(&scrubber->admin_state)->name,
3642 		     scrubber->high_priority_only ? ", high_priority_only " : "");
3643 }
3644 
3645 static void free_slab(struct vdo_slab *slab)
3646 {
3647 	if (slab == NULL)
3648 		return;
3649 
3650 	list_del(&slab->allocq_entry);
3651 	vdo_free(vdo_forget(slab->journal.block));
3652 	vdo_free(vdo_forget(slab->journal.locks));
3653 	vdo_free(vdo_forget(slab->counters));
3654 	vdo_free(vdo_forget(slab->reference_blocks));
3655 	vdo_free(slab);
3656 }
3657 
3658 static int initialize_slab_journal(struct vdo_slab *slab)
3659 {
3660 	struct slab_journal *journal = &slab->journal;
3661 	const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3662 	int result;
3663 
3664 	result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3665 			      __func__, &journal->locks);
3666 	if (result != VDO_SUCCESS)
3667 		return result;
3668 
3669 	result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3670 			      (char **) &journal->block);
3671 	if (result != VDO_SUCCESS)
3672 		return result;
3673 
3674 	journal->slab = slab;
3675 	journal->size = slab_config->slab_journal_blocks;
3676 	journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3677 	journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3678 	journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3679 	journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3680 	journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3681 	journal->events = &slab->allocator->slab_journal_statistics;
3682 	journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3683 	journal->tail = 1;
3684 	journal->head = 1;
3685 
3686 	journal->flushing_deadline = journal->flushing_threshold;
3687 	/*
3688 	 * Set there to be some time between the deadline and the blocking threshold, so that
3689 	 * hopefully all are done before blocking.
3690 	 */
3691 	if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3692 		journal->flushing_deadline = journal->blocking_threshold - 5;
3693 
3694 	journal->slab_summary_waiter.callback = release_journal_locks;
3695 
3696 	INIT_LIST_HEAD(&journal->dirty_entry);
3697 	INIT_LIST_HEAD(&journal->uncommitted_blocks);
3698 
3699 	journal->tail_header.nonce = slab->allocator->nonce;
3700 	journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3701 	initialize_journal_state(journal);
3702 	return VDO_SUCCESS;
3703 }
3704 
3705 /**
3706  * make_slab() - Construct a new, empty slab.
3707  * @slab_origin: The physical block number within the block allocator partition of the first block
3708  *               in the slab.
3709  * @allocator: The block allocator to which the slab belongs.
3710  * @slab_number: The slab number of the slab.
3711  * @is_new: true if this slab is being allocated as part of a resize.
3712  * @slab_ptr: A pointer to receive the new slab.
3713  *
3714  * Return: VDO_SUCCESS or an error code.
3715  */
3716 static int __must_check make_slab(physical_block_number_t slab_origin,
3717 				  struct block_allocator *allocator,
3718 				  slab_count_t slab_number, bool is_new,
3719 				  struct vdo_slab **slab_ptr)
3720 {
3721 	const struct slab_config *slab_config = &allocator->depot->slab_config;
3722 	struct vdo_slab *slab;
3723 	int result;
3724 
3725 	result = vdo_allocate(1, struct vdo_slab, __func__, &slab);
3726 	if (result != VDO_SUCCESS)
3727 		return result;
3728 
3729 	*slab = (struct vdo_slab) {
3730 		.allocator = allocator,
3731 		.start = slab_origin,
3732 		.end = slab_origin + slab_config->slab_blocks,
3733 		.slab_number = slab_number,
3734 		.ref_counts_origin = slab_origin + slab_config->data_blocks,
3735 		.journal_origin =
3736 			vdo_get_slab_journal_start_block(slab_config, slab_origin),
3737 		.block_count = slab_config->data_blocks,
3738 		.free_blocks = slab_config->data_blocks,
3739 		.reference_block_count =
3740 			vdo_get_saved_reference_count_size(slab_config->data_blocks),
3741 	};
3742 	INIT_LIST_HEAD(&slab->allocq_entry);
3743 
3744 	result = initialize_slab_journal(slab);
3745 	if (result != VDO_SUCCESS) {
3746 		free_slab(slab);
3747 		return result;
3748 	}
3749 
3750 	if (is_new) {
3751 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3752 		result = allocate_slab_counters(slab);
3753 		if (result != VDO_SUCCESS) {
3754 			free_slab(slab);
3755 			return result;
3756 		}
3757 	} else {
3758 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3759 	}
3760 
3761 	*slab_ptr = slab;
3762 	return VDO_SUCCESS;
3763 }
3764 
3765 /**
3766  * allocate_slabs() - Allocate a new slab pointer array.
3767  * @depot: The depot.
3768  * @slab_count: The number of slabs the depot should have in the new array.
3769  *
3770  * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3771  * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3772  *
3773  * Return: VDO_SUCCESS or an error code.
3774  */
3775 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3776 {
3777 	block_count_t slab_size;
3778 	bool resizing = false;
3779 	physical_block_number_t slab_origin;
3780 	int result;
3781 
3782 	result = vdo_allocate(slab_count, struct vdo_slab *,
3783 			      "slab pointer array", &depot->new_slabs);
3784 	if (result != VDO_SUCCESS)
3785 		return result;
3786 
3787 	if (depot->slabs != NULL) {
3788 		memcpy(depot->new_slabs, depot->slabs,
3789 		       depot->slab_count * sizeof(struct vdo_slab *));
3790 		resizing = true;
3791 	}
3792 
3793 	slab_size = depot->slab_config.slab_blocks;
3794 	slab_origin = depot->first_block + (depot->slab_count * slab_size);
3795 
3796 	for (depot->new_slab_count = depot->slab_count;
3797 	     depot->new_slab_count < slab_count;
3798 	     depot->new_slab_count++, slab_origin += slab_size) {
3799 		struct block_allocator *allocator =
3800 			&depot->allocators[depot->new_slab_count % depot->zone_count];
3801 		struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3802 
3803 		result = make_slab(slab_origin, allocator, depot->new_slab_count,
3804 				   resizing, slab_ptr);
3805 		if (result != VDO_SUCCESS)
3806 			return result;
3807 	}
3808 
3809 	return VDO_SUCCESS;
3810 }
3811 
3812 /**
3813  * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3814  * @depot: The depot.
3815  */
3816 void vdo_abandon_new_slabs(struct slab_depot *depot)
3817 {
3818 	slab_count_t i;
3819 
3820 	if (depot->new_slabs == NULL)
3821 		return;
3822 
3823 	for (i = depot->slab_count; i < depot->new_slab_count; i++)
3824 		free_slab(vdo_forget(depot->new_slabs[i]));
3825 	depot->new_slab_count = 0;
3826 	depot->new_size = 0;
3827 	vdo_free(vdo_forget(depot->new_slabs));
3828 }
3829 
3830 /**
3831  * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3832  *
3833  * Implements vdo_zone_thread_getter_fn.
3834  */
3835 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3836 {
3837 	return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3838 }
3839 
3840 /**
3841  * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3842  *                                   it may hold on a specified recovery journal block.
3843  * @journal: The slab journal.
3844  * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3845  *                 released.
3846  *
3847  * Return: true if the journal does hold a lock on the specified block (which it will release).
3848  */
3849 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3850 						       sequence_number_t recovery_lock)
3851 {
3852 	if (recovery_lock > journal->recovery_lock) {
3853 		VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3854 				    "slab journal recovery lock is not older than the recovery journal head");
3855 		return false;
3856 	}
3857 
3858 	if ((recovery_lock < journal->recovery_lock) ||
3859 	    vdo_is_read_only(journal->slab->allocator->depot->vdo))
3860 		return false;
3861 
3862 	/* All locks are held by the block which is in progress; write it. */
3863 	commit_tail(journal);
3864 	return true;
3865 }
3866 
3867 /*
3868  * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3869  * is seeking to release.
3870  *
3871  * Implements vdo_zone_action_fn.
3872  */
3873 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3874 				     struct vdo_completion *parent)
3875 {
3876 	struct slab_journal *journal, *tmp;
3877 	struct slab_depot *depot = context;
3878 	struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3879 
3880 	list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3881 		if (!release_recovery_journal_lock(journal,
3882 						   depot->active_release_request))
3883 			break;
3884 	}
3885 
3886 	vdo_finish_completion(parent);
3887 }
3888 
3889 /**
3890  * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3891  *
3892  * Implements vdo_action_preamble_fn.
3893  */
3894 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3895 {
3896 	struct slab_depot *depot = context;
3897 
3898 	depot->active_release_request = depot->new_release_request;
3899 	vdo_finish_completion(parent);
3900 }
3901 
3902 /**
3903  * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3904  *
3905  * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3906  * depot's action manager.
3907  *
3908  * Implements vdo_action_scheduler_fn.
3909  */
3910 static bool schedule_tail_block_commit(void *context)
3911 {
3912 	struct slab_depot *depot = context;
3913 
3914 	if (depot->new_release_request == depot->active_release_request)
3915 		return false;
3916 
3917 	return vdo_schedule_action(depot->action_manager,
3918 				   prepare_for_tail_block_commit,
3919 				   release_tail_block_locks,
3920 				   NULL, NULL);
3921 }
3922 
3923 /**
3924  * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3925  * @allocator: The allocator being initialized
3926  *
3927  * Return: VDO_SUCCESS or an error.
3928  */
3929 static int initialize_slab_scrubber(struct block_allocator *allocator)
3930 {
3931 	struct slab_scrubber *scrubber = &allocator->scrubber;
3932 	block_count_t slab_journal_size =
3933 		allocator->depot->slab_config.slab_journal_blocks;
3934 	char *journal_data;
3935 	int result;
3936 
3937 	result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size,
3938 			      char, __func__, &journal_data);
3939 	if (result != VDO_SUCCESS)
3940 		return result;
3941 
3942 	result = allocate_vio_components(allocator->completion.vdo,
3943 					 VIO_TYPE_SLAB_JOURNAL,
3944 					 VIO_PRIORITY_METADATA,
3945 					 allocator, slab_journal_size,
3946 					 journal_data, &scrubber->vio);
3947 	if (result != VDO_SUCCESS) {
3948 		vdo_free(journal_data);
3949 		return result;
3950 	}
3951 
3952 	INIT_LIST_HEAD(&scrubber->high_priority_slabs);
3953 	INIT_LIST_HEAD(&scrubber->slabs);
3954 	vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
3955 	return VDO_SUCCESS;
3956 }
3957 
3958 /**
3959  * initialize_slab_summary_block() - Initialize a slab_summary_block.
3960  * @allocator: The allocator which owns the block.
3961  * @index: The index of this block in its zone's summary.
3962  *
3963  * Return: VDO_SUCCESS or an error.
3964  */
3965 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
3966 						      block_count_t index)
3967 {
3968 	struct slab_summary_block *block = &allocator->summary_blocks[index];
3969 	int result;
3970 
3971 	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
3972 	if (result != VDO_SUCCESS)
3973 		return result;
3974 
3975 	result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
3976 					 VIO_PRIORITY_METADATA, NULL, 1,
3977 					 block->outgoing_entries, &block->vio);
3978 	if (result != VDO_SUCCESS)
3979 		return result;
3980 
3981 	block->allocator = allocator;
3982 	block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
3983 	block->index = index;
3984 	return VDO_SUCCESS;
3985 }
3986 
3987 static int __must_check initialize_block_allocator(struct slab_depot *depot,
3988 						   zone_count_t zone)
3989 {
3990 	int result;
3991 	block_count_t i;
3992 	struct block_allocator *allocator = &depot->allocators[zone];
3993 	struct vdo *vdo = depot->vdo;
3994 	block_count_t max_free_blocks = depot->slab_config.data_blocks;
3995 	unsigned int max_priority = (2 + ilog2(max_free_blocks));
3996 
3997 	*allocator = (struct block_allocator) {
3998 		.depot = depot,
3999 		.zone_number = zone,
4000 		.thread_id = vdo->thread_config.physical_threads[zone],
4001 		.nonce = vdo->states.vdo.nonce,
4002 	};
4003 
4004 	INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4005 	vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4006 	result = vdo_register_read_only_listener(vdo, allocator,
4007 						 notify_block_allocator_of_read_only_mode,
4008 						 allocator->thread_id);
4009 	if (result != VDO_SUCCESS)
4010 		return result;
4011 
4012 	vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4013 	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id,
4014 			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4015 			       allocator, &allocator->vio_pool);
4016 	if (result != VDO_SUCCESS)
4017 		return result;
4018 
4019 	result = initialize_slab_scrubber(allocator);
4020 	if (result != VDO_SUCCESS)
4021 		return result;
4022 
4023 	result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4024 	if (result != VDO_SUCCESS)
4025 		return result;
4026 
4027 	result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4028 			      struct slab_summary_block, __func__,
4029 			      &allocator->summary_blocks);
4030 	if (result != VDO_SUCCESS)
4031 		return result;
4032 
4033 	vdo_set_admin_state_code(&allocator->summary_state,
4034 				 VDO_ADMIN_STATE_NORMAL_OPERATION);
4035 	allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4036 
4037 	/* Initialize each summary block. */
4038 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4039 		result = initialize_slab_summary_block(allocator, i);
4040 		if (result != VDO_SUCCESS)
4041 			return result;
4042 	}
4043 
4044 	/*
4045 	 * Performing well atop thin provisioned storage requires either that VDO discards freed
4046 	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4047 	 * in preference to slabs that have never been opened. For reasons we have not been able to
4048 	 * fully understand, some SSD machines have been have been very sensitive (50% reduction in
4049 	 * test throughput) to very slight differences in the timing and locality of block
4050 	 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4051 	 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4052 	 * hurts on these machines.
4053 	 *
4054 	 * This sets the free block threshold for preferring to open an unopened slab to the binary
4055 	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4056 	 * to about half the slab size.
4057 	 */
4058 	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
4059 
4060 	return VDO_SUCCESS;
4061 }
4062 
4063 static int allocate_components(struct slab_depot *depot,
4064 			       struct partition *summary_partition)
4065 {
4066 	int result;
4067 	zone_count_t zone;
4068 	slab_count_t slab_count;
4069 	u8 hint;
4070 	u32 i;
4071 	const struct thread_config *thread_config = &depot->vdo->thread_config;
4072 
4073 	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4074 					 thread_config->journal_thread, depot,
4075 					 schedule_tail_block_commit,
4076 					 depot->vdo, &depot->action_manager);
4077 	if (result != VDO_SUCCESS)
4078 		return result;
4079 
4080 	depot->origin = depot->first_block;
4081 
4082 	/* block size must be a multiple of entry size */
4083 	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4084 
4085 	depot->summary_origin = summary_partition->offset;
4086 	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4087 	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4088 			      struct slab_summary_entry, __func__,
4089 			      &depot->summary_entries);
4090 	if (result != VDO_SUCCESS)
4091 		return result;
4092 
4093 
4094 	/* Initialize all the entries. */
4095 	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4096 	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4097 		/*
4098 		 * This default tail block offset must be reflected in
4099 		 * slabJournal.c::read_slab_journal_tail().
4100 		 */
4101 		depot->summary_entries[i] = (struct slab_summary_entry) {
4102 			.tail_block_offset = 0,
4103 			.fullness_hint = hint,
4104 			.load_ref_counts = false,
4105 			.is_dirty = false,
4106 		};
4107 	}
4108 
4109 	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4110 					    depot->slab_size_shift);
4111 	if (thread_config->physical_zone_count > slab_count) {
4112 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
4113 					      "%u physical zones exceeds slab count %u",
4114 					      thread_config->physical_zone_count,
4115 					      slab_count);
4116 	}
4117 
4118 	/* Initialize the block allocators. */
4119 	for (zone = 0; zone < depot->zone_count; zone++) {
4120 		result = initialize_block_allocator(depot, zone);
4121 		if (result != VDO_SUCCESS)
4122 			return result;
4123 	}
4124 
4125 	/* Allocate slabs. */
4126 	result = allocate_slabs(depot, slab_count);
4127 	if (result != VDO_SUCCESS)
4128 		return result;
4129 
4130 	/* Use the new slabs. */
4131 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4132 		struct vdo_slab *slab = depot->new_slabs[i];
4133 
4134 		register_slab_with_allocator(slab->allocator, slab);
4135 		WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4136 	}
4137 
4138 	depot->slabs = depot->new_slabs;
4139 	depot->new_slabs = NULL;
4140 	depot->new_slab_count = 0;
4141 
4142 	return VDO_SUCCESS;
4143 }
4144 
4145 /**
4146  * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4147  *                           block.
4148  * @state: The slab depot state from the super block.
4149  * @vdo: The VDO which will own the depot.
4150  * @summary_partition: The partition which holds the slab summary.
4151  * @depot_ptr: A pointer to hold the depot.
4152  *
4153  * Return: A success or error code.
4154  */
4155 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4156 			  struct partition *summary_partition,
4157 			  struct slab_depot **depot_ptr)
4158 {
4159 	unsigned int slab_size_shift;
4160 	struct slab_depot *depot;
4161 	int result;
4162 
4163 	/*
4164 	 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4165 	 * requires that the slab size be a power of two.
4166 	 */
4167 	block_count_t slab_size = state.slab_config.slab_blocks;
4168 
4169 	if (!is_power_of_2(slab_size)) {
4170 		return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
4171 					      "slab size must be a power of two");
4172 	}
4173 	slab_size_shift = ilog2(slab_size);
4174 
4175 	result = vdo_allocate_extended(struct slab_depot,
4176 				       vdo->thread_config.physical_zone_count,
4177 				       struct block_allocator, __func__, &depot);
4178 	if (result != VDO_SUCCESS)
4179 		return result;
4180 
4181 	depot->vdo = vdo;
4182 	depot->old_zone_count = state.zone_count;
4183 	depot->zone_count = vdo->thread_config.physical_zone_count;
4184 	depot->slab_config = state.slab_config;
4185 	depot->first_block = state.first_block;
4186 	depot->last_block = state.last_block;
4187 	depot->slab_size_shift = slab_size_shift;
4188 
4189 	result = allocate_components(depot, summary_partition);
4190 	if (result != VDO_SUCCESS) {
4191 		vdo_free_slab_depot(depot);
4192 		return result;
4193 	}
4194 
4195 	*depot_ptr = depot;
4196 	return VDO_SUCCESS;
4197 }
4198 
4199 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4200 {
4201 	block_count_t i;
4202 
4203 	if (allocator->summary_blocks == NULL)
4204 		return;
4205 
4206 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4207 		free_vio_components(&allocator->summary_blocks[i].vio);
4208 		vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4209 	}
4210 
4211 	vdo_free(vdo_forget(allocator->summary_blocks));
4212 }
4213 
4214 /**
4215  * vdo_free_slab_depot() - Destroy a slab depot.
4216  * @depot: The depot to destroy.
4217  */
4218 void vdo_free_slab_depot(struct slab_depot *depot)
4219 {
4220 	zone_count_t zone = 0;
4221 
4222 	if (depot == NULL)
4223 		return;
4224 
4225 	vdo_abandon_new_slabs(depot);
4226 
4227 	for (zone = 0; zone < depot->zone_count; zone++) {
4228 		struct block_allocator *allocator = &depot->allocators[zone];
4229 
4230 		if (allocator->eraser != NULL)
4231 			dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
4232 
4233 		uninitialize_allocator_summary(allocator);
4234 		uninitialize_scrubber_vio(&allocator->scrubber);
4235 		free_vio_pool(vdo_forget(allocator->vio_pool));
4236 		vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
4237 	}
4238 
4239 	if (depot->slabs != NULL) {
4240 		slab_count_t i;
4241 
4242 		for (i = 0; i < depot->slab_count; i++)
4243 			free_slab(vdo_forget(depot->slabs[i]));
4244 	}
4245 
4246 	vdo_free(vdo_forget(depot->slabs));
4247 	vdo_free(vdo_forget(depot->action_manager));
4248 	vdo_free(vdo_forget(depot->summary_entries));
4249 	vdo_free(depot);
4250 }
4251 
4252 /**
4253  * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4254  * @depot: The depot to encode.
4255  *
4256  * Return: The depot state.
4257  */
4258 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4259 {
4260 	/*
4261 	 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4262 	 * tool and is now being saved. We did not load and combine the slab summary, so we still
4263 	 * need to do that next time we load with the old zone count rather than 0.
4264 	 */
4265 	struct slab_depot_state_2_0 state;
4266 	zone_count_t zones_to_record = depot->zone_count;
4267 
4268 	if (depot->zone_count == 0)
4269 		zones_to_record = depot->old_zone_count;
4270 
4271 	state = (struct slab_depot_state_2_0) {
4272 		.slab_config = depot->slab_config,
4273 		.first_block = depot->first_block,
4274 		.last_block = depot->last_block,
4275 		.zone_count = zones_to_record,
4276 	};
4277 
4278 	return state;
4279 }
4280 
4281 /**
4282  * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4283  *
4284  * Context: This method may be called only before entering normal operation from the load thread.
4285  *
4286  * Return: VDO_SUCCESS or an error.
4287  */
4288 int vdo_allocate_reference_counters(struct slab_depot *depot)
4289 {
4290 	struct slab_iterator iterator =
4291 		get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4292 
4293 	while (iterator.next != NULL) {
4294 		int result = allocate_slab_counters(next_slab(&iterator));
4295 
4296 		if (result != VDO_SUCCESS)
4297 			return result;
4298 	}
4299 
4300 	return VDO_SUCCESS;
4301 }
4302 
4303 /**
4304  * get_slab_number() - Get the number of the slab that contains a specified block.
4305  * @depot: The slab depot.
4306  * @pbn: The physical block number.
4307  * @slab_number_ptr: A pointer to hold the slab number.
4308  *
4309  * Return: VDO_SUCCESS or an error.
4310  */
4311 static int __must_check get_slab_number(const struct slab_depot *depot,
4312 					physical_block_number_t pbn,
4313 					slab_count_t *slab_number_ptr)
4314 {
4315 	slab_count_t slab_number;
4316 
4317 	if (pbn < depot->first_block)
4318 		return VDO_OUT_OF_RANGE;
4319 
4320 	slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4321 	if (slab_number >= depot->slab_count)
4322 		return VDO_OUT_OF_RANGE;
4323 
4324 	*slab_number_ptr = slab_number;
4325 	return VDO_SUCCESS;
4326 }
4327 
4328 /**
4329  * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4330  * @depot: The slab depot.
4331  * @pbn: The physical block number.
4332  *
4333  * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block.
4334  *
4335  * Return: The slab containing the block, or NULL if the block number is the zero block or
4336  * otherwise out of range.
4337  */
4338 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4339 			      physical_block_number_t pbn)
4340 {
4341 	slab_count_t slab_number;
4342 	int result;
4343 
4344 	if (pbn == VDO_ZERO_BLOCK)
4345 		return NULL;
4346 
4347 	result = get_slab_number(depot, pbn, &slab_number);
4348 	if (result != VDO_SUCCESS) {
4349 		vdo_enter_read_only_mode(depot->vdo, result);
4350 		return NULL;
4351 	}
4352 
4353 	return depot->slabs[slab_number];
4354 }
4355 
4356 /**
4357  * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4358  * @depot: The slab depot.
4359  * @pbn: The physical block number that is being queried.
4360  *
4361  * Context: This method must be called from the physical zone thread of the PBN.
4362  *
4363  * Return: The number of available references.
4364  */
4365 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4366 {
4367 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4368 	vdo_refcount_t *counter_ptr = NULL;
4369 	int result;
4370 
4371 	if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4372 		return 0;
4373 
4374 	result = get_reference_counter(slab, pbn, &counter_ptr);
4375 	if (result != VDO_SUCCESS)
4376 		return 0;
4377 
4378 	if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4379 		return (MAXIMUM_REFERENCE_COUNT - 1);
4380 
4381 	return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4382 }
4383 
4384 /**
4385  * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4386  * @depot: The depot.
4387  * @pbn: The physical block number to ask about.
4388  *
4389  * Return: True if the PBN corresponds to a data block.
4390  */
4391 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4392 				physical_block_number_t pbn)
4393 {
4394 	slab_count_t slab_number;
4395 	slab_block_number sbn;
4396 
4397 	return ((pbn == VDO_ZERO_BLOCK) ||
4398 		((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4399 		 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4400 		  VDO_SUCCESS)));
4401 }
4402 
4403 /**
4404  * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4405  * the slabs in the depot.
4406  * @depot: The slab depot.
4407  *
4408  * This is the total number of blocks with a non-zero reference count.
4409  *
4410  * Context: This may be called from any thread.
4411  *
4412  * Return: The total number of blocks with a non-zero reference count.
4413  */
4414 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4415 {
4416 	block_count_t total = 0;
4417 	zone_count_t zone;
4418 
4419 	for (zone = 0; zone < depot->zone_count; zone++) {
4420 		/* The allocators are responsible for thread safety. */
4421 		total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4422 	}
4423 
4424 	return total;
4425 }
4426 
4427 /**
4428  * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4429  *                                    depot.
4430  * @depot: The slab depot.
4431  *
4432  * Context: This may be called from any thread.
4433  *
4434  * Return: The total number of data blocks in all slabs.
4435  */
4436 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4437 {
4438 	return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4439 }
4440 
4441 /**
4442  * finish_combining_zones() - Clean up after saving out the combined slab summary.
4443  * @completion: The vio which was used to write the summary data.
4444  */
4445 static void finish_combining_zones(struct vdo_completion *completion)
4446 {
4447 	int result = completion->result;
4448 	struct vdo_completion *parent = completion->parent;
4449 
4450 	free_vio(as_vio(vdo_forget(completion)));
4451 	vdo_fail_completion(parent, result);
4452 }
4453 
/*
 * Error handler for the summary vio used while loading and combining the slab summary: record
 * the metadata I/O error, then clean up exactly as for a finished write.
 */
static void handle_combining_error(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);

	vio_record_metadata_io_error(vio);
	finish_combining_zones(completion);
}
4459 
4460 static void write_summary_endio(struct bio *bio)
4461 {
4462 	struct vio *vio = bio->bi_private;
4463 	struct vdo *vdo = vio->completion.vdo;
4464 
4465 	continue_vio_after_io(vio, finish_combining_zones,
4466 			      vdo->thread_config.admin_thread);
4467 }
4468 
4469 /**
4470  * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4471  *                       update every zone to the correct values for every slab.
4472  * @depot: The depot whose summary entries should be combined.
4473  */
4474 static void combine_summaries(struct slab_depot *depot)
4475 {
4476 	/*
4477 	 * Combine all the old summary data into the portion of the buffer corresponding to the
4478 	 * first zone.
4479 	 */
4480 	zone_count_t zone = 0;
4481 	struct slab_summary_entry *entries = depot->summary_entries;
4482 
4483 	if (depot->old_zone_count > 1) {
4484 		slab_count_t entry_number;
4485 
4486 		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4487 			if (zone != 0) {
4488 				memcpy(entries + entry_number,
4489 				       entries + (zone * MAX_VDO_SLABS) + entry_number,
4490 				       sizeof(struct slab_summary_entry));
4491 			}
4492 
4493 			zone++;
4494 			if (zone == depot->old_zone_count)
4495 				zone = 0;
4496 		}
4497 	}
4498 
4499 	/* Copy the combined data to each zones's region of the buffer. */
4500 	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4501 		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4502 		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4503 	}
4504 }
4505 
4506 /**
4507  * finish_loading_summary() - Finish loading slab summary data.
4508  * @completion: The vio which was used to read the summary data.
4509  *
4510  * Combines the slab summary data from all the previously written zones and copies the combined
4511  * summary to each partition's data region. Then writes the combined summary back out to disk. This
4512  * callback is registered in load_summary_endio().
4513  */
4514 static void finish_loading_summary(struct vdo_completion *completion)
4515 {
4516 	struct slab_depot *depot = completion->vdo->depot;
4517 
4518 	/* Combine the summary from each zone so each zone is correct for all slabs. */
4519 	combine_summaries(depot);
4520 
4521 	/* Write the combined summary back out. */
4522 	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4523 				write_summary_endio, handle_combining_error,
4524 				REQ_OP_WRITE);
4525 }
4526 
4527 static void load_summary_endio(struct bio *bio)
4528 {
4529 	struct vio *vio = bio->bi_private;
4530 	struct vdo *vdo = vio->completion.vdo;
4531 
4532 	continue_vio_after_io(vio, finish_loading_summary,
4533 			      vdo->thread_config.admin_thread);
4534 }
4535 
4536 /**
4537  * load_slab_summary() - The preamble of a load operation.
4538  *
4539  * Implements vdo_action_preamble_fn.
4540  */
4541 static void load_slab_summary(void *context, struct vdo_completion *parent)
4542 {
4543 	int result;
4544 	struct vio *vio;
4545 	struct slab_depot *depot = context;
4546 	const struct admin_state_code *operation =
4547 		vdo_get_current_manager_operation(depot->action_manager);
4548 
4549 	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4550 						 VIO_PRIORITY_METADATA, parent,
4551 						 VDO_SLAB_SUMMARY_BLOCKS,
4552 						 (char *) depot->summary_entries, &vio);
4553 	if (result != VDO_SUCCESS) {
4554 		vdo_fail_completion(parent, result);
4555 		return;
4556 	}
4557 
4558 	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4559 	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4560 		finish_loading_summary(&vio->completion);
4561 		return;
4562 	}
4563 
4564 	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4565 				handle_combining_error, REQ_OP_READ);
4566 }
4567 
4568 /* Implements vdo_zone_action_fn. */
4569 static void load_allocator(void *context, zone_count_t zone_number,
4570 			   struct vdo_completion *parent)
4571 {
4572 	struct slab_depot *depot = context;
4573 
4574 	vdo_start_loading(&depot->allocators[zone_number].state,
4575 			  vdo_get_current_manager_operation(depot->action_manager),
4576 			  parent, initiate_load);
4577 }
4578 
4579 /**
4580  * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4581  *                         super_block component.
4582  * @depot: The depot to load.
4583  * @operation: The type of load to perform.
4584  * @parent: The completion to notify when the load is complete.
4585  * @context: Additional context for the load operation; may be NULL.
4586  *
4587  * This method may be called only before entering normal operation from the load thread.
4588  */
4589 void vdo_load_slab_depot(struct slab_depot *depot,
4590 			 const struct admin_state_code *operation,
4591 			 struct vdo_completion *parent, void *context)
4592 {
4593 	if (!vdo_assert_load_operation(operation, parent))
4594 		return;
4595 
4596 	vdo_schedule_operation_with_context(depot->action_manager, operation,
4597 					    load_slab_summary, load_allocator,
4598 					    NULL, context, parent);
4599 }
4600 
4601 /* Implements vdo_zone_action_fn. */
4602 static void prepare_to_allocate(void *context, zone_count_t zone_number,
4603 				struct vdo_completion *parent)
4604 {
4605 	struct slab_depot *depot = context;
4606 	struct block_allocator *allocator = &depot->allocators[zone_number];
4607 	int result;
4608 
4609 	result = vdo_prepare_slabs_for_allocation(allocator);
4610 	if (result != VDO_SUCCESS) {
4611 		vdo_fail_completion(parent, result);
4612 		return;
4613 	}
4614 
4615 	scrub_slabs(allocator, parent);
4616 }
4617 
4618 /**
4619  * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4620  *                                        allocating blocks.
4621  * @depot: The depot to prepare.
4622  * @load_type: The load type.
4623  * @parent: The completion to notify when the operation is complete.
4624  *
4625  * This method may be called only before entering normal operation from the load thread. It must be
4626  * called before allocation may proceed.
4627  */
4628 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4629 					enum slab_depot_load_type load_type,
4630 					struct vdo_completion *parent)
4631 {
4632 	depot->load_type = load_type;
4633 	atomic_set(&depot->zones_to_scrub, depot->zone_count);
4634 	vdo_schedule_action(depot->action_manager, NULL,
4635 			    prepare_to_allocate, NULL, parent);
4636 }
4637 
4638 /**
4639  * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4640  * @depot: The depot to update.
4641  *
4642  * This size is saved to disk as part of the super block.
4643  */
4644 void vdo_update_slab_depot_size(struct slab_depot *depot)
4645 {
4646 	depot->last_block = depot->new_last_block;
4647 }
4648 
4649 /**
4650  * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4651  *                                    the given size.
4652  * @depot: The depot to prepare to resize.
4653  * @partition: The new depot partition
4654  *
4655  * Return: VDO_SUCCESS or an error.
4656  */
4657 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4658 				   const struct partition *partition)
4659 {
4660 	struct slab_depot_state_2_0 new_state;
4661 	int result;
4662 	slab_count_t new_slab_count;
4663 
4664 	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4665 		return VDO_INCREMENT_TOO_SMALL;
4666 
4667 	/* Generate the depot configuration for the new block count. */
4668 	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4669 			    "New slab depot partition doesn't change origin");
4670 	result = vdo_configure_slab_depot(partition, depot->slab_config,
4671 					  depot->zone_count, &new_state);
4672 	if (result != VDO_SUCCESS)
4673 		return result;
4674 
4675 	new_slab_count = vdo_compute_slab_count(depot->first_block,
4676 						new_state.last_block,
4677 						depot->slab_size_shift);
4678 	if (new_slab_count <= depot->slab_count)
4679 		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4680 					      "Depot can only grow");
4681 	if (new_slab_count == depot->new_slab_count) {
4682 		/* Check it out, we've already got all the new slabs allocated! */
4683 		return VDO_SUCCESS;
4684 	}
4685 
4686 	vdo_abandon_new_slabs(depot);
4687 	result = allocate_slabs(depot, new_slab_count);
4688 	if (result != VDO_SUCCESS) {
4689 		vdo_abandon_new_slabs(depot);
4690 		return result;
4691 	}
4692 
4693 	depot->new_size = partition->count;
4694 	depot->old_last_block = depot->last_block;
4695 	depot->new_last_block = new_state.last_block;
4696 
4697 	return VDO_SUCCESS;
4698 }
4699 
4700 /**
4701  * finish_registration() - Finish registering new slabs now that all of the allocators have
4702  *                         received their new slabs.
4703  *
4704  * Implements vdo_action_conclusion_fn.
4705  */
4706 static int finish_registration(void *context)
4707 {
4708 	struct slab_depot *depot = context;
4709 
4710 	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
4711 	vdo_free(depot->slabs);
4712 	depot->slabs = depot->new_slabs;
4713 	depot->new_slabs = NULL;
4714 	depot->new_slab_count = 0;
4715 	return VDO_SUCCESS;
4716 }
4717 
4718 /* Implements vdo_zone_action_fn. */
4719 static void register_new_slabs(void *context, zone_count_t zone_number,
4720 			       struct vdo_completion *parent)
4721 {
4722 	struct slab_depot *depot = context;
4723 	struct block_allocator *allocator = &depot->allocators[zone_number];
4724 	slab_count_t i;
4725 
4726 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4727 		struct vdo_slab *slab = depot->new_slabs[i];
4728 
4729 		if (slab->allocator == allocator)
4730 			register_slab_with_allocator(allocator, slab);
4731 	}
4732 
4733 	vdo_finish_completion(parent);
4734 }
4735 
4736 /**
4737  * vdo_use_new_slabs() - Use the new slabs allocated for resize.
4738  * @depot: The depot.
4739  * @parent: The object to notify when complete.
4740  */
4741 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
4742 {
4743 	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
4744 	vdo_schedule_operation(depot->action_manager,
4745 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
4746 			       NULL, register_new_slabs,
4747 			       finish_registration, parent);
4748 }
4749 
4750 /**
4751  * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
4752  *                    currently working on.
4753  * @scrubber: The scrubber to stop.
4754  * @parent: The completion to notify when scrubbing has stopped.
4755  */
4756 static void stop_scrubbing(struct block_allocator *allocator)
4757 {
4758 	struct slab_scrubber *scrubber = &allocator->scrubber;
4759 
4760 	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
4761 		vdo_finish_completion(&allocator->completion);
4762 	} else {
4763 		vdo_start_draining(&scrubber->admin_state,
4764 				   VDO_ADMIN_STATE_SUSPENDING,
4765 				   &allocator->completion, NULL);
4766 	}
4767 }
4768 
4769 /* Implements vdo_admin_initiator_fn. */
4770 static void initiate_summary_drain(struct admin_state *state)
4771 {
4772 	check_summary_drain_complete(container_of(state, struct block_allocator,
4773 						  summary_state));
4774 }
4775 
4776 static void do_drain_step(struct vdo_completion *completion)
4777 {
4778 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4779 
4780 	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
4781 					   handle_operation_error, allocator->thread_id,
4782 					   NULL);
4783 	switch (++allocator->drain_step) {
4784 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4785 		stop_scrubbing(allocator);
4786 		return;
4787 
4788 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4789 		apply_to_slabs(allocator, do_drain_step);
4790 		return;
4791 
4792 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4793 		vdo_start_draining(&allocator->summary_state,
4794 				   vdo_get_admin_state_code(&allocator->state),
4795 				   completion, initiate_summary_drain);
4796 		return;
4797 
4798 	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
4799 		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
4800 				    "vio pool not busy");
4801 		vdo_finish_draining_with_result(&allocator->state, completion->result);
4802 		return;
4803 
4804 	default:
4805 		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
4806 	}
4807 }
4808 
4809 /* Implements vdo_admin_initiator_fn. */
4810 static void initiate_drain(struct admin_state *state)
4811 {
4812 	struct block_allocator *allocator =
4813 		container_of(state, struct block_allocator, state);
4814 
4815 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
4816 	do_drain_step(&allocator->completion);
4817 }
4818 
4819 /*
4820  * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
4821  * written to disk. The type of drain will be determined from the state of the allocator's depot.
4822  *
4823  * Implements vdo_zone_action_fn.
4824  */
4825 static void drain_allocator(void *context, zone_count_t zone_number,
4826 			    struct vdo_completion *parent)
4827 {
4828 	struct slab_depot *depot = context;
4829 
4830 	vdo_start_draining(&depot->allocators[zone_number].state,
4831 			   vdo_get_current_manager_operation(depot->action_manager),
4832 			   parent, initiate_drain);
4833 }
4834 
4835 /**
4836  * vdo_drain_slab_depot() - Drain all slab depot I/O.
4837  * @depot: The depot to drain.
4838  * @operation: The drain operation (flush, rebuild, suspend, or save).
4839  * @parent: The completion to finish when the drain is complete.
4840  *
4841  * If saving, or flushing, all dirty depot metadata will be written out. If saving or suspending,
4842  * the depot will be left in a suspended state.
4843  */
4844 void vdo_drain_slab_depot(struct slab_depot *depot,
4845 			  const struct admin_state_code *operation,
4846 			  struct vdo_completion *parent)
4847 {
4848 	vdo_schedule_operation(depot->action_manager, operation,
4849 			       NULL, drain_allocator, NULL, parent);
4850 }
4851 
4852 /**
4853  * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
4854  * @allocator: The allocator being resumed.
4855  */
4856 static void resume_scrubbing(struct block_allocator *allocator)
4857 {
4858 	int result;
4859 	struct slab_scrubber *scrubber = &allocator->scrubber;
4860 
4861 	if (!has_slabs_to_scrub(scrubber)) {
4862 		vdo_finish_completion(&allocator->completion);
4863 		return;
4864 	}
4865 
4866 	result = vdo_resume_if_quiescent(&scrubber->admin_state);
4867 	if (result != VDO_SUCCESS) {
4868 		vdo_fail_completion(&allocator->completion, result);
4869 		return;
4870 	}
4871 
4872 	scrub_next_slab(scrubber);
4873 	vdo_finish_completion(&allocator->completion);
4874 }
4875 
4876 static void do_resume_step(struct vdo_completion *completion)
4877 {
4878 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4879 
4880 	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
4881 					   handle_operation_error,
4882 					   allocator->thread_id, NULL);
4883 	switch (--allocator->drain_step) {
4884 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4885 		vdo_fail_completion(completion,
4886 				    vdo_resume_if_quiescent(&allocator->summary_state));
4887 		return;
4888 
4889 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4890 		apply_to_slabs(allocator, do_resume_step);
4891 		return;
4892 
4893 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4894 		resume_scrubbing(allocator);
4895 		return;
4896 
4897 	case VDO_DRAIN_ALLOCATOR_START:
4898 		vdo_finish_resuming_with_result(&allocator->state, completion->result);
4899 		return;
4900 
4901 	default:
4902 		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
4903 	}
4904 }
4905 
4906 /* Implements vdo_admin_initiator_fn. */
4907 static void initiate_resume(struct admin_state *state)
4908 {
4909 	struct block_allocator *allocator =
4910 		container_of(state, struct block_allocator, state);
4911 
4912 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
4913 	do_resume_step(&allocator->completion);
4914 }
4915 
4916 /* Implements vdo_zone_action_fn. */
4917 static void resume_allocator(void *context, zone_count_t zone_number,
4918 			     struct vdo_completion *parent)
4919 {
4920 	struct slab_depot *depot = context;
4921 
4922 	vdo_start_resuming(&depot->allocators[zone_number].state,
4923 			   vdo_get_current_manager_operation(depot->action_manager),
4924 			   parent, initiate_resume);
4925 }
4926 
4927 /**
4928  * vdo_resume_slab_depot() - Resume a suspended slab depot.
4929  * @depot: The depot to resume.
4930  * @parent: The completion to finish when the depot has resumed.
4931  */
4932 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
4933 {
4934 	if (vdo_is_read_only(depot->vdo)) {
4935 		vdo_continue_completion(parent, VDO_READ_ONLY);
4936 		return;
4937 	}
4938 
4939 	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
4940 			       NULL, resume_allocator, NULL, parent);
4941 }
4942 
4943 /**
4944  * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
4945  *                                                given recovery journal block.
4946  * @depot: The depot.
4947  * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
4948  *                         released.
4949  *
4950  * Context: This method must be called from the journal zone thread.
4951  */
4952 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
4953 						sequence_number_t recovery_block_number)
4954 {
4955 	if (depot == NULL)
4956 		return;
4957 
4958 	depot->new_release_request = recovery_block_number;
4959 	vdo_schedule_default_action(depot->action_manager);
4960 }
4961 
4962 /* Implements vdo_zone_action_fn. */
4963 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
4964 					struct vdo_completion *parent)
4965 {
4966 	struct slab_depot *depot = context;
4967 
4968 	scrub_slabs(&depot->allocators[zone_number], NULL);
4969 	vdo_launch_completion(parent);
4970 }
4971 
4972 /**
4973  * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
4974  * @depot: The depot to scrub.
4975  * @parent: The object to notify when scrubbing has been launched for all zones.
4976  */
4977 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
4978 				     struct vdo_completion *parent)
4979 {
4980 	vdo_schedule_action(depot->action_manager, NULL,
4981 			    scrub_all_unrecovered_slabs,
4982 			    NULL, parent);
4983 }
4984 
4985 /**
4986  * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
4987  *                                    in the depot.
4988  * @depot: The slab depot.
4989  *
4990  * Return: The statistics from all block allocators in the depot.
4991  */
4992 static struct block_allocator_statistics __must_check
4993 get_block_allocator_statistics(const struct slab_depot *depot)
4994 {
4995 	struct block_allocator_statistics totals;
4996 	zone_count_t zone;
4997 
4998 	memset(&totals, 0, sizeof(totals));
4999 
5000 	for (zone = 0; zone < depot->zone_count; zone++) {
5001 		const struct block_allocator *allocator = &depot->allocators[zone];
5002 		const struct block_allocator_statistics *stats = &allocator->statistics;
5003 
5004 		totals.slab_count += allocator->slab_count;
5005 		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5006 		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5007 	}
5008 
5009 	return totals;
5010 }
5011 
5012 /**
5013  * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5014  * @depot: The slab depot.
5015  *
5016  * Return: The cumulative statistics for all ref_counts in the depot.
5017  */
5018 static struct ref_counts_statistics __must_check
5019 get_ref_counts_statistics(const struct slab_depot *depot)
5020 {
5021 	struct ref_counts_statistics totals;
5022 	zone_count_t zone;
5023 
5024 	memset(&totals, 0, sizeof(totals));
5025 
5026 	for (zone = 0; zone < depot->zone_count; zone++) {
5027 		totals.blocks_written +=
5028 			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5029 	}
5030 
5031 	return totals;
5032 }
5033 
5034 /**
5035  * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5036  * @depot: The slab depot.
5037  *
5038  * Return: The aggregated statistics for all slab journals in the depot.
5039  */
5040 static struct slab_journal_statistics __must_check
5041 get_slab_journal_statistics(const struct slab_depot *depot)
5042 {
5043 	struct slab_journal_statistics totals;
5044 	zone_count_t zone;
5045 
5046 	memset(&totals, 0, sizeof(totals));
5047 
5048 	for (zone = 0; zone < depot->zone_count; zone++) {
5049 		const struct slab_journal_statistics *stats =
5050 			&depot->allocators[zone].slab_journal_statistics;
5051 
5052 		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5053 		totals.flush_count += READ_ONCE(stats->flush_count);
5054 		totals.blocked_count += READ_ONCE(stats->blocked_count);
5055 		totals.blocks_written += READ_ONCE(stats->blocks_written);
5056 		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5057 	}
5058 
5059 	return totals;
5060 }
5061 
5062 /**
5063  * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5064  *                                   slab depot.
5065  * @depot: The slab depot.
5066  * @stats: The vdo statistics structure to partially fill.
5067  */
5068 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5069 				   struct vdo_statistics *stats)
5070 {
5071 	slab_count_t slab_count = READ_ONCE(depot->slab_count);
5072 	slab_count_t unrecovered = 0;
5073 	zone_count_t zone;
5074 
5075 	for (zone = 0; zone < depot->zone_count; zone++) {
5076 		/* The allocators are responsible for thread safety. */
5077 		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5078 	}
5079 
5080 	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
5081 	stats->allocator = get_block_allocator_statistics(depot);
5082 	stats->ref_counts = get_ref_counts_statistics(depot);
5083 	stats->slab_journal = get_slab_journal_statistics(depot);
5084 	stats->slab_summary = (struct slab_summary_statistics) {
5085 		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5086 	};
5087 }
5088 
5089 /**
5090  * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
5091  * @depot: The slab depot.
5092  */
5093 void vdo_dump_slab_depot(const struct slab_depot *depot)
5094 {
5095 	vdo_log_info("vdo slab depot");
5096 	vdo_log_info("  zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
5097 		     (unsigned int) depot->zone_count,
5098 		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
5099 		     (unsigned long long) depot->active_release_request,
5100 		     (unsigned long long) depot->new_release_request);
5101 }
5102