xref: /linux/drivers/md/dm-vdo/slab-depot.c (revision 5014bebee0cffda14fafae5a2534d08120b7b9e8)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "slab-depot.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14 
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20 
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37 
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40 
41 /**
42  * get_lock() - Get the lock object for a slab journal block by sequence number.
43  * @journal: vdo_slab journal to retrieve from.
44  * @sequence_number: Sequence number of the block.
45  *
46  * Return: The lock object for the given sequence number.
47  */
48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49 							  sequence_number_t sequence_number)
50 {
51 	return &journal->locks[sequence_number % journal->size];
52 }
53 
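/**
 * is_slab_open() - Check whether a slab is available for normal operations.
 * @slab: The slab to check.
 *
 * Return: true if the slab is neither quiescing nor quiescent.
 */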
54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56 	return (!vdo_is_state_quiescing(&slab->state) &&
57 		!vdo_is_state_quiescent(&slab->state));
58 }
59 
60 /**
61  * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62  * @journal: The journal to check.
63  *
64  * Return: true if there are entry waiters and the slab is not currently rebuilding.
65  */
66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68 	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69 		vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71 
72 /**
73  * is_reaping() - Check whether a reap is currently in progress.
74  * @journal: The journal which may be reaping.
75  *
76  * Return: true if the journal is reaping.
77  */
78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80 	return (journal->head != journal->unreapable);
81 }
82 
83 /**
84  * initialize_tail_block() - Initialize tail block as a new block.
85  * @journal: The journal whose tail block is being initialized.
86  */
87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89 	struct slab_journal_block_header *header = &journal->tail_header;
90 
91 	header->sequence_number = journal->tail;
92 	header->entry_count = 0;
93 	header->has_block_map_increments = false;
94 }
95 
96 /**
97  * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98  * @journal: The journal to be reset, based on its tail sequence number.
99  */
100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102 	journal->unreapable = journal->head;
103 	journal->reap_lock = get_lock(journal, journal->unreapable);
104 	journal->next_commit = journal->tail;
105 	journal->summarized = journal->last_summarized = journal->tail;
106 	initialize_tail_block(journal);
107 }
108 
109 /**
110  * block_is_full() - Check whether a journal block is full.
111  * @journal: The slab journal for the block.
112  *
113  * Return: true if the tail block is full.
114  */
115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117 	journal_entry_count_t count = journal->tail_header.entry_count;
118 
119 	return (journal->tail_header.has_block_map_increments ?
120 		(journal->full_entries_per_block == count) :
121 		(journal->entries_per_block == count));
122 }
123 
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127 
128 /**
129  * is_slab_journal_blank() - Check whether a slab's journal is blank.
130  *
131  * A slab journal is blank if it has never had any entries recorded in it.
132  *
133  * Return: true if the slab's journal has never been modified.
134  */
135 static bool is_slab_journal_blank(const struct vdo_slab *slab)
136 {
137 	return ((slab->journal.tail == 1) &&
138 		(slab->journal.tail_header.entry_count == 0));
139 }
140 
141 /**
142  * mark_slab_journal_dirty() - Put a slab journal on the dirty list of its allocator in the correct
143  *                             order.
144  * @journal: The journal to be marked dirty.
145  * @lock: The recovery journal lock held by the slab journal.
146  */
147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
148 {
149 	struct slab_journal *dirty_journal;
150 	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
151 
152 	VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
153 
154 	journal->recovery_lock = lock;
155 	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
156 		if (dirty_journal->recovery_lock <= journal->recovery_lock)
157 			break;
158 	}
159 
160 	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
161 }
162 
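/**
 * mark_slab_journal_clean() - Release a slab journal's recovery journal lock and remove it from
 *                             its allocator's dirty list.
 * @journal: The journal to mark clean.
 */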
163 static void mark_slab_journal_clean(struct slab_journal *journal)
164 {
165 	journal->recovery_lock = 0;
166 	list_del_init(&journal->dirty_entry);
167 }
168 
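/**
 * check_if_slab_drained() - Finish draining a slab if no work remains outstanding.
 * @slab: The slab to check.
 *
 * The drain only completes once the journal has no entry waiters, reaps, summary updates, or
 * uncommitted blocks, the slab has no active I/O, and (except when suspending, recovering, or in
 * read-only mode) no dirty reference blocks remain.
 */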
169 static void check_if_slab_drained(struct vdo_slab *slab)
170 {
171 	bool read_only;
172 	struct slab_journal *journal = &slab->journal;
173 	const struct admin_state_code *code;
174 
175 	if (!vdo_is_state_draining(&slab->state) ||
176 	    must_make_entries_to_flush(journal) ||
177 	    is_reaping(journal) ||
178 	    journal->waiting_to_commit ||
179 	    !list_empty(&journal->uncommitted_blocks) ||
180 	    journal->updating_slab_summary ||
181 	    (slab->active_count > 0))
182 		return;
183 
184 	/* When not suspending or recovering, the slab must be clean. */
185 	code = vdo_get_admin_state_code(&slab->state);
186 	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
187 	if (!read_only &&
188 	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
189 	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
190 	    (code != VDO_ADMIN_STATE_RECOVERING))
191 		return;
192 
193 	vdo_finish_draining_with_result(&slab->state,
194 					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
195 }
196 
197 /* FULLNESS HINT COMPUTATION */
198 
199 /**
200  * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
201  *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
202  *                           count.
203  * @depot: The depot whose summary is being updated.
204  * @free_blocks: The number of free blocks.
205  *
206  * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
207  * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
208  * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
209  * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
210  * is 0, which would make it impossible to distinguish completely full from completely empty.
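 *
 * As a worked example (assuming hint_shift is 16, matching the formula above): 300,000 free
 * blocks yields a hint of 300000 >> 16 = 4, while any nonzero count below 2^16 shifts to 0 and
 * is rounded up to a hint of 1 so that it remains distinguishable from a completely full slab.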
211  *
212  * Return: A fullness hint, which can be stored in 7 bits.
213  */
214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
215 					     block_count_t free_blocks)
216 {
217 	block_count_t hint;
218 
219 	VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
220 
221 	if (free_blocks == 0)
222 		return 0;
223 
224 	hint = free_blocks >> depot->hint_shift;
225 	return ((hint == 0) ? 1 : hint);
226 }
227 
228 /**
229  * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
230  */
231 static void check_summary_drain_complete(struct block_allocator *allocator)
232 {
233 	if (!vdo_is_state_draining(&allocator->summary_state) ||
234 	    (allocator->summary_write_count > 0))
235 		return;
236 
237 	vdo_finish_operation(&allocator->summary_state,
238 			     (vdo_is_read_only(allocator->depot->vdo) ?
239 			      VDO_READ_ONLY : VDO_SUCCESS));
240 }
241 
242 /**
243  * notify_summary_waiters() - Wake all the waiters in a given queue.
244  * @allocator: The block allocator summary which owns the queue.
245  * @queue: The queue to notify.
246  */
247 static void notify_summary_waiters(struct block_allocator *allocator,
248 				   struct vdo_wait_queue *queue)
249 {
250 	int result = (vdo_is_read_only(allocator->depot->vdo) ?
251 		      VDO_READ_ONLY : VDO_SUCCESS);
252 
253 	vdo_waitq_notify_all_waiters(queue, NULL, &result);
254 }
255 
256 static void launch_write(struct slab_summary_block *summary_block);
257 
258 /**
259  * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
260  *                                        whether or not the attempt succeeded.
261  * @block: The block.
262  */
263 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
264 {
265 	notify_summary_waiters(block->allocator, &block->current_update_waiters);
266 	block->writing = false;
267 	block->allocator->summary_write_count--;
268 	if (vdo_waitq_has_waiters(&block->next_update_waiters))
269 		launch_write(block);
270 	else
271 		check_summary_drain_complete(block->allocator);
272 }
273 
274 /**
275  * finish_update() - This is the callback for a successful summary block write.
276  * @completion: The write vio.
277  */
278 static void finish_update(struct vdo_completion *completion)
279 {
280 	struct slab_summary_block *block =
281 		container_of(as_vio(completion), struct slab_summary_block, vio);
282 
283 	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
284 	finish_updating_slab_summary_block(block);
285 }
286 
287 /**
288  * handle_write_error() - Handle an error writing a slab summary block.
289  * @completion: The write VIO.
290  */
291 static void handle_write_error(struct vdo_completion *completion)
292 {
293 	struct slab_summary_block *block =
294 		container_of(as_vio(completion), struct slab_summary_block, vio);
295 
296 	vio_record_metadata_io_error(as_vio(completion));
297 	vdo_enter_read_only_mode(completion->vdo, completion->result);
298 	finish_updating_slab_summary_block(block);
299 }
300 
301 static void write_slab_summary_endio(struct bio *bio)
302 {
303 	struct vio *vio = bio->bi_private;
304 	struct slab_summary_block *block =
305 		container_of(vio, struct slab_summary_block, vio);
306 
307 	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
308 }
309 
310 /**
311  * launch_write() - Write a slab summary block unless it is currently out for writing.
312  * @block: The block that needs to be committed.
313  */
314 static void launch_write(struct slab_summary_block *block)
315 {
316 	struct block_allocator *allocator = block->allocator;
317 	struct slab_depot *depot = allocator->depot;
318 	physical_block_number_t pbn;
319 
320 	if (block->writing)
321 		return;
322 
323 	allocator->summary_write_count++;
324 	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
325 				       &block->current_update_waiters);
326 	block->writing = true;
327 
328 	if (vdo_is_read_only(depot->vdo)) {
329 		finish_updating_slab_summary_block(block);
330 		return;
331 	}
332 
333 	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
334 
335 	/*
336 	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
337 	 * covered by this summary update are stable. Otherwise, a subsequent recovery could
338 	 * encounter a slab summary update that refers to a slab journal tail block that has not
339 	 * actually been written. In such cases, the slab journal referenced will be treated as
340 	 * empty, causing any data within the slab which predates the existing recovery journal
341 	 * entries to be lost.
342 	 */
343 	pbn = (depot->summary_origin +
344 	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
345 	       block->index);
346 	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
347 				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
348 }
349 
350 /**
351  * update_slab_summary_entry() - Update the entry for a slab.
352  * @slab: The slab whose entry is to be updated
353  * @waiter: The waiter that is updating the summary.
354  * @tail_block_offset: The offset of the slab journal's tail block.
355  * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
356  * @is_clean: Whether the slab is clean.
357  * @free_blocks: The number of free blocks.
358  */
359 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
360 				      tail_block_offset_t tail_block_offset,
361 				      bool load_ref_counts, bool is_clean,
362 				      block_count_t free_blocks)
363 {
364 	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
365 	struct block_allocator *allocator = slab->allocator;
366 	struct slab_summary_block *block = &allocator->summary_blocks[index];
367 	int result;
368 	struct slab_summary_entry *entry;
369 
370 	if (vdo_is_read_only(block->vio.completion.vdo)) {
371 		result = VDO_READ_ONLY;
372 		waiter->callback(waiter, &result);
373 		return;
374 	}
375 
376 	if (vdo_is_state_draining(&allocator->summary_state) ||
377 	    vdo_is_state_quiescent(&allocator->summary_state)) {
378 		result = VDO_INVALID_ADMIN_STATE;
379 		waiter->callback(waiter, &result);
380 		return;
381 	}
382 
383 	entry = &allocator->summary_entries[slab->slab_number];
384 	*entry = (struct slab_summary_entry) {
385 		.tail_block_offset = tail_block_offset,
386 		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
387 		.is_dirty = !is_clean,
388 		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
389 	};
390 	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
391 	launch_write(block);
392 }
393 
394 /**
395  * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
396  *                    complete.
397  * @journal: The journal to be reaped.
398  */
399 static void finish_reaping(struct slab_journal *journal)
400 {
401 	journal->head = journal->unreapable;
402 	add_entries(journal);
403 	check_if_slab_drained(journal->slab);
404 }
405 
406 static void reap_slab_journal(struct slab_journal *journal);
407 
408 /**
409  * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
410  *                      reaping again in case we deferred reaping due to an outstanding vio.
411  * @completion: The flush vio.
412  */
413 static void complete_reaping(struct vdo_completion *completion)
414 {
415 	struct slab_journal *journal = completion->parent;
416 
417 	return_vio_to_pool(vio_as_pooled_vio(as_vio(completion)));
418 	finish_reaping(journal);
419 	reap_slab_journal(journal);
420 }
421 
422 /**
423  * handle_flush_error() - Handle an error flushing the lower layer.
424  * @completion: The flush vio.
425  */
426 static void handle_flush_error(struct vdo_completion *completion)
427 {
428 	vio_record_metadata_io_error(as_vio(completion));
429 	vdo_enter_read_only_mode(completion->vdo, completion->result);
430 	complete_reaping(completion);
431 }
432 
433 static void flush_endio(struct bio *bio)
434 {
435 	struct vio *vio = bio->bi_private;
436 	struct slab_journal *journal = vio->completion.parent;
437 
438 	continue_vio_after_io(vio, complete_reaping,
439 			      journal->slab->allocator->thread_id);
440 }
441 
442 /**
443  * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
444  *                       prior to reaping.
445  * @waiter: The journal as a flush waiter.
446  * @context: The newly acquired flush vio.
447  */
448 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
449 {
450 	struct slab_journal *journal =
451 		container_of(waiter, struct slab_journal, flush_waiter);
452 	struct pooled_vio *pooled = context;
453 	struct vio *vio = &pooled->vio;
454 
455 	vio->completion.parent = journal;
456 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
457 }
458 
459 /**
460  * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
461  * @journal: The slab journal.
462  */
463 static void reap_slab_journal(struct slab_journal *journal)
464 {
465 	bool reaped = false;
466 
467 	if (is_reaping(journal)) {
468 		/* We already have a reap in progress so wait for it to finish. */
469 		return;
470 	}
471 
472 	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
473 	    !vdo_is_state_normal(&journal->slab->state) ||
474 	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
475 		/*
476 		 * We must not reap in the first two cases, and there's no point in read-only mode.
477 		 */
478 		return;
479 	}
480 
481 	/*
482 	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
483 	 * block is referenced or reap reaches the most recently written block, referenced by the
484 	 * slab summary, which has the sequence number just before the tail.
485 	 */
486 	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
487 		reaped = true;
488 		journal->unreapable++;
489 		journal->reap_lock++;
490 		if (journal->reap_lock == &journal->locks[journal->size])
491 			journal->reap_lock = &journal->locks[0];
492 	}
493 
494 	if (!reaped)
495 		return;
496 
497 	/*
498 	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
499 	 * of whether a user flush has been received or not. In the absence of the flush, the
500 	 * reference block write which released the locks allowing the slab journal to reap may not
501 	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
502 	 * journal block writes can be issued while previous slab summary updates have not yet been
503 	 * made. Even though those slab journal block writes will be ignored if the slab summary
504 	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
505 	 * resulting in a loss of reference count updates.
506 	 */
507 	journal->flush_waiter.callback = flush_for_reaping;
508 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
509 			      &journal->flush_waiter);
510 }
511 
512 /**
513  * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
514  * @journal: The slab journal.
515  * @sequence_number: The journal sequence number of the referenced block.
516  * @adjustment: Amount to adjust the reference counter.
517  *
518  * Note that when the adjustment is negative, the slab journal will be reaped.
519  */
520 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
521 						sequence_number_t sequence_number,
522 						int adjustment)
523 {
524 	struct journal_lock *lock;
525 
526 	if (sequence_number == 0)
527 		return;
528 
529 	if (journal->slab->status == VDO_SLAB_REPLAYING) {
530 		/* Locks should not be used during offline replay. */
531 		return;
532 	}
533 
534 	VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
535 	lock = get_lock(journal, sequence_number);
536 	if (adjustment < 0) {
537 		VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
538 				    "adjustment %d of lock count %u for slab journal block %llu must not underflow",
539 				    adjustment, lock->count,
540 				    (unsigned long long) sequence_number);
541 	}
542 
543 	lock->count += adjustment;
544 	if (lock->count == 0)
545 		reap_slab_journal(journal);
546 }
547 
548 /**
549  * release_journal_locks() - Callback invoked after a slab summary update completes.
550  * @waiter: The slab summary waiter that has just been notified.
551  * @context: The result code of the update.
552  *
553  * Registered in the constructor on behalf of update_tail_block_location().
554  *
555  * Implements waiter_callback_fn.
556  */
557 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
558 {
559 	sequence_number_t first, i;
560 	struct slab_journal *journal =
561 		container_of(waiter, struct slab_journal, slab_summary_waiter);
562 	int result = *((int *) context);
563 
564 	if (result != VDO_SUCCESS) {
565 		if (result != VDO_READ_ONLY) {
566 			/*
567 			 * Don't bother logging what might be lots of errors if we are already in
568 			 * read-only mode.
569 			 */
570 			vdo_log_error_strerror(result, "failed slab summary update %llu",
571 					       (unsigned long long) journal->summarized);
572 		}
573 
574 		journal->updating_slab_summary = false;
575 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
576 		check_if_slab_drained(journal->slab);
577 		return;
578 	}
579 
580 	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
581 		journal->partial_write_in_progress = false;
582 		add_entries(journal);
583 	}
584 
585 	first = journal->last_summarized;
586 	journal->last_summarized = journal->summarized;
587 	for (i = journal->summarized - 1; i >= first; i--) {
588 		/*
589 		 * Release the lock the summarized block held on the recovery journal. (During
590 		 * replay, recovery_start will always be 0.)
591 		 */
592 		if (journal->recovery_journal != NULL) {
593 			zone_count_t zone_number = journal->slab->allocator->zone_number;
594 			struct journal_lock *lock = get_lock(journal, i);
595 
596 			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
597 								     lock->recovery_start,
598 								     VDO_ZONE_TYPE_PHYSICAL,
599 								     zone_number);
600 		}
601 
602 		/*
603 		 * Release our own lock against reaping for blocks that are committed. (This
604 		 * function will not change locks during replay.)
605 		 */
606 		adjust_slab_journal_block_reference(journal, i, -1);
607 	}
608 
609 	journal->updating_slab_summary = false;
610 
611 	reap_slab_journal(journal);
612 
613 	/* Check if the slab summary needs to be updated again. */
614 	update_tail_block_location(journal);
615 }
616 
617 /**
618  * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
619  * @journal: The slab journal that is updating its tail block location.
620  */
621 static void update_tail_block_location(struct slab_journal *journal)
622 {
623 	block_count_t free_block_count;
624 	struct vdo_slab *slab = journal->slab;
625 
626 	if (journal->updating_slab_summary ||
627 	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
628 	    (journal->last_summarized >= journal->next_commit)) {
629 		check_if_slab_drained(slab);
630 		return;
631 	}
632 
633 	if (slab->status != VDO_SLAB_REBUILT) {
634 		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
635 
636 		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
637 	} else {
638 		free_block_count = slab->free_blocks;
639 	}
640 
641 	journal->summarized = journal->next_commit;
642 	journal->updating_slab_summary = true;
643 
644 	/*
645 	 * Update slab summary as dirty.
646 	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
647 	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
648 	 * loaded when the journal head has reaped past sequence number 1.
649 	 */
650 	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
651 				  journal->summarized % journal->size,
652 				  (journal->head > 1), false, free_block_count);
653 }
654 
655 /**
656  * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
657  */
658 static void reopen_slab_journal(struct vdo_slab *slab)
659 {
660 	struct slab_journal *journal = &slab->journal;
661 	sequence_number_t block;
662 
663 	VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
664 			    "vdo_slab journal's active block empty before reopening");
665 	journal->head = journal->tail;
666 	initialize_journal_state(journal);
667 
668 	/* Ensure no locks are spuriously held on an empty journal. */
669 	for (block = 1; block <= journal->size; block++) {
670 		VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
671 				    "Scrubbed journal's block %llu is not locked",
672 				    (unsigned long long) block);
673 	}
674 
675 	add_entries(journal);
676 }
677 
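/**
 * get_committing_sequence_number() - Read the sequence number of the slab journal block being
 *                                    committed by a vio.
 * @vio: The pooled vio holding the packed block.
 *
 * Return: The sequence number from the packed block header.
 */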
678 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
679 {
680 	const struct packed_slab_journal_block *block =
681 		(const struct packed_slab_journal_block *) vio->vio.data;
682 
683 	return __le64_to_cpu(block->header.sequence_number);
684 }
685 
686 /**
687  * complete_write() - Handle post-commit processing.
688  * @completion: The write vio as a completion.
689  *
690  * This is the callback registered by write_slab_journal_block().
691  */
692 static void complete_write(struct vdo_completion *completion)
693 {
694 	int result = completion->result;
695 	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
696 	struct slab_journal *journal = completion->parent;
697 	sequence_number_t committed = get_committing_sequence_number(pooled);
698 
699 	list_del_init(&pooled->list_entry);
700 	return_vio_to_pool(pooled);
701 
702 	if (result != VDO_SUCCESS) {
703 		vio_record_metadata_io_error(as_vio(completion));
704 		vdo_log_error_strerror(result, "cannot write slab journal block %llu",
705 				       (unsigned long long) committed);
706 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
707 		check_if_slab_drained(journal->slab);
708 		return;
709 	}
710 
711 	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
712 
713 	if (list_empty(&journal->uncommitted_blocks)) {
714 		/* If no blocks are outstanding, then the commit point is at the tail. */
715 		journal->next_commit = journal->tail;
716 	} else {
717 		/* The commit point is always the beginning of the oldest incomplete block. */
718 		pooled = container_of(journal->uncommitted_blocks.next,
719 				      struct pooled_vio, list_entry);
720 		journal->next_commit = get_committing_sequence_number(pooled);
721 	}
722 
723 	update_tail_block_location(journal);
724 }
725 
726 static void write_slab_journal_endio(struct bio *bio)
727 {
728 	struct vio *vio = bio->bi_private;
729 	struct slab_journal *journal = vio->completion.parent;
730 
731 	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
732 }
733 
734 /**
735  * write_slab_journal_block() - Write a slab journal block.
736  * @waiter: The vio pool waiter which was just notified.
737  * @context: The vio pool entry for the write.
738  *
739  * Callback from acquire_vio_from_pool() registered in commit_tail().
740  */
741 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
742 {
743 	struct pooled_vio *pooled = context;
744 	struct vio *vio = &pooled->vio;
745 	struct slab_journal *journal =
746 		container_of(waiter, struct slab_journal, resource_waiter);
747 	struct slab_journal_block_header *header = &journal->tail_header;
748 	int unused_entries = journal->entries_per_block - header->entry_count;
749 	physical_block_number_t block_number;
750 	const struct admin_state_code *operation;
751 
752 	header->head = journal->head;
753 	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
754 	vdo_pack_slab_journal_block_header(header, &journal->block->header);
755 
756 	/* Copy the tail block into the vio. */
757 	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
758 
759 	VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
760 	if (unused_entries > 0) {
761 		/*
762 		 * Release the per-entry locks for any unused entries in the block we are about to
763 		 * write.
764 		 */
765 		adjust_slab_journal_block_reference(journal, header->sequence_number,
766 						    -unused_entries);
767 		journal->partial_write_in_progress = !block_is_full(journal);
768 	}
769 
770 	block_number = journal->slab->journal_origin +
771 		(header->sequence_number % journal->size);
772 	vio->completion.parent = journal;
773 
774 	/*
775 	 * This block won't be read in recovery until the slab summary is updated to refer to it.
776 	 * The slab summary update does a flush which is sufficient to protect us from corruption
777 	 * due to out of order slab journal, reference block, or block map writes.
778 	 */
779 	vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
780 				complete_write, REQ_OP_WRITE);
781 
782 	/* Since the write is submitted, the tail block structure can be reused. */
783 	journal->tail++;
784 	initialize_tail_block(journal);
785 	journal->waiting_to_commit = false;
786 
787 	operation = vdo_get_admin_state_code(&journal->slab->state);
788 	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
789 		vdo_finish_operation(&journal->slab->state,
790 				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
791 				      VDO_READ_ONLY : VDO_SUCCESS));
792 		return;
793 	}
794 
795 	add_entries(journal);
796 }
797 
798 /**
799  * commit_tail() - Commit the tail block of the slab journal.
800  * @journal: The journal whose tail block should be committed.
801  */
802 static void commit_tail(struct slab_journal *journal)
803 {
804 	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
805 		/*
806 		 * There are no entries at the moment, but there are some waiters, so defer
807 		 * initiating the flush until those entries are ready to write.
808 		 */
809 		return;
810 	}
811 
812 	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
813 	    journal->waiting_to_commit ||
814 	    (journal->tail_header.entry_count == 0)) {
815 		/*
816 		 * There is nothing to do since the tail block is empty, or writing, or the journal
817 		 * is in read-only mode.
818 		 */
819 		return;
820 	}
821 
822 	/*
823 	 * Since we are about to commit the tail block, this journal no longer needs to be on the
824 	 * list of journals which the recovery journal might ask to commit.
825 	 */
826 	mark_slab_journal_clean(journal);
827 
828 	journal->waiting_to_commit = true;
829 
830 	journal->resource_waiter.callback = write_slab_journal_block;
831 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
832 			      &journal->resource_waiter);
833 }
834 
835 /**
836  * encode_slab_journal_entry() - Encode a slab journal entry.
837  * @tail_header: The unpacked header for the block.
838  * @payload: The journal block payload to hold the entry.
839  * @sbn: The slab block number of the entry to encode.
840  * @operation: The type of the entry.
841  * @increment: True if this is an increment.
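 *
 * Block map increments also set a bit in the per-entry type bitmap; for example, entry number 11
 * sets bit 3 of entry_types byte 1 (11 / 8 == 1, 11 % 8 == 3).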
842  *
843  * Exposed for unit tests.
844  */
845 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
846 				      slab_journal_payload *payload,
847 				      slab_block_number sbn,
848 				      enum journal_operation operation,
849 				      bool increment)
850 {
851 	journal_entry_count_t entry_number = tail_header->entry_count++;
852 
853 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
854 		if (!tail_header->has_block_map_increments) {
855 			memset(payload->full_entries.entry_types, 0,
856 			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
857 			tail_header->has_block_map_increments = true;
858 		}
859 
860 		payload->full_entries.entry_types[entry_number / 8] |=
861 			((u8)1 << (entry_number % 8));
862 	}
863 
864 	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
865 }
866 
867 /**
868  * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
869  *                          increment and a decrement to a single point which refers to one or the
870  *                          other.
871  * @recovery_point: The journal point to convert.
872  * @increment: Whether the current entry is an increment.
873  *
874  * Return: The expanded journal point.
875  *
876  * Each data_vio has only a single recovery journal point, but it may need to make both
877  * increment and decrement entries in the same slab journal. In order to distinguish the two
878  * entries, the entry count of the expanded journal point is twice the actual recovery journal
879  * entry count for increments, and one more than that for decrements.
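 *
 * For example, a recovery journal point with entry_count 5 expands to entry_count 10 for an
 * increment entry and 11 for the corresponding decrement entry.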
880  */
881 static struct journal_point expand_journal_point(struct journal_point recovery_point,
882 						 bool increment)
883 {
884 	recovery_point.entry_count *= 2;
885 	if (!increment)
886 		recovery_point.entry_count++;
887 
888 	return recovery_point;
889 }
890 
891 /**
892  * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
893  *               block becomes full.
894  * @journal: The slab journal to append to.
895  * @pbn: The pbn being adjusted.
896  * @operation: The type of entry to make.
897  * @increment: True if this is an increment.
898  * @recovery_point: The expanded recovery point.
899  *
900  * This function is synchronous.
901  */
902 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
903 		      enum journal_operation operation, bool increment,
904 		      struct journal_point recovery_point)
905 {
906 	struct packed_slab_journal_block *block = journal->block;
907 	int result;
908 
909 	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
910 						     &recovery_point),
911 			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
912 			    (unsigned long long) recovery_point.sequence_number,
913 			    recovery_point.entry_count,
914 			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
915 			    journal->tail_header.recovery_point.entry_count);
916 	if (result != VDO_SUCCESS) {
917 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
918 		return;
919 	}
920 
921 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
922 		result = VDO_ASSERT((journal->tail_header.entry_count <
923 				     journal->full_entries_per_block),
924 				    "block has room for full entries");
925 		if (result != VDO_SUCCESS) {
926 			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
927 						 result);
928 			return;
929 		}
930 	}
931 
932 	encode_slab_journal_entry(&journal->tail_header, &block->payload,
933 				  pbn - journal->slab->start, operation, increment);
934 	journal->tail_header.recovery_point = recovery_point;
935 	if (block_is_full(journal))
936 		commit_tail(journal);
937 }
938 
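/**
 * journal_length() - Get the number of active blocks in a slab journal, from head to tail.
 * @journal: The journal to measure.
 *
 * Return: The journal length in blocks.
 */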
939 static inline block_count_t journal_length(const struct slab_journal *journal)
940 {
941 	return journal->tail - journal->head;
942 }
943 
944 /**
945  * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
946  * @slab: The slab to play into.
947  * @pbn: The PBN for the entry.
948  * @operation: The type of entry to add.
949  * @increment: True if this entry is an increment.
950  * @recovery_point: The recovery journal point corresponding to this entry.
951  * @parent: The completion to notify when there is space to add the entry if the entry could not be
952  *          added immediately.
953  *
954  * Return: true if the entry was added immediately.
955  */
956 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
957 				  enum journal_operation operation, bool increment,
958 				  struct journal_point *recovery_point,
959 				  struct vdo_completion *parent)
960 {
961 	struct slab_journal *journal = &slab->journal;
962 	struct slab_journal_block_header *header = &journal->tail_header;
963 	struct journal_point expanded = expand_journal_point(*recovery_point, increment);
964 
965 	/* Only accept entries after the current recovery point. */
966 	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
967 		return true;
968 
969 	if ((header->entry_count >= journal->full_entries_per_block) &&
970 	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
971 		/*
972 		 * The tail block does not have room for the entry we are attempting to add so
973 		 * commit the tail block now.
974 		 */
975 		commit_tail(journal);
976 	}
977 
978 	if (journal->waiting_to_commit) {
979 		vdo_start_operation_with_waiter(&journal->slab->state,
980 						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
981 						parent, NULL);
982 		return false;
983 	}
984 
985 	if (journal_length(journal) >= journal->size) {
986 		/*
987 		 * We must have reaped the current head before the crash, since the blocked
988 		 * threshold keeps us from having more entries than fit in a slab journal; hence we
989 		 * can just advance the head (and unreapable block), as needed.
990 		 */
991 		journal->head++;
992 		journal->unreapable++;
993 	}
994 
995 	if (journal->slab->status == VDO_SLAB_REBUILT)
996 		journal->slab->status = VDO_SLAB_REPLAYING;
997 
998 	add_entry(journal, pbn, operation, increment, expanded);
999 	return true;
1000 }
1001 
1002 /**
1003  * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1004  * @journal: The journal to check.
1005  *
1006  * Return: true if the journal must be reaped.
1007  */
1008 static bool requires_reaping(const struct slab_journal *journal)
1009 {
1010 	return (journal_length(journal) >= journal->blocking_threshold);
1011 }
1012 
1013 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1014 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1015 {
1016 	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1017 	int result = *((int *) context);
1018 
1019 	slab->active_count--;
1020 
1021 	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1022 		vdo_log_error_strerror(result, "failed to update slab summary");
1023 		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1024 	}
1025 
1026 	check_if_slab_drained(slab);
1027 }
1028 
1029 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1030 
1031 /**
1032  * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1033  *                                  a VIO for it from the pool.
1034  * @waiter: The waiter of the block which is starting to write.
1035  * @context: The parent slab of the block.
1036  *
1037  * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1038  * currently in use.
1039  */
1040 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1041 {
1042 	struct vdo_slab *slab = context;
1043 
1044 	if (vdo_is_read_only(slab->allocator->depot->vdo))
1045 		return;
1046 
1047 	slab->active_count++;
1048 	container_of(waiter, struct reference_block, waiter)->is_writing = true;
1049 	waiter->callback = write_reference_block;
1050 	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1051 }
1052 
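/**
 * save_dirty_reference_blocks() - Launch a write of every dirty reference block in a slab.
 * @slab: The slab whose dirty reference blocks should be written.
 */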
1053 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1054 {
1055 	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1056 				     launch_reference_block_write, slab);
1057 	check_if_slab_drained(slab);
1058 }
1059 
1060 /**
1061  * finish_reference_block_write() - After a reference block has been written, clean it, release its
1062  *                                  locks, and return its VIO to the pool.
1063  * @completion: The VIO that just finished writing.
1064  */
1065 static void finish_reference_block_write(struct vdo_completion *completion)
1066 {
1067 	struct vio *vio = as_vio(completion);
1068 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1069 	struct reference_block *block = completion->parent;
1070 	struct vdo_slab *slab = block->slab;
1071 	tail_block_offset_t offset;
1072 
1073 	slab->active_count--;
1074 
1075 	/* Release the slab journal lock. */
1076 	adjust_slab_journal_block_reference(&slab->journal,
1077 					    block->slab_journal_lock_to_release, -1);
1078 	return_vio_to_pool(pooled);
1079 
1080 	/*
1081 	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
1082 	 * us to be dirtied again, but we don't want to double enqueue.
1083 	 */
1084 	block->is_writing = false;
1085 
1086 	if (vdo_is_read_only(completion->vdo)) {
1087 		check_if_slab_drained(slab);
1088 		return;
1089 	}
1090 
1091 	/* Re-queue the block if it was re-dirtied while it was writing. */
1092 	if (block->is_dirty) {
1093 		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1094 		if (vdo_is_state_draining(&slab->state)) {
1095 			/* We must be saving, and this block will otherwise not be relaunched. */
1096 			save_dirty_reference_blocks(slab);
1097 		}
1098 
1099 		return;
1100 	}
1101 
1102 	/*
1103 	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
1104 	 * and no summary update in progress.
1105 	 */
1106 	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
1107 		check_if_slab_drained(slab);
1108 		return;
1109 	}
1110 
1111 	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
1112 	slab->active_count++;
1113 	slab->summary_waiter.callback = finish_summary_update;
1114 	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
1115 				  true, true, slab->free_blocks);
1116 }
1117 
1118 /**
1119  * get_reference_counters_for_block() - Find the reference counters for a given block.
1120  * @block: The reference_block in question.
1121  *
1122  * Return: A pointer to the reference counters for this block.
1123  */
1124 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1125 {
1126 	size_t block_index = block - block->slab->reference_blocks;
1127 
1128 	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1129 }
1130 
1131 /**
1132  * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1133  * @block: The block to copy.
1134  * @buffer: The char buffer to fill with the packed block.
1135  */
1136 static void pack_reference_block(struct reference_block *block, void *buffer)
1137 {
1138 	struct packed_reference_block *packed = buffer;
1139 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
1140 	sector_count_t i;
1141 	struct packed_journal_point commit_point;
1142 
1143 	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1144 
1145 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1146 		packed->sectors[i].commit_point = commit_point;
1147 		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1148 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1149 	}
1150 }
1151 
1152 static void write_reference_block_endio(struct bio *bio)
1153 {
1154 	struct vio *vio = bio->bi_private;
1155 	struct reference_block *block = vio->completion.parent;
1156 	thread_id_t thread_id = block->slab->allocator->thread_id;
1157 
1158 	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1159 }
1160 
1161 /**
1162  * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1163  * @completion: The VIO doing the I/O as a completion.
1164  */
1165 static void handle_io_error(struct vdo_completion *completion)
1166 {
1167 	int result = completion->result;
1168 	struct vio *vio = as_vio(completion);
1169 	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1170 
1171 	vio_record_metadata_io_error(vio);
1172 	return_vio_to_pool(vio_as_pooled_vio(vio));
1173 	slab->active_count -= vio->io_size / VDO_BLOCK_SIZE;
1174 	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1175 	check_if_slab_drained(slab);
1176 }
1177 
1178 /**
1179  * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1180  *                           its counters and associated data into the VIO, and launch the write.
1181  * @waiter: The waiter of the dirty block.
1182  * @context: The VIO returned by the pool.
1183  */
1184 static void write_reference_block(struct vdo_waiter *waiter, void *context)
1185 {
1186 	size_t block_offset;
1187 	physical_block_number_t pbn;
1188 	struct pooled_vio *pooled = context;
1189 	struct vdo_completion *completion = &pooled->vio.completion;
1190 	struct reference_block *block = container_of(waiter, struct reference_block,
1191 						     waiter);
1192 
1193 	pack_reference_block(block, pooled->vio.data);
1194 	block_offset = (block - block->slab->reference_blocks);
1195 	pbn = (block->slab->ref_counts_origin + block_offset);
1196 	block->slab_journal_lock_to_release = block->slab_journal_lock;
1197 	completion->parent = block;
1198 
1199 	/*
1200 	 * Mark the block as clean, since we won't be committing any updates that happen after this
1201 	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1202 	 * cause complications.
1203 	 */
1204 	block->is_dirty = false;
1205 
1206 	/*
1207 	 * Flush before writing to ensure that the recovery journal and slab journal entries which
1208 	 * cover this reference update are stable. This prevents data corruption that can be caused
1209 	 * by out of order writes.
1210 	 */
1211 	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1212 		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1213 
1214 	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1215 	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1216 				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1217 }
1218 
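/**
 * reclaim_journal_space() - Write dirty reference blocks so the slab journal can release its
 *                           recovery journal locks.
 * @journal: The journal which may need space reclaimed.
 *
 * Once the journal length crosses the flushing threshold, reference block writes are scheduled,
 * with progressively more writes launched as the length approaches the flushing deadline.
 */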
1219 static void reclaim_journal_space(struct slab_journal *journal)
1220 {
1221 	block_count_t length = journal_length(journal);
1222 	struct vdo_slab *slab = journal->slab;
1223 	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1224 	block_count_t written;
1225 
1226 	if ((length < journal->flushing_threshold) || (write_count == 0))
1227 		return;
1228 
1229 	/* The slab journal is over the first threshold, schedule some reference block writes. */
1230 	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1231 	if (length < journal->flushing_deadline) {
1232 		/* Schedule more writes the closer to the deadline we get. */
1233 		write_count /= journal->flushing_deadline - length + 1;
1234 		write_count = max_t(block_count_t, write_count, 1);
1235 	}
1236 
1237 	for (written = 0; written < write_count; written++) {
1238 		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1239 					     launch_reference_block_write, slab);
1240 	}
1241 }
1242 
1243 /**
1244  * reference_count_to_status() - Convert a reference count to a reference status.
1245  * @count: The count to convert.
1246  *
1247  * Return: The appropriate reference status.
1248  */
1249 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1250 {
1251 	if (count == EMPTY_REFERENCE_COUNT)
1252 		return RS_FREE;
1253 	else if (count == 1)
1254 		return RS_SINGLE;
1255 	else if (count == PROVISIONAL_REFERENCE_COUNT)
1256 		return RS_PROVISIONAL;
1257 	else
1258 		return RS_SHARED;
1259 }
1260 
1261 /**
1262  * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1263  *                 if it wasn't already dirty.
1264  * @block: The reference block to mark as dirty.
1265  */
1266 static void dirty_block(struct reference_block *block)
1267 {
1268 	if (block->is_dirty)
1269 		return;
1270 
1271 	block->is_dirty = true;
1272 	if (!block->is_writing)
1273 		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1274 }
1275 
1276 /**
1277  * get_reference_block() - Get the reference block that covers the given block index.
1278  */
1279 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1280 								 slab_block_number index)
1281 {
1282 	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1283 }
1284 
1285 /**
1286  * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1287  *                                block number.
1288  * @slab: The slab.
1289  * @pbn: The physical block number.
1290  * @slab_block_number_ptr: A pointer to the slab block number.
1291  *
1292  * Return: VDO_SUCCESS or an error code.
1293  */
1294 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1295 						   physical_block_number_t pbn,
1296 						   slab_block_number *slab_block_number_ptr)
1297 {
1298 	u64 slab_block_number;
1299 
1300 	if (pbn < slab->start)
1301 		return VDO_OUT_OF_RANGE;
1302 
1303 	slab_block_number = pbn - slab->start;
1304 	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1305 		return VDO_OUT_OF_RANGE;
1306 
1307 	*slab_block_number_ptr = slab_block_number;
1308 	return VDO_SUCCESS;
1309 }
1310 
1311 /**
1312  * get_reference_counter() - Get the reference counter that covers the given physical block number.
1313  * @slab: The slab to query.
1314  * @pbn: The physical block number.
1315  * @counter_ptr: A pointer to the reference counter.
1316  */
1317 static int __must_check get_reference_counter(struct vdo_slab *slab,
1318 					      physical_block_number_t pbn,
1319 					      vdo_refcount_t **counter_ptr)
1320 {
1321 	slab_block_number index;
1322 	int result = slab_block_number_from_pbn(slab, pbn, &index);
1323 
1324 	if (result != VDO_SUCCESS)
1325 		return result;
1326 
1327 	*counter_ptr = &slab->counters[index];
1328 
1329 	return VDO_SUCCESS;
1330 }
1331 
1332 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1333 {
1334 	block_count_t free_blocks = slab->free_blocks;
1335 	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1336 	unsigned int priority;
1337 
1338 	/*
1339 	 * Wholly full slabs must be the only ones with lowest priority, 0.
1340 	 *
1341 	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1342 	 * have lower priority than previously opened slabs that have a significant number of free
1343 	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1344 	 * unless there are very few free blocks that have been previously written to.
1345 	 *
1346 	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1347 	 * a better client of any underlying storage that is thinly-provisioned (though discarding
1348 	 * would be better).
1349 	 *
1350 	 * For all other slabs, the priority is derived from the logarithm of the number of free
1351 	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1352 	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1353 	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
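 *
 * For example, a previously opened slab with 6,000 free blocks gets priority 1 + ilog2(6000) = 13,
 * bumped up by one if that value falls at or above the reserved unopened_slab_priority.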
1354 	 */
1355 
1356 	if (free_blocks == 0)
1357 		return 0;
1358 
1359 	if (is_slab_journal_blank(slab))
1360 		return unopened_slab_priority;
1361 
1362 	priority = (1 + ilog2(free_blocks));
1363 	return ((priority < unopened_slab_priority) ? priority : priority + 1);
1364 }
1365 
1366 /*
1367  * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1368  * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1369  * blocks.
1370  */
1371 static void prioritize_slab(struct vdo_slab *slab)
1372 {
1373 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1374 			    "a slab must not already be on a list when prioritizing");
1375 	slab->priority = calculate_slab_priority(slab);
1376 	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1377 				   slab->priority, &slab->allocq_entry);
1378 }
1379 
1380 /**
1381  * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1382  * @incremented: true if the free block count went up.
1383  */
1384 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1385 {
1386 	struct block_allocator *allocator = slab->allocator;
1387 
1388 	WRITE_ONCE(allocator->allocated_blocks,
1389 		   allocator->allocated_blocks + (incremented ? -1 : 1));
1390 
1391 	/* The open slab doesn't need to be reprioritized until it is closed. */
1392 	if (slab == allocator->open_slab)
1393 		return;
1394 
1395 	/* Don't bother adjusting the priority table if unneeded. */
1396 	if (slab->priority == calculate_slab_priority(slab))
1397 		return;
1398 
1399 	/*
1400 	 * Reprioritize the slab to reflect the new free block count by removing it from the table
1401 	 * and re-enqueuing it with the new priority.
1402 	 */
1403 	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1404 	prioritize_slab(slab);
1405 }
1406 
1407 /**
1408  * increment_for_data() - Increment the reference count for a data block.
1409  * @slab: The slab which owns the block.
1410  * @block: The reference block which contains the block being updated.
1411  * @block_number: The block to update.
1412  * @old_status: The reference status of the data block before this increment.
1413  * @lock: The pbn_lock associated with this increment (may be NULL).
1414  * @counter_ptr: A pointer to the count for the data block (in, out).
1415  * @adjust_block_count: Whether to update the allocator's free block count.
1416  *
1417  * Return: VDO_SUCCESS or an error.
1418  */
increment_for_data(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,vdo_refcount_t * counter_ptr,bool adjust_block_count)1419 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1420 			      slab_block_number block_number,
1421 			      enum reference_status old_status,
1422 			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1423 			      bool adjust_block_count)
1424 {
1425 	switch (old_status) {
1426 	case RS_FREE:
1427 		*counter_ptr = 1;
1428 		block->allocated_count++;
1429 		slab->free_blocks--;
1430 		if (adjust_block_count)
1431 			adjust_free_block_count(slab, false);
1432 
1433 		break;
1434 
1435 	case RS_PROVISIONAL:
1436 		*counter_ptr = 1;
1437 		break;
1438 
1439 	default:
1440 		/* Single or shared */
1441 		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1442 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1443 						      "Incrementing a block already having 254 references (slab %u, offset %u)",
1444 						      slab->slab_number, block_number);
1445 		}
1446 		(*counter_ptr)++;
1447 	}
1448 
1449 	if (lock != NULL)
1450 		vdo_unassign_pbn_lock_provisional_reference(lock);
1451 	return VDO_SUCCESS;
1452 }
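
/*
 * A summary of the increment transitions above, derived from the code for reference:
 *
 *   RS_FREE        - count becomes 1; the block and slab allocation counts are updated
 *   RS_PROVISIONAL - count becomes 1; the block was already counted as allocated
 *   single/shared  - count is incremented, with an error once MAXIMUM_REFERENCE_COUNT
 *                    has been reached
 *
 * In every successful case, any provisional reference held by the pbn_lock is released.
 */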
1453 
1454 /**
1455  * decrement_for_data() - Decrement the reference count for a data block.
1456  * @slab: The slab which owns the block.
1457  * @block: The reference block which contains the block being updated.
1458  * @block_number: The block to update.
1459  * @old_status: The reference status of the data block before this decrement.
1460  * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1461  * @counter_ptr: A pointer to the count for the data block (in, out).
1462  * @adjust_block_count: Whether to update the allocator's free block count.
1463  *
1464  * Return: VDO_SUCCESS or an error.
1465  */
decrement_for_data(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct reference_updater * updater,vdo_refcount_t * counter_ptr,bool adjust_block_count)1466 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1467 			      slab_block_number block_number,
1468 			      enum reference_status old_status,
1469 			      struct reference_updater *updater,
1470 			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
1471 {
1472 	switch (old_status) {
1473 	case RS_FREE:
1474 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1475 					      "Decrementing free block at offset %u in slab %u",
1476 					      block_number, slab->slab_number);
1477 
1478 	case RS_PROVISIONAL:
1479 	case RS_SINGLE:
1480 		if (updater->zpbn.zone != NULL) {
1481 			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1482 									       updater->zpbn.pbn);
1483 
1484 			if (lock != NULL) {
1485 				/*
1486 				 * There is a read lock on this block, so the block must not become
1487 				 * unreferenced.
1488 				 */
1489 				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1490 				vdo_assign_pbn_lock_provisional_reference(lock);
1491 				break;
1492 			}
1493 		}
1494 
1495 		*counter_ptr = EMPTY_REFERENCE_COUNT;
1496 		block->allocated_count--;
1497 		slab->free_blocks++;
1498 		if (adjust_block_count)
1499 			adjust_free_block_count(slab, true);
1500 
1501 		break;
1502 
1503 	default:
1504 		/* Shared */
1505 		(*counter_ptr)--;
1506 	}
1507 
1508 	return VDO_SUCCESS;
1509 }
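
/*
 * A summary of the decrement transitions above, derived from the code for reference:
 *
 *   RS_FREE                   - error; a free block cannot be decremented
 *   RS_PROVISIONAL, RS_SINGLE - becomes provisional if a read lock still covers the
 *                               block, otherwise becomes empty and the free counts
 *                               are updated
 *   shared                    - count is simply decremented
 */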
1510 
1511 /**
1512  * increment_for_block_map() - Increment the reference count for a block map page.
1513  * @slab: The slab which owns the block.
1514  * @block: The reference block which contains the block being updated.
1515  * @block_number: The block to update.
1516  * @old_status: The reference status of the block before this increment.
1517  * @lock: The pbn_lock associated with this increment (may be NULL).
1518  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1519  * @counter_ptr: A pointer to the count for the block (in, out).
1520  * @adjust_block_count: Whether to update the allocator's free block count.
1521  *
1522  * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1523  * blocks never dedupe, they should never be adjusted from any other state. The adjustment always
1524  * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1525  * blocks.
1526  *
1527  * Return: VDO_SUCCESS or an error.
1528  */
increment_for_block_map(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,bool normal_operation,vdo_refcount_t * counter_ptr,bool adjust_block_count)1529 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1530 				   slab_block_number block_number,
1531 				   enum reference_status old_status,
1532 				   struct pbn_lock *lock, bool normal_operation,
1533 				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
1534 {
1535 	switch (old_status) {
1536 	case RS_FREE:
1537 		if (normal_operation) {
1538 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1539 						      "Incrementing unallocated block map block (slab %u, offset %u)",
1540 						      slab->slab_number, block_number);
1541 		}
1542 
1543 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1544 		block->allocated_count++;
1545 		slab->free_blocks--;
1546 		if (adjust_block_count)
1547 			adjust_free_block_count(slab, false);
1548 
1549 		return VDO_SUCCESS;
1550 
1551 	case RS_PROVISIONAL:
1552 		if (!normal_operation)
1553 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1554 						      "Block map block had provisional reference during replay (slab %u, offset %u)",
1555 						      slab->slab_number, block_number);
1556 
1557 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1558 		if (lock != NULL)
1559 			vdo_unassign_pbn_lock_provisional_reference(lock);
1560 		return VDO_SUCCESS;
1561 
1562 	default:
1563 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1564 					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1565 					      *counter_ptr, slab->slab_number,
1566 					      block_number);
1567 	}
1568 }
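
/*
 * A summary of the block map transitions above: during recovery or rebuild, a free
 * block may jump straight to MAXIMUM_REFERENCE_COUNT; in normal operation only the
 * provisional -> MAXIMUM_REFERENCE_COUNT transition is legal; any other starting
 * state is reported as a VDO_REF_COUNT_INVALID error.
 */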
1569 
is_valid_journal_point(const struct journal_point * point)1570 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1571 {
1572 	return ((point != NULL) && (point->sequence_number > 0));
1573 }
1574 
1575 /**
1576  * update_reference_count() - Update the reference count of a block.
1577  * @slab: The slab which owns the block.
1578  * @block: The reference block which contains the block being updated.
1579  * @block_number: The block to update.
1580  * @slab_journal_point: The slab journal point at which this update is journaled.
1581  * @updater: The reference updater.
1582  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1583  * @adjust_block_count: Whether to update the slab's free block count.
1584  * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1585  *                             of a provisional reference.
1586  *
1587  * Return: VDO_SUCCESS or an error.
1588  */
update_reference_count(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,const struct journal_point * slab_journal_point,struct reference_updater * updater,bool normal_operation,bool adjust_block_count,bool * provisional_decrement_ptr)1589 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1590 				  slab_block_number block_number,
1591 				  const struct journal_point *slab_journal_point,
1592 				  struct reference_updater *updater,
1593 				  bool normal_operation, bool adjust_block_count,
1594 				  bool *provisional_decrement_ptr)
1595 {
1596 	vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1597 	enum reference_status old_status = reference_count_to_status(*counter_ptr);
1598 	int result;
1599 
1600 	if (!updater->increment) {
1601 		result = decrement_for_data(slab, block, block_number, old_status,
1602 					    updater, counter_ptr, adjust_block_count);
1603 		if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1604 			if (provisional_decrement_ptr != NULL)
1605 				*provisional_decrement_ptr = true;
1606 			return VDO_SUCCESS;
1607 		}
1608 	} else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1609 		result = increment_for_data(slab, block, block_number, old_status,
1610 					    updater->lock, counter_ptr, adjust_block_count);
1611 	} else {
1612 		result = increment_for_block_map(slab, block, block_number, old_status,
1613 						 updater->lock, normal_operation,
1614 						 counter_ptr, adjust_block_count);
1615 	}
1616 
1617 	if (result != VDO_SUCCESS)
1618 		return result;
1619 
1620 	if (is_valid_journal_point(slab_journal_point))
1621 		slab->slab_journal_point = *slab_journal_point;
1622 
1623 	return VDO_SUCCESS;
1624 }
1625 
adjust_reference_count(struct vdo_slab * slab,struct reference_updater * updater,const struct journal_point * slab_journal_point)1626 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1627 					       struct reference_updater *updater,
1628 					       const struct journal_point *slab_journal_point)
1629 {
1630 	slab_block_number block_number;
1631 	int result;
1632 	struct reference_block *block;
1633 	bool provisional_decrement = false;
1634 
1635 	if (!is_slab_open(slab))
1636 		return VDO_INVALID_ADMIN_STATE;
1637 
1638 	result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1639 	if (result != VDO_SUCCESS)
1640 		return result;
1641 
1642 	block = get_reference_block(slab, block_number);
1643 	result = update_reference_count(slab, block, block_number, slab_journal_point,
1644 					updater, NORMAL_OPERATION, true,
1645 					&provisional_decrement);
1646 	if ((result != VDO_SUCCESS) || provisional_decrement)
1647 		return result;
1648 
1649 	if (block->is_dirty && (block->slab_journal_lock > 0)) {
1650 		sequence_number_t entry_lock = slab_journal_point->sequence_number;
1651 		/*
1652 		 * This block is already dirty and a slab journal entry has been made for it since
1653 		 * the last time it was clean. We must release the per-entry slab journal lock for
1654 		 * the entry associated with the update we are now doing.
1655 		 */
1656 		result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1657 				    "Reference count adjustments need slab journal points.");
1658 		if (result != VDO_SUCCESS)
1659 			return result;
1660 
1661 		adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1662 		return VDO_SUCCESS;
1663 	}
1664 
1665 	/*
1666 	 * This may be the first time we are applying an update for which there is a slab journal
1667 	 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1668 	 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1669 	 */
1670 	if (is_valid_journal_point(slab_journal_point))
1671 		block->slab_journal_lock = slab_journal_point->sequence_number;
1672 	else
1673 		block->slab_journal_lock = 0;
1674 
1675 	dirty_block(block);
1676 	return VDO_SUCCESS;
1677 }
1678 
1679 /**
1680  * add_entry_from_waiter() - Add an entry to the slab journal.
1681  * @waiter: The vio which should make an entry now.
1682  * @context: The slab journal to make an entry in.
1683  *
1684  * This callback is invoked by add_entries() once it has determined that we are ready to make
1685  * another entry in the slab journal. Implements waiter_callback_fn.
1686  */
add_entry_from_waiter(struct vdo_waiter * waiter,void * context)1687 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1688 {
1689 	int result;
1690 	struct reference_updater *updater =
1691 		container_of(waiter, struct reference_updater, waiter);
1692 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1693 	struct slab_journal *journal = context;
1694 	struct slab_journal_block_header *header = &journal->tail_header;
1695 	struct journal_point slab_journal_point = {
1696 		.sequence_number = header->sequence_number,
1697 		.entry_count = header->entry_count,
1698 	};
1699 	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1700 
1701 	if (header->entry_count == 0) {
1702 		/*
1703 		 * This is the first entry in the current tail block, so get a lock on the recovery
1704 		 * journal which we will hold until this tail block is committed.
1705 		 */
1706 		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1707 		if (journal->recovery_journal != NULL) {
1708 			zone_count_t zone_number = journal->slab->allocator->zone_number;
1709 
1710 			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1711 								     recovery_block,
1712 								     VDO_ZONE_TYPE_PHYSICAL,
1713 								     zone_number);
1714 		}
1715 
1716 		mark_slab_journal_dirty(journal, recovery_block);
1717 		reclaim_journal_space(journal);
1718 	}
1719 
1720 	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1721 		  expand_journal_point(data_vio->recovery_journal_point,
1722 				       updater->increment));
1723 
1724 	if (journal->slab->status != VDO_SLAB_REBUILT) {
1725 		/*
1726 		 * If the slab is unrecovered, scrubbing will take care of the count since the
1727 		 * update is now recorded in the journal.
1728 		 */
1729 		adjust_slab_journal_block_reference(journal,
1730 						    slab_journal_point.sequence_number, -1);
1731 		result = VDO_SUCCESS;
1732 	} else {
1733 		/* Now that an entry has been made in the slab journal, update the counter. */
1734 		result = adjust_reference_count(journal->slab, updater,
1735 						&slab_journal_point);
1736 	}
1737 
1738 	if (updater->increment)
1739 		continue_data_vio_with_error(data_vio, result);
1740 	else
1741 		vdo_continue_completion(&data_vio->decrement_completion, result);
1742 }
1743 
1744 /**
1745  * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1746  *                                         increment.
1747  * @journal: The journal.
1748  *
1749  * Return: true if the first entry waiter's operation is a block map increment.
1750  */
is_next_entry_a_block_map_increment(struct slab_journal * journal)1751 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1752 {
1753 	struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1754 	struct reference_updater *updater =
1755 		container_of(waiter, struct reference_updater, waiter);
1756 
1757 	return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1758 }
1759 
1760 /**
1761  * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1762  * @journal: The journal to which entries may be added.
1763  *
1764  * By processing the queue in order, we ensure that slab journal entries are made in the same order
1765  * as recovery journal entries for the same increment or decrement.
1766  */
add_entries(struct slab_journal * journal)1767 static void add_entries(struct slab_journal *journal)
1768 {
1769 	if (journal->adding_entries) {
1770 		/* Protect against re-entrancy. */
1771 		return;
1772 	}
1773 
1774 	journal->adding_entries = true;
1775 	while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1776 		struct slab_journal_block_header *header = &journal->tail_header;
1777 
1778 		if (journal->partial_write_in_progress ||
1779 		    (journal->slab->status == VDO_SLAB_REBUILDING)) {
1780 			/*
1781 			 * Don't add entries while rebuilding or while a partial write is
1782 			 * outstanding, as it could result in reference count corruption.
1783 			 */
1784 			break;
1785 		}
1786 
1787 		if (journal->waiting_to_commit) {
1788 			/*
1789 			 * If we are waiting for resources to write the tail block, and the tail
1790 			 * block is full, we can't make another entry.
1791 			 */
1792 			WRITE_ONCE(journal->events->tail_busy_count,
1793 				   journal->events->tail_busy_count + 1);
1794 			break;
1795 		} else if (is_next_entry_a_block_map_increment(journal) &&
1796 			   (header->entry_count >= journal->full_entries_per_block)) {
1797 			/*
1798 			 * The tail block does not have room for a block map increment, so commit
1799 			 * it now.
1800 			 */
1801 			commit_tail(journal);
1802 			if (journal->waiting_to_commit) {
1803 				WRITE_ONCE(journal->events->tail_busy_count,
1804 					   journal->events->tail_busy_count + 1);
1805 				break;
1806 			}
1807 		}
1808 
1809 		/* If the slab is over the blocking threshold, make the vio wait. */
1810 		if (requires_reaping(journal)) {
1811 			WRITE_ONCE(journal->events->blocked_count,
1812 				   journal->events->blocked_count + 1);
1813 			save_dirty_reference_blocks(journal->slab);
1814 			break;
1815 		}
1816 
1817 		if (header->entry_count == 0) {
1818 			struct journal_lock *lock =
1819 				get_lock(journal, header->sequence_number);
1820 
1821 			/*
1822 			 * Check if the on disk slab journal is full. Because of the blocking and
1823 			 * scrubbing thresholds, this should never happen.
1824 			 */
1825 			if (lock->count > 0) {
1826 				VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1827 						    "New block has locks, but journal is not full");
1828 
1829 				/*
1830 				 * The blocking threshold must let the journal fill up if the new
1831 				 * block has locks; if the blocking threshold is smaller than the
1832 				 * journal size, the new block cannot possibly have locks already.
1833 				 */
1834 				VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1835 						    "New block can have locks already iff blocking threshold is at the end of the journal");
1836 
1837 				WRITE_ONCE(journal->events->disk_full_count,
1838 					   journal->events->disk_full_count + 1);
1839 				save_dirty_reference_blocks(journal->slab);
1840 				break;
1841 			}
1842 
1843 			/*
1844 			 * Don't allow the new block to be reaped until all of the reference count
1845 			 * blocks are written and the journal block has been fully committed as
1846 			 * well.
1847 			 */
1848 			lock->count = journal->entries_per_block + 1;
1849 
1850 			if (header->sequence_number == 1) {
1851 				struct vdo_slab *slab = journal->slab;
1852 				block_count_t i;
1853 
1854 				/*
1855 				 * This is the first entry in this slab journal, ever. Dirty all of
1856 				 * the reference count blocks. Each will acquire a lock on the tail
1857 				 * block so that the journal won't be reaped until the reference
1858 				 * counts are initialized. The lock acquisition must be done by the
1859 				 * ref_counts since here we don't know how many reference blocks
1860 				 * the ref_counts has.
1861 				 */
1862 				for (i = 0; i < slab->reference_block_count; i++) {
1863 					slab->reference_blocks[i].slab_journal_lock = 1;
1864 					dirty_block(&slab->reference_blocks[i]);
1865 				}
1866 
1867 				adjust_slab_journal_block_reference(journal, 1,
1868 								    slab->reference_block_count);
1869 			}
1870 		}
1871 
1872 		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1873 					     add_entry_from_waiter, journal);
1874 	}
1875 
1876 	journal->adding_entries = false;
1877 
1878 	/* If there are no waiters, and we are flushing or saving, commit the tail block. */
1879 	if (vdo_is_state_draining(&journal->slab->state) &&
1880 	    !vdo_is_state_suspending(&journal->slab->state) &&
1881 	    !vdo_waitq_has_waiters(&journal->entry_waiters))
1882 		commit_tail(journal);
1883 }
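
/*
 * For reference, the conditions above that stop add_entries() from draining the
 * waiter queue: a rebuild or partial write in progress, a tail block that is waiting
 * to commit, a journal over its blocking threshold (the slab's dirty reference
 * blocks are saved while the vio waits), or an on-disk journal that is completely
 * full.
 */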
1884 
1885 /**
1886  * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1887  *                         first reference block of a slab.
1888  */
reset_search_cursor(struct vdo_slab * slab)1889 static void reset_search_cursor(struct vdo_slab *slab)
1890 {
1891 	struct search_cursor *cursor = &slab->search_cursor;
1892 
1893 	cursor->block = cursor->first_block;
1894 	cursor->index = 0;
1895 	/* Unit tests have slabs with only one reference block (and it's a runt). */
1896 	cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1897 }
1898 
1899 /**
1900  * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1901  *                           a slab,
1902  *                           a slab.
1903  * Wraps around to the first reference block if the current block is the last reference block.
1904  *
1905  * Return: true unless the cursor was at the last reference block.
1906  */
advance_search_cursor(struct vdo_slab * slab)1907 static bool advance_search_cursor(struct vdo_slab *slab)
1908 {
1909 	struct search_cursor *cursor = &slab->search_cursor;
1910 
1911 	/*
1912 	 * If we just finished searching the last reference block, then wrap back around to the
1913 	 * start of the array.
1914 	 */
1915 	if (cursor->block == cursor->last_block) {
1916 		reset_search_cursor(slab);
1917 		return false;
1918 	}
1919 
1920 	/* We're not already at the end, so advance the cursor to the next block. */
1921 	cursor->block++;
1922 	cursor->index = cursor->end_index;
1923 
1924 	if (cursor->block == cursor->last_block) {
1925 		/* The last reference block will usually be a runt. */
1926 		cursor->end_index = slab->block_count;
1927 	} else {
1928 		cursor->end_index += COUNTS_PER_BLOCK;
1929 	}
1930 
1931 	return true;
1932 }
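
/*
 * For illustration only (using a hypothetical COUNTS_PER_BLOCK of 4 and a slab
 * block_count of 10): the cursor would cover indexes [0, 4) in the first reference
 * block, [4, 8) in the second, and [8, 10) in the runt third block, after which
 * advance_search_cursor() wraps back to the first block and returns false.
 */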
1933 
1934 /**
1935  * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1936  *
1937  * Return: VDO_SUCCESS or an error.
1938  */
vdo_adjust_reference_count_for_rebuild(struct slab_depot * depot,physical_block_number_t pbn,enum journal_operation operation)1939 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1940 					   physical_block_number_t pbn,
1941 					   enum journal_operation operation)
1942 {
1943 	int result;
1944 	slab_block_number block_number;
1945 	struct reference_block *block;
1946 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1947 	struct reference_updater updater = {
1948 		.operation = operation,
1949 		.increment = true,
1950 	};
1951 
1952 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
1953 	if (result != VDO_SUCCESS)
1954 		return result;
1955 
1956 	block = get_reference_block(slab, block_number);
1957 	result = update_reference_count(slab, block, block_number, NULL,
1958 					&updater, !NORMAL_OPERATION, false, NULL);
1959 	if (result != VDO_SUCCESS)
1960 		return result;
1961 
1962 	dirty_block(block);
1963 	return VDO_SUCCESS;
1964 }
1965 
1966 /**
1967  * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1968  *                                   entry into the reference count for a block.
1969  * @slab: The slab.
1970  * @entry_point: The slab journal point for the entry.
1971  * @entry: The slab journal entry being replayed.
1972  *
1973  * The adjustment will be ignored if it was already recorded in the reference count.
1974  *
1975  * Return: VDO_SUCCESS or an error code.
1976  */
replay_reference_count_change(struct vdo_slab * slab,const struct journal_point * entry_point,struct slab_journal_entry entry)1977 static int replay_reference_count_change(struct vdo_slab *slab,
1978 					 const struct journal_point *entry_point,
1979 					 struct slab_journal_entry entry)
1980 {
1981 	int result;
1982 	struct reference_block *block = get_reference_block(slab, entry.sbn);
1983 	sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1984 	struct reference_updater updater = {
1985 		.operation = entry.operation,
1986 		.increment = entry.increment,
1987 	};
1988 
1989 	if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1990 		/* This entry is already reflected in the existing counts, so do nothing. */
1991 		return VDO_SUCCESS;
1992 	}
1993 
1994 	/* This entry is not yet counted in the reference counts. */
1995 	result = update_reference_count(slab, block, entry.sbn, entry_point,
1996 					&updater, !NORMAL_OPERATION, false, NULL);
1997 	if (result != VDO_SUCCESS)
1998 		return result;
1999 
2000 	dirty_block(block);
2001 	return VDO_SUCCESS;
2002 }
2003 
2004 /**
2005  * find_zero_byte_in_word() - Find the array index of the first zero byte in a word-sized range of
2006  *                            reference counters.
2007  * @word_ptr: A pointer to the eight counter bytes to check.
2008  * @start_index: The array index corresponding to word_ptr[0].
2009  * @fail_index: The array index to return if no zero byte is found.
2010  *
2011  * The search does no bounds checking; the function relies on the array being sufficiently padded.
2012  *
2013  * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2014  *         no zero byte was found.
2015  */
find_zero_byte_in_word(const u8 * word_ptr,slab_block_number start_index,slab_block_number fail_index)2016 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2017 						       slab_block_number start_index,
2018 						       slab_block_number fail_index)
2019 {
2020 	u64 word = get_unaligned_le64(word_ptr);
2021 
2022 	/* This looks like a loop, but GCC will unroll the eight iterations for us. */
2023 	unsigned int offset;
2024 
2025 	for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2026 		/* Assumes little-endian byte order, which we have on X86. */
2027 		if ((word & 0xFF) == 0)
2028 			return (start_index + offset);
2029 		word >>= 8;
2030 	}
2031 
2032 	return fail_index;
2033 }
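
/*
 * Worked example (hypothetical counter values): if word_ptr points at counters
 * {2, 1, 0, 5, 0, 3, 7, 9} and start_index is 40, the loop examines the bytes in
 * memory order (a little-endian load followed by repeated shifts of 8), finds the
 * first zero at offset 2, and returns 42. If none of the eight bytes were zero,
 * fail_index would be returned instead.
 */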
2034 
2035 /**
2036  * find_free_block() - Find the first block with a reference count of zero in the specified
2037  *                     range of reference counter indexes.
2038  * @slab: The slab counters to scan.
2039  * @index_ptr: A pointer to hold the array index of the free block.
2040  *
2041  * Exposed for unit testing.
2042  *
2043  * Return: true if a free block was found in the specified range.
2044  */
find_free_block(const struct vdo_slab * slab,slab_block_number * index_ptr)2045 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2046 {
2047 	slab_block_number zero_index;
2048 	slab_block_number next_index = slab->search_cursor.index;
2049 	slab_block_number end_index = slab->search_cursor.end_index;
2050 	u8 *next_counter = &slab->counters[next_index];
2051 	u8 *end_counter = &slab->counters[end_index];
2052 
2053 	/*
2054 	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2055 	 * safe.)
2056 	 */
2057 	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2058 	if (zero_index < end_index) {
2059 		*index_ptr = zero_index;
2060 		return true;
2061 	}
2062 
2063 	/*
2064 	 * On architectures where unaligned word access is expensive, this would be a good place to
2065 	 * advance to an alignment boundary.
2066 	 */
2067 	next_index += BYTES_PER_WORD;
2068 	next_counter += BYTES_PER_WORD;
2069 
2070 	/*
2071 	 * Now we're word-aligned; check one word at a time until we find a word containing a zero.
2072 	 * (Array is padded so reading past end is safe.)
2073 	 */
2074 	while (next_counter < end_counter) {
2075 		/*
2076 		 * The following code is currently an exact copy of the code preceding the loop,
2077 		 * but if you try to merge them by using a do loop, it runs slower because a jump
2078 		 * instruction gets added at the start of the iteration.
2079 		 */
2080 		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2081 		if (zero_index < end_index) {
2082 			*index_ptr = zero_index;
2083 			return true;
2084 		}
2085 
2086 		next_index += BYTES_PER_WORD;
2087 		next_counter += BYTES_PER_WORD;
2088 	}
2089 
2090 	return false;
2091 }
2092 
2093 /**
2094  * search_current_reference_block() - Search the reference block currently saved in the search
2095  *                                    cursor for a reference count of zero, starting at the saved
2096  *                                    counter index.
2097  * @slab: The slab to search.
2098  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2099  *
2100  * Return: true if an unreferenced counter was found.
2101  */
search_current_reference_block(const struct vdo_slab * slab,slab_block_number * free_index_ptr)2102 static bool search_current_reference_block(const struct vdo_slab *slab,
2103 					   slab_block_number *free_index_ptr)
2104 {
2105 	/* Don't bother searching if the current block is known to be full. */
2106 	return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2107 		find_free_block(slab, free_index_ptr));
2108 }
2109 
2110 /**
2111  * search_reference_blocks() - Search each reference block for a reference count of zero.
2112  * @slab: The slab to search.
2113  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2114  *
2115  * Searches each reference block for a reference count of zero, starting at the reference block and
2116  * counter index saved in the search cursor and searching up to the end of the last reference
2117  * block. The search does not wrap.
2118  *
2119  * Return: true if an unreferenced counter was found.
2120  */
search_reference_blocks(struct vdo_slab * slab,slab_block_number * free_index_ptr)2121 static bool search_reference_blocks(struct vdo_slab *slab,
2122 				    slab_block_number *free_index_ptr)
2123 {
2124 	/* Start searching at the saved search position in the current block. */
2125 	if (search_current_reference_block(slab, free_index_ptr))
2126 		return true;
2127 
2128 	/* Search each reference block up to the end of the slab. */
2129 	while (advance_search_cursor(slab)) {
2130 		if (search_current_reference_block(slab, free_index_ptr))
2131 			return true;
2132 	}
2133 
2134 	return false;
2135 }
2136 
2137 /**
2138  * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2139  */
make_provisional_reference(struct vdo_slab * slab,slab_block_number block_number)2140 static void make_provisional_reference(struct vdo_slab *slab,
2141 				       slab_block_number block_number)
2142 {
2143 	struct reference_block *block = get_reference_block(slab, block_number);
2144 
2145 	/*
2146 	 * Make the initial transition from an unreferenced block to a
2147 	 * provisionally allocated block.
2148 	 */
2149 	slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2150 
2151 	/* Account for the allocation. */
2152 	block->allocated_count++;
2153 	slab->free_blocks--;
2154 }
2155 
2156 /**
2157  * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2158  */
dirty_all_reference_blocks(struct vdo_slab * slab)2159 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2160 {
2161 	block_count_t i;
2162 
2163 	for (i = 0; i < slab->reference_block_count; i++)
2164 		dirty_block(&slab->reference_blocks[i]);
2165 }
2166 
journal_points_equal(struct journal_point first,struct journal_point second)2167 static inline bool journal_points_equal(struct journal_point first,
2168 					struct journal_point second)
2169 {
2170 	return ((first.sequence_number == second.sequence_number) &&
2171 		(first.entry_count == second.entry_count));
2172 }
2173 
2174 /**
2175  * match_bytes() - Check an 8-byte word for bytes matching the value specified.
2176  * @input: A word to examine the bytes of.
2177  * @match: The byte value sought.
2178  *
2179  * Return: 1 in each byte when the corresponding input byte matched, 0 otherwise.
2180  */
match_bytes(u64 input,u8 match)2181 static inline u64 match_bytes(u64 input, u8 match)
2182 {
2183 	u64 temp = input ^ (match * 0x0101010101010101ULL);
2184 	/* top bit of each byte is set iff top bit of temp byte is clear; rest are 0 */
2185 	u64 test_top_bits = ~temp & 0x8080808080808080ULL;
2186 	/* top bit of each byte is set iff low 7 bits of temp byte are clear; rest are useless */
2187 	u64 test_low_bits = 0x8080808080808080ULL - (temp & 0x7f7f7f7f7f7f7f7fULL);
2188 	/* return 1 when both tests indicate temp byte is 0 */
2189 	return (test_top_bits & test_low_bits) >> 7;
2190 }
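
/*
 * Illustration of the bit trick above, using hypothetical values: with match = 0,
 * temp equals input. For input bytes {0x00, 0x02, 0xff, 0x00, 0x80, 0x01, 0x00, 0x7f}
 * (listed from byte lane 0 upward), test_top_bits has bit 7 set in every byte whose
 * top bit is clear (all but the 0xff and 0x80 bytes), while test_low_bits has bit 7
 * set only in the bytes whose low seven bits are zero (the three 0x00 bytes and the
 * 0x80 byte); since each byte of the subtrahend is at most 0x7f, the subtraction
 * never borrows across byte lanes. ANDing the two tests leaves bit 7 set exactly in
 * the zero bytes, and the final shift yields 0x01 in byte lanes 0, 3, and 6.
 */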
2191 
2192 /**
2193  * count_valid_references() - Process a newly loaded refcount array
2194  * @counters: the array of counters from a metadata block
2195  *
2196  * Scan an 8-byte-aligned array of counters, fixing up any "provisional" values that weren't
2197  * cleaned up at shutdown, changing them internally to "empty".
2198  *
2199  * Return: the number of blocks that are referenced (counters not "empty")
2200  */
count_valid_references(vdo_refcount_t * counters)2201 static unsigned int count_valid_references(vdo_refcount_t *counters)
2202 {
2203 	u64 *words = (u64 *)counters;
2204 	/* It's easier to count occurrences of a specific byte than its absences. */
2205 	unsigned int empty_count = 0;
2206 	/* For speed, we process 8 bytes at once. */
2207 	unsigned int words_left = COUNTS_PER_BLOCK / sizeof(u64);
2208 
2209 	/*
2210 	 * Sanity check assumptions used for optimizing this code: Counters are bytes. The counter
2211 	 * array is a multiple of the word size.
2212 	 */
2213 	BUILD_BUG_ON(sizeof(vdo_refcount_t) != 1);
2214 	BUILD_BUG_ON((COUNTS_PER_BLOCK % sizeof(u64)) != 0);
2215 
2216 	while (words_left > 0) {
2217 		/*
2218 		 * This is used effectively as 8 byte-size counters. Byte 0 counts how many words
2219 		 * had the target value found in byte 0, etc. We just have to avoid overflow.
2220 		 */
2221 		u64 split_count = 0;
2222 		/*
2223 		 * The counter "% 255" trick used below to fold split_count into empty_count
2224 		 * imposes a limit of 254 bytes examined each iteration of the outer loop. We
2225 		 * process a word at a time, so that limit gets rounded down to 31 u64 words.
2226 		 */
2227 		const unsigned int max_words_per_iteration = 254 / sizeof(u64);
2228 		unsigned int iter_words_left = min_t(unsigned int, words_left,
2229 						     max_words_per_iteration);
2230 
2231 		words_left -= iter_words_left;
2232 
2233 		while (iter_words_left--) {
2234 			u64 word = *words;
2235 			u64 temp;
2236 
2237 			/* First, if we have any provisional refcount values, clear them. */
2238 			temp = match_bytes(word, PROVISIONAL_REFERENCE_COUNT);
2239 			if (temp) {
2240 				/*
2241 				 * 'temp' has 0x01 bytes where 'word' has PROVISIONAL; this xor
2242 				 * will alter just those bytes, changing PROVISIONAL to EMPTY.
2243 				 */
2244 				word ^= temp * (PROVISIONAL_REFERENCE_COUNT ^ EMPTY_REFERENCE_COUNT);
2245 				*words = word;
2246 			}
2247 
2248 			/* Now count the EMPTY_REFERENCE_COUNT bytes, updating the 8 counters. */
2249 			split_count += match_bytes(word, EMPTY_REFERENCE_COUNT);
2250 			words++;
2251 		}
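		/*
		 * Folding split_count into empty_count with "% 255" works because
		 * split_count is a base-256 number whose eight digits are the
		 * per-lane tallies; since 256 == 1 (mod 255), it is congruent to the
		 * sum of those digits, and that sum is at most 8 * 31 = 248 < 255,
		 * so the remainder is exactly the number of empty bytes seen in this
		 * iteration.
		 */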
2252 		empty_count += split_count % 255;
2253 	}
2254 
2255 	return COUNTS_PER_BLOCK - empty_count;
2256 }
2257 
2258 /**
2259  * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure.
2260  * @packed: The written reference block to be unpacked.
2261  * @block: The internal reference block to be loaded.
2262  */
unpack_reference_block(struct packed_reference_block * packed,struct reference_block * block)2263 static void unpack_reference_block(struct packed_reference_block *packed,
2264 				   struct reference_block *block)
2265 {
2266 	sector_count_t i;
2267 	struct vdo_slab *slab = block->slab;
2268 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2269 
2270 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2271 		struct packed_reference_sector *sector = &packed->sectors[i];
2272 
2273 		vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2274 		memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2275 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2276 		/* The slab_journal_point must be the latest point found in any sector. */
2277 		if (vdo_before_journal_point(&slab->slab_journal_point,
2278 					     &block->commit_points[i]))
2279 			slab->slab_journal_point = block->commit_points[i];
2280 
2281 		if ((i > 0) &&
2282 		    !journal_points_equal(block->commit_points[0],
2283 					  block->commit_points[i])) {
2284 			size_t block_index = block - block->slab->reference_blocks;
2285 
2286 			vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2287 					i, block_index, block->slab->slab_number);
2288 		}
2289 	}
2290 
2291 	block->allocated_count = count_valid_references(counters);
2292 }
2293 
2294 /**
2295  * finish_reference_block_load() - After a reference block has been read, unpack it.
2296  * @completion: The VIO that just finished reading.
2297  */
finish_reference_block_load(struct vdo_completion * completion)2298 static void finish_reference_block_load(struct vdo_completion *completion)
2299 {
2300 	struct vio *vio = as_vio(completion);
2301 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2302 	struct reference_block *block = completion->parent;
2303 	struct vdo_slab *slab = block->slab;
2304 	unsigned int block_count = vio->io_size / VDO_BLOCK_SIZE;
2305 	unsigned int i;
2306 	char *data = vio->data;
2307 
2308 	for (i = 0; i < block_count; i++, block++, data += VDO_BLOCK_SIZE) {
2309 		struct packed_reference_block *packed = (struct packed_reference_block *) data;
2310 
2311 		unpack_reference_block(packed, block);
2312 		slab->free_blocks -= block->allocated_count;
2313 	}
2314 	return_vio_to_pool(pooled);
2315 	slab->active_count -= block_count;
2316 
2317 	check_if_slab_drained(slab);
2318 }
2319 
load_reference_block_endio(struct bio * bio)2320 static void load_reference_block_endio(struct bio *bio)
2321 {
2322 	struct vio *vio = bio->bi_private;
2323 	struct reference_block *block = vio->completion.parent;
2324 
2325 	continue_vio_after_io(vio, finish_reference_block_load,
2326 			      block->slab->allocator->thread_id);
2327 }
2328 
2329 /**
2330  * load_reference_block_group() - After a block waiter has gotten a VIO from the VIO pool, load
2331  *                                a set of blocks.
2332  * @waiter: The waiter of the first block to load.
2333  * @context: The VIO returned by the pool.
2334  */
load_reference_block_group(struct vdo_waiter * waiter,void * context)2335 static void load_reference_block_group(struct vdo_waiter *waiter, void *context)
2336 {
2337 	struct pooled_vio *pooled = context;
2338 	struct vio *vio = &pooled->vio;
2339 	struct reference_block *block =
2340 		container_of(waiter, struct reference_block, waiter);
2341 	u32 block_offset = block - block->slab->reference_blocks;
2342 	u32 max_block_count = block->slab->reference_block_count - block_offset;
2343 	u32 block_count = min_t(int, vio->block_count, max_block_count);
2344 
2345 	vio->completion.parent = block;
2346 	vdo_submit_metadata_vio_with_size(vio, block->slab->ref_counts_origin + block_offset,
2347 					  load_reference_block_endio, handle_io_error,
2348 					  REQ_OP_READ, block_count * VDO_BLOCK_SIZE);
2349 }
2350 
2351 /**
2352  * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2353  *                           pre-allocated reference counters array.
2354  */
load_reference_blocks(struct vdo_slab * slab)2355 static void load_reference_blocks(struct vdo_slab *slab)
2356 {
2357 	block_count_t i;
2358 	u64 blocks_per_vio = slab->allocator->refcount_blocks_per_big_vio;
2359 	struct vio_pool *pool = slab->allocator->refcount_big_vio_pool;
2360 
2361 	if (!pool) {
2362 		pool = slab->allocator->vio_pool;
2363 		blocks_per_vio = 1;
2364 	}
2365 
2366 	slab->free_blocks = slab->block_count;
2367 	slab->active_count = slab->reference_block_count;
2368 	for (i = 0; i < slab->reference_block_count; i += blocks_per_vio) {
2369 		struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2370 
2371 		waiter->callback = load_reference_block_group;
2372 		acquire_vio_from_pool(pool, waiter);
2373 	}
2374 }
2375 
2376 /**
2377  * drain_slab() - Drain all reference count I/O.
2378  *
2379  * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the
2380  * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2381  */
drain_slab(struct vdo_slab * slab)2382 static void drain_slab(struct vdo_slab *slab)
2383 {
2384 	bool save;
2385 	bool load;
2386 	const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2387 
2388 	if (state == VDO_ADMIN_STATE_SUSPENDING)
2389 		return;
2390 
2391 	if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2392 	    (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2393 		commit_tail(&slab->journal);
2394 
2395 	if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2396 		return;
2397 
2398 	save = false;
2399 	load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2400 	if (state == VDO_ADMIN_STATE_SCRUBBING) {
2401 		if (load) {
2402 			load_reference_blocks(slab);
2403 			return;
2404 		}
2405 	} else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2406 		if (!load) {
2407 			/* These reference counts were never written, so mark them all dirty. */
2408 			dirty_all_reference_blocks(slab);
2409 		}
2410 		save = true;
2411 	} else if (state == VDO_ADMIN_STATE_REBUILDING) {
2412 		/*
2413 		 * Write out the counters if the slab has written them before, or it has any
2414 		 * non-zero reference counts, or there are any slab journal blocks.
2415 		 */
2416 		block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2417 
2418 		if (load || (slab->free_blocks != data_blocks) ||
2419 		    !is_slab_journal_blank(slab)) {
2420 			dirty_all_reference_blocks(slab);
2421 			save = true;
2422 		}
2423 	} else if (state == VDO_ADMIN_STATE_SAVING) {
2424 		save = (slab->status == VDO_SLAB_REBUILT);
2425 	} else {
2426 		vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2427 		return;
2428 	}
2429 
2430 	if (save)
2431 		save_dirty_reference_blocks(slab);
2432 }
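
/*
 * A summary of drain_slab() by admin state, derived from the code above. The journal
 * tail is committed for every state except SUSPENDING, REBUILDING, and
 * SAVE_FOR_SCRUBBING; beyond that:
 *
 *   SUSPENDING         - do nothing
 *   RECOVERING         - nothing further
 *   SCRUBBING          - load the reference blocks if the slab summary says they
 *                        were ever written; otherwise nothing further
 *   SAVE_FOR_SCRUBBING - save dirty reference blocks, first dirtying them all if
 *                        they were never written
 *   REBUILDING         - dirty and save all reference blocks if there is anything
 *                        worth recording
 *   SAVING             - save dirty reference blocks only for rebuilt slabs
 *   anything else      - finish draining immediately with VDO_SUCCESS
 */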
2433 
allocate_slab_counters(struct vdo_slab * slab)2434 static int allocate_slab_counters(struct vdo_slab *slab)
2435 {
2436 	int result;
2437 	size_t index, bytes;
2438 
2439 	result = VDO_ASSERT(slab->reference_blocks == NULL,
2440 			    "vdo_slab %u doesn't allocate refcounts twice",
2441 			    slab->slab_number);
2442 	if (result != VDO_SUCCESS)
2443 		return result;
2444 
2445 	result = vdo_allocate(slab->reference_block_count, struct reference_block,
2446 			      __func__, &slab->reference_blocks);
2447 	if (result != VDO_SUCCESS)
2448 		return result;
2449 
2450 	/*
2451 	 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2452 	 * so we can word-search even at the very end.
2453 	 */
2454 	bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2455 	result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array",
2456 			      &slab->counters);
2457 	if (result != VDO_SUCCESS) {
2458 		vdo_free(vdo_forget(slab->reference_blocks));
2459 		return result;
2460 	}
2461 
2462 	slab->search_cursor.first_block = slab->reference_blocks;
2463 	slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2464 	reset_search_cursor(slab);
2465 
2466 	for (index = 0; index < slab->reference_block_count; index++) {
2467 		slab->reference_blocks[index] = (struct reference_block) {
2468 			.slab = slab,
2469 		};
2470 	}
2471 
2472 	return VDO_SUCCESS;
2473 }
2474 
allocate_counters_if_clean(struct vdo_slab * slab)2475 static int allocate_counters_if_clean(struct vdo_slab *slab)
2476 {
2477 	if (vdo_is_state_clean_load(&slab->state))
2478 		return allocate_slab_counters(slab);
2479 
2480 	return VDO_SUCCESS;
2481 }
2482 
finish_loading_journal(struct vdo_completion * completion)2483 static void finish_loading_journal(struct vdo_completion *completion)
2484 {
2485 	struct vio *vio = as_vio(completion);
2486 	struct slab_journal *journal = completion->parent;
2487 	struct vdo_slab *slab = journal->slab;
2488 	struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2489 	struct slab_journal_block_header header;
2490 
2491 	vdo_unpack_slab_journal_block_header(&block->header, &header);
2492 
2493 	/* FIXME: should it be an error if the following conditional fails? */
2494 	if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2495 	    (header.nonce == slab->allocator->nonce)) {
2496 		journal->tail = header.sequence_number + 1;
2497 
2498 		/*
2499 		 * If the slab is clean, this implies the slab journal is empty, so advance the
2500 		 * head appropriately.
2501 		 */
2502 		journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2503 				 header.head : journal->tail);
2504 		journal->tail_header = header;
2505 		initialize_journal_state(journal);
2506 	}
2507 
2508 	return_vio_to_pool(vio_as_pooled_vio(vio));
2509 	vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2510 }
2511 
read_slab_journal_tail_endio(struct bio * bio)2512 static void read_slab_journal_tail_endio(struct bio *bio)
2513 {
2514 	struct vio *vio = bio->bi_private;
2515 	struct slab_journal *journal = vio->completion.parent;
2516 
2517 	continue_vio_after_io(vio, finish_loading_journal,
2518 			      journal->slab->allocator->thread_id);
2519 }
2520 
handle_load_error(struct vdo_completion * completion)2521 static void handle_load_error(struct vdo_completion *completion)
2522 {
2523 	int result = completion->result;
2524 	struct slab_journal *journal = completion->parent;
2525 	struct vio *vio = as_vio(completion);
2526 
2527 	vio_record_metadata_io_error(vio);
2528 	return_vio_to_pool(vio_as_pooled_vio(vio));
2529 	vdo_finish_loading_with_result(&journal->slab->state, result);
2530 }
2531 
2532 /**
2533  * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2534  *                            pool.
2535  * @waiter: The vio pool waiter which has just been notified.
2536  * @context: The vio pool entry given to the waiter.
2537  *
2538  * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2539  */
read_slab_journal_tail(struct vdo_waiter * waiter,void * context)2540 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2541 {
2542 	struct slab_journal *journal =
2543 		container_of(waiter, struct slab_journal, resource_waiter);
2544 	struct vdo_slab *slab = journal->slab;
2545 	struct pooled_vio *pooled = context;
2546 	struct vio *vio = &pooled->vio;
2547 	tail_block_offset_t last_commit_point =
2548 		slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2549 
2550 	/*
2551 	 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2552 	 * Calculation supports small journals in unit tests.
2553 	 */
2554 	tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2555 					  (tail_block_offset_t)(journal->size - 1) :
2556 					  (last_commit_point - 1));
2557 
2558 	vio->completion.parent = journal;
2559 	vio->completion.callback_thread_id = slab->allocator->thread_id;
2560 	vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2561 				read_slab_journal_tail_endio, handle_load_error,
2562 				REQ_OP_READ);
2563 }
2564 
2565 /**
2566  * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2567  */
load_slab_journal(struct vdo_slab * slab)2568 static void load_slab_journal(struct vdo_slab *slab)
2569 {
2570 	struct slab_journal *journal = &slab->journal;
2571 	tail_block_offset_t last_commit_point;
2572 
2573 	last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2574 	if ((last_commit_point == 0) &&
2575 	    !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2576 		/*
2577 		 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2578 		 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2579 		 * don't bother reading the (bogus) data off disk.
2580 		 */
2581 		VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
2582 				     (journal->scrubbing_threshold < (journal->size - 1))),
2583 				    "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2584 		vdo_finish_loading_with_result(&slab->state,
2585 					       allocate_counters_if_clean(slab));
2586 		return;
2587 	}
2588 
2589 	journal->resource_waiter.callback = read_slab_journal_tail;
2590 	acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2591 }
2592 
register_slab_for_scrubbing(struct vdo_slab * slab,bool high_priority)2593 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2594 {
2595 	struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2596 
2597 	VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2598 			    "slab to be scrubbed is unrecovered");
2599 
2600 	if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2601 		return;
2602 
2603 	list_del_init(&slab->allocq_entry);
2604 	if (!slab->was_queued_for_scrubbing) {
2605 		WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2606 		slab->was_queued_for_scrubbing = true;
2607 	}
2608 
2609 	if (high_priority) {
2610 		slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2611 		list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2612 		return;
2613 	}
2614 
2615 	list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2616 }
2617 
2618 /* Queue a slab for allocation or scrubbing. */
queue_slab(struct vdo_slab * slab)2619 static void queue_slab(struct vdo_slab *slab)
2620 {
2621 	struct block_allocator *allocator = slab->allocator;
2622 	block_count_t free_blocks;
2623 	int result;
2624 
2625 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2626 			"a requeued slab must not already be on a list");
2627 
2628 	if (vdo_is_read_only(allocator->depot->vdo))
2629 		return;
2630 
2631 	free_blocks = slab->free_blocks;
2632 	result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2633 			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2634 			    slab->slab_number, (unsigned long long) free_blocks,
2635 			    (unsigned long long) allocator->depot->slab_config.data_blocks);
2636 	if (result != VDO_SUCCESS) {
2637 		vdo_enter_read_only_mode(allocator->depot->vdo, result);
2638 		return;
2639 	}
2640 
2641 	if (slab->status != VDO_SLAB_REBUILT) {
2642 		register_slab_for_scrubbing(slab, false);
2643 		return;
2644 	}
2645 
2646 	if (!vdo_is_state_resuming(&slab->state)) {
2647 		/*
2648 		 * If the slab is resuming, we've already accounted for it here, so don't do it
2649 		 * again.
2650 		 * FIXME: under what situation would the slab be resuming here?
2651 		 */
2652 		WRITE_ONCE(allocator->allocated_blocks,
2653 			   allocator->allocated_blocks - free_blocks);
2654 		if (!is_slab_journal_blank(slab)) {
2655 			WRITE_ONCE(allocator->statistics.slabs_opened,
2656 				   allocator->statistics.slabs_opened + 1);
2657 		}
2658 	}
2659 
2660 	if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2661 		reopen_slab_journal(slab);
2662 
2663 	prioritize_slab(slab);
2664 }
2665 
2666 /**
2667  * initiate_slab_action() - Initiate a slab action.
2668  *
2669  * Implements vdo_admin_initiator_fn.
2670  */
initiate_slab_action(struct admin_state * state)2671 static void initiate_slab_action(struct admin_state *state)
2672 {
2673 	struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2674 
2675 	if (vdo_is_state_draining(state)) {
2676 		const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2677 
2678 		if (operation == VDO_ADMIN_STATE_SCRUBBING)
2679 			slab->status = VDO_SLAB_REBUILDING;
2680 
2681 		drain_slab(slab);
2682 		check_if_slab_drained(slab);
2683 		return;
2684 	}
2685 
2686 	if (vdo_is_state_loading(state)) {
2687 		load_slab_journal(slab);
2688 		return;
2689 	}
2690 
2691 	if (vdo_is_state_resuming(state)) {
2692 		queue_slab(slab);
2693 		vdo_finish_resuming(state);
2694 		return;
2695 	}
2696 
2697 	vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2698 }
2699 
2700 /**
2701  * get_next_slab() - Get the next slab to scrub.
2702  * @scrubber: The slab scrubber.
2703  *
2704  * Return: The next slab to scrub or NULL if there are none.
2705  */
get_next_slab(struct slab_scrubber * scrubber)2706 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2707 {
2708 	struct vdo_slab *slab;
2709 
2710 	slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2711 					struct vdo_slab, allocq_entry);
2712 	if (slab != NULL)
2713 		return slab;
2714 
2715 	return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2716 					allocq_entry);
2717 }
2718 
2719 /**
2720  * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2721  * @scrubber: The scrubber to check.
2722  *
2723  * Return: true if the scrubber has slabs to scrub.
2724  */
has_slabs_to_scrub(struct slab_scrubber * scrubber)2725 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2726 {
2727 	return (get_next_slab(scrubber) != NULL);
2728 }
2729 
2730 /**
2731  * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2732  * @scrubber: The scrubber.
2733  */
uninitialize_scrubber_vio(struct slab_scrubber * scrubber)2734 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2735 {
2736 	vdo_free(vdo_forget(scrubber->vio.data));
2737 	free_vio_components(&scrubber->vio);
2738 }
2739 
2740 /**
2741  * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2742  *                      there's been an error.
2743  * @scrubber: The scrubber.
2744  */
finish_scrubbing(struct slab_scrubber * scrubber,int result)2745 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2746 {
2747 	bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2748 	bool done = !has_slabs_to_scrub(scrubber);
2749 	struct block_allocator *allocator =
2750 		container_of(scrubber, struct block_allocator, scrubber);
2751 
2752 	if (done)
2753 		uninitialize_scrubber_vio(scrubber);
2754 
2755 	if (scrubber->high_priority_only) {
2756 		scrubber->high_priority_only = false;
2757 		vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
2758 	} else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2759 		/* All of our slabs were scrubbed, and we're the last allocator to finish. */
2760 		enum vdo_state prior_state =
2761 			atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2762 				       VDO_DIRTY);
2763 
2764 		/*
2765 		 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2766 		 * respect to whatever state change did happen.
2767 		 */
2768 		smp_mb__after_atomic();
2769 
2770 		/*
2771 		 * We must check the VDO state here and not the depot's read_only_notifier since
2772 		 * the compare-swap-above could have failed due to a read-only entry which our own
2773 		 * thread does not yet know about.
2774 		 */
2775 		if (prior_state == VDO_DIRTY)
2776 			vdo_log_info("VDO commencing normal operation");
2777 		else if (prior_state == VDO_RECOVERING)
2778 			vdo_log_info("Exiting recovery mode");
2779 		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
2780 	}
2781 
2782 	/*
2783 	 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2784 	 * happen.
2785 	 */
2786 	if (!vdo_finish_draining(&scrubber->admin_state))
2787 		WRITE_ONCE(scrubber->admin_state.current_state,
2788 			   VDO_ADMIN_STATE_SUSPENDED);
2789 
2790 	/*
2791 	 * We can't notify waiters until after we've finished draining or they'll just requeue.
2792 	 * Fortunately if there were waiters, we can't have been freed yet.
2793 	 */
2794 	if (notify)
2795 		vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2796 }
2797 
2798 static void scrub_next_slab(struct slab_scrubber *scrubber);
2799 
2800 /**
2801  * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2802  * @completion: The slab rebuild completion.
2803  *
2804  * This callback is registered in apply_journal_entries().
2805  */
2806 static void slab_scrubbed(struct vdo_completion *completion)
2807 {
2808 	struct slab_scrubber *scrubber =
2809 		container_of(as_vio(completion), struct slab_scrubber, vio);
2810 	struct vdo_slab *slab = scrubber->slab;
2811 
2812 	slab->status = VDO_SLAB_REBUILT;
2813 	queue_slab(slab);
2814 	reopen_slab_journal(slab);
2815 	WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2816 	scrub_next_slab(scrubber);
2817 }
2818 
2819 /**
2820  * abort_scrubbing() - Abort scrubbing due to an error.
2821  * @scrubber: The slab scrubber.
2822  * @result: The error.
2823  */
2824 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2825 {
2826 	vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2827 	finish_scrubbing(scrubber, result);
2828 }
2829 
2830 /**
2831  * handle_scrubber_error() - Handle errors while rebuilding a slab.
2832  * @completion: The slab rebuild completion.
2833  */
2834 static void handle_scrubber_error(struct vdo_completion *completion)
2835 {
2836 	struct vio *vio = as_vio(completion);
2837 
2838 	vio_record_metadata_io_error(vio);
2839 	abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2840 			completion->result);
2841 }
2842 
2843 /**
2844  * apply_block_entries() - Apply all the entries in a block to the reference counts.
2845  * @block: A block with entries to apply.
2846  * @entry_count: The number of entries to apply.
2847  * @block_number: The sequence number of the block.
2848  * @slab: The slab to apply the entries to.
2849  *
2850  * Return: VDO_SUCCESS or an error code.
2851  */
2852 static int apply_block_entries(struct packed_slab_journal_block *block,
2853 			       journal_entry_count_t entry_count,
2854 			       sequence_number_t block_number, struct vdo_slab *slab)
2855 {
2856 	struct journal_point entry_point = {
2857 		.sequence_number = block_number,
2858 		.entry_count = 0,
2859 	};
2860 	int result;
2861 	slab_block_number max_sbn = slab->end - slab->start;
2862 
2863 	while (entry_point.entry_count < entry_count) {
2864 		struct slab_journal_entry entry =
2865 			vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2866 
2867 		if (entry.sbn > max_sbn) {
2868 			/* This entry is out of bounds. */
2869 			return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
2870 						      "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2871 						      (unsigned long long) block_number,
2872 						      entry_point.entry_count,
2873 						      entry.sbn, max_sbn);
2874 		}
2875 
2876 		result = replay_reference_count_change(slab, &entry_point, entry);
2877 		if (result != VDO_SUCCESS) {
2878 			vdo_log_error_strerror(result,
2879 					       "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2880 					       (unsigned long long) block_number,
2881 					       entry_point.entry_count,
2882 					       vdo_get_journal_operation_name(entry.operation),
2883 					       entry.sbn, slab->slab_number);
2884 			return result;
2885 		}
2886 		entry_point.entry_count++;
2887 	}
2888 
2889 	return VDO_SUCCESS;
2890 }
2891 
2892 /**
2893  * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2894  * @completion: The metadata read vio completion.
2895  *
2896  * This is a callback registered in start_scrubbing().
2897  */
2898 static void apply_journal_entries(struct vdo_completion *completion)
2899 {
2900 	int result;
2901 	struct slab_scrubber *scrubber =
2902 		container_of(as_vio(completion), struct slab_scrubber, vio);
2903 	struct vdo_slab *slab = scrubber->slab;
2904 	struct slab_journal *journal = &slab->journal;
2905 
2906 	/* Find the boundaries of the useful part of the journal. */
2907 	sequence_number_t tail = journal->tail;
2908 	tail_block_offset_t end_index = (tail - 1) % journal->size;
2909 	char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2910 	struct packed_slab_journal_block *end_block =
2911 		(struct packed_slab_journal_block *) end_data;
2912 
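	/*
	 * The most recently written block records the journal head as of its commit; earlier
	 * blocks are already reflected in the saved reference counts, so replay starts at that
	 * head.
	 */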
2913 	sequence_number_t head = __le64_to_cpu(end_block->header.head);
2914 	tail_block_offset_t head_index = head % journal->size;
2915 	block_count_t index = head_index;
2916 
2917 	struct journal_point ref_counts_point = slab->slab_journal_point;
2918 	struct journal_point last_entry_applied = ref_counts_point;
2919 	sequence_number_t sequence;
2920 
2921 	for (sequence = head; sequence < tail; sequence++) {
2922 		char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2923 		struct packed_slab_journal_block *block =
2924 			(struct packed_slab_journal_block *) block_data;
2925 		struct slab_journal_block_header header;
2926 
2927 		vdo_unpack_slab_journal_block_header(&block->header, &header);
2928 
2929 		if ((header.nonce != slab->allocator->nonce) ||
2930 		    (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2931 		    (header.sequence_number != sequence) ||
2932 		    (header.entry_count > journal->entries_per_block) ||
2933 		    (header.has_block_map_increments &&
2934 		     (header.entry_count > journal->full_entries_per_block))) {
2935 			/* The block is not what we expect it to be. */
2936 			vdo_log_error("vdo_slab journal block for slab %u was invalid",
2937 				      slab->slab_number);
2938 			abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2939 			return;
2940 		}
2941 
2942 		result = apply_block_entries(block, header.entry_count, sequence, slab);
2943 		if (result != VDO_SUCCESS) {
2944 			abort_scrubbing(scrubber, result);
2945 			return;
2946 		}
2947 
2948 		last_entry_applied.sequence_number = sequence;
2949 		last_entry_applied.entry_count = header.entry_count - 1;
2950 		index++;
2951 		if (index == journal->size)
2952 			index = 0;
2953 	}
2954 
2955 	/*
2956 	 * At the end of rebuild, the reference counters should be accurate to the end of the
2957 	 * journal we just applied.
2958 	 */
2959 	result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
2960 						      &ref_counts_point),
2961 			    "Refcounts are not more accurate than the slab journal");
2962 	if (result != VDO_SUCCESS) {
2963 		abort_scrubbing(scrubber, result);
2964 		return;
2965 	}
2966 
2967 	/* Save out the rebuilt reference blocks. */
2968 	vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2969 			       slab->allocator->thread_id, completion->parent);
2970 	vdo_start_operation_with_waiter(&slab->state,
2971 					VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2972 					completion, initiate_slab_action);
2973 }
2974 
2975 static void read_slab_journal_endio(struct bio *bio)
2976 {
2977 	struct vio *vio = bio->bi_private;
2978 	struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2979 
2980 	continue_vio_after_io(bio->bi_private, apply_journal_entries,
2981 			      scrubber->slab->allocator->thread_id);
2982 }
2983 
2984 /**
2985  * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2986  * @completion: The scrubber's vio completion.
2987  *
2988  * This callback is registered in scrub_next_slab().
2989  */
2990 static void start_scrubbing(struct vdo_completion *completion)
2991 {
2992 	struct slab_scrubber *scrubber =
2993 		container_of(as_vio(completion), struct slab_scrubber, vio);
2994 	struct vdo_slab *slab = scrubber->slab;
2995 
2996 	if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2997 		slab_scrubbed(completion);
2998 		return;
2999 	}
3000 
3001 	vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
3002 				read_slab_journal_endio, handle_scrubber_error,
3003 				REQ_OP_READ);
3004 }
3005 
3006 /**
3007  * scrub_next_slab() - Scrub the next slab if there is one.
3008  * @scrubber: The scrubber.
3009  */
3010 static void scrub_next_slab(struct slab_scrubber *scrubber)
3011 {
3012 	struct vdo_completion *completion = &scrubber->vio.completion;
3013 	struct vdo_slab *slab;
3014 
3015 	/*
3016 	 * Note: this notify call is safe only because scrubbing can only be started when
3017 	 * the VDO is quiescent.
3018 	 */
3019 	vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
3020 
3021 	if (vdo_is_read_only(completion->vdo)) {
3022 		finish_scrubbing(scrubber, VDO_READ_ONLY);
3023 		return;
3024 	}
3025 
3026 	slab = get_next_slab(scrubber);
3027 	if ((slab == NULL) ||
3028 	    (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
3029 		finish_scrubbing(scrubber, VDO_SUCCESS);
3030 		return;
3031 	}
3032 
3033 	if (vdo_finish_draining(&scrubber->admin_state))
3034 		return;
3035 
3036 	list_del_init(&slab->allocq_entry);
3037 	scrubber->slab = slab;
3038 	vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
3039 			       slab->allocator->thread_id, completion->parent);
3040 	vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
3041 					completion, initiate_slab_action);
3042 }
3043 
3044 /**
3045  * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
3046  * @allocator: The block_allocator to scrub.
3047  * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
3048  */
3049 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
3050 {
3051 	struct slab_scrubber *scrubber = &allocator->scrubber;
3052 
3053 	scrubber->vio.completion.parent = parent;
3054 	scrubber->high_priority_only = (parent != NULL);
3055 	if (!has_slabs_to_scrub(scrubber)) {
3056 		finish_scrubbing(scrubber, VDO_SUCCESS);
3057 		return;
3058 	}
3059 
3060 	if (scrubber->high_priority_only &&
3061 	    vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
3062 	    list_empty(&scrubber->high_priority_slabs))
3063 		register_slab_for_scrubbing(get_next_slab(scrubber), true);
3064 
3065 	vdo_resume_if_quiescent(&scrubber->admin_state);
3066 	scrub_next_slab(scrubber);
3067 }
3068 
3069 static inline void assert_on_allocator_thread(thread_id_t thread_id,
3070 					      const char *function_name)
3071 {
3072 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
3073 			    "%s called on correct thread", function_name);
3074 }
3075 
3076 static void register_slab_with_allocator(struct block_allocator *allocator,
3077 					 struct vdo_slab *slab)
3078 {
3079 	allocator->slab_count++;
3080 	allocator->last_slab = slab->slab_number;
3081 }
3082 
3083 /**
3084  * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3085  * @depot: The depot over which to iterate.
3086  * @start: The number of the slab to start iterating from.
3087  * @end: The number of the last slab which may be returned.
3088  * @stride: The difference in slab number between successive slabs.
3089  *
3090  * Iteration always occurs from higher to lower numbered slabs.
3091  *
3092  * Return: An initialized iterator structure.
3093  */
3094 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3095 						    slab_count_t start, slab_count_t end,
3096 						    slab_count_t stride)
3097 {
3098 	struct vdo_slab **slabs = depot->slabs;
3099 
3100 	return (struct slab_iterator) {
3101 		.slabs = slabs,
3102 		.next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3103 		.end = end,
3104 		.stride = stride,
3105 	};
3106 }
3107 
3108 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3109 {
3110 	return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3111 				       allocator->zone_number,
3112 				       allocator->depot->zone_count);
3113 }
3114 
3115 /**
3116  * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3117  * next_slab() - Get the next slab from a slab_iterator and advance the iterator.
3118  *
3119  * Return: The next slab or NULL if the iterator is exhausted.
3120  */
3121 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3122 {
3123 	struct vdo_slab *slab = iterator->next;
3124 
3125 	if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3126 		iterator->next = NULL;
3127 	else
3128 		iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3129 
3130 	return slab;
3131 }
3132 
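/*
 * For example, with four zones (stride 4), the iterator built by get_slab_iterator() for the
 * allocator in zone 1 whose last_slab is 9 yields slabs 9, 5, and 1 from successive next_slab()
 * calls, since slabs are assigned to zones round-robin by slab number.
 */
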
3133 /**
3134  * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3135  *
3136  * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3137  * into read-only mode. Implements waiter_callback_fn.
3138  */
3139 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3140 {
3141 	struct reference_updater *updater =
3142 		container_of(waiter, struct reference_updater, waiter);
3143 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3144 
3145 	if (updater->increment) {
3146 		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3147 		return;
3148 	}
3149 
3150 	vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3151 }
3152 
3153 /* Implements vdo_read_only_notification_fn. */
3154 static void notify_block_allocator_of_read_only_mode(void *listener,
3155 						     struct vdo_completion *parent)
3156 {
3157 	struct block_allocator *allocator = listener;
3158 	struct slab_iterator iterator;
3159 
3160 	assert_on_allocator_thread(allocator->thread_id, __func__);
3161 	iterator = get_slab_iterator(allocator);
3162 	while (iterator.next != NULL) {
3163 		struct vdo_slab *slab = next_slab(&iterator);
3164 
3165 		vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3166 					     abort_waiter, &slab->journal);
3167 		check_if_slab_drained(slab);
3168 	}
3169 
3170 	vdo_finish_completion(parent);
3171 }
3172 
3173 /**
3174  * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3175  *                                       the block it locks is unreferenced.
3176  * @slab: The slab which contains the block.
3177  * @pbn: The physical block to reference.
3178  * @lock: The lock.
3179  *
3180  * Return: VDO_SUCCESS or an error.
3181  */
3182 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3183 				      struct pbn_lock *lock)
3184 {
3185 	slab_block_number block_number;
3186 	int result;
3187 
3188 	if (vdo_pbn_lock_has_provisional_reference(lock))
3189 		return VDO_SUCCESS;
3190 
3191 	if (!is_slab_open(slab))
3192 		return VDO_INVALID_ADMIN_STATE;
3193 
3194 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
3195 	if (result != VDO_SUCCESS)
3196 		return result;
3197 
3198 	if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3199 		make_provisional_reference(slab, block_number);
3200 		if (lock != NULL)
3201 			vdo_assign_pbn_lock_provisional_reference(lock);
3202 	}
3203 
3204 	if (vdo_pbn_lock_has_provisional_reference(lock))
3205 		adjust_free_block_count(slab, false);
3206 
3207 	return VDO_SUCCESS;
3208 }
3209 
3210 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3211 					    physical_block_number_t *block_number_ptr)
3212 {
3213 	slab_block_number free_index;
3214 
3215 	if (!is_slab_open(slab))
3216 		return VDO_INVALID_ADMIN_STATE;
3217 
3218 	if (!search_reference_blocks(slab, &free_index))
3219 		return VDO_NO_SPACE;
3220 
3221 	VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3222 			    "free block must have ref count of zero");
3223 	make_provisional_reference(slab, free_index);
3224 	adjust_free_block_count(slab, false);
3225 
3226 	/*
3227 	 * Update the search hint so the next search will start at the array index just past the
3228 	 * free block we just found.
3229 	 */
3230 	slab->search_cursor.index = (free_index + 1);
3231 
3232 	*block_number_ptr = slab->start + free_index;
3233 	return VDO_SUCCESS;
3234 }
3235 
3236 /**
3237  * open_slab() - Prepare a slab to be allocated from.
3238  * @slab: The slab.
3239  */
3240 static void open_slab(struct vdo_slab *slab)
3241 {
3242 	reset_search_cursor(slab);
3243 	if (is_slab_journal_blank(slab)) {
3244 		WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3245 			   slab->allocator->statistics.slabs_opened + 1);
3246 		dirty_all_reference_blocks(slab);
3247 	} else {
3248 		WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3249 			   slab->allocator->statistics.slabs_reopened + 1);
3250 	}
3251 
3252 	slab->allocator->open_slab = slab;
3253 }
3254 
3255 
3256 /*
3257  * The block allocated will have a provisional reference and the reference must be either confirmed
3258  * with a subsequent increment or vacated with a subsequent decrement via
3259  * vdo_release_block_reference().
3260  */
3261 int vdo_allocate_block(struct block_allocator *allocator,
3262 		       physical_block_number_t *block_number_ptr)
3263 {
3264 	int result;
3265 
3266 	if (allocator->open_slab != NULL) {
3267 		/* Try to allocate the next block in the currently open slab. */
3268 		result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3269 		if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3270 			return result;
3271 
3272 		/* Put the exhausted open slab back into the priority table. */
3273 		prioritize_slab(allocator->open_slab);
3274 	}
3275 
3276 	/* Remove the highest priority slab from the priority table and make it the open slab. */
3277 	open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3278 			     struct vdo_slab, allocq_entry));
3279 
3280 	/*
3281 	 * Try allocating again. If we're out of space immediately after opening a slab, then every
3282 	 * slab must be fully allocated.
3283 	 */
3284 	return allocate_slab_block(allocator->open_slab, block_number_ptr);
3285 }
3286 
3287 /**
3288  * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3289  * @allocator: The block_allocator on which to wait.
3290  * @waiter: The waiter.
3291  *
3292  * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3293  *         some other error otherwise.
3294  */
3295 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3296 				  struct vdo_waiter *waiter)
3297 {
3298 	if (vdo_is_read_only(allocator->depot->vdo))
3299 		return VDO_READ_ONLY;
3300 
3301 	if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3302 		return VDO_NO_SPACE;
3303 
3304 	vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3305 	return VDO_SUCCESS;
3306 }
3307 
3308 /**
3309  * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3310  *                                journal entry and then updating the reference counter.
3311  * @completion: The data_vio completion for which to add the entry.
3312  * @updater: Which of the data_vio's reference updaters is being submitted.
3313  */
3314 void vdo_modify_reference_count(struct vdo_completion *completion,
3315 				struct reference_updater *updater)
3316 {
3317 	struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3318 
3319 	if (!is_slab_open(slab)) {
3320 		vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3321 		return;
3322 	}
3323 
3324 	if (vdo_is_read_only(completion->vdo)) {
3325 		vdo_continue_completion(completion, VDO_READ_ONLY);
3326 		return;
3327 	}
3328 
3329 	vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3330 	if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3331 		register_slab_for_scrubbing(slab, true);
3332 
3333 	add_entries(&slab->journal);
3334 }
3335 
3336 /* Release an unused provisional reference. */
3337 int vdo_release_block_reference(struct block_allocator *allocator,
3338 				physical_block_number_t pbn)
3339 {
3340 	struct reference_updater updater;
3341 
3342 	if (pbn == VDO_ZERO_BLOCK)
3343 		return VDO_SUCCESS;
3344 
3345 	updater = (struct reference_updater) {
3346 		.operation = VDO_JOURNAL_DATA_REMAPPING,
3347 		.increment = false,
3348 		.zpbn = {
3349 			.pbn = pbn,
3350 		},
3351 	};
3352 
3353 	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3354 				      &updater, NULL);
3355 }
3356 
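/*
 * Illustrative sketch only (not part of the driver): the provisional reference taken by
 * vdo_allocate_block() must either be confirmed by a later increment (normally submitted via
 * vdo_modify_reference_count()) or vacated with vdo_release_block_reference(). The helper name
 * and its "can_use" parameter are hypothetical.
 */
static int __maybe_unused example_allocate_block(struct block_allocator *allocator, bool can_use,
						 physical_block_number_t *pbn_ptr)
{
	int result = vdo_allocate_block(allocator, pbn_ptr);

	if (result != VDO_SUCCESS)
		return result;

	if (!can_use) {
		/* Vacate the unused provisional reference. */
		return vdo_release_block_reference(allocator, *pbn_ptr);
	}

	/* Otherwise the caller confirms the reference later with an increment. */
	return VDO_SUCCESS;
}
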
3357 /*
3358  * This is a min_heap callback function which orders slab_status structures using the 'is_clean' field as
3359  * the primary key and the 'emptiness' field as the secondary key.
3360  *
3361  * Slabs need to be pushed onto the lists in the same order they are to be popped off. Popping
3362  * should always get the most empty first, so pushing should be from most empty to least empty.
3363  * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3364  * before larger ones.
3365  */
3366 static bool slab_status_is_less_than(const void *item1, const void *item2,
3367 					void __always_unused *args)
3368 {
3369 	const struct slab_status *info1 = item1;
3370 	const struct slab_status *info2 = item2;
3371 
3372 	if (info1->is_clean != info2->is_clean)
3373 		return info1->is_clean;
3374 	if (info1->emptiness != info2->emptiness)
3375 		return info1->emptiness > info2->emptiness;
3376 	return info1->slab_number < info2->slab_number;
3377 }
3378 
3379 static const struct min_heap_callbacks slab_status_min_heap = {
3380 	.less = slab_status_is_less_than,
3381 	.swp = NULL,
3382 };
3383 
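/*
 * With this ordering, min_heap_pop() yields clean slabs before dirty ones and, within each group,
 * slabs with the larger emptiness hint first, so vdo_prepare_slabs_for_allocation() sees the
 * cleanest, emptiest slabs at the top of the heap.
 */
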
3384 /* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
3385 static void slab_action_callback(struct vdo_completion *completion)
3386 {
3387 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3388 	struct slab_actor *actor = &allocator->slab_actor;
3389 
3390 	if (--actor->slab_action_count == 0) {
3391 		actor->callback(completion);
3392 		return;
3393 	}
3394 
3395 	vdo_reset_completion(completion);
3396 }
3397 
3398 /* Preserve the error from part of an action and continue. */
3399 static void handle_operation_error(struct vdo_completion *completion)
3400 {
3401 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3402 
3403 	if (allocator->state.waiter != NULL)
3404 		vdo_set_completion_result(allocator->state.waiter, completion->result);
3405 	completion->callback(completion);
3406 }
3407 
3408 /* Perform an action on each of an allocator's slabs in parallel. */
3409 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3410 {
3411 	struct slab_iterator iterator;
3412 
3413 	vdo_prepare_completion(&allocator->completion, slab_action_callback,
3414 			       handle_operation_error, allocator->thread_id, NULL);
3415 	allocator->completion.requeue = false;
3416 
3417 	/*
3418 	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3419 	 * clear it.
3420 	 */
3421 	allocator->open_slab = NULL;
3422 
3423 	/* Ensure that we don't finish before we're done starting. */
3424 	allocator->slab_actor = (struct slab_actor) {
3425 		.slab_action_count = 1,
3426 		.callback = callback,
3427 	};
3428 
3429 	iterator = get_slab_iterator(allocator);
3430 	while (iterator.next != NULL) {
3431 		const struct admin_state_code *operation =
3432 			vdo_get_admin_state_code(&allocator->state);
3433 		struct vdo_slab *slab = next_slab(&iterator);
3434 
3435 		list_del_init(&slab->allocq_entry);
3436 		allocator->slab_actor.slab_action_count++;
3437 		vdo_start_operation_with_waiter(&slab->state, operation,
3438 						&allocator->completion,
3439 						initiate_slab_action);
3440 	}
3441 
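	/*
	 * Drop the extra count taken when slab_action_count was initialized to 1; if every slab
	 * operation has already completed, this call invokes the final callback.
	 */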
3442 	slab_action_callback(&allocator->completion);
3443 }
3444 
3445 static void finish_loading_allocator(struct vdo_completion *completion)
3446 {
3447 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3448 	const struct admin_state_code *operation =
3449 		vdo_get_admin_state_code(&allocator->state);
3450 
3451 	if (allocator->eraser != NULL)
3452 		dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3453 
3454 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3455 		void *context =
3456 			vdo_get_current_action_context(allocator->depot->action_manager);
3457 
3458 		vdo_replay_into_slab_journals(allocator, context);
3459 		return;
3460 	}
3461 
3462 	vdo_finish_loading(&allocator->state);
3463 }
3464 
3465 static void erase_next_slab_journal(struct block_allocator *allocator);
3466 
3467 static void copy_callback(int read_err, unsigned long write_err, void *context)
3468 {
3469 	struct block_allocator *allocator = context;
3470 	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3471 
3472 	if (result != VDO_SUCCESS) {
3473 		vdo_fail_completion(&allocator->completion, result);
3474 		return;
3475 	}
3476 
3477 	erase_next_slab_journal(allocator);
3478 }
3479 
3480 /* erase_next_slab_journal() - Erase the next slab journal. */
3481 static void erase_next_slab_journal(struct block_allocator *allocator)
3482 {
3483 	struct vdo_slab *slab;
3484 	physical_block_number_t pbn;
3485 	struct dm_io_region regions[1];
3486 	struct slab_depot *depot = allocator->depot;
3487 	block_count_t blocks = depot->slab_config.slab_journal_blocks;
3488 
3489 	if (allocator->slabs_to_erase.next == NULL) {
3490 		vdo_finish_completion(&allocator->completion);
3491 		return;
3492 	}
3493 
3494 	slab = next_slab(&allocator->slabs_to_erase);
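	/*
	 * dm_kcopyd addresses the backing device directly, so adjust the journal origin by the
	 * geometry's bio offset before building the region below.
	 */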
3495 	pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3496 	regions[0] = (struct dm_io_region) {
3497 		.bdev = vdo_get_backing_device(depot->vdo),
3498 		.sector = pbn * VDO_SECTORS_PER_BLOCK,
3499 		.count = blocks * VDO_SECTORS_PER_BLOCK,
3500 	};
3501 	dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3502 }
3503 
3504 /* Implements vdo_admin_initiator_fn. */
3505 static void initiate_load(struct admin_state *state)
3506 {
3507 	struct block_allocator *allocator =
3508 		container_of(state, struct block_allocator, state);
3509 	const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3510 
3511 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3512 		/*
3513 		 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3514 		 * as the kcopyd callback, lest it deadlock.
3515 		 */
3516 		vdo_prepare_completion_for_requeue(&allocator->completion,
3517 						   finish_loading_allocator,
3518 						   handle_operation_error,
3519 						   allocator->thread_id, NULL);
3520 		allocator->eraser = dm_kcopyd_client_create(NULL);
3521 		if (IS_ERR(allocator->eraser)) {
3522 			vdo_fail_completion(&allocator->completion,
3523 					    PTR_ERR(allocator->eraser));
3524 			allocator->eraser = NULL;
3525 			return;
3526 		}
3527 		allocator->slabs_to_erase = get_slab_iterator(allocator);
3528 
3529 		erase_next_slab_journal(allocator);
3530 		return;
3531 	}
3532 
3533 	apply_to_slabs(allocator, finish_loading_allocator);
3534 }
3535 
3536 /**
3537  * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3538  *                                            been recovered from the recovery journal.
3539  * @completion: The allocator completion.
3540  */
3541 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3542 {
3543 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3544 
3545 	vdo_finish_loading_with_result(&allocator->state, completion->result);
3546 }
3547 
3548 static int get_slab_statuses(struct block_allocator *allocator,
3549 			     struct slab_status **statuses_ptr)
3550 {
3551 	int result;
3552 	struct slab_status *statuses;
3553 	struct slab_iterator iterator = get_slab_iterator(allocator);
3554 
3555 	result = vdo_allocate(allocator->slab_count, struct slab_status, __func__,
3556 			      &statuses);
3557 	if (result != VDO_SUCCESS)
3558 		return result;
3559 
3560 	*statuses_ptr = statuses;
3561 
3562 	while (iterator.next != NULL)  {
3563 		slab_count_t slab_number = next_slab(&iterator)->slab_number;
3564 
3565 		*statuses++ = (struct slab_status) {
3566 			.slab_number = slab_number,
3567 			.is_clean = !allocator->summary_entries[slab_number].is_dirty,
3568 			.emptiness = allocator->summary_entries[slab_number].fullness_hint,
3569 		};
3570 	}
3571 
3572 	return VDO_SUCCESS;
3573 }
3574 
3575 /* Prepare slabs for allocation or scrubbing. */
3576 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3577 {
3578 	struct slab_status current_slab_status;
3579 	DEFINE_MIN_HEAP(struct slab_status, heap) heap;
3580 	int result;
3581 	struct slab_status *slab_statuses;
3582 	struct slab_depot *depot = allocator->depot;
3583 
3584 	WRITE_ONCE(allocator->allocated_blocks,
3585 		   allocator->slab_count * depot->slab_config.data_blocks);
3586 	result = get_slab_statuses(allocator, &slab_statuses);
3587 	if (result != VDO_SUCCESS)
3588 		return result;
3589 
3590 	/* Sort the slabs by cleanliness, then by emptiness hint. */
3591 	heap = (struct heap) {
3592 		.data = slab_statuses,
3593 		.nr = allocator->slab_count,
3594 		.size = allocator->slab_count,
3595 	};
3596 	min_heapify_all(&heap, &slab_status_min_heap, NULL);
3597 
3598 	while (heap.nr > 0) {
3599 		bool high_priority;
3600 		struct vdo_slab *slab;
3601 		struct slab_journal *journal;
3602 
3603 		current_slab_status = slab_statuses[0];
3604 		min_heap_pop(&heap, &slab_status_min_heap, NULL);
3605 		slab = depot->slabs[current_slab_status.slab_number];
3606 
3607 		if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3608 		    (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3609 		     current_slab_status.is_clean)) {
3610 			queue_slab(slab);
3611 			continue;
3612 		}
3613 
3614 		slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3615 		journal = &slab->journal;
3616 		high_priority = ((current_slab_status.is_clean &&
3617 				 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3618 				 (journal_length(journal) >= journal->scrubbing_threshold));
3619 		register_slab_for_scrubbing(slab, high_priority);
3620 	}
3621 
3622 	vdo_free(slab_statuses);
3623 	return VDO_SUCCESS;
3624 }
3625 
3626 static const char *status_to_string(enum slab_rebuild_status status)
3627 {
3628 	switch (status) {
3629 	case VDO_SLAB_REBUILT:
3630 		return "REBUILT";
3631 	case VDO_SLAB_REQUIRES_SCRUBBING:
3632 		return "SCRUBBING";
3633 	case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3634 		return "PRIORITY_SCRUBBING";
3635 	case VDO_SLAB_REBUILDING:
3636 		return "REBUILDING";
3637 	case VDO_SLAB_REPLAYING:
3638 		return "REPLAYING";
3639 	default:
3640 		return "UNKNOWN";
3641 	}
3642 }
3643 
3644 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3645 {
3646 	unsigned int pause_counter = 0;
3647 	struct slab_iterator iterator = get_slab_iterator(allocator);
3648 	const struct slab_scrubber *scrubber = &allocator->scrubber;
3649 
3650 	vdo_log_info("block_allocator zone %u", allocator->zone_number);
3651 	while (iterator.next != NULL) {
3652 		struct vdo_slab *slab = next_slab(&iterator);
3653 		struct slab_journal *journal = &slab->journal;
3654 
3655 		if (slab->reference_blocks != NULL) {
3656 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3657 			vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
3658 				     slab->priority,
3659 				     (unsigned long long) slab->free_blocks);
3660 		} else {
3661 			vdo_log_info("slab %u: status %s", slab->slab_number,
3662 				     status_to_string(slab->status));
3663 		}
3664 
3665 		vdo_log_info("  slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3666 			     vdo_waitq_num_waiters(&journal->entry_waiters),
3667 			     vdo_bool_to_string(journal->waiting_to_commit),
3668 			     vdo_bool_to_string(journal->updating_slab_summary),
3669 			     (unsigned long long) journal->head,
3670 			     (unsigned long long) journal->unreapable,
3671 			     (unsigned long long) journal->tail,
3672 			     (unsigned long long) journal->next_commit,
3673 			     (unsigned long long) journal->summarized,
3674 			     (unsigned long long) journal->last_summarized,
3675 			     (unsigned long long) journal->recovery_lock,
3676 			     vdo_bool_to_string(journal->recovery_lock != 0));
3677 		/*
3678 		 * Given the frequency with which the locks are just a tiny bit off, it might be
3679 		 * worth dumping all the locks, but that might be too much logging.
3680 		 */
3681 
3682 		if (slab->counters != NULL) {
3683 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3684 			vdo_log_info("  slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3685 				     slab->free_blocks, slab->block_count,
3686 				     slab->reference_block_count,
3687 				     vdo_waitq_num_waiters(&slab->dirty_blocks),
3688 				     slab->active_count,
3689 				     (unsigned long long) slab->slab_journal_point.sequence_number,
3690 				     slab->slab_journal_point.entry_count);
3691 		} else {
3692 			vdo_log_info("  no counters");
3693 		}
3694 
3695 		/*
3696 		 * Pause after each batch of 32 slabs dumped (an arbitrary number), giving the
3697 		 * kernel log a chance to be flushed instead of being overrun.
3698 		 */
3699 		if (pause_counter++ == 31) {
3700 			pause_counter = 0;
3701 			vdo_pause_for_logger();
3702 		}
3703 	}
3704 
3705 	vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3706 		     READ_ONCE(scrubber->slab_count),
3707 		     vdo_waitq_num_waiters(&scrubber->waiters),
3708 		     vdo_get_admin_state_code(&scrubber->admin_state)->name,
3709 		     scrubber->high_priority_only ? ", high_priority_only " : "");
3710 }
3711 
3712 static void free_slab(struct vdo_slab *slab)
3713 {
3714 	if (slab == NULL)
3715 		return;
3716 
3717 	list_del(&slab->allocq_entry);
3718 	vdo_free(vdo_forget(slab->journal.block));
3719 	vdo_free(vdo_forget(slab->journal.locks));
3720 	vdo_free(vdo_forget(slab->counters));
3721 	vdo_free(vdo_forget(slab->reference_blocks));
3722 	vdo_free(slab);
3723 }
3724 
3725 static int initialize_slab_journal(struct vdo_slab *slab)
3726 {
3727 	struct slab_journal *journal = &slab->journal;
3728 	const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3729 	int result;
3730 
3731 	result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3732 			      __func__, &journal->locks);
3733 	if (result != VDO_SUCCESS)
3734 		return result;
3735 
3736 	result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3737 			      (char **) &journal->block);
3738 	if (result != VDO_SUCCESS)
3739 		return result;
3740 
3741 	journal->slab = slab;
3742 	journal->size = slab_config->slab_journal_blocks;
3743 	journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3744 	journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3745 	journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3746 	journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3747 	journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3748 	journal->events = &slab->allocator->slab_journal_statistics;
3749 	journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3750 	journal->tail = 1;
3751 	journal->head = 1;
3752 
3753 	journal->flushing_deadline = journal->flushing_threshold;
3754 	/*
3755 	 * Leave some time between the flushing deadline and the blocking threshold, so that
3756 	 * the flushes it triggers can hopefully complete before writers are blocked.
3757 	 */
3758 	if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3759 		journal->flushing_deadline = journal->blocking_threshold - 5;
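	/* For example, a flushing threshold of 60 and a blocking threshold of 70 yield a deadline of 65. */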
3760 
3761 	journal->slab_summary_waiter.callback = release_journal_locks;
3762 
3763 	INIT_LIST_HEAD(&journal->dirty_entry);
3764 	INIT_LIST_HEAD(&journal->uncommitted_blocks);
3765 
3766 	journal->tail_header.nonce = slab->allocator->nonce;
3767 	journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3768 	initialize_journal_state(journal);
3769 	return VDO_SUCCESS;
3770 }
3771 
3772 /**
3773  * make_slab() - Construct a new, empty slab.
3774  * @slab_origin: The physical block number within the block allocator partition of the first block
3775  *               in the slab.
3776  * @allocator: The block allocator to which the slab belongs.
3777  * @slab_number: The slab number of the slab.
3778  * @is_new: true if this slab is being allocated as part of a resize.
3779  * @slab_ptr: A pointer to receive the new slab.
3780  *
3781  * Return: VDO_SUCCESS or an error code.
3782  */
3783 static int __must_check make_slab(physical_block_number_t slab_origin,
3784 				  struct block_allocator *allocator,
3785 				  slab_count_t slab_number, bool is_new,
3786 				  struct vdo_slab **slab_ptr)
3787 {
3788 	const struct slab_config *slab_config = &allocator->depot->slab_config;
3789 	struct vdo_slab *slab;
3790 	int result;
3791 
3792 	result = vdo_allocate(1, struct vdo_slab, __func__, &slab);
3793 	if (result != VDO_SUCCESS)
3794 		return result;
3795 
3796 	*slab = (struct vdo_slab) {
3797 		.allocator = allocator,
3798 		.start = slab_origin,
3799 		.end = slab_origin + slab_config->slab_blocks,
3800 		.slab_number = slab_number,
3801 		.ref_counts_origin = slab_origin + slab_config->data_blocks,
3802 		.journal_origin =
3803 			vdo_get_slab_journal_start_block(slab_config, slab_origin),
3804 		.block_count = slab_config->data_blocks,
3805 		.free_blocks = slab_config->data_blocks,
3806 		.reference_block_count =
3807 			vdo_get_saved_reference_count_size(slab_config->data_blocks),
3808 	};
3809 	INIT_LIST_HEAD(&slab->allocq_entry);
3810 
3811 	result = initialize_slab_journal(slab);
3812 	if (result != VDO_SUCCESS) {
3813 		free_slab(slab);
3814 		return result;
3815 	}
3816 
3817 	if (is_new) {
3818 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3819 		result = allocate_slab_counters(slab);
3820 		if (result != VDO_SUCCESS) {
3821 			free_slab(slab);
3822 			return result;
3823 		}
3824 	} else {
3825 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3826 	}
3827 
3828 	*slab_ptr = slab;
3829 	return VDO_SUCCESS;
3830 }
3831 
3832 /**
3833  * allocate_slabs() - Allocate a new slab pointer array.
3834  * @depot: The depot.
3835  * @slab_count: The number of slabs the depot should have in the new array.
3836  *
3837  * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3838  * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3839  *
3840  * Return: VDO_SUCCESS or an error code.
3841  */
3842 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3843 {
3844 	block_count_t slab_size;
3845 	bool resizing = false;
3846 	physical_block_number_t slab_origin;
3847 	int result;
3848 
3849 	result = vdo_allocate(slab_count, struct vdo_slab *,
3850 			      "slab pointer array", &depot->new_slabs);
3851 	if (result != VDO_SUCCESS)
3852 		return result;
3853 
3854 	if (depot->slabs != NULL) {
3855 		memcpy(depot->new_slabs, depot->slabs,
3856 		       depot->slab_count * sizeof(struct vdo_slab *));
3857 		resizing = true;
3858 	}
3859 
3860 	slab_size = depot->slab_config.slab_blocks;
3861 	slab_origin = depot->first_block + (depot->slab_count * slab_size);
3862 
3863 	for (depot->new_slab_count = depot->slab_count;
3864 	     depot->new_slab_count < slab_count;
3865 	     depot->new_slab_count++, slab_origin += slab_size) {
3866 		struct block_allocator *allocator =
3867 			&depot->allocators[depot->new_slab_count % depot->zone_count];
3868 		struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3869 
3870 		result = make_slab(slab_origin, allocator, depot->new_slab_count,
3871 				   resizing, slab_ptr);
3872 		if (result != VDO_SUCCESS)
3873 			return result;
3874 	}
3875 
3876 	return VDO_SUCCESS;
3877 }
3878 
3879 /**
3880  * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3881  * @depot: The depot.
3882  */
3883 void vdo_abandon_new_slabs(struct slab_depot *depot)
3884 {
3885 	slab_count_t i;
3886 
3887 	if (depot->new_slabs == NULL)
3888 		return;
3889 
3890 	for (i = depot->slab_count; i < depot->new_slab_count; i++)
3891 		free_slab(vdo_forget(depot->new_slabs[i]));
3892 	depot->new_slab_count = 0;
3893 	depot->new_size = 0;
3894 	vdo_free(vdo_forget(depot->new_slabs));
3895 }
3896 
3897 /**
3898  * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3899  *
3900  * Implements vdo_zone_thread_getter_fn.
3901  */
3902 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3903 {
3904 	return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3905 }
3906 
3907 /**
3908  * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3909  *                                   it may hold on a specified recovery journal block.
3910  * @journal: The slab journal.
3911  * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3912  *                 released.
3913  *
3914  * Return: true if the journal does hold a lock on the specified block (which it will release).
3915  */
3916 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3917 						       sequence_number_t recovery_lock)
3918 {
3919 	if (recovery_lock > journal->recovery_lock) {
3920 		VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3921 				    "slab journal recovery lock is not older than the recovery journal head");
3922 		return false;
3923 	}
3924 
3925 	if ((recovery_lock < journal->recovery_lock) ||
3926 	    vdo_is_read_only(journal->slab->allocator->depot->vdo))
3927 		return false;
3928 
3929 	/* All locks are held by the block which is in progress; write it. */
3930 	commit_tail(journal);
3931 	return true;
3932 }
3933 
3934 /*
3935  * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3936  * is seeking to release.
3937  *
3938  * Implements vdo_zone_action_fn.
3939  */
3940 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3941 				     struct vdo_completion *parent)
3942 {
3943 	struct slab_journal *journal, *tmp;
3944 	struct slab_depot *depot = context;
3945 	struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3946 
3947 	list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3948 		if (!release_recovery_journal_lock(journal,
3949 						   depot->active_release_request))
3950 			break;
3951 	}
3952 
3953 	vdo_finish_completion(parent);
3954 }
3955 
3956 /**
3957  * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3958  *
3959  * Implements vdo_action_preamble_fn.
3960  */
3961 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3962 {
3963 	struct slab_depot *depot = context;
3964 
3965 	depot->active_release_request = depot->new_release_request;
3966 	vdo_finish_completion(parent);
3967 }
3968 
3969 /**
3970  * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3971  *
3972  * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3973  * depot's action manager.
3974  *
3975  * Implements vdo_action_scheduler_fn.
3976  */
3977 static bool schedule_tail_block_commit(void *context)
3978 {
3979 	struct slab_depot *depot = context;
3980 
3981 	if (depot->new_release_request == depot->active_release_request)
3982 		return false;
3983 
3984 	return vdo_schedule_action(depot->action_manager,
3985 				   prepare_for_tail_block_commit,
3986 				   release_tail_block_locks,
3987 				   NULL, NULL);
3988 }
3989 
3990 /**
3991  * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3992  * @allocator: The allocator being initialized
3993  *
3994  * Return: VDO_SUCCESS or an error.
3995  */
3996 static int initialize_slab_scrubber(struct block_allocator *allocator)
3997 {
3998 	struct slab_scrubber *scrubber = &allocator->scrubber;
3999 	block_count_t slab_journal_size =
4000 		allocator->depot->slab_config.slab_journal_blocks;
4001 	char *journal_data;
4002 	int result;
4003 
4004 	result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size,
4005 			      char, __func__, &journal_data);
4006 	if (result != VDO_SUCCESS)
4007 		return result;
4008 
4009 	result = allocate_vio_components(allocator->completion.vdo,
4010 					 VIO_TYPE_SLAB_JOURNAL,
4011 					 VIO_PRIORITY_METADATA,
4012 					 allocator, slab_journal_size,
4013 					 journal_data, &scrubber->vio);
4014 	if (result != VDO_SUCCESS) {
4015 		vdo_free(journal_data);
4016 		return result;
4017 	}
4018 
4019 	INIT_LIST_HEAD(&scrubber->high_priority_slabs);
4020 	INIT_LIST_HEAD(&scrubber->slabs);
4021 	vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
4022 	return VDO_SUCCESS;
4023 }
4024 
4025 /**
4026  * initialize_slab_summary_block() - Initialize a slab_summary_block.
4027  * @allocator: The allocator which owns the block.
4028  * @index: The index of this block in its zone's summary.
4029  *
4030  * Return: VDO_SUCCESS or an error.
4031  */
4032 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
4033 						      block_count_t index)
4034 {
4035 	struct slab_summary_block *block = &allocator->summary_blocks[index];
4036 	int result;
4037 
4038 	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
4039 	if (result != VDO_SUCCESS)
4040 		return result;
4041 
4042 	result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4043 					 VIO_PRIORITY_METADATA, NULL, 1,
4044 					 block->outgoing_entries, &block->vio);
4045 	if (result != VDO_SUCCESS)
4046 		return result;
4047 
4048 	block->allocator = allocator;
4049 	block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
4050 	block->index = index;
4051 	return VDO_SUCCESS;
4052 }
4053 
4054 static int __must_check initialize_block_allocator(struct slab_depot *depot,
4055 						   zone_count_t zone)
4056 {
4057 	int result;
4058 	block_count_t i;
4059 	struct block_allocator *allocator = &depot->allocators[zone];
4060 	struct vdo *vdo = depot->vdo;
4061 	block_count_t max_free_blocks = depot->slab_config.data_blocks;
4062 	unsigned int max_priority = (2 + ilog2(max_free_blocks));
4063 	u32 reference_block_count, refcount_reads_needed, refcount_blocks_per_vio;
4064 
4065 	*allocator = (struct block_allocator) {
4066 		.depot = depot,
4067 		.zone_number = zone,
4068 		.thread_id = vdo->thread_config.physical_threads[zone],
4069 		.nonce = vdo->states.vdo.nonce,
4070 	};
4071 
4072 	INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4073 	vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4074 	result = vdo_register_read_only_listener(vdo, allocator,
4075 						 notify_block_allocator_of_read_only_mode,
4076 						 allocator->thread_id);
4077 	if (result != VDO_SUCCESS)
4078 		return result;
4079 
4080 	vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4081 	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, 1, allocator->thread_id,
4082 			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4083 			       allocator, &allocator->vio_pool);
4084 	if (result != VDO_SUCCESS)
4085 		return result;
4086 
4087 	/* Initialize the refcount-reading vio pool. */
4088 	reference_block_count = vdo_get_saved_reference_count_size(depot->slab_config.slab_blocks);
4089 	refcount_reads_needed = DIV_ROUND_UP(reference_block_count, MAX_BLOCKS_PER_VIO);
4090 	refcount_blocks_per_vio = DIV_ROUND_UP(reference_block_count, refcount_reads_needed);
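	/*
	 * For example, if a slab needed 1000 reference blocks and MAX_BLOCKS_PER_VIO were 300,
	 * the pool's vios would be sized for 4 reads of up to 250 blocks each.
	 */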
4091 	allocator->refcount_blocks_per_big_vio = refcount_blocks_per_vio;
4092 	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_REFCOUNT_VIO_POOL_SIZE,
4093 			       allocator->refcount_blocks_per_big_vio, allocator->thread_id,
4094 			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4095 			       NULL, &allocator->refcount_big_vio_pool);
4096 	if (result != VDO_SUCCESS)
4097 		return result;
4098 
4099 	result = initialize_slab_scrubber(allocator);
4100 	if (result != VDO_SUCCESS)
4101 		return result;
4102 
4103 	result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4104 	if (result != VDO_SUCCESS)
4105 		return result;
4106 
4107 	result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4108 			      struct slab_summary_block, __func__,
4109 			      &allocator->summary_blocks);
4110 	if (result != VDO_SUCCESS)
4111 		return result;
4112 
4113 	vdo_set_admin_state_code(&allocator->summary_state,
4114 				 VDO_ADMIN_STATE_NORMAL_OPERATION);
4115 	allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4116 
4117 	/* Initialize each summary block. */
4118 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4119 		result = initialize_slab_summary_block(allocator, i);
4120 		if (result != VDO_SUCCESS)
4121 			return result;
4122 	}
4123 
4124 	/*
4125 	 * Performing well atop thin provisioned storage requires either that VDO discards freed
4126 	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4127 	 * in preference to slabs that have never been opened. For reasons we have not been able to
4128 	 * fully understand, some SSD machines have been very sensitive (50% reduction in
4129 	 * test throughput) to very slight differences in the timing and locality of block
4130 	 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4131 	 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4132 	 * hurts on these machines.
4133 	 *
4134 	 * This sets the free block threshold for preferring to open an unopened slab to the binary
4135 	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4136 	 * to about half the slab size.
4137 	 */
4138 	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
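	/*
	 * For example, a slab with 8192 data blocks gets a maximum priority of 15 and an
	 * unopened-slab priority of 13 (1 + ilog2(6144)).
	 */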
4139 
4140 	return VDO_SUCCESS;
4141 }
4142 
4143 static int allocate_components(struct slab_depot *depot,
4144 			       struct partition *summary_partition)
4145 {
4146 	int result;
4147 	zone_count_t zone;
4148 	slab_count_t slab_count;
4149 	u8 hint;
4150 	u32 i;
4151 	const struct thread_config *thread_config = &depot->vdo->thread_config;
4152 
4153 	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4154 					 thread_config->journal_thread, depot,
4155 					 schedule_tail_block_commit,
4156 					 depot->vdo, &depot->action_manager);
4157 	if (result != VDO_SUCCESS)
4158 		return result;
4159 
4160 	depot->origin = depot->first_block;
4161 
4162 	/* block size must be a multiple of entry size */
4163 	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4164 
4165 	depot->summary_origin = summary_partition->offset;
4166 	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4167 	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4168 			      struct slab_summary_entry, __func__,
4169 			      &depot->summary_entries);
4170 	if (result != VDO_SUCCESS)
4171 		return result;
4172 
4173 
4174 	/* Initialize all the entries. */
4175 	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4176 	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4177 		/*
4178 		 * This default tail block offset must be reflected in
4179 		 * slabJournal.c::read_slab_journal_tail().
4180 		 */
4181 		depot->summary_entries[i] = (struct slab_summary_entry) {
4182 			.tail_block_offset = 0,
4183 			.fullness_hint = hint,
4184 			.load_ref_counts = false,
4185 			.is_dirty = false,
4186 		};
4187 	}
4188 
4189 	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4190 					    depot->slab_size_shift);
4191 	if (thread_config->physical_zone_count > slab_count) {
4192 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
4193 					      "%u physical zones exceeds slab count %u",
4194 					      thread_config->physical_zone_count,
4195 					      slab_count);
4196 	}
4197 
4198 	/* Initialize the block allocators. */
4199 	for (zone = 0; zone < depot->zone_count; zone++) {
4200 		result = initialize_block_allocator(depot, zone);
4201 		if (result != VDO_SUCCESS)
4202 			return result;
4203 	}
4204 
4205 	/* Allocate slabs. */
4206 	result = allocate_slabs(depot, slab_count);
4207 	if (result != VDO_SUCCESS)
4208 		return result;
4209 
4210 	/* Use the new slabs. */
4211 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4212 		struct vdo_slab *slab = depot->new_slabs[i];
4213 
4214 		register_slab_with_allocator(slab->allocator, slab);
4215 		WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4216 	}
4217 
4218 	depot->slabs = depot->new_slabs;
4219 	depot->new_slabs = NULL;
4220 	depot->new_slab_count = 0;
4221 
4222 	return VDO_SUCCESS;
4223 }
4224 
4225 /**
4226  * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4227  *                           block.
4228  * @state: The slab depot state from the super block.
4229  * @vdo: The VDO which will own the depot.
4230  * @summary_partition: The partition which holds the slab summary.
4231  * @depot_ptr: A pointer to hold the depot.
4232  *
4233  * Return: A success or error code.
4234  */
4235 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4236 			  struct partition *summary_partition,
4237 			  struct slab_depot **depot_ptr)
4238 {
4239 	unsigned int slab_size_shift;
4240 	struct slab_depot *depot;
4241 	int result;
4242 
4243 	/*
4244 	 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4245 	 * requires that the slab size be a power of two.
4246 	 */
4247 	block_count_t slab_size = state.slab_config.slab_blocks;
4248 
4249 	if (!is_power_of_2(slab_size)) {
4250 		return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
4251 					      "slab size must be a power of two");
4252 	}
4253 	slab_size_shift = ilog2(slab_size);
4254 
4255 	result = vdo_allocate_extended(struct slab_depot,
4256 				       vdo->thread_config.physical_zone_count,
4257 				       struct block_allocator, __func__, &depot);
4258 	if (result != VDO_SUCCESS)
4259 		return result;
4260 
4261 	depot->vdo = vdo;
4262 	depot->old_zone_count = state.zone_count;
4263 	depot->zone_count = vdo->thread_config.physical_zone_count;
4264 	depot->slab_config = state.slab_config;
4265 	depot->first_block = state.first_block;
4266 	depot->last_block = state.last_block;
4267 	depot->slab_size_shift = slab_size_shift;
4268 
4269 	result = allocate_components(depot, summary_partition);
4270 	if (result != VDO_SUCCESS) {
4271 		vdo_free_slab_depot(depot);
4272 		return result;
4273 	}
4274 
4275 	*depot_ptr = depot;
4276 	return VDO_SUCCESS;
4277 }
4278 
4279 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4280 {
4281 	block_count_t i;
4282 
4283 	if (allocator->summary_blocks == NULL)
4284 		return;
4285 
4286 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4287 		free_vio_components(&allocator->summary_blocks[i].vio);
4288 		vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4289 	}
4290 
4291 	vdo_free(vdo_forget(allocator->summary_blocks));
4292 }
4293 
4294 /**
4295  * vdo_free_slab_depot() - Destroy a slab depot.
4296  * @depot: The depot to destroy.
4297  */
4298 void vdo_free_slab_depot(struct slab_depot *depot)
4299 {
4300 	zone_count_t zone = 0;
4301 
4302 	if (depot == NULL)
4303 		return;
4304 
4305 	vdo_abandon_new_slabs(depot);
4306 
4307 	for (zone = 0; zone < depot->zone_count; zone++) {
4308 		struct block_allocator *allocator = &depot->allocators[zone];
4309 
4310 		if (allocator->eraser != NULL)
4311 			dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
4312 
4313 		uninitialize_allocator_summary(allocator);
4314 		uninitialize_scrubber_vio(&allocator->scrubber);
4315 		free_vio_pool(vdo_forget(allocator->vio_pool));
4316 		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
4317 		vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
4318 	}
4319 
4320 	if (depot->slabs != NULL) {
4321 		slab_count_t i;
4322 
4323 		for (i = 0; i < depot->slab_count; i++)
4324 			free_slab(vdo_forget(depot->slabs[i]));
4325 	}
4326 
4327 	vdo_free(vdo_forget(depot->slabs));
4328 	vdo_free(vdo_forget(depot->action_manager));
4329 	vdo_free(vdo_forget(depot->summary_entries));
4330 	vdo_free(depot);
4331 }
4332 
4333 /**
4334  * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4335  * @depot: The depot to encode.
4336  *
4337  * Return: The depot state.
4338  */
4339 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4340 {
4341 	/*
4342 	 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4343 	 * tool and is now being saved. We did not load and combine the slab summary, so we still
4344 	 * need to do that on the next load, so record the old zone count rather than 0.
4345 	 */
4346 	struct slab_depot_state_2_0 state;
4347 	zone_count_t zones_to_record = depot->zone_count;
4348 
4349 	if (depot->zone_count == 0)
4350 		zones_to_record = depot->old_zone_count;
4351 
4352 	state = (struct slab_depot_state_2_0) {
4353 		.slab_config = depot->slab_config,
4354 		.first_block = depot->first_block,
4355 		.last_block = depot->last_block,
4356 		.zone_count = zones_to_record,
4357 	};
4358 
4359 	return state;
4360 }
4361 
4362 /**
4363  * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4364  *
4365  * Context: This method may be called only before entering normal operation from the load thread.
4366  *
4367  * Return: VDO_SUCCESS or an error.
4368  */
4369 int vdo_allocate_reference_counters(struct slab_depot *depot)
4370 {
4371 	struct slab_iterator iterator =
4372 		get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4373 
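	/* Walk every slab in the depot (from slab_count - 1 down to slab 0, one at a time). */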
4374 	while (iterator.next != NULL) {
4375 		int result = allocate_slab_counters(next_slab(&iterator));
4376 
4377 		if (result != VDO_SUCCESS)
4378 			return result;
4379 	}
4380 
4381 	return VDO_SUCCESS;
4382 }
4383 
4384 /**
4385  * get_slab_number() - Get the number of the slab that contains a specified block.
4386  * @depot: The slab depot.
4387  * @pbn: The physical block number.
4388  * @slab_number_ptr: A pointer to hold the slab number.
4389  *
4390  * Return: VDO_SUCCESS or an error.
4391  */
4392 static int __must_check get_slab_number(const struct slab_depot *depot,
4393 					physical_block_number_t pbn,
4394 					slab_count_t *slab_number_ptr)
4395 {
4396 	slab_count_t slab_number;
4397 
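	/*
	 * Each slab covers exactly 2^slab_size_shift blocks, so the slab index is simply
	 * the offset from first_block shifted down. Purely as an illustration, with a
	 * shift of 15 an offset of 100000 blocks falls in slab 3 (3 * 32768 <= 100000).
	 */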
4398 	if (pbn < depot->first_block)
4399 		return VDO_OUT_OF_RANGE;
4400 
4401 	slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4402 	if (slab_number >= depot->slab_count)
4403 		return VDO_OUT_OF_RANGE;
4404 
4405 	*slab_number_ptr = slab_number;
4406 	return VDO_SUCCESS;
4407 }
4408 
4409 /**
4410  * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4411  * @depot: The slab depot.
4412  * @pbn: The physical block number.
4413  *
4414  * Will put the VDO in read-only mode if the PBN is neither a valid data block nor the zero block.
4415  *
4416  * Return: The slab containing the block, or NULL if the block number is the zero block or
4417  * otherwise out of range.
4418  */
4419 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4420 			      physical_block_number_t pbn)
4421 {
4422 	slab_count_t slab_number;
4423 	int result;
4424 
4425 	if (pbn == VDO_ZERO_BLOCK)
4426 		return NULL;
4427 
4428 	result = get_slab_number(depot, pbn, &slab_number);
4429 	if (result != VDO_SUCCESS) {
4430 		vdo_enter_read_only_mode(depot->vdo, result);
4431 		return NULL;
4432 	}
4433 
4434 	return depot->slabs[slab_number];
4435 }
4436 
4437 /**
4438  * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4439  * @depot: The slab depot.
4440  * @pbn: The physical block number that is being queried.
4441  *
4442  * Context: This method must be called from the physical zone thread of the PBN.
4443  *
4444  * Return: The number of available references.
4445  */
4446 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4447 {
4448 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4449 	vdo_refcount_t *counter_ptr = NULL;
4450 	int result;
4451 
4452 	if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4453 		return 0;
4454 
4455 	result = get_reference_counter(slab, pbn, &counter_ptr);
4456 	if (result != VDO_SUCCESS)
4457 		return 0;
4458 
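	/*
	 * A provisional reference is treated as a single holder of the block, so one
	 * fewer increment is available than for a completely free block.
	 */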
4459 	if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4460 		return (MAXIMUM_REFERENCE_COUNT - 1);
4461 
4462 	return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4463 }
4464 
4465 /**
4466  * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4467  * @depot: The depot.
4468  * @pbn: The physical block number to ask about.
4469  *
4470  * Return: True if the PBN corresponds to a data block.
4471  */
4472 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4473 				physical_block_number_t pbn)
4474 {
4475 	slab_count_t slab_number;
4476 	slab_block_number sbn;
4477 
4478 	return ((pbn == VDO_ZERO_BLOCK) ||
4479 		((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4480 		 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4481 		  VDO_SUCCESS)));
4482 }
4483 
4484 /**
4485  * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4486  * the slabs in the depot.
4487  * @depot: The slab depot.
4488  *
4489  * This is the total number of blocks with a non-zero reference count.
4490  *
4491  * Context: This may be called from any thread.
4492  *
4493  * Return: The total number of blocks with a non-zero reference count.
4494  */
4495 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4496 {
4497 	block_count_t total = 0;
4498 	zone_count_t zone;
4499 
4500 	for (zone = 0; zone < depot->zone_count; zone++) {
4501 		/* The allocators are responsible for thread safety. */
4502 		total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4503 	}
4504 
4505 	return total;
4506 }
4507 
4508 /**
4509  * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4510  *                                    depot.
4511  * @depot: The slab depot.
4512  *
4513  * Context: This may be called from any thread.
4514  *
4515  * Return: The total number of data blocks in all slabs.
4516  */
4517 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4518 {
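	/*
	 * slab_count is updated with WRITE_ONCE() while new slabs are registered, so
	 * this unlocked read is paired with READ_ONCE().
	 */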
4519 	return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4520 }
4521 
4522 /**
4523  * finish_combining_zones() - Clean up after saving out the combined slab summary.
4524  * @completion: The vio which was used to write the summary data.
4525  */
4526 static void finish_combining_zones(struct vdo_completion *completion)
4527 {
4528 	int result = completion->result;
4529 	struct vdo_completion *parent = completion->parent;
4530 
4531 	free_vio(as_vio(vdo_forget(completion)));
4532 	vdo_fail_completion(parent, result);
4533 }
4534 
4535 static void handle_combining_error(struct vdo_completion *completion)
4536 {
4537 	vio_record_metadata_io_error(as_vio(completion));
4538 	finish_combining_zones(completion);
4539 }
4540 
4541 static void write_summary_endio(struct bio *bio)
4542 {
4543 	struct vio *vio = bio->bi_private;
4544 	struct vdo *vdo = vio->completion.vdo;
4545 
4546 	continue_vio_after_io(vio, finish_combining_zones,
4547 			      vdo->thread_config.admin_thread);
4548 }
4549 
4550 /**
4551  * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4552  *                       update every zone to the correct values for every slab.
4553  * @depot: The depot whose summary entries should be combined.
4554  */
4555 static void combine_summaries(struct slab_depot *depot)
4556 {
4557 	/*
4558 	 * Combine all the old summary data into the portion of the buffer corresponding to the
4559 	 * first zone.
4560 	 */
4561 	zone_count_t zone = 0;
4562 	struct slab_summary_entry *entries = depot->summary_entries;
4563 
4564 	if (depot->old_zone_count > 1) {
4565 		slab_count_t entry_number;
4566 
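		/*
		 * As the loop below walks the entries, the zone counter wraps modulo
		 * old_zone_count, so the entry chosen for slab N is the one written by
		 * zone (N % old_zone_count), the zone that was responsible for that slab.
		 */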
4567 		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4568 			if (zone != 0) {
4569 				memcpy(entries + entry_number,
4570 				       entries + (zone * MAX_VDO_SLABS) + entry_number,
4571 				       sizeof(struct slab_summary_entry));
4572 			}
4573 
4574 			zone++;
4575 			if (zone == depot->old_zone_count)
4576 				zone = 0;
4577 		}
4578 	}
4579 
4580 	/* Copy the combined data to each zone's region of the buffer. */
4581 	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4582 		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4583 		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4584 	}
4585 }
4586 
4587 /**
4588  * finish_loading_summary() - Finish loading slab summary data.
4589  * @completion: The vio which was used to read the summary data.
4590  *
4591  * Combines the slab summary data from all the previously written zones and copies the combined
4592  * summary to each partition's data region. Then writes the combined summary back out to disk. This
4593  * callback is registered in load_summary_endio().
4594  */
4595 static void finish_loading_summary(struct vdo_completion *completion)
4596 {
4597 	struct slab_depot *depot = completion->vdo->depot;
4598 
4599 	/* Combine the summary from each zone so each zone is correct for all slabs. */
4600 	combine_summaries(depot);
4601 
4602 	/* Write the combined summary back out. */
4603 	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4604 				write_summary_endio, handle_combining_error,
4605 				REQ_OP_WRITE);
4606 }
4607 
4608 static void load_summary_endio(struct bio *bio)
4609 {
4610 	struct vio *vio = bio->bi_private;
4611 	struct vdo *vdo = vio->completion.vdo;
4612 
4613 	continue_vio_after_io(vio, finish_loading_summary,
4614 			      vdo->thread_config.admin_thread);
4615 }
4616 
4617 /**
4618  * load_slab_summary() - The preamble of a load operation.
4619  *
4620  * Implements vdo_action_preamble_fn.
4621  */
4622 static void load_slab_summary(void *context, struct vdo_completion *parent)
4623 {
4624 	int result;
4625 	struct vio *vio;
4626 	struct slab_depot *depot = context;
4627 	const struct admin_state_code *operation =
4628 		vdo_get_current_manager_operation(depot->action_manager);
4629 
4630 	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4631 						 VIO_PRIORITY_METADATA, parent,
4632 						 VDO_SLAB_SUMMARY_BLOCKS,
4633 						 (char *) depot->summary_entries, &vio);
4634 	if (result != VDO_SUCCESS) {
4635 		vdo_fail_completion(parent, result);
4636 		return;
4637 	}
4638 
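	/*
	 * When formatting or loading for rebuild there is nothing useful in the on-disk
	 * summary, so skip the read and go straight to combining and writing out the
	 * freshly initialized in-memory entries.
	 */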
4639 	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4640 	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4641 		finish_loading_summary(&vio->completion);
4642 		return;
4643 	}
4644 
4645 	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4646 				handle_combining_error, REQ_OP_READ);
4647 }
4648 
4649 /* Implements vdo_zone_action_fn. */
4650 static void load_allocator(void *context, zone_count_t zone_number,
4651 			   struct vdo_completion *parent)
4652 {
4653 	struct slab_depot *depot = context;
4654 
4655 	vdo_start_loading(&depot->allocators[zone_number].state,
4656 			  vdo_get_current_manager_operation(depot->action_manager),
4657 			  parent, initiate_load);
4658 }
4659 
4660 /**
4661  * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4662  *                         super_block component.
4663  * @depot: The depot to load.
4664  * @operation: The type of load to perform.
4665  * @parent: The completion to notify when the load is complete.
4666  * @context: Additional context for the load operation; may be NULL.
4667  *
4668  * This method may be called only before entering normal operation from the load thread.
4669  */
4670 void vdo_load_slab_depot(struct slab_depot *depot,
4671 			 const struct admin_state_code *operation,
4672 			 struct vdo_completion *parent, void *context)
4673 {
4674 	if (!vdo_assert_load_operation(operation, parent))
4675 		return;
4676 
4677 	vdo_schedule_operation_with_context(depot->action_manager, operation,
4678 					    load_slab_summary, load_allocator,
4679 					    NULL, context, parent);
4680 }
4681 
4682 /* Implements vdo_zone_action_fn. */
4683 static void prepare_to_allocate(void *context, zone_count_t zone_number,
4684 				struct vdo_completion *parent)
4685 {
4686 	struct slab_depot *depot = context;
4687 	struct block_allocator *allocator = &depot->allocators[zone_number];
4688 	int result;
4689 
4690 	result = vdo_prepare_slabs_for_allocation(allocator);
4691 	if (result != VDO_SUCCESS) {
4692 		vdo_fail_completion(parent, result);
4693 		return;
4694 	}
4695 
4696 	scrub_slabs(allocator, parent);
4697 }
4698 
4699 /**
4700  * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4701  *                                        allocating blocks.
4702  * @depot: The depot to prepare.
4703  * @load_type: The load type.
4704  * @parent: The completion to notify when the operation is complete.
4705  *
4706  * This method may be called only before entering normal operation from the load thread. It must be
4707  * called before allocation may proceed.
4708  */
4709 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4710 					enum slab_depot_load_type load_type,
4711 					struct vdo_completion *parent)
4712 {
4713 	depot->load_type = load_type;
4714 	atomic_set(&depot->zones_to_scrub, depot->zone_count);
4715 	vdo_schedule_action(depot->action_manager, NULL,
4716 			    prepare_to_allocate, NULL, parent);
4717 }
4718 
4719 /**
4720  * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4721  * @depot: The depot to update.
4722  *
4723  * This size is saved to disk as part of the super block.
4724  */
4725 void vdo_update_slab_depot_size(struct slab_depot *depot)
4726 {
4727 	depot->last_block = depot->new_last_block;
4728 }
4729 
4730 /**
4731  * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4732  *                                    the given size.
4733  * @depot: The depot to prepare to resize.
4734  * @partition: The new depot partition
4735  *
4736  * Return: VDO_SUCCESS or an error.
4737  */
4738 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4739 				   const struct partition *partition)
4740 {
4741 	struct slab_depot_state_2_0 new_state;
4742 	int result;
4743 	slab_count_t new_slab_count;
4744 
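	/*
	 * partition->count >> slab_size_shift is the number of whole slabs the new
	 * partition can hold; growing is only worthwhile if that exceeds the current
	 * slab count.
	 */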
4745 	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4746 		return VDO_INCREMENT_TOO_SMALL;
4747 
4748 	/* Generate the depot configuration for the new block count. */
4749 	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4750 			    "New slab depot partition doesn't change origin");
4751 	result = vdo_configure_slab_depot(partition, depot->slab_config,
4752 					  depot->zone_count, &new_state);
4753 	if (result != VDO_SUCCESS)
4754 		return result;
4755 
4756 	new_slab_count = vdo_compute_slab_count(depot->first_block,
4757 						new_state.last_block,
4758 						depot->slab_size_shift);
4759 	if (new_slab_count <= depot->slab_count)
4760 		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4761 					      "Depot can only grow");
4762 	if (new_slab_count == depot->new_slab_count) {
4763 		/* Check it out, we've already got all the new slabs allocated! */
4764 		return VDO_SUCCESS;
4765 	}
4766 
4767 	vdo_abandon_new_slabs(depot);
4768 	result = allocate_slabs(depot, new_slab_count);
4769 	if (result != VDO_SUCCESS) {
4770 		vdo_abandon_new_slabs(depot);
4771 		return result;
4772 	}
4773 
4774 	depot->new_size = partition->count;
4775 	depot->old_last_block = depot->last_block;
4776 	depot->new_last_block = new_state.last_block;
4777 
4778 	return VDO_SUCCESS;
4779 }
4780 
4781 /**
4782  * finish_registration() - Finish registering new slabs now that all of the allocators have
4783  *                         received their new slabs.
4784  *
4785  * Implements vdo_action_conclusion_fn.
4786  */
4787 static int finish_registration(void *context)
4788 {
4789 	struct slab_depot *depot = context;
4790 
4791 	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
4792 	vdo_free(depot->slabs);
4793 	depot->slabs = depot->new_slabs;
4794 	depot->new_slabs = NULL;
4795 	depot->new_slab_count = 0;
4796 	return VDO_SUCCESS;
4797 }
4798 
4799 /* Implements vdo_zone_action_fn. */
4800 static void register_new_slabs(void *context, zone_count_t zone_number,
4801 			       struct vdo_completion *parent)
4802 {
4803 	struct slab_depot *depot = context;
4804 	struct block_allocator *allocator = &depot->allocators[zone_number];
4805 	slab_count_t i;
4806 
4807 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4808 		struct vdo_slab *slab = depot->new_slabs[i];
4809 
4810 		if (slab->allocator == allocator)
4811 			register_slab_with_allocator(allocator, slab);
4812 	}
4813 
4814 	vdo_finish_completion(parent);
4815 }
4816 
4817 /**
4818  * vdo_use_new_slabs() - Use the new slabs allocated for resize.
4819  * @depot: The depot.
4820  * @parent: The object to notify when complete.
4821  */
4822 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
4823 {
4824 	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
4825 	vdo_schedule_operation(depot->action_manager,
4826 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
4827 			       NULL, register_new_slabs,
4828 			       finish_registration, parent);
4829 }
4830 
4831 /**
4832  * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
4833  *                    currently working on.
4834  * @allocator: The block allocator owning the scrubber to stop.
4835  */
4836 static void stop_scrubbing(struct block_allocator *allocator)
4837 {
4838 	struct slab_scrubber *scrubber = &allocator->scrubber;
4839 
4840 	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
4841 		vdo_finish_completion(&allocator->completion);
4842 	} else {
4843 		vdo_start_draining(&scrubber->admin_state,
4844 				   VDO_ADMIN_STATE_SUSPENDING,
4845 				   &allocator->completion, NULL);
4846 	}
4847 }
4848 
4849 /* Implements vdo_admin_initiator_fn. */
4850 static void initiate_summary_drain(struct admin_state *state)
4851 {
4852 	check_summary_drain_complete(container_of(state, struct block_allocator,
4853 						  summary_state));
4854 }
4855 
4856 static void do_drain_step(struct vdo_completion *completion)
4857 {
4858 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4859 
4860 	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
4861 					   handle_operation_error, allocator->thread_id,
4862 					   NULL);
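	/*
	 * Each drain step re-registers this function as the callback on the allocator's
	 * completion, so finishing one step automatically advances to the next.
	 */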
4863 	switch (++allocator->drain_step) {
4864 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4865 		stop_scrubbing(allocator);
4866 		return;
4867 
4868 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4869 		apply_to_slabs(allocator, do_drain_step);
4870 		return;
4871 
4872 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4873 		vdo_start_draining(&allocator->summary_state,
4874 				   vdo_get_admin_state_code(&allocator->state),
4875 				   completion, initiate_summary_drain);
4876 		return;
4877 
4878 	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
4879 		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
4880 				    "vio pool not busy");
4881 		vdo_finish_draining_with_result(&allocator->state, completion->result);
4882 		return;
4883 
4884 	default:
4885 		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
4886 	}
4887 }
4888 
4889 /* Implements vdo_admin_initiator_fn. */
4890 static void initiate_drain(struct admin_state *state)
4891 {
4892 	struct block_allocator *allocator =
4893 		container_of(state, struct block_allocator, state);
4894 
4895 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
4896 	do_drain_step(&allocator->completion);
4897 }
4898 
4899 /*
4900  * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
4901  * written to disk. The type of drain will be determined from the state of the allocator's depot.
4902  *
4903  * Implements vdo_zone_action_fn.
4904  */
4905 static void drain_allocator(void *context, zone_count_t zone_number,
4906 			    struct vdo_completion *parent)
4907 {
4908 	struct slab_depot *depot = context;
4909 
4910 	vdo_start_draining(&depot->allocators[zone_number].state,
4911 			   vdo_get_current_manager_operation(depot->action_manager),
4912 			   parent, initiate_drain);
4913 }
4914 
4915 /**
4916  * vdo_drain_slab_depot() - Drain all slab depot I/O.
4917  * @depot: The depot to drain.
4918  * @operation: The drain operation (flush, rebuild, suspend, or save).
4919  * @parent: The completion to finish when the drain is complete.
4920  *
4921  * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
4922  * the depot will be left in a suspended state.
4923  */
4924 void vdo_drain_slab_depot(struct slab_depot *depot,
4925 			  const struct admin_state_code *operation,
4926 			  struct vdo_completion *parent)
4927 {
4928 	vdo_schedule_operation(depot->action_manager, operation,
4929 			       NULL, drain_allocator, NULL, parent);
4930 }
4931 
4932 /**
4933  * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
4934  * @allocator: The allocator being resumed.
4935  */
4936 static void resume_scrubbing(struct block_allocator *allocator)
4937 {
4938 	int result;
4939 	struct slab_scrubber *scrubber = &allocator->scrubber;
4940 
4941 	if (!has_slabs_to_scrub(scrubber)) {
4942 		vdo_finish_completion(&allocator->completion);
4943 		return;
4944 	}
4945 
4946 	result = vdo_resume_if_quiescent(&scrubber->admin_state);
4947 	if (result != VDO_SUCCESS) {
4948 		vdo_fail_completion(&allocator->completion, result);
4949 		return;
4950 	}
4951 
4952 	scrub_next_slab(scrubber);
4953 	vdo_finish_completion(&allocator->completion);
4954 }
4955 
4956 static void do_resume_step(struct vdo_completion *completion)
4957 {
4958 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4959 
4960 	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
4961 					   handle_operation_error,
4962 					   allocator->thread_id, NULL);
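	/*
	 * Resuming reverses the drain sequence: drain_step starts at FINISHED and is
	 * decremented until VDO_DRAIN_ALLOCATOR_START completes the resume.
	 */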
4963 	switch (--allocator->drain_step) {
4964 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4965 		vdo_fail_completion(completion,
4966 				    vdo_resume_if_quiescent(&allocator->summary_state));
4967 		return;
4968 
4969 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4970 		apply_to_slabs(allocator, do_resume_step);
4971 		return;
4972 
4973 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4974 		resume_scrubbing(allocator);
4975 		return;
4976 
4977 	case VDO_DRAIN_ALLOCATOR_START:
4978 		vdo_finish_resuming_with_result(&allocator->state, completion->result);
4979 		return;
4980 
4981 	default:
4982 		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
4983 	}
4984 }
4985 
4986 /* Implements vdo_admin_initiator_fn. */
4987 static void initiate_resume(struct admin_state *state)
4988 {
4989 	struct block_allocator *allocator =
4990 		container_of(state, struct block_allocator, state);
4991 
4992 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
4993 	do_resume_step(&allocator->completion);
4994 }
4995 
4996 /* Implements vdo_zone_action_fn. */
4997 static void resume_allocator(void *context, zone_count_t zone_number,
4998 			     struct vdo_completion *parent)
4999 {
5000 	struct slab_depot *depot = context;
5001 
5002 	vdo_start_resuming(&depot->allocators[zone_number].state,
5003 			   vdo_get_current_manager_operation(depot->action_manager),
5004 			   parent, initiate_resume);
5005 }
5006 
5007 /**
5008  * vdo_resume_slab_depot() - Resume a suspended slab depot.
5009  * @depot: The depot to resume.
5010  * @parent: The completion to finish when the depot has resumed.
5011  */
5012 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
5013 {
5014 	if (vdo_is_read_only(depot->vdo)) {
5015 		vdo_continue_completion(parent, VDO_READ_ONLY);
5016 		return;
5017 	}
5018 
5019 	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
5020 			       NULL, resume_allocator, NULL, parent);
5021 }
5022 
5023 /**
5024  * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
5025  *                                                given recovery journal block.
5026  * @depot: The depot.
5027  * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
5028  *                         released.
5029  *
5030  * Context: This method must be called from the journal zone thread.
5031  */
5032 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
5033 						sequence_number_t recovery_block_number)
5034 {
5035 	if (depot == NULL)
5036 		return;
5037 
5038 	depot->new_release_request = recovery_block_number;
5039 	vdo_schedule_default_action(depot->action_manager);
5040 }
5041 
5042 /* Implements vdo_zone_action_fn. */
5043 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
5044 					struct vdo_completion *parent)
5045 {
5046 	struct slab_depot *depot = context;
5047 
5048 	scrub_slabs(&depot->allocators[zone_number], NULL);
5049 	vdo_launch_completion(parent);
5050 }
5051 
5052 /**
5053  * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
5054  * @depot: The depot to scrub.
5055  * @parent: The object to notify when scrubbing has been launched for all zones.
5056  */
5057 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
5058 				     struct vdo_completion *parent)
5059 {
5060 	vdo_schedule_action(depot->action_manager, NULL,
5061 			    scrub_all_unrecovered_slabs,
5062 			    NULL, parent);
5063 }
5064 
5065 /**
5066  * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
5067  *                                    in the depot.
5068  * @depot: The slab depot.
5069  *
5070  * Return: The statistics from all block allocators in the depot.
5071  */
5072 static struct block_allocator_statistics __must_check
5073 get_block_allocator_statistics(const struct slab_depot *depot)
5074 {
5075 	struct block_allocator_statistics totals;
5076 	zone_count_t zone;
5077 
5078 	memset(&totals, 0, sizeof(totals));
5079 
5080 	for (zone = 0; zone < depot->zone_count; zone++) {
5081 		const struct block_allocator *allocator = &depot->allocators[zone];
5082 		const struct block_allocator_statistics *stats = &allocator->statistics;
5083 
5084 		totals.slab_count += allocator->slab_count;
5085 		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5086 		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5087 	}
5088 
5089 	return totals;
5090 }
5091 
5092 /**
5093  * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5094  * @depot: The slab depot.
5095  *
5096  * Return: The cumulative statistics for all ref_counts in the depot.
5097  */
5098 static struct ref_counts_statistics __must_check
5099 get_ref_counts_statistics(const struct slab_depot *depot)
5100 {
5101 	struct ref_counts_statistics totals;
5102 	zone_count_t zone;
5103 
5104 	memset(&totals, 0, sizeof(totals));
5105 
5106 	for (zone = 0; zone < depot->zone_count; zone++) {
5107 		totals.blocks_written +=
5108 			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5109 	}
5110 
5111 	return totals;
5112 }
5113 
5114 /**
5115  * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5116  * @depot: The slab depot.
5117  *
5118  * Return: The aggregated statistics for all slab journals in the depot.
5119  */
5120 static struct slab_journal_statistics __must_check
5121 get_slab_journal_statistics(const struct slab_depot *depot)
5122 {
5123 	struct slab_journal_statistics totals;
5124 	zone_count_t zone;
5125 
5126 	memset(&totals, 0, sizeof(totals));
5127 
5128 	for (zone = 0; zone < depot->zone_count; zone++) {
5129 		const struct slab_journal_statistics *stats =
5130 			&depot->allocators[zone].slab_journal_statistics;
5131 
5132 		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5133 		totals.flush_count += READ_ONCE(stats->flush_count);
5134 		totals.blocked_count += READ_ONCE(stats->blocked_count);
5135 		totals.blocks_written += READ_ONCE(stats->blocks_written);
5136 		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5137 	}
5138 
5139 	return totals;
5140 }
5141 
5142 /**
5143  * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5144  *                                   slab depot.
5145  * @depot: The slab depot.
5146  * @stats: The vdo statistics structure to partially fill.
5147  */
5148 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5149 				   struct vdo_statistics *stats)
5150 {
5151 	slab_count_t slab_count = READ_ONCE(depot->slab_count);
5152 	slab_count_t unrecovered = 0;
5153 	zone_count_t zone;
5154 
5155 	for (zone = 0; zone < depot->zone_count; zone++) {
5156 		/* The allocators are responsible for thread safety. */
5157 		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5158 	}
5159 
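	/*
	 * Slabs still queued for scrubbing count as unrecovered; everything else is
	 * treated as recovered for the percentage below.
	 */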
5160 	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
5161 	stats->allocator = get_block_allocator_statistics(depot);
5162 	stats->ref_counts = get_ref_counts_statistics(depot);
5163 	stats->slab_journal = get_slab_journal_statistics(depot);
5164 	stats->slab_summary = (struct slab_summary_statistics) {
5165 		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5166 	};
5167 }
5168 
5169 /**
5170  * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
5171  * @depot: The slab depot.
5172  */
5173 void vdo_dump_slab_depot(const struct slab_depot *depot)
5174 {
5175 	vdo_log_info("vdo slab depot");
5176 	vdo_log_info("  zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
5177 		     (unsigned int) depot->zone_count,
5178 		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
5179 		     (unsigned long long) depot->active_release_request,
5180 		     (unsigned long long) depot->new_release_request);
5181 }
5182